Skip to content

Commit

Permalink
feat: add light java sdk (#3642)
Browse files Browse the repository at this point in the history
* feat: add light java sdk

* test: add test

* refact: type

* fix: fix light

* fix: fix compile

* test: fix online mode

* fix: fix comment

---------

Co-authored-by: denglong <[email protected]>
Co-authored-by: 4paradigm <[email protected]>
  • Loading branch information
3 people authored Dec 7, 2023
1 parent a8c0226 commit e540195
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class SdkOption {
private int glogLevel = 0;
private String glogDir = "";
private int maxSqlCacheSize = 50;
private boolean isLight = false;

private void buildBaseOptions(BasicRouterOptions opt) {
opt.setEnable_debug(getEnableDebug());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,12 @@ public class SqlException extends Exception {
public SqlException(String message) {
super(message);
}

public SqlException(String message, Throwable cause) {
super(message, cause);
}

public SqlException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,28 @@ public class SqlClusterExecutor implements SqlExecutor {
private static final AtomicBoolean initialized = new AtomicBoolean(false);
private SQLRouter sqlRouter;
private DeploymentManager deploymentManager;
private ZKClient zkClient;
private InsertPreparedStatementCache insertCache;

public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlException {
initJavaSdkLibrary(libraryPath);

ZKClient zkClient = null;
if (option.isClusterMode()) {
SQLRouterOptions sqlOpt = option.buildSQLRouterOptions();
this.sqlRouter = sql_router_sdk.NewClusterSQLRouter(sqlOpt);
sqlOpt.delete();
zkClient = new ZKClient(ZKConfig.builder()
.cluster(option.getZkCluster())
.namespace(option.getZkPath())
.sessionTimeout((int)option.getSessionTimeout())
.build());
try {
if (!zkClient.connect()) {
throw new SqlException("zk client connect failed.");
if (!option.isLight()) {
zkClient = new ZKClient(ZKConfig.builder()
.cluster(option.getZkCluster())
.namespace(option.getZkPath())
.sessionTimeout((int)option.getSessionTimeout())
.build());
try {
if (!zkClient.connect()) {
throw new SqlException("zk client connect failed.");
}
} catch (Exception e) {
throw new SqlException("init zk client failed", e);
}
} catch (Exception e) {
throw new SqlException("init zk client failed. " + e.getMessage());
}
} else {
StandaloneOptions sqlOpt = option.buildStandaloneOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

public class SQLRouterSmokeTest {
public static SqlExecutor clusterExecutor;
public static SqlExecutor lightClusterExecutor;
public static SqlExecutor standaloneExecutor;

static {
Expand All @@ -54,9 +55,10 @@ public class SQLRouterSmokeTest {
option.setZkCluster(TestConfig.ZK_CLUSTER);
option.setSessionTimeout(200000);
clusterExecutor = new SqlClusterExecutor(option);
java.sql.Statement state = clusterExecutor.getStatement();
state.execute("SET @@execute_mode='online';");
state.close();
setOnlineMode(clusterExecutor);
option.setLight(true);
lightClusterExecutor = new SqlClusterExecutor(option);
setOnlineMode(lightClusterExecutor);
// create standalone router
SdkOption standaloneOption = new SdkOption();
standaloneOption.setHost(TestConfig.HOST);
Expand All @@ -69,6 +71,16 @@ public class SQLRouterSmokeTest {
}
}

static void setOnlineMode(SqlExecutor executor) {
java.sql.Statement state = executor.getStatement();
try {
state.execute("SET @@execute_mode='online';");
state.close();
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
void testMoreOptions() throws Exception {
SdkOption option = new SdkOption();
Expand All @@ -82,7 +94,7 @@ void testMoreOptions() throws Exception {

@DataProvider(name = "executor")
public Object[] executor() {
return new Object[] { clusterExecutor, standaloneExecutor };
return new Object[] { clusterExecutor, lightClusterExecutor, standaloneExecutor };
}

@Test(dataProvider = "executor")
Expand Down Expand Up @@ -128,7 +140,7 @@ public void testSmoke(SqlExecutor router) {

// select
String select1 = "select * from tsql1010;";
SQLResultSet rs1 = (SQLResultSet) router .executeSQL(dbname, select1);
SQLResultSet rs1 = (SQLResultSet) router.executeSQL(dbname, select1);

Assert.assertEquals(2, rs1.GetInternalSchema().getColumnList().size());
Assert.assertEquals(Types.BIGINT, rs1.GetInternalSchema().getColumnType(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
option = new SdkOption();
option.setZkCluster(zkCluster);
option.setZkPath(zkPath);
option.setLight(true);
String timeout = options.get("sessionTimeout");
if (timeout != null) {
option.setSessionTimeout(Integer.parseInt(timeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public OpenmldbDataSingleWriter(OpenmldbWriteConfig config, int partitionId, lon
SdkOption option = new SdkOption();
option.setZkCluster(config.zkCluster);
option.setZkPath(config.zkPath);
option.setLight(true);
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public OpenmldbDataWriter(OpenmldbWriteConfig config, int partitionId, long task
SdkOption option = new SdkOption();
option.setZkCluster(config.zkCluster);
option.setZkPath(config.zkPath);
option.setLight(true);
SqlClusterExecutor executor = new SqlClusterExecutor(option);
String dbName = config.dbName;
String tableName = config.tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class OpenmldbPartitionReader(config: OpenmldbReadConfig) extends PartitionReade
val option = new SdkOption
option.setZkCluster(config.zkCluster)
option.setZkPath(config.zkPath)
option.setLight(true)
val executor = new SqlClusterExecutor(option)
val dbName: String = config.dbName
val tableName: String = config.tableName
Expand Down

0 comments on commit e540195

Please sign in to comment.