Skip to content

Commit

Permalink
Skip scan of rel id when possible (#4261)
Browse files Browse the repository at this point in the history
* skip scan of rel id when possible

* fix

* update
  • Loading branch information
ray6080 authored Sep 16, 2024
1 parent 6cf312d commit a0f987b
Show file tree
Hide file tree
Showing 30 changed files with 112 additions and 131 deletions.
22 changes: 11 additions & 11 deletions src/binder/visitor/property_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ void PropertyCollector::visitQueryPartSkipNodeRel(const NormalizedQueryPart& que

void PropertyCollector::visitMatch(const BoundReadingClause& readingClause) {
auto& matchClause = readingClause.constCast<BoundMatchClause>();
for (auto& rel : matchClause.getQueryGraphCollection()->getQueryRels()) {
if (rel->isEmpty() || rel->getRelType() != QueryRelType::NON_RECURSIVE) {
// If a query rel is empty then it does not have an internal id property.
continue;
}
properties.insert(rel->getInternalIDProperty());
}
if (matchClause.hasPredicate()) {
collectProperties(matchClause.getPredicate());
}
Expand Down Expand Up @@ -89,21 +82,28 @@ void PropertyCollector::visitSet(const BoundUpdatingClause& updatingClause) {
}
collectProperties(info.columnData);
}
for (const auto& info : boundSetClause.getRelInfos()) {
auto& rel = info.pattern->constCast<RelExpression>();
KU_ASSERT(!rel.isEmpty() && rel.getRelType() == QueryRelType::NON_RECURSIVE);
properties.insert(rel.getInternalIDProperty());
}
}

void PropertyCollector::visitDelete(const BoundUpdatingClause& updatingClause) {
auto& boundDeleteClause = updatingClause.constCast<BoundDeleteClause>();
// Read primary key if we are deleting nodes;
for (auto& info : boundDeleteClause.getNodeInfos()) {
for (const auto& info : boundDeleteClause.getNodeInfos()) {
auto& node = info.pattern->constCast<NodeExpression>();
for (auto entry : node.getEntries()) {
for (const auto entry : node.getEntries()) {
properties.insert(node.getPrimaryKey(entry->getTableID()));
}
}
// Read rel internal id if we are deleting relationships.
for (auto& info : boundDeleteClause.getRelInfos()) {
for (const auto& info : boundDeleteClause.getRelInfos()) {
auto& rel = info.pattern->constCast<RelExpression>();
properties.insert(rel.getInternalIDProperty());
if (!rel.isEmpty() && rel.getRelType() == QueryRelType::NON_RECURSIVE) {
properties.insert(rel.getInternalIDProperty());
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/expression_evaluator/pattern_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void PatternExpressionEvaluator::initFurther(const ResultSet&) {
StructPackFunctions::compileFunc(nullptr, parameters, resultVector);
const auto& dataType = expression->getDataType();
auto fieldIdx = StructType::getFieldIdx(dataType.copy(), InternalKeyword::ID);
KU_ASSERT(fieldIdx != INVALID_STRUCT_FIELD_IDX);
idVector = StructVector::getFieldVector(resultVector.get(), fieldIdx).get();
}

Expand Down
29 changes: 14 additions & 15 deletions src/graph/on_disk_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,21 +226,20 @@ std::vector<nodeID_t> OnDiskGraph::scanBwdRandom(nodeID_t, GraphScanState&) {
return result;
}

void OnDiskGraph::scan(nodeID_t nodeID, RelTable* relTable, OnDiskGraphScanStates& scanState,
RelTableScanState& relTableScanState, std::vector<nodeID_t>& nbrNodeIDs) {
scanState.srcNodeIDVector->setValue<nodeID_t>(0, nodeID);
relTableScanState.resetState();
relTable->initializeScanState(context->getTx(), relTableScanState);
while (relTableScanState.source != TableScanSource::NONE &&
relTable->scan(context->getTx(), relTableScanState)) {
if (relTableScanState.IDVector->state->getSelVector().getSelSize() > 0) {
for (auto i = 0u; i < relTableScanState.IDVector->state->getSelVector().getSelSize();
++i) {
auto nbrID = relTableScanState.IDVector->getValue<nodeID_t>(i);
nbrNodeIDs.push_back(nbrID);
}
}
}
void OnDiskGraph::scan(nodeID_t, RelTable*, OnDiskGraphScanStates&, RelTableScanState&,
std::vector<nodeID_t>&) {
// scanState.srcNodeIDVector->setValue<nodeID_t>(0, nodeID);
// relTableScanState.resetState();
// relTable->initializeScanState(context->getTx(), relTableScanState);
// while (relTableScanState.source != TableScanSource::NONE &&
// relTable->scan(context->getTx(), relTableScanState)) {
// if (relTableScanState.outState->getSelVector().getSelSize() > 0) {
// for (auto i = 0u; i < relTableScanState.outState->getSelVector().getSelSize(); ++i) {
// auto nbrID = relTableScanState.IDVector->getValue<nodeID_t>(i);
// nbrNodeIDs.push_back(nbrID);
// }
// }
// }
}

} // namespace graph
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/filtering_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class SelVectorOverWriter {
virtual ~SelVectorOverWriter() = default;

protected:
void restoreSelVector(common::DataChunkState& dataChunkState);
void restoreSelVector(common::DataChunkState& dataChunkState) const;

void saveSelVector(common::DataChunkState& dataChunkState);

Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/flatten.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ struct FlattenLocalState {
uint64_t sizeToFlatten = 0;
};

class Flatten : public PhysicalOperator, SelVectorOverWriter {
class Flatten final : public PhysicalOperator, SelVectorOverWriter {
static constexpr PhysicalOperatorType type_ = PhysicalOperatorType::FLATTEN;

public:
Flatten(data_chunk_pos_t dataChunkToFlattenPos, std::unique_ptr<PhysicalOperator> child,
uint32_t id, std::unique_ptr<OPPrintInfo> printInfo)
: PhysicalOperator{type_, std::move(child), id, std::move(printInfo)},
dataChunkToFlattenPos{dataChunkToFlattenPos} {}
dataChunkToFlattenPos{dataChunkToFlattenPos}, dataChunkState{nullptr} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
10 changes: 8 additions & 2 deletions src/include/processor/operator/persistent/delete_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ struct RelDeleteInfo {
common::ValueVector* dstNodeIDVector = nullptr;
common::ValueVector* relIDVector = nullptr;

RelDeleteInfo()
: srcNodeIDPos{INVALID_DATA_CHUNK_POS, INVALID_VALUE_VECTOR_POS},
dstNodeIDPos{INVALID_DATA_CHUNK_POS, INVALID_VALUE_VECTOR_POS},
relIDPos{INVALID_DATA_CHUNK_POS, INVALID_VALUE_VECTOR_POS} {}
RelDeleteInfo(DataPos srcNodeIDPos, DataPos dstNodeIDPos, DataPos relIDPos)
: srcNodeIDPos{srcNodeIDPos}, dstNodeIDPos{dstNodeIDPos}, relIDPos{relIDPos} {}
EXPLICIT_COPY_DEFAULT_MOVE(RelDeleteInfo);
Expand All @@ -154,7 +158,7 @@ class RelDeleteExecutor {
RelDeleteExecutor(const RelDeleteExecutor& other) : info{other.info.copy()} {}
virtual ~RelDeleteExecutor() = default;

void init(ResultSet* resultSet, ExecutionContext* context);
virtual void init(ResultSet* resultSet, ExecutionContext* context);

virtual void delete_(ExecutionContext* context) = 0;

Expand All @@ -166,9 +170,11 @@ class RelDeleteExecutor {

class EmptyRelDeleteExecutor final : public RelDeleteExecutor {
public:
explicit EmptyRelDeleteExecutor(RelDeleteInfo info) : RelDeleteExecutor{std::move(info)} {}
explicit EmptyRelDeleteExecutor() : RelDeleteExecutor{RelDeleteInfo{}} {}
EmptyRelDeleteExecutor(const EmptyRelDeleteExecutor& other) : RelDeleteExecutor{other} {}

void init(ResultSet*, ExecutionContext*) override {}

void delete_(ExecutionContext*) override {}

std::unique_ptr<RelDeleteExecutor> copy() const override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace processor {

// OffsetScanNodeTable is only used as the source operator for RecursiveJoin and thus cannot be
// executed in parallel. Therefore, it does not have a shared state.
class OffsetScanNodeTable : public ScanTable {
class OffsetScanNodeTable final : public ScanTable {
static constexpr PhysicalOperatorType type_ = PhysicalOperatorType::OFFSET_SCAN_NODE_TABLE;

public:
Expand Down
15 changes: 8 additions & 7 deletions src/include/processor/operator/scan/scan_multi_rel_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RelTableCollectionScanner {
uint32_t nextTableIdx = 0;
};

class ScanMultiRelTable : public ScanTable {
class ScanMultiRelTable final : public ScanTable {
static constexpr PhysicalOperatorType type_ = PhysicalOperatorType::SCAN_REL_TABLE;

public:
Expand All @@ -57,23 +57,24 @@ class ScanMultiRelTable : public ScanTable {
std::unique_ptr<PhysicalOperator> child, uint32_t id,
std::unique_ptr<OPPrintInfo> printInfo)
: ScanTable{type_, std::move(info), std::move(child), id, std::move(printInfo)},
directionInfo{std::move(directionInfo)}, scanners{std::move(scanners)} {}
directionInfo{std::move(directionInfo)}, boundNodeIDVector{nullptr},
scanners{std::move(scanners)}, currentScanner{nullptr} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal(ExecutionContext* context) final;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() final;
std::unique_ptr<PhysicalOperator> clone() override;

private:
void resetState();
void initCurrentScanner(const common::nodeID_t& nodeID);

private:
DirectionInfo directionInfo;
common::ValueVector* boundNodeIDVector = nullptr;
common::ValueVector* boundNodeIDVector;
common::table_id_map_t<RelTableCollectionScanner> scanners;
RelTableCollectionScanner* currentScanner = nullptr;
RelTableCollectionScanner* currentScanner;
};

} // namespace processor
Expand Down
11 changes: 3 additions & 8 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,10 @@ struct NodeTableScanState final : TableScanState {
// Scan state for un-committed data.
// Ideally we shouldn't need columns to scan un-checkpointed but committed data.
explicit NodeTableScanState(std::vector<common::column_id_t> columnIDs)
: TableScanState{std::move(columnIDs), {}} {
nodeGroupScanState = std::make_unique<NodeGroupScanState>(this->columnIDs.size());
}

: NodeTableScanState{std::move(columnIDs), {}} {}
NodeTableScanState(std::vector<common::column_id_t> columnIDs, std::vector<Column*> columns)
: TableScanState{std::move(columnIDs), std::move(columns),
std::vector<ColumnPredicateSet>{}} {
nodeGroupScanState = std::make_unique<NodeGroupScanState>(this->columnIDs.size());
}
: NodeTableScanState{std::move(columnIDs), std::move(columns),
std::vector<ColumnPredicateSet>{}} {}
NodeTableScanState(std::vector<common::column_id_t> columnIDs, std::vector<Column*> columns,
std::vector<ColumnPredicateSet> columnPredicateSets)
: TableScanState{std::move(columnIDs), std::move(columns), std::move(columnPredicateSets)} {
Expand Down
11 changes: 2 additions & 9 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,12 @@ struct RelTableScanState : TableScanState {
const std::vector<common::column_id_t>& columnIDs)
: RelTableScanState(memoryManager, columnIDs, {}, nullptr, nullptr,
common::RelDataDirection::FWD /* This is a dummy direction */,
std::vector<ColumnPredicateSet>{}) {
nodeGroupScanState =
std::make_unique<CSRNodeGroupScanState>(memoryManager, this->columnIDs.size());
}
std::vector<ColumnPredicateSet>{}) {}
RelTableScanState(MemoryManager& memoryManager,
const std::vector<common::column_id_t>& columnIDs, const std::vector<Column*>& columns,
Column* csrOffsetCol, Column* csrLengthCol, common::RelDataDirection direction)
: RelTableScanState(memoryManager, columnIDs, columns, csrOffsetCol, csrLengthCol,
direction, std::vector<ColumnPredicateSet>{}) {
nodeGroupScanState =
std::make_unique<CSRNodeGroupScanState>(memoryManager, this->columnIDs.size());
}
direction, std::vector<ColumnPredicateSet>{}) {}
RelTableScanState(MemoryManager& memoryManager,
const std::vector<common::column_id_t>& columnIDs, const std::vector<Column*>& columns,
Column* csrOffsetCol, Column* csrLengthCol, common::RelDataDirection direction,
Expand Down Expand Up @@ -76,7 +70,6 @@ struct LocalRelTableScanState final : RelTableScanState {
LocalRelTableScanState(MemoryManager& memoryManager, const RelTableScanState& state,
const std::vector<common::column_id_t>& columnIDs, LocalRelTable* localRelTable)
: RelTableScanState{memoryManager, columnIDs}, localRelTable{localRelTable} {
IDVector = state.IDVector;
direction = state.direction;
boundNodeIDVector = state.boundNodeIDVector;
outputVectors = state.outputVectors;
Expand Down
21 changes: 9 additions & 12 deletions src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ enum class TableScanSource : uint8_t { COMMITTED = 0, UNCOMMITTED = 1, NONE = 3
struct TableScanState {
std::unique_ptr<common::ValueVector> rowIdxVector;
// Node/Rel ID vector. We assume all output vectors are within the same DataChunk as this one.
common::ValueVector* IDVector;
common::ValueVector* nodeIDVector;
std::vector<common::ValueVector*> outputVectors;
common::DataChunkState* outState;
std::vector<common::column_id_t> columnIDs;
common::NodeSemiMask* semiMask;

Expand All @@ -36,21 +37,17 @@ struct TableScanState {
common::ZoneMapCheckResult zoneMapResult = common::ZoneMapCheckResult::ALWAYS_SCAN;

explicit TableScanState(std::vector<common::column_id_t> columnIDs)
: IDVector(nullptr), columnIDs{std::move(columnIDs)}, semiMask{nullptr} {
rowIdxVector = std::make_unique<common::ValueVector>(common::LogicalType::INT64());
}
: TableScanState{std::move(columnIDs), {}, {}} {}
TableScanState(std::vector<common::column_id_t> columnIDs, std::vector<Column*> columns)
: TableScanState{std::move(columnIDs), std::move(columns), {}} {}
TableScanState(std::vector<common::column_id_t> columnIDs, std::vector<Column*> columns,
std::vector<ColumnPredicateSet> columnPredicateSets)
: IDVector(nullptr), columnIDs{std::move(columnIDs)}, semiMask{nullptr},
columns{std::move(columns)}, columnPredicateSets{std::move(columnPredicateSets)} {
rowIdxVector = std::make_unique<common::ValueVector>(common::LogicalType::INT64());
}
explicit TableScanState(std::vector<common::column_id_t> columnIDs,
std::vector<Column*> columns)
: IDVector(nullptr), columnIDs{std::move(columnIDs)}, semiMask{nullptr},
columns{std::move(columns)} {
: nodeIDVector{nullptr}, outState{nullptr}, columnIDs{std::move(columnIDs)},
semiMask{nullptr}, columns{std::move(columns)},
columnPredicateSets{std::move(columnPredicateSets)} {
rowIdxVector = std::make_unique<common::ValueVector>(common::LogicalType::INT64());
}

virtual ~TableScanState() = default;
DELETE_COPY_DEFAULT_MOVE(TableScanState);

Expand Down
2 changes: 1 addition & 1 deletion src/main/storage_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void StorageDriver::scanColumn(storage::Table* table, column_id_t columnID, offs
idVector->state = vectorState;
columnVector->state = vectorState;
scanState->rowIdxVector->state = vectorState;
scanState->IDVector = idVector.get();
scanState->nodeIDVector = idVector.get();
scanState->outputVectors.push_back(columnVector.get());
// Scan
// TODO: validate not more than 1 level nested
Expand Down
5 changes: 4 additions & 1 deletion src/optimizer/projection_push_down_optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ void ProjectionPushDownOptimizer::visitDelete(LogicalOperator* op) {
auto& rel = info.pattern->constCast<RelExpression>();
collectExpressionsInUse(rel.getSrcNode()->getInternalID());
collectExpressionsInUse(rel.getDstNode()->getInternalID());
collectExpressionsInUse(rel.getInternalIDProperty());
KU_ASSERT(rel.getRelType() == QueryRelType::NON_RECURSIVE);
if (!rel.isEmpty()) {
collectExpressionsInUse(rel.getInternalIDProperty());
}
}
} break;
default:
Expand Down
2 changes: 1 addition & 1 deletion src/planner/operator/persistent/logical_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ std::string LogicalDelete::getExpressionsForPrinting() const {

f_group_pos_set LogicalDelete::getGroupsPosToFlatten() const {
KU_ASSERT(!infos.empty());
auto childSchema = children[0]->getSchema();
const auto childSchema = children[0]->getSchema();
f_group_pos_set dependentGroupPos;
switch (infos[0].tableType) {
case TableType::NODE: {
Expand Down
18 changes: 0 additions & 18 deletions src/planner/plan/append_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "catalog/catalog.h"
#include "catalog/catalog_entry/rel_table_catalog_entry.h"
#include "common/enums/join_type.h"
#include "common/exception/runtime.h"
#include "planner/join_order/cost_model.h"
#include "planner/operator/extend/logical_extend.h"
#include "planner/operator/extend/logical_recursive_extend.h"
Expand Down Expand Up @@ -78,27 +77,10 @@ static std::shared_ptr<Expression> getIRIProperty(const expression_vector& prope
return nullptr;
}

static void validatePropertiesContainRelID(const RelExpression& rel,
const expression_vector& properties) {
if (rel.isEmpty()) {
return;
}
for (auto& property : properties) {
if (*property == *rel.getInternalIDProperty()) {
return;
}
}
// LCOV_EXCL_START
throw RuntimeException(stringFormat(
"Internal ID of relationship {} is not scanned. This should not happen.", rel.toString()));
// LCOV_EXCL_STOP
}

void Planner::appendNonRecursiveExtend(const std::shared_ptr<NodeExpression>& boundNode,
const std::shared_ptr<NodeExpression>& nbrNode, const std::shared_ptr<RelExpression>& rel,
ExtendDirection direction, bool extendFromSource, const expression_vector& properties,
LogicalPlan& plan) {
validatePropertiesContainRelID(*rel, properties);
// Filter bound node label if we know some incoming nodes won't have any outgoing rel. This
// cannot be done at binding time because the pruning is affected by extend direction.
auto boundNodeTableIDSet = getBoundNodeTableIDSet(*rel, direction);
Expand Down
10 changes: 5 additions & 5 deletions src/processor/map/map_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapDeleteNode(LogicalOperator* log

std::unique_ptr<RelDeleteExecutor> PlanMapper::getRelDeleteExecutor(
const BoundDeleteInfo& boundInfo, const Schema& schema) const {
auto storageManager = clientContext->getStorageManager();
auto& rel = boundInfo.pattern->constCast<RelExpression>();
if (rel.isEmpty()) {
return std::make_unique<EmptyRelDeleteExecutor>();
}
auto relIDPos = getDataPos(*rel.getInternalIDProperty(), schema);
auto srcNodeIDPos = getDataPos(*rel.getSrcNode()->getInternalID(), schema);
auto dstNodeIDPos = getDataPos(*rel.getDstNode()->getInternalID(), schema);
auto relIDPos = getDataPos(*rel.getInternalIDProperty(), schema);
auto info = RelDeleteInfo(srcNodeIDPos, dstNodeIDPos, relIDPos);
if (rel.isEmpty()) {
return std::make_unique<EmptyRelDeleteExecutor>(std::move(info));
}
auto storageManager = clientContext->getStorageManager();
if (rel.isMultiLabeled()) {
common::table_id_map_t<storage::RelTable*> tableIDToTableMap;
for (auto entry : rel.getEntries()) {
Expand Down
3 changes: 1 addition & 2 deletions src/processor/map/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapExtend(LogicalOperator* logical
auto prevOperator = mapOperator(logicalOperator->getChild(0).get());
auto inNodeIDPos = getDataPos(*boundNode->getInternalID(), *inFSchema);
auto outNodeIDPos = getDataPos(*nbrNode->getInternalID(), *outFSchema);
auto relIDPos = getDataPos(*rel->getInternalIDProperty(), *outFSchema);
std::vector<DataPos> outVectorsPos;
outVectorsPos.push_back(outNodeIDPos);
for (auto& expression : extend->getProperties()) {
outVectorsPos.push_back(getDataPos(*expression, *outFSchema));
}
auto scanInfo = ScanTableInfo(relIDPos, outVectorsPos);
auto scanInfo = ScanTableInfo(inNodeIDPos, outVectorsPos);
std::vector<std::string> tableNames;
auto storageManager = clientContext->getStorageManager();
for (auto entry : rel->getEntries()) {
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/filtering_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ using namespace kuzu::common;
namespace kuzu {
namespace processor {

void SelVectorOverWriter::restoreSelVector(DataChunkState& dataChunkState) {
void SelVectorOverWriter::restoreSelVector(DataChunkState& dataChunkState) const {
if (prevSelVector != nullptr) {
dataChunkState.setSelVector(prevSelVector);
}
Expand Down
Loading

0 comments on commit a0f987b

Please sign in to comment.