Skip to content

Commit

Permalink
add configuration properties for workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Loic Hermann authored and Loic Hermann committed Aug 12, 2024
1 parent 383c798 commit e7461f9
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,45 @@

import jakarta.enterprise.inject.spi.CDI;

import io.quarkiverse.temporal.config.WorkerRuntimeConfig;
import io.quarkiverse.temporal.config.WorkersRuntimeConfig;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

@Recorder
public class WorkerFactoryRecorder {

public WorkerFactoryRecorder(WorkersRuntimeConfig config) {
this.config = config;
}

final WorkersRuntimeConfig config;

public RuntimeValue<WorkerFactory> createWorkerFactory(WorkflowClient workflowClient) {
return new RuntimeValue<>(WorkerFactory.newInstance(workflowClient));
}

public void createWorker(RuntimeValue<WorkerFactory> runtimeValue, String queueName, List<Class<?>> workflows,
public WorkerOptions createWorkerOptions(WorkerRuntimeConfig config) {
if (config == null) {
return WorkerOptions.getDefaultInstance();
}
WorkerOptions.Builder builder = WorkerOptions.newBuilder();

config.maxWorkerActivitiesPerSecond().ifPresent(builder::setMaxWorkerActivitiesPerSecond);
config.maxConcurrentActivityExecutionSize().ifPresent(builder::setMaxConcurrentActivityExecutionSize);

return builder.build();
}

public void createWorker(RuntimeValue<WorkerFactory> runtimeValue, String name, List<Class<?>> workflows,
List<Class<?>> activities) {
WorkerFactory workerFactory = runtimeValue.getValue();
Worker worker = workerFactory.newWorker(queueName);
Worker worker = workerFactory.newWorker(name, createWorkerOptions(config.workers().get(name)));
for (var workflow : workflows) {
worker.registerWorkflowImplementationTypes(workflow);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.quarkiverse.temporal.config;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.smallrye.config.WithDefault;

Expand All @@ -11,4 +13,14 @@ public interface WorkerRuntimeConfig {
*/
@WithDefault("default")
String taskQueue();

/**
* Maximum number of activities started per second by this worker. Default is 0 which means unlimited.
*/
Optional<Double> maxWorkerActivitiesPerSecond();

/**
* Maximum number of activities executed in parallel. Default is 200, which is chosen if set to zero.
*/
Optional<Integer> maxConcurrentActivityExecutionSize();
}

0 comments on commit e7461f9

Please sign in to comment.