Skip to content

Commit

Permalink
[hotfix-#1849][jdbc] fixed load data using lru method is empty, hide …
Browse files Browse the repository at this point in the history
…clear text passwords in logs, fixed build cache key when GenericRowData
  • Loading branch information
libailin authored and zoudaokoulife committed Dec 6, 2023
1 parent 2d5f489 commit 41b96a8
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,26 @@ public RowData toInternalLookup(JsonArray jsonArray) throws Exception {
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
Object field = jsonArray.getValue(pos);
// 当sql里声明的字段类型为BIGINT时,将BigInteger (BIGINT UNSIGNED) 转换为Long
if (rowType.getFields()
.get(pos)
.getType()
.getTypeRoot()
.name()
.equalsIgnoreCase("BIGINT")
&& field instanceof BigInteger) {
field = ((BigInteger) field).longValue();
}
// 当sql里声明的字段类型为INT时,将Long (INT UNSIGNED) 转换为Integer
if (rowType.getFields()
.get(pos)
.getType()
.getTypeRoot()
.name()
.equalsIgnoreCase("INTEGER")
&& field instanceof Long) {
field = ((Long) field).intValue();
}
genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field));
}
return genericRowData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import com.dtstack.chunjun.lookup.config.LookupConfig;
import com.dtstack.chunjun.throwable.NoRestartException;
import com.dtstack.chunjun.util.DateUtil;
import com.dtstack.chunjun.util.JsonUtil;
import com.dtstack.chunjun.util.ThreadUtil;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -138,7 +140,8 @@ public void open(FunctionContext context) throws Exception {
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE.defaultValue()),
new ChunJunThreadFactory("rdbAsyncExec"),
new ThreadPoolExecutor.CallerRunsPolicy());
log.info("async dim table JdbcOptions info: {} ", jdbcConfig.toString());
// 隐藏日志中明文密码
log.info("async dim table JdbcOptions info: {} ", JsonUtil.toPrintJson(jdbcConfig));
}

@Override
Expand All @@ -153,12 +156,19 @@ public void handleAsyncInvoke(CompletableFuture<Collection<RowData>> future, Obj
Thread.sleep(100);
}

executor.execute(
() ->
connectWithRetry(
future,
rdbSqlClient,
Stream.of(keys).map(this::convertDataType).toArray(Object[]::new)));
List<Object> keyList = new ArrayList<>();
for (Object key : keys) {
if (key instanceof GenericRowData) {
GenericRowData genericRowData = (GenericRowData) key;
for (int i = 0; i < genericRowData.getArity(); i++) {
keyList.add(this.convertDataType(genericRowData.getField(i)));
}
} else {
keyList.add(this.convertDataType(key));
}
}

executor.execute(() -> connectWithRetry(future, rdbSqlClient, keyList.toArray()));
}

private Object convertDataType(Object val) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
Expand Down Expand Up @@ -279,6 +280,16 @@ public abstract void handleAsyncInvoke(
* @return
*/
public String buildCacheKey(Object... keys) {
if (keys != null && keys.length == 1 && keys[0] instanceof GenericRowData) {
GenericRowData rowData = (GenericRowData) keys[0];
int[] keyIndexes = new int[rowData.getArity()];
for (int i = 0; i < rowData.getArity(); i++) {
keyIndexes[i] = i;
}
return Arrays.stream(keyIndexes)
.mapToObj(index -> String.valueOf(rowData.getField(index)))
.collect(Collectors.joining("_"));
}
return Arrays.stream(keys).map(String::valueOf).collect(Collectors.joining("_"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public static String toPrintJson(Object obj) {
try {
Map<String, Object> result =
objectMapper.readValue(objectMapper.writeValueAsString(obj), HashMap.class);
MapUtil.replaceAllElement(result, Lists.newArrayList("pwd", "password"), "******");
MapUtil.replaceAllElement(
result, Lists.newArrayList("pwd", "password", "druid.password"), "******");
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(result);
} catch (Exception e) {
throw new RuntimeException("error parse [" + obj + "] to json", e);
Expand Down

0 comments on commit 41b96a8

Please sign in to comment.