Skip to content

Commit

Permalink
Fix / Data pipeline tuning (#195)
Browse files Browse the repository at this point in the history
* Increase size of large file data round trip test

* Increase size of large file data round trip test

* Bring down the size of the elastic buffer (expose error conditions at smaller sizes)

* Fix data pump loop

* Fix backpressure on encoder / decoder implementations

* Process subscriptions synchronously in the hub stream processor
  • Loading branch information
Martin Traverse authored Oct 27, 2022
1 parent 990a091 commit 5507a89
Show file tree
Hide file tree
Showing 13 changed files with 100,065 additions and 10,033 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,18 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {
return;
}

eventLoop.execute(() -> doNewSubscription(subscriber));
// NOTE: Do not defer onSubscribe() (i.e. doNewSubscription) to the event loop

// Deferring to the event loop can cause deadlocks in the unit tests
// This is because subscriptions may not be set up until after a streaming operation starts
// It may be possible to make this watertight with a more thorough review of the hub logic
// But, calling back to onSubscribe() synchronously is common practice, and avoids the issue

// This issue manifests in test code, where the server and client are on separate event loops
// However it indicates a race condition that could occur on the platform
// To be safe, call doNewSubscription() synchronously

doNewSubscription(subscriber);
}

private void doNewSubscription(Flow.Subscriber<? super T> subscriber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ public DataPipeline.BufferApi dataInterface() {

@Override
public boolean isReady() {
// Buffer decoders will only ever receive one chunk, so they are always ready to accept data
// NOTE(review): assumes upstream delivers the full payload as a single buffer — confirm against BufferApi contract
return true;
}

@Override
public void pump() {
// No-op: this override pumps nothing downstream
// NOTE(review): the neighbouring comment states implementations must supply pump() because
// BufferDecoder is not immediate — confirm whether this empty override is still live code
}
// Implementations must supply pump(), because BufferDecoder is not immediate
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,14 @@ protected StreamingDecoder() {
public DataPipeline.StreamApi dataInterface() {
// The decoder stage exposes itself as its own stream-facing API
return this;
}

@Override
public boolean isReady() {
// Immediate (pass-through) stage: it can accept data exactly when its consumer can,
// so readiness is delegated straight to the downstream consumer
return consumerReady();
}

@Override
public void pump() {
/* no-op, immediate stage — data flows straight through on arrival, nothing is buffered to pump */
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ protected StreamingEncoder() {
public DataPipeline.ArrowApi dataInterface() {
// The encoder stage exposes itself as its own Arrow-facing API
return this;
}

@Override
public boolean isReady() {
// Immediate (pass-through) stage: it can accept data exactly when its consumer can,
// so readiness is delegated straight to the downstream consumer
return consumerReady();
}

@Override
public void pump() {
/* no-op, immediate stage — data flows straight through on arrival, nothing is buffered to pump */
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ public ArrowEncoder() {

}

@Override public boolean isReady() { return true; }
@Override public void pump() { /* no-op, immediate stage */ }

@Override
public void onStart(VectorSchemaRoot root) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ public CsvEncoder() {

}

@Override public boolean isReady() { return true; }
@Override public void pump() { /* no-op, immediate stage */ }

@Override
public void onStart(VectorSchemaRoot root) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ public JsonDecoder(BufferAllocator arrowAllocator, Schema arrowSchema) {
vector.allocateNew();
}

@Override public boolean isReady() { return true; }
@Override public void pump() { /* no-op, immediate stage */ }

@Override
public void onStart() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ public JsonEncoder() {

}

@Override public boolean isReady() { return true; }
@Override public void pump() { /* no-op, immediate stage */ }

@Override
public void onStart(VectorSchemaRoot root) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;


public class DataPipelineImpl implements DataPipeline {
Expand All @@ -39,6 +40,7 @@ public class DataPipelineImpl implements DataPipeline {

private final IDataContext ctx;
private final List<DataStage> stages;
private final AtomicBoolean pumpScheduled;
private final CompletableFuture<Void> completion;

private SourceStage sourceStage;
Expand All @@ -49,7 +51,9 @@ public DataPipelineImpl(IDataContext ctx) {

this.ctx = ctx;
this.stages = new ArrayList<>();
this.pumpScheduled = new AtomicBoolean(false);
this.completion = new CompletableFuture<>();

this.started = false;
}

Expand All @@ -68,6 +72,7 @@ public CompletionStage<Void> execute() {
sinkStage.connect();

// Schedule running the data pump on the pipeline's event loop
pumpScheduled.set(true);
ctx.eventLoopExecutor().execute(this::runDataPump);

return completion;
Expand All @@ -76,13 +81,17 @@ public CompletionStage<Void> execute() {
void pumpData() {

// Schedule running the data pump on the pipeline's event loop
// The CAS guard collapses concurrent / repeated pump requests into a single scheduled pass:
// only the caller that flips pumpScheduled false -> true actually posts to the event loop
// (the previous unconditional execute() here bypassed the guard and double-scheduled the pump)

var notAlreadyScheduled = pumpScheduled.compareAndSet(false, true);

if (notAlreadyScheduled)
ctx.eventLoopExecutor().execute(this::runDataPump);
}

private void runDataPump() {

// The data pump makes one pass over all the stages in the pipeline, starting at the back
// Any stages that are in a ready state are pumped
// Stages are pumped if their consumer is ready to accept data

// The feedback is that stages can request a pump if they have entered a ready state
// This happens by a stage calling pumpData(), to schedule another pump on the event loop
Expand All @@ -97,28 +106,26 @@ private void runDataPump() {

try {

pumpScheduled.set(false);

if (completion.isDone())
return;

var pipeReady = true;
var consumerReady = true;

for (var i = stages.size() - 1; i >= 0; i--) {

var stage = stages.get(i);

// If a stage is complete, all preceding stages must also be complete
if (stage.isDone())
continue;

var stageReady = stage.isReady();
return;

if (pipeReady && stageReady)
if (consumerReady)
stage.pump();

pipeReady = stageReady;
consumerReady = stage.isReady();
}

if (pipeReady && !sourceStage.isDone())
sourceStage.pump();
}
catch (Throwable error) {
reportUnhandledError(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ElasticBuffer

private final Logger log = LoggerFactory.getLogger(getClass());

private static final int QUEUE_LIMIT = 4096;
private static final int QUEUE_LIMIT = 1024;
private static final int QUEUE_SAFETY_LIMIT = 512;

// The special buffer Unpooled.EMPTY_BUFFER can be (and is) sent out by upstream components
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DataRoundTripTest {

private static final String BASIC_CSV_DATA = SampleData.BASIC_CSV_DATA_RESOURCE;
private static final String BASIC_JSON_DATA = SampleData.BASIC_JSON_DATA_RESOURCE;
private static final String LARGE_CSV_DATA = "/large_csv_data_10000.csv";
private static final String LARGE_CSV_DATA = "/large_csv_data_100000.csv";

private static final byte[] BASIC_CSV_CONTENT = TestResourceHelpers.loadResourceAsBytes(BASIC_CSV_DATA);

Expand Down Expand Up @@ -127,7 +127,15 @@ void roundTrip_csvLarge() throws Exception {
throw new RuntimeException("Test data not found");

var testDataBytes = testDataStream.readAllBytes();
var testData = List.of(ByteString.copyFrom(testDataBytes));
var testData = new ArrayList<ByteString>();
var mb2 = 2 * 1024 * 1024;

for (var offset = 0; offset < testDataBytes.length; offset += mb2) {

var sliceSize = Math.min(mb2, testDataBytes.length - offset);
var slice = ByteString.copyFrom(testDataBytes, offset, sliceSize);
testData.add(slice);
}

var mimeType = "text/csv";

Expand Down Expand Up @@ -167,7 +175,7 @@ void roundTrip_csvLarge() throws Exception {
var integerField = roundTripData.get(1);

for (var i = 0; i < integerField.size(); i++)
Assertions.assertEquals((long) i + 1, integerField.get(i));
Assertions.assertEquals((long) i % 10000 + 1, integerField.get(i));
}
}

Expand Down
Loading

0 comments on commit 5507a89

Please sign in to comment.