Skip to content

Commit

Permalink
NIFI-11197 Added YamlTreeReader
Browse files Browse the repository at this point in the history
- Adjusted JsonTreeReader implementation for sharing common Jackson components

This closes apache#7665

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
dan-s1 authored and exceptionfactory committed Oct 23, 2023
1 parent 7f7e3f0 commit 4b95129
Show file tree
Hide file tree
Showing 40 changed files with 2,342 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
Expand Down Expand Up @@ -56,7 +55,7 @@
public abstract class AbstractJsonRowRecordReader implements RecordReader {
public static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";

static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
.name("Max String Length")
.displayName("Max String Length")
.description("The maximum allowed length of a string value when parsing the JSON document")
Expand Down Expand Up @@ -88,7 +87,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
private JsonParser jsonParser;
private JsonNode firstJsonNode;
private StartingFieldStrategy strategy;

private Map<String, String> capturedFields;
private BiPredicate<String, String> captureFieldPredicate;

Expand All @@ -104,28 +102,6 @@ private AbstractJsonRowRecordReader(final ComponentLog logger, final String date
lazyTimestampFormat = () -> tsf;
}

protected AbstractJsonRowRecordReader(final InputStream in,
final ComponentLog logger,
final String dateFormat,
final String timeFormat,
final String timestampFormat)
throws IOException, MalformedRecordException {

this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, false, null);
}

protected AbstractJsonRowRecordReader(final InputStream in,
final ComponentLog logger,
final String dateFormat,
final String timeFormat,
final String timestampFormat,
final boolean allowComments,
final StreamReadConstraints streamReadConstraints)
throws IOException, MalformedRecordException {

this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints);
}

