Skip to content

Commit

Permalink
feat(jdbc): consume multiple times the execution and worker task resu…
Browse files Browse the repository at this point in the history
…lt queues

So that the executor use multiple threads to process messages.
  • Loading branch information
loicmathieu committed Feb 14, 2025
1 parent f8a6e3f commit 41712b8
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository;
import io.kestra.plugin.core.flow.ForEachItem;
import io.kestra.plugin.core.flow.Template;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.transaction.exceptions.CannotCreateTransactionException;
import io.opentelemetry.api.trace.Span;
Expand Down Expand Up @@ -173,6 +174,9 @@ public class JdbcExecutor implements ExecutorInterface, Service {
@Inject
private SLAService slaService;

@Value("${kestra.jdbc.executor.thread-count:0}")
private int threadCount;

private final Tracer tracer;

private final FlowRepositoryInterface flowRepository;
Expand Down Expand Up @@ -228,8 +232,13 @@ public void run() {

Await.until(() -> this.allFlows != null, Duration.ofMillis(100), Duration.ofMinutes(5));

this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue));
// By default, we start half-available processors consumers of the execution and worker task result queue with a minimum of two.
// Other queues would not benefit from more consumers.
int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2);
for (int i = 0; i < numberOfThreads; i++) {
this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue));
this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue));
}
this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue));
this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue));
this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue));
Expand Down Expand Up @@ -322,6 +331,7 @@ public void run() {
}
));
setState(ServiceState.RUNNING);
log.info("Executor started with {} thread(s)", numberOfThreads);
}

private void clusterEventQueue(Either<ClusterEvent, DeserializationException> either) {
Expand Down

0 comments on commit 41712b8

Please sign in to comment.