Skip to content

Commit

Permalink
Reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
t3t5u committed Dec 18, 2024
1 parent 2077c09 commit 0a9c0ee
Showing 1 changed file with 72 additions and 15 deletions.
87 changes: 72 additions & 15 deletions src/main/java/org/embulk/output/s3/S3FileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,17 +362,42 @@ private void executeMultipartUpload(Path from, String key, ExecutorService execu
int partNumber = 1;
int totalParts = (int) (fileSize / partSize) + (fileSize % partSize == 0 ? 0 : 1);
List<Future<PartETag>> partETags = new ArrayList<>();
multipartUploadId = client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
multipartUploadId = client.initiateMultipartUpload(
new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
for (; fileOffset < fileSize; fileOffset += partSize, partNumber++) {
partETags.add(submitUploadPart(key, file, fileSize, fileOffset, partSize, partNumber, totalParts, executor));
}
client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, multipartUploadId, collect(partETags)));
partETags.add(submitUploadPart(
key,
file,
fileSize,
fileOffset,
partSize,
partNumber,
totalParts,
executor));
}
client.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucket, key, multipartUploadId, collect(partETags)));
multipartUploadId = null; // Successfully completed
}

private Future<PartETag> submitUploadPart(String key, File file, long fileSize, long fileOffset, long partSize, int partNumber, int totalParts, ExecutorService executor)
private Future<PartETag> submitUploadPart(
String key,
File file,
long fileSize,
long fileOffset,
long partSize,
int partNumber,
int totalParts,
ExecutorService executor)
{
return executor.submit(() -> new UploadPart(key, file, fileSize, fileOffset, partSize, partNumber, totalParts).runInterruptible());
return executor.submit(() -> new UploadPart(
key,
file,
fileSize,
fileOffset,
partSize,
partNumber,
totalParts).runInterruptible());
}

private class UploadPart implements Retryable<PartETag>
Expand All @@ -389,7 +414,14 @@ private class UploadPart implements Retryable<PartETag>
final boolean isLastPart;
final String md5Digest;

UploadPart(String key, File file, long fileSize, long fileOffset, long partSize, int partNumber, int totalParts)
UploadPart(
String key,
File file,
long fileSize,
long fileOffset,
long partSize,
int partNumber,
int totalParts)
{
this.key = key;
this.file = file;
Expand All @@ -404,9 +436,15 @@ private class UploadPart implements Retryable<PartETag>

PartETag runInterruptible() throws InterruptedException, RetryGiveupException
{
logger.info("Uploading a part {} / {}. bucket '{}', key '{}', upload id '{}'", partNumber, totalParts, bucket, key, multipartUploadId);
logger.info("Uploading a part {} / {}."
+ " bucket '{}', key '{}', upload id '{}'",
partNumber, totalParts,
bucket, key, multipartUploadId);
PartETag partETag = re.runInterruptible(this);
logger.info("Uploaded {} / {} bytes of the file. entity tag '{}'", df.format(fileOffset + partSize), df.format(fileSize), partETag.getETag());
logger.info("Uploaded {} / {} bytes of the file."
+ " entity tag '{}'",
df.format(fileOffset + partSize), df.format(fileSize),
partETag.getETag());
return partETag;
}

Expand All @@ -425,17 +463,29 @@ public boolean isRetryableException(Exception exception)
@Override
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
{
logger.info("An error occurred while uploading a part {} / {}, will retry {} / {} after {} milliseconds.", partNumber, totalParts, retryCount, retryLimit, df.format(retryWait), exception);
logger.info("An error occurred while uploading a part {} / {},"
+ " will retry {} / {} after {} milliseconds.",
partNumber, totalParts,
retryCount, retryLimit, df.format(retryWait), exception);
}

@Override
public void onGiveup(Exception firstException, Exception lastException)
{
logger.warn("An error occurred while uploading a part {} / {}, give up on retries.", partNumber, totalParts, lastException);
logger.warn("An error occurred while uploading a part {} / {},"
+ " give up on retries.",
partNumber, totalParts, lastException);
}
}

private PartETag uploadPart(String key, File file, long fileOffset, long partSize, int partNumber, boolean isLastPart, String md5Digest)
private PartETag uploadPart(
String key,
File file,
long fileOffset,
long partSize,
int partNumber,
boolean isLastPart,
String md5Digest)
{
return client.uploadPart(new UploadPartRequest()
.withBucketName(bucket)
Expand All @@ -457,11 +507,15 @@ private void abortMultipartUploadIfNecessary(String key, ExecutorService executo
}
try {
abortMultipartUpload(key, executor);
logger.info("Aborts a multipart upload. bucket '{}', key '{}', upload id '{}'", bucket, key, multipartUploadId);
logger.info("Aborts a multipart upload."
+ " bucket '{}', key '{}', upload id '{}'",
bucket, key, multipartUploadId);
}
catch (RuntimeException e) {
logger.warn("An error occurred while aborting a multipart upload.", e);
logger.warn("An incomplete multipart upload may remain. bucket '{}', key '{}', upload id '{}'", bucket, key, multipartUploadId);
logger.warn("An incomplete multipart upload may remain."
+ " bucket '{}', key '{}', upload id '{}'",
bucket, key, multipartUploadId);
}
}

Expand Down Expand Up @@ -629,7 +683,10 @@ public static class MultipartUpload
public final int retryLimit;

@JsonCreator
public MultipartUpload(@JsonProperty("part_size") String partSize, @JsonProperty("max_threads") Integer maxThreads, @JsonProperty("retry_limit") Integer retryLimit)
public MultipartUpload(
@JsonProperty("part_size") String partSize,
@JsonProperty("max_threads") Integer maxThreads,
@JsonProperty("retry_limit") Integer retryLimit)
{
this.partSize = parseLong(partSize != null ? partSize : "5g");
this.maxThreads = maxThreads != null ? maxThreads : 4;
Expand Down

0 comments on commit 0a9c0ee

Please sign in to comment.