/**
* Constructor with initial logic for JSON to NiFi record parsing.
*
Expand All @@ -140,7 +116,7 @@ protected AbstractJsonRowRecordReader(final InputStream in,
* be accessed by calling {@link #getCapturedFields()}
* @param allowComments whether to allow comments within the JSON stream
* @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints}
*
* @param tokenParserFactory factory to provide an instance of com.fasterxml.jackson.core.JsonParser
* @throws IOException in case of JSON stream processing failure
* @throws MalformedRecordException in case of malformed JSON input
*/
Expand All @@ -153,7 +129,8 @@ protected AbstractJsonRowRecordReader(final InputStream in,
final String nestedFieldName,
final BiPredicate<String, String> captureFieldPredicate,
final boolean allowComments,
final StreamReadConstraints streamReadConstraints)
final StreamReadConstraints streamReadConstraints,
final TokenParserFactory tokenParserFactory)
throws IOException, MalformedRecordException {

this(logger, dateFormat, timeFormat, timestampFormat);
Expand All @@ -163,14 +140,8 @@ protected AbstractJsonRowRecordReader(final InputStream in,
capturedFields = new LinkedHashMap<>();

try {
final ObjectMapper codec = new ObjectMapper();
if (allowComments) {
codec.enable(JsonParser.Feature.ALLOW_COMMENTS);
}
codec.getFactory().setStreamReadConstraints(streamReadConstraints != null ? streamReadConstraints : DEFAULT_STREAM_READ_CONSTRAINTS);

jsonParser = codec.getFactory().createParser(in);
jsonParser.setCodec(codec);
final StreamReadConstraints configuredStreamReadConstraints = streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS : streamReadConstraints;
jsonParser = tokenParserFactory.getJsonParser(in, configuredStreamReadConstraints, allowComments);

if (strategy == StartingFieldStrategy.NESTED_FIELD) {
while (jsonParser.nextToken() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/**
 * {@link TokenParserFactory} implementation that produces Jackson parsers for JSON input.
 */
public class JsonParserFactory implements TokenParserFactory {
    /**
     * Create a JSON Parser for the provided Input Stream using a newly created ObjectMapper.
     *
     * @param in Input Stream to be parsed, required
     * @param streamReadConstraints Stream Read Constraints applied to the underlying JsonFactory, required
     * @param allowComments Whether comments are accepted in the JSON input
     * @return JSON Parser created from the configured JsonFactory
     * @throws IOException Thrown on failures to create the parser
     * @throws NullPointerException Thrown when the Input Stream or Stream Read Constraints are null
     */
    @Override
    public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
        Objects.requireNonNull(in, "Input Stream required");
        Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required");

        // Each call gets its own mapper and factory so the per-call settings below are never shared
        final ObjectMapper mapper = new ObjectMapper();
        final JsonFactory factory = mapper.getFactory();
        factory.setStreamReadConstraints(streamReadConstraints);

        if (allowComments) {
            mapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
        }

        return factory.createParser(in);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths,
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
throws MalformedRecordException, IOException {

super(in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints);
super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory());

this.schema = schema;
this.jsonPaths = jsonPaths;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
*/
package org.apache.nifi.json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.schema.inference.RecordSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,29 +30,28 @@

public class JsonRecordSource implements RecordSource<JsonNode> {
private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
private static final JsonFactory jsonFactory;

private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.defaults();

private static final boolean ALLOW_COMMENTS_ENABLED = true;

private final JsonParser jsonParser;
private final StartingFieldStrategy strategy;
private final String startingFieldName;

static {
jsonFactory = new JsonFactory();
jsonFactory.setCodec(new ObjectMapper());
}

public JsonRecordSource(final InputStream in) throws IOException {
jsonParser = jsonFactory.createParser(in);
strategy = null;
startingFieldName = null;
this(in, null, null);
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
jsonParser = jsonFactory.createParser(in);
this(in , strategy, startingFieldName, new JsonParserFactory());
}

public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory) throws IOException {
jsonParser = tokenParserFactory.getJsonParser(in, DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);
this.strategy = strategy;
this.startingFieldName = startingFieldName;

if (strategy == StartingFieldStrategy.NESTED_FIELD) {
final SerializedString serializedNestedField = new SerializedString(this.startingFieldName);
final SerializedString serializedNestedField = new SerializedString(startingFieldName);
while (!jsonParser.nextFieldName(serializedNestedField) && jsonParser.hasCurrentToken());
logger.debug("Parsing starting at nested field [{}]", startingFieldName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,7 @@ public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger,
final String dateFormat, final String timeFormat, final String timestampFormat)
throws IOException, MalformedRecordException {

this(in, logger, schema, dateFormat, timeFormat, timestampFormat, false, null);
}

public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat,
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
throws IOException, MalformedRecordException {

this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null,
allowComments, streamReadConstraints);
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
}

public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
Expand All @@ -75,18 +66,18 @@ public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger,
throws IOException, MalformedRecordException {

this(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
captureFieldPredicate, false, null);
captureFieldPredicate, false, null, new JsonParserFactory());
}

public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat,
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate,
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
final boolean allowComments, final StreamReadConstraints streamReadConstraints, final TokenParserFactory tokenParserFactory)
throws IOException, MalformedRecordException {

super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate,
allowComments, streamReadConstraints);
allowComments, streamReadConstraints, tokenParserFactory);

if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
this.schema = getSelectedSchema(schema, startingFieldName);
Expand All @@ -110,7 +101,6 @@ private RecordSchema getSelectedSchema(final RecordSchema schema, final String s
}
}
}

}
throw new RuntimeException(String.format("Selected schema field [%s] not found.", startingFieldName));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.StreamReadConstraints;

import java.io.IOException;
import java.io.InputStream;

/**
 * Factory abstraction for obtaining a Jackson {@link JsonParser} over an Input Stream.
 * Callers pass the stream together with the requested parser settings and receive a
 * parser positioned for token-based reading. Implementations decide how the settings
 * are applied to the underlying format.
 */
public interface TokenParserFactory {
/**
 * Get a Parser implementation for the provided Input Stream with the configured settings
 *
 * @param in Input Stream to be parsed
 * @param streamReadConstraints Stream Read Constraints requested for the parser
 * @param allowComments Whether to allow comments when parsing
 * @return Parser reading tokens from the Input Stream
 * @throws IOException Thrown on failures to read the Input Stream
 */
JsonParser getJsonParser(InputStream in, StreamReadConstraints streamReadConstraints, boolean allowComments) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-yaml-record-utils</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-json-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.yaml;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import org.apache.nifi.json.TokenParserFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/**
 * {@link TokenParserFactory} implementation that produces Jackson parsers for YAML input.
 */
public class YamlParserFactory implements TokenParserFactory {
    // Single factory instance reused for every parser created through this class
    private static final YAMLFactory YAML_FACTORY = new YAMLFactory(new YAMLMapper());

    /**
     * Get Parser implementation for YAML
     *
     * @param in Input Stream to be parsed, required
     * @param streamReadConstraints Ignored because Stream Read Constraints are not supported in YAML
     * @param allowComments Ignored because the allow comments setting does not apply to YAML
     * @return YAML Parser
     * @throws IOException Thrown on parser creation failures
     * @throws NullPointerException Thrown when the Input Stream is null
     */
    @Override
    public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
        // Reject a missing stream up front with the same precondition and message as JsonParserFactory
        Objects.requireNonNull(in, "Input Stream required");

        return YAML_FACTORY.createParser(in);
    }
}
Loading

0 comments on commit 4b95129

Please sign in to comment.