From c296157d4e0b62e63298845a5274bcb59da66139 Mon Sep 17 00:00:00 2001 From: Sebastian Krueger Date: Mon, 13 Apr 2020 00:02:02 +1200 Subject: [PATCH] Fix CSV parsing to handle nested comma delimiter inside quotes by using Apache Commons CSV. --- pom.xml | 9 +++- .../processors/CSVToJSONDataConverter.java | 41 +++++++++++++------ .../processors/DataConverterTest.java | 8 +++- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/pom.xml b/pom.xml index 90427cd4..e1767173 100644 --- a/pom.xml +++ b/pom.xml @@ -65,18 +65,23 @@ com.fasterxml.jackson.core jackson-annotations - 2.8.11 + 2.9.10 com.fasterxml.jackson.core jackson-core - 2.8.11 + 2.9.10 com.fasterxml.jackson.core jackson-databind 2.9.10.3 + + org.apache.commons + commons-csv + 1.8 + com.google.code.findbugs jsr305 diff --git a/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java b/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java index 3668c209..c0ce3a2d 100644 --- a/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java +++ b/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java @@ -13,12 +13,17 @@ */ package com.amazon.kinesis.streaming.agent.processing.processors; +import java.io.IOException; +import java.io.StringReader; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; import org.slf4j.LoggerFactory; import com.amazon.kinesis.streaming.agent.ByteBuffers; @@ -69,19 +74,29 @@ public ByteBuffer convert(ByteBuffer data) throws DataConversionException { dataStr = dataStr.substring(0, (dataStr.length() - NEW_LINE.length())); } - String[] columns = dataStr.split(delimiter); - - for (int i = 0; i < fieldNames.size(); i++) { - try { - recordMap.put(fieldNames.get(i), columns[i]); - } catch (ArrayIndexOutOfBoundsException e) { - LoggerFactory.getLogger(getClass()).debug("Null field in CSV detected"); - recordMap.put(fieldNames.get(i), null); - } catch (Exception e) { - throw new DataConversionException("Unable to create the column map", e); - } - } - + CSVParser csvParser = null; + try { + if (delimiter.equals(",")) { + csvParser = CSVFormat.RFC4180.parse(new StringReader(dataStr)); + } else { + csvParser = CSVFormat.TDF.withIgnoreSurroundingSpaces(false).parse(new StringReader(dataStr)); + } + CSVRecord csvRecord = csvParser.getRecords().get(0); + + for (int i = 0; i < fieldNames.size(); i++) { + try { + recordMap.put(fieldNames.get(i), csvRecord.get(i)); + } catch (ArrayIndexOutOfBoundsException e) { + LoggerFactory.getLogger(getClass()).debug("Null field in CSV detected"); + recordMap.put(fieldNames.get(i), null); + } catch (Exception e) { + throw new DataConversionException("Unable to create the column map", e); + } + } + } catch (IOException e) { + throw new DataConversionException("Unable to parse the CSV records.", e); + } + String dataJson = jsonProducer.writeAsString(recordMap) + NEW_LINE; return ByteBuffer.wrap(dataJson.getBytes(StandardCharsets.UTF_8)); diff --git a/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java b/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java index 1f3b0be9..435558fb 100644 --- a/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java +++ b/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java @@ -54,7 +54,7 @@ public void testCSVToJSONDataConverter() throws Exception { put("customFieldNames", Arrays.asList("column1", "column2", "column3", "column4")); }}); final IDataConverter converter = new CSVToJSONDataConverter(config); - final String dataStr = "value1, value2,valu\ne3, value4\n"; + final String dataStr = "value1, value2,\"valu\ne3\", value4\n"; final String expectedStr = "{\"column1\":\"value1\",\"column2\":\" value2\",\"column3\":\"valu\\ne3\",\"column4\":\" value4\"}\n"; verifyDataConversion(converter, dataStr.getBytes(), expectedStr.getBytes()); @@ -64,7 +64,11 @@ public void testCSVToJSONDataConverter() throws Exception { final String dataStrMoreThanColumns = "value1,value2,value3,value4,value5\n"; final String expectedStrMoreThanColumns = "{\"column1\":\"value1\",\"column2\":\"value2\",\"column3\":\"value3\",\"column4\":\"value4\"}\n"; - verifyDataConversion(converter, dataStrMoreThanColumns.getBytes(), expectedStrMoreThanColumns.getBytes()); + verifyDataConversion(converter, dataStrMoreThanColumns.getBytes(), expectedStrMoreThanColumns.getBytes()); + + final String dataStrEmbeddedComma = "value1,\"val,ue2\",value3,value4\n"; + final String expectedStrEmbeddedComma = "{\"column1\":\"value1\",\"column2\":\"val,ue2\",\"column3\":\"value3\",\"column4\":\"value4\"}\n"; + verifyDataConversion(converter, dataStrEmbeddedComma.getBytes(), expectedStrEmbeddedComma.getBytes()); } @SuppressWarnings("serial")