Skip to content

Commit

Permalink
Pipe: Support specifing receiver's username and password when syncing…
Browse files Browse the repository at this point in the history
… data between clusters (#13933) (#13941)
  • Loading branch information
SteveYurongSu authored Oct 29, 2024
1 parent c372cab commit 2877daf
Show file tree
Hide file tree
Showing 22 changed files with 384 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.manual;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.fail;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2ManualCreateSchema.class})
public class IoTDBPipePermissionIT extends AbstractPipeDualManualIT {
@Override
@Before
public void setUp() {
MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);

// TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
.setDefaultSchemaRegionGroupNumPerDatabase(1)
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(3)
.setDataReplicationFactor(2);

// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment(3, 3);
}

@Test
public void testWithSyncConnector() throws Exception {
testWithConnector("iotdb-thrift-sync-connector");
}

@Test
public void testWithAsyncConnector() throws Exception {
testWithConnector("iotdb-thrift-async-connector");
}

private void testWithConnector(final String connector) throws Exception {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
receiverEnv,
Arrays.asList(
"create user `thulab` 'passwd'",
"create role `admin`",
"grant role `admin` to `thulab`",
"grant WRITE, READ, MANAGE_DATABASE on root.** to role `admin`"))) {
return;
}

final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"create timeseries root.ln.wf02.wt01.temperature with datatype=INT64,encoding=PLAIN",
"create timeseries root.ln.wf02.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
"insert into root.ln.wf02.wt01(time, temperature, status) values (1800000000000, 23, true)"))) {
fail();
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "all");

connectorAttributes.put("connector", connector);
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.username", "thulab");
connectorAttributes.put("connector.password", "passwd");

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add(
"root.ln.wf02.wt01.temperature,null,root.ln,INT64,PLAIN,LZ4,null,null,null,null,BASE,");
expectedResSet.add(
"root.ln.wf02.wt01.status,null,root.ln,BOOLEAN,PLAIN,LZ4,null,null,null,null,BASE,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"show timeseries",
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
expectedResSet);
expectedResSet.clear();

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select * from root.**",
"Time,root.ln.wf02.wt01.temperature,root.ln.wf02.wt01.status,",
Collections.singleton("1800000000000,23,true,"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ public static class ValueHider {

static {
KEYS.add("ssl.trust-store-pwd");
KEYS.add("password");
}

static String hide(final String key, final String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ private static void processSetParams() {
// ImportTsFileRemotely
ImportTsFileRemotely.setHost(host);
ImportTsFileRemotely.setPort(port);
ImportTsFileRemotely.setUsername(username);
ImportTsFileRemotely.setPassword(password);

// ImportTsFileBase
ImportTsFileBase.setSuccessAndFailDirAndOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -72,6 +73,9 @@ public class ImportTsFileRemotely extends ImportTsFileBase {
private static String host;
private static String port;

private static String username = SessionConfig.DEFAULT_USER;
private static String password = SessionConfig.DEFAULT_PASSWORD;

public ImportTsFileRemotely() {
initClient();
sendHandshake();
Expand Down Expand Up @@ -186,6 +190,8 @@ private Map<String, String> constructParamsMap() {
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
Boolean.toString(true));
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, LOAD_STRATEGY);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);
return params;
}

Expand Down Expand Up @@ -335,4 +341,12 @@ public static void setHost(final String host) {
public static void setPort(final String port) {
ImportTsFileRemotely.port = port;
}

public static void setUsername(final String username) {
ImportTsFileRemotely.username = username;
}

public static void setPassword(final String password) {
ImportTsFileRemotely.password = password;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ public IoTDBConfigNodeSyncClientManager(
boolean useSSL,
String trustStorePath,
String trustStorePwd,
/* The following parameters are used locally. */
String loadBalanceStrategy,
/* The following parameters are used to handshake with the receiver. */
String username,
String password,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy) {
super(
Expand All @@ -48,6 +52,8 @@ public IoTDBConfigNodeSyncClientManager(
trustStorePwd,
false,
loadBalanceStrategy,
username,
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);

return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ protected IoTDBSyncClientManager constructClient(
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
/* The following parameters are used locally. */
final boolean useLeaderCache,
final String loadBalanceStrategy,
/* The following parameters are used to handshake with the receiver. */
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy) {
return new IoTDBConfigNodeSyncClientManager(
Expand All @@ -72,6 +76,8 @@ protected IoTDBSyncClientManager constructClient(
Objects.nonNull(trustStorePath) ? ConfigNodeConfig.addHomeDir(trustStorePath) : null,
trustStorePwd,
loadBalanceStrategy,
username,
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -72,22 +73,32 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager

private final LoadBalancer loadBalancer;

private final boolean shouldReceiverConvertOnTypeMismatch;

private final String loadTsFileStrategy;

public IoTDBDataNodeAsyncClientManager(
List<TEndPoint> endPoints,
/* The following parameters are used locally. */
boolean useLeaderCache,
String loadBalanceStrategy,
/* The following parameters are used to handshake with the receiver. */
String username,
String password,
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy) {
super(endPoints, useLeaderCache);
super(
endPoints,
useLeaderCache,
username,
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);

endPointSet = new HashSet<>(endPoints);

receiverAttributes =
String.format("%s-%s", shouldReceiverConvertOnTypeMismatch, loadTsFileStrategy);
String.format(
"%s-%s-%s",
Base64.getEncoder().encodeToString((username + ":" + password).getBytes()),
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
synchronized (IoTDBDataNodeAsyncClientManager.class) {
if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) {
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
Expand Down Expand Up @@ -118,9 +129,6 @@ public IoTDBDataNodeAsyncClientManager(
loadBalanceStrategy);
loadBalancer = new RoundRobinLoadBalancer();
}

this.shouldReceiverConvertOnTypeMismatch = shouldReceiverConvertOnTypeMismatch;
this.loadTsFileStrategy = loadTsFileStrategy;
}

public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
Expand Down Expand Up @@ -234,6 +242,8 @@ public void onError(Exception e) {
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);

client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ public IoTDBDataNodeSyncClientManager(
final boolean useSSL,
final String trustStorePath,
final String trustStorePwd,
/* The following parameters are used locally. */
final boolean useLeaderCache,
final String loadBalanceStrategy,
/* The following parameters are used to handshake with the receiver. */
final String username,
final String password,
final boolean shouldReceiverConvertOnTypeMismatch,
final String loadTsFileStrategy) {
super(
Expand All @@ -58,6 +62,8 @@ public IoTDBDataNodeSyncClientManager(
trustStorePwd,
useLeaderCache,
loadBalanceStrategy,
username,
password,
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
Boolean.toString(shouldReceiverConvertOnTypeMismatch));
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, loadTsFileStrategy);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, username);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, password);

return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_USER_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_IP_KEY;
Expand All @@ -87,6 +88,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_USER_KEY;

public class IoTDBLegacyPipeConnector implements PipeConnector {
Expand Down Expand Up @@ -196,7 +198,11 @@ public void customize(

user =
parameters.getStringOrDefault(
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(
CONNECTOR_IOTDB_USER_KEY,
SINK_IOTDB_USER_KEY,
CONNECTOR_IOTDB_USERNAME_KEY,
SINK_IOTDB_USERNAME_KEY),
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
password =
parameters.getStringOrDefault(
Expand Down
Loading

0 comments on commit 2877daf

Please sign in to comment.