Skip to content

Commit

Permalink
[MINOR] RMM Identity CLA
Browse files Browse the repository at this point in the history
verify on single SDC fix

systemds jar not cp improvement

Util equals

set debugging to true

estimate and actual unique

sample size log output

fix minimum sample code

optimized double parsing

add double parser code

beginning frame arrays row parallel

arrays range

fix

parallel frame Apply

parallel

64 blocks

intermediate point fix

remainder from string

default minimum lower

fix not equals

split float parse

notice

detect schema

reduce sample size ?

parallel memory size

read CSV single thread

more

improve allocation

debug disable

uncompressed

offset speedup

remove redundant import

merge
  • Loading branch information
Baunsgaard committed Jan 15, 2024
1 parent 02d4b01 commit c061d4a
Show file tree
Hide file tree
Showing 169 changed files with 9,035 additions and 3,025 deletions.
34 changes: 13 additions & 21 deletions bin/systemds
Original file line number Diff line number Diff line change
Expand Up @@ -402,28 +402,22 @@ NATIVE_LIBS="$SYSTEMDS_ROOT${DIR_SEP}target${DIR_SEP}classes${DIR_SEP}lib"
export PATH=${HADOOP_REL}${DIR_SEP}bin${PATH_SEP}${PATH}${PATH_SEP}$NATIVE_LIBS
export LD_LIBRARY_PATH=${HADOOP_REL}${DIR_SEP}bin${PATH_SEP}${LD_LIBRARY_PATH}

# set java class path
CLASSPATH="${SYSTEMDS_JAR_FILE}${PATH_SEP} \
${SYSTEMDS_ROOT}${DIR_SEP}lib${DIR_SEP}*${PATH_SEP} \
${SYSTEMDS_ROOT}${DIR_SEP}target${DIR_SEP}lib${DIR_SEP}*"
# trim whitespace (introduced by the line breaks above)
CLASSPATH=$(echo "${CLASSPATH}" | tr -d '[:space:]')

if [ $PRINT_SYSDS_HELP == 1 ]; then
echo "----------------------------------------------------------------------"
echo "Further help on SystemDS arguments:"
java -cp "$CLASSPATH" org.apache.sysds.api.DMLScript -help
java -jar $SYSTEMDS_JAR_FILE -help
exit 1
fi

print_out "###############################################################################"
print_out "# SYSTEMDS_ROOT= $SYSTEMDS_ROOT"
print_out "# SYSTEMDS_JAR_FILE= $SYSTEMDS_JAR_FILE"
print_out "# SYSDS_EXEC_MODE= $SYSDS_EXEC_MODE"
print_out "# CONFIG_FILE= $CONFIG_FILE"
print_out "# LOG4JPROP= $LOG4JPROP"
print_out "# CLASSPATH= $CLASSPATH"
print_out "# HADOOP_HOME= $HADOOP_HOME"
if [ $SYSDS_QUIET == 0 ]; then
print_out "###############################################################################"
print_out "# SYSTEMDS_ROOT= $SYSTEMDS_ROOT"
print_out "# SYSTEMDS_JAR_FILE= $SYSTEMDS_JAR_FILE"
print_out "# SYSDS_EXEC_MODE= $SYSDS_EXEC_MODE"
print_out "# CONFIG_FILE= $CONFIG_FILE"
print_out "# LOG4JPROP= $LOG4JPROP"
print_out "# HADOOP_HOME= $HADOOP_HOME"
fi

