diff --git a/pom.xml b/pom.xml
index 90427cd4..e1767173 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,18 +65,23 @@
com.fasterxml.jackson.core
jackson-annotations
- 2.8.11
+ 2.9.10
com.fasterxml.jackson.core
jackson-core
- 2.8.11
+ 2.9.10
com.fasterxml.jackson.core
jackson-databind
2.9.10.3
+
+ org.apache.commons
+ commons-csv
+ 1.8
+
com.google.code.findbugs
jsr305
diff --git a/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java b/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java
index 3668c209..c0ce3a2d 100644
--- a/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java
+++ b/src/com/amazon/kinesis/streaming/agent/processing/processors/CSVToJSONDataConverter.java
@@ -13,12 +13,17 @@
*/
package com.amazon.kinesis.streaming.agent.processing.processors;
+import java.io.IOException;
+import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
import org.slf4j.LoggerFactory;
import com.amazon.kinesis.streaming.agent.ByteBuffers;
@@ -69,19 +74,29 @@ public ByteBuffer convert(ByteBuffer data) throws DataConversionException {
dataStr = dataStr.substring(0, (dataStr.length() - NEW_LINE.length()));
}
- String[] columns = dataStr.split(delimiter);
-
- for (int i = 0; i < fieldNames.size(); i++) {
- try {
- recordMap.put(fieldNames.get(i), columns[i]);
- } catch (ArrayIndexOutOfBoundsException e) {
- LoggerFactory.getLogger(getClass()).debug("Null field in CSV detected");
- recordMap.put(fieldNames.get(i), null);
- } catch (Exception e) {
- throw new DataConversionException("Unable to create the column map", e);
- }
- }
-
+ CSVParser csvParser = null;
+ try {
+ if (delimiter.equals(",")) {
+ csvParser = CSVFormat.RFC4180.parse(new StringReader(dataStr));
+ } else {
+ csvParser = CSVFormat.TDF.withIgnoreSurroundingSpaces(false).parse(new StringReader(dataStr));
+ }
+ CSVRecord csvRecord = csvParser.getRecords().get(0);
+
+ for (int i = 0; i < fieldNames.size(); i++) {
+ try {
+ recordMap.put(fieldNames.get(i), csvRecord.get(i));
+ } catch (ArrayIndexOutOfBoundsException e) {
+ LoggerFactory.getLogger(getClass()).debug("Null field in CSV detected");
+ recordMap.put(fieldNames.get(i), null);
+ } catch (Exception e) {
+ throw new DataConversionException("Unable to create the column map", e);
+ }
+ }
+ } catch (IOException e) {
+ throw new DataConversionException("Unable to parse the CSV records.", e);
+ }
+
String dataJson = jsonProducer.writeAsString(recordMap) + NEW_LINE;
return ByteBuffer.wrap(dataJson.getBytes(StandardCharsets.UTF_8));
diff --git a/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java b/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java
index 1f3b0be9..435558fb 100644
--- a/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java
+++ b/tst/com/amazon/kinesis/streaming/agent/processing/processors/DataConverterTest.java
@@ -54,7 +54,7 @@ public void testCSVToJSONDataConverter() throws Exception {
put("customFieldNames", Arrays.asList("column1", "column2", "column3", "column4"));
}});
final IDataConverter converter = new CSVToJSONDataConverter(config);
- final String dataStr = "value1, value2,valu\ne3, value4\n";
+ final String dataStr = "value1, value2,\"valu\ne3\", value4\n";
final String expectedStr = "{\"column1\":\"value1\",\"column2\":\" value2\",\"column3\":\"valu\\ne3\",\"column4\":\" value4\"}\n";
verifyDataConversion(converter, dataStr.getBytes(), expectedStr.getBytes());
@@ -64,7 +64,11 @@ public void testCSVToJSONDataConverter() throws Exception {
final String dataStrMoreThanColumns = "value1,value2,value3,value4,value5\n";
final String expectedStrMoreThanColumns = "{\"column1\":\"value1\",\"column2\":\"value2\",\"column3\":\"value3\",\"column4\":\"value4\"}\n";
- verifyDataConversion(converter, dataStrMoreThanColumns.getBytes(), expectedStrMoreThanColumns.getBytes());
+ verifyDataConversion(converter, dataStrMoreThanColumns.getBytes(), expectedStrMoreThanColumns.getBytes());
+
+ final String dataStrEmbeddedComma = "value1,\"val,ue2\",value3,value4\n";
+ final String expectedStrEmbeddedComma = "{\"column1\":\"value1\",\"column2\":\"val,ue2\",\"column3\":\"value3\",\"column4\":\"value4\"}\n";
+ verifyDataConversion(converter, dataStrEmbeddedComma.getBytes(), expectedStrEmbeddedComma.getBytes());
}
@SuppressWarnings("serial")