From 50b76b501485d27ef0e7b3782679bf587a53a583 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Fri, 17 Jan 2025 16:06:44 -0600 Subject: [PATCH] NIFI-14155 Improved ObjectMapper handling in TokenParserFactory - Refactored TokenParserFactory interface to accept InputStream without configuration parameters - Avoided unnecessary ObjectMapper creation in JsonParserFactory --- .../json/AbstractJsonRowRecordReader.java | 7 +-- .../apache/nifi/json/JsonParserFactory.java | 30 +++++++++-- .../nifi/json/JsonPathRowRecordReader.java | 27 +++++----- .../apache/nifi/json/JsonRecordSource.java | 17 ++---- .../nifi/json/JsonTreeRowRecordReader.java | 25 +++++---- .../apache/nifi/json/TokenParserFactory.java | 7 +-- .../apache/nifi/yaml/YamlParserFactory.java | 42 +++++++++++---- .../apache/nifi/yaml/YamlRecordSource.java | 30 ----------- .../nifi/yaml/YamlTreeRowRecordReader.java | 4 +- .../processor/RecordTransformProxy.java | 3 +- .../salesforce/QuerySalesforceObject.java | 5 +- .../processors/standard/TestForkRecord.java | 10 ++-- .../org/apache/nifi/json/JsonPathReader.java | 18 ++++--- .../org/apache/nifi/json/JsonTreeReader.java | 14 ++--- .../org/apache/nifi/yaml/YamlTreeReader.java | 7 ++- .../TestInferJsonSchemaAccessStrategy.java | 3 +- .../json/TestJsonPathRowRecordReader.java | 52 ++++++++++--------- .../json/TestJsonTreeRowRecordReader.java | 12 ++++- .../yaml/TestYamlTreeRowRecordReader.java | 4 +- 19 files changed, 169 insertions(+), 148 deletions(-) delete mode 100644 nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index de0b7dde17f9..003101ed478b 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -106,8 +106,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { * @param nestedFieldName the name of the field to start the processing from * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can * be accessed by calling {@link #getCapturedFields()} - * @param allowComments whether to allow comments within the JSON stream - * @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints} * @param tokenParserFactory factory to provide an instance of com.fasterxml.jackson.core.JsonParser * @throws IOException in case of JSON stream processing failure * @throws MalformedRecordException in case of malformed JSON input @@ -120,8 +118,6 @@ protected AbstractJsonRowRecordReader(final InputStream in, final StartingFieldStrategy strategy, final String nestedFieldName, final BiPredicate captureFieldPredicate, - final boolean allowComments, - final StreamReadConstraints streamReadConstraints, final TokenParserFactory tokenParserFactory) throws IOException, MalformedRecordException { @@ -137,8 +133,7 @@ protected AbstractJsonRowRecordReader(final InputStream in, capturedFields = new LinkedHashMap<>(); try { - final StreamReadConstraints configuredStreamReadConstraints = streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS : streamReadConstraints; - jsonParser = tokenParserFactory.getJsonParser(in, configuredStreamReadConstraints, allowComments); + jsonParser = tokenParserFactory.getJsonParser(in); jsonParser.enable(Feature.USE_FAST_DOUBLE_PARSER); jsonParser.enable(Feature.USE_FAST_BIG_NUMBER_PARSER); } catch (final JsonParseException e) { diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java index d2e7484b3dee..827e7751b605 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonParserFactory.java @@ -26,17 +26,37 @@ import java.util.Objects; public class JsonParserFactory implements TokenParserFactory { - @Override - public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException { - Objects.requireNonNull(in, "Input Stream required"); + private static final ObjectMapper defaultObjectMapper = new ObjectMapper(); + + private final JsonFactory jsonFactory; + + /** + * JSON Parser Factory constructor using default ObjectMapper and associated configuration options + */ + public JsonParserFactory() { + jsonFactory = defaultObjectMapper.getFactory(); + } + + /** + * JSON Parser Factory constructor with configurable constraints + * + * @param streamReadConstraints Stream Read Constraints + * @param allowComments Allow Comments during parsing + */ + public JsonParserFactory(final StreamReadConstraints streamReadConstraints, final boolean allowComments) { Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required"); final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints); if (allowComments) { objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS); } - final JsonFactory jsonFactory = objectMapper.getFactory(); + jsonFactory = objectMapper.getFactory(); + jsonFactory.setStreamReadConstraints(streamReadConstraints); + } + + @Override + public JsonParser getJsonParser(final InputStream in) throws IOException { + Objects.requireNonNull(in, "Input Stream required"); return jsonFactory.createParser(in); } } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java index db6e606e0e12..fe50140ab09e 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java @@ -17,7 +17,6 @@ package org.apache.nifi.json; -import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.jayway.jsonpath.Configuration; @@ -57,26 +56,24 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { private final RecordSchema schema; private final Configuration providerConfiguration; - public JsonPathRowRecordReader(final LinkedHashMap jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger, - final String dateFormat, final String timeFormat, final String timestampFormat) - throws MalformedRecordException, IOException { - this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, DEFAULT_STREAM_READ_CONSTRAINTS); - } - - public JsonPathRowRecordReader(final LinkedHashMap jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger, - final String dateFormat, final String timeFormat, final String timestampFormat, - final boolean allowComments, final StreamReadConstraints streamReadConstraints) - throws MalformedRecordException, IOException { - - super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory()); + public JsonPathRowRecordReader( + final LinkedHashMap jsonPaths, + final RecordSchema schema, + final InputStream in, + final ComponentLog logger, + final String dateFormat, + final String timeFormat, + final String timestampFormat, + final ObjectMapper objectMapper, + final TokenParserFactory tokenParserFactory + ) throws MalformedRecordException, IOException { + super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, tokenParserFactory); this.schema = schema; this.jsonPaths = jsonPaths; this.in = in; this.logger = logger; - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints); final JsonProvider jsonProvider = new JacksonJsonProvider(objectMapper); providerConfiguration = Configuration.builder().jsonProvider(jsonProvider).build(); } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java index 904083d82085..56c9436e6aab 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java @@ -35,24 +35,17 @@ public class JsonRecordSource implements RecordSource { private static final boolean ALLOW_COMMENTS_ENABLED = true; + private static final TokenParserFactory defaultTokenParserFactory = new JsonParserFactory(DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED); + private final JsonParser jsonParser; private final StartingFieldStrategy strategy; public JsonRecordSource(final InputStream in) throws IOException { - this(in, null, null, DEFAULT_STREAM_READ_CONSTRAINTS); - } - - public JsonRecordSource(final InputStream in, StreamReadConstraints streamReadConstraints) throws IOException { - this(in, null, null, streamReadConstraints); - } - - public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException { - this(in, strategy, startingFieldName, new JsonParserFactory(), streamReadConstraints); + this(in, null, null, defaultTokenParserFactory); } - public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory, - StreamReadConstraints streamReadConstraints) throws IOException { - jsonParser = tokenParserFactory.getJsonParser(in, streamReadConstraints, ALLOW_COMMENTS_ENABLED); + public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, final TokenParserFactory tokenParserFactory) throws IOException { + jsonParser = tokenParserFactory.getJsonParser(in); this.strategy = strategy; if (strategy == StartingFieldStrategy.NESTED_FIELD) { diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index 43cc121f1af9..aade3d104074 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -17,7 +17,6 @@ package org.apache.nifi.json; -import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -53,15 +52,21 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { private final RecordSchema schema; - public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, - final String dateFormat, final String timeFormat, final String timestampFormat, - final StartingFieldStrategy startingFieldStrategy, final String startingFieldName, - final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate captureFieldPredicate, - final boolean allowComments, final StreamReadConstraints streamReadConstraints, final TokenParserFactory tokenParserFactory) - throws IOException, MalformedRecordException { - - super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate, - allowComments, streamReadConstraints, tokenParserFactory); + public JsonTreeRowRecordReader( + final InputStream in, + final ComponentLog logger, + final RecordSchema schema, + final String dateFormat, + final String timeFormat, + final String timestampFormat, + final StartingFieldStrategy startingFieldStrategy, + final String startingFieldName, + final SchemaApplicationStrategy schemaApplicationStrategy, + final BiPredicate captureFieldPredicate, + final TokenParserFactory tokenParserFactory + ) throws IOException, MalformedRecordException { + + super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate, tokenParserFactory); if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) { this.schema = getSelectedSchema(schema, startingFieldName); diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java index 1c3a812fe2a4..dfeb4ab5ebc2 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/TokenParserFactory.java @@ -17,20 +17,17 @@ package org.apache.nifi.json; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; import java.io.InputStream; public interface TokenParserFactory { /** - * Get JSON Parser implementation for provided Input Stream with configured settings + * Get JSON Parser implementation for provided Input Stream with preconfigured settings * * @param in Input Stream to be parsed - * @param streamReadConstraints Stream Read Constraints applied - * @param allowComments Whether to allow comments when parsing * @return JSON Parser * @throws IOException Thrown on failures to read the Input Stream */ - JsonParser getJsonParser(InputStream in, StreamReadConstraints streamReadConstraints, boolean allowComments) throws IOException; + JsonParser getJsonParser(InputStream in) throws IOException; } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java index 9fd65a4638cc..a79c7d417b59 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlParserFactory.java @@ -25,26 +25,48 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Objects; public class YamlParserFactory implements TokenParserFactory { + private static final YAMLMapper yamlMapper = new YAMLMapper(); + + private final YAMLFactory yamlFactory; + + /** + * YAML Parser Factory constructor with default configuration for YAML Mapper + */ + public YamlParserFactory() { + yamlFactory = YAMLFactory.builder().build(); + yamlFactory.setCodec(yamlMapper); + } + + /** + * YAML Parser Factory constructor with configurable parsing constraints + * + * @param streamReadConstraints Stream Read Constraints required + * @param allowComments Allow Comments during parsing + */ + public YamlParserFactory(final StreamReadConstraints streamReadConstraints, final boolean allowComments) { + Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required"); + + final LoaderOptions loaderOptions = new LoaderOptions(); + loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength()); + loaderOptions.setProcessComments(allowComments); + + yamlFactory = YAMLFactory.builder().loaderOptions(loaderOptions).build(); + yamlFactory.setCodec(yamlMapper); + } + /** * Get Parser implementation for YAML * * @param in Input Stream to be parsed - * @param streamReadConstraints Stream Read Constraints are not supported in YAML - * @param allowComments Whether to allow comments when parsing does not apply to YAML * @return YAML Parser * @throws IOException Thrown on parser creation failures */ @Override - public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException { - final LoaderOptions loaderOptions = new LoaderOptions(); - loaderOptions.setCodePointLimit(streamReadConstraints.getMaxStringLength()); - final YAMLFactory yamlFactory = YAMLFactory.builder() - .loaderOptions(loaderOptions) - .build(); - - return yamlFactory.setCodec(new YAMLMapper()).createParser(in); + public JsonParser getJsonParser(final InputStream in) throws IOException { + return yamlFactory.createParser(in); } } diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java deleted file mode 100644 index e9a07c0895e4..000000000000 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlRecordSource.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.yaml; - -import com.fasterxml.jackson.core.StreamReadConstraints; -import org.apache.nifi.json.JsonRecordSource; -import org.apache.nifi.json.StartingFieldStrategy; - -import java.io.IOException; -import java.io.InputStream; - -public class YamlRecordSource extends JsonRecordSource { - public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, StreamReadConstraints streamReadConstraints) throws IOException { - super(in, strategy, startingFieldName, new YamlParserFactory(), streamReadConstraints); - } -} diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java index 25136d85aa3c..93048df79dec 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-record-utils/nifi-yaml-record-utils/src/main/java/org/apache/nifi/yaml/YamlTreeRowRecordReader.java @@ -30,6 +30,8 @@ public class YamlTreeRowRecordReader extends JsonTreeRowRecordReader { + private static final YamlParserFactory yamlParserFactory = new YamlParserFactory(); + public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null); @@ -42,6 +44,6 @@ public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger, throws IOException, MalformedRecordException { super(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy, - captureFieldPredicate, true, null, new YamlParserFactory()); + captureFieldPredicate, yamlParserFactory); } } diff --git a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java index e6f981165164..8dd10705b786 100644 --- a/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java +++ b/nifi-extension-bundles/nifi-py4j-extension-bundle/nifi-py4j-bridge/src/main/java/org/apache/nifi/python/processor/RecordTransformProxy.java @@ -76,6 +76,7 @@ public class RecordTransformProxy extends PythonProcessorProxy .required(true) .build(); + private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); public RecordTransformProxy(final String processorType, final Supplier bridgeFactory, final boolean initialize) { super(processorType, bridgeFactory, initialize); @@ -303,7 +304,7 @@ private Record createRecordFromJson(final RecordTransformResult transformResult) try (final InputStream in = new ByteArrayInputStream(jsonBytes)) { final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), schema, null, null, null, null, - null, null, null, false, null, new JsonParserFactory()); + null, null, null, jsonParserFactory); final Record record = reader.nextRecord(false, false); return record; } diff --git a/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index 80f7437da400..b8fd2899699b 100644 --- a/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-extension-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -274,6 +274,7 @@ public class QuerySalesforceObject extends AbstractProcessor { private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory(); private static final String TOTAL_RECORD_COUNT_ATTRIBUTE = "total.record.count"; private static final int MAX_RECORD_COUNT = 2000; + private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter; private volatile SalesforceRestClient salesforceRestService; @@ -508,9 +509,7 @@ private JsonTreeRowRecordReader createJsonReader(InputStream querySObjectResultI STARTING_FIELD_NAME, SchemaApplicationStrategy.SELECTED_PART, CAPTURE_PREDICATE, - false, - null, - new JsonParserFactory() + jsonParserFactory ); } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java index 4c10b7d52906..db0fff865d64 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java @@ -453,6 +453,8 @@ public void testExtractMode() throws InitializationException, IOException { private class JsonRecordReader extends AbstractControllerService implements RecordReaderFactory { + private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); + RecordSchema schema; public JsonRecordReader(RecordSchema schema) { @@ -460,14 +462,14 @@ public JsonRecordReader(RecordSchema schema) { } @Override - public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory()); + public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException { + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, jsonParserFactory); } @Override public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) - throws MalformedRecordException, IOException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, false, null, new JsonParserFactory()); + throws MalformedRecordException, IOException { + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null, jsonParserFactory); } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java index e0c5eace08dd..df7d8d5cad9b 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.jayway.jsonpath.JsonPath; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -75,8 +76,8 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade private volatile String timeFormat; private volatile String timestampFormat; private volatile LinkedHashMap jsonPaths; - private volatile boolean allowComments; - private volatile StreamReadConstraints streamReadConstraints; + private volatile ObjectMapper objectMapper; + private volatile TokenParserFactory tokenParserFactory; @Override protected List getSupportedPropertyDescriptors() { @@ -107,8 +108,13 @@ public void compileJsonPaths(final ConfigurationContext context) { this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue(); - this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build(); - this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean(); + + final StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build(); + this.objectMapper = new ObjectMapper(); + objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints); + + final boolean allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean(); + this.tokenParserFactory = new JsonParserFactory(streamReadConstraints, allowComments); final LinkedHashMap compiled = new LinkedHashMap<>(); for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { @@ -155,7 +161,7 @@ protected List getSchemaAccessStrategyValues() { @Override protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) { - final RecordSourceFactory jsonSourceFactory = (var, in) -> new JsonRecordSource(in, streamReadConstraints); + final RecordSourceFactory jsonSourceFactory = (var, in) -> new JsonRecordSource(in, null, null, tokenParserFactory); final Supplier> inferenceSupplier = () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat)); return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), jsonSourceFactory, inferenceSupplier, @@ -171,6 +177,6 @@ protected AllowableValue getDefaultSchemaAccessStrategy() { public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); - return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints); + return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, objectMapper, tokenParserFactory); } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index 8980ddd82318..bdc649122cb9 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -74,8 +74,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade protected volatile String startingFieldName; protected volatile StartingFieldStrategy startingFieldStrategy; protected volatile SchemaApplicationStrategy schemaApplicationStrategy; - protected volatile StreamReadConstraints streamReadConstraints; - private volatile boolean allowComments; + protected volatile TokenParserFactory tokenParserFactory; public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder() .name("starting-field-strategy") @@ -135,8 +134,11 @@ public void storePropertyValues(final ConfigurationContext context) { this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue()); this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue(); this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue()); - this.streamReadConstraints = buildStreamReadConstraints(context); - this.allowComments = isAllowCommentsEnabled(context); + this.tokenParserFactory = createTokenParserFactory(context); + } + + protected TokenParserFactory createTokenParserFactory(final ConfigurationContext context) { + return new JsonParserFactory(buildStreamReadConstraints(context), isAllowCommentsEnabled(context)); } /** @@ -179,7 +181,7 @@ protected SchemaAccessStrategy getSchemaAccessStrategy(final String schemaAccess } protected RecordSourceFactory createJsonRecordSourceFactory() { - return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints); + return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName, tokenParserFactory); } @Override @@ -195,6 +197,6 @@ public RecordReader createRecordReader(final Map variables, fina protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema) throws IOException, MalformedRecordException { return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, - schemaApplicationStrategy, null, allowComments, streamReadConstraints, new JsonParserFactory()); + schemaApplicationStrategy, null, tokenParserFactory); } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java index abc0820e5056..8569103d3744 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/yaml/YamlTreeReader.java @@ -17,15 +17,14 @@ package org.apache.nifi.yaml; -import com.fasterxml.jackson.databind.JsonNode; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.json.JsonTreeRowRecordReader; +import org.apache.nifi.json.TokenParserFactory; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.schema.inference.RecordSourceFactory; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; @@ -53,8 +52,8 @@ protected List getSupportedPropertyDescriptors() { } @Override - protected RecordSourceFactory createJsonRecordSourceFactory() { - return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName, streamReadConstraints); + protected TokenParserFactory createTokenParserFactory(final ConfigurationContext context) { + return new YamlParserFactory(buildStreamReadConstraints(context), isAllowCommentsEnabled(context)); } @Override diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java index 8af290ef9ecc..3d9bbe15d03c 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.json; -import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.databind.JsonNode; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.inference.InferSchemaAccessStrategy; @@ -240,7 +239,7 @@ private RecordSchema inferSchema(final File file, final StartingFieldStrategy st final InputStream bufferedIn = new BufferedInputStream(in)) { final InferSchemaAccessStrategy accessStrategy = new InferSchemaAccessStrategy<>( - (var, content) -> new JsonRecordSource(content, strategy, startingFieldName, StreamReadConstraints.defaults()), + (var, content) -> new JsonRecordSource(content, strategy, startingFieldName, new JsonParserFactory()), timestampInference, Mockito.mock(ComponentLog.class) ); diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java index 99e7ee5634b0..2626cc018da2 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java @@ -17,6 +17,7 @@ package org.apache.nifi.json; +import com.fasterxml.jackson.databind.ObjectMapper; import com.jayway.jsonpath.JsonPath; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; @@ -28,9 +29,7 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -48,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; class TestJsonPathRowRecordReader { private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); @@ -56,6 +56,9 @@ class TestJsonPathRowRecordReader { private final LinkedHashMap allJsonPaths = new LinkedHashMap<>(); + private final ObjectMapper mapper = new ObjectMapper(); + private final TokenParserFactory parserFactory = new JsonParserFactory(); + @BeforeEach public void populateJsonPaths() { allJsonPaths.clear(); @@ -97,8 +100,8 @@ private RecordSchema getAccountSchema() { void testReadArray() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -123,8 +126,8 @@ void testReadArray() throws IOException, MalformedRecordException { void testReadOneLine() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-oneline.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -149,8 +152,8 @@ void testReadOneLine() throws IOException, MalformedRecordException { void testSingleJsonElement() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); @@ -177,9 +180,10 @@ void testTimestampCoercedFromString() throws IOException, MalformedRecordExcepti jsonPaths.put("timestamp", JsonPath.compile("$.timestamp")); jsonPaths.put("field_not_in_schema", JsonPath.compile("$.field_not_in_schema")); + final String customFormat = "yyyy/MM/dd HH:mm:ss"; for (final boolean coerceTypes : new boolean[] {true, false}) { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) { + try (final InputStream in = new FileInputStream("src/test/resources/json/timestamp.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, customFormat, mapper, parserFactory)) { final Record record = reader.nextRecord(coerceTypes, false); final Object value = record.getValue("timestamp"); @@ -203,8 +207,8 @@ void testElementWithNestedData() throws IOException, MalformedRecordException { fields.add(new RecordField("account", accountType)); final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "account"); @@ -241,14 +245,14 @@ void testElementWithNestedArray() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested-array.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "accounts"); assertEquals(expectedFieldNames, fieldNames); - final List dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); + final List dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); final List expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY); assertEquals(expectedTypes, dataTypes); @@ -283,14 +287,14 @@ void testElementWithNestedArray() throws IOException, MalformedRecordException { void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException { final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); assertEquals(expectedFieldNames, fieldNames); - final List dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); + final List dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); final List expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); assertEquals(expectedTypes, dataTypes); @@ -318,14 +322,14 @@ void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRe final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"); assertEquals(expectedFieldNames, fieldNames); - final List dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); + final List dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); final List expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING); assertEquals(expectedTypes, dataTypes); @@ -353,14 +357,14 @@ void testPrimitiveTypeArrays() throws IOException, MalformedRecordException { fields.add(new RecordField("accountIds", idsType)); final RecordSchema schema = new SimpleRecordSchema(fields); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/primitive-type-array.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { + try (final InputStream in = new FileInputStream("src/test/resources/json/primitive-type-array.json"); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat, mapper, parserFactory)) { final List fieldNames = schema.getFieldNames(); final List expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "accountIds"); assertEquals(expectedFieldNames, fieldNames); - final List dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); + final List dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()); final List expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY); assertEquals(expectedTypes, dataTypes); diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index f08e8f129c74..cb2ad449ae3f 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -1312,7 +1312,7 @@ private void testReadRecords(InputStream jsonStream, private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException { RecordSchema schema = new InferSchemaAccessStrategy<>( - (__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()), + (__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, new JsonParserFactory()), new JsonSchemaInference(new TimeValueInference(null, null, null)), log ).getSchema(Collections.emptyMap(), jsonStream, null); @@ -1329,7 +1329,15 @@ private JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream inputS StartingFieldStrategy startingFieldStrategy, String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy, BiPredicate captureFieldPredicate, boolean allowComments, StreamReadConstraints streamReadConstraints) throws Exception { + + final TokenParserFactory tokenParserFactory; + if (streamReadConstraints == null) { + tokenParserFactory = new JsonParserFactory(); + } else { + tokenParserFactory = new JsonParserFactory(streamReadConstraints, allowComments); + } + return new JsonTreeRowRecordReader(inputStream, log, recordSchema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy, - captureFieldPredicate, allowComments, streamReadConstraints, new JsonParserFactory()); + captureFieldPredicate, tokenParserFactory); } } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java index 4b42ff75e6b0..4776980c8b7e 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java @@ -17,10 +17,10 @@ package org.apache.nifi.yaml; -import com.fasterxml.jackson.core.StreamReadConstraints; import org.apache.avro.Schema; import org.apache.commons.io.FileUtils; import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.json.JsonRecordSource; import org.apache.nifi.json.JsonSchemaInference; import org.apache.nifi.json.JsonTreeRowRecordReader; import org.apache.nifi.json.SchemaApplicationStrategy; @@ -1091,7 +1091,7 @@ private void testNestedReadRecords(InputStream yamlStream, private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException { RecordSchema schema = new InferSchemaAccessStrategy<>( - (__, inputStream) -> new YamlRecordSource(inputStream, strategy, startingFieldName, StreamReadConstraints.defaults()), + (__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName, new YamlParserFactory()), new JsonSchemaInference(new TimeValueInference(null, null, null)), mock(ComponentLog.class) ).getSchema(Collections.emptyMap(), jsonStream, null);