Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support to set version white list #92

Merged
merged 4 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public class NebulaClientOptions implements Serializable {

private final SelfSignParams selfSignParams;

private final String version;

private NebulaClientOptions(String metaAddress, String graphAddress, String username,
String password, int timeout, int connectRetry,
boolean enableGraphSSL, boolean enableMetaSSL,
boolean enableStorageSSL,
SSLSignType sslSignType, CASignParams caSignParams,
SelfSignParams selfSignParams) {
SelfSignParams selfSignParams, String version) {
this.metaAddress = metaAddress;
this.graphAddress = graphAddress;
this.username = username;
Expand All @@ -59,6 +60,7 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user
this.sslSignType = sslSignType;
this.caSignParams = caSignParams;
this.selfSignParams = selfSignParams;
this.version = version;
}

public List<HostAddress> getMetaAddress() {
Expand Down Expand Up @@ -118,6 +120,10 @@ public SelfSignParams getSelfSignParam() {
return selfSignParams;
}

public String getVersion() {
return version;
}

/**
* Builder for {@link NebulaClientOptions}
*/
Expand All @@ -136,6 +142,7 @@ public static class NebulaClientOptionsBuilder {
private SSLSignType sslSignType = null;
private CASignParams caSignParams = null;
private SelfSignParams selfSignParams = null;
private String version = null;

public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) {
this.metaAddress = metaAddress;
Expand Down Expand Up @@ -200,6 +207,11 @@ public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String ke
return this;
}

public NebulaClientOptionsBuilder setVersion(String version) {
this.version = version;
return this;
}

public NebulaClientOptions build() {
if (metaAddress == null || metaAddress.trim().isEmpty()) {
throw new IllegalArgumentException("meta address can not be empty.");
Expand Down Expand Up @@ -246,7 +258,8 @@ public NebulaClientOptions build() {
enableStorageSSL,
sslSignType,
caSignParams,
selfSignParams);
selfSignParams,
version);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.apache.flink.connector.nebula.connection;


import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
Expand Down Expand Up @@ -43,9 +42,9 @@ public NebulaPool getNebulaPool() throws UnknownHostException {
}

Collections.shuffle(addresses);
NebulaPool nebulaPool = new NebulaPool();
NebulaPoolConfig poolConfig = new NebulaPoolConfig();
poolConfig.setTimeout(nebulaClientOptions.getTimeout());
poolConfig.setVersion(nebulaClientOptions.getVersion());
if (nebulaClientOptions.isEnableGraphSSL()) {
poolConfig.setEnableSsl(true);
switch (nebulaClientOptions.getSSLSignType()) {
Expand All @@ -67,8 +66,12 @@ public NebulaPool getNebulaPool() throws UnknownHostException {
throw new IllegalArgumentException("ssl sign type is not supported.");
}
}
nebulaPool.init(addresses, poolConfig);
return nebulaPool;
NebulaPool nebulaPool = new NebulaPool();
if (nebulaPool.init(addresses, poolConfig)) {
return nebulaPool;
} else {
throw new RuntimeException("NebulaPool init failed.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public MetaClient getMetaClient() throws TException, ClientServerIncompatibleExc
metaClient = new MetaClient(addresses, timeout, retry, retry);
}

metaClient.setVersion(nebulaClientOptions.getVersion());
metaClient.connect();
return metaClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public StorageClient getStorageClient() throws Exception {
storageClient = new StorageClient(addresses, timeout);
}

storageClient.setVersion(nebulaClientOptions.getVersion());
if (!storageClient.connect()) {
throw new Exception("failed to connect storaged.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void getNebulaPool() {
.setPassword("nebula")
.setConnectRetry(1)
.setTimeout(1000)
.setVersion("test")
.build();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
Expand All @@ -52,10 +53,37 @@ public void getNebulaPool() {
}
}

@Test
public void getNebulaPoolWithWrongVersion() {
NebulaClientOptions nebulaClientOptions =
new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress("127.0.0.1:9669")
.setMetaAddress("127.0.0.1:9559")
.setUsername("root")
.setPassword("nebula")
.setConnectRetry(1)
.setTimeout(1000)
.setVersion("INVALID_VERSION")
.build();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
try {
NebulaPool nebulaPool = graphConnectionProvider.getNebulaPool();
nebulaPool.getSession("root", "nebula", true);
} catch (Exception e) {
LOG.info("get session failed", e);
if (e.getMessage().contains("NebulaPool init failed.")) {
assert true;
} else {
assert false;
}
}
}

/**
* nebula server does not enable ssl, the connection cannot be established correctly.
*/
@Test(expected = NotValidConnectionException.class)
@Test(expected = RuntimeException.class)
public void getSessionWithSsl() throws NotValidConnectionException {
NebulaClientOptions nebulaClientOptions =
new NebulaClientOptions.NebulaClientOptionsBuilder()
Expand All @@ -81,7 +109,7 @@ public void getSessionWithSsl() throws NotValidConnectionException {
NebulaPool pool = graphConnectionProvider.getNebulaPool();
pool.getSession("root", "nebula", true);
} catch (UnknownHostException | IOErrorException | AuthFailedException
| ClientServerIncompatibleException e) {
| ClientServerIncompatibleException e) {
LOG.error("get session failed", e);
assert (false);
}
Expand Down
12 changes: 12 additions & 0 deletions connector/src/test/resources/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --expired_time_factor=2
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
healthcheck:
test: ["CMD", "curl", "-f", "http://172.28.1.1:11000/status"]
interval: 30s
Expand Down Expand Up @@ -52,6 +54,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --expired_time_factor=2
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
healthcheck:
test: ["CMD", "curl", "-f", "http://172.28.1.2:11000/status"]
interval: 30s
Expand Down Expand Up @@ -88,6 +92,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --expired_time_factor=2
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
healthcheck:
test: ["CMD", "curl", "-f", "http://172.28.1.3:11000/status"]
interval: 30s
Expand Down Expand Up @@ -242,6 +248,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --timezone_name=+08:00:00
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
depends_on:
- metad0
- metad1
Expand Down Expand Up @@ -279,6 +287,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --timezone_name=+08:00:00
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
depends_on:
- metad0
- metad1
Expand Down Expand Up @@ -316,6 +326,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --timezone_name=+08:00:00
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
depends_on:
- metad0
- metad1
Expand Down
Loading