[SYSTEMDS-3548] load python parallel
This commit:

- fixes the load_numpy string performance test case.
It keeps the CLI usage consistent with the other test cases
but converts the dtype to the correct NumPy dtype ("str") internally.

- fixes the boolean array conversion breaking for
row counts above 64, and adds more error handling
to prevent similar failures in the future.

- parallelizes the column processing in the pandas
DataFrame to FrameBlock conversion (see the sketch below).

- moves the assignment of column data to the FrameBlock
into the parallel column processing.

Closes #2154
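
The parallel column conversion follows the pattern sketched below. This is a minimal, self-contained illustration only, not the committed code: the helper _convert_column and the plain dict stand in for the real convert_column helper and the shared FrameBlock that converters.py writes into.

import concurrent.futures
import pandas as pd

def _convert_column(result: dict, j: int, name, col: pd.Series) -> None:
    # Stand-in for the real per-column work: serialize the column and
    # record its data together with its name under the column index.
    result[j] = (str(name), col.to_numpy().tobytes())

def convert_columns_parallel(df: pd.DataFrame) -> dict:
    result = {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(_convert_column, result, j, name, df[name])
            for j, name in enumerate(df.columns)
        ]
        for f in futures:
            f.result()  # surface any exception raised in a worker
    return result

# e.g. convert_columns_parallel(pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0]}))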
Nakroma authored and Baunsgaard committed Dec 30, 2024
1 parent c504549 commit 5f360ef
Showing 5 changed files with 76 additions and 49 deletions.
14 changes: 7 additions & 7 deletions scripts/perftest/python/io/load_native.py
@@ -21,30 +21,30 @@

import argparse
import timeit
from systemds.context import SystemDSContext


setup = "\n".join(
[
"from systemds.context import SystemDSContext",
"from systemds.script_building.script import DMLScript",
]
)


run = "\n".join(
[
"with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:",
" node = ctx.read(src)",
" script = DMLScript(ctx)",
" script.build_code(node)",
" script.execute()",
"node = ctx.read(src)",
"script = DMLScript(ctx)",
"script.build_code(node)",
"script.execute()",
]
)


def main(args):
gvars = {"src": args.src}
gvars = {"src": args.src, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)}
print(timeit.timeit(run, setup, globals=gvars, number=args.number))
gvars["ctx"].close()


if __name__ == "__main__":
18 changes: 11 additions & 7 deletions scripts/perftest/python/io/load_numpy.py
@@ -22,10 +22,10 @@

import argparse
import timeit
from systemds.context import SystemDSContext

setup = "\n".join(
[
"from systemds.context import SystemDSContext",
"from systemds.script_building.script import DMLScript",
"import numpy as np",
"array = np.loadtxt(src, delimiter=',')",
@@ -37,11 +37,10 @@

run = "\n".join(
[
"with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:",
" matrix_from_np = ctx.from_numpy(array)",
" script = DMLScript(ctx)",
" script.add_input_from_python('test', matrix_from_np)",
" script.execute()",
"matrix_from_np = ctx.from_numpy(array)",
"script = DMLScript(ctx)",
"script.add_input_from_python('test', matrix_from_np)",
"script.execute()",
]
)

@@ -66,8 +65,9 @@


def main(args):
gvars = {"src": args.src, "dtype": args.dtype}
gvars = {"src": args.src, "dtype": args.dtype, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)}
print(timeit.timeit(run, setup, globals=gvars, number=args.number))
gvars["ctx"].close()


if __name__ == "__main__":
@@ -86,4 +86,8 @@ def main(args):
help=help_force_dtype,
)
args = parser.parse_args()

if args.dtype == "string": # numpy has no "string" dtype, convert to "str"
args.dtype = "str"

main(args)
14 changes: 7 additions & 7 deletions scripts/perftest/python/io/load_pandas.py
@@ -21,10 +21,10 @@

import argparse
import timeit
from systemds.context import SystemDSContext

setup = "\n".join(
[
"from systemds.context import SystemDSContext",
"from systemds.script_building.script import DMLScript",
"import pandas as pd",
"df = pd.read_csv(src, header=None)",
@@ -36,11 +36,10 @@

run = "\n".join(
[
"with SystemDSContext(logging_level=10, py4j_logging_level=50) as ctx:",
" frame_from_pandas = ctx.from_pandas(df)",
" script = DMLScript(ctx)",
" script.add_input_from_python('test', frame_from_pandas)",
" script.execute()",
"frame_from_pandas = ctx.from_pandas(df)",
"script = DMLScript(ctx)",
"script.add_input_from_python('test', frame_from_pandas)",
"script.execute()",
]
)

@@ -64,8 +63,9 @@


def main(args):
gvars = {"src": args.src, "dtype": args.dtype}
gvars = {"src": args.src, "dtype": args.dtype, "ctx": SystemDSContext(logging_level=10, py4j_logging_level=50)}
print(timeit.timeit(run, setup, globals=gvars, number=args.number))
gvars["ctx"].close()


if __name__ == "__main__":
src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -27,6 +27,7 @@
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
import org.apache.sysds.runtime.frame.data.columns.BitSetArray;
import org.apache.sysds.runtime.frame.data.columns.BooleanArray;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

@@ -157,7 +158,13 @@ public static Array<?> convert(byte[] data, int numElements, Types.ValueType val
break;
case BOOLEAN:
for(int i = 0; i < numElements; i++) {
((BooleanArray) array).set(i, buffer.get() != 0);
if (array instanceof BooleanArray) {
((BooleanArray) array).set(i, buffer.get() != 0);
} else if (array instanceof BitSetArray) {
((BitSetArray) array).set(i, buffer.get() != 0);
} else {
throw new DMLRuntimeException("Array factory returned invalid array type for boolean values.");
}
}
break;
case STRING:
70 changes: 43 additions & 27 deletions src/main/python/systemds/utils/converters.py
@@ -23,6 +23,7 @@

import numpy as np
import pandas as pd
import concurrent.futures
from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView


@@ -81,6 +82,33 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject):
)


def convert_column(jvm, rows, j, col_type, pd_col, fb, col_name):
"""Converts a given pandas column to a FrameBlock representation.
:param jvm: The JVMView of the current SystemDS context.
:param rows: The number of rows in the pandas DataFrame.
:param j: The current column index.
:param col_type: The ValueType of the column.
:param pd_col: The pandas column to convert.
"""
if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING:
byte_data = bytearray()
for value in pd_col.astype(str):
encoded_value = value.encode("utf-8")
byte_data.extend(struct.pack(">I", len(encoded_value)))
byte_data.extend(encoded_value)
else:
col_data = pd_col.fillna("").to_numpy()
byte_data = bytearray(col_data.tobytes())

converted_array = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
byte_data, rows, col_type
)

fb.setColumnName(j, str(col_name))
fb.setColumn(j, converted_array)


def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
"""Converts a given pandas DataFrame to an internal FrameBlock representation.
@@ -120,49 +148,37 @@ def pandas_to_frame_block(sds, pd_df: pd.DataFrame):
jc_String = jvm.java.lang.String
jc_FrameBlock = jvm.org.apache.sysds.runtime.frame.data.FrameBlock
j_valueTypeArray = java_gate.new_array(jc_ValueType, len(schema))
j_colNameArray = java_gate.new_array(jc_String, len(col_names))

# execution speed increases with optimized code when the number of rows exceeds 4
if rows > 4:
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
j_colNameArray[i] = str(col_names[i])

fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, rows)
fb = jc_FrameBlock(j_valueTypeArray, rows)

# convert and set data for each column
for j, col_name in enumerate(col_names):
col_type = schema[j]
if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING:
byte_data = bytearray()
for value in pd_df[col_name].astype(str):
encoded_value = value.encode("utf-8")
byte_data.extend(struct.pack(">I", len(encoded_value)))
byte_data.extend(encoded_value)
else:
col_data = pd_df[col_name].fillna("").to_numpy()
byte_data = bytearray(col_data.tobytes())

converted_array = (
jvm.org.apache.sysds.runtime.util.Py4jConverterUtils.convert(
byte_data, rows, col_type
)
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(
lambda j, col_name: convert_column(
jvm, rows, j, schema[j], pd_df[col_name], fb, col_name
),
range(len(col_names)),
col_names,
)
fb.setColumn(j, converted_array)

return fb
else:
j_dataArray = java_gate.new_array(jc_String, rows, cols)
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
j_colNameArray[i] = str(col_names[i])
j = 0
j_colNameArray = java_gate.new_array(jc_String, len(col_names))

for j, col_name in enumerate(col_names):
j_valueTypeArray[j] = schema[j]
j_colNameArray[j] = str(col_names[j])
col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)

for i in range(col_data.shape[0]):
if col_data[i]:
j_dataArray[i][j] = col_data[i]

fb = jc_FrameBlock(j_valueTypeArray, j_colNameArray, j_dataArray)
return fb

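For reference, the converter above is reached from user code roughly as follows. This is a hedged usage sketch assuming the standard SystemDSContext / from_pandas / compute API; it is not part of this commit.

import pandas as pd
from systemds.context import SystemDSContext

df = pd.DataFrame({"id": list(range(10)), "name": [f"n{i}" for i in range(10)]})
with SystemDSContext() as sds:
    # The DataFrame is converted via pandas_to_frame_block when the
    # computation runs; with more than four rows the columns are now
    # converted in parallel before the FrameBlock is handed to the JVM.
    frame = sds.from_pandas(df)
    print(frame.compute())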
