Skip to content

Commit

Permalink
decouple persistent and in memory logic
Browse files Browse the repository at this point in the history
  • Loading branch information
yiyun-sj committed Aug 7, 2024
1 parent d0cac92 commit d652bfe
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 116 deletions.
13 changes: 5 additions & 8 deletions src/include/storage/store/csr_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ struct CSRNodeGroupScanState final : NodeGroupScanState {
// group. State for reading from checkpointed node group.
std::unique_ptr<ChunkedCSRHeader> csrHeader;
std::vector<csr_list_t> persistentCSRLists;
std::vector<NodeCSRIndex> inMemCSRLists;
NodeCSRIndex inMemCSRList;
// position in vector of either csr list/index vector
uint32_t nextCSRToScan;

bool persistentInitialized = false;
// States at the csr list level. Cached during scan over a single csr list.
CSRNodeGroupScanSource source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;

Expand All @@ -130,7 +131,6 @@ struct CSRNodeGroupScanState final : NodeGroupScanState {
csrHeader->resetToEmpty();
source = CSRNodeGroupScanSource::COMMITTED_IN_MEMORY;
persistentCSRLists.clear();
inMemCSRLists.clear();
nextCSRToScan = 0;
}
};
Expand Down Expand Up @@ -213,20 +213,17 @@ class CSRNodeGroup final : public NodeGroup {
private:
void initializePersistentCSRHeader(transaction::Transaction* transaction,
RelTableScanState& relScanState, CSRNodeGroupScanState& nodeGroupScanState) const;
void initializeInMemScanState(TableScanState& state);

void updateCSRIndex(common::offset_t boundNodeOffsetInGroup, common::row_idx_t startRow,
common::length_t length) const;

NodeGroupScanResult scanCommittedPersistent(const transaction::Transaction* transaction,
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) const;
NodeGroupScanResult scanCommittedInMem(transaction::Transaction* transaction,
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState);
NodeGroupScanResult scanCommittedInMemSequential(const transaction::Transaction* transaction,
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
common::row_idx_t numScanned);
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState);
NodeGroupScanResult scanCommittedInMemRandom(transaction::Transaction* transaction,
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
common::row_idx_t numScanned);
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState);

