Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
chikamura committed Jan 31, 2025
1 parent 54501ac commit c6143f2
Showing 1 changed file with 6 additions and 30 deletions.
36 changes: 6 additions & 30 deletions src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,10 @@ public int read()
skipOrThrow(new DataException(e), stopOnInvalidRecord);
continue;
}
Map<Column, JsonNode> additionalValues = createAdditionalColumns(jsonPathMap, rootNode);
if (json.isArray()) {
for (JsonNode recordValue : json) {
try {
createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder, additionalValues);
createRecordFromJson(rootNode, recordValue, schema, jsonPathMap, visitor, pageBuilder);
}
catch (DataException e) {
skipOrThrow(e, stopOnInvalidRecord);
Expand All @@ -195,7 +194,7 @@ public int read()
}
else {
try {
createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder, additionalValues);
createRecordFromJson(rootNode, json, schema, jsonPathMap, visitor, pageBuilder);
}
catch (DataException e) {
skipOrThrow(e, stopOnInvalidRecord);
Expand All @@ -209,23 +208,6 @@ public int read()
}
}

private Map<Column, JsonNode> createAdditionalColumns(Map<Column, String> jsonPathMap, JsonNode rootNode)
{
Map<Column, JsonNode> additionalColumns = new HashMap<>();
jsonPathMap.forEach((column, path) -> {
if (path.startsWith("$")) {
try {
additionalColumns.put(
column,
JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(path, JsonNode.class)
);
} catch (PathNotFoundException e) {
logger.warn("Failed to get %s", path);
}
}
});
return Collections.unmodifiableMap(additionalColumns);
}
private Map<Column, String> createJsonPathMap(PluginTask task, Schema schema)
{
Map<Column, String> columnMap = new HashMap<>();
Expand All @@ -239,7 +221,7 @@ private Map<Column, String> createJsonPathMap(PluginTask task, Schema schema)
return Collections.unmodifiableMap(columnMap);
}

private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, String> jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder, Map<Column, JsonNode> additionalValues)
private void createRecordFromJson(JsonNode root, JsonNode json, Schema schema, Map<Column, String> jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder)
{
if (json.getNodeType() != JsonNodeType.OBJECT) {
throw new JsonRecordValidateException(format(Locale.ENGLISH,
Expand All @@ -248,27 +230,21 @@ private void createRecordFromJson(JsonNode json, Schema schema, Map<Column, Stri

for (Column column : schema.getColumns()) {
JsonNode value = null;
if (jsonPathMap.containsKey(column) && !jsonPathMap.get(column).startsWith("$")) {
if (jsonPathMap.containsKey(column)) {
try {
value = JsonPath.using(JSON_PATH_CONFIG).parse(json).read(jsonPathMap.get(column));
String path = jsonPathMap.get(column);
value = JsonPath.using(JSON_PATH_CONFIG).parse(path.startsWith("$") ? root : json).read(path, JsonNode.class);
}
catch (PathNotFoundException e) {
// pass (value is nullable)
}
}
else {
value = json.get(column.getName());

}
visitor.setValue(value);
column.visit(visitor);
}
additionalValues.forEach( (k, v) -> {
visitor.setValue(v);
k.visit(visitor);
}
);

pageBuilder.addRecord();
}

Expand Down

0 comments on commit c6143f2

Please sign in to comment.