Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
406 changes: 406 additions & 0 deletions src/btree.c

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions src/btree.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,44 @@ btree_iterator_init(cache *cc,
bool32 do_prefetch,
uint32 height);

/*
 * State structure for btree_iterator_init_async().
 *
 * The first thirteen "param" entries (cc .. height) are the same arguments,
 * in the same order, that the synchronous btree_iterator_init() takes. They
 * are followed by the completion callback and the opaque argument that is
 * handed back to that callback.
 *
 * The "local" entries are working fields of the async routine.
 * NOTE(review): presumably they hold values that must survive across
 * suspensions of btree_iterator_init_async(), and __async_result is what
 * async_result() reads back -- confirm against the DEFINE_ASYNC_STATE macro.
 * NOTE(review): the meaning of the literal 5 is defined by
 * DEFINE_ASYNC_STATE and is not visible here -- confirm.
 */
// clang-format off
DEFINE_ASYNC_STATE(btree_iterator_async_state, 5,
param, cache *, cc,
param, const btree_config *, cfg,
param, btree_iterator *, itor,
param, uint64, root_addr,
param, page_type, type,
param, comparison, min_key_comparison,
param, key, min_key,
param, comparison, max_key_comparison,
param, key, max_key,
param, comparison, start_type,
param, key, start_key,
param, bool32, do_prefetch,
param, uint32, height,
param, async_callback_fn, callback,
param, void *, callback_arg,
local, platform_status, __async_result,
local, btree_lookup_async_state, lookup_state,
local, page_get_async_state_buffer, cache_get_state,
local, btree_node, end,
local, key, target,
local, comparison, position_rule,
local, bool32, found,
local, bool32, forward,
local, int64, tmp,
local, uint64, curr_addr,
local, uint64, last_addr,
local, uint64, next_addr,
local, uint64, prev_addr,
local, uint64, num_entries,
local, key, first_key,
local, key, last_key)
// clang-format on

/*
 * Asynchronous counterpart of btree_iterator_init().
 *
 * state must first be filled in with btree_iterator_async_state_init().
 * Callers invoke this function repeatedly: once it returns
 * ASYNC_STATUS_DONE the platform_status of the initialization can be read
 * with async_result(state). While it has not returned ASYNC_STATUS_DONE,
 * the caller waits for state->callback to be invoked before calling again.
 */
async_status
btree_iterator_init_async(btree_iterator_async_state *state);

/*
 * Tear down a btree iterator.
 * NOTE(review): presumably only valid on an iterator whose
 * btree_iterator_init() / btree_iterator_init_async() has completed; the
 * range iterator in core.c tracks this per branch before calling -- confirm.
 */
void
btree_iterator_deinit(btree_iterator *itor);

Expand Down
238 changes: 156 additions & 82 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -793,57 +793,104 @@ core_lookup_from_memtable_generation_locked(core_handle *spl,
return STATUS_OK;
}

/*
* Branch iterator wrapper functions
*/
/*
 * Per-iterator bookkeeping for one in-flight btree_iterator_init_async().
 */
typedef struct core_btree_iterator_init_async_context {
// Async state handed to btree_iterator_init_async().
btree_iterator_async_state state;
// Set to TRUE by core_btree_iterator_init_async_callback(); cleared just
// before each call to btree_iterator_init_async().
bool32 ready;
// Set to TRUE once btree_iterator_init_async() has returned
// ASYNC_STATUS_DONE; the outcome is then available via async_result().
bool32 done;
} core_btree_iterator_init_async_context;

static void
core_branch_iterator_init(core_handle *spl,
btree_iterator *itor,
uint64 branch_addr,
comparison min_key_comparison,
key min_key,
comparison max_key_comparison,
key max_key,
comparison start_key_comparison,
key start_key,
bool32 do_prefetch,
bool32 should_inc_ref)
core_btree_iterator_init_async_callback(void *arg)
{
core_btree_iterator_init_async_context *ctxt = arg;
ctxt->ready = TRUE;
}

