Skip to content

Commit

Permalink
[FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
Browse files Browse the repository at this point in the history

This closes apache#6470.
  • Loading branch information
Nico Kruber committed Aug 13, 2018
1 parent 398f464 commit ed61c3a
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public interface InputGate {

int getNumberOfInputChannels();

/**
 * Returns the name of the task that owns this input gate.
 *
 * <p>The name is meant to identify the owner in diagnostics, e.g. as a prefix of log messages.
 *
 * @return the owning task's name
 */
String getOwningTaskName();

boolean isFinished();

void requestPartitions() throws IOException, InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ public int getNumberOfQueuedBuffers() {
return 0;
}

/**
 * Returns the {@code owningTaskName} held by this gate.
 *
 * @return the owning task's name
 */
@Override
public String getOwningTaskName() {
return owningTaskName;
}

// ------------------------------------------------------------------------
// Setup/Life-cycle
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public int getNumberOfInputChannels() {
return totalNumberOfInputChannels;
}

/**
 * Reports the owning task name by delegating to the first of the wrapped input gates.
 *
 * @return the owning task name of {@code inputGates[0]}
 */
@Override
public String getOwningTaskName() {
	// every wrapped gate belongs to the same task, so the first one is representative
	final InputGate firstGate = inputGates[0];
	return firstGate.getOwningTaskName();
}

@Override
public boolean isFinished() {
for (InputGate inputGate : inputGates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
}

private void completeBufferedSequence() throws IOException {
LOG.debug("Finished feeding back buffered data");
LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName());

currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
Expand Down Expand Up @@ -247,8 +247,11 @@ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
}
else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint, another started before
LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.", barrierId, currentCheckpointId);
LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
inputGate.getOwningTaskName(),
barrierId,
currentCheckpointId);

// let the task know we are not completing this
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
Expand Down Expand Up @@ -279,8 +282,10 @@ else if (barrierId > currentCheckpointId) {
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("Received all barriers, triggering checkpoint {} at {}",
receivedBarrier.getId(), receivedBarrier.getTimestamp());
LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
inputGate.getOwningTaskName(),
receivedBarrier.getId(),
receivedBarrier.getTimestamp());
}

releaseBlocksAndResetBarriers();
Expand Down Expand Up @@ -309,16 +314,21 @@ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th
if (barrierId == currentCheckpointId) {
// cancel this alignment
if (LOG.isDebugEnabled()) {
LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.",
inputGate.getOwningTaskName(),
barrierId);
}

releaseBlocksAndResetBarriers();
notifyAbortOnCancellationBarrier(barrierId);
}
else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the current
LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.", barrierId, currentCheckpointId);
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
inputGate.getOwningTaskName(),
barrierId,
currentCheckpointId);

// this stops the current alignment
releaseBlocksAndResetBarriers();
Expand Down Expand Up @@ -347,7 +357,9 @@ else if (barrierId > currentCheckpointId) {
latestAlignmentDurationNanos = 0L;

if (LOG.isDebugEnabled()) {
LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
LOG.debug("{}: Checkpoint {} canceled, skipping alignment.",
inputGate.getOwningTaskName(),
barrierId);
}

notifyAbortOnCancellationBarrier(barrierId);
Expand Down Expand Up @@ -401,8 +413,10 @@ private void notifyAbort(long checkpointId, CheckpointDeclineException cause) th
private void checkSizeLimit() throws Exception {
if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
// exceeded our limit - abort this checkpoint
LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
currentCheckpointId, maxBufferedBytes);
LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
inputGate.getOwningTaskName(),
currentCheckpointId,
maxBufferedBytes);

releaseBlocksAndResetBarriers();
notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
Expand Down Expand Up @@ -444,7 +458,9 @@ private void beginNewAlignment(long checkpointId, int channelIndex) throws IOExc
startOfAlignmentTimestamp = System.nanoTime();

if (LOG.isDebugEnabled()) {
LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
LOG.debug("{}: Starting stream alignment for checkpoint {}.",
inputGate.getOwningTaskName(),
checkpointId);
}
}

