Skip to content

Commit

Permalink
fixed to allow path from root($) to be specified in path
Browse files Browse the repository at this point in the history
  • Loading branch information
d-hrs committed Oct 30, 2024
1 parent 4330bba commit b6e8c6a
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ public int read()
}
};
final JsonNode json;
final JsonNode rootNode;
try {
json = JsonPath.using(JSON_PATH_CONFIG).parse(toParse).read(jsonRoot, JsonNode.class);
rootNode = JsonPath.using(JSON_PATH_CONFIG).parse(toParse).read("$", JsonNode.class);
json = JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(jsonRoot, JsonNode.class);
}
catch (PathNotFoundException e) {
skipOrThrow(new DataException(format(Locale.ENGLISH,
Expand All @@ -179,11 +181,11 @@ public int read()
skipOrThrow(new DataException(e), stopOnInvalidRecord);
continue;
}

Map<Column, JsonNode> additionalValues = createAdditionalColumns(jsonPathMap, rootNode);
if (json.isArray()) {
for (JsonNode recordValue : json) {
try {
createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder);
createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder, additionalValues);
}
catch (DataException e) {
skipOrThrow(e, stopOnInvalidRecord);
Expand All @@ -193,7 +195,7 @@ public int read()
}
else {
try {
createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder);
createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder, additionalValues);
}
catch (DataException e) {
skipOrThrow(e, stopOnInvalidRecord);
Expand All @@ -207,6 +209,23 @@ public int read()
}
}

private Map<Column, JsonNode> createAdditionalColumns(Map<Column, String> jsonPathMap, JsonNode rootNode)
{
Map<Column, JsonNode> additionalColumns = new HashMap<>();
jsonPathMap.forEach((column, path) -> {
if (path.startsWith("$")) {
try {
additionalColumns.put(
column,
JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(path, JsonNode.class)
);
} catch (PathNotFoundException e) {
logger.warn("Failed to get %s", path);
}
}
});
return Collections.unmodifiableMap(additionalColumns);
}
private Map<Column, String> createJsonPathMap(PluginTask task, Schema schema)
{
Map<Column, String> columnMap = new HashMap<>();
Expand All @@ -220,7 +239,7 @@ private Map<Column, String> createJsonPathMap(PluginTask task, Schema schema)
return Collections.unmodifiableMap(columnMap);
}

private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, String> jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder)
private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, String> jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder, Map<Column, JsonNode> additionalValues)
{
if (json.getNodeType() != JsonNodeType.OBJECT) {
throw new JsonRecordValidateException(format(Locale.ENGLISH,
Expand All @@ -229,7 +248,7 @@ private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, Stri

for (Column column : schema.getColumns()) {
JsonNode value = null;
if (jsonPathMap.containsKey(column)) {
if (jsonPathMap.containsKey(column) && !jsonPathMap.get(column).startsWith("$")) {
try {
value = JsonPath.using(JSON_PATH_CONFIG).parse(json).read(jsonPathMap.get(column));
}
Expand All @@ -239,10 +258,16 @@ private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, Stri
}
else {
value = json.get(column.getName());

}
visitor.setValue(value);
column.visit(visitor);
}
additionalValues.forEach( (k, v) -> {
visitor.setValue(v);
k.visit(visitor);
}
);

pageBuilder.addRecord();
}
Expand Down

0 comments on commit b6e8c6a

Please sign in to comment.