Skip to content

Commit

Permalink
Mix of many unrelated changes; these should be separated into individual commits.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Nov 30, 2023
1 parent 88fe2b0 commit 2526b10
Show file tree
Hide file tree
Showing 152 changed files with 6,167 additions and 1,667 deletions.
16 changes: 9 additions & 7 deletions scripts/builtin/kmeans.dml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer
Integer seed = -1)
return (Matrix[Double] C, Matrix[Double] Y)
{

tStart = time()
if( is_verbose )
print ("BEGIN K-MEANS SCRIPT");
print ("BEGIN K-MEANS SCRIPT ... " + (tStart-time()) );

num_records = nrow (X);
num_features = ncol (X);
Expand All @@ -61,7 +63,7 @@ m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer
# STEP 1: INITIALIZE CENTROIDS FOR ALL RUNS FROM DATA SAMPLES:

if( is_verbose )
print ("Taking data samples for initialization...");
print ("Taking data samples for initialization..." + (tStart-time()));

[sample_maps, samples_vs_runs_map, sample_block_size] = get_sample_maps(
num_records, num_runs, num_centroids * avg_sample_size_per_centroid, seed);
Expand All @@ -71,7 +73,7 @@ m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer
X_samples_sq_norms = rowSums (X_samples ^ 2);

if( is_verbose )
print ("Initializing the centroids for all runs...");
print ("Initializing the centroids for all runs... " +(tStart-time()));

All_Centroids = matrix (0, num_runs * num_centroids, num_features);

Expand All @@ -95,8 +97,8 @@ m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer
centroid_ids = t(colSums (cdf_min_distances < threshold_matrix)) + 1;

# Place the selected centroids together, one per run, into a matrix:
centroid_placer = table (seq (1, num_runs),
sample_block_size * seq (0, num_runs - 1) + centroid_ids, num_runs, sample_block_size * num_runs);
centroid_placer = table (seq (1, num_runs), sample_block_size * seq (0, num_runs - 1) + centroid_ids,
num_runs, sample_block_size * num_runs);
centroids = centroid_placer %*% X_samples;

# Place the selected centroids into their appropriate slots in All_Centroids:
Expand All @@ -117,7 +119,7 @@ m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer
num_iterations = matrix (0, rows = num_runs, cols = 1);

if( is_verbose )
print ("Performing k-means iterations for all runs...");
print ("Performing k-means iterations for all runs..." + (tStart-time()));

parfor (run_index in 1 : num_runs, check = 0)
{
Expand All @@ -144,7 +146,7 @@ m_kmeans = function(Matrix[Double] X, Integer k = 10, Integer runs = 10, Integer
+ "; Centroid change (avg.sq.dist.) = " + (sum ((C - C_old) ^ 2) / num_centroids));
}

# Find the closest centroid for each record
# Find the closest centroid for each record (possibly multiple ones)
P = D <= minD;
# If some records belong to multiple centroids, share them equally
P = P / rowSums (P);
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/apache/sysds/hops/AggBinaryOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,7 @@ private boolean isApplicableForTransitiveSparkExecType(boolean left)
|| (left && !isLeftTransposeRewriteApplicable(true)))
&& getInput(index).getParent().size()==1 //bagg is only parent
&& !getInput(index).areDimsBelowThreshold()
&& (getInput(index).optFindExecType() == ExecType.SPARK
|| (getInput(index) instanceof DataOp && ((DataOp)getInput(index)).hasOnlyRDD()))
&& getInput(index).hasSparkOutput()
&& getInput(index).getOutputMemEstimate()>getOutputMemEstimate();
}

