Skip to content

Commit

Permalink
NIFI-13930 PutAzureDataLakeStorage sets close flag on file write so that Azure can emit FlushWithClose event
Browse files Browse the repository at this point in the history

Signed-off-by: Pierre Villard <[email protected]>

This closes apache#9451.
  • Loading branch information
turcsanyip authored and pvillard31 committed Oct 27, 2024
1 parent b6952f1 commit eb8d4ee
Showing 1 changed file with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
package org.apache.nifi.processors.azure.storage;

import com.azure.core.util.Context;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.options.DataLakeFileFlushOptions;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
Expand Down Expand Up @@ -225,21 +227,18 @@ private void createDirectoryIfNotExists(final DataLakeDirectoryClient directoryC

private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound,
final long transferSize, final DataLakeFileClient fileClient) throws Exception {
if (transferSize > 0) {
try (final InputStream inputStream = new BufferedInputStream(
fileResourceFound.map(FileResource::getInputStream)
.orElseGet(() -> session.read(flowFile)))
) {
uploadContent(fileClient, inputStream, transferSize);
} catch (final Exception e) {
removeFile(fileClient);
throw e;
}
try (final InputStream inputStream = new BufferedInputStream(
fileResourceFound.map(FileResource::getInputStream)
.orElseGet(() -> session.read(flowFile)))
) {
uploadContent(fileClient, inputStream, transferSize);
} catch (final Exception e) {
removeFile(fileClient);
throw e;
}
}

//Visible for testing
static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException {
private static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) throws IOException {
long chunkStart = 0;
long chunkSize;

Expand All @@ -258,8 +257,7 @@ static void uploadContent(final DataLakeFileClient fileClient, final InputStream
chunkStart += chunkSize;
}

// use overwrite mode due to https://github.com/Azure/azure-sdk-for-java/issues/31248
fileClient.flush(length, true);
fileClient.flushWithResponse(length, new DataLakeFileFlushOptions().setClose(true), null, Context.NONE);
}

/**
Expand All @@ -272,7 +270,7 @@ static void uploadContent(final DataLakeFileClient fileClient, final InputStream
* @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
* @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
*/
DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
private DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);

try {
Expand Down

0 comments on commit eb8d4ee

Please sign in to comment.