Skip to content

Commit

Permalink
Merge pull request #30 from quarkiverse/worker_config
Browse files Browse the repository at this point in the history
add configuration properties for workers
  • Loading branch information
rmanibus authored Aug 12, 2024
2 parents 383c798 + b83e7d6 commit 12443ed
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.test.QuarkusUnitTest;

class TemporalMockRequiredTest {
class MockRequiredTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkiverse.temporal.deployment;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import jakarta.inject.Inject;

import org.apache.commons.lang3.reflect.FieldUtils;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.temporal.deployment.discovery.NamedSimpleActivityImpl;
import io.quarkiverse.temporal.deployment.discovery.SimpleActivity;
import io.quarkus.test.QuarkusUnitTest;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

public class WorkerNamedConfigTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClass(SimpleActivity.class)
.addClass(NamedSimpleActivityImpl.class)
.addAsResource(
new StringAsset("quarkus.temporal.start-workers: false\n" +
"quarkus.temporal.worker.namedWorker.max-worker-activities-per-second: 7\n" +
"quarkus.temporal.worker.namedWorker.max-concurrent-activity-execution-size: 11\n" +
"quarkus.temporal.worker.namedWorker.max-concurrent-workflow-task-execution-size: 13\n" +
"quarkus.temporal.worker.namedWorker.max-concurrent-local-activity-execution-size: 17\n" +
"quarkus.temporal.worker.namedWorker.max-task-queue-activities-per-second: 19\n" +
"quarkus.temporal.worker.namedWorker.max-concurrent-workflow-task-pollers: 23\n" +
"quarkus.temporal.worker.namedWorker.max-concurrent-activity-task-pollers: 29\n" +
"quarkus.temporal.worker.namedWorker.local-activity-worker-only: true\n" +
"quarkus.temporal.worker.namedWorker.default-deadlock-detection-timeout: 31\n" +
"quarkus.temporal.worker.namedWorker.max-heartbeat-throttle-interval: 37s\n" +
"quarkus.temporal.worker.namedWorker.default-heartbeat-throttle-interval: 41s\n" +
"quarkus.temporal.worker.namedWorker.sticky-queue-schedule-to-start-timeout: 43s\n" +
"quarkus.temporal.worker.namedWorker.disable-eager-execution: true\n" +
"quarkus.temporal.worker.namedWorker.use-build-id-for-versioning: true\n" +
"quarkus.temporal.worker.namedWorker.build-id: buildId\n" +
"quarkus.temporal.worker.namedWorker.sticky-task-queue-drain-timeout: 47s\n"),
"application.properties"));

@Inject
WorkerFactory factory;