/*
 * Begin an asynchronous btree iterator initialization.
 *
 * Fills ctxt->state with the iterator arguments (height is fixed at 0),
 * registers core_btree_iterator_init_async_callback() with ctxt as its
 * argument, and makes the first call to btree_iterator_init_async().
 *
 * Returns:
 *  - the initialization's own status if it completed during this first
 *    call (ctxt->done is TRUE in that case, whether or not it succeeded);
 *  - STATUS_OK if the initialization is still pending (ctxt->done is
 *    FALSE). The caller must then finish it with
 *    core_drain_btree_iterator_init_async().
 *
 * Callers must therefore check ctxt->done, not just the return value, to
 * learn whether the iterator has finished initializing.
 */
static platform_status
core_start_btree_iterator_init_async(
   core_handle                            *spl,
   core_btree_iterator_init_async_context *ctxt,
   btree_iterator                         *itor,
   uint64                                  root_addr,
   page_type                               page_type,
   comparison                              min_key_comparison,
   key                                     min_key,
   comparison                              max_key_comparison,
   key                                     max_key,
   comparison                              start_key_comparison,
   key                                     start_key,
   bool32                                  do_prefetch)
{
   btree_iterator_async_state_init(&ctxt->state,
                                   spl->cc,
                                   spl->cfg.btree_cfg,
                                   itor,
                                   root_addr,
                                   page_type,
                                   min_key_comparison,
                                   min_key,
                                   max_key_comparison,
                                   max_key,
                                   start_key_comparison,
                                   start_key,
                                   do_prefetch,
                                   0,
                                   core_btree_iterator_init_async_callback,
                                   ctxt);
   // Reset the flags before the first call so a callback that fires during
   // that call is not lost.
   ctxt->ready = FALSE;
   ctxt->done  = FALSE;

   if (btree_iterator_init_async(&ctxt->state) == ASYNC_STATUS_DONE) {
      ctxt->done = TRUE;
      return async_result(&ctxt->state);
   }

   // Still pending: report success for the "start" step only.
   return STATUS_OK;
}

