From e7461f9a26d71cbea1569fd58bef291a0c929f9b Mon Sep 17 00:00:00 2001 From: Loic Hermann Date: Mon, 12 Aug 2024 16:13:34 -0400 Subject: [PATCH] add configuration properties for workers --- .../temporal/WorkerFactoryRecorder.java | 25 +++++++++++++++++-- .../temporal/config/WorkerRuntimeConfig.java | 12 +++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java b/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java index fa054cb..26b882f 100644 --- a/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java +++ b/extension/runtime/src/main/java/io/quarkiverse/temporal/WorkerFactoryRecorder.java @@ -4,24 +4,45 @@ import jakarta.enterprise.inject.spi.CDI; +import io.quarkiverse.temporal.config.WorkerRuntimeConfig; +import io.quarkiverse.temporal.config.WorkersRuntimeConfig; import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; import io.temporal.client.WorkflowClient; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerOptions; @Recorder public class WorkerFactoryRecorder { + public WorkerFactoryRecorder(WorkersRuntimeConfig config) { + this.config = config; + } + + final WorkersRuntimeConfig config; + public RuntimeValue createWorkerFactory(WorkflowClient workflowClient) { return new RuntimeValue<>(WorkerFactory.newInstance(workflowClient)); } - public void createWorker(RuntimeValue runtimeValue, String queueName, List> workflows, + public WorkerOptions createWorkerOptions(WorkerRuntimeConfig config) { + if (config == null) { + return WorkerOptions.getDefaultInstance(); + } + WorkerOptions.Builder builder = WorkerOptions.newBuilder(); + + config.maxWorkerActivitiesPerSecond().ifPresent(builder::setMaxWorkerActivitiesPerSecond); + config.maxConcurrentActivityExecutionSize().ifPresent(builder::setMaxConcurrentActivityExecutionSize); + + return builder.build(); + } + + public void createWorker(RuntimeValue runtimeValue, String name, List> workflows, List> activities) { WorkerFactory workerFactory = runtimeValue.getValue(); - Worker worker = workerFactory.newWorker(queueName); + Worker worker = workerFactory.newWorker(name, createWorkerOptions(config.workers().get(name))); for (var workflow : workflows) { worker.registerWorkflowImplementationTypes(workflow); } diff --git a/extension/runtime/src/main/java/io/quarkiverse/temporal/config/WorkerRuntimeConfig.java b/extension/runtime/src/main/java/io/quarkiverse/temporal/config/WorkerRuntimeConfig.java index 2b2c5e8..055256a 100644 --- a/extension/runtime/src/main/java/io/quarkiverse/temporal/config/WorkerRuntimeConfig.java +++ b/extension/runtime/src/main/java/io/quarkiverse/temporal/config/WorkerRuntimeConfig.java @@ -1,5 +1,7 @@ package io.quarkiverse.temporal.config; +import java.util.Optional; + import io.quarkus.runtime.annotations.ConfigGroup; import io.smallrye.config.WithDefault; @@ -11,4 +13,14 @@ public interface WorkerRuntimeConfig { */ @WithDefault("default") String taskQueue(); + + /** + * Maximum number of activities started per second by this worker. Default is 0 which means unlimited. + */ + Optional maxWorkerActivitiesPerSecond(); + + /** + * Maximum number of activities executed in parallel. Default is 200, which is chosen if set to zero. + */ + Optional maxConcurrentActivityExecutionSize(); }