@Test
public void test() throws IllegalAccessException {
Worker worker = factory.getWorker("namedWorker");
Assertions.assertNotNull(worker);
// worker config is not visible;
WorkerOptions options = (WorkerOptions) FieldUtils.readField(worker, "options", true);
Assertions.assertEquals(7, options.getMaxWorkerActivitiesPerSecond());
Assertions.assertEquals(11, options.getMaxConcurrentActivityExecutionSize());
Assertions.assertEquals(13, options.getMaxConcurrentWorkflowTaskExecutionSize());
Assertions.assertEquals(17, options.getMaxConcurrentLocalActivityExecutionSize());
Assertions.assertEquals(19, options.getMaxTaskQueueActivitiesPerSecond());
Assertions.assertEquals(23, options.getMaxConcurrentWorkflowTaskPollers());
Assertions.assertEquals(29, options.getMaxConcurrentActivityTaskPollers());
Assertions.assertTrue(options.isLocalActivityWorkerOnly());
Assertions.assertEquals(31, options.getDefaultDeadlockDetectionTimeout());
Assertions.assertEquals(Duration.of(37, ChronoUnit.SECONDS), options.getMaxHeartbeatThrottleInterval());
Assertions.assertEquals(Duration.of(41, ChronoUnit.SECONDS), options.getDefaultHeartbeatThrottleInterval());
Assertions.assertEquals(Duration.of(43, ChronoUnit.SECONDS), options.getStickyQueueScheduleToStartTimeout());
Assertions.assertTrue(options.isEagerExecutionDisabled());
Assertions.assertTrue(options.isUsingBuildIdForVersioning());
Assertions.assertEquals("buildId", options.getBuildId());
Assertions.assertEquals(Duration.of(47, ChronoUnit.SECONDS), options.getStickyTaskQueueDrainTimeout());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkiverse.temporal.deployment;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import jakarta.inject.Inject;

import org.apache.commons.lang3.reflect.FieldUtils;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.temporal.deployment.discovery.DefaultSimpleActivityImpl;
import io.quarkiverse.temporal.deployment.discovery.SimpleActivity;
import io.quarkus.test.QuarkusUnitTest;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

public class WorkerUnnamedConfigTest {

@RegisterExtension
static final QuarkusUnitTest unitTest = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClass(SimpleActivity.class)
.addClass(DefaultSimpleActivityImpl.class)
.addAsResource(
new StringAsset("quarkus.temporal.start-workers: false\n" +
"quarkus.temporal.worker.max-worker-activities-per-second: 7\n" +
"quarkus.temporal.worker.max-concurrent-activity-execution-size: 11\n" +
"quarkus.temporal.worker.max-concurrent-workflow-task-execution-size: 13\n" +
"quarkus.temporal.worker.max-concurrent-local-activity-execution-size: 17\n" +
"quarkus.temporal.worker.max-task-queue-activities-per-second: 19\n" +
"quarkus.temporal.worker.max-concurrent-workflow-task-pollers: 23\n" +
"quarkus.temporal.worker.max-concurrent-activity-task-pollers: 29\n" +
"quarkus.temporal.worker.local-activity-worker-only: true\n" +
"quarkus.temporal.worker.default-deadlock-detection-timeout: 31\n" +
"quarkus.temporal.worker.max-heartbeat-throttle-interval: 37s\n" +
"quarkus.temporal.worker.default-heartbeat-throttle-interval: 41s\n" +
"quarkus.temporal.worker.sticky-queue-schedule-to-start-timeout: 43s\n" +
"quarkus.temporal.worker.disable-eager-execution: true\n" +
"quarkus.temporal.worker.use-build-id-for-versioning: true\n" +
"quarkus.temporal.worker.build-id: buildId\n" +
"quarkus.temporal.worker.sticky-task-queue-drain-timeout: 47s\n"),
"application.properties"));

@Inject
WorkerFactory factory;

@Test
public void test() throws IllegalAccessException {
Worker worker = factory.getWorker("<default>");
Assertions.assertNotNull(worker);
// worker config is not visible;
WorkerOptions options = (WorkerOptions) FieldUtils.readField(worker, "options", true);
Assertions.assertEquals(7, options.getMaxWorkerActivitiesPerSecond());
Assertions.assertEquals(11, options.getMaxConcurrentActivityExecutionSize());
Assertions.assertEquals(13, options.getMaxConcurrentWorkflowTaskExecutionSize());
Assertions.assertEquals(17, options.getMaxConcurrentLocalActivityExecutionSize());
Assertions.assertEquals(19, options.getMaxTaskQueueActivitiesPerSecond());
Assertions.assertEquals(23, options.getMaxConcurrentWorkflowTaskPollers());
Assertions.assertEquals(29, options.getMaxConcurrentActivityTaskPollers());
Assertions.assertTrue(options.isLocalActivityWorkerOnly());
Assertions.assertEquals(31, options.getDefaultDeadlockDetectionTimeout());
Assertions.assertEquals(Duration.of(37, ChronoUnit.SECONDS), options.getMaxHeartbeatThrottleInterval());
Assertions.assertEquals(Duration.of(41, ChronoUnit.SECONDS), options.getDefaultHeartbeatThrottleInterval());
Assertions.assertEquals(Duration.of(43, ChronoUnit.SECONDS), options.getStickyQueueScheduleToStartTimeout());
Assertions.assertTrue(options.isEagerExecutionDisabled());
Assertions.assertTrue(options.isUsingBuildIdForVersioning());
Assertions.assertEquals("buildId", options.getBuildId());
Assertions.assertEquals(Duration.of(47, ChronoUnit.SECONDS), options.getStickyTaskQueueDrainTimeout());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,62 @@

import jakarta.enterprise.inject.spi.CDI;

import io.quarkiverse.temporal.config.WorkerRuntimeConfig;
import io.quarkiverse.temporal.config.WorkersRuntimeConfig;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerOptions;

@Recorder
public class WorkerFactoryRecorder {

public WorkerFactoryRecorder(WorkersRuntimeConfig config) {
this.config = config;
}

final WorkersRuntimeConfig config;

public RuntimeValue<WorkerFactory> createWorkerFactory(WorkflowClient workflowClient) {
return new RuntimeValue<>(WorkerFactory.newInstance(workflowClient));
}

public void createWorker(RuntimeValue<WorkerFactory> runtimeValue, String queueName, List<Class<?>> workflows,
public WorkerOptions createWorkerOptions(WorkerRuntimeConfig config) {
if (config == null) {
return WorkerOptions.getDefaultInstance();
}

WorkerOptions.Builder builder = WorkerOptions.newBuilder()
.setMaxWorkerActivitiesPerSecond(config.maxWorkerActivitiesPerSecond())
.setMaxConcurrentActivityExecutionSize(config.maxConcurrentActivityExecutionSize())
.setMaxConcurrentWorkflowTaskExecutionSize(config.maxConcurrentWorkflowTaskExecutionSize())
.setMaxConcurrentLocalActivityExecutionSize(config.maxConcurrentLocalActivityExecutionSize())
.setMaxTaskQueueActivitiesPerSecond(config.maxTaskQueueActivitiesPerSecond())
.setMaxConcurrentWorkflowTaskPollers(config.maxConcurrentWorkflowTaskPollers())
.setMaxConcurrentActivityTaskPollers(config.maxConcurrentActivityTaskPollers())
.setLocalActivityWorkerOnly(config.localActivityWorkerOnly())
.setDefaultDeadlockDetectionTimeout(config.defaultDeadlockDetectionTimeout())
.setMaxHeartbeatThrottleInterval(config.maxHeartbeatThrottleInterval())
.setDefaultHeartbeatThrottleInterval(config.defaultHeartbeatThrottleInterval())
.setStickyQueueScheduleToStartTimeout(config.stickyQueueScheduleToStartTimeout())
.setDisableEagerExecution(config.disableEagerExecution())
.setUseBuildIdForVersioning(config.useBuildIdForVersioning())
.setStickyTaskQueueDrainTimeout(config.stickyTaskQueueDrainTimeout());

config.buildId().ifPresent(builder::setBuildId);
config.identity().ifPresent(builder::setIdentity);

return builder.build();

}

public void createWorker(RuntimeValue<WorkerFactory> runtimeValue, String name, List<Class<?>> workflows,
List<Class<?>> activities) {
WorkerFactory workerFactory = runtimeValue.getValue();
Worker worker = workerFactory.newWorker(queueName);
Worker worker = workerFactory.newWorker(name, createWorkerOptions(config.workers().get(name)));
for (var workflow : workflows) {
worker.registerWorkflowImplementationTypes(workflow);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package io.quarkiverse.temporal.config;

import java.time.Duration;
import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.configuration.DurationConverter;
import io.smallrye.config.WithConverter;
import io.smallrye.config.WithDefault;

@ConfigGroup
Expand All @@ -11,4 +16,140 @@ public interface WorkerRuntimeConfig {
*/
@WithDefault("default")
String taskQueue();

/**
* Maximum number of activities started per second by this worker. Default is 0 which means unlimited.
*/
@WithDefault("0")
Double maxWorkerActivitiesPerSecond();

/**
* Maximum number of activities executed in parallel. Default is 200, which is chosen if set to zero.
*/
@WithDefault("200")
Integer maxConcurrentActivityExecutionSize();

/**
* Maximum number of simultaneously executed workflow tasks. Default is 200, which is chosen if set to zero.
*/
@WithDefault("200")
Integer maxConcurrentWorkflowTaskExecutionSize();

/**
* Maximum number of local activities executed in parallel. Default is 200, which is chosen if set to zero.
*/
@WithDefault("200")
Integer maxConcurrentLocalActivityExecutionSize();

/**
* Sets the rate limiting on number of activities that can be executed per second. This is managed by the server and
* controls activities per second for the entire task queue across all the workers. Notice that the number is represented in
* double, so that you can set it to less than 1 if needed. For example, set the number to 0.1 means you want your activity
* to be executed once every 10 seconds. This can be used to protect down stream services from flooding. The zero value of
* this uses the default value. Default is unlimited.
*/
@WithDefault("0")
Double maxTaskQueueActivitiesPerSecond();

/**
* Sets the maximum number of simultaneous long poll requests to the Temporal Server to retrieve workflow tasks. Changing
* this value will affect the rate at which the worker is able to consume tasks from a task queue.
* Due to internal logic where pollers alternate between sticky and non-sticky queues, this value cannot be 1 and will be
* adjusted to 2 if set to that value.
* Default is 5, which is chosen if set to zero.
*/
@WithDefault("5")
Integer maxConcurrentWorkflowTaskPollers();

/**
* Number of simultaneous poll requests on activity task queue. Consider incrementing if the worker is not throttled due to
* `MaxActivitiesPerSecond` or `MaxConcurrentActivityExecutionSize` options and still cannot keep up with the request rate.
* Default is 5, which is chosen if set to zero.
*/
@WithDefault("5")
Integer maxConcurrentActivityTaskPollers();

/**
* If set to true worker would only handle workflow tasks and local activities. Non-local activities will not be executed by
* this worker.
* Default is false.
*/
@WithDefault("false")
Boolean localActivityWorkerOnly();

/**
* time period in ms that will be used to detect workflows deadlock. Default is 1000ms, which is chosen if set to zero.
* Specifies an amount of time in milliseconds that workflow tasks are allowed to execute without interruption. If workflow
* task runs longer than specified interval without yielding (like calling an Activity), it will fail automatically.
*/
@WithDefault("1000")
Long defaultDeadlockDetectionTimeout();

/**
* the maximum amount of time between sending each pending heartbeat to the server. Regardless of heartbeat timeout, no
* pending heartbeat will wait longer than this amount of time to send. Default is 60s, which is chosen if set to null or 0.
*/
@WithDefault("60s")
@WithConverter(DurationConverter.class)
Duration maxHeartbeatThrottleInterval();

/**
* the default amount of time between sending each pending heartbeat to the server. This is used if the ActivityOptions do
* not provide a HeartbeatTimeout. Otherwise, the interval becomes a value a bit smaller than the given HeartbeatTimeout.
* Default is 30s, which is chosen if set to null or 0.
*/
@WithDefault("30s")
@WithConverter(DurationConverter.class)
Duration defaultHeartbeatThrottleInterval();

/**
* Timeout for a workflow task routed to the "sticky worker" - host that has the workflow instance cached in memory. Once it
* times out, then it can be picked up by any worker.
* Default value is 5 seconds.
*/
@WithDefault("5s")
@WithConverter(DurationConverter.class)
Duration stickyQueueScheduleToStartTimeout();

/**
* Disable eager activities. If set to true, eager execution will not be requested for activities requested from workflows
* bound to this Worker.
* Eager activity execution means the server returns requested eager activities directly from the workflow task back to this
* worker which is faster than non-eager which may be dispatched to a separate worker.
* Defaults to false, meaning that eager activity execution is permitted
*/
@WithDefault("false")
Boolean disableEagerExecution();

/**
* Opts the worker in to the Build-ID-based versioning feature. This ensures that the worker will only receive tasks which
* it is compatible with. For more information see: TODO: Doc link
* Defaults to false
*/
@WithDefault("false")
Boolean useBuildIdForVersioning();

/**
* Set a unique identifier for this worker. The identifier should be stable with respect to the code the worker uses for
* workflows, activities, and interceptors. For more information see: TODO: Doc link
* A Build Id must be set if useBuildIdForVersioning is set true.
*/
Optional<String> buildId();

/**
* During graceful shutdown, as when calling WorkerFactory. shutdown(), if the workflow cache is enabled, this timeout
* controls how long to wait for the sticky task queue to drain before shutting down the worker. If set the worker will stop
* making new poll requests on the normal task queue, but will continue to poll the sticky task queue until the timeout is
* reached. This value should always be greater than clients rpc long poll timeout, which can be set via
* WorkflowServiceStubsOptions. Builder. setRpcLongPollTimeout(Duration).
* Default is not to wait.
*/
@WithDefault("0s")
@WithConverter(DurationConverter.class)
Duration stickyTaskQueueDrainTimeout();

/**
* Override identity of the worker primary specified in a WorkflowClient options.
*/
Optional<String> identity();
}

0 comments on commit 12443ed

Please sign in to comment.