#build the command to run
if [ $WORKER == 1 ]; then
Expand All @@ -432,7 +426,7 @@ if [ $WORKER == 1 ]; then
print_out "###############################################################################"
CMD=" \
java $SYSTEMDS_STANDALONE_OPTS \
-cp $CLASSPATH \
-jar $SYSTEMDS_JAR_FILE \
$LOG4JPROPFULL \
org.apache.sysds.api.DMLScript \
-w $PORT \
Expand All @@ -447,9 +441,8 @@ elif [ "$FEDMONITORING" == 1 ]; then
print_out "###############################################################################"
CMD=" \
java $SYSTEMDS_STANDALONE_OPTS \
-cp $CLASSPATH \
$LOG4JPROPFULL \
org.apache.sysds.api.DMLScript \
-jar $SYSTEMDS_JAR_FILE \
-fedMonitoring $PORT \
$CONFIG_FILE \
$*"
Expand All @@ -462,9 +455,8 @@ elif [ $SYSDS_DISTRIBUTED == 0 ]; then
print_out "###############################################################################"
CMD=" \
java $SYSTEMDS_STANDALONE_OPTS \
-cp $CLASSPATH \
$LOG4JPROPFULL \
org.apache.sysds.api.DMLScript \
-jar $SYSTEMDS_JAR_FILE \
-f $SCRIPT_FILE \
-exec $SYSDS_EXEC_MODE \
$CONFIG_FILE \
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/apache/sysds/hops/AggBinaryOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,7 @@ private boolean isApplicableForTransitiveSparkExecType(boolean left)
|| (left && !isLeftTransposeRewriteApplicable(true)))
&& getInput(index).getParent().size()==1 //bagg is only parent
&& !getInput(index).areDimsBelowThreshold()
&& (getInput(index).optFindExecType() == ExecType.SPARK
|| (getInput(index) instanceof DataOp && ((DataOp)getInput(index)).hasOnlyRDD()))
&& getInput(index).hasSparkOutput()
&& getInput(index).getOutputMemEstimate()>getOutputMemEstimate();
}

