diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 5ac5085a694c8..dc80f08e72cfe 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -397,6 +397,7 @@ add_parquet_test(writer-test add_parquet_test(arrow-test SOURCES + arrow/arrow_metadata_test.cc arrow/arrow_reader_writer_test.cc arrow/arrow_schema_test.cc arrow/arrow_statistics_test.cc) diff --git a/cpp/src/parquet/arrow/arrow_metadata_test.cc b/cpp/src/parquet/arrow/arrow_metadata_test.cc new file mode 100644 index 0000000000000..6f512227708b9 --- /dev/null +++ b/cpp/src/parquet/arrow/arrow_metadata_test.cc @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "gtest/gtest.h" + +#include "arrow/table.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/key_value_metadata.h" + +#include "parquet/api/writer.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" +#include "parquet/file_writer.h" +#include "parquet/test_util.h" + +namespace parquet::arrow { + +TEST(Metadata, AppendMetadata) { + // A sample table, type and structure does not matter in this test case + auto schema = ::arrow::schema({::arrow::field("f", ::arrow::utf8())}); + auto table = ::arrow::Table::Make( + schema, {::arrow::ArrayFromJSON(::arrow::utf8(), R"(["a", "b", "c"])")}); + + auto sink = CreateOutputStream(); + ArrowWriterProperties::Builder builder; + builder.store_schema(); + ASSERT_OK_AND_ASSIGN(auto writer, + parquet::arrow::FileWriter::Open( + *schema, ::arrow::default_memory_pool(), sink, + parquet::default_writer_properties(), builder.build())); + + auto kv_meta = std::make_shared<::arrow::KeyValueMetadata>(); + kv_meta->Append("test_key_1", "test_value_1"); + // would be overwritten later. + kv_meta->Append("test_key_2", "test_value_2_temp"); + ASSERT_OK(writer->AddKeyValueMetadata(kv_meta)); + + // Key value metadata that will be added to the file. + auto kv_meta_added = std::make_shared<::arrow::KeyValueMetadata>(); + kv_meta_added->Append("test_key_2", "test_value_2"); + kv_meta_added->Append("test_key_3", "test_value_3"); + + ASSERT_OK(writer->AddKeyValueMetadata(kv_meta_added)); + ASSERT_OK(writer->Close()); + + // return error if the file is closed + ASSERT_RAISES(IOError, writer->AddKeyValueMetadata(kv_meta_added)); + + auto verify_key_value_metadata = + [&](const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) { + ASSERT_TRUE(nullptr != key_value_metadata); + + // Verify keys that were added before file writer was closed are present. 
+ for (int i = 1; i <= 3; ++i) { + auto index = std::to_string(i); + PARQUET_ASSIGN_OR_THROW(auto value, + key_value_metadata->Get("test_key_" + index)); + EXPECT_EQ("test_value_" + index, value); + } + EXPECT_TRUE(key_value_metadata->Contains("ARROW:schema")); + }; + // verify the metadata in writer + verify_key_value_metadata(writer->metadata()->key_value_metadata()); + + ASSERT_OK(writer->Close()); + + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + // verify the metadata in reader + { + std::unique_ptr<FileReader> reader; + FileReaderBuilder reader_builder; + ASSERT_OK_NO_THROW( + reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer))); + ASSERT_OK( + reader_builder.properties(default_arrow_reader_properties())->Build(&reader)); + + verify_key_value_metadata(reader->parquet_reader()->metadata()->key_value_metadata()); + } +} + +} // namespace parquet::arrow diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index bd6f542d11c72..4fd7ef1b47b39 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -482,6 +482,14 @@ class FileWriterImpl : public FileWriter { return writer_->metadata(); } + /// \brief Append the key-value metadata to the file metadata + ::arrow::Status AddKeyValueMetadata( + const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) + override { + PARQUET_CATCH_NOT_OK(writer_->AddKeyValueMetadata(key_value_metadata)); + return Status::OK(); + } + private: friend class FileWriter; diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index 1decafedc97fd..4a1a033a7b7b8 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -143,6 +143,16 @@ class PARQUET_EXPORT FileWriter { virtual ~FileWriter(); virtual MemoryPool* memory_pool() const = 0; + /// \brief Add key-value metadata to the file. + /// \param[in] key_value_metadata the metadata to add. + /// \note This will overwrite any existing metadata with the same key. 
+ /// \return Error if Close() has been called. + /// + /// WARNING: If `store_schema` is enabled, `ARROW:schema` would be stored + /// in the key-value metadata. Overwriting this key would result in + /// `store_schema` being unusable during read. + virtual ::arrow::Status AddKeyValueMetadata( + const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0; /// \brief Return the file metadata, only available after calling Close(). virtual const std::shared_ptr<FileMetaData> metadata() const = 0; }; diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index 31706af86dbde..d5ea1d7c98a0e 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -202,7 +202,7 @@ class PARQUET_EXPORT ParquetFileWriter { /// \brief Add key-value metadata to the file. /// \param[in] key_value_metadata the metadata to add. - /// \note This will overwrite any existing metadata with the same key. + /// \note This will overwrite any existing metadata with the same key(s). /// \throw ParquetException if Close() has been called. 
void AddKeyValueMetadata( const std::shared_ptr<const KeyValueMetadata>& key_value_metadata); diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd index ae4094d8b4b5f..1bfa505c54470 100644 --- a/python/pyarrow/_parquet.pxd +++ b/python/pyarrow/_parquet.pxd @@ -554,6 +554,7 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil: CStatus WriteTable(const CTable& table, int64_t chunk_size) CStatus NewRowGroup(int64_t chunk_size) CStatus Close() + CStatus AddKeyValueMetadata(const shared_ptr[const CKeyValueMetadata]& key_value_metadata) const shared_ptr[CFileMetaData] metadata() const diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index f7724b9b1fdc7..414f0cef4e52b 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -29,9 +29,10 @@ from pyarrow.includes.libarrow_python cimport * from pyarrow.lib cimport (_Weakrefable, Buffer, Schema, check_status, MemoryPool, maybe_unbox_memory_pool, - Table, NativeFile, + Table, KeyValueMetadata, pyarrow_wrap_chunked_array, pyarrow_wrap_schema, + pyarrow_unwrap_metadata, pyarrow_unwrap_schema, pyarrow_wrap_table, pyarrow_wrap_batch, @@ -2206,6 +2207,15 @@ cdef class ParquetWriter(_Weakrefable): check_status(self.writer.get() .WriteTable(deref(ctable), c_row_group_size)) + def add_key_value_metadata(self, key_value_metadata): + cdef: + shared_ptr[const CKeyValueMetadata] c_metadata + + c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(key_value_metadata)) + with nogil: + check_status(self.writer.get() + .AddKeyValueMetadata(c_metadata)) + @property def metadata(self): cdef: diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py index 81798b1544474..eaff79c8b137c 100644 --- a/python/pyarrow/parquet/core.py +++ b/python/pyarrow/parquet/core.py @@ -1108,6 +1108,19 @@ def close(self): if self.file_handle is not None: self.file_handle.close() + def add_key_value_metadata(self, key_value_metadata): + """ + Add key-value metadata to the file. 
+ This will overwrite any existing metadata with the same key. + + Parameters + ---------- + key_value_metadata : dict + Keys and values must be string-like / coercible to bytes. + """ + assert self.is_open + self.writer.add_key_value_metadata(key_value_metadata) + def _get_pandas_index_columns(keyvalues): return (json.loads(keyvalues[b'pandas'].decode('utf8')) diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py b/python/pyarrow/tests/parquet/test_parquet_writer.py index f4ee7529ae87d..bc3714a6232b1 100644 --- a/python/pyarrow/tests/parquet/test_parquet_writer.py +++ b/python/pyarrow/tests/parquet/test_parquet_writer.py @@ -346,3 +346,18 @@ def test_parquet_writer_store_schema(tempdir): meta = pq.read_metadata(path2) assert meta.metadata is None + + +def test_parquet_writer_append_key_value_metadata(tempdir): + table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0']) + path = tempdir / 'metadata.parquet' + + with pq.ParquetWriter(path, table.schema) as writer: + writer.write_table(table) + writer.add_key_value_metadata({'key1': '1', 'key2': 'x'}) + writer.add_key_value_metadata({'key2': '2', 'key3': '3'}) + reader = pq.ParquetFile(path) + metadata = reader.metadata.metadata + assert metadata[b'key1'] == b'1' + assert metadata[b'key2'] == b'2' + assert metadata[b'key3'] == b'3'