From 538912d71202d44ccea1291561f88409b13467b1 Mon Sep 17 00:00:00 2001 From: limbo-24 <1003239855@qq.com> Date: Sat, 31 Aug 2024 20:19:44 +0800 Subject: [PATCH 1/6] feat: support rocketmq-connect-oss sink --- .../aliyun/rocketmq-connect-oss/README.md | 49 ++++ .../aliyun/rocketmq-connect-oss/pom.xml | 218 ++++++++++++++++++ .../connect/oss/sink/OssSinkConnector.java | 81 +++++++ .../connect/oss/sink/OssSinkTask.java | 215 +++++++++++++++++ .../oss/sink/constant/OssConstant.java | 14 ++ .../connect/eventbridge/sink/OssSinkTest.java | 156 +++++++++++++ 6 files changed, 733 insertions(+) create mode 100644 connectors/aliyun/rocketmq-connect-oss/README.md create mode 100644 connectors/aliyun/rocketmq-connect-oss/pom.xml create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java create mode 100644 connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java diff --git a/connectors/aliyun/rocketmq-connect-oss/README.md b/connectors/aliyun/rocketmq-connect-oss/README.md new file mode 100644 index 000000000..7a62d72ff --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/README.md @@ -0,0 +1,49 @@ +# rocketmq-connect-oss +* **rocketmq-connect-oss** 说明 +``` +Be responsible for consuming messages from producer and writing data to oss. +``` + +## rocketmq-connect-oss 打包 +``` +mvn clean install -Dmaven.test.skip=true +``` + +## rocketmq-connect-oss 启动 + +* **rocketmq-connect-oss** 启动 + +``` +http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-sink-connector-name} +?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"${connect-topicname}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","accountEndpoint":"${accountEndpoint}","bucketName":"${bucketName}","fileUrlPrefix":"${fileUrlPrefix}","objectName":"${objectName}","region":"${region}","partitionMethod":"${partitionMethod}"} +``` + +例子 +``` +http://localhost:8081/connectors/ossConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", +"connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"oss-topic","accountEndpoint":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx", +"bucketName":"xxxx","objectName":"xxxx","region":"xxxx","partitionMethod":"xxxx"} +``` + +>**注:** `rocketmq-oss-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 + +## rocketmq-connect-oss 停止 + +``` +http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-connector-name}/stop +``` + +## rocketmq-connect-oss 参数说明 +* **oss-sink-connector 参数说明** + +| KEY | TYPE | Must be filled | Description | Example +|------------------------|---------|----------------|----------------------------------|---------| +|accountEndpoint | String | YES | OSS endpoint | oss-cn-beijing.aliyuncs.com| +|accessKeyId | String | YES | 阿里云授信账号的AK | xxxx | +|accessKeySecret | String | YES | 阿里云授信账号的SK | xxx | +|bucketName | String | YES | OSS bucketName | test_bucket | +|objectName | String | YES | 上传目的object名字 | test.txt | +|region | String | YES | OSS region | cn-beijing | +|partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | +|fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | + diff --git a/connectors/aliyun/rocketmq-connect-oss/pom.xml b/connectors/aliyun/rocketmq-connect-oss/pom.xml new file mode 100644 index 000000000..f303c5326 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/pom.xml @@ -0,0 +1,218 @@ + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-oss + 0.0.1-SNAPSHOT + + connect-oss + + 8 + 8 + 1.7.36 + 1.2.11 + 4.13.2 + 1.2.83 + 3.22.0 + 4.5.1 + 0.1.4 + 3.12.0 + UTF-8 + 3.17.2 + 3.1.0 + 4.6.0 + 2.9.0 + + + + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + + org.codehaus.mojo + clirr-maven-plugin + 2.7 + + + maven-dependency-plugin + + ${project.build.directory}/lib + false + true + + + + maven-compiler-plugin + 3.6.1 + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.source} + true + true + + + + maven-surefire-plugin + 2.19.1 + + -Xms512m -Xmx1024m + always + + **/*Test.java + + + + + maven-site-plugin + 3.6 + + en_US + UTF-8 + UTF-8 + + + + maven-source-plugin + 3.0.1 + + + attach-sources + + jar + + + + + + maven-javadoc-plugin + 2.10.4 + + UTF-8 + en_US + io.openmessaging.internal + + + + aggregate + + aggregate + + site + + + + + maven-resources-plugin + 3.0.2 + + ${project.build.sourceEncoding} + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.4 + + + maven-assembly-plugin + 3.0.0 + + + + org.apache.rocketmq.connect.oss.sink.OssSinkConnector + + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + ch.qos.logback + logback-classic + ${logback.version} + + + ch.qos.logback + logback-core + ${logback.version} + + + org.assertj + assertj-core + ${assertj.version} + + + com.alibaba + fastjson + ${fastjson.version} + + + io.openmessaging + openmessaging-connector + ${openmessaging-connector.version} + + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-core + ${mockito.version} + test + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + com.aliyun.oss + aliyun-sdk-oss + ${aliyun-sdk-oss.version} + + + com.google.code.gson + gson + ${gson.version} + + + com.aliyun + aliyun-java-sdk-core + ${aliyun-java-sdk-core.version} + + + com.aliyun + aliyun-java-sdk-sts + ${aliyun-java-sdk-sts.version} + + + + \ No newline at end of file diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java new file mode 100644 index 000000000..2493f8a96 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java @@ -0,0 +1,81 @@ +package org.apache.rocketmq.connect.oss.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; + +import java.util.ArrayList; +import java.util.List; + +public class OssSinkConnector extends SinkConnector { + + private String accessKeyId; + + private String accessKeySecret; + + private String accountEndpoint; + + private String bucketName; + + private String fileUrlPrefix; + + private String objectName; + + private String region; + + private String partitionMethod; + + @Override + public List taskConfigs(int maxTasks) { + List keyValueList = new ArrayList<>(11); + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(OssConstant.ACCESS_KEY_ID, accessKeyId); + keyValue.put(OssConstant.ACCESS_KEY_SECRET, accessKeySecret); + keyValue.put(OssConstant.ACCOUNT_ENDPOINT, accountEndpoint); + keyValue.put(OssConstant.BUCKET_NAME, bucketName); + keyValue.put(OssConstant.FILE_URL_PREFIX, fileUrlPrefix); + keyValue.put(OssConstant.OBJECT_NAME, objectName); + keyValue.put(OssConstant.REGION, region); + keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod); + keyValueList.add(keyValue); + return keyValueList; + } + + @Override + public Class taskClass() { + return OssSinkTask.class; + } + + @Override + public void validate(KeyValue config) { + if (StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_ID)) + || StringUtils.isBlank(config.getString(OssConstant.ACCESS_KEY_SECRET)) + || StringUtils.isBlank(config.getString(OssConstant.ACCOUNT_ENDPOINT)) + || StringUtils.isBlank(config.getString(OssConstant.BUCKET_NAME)) + || StringUtils.isBlank(config.getString(OssConstant.OBJECT_NAME)) + || StringUtils.isBlank(config.getString(OssConstant.REGION)) + || StringUtils.isBlank(config.getString(OssConstant.PARTITION_METHOD))) { + throw new RuntimeException("Oss required parameter is null !"); + } + } + + @Override + public void start(KeyValue config) { + accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(OssConstant.ACCESS_KEY_SECRET); + accountEndpoint = config.getString(OssConstant.ACCOUNT_ENDPOINT); + bucketName = config.getString(OssConstant.BUCKET_NAME); + fileUrlPrefix = config.getString(OssConstant.FILE_URL_PREFIX); + objectName = config.getString(OssConstant.OBJECT_NAME); + region = config.getString(OssConstant.REGION); + partitionMethod = config.getString(OssConstant.PARTITION_METHOD); + } + + @Override + public void stop() { + + } +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java new file mode 100644 index 000000000..72d1b61d7 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java @@ -0,0 +1,215 @@ +package org.apache.rocketmq.connect.oss.sink; + + +import com.aliyuncs.DefaultAcsClient; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.aliyuncs.IAcsClient; +import com.aliyuncs.http.FormatType; +import com.aliyuncs.profile.DefaultProfile; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.errors.ConnectException; +import io.openmessaging.connector.api.errors.RetriableException; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.ClientBuilderConfiguration; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.CredentialsProviderFactory; +import com.aliyun.oss.common.auth.DefaultCredentialProvider; +import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider; +import com.aliyun.oss.common.auth.*; +import com.aliyun.oss.common.comm.SignVersion; +import com.aliyun.oss.model.AppendObjectRequest; +import com.aliyun.oss.model.AppendObjectResult; +import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.PutObjectRequest; +import com.aliyun.oss.model.PutObjectResult; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.IOException; + +public class OssSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(OssSinkTask.class); + + private String accessKeyId; + + private String accessKeySecret; + + private String accountEndpoint; + + private String bucketName; + + private String fileUrlPrefix; + + private String region; + + private OSS ossClient; + + private String objectName; + + private String partitionMethod; + + private String compressType; + + private long lastOffset; + + private long lastTimeStamp; + + private String lastPrefix; + + private String genFilePrefixByPartition(ConnectRecord record) throws ConnectException { + if (partitionMethod.equals("Normal")) { + return fileUrlPrefix; + } else if (partitionMethod.equals("Time")) { + long nowTimeStamp = record.getTimestamp(); + if (lastTimeStamp != nowTimeStamp) { + Date myDate = new Date(nowTimeStamp); + String year = String.format("%tY", myDate); + String month = String.format("%tm", myDate); + String day = String.format("%td", myDate); + String hour = String.format("%tH", myDate); + lastPrefix = fileUrlPrefix + year + "/" + month + "/" + day + "/" + hour + "/"; + return lastPrefix; + } + return lastPrefix; + } else { + throw new RetriableException("Illegal partition method."); + // log.error("Illegal partition method."); + // return ""; + } + } + + private long genObjectOffset(ConnectRecord record, String objectUrl) throws ConnectException, IOException { + if (partitionMethod.equals("Normal")) { + return lastOffset; + } else if (partitionMethod.equals("Time")) { + if (lastTimeStamp != record.getTimestamp()) { + boolean exists = ossClient.doesObjectExist(bucketName, objectUrl); + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + OSSObject ossObject = ossClient.getObject(bucketName, objectUrl); + InputStream inputStream = ossObject.getObjectContent(); + lastOffset = inputStream.available(); + return lastOffset; + } else { + lastOffset = 0; + return lastOffset; + } + } else { + return lastOffset; + } + } else { + throw new RetriableException("Illegal partition method."); + // log.error("Illegal partition method."); + // return 0; + } + } + + @Override + public void put(List sinkRecords) throws ConnectException { + try { + sinkRecords.forEach(sinkRecord -> { + try { + //Create JOSN to save the info of connectrecord, now only contains the data content + JSONObject jsonObject = new JSONObject(); + jsonObject.put("data", sinkRecord.getData()); + String context = JSON.toJSONString(jsonObject); + + String prefix = genFilePrefixByPartition(sinkRecord); + String absolutePath = prefix + objectName; + long appendOffset = genObjectOffset(sinkRecord, absolutePath); + + // Create an append write request and send it + AppendObjectRequest appendObjectRequest = new AppendObjectRequest(bucketName, absolutePath, new ByteArrayInputStream(context.getBytes())); + appendObjectRequest.setPosition(appendOffset); + AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest); + + // Update + lastOffset = appendObjectResult.getNextPosition(); + lastTimeStamp = sinkRecord.getTimestamp(); + } catch (OSSException oe) { + System.out.println("Caught an OSSException, which means your request made it to OSS, " + + "but was rejected with an error response for some reason."); + System.out.println("Error Message:" + oe.getErrorMessage()); + System.out.println("Error Code:" + oe.getErrorCode()); + System.out.println("Request ID:" + oe.getRequestId()); + System.out.println("Host ID:" + oe.getHostId()); + } catch (ClientException ce) { + System.out.println("Caught an ClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with OSS, " + + "such as not being able to access the network."); + System.out.println("Error Message:" + ce.getMessage()); + } catch (Exception e) { + log.error("OSSSinkTask | genObjectOffset | error => ", e); + } + }); + } catch (Exception e) { + log.error("OSSSinkTask | put | error => ", e); + } + } + + @Override + public void validate(KeyValue config) { + + } + + @Override + public void start(KeyValue config) { + accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID); + accessKeySecret = config.getString(OssConstant.ACCESS_KEY_SECRET); + accountEndpoint = config.getString(OssConstant.ACCOUNT_ENDPOINT); + bucketName = config.getString(OssConstant.BUCKET_NAME); + fileUrlPrefix = config.getString(OssConstant.FILE_URL_PREFIX); + objectName = config.getString(OssConstant.OBJECT_NAME); + region = config.getString(OssConstant.REGION); + partitionMethod = config.getString(OssConstant.PARTITION_METHOD); + compressType = config.getString(OssConstant.COMPRESS_TYPE); + + try { + DefaultCredentialProvider credentialsProvider = CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId, accessKeySecret); + + ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration(); + clientBuilderConfiguration.setSignatureVersion(SignVersion.V4); + ossClient = OSSClientBuilder.create() + .endpoint(accountEndpoint) + .credentialsProvider(credentialsProvider) + .clientConfiguration(clientBuilderConfiguration) + .region(region) + .build(); + if (partitionMethod.equals("Normal")) { + boolean exists = ossClient.doesObjectExist(bucketName, fileUrlPrefix + objectName); + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + OSSObject ossObject = ossClient.getObject(bucketName, fileUrlPrefix + objectName); + InputStream inputStream = ossObject.getObjectContent(); + long offset_now = inputStream.available(); + lastOffset = offset_now; + } else { + lastOffset = 0; + } + } + } catch (Exception e) { + log.error("OssSinkTask | start | error => ", e); + throw new RuntimeException(e); + } + } + + @Override + public void stop() { + ossClient.shutdown(); + } + +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java new file mode 100644 index 000000000..3c0648763 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java @@ -0,0 +1,14 @@ +package org.apache.rocketmq.connect.oss.sink.constant; + +public class OssConstant { + + public static final String ACCESS_KEY_ID = "accessKeyId"; + public static final String ACCESS_KEY_SECRET = "accessKeySecret"; + public static final String ACCOUNT_ENDPOINT = "accountEndpoint"; + public static final String BUCKET_NAME = "bucketName"; + public static final String FILE_URL_PREFIX = "fileUrlPrefix"; + public static final String OBJECT_NAME = "objectName"; + public static final String REGION = "region"; + public static final String PARTITION_METHOD = "partitionMethod"; + public static final String COMPRESS_TYPE = "compressType"; +} diff --git a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java new file mode 100644 index 000000000..b462b1b18 --- /dev/null +++ b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java @@ -0,0 +1,156 @@ +package org.apache.rocketmq.connect.oss.sink; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.rocketmq.connect.oss.sink.constant.OssConstant; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + + +public class OssSinkTest { + @Test + public void testTaskConfigs() { + OssSinkConnector ossSinkConnector = new OssSinkConnector(); + Assert.assertEquals(ossSinkConnector.taskConfigs(1).size(), 1); + } + + @Test + public void testNormalPut() { + OssSinkTask ossSinkTask = new OssSinkTask(); + KeyValue keyValue = new DefaultKeyValue(); + // Replace KV pair with your own message + keyValue.put(OssConstant.ACCESS_KEY_ID, "LTAI5t68yKJXx6HbkrKowqe8"); + keyValue.put(OssConstant.ACCESS_KEY_SECRET, "eiDUU47CIJ0ShVX2zzl3KhehyscrSY"); + keyValue.put(OssConstant.ACCOUNT_ENDPOINT, "oss-cn-beijing.aliyuncs.com"); + keyValue.put(OssConstant.BUCKET_NAME, "rocketmqoss"); + keyValue.put(OssConstant.FILE_URL_PREFIX, "test/"); + keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); + keyValue.put(OssConstant.REGION, "cn-beijing"); + keyValue.put(OssConstant.PARTITION_METHOD, "Normal"); + + ossSinkTask.start(keyValue); + List connectRecordList = new ArrayList<>(); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + connectRecord.setData("{\n" + + "\t\"test\" : \"test\"\n" + + "}"); + connectRecordList.add(connectRecord); + ossSinkTask.init(new SinkTaskContext() { + @Override + public String getConnectorName() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + + @Override public KeyValue configs() { + return null; + } + + @Override + public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) { + + } + + @Override + public void resetOffset(Map map) { + + } + + @Override + public void pause(List list) { + + } + + @Override + public void resume(List list) { + + } + + @Override + public Set assignment() { + return null; + } + }); + ossSinkTask.put(connectRecordList); + } + + @Test + public void testTimePut() { + OssSinkTask ossSinkTask = new OssSinkTask(); + KeyValue keyValue = new DefaultKeyValue(); + // Replace KV pair with your own message + keyValue.put(OssConstant.ACCESS_KEY_ID, "LTAI5t68yKJXx6HbkrKowqe8"); + keyValue.put(OssConstant.ACCESS_KEY_SECRET, "eiDUU47CIJ0ShVX2zzl3KhehyscrSY"); + keyValue.put(OssConstant.ACCOUNT_ENDPOINT, "oss-cn-beijing.aliyuncs.com"); + keyValue.put(OssConstant.BUCKET_NAME, "rocketmqoss"); + keyValue.put(OssConstant.FILE_URL_PREFIX, "test/"); + keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); + keyValue.put(OssConstant.REGION, "cn-beijing"); + keyValue.put(OssConstant.PARTITION_METHOD, "Time"); + + ossSinkTask.start(keyValue); + List connectRecordList = new ArrayList<>(); + ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); + connectRecord.setData("{\n" + + "\t\"test\" : \"test\"\n" + + "}"); + connectRecordList.add(connectRecord); + ossSinkTask.init(new SinkTaskContext() { + @Override + public String getConnectorName() { + return null; + } + + @Override + public String getTaskName() { + return null; + } + + @Override public KeyValue configs() { + return null; + } + + @Override + public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) { + + } + + @Override + public void resetOffset(Map map) { + + } + + @Override + public void pause(List list) { + + } + + @Override + public void resume(List list) { + + } + + @Override + public Set assignment() { + return null; + } + }); + ossSinkTask.put(connectRecordList); + } + + +} From df80b507f1d129421bb7d6e8b4d487d5ee43748f Mon Sep 17 00:00:00 2001 From: Limbo-24 <1003239855@qq.com> Date: Mon, 9 Sep 2024 15:53:34 +0800 Subject: [PATCH 2/6] feat:support rocketmq-connect-oss sink --- .../aliyun/rocketmq-connect-oss/README.md | 24 ++-- .../connect/oss/sink/OssSinkConnector.java | 32 +++-- .../connect/oss/sink/OssSinkTask.java | 123 +++++++++++++++--- .../oss/sink/constant/OssConstant.java | 2 + .../connect/eventbridge/sink/OssSinkTest.java | 2 + 5 files changed, 140 insertions(+), 43 deletions(-) diff --git a/connectors/aliyun/rocketmq-connect-oss/README.md b/connectors/aliyun/rocketmq-connect-oss/README.md index 7a62d72ff..db5eed45a 100644 --- a/connectors/aliyun/rocketmq-connect-oss/README.md +++ b/connectors/aliyun/rocketmq-connect-oss/README.md @@ -22,7 +22,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-sink-connector-na ``` http://localhost:8081/connectors/ossConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster", "connector-class":"org.apache.rocketmq.connect.oss.sink.OssSinkConnector","connect-topicname":"oss-topic","accountEndpoint":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx", -"bucketName":"xxxx","objectName":"xxxx","region":"xxxx","partitionMethod":"xxxx"} +"bucketName":"xxxx","objectName":"xxxx","region":"xxxx","partitionMethod":"xxxx","fileUrlPrefix":"xxxx"} ``` >**注:** `rocketmq-oss-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中 @@ -36,14 +36,16 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-connector-name}/s ## rocketmq-connect-oss 参数说明 * **oss-sink-connector 参数说明** -| KEY | TYPE | Must be filled | Description | Example -|------------------------|---------|----------------|----------------------------------|---------| -|accountEndpoint | String | YES | OSS endpoint | oss-cn-beijing.aliyuncs.com| -|accessKeyId | String | YES | 阿里云授信账号的AK | xxxx | -|accessKeySecret | String | YES | 阿里云授信账号的SK | xxx | -|bucketName | String | YES | OSS bucketName | test_bucket | -|objectName | String | YES | 上传目的object名字 | test.txt | -|region | String | YES | OSS region | cn-beijing | -|partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | -|fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | +| KEY | TYPE | Must be filled | Description | Example +|-----------------|--------|----------------|--------------------------------|----------------------------| +| accountEndpoint | String | YES | OSS endpoint | oss-cn-beijing.aliyuncs.com | +| accessKeyId | String | YES | 阿里云授信账号的AK | xxxx | +| accessKeySecret | String | YES | 阿里云授信账号的SK | xxx | +| bucketName | String | YES | OSS bucketName | test_bucket | +| objectName | String | YES | 上传目的object名字 | test.txt | +| region | String | YES | OSS region | cn-beijing | +| partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | +| fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | +| enableBatchPut | String | NO | 是否开启批处理模式 | true | +| taskId | Int | NO | task id | 1 | diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java index 2493f8a96..e244a5d82 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java @@ -28,19 +28,27 @@ public class OssSinkConnector extends SinkConnector { private String partitionMethod; + private boolean enableBatchPut; + + private int taskId; + @Override public List taskConfigs(int maxTasks) { - List keyValueList = new ArrayList<>(11); - KeyValue keyValue = new DefaultKeyValue(); - keyValue.put(OssConstant.ACCESS_KEY_ID, accessKeyId); - keyValue.put(OssConstant.ACCESS_KEY_SECRET, accessKeySecret); - keyValue.put(OssConstant.ACCOUNT_ENDPOINT, accountEndpoint); - keyValue.put(OssConstant.BUCKET_NAME, bucketName); - keyValue.put(OssConstant.FILE_URL_PREFIX, fileUrlPrefix); - keyValue.put(OssConstant.OBJECT_NAME, objectName); - keyValue.put(OssConstant.REGION, region); - keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod); - keyValueList.add(keyValue); + List keyValueList = new ArrayList<>(); + for (int i = 0; i < maxTasks; ++i) { + KeyValue keyValue = new DefaultKeyValue(); + keyValue.put(OssConstant.ACCESS_KEY_ID, accessKeyId); + keyValue.put(OssConstant.ACCESS_KEY_SECRET, accessKeySecret); + keyValue.put(OssConstant.ACCOUNT_ENDPOINT, accountEndpoint); + keyValue.put(OssConstant.BUCKET_NAME, bucketName); + keyValue.put(OssConstant.FILE_URL_PREFIX, fileUrlPrefix); + keyValue.put(OssConstant.OBJECT_NAME, objectName); + keyValue.put(OssConstant.REGION, region); + keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod); + keyValue.put(OssConstant.ENABLE_BATCH_PUT, String.valueOf(enableBatchPut)); + keyValue.put(OssConstant.TASK_ID, taskId + i); + keyValueList.add(keyValue); + } return keyValueList; } @@ -72,6 +80,8 @@ public void start(KeyValue config) { objectName = config.getString(OssConstant.OBJECT_NAME); region = config.getString(OssConstant.REGION); partitionMethod = config.getString(OssConstant.PARTITION_METHOD); + enableBatchPut = Boolean.parseBoolean(config.getString(OssConstant.ENABLE_BATCH_PUT, "false")); + taskId = config.getInt(OssConstant.TASK_ID, 0); } @Override diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java index 72d1b61d7..a33a5a8ff 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java @@ -8,6 +8,7 @@ import com.aliyuncs.http.FormatType; import com.aliyuncs.profile.DefaultProfile; import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.Struct; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.component.task.sink.SinkTaskContext; import io.openmessaging.connector.api.data.ConnectRecord; @@ -33,6 +34,12 @@ import com.aliyun.oss.model.PutObjectRequest; import com.aliyun.oss.model.PutObjectResult; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; +import java.util.HashMap; import java.net.URI; import java.util.ArrayList; import java.util.Date; @@ -64,12 +71,46 @@ public class OssSinkTask extends SinkTask { private String compressType; + private boolean enableBatchPut; + + private int taskId; + private long lastOffset; private long lastTimeStamp; private String lastPrefix; + private HashMap> recordMap = new HashMap<>(); + + private ReentrantLock mapLock = new ReentrantLock(); + + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + private void processMap() throws ConnectException, IOException { + mapLock.lock(); + recordMap.forEach((key, values) -> { + String joinedString = values.stream().collect(Collectors.joining("\n")); + String absolutePath = key + objectName; + boolean exists = ossClient.doesObjectExist(bucketName, absolutePath); + long offset = 0; + // If the object does not exist, create it and set offset to 0, otherwise read the offset of the current object + if (exists) { + try { + OSSObject ossObject = ossClient.getObject(bucketName, absolutePath); + InputStream inputStream = ossObject.getObjectContent(); + offset = inputStream.available(); + } catch (Exception e) { + log.error("OSSSinkTask | getObjectContent | error => ", e); + } + } else { + offset = 0; + } + putOss(absolutePath, offset, joinedString); + }); + mapLock.unlock(); + } + private String genFilePrefixByPartition(ConnectRecord record) throws ConnectException { if (partitionMethod.equals("Normal")) { return fileUrlPrefix; @@ -87,8 +128,6 @@ private String genFilePrefixByPartition(ConnectRecord record) throws ConnectExce return lastPrefix; } else { throw new RetriableException("Illegal partition method."); - // log.error("Illegal partition method."); - // return ""; } } @@ -113,8 +152,50 @@ private long genObjectOffset(ConnectRecord record, String objectUrl) throws Conn } } else { throw new RetriableException("Illegal partition method."); - // log.error("Illegal partition method."); - // return 0; + } + } + + private void putOss(String absolutePath, long offset, String context) throws ConnectException { + try { + // Create an append write request and send it + AppendObjectRequest appendObjectRequest = new AppendObjectRequest(bucketName, absolutePath, new ByteArrayInputStream(context.getBytes())); + appendObjectRequest.setPosition(offset); + AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest); + + // Update + lastOffset = appendObjectResult.getNextPosition(); + } catch (OSSException oe) { + System.out.println("Caught an OSSException, which means your request made it to OSS, " + + "but was rejected with an error response for some reason."); + System.out.println("Error Message:" + oe.getErrorMessage()); + System.out.println("Error Code:" + oe.getErrorCode()); + System.out.println("Request ID:" + oe.getRequestId()); + System.out.println("Host ID:" + oe.getHostId()); + } catch (ClientException ce) { + System.out.println("Caught an ClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with OSS, " + + "such as not being able to access the network."); + System.out.println("Error Message:" + ce.getMessage()); + } + } + + private void handleRecord(ConnectRecord record) throws ConnectException, IOException { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("data", record.getData()); + String context = JSON.toJSONString(jsonObject); + String prefix = genFilePrefixByPartition(record); + if (enableBatchPut) { + mapLock.lock(); + if (!recordMap.containsKey(prefix)) { + recordMap.put(prefix, new ArrayList<>()); + } + recordMap.get(prefix).add(context); + mapLock.unlock(); + } else { + String absolutePath = prefix + objectName; + long appendOffset = genObjectOffset(record, absolutePath); + putOss(absolutePath, appendOffset, context); + lastTimeStamp = record.getTimestamp(); } } @@ -123,23 +204,7 @@ public void put(List sinkRecords) throws ConnectException { try { sinkRecords.forEach(sinkRecord -> { try { - //Create JOSN to save the info of connectrecord, now only contains the data content - JSONObject jsonObject = new JSONObject(); - jsonObject.put("data", sinkRecord.getData()); - String context = JSON.toJSONString(jsonObject); - - String prefix = genFilePrefixByPartition(sinkRecord); - String absolutePath = prefix + objectName; - long appendOffset = genObjectOffset(sinkRecord, absolutePath); - - // Create an append write request and send it - AppendObjectRequest appendObjectRequest = new AppendObjectRequest(bucketName, absolutePath, new ByteArrayInputStream(context.getBytes())); - appendObjectRequest.setPosition(appendOffset); - AppendObjectResult appendObjectResult = ossClient.appendObject(appendObjectRequest); - - // Update - lastOffset = appendObjectResult.getNextPosition(); - lastTimeStamp = sinkRecord.getTimestamp(); + handleRecord(sinkRecord); } catch (OSSException oe) { System.out.println("Caught an OSSException, which means your request made it to OSS, " + "but was rejected with an error response for some reason."); @@ -177,6 +242,21 @@ public void start(KeyValue config) { region = config.getString(OssConstant.REGION); partitionMethod = config.getString(OssConstant.PARTITION_METHOD); compressType = config.getString(OssConstant.COMPRESS_TYPE); + enableBatchPut = Boolean.parseBoolean(config.getString(OssConstant.ENABLE_BATCH_PUT, "false")); + taskId = config.getInt(OssConstant.TASK_ID); + fileUrlPrefix = fileUrlPrefix + "task_" + Integer.toString(taskId) + "/"; + + if (enableBatchPut) { + scheduler.scheduleAtFixedRate(() -> { + try { + if (!recordMap.isEmpty()) { + processMap(); + } + } catch (Exception e) { + log.error("OSSSinkTask | processMap | error => ", e); + } + }, 0, 10, TimeUnit.SECONDS); + } try { DefaultCredentialProvider credentialsProvider = CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId, accessKeySecret); @@ -209,6 +289,7 @@ public void start(KeyValue config) { @Override public void stop() { + scheduler.shutdown(); ossClient.shutdown(); } diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java index 3c0648763..03b6f2c5a 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java @@ -11,4 +11,6 @@ public class OssConstant { public static final String REGION = "region"; public static final String PARTITION_METHOD = "partitionMethod"; public static final String COMPRESS_TYPE = "compressType"; + public static final String ENABLE_BATCH_PUT = "enableBatchPut"; + public static final String TASK_ID = "task_id"; } diff --git a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java index b462b1b18..e2d7dfc20 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java @@ -37,6 +37,7 @@ public void testNormalPut() { keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); keyValue.put(OssConstant.REGION, "cn-beijing"); keyValue.put(OssConstant.PARTITION_METHOD, "Normal"); + keyValue.put(OssConstant.ENABLE_BATCH_PUT, "false"); ossSinkTask.start(keyValue); List connectRecordList = new ArrayList<>(); @@ -101,6 +102,7 @@ public void testTimePut() { keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); keyValue.put(OssConstant.REGION, "cn-beijing"); keyValue.put(OssConstant.PARTITION_METHOD, "Time"); + keyValue.put(OssConstant.ENABLE_BATCH_PUT, "false"); ossSinkTask.start(keyValue); List connectRecordList = new ArrayList<>(); From 7bb048d1f3b565cea4cf773f52eed42263bc2571 Mon Sep 17 00:00:00 2001 From: Limbo-24 <1003239855@qq.com> Date: Fri, 20 Sep 2024 11:05:59 +0800 Subject: [PATCH 3/6] feat:support rocketmq-connect-oss sink --- .../aliyun/rocketmq-connect-oss/README.md | 1 - .../connect/oss/sink/OssSinkConnector.java | 4 -- .../connect/oss/sink/OssSinkTask.java | 40 +++++-------------- .../oss/sink/constant/OssConstant.java | 1 - .../connect/eventbridge/sink/OssSinkTest.java | 12 +++--- 5 files changed, 17 insertions(+), 41 deletions(-) diff --git a/connectors/aliyun/rocketmq-connect-oss/README.md b/connectors/aliyun/rocketmq-connect-oss/README.md index db5eed45a..87644cbc4 100644 --- a/connectors/aliyun/rocketmq-connect-oss/README.md +++ b/connectors/aliyun/rocketmq-connect-oss/README.md @@ -47,5 +47,4 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-connector-name}/s | partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | | fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | | enableBatchPut | String | NO | 是否开启批处理模式 | true | -| taskId | Int | NO | task id | 1 | diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java index e244a5d82..c8c9a3dd3 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java @@ -30,8 +30,6 @@ public class OssSinkConnector extends SinkConnector { private boolean enableBatchPut; - private int taskId; - @Override public List taskConfigs(int maxTasks) { List keyValueList = new ArrayList<>(); @@ -46,7 +44,6 @@ public List taskConfigs(int maxTasks) { keyValue.put(OssConstant.REGION, region); keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod); keyValue.put(OssConstant.ENABLE_BATCH_PUT, String.valueOf(enableBatchPut)); - keyValue.put(OssConstant.TASK_ID, taskId + i); keyValueList.add(keyValue); } return keyValueList; @@ -81,7 +78,6 @@ public void start(KeyValue config) { region = config.getString(OssConstant.REGION); partitionMethod = config.getString(OssConstant.PARTITION_METHOD); enableBatchPut = Boolean.parseBoolean(config.getString(OssConstant.ENABLE_BATCH_PUT, "false")); - taskId = config.getInt(OssConstant.TASK_ID, 0); } @Override diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java index a33a5a8ff..83b936f76 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java @@ -34,10 +34,6 @@ import com.aliyun.oss.model.PutObjectRequest; import com.aliyun.oss.model.PutObjectResult; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import java.util.HashMap; import java.net.URI; @@ -73,7 +69,7 @@ public class OssSinkTask extends SinkTask { private boolean enableBatchPut; - private int taskId; + private String taskId; private long lastOffset; @@ -83,12 +79,7 @@ public class OssSinkTask extends SinkTask { private HashMap> recordMap = new HashMap<>(); - private ReentrantLock mapLock = new ReentrantLock(); - - private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - private void processMap() throws ConnectException, IOException { - mapLock.lock(); recordMap.forEach((key, values) -> { String joinedString = values.stream().collect(Collectors.joining("\n")); String absolutePath = key + objectName; @@ -108,7 +99,6 @@ private void processMap() throws ConnectException, IOException { } putOss(absolutePath, offset, joinedString); }); - mapLock.unlock(); } private String genFilePrefixByPartition(ConnectRecord record) throws ConnectException { @@ -185,12 +175,10 @@ private void handleRecord(ConnectRecord record) throws ConnectException, IOExcep String context = JSON.toJSONString(jsonObject); String prefix = genFilePrefixByPartition(record); if (enableBatchPut) { - mapLock.lock(); if (!recordMap.containsKey(prefix)) { recordMap.put(prefix, new ArrayList<>()); } recordMap.get(prefix).add(context); - mapLock.unlock(); } else { String absolutePath = prefix + objectName; long appendOffset = genObjectOffset(record, absolutePath); @@ -221,6 +209,9 @@ public void put(List sinkRecords) throws ConnectException { log.error("OSSSinkTask | genObjectOffset | error => ", e); } }); + if (enableBatchPut && !recordMap.isEmpty()) { + processMap(); + } } catch (Exception e) { log.error("OSSSinkTask | put | error => ", e); } @@ -231,6 +222,11 @@ public void validate(KeyValue config) { } + @Override + public void init(SinkTaskContext sinkTaskContext) { + taskId = sinkTaskContext.getConnectorName() + "-" + sinkTaskContext.getTaskName(); + } + @Override public void start(KeyValue config) { accessKeyId = config.getString(OssConstant.ACCESS_KEY_ID); @@ -243,21 +239,8 @@ public void start(KeyValue config) { partitionMethod = config.getString(OssConstant.PARTITION_METHOD); compressType = config.getString(OssConstant.COMPRESS_TYPE); enableBatchPut = Boolean.parseBoolean(config.getString(OssConstant.ENABLE_BATCH_PUT, "false")); - taskId = config.getInt(OssConstant.TASK_ID); - fileUrlPrefix = fileUrlPrefix + "task_" + Integer.toString(taskId) + "/"; - - if (enableBatchPut) { - scheduler.scheduleAtFixedRate(() -> { - try { - if (!recordMap.isEmpty()) { - processMap(); - } - } catch (Exception e) { - log.error("OSSSinkTask | processMap | error => ", e); - } - }, 0, 10, TimeUnit.SECONDS); - } - + + fileUrlPrefix = fileUrlPrefix + taskId + "/"; try { DefaultCredentialProvider credentialsProvider = CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId, accessKeySecret); @@ -289,7 +272,6 @@ public void start(KeyValue config) { @Override public void stop() { - scheduler.shutdown(); ossClient.shutdown(); } diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java index 03b6f2c5a..083d740fb 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java @@ -12,5 +12,4 @@ public class OssConstant { public static final String PARTITION_METHOD = "partitionMethod"; public static final String COMPRESS_TYPE = "compressType"; public static final String ENABLE_BATCH_PUT = "enableBatchPut"; - public static final String TASK_ID = "task_id"; } diff --git a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java index e2d7dfc20..9fabeebab 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java @@ -39,7 +39,6 @@ public void testNormalPut() { keyValue.put(OssConstant.PARTITION_METHOD, "Normal"); keyValue.put(OssConstant.ENABLE_BATCH_PUT, "false"); - ossSinkTask.start(keyValue); List connectRecordList = new ArrayList<>(); ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); connectRecord.setData("{\n" + @@ -49,12 +48,12 @@ public void testNormalPut() { ossSinkTask.init(new SinkTaskContext() { @Override public String getConnectorName() { - return null; + return "test_connect"; } @Override public String getTaskName() { - return null; + return "test_task"; } @Override public KeyValue configs() { @@ -86,6 +85,7 @@ public Set assignment() { return null; } }); + ossSinkTask.start(keyValue); ossSinkTask.put(connectRecordList); } @@ -104,7 +104,6 @@ public void testTimePut() { keyValue.put(OssConstant.PARTITION_METHOD, "Time"); keyValue.put(OssConstant.ENABLE_BATCH_PUT, "false"); - ossSinkTask.start(keyValue); List connectRecordList = new ArrayList<>(); ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); connectRecord.setData("{\n" + @@ -114,12 +113,12 @@ public void testTimePut() { ossSinkTask.init(new SinkTaskContext() { @Override public String getConnectorName() { - return null; + return "test_connect"; } @Override public String getTaskName() { - return null; + return "test_task"; } @Override public KeyValue configs() { @@ -151,6 +150,7 @@ public Set assignment() { return null; } }); + ossSinkTask.start(keyValue); ossSinkTask.put(connectRecordList); } From 8f5b5097b49d4f5c468ebac80fa5aa6cbfe8ca7c Mon Sep 17 00:00:00 2001 From: limbo-24 <1003239855@qq.com> Date: Sun, 22 Sep 2024 00:17:25 +0800 Subject: [PATCH 4/6] fix by comments --- .../connect/oss/sink/OssSinkTask.java | 54 +++++++++++-------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java index 83b936f76..0deebc228 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java @@ -34,11 +34,13 @@ import com.aliyun.oss.model.PutObjectRequest; import com.aliyun.oss.model.PutObjectResult; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.HashMap; import java.net.URI; import java.util.ArrayList; import java.util.Date; +import java.text.SimpleDateFormat; import java.util.List; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -79,6 +81,8 @@ public class OssSinkTask extends SinkTask { private HashMap> recordMap = new HashMap<>(); + private static final long OBJECT_SIZE_THRESHOLD = 200 * 1024 * 1024; + private void processMap() throws ConnectException, IOException { recordMap.forEach((key, values) -> { String joinedString = values.stream().collect(Collectors.joining("\n")); @@ -91,6 +95,14 @@ private void processMap() throws ConnectException, IOException { OSSObject ossObject = ossClient.getObject(bucketName, absolutePath); InputStream inputStream = ossObject.getObjectContent(); offset = inputStream.available(); + if (offset > OBJECT_SIZE_THRESHOLD) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String sufix = formatter.format(new Date()); + ossClient.copyObject(bucketName, absolutePath, bucketName, absolutePath + sufix); + ossClient.deleteObject(bucketName, absolutePath); + ossClient.doesObjectExist(bucketName, absolutePath); + offset = 0; + } } catch (Exception e) { log.error("OSSSinkTask | getObjectContent | error => ", e); } @@ -155,17 +167,11 @@ private void putOss(String absolutePath, long offset, String context) throws Con // Update lastOffset = appendObjectResult.getNextPosition(); } catch (OSSException oe) { - System.out.println("Caught an OSSException, which means your request made it to OSS, " - + "but was rejected with an error response for some reason."); - System.out.println("Error Message:" + oe.getErrorMessage()); - System.out.println("Error Code:" + oe.getErrorCode()); - System.out.println("Request ID:" + oe.getRequestId()); - System.out.println("Host ID:" + oe.getHostId()); + log.error("Caught an OSSException, which means your request made it to OSS, but was rejected with an error response for some reason." + + " Error Message: {}, Error Code: {}, Request ID: {}, Host ID: {}.", oe.getErrorMessage(),oe.getErrorCode(), oe.getRequestId(), oe.getHostId()); } catch (ClientException ce) { - System.out.println("Caught an ClientException, which means the client encountered " - + "a serious internal problem while trying to communicate with OSS, " - + "such as not being able to access the network."); - System.out.println("Error Message:" + ce.getMessage()); + log.error("Caught an ClientException, which means the client encountered a serious internal problem while trying to communicate with OSS," + + " such as not being able to access the network. Error Message: {}.", ce.getMessage()); } } @@ -182,6 +188,14 @@ private void handleRecord(ConnectRecord record) throws ConnectException, IOExcep } else { String absolutePath = prefix + objectName; long appendOffset = genObjectOffset(record, absolutePath); + if (appendOffset > OBJECT_SIZE_THRESHOLD) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String sufix = formatter.format(new Date()); + ossClient.copyObject(bucketName, absolutePath, bucketName, absolutePath + sufix); + ossClient.deleteObject(bucketName, absolutePath); + ossClient.doesObjectExist(bucketName, absolutePath); + lastOffset = 0; + } putOss(absolutePath, appendOffset, context); lastTimeStamp = record.getTimestamp(); } @@ -190,26 +204,24 @@ private void handleRecord(ConnectRecord record) throws ConnectException, IOExcep @Override public void put(List sinkRecords) throws ConnectException { try { + AtomicBoolean hasException = new AtomicBoolean(false); sinkRecords.forEach(sinkRecord -> { try { handleRecord(sinkRecord); } catch (OSSException oe) { - System.out.println("Caught an OSSException, which means your request made it to OSS, " - + "but was rejected with an error response for some reason."); - System.out.println("Error Message:" + oe.getErrorMessage()); - System.out.println("Error Code:" + oe.getErrorCode()); - System.out.println("Request ID:" + oe.getRequestId()); - System.out.println("Host ID:" + oe.getHostId()); + log.error("Caught an OSSException, which means your request made it to OSS, but was rejected with an error response for some reason." + + " Error Message: {}, Error Code: {}, Request ID: {}, Host ID: {}.", oe.getErrorMessage(),oe.getErrorCode(), oe.getRequestId(), oe.getHostId()); + hasException.set(true); } catch (ClientException ce) { - System.out.println("Caught an ClientException, which means the client encountered " - + "a serious internal problem while trying to communicate with OSS, " - + "such as not being able to access the network."); - System.out.println("Error Message:" + ce.getMessage()); + log.error("Caught an ClientException, which means the client encountered a serious internal problem while trying to communicate with OSS," + + " such as not being able to access the network. Error Message: {}.", ce.getMessage()); + hasException.set(true); } catch (Exception e) { log.error("OSSSinkTask | genObjectOffset | error => ", e); + hasException.set(true); } }); - if (enableBatchPut && !recordMap.isEmpty()) { + if (!hasException.get() && enableBatchPut && !recordMap.isEmpty()) { processMap(); } } catch (Exception e) { From 246da5de44426baf3a570d908d0c9d0281cbee00 Mon Sep 17 00:00:00 2001 From: limbo-24 <1003239855@qq.com> Date: Mon, 28 Oct 2024 00:55:42 +0800 Subject: [PATCH 5/6] remove enableBatchPut --- .../aliyun/rocketmq-connect-oss/README.md | 22 ++++++------ .../connect/oss/sink/OssSinkConnector.java | 4 --- .../connect/oss/sink/OssSinkTask.java | 35 +++++++------------ .../oss/sink/constant/OssConstant.java | 1 - .../connect/eventbridge/sink/OssSinkTest.java | 10 +++--- 5 files changed, 27 insertions(+), 45 deletions(-) diff --git a/connectors/aliyun/rocketmq-connect-oss/README.md b/connectors/aliyun/rocketmq-connect-oss/README.md index 87644cbc4..161608406 100644 --- a/connectors/aliyun/rocketmq-connect-oss/README.md +++ b/connectors/aliyun/rocketmq-connect-oss/README.md @@ -36,15 +36,13 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-oss-connector-name}/s ## rocketmq-connect-oss 参数说明 * **oss-sink-connector 参数说明** -| KEY | TYPE | Must be filled | Description | Example -|-----------------|--------|----------------|--------------------------------|----------------------------| -| accountEndpoint | String | YES | OSS endpoint | oss-cn-beijing.aliyuncs.com | -| accessKeyId | String | YES | 阿里云授信账号的AK | xxxx | -| accessKeySecret | String | YES | 阿里云授信账号的SK | xxx | -| bucketName | String | YES | OSS bucketName | test_bucket | -| objectName | String | YES | 上传目的object名字 | test.txt | -| region | String | YES | OSS region | cn-beijing | -| partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | -| fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | -| enableBatchPut | String | NO | 是否开启批处理模式 | true | - +| KEY | TYPE | Must be filled | Description | Example +|-----------------|--------|----------------|-------------------------------|----------------------------| +| accountEndpoint | String | YES | OSS endpoint | oss-cn-beijing.aliyuncs.com | +| accessKeyId | String | YES | 阿里云授信账号的AK | xxxx | +| accessKeySecret | String | YES | 阿里云授信账号的SK | xxx | +| bucketName | String | YES | OSS bucketName | test_bucket | +| objectName | String | YES | 上传目的object名字 | test.txt | +| region | String | YES | OSS region | cn-beijing | +| partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | +| fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java index c8c9a3dd3..3c1d73870 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkConnector.java @@ -28,8 +28,6 @@ public class OssSinkConnector extends SinkConnector { private String partitionMethod; - private boolean enableBatchPut; - @Override public List taskConfigs(int maxTasks) { List keyValueList = new ArrayList<>(); @@ -43,7 +41,6 @@ public List taskConfigs(int maxTasks) { keyValue.put(OssConstant.OBJECT_NAME, objectName); keyValue.put(OssConstant.REGION, region); keyValue.put(OssConstant.PARTITION_METHOD, partitionMethod); - keyValue.put(OssConstant.ENABLE_BATCH_PUT, String.valueOf(enableBatchPut)); keyValueList.add(keyValue); } return keyValueList; @@ -77,7 +74,6 @@ public void start(KeyValue config) { objectName = config.getString(OssConstant.OBJECT_NAME); region = config.getString(OssConstant.REGION); partitionMethod = config.getString(OssConstant.PARTITION_METHOD); - enableBatchPut = Boolean.parseBoolean(config.getString(OssConstant.ENABLE_BATCH_PUT, "false")); } @Override diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java index 0deebc228..002bc30bd 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/OssSinkTask.java @@ -69,8 +69,6 @@ public class OssSinkTask extends SinkTask { private String compressType; - private boolean enableBatchPut; - private String taskId; private long lastOffset; @@ -180,25 +178,19 @@ private void handleRecord(ConnectRecord record) throws ConnectException, IOExcep jsonObject.put("data", record.getData()); String context = JSON.toJSONString(jsonObject); String prefix = genFilePrefixByPartition(record); - if (enableBatchPut) { - if (!recordMap.containsKey(prefix)) { - recordMap.put(prefix, new ArrayList<>()); - } - recordMap.get(prefix).add(context); - } else { - String absolutePath = prefix + objectName; - long appendOffset = genObjectOffset(record, absolutePath); - if (appendOffset > OBJECT_SIZE_THRESHOLD) { - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String sufix = formatter.format(new Date()); - ossClient.copyObject(bucketName, absolutePath, bucketName, absolutePath + sufix); - ossClient.deleteObject(bucketName, absolutePath); - ossClient.doesObjectExist(bucketName, absolutePath); - lastOffset = 0; - } - putOss(absolutePath, appendOffset, context); - lastTimeStamp = record.getTimestamp(); + + String absolutePath = prefix + objectName; + long appendOffset = genObjectOffset(record, absolutePath); + if (appendOffset > OBJECT_SIZE_THRESHOLD) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String sufix = formatter.format(new Date()); + ossClient.copyObject(bucketName, absolutePath, bucketName, absolutePath + sufix); + ossClient.deleteObject(bucketName, absolutePath); + ossClient.doesObjectExist(bucketName, absolutePath); + lastOffset = 0; } + putOss(absolutePath, appendOffset, context); + lastTimeStamp = record.getTimestamp(); } @Override @@ -221,7 +213,7 @@ public void put(List sinkRecords) throws ConnectException { hasException.set(true); } }); - if (!hasException.get() && enableBatchPut && !recordMap.isEmpty()) { + if (!hasException.get() && !recordMap.isEmpty()) { processMap(); } } catch (Exception e) { @@ -250,7 +242,6 @@ public void start(KeyValue config) { region = config.getString(OssConstant.REGION); partitionMethod = config.getString(OssConstant.PARTITION_METHOD); compressType = config.getString(OssConstant.COMPRESS_TYPE); - enableBatchPut = Boolean.parseBoolean(config.getString(OssConstant.ENABLE_BATCH_PUT, "false")); fileUrlPrefix = fileUrlPrefix + taskId + "/"; try { diff --git a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java index 083d740fb..3c0648763 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/main/java/org/apache/rocketmq/connect/oss/sink/constant/OssConstant.java @@ -11,5 +11,4 @@ public class OssConstant { public static final String REGION = "region"; public static final String PARTITION_METHOD = "partitionMethod"; public static final String COMPRESS_TYPE = "compressType"; - public static final String ENABLE_BATCH_PUT = "enableBatchPut"; } diff --git a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java index 9fabeebab..1e21169b9 100644 --- a/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java +++ b/connectors/aliyun/rocketmq-connect-oss/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/OssSinkTest.java @@ -29,15 +29,14 @@ public void testNormalPut() { OssSinkTask ossSinkTask = new OssSinkTask(); KeyValue keyValue = new DefaultKeyValue(); // Replace KV pair with your own message - keyValue.put(OssConstant.ACCESS_KEY_ID, "LTAI5t68yKJXx6HbkrKowqe8"); - keyValue.put(OssConstant.ACCESS_KEY_SECRET, "eiDUU47CIJ0ShVX2zzl3KhehyscrSY"); + keyValue.put(OssConstant.ACCESS_KEY_ID, "xxx"); // Input yuor accesskey id + keyValue.put(OssConstant.ACCESS_KEY_SECRET, "xxx"); // Input your accesskey secret keyValue.put(OssConstant.ACCOUNT_ENDPOINT, "oss-cn-beijing.aliyuncs.com"); keyValue.put(OssConstant.BUCKET_NAME, "rocketmqoss"); keyValue.put(OssConstant.FILE_URL_PREFIX, "test/"); keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); keyValue.put(OssConstant.REGION, "cn-beijing"); keyValue.put(OssConstant.PARTITION_METHOD, "Normal"); - keyValue.put(OssConstant.ENABLE_BATCH_PUT, "false"); List connectRecordList = new ArrayList<>(); ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); @@ -94,15 +93,14 @@ public void testTimePut() { OssSinkTask ossSinkTask = new OssSinkTask(); KeyValue keyValue = new DefaultKeyValue(); // Replace KV pair with your own message - keyValue.put(OssConstant.ACCESS_KEY_ID, "LTAI5t68yKJXx6HbkrKowqe8"); - keyValue.put(OssConstant.ACCESS_KEY_SECRET, "eiDUU47CIJ0ShVX2zzl3KhehyscrSY"); + keyValue.put(OssConstant.ACCESS_KEY_ID, "xxx"); // Input yuor accesskey id + keyValue.put(OssConstant.ACCESS_KEY_SECRET, "xxx"); // Input your accesskey secret keyValue.put(OssConstant.ACCOUNT_ENDPOINT, "oss-cn-beijing.aliyuncs.com"); keyValue.put(OssConstant.BUCKET_NAME, "rocketmqoss"); keyValue.put(OssConstant.FILE_URL_PREFIX, "test/"); keyValue.put(OssConstant.OBJECT_NAME, "oss_new.txt"); keyValue.put(OssConstant.REGION, "cn-beijing"); keyValue.put(OssConstant.PARTITION_METHOD, "Time"); - keyValue.put(OssConstant.ENABLE_BATCH_PUT, "false"); List connectRecordList = new ArrayList<>(); ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis()); From 18089e43f8a25968a374a1ffcb7c27dbb83a2bee Mon Sep 17 00:00:00 2001 From: limbo-24 <1003239855@qq.com> Date: Mon, 28 Oct 2024 21:28:28 +0800 Subject: [PATCH 6/6] modify version --- connectors/aliyun/rocketmq-connect-oss/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connectors/aliyun/rocketmq-connect-oss/pom.xml b/connectors/aliyun/rocketmq-connect-oss/pom.xml index f303c5326..9ac09327a 100644 --- a/connectors/aliyun/rocketmq-connect-oss/pom.xml +++ b/connectors/aliyun/rocketmq-connect-oss/pom.xml @@ -6,7 +6,7 @@ org.apache.rocketmq rocketmq-connect-oss - 0.0.1-SNAPSHOT + 1.0.0 connect-oss @@ -215,4 +215,4 @@ - \ No newline at end of file +