Expand Down
78 changes: 53 additions & 25 deletions src/main/java/org/apache/sysds/hops/BinaryOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -751,8 +751,8 @@ protected ExecType optFindExecType(boolean transitive) {

checkAndSetForcedPlatform();

DataType dt1 = getInput().get(0).getDataType();
DataType dt2 = getInput().get(1).getDataType();
final DataType dt1 = getInput(0).getDataType();
final DataType dt2 = getInput(1).getDataType();

if( _etypeForced != null ) {
_etype = _etypeForced;
Expand Down Expand Up @@ -801,18 +801,28 @@ else if ( dt1 == DataType.SCALAR && dt2 == DataType.MATRIX ) {
checkAndSetInvalidCPDimsAndSize();
}

//spark-specific decision refinement (execute unary scalar w/ spark input and
// spark-specific decision refinement (execute unary scalar w/ spark input and
// single parent also in spark because it's likely cheap and reduces intermediates)
if(transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP && _etypeForced != ExecType.FED &&
getDataType().isMatrix() // output should be a matrix
&& (dt1.isScalar() || dt2.isScalar()) // one side should be scalar
&& supportsMatrixScalarOperations() // scalar operations
&& !(getInput().get(dt1.isScalar() ? 1 : 0) instanceof DataOp) // input is not checkpoint
&& getInput().get(dt1.isScalar() ? 1 : 0).getParent().size() == 1 // unary scalar is only parent
&& !HopRewriteUtils.isSingleBlock(getInput().get(dt1.isScalar() ? 1 : 0)) // single block triggered exec
&& getInput().get(dt1.isScalar() ? 1 : 0).optFindExecType() == ExecType.SPARK) {
// pull unary scalar operation into spark
_etype = ExecType.SPARK;
if(transitive // we allow transitive Spark operations. continue sequences of spark operations
&& _etype == ExecType.CP // The instruction is currently in CP
&& _etypeForced != ExecType.CP // not forced CP
&& _etypeForced != ExecType.FED // not federated
&& (getDataType().isMatrix() || getDataType().isFrame()) // output should be a matrix or frame
) {
final boolean v1 = getInput(0).isScalarOrVectorBellowBlockSize();
final boolean v2 = getInput(1).isScalarOrVectorBellowBlockSize();
final boolean left = v1 == true; // left side is the vector or scalar
final Hop sparkIn = getInput(left ? 1 : 0);
if((v1 ^ v2) // XOR only one side is allowed to be a vector or a scalar.
&& (supportsMatrixScalarOperations() || op == OpOp2.APPLY_SCHEMA) // supported operation
&& sparkIn.getParent().size() == 1 // only one parent
&& !HopRewriteUtils.isSingleBlock(sparkIn) // single block triggered exec
&& sparkIn.optFindExecType() == ExecType.SPARK // input was spark op.
&& !(sparkIn instanceof DataOp) // input is not checkpoint
) {
// pull operation into spark
_etype = ExecType.SPARK;
}
}

if( OptimizerUtils.ALLOW_BINARY_UPDATE_IN_PLACE &&
Expand Down Expand Up @@ -842,7 +852,7 @@ else if( (op == OpOp2.CBIND && getDataType().isList())
|| (op == OpOp2.RBIND && getDataType().isList())) {
_etype = ExecType.CP;
}

//mark for recompile (forever)
setRequiresRecompileIfNecessary();

Expand Down Expand Up @@ -1157,17 +1167,35 @@ && getInput().get(0) == that2.getInput().get(0)
}

/**
 * Indicates if this binary operation has a matrix-scalar execution form,
 * i.e., one operand may be a scalar that is broadcast against a matrix.
 *
 * @return true if the operation supports matrix-scalar operations
 */
public boolean supportsMatrixScalarOperations() {
	switch(op) {
		// arithmetic
		case PLUS: case MINUS: case MULT: case DIV:
		case MODULUS: case INTDIV:
		// relational
		case LESS: case LESSEQUAL:
		case GREATER: case GREATEREQUAL:
		case EQUAL: case NOTEQUAL:
		// min/max and exponential
		case MIN: case MAX:
		case LOG: case POW:
		// logical
		case AND: case OR: case XOR:
		// bitwise
		case BITWAND: case BITWOR: case BITWXOR:
		case BITWSHIFTL: case BITWSHIFTR:
			return true;
		default:
			return false;
	}
}

public boolean isPPredOperation() {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/org/apache/sysds/hops/DataOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ public boolean allowsAllExecTypes()
protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
{
double ret = 0;

if ( getDataType() == DataType.SCALAR )
final DataType dt = getDataType();
if ( dt == DataType.SCALAR )
{
switch( getValueType() )
{
Expand All @@ -407,6 +407,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
ret = 0;
}
}
else if(dt == DataType.FRAME) {
if(_op == OpOpData.PERSISTENTREAD || _op == OpOpData.TRANSIENTREAD) {
ret = OptimizerUtils.estimateSizeExactFrame(dim1, dim2);
}
}
else //MATRIX / FRAME
{
if( _op == OpOpData.PERSISTENTREAD
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/apache/sysds/hops/Hop.java
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,12 @@ public final String toString() {
// ========================================================================================


/**
 * Indicates whether this hop produces a scalar, or a row/column vector
 * whose longer dimension is strictly smaller than the configured block size
 * (such outputs fit into a single block).
 * NOTE: "Bellow" is a historical typo in the name, kept for compatibility.
 *
 * @return true for scalars, or known-dimension vectors below block size
 */
protected boolean isScalarOrVectorBellowBlockSize(){
	if( getDataType().isScalar() )
		return true;
	if( !dimsKnown() )
		return false;
	final int blen = ConfigurationManager.getBlocksize();
	final boolean rowVector = _dc.getRows() == 1 && _dc.getCols() < blen;
	final boolean colVector = _dc.getCols() == 1 && _dc.getRows() < blen;
	return rowVector || colVector;
}

protected boolean isVector() {
return (dimsKnown() && (_dc.getRows() == 1 || _dc.getCols() == 1) );
}
Expand Down Expand Up @@ -1702,6 +1708,11 @@ protected void setMemoryAndComputeEstimates(Lop lop) {
lop.setComputeEstimate(ComputeCost.getHOPComputeCost(this));
}

/**
 * Checks if this hop's output resides in Spark: either the hop itself is
 * scheduled for SPARK execution, or it is a DataOp whose output exists
 * only as an RDD.
 *
 * @return true if the output of this hop lives in Spark
 */
protected boolean hasSparkOutput(){
	if( optFindExecType() == ExecType.SPARK )
		return true;
	return this instanceof DataOp
		&& ((DataOp) this).hasOnlyRDD();
}

/**
* Set parse information.
*
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/apache/sysds/hops/OptimizerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.MemoryEstimates;

public class OptimizerUtils
{
Expand Down Expand Up @@ -782,6 +783,15 @@ public static long estimateSizeExactSparsity(long nrows, long ncols, long nnz)
double sp = getSparsity(nrows, ncols, nnz);
return estimateSizeExactSparsity(nrows, ncols, sp);
}


/**
 * Estimates the in-memory size (in bytes) of a frame with the given
 * dimensions, assuming String columns with on average 8 characters per
 * value. Returns Long.MAX_VALUE (saturates) when the estimate is not
 * representable: too many rows for an int-indexed array, or a per-column
 * cost times column count that would overflow a long.
 *
 * @param nRows number of rows
 * @param nCols number of columns
 * @return estimated size in bytes, or Long.MAX_VALUE on overflow
 */
public static long estimateSizeExactFrame(long nRows, long nCols){
	if(nRows > Integer.MAX_VALUE)
		return Long.MAX_VALUE;

	// assuming String arrays and on average 8 characters per value.
	final long colCost = (long)MemoryEstimates.stringArrayCost((int)nRows, 8);
	// saturate instead of silently overflowing for extremely wide frames
	if(nCols > 0 && colCost > Long.MAX_VALUE / nCols)
		return Long.MAX_VALUE;
	return colCost * nCols;
}

/**
* Estimates the footprint (in bytes) for an in-memory representation of a
Expand Down
34 changes: 24 additions & 10 deletions src/main/java/org/apache/sysds/hops/UnaryOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
} else {
sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
}
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);

if(getDataType() == DataType.FRAME)
return OptimizerUtils.estimateSizeExactFrame(dim1, dim2);
else
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
}

@Override
Expand Down Expand Up @@ -465,6 +469,13 @@ public boolean isMetadataOperation() {
|| _op == OpOp1.CAST_AS_LIST;
}

/**
 * Enumerates unary operations that must not be pulled transitively into
 * Spark: cumulative aggregates, cast operations, and the order-statistic
 * operations median and inter-quartile mean.
 *
 * @return true if this unary op is disallowed for transitive Spark execution
 */
private boolean isDisallowedSparkOps(){
	if( isCumulativeUnaryOperation() || isCastUnaryOperation() )
		return true;
	return _op == OpOp1.MEDIAN || _op == OpOp1.IQM;
}

@Override
protected ExecType optFindExecType(boolean transitive)
{
Expand Down Expand Up @@ -495,19 +506,22 @@ else if ( getInput().get(0).areDimsBelowThreshold() || getInput().get(0).isVecto
checkAndSetInvalidCPDimsAndSize();
}


//spark-specific decision refinement (execute unary w/ spark input and
//single parent also in spark because it's likely cheap and reduces intermediates)
if( _etype == ExecType.CP && _etypeForced != ExecType.CP
&& getInput().get(0).optFindExecType() == ExecType.SPARK
&& getDataType().isMatrix()
&& !isCumulativeUnaryOperation() && !isCastUnaryOperation()
&& _op!=OpOp1.MEDIAN && _op!=OpOp1.IQM
&& !(getInput().get(0) instanceof DataOp) //input is not checkpoint
&& getInput().get(0).getParent().size()==1 ) //unary is only parent
{
if(_etype == ExecType.CP // currently CP instruction
&& _etype != ExecType.SPARK /// currently not SP.
&& _etypeForced != ExecType.CP // not forced as CP instruction
&& getInput(0).hasSparkOutput() // input is a spark instruction
&& (getDataType().isMatrix() || getDataType().isFrame()) // output is a matrix or frame
&& !isDisallowedSparkOps() // is invalid spark instruction
// && !(getInput().get(0) instanceof DataOp) // input is not checkpoint
// && getInput(0).getParent().size() <= 1// unary is only parent
) {
//pull unary operation into spark
_etype = ExecType.SPARK;
}


//mark for recompile (forever)
setRequiresRecompileIfNecessary();
Expand All @@ -521,7 +535,7 @@ && getInput().get(0).getParent().size()==1 ) //unary is only parent
} else {
setRequiresRecompileIfNecessary();
}

return _etype;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,13 @@ public static ArrayList<Hop> optimize(ArrayList<Hop> roots, boolean recompile)
}

//explain debug output cplans or generated source code
if( LOG.isTraceEnabled() || DMLScript.EXPLAIN.isHopsType(recompile) ) {
if( LOG.isInfoEnabled() || DMLScript.EXPLAIN.isHopsType(recompile) ) {
LOG.info("Codegen EXPLAIN (generated cplan for HopID: " + cplan.getKey() +
", line "+tmp.getValue().getBeginLine() + ", hash="+tmp.getValue().hashCode()+"):");
LOG.info(tmp.getValue().getClassname()
+ Explain.explainCPlan(cplan.getValue().getValue()));
}
if( LOG.isTraceEnabled() || DMLScript.EXPLAIN.isRuntimeType(recompile) ) {
if( LOG.isInfoEnabled() || DMLScript.EXPLAIN.isRuntimeType(recompile) ) {
LOG.info("JAVA Codegen EXPLAIN (generated code for HopID: " + cplan.getKey() +
", line "+tmp.getValue().getBeginLine() + ", hash="+tmp.getValue().hashCode()+"):");
LOG.info(CodegenUtils.printWithLineNumber(src));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,11 @@ public static boolean isSumSq(Hop hop) {
public static boolean isParameterBuiltinOp(Hop hop, ParamBuiltinOp type) {
return hop instanceof ParameterizedBuiltinOp && ((ParameterizedBuiltinOp) hop).getOp().equals(type);
}

/**
 * Checks if the given hop is a ParameterizedBuiltinOp of any of the
 * provided operation types.
 *
 * @param hop   hop to check (null-safe)
 * @param types accepted parameterized builtin operation types
 * @return true if hop is a ParameterizedBuiltinOp matching one of types
 */
public static boolean isParameterBuiltinOp(Hop hop, ParamBuiltinOp... types) {
	if( !(hop instanceof ParameterizedBuiltinOp) )
		return false;
	final ParamBuiltinOp op = ((ParameterizedBuiltinOp) hop).getOp();
	for( ParamBuiltinOp type : types )
		if( op == type )
			return true;
	return false;
}

public static boolean isRemoveEmpty(Hop hop, boolean rows) {
return isParameterBuiltinOp(hop, ParamBuiltinOp.RMEMPTY)
Expand Down
Loading

0 comments on commit 2526b10

Please sign in to comment.