Skip to content

Commit

Permalink
[chunjun-server] 回滚其他功能调整的改动在该提交里面。
Browse files Browse the repository at this point in the history
  • Loading branch information
zoudaokoulife committed Jan 22, 2024
1 parent 112150d commit 916c61d
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 56 deletions.
4 changes: 2 additions & 2 deletions bin/start-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ echo "CHUNJUN_HOME:"$CHUNJUN_HOME >&2
HO_HEAP_SIZE="${HO_HEAP_SIZE:=1024m}"

JAVA_OPTS="$JAVA_OPTS -Xmx${HO_HEAP_SIZE}"
JAVA_OPTS="$JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=10006"
#JAVA_OPTS="$JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=10006"

JAVA_OPTS="$JAVA_OPTS -Xms${HO_HEAP_SIZE}"

Expand Down Expand Up @@ -78,7 +78,7 @@ CLASS_NAME=com.dtstack.chunjun.server.ServerLauncher

start(){
echo "ChunJun server starting ..."
nice -n ${CHUNJUN_NICE} $JAVA_RUN $JAVA_OPTS -cp $JAR_DIR $CLASS_NAME $@ 1> "${CHUNJUN_LOG_DIR}/chunjun.stdout" 2> "${CHUNJUN_LOG_DIR}/chunjun.err" &
nice -n ${CHUNJUN_NICE} $JAVA_RUN $JAVA_OPTS -cp $JAR_DIR $CLASS_NAME $@ 1> "${CHUNJUN_LOG_DIR}/chunjun.stdout" 2>&1 &
echo $! > $pidfile
ret=$?
return 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public JdbcLruTableFunction(
String[] fieldNames,
String[] keyNames,
RowType rowType) {
super(lookupConfig, jdbcDialect.getRowConverter(rowType), keyNames, rowType);
super(lookupConfig, jdbcDialect.getRowConverter(rowType));
this.jdbcConfig = jdbcConfig;
this.jdbcDialect = jdbcDialect;
this.asyncPoolSize = ((JdbcLookupConfig) lookupConfig).getAsyncPoolSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
Expand All @@ -64,25 +61,9 @@ public abstract class AbstractLruTableFunction extends AsyncLookupFunction {
private static final int TIMEOUT_LOG_FLUSH_NUM = 10;
private int timeOutNum = 0;

protected final RowData.FieldGetter[] fieldGetters;

public AbstractLruTableFunction(
LookupConfig lookupConfig,
AbstractRowConverter rowConverter,
String[] keyNames,
RowType rowType) {
public AbstractLruTableFunction(LookupConfig lookupConfig, AbstractRowConverter rowConverter) {
this.lookupConfig = lookupConfig;
this.rowConverter = rowConverter;

this.fieldGetters = new RowData.FieldGetter[keyNames.length];
List<Pair<LogicalType, Integer>> fieldTypeAndPositionOfKeyField =
getFieldTypeAndPositionOfKeyField(keyNames, rowType);
for (int i = 0; i < fieldTypeAndPositionOfKeyField.size(); i++) {
Pair<LogicalType, Integer> typeAndPosition = fieldTypeAndPositionOfKeyField.get(i);
fieldGetters[i] =
RowData.createFieldGetter(
typeAndPosition.getLeft(), typeAndPosition.getRight());
}
}

@Override
Expand Down Expand Up @@ -217,19 +198,15 @@ protected void preInvoke(CompletableFuture<Collection<RowData>> future, Object..
public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
CompletableFuture<Collection<RowData>> lookupFuture = new CompletableFuture<>();
try {
Object[] keyData = new Object[keyRow.getArity()];
for (int i = 0; i < keyRow.getArity(); i++) {
keyData[i] = fieldGetters[i].getFieldOrNull(keyRow);
}
preInvoke(lookupFuture, keyRow);

preInvoke(lookupFuture, keyData);
String cacheKey = buildCacheKey(keyData);
String cacheKey = buildCacheKey(keyRow);
// 缓存判断
if (isUseCache(cacheKey)) {
invokeWithCache(cacheKey, lookupFuture);
return lookupFuture;
}
handleAsyncInvoke(lookupFuture, keyData);
handleAsyncInvoke(lookupFuture, keyRow);
} catch (Exception e) {
// todo 优化
log.error(e.getMessage());
Expand Down Expand Up @@ -331,14 +308,4 @@ protected void dealFillDataError(CompletableFuture<Collection<RowData>> future,
dealMissKey(future);
}
}

protected List<Pair<LogicalType, Integer>> getFieldTypeAndPositionOfKeyField(
String[] keyNames, RowType rowType) {
List<Pair<LogicalType, Integer>> typeAndPosition = Lists.newLinkedList();
for (int i = 0; i < keyNames.length; i++) {
LogicalType type = rowType.getTypeAt(rowType.getFieldIndex(keyNames[i]));
typeAndPosition.add(Pair.of(type, i));
}
return typeAndPosition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public static String toPrintJson(Object obj) {
Map<String, Object> result =
objectMapper.readValue(objectMapper.writeValueAsString(obj), HashMap.class);
MapUtil.replaceAllElement(
result, Lists.newArrayList("pwd", "password", "druid.password", "secretKey"), "******");
result,
Lists.newArrayList("pwd", "password", "druid.password", "secretKey"),
"******");
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(result);
} catch (Exception e) {
throw new RuntimeException("error parse [" + obj + "] to json", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.types.logical.RowType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -60,11 +59,7 @@ public class AbstractLruTableFunctionTest {
public void setup() {
LookupConfig lookupConfig = new LookupConfig();
MockRowConverter mockRowConverter = new MockRowConverter();

String[] keyNames = new String[] {"id"};
RowType rowType = mockRowConverter.getRowType();
this.lruTableFunction =
new MockLruTableFunction(lookupConfig, mockRowConverter, keyNames, rowType);
this.lruTableFunction = new MockLruTableFunction(lookupConfig, mockRowConverter);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,14 @@
import com.dtstack.chunjun.lookup.config.LookupConfig;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public class MockLruTableFunction extends AbstractLruTableFunction {

public MockLruTableFunction(
LookupConfig lookupConfig,
AbstractRowConverter rowConverter,
String[] keyNames,
RowType rowType) {
super(lookupConfig, rowConverter, keyNames, rowType);
public MockLruTableFunction(LookupConfig lookupConfig, AbstractRowConverter rowConverter) {
super(lookupConfig, rowConverter);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
<module>chunjun-restore</module>
<module>chunjun-ddl</module>
<module>chunjun-assembly</module>
<module>chunjun-e2e</module>
<module>chunjun-local-test</module>
<!-- <module>chunjun-e2e</module>-->
<!-- <module>chunjun-local-test</module>-->
<module>chunjun-server</module>
</modules>

Expand Down

0 comments on commit 916c61d

Please sign in to comment.