From 3abb74a95065f4c6d770fc92aa757e0ebf77e96a Mon Sep 17 00:00:00 2001
From: lavkesh
Date: Tue, 7 Nov 2023 15:24:00 +0800
Subject: [PATCH] feat: add global policy for file rotation

Add SINK_BLOB_GLOBAL_FILE_ROTATION_MAX_SIZE_BYTES (default 268435456).
When the combined size of all open local files reaches this limit, the
local file checker rotates every open file instead of only the files
that breach a per-file policy.

---
Review notes (not part of the commit message):
- Line structure, indentation and generic type arguments were
  reconstructed from a whitespace-collapsed copy of this patch; context
  whitespace may need adjusting against the target tree.
- Blob ids on the `index` lines predate the review changes below;
  regenerate the patch with `git format-patch` before relying on them.
- Review changes: dropped the unused java.util.Set import and a doubled
  blank line, merged the duplicated rotation pipelines in
  LocalFileChecker, hoisted the metadata list out of the per-policy
  lambda, used mapToLong().sum() for the size total, added Javadoc.

 .../gotocompany/firehose/config/BlobSinkConfig.java |  4 ++++
 .../firehose/sink/blob/BlobSinkFactory.java         |  4 ++++
 .../sink/blob/writer/local/LocalFileChecker.java    |  9 ++++++---
 .../sink/blob/writer/local/LocalStorage.java        | 12 ++++++++++++
 .../writer/local/policy/GlobalWriterPolicy.java     | 17 +++++++++++++++++
 .../local/policy/SizeBasedRotatingPolicy.java       | 10 +++++++++-
 .../sink/blob/writer/local/LocalStorageTest.java    |  4 +++-
 7 files changed, 55 insertions(+), 5 deletions(-)
 create mode 100644 src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java

diff --git a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java
index 34288e274..e29dd5a17 100644
--- a/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java
+++ b/src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java
@@ -50,6 +50,10 @@ public interface BlobSinkConfig extends AppConfig {
     @Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME")
     String getFilePartitionProtoTimestampFieldName();
 
+    @Key("SINK_BLOB_GLOBAL_FILE_ROTATION_MAX_SIZE_BYTES")
+    @DefaultValue("268435456")
+    long getGlobalFileRotationMaxSizeBytes();
+
     @Key("SINK_BLOB_FILE_PARTITION_TIME_TYPE")
     @DefaultValue("EVENT_TIMESTAMP")
     TimePartitionType getFilePartitionTimeType();
diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java
index 0f52b7392..502f8f811 100644
--- a/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java
+++ b/src/main/java/com/gotocompany/firehose/sink/blob/BlobSinkFactory.java
@@ -7,6 +7,7 @@
 import com.gotocompany.firehose.sink.blob.message.MessageDeSerializer;
 import com.gotocompany.firehose.sink.blob.writer.WriterOrchestrator;
 import com.gotocompany.firehose.sink.blob.writer.local.LocalStorage;
+import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
 import com.gotocompany.firehose.sink.blob.writer.local.policy.SizeBasedRotatingPolicy;
 import com.gotocompany.firehose.sink.blob.writer.local.policy.TimeBasedRotatingPolicy;
 import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
@@ -53,13 +54,16 @@ private static LocalStorage getLocalFileWriterWrapper(BlobSinkConfig sinkConfig,
         Descriptors.Descriptor outputMessageDescriptor = stencilClient.get(sinkConfig.getInputSchemaProtoClass());
         Descriptors.Descriptor metadataMessageDescriptor = getMetadataMessageDescriptor(sinkConfig);
         List<WriterPolicy> writerPolicies = new ArrayList<>();
+        List<GlobalWriterPolicy> globalWriterPolicies = new ArrayList<>();
         writerPolicies.add(new TimeBasedRotatingPolicy(sinkConfig.getLocalFileRotationDurationMS()));
         writerPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getLocalFileRotationMaxSizeBytes()));
+        globalWriterPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getGlobalFileRotationMaxSizeBytes()));
         return new LocalStorage(
                 sinkConfig,
                 outputMessageDescriptor,
                 metadataMessageDescriptor.getFields(),
                 writerPolicies,
+                globalWriterPolicies,
                 new FirehoseInstrumentation(statsDReporter, LocalStorage.class));
     }
 
diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java
index ddefbf81a..a0a8498b6 100644
--- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java
+++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalFileChecker.java
@@ -31,9 +31,12 @@ public LocalFileChecker(Queue toBeFlushedToRemotePaths,
     @Override
     public void run() {
         firehoseInstrumentation.captureValue(BlobStorageMetrics.LOCAL_FILE_OPEN_TOTAL, timePartitionWriterMap.size());
-        Map<Path, LocalFileWriter> toBeRotated =
-                timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue()))
-                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        // Breaching any global policy rotates every open writer; otherwise only the
+        // writers that breach a per-file policy are rotated.
+        boolean rotateAll = localStorage.shouldRotate(timePartitionWriterMap.values());
+        Map<Path, LocalFileWriter> toBeRotated = timePartitionWriterMap.entrySet().stream()
+                .filter(kv -> rotateAll || localStorage.shouldRotate(kv.getValue()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         timePartitionWriterMap.entrySet().removeAll(toBeRotated.entrySet());
         toBeRotated.forEach((path, writer) -> {
             try {
diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java
index 2552df33f..533ca0220 100644
--- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java
+++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorage.java
@@ -4,6 +4,7 @@
 import com.gotocompany.firehose.config.BlobSinkConfig;
 import com.gotocompany.firehose.exception.ConfigurationException;
 import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
+import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
 import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
 import lombok.AllArgsConstructor;
 
@@ -11,8 +12,10 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 @AllArgsConstructor
 public class LocalStorage {
@@ -21,6 +24,7 @@ public class LocalStorage {
     private final Descriptors.Descriptor messageDescriptor;
     private final List<Descriptors.FieldDescriptor> metadataFieldDescriptor;
     private final List<WriterPolicy> policies;
+    private final List<GlobalWriterPolicy> globalPolicies;
     private final FirehoseInstrumentation firehoseInstrumentation;
 
     public LocalFileWriter createLocalFileWriter(Path partitionPath) {
@@ -78,4 +82,12 @@ public void deleteLocalFile(Path... paths) throws IOException {
     public Boolean shouldRotate(LocalFileWriter writer) {
         return policies.stream().anyMatch(writerPolicy -> writerPolicy.shouldRotate(writer.getMetadata()));
     }
+
+    /**
+     * Returns true when any global policy asks for rotation given the metadata of all supplied writers.
+     */
+    public Boolean shouldRotate(Collection<LocalFileWriter> writers) {
+        List<LocalFileMetadata> metadata = writers.stream().map(LocalFileWriter::getMetadata).collect(Collectors.toList());
+        return globalPolicies.stream().anyMatch(policy -> policy.shouldRotate(metadata));
+    }
 }
diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java
new file mode 100644
index 000000000..f58f5751f
--- /dev/null
+++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/GlobalWriterPolicy.java
@@ -0,0 +1,17 @@
+package com.gotocompany.firehose.sink.blob.writer.local.policy;
+
+import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata;
+
+import java.util.List;
+
+/**
+ * Rotation policy evaluated against a group of local files at once rather than a single file.
+ */
+public interface GlobalWriterPolicy {
+
+    /**
+     * @param metadata metadata of the local files to evaluate together
+     * @return true when the files should be rotated
+     */
+    boolean shouldRotate(List<LocalFileMetadata> metadata);
+}
diff --git a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java
index 289d91e64..f3a5938d8 100644
--- a/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java
+++ b/src/main/java/com/gotocompany/firehose/sink/blob/writer/local/policy/SizeBasedRotatingPolicy.java
@@ -2,7 +2,9 @@
 
 import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata;
 
-public class SizeBasedRotatingPolicy implements WriterPolicy {
+import java.util.List;
+
+public class SizeBasedRotatingPolicy implements WriterPolicy, GlobalWriterPolicy {
 
     private final long maxSize;
 
@@ -17,4 +19,10 @@ public SizeBasedRotatingPolicy(long maxSize) {
     public boolean shouldRotate(LocalFileMetadata metadata) {
         return metadata.getSize() >= maxSize;
     }
+
+    @Override
+    public boolean shouldRotate(List<LocalFileMetadata> metadataList) {
+        long totalSize = metadataList.stream().mapToLong(LocalFileMetadata::getSize).sum();
+        return totalSize >= maxSize;
+    }
 }
diff --git a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java
index 411089ffa..87bbb725d 100644
--- a/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java
+++ b/src/test/java/com/gotocompany/firehose/sink/blob/writer/local/LocalStorageTest.java
@@ -5,6 +5,7 @@
 import com.gotocompany.firehose.config.BlobSinkConfig;
 import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
 import com.gotocompany.firehose.sink.blob.Constants;
+import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
 import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -20,8 +21,9 @@ public void shouldDeleteFiles() throws Exception {
         BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class);
         List<Descriptors.FieldDescriptor> metadataFieldDescriptor = new ArrayList<>();
         List<WriterPolicy> policies = new ArrayList<>();
+        List<GlobalWriterPolicy> globalWriterPolicies = new ArrayList<>();
         FirehoseInstrumentation firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class);
-        LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, firehoseInstrumentation);
+        LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, globalWriterPolicies, firehoseInstrumentation);
         LocalStorage spy = Mockito.spy(storage);
         Mockito.doNothing().when(spy).deleteLocalFile(Paths.get("/tmp/a"), Paths.get("/tmp/.a.crc"));
         Mockito.when(sinkConfig.getLocalFileWriterType()).thenReturn(Constants.WriterType.PARQUET);