diff --git a/src/include/storage/store/csr_node_group.h b/src/include/storage/store/csr_node_group.h index 87e29ff9dce..53363a68061 100644 --- a/src/include/storage/store/csr_node_group.h +++ b/src/include/storage/store/csr_node_group.h @@ -118,6 +118,8 @@ struct CSRNodeGroupScanState final : NodeGroupScanState { uint32_t nextCSRToScan; bool persistentInitialized = false; + // End offset of the CSR list last visited by getGap(); reseeded by scanCommittedPersistent. + common::sel_t prevCSREndOffset = 0; // States at the csr list level. Cached during scan over a single csr list. CSRNodeGroupScanSource source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT; @@ -133,6 +135,20 @@ struct CSRNodeGroupScanState final : NodeGroupScanState { persistentCSRLists.clear(); nextCSRToScan = 0; } + + // Returns the distance between the end of the previously visited CSR list and the start of + // the CSR list at offsetInGroup, then records the end of this list for the next call. Sources + // other than COMMITTED_PERSISTENT always yield 0 and leave the tracked end offset untouched. + // NOTE(review): the difference is narrowed to sel_t; presumably gaps always fit -- confirm. + common::sel_t getGap(common::offset_t offsetInGroup) { + common::sel_t result = 0; + if (source == CSRNodeGroupScanSource::COMMITTED_PERSISTENT) { + auto curCSRStartOffset = csrHeader->getStartCSROffset(offsetInGroup); + result = curCSRStartOffset - prevCSREndOffset; + prevCSREndOffset = curCSRStartOffset + csrHeader->getCSRLength(offsetInGroup); + } + return result; + } }; struct CSRNodeGroupCheckpointState final : NodeGroupCheckpointState { diff --git a/src/storage/store/csr_node_group.cpp b/src/storage/store/csr_node_group.cpp index 9a2ec65399c..7c805495c4d 100644 --- a/src/storage/store/csr_node_group.cpp +++ b/src/storage/store/csr_node_group.cpp @@ -21,6 +21,7 @@ void CSRNodeGroup::initializeScanState(Transaction* transaction, TableScanState& nodeGroupScanState.source = CSRNodeGroupScanSource::NONE; if (persistentChunkGroup && !nodeGroupScanState.persistentInitialized) { nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT; + nodeGroupScanState.persistentInitialized = true; // Scan the csr header chunks from disk. initializePersistentCSRHeader(transaction, relScanState, nodeGroupScanState); // Queue persistent nodes to be scanned in the node group. 
@@ -120,36 +121,29 @@ length_t CSRNodeGroup::getCSRLength(CSRNodeGroupScanState& state, offset_t offse NodeGroupScanResult CSRNodeGroup::scan(Transaction* transaction, TableScanState& state) { const auto& relScanState = state.cast(); auto& nodeGroupScanState = relScanState.nodeGroupScanState->cast(); - while (true) { - switch (nodeGroupScanState.source) { - case CSRNodeGroupScanSource::COMMITTED_PERSISTENT: { - auto result = scanCommittedPersistent(transaction, relScanState, nodeGroupScanState); - if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) { - initializeInMemScanState(state); - nodeGroupScanState.nextCSRToScan = 0; - continue; - } - return result; - } - case CSRNodeGroupScanSource::COMMITTED_IN_MEMORY: { - const auto result = - nodeGroupScanState.inMemCSRList.isSequential ? - scanCommittedInMemSequential(transaction, relScanState, nodeGroupScanState) : - scanCommittedInMemRandom(transaction, relScanState, nodeGroupScanState); - if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) { - relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0); - return NODE_GROUP_SCAN_EMMPTY_RESULT; - } - return result; - } - case CSRNodeGroupScanSource::NONE: { + switch (nodeGroupScanState.source) { + case CSRNodeGroupScanSource::COMMITTED_PERSISTENT: { + auto result = scanCommittedPersistent(transaction, relScanState, nodeGroupScanState); + return result; /* returned as-is even when empty; scan() no longer switches to the in-memory source itself */ + } + case CSRNodeGroupScanSource::COMMITTED_IN_MEMORY: { + const auto result = + nodeGroupScanState.inMemCSRList.isSequential ? 
+ scanCommittedInMemSequential(transaction, relScanState, nodeGroupScanState) : + scanCommittedInMemRandom(transaction, relScanState, nodeGroupScanState); + if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) { relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0); return NODE_GROUP_SCAN_EMMPTY_RESULT; } - default: { - KU_UNREACHABLE; - } - } + return result; + } + case CSRNodeGroupScanSource::NONE: { + relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0); + return NODE_GROUP_SCAN_EMMPTY_RESULT; + } + default: { + KU_UNREACHABLE; + } } } @@ -161,6 +155,7 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedPersistent(const Transaction* tra const auto startRow = csrList.startRow + nodeGroupScanState.nextRowToScan; if (result.startRow == INVALID_ROW_IDX) { result.startRow = startRow; + nodeGroupScanState.prevCSREndOffset = startRow; /* first list of this batch: reset the baseline that CSRNodeGroupScanState::getGap measures from */ } else if (startRow - result.startRow >= DEFAULT_VECTOR_CAPACITY) { break; } @@ -178,8 +173,10 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedPersistent(const Transaction* tra break; } } - persistentChunkGroup->scan(transaction, tableState, nodeGroupScanState, result.startRow, - result.numRows); + if (result != NODE_GROUP_SCAN_EMMPTY_RESULT) { /* only scan the persistent chunk group when the result is non-empty */ + persistentChunkGroup->scan(transaction, tableState, nodeGroupScanState, result.startRow, + result.numRows); + } return result; } diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index 25bd496c824..39d14bbcc27 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -119,19 +119,18 @@ bool RelTable::scanInternal(Transaction* transaction, TableScanState& scanState) auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(relScanState.nodeGroupIdx); auto& csrNodeGroupScanState = relScanState.nodeGroupScanState->cast(); + posInLastCSR = csrNodeGroupScanState.nextRowToScan; currCSRSize = relScanState.nodeGroup->cast().getCSRLength( csrNodeGroupScanState, curNodeOffset - startNodeOffset); - posInLastCSR = 
csrNodeGroupScanState.nextRowToScan; + // Account for gaps between CSR lists in the persistent data scan + relScanState.currentCSROffset += csrNodeGroupScanState.getGap(curNodeOffset - startNodeOffset); } break; case TableScanSource::UNCOMMITTED: { - currCSRSize = 0; posInLastCSR = relScanState.localTableScanState->nextRowToScan; auto localTable = relScanState.localTableScanState->localRelTable; auto& index = relScanState.direction == RelDataDirection::FWD ? localTable->getFWDIndex() : localTable->getBWDIndex(); - if (index.contains(curNodeOffset)) { - currCSRSize = index[curNodeOffset].size(); - } + currCSRSize = index.contains(curNodeOffset) ? index[curNodeOffset].size() : 0; } break; case TableScanSource::NONE: { return false; @@ -141,7 +140,7 @@ bool RelTable::scanInternal(Transaction* transaction, TableScanState& scanState) } } KU_ASSERT(currCSRSize != INVALID_OFFSET); - // We rescan after last batch is all processed (usually 2048 tuples) + // We rescan after the last batch has been fully processed (max 2048 tuples) if (relScanState.currentCSROffset >= relScanState.batchSize) { if (!scanNext(transaction, relScanState)) { return false;