Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

GIRAPH-1165: Skip iterating through vertices in supersteps with just … #54

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public void compute(Vertex vertex, Iterable messages) throws IOException {
public void postSuperstep() {
workerLogic.postSuperstep();
}

@Override
public boolean isVertexNoOp() {
return workerLogic.isVertexNoOp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexReceiver;
import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.graph.Vertex;

/**
Expand All @@ -31,7 +30,7 @@
public class BlockWorkerLogic {
private final BlockWorkerPieces pieces;

private transient VertexReceiver receiveFunctions;
private transient InnerVertexReceiver receiveFunctions;
private transient InnerVertexSender sendFunctions;

public BlockWorkerLogic(BlockWorkerPieces pieces) {
Expand Down Expand Up @@ -60,11 +59,22 @@ public void compute(Vertex vertex, Iterable messages) {
}

public void postSuperstep() {
if (receiveFunctions instanceof VertexPostprocessor) {
((VertexPostprocessor) receiveFunctions).postprocess();
if (receiveFunctions != null) {
receiveFunctions.postprocess();
}
if (sendFunctions != null) {
sendFunctions.postprocess();
}
}

/**
* Return true iff compute() function is empty
*
* @return True iff compute function is empty, so we know we don't have to
* iterate through vertices
*/
public boolean isVertexNoOp() {
return (receiveFunctions == null || receiveFunctions.isVertexNoOp()) &&
(sendFunctions == null || sendFunctions.isVertexNoOp());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexReceiver;
import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.hadoop.io.Writable;

/**
Expand Down Expand Up @@ -73,10 +73,10 @@ public void masterCompute(BlockMasterApi masterApi) {
}
}

public VertexReceiver getVertexReceiver(
public InnerVertexReceiver getVertexReceiver(
BlockWorkerReceiveApi receiveApi) {
if (piece != null) {
return piece.getVertexReceiver(receiveApi, executionStage);
return piece.getWrappedVertexReceiver(receiveApi, executionStage);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,22 @@ public abstract void wrappedRegisterReducers(
// getVertexSender(BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage)

/**
* Add automatic handling of reducers to getVertexSender.
* Add automatic handling of reducers to getVertexSender and vertex no-op
* check.
*
* Only for Framework internal use.
*/
public abstract InnerVertexSender getWrappedVertexSender(
final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage);

/**
* Add vertex no-op check.
*
* Only for Framework internal use.
*/
public abstract InnerVertexReceiver getWrappedVertexReceiver(
final BlockWorkerReceiveApi<I> workerApi, S executionStage);

/**
* Override to have worker context send computation.
*
Expand Down Expand Up @@ -186,25 +195,6 @@ public void workerContextReceive(
WV workerValue, List<WM> workerMessages) {
}

/**
* Override to do vertex receive processing.
*
* Creates handler that defines what should be executed on worker
* for each vertex during receive phase.
*
* This logic executed last.
* This function is called once on each worker on each thread, in parallel,
* on their copy of Piece object to create functions handler.
*
* If returned object implements Postprocessor interface, then corresponding
* postprocess() function is going to be called once, after all vertices
* corresponding thread needed to process are done.
*/
public VertexReceiver<I, V, E, M> getVertexReceiver(
BlockWorkerReceiveApi<I> workerApi, S executionStage) {
return null;
}

/**
* Returns MessageClasses definition for messages being sent by this Piece.
*/
Expand Down Expand Up @@ -242,13 +232,33 @@ public abstract class InnerVertexSender
implements VertexSender<I, V, E>, VertexPostprocessor {
@Override
public void postprocess() { }

/**
* Return true iff vertexSend function is empty
*
* @return True iff vertexSend function is empty,
* so we know we don't have to iterate through vertices
*/
public boolean isVertexNoOp() {
return false;
}
}

/** Inner class to provide clean use without specifying types */
public abstract class InnerVertexReceiver
implements VertexReceiver<I, V, E, M>, VertexPostprocessor {
@Override
public void postprocess() { }

/**
* Return true iff vertexReceive function is empty
*
* @return True iff vertexReceive function is empty,
* so we know we don't have to iterate through vertices
*/
public boolean isVertexNoOp() {
return false;
}
}

// Internal implementation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.giraph.block_app.framework.piece;

import org.apache.giraph.block_app.framework.api.BlockMasterApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.api.CreateReducersApi;
import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
Expand Down Expand Up @@ -135,6 +137,25 @@ public VertexSender<I, V, E> getVertexSender(
return null;
}

/**
* Override to do vertex receive processing.
*
* Creates handler that defines what should be executed on worker
* for each vertex during receive phase.
*
* This logic executed last.
* This function is called once on each worker on each thread, in parallel,
* on their copy of Piece object to create functions handler.
*
* If returned object implements Postprocessor interface, then corresponding
* postprocess() function is going to be called once, after all vertices
* corresponding thread needed to process are done.
*/
public VertexReceiver<I, V, E, M> getVertexReceiver(
BlockWorkerReceiveApi<I> workerApi, S executionStage) {
return null;
}

/**
* Override to specify type of the message this Piece sends, if it does
* send messages.
Expand Down Expand Up @@ -284,6 +305,12 @@ public void vertexSend(Vertex<I, V, E> vertex) {
functions.vertexSend(vertex);
}
}

@Override
public boolean isVertexNoOp() {
return functions == null;
}

@Override
public void postprocess() {
if (functions instanceof VertexPostprocessor) {
Expand All @@ -294,6 +321,33 @@ public void postprocess() {
};
}

@Override
public final InnerVertexReceiver getWrappedVertexReceiver(
final BlockWorkerReceiveApi<I> workerApi, S executionStage) {
final VertexReceiver<I, V, E, M> functions =
getVertexReceiver(workerApi, executionStage);
return new InnerVertexReceiver() {
@Override
public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
if (functions != null) {
functions.vertexReceive(vertex, messages);
}
}

@Override
public boolean isVertexNoOp() {
return functions == null;
}

@Override
public void postprocess() {
if (functions instanceof VertexPostprocessor) {
((VertexPostprocessor) functions).postprocess();
}
}
};
}

@Override
public final void wrappedRegisterReducers(
BlockMasterApi masterApi, S executionStage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected DelegateWorkerSendFunctions delegateWorkerSendFunctions(
}

protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
ArrayList<InnerVertexReceiver> workerReceiveFunctions,
BlockWorkerReceiveApi<I> workerApi, S executionStage) {
return new DelegateWorkerReceiveFunctions(workerReceiveFunctions);
}
Expand All @@ -115,13 +115,13 @@ public InnerVertexSender getWrappedVertexSender(
}

@Override
public InnerVertexReceiver getVertexReceiver(
public InnerVertexReceiver getWrappedVertexReceiver(
BlockWorkerReceiveApi<I> workerApi, S executionStage) {
ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions =
ArrayList<InnerVertexReceiver> workerReceiveFunctions =
new ArrayList<>(innerPieces.size());
for (AbstractPiece<I, V, E, M, WV, WM, S> innerPiece : innerPieces) {
workerReceiveFunctions.add(
innerPiece.getVertexReceiver(workerApi, executionStage));
innerPiece.getWrappedVertexReceiver(workerApi, executionStage));
}
return delegateWorkerReceiveFunctions(
workerReceiveFunctions, workerApi, executionStage);
Expand All @@ -145,6 +145,16 @@ public void vertexSend(Vertex<I, V, E> vertex) {
}
}

@Override
public boolean isVertexNoOp() {
for (InnerVertexSender functions : workerSendFunctions) {
if (functions != null && !functions.isVertexNoOp()) {
return false;
}
}
return true;
}

@Override
public void postprocess() {
for (InnerVertexSender functions : workerSendFunctions) {
Expand All @@ -157,11 +167,11 @@ public void postprocess() {

/** Delegating WorkerReceivePiece */
protected class DelegateWorkerReceiveFunctions extends InnerVertexReceiver {
private final ArrayList<VertexReceiver<I, V, E, M>>
private final ArrayList<InnerVertexReceiver>
workerReceiveFunctions;

public DelegateWorkerReceiveFunctions(
ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions) {
ArrayList<InnerVertexReceiver> workerReceiveFunctions) {
this.workerReceiveFunctions = workerReceiveFunctions;
}

Expand All @@ -175,6 +185,16 @@ public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
}
}

@Override
public boolean isVertexNoOp() {
for (InnerVertexReceiver functions : workerReceiveFunctions) {
if (functions != null && !functions.isVertexNoOp()) {
return false;
}
}
return true;
}

@Override
public void postprocess() {
for (VertexReceiver<I, V, E, M> functions :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.function.vertex.SupplierFromVertex;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.Writable;
Expand Down Expand Up @@ -122,12 +121,17 @@ public void vertexSend(Vertex<I, V, E> vertex) {
super.vertexSend(vertex);
}
}

@Override
public boolean isVertexNoOp() {
return toCallSend == null;
}
};
}

@Override
protected DelegateWorkerReceiveFunctions delegateWorkerReceiveFunctions(
ArrayList<VertexReceiver<I, V, E, M>> workerReceiveFunctions,
ArrayList<InnerVertexReceiver> workerReceiveFunctions,
BlockWorkerReceiveApi<I> workerApi, S executionStage) {
return new DelegateWorkerReceiveFunctions(workerReceiveFunctions) {
@Override
Expand All @@ -136,6 +140,11 @@ public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
super.vertexReceive(vertex, messages);
}
}

@Override
public boolean isVertexNoOp() {
return toCallReceive == null;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public class Pieces {

private Pieces() { }

/**
* Piece which does nothing
*/
public static Piece<WritableComparable, Writable, Writable, NoMessage,
Object> noOpPiece() {
return new Piece<WritableComparable, Writable, Writable, NoMessage,
Object>() {
};
}

/**
* For each vertex execute given process function.
* Computation is happening in send phase of the returned Piece.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,9 @@ public final int getWorkerForVertex(I vertexId) {
return allWorkersInfo.getWorkerIndex(
serviceWorker.getVertexPartitionOwner(vertexId).getWorkerInfo());
}

@Override
public boolean isVertexNoOp() {
return false;
}
}
Loading