apacheGH-41608: [C++][Python] Extends the add_key_value to parquet::arrow and PyArrow (apache#41633)

### Rationale for this change

The previous PR (apache#34889) added `AddKeyValueMetadata` to the low-level `ParquetFileWriter`. This change exposes it through the parquet::arrow C++ API and the PyArrow Python API.

### What changes are included in this PR?

1. Add `AddKeyValueMetadata` to `parquet::arrow::FileWriter`
2. Add `add_key_value_metadata` to `pyarrow.parquet.ParquetWriter`
3. Add tests covering both APIs

### Are these changes tested?

Yes, new unit tests cover both the C++ and Python APIs.

### Are there any user-facing changes?

Yes. A new API allows adding key-value metadata to a Parquet file while it is being written.
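
For illustration, a minimal Python sketch of the new API (the file name and metadata keys below are arbitrary examples, not part of this change):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"f0": pa.array([1, 2, 3], type=pa.int32())})

with pq.ParquetWriter("example.parquet", table.schema) as writer:
    writer.write_table(table)
    # Keys/values must be string-like; re-adding an existing key overwrites it.
    writer.add_key_value_metadata({"origin": "sensor-a"})

# The appended metadata is visible in the file footer.
print(pq.read_metadata("example.parquet").metadata[b"origin"])  # b'sensor-a'
```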

* GitHub Issue: apache#41608

Authored-by: mwish <[email protected]>
Signed-off-by: mwish <[email protected]>
mapleFU authored Jun 4, 2024
1 parent 524a463 commit d02a91b
Showing 9 changed files with 157 additions and 2 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/CMakeLists.txt
@@ -397,6 +397,7 @@ add_parquet_test(writer-test

add_parquet_test(arrow-test
                 SOURCES
                 arrow/arrow_metadata_test.cc
                 arrow/arrow_reader_writer_test.cc
                 arrow/arrow_schema_test.cc
                 arrow/arrow_statistics_test.cc)
97 changes: 97 additions & 0 deletions cpp/src/parquet/arrow/arrow_metadata_test.cc
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "gtest/gtest.h"

#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/key_value_metadata.h"

#include "parquet/api/writer.h"

#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_writer.h"
#include "parquet/test_util.h"

namespace parquet::arrow {

TEST(Metadata, AppendMetadata) {
  // A sample table, type and structure does not matter in this test case
  auto schema = ::arrow::schema({::arrow::field("f", ::arrow::utf8())});
  auto table = ::arrow::Table::Make(
      schema, {::arrow::ArrayFromJSON(::arrow::utf8(), R"(["a", "b", "c"])")});

  auto sink = CreateOutputStream();
  ArrowWriterProperties::Builder builder;
  builder.store_schema();
  ASSERT_OK_AND_ASSIGN(auto writer,
                       parquet::arrow::FileWriter::Open(
                           *schema, ::arrow::default_memory_pool(), sink,
                           parquet::default_writer_properties(), builder.build()));

  auto kv_meta = std::make_shared<KeyValueMetadata>();
  kv_meta->Append("test_key_1", "test_value_1");
  // <test_key_2, test_value_2_temp> would be overwritten later.
  kv_meta->Append("test_key_2", "test_value_2_temp");
  ASSERT_OK(writer->AddKeyValueMetadata(kv_meta));

  // Key value metadata that will be added to the file.
  auto kv_meta_added = std::make_shared<::arrow::KeyValueMetadata>();
  kv_meta_added->Append("test_key_2", "test_value_2");
  kv_meta_added->Append("test_key_3", "test_value_3");

  ASSERT_OK(writer->AddKeyValueMetadata(kv_meta_added));
  ASSERT_OK(writer->Close());

  // return error if the file is closed
  ASSERT_RAISES(IOError, writer->AddKeyValueMetadata(kv_meta_added));

  auto verify_key_value_metadata =
      [&](const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
        ASSERT_TRUE(nullptr != key_value_metadata);

        // Verify keys that were added before file writer was closed are present.
        for (int i = 1; i <= 3; ++i) {
          auto index = std::to_string(i);
          PARQUET_ASSIGN_OR_THROW(auto value,
                                  key_value_metadata->Get("test_key_" + index));
          EXPECT_EQ("test_value_" + index, value);
        }
        EXPECT_TRUE(key_value_metadata->Contains("ARROW:schema"));
      };
  // verify the metadata in writer
  verify_key_value_metadata(writer->metadata()->key_value_metadata());

  ASSERT_OK(writer->Close());

  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
  // verify the metadata in reader
  {
    std::unique_ptr<FileReader> reader;
    FileReaderBuilder reader_builder;
    ASSERT_OK_NO_THROW(
        reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer)));
    ASSERT_OK(
        reader_builder.properties(default_arrow_reader_properties())->Build(&reader));

    verify_key_value_metadata(reader->parquet_reader()->metadata()->key_value_metadata());
  }
}

} // namespace parquet::arrow
8 changes: 8 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
@@ -482,6 +482,14 @@ class FileWriterImpl : public FileWriter {
    return writer_->metadata();
  }

  /// \brief Append the key-value metadata to the file metadata
  ::arrow::Status AddKeyValueMetadata(
      const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata)
      override {
    PARQUET_CATCH_NOT_OK(writer_->AddKeyValueMetadata(key_value_metadata));
    return Status::OK();
  }

 private:
  friend class FileWriter;

10 changes: 10 additions & 0 deletions cpp/src/parquet/arrow/writer.h
@@ -143,6 +143,16 @@ class PARQUET_EXPORT FileWriter {
  virtual ~FileWriter();

  virtual MemoryPool* memory_pool() const = 0;
  /// \brief Add key-value metadata to the file.
  /// \param[in] key_value_metadata the metadata to add.
  /// \note This will overwrite any existing metadata with the same key.
  /// \return Error if Close() has been called.
  ///
  /// WARNING: If `store_schema` is enabled, `ARROW:schema` would be stored
  /// in the key-value metadata. Overwriting this key would result in
  /// `store_schema` being unusable during read.
  virtual ::arrow::Status AddKeyValueMetadata(
      const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;
  /// \brief Return the file metadata, only available after calling Close().
  virtual const std::shared_ptr<FileMetaData> metadata() const = 0;
};
2 changes: 1 addition & 1 deletion cpp/src/parquet/file_writer.h
@@ -202,7 +202,7 @@ class PARQUET_EXPORT ParquetFileWriter {

  /// \brief Add key-value metadata to the file.
  /// \param[in] key_value_metadata the metadata to add.
  /// \note This will overwrite any existing metadata with the same key.
  /// \note This will overwrite any existing metadata with the same key(s).
  /// \throw ParquetException if Close() has been called.
  void AddKeyValueMetadata(
      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
1 change: 1 addition & 0 deletions python/pyarrow/_parquet.pxd
@@ -554,6 +554,7 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
        CStatus WriteTable(const CTable& table, int64_t chunk_size)
        CStatus NewRowGroup(int64_t chunk_size)
        CStatus Close()
        CStatus AddKeyValueMetadata(const shared_ptr[const CKeyValueMetadata]& key_value_metadata)

        const shared_ptr[CFileMetaData] metadata() const

12 changes: 11 additions & 1 deletion python/pyarrow/_parquet.pyx
@@ -29,9 +29,10 @@ from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Schema,
                          check_status,
                          MemoryPool, maybe_unbox_memory_pool,
                          Table, NativeFile,
                          Table, KeyValueMetadata,
                          pyarrow_wrap_chunked_array,
                          pyarrow_wrap_schema,
                          pyarrow_unwrap_metadata,
                          pyarrow_unwrap_schema,
                          pyarrow_wrap_table,
                          pyarrow_wrap_batch,
@@ -2206,6 +2207,15 @@ cdef class ParquetWriter(_Weakrefable):
            check_status(self.writer.get()
                         .WriteTable(deref(ctable), c_row_group_size))

    def add_key_value_metadata(self, key_value_metadata):
        cdef:
            shared_ptr[const CKeyValueMetadata] c_metadata

        c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(key_value_metadata))
        with nogil:
            check_status(self.writer.get()
                         .AddKeyValueMetadata(c_metadata))

    @property
    def metadata(self):
        cdef:
13 changes: 13 additions & 0 deletions python/pyarrow/parquet/core.py
@@ -1108,6 +1108,19 @@ def close(self):
        if self.file_handle is not None:
            self.file_handle.close()

    def add_key_value_metadata(self, key_value_metadata):
        """
        Add key-value metadata to the file.
        This will overwrite any existing metadata with the same key.

        Parameters
        ----------
        key_value_metadata : dict
            Keys and values must be string-like / coercible to bytes.
        """
        assert self.is_open
        self.writer.add_key_value_metadata(key_value_metadata)


def _get_pandas_index_columns(keyvalues):
    return (json.loads(keyvalues[b'pandas'].decode('utf8'))
15 changes: 15 additions & 0 deletions python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -346,3 +346,18 @@ def test_parquet_writer_store_schema(tempdir):

    meta = pq.read_metadata(path2)
    assert meta.metadata is None


def test_parquet_writer_append_key_value_metadata(tempdir):
    table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])
    path = tempdir / 'metadata.parquet'

    with pq.ParquetWriter(path, table.schema) as writer:
        writer.write_table(table)
        writer.add_key_value_metadata({'key1': '1', 'key2': 'x'})
        writer.add_key_value_metadata({'key2': '2', 'key3': '3'})
    reader = pq.ParquetFile(path)
    metadata = reader.metadata.metadata
    assert metadata[b'key1'] == b'1'
    assert metadata[b'key2'] == b'2'
    assert metadata[b'key3'] == b'3'