void checkpointInMemOnly(const common::UniqLock& lock, NodeGroupCheckpointState& state);
void checkpointInMemAndOnDisk(const common::UniqLock& lock, NodeGroupCheckpointState& state);
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct RelTableScanState : TableScanState {
Column* csrOffsetColumn;
Column* csrLengthColumn;

bool resetCommitted = false;
common::sel_t currNodeIdx = 0;
common::sel_t endNodeIdx = 0;
common::sel_t totalNodeIdx = 0;
Expand Down
165 changes: 72 additions & 93 deletions src/storage/store/csr_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,58 @@ namespace storage {
void CSRNodeGroup::initializeScanState(Transaction* transaction, TableScanState& state) {
auto& relScanState = state.cast<RelTableScanState>();
auto& nodeSelVector = relScanState.boundNodeIDVector->state->getSelVector();
KU_ASSERT(nodeGroupIdx == StorageUtils::getNodeGroupIdx(
relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[relScanState.currNodeIdx])
));
KU_ASSERT(relScanState.nodeGroupScanState);
auto& nodeGroupScanState = relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
relScanState.nodeGroupScanState->resetState();
relScanState.nodeGroupIdx = nodeGroupIdx;
// Scan the csr header chunks from disk.
if (persistentChunkGroup) {
initializePersistentCSRHeader(transaction, relScanState, nodeGroupScanState);
}
// Queue all nodes to be scanned in the node group.
nodeGroupScanState.nextRowToScan = 0;
const auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
for (auto i = relScanState.currNodeIdx; i < relScanState.endNodeIdx; i++) {
const auto offsetInGroup =
relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[i]) - startNodeOffset;
if (persistentChunkGroup) {
nodeGroupScanState.source = CSRNodeGroupScanSource::NONE;
if (persistentChunkGroup && !nodeGroupScanState.persistentInitialized) {
nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;
// Scan the csr header chunks from disk.
initializePersistentCSRHeader(transaction, relScanState, nodeGroupScanState);
// Queue persistent nodes to be scanned in the node group.
while (relScanState.endNodeIdx < relScanState.totalNodeIdx) {
auto nodeOffset =
relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[relScanState.endNodeIdx]);
if (nodeOffset >= StorageConstants::MAX_NUM_ROWS_IN_TABLE) {
break;
}
// Node table scan should only give node offsets within the same nodegroup
KU_ASSERT(nodeGroupIdx == StorageUtils::getNodeGroupIdx(nodeOffset));
const auto offsetInGroup = nodeOffset - startNodeOffset;
auto offset = nodeGroupScanState.csrHeader->getStartCSROffset(offsetInGroup);
auto length = nodeGroupScanState.csrHeader->getCSRLength(offsetInGroup);
if (length > 0) {
nodeGroupScanState.persistentCSRLists.emplace_back(offset, length);
}
nodeGroupScanState.persistentCSRLists.emplace_back(offset, length);
relScanState.endNodeIdx++;
}
if (csrIndex) {
auto& index = csrIndex->indices[offsetInGroup];
if (!index.isSequential) {
KU_ASSERT(std::is_sorted(index.rowIndices.begin(), index.rowIndices.end()));
}
if (index.rowIndices.size() > 0) {
nodeGroupScanState.inMemCSRLists.push_back(index);
}
// after scanning all persistent data, reset nodeIdxs back to 0 for in memory data
relScanState.resetCommitted = true;
}
} else if (csrIndex) {
// Either we only have in memory data to scan or we have already scanned the persistent data
initializeInMemScanState(state);
}
if (!nodeGroupScanState.persistentCSRLists.empty()) {
nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;
} else if (!nodeGroupScanState.inMemCSRLists.empty()) {
nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_IN_MEMORY;
} else {
nodeGroupScanState.source = CSRNodeGroupScanSource::NONE;
}

void CSRNodeGroup::initializeInMemScanState(TableScanState& state) {
// We do not batch in memory scans
auto& relScanState = state.cast<RelTableScanState>();
auto& nodeSelVector = relScanState.boundNodeIDVector->state->getSelVector();
KU_ASSERT(csrIndex);
KU_ASSERT(relScanState.nodeGroupScanState);
auto& nodeGroupScanState = relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
const auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
const auto offsetInGroup = relScanState.boundNodeIDVector->readNodeOffset(
nodeSelVector[relScanState.endNodeIdx++]) - startNodeOffset;
nodeGroupScanState.inMemCSRList = csrIndex->indices[offsetInGroup];
if (!nodeGroupScanState.inMemCSRList.isSequential) {
KU_ASSERT(std::is_sorted(nodeGroupScanState.inMemCSRList.rowIndices.begin(),
nodeGroupScanState.inMemCSRList.rowIndices.end()));
}
nodeGroupScanState.source = nodeGroupScanState.inMemCSRList.rowIndices.size() > 0 ?
CSRNodeGroupScanSource::COMMITTED_IN_MEMORY : CSRNodeGroupScanSource::NONE;
}

void CSRNodeGroup::initializePersistentCSRHeader(Transaction* transaction,
Expand Down Expand Up @@ -112,16 +123,16 @@ NodeGroupScanResult CSRNodeGroup::scan(Transaction* transaction, TableScanState&
case CSRNodeGroupScanSource::COMMITTED_PERSISTENT: {
auto result = scanCommittedPersistent(transaction, relScanState, nodeGroupScanState);
if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) {
nodeGroupScanState.source = nodeGroupScanState.inMemCSRLists.empty() ?
CSRNodeGroupScanSource::NONE :
CSRNodeGroupScanSource::COMMITTED_IN_MEMORY;
initializeInMemScanState(state);
nodeGroupScanState.nextCSRToScan = 0;
continue;
}
return result;
}
case CSRNodeGroupScanSource::COMMITTED_IN_MEMORY: {
const auto result = scanCommittedInMem(transaction, relScanState, nodeGroupScanState);
const auto result = nodeGroupScanState.inMemCSRList.isSequential ?
scanCommittedInMemSequential(transaction, relScanState, nodeGroupScanState) :
scanCommittedInMemRandom(transaction, relScanState, nodeGroupScanState);
if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) {
relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0);
return NODE_GROUP_SCAN_EMMPTY_RESULT;
Expand Down Expand Up @@ -165,71 +176,44 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedPersistent(const Transaction* tra
return result;
}

NodeGroupScanResult CSRNodeGroup::scanCommittedInMem(Transaction* transaction,
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) {
auto result = NODE_GROUP_SCAN_EMMPTY_RESULT;
while (nodeGroupScanState.nextCSRToScan < nodeGroupScanState.inMemCSRLists.size()) {
auto& index = nodeGroupScanState.inMemCSRLists[nodeGroupScanState.nextCSRToScan];
auto current = index.isSequential ? scanCommittedInMemSequential(transaction, tableState,
nodeGroupScanState, result.numRows) :
scanCommittedInMemRandom(transaction, tableState,
nodeGroupScanState, result.numRows);
if (result.startRow == INVALID_ROW_IDX) {
result.startRow = current.startRow;
}
result.numRows += current.numRows;
if (result.numRows == DEFAULT_VECTOR_CAPACITY) {
break;
}
}
return result;
}

NodeGroupScanResult CSRNodeGroup::scanCommittedInMemSequential(const Transaction* transaction,
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
row_idx_t numScanned) {
auto& index = nodeGroupScanState.inMemCSRLists[nodeGroupScanState.nextCSRToScan];
const auto startRow = index.rowIndices[0] + nodeGroupScanState.nextRowToScan;
const auto numRows = std::min(index.rowIndices[1] - nodeGroupScanState.nextRowToScan,
DEFAULT_VECTOR_CAPACITY - numScanned);
KU_ASSERT(numRows > 0);
row_idx_t rowsScanned = 0;
while (rowsScanned < numRows) {
auto [chunkIdx, startRowInChunk] = StorageUtils::getQuotientRemainder(
startRow + rowsScanned, ChunkedNodeGroup::CHUNK_CAPACITY);
rowsScanned +=
std::min(numRows - rowsScanned, ChunkedNodeGroup::CHUNK_CAPACITY - startRowInChunk);
ChunkedNodeGroup* chunkedGroup;
{
const auto lock = chunkedGroups.lock();
chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
}
chunkedGroup->scan(transaction, tableState, nodeGroupScanState, startRowInChunk,
numRows - rowsScanned);
}
KU_ASSERT(rowsScanned == numRows);
if (numRows == index.rowIndices[1] - nodeGroupScanState.nextRowToScan) {
nodeGroupScanState.nextCSRToScan++;
nodeGroupScanState.nextRowToScan = 0;
} else {
nodeGroupScanState.nextRowToScan += numRows;
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) {
const auto startRow =
nodeGroupScanState.inMemCSRList.rowIndices[0] + nodeGroupScanState.nextRowToScan;
auto numRows =
std::min(nodeGroupScanState.inMemCSRList.rowIndices[1] - nodeGroupScanState.nextRowToScan,
DEFAULT_VECTOR_CAPACITY);
auto [chunkIdx, startRowInChunk] =
StorageUtils::getQuotientRemainder(startRow, ChunkedNodeGroup::CHUNK_CAPACITY);
numRows = std::min(numRows, ChunkedNodeGroup::CHUNK_CAPACITY - startRowInChunk);
if (numRows == 0) {
return NODE_GROUP_SCAN_EMMPTY_RESULT;
}
ChunkedNodeGroup* chunkedGroup;
{
const auto lock = chunkedGroups.lock();
chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
}
chunkedGroup->scan(transaction, tableState, nodeGroupScanState, startRowInChunk, numRows);
nodeGroupScanState.nextRowToScan += numRows;
return NodeGroupScanResult{startRow, numRows};
}

NodeGroupScanResult CSRNodeGroup::scanCommittedInMemRandom(Transaction* transaction,
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
row_idx_t numScanned) {
auto& index = nodeGroupScanState.inMemCSRLists[nodeGroupScanState.nextCSRToScan];
const auto numRows = std::min(index.rowIndices.size() - nodeGroupScanState.nextRowToScan,
DEFAULT_VECTOR_CAPACITY - numScanned);
KU_ASSERT(numRows > 0);
const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) {
const auto numRows = std::min(nodeGroupScanState.inMemCSRList.rowIndices.size() -
nodeGroupScanState.nextRowToScan,
DEFAULT_VECTOR_CAPACITY);
if (numRows == 0) {
return NODE_GROUP_SCAN_EMMPTY_RESULT;
}
row_idx_t nextRow = 0;
ChunkedNodeGroup* chunkedGroup = nullptr;
node_group_idx_t currentChunkIdx = INVALID_NODE_GROUP_IDX;
sel_t numSelected = 0;
while (nextRow < numRows) {
const auto rowIdx = index.rowIndices[nextRow + nodeGroupScanState.nextRowToScan];
const auto rowIdx =
nodeGroupScanState.inMemCSRList.rowIndices[nextRow + nodeGroupScanState.nextRowToScan];
auto [chunkIdx, rowInChunk] =
StorageUtils::getQuotientRemainder(rowIdx, DEFAULT_VECTOR_CAPACITY);
if (chunkIdx != currentChunkIdx) {
Expand All @@ -242,12 +226,7 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedInMemRandom(Transaction* transact
numSelected);
nextRow++;
}
if (numRows == index.rowIndices.size() - nodeGroupScanState.nextRowToScan) {
nodeGroupScanState.nextCSRToScan++;
nodeGroupScanState.nextRowToScan = 0;
} else {
nodeGroupScanState.nextRowToScan += numRows;
}
nodeGroupScanState.nextRowToScan += numRows;
tableState.IDVector->state->getSelVectorUnsafe().setSelSize(numSelected);
return NodeGroupScanResult{0, numRows};
}
Expand Down
21 changes: 6 additions & 15 deletions src/storage/store/rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ std::unique_ptr<RelTable> RelTable::loadTable(Deserializer& deSer, const Catalog
void RelTable::initializeScanState(Transaction* transaction, TableScanState& scanState) {
// Scan always start with committed data first.
auto& relScanState = scanState.cast<RelTableScanState>();

auto& nodeSelVector = relScanState.boundNodeIDVector->state->getSelVector();
relScanState.totalNodeIdx = nodeSelVector.getSelSize();
KU_ASSERT(relScanState.totalNodeIdx > 0);
Expand All @@ -73,23 +72,9 @@ void RelTable::initializeScanState(Transaction* transaction, TableScanState& sca
}
return;
}

relScanState.source = TableScanSource::COMMITTED;
relScanState.currentCSROffset = 0;
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
// collect all node ids that can be read from the same node group
while (relScanState.endNodeIdx < relScanState.totalNodeIdx) {
nodeOffset =
relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[relScanState.endNodeIdx]);
if (nodeOffset >= StorageConstants::MAX_NUM_ROWS_IN_TABLE) {
break;
}
auto curNodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
if (curNodeGroupIdx != nodeGroupIdx) {
break;
}
relScanState.endNodeIdx++;
}
relScanState.nodeGroup = relScanState.direction == RelDataDirection::FWD ?
fwdRelTableData->getNodeGroup(nodeGroupIdx) :
bwdRelTableData->getNodeGroup(nodeGroupIdx);
Expand Down Expand Up @@ -117,6 +102,12 @@ bool RelTable::scanInternal(Transaction* transaction, TableScanState& scanState)
}
// We need to reinitialize per node group
if (relScanState.currNodeIdx == relScanState.endNodeIdx) {
if (relScanState.resetCommitted) {
// reset to read committed, in memory data
relScanState.currNodeIdx = 0;
relScanState.endNodeIdx = 0;
relScanState.resetCommitted = false;
}
initializeScanState(transaction, relScanState);
}
offset_t curNodeOffset =
Expand Down

0 comments on commit d652bfe

Please sign in to comment.