Skip to content

Commit

Permalink
fix(tests): added debug information in case of WorkerTest.failOnWorke…
Browse files Browse the repository at this point in the history
…rTaskWithFlowable test failure
  • Loading branch information
brian-mulier-p committed Dec 22, 2023
1 parent 3f8285a commit c985c31
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
10 changes: 9 additions & 1 deletion core/src/main/java/io/kestra/core/utils/Await.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,22 @@ public static void until(BooleanSupplier condition, Duration sleep) {
}

public static void until(BooleanSupplier condition, Duration sleep, Duration timeout) throws TimeoutException {
until(null, condition, sleep, timeout);
}

public static void until(Supplier<String> errorMessageInCaseOfFailure, BooleanSupplier condition, Duration sleep, Duration timeout) throws TimeoutException {
if (sleep == null) {
sleep = defaultSleep;
}

long start = System.currentTimeMillis();
while (!condition.getAsBoolean()) {
if (System.currentTimeMillis() - start > timeout.toMillis()) {
throw new TimeoutException(String.format("Await failed to terminate within %s", timeout));
throw new TimeoutException(String.format(
"Await failed to terminate within %s.%s",
timeout,
errorMessageInCaseOfFailure == null ? "" : " " + errorMessageInCaseOfFailure.get()
));
} else {
try {
Thread.sleep(sleep.toMillis());
Expand Down
9 changes: 8 additions & 1 deletion core/src/test/java/io/kestra/core/runners/WorkerTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionKilled;
Expand All @@ -10,6 +11,7 @@
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.tasks.flows.Pause;
import io.kestra.core.tasks.flows.WorkingDirectory;
import io.kestra.core.tasks.test.Sleep;
Expand All @@ -30,6 +32,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static io.kestra.core.utils.Rethrow.throwSupplier;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -84,7 +87,7 @@ void workerGroup() {
}

@Test
void failOnWorkerTaskWithFlowable() throws TimeoutException {
void failOnWorkerTaskWithFlowable() throws TimeoutException, JsonProcessingException {
Worker worker = new Worker(applicationContext, 8, null);
worker.run();

Expand Down Expand Up @@ -122,6 +125,10 @@ void failOnWorkerTaskWithFlowable() throws TimeoutException {
workerTaskQueue.emit(workerTask);

Await.until(
throwSupplier(() -> {
WorkerTaskResult taskResult = workerTaskResult.get();
return "WorkerTaskResult was " + (taskResult == null ? null : JacksonMapper.ofJson().writeValueAsString(taskResult));
}),
() -> workerTaskResult.get() != null && workerTaskResult.get().getTaskRun().getState().isFailed(),
Duration.ofMillis(100),
Duration.ofMinutes(1)
Expand Down

0 comments on commit c985c31

Please sign in to comment.