Made parquet merge, works but needs max_files_open?
zbilodea committed Jun 17, 2024
1 parent 517289b commit 7808f6b
Showing 3 changed files with 162 additions and 0 deletions.
100 changes: 100 additions & 0 deletions src/hepconvert/merge.py
@@ -14,6 +14,106 @@
from hepconvert.histogram_adding import _hadd_1d, _hadd_2d, _hadd_3d


def merge_parquet(
    out_file,
    in_files,
    *,
    max_files=2,
    force=False,
    list_to32=False,
    string_to32=True,
    bytestring_to32=True,
    emptyarray_to=None,
    categorical_as_dictionary=False,
    extensionarray=True,
    count_nulls=True,
    compression="zstd",
    compression_level=None,
    row_group_size=64 * 1024 * 1024,
    data_page_size=None,
    parquet_flavor=None,
    parquet_version="2.4",
    parquet_page_version="1.0",
    parquet_metadata_statistics=True,
    parquet_dictionary_encoding=False,
    parquet_byte_stream_split=False,
    parquet_coerce_timestamps=None,
    parquet_old_int96_timestamps=None,
    parquet_compliant_nested=False,
    parquet_extra_options=None,
    storage_options=None,
    skip_bad_files=False,
):
    """Merges Parquet files into a single Parquet file.

    Args:
        out_file (path-like): Name or path of the merged output Parquet file.
        in_files (list): Paths of the Parquet files to merge; at least two are required.
        max_files (int, optional): Maximum number of files to have open at once
            (not yet used by this implementation). Defaults to 2.
        force (bool, optional): If True, overwrite out_file if it already exists.
            Defaults to False.
        compression (str, optional): Compression codec handed to ak.to_parquet.
            Defaults to "zstd".
        compression_level (int, optional): Compression level handed to ak.to_parquet.
            Defaults to None.
        skip_bad_files (bool, optional): If True, skip input files that are missing
            or corrupt instead of raising. Defaults to False.

    All remaining keyword arguments (list_to32, string_to32, row_group_size, the
    parquet_* options, storage_options, etc.) are forwarded to ak.to_parquet.
    """
    if len(in_files) < 2:
        msg = f"Must have at least 2 files to merge, not {len(in_files)} files."
        raise AttributeError(msg)
    path = Path(out_file)
    # Refuse to overwrite an existing output file unless force=True.
    if Path.is_file(path) and not force:
        raise FileExistsError

    data = False
    for file in in_files:
        try:
            # Existence/validity check; the metadata itself is not used yet.
            metadata = ak.metadata_from_parquet(file)
        except FileNotFoundError:
            if skip_bad_files:
                continue
            msg = f"File: {file} does not exist or is corrupt."
            raise FileNotFoundError(msg) from None
        if isinstance(data, bool):
            data = ak.from_parquet(file)
        else:
            # Concatenate with what has been read so far and collapse the resulting
            # union of record types into a single record type, so that inputs with
            # different sets of fields can still be merged.
            data = ak.merge_union_of_records(
                ak.concatenate((data, ak.from_parquet(file)))
            )

    # Write the merged array, forwarding all Parquet writer options to ak.to_parquet.
    ak.to_parquet(
        data,
        out_file,
        list_to32=list_to32,
        string_to32=string_to32,
        bytestring_to32=bytestring_to32,
        emptyarray_to=emptyarray_to,
        categorical_as_dictionary=categorical_as_dictionary,
        extensionarray=extensionarray,
        count_nulls=count_nulls,
        compression=compression,
        compression_level=compression_level,
        row_group_size=row_group_size,
        data_page_size=data_page_size,
        parquet_flavor=parquet_flavor,
        parquet_version=parquet_version,
        parquet_page_version=parquet_page_version,
        parquet_metadata_statistics=parquet_metadata_statistics,
        parquet_dictionary_encoding=parquet_dictionary_encoding,
        parquet_byte_stream_split=parquet_byte_stream_split,
        parquet_coerce_timestamps=parquet_coerce_timestamps,
        parquet_old_int96_timestamps=parquet_old_int96_timestamps,
        parquet_compliant_nested=parquet_compliant_nested,
        parquet_extra_options=parquet_extra_options,
        storage_options=storage_options,
    )


def merge_root(
    destination,
    files,
Expand Down
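For reference, a minimal usage sketch of the new merge_parquet function. The file names here are hypothetical; the call only uses the positional arguments and keyword options shown in the signature above.

from hepconvert import merge

# Merge several Parquet files into one; force=True overwrites an existing output
# file, and skip_bad_files=True silently skips inputs that are missing or unreadable.
merge.merge_parquet(
    "merged.parquet",
    ["run1.parquet", "run2.parquet", "run3.parquet"],
    force=True,
    compression="zstd",
    skip_bad_files=True,
)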
62 changes: 62 additions & 0 deletions tests/test_merge_parquet.py
@@ -0,0 +1,62 @@
from __future__ import annotations

from pathlib import Path

import awkward as ak
import numpy as np
import pytest
import uproot

from hepconvert import merge

skhep_testdata = pytest.importorskip("skhep_testdata")

def simple_test(tmp_path):
    arr1 = ak.Array(
        {
            "a": [1, 2],
            "b": [1, 2],
            "c": [1, 2],
        }
    )
    ak.to_parquet(arr1, Path(tmp_path) / "arr1.parquet")
    arr2 = ak.Array(
        {
            "a": [7, 8, 9],
            "b": [3, 4, 5],
        }
    )
    ak.to_parquet(arr2, Path(tmp_path) / "arr2.parquet")
    arr3 = ak.Array(
        {
            "a": [10, 11, 12, 13, 14],
            "c": [3, 4, 5, 6, 7],
            "d": [1, 2, 3, 4, 5],
        }
    )
    ak.to_parquet(arr3, Path(tmp_path) / "arr3.parquet")

    merge.merge_parquet(
        Path(tmp_path) / "new.parquet",
        [
            Path(tmp_path) / "arr1.parquet",
            Path(tmp_path) / "arr2.parquet",
            Path(tmp_path) / "arr3.parquet",
        ],
        force=True,
    )
    array = ak.from_parquet(Path(tmp_path) / "new.parquet")
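The test currently stops at reading the merged file back. A sketch of assertions one might add, assuming ak.merge_union_of_records fills fields that are absent from an input file with None:

    # Hypothetical checks, not part of this commit:
    assert len(array) == 10  # 2 + 3 + 5 rows from arr1, arr2, arr3
    assert set(ak.fields(array)) == {"a", "b", "c", "d"}
    assert ak.to_list(array["d"][:2]) == [None, None]  # arr1 has no "d" field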

File renamed without changes.
