Skip to content

Commit

Permalink
feat: add global policy for file rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
lavkesh committed Nov 7, 2023
1 parent 1631230 commit 3abb74a
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public interface BlobSinkConfig extends AppConfig {
@Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME")
String getFilePartitionProtoTimestampFieldName();

@Key("SINK_BLOB_GLOBAL_FILE_ROTATION_MAX_SIZE_BYTES")
@DefaultValue("268435456")
long getGlobalFileRotationMaxSizeBytes();


@Key("SINK_BLOB_FILE_PARTITION_TIME_TYPE")
@DefaultValue("EVENT_TIMESTAMP")
TimePartitionType getFilePartitionTimeType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.gotocompany.firehose.sink.blob.message.MessageDeSerializer;
import com.gotocompany.firehose.sink.blob.writer.WriterOrchestrator;
import com.gotocompany.firehose.sink.blob.writer.local.LocalStorage;
import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.SizeBasedRotatingPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.TimeBasedRotatingPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
Expand Down Expand Up @@ -53,13 +54,16 @@ private static LocalStorage getLocalFileWriterWrapper(BlobSinkConfig sinkConfig,
Descriptors.Descriptor outputMessageDescriptor = stencilClient.get(sinkConfig.getInputSchemaProtoClass());
Descriptors.Descriptor metadataMessageDescriptor = getMetadataMessageDescriptor(sinkConfig);
List<WriterPolicy> writerPolicies = new ArrayList<>();
List<GlobalWriterPolicy> globalWriterPolicies = new ArrayList<>();
writerPolicies.add(new TimeBasedRotatingPolicy(sinkConfig.getLocalFileRotationDurationMS()));
writerPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getLocalFileRotationMaxSizeBytes()));
globalWriterPolicies.add(new SizeBasedRotatingPolicy(sinkConfig.getGlobalFileRotationMaxSizeBytes()));
return new LocalStorage(
sinkConfig,
outputMessageDescriptor,
metadataMessageDescriptor.getFields(),
writerPolicies,
globalWriterPolicies,
new FirehoseInstrumentation(statsDReporter, LocalStorage.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@ public LocalFileChecker(Queue<LocalFileMetadata> toBeFlushedToRemotePaths,
@Override
public void run() {
firehoseInstrumentation.captureValue(BlobStorageMetrics.LOCAL_FILE_OPEN_TOTAL, timePartitionWriterMap.size());
Map<Path, LocalFileWriter> toBeRotated =
timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<Path, LocalFileWriter> toBeRotated;
if (localStorage.shouldRotate(timePartitionWriterMap.values())) {
// rotate all
toBeRotated = timePartitionWriterMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} else {
toBeRotated =
timePartitionWriterMap.entrySet().stream().filter(kv -> localStorage.shouldRotate(kv.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
timePartitionWriterMap.entrySet().removeAll(toBeRotated.entrySet());
toBeRotated.forEach((path, writer) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
import com.gotocompany.firehose.config.BlobSinkConfig;
import com.gotocompany.firehose.exception.ConfigurationException;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
import lombok.AllArgsConstructor;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

@AllArgsConstructor
public class LocalStorage {
Expand All @@ -21,6 +25,7 @@ public class LocalStorage {
private final Descriptors.Descriptor messageDescriptor;
private final List<Descriptors.FieldDescriptor> metadataFieldDescriptor;
private final List<WriterPolicy> policies;
private final List<GlobalWriterPolicy> globalPolicies;
private final FirehoseInstrumentation firehoseInstrumentation;

public LocalFileWriter createLocalFileWriter(Path partitionPath) {
Expand Down Expand Up @@ -78,4 +83,10 @@ public void deleteLocalFile(Path... paths) throws IOException {
public Boolean shouldRotate(LocalFileWriter writer) {
return policies.stream().anyMatch(writerPolicy -> writerPolicy.shouldRotate(writer.getMetadata()));
}

public Boolean shouldRotate(Collection<LocalFileWriter> writers) {
return globalPolicies.stream().anyMatch(policy -> policy.shouldRotate(
writers.stream().map(LocalFileWriter::getMetadata).collect(Collectors.toList())
));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.gotocompany.firehose.sink.blob.writer.local.policy;

import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata;

import java.util.List;

public interface GlobalWriterPolicy {

boolean shouldRotate(List<LocalFileMetadata> metadata);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.gotocompany.firehose.sink.blob.writer.local.LocalFileMetadata;

public class SizeBasedRotatingPolicy implements WriterPolicy {
import java.util.List;

public class SizeBasedRotatingPolicy implements WriterPolicy, GlobalWriterPolicy {

private final long maxSize;

Expand All @@ -17,4 +19,10 @@ public SizeBasedRotatingPolicy(long maxSize) {
public boolean shouldRotate(LocalFileMetadata metadata) {
return metadata.getSize() >= maxSize;
}

@Override
public boolean shouldRotate(List<LocalFileMetadata> metadataList) {
long totalSize = metadataList.stream().map(LocalFileMetadata::getSize).reduce(0L, Long::sum);
return totalSize >= maxSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.gotocompany.firehose.config.BlobSinkConfig;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.sink.blob.Constants;
import com.gotocompany.firehose.sink.blob.writer.local.policy.GlobalWriterPolicy;
import com.gotocompany.firehose.sink.blob.writer.local.policy.WriterPolicy;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -20,8 +21,9 @@ public void shouldDeleteFiles() throws Exception {
BlobSinkConfig sinkConfig = Mockito.mock(BlobSinkConfig.class);
List<Descriptors.FieldDescriptor> metadataFieldDescriptor = new ArrayList<>();
List<WriterPolicy> policies = new ArrayList<>();
List<GlobalWriterPolicy> globalWriterPolicies = new ArrayList<>();
FirehoseInstrumentation firehoseInstrumentation = Mockito.mock(FirehoseInstrumentation.class);
LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, firehoseInstrumentation);
LocalStorage storage = new LocalStorage(sinkConfig, null, metadataFieldDescriptor, policies, globalWriterPolicies, firehoseInstrumentation);
LocalStorage spy = Mockito.spy(storage);
Mockito.doNothing().when(spy).deleteLocalFile(Paths.get("/tmp/a"), Paths.get("/tmp/.a.crc"));
Mockito.when(sinkConfig.getLocalFileWriterType()).thenReturn(Constants.WriterType.PARQUET);
Expand Down

0 comments on commit 3abb74a

Please sign in to comment.