Skip to content

Commit

Permalink
[minor] Compression SP Schema apply
Browse files Browse the repository at this point in the history
Closes #2215
  • Loading branch information
Baunsgaard committed Feb 5, 2025
1 parent dc3947a commit 5ff6274
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.sysds.lops.WeightedUnaryMM;
import org.apache.sysds.lops.WeightedUnaryMMR;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.AggregateTernarySPInstruction;
import org.apache.sysds.runtime.instructions.spark.AggregateUnarySPInstruction;
Expand Down Expand Up @@ -195,6 +196,7 @@ public class SPInstructionParser extends InstructionParser
String2SPInstructionType.put( "freplicate", SPType.Binary);
String2SPInstructionType.put( "mapdropInvalidLength", SPType.Binary);
String2SPInstructionType.put( "valueSwap", SPType.Binary);
String2SPInstructionType.put( "applySchema" , SPType.Binary);
String2SPInstructionType.put( "_map", SPType.Ternary); // _map refers to the operation map
// Relational Instruction Opcodes
String2SPInstructionType.put( "==" , SPType.Binary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ else if(getOpcode().equals("valueSwap")) {
// Attach result frame with FrameBlock associated with output_name
sec.releaseFrameInput(input2.getName());
}
else if(getOpcode().equals("applySchema")){
Broadcast<FrameBlock> fb = sec.getSparkContext().broadcast(sec.getFrameInput(input2.getName()));
out = in1.mapValues(new applySchema(fb.getValue()));
sec.releaseFrameInput(input2.getName());
}
else {
JavaPairRDD<Long, FrameBlock> in2 = sec.getFrameBinaryBlockRDDHandleForVariable(input2.getName());
// create output frame
Expand All @@ -70,7 +75,9 @@ else if(getOpcode().equals("valueSwap")) {
//set output RDD and maintain dependencies
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
if( !getOpcode().equals("dropInvalidType") && !getOpcode().equals("valueSwap"))
if(!getOpcode().equals("dropInvalidType") && //
!getOpcode().equals("valueSwap") && //
!getOpcode().equals("applySchema"))
sec.addLineageRDD(output.getName(), input2.getName());
}

Expand Down Expand Up @@ -116,4 +123,20 @@ public FrameBlock call(FrameBlock arg0) throws Exception {
return arg0.valueSwap(schema_frame);
}
}


/**
 * Map function over frame blocks that applies a fixed schema frame to each
 * incoming block by delegating to {@code FrameBlock.applySchema}.
 *
 * <p>The schema frame is supplied once at construction time and is not
 * modified by this class.
 */
private static class applySchema implements Function<FrameBlock, FrameBlock>{
	private static final long serialVersionUID = 58504021316402L;

	/** Schema frame passed to {@code applySchema} for every block; assigned once. */
	private final FrameBlock schema;

	/**
	 * Creates the function for a given schema frame.
	 *
	 * @param schema the frame block handed to {@code FrameBlock.applySchema} on each call
	 */
	public applySchema(FrameBlock schema ) {
		this.schema = schema;
	}

	/**
	 * Applies the stored schema frame to a single frame block.
	 *
	 * @param arg0 the input frame block
	 * @return the result of {@code arg0.applySchema(schema)}
	 * @throws Exception if the underlying schema application fails
	 */
	@Override
	public FrameBlock call(FrameBlock arg0) throws Exception {
		return arg0.applySchema(schema);
	}
}
}

0 comments on commit 5ff6274

Please sign in to comment.