diff --git a/src/include/storage/store/csr_node_group.h b/src/include/storage/store/csr_node_group.h
index 82d189a93d6..93d1ce9f64e 100644
--- a/src/include/storage/store/csr_node_group.h
+++ b/src/include/storage/store/csr_node_group.h
@@ -113,10 +113,11 @@ struct CSRNodeGroupScanState final : NodeGroupScanState {
     // group. State for reading from checkpointed node group.
     std::unique_ptr csrHeader;
     std::vector persistentCSRLists;
-    std::vector<NodeCSRIndex> inMemCSRLists;
+    NodeCSRIndex inMemCSRList;
     // position in vector of either csr list/index vector
     uint32_t nextCSRToScan;
+    bool persistentInitialized = false;
 
     // States at the csr list level. Cached during scan over a single csr list.
     CSRNodeGroupScanSource source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;
@@ -130,7 +131,6 @@ struct CSRNodeGroupScanState final : NodeGroupScanState {
         csrHeader->resetToEmpty();
         source = CSRNodeGroupScanSource::COMMITTED_IN_MEMORY;
         persistentCSRLists.clear();
-        inMemCSRLists.clear();
         nextCSRToScan = 0;
     }
 };
@@ -213,20 +213,17 @@ class CSRNodeGroup final : public NodeGroup {
 private:
     void initializePersistentCSRHeader(transaction::Transaction* transaction,
         RelTableScanState& relScanState, CSRNodeGroupScanState& nodeGroupScanState) const;
+    void initializeInMemScanState(TableScanState& state);
     void updateCSRIndex(common::offset_t boundNodeOffsetInGroup, common::row_idx_t startRow,
         common::length_t length) const;
 
     NodeGroupScanResult scanCommittedPersistent(const transaction::Transaction* transaction,
         const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) const;
-    NodeGroupScanResult scanCommittedInMem(transaction::Transaction* transaction,
-        const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState);
     NodeGroupScanResult scanCommittedInMemSequential(const transaction::Transaction* transaction,
-        const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
-        common::row_idx_t numScanned);
+        const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState);
     NodeGroupScanResult scanCommittedInMemRandom(transaction::Transaction* transaction,
-        const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
-        common::row_idx_t numScanned);
+        const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState);
 
     void checkpointInMemOnly(const common::UniqLock& lock, NodeGroupCheckpointState& state);
     void checkpointInMemAndOnDisk(const common::UniqLock& lock, NodeGroupCheckpointState& state);
diff --git a/src/include/storage/store/rel_table.h b/src/include/storage/store/rel_table.h
index 86fb8406299..fe4a1294c57 100644
--- a/src/include/storage/store/rel_table.h
+++ b/src/include/storage/store/rel_table.h
@@ -17,6 +17,7 @@ struct RelTableScanState : TableScanState {
     Column* csrOffsetColumn;
     Column* csrLengthColumn;
 
+    bool resetCommitted = false;
     common::sel_t currNodeIdx = 0;
     common::sel_t endNodeIdx = 0;
     common::sel_t totalNodeIdx = 0;
diff --git a/src/storage/store/csr_node_group.cpp b/src/storage/store/csr_node_group.cpp
index 38b2e6015f4..9d839dc0a89 100644
--- a/src/storage/store/csr_node_group.cpp
+++ b/src/storage/store/csr_node_group.cpp
@@ -13,47 +13,58 @@ namespace storage {
 
 void CSRNodeGroup::initializeScanState(Transaction* transaction, TableScanState& state) {
     auto& relScanState = state.cast<RelTableScanState>();
     auto& nodeSelVector = relScanState.boundNodeIDVector->state->getSelVector();
-    KU_ASSERT(nodeGroupIdx == StorageUtils::getNodeGroupIdx(
-        relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[relScanState.currNodeIdx])
-    ));
     KU_ASSERT(relScanState.nodeGroupScanState);
     auto& nodeGroupScanState = relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
     relScanState.nodeGroupScanState->resetState();
     relScanState.nodeGroupIdx = nodeGroupIdx;
-    // Scan the csr header chunks from disk.
-    if (persistentChunkGroup) {
-        initializePersistentCSRHeader(transaction, relScanState, nodeGroupScanState);
-    }
-    // Queue all nodes to be scanned in the node group.
-    nodeGroupScanState.nextRowToScan = 0;
     const auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
-    for (auto i = relScanState.currNodeIdx; i < relScanState.endNodeIdx; i++) {
-        const auto offsetInGroup =
-            relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[i]) - startNodeOffset;
-        if (persistentChunkGroup) {
+    nodeGroupScanState.source = CSRNodeGroupScanSource::NONE;
+    if (persistentChunkGroup && !nodeGroupScanState.persistentInitialized) {
+        nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;
+        // Scan the csr header chunks from disk.
+        initializePersistentCSRHeader(transaction, relScanState, nodeGroupScanState);
+        // Queue persistent nodes to be scanned in the node group.
+        while (relScanState.endNodeIdx < relScanState.totalNodeIdx) {
+            auto nodeOffset =
+                relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[relScanState.endNodeIdx]);
+            if (nodeOffset >= StorageConstants::MAX_NUM_ROWS_IN_TABLE) {
+                break;
+            }
+            // Node table scan should only give node offsets within the same node group.
+            KU_ASSERT(nodeGroupIdx == StorageUtils::getNodeGroupIdx(nodeOffset));
+            const auto offsetInGroup = nodeOffset - startNodeOffset;
             auto offset = nodeGroupScanState.csrHeader->getStartCSROffset(offsetInGroup);
             auto length = nodeGroupScanState.csrHeader->getCSRLength(offsetInGroup);
-            if (length > 0) {
-                nodeGroupScanState.persistentCSRLists.emplace_back(offset, length);
-            }
+            nodeGroupScanState.persistentCSRLists.emplace_back(offset, length);
+            relScanState.endNodeIdx++;
         }
         if (csrIndex) {
-            auto& index = csrIndex->indices[offsetInGroup];
-            if (!index.isSequential) {
-                KU_ASSERT(std::is_sorted(index.rowIndices.begin(), index.rowIndices.end()));
-            }
-            if (index.rowIndices.size() > 0) {
-                nodeGroupScanState.inMemCSRLists.push_back(index);
-            }
+            // After scanning all persistent data, reset node idxs back to 0 for in-memory data.
+            relScanState.resetCommitted = true;
         }
+    } else if (csrIndex) {
+        // Either we only have in-memory data to scan or we have already scanned the persistent data.
+        initializeInMemScanState(state);
     }
-    if (!nodeGroupScanState.persistentCSRLists.empty()) {
-        nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_PERSISTENT;
-    } else if (!nodeGroupScanState.inMemCSRLists.empty()) {
-        nodeGroupScanState.source = CSRNodeGroupScanSource::COMMITTED_IN_MEMORY;
-    } else {
-        nodeGroupScanState.source = CSRNodeGroupScanSource::NONE;
+}
+
+void CSRNodeGroup::initializeInMemScanState(TableScanState& state) {
+    // We do not batch in-memory scans.
+    auto& relScanState = state.cast<RelTableScanState>();
+    auto& nodeSelVector = relScanState.boundNodeIDVector->state->getSelVector();
+    KU_ASSERT(csrIndex);
+    KU_ASSERT(relScanState.nodeGroupScanState);
+    auto& nodeGroupScanState = relScanState.nodeGroupScanState->cast<CSRNodeGroupScanState>();
+    const auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
+    const auto offsetInGroup = relScanState.boundNodeIDVector->readNodeOffset(
+        nodeSelVector[relScanState.endNodeIdx++]) - startNodeOffset;
+    nodeGroupScanState.inMemCSRList = csrIndex->indices[offsetInGroup];
+    if (!nodeGroupScanState.inMemCSRList.isSequential) {
+        KU_ASSERT(std::is_sorted(nodeGroupScanState.inMemCSRList.rowIndices.begin(),
+            nodeGroupScanState.inMemCSRList.rowIndices.end()));
     }
+    nodeGroupScanState.source = nodeGroupScanState.inMemCSRList.rowIndices.size() > 0 ?
+                                    CSRNodeGroupScanSource::COMMITTED_IN_MEMORY :
+                                    CSRNodeGroupScanSource::NONE;
 }
 
 void CSRNodeGroup::initializePersistentCSRHeader(Transaction* transaction,
@@ -112,16 +123,16 @@ NodeGroupScanResult CSRNodeGroup::scan(Transaction* transaction, TableScanState& state) {
     case CSRNodeGroupScanSource::COMMITTED_PERSISTENT: {
         auto result = scanCommittedPersistent(transaction, relScanState, nodeGroupScanState);
         if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) {
-            nodeGroupScanState.source = nodeGroupScanState.inMemCSRLists.empty() ?
-                                            CSRNodeGroupScanSource::NONE :
-                                            CSRNodeGroupScanSource::COMMITTED_IN_MEMORY;
+            initializeInMemScanState(state);
             nodeGroupScanState.nextCSRToScan = 0;
             continue;
         }
         return result;
     }
     case CSRNodeGroupScanSource::COMMITTED_IN_MEMORY: {
-        const auto result = scanCommittedInMem(transaction, relScanState, nodeGroupScanState);
+        const auto result = nodeGroupScanState.inMemCSRList.isSequential ?
+            scanCommittedInMemSequential(transaction, relScanState, nodeGroupScanState) :
+            scanCommittedInMemRandom(transaction, relScanState, nodeGroupScanState);
         if (result == NODE_GROUP_SCAN_EMMPTY_RESULT) {
             relScanState.IDVector->state->getSelVectorUnsafe().setSelSize(0);
             return NODE_GROUP_SCAN_EMMPTY_RESULT;
@@ -165,71 +176,44 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedPersistent(const Transaction* tra
     return result;
 }
 
-NodeGroupScanResult CSRNodeGroup::scanCommittedInMem(Transaction* transaction,
-    const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) {
-    auto result = NODE_GROUP_SCAN_EMMPTY_RESULT;
-    while (nodeGroupScanState.nextCSRToScan < nodeGroupScanState.inMemCSRLists.size()) {
-        auto& index = nodeGroupScanState.inMemCSRLists[nodeGroupScanState.nextCSRToScan];
-        auto current = index.isSequential ?
-                           scanCommittedInMemSequential(transaction, tableState,
-                               nodeGroupScanState, result.numRows) :
-                           scanCommittedInMemRandom(transaction, tableState,
-                               nodeGroupScanState, result.numRows);
-        if (result.startRow == INVALID_ROW_IDX) {
-            result.startRow = current.startRow;
-        }
-        result.numRows += current.numRows;
-        if (result.numRows == DEFAULT_VECTOR_CAPACITY) {
-            break;
-        }
-    }
-    return result;
-}
-
 NodeGroupScanResult CSRNodeGroup::scanCommittedInMemSequential(const Transaction* transaction,
-    const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
-    row_idx_t numScanned) {
-    auto& index = nodeGroupScanState.inMemCSRLists[nodeGroupScanState.nextCSRToScan];
-    const auto startRow = index.rowIndices[0] + nodeGroupScanState.nextRowToScan;
-    const auto numRows = std::min(index.rowIndices[1] - nodeGroupScanState.nextRowToScan,
-        DEFAULT_VECTOR_CAPACITY - numScanned);
-    KU_ASSERT(numRows > 0);
-    row_idx_t rowsScanned = 0;
-    while (rowsScanned < numRows) {
-        auto [chunkIdx, startRowInChunk] = StorageUtils::getQuotientRemainder(
-            startRow + rowsScanned, ChunkedNodeGroup::CHUNK_CAPACITY);
-        rowsScanned +=
-            std::min(numRows - rowsScanned, ChunkedNodeGroup::CHUNK_CAPACITY - startRowInChunk);
-        ChunkedNodeGroup* chunkedGroup;
-        {
-            const auto lock = chunkedGroups.lock();
-            chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
-        }
-        chunkedGroup->scan(transaction, tableState, nodeGroupScanState, startRowInChunk,
-            numRows - rowsScanned);
-    }
-    KU_ASSERT(rowsScanned == numRows);
-    if (numRows == index.rowIndices[1] - nodeGroupScanState.nextRowToScan) {
-        nodeGroupScanState.nextCSRToScan++;
-        nodeGroupScanState.nextRowToScan = 0;
-    } else {
-        nodeGroupScanState.nextRowToScan += numRows;
+    const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) {
+    const auto startRow =
+        nodeGroupScanState.inMemCSRList.rowIndices[0] + nodeGroupScanState.nextRowToScan;
+    auto numRows =
+        std::min(nodeGroupScanState.inMemCSRList.rowIndices[1] - nodeGroupScanState.nextRowToScan,
+            DEFAULT_VECTOR_CAPACITY);
+    auto [chunkIdx, startRowInChunk] =
+        StorageUtils::getQuotientRemainder(startRow, ChunkedNodeGroup::CHUNK_CAPACITY);
+    numRows = std::min(numRows, ChunkedNodeGroup::CHUNK_CAPACITY - startRowInChunk);
+    if (numRows == 0) {
+        return NODE_GROUP_SCAN_EMMPTY_RESULT;
+    }
+    ChunkedNodeGroup* chunkedGroup;
+    {
+        const auto lock = chunkedGroups.lock();
+        chunkedGroup = chunkedGroups.getGroup(lock, chunkIdx);
     }
+    chunkedGroup->scan(transaction, tableState, nodeGroupScanState, startRowInChunk, numRows);
+    nodeGroupScanState.nextRowToScan += numRows;
     return NodeGroupScanResult{startRow, numRows};
 }
 
 NodeGroupScanResult CSRNodeGroup::scanCommittedInMemRandom(Transaction* transaction,
-    const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState,
-    row_idx_t numScanned) {
-    auto& index = nodeGroupScanState.inMemCSRLists[nodeGroupScanState.nextCSRToScan];
-    const auto numRows = std::min(index.rowIndices.size() - nodeGroupScanState.nextRowToScan,
-        DEFAULT_VECTOR_CAPACITY - numScanned);
-    KU_ASSERT(numRows > 0);
+    const RelTableScanState& tableState, CSRNodeGroupScanState& nodeGroupScanState) {
+    const auto numRows = std::min(nodeGroupScanState.inMemCSRList.rowIndices.size() -
+                                      nodeGroupScanState.nextRowToScan,
+        DEFAULT_VECTOR_CAPACITY);
+    if (numRows == 0) {
+        return NODE_GROUP_SCAN_EMMPTY_RESULT;
+    }
     row_idx_t nextRow = 0;
     ChunkedNodeGroup* chunkedGroup = nullptr;
     node_group_idx_t currentChunkIdx = INVALID_NODE_GROUP_IDX;
     sel_t numSelected = 0;
     while (nextRow < numRows) {
-        const auto rowIdx = index.rowIndices[nextRow + nodeGroupScanState.nextRowToScan];
+        const auto rowIdx =
+            nodeGroupScanState.inMemCSRList.rowIndices[nextRow + nodeGroupScanState.nextRowToScan];
         auto [chunkIdx, rowInChunk] =
             StorageUtils::getQuotientRemainder(rowIdx, DEFAULT_VECTOR_CAPACITY);
         if (chunkIdx != currentChunkIdx) {
@@ -242,12 +226,7 @@ NodeGroupScanResult CSRNodeGroup::scanCommittedInMemRandom(Transact
             numSelected);
         nextRow++;
     }
-    if (numRows == index.rowIndices.size() - nodeGroupScanState.nextRowToScan) {
-        nodeGroupScanState.nextCSRToScan++;
-        nodeGroupScanState.nextRowToScan = 0;
-    } else {
-        nodeGroupScanState.nextRowToScan += numRows;
-    }
+    nodeGroupScanState.nextRowToScan += numRows;
     tableState.IDVector->state->getSelVectorUnsafe().setSelSize(numSelected);
     return NodeGroupScanResult{0, numRows};
 }
diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp
index 7512ef17fc9..9ef50b49761 100644
--- a/src/storage/store/rel_table.cpp
+++ b/src/storage/store/rel_table.cpp
@@ -55,7 +55,6 @@ std::unique_ptr<RelTable> RelTable::loadTable(Deserializer& deSer, const Catalog
 
 void RelTable::initializeScanState(Transaction* transaction, TableScanState& scanState) {
     // Scan always start with committed data first.
    auto& relScanState = scanState.cast<RelTableScanState>();
-    auto& nodeSelVector = relScanState.boundNodeIDVector->state->getSelVector();
     relScanState.totalNodeIdx = nodeSelVector.getSelSize();
     KU_ASSERT(relScanState.totalNodeIdx > 0);
@@ -73,23 +72,9 @@ void RelTable::initializeScanState(Transaction* transaction, TableScanState& sca
         }
         return;
     }
-    relScanState.source = TableScanSource::COMMITTED;
     relScanState.currentCSROffset = 0;
     auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
-    // collect all node ids that can be read from the same node group
-    while (relScanState.endNodeIdx < relScanState.totalNodeIdx) {
-        nodeOffset =
-            relScanState.boundNodeIDVector->readNodeOffset(nodeSelVector[relScanState.endNodeIdx]);
-        if (nodeOffset >= StorageConstants::MAX_NUM_ROWS_IN_TABLE) {
-            break;
-        }
-        auto curNodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
-        if (curNodeGroupIdx != nodeGroupIdx) {
-            break;
-        }
-        relScanState.endNodeIdx++;
-    }
     relScanState.nodeGroup = relScanState.direction == RelDataDirection::FWD ?
                                  fwdRelTableData->getNodeGroup(nodeGroupIdx) :
                                  bwdRelTableData->getNodeGroup(nodeGroupIdx);
@@ -117,6 +102,12 @@ bool RelTable::scanInternal(Transaction* transaction, TableScanState& scanState)
     }
     // We need to reinitialize per node group
    if (relScanState.currNodeIdx == relScanState.endNodeIdx) {
+        if (relScanState.resetCommitted) {
+            // Reset to read committed, in-memory data.
+            relScanState.currNodeIdx = 0;
+            relScanState.endNodeIdx = 0;
+            relScanState.resetCommitted = false;
+        }
         initializeScanState(transaction, relScanState);
     }
     offset_t curNodeOffset =
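
The net effect of this patch on the scan path: a node group's bound nodes are first queued in a single batch and scanned against the checkpointed (persistent) data; once that batch is exhausted, resetCommitted rewinds currNodeIdx/endNodeIdx so the same nodes are visited again, one at a time, against the in-memory CSR index, while persistentInitialized keeps the persistent pass from being re-queued. The sketch below is a minimal, self-contained model of that control flow only, under simplified assumptions; ScanSource, ScanState, initializeNodeGroupScan, and scanNodeGroup are hypothetical names invented for illustration and are not the classes or functions touched by this diff.

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-ins for illustration only; not part of the codebase.
enum class ScanSource { Persistent, InMemory, None };

struct ScanState {
    std::vector<uint64_t> boundNodes;   // node offsets bound to the scan
    std::size_t currNodeIdx = 0;        // next bound node to scan
    std::size_t endNodeIdx = 0;         // one past the last node queued for the current phase
    bool resetCommitted = false;        // rewind node idxs before the in-memory pass
    bool persistentInitialized = false; // persistent pass already queued for this node group
    ScanSource source = ScanSource::None;
};

// Mirrors the shape of initializeScanState: queue every bound node once for the batched
// persistent pass, then hand out nodes one at a time for the in-memory pass.
void initializeNodeGroupScan(ScanState& state, bool hasPersistent, bool hasInMem) {
    state.source = ScanSource::None;
    if (hasPersistent && !state.persistentInitialized) {
        state.persistentInitialized = true;
        state.source = ScanSource::Persistent;
        state.endNodeIdx = state.boundNodes.size(); // batch all bound nodes
        if (hasInMem) {
            state.resetCommitted = true; // remember to rewind for the in-memory pass
        }
    } else if (hasInMem && state.currNodeIdx < state.boundNodes.size()) {
        state.source = ScanSource::InMemory;
        state.endNodeIdx = state.currNodeIdx + 1; // in-memory scans are not batched
    }
}

// Mirrors the shape of scanInternal: when the current batch is exhausted and resetCommitted
// is set, rewind the node idxs and reinitialize for the in-memory pass.
void scanNodeGroup(ScanState& state, bool hasPersistent, bool hasInMem) {
    initializeNodeGroupScan(state, hasPersistent, hasInMem);
    while (state.source != ScanSource::None) {
        for (; state.currNodeIdx < state.endNodeIdx; ++state.currNodeIdx) {
            std::cout << (state.source == ScanSource::Persistent ? "persistent" : "in-memory")
                      << " scan of node " << state.boundNodes[state.currNodeIdx] << "\n";
        }
        if (state.resetCommitted) {
            state.currNodeIdx = 0; // the same nodes are visited again, now from memory
            state.endNodeIdx = 0;
            state.resetCommitted = false;
        }
        initializeNodeGroupScan(state, hasPersistent, hasInMem);
    }
}

int main() {
    ScanState state;
    state.boundNodes = {0, 1, 2};
    scanNodeGroup(state, /*hasPersistent=*/true, /*hasInMem=*/true);
    return 0;
}
```

Running the sketch prints each bound node once under the persistent pass and then once under the in-memory pass, which matches the ordering the RelTable::scanInternal change above is meant to produce.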