From 7ee9ea1de3fb9e98cfb80fb62796abf4f7c091aa Mon Sep 17 00:00:00 2001 From: libailin Date: Tue, 28 Nov 2023 11:10:29 +0800 Subject: [PATCH] [hotfix-#1849][jdbc] fixed load data using lru method is empty, hide clear text passwords in logs, fixed build cache key when GenericRowData --- .../jdbc/converter/JdbcSqlConverter.java | 20 ++++++++++++++++ .../jdbc/lookup/JdbcLruTableFunction.java | 24 +++++++++++++------ .../lookup/AbstractLruTableFunction.java | 11 +++++++++ .../com/dtstack/chunjun/util/JsonUtil.java | 3 ++- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcSqlConverter.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcSqlConverter.java index f57938ce47..b14b8f39ca 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcSqlConverter.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcSqlConverter.java @@ -130,6 +130,26 @@ public RowData toInternalLookup(JsonArray jsonArray) throws Exception { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = jsonArray.getValue(pos); + // 当sql里声明的字段类型为BIGINT时,将BigInteger (BIGINT UNSIGNED) 转换为Long + if (rowType.getFields() + .get(pos) + .getType() + .getTypeRoot() + .name() + .equalsIgnoreCase("BIGINT") + && field instanceof BigInteger) { + field = ((BigInteger) field).longValue(); + } + // 当sql里声明的字段类型为INT时,将Long (INT UNSIGNED) 转换为Integer + if (rowType.getFields() + .get(pos) + .getType() + .getTypeRoot() + .name() + .equalsIgnoreCase("INTEGER") + && field instanceof Long) { + field = ((Long) field).intValue(); + } genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java index 45ddce87c9..9e8dc46860 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java @@ -29,8 +29,10 @@ import com.dtstack.chunjun.lookup.config.LookupConfig; import com.dtstack.chunjun.throwable.NoRestartException; import com.dtstack.chunjun.util.DateUtil; +import com.dtstack.chunjun.util.JsonUtil; import com.dtstack.chunjun.util.ThreadUtil; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.types.logical.RowType; @@ -138,7 +140,8 @@ public void open(FunctionContext context) throws Exception { new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE.defaultValue()), new ChunJunThreadFactory("rdbAsyncExec"), new ThreadPoolExecutor.CallerRunsPolicy()); - log.info("async dim table JdbcOptions info: {} ", jdbcConfig.toString()); + // 隐藏日志中明文密码 + log.info("async dim table JdbcOptions info: {} ", JsonUtil.toPrintJson(jdbcConfig)); } @Override @@ -153,12 +156,19 @@ public void handleAsyncInvoke(CompletableFuture> future, Obj Thread.sleep(100); } - executor.execute( - () -> - connectWithRetry( - future, - rdbSqlClient, - Stream.of(keys).map(this::convertDataType).toArray(Object[]::new))); + List keyList = new ArrayList<>(); + for (Object key : keys) { + if (key instanceof GenericRowData) { + GenericRowData genericRowData = (GenericRowData) key; + for (int i = 0; i < genericRowData.getArity(); i++) { + keyList.add(this.convertDataType(genericRowData.getField(i))); + } + } else { + keyList.add(this.convertDataType(key)); + } + } + + executor.execute(() -> connectWithRetry(future, rdbSqlClient, keyList.toArray())); } private Object convertDataType(Object val) { diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java b/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java index 68bb87f281..e5c82c4f8a 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java @@ -29,6 +29,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.execution.SuppressRestartsException; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.FunctionContext; @@ -279,6 +280,16 @@ public abstract void handleAsyncInvoke( * @return */ public String buildCacheKey(Object... keys) { + if (keys != null && keys.length == 1 && keys[0] instanceof GenericRowData) { + GenericRowData rowData = (GenericRowData) keys[0]; + int[] keyIndexes = new int[rowData.getArity()]; + for (int i = 0; i < rowData.getArity(); i++) { + keyIndexes[i] = i; + } + return Arrays.stream(keyIndexes) + .mapToObj(index -> String.valueOf(rowData.getField(index))) + .collect(Collectors.joining("_")); + } return Arrays.stream(keys).map(String::valueOf).collect(Collectors.joining("_")); } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java index 290c6647f7..a13194184b 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java @@ -109,7 +109,8 @@ public static String toPrintJson(Object obj) { try { Map result = objectMapper.readValue(objectMapper.writeValueAsString(obj), HashMap.class); - MapUtil.replaceAllElement(result, Lists.newArrayList("pwd", "password"), "******"); + MapUtil.replaceAllElement( + result, Lists.newArrayList("pwd", "password", "druid.password"), "******"); return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(result); } catch (Exception e) { throw new RuntimeException("error parse [" + obj + "] to json", e);