Skip to content

Commit

Permalink
[hotfix-#1745][connector][hdfs][hive] Data loss occurs when multiple …
Browse files Browse the repository at this point in the history
…tasks write to the same hdfs (or hive) data source
  • Loading branch information
gaoliang authored and zoudaokoulife committed Nov 3, 2023
1 parent e5c0066 commit f4402f0
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ private HdfsConfig getHdfsConfig(ReadableConfig config) {
hdfsConfig.setEncoding(config.get(BaseFileOptions.ENCODING));
hdfsConfig.setMaxFileSize(config.get(BaseFileOptions.MAX_FILE_SIZE));
hdfsConfig.setNextCheckRows(config.get(BaseFileOptions.NEXT_CHECK_ROWS));
hdfsConfig.setJobIdentifier(config.get(BaseFileOptions.JOB_IDENTIFIER));

hdfsConfig.setDefaultFS(config.get(HdfsOptions.DEFAULT_FS));
hdfsConfig.setFileType(config.get(HdfsOptions.FILE_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,6 @@ public class BaseFileConfig extends CommonConfig {
private long nextCheckRows = 5000;

private String suffix;

private String jobIdentifier = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ protected void initVariableFields() {
} else {
outputFilePath = baseFileConfig.getPath();
}
tmpPath = outputFilePath + File.separatorChar + TMP_DIR_NAME;
tmpPath =
outputFilePath
+ File.separatorChar
+ TMP_DIR_NAME
+ baseFileConfig.getJobIdentifier();
log.info("[initVariableFields] get tmpPath: {}", tmpPath);
nextNumForCheckDataSize = baseFileConfig.getNextCheckRows();
openSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,11 @@ public class BaseFileOptions {
.longType()
.defaultValue(5000L)
.withDescription("The number of data written in the next file size check");

public static final ConfigOption<String> JOB_IDENTIFIER =
ConfigOptions.key("properties.job-identifier")
.stringType()
.defaultValue("")
.withDescription(
"To solve the problem of file loss caused by multiple tasks writing to the same output source at the same time");
}
42 changes: 42 additions & 0 deletions chunjun-examples/sql/hdfs/stream_sink_hdfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
CREATE TABLE source
(
id int,
col_boolean boolean,
col_tinyint tinyint
) WITH (
'connector' = 'stream-x'
,'number-of-rows' = '10000'
);

CREATE TABLE sink
(
id int,
col_boolean boolean
) WITH (
'connector' = 'hdfs-x'
,'path' = 'hdfs://ns/user/hive/warehouse/tudou.db/type_txt'
,'file-name' = 'pt=1'
,'properties.hadoop.user.name' = 'root'
,'properties.dfs.ha.namenodes.ns' = 'nn1,nn2'
,'properties.fs.defaultFS' = 'hdfs://ns'
,'properties.dfs.namenode.rpc-address.ns.nn2' = 'ip:9000'
,'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
,'properties.dfs.namenode.rpc-address.ns.nn1' = 'ip:9000'
,'properties.dfs.nameservices' = 'ns'
,'properties.fs.hdfs.impl.disable.cache' = 'true'
,'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
,'default-fs' = 'hdfs://ns'
,'field-delimiter' = ','
,'encoding' = 'utf-8'
,'max-file-size' = '10485760'
,'next-check-rows' = '20000'
,'write-mode' = 'overwrite'
,'file-type' = 'text'
-- 为了处理多任务(HDFS、HIVE)同时输出到同一数据源时数据丢失的问题
-- 每个任务指定唯一的字符标识,且不能变化
,'properties.job-identifier' = 'job_id_tmp'
);

insert into sink
select *
from source u;

0 comments on commit f4402f0

Please sign in to comment.