Skip to content

Commit

Permalink
Support append mode to file sink
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 29, 2023
1 parent 9449f24 commit 4141fac
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
Expand All @@ -41,6 +42,8 @@ public class FileSink implements Sink<Record<Object>> {
private boolean initialized;
private final String tagsTargetKey;

private final boolean appendMode;

/**
* Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper
* runtime engine to construct an instance of {@link FileSink} using an instance of {@link PluginSetting} which
Expand All @@ -54,6 +57,7 @@ public class FileSink implements Sink<Record<Object>> {
public FileSink(final FileSinkConfig fileSinkConfig, final SinkContext sinkContext) {
this.outputFilePath = fileSinkConfig.getPath();
isStopRequested = false;
this.appendMode = fileSinkConfig.getAppendMode();
initialized = false;
lock = new ReentrantLock(true);
tagsTargetKey = Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null;
Expand Down Expand Up @@ -117,7 +121,13 @@ public void shutdown() {
@Override
public void initialize() {
try {
writer = Files.newBufferedWriter(Paths.get(outputFilePath), StandardCharsets.UTF_8);
if (appendMode) {


writer = Files.newBufferedWriter(Paths.get(outputFilePath), StandardCharsets.UTF_8, StandardOpenOption.APPEND, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
} else {
writer = Files.newBufferedWriter(Paths.get(outputFilePath), StandardCharsets.UTF_8);
}
} catch (final IOException ex) {
throw new RuntimeException(format("Encountered exception opening/creating file %s", outputFilePath), ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ public class FileSinkConfig {
@NotEmpty
private String path = "src/resources/file-test-sample-output.txt";

@JsonProperty("append")
private boolean appendMode = false;

public String getPath() {
return path;
}

public boolean getAppendMode() {
return appendMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class FileSinkTests {
// TODO: remove with the completion of: https://github.com/opensearch-project/data-prepper/issues/546
private final List<Record<Object>> TEST_STRING_RECORDS = Arrays.asList(TEST_STRING_RECORD_1, TEST_STRING_RECORD_2);
private List<Record<Object>> TEST_RECORDS;

private Record<Object> TEST_RECORD1;

private Record<Object> TEST_RECORD2;

private FileSinkConfig fileSinkConfig;
private SinkContext sinkContext;

Expand All @@ -60,12 +65,14 @@ void setUp() throws IOException {
.withData(Map.of(TEST_KEY, TEST_DATA_1))
.build();
event.getMetadata().addTags(List.of(tagStr1, tagStr2));
TEST_RECORDS.add(new Record<>(event));
TEST_RECORD1 = new Record<>(event);
TEST_RECORDS.add(TEST_RECORD1);
event = JacksonEvent.builder()
.withEventType("event")
.withData(Map.of(TEST_KEY, TEST_DATA_2))
.build();
TEST_RECORDS.add(new Record<>(event));
TEST_RECORD2 = new Record<>(event);
TEST_RECORDS.add(TEST_RECORD2);
}

private FileSink createObjectUnderTest() {
Expand Down Expand Up @@ -108,6 +115,55 @@ void testValidFilePathStringRecord() throws IOException {
Assertions.assertTrue(outputData.contains(TEST_DATA_2));
}

@Test
void testValidFilePathStringRecordInAppendMode() throws IOException {
when(sinkContext.getTagsTargetKey()).thenReturn(null);
when(fileSinkConfig.getAppendMode()).thenReturn(true);
FileSink fileSink = createObjectUnderTest();
fileSink.initialize();

Assertions.assertTrue(fileSink.isReady());
fileSink.output(List.of(TEST_RECORD1));
fileSink.shutdown();
String outputData = readDocFromFile(TEST_OUTPUT_FILE);
Assertions.assertTrue(outputData.contains(TEST_DATA_1));
Assertions.assertFalse(outputData.contains(TEST_DATA_2));

fileSink = createObjectUnderTest();
fileSink.initialize();
Assertions.assertTrue(fileSink.isReady());
fileSink.output(List.of(TEST_RECORD2));
fileSink.shutdown();

outputData = readDocFromFile(TEST_OUTPUT_FILE);
Assertions.assertTrue(outputData.contains(TEST_DATA_1));
Assertions.assertTrue(outputData.contains(TEST_DATA_2));
}

@Test
void testValidFilePathStringRecordInAppendModeFalse() throws IOException {
when(sinkContext.getTagsTargetKey()).thenReturn(null);
FileSink fileSink = createObjectUnderTest();
fileSink.initialize();

Assertions.assertTrue(fileSink.isReady());
fileSink.output(List.of(TEST_RECORD1));
fileSink.shutdown();
String outputData = readDocFromFile(TEST_OUTPUT_FILE);
Assertions.assertTrue(outputData.contains(TEST_DATA_1));
Assertions.assertFalse(outputData.contains(TEST_DATA_2));

fileSink = createObjectUnderTest();
fileSink.initialize();
Assertions.assertTrue(fileSink.isReady());
fileSink.output(List.of(TEST_RECORD2));
fileSink.shutdown();

outputData = readDocFromFile(TEST_OUTPUT_FILE);
Assertions.assertFalse(outputData.contains(TEST_DATA_1));
Assertions.assertTrue(outputData.contains(TEST_DATA_2));
}

@Test
void testValidFilePathStringRecord_EventsWithTags() throws IOException {
when(sinkContext.getTagsTargetKey()).thenReturn("tags");
Expand Down

0 comments on commit 4141fac

Please sign in to comment.