Expand Down
78 changes: 53 additions & 25 deletions src/main/java/org/apache/sysds/hops/BinaryOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -751,8 +751,8 @@ protected ExecType optFindExecType(boolean transitive) {

checkAndSetForcedPlatform();

DataType dt1 = getInput().get(0).getDataType();
DataType dt2 = getInput().get(1).getDataType();
final DataType dt1 = getInput(0).getDataType();
final DataType dt2 = getInput(1).getDataType();

if( _etypeForced != null ) {
_etype = _etypeForced;
Expand Down Expand Up @@ -801,18 +801,28 @@ else if ( dt1 == DataType.SCALAR && dt2 == DataType.MATRIX ) {
checkAndSetInvalidCPDimsAndSize();
}

//spark-specific decision refinement (execute unary scalar w/ spark input and
// spark-specific decision refinement (execute unary scalar w/ spark input and
// single parent also in spark because it's likely cheap and reduces intermediates)
if(transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP && _etypeForced != ExecType.FED &&
getDataType().isMatrix() // output should be a matrix
&& (dt1.isScalar() || dt2.isScalar()) // one side should be scalar
&& supportsMatrixScalarOperations() // scalar operations
&& !(getInput().get(dt1.isScalar() ? 1 : 0) instanceof DataOp) // input is not checkpoint
&& getInput().get(dt1.isScalar() ? 1 : 0).getParent().size() == 1 // unary scalar is only parent
&& !HopRewriteUtils.isSingleBlock(getInput().get(dt1.isScalar() ? 1 : 0)) // single block triggered exec
&& getInput().get(dt1.isScalar() ? 1 : 0).optFindExecType() == ExecType.SPARK) {
// pull unary scalar operation into spark
_etype = ExecType.SPARK;
if(transitive // we allow transitive Spark operations. continue sequences of spark operations
&& _etype == ExecType.CP // The instruction is currently in CP
&& _etypeForced != ExecType.CP // not forced CP
&& _etypeForced != ExecType.FED // not federated
&& (getDataType().isMatrix() || getDataType().isFrame()) // output should be a matrix or frame
) {
final boolean v1 = getInput(0).isScalarOrVectorBellowBlockSize();
final boolean v2 = getInput(1).isScalarOrVectorBellowBlockSize();
final boolean left = v1 == true; // left side is the vector or scalar
final Hop sparkIn = getInput(left ? 1 : 0);
if((v1 ^ v2) // XOR only one side is allowed to be a vector or a scalar.
&& (supportsMatrixScalarOperations() || op == OpOp2.APPLY_SCHEMA) // supported operation
&& sparkIn.getParent().size() == 1 // only one parent
&& !HopRewriteUtils.isSingleBlock(sparkIn) // single block triggered exec
&& sparkIn.optFindExecType() == ExecType.SPARK // input was spark op.
&& !(sparkIn instanceof DataOp) // input is not checkpoint
) {
// pull operation into spark
_etype = ExecType.SPARK;
}
}

if( OptimizerUtils.ALLOW_BINARY_UPDATE_IN_PLACE &&
Expand Down Expand Up @@ -842,7 +852,7 @@ else if( (op == OpOp2.CBIND && getDataType().isList())
|| (op == OpOp2.RBIND && getDataType().isList())) {
_etype = ExecType.CP;
}

//mark for recompile (forever)
setRequiresRecompileIfNecessary();

Expand Down Expand Up @@ -1157,17 +1167,35 @@ && getInput().get(0) == that2.getInput().get(0)
}

/**
 * Indicates if this binary operation supports matrix-scalar execution,
 * i.e., one matrix input combined element-wise with one scalar input.
 * Covered op groups: arithmetic, relational, min/max, log/pow, logical,
 * and bitwise operations; all other ops return false.
 */
public boolean supportsMatrixScalarOperations() {
	switch(op) {
		// arithmetic
		case PLUS: case MINUS: case MULT: case DIV: case MODULUS: case INTDIV:
		// relational / comparison
		case LESS: case LESSEQUAL: case GREATER: case GREATEREQUAL:
		case EQUAL: case NOTEQUAL:
		// min/max and math
		case MIN: case MAX: case LOG: case POW:
		// logical
		case AND: case OR: case XOR:
		// bitwise
		case BITWAND: case BITWOR: case BITWXOR:
		case BITWSHIFTL: case BITWSHIFTR:
			return true;
		default:
			return false;
	}
}

public boolean isPPredOperation() {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/org/apache/sysds/hops/DataOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ public boolean allowsAllExecTypes()
protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
{
double ret = 0;

if ( getDataType() == DataType.SCALAR )
final DataType dt = getDataType();
if ( dt == DataType.SCALAR )
{
switch( getValueType() )
{
Expand All @@ -407,6 +407,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
ret = 0;
}
}
else if(dt == DataType.FRAME) {
if(_op == OpOpData.PERSISTENTREAD || _op == OpOpData.TRANSIENTREAD) {
ret = OptimizerUtils.estimateSizeExactFrame(dim1, dim2);
}
}
else //MATRIX / FRAME
{
if( _op == OpOpData.PERSISTENTREAD
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/apache/sysds/hops/Hop.java
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,12 @@ public final String toString() {
// ========================================================================================


/**
 * Indicates if this hop produces a scalar, or a row/column vector whose
 * long dimension is strictly smaller than the configured block size
 * (i.e., it fits in a single block).
 * NOTE(review): method name contains a typo ("Bellow" should be "Below");
 * kept as-is to preserve the interface for existing callers.
 */
protected boolean isScalarOrVectorBellowBlockSize(){
	if(getDataType().isScalar())
		return true;
	if(!dimsKnown())
		return false;
	final int blksz = ConfigurationManager.getBlocksize();
	final boolean rowVector = _dc.getRows() == 1 && _dc.getCols() < blksz;
	final boolean colVector = _dc.getCols() == 1 && _dc.getRows() < blksz;
	return rowVector || colVector;
}

protected boolean isVector() {
return (dimsKnown() && (_dc.getRows() == 1 || _dc.getCols() == 1) );
}
Expand Down Expand Up @@ -1702,6 +1708,11 @@ protected void setMemoryAndComputeEstimates(Lop lop) {
lop.setComputeEstimate(ComputeCost.getHOPComputeCost(this));
}

/**
 * Indicates if this hop's output lives on Spark: either its optimal
 * execution type is SPARK, or it is a DataOp backed only by an RDD.
 */
protected boolean hasSparkOutput(){
	if(optFindExecType() == ExecType.SPARK)
		return true;
	return this instanceof DataOp && ((DataOp) this).hasOnlyRDD();
}

/**
* Set parse information.
*
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/apache/sysds/hops/OptimizerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.MemoryEstimates;

public class OptimizerUtils
{
Expand Down Expand Up @@ -788,6 +789,15 @@ public static long estimateSizeExactSparsity(long nrows, long ncols, long nnz)
double sp = getSparsity(nrows, ncols, nnz);
return estimateSizeExactSparsity(nrows, ncols, sp);
}


/**
 * Estimate the exact in-memory size (in bytes) of a frame with the given
 * dimensions, assuming String columns with on average 8 characters per value.
 *
 * @param nRows number of rows of the frame
 * @param nCols number of columns of the frame
 * @return estimated size in bytes, saturating at Long.MAX_VALUE when the
 *         frame cannot fit in memory or the estimate would overflow
 */
public static long estimateSizeExactFrame(long nRows, long nCols){
	// arrays are limited to int range; larger frames cannot be held in memory anyway
	if(nRows > Integer.MAX_VALUE)
		return Long.MAX_VALUE;

	// assuming String arrays and on average 8 characters per value.
	final long colCost = (long) MemoryEstimates.stringArrayCost((int) nRows, 8);
	// saturate instead of overflowing into a negative (and thus meaningless)
	// estimate for very wide frames
	if(nCols > 0 && colCost > Long.MAX_VALUE / nCols)
		return Long.MAX_VALUE;
	return colCost * nCols;
}

/**
* Estimates the footprint (in bytes) for an in-memory representation of a
Expand Down
34 changes: 24 additions & 10 deletions src/main/java/org/apache/sysds/hops/UnaryOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
} else {
sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
}
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);

if(getDataType() == DataType.FRAME)
return OptimizerUtils.estimateSizeExactFrame(dim1, dim2);
else
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
}

@Override
Expand Down Expand Up @@ -468,6 +472,13 @@ public boolean isMetadataOperation() {
|| _op == OpOp1.CAST_AS_LIST;
}

/**
 * Indicates if this unary operation must not be pulled into a Spark
 * execution chain: cumulative ops, cast ops, MEDIAN, and IQM.
 */
private boolean isDisallowedSparkOps(){
	if(isCumulativeUnaryOperation() || isCastUnaryOperation())
		return true;
	return _op == OpOp1.MEDIAN || _op == OpOp1.IQM;
}

@Override
protected ExecType optFindExecType(boolean transitive)
{
Expand Down Expand Up @@ -498,19 +509,22 @@ else if ( getInput().get(0).areDimsBelowThreshold() || getInput().get(0).isVecto
checkAndSetInvalidCPDimsAndSize();
}


//spark-specific decision refinement (execute unary w/ spark input and
//single parent also in spark because it's likely cheap and reduces intermediates)
if( _etype == ExecType.CP && _etypeForced != ExecType.CP
&& getInput().get(0).optFindExecType() == ExecType.SPARK
&& getDataType().isMatrix()
&& !isCumulativeUnaryOperation() && !isCastUnaryOperation()
&& _op!=OpOp1.MEDIAN && _op!=OpOp1.IQM
&& !(getInput().get(0) instanceof DataOp) //input is not checkpoint
&& getInput().get(0).getParent().size()==1 ) //unary is only parent
{
if(_etype == ExecType.CP // currently CP instruction
&& _etype != ExecType.SPARK /// currently not SP.
&& _etypeForced != ExecType.CP // not forced as CP instruction
&& getInput(0).hasSparkOutput() // input is a spark instruction
&& (getDataType().isMatrix() || getDataType().isFrame()) // output is a matrix or frame
&& !isDisallowedSparkOps() // is invalid spark instruction
// && !(getInput().get(0) instanceof DataOp) // input is not checkpoint
// && getInput(0).getParent().size() <= 1// unary is only parent
) {
//pull unary operation into spark
_etype = ExecType.SPARK;
}


//mark for recompile (forever)
setRequiresRecompileIfNecessary();
Expand All @@ -524,7 +538,7 @@ && getInput().get(0).getParent().size()==1 ) //unary is only parent
} else {
setRequiresRecompileIfNecessary();
}

return _etype;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,11 @@ public static boolean isSumSq(Hop hop) {
public static boolean isParameterBuiltinOp(Hop hop, ParamBuiltinOp type) {
return hop instanceof ParameterizedBuiltinOp && ((ParameterizedBuiltinOp) hop).getOp().equals(type);
}

/**
 * Indicates if the given hop is a ParameterizedBuiltinOp whose op type
 * matches any of the provided types.
 *
 * @param hop   the hop to inspect (may be any hop subtype)
 * @param types candidate parameterized builtin op types
 * @return true iff hop is a ParameterizedBuiltinOp of one of the given types
 */
public static boolean isParameterBuiltinOp(Hop hop, ParamBuiltinOp... types) {
	if(!(hop instanceof ParameterizedBuiltinOp))
		return false;
	final ParamBuiltinOp op = ((ParameterizedBuiltinOp) hop).getOp();
	return ArrayUtils.contains(types, op);
}

public static boolean isRemoveEmpty(Hop hop, boolean rows) {
return isParameterBuiltinOp(hop, ParamBuiltinOp.RMEMPTY)
Expand Down
Loading

0 comments on commit c061d4a

Please sign in to comment.