diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml
index 2db9bfe4515..dc4566687c4 100644
--- a/.github/workflows/pr.yml
+++ b/.github/workflows/pr.yml
@@ -487,7 +487,7 @@ jobs:
if-no-files-found: ignore
- name: Create Pull Request
id: update_snapshots
- uses: peter-evans/create-pull-request@v4
+ uses: peter-evans/create-pull-request@v7
if: always() && (github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository)
env:
HASH: ${{ format('#{0}', github.event.number) }}
diff --git a/components/openapi/src/it/resources/enrichers-with-optional-fields.yaml b/components/openapi/src/it/resources/enrichers-with-optional-fields.yaml
new file mode 100644
index 00000000000..50bb27252db
--- /dev/null
+++ b/components/openapi/src/it/resources/enrichers-with-optional-fields.yaml
@@ -0,0 +1,54 @@
+openapi: "3.0.0"
+info:
+ title: Simple API overview
+ version: 2.0.0
+paths:
+ /customer:
+ post:
+ summary: Returns ComponentsBykeys.
+ requestBody:
+ description:
+ Keys of the Customers.
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/MultiKeys'
+ responses:
+ '201':
+ description:
+ OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/Customer'
+
+components:
+ schemas:
+ MultiKeys:
+ type: array
+ minItems: 1
+ items:
+ type: object
+ properties:
+ primaryKey:
+ type: string
+ additionalKey:
+ type: string
+ validFor:
+ type: integer
+ format: int64
+ minimum: 1
+ maximum: 2592000
+ required:
+ - primaryKey
+ - additionalKey
+ Customer:
+ type: object
+ properties:
+ name:
+ type: string
+ category:
+ type: string
+ id:
+ type: integer
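
A rough illustration of a request body that satisfies the MultiKeys schema above (a sketch using plain Java collections as the enricher sees them at runtime; the key values are made up): the array needs at least one element, each element must carry primaryKey and additionalKey, and validFor can be omitted since it is not listed under `required`.

    import java.util.{List => JList, Map => JMap}

    object MultiKeysPayloadSketch {
      // one element satisfies minItems: 1; validFor is left out because it is optional
      val multiKeys: JList[JMap[String, AnyRef]] = JList.of(
        JMap.of[String, AnyRef]("primaryKey", "dfgdf", "additionalKey", "sss")
      )
    }
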
diff --git a/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala b/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala
index fd2671fdab0..0e8290c1789 100644
--- a/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala
+++ b/components/openapi/src/it/scala/pl/touk/nussknacker/openapi/functional/OpenApiScenarioIntegrationTest.scala
@@ -12,6 +12,7 @@ import pl.touk.nussknacker.engine.api.typed.TypedMap
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.graph.expression.Expression
+import pl.touk.nussknacker.engine.spel
import pl.touk.nussknacker.engine.util.test.{ClassBasedTestScenarioRunner, RunResult, TestScenarioRunner}
import pl.touk.nussknacker.openapi.enrichers.SwaggerEnricher
import pl.touk.nussknacker.openapi.parser.SwaggerParser
@@ -53,6 +54,11 @@ class OpenApiScenarioIntegrationTest
test(prepareScenarioRunner(port, sttpBackend, _.copy(allowedMethods = List("POST"))))
}
+ def withRequestBody(sttpBackend: SttpBackend[Future, Any])(test: ClassBasedTestScenarioRunner => Any) =
+ new StubService("/enrichers-with-optional-fields.yaml").withCustomerService { port =>
+ test(prepareScenarioRunner(port, sttpBackend, _.copy(allowedMethods = List("POST"))))
+ }
+
val stubbedBackend: SttpBackendStub[Future, Any] = SttpBackendStub.asynchronousFuture.whenRequestMatchesPartial {
case request =>
request.headers match {
@@ -93,6 +99,21 @@ class OpenApiScenarioIntegrationTest
)
}
+ it should "call enricher with request body" in withRequestBody(stubbedBackend) { testScenarioRunner =>
+ // given
+ val data = List("10")
+ val scenario =
+ scenarioWithEnricher((SingleBodyParameter.name, """{{additionalKey:"sss", primaryKey:"dfgdf"}}""".spel))
+
+ // when
+ val result = testScenarioRunner.runWithData(scenario, data)
+
+ // then
+ result.validValue shouldBe RunResult.success(
+ TypedMap(Map("name" -> "Robert Wright", "id" -> 10L, "category" -> "GOLD"))
+ )
+ }
+
it should "call enricher returning string" in withPrimitiveReturnType(
SttpBackendStub.asynchronousFuture.whenRequestMatchesPartial { case _ =>
Response.ok((s""""justAString""""))
diff --git a/components/openapi/src/main/scala/pl/touk/nussknacker/openapi/extractor/ParametersExtractor.scala b/components/openapi/src/main/scala/pl/touk/nussknacker/openapi/extractor/ParametersExtractor.scala
index 4d1a7121ea8..38f2253685d 100644
--- a/components/openapi/src/main/scala/pl/touk/nussknacker/openapi/extractor/ParametersExtractor.scala
+++ b/components/openapi/src/main/scala/pl/touk/nussknacker/openapi/extractor/ParametersExtractor.scala
@@ -43,7 +43,7 @@ object ParametersExtractor {
ParameterWithBodyFlag(
Parameter(
ParameterName(propertyName),
- swaggerType.typingResult,
+ SwaggerTyped.typingResult(swaggerType, resolveListOfObjects = false),
editor = swaggerType.editorOpt,
validators = List.empty,
defaultValue = None,
diff --git a/defaultModel/src/main/resources/defaultModelConfig.conf b/defaultModel/src/main/resources/defaultModelConfig.conf
index 493316e66f9..cb761635f22 100644
--- a/defaultModel/src/main/resources/defaultModelConfig.conf
+++ b/defaultModel/src/main/resources/defaultModelConfig.conf
@@ -20,13 +20,10 @@
split: {
docsUrl: ${baseDocsUrl}"BasicNodes#split"
}
- input {
+ builtin-input {
docsUrl: ${baseDocsUrl}"Fragments#inputs"
}
- fragmentInputDefinition {
- docsUrl: ${baseDocsUrl}"Fragments#inputs"
- }
- fragmentOutputDefinition {
+ builtin-output {
docsUrl: ${baseDocsUrl}"Fragments#outputs"
}
}
diff --git a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #4.png b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #4.png
index b6f7e64d25a..b2fe8e0bad8 100644
Binary files a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #4.png and b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #4.png differ
diff --git a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #6.png b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #6.png
index 5bb4036fb75..784a54e2e75 100644
Binary files a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #6.png and b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #6.png differ
diff --git a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #8.png b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #8.png
index cf5e477fe63..439567f0484 100644
Binary files a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #8.png and b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should allow adding input parameters and display used fragment graph in modal #8.png differ
diff --git a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should display dead-ended fragment correct #0.png b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should display dead-ended fragment correct #0.png
index 784d14fc988..41dd23fa697 100644
Binary files a/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should display dead-ended fragment correct #0.png and b/designer/client/cypress/e2e/__image_snapshots__/electron/Linux/Fragment should display dead-ended fragment correct #0.png differ
diff --git a/designer/client/cypress/e2e/counts.cy.ts b/designer/client/cypress/e2e/counts.cy.ts
index 7424753a73a..6a3042c53ed 100644
--- a/designer/client/cypress/e2e/counts.cy.ts
+++ b/designer/client/cypress/e2e/counts.cy.ts
@@ -10,6 +10,9 @@ describe("Counts", () => {
});
it("should be available via button and modal", () => {
// There is no other way to make this test work as expected, since we cannot mock the dynamic date here,
// and a blackout or a response mock makes little sense in this case
+ const maxDiffThreshold = 0.02;
cy.viewport("macbook-15");
// Collapse toolbar to make counts button visible
@@ -20,7 +23,7 @@ describe("Counts", () => {
cy.get("[data-testid=window]").contains("Quick ranges").should("be.visible");
cy.contains(/^latest deploy$/i).should("not.exist");
- cy.get("[data-testid=window]").matchImage();
+ cy.get("[data-testid=window]").matchImage({ maxDiffThreshold });
cy.get("[data-testid=window]")
.contains(/^cancel$/i)
.click();
@@ -29,7 +32,7 @@ describe("Counts", () => {
cy.get("@button").click();
cy.get("[data-testid=window]").contains("Quick ranges").should("be.visible");
cy.contains(/^latest deploy$/i).should("be.visible");
- cy.get("[data-testid=window]").matchImage();
+ cy.get("[data-testid=window]").matchImage({ maxDiffThreshold });
cy.get("[data-testid=window]")
.contains(/^cancel$/i)
.click();
@@ -44,7 +47,7 @@ describe("Counts", () => {
cy.contains(/^previous deployments...$/i)
.should("be.visible")
.click();
- cy.get("[data-testid=window]").matchImage();
+ cy.get("[data-testid=window]").matchImage({ maxDiffThreshold });
cy.get("[data-testid=window]").contains("no refresh").should("be.visible");
cy.get("[data-testid=window]").contains("Latest deploy").click();
cy.get("[data-testid=window]").contains("10 seconds").should("be.visible");
diff --git a/designer/client/cypress/e2e/fragment.cy.ts b/designer/client/cypress/e2e/fragment.cy.ts
index c2e39829b4a..e93a3b8cab9 100644
--- a/designer/client/cypress/e2e/fragment.cy.ts
+++ b/designer/client/cypress/e2e/fragment.cy.ts
@@ -12,7 +12,7 @@ describe("Fragment", () => {
});
beforeEach(() => {
- cy.viewport(1440, 1200);
+ cy.viewport("macbook-16");
});
it("should allow adding input parameters and display used fragment graph in modal", () => {
@@ -424,12 +424,13 @@ describe("Fragment", () => {
},
force: true,
});
+
+ cy.viewport("macbook-16");
cy.layoutScenario();
- cy.get("@sendSms")
- .parent()
- .matchImage({
- maxmaxDiffThreshold: 0.015,
- screenshotConfig: { padding: 16 } });
+ cy.get('[joint-selector="layers"]').matchImage({
+ maxDiffThreshold: 0.015,
+ screenshotConfig: { padding: 16 },
+ });
});
});
diff --git a/designer/client/cypress/support/mocks.ts b/designer/client/cypress/support/mocks.ts
index 54bb1689426..a72548254ef 100644
--- a/designer/client/cypress/support/mocks.ts
+++ b/designer/client/cypress/support/mocks.ts
@@ -18,13 +18,8 @@ const mockWindowDate = () => {
// let originalDate: DateConstructor;
cy.on("window:before:load", (win) => {
- // originalDate = win.Date;
Object.assign(win, { Date: FakeDate });
});
-
- // cy.on("window:before:unload", (win) => {
- // Object.assign(win, { Date: originalDate });
- // });
};
Cypress.Commands.add("mockWindowDate", mockWindowDate);
diff --git a/designer/client/package-lock.json b/designer/client/package-lock.json
index 633f240ed85..992a97e28b1 100644
--- a/designer/client/package-lock.json
+++ b/designer/client/package-lock.json
@@ -24,7 +24,7 @@
"@touk/federated-component": "1.0.0",
"@touk/window-manager": "1.9.0",
"ace-builds": "1.34.2",
- "axios": "1.7.4",
+ "axios": "1.7.5",
"d3-transition": "3.0.1",
"d3-zoom": "3.0.0",
"dagre": "0.8.5",
@@ -8163,9 +8163,9 @@
"dev": true
},
"node_modules/axios": {
- "version": "1.7.4",
- "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.4.tgz",
- "integrity": "sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==",
+ "version": "1.7.5",
+ "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz",
+ "integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==",
"dependencies": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.0",
@@ -33785,9 +33785,9 @@
"dev": true
},
"axios": {
- "version": "1.7.4",
- "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.4.tgz",
- "integrity": "sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==",
+ "version": "1.7.5",
+ "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz",
+ "integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==",
"requires": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.0",
diff --git a/designer/client/package.json b/designer/client/package.json
index b1bd96dd684..edd48d78cdb 100644
--- a/designer/client/package.json
+++ b/designer/client/package.json
@@ -17,7 +17,7 @@
"@touk/federated-component": "1.0.0",
"@touk/window-manager": "1.9.0",
"ace-builds": "1.34.2",
- "axios": "1.7.4",
+ "axios": "1.7.5",
"d3-transition": "3.0.1",
"d3-zoom": "3.0.0",
"dagre": "0.8.5",
diff --git a/designer/client/src/components/modals/SaveProcessDialog.tsx b/designer/client/src/components/modals/SaveProcessDialog.tsx
index 17e073b19f7..d77f2222c57 100644
--- a/designer/client/src/components/modals/SaveProcessDialog.tsx
+++ b/designer/client/src/components/modals/SaveProcessDialog.tsx
@@ -7,7 +7,13 @@ import { displayCurrentProcessVersion, displayProcessActivity, loadProcessToolba
import { PromptContent } from "../../windowManager";
import { CommentInput } from "../comment/CommentInput";
import { ThunkAction } from "../../actions/reduxTypes";
-import { getScenarioGraph, getProcessName, getProcessUnsavedNewName, isProcessRenamed, getScenarioLabels } from "../../reducers/selectors/graph";
+import {
+ getScenarioGraph,
+ getProcessName,
+ getProcessUnsavedNewName,
+ isProcessRenamed,
+ getScenarioLabels,
+} from "../../reducers/selectors/graph";
import HttpService from "../../http/HttpService";
import { ActionCreators as UndoActionCreators } from "redux-undo";
import { visualizationUrl } from "../../common/VisualizationUrl";
diff --git a/designer/submodules/package-lock.json b/designer/submodules/package-lock.json
index 029b175f900..2abfbee709a 100644
--- a/designer/submodules/package-lock.json
+++ b/designer/submodules/package-lock.json
@@ -5132,9 +5132,9 @@
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="
},
"node_modules/axios": {
- "version": "1.7.4",
- "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.4.tgz",
- "integrity": "sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==",
+ "version": "1.7.5",
+ "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz",
+ "integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==",
"dependencies": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.0",
@@ -17189,7 +17189,7 @@
"@mui/material": "5.15.7",
"@mui/x-data-grid": "6.18.5",
"@types/chance": "1.1.3",
- "axios": "1.7.4",
+ "axios": "1.7.5",
"chance": "1.1.8",
"copy-to-clipboard": "3.3.1",
"history": "5.3.0",
@@ -19157,7 +19157,7 @@
"@types/react-highlight-words": "0.16.4",
"@types/react-virtualized": "9.21.22",
"@types/react-virtualized-auto-sizer": "1.0.1",
- "axios": "1.7.4",
+ "axios": "1.7.5",
"chance": "1.1.8",
"copy-to-clipboard": "3.3.1",
"history": "5.3.0",
@@ -20523,9 +20523,9 @@
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q=="
},
"axios": {
- "version": "1.7.4",
- "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.4.tgz",
- "integrity": "sha512-DukmaFRnY6AzAALSH4J2M3k6PkaC+MfaAGdEERRWcC9q3/TWQwLpHR8ZRLKTdQ3aBDL64EdluRDjJqKw+BPZEw==",
+ "version": "1.7.5",
+ "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.5.tgz",
+ "integrity": "sha512-fZu86yCo+svH3uqJ/yTdQ0QHpQu5oL+/QE+QPSv6BZSkDAoky9vytxp7u5qk83OJFS3kEBcesWni9WTZAv3tSw==",
"requires": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.0",
diff --git a/designer/submodules/packages/components/package.json b/designer/submodules/packages/components/package.json
index e8b52a33e8d..1b28828b63c 100644
--- a/designer/submodules/packages/components/package.json
+++ b/designer/submodules/packages/components/package.json
@@ -13,7 +13,7 @@
"@mui/material": "5.15.7",
"@mui/x-data-grid": "6.18.5",
"@types/chance": "1.1.3",
- "axios": "1.7.4",
+ "axios": "1.7.5",
"chance": "1.1.8",
"copy-to-clipboard": "3.3.1",
"history": "5.3.0",
diff --git a/docs/Changelog.md b/docs/Changelog.md
index 34272098f12..68c697f0231 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -57,7 +57,14 @@
(Not available yet)
-### 1.17.0 (Not released yet)
+### 1.17.1 (Not released yet)
+
+* [#6880](https://github.com/TouK/nussknacker/pull/6880) Performance optimization of generating Avro messages with unions
+ - shorter message in logs
+
+* [#6877](https://github.com/TouK/nussknacker/pull/6877) Fix application crash when opening the edit node details window
+
+### 1.17.0 (12 September 2024)
* [#6658](https://github.com/TouK/nussknacker/pull/6658) Bump up circe-yaml lib to 0.15.2
* [#6398](https://github.com/TouK/nussknacker/pull/6398) Added possibility to define hint texts for scenario properties in config.
@@ -125,6 +132,10 @@
* From now on it is possible to pass an array as a parameter of type List - e.g. `T(java.lang.String).join(',', #array)`.
* Fix result type of projection (`.!`) - e.g. `#array.![#this]` returns a type array instead of a type List.
* [#6685](https://github.com/TouK/nussknacker/pull/6685) Fixed an issue with dictionary parameter editor language being set to spel when no default value was present.
+* [#6797](https://github.com/TouK/nussknacker/pull/6797) [#6815](https://github.com/TouK/nussknacker/pull/6815) Performance optimization of Avro processing
+* [#6578](https://github.com/TouK/nussknacker/pull/6578) Introduced support for displaying a docsUrl with an icon in scenario properties. This feature is based on the processingType and is configurable via the model.conf file.
+* [#6725](https://github.com/TouK/nussknacker/pull/6725) Resolved an issue where properties were not saved when a fragment was in a different group than other fragments.
+* [#6697](https://github.com/TouK/nussknacker/pull/6697) Fix unhandled error when opening the survey
## 1.16
diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md
index 13d87541ceb..54c8b87808c 100644
--- a/docs/MigrationGuide.md
+++ b/docs/MigrationGuide.md
@@ -29,7 +29,7 @@ To see the biggest differences please consult the [changelog](Changelog.md).
are registered based on class of Serializer instead of instance of Serializer. If you have values that were
serialized by these Serializers in some state, the state won't be restored after upgrade.
-## In version 1.17.0 (Not released yet)
+## In version 1.17.0
### Code API changes
@@ -112,6 +112,10 @@ To see the biggest differences please consult the [changelog](Changelog.md).
### Configuration changes
* [#6635](https://github.com/TouK/nussknacker/pull/6635) `globalParameters.useTypingResultTypeInformation` parameter was removed.
Now we always use TypingResultTypeInformation
+* [#6797](https://github.com/TouK/nussknacker/pull/6797) `AVRO_USE_STRING_FOR_STRING_TYPE` environment variable
+ is no longer supported - we always use String for the String type in Avro. If you didn't set this
+ environment variable, no action is needed
+
## In version 1.16.3
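
To illustrate the `AVRO_USE_STRING_FOR_STRING_TYPE` removal described above (a sketch, not project code; the helper is hypothetical): Avro `string` fields read through Nussknacker now always arrive as `java.lang.String`, so `Utf8`-specific handling remains only as a defensive fallback for records produced by other readers.

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.util.Utf8

    object AvroStringMigrationSketch {
      // After the change, the String branch is the only one Nussknacker's readers produce.
      def fieldAsString(record: GenericRecord, name: String): String =
        record.get(name) match {
          case s: String => s
          case u: Utf8   => u.toString // only for records read outside Nussknacker
          case other     => String.valueOf(other)
        }
    }
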
diff --git a/docs/integration/DataTypingAndSchemasHandling.md b/docs/integration/DataTypingAndSchemasHandling.md
index bbae502a184..859b0887d2b 100644
--- a/docs/integration/DataTypingAndSchemasHandling.md
+++ b/docs/integration/DataTypingAndSchemasHandling.md
@@ -32,16 +32,16 @@ on [Streaming](../scenarios_authoring/DataSourcesAndSinks.md). You need
#### [Primitive types](https://avro.apache.org/docs/1.11.0/spec.html#schema_primitive)
-| Avro type | Java type | Comment |
-|-----------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
-| null | null | |
-| string | String / Char | Defaults to Java `String` (UTF-8). Use the advanced configuration option `AVRO_USE_STRING_FOR_STRING_TYPE=false` to change it to `CharSequence` |
-| boolean | Boolean | |
-| int | Integer | 32 bit |
-| long | Long | 64 bit |
-| float | Float | single precision |
-| double | Double | double precision |
-| bytes | ByteBuffer | |
+| Avro type | Java type | Comment |
+|-----------|------------|------------------|
+| null | null | |
+| string | String | |
+| boolean | Boolean | |
+| int | Integer | 32 bit |
+| long | Long | 64 bit |
+| float | Float | single precision |
+| double | Double | double precision |
+| bytes | ByteBuffer | |
#### [Logical types](https://avro.apache.org/docs/1.11.0/spec.html#Logical+Types)
@@ -133,18 +133,20 @@ properties.
### Source conversion mapping
-| JSON Schema | Java type | Comment |
-|----------------------------------------------------------------------------------------------------|-------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| [null](https://json-schema.org/understanding-json-schema/reference/null.html) | null | |
-| [string](https://json-schema.org/understanding-json-schema/reference/string.html) | String | UTF-8 |
-| [boolean](https://json-schema.org/understanding-json-schema/reference/boolean.html) | Boolean | |
-| [integer](https://json-schema.org/understanding-json-schema/reference/numeric.html#integer) | Integer/Long/BigInteger | There will be chosen narrowest type depending upon minimum maximum value defined in the json schema. In the case when no min/max boundaries are available it will map to Long by default |
-| [number](https://json-schema.org/understanding-json-schema/reference/numeric.html#number) | BigDecimal | |
-| [enum](https://json-schema.org/understanding-json-schema/reference/generic.html#enumerated-values) | String | |
+| JSON Schema | Java type | Comment |
+|----------------------------------------------------------------------------------------------------|----------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| [null](https://json-schema.org/understanding-json-schema/reference/null.html) | null | |
+| [string](https://json-schema.org/understanding-json-schema/reference/string.html) | String | UTF-8 |
+| [boolean](https://json-schema.org/understanding-json-schema/reference/boolean.html) | Boolean | |
+| [integer](https://json-schema.org/understanding-json-schema/reference/numeric.html#integer) | Integer/Long/BigInteger | There will be chosen narrowest type depending upon minimum maximum value defined in the json schema. In the case when no min/max boundaries are available it will map to Long by default |
+| [number](https://json-schema.org/understanding-json-schema/reference/numeric.html#number) | BigDecimal | |
+| [enum](https://json-schema.org/understanding-json-schema/reference/generic.html#enumerated-values) | String | |
| [array](https://json-schema.org/understanding-json-schema/reference/array.html) | [list](../scenarios_authoring/Spel.md#arrayslists) | |
#### String Format
-We support the following JSON [string format](https://json-schema.org/understanding-json-schema/reference/string.html#format) keywords.
+
+We support the following
+JSON [string format](https://json-schema.org/understanding-json-schema/reference/string.html#format) keywords.
| JSON Schema | Java type | Sample | Comment |
|-------------|---------------|---------------------------|-----------------------------|
@@ -195,9 +197,9 @@ depends on `additionalProperties` type configuration and can be `Unknown`.
| JSON Schema | Properties | Comment |
|-------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------|
| [string](https://json-schema.org/understanding-json-schema/reference/string.html) | [length](https://json-schema.org/understanding-json-schema/reference/string.html#length), [regular expressions](https://json-schema.org/understanding-json-schema/reference/string.html#length), [format](https://json-schema.org/understanding-json-schema/reference/string.html#length) | |
-| [numeric](https://json-schema.org/understanding-json-schema/reference/numeric.html) | [multiples](https://json-schema.org/understanding-json-schema/reference/numeric.html#multiples) | |
+| [numeric](https://json-schema.org/understanding-json-schema/reference/numeric.html) | [multiples](https://json-schema.org/understanding-json-schema/reference/numeric.html#multiples) | |
| [array](https://json-schema.org/understanding-json-schema/reference/array.html) | [additional items](https://json-schema.org/understanding-json-schema/reference/array.html#additional-items), [tuple validation](https://json-schema.org/understanding-json-schema/reference/array.html#tuple-validation), [contains](https://json-schema.org/understanding-json-schema/reference/array.html#contains), [min/max](https://json-schema.org/understanding-json-schema/reference/array.html#mincontains-maxcontains), [length](https://json-schema.org/understanding-json-schema/reference/array.html#length), [uniqueness](https://json-schema.org/understanding-json-schema/reference/array.html#uniqueness) | |
-| [object](https://json-schema.org/understanding-json-schema/reference/object.html) | [unevaluatedProperties](https://json-schema.org/understanding-json-schema/reference/object.html#unevaluated-properties), [extending closed](https://json-schema.org/understanding-json-schema/reference/object.html#extending-closed-schemas), [property names](https://json-schema.org/understanding-json-schema/reference/object.html#property-names), [size](https://json-schema.org/understanding-json-schema/reference/object.html#size) | |
+| [object](https://json-schema.org/understanding-json-schema/reference/object.html) | [unevaluatedProperties](https://json-schema.org/understanding-json-schema/reference/object.html#unevaluated-properties), [extending closed](https://json-schema.org/understanding-json-schema/reference/object.html#extending-closed-schemas), [property names](https://json-schema.org/understanding-json-schema/reference/object.html#property-names), [size](https://json-schema.org/understanding-json-schema/reference/object.html#size) | |
| [composition](https://json-schema.org/understanding-json-schema/reference/combining.html) | [allOf](https://json-schema.org/understanding-json-schema/reference/combining.html#allof), [not](https://json-schema.org/understanding-json-schema/reference/combining.html#not) | Read more about [validation modes](#validation-and-encoding). |
These properties will be not validated by the Designer, because on during scenario authoring time we work only on
@@ -206,20 +208,32 @@ These properties will be not validated by the Designer, because on during scenar
#### Pattern properties
##### Sources
+
Object (also nested) in source schema will be represented during scenario authoring as:
+
* Map - when there is no property defined in `properties` field
- * if only `additionalProperties` are defined then map values will be typed to according to schema in `additionalProperties` field
- * if both `additionalProperties` and `patternProperties` are defined then values will be typed as `Union` with all possible types from `additionalProperties` and `patternProperties`
+ * if only `additionalProperties` are defined then map values will be typed according to the schema
+ in the `additionalProperties` field
+ * if both `additionalProperties` and `patternProperties` are defined then values will be typed as `Union` with all
+ possible types from `additionalProperties` and `patternProperties`
* Record otherwise
- * all non explicit properties can then be accessed using `record["patternOrAdditionalPropertyName"]` syntax but for now only if `pl.touk.nussknacker.engine.api.process.ExpressionConfig.dynamicPropertyAccessAllowed` is enabled (only possible in deprecated instalations with own `ProcessConfigCreator`)
+ * all non-explicit properties can then be accessed using `record["patternOrAdditionalPropertyName"]` syntax but for
+ now only if `pl.touk.nussknacker.engine.api.process.ExpressionConfig.dynamicPropertyAccessAllowed` is enabled (
+ only possible in deprecated installations with own `ProcessConfigCreator`)
##### Sinks
-Pattern properties add additional requirements during scenario authoring for types that should be encoded into JSON Schema object type:
+
+Pattern properties add additional requirements during scenario authoring for types that should be encoded into JSON
+Schema object type:
+
* Strict mode
- * only records types are allowed (no map types) and only if their fields' types are valid according to pattern properties restrictions (in addition to properties and additionalProperties)
+ * only record types are allowed (no map types) and only if their fields' types are valid according to pattern
+ properties restrictions (in addition to properties and additionalProperties)
* Lax mode
- * records are allowed under the same conditions as in strict mode but additionally Unknown type is allowed as a value's type
- * map types are allowed if their value's type matches any of property, patternProperty or additionalProperties schema or is an Unknown type
+ * records are allowed under the same conditions as in strict mode but additionally Unknown type is allowed as a
+ value's type
+ * map types are allowed if their value's type matches any of property, patternProperty or additionalProperties
+ schema or is an Unknown type
## Validation and encoding
@@ -259,8 +273,11 @@ enabled.
| allow passing `Unknown` | no | yes | When data at runtime will not match against the sink schema, then error be reported during encoding. |
| passing `Union` | `Typing Information` union has to
be the same as union schema of the sink | Any of element from `Typing Information`
union should match | When data at runtime will not match against the sink schema, then error be reported during encoding. |
-General intuition is that in `strict` mode a scenario that was successfully validated should not produce any type connect encoding errors during runtime (it can still produce errors e.g. for range validation in JSON Schema or valid enum entry validation in Avro).
-On the other hand, in `lax` mode NU allows to deploy scenario if there is any chance it can encode data properly, but responsibility for passing valid type to sink (e.g. in Unknown type) is on end-user side.
+General intuition is that in `strict` mode a scenario that was successfully validated should not produce any type
+connect encoding errors during runtime (it can still produce errors e.g. for range validation in JSON Schema or valid
+enum entry validation in Avro).
+On the other hand, in `lax` mode NU allows deploying a scenario if there is any chance it can encode data properly, but
+the responsibility for passing a valid type to the sink (e.g. in Unknown type) is on the end-user side.
We leave to the user the decision of which validation mode to choose. But be aware of it, and remember it only impacts
how we validate data during scenario authoring, and some errors can still occur during encoding at runtime.
diff --git a/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkBaseComponentProvider.scala b/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkBaseComponentProvider.scala
index bfa97fc77f1..347019e33a6 100644
--- a/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkBaseComponentProvider.scala
+++ b/engine/flink/components/base/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkBaseComponentProvider.scala
@@ -46,7 +46,7 @@ object FlinkBaseComponentProvider {
.withRelativeDocs("DataSourcesAndSinks#deadend")
.withDesignerWideId("dead-end"),
ComponentDefinition(name = "decision-table", component = DecisionTable)
- .withRelativeDocs("BasicNodes#decisiontable")
+ .withRelativeDocs("Enrichers/#decision-table")
.withDesignerWideId("decision-table"),
ComponentDefinition("delay", DelayTransformer)
.withRelativeDocs("DataSourcesAndSinks#delay"),
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/kryo/SchemaIdBasedAvroGenericRecordSerializer.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/kryo/SchemaIdBasedAvroGenericRecordSerializer.scala
index 9811570fe95..104a463bc28 100644
--- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/kryo/SchemaIdBasedAvroGenericRecordSerializer.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/kryo/SchemaIdBasedAvroGenericRecordSerializer.scala
@@ -70,7 +70,7 @@ class SchemaIdBasedAvroGenericRecordSerializer(
}
private def writeDataBytes(record: GenericRecordWithSchemaId, bos: ByteArrayOutputStream): Unit = {
- val writer = createDatumWriter(record, record.getSchema, useSchemaReflection = false)
+ val writer = createDatumWriter(record.getSchema)
val encoder = this.encoderFactory.directBinaryEncoder(bos, null)
writer.write(record, encoder)
}
@@ -94,10 +94,9 @@ class SchemaIdBasedAvroGenericRecordSerializer(
}
private def readRecord(lengthOfData: Int, schemaId: SchemaId, dataBuffer: Array[Byte]) = {
- val parsedSchema = schemaRegistry.getSchemaById(schemaId).schema
- val writerSchema = AvroUtils.extractSchema(parsedSchema)
- val reader =
- createDatumReader(writerSchema, writerSchema, useSchemaReflection = false, useSpecificAvroReader = false)
+ val parsedSchema = schemaRegistry.getSchemaById(schemaId).schema
+ val writerSchema = AvroUtils.extractSchema(parsedSchema)
+ val reader = createDatumReader(writerSchema, writerSchema)
val binaryDecoder = decoderFactory.binaryDecoder(dataBuffer, 0, lengthOfData, null)
reader.read(null, binaryDecoder).asInstanceOf[GenericData.Record]
}
diff --git a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala
index 0dcb2f90a40..96ed8e8b824 100644
--- a/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala
@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.sink.flink
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
@@ -66,14 +67,20 @@ class FlinkKafkaUniversalSink(
protected override val exceptionHandlerPreparer: RuntimeContext => ExceptionHandler =
flinkNodeContext.exceptionHandlerPreparer
+ @transient private var encodeRecord: Any => AnyRef = _
+
+ override def open(parameters: Configuration): Unit = {
+ super.open(parameters)
+ encodeRecord = schemaSupportDispatcher
+ .forSchemaType(schema.getParsedSchema.schemaType())
+ .formValueEncoder(schema.getParsedSchema, validationMode)
+ }
+
override def map(ctx: ValueWithContext[KeyedValue[AnyRef, AnyRef]]): KeyedValue[AnyRef, AnyRef] = {
ctx.value.mapValue { data =>
exceptionHandler
.handling(Some(NodeComponentInfo(nodeId, ComponentType.Sink, "flinkKafkaAvroSink")), ctx.context) {
- val encode = schemaSupportDispatcher
- .forSchemaType(schema.getParsedSchema.schemaType())
- .formValueEncoder(schema.getParsedSchema, validationMode)
- encode(data)
+ encodeRecord(data)
}
.orNull
}
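
The sink change above moves the `formValueEncoder` lookup from `map()` into `open()`. A minimal sketch of that Flink pattern (illustrative only, not project code): anything expensive to build and stable for the operator's lifetime is created once per task in `open()` and reused for every record.

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    class CachedEncoderMapFunction(buildEncoder: () => String => String) extends RichMapFunction[String, String] {

      @transient private var encode: String => String = _

      // resolved once when the task starts, not for every record
      override def open(parameters: Configuration): Unit =
        encode = buildEncoder()

      override def map(value: String): String = encode(value)
    }
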
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/AvroStringSettingsInTests.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/AvroStringSettingsInTests.scala
deleted file mode 100644
index 03c37f4ff01..00000000000
--- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/AvroStringSettingsInTests.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package pl.touk.nussknacker.engine.schemedkafka
-
-import pl.touk.nussknacker.engine.schemedkafka.schema.AvroStringSettings
-
-object AvroStringSettingsInTests {
- def enable(): Unit = setValue(true)
-
- def setDefault(): Unit = setValue(AvroStringSettings.default)
-
- def withStringEnabled[T](setting: Boolean)(execute: => T): T = {
- setValue(setting)
- try {
- execute
- } finally {
- setDefault()
- }
- }
-
- private def setValue(value: Boolean): Unit = {
- AvroStringSettings.forceUsingStringForStringSchema // initialize lazy value
- val field = Class
- .forName("pl.touk.nussknacker.engine.schemedkafka.schema.AvroStringSettings$")
- .getDeclaredField("forceUsingStringForStringSchema")
- field.setAccessible(true)
- field.setBoolean(AvroStringSettings, value)
- }
-
-}
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderSpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderSpec.scala
index 8de68babe83..9ad4d44ee4e 100644
--- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderSpec.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderSpec.scala
@@ -3,10 +3,9 @@ package pl.touk.nussknacker.engine.schemedkafka.schema
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
-import org.apache.avro.util.Utf8
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
-import pl.touk.nussknacker.engine.schemedkafka.{AvroStringSettingsInTests, AvroUtils, LogicalTypesGenericRecordBuilder}
+import pl.touk.nussknacker.engine.schemedkafka.{AvroUtils, LogicalTypesGenericRecordBuilder}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
@@ -19,31 +18,14 @@ class StringForcingDatumReaderSpec extends AnyFunSpec with Matchers {
val builder = new LogicalTypesGenericRecordBuilder(schema)
builder.set("foo", "bar")
val givenRecord = builder.build()
+ givenRecord.get("foo") shouldBe a[String]
- val readRecordWithUtf = AvroStringSettingsInTests.withStringEnabled(setting = false) {
- roundTripWriteRead(givenRecord)
- }
- readRecordWithUtf.get("foo") shouldBe a[Utf8]
+ val readWhenStringForced = roundTripWriteRead(givenRecord)
- val readWhenStringForced = roundTripWriteRead(readRecordWithUtf)
readWhenStringForced.get("foo") shouldBe a[String]
readWhenStringForced shouldEqual givenRecord
}
- it("should use correct type in provided default value") {
- val schema = wrapWithRecordSchema("""[
- | { "name": "foo", "type": "string", "default": "bar" }
- |]""".stripMargin)
-
- val record1 = new LogicalTypesGenericRecordBuilder(schema).build()
- record1.get("foo") shouldBe a[String]
-
- val record2 = AvroStringSettingsInTests.withStringEnabled(setting = false) {
- new LogicalTypesGenericRecordBuilder(schema).build()
- }
- record2.get("foo") shouldBe a[Utf8]
- }
-
private def wrapWithRecordSchema(fieldsDefinition: String) =
new Schema.Parser().parse(s"""{
| "name": "sample",
diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/KafkaAvroPayloadSourceFactorySpec.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/KafkaAvroPayloadSourceFactorySpec.scala
index e860a1fb444..00186ba9145 100644
--- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/KafkaAvroPayloadSourceFactorySpec.scala
+++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/KafkaAvroPayloadSourceFactorySpec.scala
@@ -361,9 +361,9 @@ class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvr
Map(
VariableConstants.InputVariableName -> Typed.record(
ListMap(
- "first" -> AvroStringSettings.stringTypingResult,
- "middle" -> AvroStringSettings.stringTypingResult,
- "last" -> AvroStringSettings.stringTypingResult
+ "first" -> Typed[String],
+ "middle" -> Typed[String],
+ "last" -> Typed[String]
),
Typed.typedClass[GenericRecord]
),
diff --git a/engine/lite/components/base/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteBaseComponentProvider.scala b/engine/lite/components/base/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteBaseComponentProvider.scala
index 72c5d37768e..7996e2bc4a5 100644
--- a/engine/lite/components/base/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteBaseComponentProvider.scala
+++ b/engine/lite/components/base/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteBaseComponentProvider.scala
@@ -48,7 +48,7 @@ object LiteBaseComponentProvider {
.withRelativeDocs("DataSourcesAndSinks#deadend")
.withDesignerWideId("dead-end"),
ComponentDefinition(name = "decision-table", component = DecisionTable)
- .withRelativeDocs("BasicNodes#decisiontable")
+ .withRelativeDocs("Enrichers/#decision-table")
.withDesignerWideId("decision-table")
)
diff --git a/utils/json-utils/src/main/scala/pl/touk/nussknacker/engine/json/swagger/SwaggerTyped.scala b/utils/json-utils/src/main/scala/pl/touk/nussknacker/engine/json/swagger/SwaggerTyped.scala
index b405b9d1f19..cc876093962 100644
--- a/utils/json-utils/src/main/scala/pl/touk/nussknacker/engine/json/swagger/SwaggerTyped.scala
+++ b/utils/json-utils/src/main/scala/pl/touk/nussknacker/engine/json/swagger/SwaggerTyped.scala
@@ -173,44 +173,51 @@ object SwaggerTyped {
Option(schema.getType)
.orElse(Option(schema.getTypes).map(_.asScala.head))
- def typingResult(swaggerTyped: SwaggerTyped): TypingResult = swaggerTyped match {
- case SwaggerObject(elementType, additionalProperties, patternProperties) =>
- handleSwaggerObject(elementType, additionalProperties, patternProperties)
- case SwaggerArray(ofType) =>
- Typed.genericTypeClass(classOf[java.util.List[_]], List(typingResult(ofType)))
- case SwaggerEnum(values) =>
- Typed.fromIterableOrUnknownIfEmpty(values.map(Typed.fromInstance))
- case SwaggerBool =>
- Typed.typedClass[java.lang.Boolean]
- case SwaggerString =>
- Typed.typedClass[String]
- case SwaggerInteger =>
- Typed.typedClass[java.lang.Integer]
- case SwaggerLong =>
- Typed.typedClass[java.lang.Long]
- case SwaggerBigInteger =>
- Typed.typedClass[java.math.BigInteger]
- case SwaggerDouble =>
- Typed.typedClass[java.lang.Double]
- case SwaggerBigDecimal =>
- Typed.typedClass[java.math.BigDecimal]
- case SwaggerDateTime =>
- Typed.typedClass[ZonedDateTime]
- case SwaggerDate =>
- Typed.typedClass[LocalDate]
- case SwaggerTime =>
- Typed.typedClass[LocalTime]
- case SwaggerUnion(types) => Typed.fromIterableOrUnknownIfEmpty(types.map(typingResult))
- case SwaggerAny =>
- Unknown
- case SwaggerNull =>
- TypedNull
- }
+ // The `resolveListOfObjects` flag allows one to stop resolving type recursion for SwaggerArray[SwaggerObject].
+ // This is needed for correct validation in OpenAPI enrichers whose input parameters contain a list of objects with optional fields.
+ // TODO: validations in openApi enrichers should be based on actual schema instead of `TypingResult` instance
+ def typingResult(swaggerTyped: SwaggerTyped, resolveListOfObjects: Boolean = true): TypingResult =
+ swaggerTyped match {
+ case SwaggerObject(elementType, additionalProperties, patternProperties) =>
+ handleSwaggerObject(elementType, additionalProperties, patternProperties, resolveListOfObjects)
+ case SwaggerArray(SwaggerObject(_, _, _)) if !resolveListOfObjects =>
+ Typed.genericTypeClass(classOf[java.util.List[_]], List(Unknown))
+ case SwaggerArray(ofType) =>
+ Typed.genericTypeClass(classOf[java.util.List[_]], List(typingResult(ofType, resolveListOfObjects)))
+ case SwaggerEnum(values) =>
+ Typed.fromIterableOrUnknownIfEmpty(values.map(Typed.fromInstance))
+ case SwaggerBool =>
+ Typed.typedClass[java.lang.Boolean]
+ case SwaggerString =>
+ Typed.typedClass[String]
+ case SwaggerInteger =>
+ Typed.typedClass[java.lang.Integer]
+ case SwaggerLong =>
+ Typed.typedClass[java.lang.Long]
+ case SwaggerBigInteger =>
+ Typed.typedClass[java.math.BigInteger]
+ case SwaggerDouble =>
+ Typed.typedClass[java.lang.Double]
+ case SwaggerBigDecimal =>
+ Typed.typedClass[java.math.BigDecimal]
+ case SwaggerDateTime =>
+ Typed.typedClass[ZonedDateTime]
+ case SwaggerDate =>
+ Typed.typedClass[LocalDate]
+ case SwaggerTime =>
+ Typed.typedClass[LocalTime]
+ case SwaggerUnion(types) => Typed.fromIterableOrUnknownIfEmpty(types.map(typingResult(_, resolveListOfObjects)))
+ case SwaggerAny =>
+ Unknown
+ case SwaggerNull =>
+ TypedNull
+ }
private def handleSwaggerObject(
elementType: Map[PropertyName, SwaggerTyped],
additionalProperties: AdditionalProperties,
- patternProperties: List[PatternWithSwaggerTyped]
+ patternProperties: List[PatternWithSwaggerTyped],
+ resolveListOfObject: Boolean = true
): TypingResult = {
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
def typedStringKeyMap(valueType: TypingResult) = {
@@ -218,7 +225,7 @@ object SwaggerTyped {
}
if (elementType.isEmpty) {
val patternPropertiesTypesSet = patternProperties.map { case PatternWithSwaggerTyped(_, propertySwaggerTyped) =>
- typingResult(propertySwaggerTyped)
+ typingResult(propertySwaggerTyped, resolveListOfObject)
}
additionalProperties match {
case AdditionalPropertiesDisabled if patternPropertiesTypesSet.isEmpty =>
@@ -226,10 +233,10 @@ object SwaggerTyped {
case AdditionalPropertiesDisabled =>
typedStringKeyMap(Typed.fromIterableOrUnknownIfEmpty(patternPropertiesTypesSet))
case AdditionalPropertiesEnabled(value) =>
- typedStringKeyMap(Typed(NonEmptyList(typingResult(value), patternPropertiesTypesSet)))
+ typedStringKeyMap(Typed(NonEmptyList(typingResult(value, resolveListOfObject), patternPropertiesTypesSet)))
}
} else {
- Typed.record(elementType.mapValuesNow(typingResult))
+ Typed.record(elementType.mapValuesNow(typingResult(_, resolveListOfObject)))
}
}
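
To make the `resolveListOfObjects` flag concrete, a minimal sketch of the difference it makes, assuming the case classes are constructed exactly as they are pattern-matched above, that `PropertyName` is a plain String alias, and that everything lives under `pl.touk.nussknacker.engine.json.swagger`:

    import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
    import pl.touk.nussknacker.engine.json.swagger._

    object ResolveListOfObjectsSketch {
      // array item with one required and one optional field, mirroring MultiKeys above
      private val element = SwaggerObject(
        Map("primaryKey" -> SwaggerString, "validFor" -> SwaggerLong),
        AdditionalPropertiesDisabled,
        List.empty
      )

      // default: full recursion, roughly List[Record{primaryKey: String, validFor: Long}]
      val resolved: TypingResult = SwaggerTyped.typingResult(SwaggerArray(element))

      // flag used by ParametersExtractor: the element collapses to Unknown, i.e. List[Unknown],
      // so optional fields inside list items no longer fail enricher parameter validation
      val flattened: TypingResult = SwaggerTyped.typingResult(SwaggerArray(element), resolveListOfObjects = false)
    }
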
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/AvroUtils.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/AvroUtils.scala
index aa0bc70b9ad..1e2a1fc4b8d 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/AvroUtils.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/AvroUtils.scala
@@ -10,8 +10,6 @@ import org.apache.avro.Schema
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic._
import org.apache.avro.io.{DatumReader, DecoderFactory, EncoderFactory}
-import org.apache.avro.reflect.ReflectData
-import org.apache.avro.specific.{SpecificData, SpecificDatumWriter, SpecificRecord}
import pl.touk.nussknacker.engine.schemedkafka.schema.StringForcingDatumReaderProvider
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.GenericRecordWithSchemaId
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
@@ -28,7 +26,7 @@ object AvroUtils extends LazyLogging {
import scala.jdk.CollectionConverters._
- def genericData: GenericData = addLogicalTypeConversions(new GenericData(_) {
+ val genericData: GenericData = addLogicalTypeConversions(new GenericData(getClass.getClassLoader) {
override def deepCopy[T](schema: Schema, value: T): T = {
val copiedRecord = super.deepCopy(schema, value)
@@ -50,23 +48,7 @@ object AvroUtils extends LazyLogging {
StringForcingDatumReaderProvider.genericDatumReader[T](writer, reader, genericData)
}
- def specificData: SpecificData = addLogicalTypeConversions(new SpecificData(_) {
- override def createDatumReader(writer: Schema, reader: Schema): DatumReader[_] = StringForcingDatumReaderProvider
- .specificDatumReader(writer, reader, this.asInstanceOf[SpecificData])
-
- override def createDatumReader(schema: Schema): DatumReader[_] = createDatumReader(schema, schema)
- })
-
- def reflectData: ReflectData = addLogicalTypeConversions(new ReflectData(_) {
- override def createDatumReader(writer: Schema, reader: Schema): DatumReader[_] = StringForcingDatumReaderProvider
- .reflectDatumReader(writer, reader, this.asInstanceOf[ReflectData])
-
- override def createDatumReader(schema: Schema): DatumReader[_] = createDatumReader(schema, schema)
-
- })
-
- private def addLogicalTypeConversions[T <: GenericData](createData: ClassLoader => T): T = {
- val data = createData(Thread.currentThread.getContextClassLoader)
+ private def addLogicalTypeConversions(data: GenericData): GenericData = {
data.addLogicalTypeConversion(new TimeConversions.DateConversion)
data.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion)
data.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion)
@@ -136,13 +118,7 @@ object AvroUtils extends LazyLogging {
case v: Array[Byte] =>
output.write(v)
case v =>
- val writer = data match {
- case _: SpecificRecord =>
- new SpecificDatumWriter[Any](container.getSchema, AvroUtils.specificData)
- case _ =>
- new GenericDatumWriter[Any](container.getSchema, AvroUtils.genericData)
- }
-
+ val writer = new GenericDatumWriter[Any](container.getSchema, AvroUtils.genericData)
val encoder = EncoderFactory.get().binaryEncoder(output, null)
writer.write(v, encoder)
encoder.flush()
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/AvroSchemaOutputValidatorPrinter.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/AvroSchemaOutputValidatorPrinter.scala
index 24d4009a5ad..0a20c466638 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/AvroSchemaOutputValidatorPrinter.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/AvroSchemaOutputValidatorPrinter.scala
@@ -1,9 +1,9 @@
package pl.touk.nussknacker.engine.schemedkafka.encode
import org.apache.avro.{LogicalType, LogicalTypes, Schema}
-import pl.touk.nussknacker.engine.schemedkafka.schema.AvroStringSettings
import pl.touk.nussknacker.engine.schemedkafka.typed.AvroSchemaTypeDefinitionExtractor
import pl.touk.nussknacker.engine.util.output.OutputValidatorErrorsMessageFormatter
+
import scala.jdk.CollectionConverters._
object AvroSchemaOutputValidatorPrinter {
@@ -31,8 +31,8 @@ object AvroSchemaOutputValidatorPrinter {
)
private def schemaTypeMapping = Map(
- Schema.Type.FIXED -> List(classOf[java.nio.ByteBuffer], AvroStringSettings.stringTypingResult.klass),
- Schema.Type.ENUM -> List(AvroStringSettings.stringTypingResult.klass),
+ Schema.Type.FIXED -> List(classOf[java.nio.ByteBuffer], classOf[String]),
+ Schema.Type.ENUM -> List(classOf[String]),
)
// We try to keep this representation convention similar to TypingResult.display convention
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/ToAvroSchemaBasedEncoder.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/ToAvroSchemaBasedEncoder.scala
index e34fb716dd3..d9dd5e2ebce 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/ToAvroSchemaBasedEncoder.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/encode/ToAvroSchemaBasedEncoder.scala
@@ -9,9 +9,8 @@ import org.apache.avro.generic.{GenericContainer, GenericData}
import org.apache.avro.util.Utf8
import org.apache.avro.{AvroRuntimeException, LogicalTypes, Schema}
import pl.touk.nussknacker.engine.api.validation.ValidationMode
-import pl.touk.nussknacker.engine.schemedkafka.{AvroUtils, LogicalTypesGenericRecordBuilder}
-import pl.touk.nussknacker.engine.schemedkafka.schema.AvroStringSettings.forceUsingStringForStringSchema
import pl.touk.nussknacker.engine.schemedkafka.schema.{AvroSchemaEvolution, DefaultAvroSchemaEvolution}
+import pl.touk.nussknacker.engine.schemedkafka.{AvroUtils, LogicalTypesGenericRecordBuilder}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
@@ -19,9 +18,9 @@ import java.time.chrono.ChronoZonedDateTime
import java.time.{Instant, LocalDate, LocalTime, OffsetDateTime}
import java.util
import java.util.UUID
+import scala.collection.compat.immutable.LazyList
import scala.math.BigDecimal.RoundingMode
import scala.util.Try
-import scala.collection.compat.immutable.LazyList
class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validationMode: ValidationMode) {
@@ -87,7 +86,7 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
case (Schema.Type.STRING, uuid: String) if schema.getLogicalType == LogicalTypes.uuid() =>
encodeUUIDorError(uuid)
case (Schema.Type.STRING, str: String) =>
- Valid(encodeString(str))
+ Valid(str)
case (Schema.Type.STRING, str: CharSequence) =>
Valid(str)
case (Schema.Type.BYTES, str: CharSequence) =>
@@ -211,7 +210,7 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
.asInstanceOf[collection.Map[AnyRef, AnyRef]]
.map {
case (k: String, v) =>
- encode(v, schema.getValueType, Some(k)).map(encodeString(k) -> _)
+ encode(v, schema.getValueType, Some(k)).map(k -> _)
case (k: CharSequence, v) =>
encode(v, schema.getValueType, Some(k.toString)).map(k -> _)
case (k, v) =>
@@ -245,10 +244,6 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
private def error(str: String): Invalid[NonEmptyList[String]] = Invalid(NonEmptyList.of(str))
- private def encodeString(str: String): CharSequence = {
- if (forceUsingStringForStringSchema) str else new Utf8(str)
- }
-
}
object ToAvroSchemaBasedEncoder {
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/AvroStringSettings.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/AvroStringSettings.scala
deleted file mode 100644
index 1f960fcc677..00000000000
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/AvroStringSettings.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package pl.touk.nussknacker.engine.schemedkafka.schema
-
-import com.typesafe.scalalogging.LazyLogging
-import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass}
-
-import scala.util.{Failure, Properties, Success, Try}
-
-object AvroStringSettings extends LazyLogging {
- val default: Boolean = true
- val envName = "AVRO_USE_STRING_FOR_STRING_TYPE"
-
- lazy val forceUsingStringForStringSchema: Boolean = Properties
- .envOrNone(envName)
- .map(str =>
- Try(str.toBoolean) match {
- case Failure(cause) =>
- throw new RuntimeException(s"Environment variable $envName=$str is not valid boolean value", cause)
- case Success(value) => value
- }
- )
- .getOrElse(default)
-
- lazy val stringTypingResult: TypedClass =
- if (forceUsingStringForStringSchema) Typed.typedClass[String] else Typed.typedClass[CharSequence]
-}
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/DatumReaderWriterMixin.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/DatumReaderWriterMixin.scala
index 11380e67be6..d605965e7a8 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/DatumReaderWriterMixin.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/DatumReaderWriterMixin.scala
@@ -1,48 +1,19 @@
package pl.touk.nussknacker.engine.schemedkafka.schema
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.DatumReader
-import org.apache.avro.reflect.ReflectDatumWriter
-import org.apache.avro.specific.{SpecificDatumWriter, SpecificRecord}
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
/**
* Mixin for DatumReader and DatumWriter. It collects factory methods for Datums.
*/
trait DatumReaderWriterMixin {
- /**
- * We use it on checking writerSchema is primitive - on creating DatumReader (createDatumReader).
- */
- protected val primitives: mutable.Map[String, Schema] = AvroSchemaUtils.getPrimitiveSchemas.asScala
-
- def createDatumWriter(record: Any, schema: Schema, useSchemaReflection: Boolean): GenericDatumWriter[Any] =
- record match {
- case _: SpecificRecord => new SpecificDatumWriter[Any](schema, AvroUtils.specificData)
- case _ if useSchemaReflection => new ReflectDatumWriter[Any](schema, AvroUtils.reflectData)
- case _ => new GenericDatumWriter[Any](schema, AvroUtils.genericData)
- }
-
- def createDatumReader(
- writerSchema: Schema,
- readerSchema: Schema,
- useSchemaReflection: Boolean,
- useSpecificAvroReader: Boolean
- ): DatumReader[AnyRef] = {
- val writerSchemaIsPrimitive = primitives.values.exists(_.equals(readerSchema))
+ def createDatumWriter(schema: Schema): GenericDatumWriter[Any] =
+ new GenericDatumWriter[Any](schema, AvroUtils.genericData)
- if (useSchemaReflection && !writerSchemaIsPrimitive) {
- StringForcingDatumReaderProvider.reflectDatumReader[AnyRef](writerSchema, readerSchema, AvroUtils.reflectData)
- } else if (useSpecificAvroReader && !writerSchemaIsPrimitive) {
- StringForcingDatumReaderProvider.specificDatumReader[AnyRef](writerSchema, readerSchema, AvroUtils.specificData)
- } else {
- StringForcingDatumReaderProvider.genericDatumReader[AnyRef](writerSchema, readerSchema, AvroUtils.genericData)
- }
- }
+ def createDatumReader(writerSchema: Schema, readerSchema: Schema): DatumReader[AnyRef] =
+ StringForcingDatumReaderProvider.genericDatumReader[AnyRef](writerSchema, readerSchema, AvroUtils.genericData)
}
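The slimmed-down mixin above now always builds generic readers and writers; the reflection- and specific-record variants, and the primitive-schema check they required, are gone. A minimal, illustrative round-trip through the new two-method API might look like the sketch below (the object name and the use of the same writer and reader schema are assumptions, not part of this patch):

    import java.io.ByteArrayOutputStream

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import pl.touk.nussknacker.engine.schemedkafka.schema.DatumReaderWriterMixin

    object DatumRoundTripSketch extends DatumReaderWriterMixin {

      // Encode a record with the always-generic writer, then read it back.
      def roundTrip(record: GenericRecord, schema: Schema): AnyRef = {
        val out     = new ByteArrayOutputStream()
        val encoder = EncoderFactory.get().binaryEncoder(out, null)
        createDatumWriter(schema).write(record, encoder)
        encoder.flush()

        val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
        // Writer and reader schema are identical here; schema evolution would pass different ones.
        createDatumReader(schema, schema).read(null, decoder)
      }

    }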
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderProvider.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderProvider.scala
index 91e7c40ba87..409f76b7b47 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderProvider.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/StringForcingDatumReaderProvider.scala
@@ -2,16 +2,13 @@ package pl.touk.nussknacker.engine.schemedkafka.schema
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader}
-import org.apache.avro.reflect.{ReflectData, ReflectDatumReader}
-import org.apache.avro.specific.{SpecificData, SpecificDatumReader}
trait StringForcingDatumReader[T] extends GenericDatumReader[T] {
self: GenericDatumReader[T] =>
override def findStringClass(schema: Schema): Class[_] = {
// This method is invoked e.g. when determining class for map key
- if ((schema.getType == Schema.Type.STRING || schema.getType == Schema.Type.MAP) && AvroStringSettings.forceUsingStringForStringSchema)
- classOf[String]
+ if (schema.getType == Schema.Type.STRING || schema.getType == Schema.Type.MAP) classOf[String]
else super.findStringClass(schema)
}
@@ -20,12 +17,6 @@ trait StringForcingDatumReader[T] extends GenericDatumReader[T] {
object StringForcingDatumReaderProvider {
def genericDatumReader[T](writer: Schema, reader: Schema, data: GenericData): GenericDatumReader[T] =
new GenericDatumReader[T](writer, reader, data) with StringForcingDatumReader[T]
-
- def specificDatumReader[T](writer: Schema, reader: Schema, data: SpecificData): SpecificDatumReader[T] =
- new SpecificDatumReader[T](writer, reader, data) with StringForcingDatumReader[T]
-
- def reflectDatumReader[T](writer: Schema, reader: Schema, data: ReflectData): ReflectDatumReader[T] =
- new ReflectDatumReader[T](writer, reader, data) with StringForcingDatumReader[T]
}
/**
@@ -34,10 +25,4 @@ object StringForcingDatumReaderProvider {
class StringForcingDatumReaderProvider[T] {
def genericDatumReader(writer: Schema, reader: Schema, data: GenericData): GenericDatumReader[T] =
StringForcingDatumReaderProvider.genericDatumReader[T](writer, reader, data)
-
- def specificDatumReader(writer: Schema, reader: Schema, data: SpecificData): SpecificDatumReader[T] =
- StringForcingDatumReaderProvider.specificDatumReader[T](writer, reader, data)
-
- def reflectDatumReader(writer: Schema, reader: Schema, data: ReflectData): ReflectDatumReader[T] =
- StringForcingDatumReaderProvider.reflectDatumReader[T](writer, reader, data)
}
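With the AvroStringSettings switch removed, the forcing reader now unconditionally resolves STRING schemas (and map keys) to java.lang.String rather than Utf8. A small illustrative construction, assuming a hypothetical object and record schema that are not part of this change:

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.avro.generic.GenericDatumReader
    import pl.touk.nussknacker.engine.schemedkafka.AvroUtils
    import pl.touk.nussknacker.engine.schemedkafka.schema.StringForcingDatumReaderProvider

    object StringForcingSketch {

      // Any record decoded through this reader yields java.lang.String for its string
      // fields and map keys - there is no environment-variable opt-out any more.
      val schema: Schema = SchemaBuilder.record("Customer").fields()
        .requiredString("name")
        .endRecord()

      val reader: GenericDatumReader[AnyRef] =
        StringForcingDatumReaderProvider.genericDatumReader[AnyRef](schema, schema, AvroUtils.genericData)

    }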
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/serialization/AbstractConfluentKafkaAvroSerializer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/serialization/AbstractConfluentKafkaAvroSerializer.scala
index 1eccdaf4d95..075428ac33c 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/serialization/AbstractConfluentKafkaAvroSerializer.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/confluent/serialization/AbstractConfluentKafkaAvroSerializer.scala
@@ -90,7 +90,7 @@ class AbstractConfluentKafkaAvroSerializer(avroSchemaEvolution: AvroSchemaEvolut
case array: Array[Byte] => out.write(array)
case _ =>
val encoder = encoderToUse(avroSchema, out)
- val writer = createDatumWriter(data, avroSchema, useSchemaReflection = useSchemaReflection)
+ val writer = createDatumWriter(avroSchema)
writer.write(data, encoder)
encoder.flush()
}
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AvroMessageFormatter.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AvroMessageFormatter.scala
index 548b1ee2f75..6486e2b5cae 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AvroMessageFormatter.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AvroMessageFormatter.scala
@@ -26,7 +26,7 @@ private[schemaregistry] object AvroMessageFormatter extends DatumReaderWriterMix
case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
case other => other
}
- val writer = createDatumWriter(record, schema, useSchemaReflection = false)
+ val writer = createDatumWriter(schema)
writer.write(record, encoder)
encoder.flush()
val str = bos.toString(StandardCharsets.UTF_8)
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala
index e5bf61c5dbe..12368b342a3 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala
@@ -153,8 +153,10 @@ object JsonSchemaSupport extends ParsedSchemaSupport[OpenAPIJsonSchema] {
}
}
- override def formValueEncoder(schema: ParsedSchema, mode: ValidationMode): Any => AnyRef = { (value: Any) =>
- new ToJsonSchemaBasedEncoder(mode).encodeOrError(value, schema.cast().rawSchema())
+ override def formValueEncoder(schema: ParsedSchema, mode: ValidationMode): Any => AnyRef = {
+ val encoder = new ToJsonSchemaBasedEncoder(mode)
+ val rawSchema = schema.cast().rawSchema()
+ (value: Any) => encoder.encodeOrError(value, rawSchema)
}
override def recordFormatterSupport(schemaRegistryClient: SchemaRegistryClient): RecordFormatterSupport =
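The rewrite of formValueEncoder (and the analogous prepareMessageFormatter change further down) moves everything that does not depend on the encoded value out of the returned closure, so the encoder and the raw schema are built once per call to formValueEncoder rather than once per encoded value. The same pattern in isolation, with purely illustrative names rather than Nussknacker API:

    final class CostlyEncoder(mode: String) {
      def encode(value: Any): AnyRef = s"[$mode] $value"
    }

    object ClosureHoistingSketch {

      // Everything independent of `value` is computed once, outside the returned function.
      def mkEncoder(mode: String): Any => AnyRef = {
        val encoder = new CostlyEncoder(mode)  // allocated once
        (value: Any) => encoder.encode(value)  // only this runs per encoded value
      }

    }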
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala
index e15f037441c..287c3ed70d0 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala
@@ -25,13 +25,11 @@ trait UniversalSchemaPayloadDeserializer {
object AvroPayloadDeserializer {
def apply(config: KafkaConfig) =
- new AvroPayloadDeserializer(false, false, GenericRecordSchemaIdSerializationSupport(config), DecoderFactory.get())
+ new AvroPayloadDeserializer(GenericRecordSchemaIdSerializationSupport(config), DecoderFactory.get())
}
// This implementation is based on Confluent's one but currently doesn't use any Confluent-specific things
class AvroPayloadDeserializer(
- useSchemaReflection: Boolean,
- useSpecificAvroReader: Boolean,
genericRecordSchemaIdSerializationSupport: GenericRecordSchemaIdSerializationSupport,
decoderFactory: DecoderFactory
) extends DatumReaderWriterMixin
@@ -47,12 +45,7 @@ class AvroPayloadDeserializer(
val avroExpectedSchemaData = expectedSchemaData.asInstanceOf[Option[RuntimeSchemaData[AvroSchema]]]
val avroWriterSchemaData = writerSchemaData.asInstanceOf[RuntimeSchemaData[AvroSchema]]
val readerSchemaData = avroExpectedSchemaData.getOrElse(avroWriterSchemaData)
- val reader = createDatumReader(
- avroWriterSchemaData.schema.rawSchema(),
- readerSchemaData.schema.rawSchema(),
- useSchemaReflection,
- useSpecificAvroReader
- )
+ val reader = createDatumReader(avroWriterSchemaData.schema.rawSchema(), readerSchemaData.schema.rawSchema())
val result = recordDeserializer.deserializeRecord(readerSchemaData.schema.rawSchema(), reader, buffer)
genericRecordSchemaIdSerializationSupport.wrapWithRecordWithSchemaIdIfNeeded(result, readerSchemaData)
}
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaSupport.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaSupport.scala
index a3119a922a1..ce1ba698fd4 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaSupport.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaSupport.scala
@@ -62,8 +62,9 @@ trait UniversalSchemaSupport {
}
final def prepareMessageFormatter(schema: ParsedSchema, schemaRegistryClient: SchemaRegistryClient): Any => Json = {
- data =>
- recordFormatterSupport(schemaRegistryClient).formatMessage(formValueEncoder(schema, ValidationMode.lax)(data))
+ val recordFormatter = recordFormatterSupport(schemaRegistryClient)
+ val encodeRecord = formValueEncoder(schema, ValidationMode.lax)
+ (data: Any) => recordFormatter.formatMessage(encodeRecord(data))
}
}
diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/typed/AvroSchemaTypeDefinitionExtractor.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/typed/AvroSchemaTypeDefinitionExtractor.scala
index 7cc6e94baa0..94075faef90 100644
--- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/typed/AvroSchemaTypeDefinitionExtractor.scala
+++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/typed/AvroSchemaTypeDefinitionExtractor.scala
@@ -3,8 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.typed
import org.apache.avro.generic.GenericData.EnumSymbol
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{LogicalTypes, Schema}
-import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedNull, TypedObjectTypingResult, TypingResult}
-import pl.touk.nussknacker.engine.schemedkafka.schema.AvroStringSettings
+import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedNull, TypingResult}
import java.nio.ByteBuffer
import java.time.{Instant, LocalDate, LocalTime}
@@ -38,7 +37,7 @@ class AvroSchemaTypeDefinitionExtractor(recordUnderlyingType: TypedClass) {
Typed.genericTypeClass[java.util.List[_]](List(typeDefinition(schema.getElementType)))
case Schema.Type.MAP =>
Typed.genericTypeClass[java.util.Map[_, _]](
- List(AvroStringSettings.stringTypingResult, typeDefinition(schema.getValueType))
+ List(Typed[String], typeDefinition(schema.getValueType))
)
case Schema.Type.UNION =>
val childTypeDefinitions = schema.getTypes.asScala.map(sch => typeDefinition(sch)).toSet
@@ -52,10 +51,9 @@ class AvroSchemaTypeDefinitionExtractor(recordUnderlyingType: TypedClass) {
if schema.getLogicalType != null && schema.getLogicalType.isInstanceOf[LogicalTypes.Decimal] =>
Typed[java.math.BigDecimal]
case Schema.Type.STRING =>
- val baseType = AvroStringSettings.stringTypingResult
Option(schema.getProp(AvroSchemaTypeDefinitionExtractor.dictIdProperty))
- .map(Typed.taggedDictValue(baseType, _))
- .getOrElse(baseType)
+ .map(Typed.taggedDictValue(Typed.typedClass[String], _))
+ .getOrElse(Typed.typedClass[String])
case Schema.Type.BYTES =>
Typed[ByteBuffer]
case Schema.Type.FIXED =>
diff --git a/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SimpleKafkaAvroDeserializer.scala b/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SimpleKafkaAvroDeserializer.scala
index 63f8999832d..fbc8f2caa0e 100644
--- a/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SimpleKafkaAvroDeserializer.scala
+++ b/utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/SimpleKafkaAvroDeserializer.scala
@@ -19,8 +19,6 @@ class SimpleKafkaAvroDeserializer(schemaRegistry: SchemaRegistryClient) extends
protected lazy val decoderFactory: DecoderFactory = DecoderFactory.get()
private lazy val confluentAvroPayloadDeserializer = new AvroPayloadDeserializer(
- useSchemaReflection = false,
- useSpecificAvroReader = false,
new GenericRecordSchemaIdSerializationSupport(schemaIdSerializationEnabled = true),
decoderFactory
)