Skip to content

Commit

Permalink
Add parallel k-core implementation (#4676)
Browse files Browse the repository at this point in the history
* KCore Parallel implementation (#4661)

* current draft implementation

---------

Co-authored-by: CI Bot <[email protected]>

* Improve parallel kcore

* Run clang-format

---------

Co-authored-by: Howe Wang <[email protected]>
Co-authored-by: CI Bot <[email protected]>
Co-authored-by: CI Bot <[email protected]>
  • Loading branch information
4 people authored Jan 9, 2025
1 parent 6c78434 commit 2e07232
Show file tree
Hide file tree
Showing 12 changed files with 454 additions and 53 deletions.
7 changes: 4 additions & 3 deletions src/function/function_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,10 @@ FunctionCollection* FunctionCollection::getFunctions() {

// Algorithm functions
ALGORITHM_FUNCTION(WeaklyConnectedComponentsFunction),
ALGORITHM_FUNCTION(VarLenJoinsFunction), ALGORITHM_FUNCTION(AllSPDestinationsFunction),
ALGORITHM_FUNCTION(AllSPPathsFunction), ALGORITHM_FUNCTION(SingleSPDestinationsFunction),
ALGORITHM_FUNCTION(SingleSPPathsFunction), ALGORITHM_FUNCTION(PageRankFunction),
ALGORITHM_FUNCTION(KCoreDecompositionFunction), ALGORITHM_FUNCTION(VarLenJoinsFunction),
ALGORITHM_FUNCTION(AllSPDestinationsFunction), ALGORITHM_FUNCTION(AllSPPathsFunction),
ALGORITHM_FUNCTION(SingleSPDestinationsFunction), ALGORITHM_FUNCTION(SingleSPPathsFunction),
ALGORITHM_FUNCTION(PageRankFunction),

// Export functions
EXPORT_FUNCTION(ExportCSVFunction), EXPORT_FUNCTION(ExportParquetFunction),
Expand Down
3 changes: 2 additions & 1 deletion src/function/gds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ add_library(kuzu_function_algorithm
gds_utils.cpp
output_writer.cpp
variable_length_path.cpp
weakly_connected_components.cpp)
weakly_connected_components.cpp
k_core_decomposition.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_function_algorithm>
Expand Down
1 change: 0 additions & 1 deletion src/function/gds/gds_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ void GDSUtils::scheduleFrontierTask(table_id_t boundTableID, table_id_t nbrTable
task->runSparse();
return;
}

// GDSUtils::runFrontiersUntilConvergence is called from a GDSCall operator, which is
// already executed by a worker thread Tm of the task scheduler. So this function is
// executed by Tm. Because this function will monitor the task and wait for it to
Expand Down
366 changes: 366 additions & 0 deletions src/function/gds/k_core_decomposition.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,366 @@
#include "binder/binder.h"
#include "binder/expression/expression_util.h"
#include "common/types/types.h"
#include "function/gds/gds_frontier.h"
#include "function/gds/gds_function_collection.h"
#include "function/gds/gds_object_manager.h"
#include "function/gds/gds_utils.h"
#include "function/gds/output_writer.h"
#include "function/gds_function.h"
#include "graph/graph.h"
#include "processor/execution_context.h"
#include "processor/result/factorized_table.h"

using namespace kuzu::binder;
using namespace kuzu::common;
using namespace kuzu::processor;
using namespace kuzu::storage;
using namespace kuzu::graph;

namespace kuzu {
namespace function {

// Unsigned 64-bit type used for both vertex degrees and core numbers in this file.
using degree_t = uint64_t;
// Sentinel with every bit set (the largest degree_t value). CoreValues stores it for nodes whose
// core number has not been assigned yet.
static constexpr degree_t INVALID_DEGREE = ~degree_t{0};

class Degrees {
public:
Degrees(const table_id_map_t<offset_t>& numNodesMap, MemoryManager* mm) {
init(numNodesMap, mm);
}

void pinTable(table_id_t tableID) { degreeValues = degreeValuesMap.getData(tableID); }

void addDegree(offset_t offset, uint64_t degree) {
degreeValues[offset].fetch_add(degree, std::memory_order_relaxed);
}

void decreaseDegreeByOne(offset_t offset) {
degreeValues[offset].fetch_sub(1, std::memory_order_relaxed);
}

degree_t getValue(offset_t offset) {
return degreeValues[offset].load(std::memory_order_relaxed);
}

private:
void init(const table_id_map_t<offset_t>& numNodesMap, MemoryManager* mm) {
for (const auto& [tableID, numNodes] : numNodesMap) {
degreeValuesMap.allocate(tableID, numNodes, mm);
pinTable(tableID);
for (auto i = 0u; i < numNodes; ++i) {
degreeValues[i].store(0, std::memory_order_relaxed);
}
}
}

private:
std::atomic<degree_t>* degreeValues = nullptr;
ObjectArraysMap<std::atomic<degree_t>> degreeValuesMap;
};

class CoreValues {
public:
CoreValues(const table_id_map_t<offset_t>& numNodesMap, MemoryManager* mm) {
init(numNodesMap, mm);
}

void pinTable(table_id_t tableID) { coreValues = coreValuesMap.getData(tableID); }

bool isValid(offset_t offset) {
return coreValues[offset].load(std::memory_order_relaxed) != INVALID_DEGREE;
}
degree_t getValue(offset_t offset) {
return coreValues[offset].load(std::memory_order_relaxed);
}
void setCoreValue(offset_t offset, degree_t value) {
coreValues[offset].store(value, std::memory_order_relaxed);
}

private:
void init(const table_id_map_t<offset_t>& numNodesMap, MemoryManager* mm) {
for (const auto& [tableID, numNodes] : numNodesMap) {
coreValuesMap.allocate(tableID, numNodes, mm);
pinTable(tableID);
for (auto i = 0u; i < numNodes; ++i) {
coreValues[i].store(INVALID_DEGREE, std::memory_order_relaxed);
}
}
}

private:
std::atomic<degree_t>* coreValues = nullptr;
ObjectArraysMap<std::atomic<degree_t>> coreValuesMap;
};

class KCoreFrontierPair : public FrontierPair {
public:
KCoreFrontierPair(std::shared_ptr<GDSFrontier> curFrontier,
std::shared_ptr<GDSFrontier> nextFrontier, Degrees* degrees, CoreValues* coreValues)
: FrontierPair(curFrontier, nextFrontier), degrees{degrees}, coreValues{coreValues} {}

void initRJFromSource(nodeID_t /* source */) override{};

void pinCurrFrontier(table_id_t tableID) override {
FrontierPair::pinCurrFrontier(tableID);
curDenseFrontier->ptrCast<PathLengths>()->pinCurFrontierTableID(tableID);
}

void pinNextFrontier(table_id_t tableID) override {
FrontierPair::pinNextFrontier(tableID);
nextDenseFrontier->ptrCast<PathLengths>()->pinNextFrontierTableID(tableID);
}

void beginFrontierComputeBetweenTables(table_id_t curTableID, table_id_t nextTableID) override {
FrontierPair::beginFrontierComputeBetweenTables(curTableID, nextTableID);
degrees->pinTable(nextTableID);
coreValues->pinTable(nextTableID);
}

void beginNewIterationInternalNoLock() override {
std::swap(curDenseFrontier, nextDenseFrontier);
curDenseFrontier->ptrCast<PathLengths>()->incrementCurIter();
nextDenseFrontier->ptrCast<PathLengths>()->incrementCurIter();
}

private:
Degrees* degrees;
CoreValues* coreValues;
};

// Edge compute that accumulates node degrees: for each scanned neighbour chunk of a bound node,
// the chunk size is added to that node's degree counter. It never activates any node.
struct DegreeEdgeCompute : public EdgeCompute {
    // Shared degree counters; not owned.
    Degrees* degrees;

    explicit DegreeEdgeCompute(Degrees* degrees) : degrees{degrees} {}

    // Adds the number of entries in `chunk` to the degree of `boundNodeID`. The bool argument is
    // unused. Always returns an empty list.
    std::vector<nodeID_t> edgeCompute(nodeID_t boundNodeID, graph::NbrScanState::Chunk& chunk,
        bool /* unused */) override {
        const auto chunkSize = chunk.size();
        degrees->addDegree(boundNodeID.offset, chunkSize);
        return std::vector<nodeID_t>{};
    }

    // Returns a new DegreeEdgeCompute that shares the same Degrees object.
    std::unique_ptr<EdgeCompute> copy() override {
        std::unique_ptr<EdgeCompute> duplicate = std::make_unique<DegreeEdgeCompute>(degrees);
        return duplicate;
    }
};

// Edge compute that "removes" the bound node from the graph by decrementing the degree counter
// of every neighbour found in the scanned chunk. It never activates any node.
struct RemoveVertexEdgeCompute : public EdgeCompute {
    // Shared degree counters; not owned.
    Degrees* degrees;

    explicit RemoveVertexEdgeCompute(Degrees* degrees) : degrees{degrees} {}

    // Decrements the degree of each neighbour in `chunk` by one. The bound node ID and the bool
    // argument are unused. Always returns an empty list.
    std::vector<nodeID_t> edgeCompute(nodeID_t /* unused */, graph::NbrScanState::Chunk& chunk,
        bool /* unused */) override {
        chunk.forEach([this](auto nbrNodeID, auto /* unused */) {
            degrees->decreaseDegreeByOne(nbrNodeID.offset);
        });
        return std::vector<nodeID_t>{};
    }

    // Returns a new RemoveVertexEdgeCompute that shares the same Degrees object.
    std::unique_ptr<EdgeCompute> copy() override {
        std::unique_ptr<EdgeCompute> duplicate =
            std::make_unique<RemoveVertexEdgeCompute>(degrees);
        return duplicate;
    }
};

// Output writer that emits one (node ID, core value) row per node. Core values are read from a
// CoreValues object that is shared with, and owned by, the caller.
//
// The base class is inherited publicly, like every other derived class in this file; with the
// `class` keyword an unspecified base would default to private inheritance.
class KCoreOutputWriter : public GDSOutputWriter {
public:
    KCoreOutputWriter(main::ClientContext* context, processor::NodeOffsetMaskMap* outputNodeMask,
        CoreValues* coreValues)
        : GDSOutputWriter{context, outputNodeMask}, coreValues{coreValues} {
        auto mm = context->getMemoryManager();
        // NOTE(review): createVector presumably also registers the vector in the base-class
        // `vectors` list that materialize() appends -- confirm against GDSOutputWriter.
        nodeIDVector = createVector(LogicalType::INTERNAL_ID(), mm);
        kValueVector = createVector(LogicalType::UINT64(), mm);
    }

    // Pins both the base writer and the shared core values to `tableID`.
    void pinTableID(table_id_t tableID) override {
        GDSOutputWriter::pinTableID(tableID);
        coreValues->pinTable(tableID);
    }

    // Appends one row to `table` for every offset in [startOffset, endOffset) of `tableID`.
    // The core values must already be pinned to `tableID` (see pinTableID).
    void materialize(offset_t startOffset, offset_t endOffset, table_id_t tableID,
        FactorizedTable& table) const {
        for (auto i = startOffset; i < endOffset; ++i) {
            auto nodeID = nodeID_t{i, tableID};
            nodeIDVector->setValue<nodeID_t>(0, nodeID);
            kValueVector->setValue<uint64_t>(0, coreValues->getValue(i));
            table.append(vectors);
        }
    }

    // Returns a new writer with its own vectors that shares the same context, mask and core
    // values.
    std::unique_ptr<KCoreOutputWriter> copy() const {
        return std::make_unique<KCoreOutputWriter>(context, outputNodeMask, coreValues);
    }

private:
    std::unique_ptr<ValueVector> nodeIDVector;
    std::unique_ptr<ValueVector> kValueVector;
    CoreValues* coreValues;
};

// Vertex compute that writes the k-core results. Each instance claims a local FactorizedTable
// from the shared state when constructed and hands it back when destroyed; rows are appended to
// that local table through the instance's own KCoreOutputWriter.
class OutputVertexCompute : public VertexCompute {
public:
    OutputVertexCompute(MemoryManager* mm, processor::GDSCallSharedState* sharedState,
        std::unique_ptr<KCoreOutputWriter> outputWriter)
        : mm{mm}, sharedState{sharedState}, outputWriter{std::move(outputWriter)},
          localFT{sharedState->claimLocalTable(mm)} {}

    ~OutputVertexCompute() override { sharedState->returnLocalTable(localFT); }

    // Pins the writer to `tableID`; always reports that the table should be processed.
    bool beginOnTable(table_id_t tableID) override {
        outputWriter->pinTableID(tableID);
        return true;
    }

    // Appends the rows for offsets [startOffset, endOffset) of `tableID` to the local table.
    void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t tableID) override {
        auto& destination = *localFT;
        outputWriter->materialize(startOffset, endOffset, tableID, destination);
    }

    // Returns a new instance with a copied writer; the new instance claims its own local table.
    std::unique_ptr<VertexCompute> copy() override {
        auto writerCopy = outputWriter->copy();
        return std::make_unique<OutputVertexCompute>(mm, sharedState, std::move(writerCopy));
    }

private:
    MemoryManager* mm;
    processor::GDSCallSharedState* sharedState;
    std::unique_ptr<KCoreOutputWriter> outputWriter;
    processor::FactorizedTable* localFT;
};

class DegreeLessThanCoreVertexCompute : public VertexCompute {
public:
DegreeLessThanCoreVertexCompute(Degrees* degrees, CoreValues* coreValues,
FrontierPair* frontierPair, degree_t coreValue, std::atomic<offset_t>& numActiveNodes)
: degrees{degrees}, coreValues{coreValues}, frontierPair{frontierPair},
coreValue{coreValue}, numActiveNodes{numActiveNodes} {}

bool beginOnTable(table_id_t tableID) override {
degrees->pinTable(tableID);
coreValues->pinTable(tableID);
return true;
}

void vertexCompute(offset_t startOffset, offset_t endOffset, table_id_t tableID) override {
for (auto i = startOffset; i < endOffset; ++i) {
if (coreValues->isValid(i)) { // Core has been computed
continue;
}
auto degree = degrees->getValue(i);
if (degree <= coreValue) {
frontierPair->addNodeToNextDenseFrontier(nodeID_t{i, tableID});
coreValues->setCoreValue(i, coreValue);
numActiveNodes.fetch_add(1, std::memory_order_relaxed);
}
}
}

std::unique_ptr<VertexCompute> copy() override {
return std::make_unique<DegreeLessThanCoreVertexCompute>(degrees, coreValues, frontierPair,
coreValue, numActiveNodes);
}

private:
Degrees* degrees;
CoreValues* coreValues;
FrontierPair* frontierPair;
degree_t coreValue;
std::atomic<offset_t>& numActiveNodes;
};

class KCoreDecomposition final : public GDSAlgorithm {
static constexpr char GROUP_ID_COLUMN_NAME[] = "k_degree";

public:
KCoreDecomposition() = default;
KCoreDecomposition(const KCoreDecomposition& other) : GDSAlgorithm{other} {}

std::vector<LogicalTypeID> getParameterTypeIDs() const override {
return std::vector<LogicalTypeID>{LogicalTypeID::ANY};
}

binder::expression_vector getResultColumns(binder::Binder* binder) const override {
expression_vector columns;
auto& outputNode = bindData->getNodeOutput()->constCast<NodeExpression>();
columns.push_back(outputNode.getInternalID());
columns.push_back(binder->createVariable(GROUP_ID_COLUMN_NAME, LogicalType::INT64()));
return columns;
}

void bind(const GDSBindInput& input, main::ClientContext& context) override {
auto graphName = binder::ExpressionUtil::getLiteralValue<std::string>(*input.getParam(0));
auto graphEntry = bindGraphEntry(context, graphName);
auto nodeOutput = bindNodeOutput(input.binder, graphEntry.nodeEntries);
bindData = std::make_unique<GDSBindData>(std::move(graphEntry), nodeOutput);
}

void exec(processor::ExecutionContext* context) override {
auto clientContext = context->clientContext;
auto mm = clientContext->getMemoryManager();
auto graph = sharedState->graph.get();
auto numNodesMap = graph->getNumNodesMap(clientContext->getTransaction());
auto degrees = Degrees(numNodesMap, mm);
auto coreValues = CoreValues(numNodesMap, mm);
auto currentFrontier = getPathLengthsFrontier(context, PathLengths::UNVISITED);
auto nextFrontier = getPathLengthsFrontier(context, 0);
auto frontierPair = std::make_unique<KCoreFrontierPair>(currentFrontier, nextFrontier,
&degrees, &coreValues);
// Initialize starting nodes (all nodes) in the next frontier.
// When beginNewIteration, next frontier will become current frontier
frontierPair->setActiveNodesForNextIter();
frontierPair->getNextSparseFrontier().disable();
// Compute Degree
auto degreeEdgeCompute = std::make_unique<DegreeEdgeCompute>(&degrees);
auto computeState = GDSComputeState(std::move(frontierPair), std::move(degreeEdgeCompute),
sharedState->getOutputNodeMaskMap());
GDSUtils::runFrontiersUntilConvergence(context, computeState, graph, ExtendDirection::BOTH,
1 /* maxIters */);
// Compute Core values
auto removeVertexEdgeCompute = std::make_unique<RemoveVertexEdgeCompute>(&degrees);
computeState.edgeCompute = std::move(removeVertexEdgeCompute);
auto coreValue = 0u;
auto numNodes = graph->getNumNodes(clientContext->getTransaction());
auto numNodesComputed = 0u;
while (numNodes != numNodesComputed) {
// Compute current core value
while (true) {
std::atomic<offset_t> numActiveNodes;
numActiveNodes.store(0);
// Find nodes with degree less than current core.
auto vc = DegreeLessThanCoreVertexCompute(&degrees, &coreValues,
computeState.frontierPair.get(), coreValue, numActiveNodes);
GDSUtils::runVertexCompute(context, sharedState->graph.get(), vc);
numNodesComputed += numActiveNodes.load();
if (numActiveNodes.load() == 0) {
break;
}
// Remove found nodes by decreasing their nbrs degree by one.
computeState.frontierPair->setActiveNodesForNextIter();
computeState.frontierPair->getNextSparseFrontier().disable();
GDSUtils::runFrontiersUntilConvergence(context, computeState, graph,
ExtendDirection::BOTH,
computeState.frontierPair->getCurrentIter() + 1 /* maxIters */);
// Repeat until all remaining nodes has degree greater than current core.
}
coreValue++;
}
// Write output
auto writer = std::make_unique<KCoreOutputWriter>(clientContext,
sharedState->getOutputNodeMaskMap(), &coreValues);
auto vertexCompute = OutputVertexCompute(clientContext->getMemoryManager(),
sharedState.get(), std::move(writer));
GDSUtils::runVertexCompute(context, sharedState->graph.get(), vertexCompute);
sharedState->mergeLocalTables();
}

std::unique_ptr<GDSAlgorithm> copy() const override {
return std::make_unique<KCoreDecomposition>(*this);
}
};

// Builds the function set for k-core decomposition: a single GDSFunction registered under
// `name`, taking the algorithm's parameter type IDs and owning a KCoreDecomposition instance.
function_set KCoreDecompositionFunction::getFunctionSet() {
    auto algorithm = std::make_unique<KCoreDecomposition>();
    // Query the parameter types before ownership of the algorithm is handed to the function.
    auto parameterTypeIDs = algorithm->getParameterTypeIDs();
    function_set functions;
    functions.push_back(std::make_unique<GDSFunction>(name, std::move(parameterTypeIDs),
        std::move(algorithm)));
    return functions;
}

} // namespace function
} // namespace kuzu
Loading

0 comments on commit 2e07232

Please sign in to comment.