Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into drop-input
Browse files Browse the repository at this point in the history
  • Loading branch information
san81 authored Jul 1, 2024
2 parents 1630794 + af6bce4 commit bce2e6a
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 37 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies {
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:netty-nio-client'
implementation 'co.elastic.clients:elasticsearch-java:7.17.0'
implementation('org.apache.maven:maven-artifact:3.9.6') {
implementation('org.apache.maven:maven-artifact:3.9.8') {
exclude group: 'org.codehaus.plexus'
}
testImplementation testLibs.junit.vintage
Expand Down
16 changes: 12 additions & 4 deletions data-prepper-plugins/parquet-codecs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,24 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation libs.avro.core
implementation libs.hadoop.common
implementation(libs.hadoop.mapreduce) {
exclude group: 'org.apache.hadoop', module: 'hadoop-hdfs-client'
}
implementation 'org.apache.commons:commons-text:1.11.0'
implementation 'org.apache.parquet:parquet-avro:1.14.0'
implementation 'org.apache.parquet:parquet-column:1.14.0'
implementation 'org.apache.parquet:parquet-common:1.14.0'
implementation 'org.apache.parquet:parquet-hadoop:1.14.0'
runtimeOnly(libs.hadoop.common) {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
}
runtimeOnly(libs.hadoop.mapreduce) {
exclude group: 'org.apache.hadoop', module: 'hadoop-hdfs-client'
}
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-test-event')
testImplementation(libs.hadoop.common) {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
}

constraints {
implementation('com.nimbusds:nimbus-jose-jwt') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
package org.opensearch.dataprepper.plugins.codec.parquet;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand Down Expand Up @@ -46,13 +47,13 @@ public class ParquetInputCodec implements InputCodec {

private static final Logger LOG = LoggerFactory.getLogger(ParquetInputCodec.class);

private final Configuration configuration;
private final ParquetConfiguration configuration;
private final EventFactory eventFactory;

@DataPrepperPluginConstructor
public ParquetInputCodec(final EventFactory eventFactory) {
this.eventFactory = eventFactory;
configuration = new Configuration();
configuration = new PlainParquetConfiguration();
configuration.setBoolean(READ_INT96_AS_FIXED, true);
}

Expand Down Expand Up @@ -80,8 +81,7 @@ public void parse(final InputFile inputFile, final DecompressionEngine decompres
}

private void parseParquetFile(final InputFile inputFile, final Consumer<Record<Event>> eventConsumer) throws IOException {
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
.withConf(this.configuration)
try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile, this.configuration)
.build()) {
GenericRecordJsonEncoder encoder = new GenericRecordJsonEncoder();
GenericRecord record = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -33,12 +42,15 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.time.OffsetDateTime;
import java.time.temporal.JulianFields;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -161,6 +173,22 @@ public void parseInputFile_parsesCorrectly() throws IOException {
assertRecordsCorrect(actualRecords);
}

@Test
public void parseInputStream_parsesCorrectly_with_int96() throws IOException {
final File testDataFile = File.createTempFile(FILE_PREFIX + "-int96-", FILE_SUFFIX);
testDataFile.deleteOnExit();
generateTestDataInt96(testDataFile);
InputStream targetStream = new FileInputStream(testDataFile);

parquetInputCodec.parse(targetStream, mockConsumer);

final ArgumentCaptor<Record<Event>> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
verify(mockConsumer, times(10)).accept(recordArgumentCaptor.capture());

final List<Record<Event>> actualRecords = recordArgumentCaptor.getAllValues();
assertThat(actualRecords.size(), equalTo(10));
}

@Test
public void parseInputFile_snappyInputFile() throws IOException, URISyntaxException {
URL resource = getClass().getClassLoader().getResource("sample.snappy.parquet");
Expand Down Expand Up @@ -203,8 +231,10 @@ public void parseInputFile_testParquetFile() throws IOException, URISyntaxExcept
private static void generateTestData(final File file) throws IOException {
Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(file))
final ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new LocalOutputFile(file))
.withSchema(schema)
.withConf(new PlainParquetConfiguration())
.withEncryption(null)
.build();

for (int i = 0; i < 10; i++) {
Expand All @@ -220,6 +250,34 @@ private static void generateTestData(final File file) throws IOException {
writer.close();
}

/**
* Generates a Parquet file with INT96 data. This must use the example
* schema rather than Avro, or it would not correctly reproduce possible INT96
* error.
*
* @param file The file for Parquet
*/
private static void generateTestDataInt96(final File file) throws IOException {
final MessageType schema = new MessageType("test", List.of(
new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT96, "my_timestamp_value")
));
final PlainParquetConfiguration conf = new PlainParquetConfiguration();
conf.setStrings(WRITE_FIXED_AS_INT96, "my_timestamp_value");
conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(file))
.withConf(conf)
.withEncryption(null)
.build();

for (int i = 0; i < 10; i++) {
final Group group = new SimpleGroup(schema);
group.add("my_timestamp_value", createInt96());

writer.write(group);
}
writer.close();
}

private void assertRecordsCorrect(final List<Record<Event>> records) {
assertThat(records.size(), equalTo(10));
for (int i = 0; i < 10; i++) {
Expand All @@ -240,5 +298,9 @@ private void assertRecordsCorrect(final List<Record<Event>> records) {
assertThat(record.getData().getMetadata().getEventType(), equalTo(EVENT_TYPE));
}
}

private static NanoTime createInt96() {
return new NanoTime((int) OffsetDateTime.now().getLong(JulianFields.JULIAN_DAY), System.nanoTime());
}
}

2 changes: 1 addition & 1 deletion data-prepper-plugins/rss-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies {
implementation 'joda-time:joda-time:2.12.7'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.apptasticsoftware:rssreader:3.6.0'
implementation 'com.apptasticsoftware:rssreader:3.7.0'
testImplementation libs.commons.lang3
testImplementation project(':data-prepper-test-common')
testImplementation 'org.mock-server:mockserver-junit-jupiter-no-dependencies:5.15.0'
Expand Down
5 changes: 4 additions & 1 deletion data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ dependencies {
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.9.22'
implementation project(':data-prepper-plugins:avro-codecs')
implementation libs.avro.core
implementation libs.hadoop.common
implementation(libs.hadoop.common) {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.apache.hadoop', module: 'hadoop-auth'
}
implementation 'org.apache.parquet:parquet-avro:1.14.0'
implementation 'software.amazon.awssdk:apache-client'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.9.22'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.avro.util.Utf8;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void validate(int expectedRecords, final List<Map<String, Object>> sample
int validatedRecords = 0;

int count = 0;
try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile)
try (final ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(inputFile, new PlainParquetConfiguration())
.build()) {
GenericRecord record;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
Expand Down Expand Up @@ -79,6 +77,7 @@
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -413,7 +412,7 @@ private List<HashMap<String, Object>> createParquetRecordsList(final InputStream
final File tempFile = File.createTempFile(FILE_NAME, FILE_SUFFIX);
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
List<HashMap<String, Object>> actualRecordList = new ArrayList<>();
try (ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build())) {
try (final ParquetFileReader parquetFileReader = new ParquetFileReader(new LocalInputFile(Path.of(tempFile.toURI())), ParquetReadOptions.builder().build())) {
final ParquetMetadata footer = parquetFileReader.getFooter();
final MessageType schema = createdParquetSchema(footer);
PageReadStore pages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
Expand Down Expand Up @@ -46,6 +44,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -556,7 +555,7 @@ private List<Map<String, Object>> createParquetRecordsList(final InputStream inp
final File tempFile = new File(tempDirectory, FILE_NAME);
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
List<Map<String, Object>> actualRecordList = new ArrayList<>();
try (ParquetFileReader parquetFileReader = new ParquetFileReader(HadoopInputFile.fromPath(new Path(tempFile.toURI()), new Configuration()), ParquetReadOptions.builder().build())) {
try (final ParquetFileReader parquetFileReader = new ParquetFileReader(new LocalInputFile(Path.of(tempFile.toURI())), ParquetReadOptions.builder().build())) {
final ParquetMetadata footer = parquetFileReader.getFooter();
final MessageType schema = createdParquetSchema(footer);
PageReadStore pages;
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies {
implementation 'dev.failsafe:failsafe:3.3.2'
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
testImplementation libs.commons.lang3
testImplementation 'org.wiremock:wiremock:3.4.2'
testImplementation 'org.wiremock:wiremock:3.8.0'
testImplementation 'org.eclipse.jetty:jetty-bom:11.0.20'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation testLibs.junit.vintage
Expand Down
2 changes: 1 addition & 1 deletion performance-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ configurations.all {
group 'org.opensearch.dataprepper.test.performance'

dependencies {
gatlingImplementation 'software.amazon.awssdk:auth:2.25.21'
gatlingImplementation 'software.amazon.awssdk:auth:2.26.12'
implementation 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly testLibs.junit.engine

Expand Down
12 changes: 6 additions & 6 deletions release/staging-resources-cdk/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions testing/aws-testing-cdk/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bce2e6a

Please sign in to comment.