diff --git a/chunjun-connectors/chunjun-connector-ftp/pom.xml b/chunjun-connectors/chunjun-connector-ftp/pom.xml index b3f63b9f88..bb0e12ebdd 100644 --- a/chunjun-connectors/chunjun-connector-ftp/pom.xml +++ b/chunjun-connectors/chunjun-connector-ftp/pom.xml @@ -63,7 +63,7 @@ under the License. com.alibaba easyexcel - 3.0.1 + 3.2.0 diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReadListener.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReadListener.java index 1286fe368b..27b4a9cae7 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReadListener.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/client/excel/ExcelReadListener.java @@ -18,22 +18,35 @@ package com.dtstack.chunjun.connector.ftp.client.excel; +import com.dtstack.chunjun.util.DateUtil; + import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.read.listener.ReadListener; +import java.time.LocalDateTime; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -public class ExcelReadListener implements ReadListener> { +public class ExcelReadListener implements ReadListener> { private final BlockingQueue queue = new LinkedBlockingQueue<>(4096); @Override - public void invoke(Map data, AnalysisContext context) { + public void invoke(Map data, AnalysisContext context) { String[] piece = new String[data.size()]; - for (Map.Entry entry : data.entrySet()) { - piece[entry.getKey()] = entry.getValue() == null ? "" : entry.getValue(); + for (Map.Entry entry : data.entrySet()) { + String value = ""; + if (entry.getValue() != null) { + if (entry.getValue() instanceof LocalDateTime) { + value = + DateUtil.timestampToString( + DateUtil.localDateTimetoDate((LocalDateTime) entry.getValue())); + } else { + value = String.valueOf(entry.getValue()); + } + } + piece[entry.getKey()] = value; } Row row = new Row( diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpSqlConverter.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpSqlConverter.java index 99ef5ac644..23862c5742 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpSqlConverter.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/converter/FtpSqlConverter.java @@ -19,13 +19,23 @@ package com.dtstack.chunjun.connector.ftp.converter; import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.converter.IDeserializationConverter; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; -public class FtpSqlConverter extends AbstractRowConverter { +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; + +public class FtpSqlConverter extends AbstractRowConverter { private static final long serialVersionUID = 4127516611259169686L; @@ -41,10 +51,59 @@ public FtpSqlConverter(SerializationSchema valueSerialization) { this.valueSerialization = valueSerialization; } + public FtpSqlConverter(RowType rowType) { + super(rowType); + for (int i = 0; i < rowType.getFieldCount(); i++) { + toInternalConverters.add( + wrapIntoNullableInternalConverter( + createInternalConverter(rowType.getTypeAt(i)))); + } + } + + @Override + protected IDeserializationConverter createInternalConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return val -> null; + case INTEGER: + return val -> Integer.valueOf((String) val); + case BIGINT: + return val -> Long.valueOf((String) val); + case FLOAT: + return val -> Float.valueOf((String) val); + case DOUBLE: + return val -> Double.valueOf((String) val); + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return val -> { + BigDecimal decimal = new BigDecimal(String.valueOf(val)); + return DecimalData.fromBigDecimal(decimal, precision, scale); + }; + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case DATE: + return val -> + (int) ((Date.valueOf(String.valueOf(val))).toLocalDate().toEpochDay()); + case TIME_WITHOUT_TIME_ZONE: + return val -> + (int) + ((Time.valueOf(String.valueOf(val))).toLocalTime().toNanoOfDay() + / 1_000_000L); + default: + throw new UnsupportedOperationException(type.toString()); + } + } + @Override - public RowData toInternal(String input) throws Exception { - valueDeserialization.open(new DummyInitializationContext()); - return valueDeserialization.deserialize(input.getBytes()); + public RowData toInternal(String[] input) throws Exception { + GenericRowData rowData = new GenericRowData(input.length); + for (int i = 0; i < fieldTypes.length; i++) { + rowData.setField(i, toInternalConverters.get(i).deserialize(input[i])); + } + return rowData; } @Override diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java index 6df31ae8f8..b6e35953a4 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/handler/FtpHandler.java @@ -46,6 +46,7 @@ public class FtpHandler implements DTFtpHandler { private FTPClient ftpClient = null; private String controlEncoding; private FtpConfig ftpConfig; + private boolean isEncodePath = true; public FTPClient getFtpClient() { return ftpClient; @@ -57,6 +58,8 @@ public void loginFtpServer(FtpConfig ftpConfig) { controlEncoding = ftpConfig.getControlEncoding(); ftpClient = new FTPClient(); try { + // 设置编码: 解决中文路径问题, 需要在连接前设置编码 + ftpClient.setControlEncoding(controlEncoding); // 连接 ftpClient.connect(ftpConfig.getHost(), ftpConfig.getPort()); // 登录 @@ -87,12 +90,18 @@ public void loginFtpServer(FtpConfig ftpConfig) { log.error(message); throw new RuntimeException(message); } - ftpClient.setControlEncoding(ftpConfig.getControlEncoding()); ftpClient.setListHiddenFiles(ftpConfig.isListHiddenFiles()); if (StringUtils.isNotEmpty(ftpConfig.getCompressType())) { // 设置文件传输类型为二进制 ftpClient.setFileType(FTP.BINARY_FILE_TYPE); } + // 开启服务器对UTF-8的支持,解决读取中文路径或者中文文件名失败的问题 + if (FTPReply.isPositiveCompletion(ftpClient.sendCommand("OPTS UTF8", "ON"))) { + log.info("ftp server support UTF-8"); + isEncodePath = false; + } else { + log.warn("ftp server not support UTF-8"); + } } catch (Exception e) { throw new RuntimeException(e); } @@ -440,7 +449,10 @@ private boolean isExist(String path) { } private String encodePath(String path) throws UnsupportedEncodingException { - return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING); + if (isEncodePath) { + return new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING); + } + return new String(path.getBytes(controlEncoding)); } public void reconnectFtp() { diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java index 4514ce4e90..352ead7362 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/iformat/ExcelFileFormat.java @@ -40,6 +40,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static com.alibaba.excel.enums.ReadDefaultReturnEnum.ACTUAL_DATA; import static java.util.concurrent.TimeUnit.NANOSECONDS; @Slf4j @@ -62,13 +63,17 @@ public void open(File file, InputStream inputStream, IFormatConfig config) { ExcelReadListener listener = new ExcelReadListener(); this.queue = listener.getQueue(); this.ec = new ExcelSubExceptionCarrier(); - ExcelReaderBuilder builder = EasyExcel.read(inputStream, listener); if (!config.isFirstLineHeader()) { builder.headRowNumber(0); } builder.ignoreEmptyRow(true); builder.autoCloseStream(true); + // @since 3.2.0 + // STRING:会返回一个Map的数组,返回值就是你在excel里面不点击单元格看到的内容 + // ACTUAL_DATA:会返回一个Map的数组,返回实际上存储的数据,会帮自动转换类型,Object类型为BigDecimal、Boolean、String、LocalDateTime、null,中的一个, + // READ_CELL_DATA: 会返回一个Map>的数组,其中?类型参照ACTUAL_DATA的 + builder.readDefaultReturn(ACTUAL_DATA); ExcelReader reader = builder.build(); this.sheetNum = reader.excelExecutor().sheetList().size(); diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java index 53a49f6723..c94a88413f 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpDynamicTableSource.java @@ -63,7 +63,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon builder.setFtpConfig(ftpConfig); builder.setRowConverter( new FtpSqlConverter( - decodingFormat.createRuntimeDecoder(runtimeProviderContext, dataType))); + InternalTypeInfo.of(schema.toPhysicalRowDataType().getLogicalType()) + .toRowType())); return ParallelSourceFunctionProvider.of( new DtInputFormatSourceFunction<>(builder.finish(), typeInformation), diff --git a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java index aa906f9ac8..ad4b4bc575 100644 --- a/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java +++ b/chunjun-connectors/chunjun-connector-ftp/src/main/java/com/dtstack/chunjun/connector/ftp/source/FtpInputFormat.java @@ -173,7 +173,8 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException } if (rowConverter instanceof FtpSqlConverter) { - rowData = rowConverter.toInternal(String.join(",", fields)); + // 解决数据里包含特殊符号(逗号、换行符) + rowData = rowConverter.toInternal(fields); } else if (rowConverter instanceof FtpSyncConverter) { List columns = ftpConfig.getColumn();