-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[OSPP2024]feat: support aliyun OSS Sink Connector #540
Conversation
try { | ||
OSSObject ossObject = ossClient.getObject(bucketName, absolutePath); | ||
InputStream inputStream = ossObject.getObjectContent(); | ||
offset = inputStream.available(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Roll the file once its size exceeds the threshold. For example, when the file size becomes larger than 200 MB, create a new file for writing and archive the old ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before writing to OSS, check the offset size. If it exceeds OBJECT_SIZE_THRESHOLD, append the current timestamp as a suffix to the specified file name. For example: test.txt -> test.txt_2024-09-21-22:33:00
try { | ||
handleRecord(sinkRecord); | ||
} catch (OSSException oe) { | ||
System.out.println("Caught an OSSException, which means your request made it to OSS, " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't write msg to STDOUT, use a log instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed: replaced System.out.println
with log.error
.
log.error("OSSSinkTask | genObjectOffset | error => ", e); | ||
} | ||
}); | ||
if (enableBatchPut && !recordMap.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When any exception happens above, I think processMap() should not be called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a boolean variable hasException
to ensure processMap
is called only if all records are processed successfully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
| region | String | YES | OSS region | cn-beijing | | ||
| partitionMethod | String | YES | 分区模式,Normal表示不分区,Time表示按时间分区 | Time | | ||
| fileUrlPrefix | String | YES | 到object的URL前缀 | file1/ | | ||
| enableBatchPut | String | NO | 是否开启批处理模式 | true | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Under what circumstances will enableBatchPut be set to false? If such a scenario does not exist, this parameter can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed enableBatchPut
|
||
<groupId>org.apache.rocketmq</groupId> | ||
<artifactId>rocketmq-connect-oss</artifactId> | ||
<version>0.0.1-SNAPSHOT</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use release version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
No description provided.