Skip to content

Commit

Permalink
updating ActorRuntime
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Kaps <[email protected]>
  • Loading branch information
salaboy authored and akkie committed Feb 7, 2025
1 parent fcf3c1e commit ea8222e
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.spring.boot.autoconfigure.client;

import io.dapr.actors.client.ActorClient;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.config.Properties;
Expand Down Expand Up @@ -78,6 +79,13 @@ ActorClient daprActorClient(DaprConnectionDetails daprConnectionDetails) {
return new ActorClient(properties);
}

@Bean
@ConditionalOnMissingBean
ActorRuntime daprActorRuntime(DaprConnectionDetails daprConnectionDetails) {
Properties properties = createPropertiesFromConnectionDetails(daprConnectionDetails);
return ActorRuntime.getInstance(properties);
}

@Bean
@ConditionalOnMissingBean
WorkflowRuntimeBuilder daprWorkflowRuntimeBuilder(DaprConnectionDetails daprConnectionDetails) {
Expand Down
103 changes: 55 additions & 48 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Version;
import io.dapr.utils.NetworkUtils;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.io.Closeable;
Expand Down Expand Up @@ -80,23 +79,32 @@ public class ActorRuntime implements Closeable {
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime() throws IllegalStateException {
this(buildManagedChannel());
this(new Properties());
}

/**
* The default constructor. This should not be called directly.
*
* @throws IllegalStateException If cannot instantiate Runtime.
*/
private ActorRuntime(Properties properties) throws IllegalStateException {
this(NetworkUtils.buildGrpcManagedChannel(properties));
}

/**
* Constructor once channel is available. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @throws IllegalStateException If cannot instantiate Runtime.
* @throws IllegalStateException If you cannot instantiate Runtime.
*/
private ActorRuntime(ManagedChannel channel) throws IllegalStateException {
this(channel, buildDaprClient(channel));
this(channel, new DaprClientImpl(channel));
}

/**
* Constructor with dependency injection, useful for testing. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @param channel GRPC managed channel to be closed (or null).
* @param daprClient Client to communicate with Dapr.
* @throws IllegalStateException If class has one instance already.
*/
Expand Down Expand Up @@ -128,6 +136,24 @@ public static ActorRuntime getInstance() {
return instance;
}

/**
* Returns an ActorRuntime object.
*
* @param properties Properties to be used for the runtime.
* @return An ActorRuntime object.
*/
public static ActorRuntime getInstance(Properties properties) {
if (instance == null) {
synchronized (ActorRuntime.class) {
if (instance == null) {
instance = new ActorRuntime(properties);
}
}
}

return instance;
}

/**
* Gets the Actor configuration for this runtime.
*
Expand All @@ -149,24 +175,22 @@ public byte[] serializeConfig() throws IOException {

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer} and {@link DefaultActorFactory}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz) {
registerActor(clazz, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory<T> actorFactory) {
registerActor(clazz, actorFactory, new DefaultObjectSerializer(), new DefaultObjectSerializer());
Expand All @@ -181,8 +205,8 @@ public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, new DefaultActorFactory<T>(), objectSerializer, stateSerializer);
}

/**
Expand All @@ -195,9 +219,9 @@ public <T extends AbstractActor> void registerActor(
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
if (clazz == null) {
throw new IllegalArgumentException("Class is required.");
}
Expand All @@ -216,12 +240,12 @@ public <T extends AbstractActor> void registerActor(
// Create ActorManager, if not yet registered.
this.actorManagers.computeIfAbsent(actorTypeInfo.getName(), (k) -> {
ActorRuntimeContext<T> context = new ActorRuntimeContext<>(
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this,
objectSerializer,
actorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this.config.addRegisteredActorType(actorTypeInfo.getName());
return new ActorManager<T>(context);
});
Expand All @@ -236,7 +260,7 @@ public <T extends AbstractActor> void registerActor(
*/
public Mono<Void> deactivate(String actorTypeName, String actorId) {
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
.flatMap(m -> m.deactivateActor(new ActorId(actorId)));
}

/**
Expand All @@ -252,8 +276,8 @@ public Mono<Void> deactivate(String actorTypeName, String actorId) {
public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMethodName, byte[] payload) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeMethod(id, actorMethodName, payload));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeMethod(id, actorMethodName, payload));
}

/**
Expand All @@ -268,8 +292,8 @@ public Mono<byte[]> invoke(String actorTypeName, String actorId, String actorMet
public Mono<Void> invokeReminder(String actorTypeName, String actorId, String reminderName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeReminder(new ActorId(actorId), reminderName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeReminder(new ActorId(actorId), reminderName, params));
}

/**
Expand All @@ -284,8 +308,8 @@ public Mono<Void> invokeReminder(String actorTypeName, String actorId, String re
public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName, byte[] params) {
ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName, params));
.flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager) m).invokeTimer(new ActorId(actorId), timerName, params));
}

/**
Expand Down Expand Up @@ -318,23 +342,6 @@ private static DaprClient buildDaprClient(ManagedChannel channel) {
return new DaprClientImpl(channel);
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}

return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* {@inheritDoc}
*/
Expand Down

0 comments on commit ea8222e

Please sign in to comment.