-
Notifications
You must be signed in to change notification settings - Fork 211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add bug fixes and improvements to DDB source #3559
Conversation
Signed-off-by: Aiden Dai <[email protected]>
Signed-off-by: Aiden Dai <[email protected]>
@@ -37,30 +46,37 @@ public class DataFileLoader implements Runnable { | |||
*/ | |||
private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000; | |||
|
|||
static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); | |||
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how were this number chosen ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both number won't make too much differences.
There will be retries after timeout anyway. 1000 is the batch size we used for reading file and reading stream data, so we just use the same.
@@ -128,7 +150,9 @@ public void run() { | |||
int lineCount = 0; | |||
int lastLineProcessed = 0; | |||
|
|||
try (GZIPInputStream gzipInputStream = new GZIPInputStream(s3ObjectReader.readFile(bucketName, key))) { | |||
try { | |||
InputStream inputStream = objectReader.readFile(bucketName, key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
try (
InputStream inputStream = objectReader.readFile(bucketName, key);
GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
BufferedReader reader = new BufferedReader(new InputStreamReader(gzipInputStream));
) {
..
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know we could do that. I thought it can only be one.
LOG.info("Complete loading s3://{}/{}", bucketName, key); | ||
} catch (Exception e) { | ||
} catch (IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this exception scoped down ? How are we handling other exception ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. There may be other exceptions. Since this is merged, I will update this in the next PR.
Description
Fix below bugs:
out of range of int
for item countsMake below improvements:
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.