diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 20934df53ad..c3dbbf4a3d0 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -37,6 +37,7 @@ import io.kestra.jdbc.repository.AbstractJdbcWorkerJobRunningRepository; import io.kestra.plugin.core.flow.ForEachItem; import io.kestra.plugin.core.flow.Template; +import io.micronaut.context.annotation.Value; import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.transaction.exceptions.CannotCreateTransactionException; import io.opentelemetry.api.trace.Span; @@ -173,6 +174,9 @@ public class JdbcExecutor implements ExecutorInterface, Service { @Inject private SLAService slaService; + @Value("${kestra.jdbc.executor.thread-count:0}") + private int threadCount; + private final Tracer tracer; private final FlowRepositoryInterface flowRepository; @@ -228,8 +232,13 @@ public void run() { Await.until(() -> this.allFlows != null, Duration.ofMillis(100), Duration.ofMinutes(5)); - this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue)); - this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue)); + // By default, we start half-available processors consumers of the execution and worker task result queue with a minimum of two. + // Other queues would not benefit from more consumers. + int numberOfThreads = threadCount != 0 ? threadCount : Math.max(2, Runtime.getRuntime().availableProcessors() / 2); + for (int i = 0; i < numberOfThreads; i++) { + this.receiveCancellations.addFirst(this.executionQueue.receive(Executor.class, this::executionQueue)); + this.receiveCancellations.addFirst(this.workerTaskResultQueue.receive(Executor.class, this::workerTaskResultQueue)); + } this.receiveCancellations.addFirst(this.killQueue.receive(Executor.class, this::killQueue)); this.receiveCancellations.addFirst(this.subflowExecutionResultQueue.receive(Executor.class, this::subflowExecutionResultQueue)); this.receiveCancellations.addFirst(this.subflowExecutionEndQueue.receive(Executor.class, this::subflowExecutionEndQueue)); @@ -322,6 +331,7 @@ public void run() { } )); setState(ServiceState.RUNNING); + log.info("Executor started with {} thread(s)", numberOfThreads); } private void clusterEventQueue(Either either) {