static void
core_branch_iterator_deinit(core_handle *spl,
btree_iterator *itor,
bool32 should_dec_ref)
static platform_status
core_drain_btree_iterator_init_async(
cache *cc,
core_btree_iterator_init_async_context *ctxt,
uint64 num_inits)
{
if (itor->root_addr == 0) {
return;
platform_status result = STATUS_OK;
uint64 done_count = 0;
for (uint64 i = 0; i < num_inits; i++) {
if (ctxt[i].done) {
done_count++;
platform_status rc = async_result(&ctxt[i].state);
if (!SUCCESS(rc) && SUCCESS(result)) {
result = rc;
}
}
}
cache *cc = spl->cc;
btree_config *btree_cfg = spl->cfg.btree_cfg;
btree_iterator_deinit(itor);
if (should_dec_ref) {
btree_dec_ref(cc, btree_cfg, itor->root_addr, PAGE_TYPE_BRANCH);

while (done_count < num_inits) {
bool32 made_progress = FALSE;
for (uint64 i = 0; i < num_inits; i++) {
if (ctxt[i].done || !ctxt[i].ready) {
continue;
}

ctxt[i].ready = FALSE;
made_progress = TRUE;
if (btree_iterator_init_async(&ctxt[i].state) == ASYNC_STATUS_DONE) {
ctxt[i].done = TRUE;
done_count++;
platform_status rc = async_result(&ctxt[i].state);
if (!SUCCESS(rc) && SUCCESS(result)) {
result = rc;
}
}
}

if (!made_progress) {
cache_cleanup(cc);
}
}

return result;
}

/*
Expand Down Expand Up @@ -918,6 +965,8 @@ core_range_iterator_init(core_handle *spl,
range_itor->can_next = TRUE;
range_itor->min_key_comparison = min_key_comparison;
range_itor->max_key_comparison = max_key_comparison;
ZERO_ARRAY(range_itor->compacted);
ZERO_ARRAY(range_itor->btree_itor_initialized);

key_buffer_init(&range_itor->min_key, PROCESS_PRIVATE_HEAP_ID);
key_buffer_init(&range_itor->max_key, PROCESS_PRIVATE_HEAP_ID);
Expand Down Expand Up @@ -964,9 +1013,6 @@ core_range_iterator_init(core_handle *spl,
return rc;
}


ZERO_ARRAY(range_itor->compacted);

// grab the lookup lock
memtable_begin_lookup(&spl->mt_ctxt);

Expand Down Expand Up @@ -1061,43 +1107,68 @@ core_range_iterator_init(core_handle *spl,
}
}

core_btree_iterator_init_async_context *init_ctxt = NULL;
if (range_itor->num_branches != 0) {
init_ctxt = TYPED_ARRAY_ZALLOC(
PROCESS_PRIVATE_HEAP_ID, init_ctxt, range_itor->num_branches);
}
if (range_itor->num_branches != 0 && init_ctxt == NULL) {
core_range_iterator_deinit(range_itor);
return STATUS_NO_MEMORY;
}

uint64 started_inits = 0;
for (uint64 i = 0; i < range_itor->num_branches; i++) {
uint64 branch_no = range_itor->num_branches - i - 1;
btree_iterator *btree_itor = &range_itor->btree_itor[branch_no];
uint64 branch_addr = range_itor->branch[branch_no].addr;
page_type page_type = range_itor->branch[branch_no].type;
bool32 do_prefetch = FALSE;
if (range_itor->compacted[branch_no]) {
bool32 do_prefetch =
do_prefetch =
range_itor->compacted[branch_no] && num_tuples > CORE_PREFETCH_MIN
? TRUE
: FALSE;
core_branch_iterator_init(spl,
btree_itor,
branch_addr,
range_itor->local_min_key_comparison,
key_buffer_key(&range_itor->local_min_key),
range_itor->local_max_key_comparison,
key_buffer_key(&range_itor->local_max_key),
start_key_comparison,
start_key,
do_prefetch,
FALSE);
} else {
bool32 is_live = branch_no == 0;
core_memtable_iterator_init(spl,
btree_itor,
branch_addr,
range_itor->local_min_key_comparison,
key_buffer_key(&range_itor->local_min_key),
range_itor->local_max_key_comparison,
key_buffer_key(&range_itor->local_max_key),
start_key_comparison,
start_key,
is_live,
FALSE);
}
rc = core_start_btree_iterator_init_async(
spl,
&init_ctxt[i],
btree_itor,
branch_addr,
page_type,
range_itor->local_min_key_comparison,
key_buffer_key(&range_itor->local_min_key),
range_itor->local_max_key_comparison,
key_buffer_key(&range_itor->local_max_key),
start_key_comparison,
start_key,
do_prefetch);
started_inits++;
if (!SUCCESS(rc)) {
break;
}
range_itor->itor[i] = &btree_itor->super;
}

platform_status drain_rc =
core_drain_btree_iterator_init_async(spl->cc, init_ctxt, started_inits);
if (SUCCESS(rc)) {
rc = drain_rc;
}
for (uint64 i = 0; i < started_inits; i++) {
if (init_ctxt[i].done) {
uint64 branch_no = range_itor->num_branches - i - 1;
range_itor->btree_itor_initialized[branch_no] = TRUE;
}
}
if (init_ctxt != NULL) {
platform_free(PROCESS_PRIVATE_HEAP_ID, init_ctxt);
}
if (!SUCCESS(rc)) {
core_range_iterator_deinit(range_itor);
return rc;
}

rc = merge_iterator_create(PROCESS_PRIVATE_HEAP_ID,
spl->cfg.data_cfg,
range_itor->num_branches,
Expand Down Expand Up @@ -1343,18 +1414,21 @@ core_range_iterator_deinit(core_range_iterator *range_itor)
core_handle *spl = range_itor->spl;
if (range_itor->merge_itor != NULL) {
merge_iterator_destroy(PROCESS_PRIVATE_HEAP_ID, &range_itor->merge_itor);
for (uint64 i = 0; i < range_itor->num_branches; i++) {
btree_iterator *btree_itor = &range_itor->btree_itor[i];
if (range_itor->compacted[i]) {
uint64 root_addr = btree_itor->root_addr;
core_branch_iterator_deinit(spl, btree_itor, FALSE);
btree_dec_ref(
spl->cc, spl->cfg.btree_cfg, root_addr, PAGE_TYPE_BRANCH);
} else {
uint64 mt_gen = range_itor->memtable_start_gen - i;
core_memtable_iterator_deinit(spl, btree_itor, mt_gen, FALSE);
core_memtable_dec_ref(spl, mt_gen);
}
}
for (uint64 i = 0; i < range_itor->num_branches; i++) {
btree_iterator *btree_itor = &range_itor->btree_itor[i];
if (range_itor->btree_itor_initialized[i]) {
btree_iterator_deinit(btree_itor);
range_itor->btree_itor_initialized[i] = FALSE;
}
if (range_itor->compacted[i]) {
btree_dec_ref(spl->cc,
spl->cfg.btree_cfg,
range_itor->branch[i].addr,
PAGE_TYPE_BRANCH);
} else {
uint64 mt_gen = range_itor->memtable_start_gen - i;
core_memtable_dec_ref(spl, mt_gen);
}
}
key_buffer_deinit(&range_itor->min_key);
Expand Down
1 change: 1 addition & 0 deletions src/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ typedef struct core_range_iterator {
comparison local_max_key_comparison;
bool32 local_max_key_truncated;
btree_iterator btree_itor[CORE_RANGE_ITOR_MAX_BRANCHES];
bool32 btree_itor_initialized[CORE_RANGE_ITOR_MAX_BRANCHES];
trunk_branch_info branch[CORE_RANGE_ITOR_MAX_BRANCHES];

// used for merge iterator construction
Expand Down
43 changes: 40 additions & 3 deletions src/merge.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,48 @@ bsearch_insert(register const ordered_iterator *key,
register ordered_iterator **p;
bool32 prev_equal = FALSE;
bool32 next_equal = FALSE;
bool32 keys_equal = FALSE;

if (nmemb == 0) {
*prev_equal_out = FALSE;
*next_equal_out = FALSE;
return -1;
}

size_t nrel = 1;

while (nrel <= nmemb && nrel < 4) {
cmp = bsearch_comp(key, base[nrel - 1], forwards, cfg, &keys_equal);
if (cmp <= 0) {
*prev_equal_out = prev_equal;
*next_equal_out = keys_equal;
return nrel - 2;
}
prev_equal |= keys_equal;
nrel++;
}

if (nrel > nmemb) {
*prev_equal_out = prev_equal;
*next_equal_out = FALSE;
return nmemb - 1;
}

if (4 <= nrel) {
while (nrel <= nmemb
&& bsearch_comp(key, base[nrel - 1], forwards, cfg, &keys_equal)
> 0)
{
nrel *= 2;
}
}

if (nmemb < nrel) {
nrel = nmemb;
}

for (lim = nmemb; lim != 0; lim >>= 1) {
p = base + (lim >> 1);
bool32 keys_equal;
for (lim = nrel; lim != 0; lim >>= 1) {
p = base + (lim >> 1);
cmp = bsearch_comp(key, *p, forwards, cfg, &keys_equal);
debug_assert(cmp != 0);

Expand Down
Loading
Loading