Skip to content

Commit

Permalink
add gap accommodation
Browse files Browse the repository at this point in the history
  • Loading branch information
yiyun-sj committed Aug 7, 2024
1 parent f71b0ab commit d520e44
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 35 deletions.
11 changes: 11 additions & 0 deletions src/include/storage/store/csr_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ struct CSRNodeGroupScanState final : NodeGroupScanState {
uint32_t nextCSRToScan;

bool persistentInitialized = false;
common::sel_t prevCSREndOffset;
// States at the csr list level. Cached during scan over a single csr list.
CSRNodeGroupScanSource source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;

Expand All @@ -133,6 +134,16 @@ struct CSRNodeGroupScanState final : NodeGroupScanState {
persistentCSRLists.clear();
nextCSRToScan = 0;
}

// Computes the gap between the previously visited CSR list and the CSR list of
// `offsetInGroup`, and advances the cached end offset past the current list.
//
// Only applies while `source` is COMMITTED_PERSISTENT; for any other source the
// function returns 0 and leaves `prevCSREndOffset` untouched.
//
// @param offsetInGroup offset passed through to the CSR header lookups.
// @return start CSR offset of `offsetInGroup` minus `prevCSREndOffset` (as
//         common::sel_t), or 0 when the source is not COMMITTED_PERSISTENT.
common::sel_t getGap(common::offset_t offsetInGroup) {
    if (source != CSRNodeGroupScanSource::COMMITTED_PERSISTENT) {
        return 0;
    }
    const auto listStart = csrHeader->getStartCSROffset(offsetInGroup);
    // Distance between where the previous list ended and where this one begins.
    const common::sel_t gap = listStart - prevCSREndOffset;
    // Remember the end of this list (start + length) for the next call.
    prevCSREndOffset = listStart + csrHeader->getCSRLength(offsetInGroup);
    return gap;
}
};

struct CSRNodeGroupCheckpointState final : NodeGroupCheckpointState {
Expand Down
55 changes: 26 additions & 29 deletions src/storage/store/csr_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void CSRNodeGroup::initializeScanState(Transaction* transaction, TableScanState&
nodeGroupScanState.source = CSRNodeGroupScanSource::NONE;
if (persistentChunkGroup && !nodeGroupScanState.persistentInitialized) {
nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;
nodeGroupScanState.persistentInitialized = true;
// Scan the csr header chunks from disk.
initializePersistentCSRHeader(transaction, relScanState, nodeGroupScanState);
// Queue persistent nodes to be scanned in the node group.
Expand Down Expand Up @@ -120,36 +121,29 @@ length_t CSRNodeGroup::getCSRLength(CSRNodeGroupScanState& state, offset_t offse
NodeGroupScanResult CSRNodeGroup::scan(Transaction* transaction, TableScanState& state) {
const auto& relScanState = state.cast<RelTableScanState>();
auto& nodeGroupScanState = relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
while (true) {
switch (nodeGroupScanState.source) {
case CSRNodeGroupScanSource::COMMITTED_PERSISTENT: {
auto result = scanCommittedPersistent(transaction, relScanState, nodeGroupScanState);
if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) {
initializeInMemScanState(state);
nodeGroupScanState.nextCSRToScan = 0;
continue;
}
return result;
}
case CSRNodeGroupScanSource::COMMITTED_IN_MEMORY: {
const auto result =
nodeGroupScanState.inMemCSRList.isSequential ?
scanCommittedInMemSequential(transaction, relScanState, nodeGroupScanState) :
scanCommittedInMemRandom(transaction, relScanState, nodeGroupScanState);
if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) {
relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0);
return NODE_GROUP_SCAN_EMMPTY_RESULT;
}
return result;
}
case CSRNodeGroupScanSource::NONE: {
switch (nodeGroupScanState.source) {
case CSRNodeGroupScanSource::COMMITTED_PERSISTENT: {
auto result = scanCommittedPersistent(transaction, relScanState, nodeGroupScanState);
return result;
}
case CSRNodeGroupScanSource::COMMITTED_IN_MEMORY: {
const auto result =
nodeGroupScanState.inMemCSRList.isSequential ?
scanCommittedInMemSequential(transaction, relScanState, nodeGroupScanState) :
scanCommittedInMemRandom(transaction, relScanState, nodeGroupScanState);
if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) {
relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0);
return NODE_GROUP_SCAN_EMMPTY_RESULT;
}
default: {
KU_UNREACHABLE;
}
}
return result;
}
case CSRNodeGroupScanSource::NONE: {
relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0);
return NODE_GROUP_SCAN_EMMPTY_RESULT;
}
default: {
KU_UNREACHABLE;
}
}
}

Expand All @@ -161,6 +155,7 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedPersistent(const Transaction* tra
const auto startRow = csrList.startRow + nodeGroupScanState.nextRowToScan;
if (result.startRow == INVALID_ROW_IDX) {
result.startRow = startRow;
nodeGroupScanState.prevCSREndOffset = startRow;
} else if (startRow - result.startRow >= DEFAULT_VECTOR_CAPACITY) {
break;
}
Expand All @@ -178,8 +173,10 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedPersistent(const Transaction* tra
break;
}
}
persistentChunkGroup->scan(transaction, tableState, nodeGroupScanState, result.startRow,
result.numRows);
if (result != NODE_GROUP_SCAN_EMMPTY_RESULT) {
persistentChunkGroup->scan(transaction, tableState, nodeGroupScanState, result.startRow,
result.numRows);
}
return result;
}

Expand Down
11 changes: 5 additions & 6 deletions src/storage/store/rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,18 @@ bool RelTable::scanInternal(Transaction* transaction, TableScanState& scanState)
auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(relScanState.nodeGroupIdx);
auto& csrNodeGroupScanState =
relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
posInLastCSR = csrNodeGroupScanState.nextRowToScan;
currCSRSize = relScanState.nodeGroup->cast<CSRNodeGroup>().getCSRLength(
csrNodeGroupScanState, curNodeOffset - startNodeOffset);
posInLastCSR = csrNodeGroupScanState.nextRowToScan;
// Account for gaps between CSR lists in the persistent data scan.
relScanState.currentCSROffset += csrNodeGroupScanState.getGap(curNodeOffset - startNodeOffset);
} break;
case TableScanSource::UNCOMMITTED: {
currCSRSize = 0;
posInLastCSR = relScanState.localTableScanState->nextRowToScan;
auto localTable = relScanState.localTableScanState->localRelTable;
auto& index = relScanState.direction == RelDataDirection::FWD ? localTable->getFWDIndex() :
localTable->getBWDIndex();
if (index.contains(curNodeOffset)) {
currCSRSize = index[curNodeOffset].size();
}
currCSRSize = index.contains(curNodeOffset) ? index[curNodeOffset].size() : 0;
} break;
case TableScanSource::NONE: {
return false;
Expand All @@ -141,7 +140,7 @@ bool RelTable::scanInternal(Transaction* transaction, TableScanState& scanState)
}
}
KU_ASSERT(currCSRSize != INVALID_OFFSET);
// We rescan after last batch is all processed (usually 2048 tuples)
// We rescan after last batch is all processed (max 2048 tuples)
if (relScanState.currentCSROffset >= relScanState.batchSize) {
if (!scanNext(transaction, relScanState)) {
return false;
Expand Down

0 comments on commit d520e44

Please sign in to comment.