diff --git a/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java b/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java index 80abc03..c64eb67 100644 --- a/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java +++ b/src/main/java/org/embulk/parser/jsonpath/JsonpathParserPlugin.java @@ -181,11 +181,10 @@ public int read() skipOrThrow(new DataException(e), stopOnInvalidRecord); continue; } - Map additionalValues = createAdditionalColumns(jsonPathMap, rootNode); if (json.isArray()) { for (JsonNode recordValue : json) { try { - createRecordFromJson(recordValue, schema, jsonPathMap, visitor, pageBuilder, additionalValues); + createRecordFromJson(rootNode, recordValue, schema, jsonPathMap, visitor, pageBuilder); } catch (DataException e) { skipOrThrow(e, stopOnInvalidRecord); @@ -195,7 +194,7 @@ public int read() } else { try { - createRecordFromJson(json, schema, jsonPathMap, visitor, pageBuilder, additionalValues); + createRecordFromJson(rootNode, json, schema, jsonPathMap, visitor, pageBuilder); } catch (DataException e) { skipOrThrow(e, stopOnInvalidRecord); @@ -209,23 +208,6 @@ public int read() } } - private Map createAdditionalColumns(Map jsonPathMap, JsonNode rootNode) - { - Map additionalColumns = new HashMap<>(); - jsonPathMap.forEach((column, path) -> { - if (path.startsWith("$")) { - try { - additionalColumns.put( - column, - JsonPath.using(JSON_PATH_CONFIG).parse(rootNode).read(path, JsonNode.class) - ); - } catch (PathNotFoundException e) { - logger.warn("Failed to get %s", path); - } - } - }); - return Collections.unmodifiableMap(additionalColumns); - } private Map createJsonPathMap(PluginTask task, Schema schema) { Map columnMap = new HashMap<>(); @@ -239,7 +221,7 @@ private Map createJsonPathMap(PluginTask task, Schema schema) return Collections.unmodifiableMap(columnMap); } - private void createRecordFromJson(JsonNode json, Schema schema, Map jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder, Map additionalValues) + private void createRecordFromJson(JsonNode root, JsonNode json, Schema schema, Map jsonPathMap, ColumnVisitorImpl visitor, PageBuilder pageBuilder) { if (json.getNodeType() != JsonNodeType.OBJECT) { throw new JsonRecordValidateException(format(Locale.ENGLISH, @@ -248,9 +230,10 @@ private void createRecordFromJson(JsonNode json, Schema schema, Map { - visitor.setValue(v); - k.visit(visitor); - } - ); - pageBuilder.addRecord(); } diff --git a/src/test/java/org/embulk/parser/jsonpath/TestJsonpathParserPlugin.java b/src/test/java/org/embulk/parser/jsonpath/TestJsonpathParserPlugin.java index ccd53ea..74ca46b 100644 --- a/src/test/java/org/embulk/parser/jsonpath/TestJsonpathParserPlugin.java +++ b/src/test/java/org/embulk/parser/jsonpath/TestJsonpathParserPlugin.java @@ -371,10 +371,11 @@ public void useJsonPath() throws Exception { SchemaConfig schema = schema( - column("__c0", BOOLEAN, config().set("path", "$._c0")), column("__c1", LONG, config().set("path", "$._c1")), - column("__c2", DOUBLE, config().set("path", "$._c2")), column("__c3", STRING, config().set("path", "$._c3")), - column("__c4", TIMESTAMP, config().set("format", "%Y-%m-%d %H:%M:%S %Z").set("path", "$._c4")), - column("__c5", JSON, config().set("path", "$._c5"))); + column("__c0", BOOLEAN, config().set("path", "_c0")), column("__c1", LONG, config().set("path", "_c1")), + column("__c2", DOUBLE, config().set("path", "_c2")), column("__c3", STRING, config().set("path", "_c3")), + column("__c4", TIMESTAMP, config().set("format", "%Y-%m-%d %H:%M:%S %Z").set("path", "_c4")), + column("__c5", JSON, config().set("path", "_c5")), + column("__c6", STRING, config().set("path", "$[0]._c3"))); ConfigSource config = this.config.deepCopy().set("columns", schema); transaction(config, fileInput( @@ -397,6 +398,7 @@ record = records.get(0); assertEquals("embulk", record[3]); assertEquals(Timestamp.ofEpochSecond(1451606400L), record[4]); assertEquals(newMap(newString("k"), newString("v")), record[5]); + assertEquals("embulk", record[6]); } { record = records.get(1); @@ -406,6 +408,7 @@ record = records.get(1); assertEquals("エンバルク", record[3]); assertEquals(Timestamp.ofEpochSecond(1451606400L), record[4]); assertEquals(newArray(newString("e0"), newString("e1")), record[5]); + assertEquals("embulk", record[6]); } recreatePageOutput(); @@ -416,10 +419,10 @@ public void writeNilsWithJsonPath() throws Exception { SchemaConfig schema = schema( - column("__c0", BOOLEAN, config().set("path", "$._c0")), column("__c1", LONG, config().set("path", "$._c1")), - column("__c2", DOUBLE, config().set("path", "$._c2")), column("__c3", STRING, config().set("path", "$._c3")), - column("__c4", TIMESTAMP, config().set("format", "%Y-%m-%d %H:%M:%S %Z").set("path", "$._c4")), - column("__c5", JSON, config().set("path", "$._c5"))); + column("__c0", BOOLEAN, config().set("path", "_c0")), column("__c1", LONG, config().set("path", "_c1")), + column("__c2", DOUBLE, config().set("path", "_c2")), column("__c3", STRING, config().set("path", "_c3")), + column("__c4", TIMESTAMP, config().set("format", "%Y-%m-%d %H:%M:%S %Z").set("path", "_c4")), + column("__c5", JSON, config().set("path", "_c5")), column("__c6", STRING, config().set("path", "$[0]._c3"))); ConfigSource config = this.config.deepCopy().set("columns", schema); transaction(config, fileInput( @@ -435,7 +438,7 @@ public void writeNilsWithJsonPath() assertEquals(4, records.size()); for (Object[] record : records) { - for (int i = 0; i < 6; i++) { + for (int i = 0; i < 7; i++) { assertNull(record[i]); } } @@ -445,7 +448,7 @@ public void writeNilsWithJsonPath() public void notArrayObject() throws Exception { - SchemaConfig schema = schema(column("__c0", STRING, config().set("path", "$._c0"))); + SchemaConfig schema = schema(column("__c0", STRING, config().set("path", "_c0"))); ConfigSource config = this.config.deepCopy().set("columns", schema); transaction(config, fileInput("{ \"_c0\": \"embulk\" }")); @@ -459,7 +462,7 @@ public void notArrayObject() public void rootPathJsonIsObject() throws Exception { - SchemaConfig schema = schema(column("__c0", STRING, config().set("path", "$._c0"))); + SchemaConfig schema = schema(column("__c0", STRING, config().set("path", "_c0"))); ConfigSource config = this.config.deepCopy().set("columns", schema).set("root", "$[0].root"); transaction(config, fileInput( @@ -477,7 +480,7 @@ public void rootPathJsonIsObject() public void rootPathJsonIsNotArrayOrObject() { assertThrows(DataException.class, () -> { - SchemaConfig schema = schema(column("__c0", STRING, config().set("path", "$._c0"))); + SchemaConfig schema = schema(column("__c0", STRING, config().set("path", "_c0"))); ConfigSource config = this.config.deepCopy() .set("columns", schema) .set("root", "$[0].root")