Expand All @@ -470,7 +486,9 @@ private void onBarrier(int channelIndex) throws IOException {
numBarriersReceived++;

if (LOG.isDebugEnabled()) {
LOG.debug("Received barrier from channel " + channelIndex);
LOG.debug("{}: Received barrier from channel {}.",
inputGate.getOwningTaskName(),
channelIndex);
}
}
else {
Expand All @@ -483,7 +501,8 @@ private void onBarrier(int channelIndex) throws IOException {
* Makes sure the just written data is the next to be consumed.
*/
private void releaseBlocksAndResetBarriers() throws IOException {
LOG.debug("End of stream alignment, feeding buffered data back");
LOG.debug("{}: End of stream alignment, feeding buffered data back.",
inputGate.getOwningTaskName());

for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
Expand All @@ -499,8 +518,9 @@ private void releaseBlocksAndResetBarriers() throws IOException {
else {
// uncommon case: buffered data pending
// push back the pending data, if we have any
LOG.debug("Checkpoint skipped via buffered data:" +
"Pushing back current alignment buffers and feeding back new alignment data first.");
LOG.debug("{}: Checkpoint skipped via buffered data:" +
"Pushing back current alignment buffers and feeding back new alignment data first.",
inputGate.getOwningTaskName());

// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
Expand All @@ -513,8 +533,9 @@ private void releaseBlocksAndResetBarriers() throws IOException {
}

if (LOG.isDebugEnabled()) {
LOG.debug("Size of buffered data: {} bytes",
currentBuffered == null ? 0L : currentBuffered.size());
LOG.debug("{}: Size of buffered data: {} bytes",
inputGate.getOwningTaskName(),
currentBuffered == null ? 0L : currentBuffered.size());
}

// the next barrier that comes must assume it is the first
Expand Down Expand Up @@ -555,7 +576,10 @@ public long getAlignmentDurationNanos() {

@Override
public String toString() {
return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
currentCheckpointId, numBarriersReceived, numClosedChannels);
return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
inputGate.getOwningTaskName(),
currentCheckpointId,
numBarriersReceived,
numClosedChannels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,30 @@ private static class RandomGeneratingInputGate implements InputGate {
private int currentChannel = 0;
private long c = 0;

private final String owningTaskName;

public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
this(bufferPools, barrierGens, "TestTask");
}

public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) {
this.numChannels = bufferPools.length;
this.currentBarriers = new int[numChannels];
this.bufferPools = bufferPools;
this.barrierGens = barrierGens;
this.owningTaskName = owningTaskName;
}

/**
 * Returns {@code numChannels}, which the constructor sets to the number of buffer pools.
 *
 * @return the number of input channels of this gate
 */
@Override
public int getNumberOfInputChannels() {
	return this.numChannels;
}

/**
 * Returns the owning task name that was passed to (or defaulted by) the constructor.
 *
 * @return the owning task's name
 */
@Override
public String getOwningTaskName() {
	return this.owningTaskName;
}

@Override
public boolean isFinished() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ public class MockInputGate implements InputGate {

private int closedChannels;

private final String owningTaskName;

/**
 * Creates a mock gate that uses the default owning task name {@code "MockTask"}.
 *
 * @param pageSize page size stored by this gate
 * @param numChannels number of channels of this gate
 * @param bufferOrEvents elements copied into this gate's internal queue
 */
public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) {
	this(pageSize, numChannels, bufferOrEvents, "MockTask");
}

/**
 * Creates a mock gate with an explicit owning task name.
 *
 * @param pageSize page size stored by this gate
 * @param numChannels number of channels; also the size of the per-channel {@code closed} flags
 * @param bufferOrEvents elements copied into this gate's internal queue
 * @param owningTaskName name reported by {@link #getOwningTaskName()}
 */
public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents, String owningTaskName) {
	this.owningTaskName = owningTaskName;
	this.numChannels = numChannels;
	this.pageSize = pageSize;

	// the queue is a private copy, so later changes to the caller's list are not seen here
	this.bufferOrEvents = new ArrayDeque<>(bufferOrEvents);
	this.closed = new boolean[numChannels];
}

@Override
Expand All @@ -61,6 +68,11 @@ public int getNumberOfInputChannels() {
return numChannels;
}

/**
 * Returns the owning task name that was passed to (or defaulted by) the constructor.
 *
 * @return the owning task's name
 */
@Override
public String getOwningTaskName() {
	return this.owningTaskName;
}

@Override
public boolean isFinished() {
return bufferOrEvents.isEmpty();
Expand Down

0 comments on commit ed61c3a

Please sign in to comment.