Skip to content
Emil Koutanov edited this page May 11, 2017 · 9 revisions

Concept

Actors are similar to objects on the OO world, but with one notable difference. A traditional object is completely passive; it is characterised by state and behaviour, but is acted upon by an external entity (code residing in another object) and from within the execution context (thread) of the caller. An actor, on the other hand, is able to enact its own behaviour using an execution context that is distinct from that of the caller's. From this perspective, an actor is a special case of the Active Object pattern. An actor is invoked by passing it a message, which will be queued in the actor's mailbox and processed in the order they were enqueued. This leaves the caller free to continue processing, or to wait for the actor's response, if one is warranted.

Thus, an actor appears as somewhat of a cross between an object and a thread, as it possesses traits of both. Crucially, processing within an actor always takes place within a single thread of execution (although not necessarily the same thread each time), preserving the happens-before relationship with respect to each successive message processed. So one could also say that an actor is also a reminiscent of a synchronized block.

If the above doesn't make sense yet, don't worry. Just think of an actor as being like a person. It can do something when asked and it can remember things (being the actor's state), but it will only do one task at any given time even if several people ask it of favours - these just get banked up in a backlog until the actor gets around to them. When doing things, it can ask other people to help out, delegating all or a part of the task at hand. If we need lots of things done quickly, we can break up the work and hand it off to several similar actors.

Note: The human metaphor is quite important as it highlights the robustness of the actor model. When did you last see a deadlock or a race condition occur within a group of people?

Defining an Actor

Indigo provides two functionally equivalent but stylistically different means of defining actors. One approach is through classical interface inheritance, the other - using Java 8 lambdas.

The Actor interface

The first approach is through interface inheritance, by implementing the com.obsidiandynamics.indigo.Actor interface. This interface (shown below) has one mandatory method act() that must be implemented.

@FunctionalInterface
public interface Actor {
  default void activated(Activation a) {}
  
  default void passivated(Activation a) {}
  
  void act(Activation a, Message m);
}

For the time being, ignore the activated() and passivated() methods. We'll get to them later, when discussing activations and actor life-cycles.

For the remainder of this chapter we'll focus on a single contrived example - a simple concurrent adder. The adder keeps a running sum of all numbers given to it and provides a simple query mechanism for retrieving the sum. All of the code for this chapter is in the indigo/examples module, within the package com.obsidiandynamics.indigo.adder.

Note: This is among the most basic examples of asynchronous parallelism - collecting aggregate metrics across a number of process, such as the total number of requests processed by a server. Ideally, one would want to measure events by instrumenting the underlying processes in the least intrusive manner - without blocking these processes or otherwise imposing any sort of performance penalty.

The first step in defining an actor is to plan out its public API - the means by which other actors or external entities will issue commands and send queries to our actor. In the case of an adder the matter is simple; we need one command to add a number to the current sum and a query to retrieve the current sum. We'll define the public message contract separately to the actor, in a class called AdderContract.

Note: Defining a message contract separately to the actor implementation is akin to separating an interface from the implementation, and has a positive effect on coupling. While this is not strictly necessary, doing so will allow you to vary the actor's implementation without impacting the consumer, providing contract compatibility is retained.

/**
 *  The public message contract for the adder actor.
 */
public final class AdderContract {
  public static final String ROLE = "adder";
  
  /** This class is just a container for others; it doesn't require instantiation. */
  private AdderContract() {}
  
  /** Sent to the actor when adding a value. */
  public static final class Add {
    private int value;

    public Add(int value) {
      this.value = value;
    }

    int getValue() {
      return value;
    }
  }
  
  /** Sent to the actor when requesting the current sum. */
  public static final class Get {}
  
  /** Response sent by the actor containing the current sum. */
  public static final class GetResponse {
    private int sum;

    GetResponse(int sum) {
      this.sum = sum;
    }

    public int getSum() {
      return sum;
    }
  }
}

Let's digest the above, starting with the ROLE constant. Each actor must have a role to address it by, which is a free-form string. The string itself is unimportant, only as long as it's unique and the same role that is used to define the actor is also used when addressing it. Placing the role into the public contract is nothing but a useful means of canonicalising it.

Next, we'll need a message to add a number to the sum - AdderContract.Add - a simple POJO that encapsulates an addend, being the sole parameter to the sum function.

We'll need a way of getting the sum - AdderContract.Get - a blank POJO that signifies a parameterless query. The response will be carried in AdderContract.GetResponse - a POJO encapsulating a snapshot of the sum.

Note: Because the message contract isn't coupled to the actor's implementation, we'll reuse this contract later - when building actors with lambda functions.

With the contract 'formalities' out of the way, it's time to define our AdderActor.

public final class AdderActor implements Actor {
  /** The current sum. */
  private int sum;
  
  @Override
  public void act(Activation a, Message m) {
    m.select()
    .when(Add.class).then(b -> sum += b.getValue())
    .when(Get.class).then(b -> a.reply(m).tell(new GetResponse(sum)))
    .otherwise(a::messageFault);
  }
}

That's it. The entire actor implementation (sans the package declaration and imports) is barely a dozen lines. Let's dissect it, not that there is a lot to it.

An adder is stateful - it has to keep track of the current sum - hence the sum attribute.

The act() method is where the actor's behaviour is defined. It takes two parameters - an Activation object that links the actor with the rest of the actor system and a Message instance. A Message has a body, a from sender and a to recipient. (Other attributes exist in a message, but we can gloss over them for now.) Only the to attribute is required; the from and body attributes may be null.

Actors don't have multiple methods like ordinary classes; instead, they behave differently depending on the type of message passed to them. To be exact, when we say the 'type of message', we are actually referring to the type of the underlying body.

The easiest way to switch on the body type in Indigo is to use Message.select(). Use when(Class<B>) create a branch for a class type of interest and then(Consumer<B>) to handle a message body of the complying type. The when() method is like an if or an else if block, making the otherwise() method equivalent to an else. The select() API also has a whenNull(Runnable) method (omitted from our example), useful when a null message body is a valid input to the actor.

So effectively, our actor is saying:

  • If I get an Add message, I'll take the body of the message b of type Add and increment the sum by the value given to me within the b object;
  • If I get a Get message, I'll respond to the originator with the current sum, packing it into a GetResponse object;
  • Otherwise, if I don't understand the request, I'll raise a fault.

A reply is formed with the aid of the Activation.reply(Message) method, passing the response body as the parameter to the tell() chained method. There are two variations of this method - tell(Object) to respond with a message body and a parameterless tell() to respond with a null body.

The last point, while not strictly mandatory, is a messaging best-practice. As message type safety isn't enforced by the compiler, there is no way of being certain that a message will get handled. If a strange message type was received, the actor should assume that this isn't the intended behaviour as actors shouldn't be throwing random messages at one another to see what sticks. So raising a fault is almost always the sensible thing to do.

Note: We didn't use the volatile keyword or an AtomicInteger to model our state, or guard access to the state in synchronized block or a Lock implementation. In fact, there are no traces of the standard Java concurrency control primitives in our actor code. Yet it copes with multiple concurrent consumers and scales effortlessly.

Now that the actor's ready, it should be tested. Using JUnit 4.x we've whipped up a quick test.

public class AdderTest {
  @Test
  public void testInterfaceActor() throws InterruptedException {
    ActorSystem.create()
    .on(AdderContract.ROLE).cue(AdderActor::new)
    .ingress(AdderTest::addAndVerify)
    .shutdown();
  }
 
  private static void addAndVerify(Activation a) {
    for (int i = 1; i <= 10; i++) {
      a.to(ActorRef.of(AdderContract.ROLE)).tell(new AdderContract.Add(i));
    }
    
    a.to(ActorRef.of(AdderContract.ROLE))
    .ask(new AdderContract.Get())
    .onResponse(r -> assertEquals(55, r.<AdderContract.GetResponse>body().getSum()));
  }
}

The first step in our testInterfaceActor test method is to create an actor system and to register the actor implementation. Using the on(String) method, we specify the role of the upcoming actor, then follow with a call to cue(Supplier<Actor>), giving it a factory for instantiating our actor.

Note: We don't just give the actor system a pre-instantiated actor instance as, unlike most other actor framework, actors in Indigo are 'virtual' - spawned dynamically in response to demand. Even though our AdderActor will effectively be a singleton, the actual creation and disposal of the actor will be managed by the framework.

Once the actor is registered, we need to exercise the sum function and assert the outcome. But first we must touch on The First Rule of an actor system: all interactions with actors must occur from within the actor system. This sounds like a Catch 22, and if there weren't any exceptions then indeed this would be the case. Fortunately there are exceptions - two of them, to be precise. We'll touch on one for now - the ingress.

The ingress is an implementation of the agent pattern that passes actual behaviour within a message - any behaviour that can be defined by a function taking a single Activation parameter. Once delivered to an ingress actor, the code is executed as if it were an actor itself, responding to an artificial stimulus. Sounds complicated? Just think of an ingress as a portal into the actor system; anything defined within the ingress is masqueraded as an actor, 'teleported' into and run from within the actor system. Typically there are several ingress actors in an actor system, the default being the number of processor cores available to the JVM. So referring back to the portal metaphor, there are several concurrent portals available to run your code from within an actor system.

It's crucial that interactions with actors happen from within the actor system, using Indigo's internal thread scheduler, which brings us to The Second Rule of an actor systems - actors should never be accessed directly (even from other actors), but only through message passing. Failing to observe either of the rules will lead to corruption of the actor state and breaking of the one-message-at-a-time semantics of the act() method.

In our example, we've ingressed the addAndVerify function. It features a simple loop that sends integers 1..10 wrapped in AdderContract.Add to a singleton actor with the opaque address ActorRef.of(AdderContract.ROLE). Sending an unsolicited message is done by calling the to() method of the Activation instance, followed by tell(), remembering that an Activation is the bridge between a single actor and the rest of the actor system. The term unsolicited means that the message isn't expected by the target actor. This isn't always the case.

Note: How can we tell that the adder is a singleton? Because an actor can be uniquely identified by a composition of its role and its key, as multiple actors for a given role may exist. This would be done with the dual-parameter ActorRef.of(String role, String key) method. But when an actor is identified solely by its role with the single-parameter ActorRef.of(String role) method, as in our example, this implies by convention that the actor is a singleton. We say by convention as this isn't enforced by the framework. One could, in theory, message a singleton actor and also message keyed actors with the same role. There may be legitimate reasons for doing so, much like you can mix a singleton object with non-singletons of a same type in Java.

At this point we need to verify the sum to complete our test. So we send another unsolicited message to the adder, this time using the ask() method, passing an instance of AdderContract.Get, followed by the onResponse() method, passing it a callback function, to be invoked when the adder responds to our query. The callback is a simple Consumer<Message>. In our test, the callback uses the JUnit assertEquals method to compare the sum carried within the response body to the expected value. Note, the response is a considered a solicited message, as it is sent in response to a prior request.

The last part of our test is a call to shutdown(). This method has a dual purpose. It drains the actor system of any pending messages and terminates it. This includes any messages that may have been enqueued via the ingress and any responses that were solicited. If you want to drain the actor system without terminating it, simply call drain(). If we choose not to drain the actor system, we would face great difficulty in ensuring that the messages got to the adder and in asserting the resulting sum. One could put the main thread to sleep or, better still, use Awaitility, but draining is more elegant and more efficient.

Queueing and determinism

A slight detour; given the asynchronous nature of actors, how do know that the sum is 55 when we ask for it. Given that the tell() method only enqueues the message on the adder's mailbox, by the time the loop in first half of addAndVerify completes, there is no guarantee that the adder has received any of the messages, let alone processed and summed them all to yield the expected sum. The sum will be 55 eventually, but when? Yet the test passes; the sum is indeed as expected. That's because the query message enqueued with ask() also occupies the same mailbox and is ordered in strict sequence - a partial ordering with respect to the sending actor - the ingress. And because the Add commands and the Get query were sent from the same ingress actor, by the time the query arrives at the adder's doorstep, it would have processed all prior summations.

But what if we weren't using the same ingress. After all, the entire point is to enable concurrent and asynchronous programming. Let's rewrite the test to bombard the adder from multiple ingress points.

final ActorSystem system = ActorSystem.create()
.on(AdderContract.ROLE).cue(AdderActor::new);

system.ingress().times(10).act((a, i) ->
  a.to(ActorRef.of(AdderContract.ROLE)).tell(new AdderContract.Add(i + 1))
);

system.drain(0);

system.ingress(a ->
  a.to(ActorRef.of(AdderContract.ROLE))
  .ask(new AdderContract.Get())
  .onResponse(r -> assertEquals(55, r.<AdderContract.GetResponse>body().getSum()))
);

system.shutdown();

The ingress().times(int).act(BiConsumer<Activation, Integer>) pattern is a simple way of enqueuing multiple tasks for the ingress actor. (We could have just as easily used a for loop.). As we're no longer operating within the sanctity of a single ingress, we call the drain(int) method on the actor system, ensuring that all messages have been processed, before asserting the sum with a yet another ingress. The parameter to drain is the number of milliseconds to wait for the drain operation to complete, with 0 being the equivalent of 'forever'. The drain method returns the number of actors that still have at at least one message in their mailbox, or are waiting on at least one response message. By implication, when drain(0) returns, the return value will also be 0.

Lambda actors

Now that we have a solid understanding of actor basics and have built out first counting actor using interface inheritance, let's do the same with lambdas.

A StatelessLambdaActor is used for side effect-free processing of messages - the simplest case of message processing. Conversely, a StatefulLambdaActor maintains a mutable state object, supplying it to the act lambda on each message. Behind the scenes, both actor types implement the Actor interface; the lambda actors are just a different way of assembling an actor. Let's reconsider our example in the context of a stateful lambda definition.

Although this isn't mandated, for convenience a lambda actor is often defined inline, as per the test case below.

public class AdderTest {
  static class IntegerSum {
    int sum;
  }
  
  @Test
  public void testLambdaActor_longForm() throws InterruptedException {
    ActorSystem.create()
    .on(AdderContract.ROLE)
    .cue(StatefulLambdaActor.<IntegerSum>builder()
         .activated(a -> CompletableFuture.completedFuture(new IntegerSum()))
         .act((a, m, s) -> 
           m.select()
           .when(Add.class).then(b -> s.sum += b.getValue())
           .when(Get.class).then(b -> a.reply(m).tell(new GetResponse(s.sum)))
           .otherwise(a::messageFault)
         ))
    .ingress(AdderTest::addAndVerify)
    .shutdown();
  }

  // rest of the class
}

The first step is to define a state object IntegerSum - a capsule for holding the current state information of an actor. Like an actor, it is instantiated on demand by the framework.

Where we would normally provide an actor factory in cue(), we'll do so using the builder pattern exposed by the StatefulLambdaActor. The first thing you'll notice is that the actor is parametrised with a single generic type - the type of the state object. The builder takes three lambdas:

  • activated(Function<Activation, CompletableFuture<S>>): invoked when the actor is brought into existence, allowing the actor to initialise its state. A stateful actor would either start from a clean slate, or attempt to load its state from some persistent store (such as a database). The return type is a CompletableFuture, as the actor might have to carry out some asynchronous operations before returning a populated state object - such as querying a database and awaiting a response. Until the actor returns its state - either with completedFuture() or through an asynchronous assignment to the future object, it will not be handed any messages for processing.
  • act(TriConsumer<Activation, Message, S>): the main body of the actor, defining how the actor should behave when processing a message. This is similar to the act() method of an interface-derived actor, only it takes an additional parameter - the actor's state. The lambda is free to mutate the state.
  • passivated(BiConsumer<Activation, S>): invoked when the actor is about to be purged from existence, giving it an opportunity to clean up after itself and, most importantly, persist its state to stable storage. As we aren't using persistent actors in the present example, we don't need to pass in this lambda.

As you can see in the example above, the inline definition of the act lambda is almost identical to what we did earlier with the interface-derived actor. The only difference is that it operates on the state object s.

Short form

Lambda actors offer a succinct way of assembling an actor from its constituent parts using a builder pattern, but the ActorSystem API offers a slightly more simplified syntax for dealing with transient (non-persistent) actors, bypassing the need for the builder. This usage is illustrated in the snippet below (the rest of the class is unchanged).

ActorSystem.create()
.on(AdderContract.ROLE).cue(IntegerSum::new, (a, m, s) ->
  m.select()
  .when(Add.class).then(b -> s.sum += b.getValue())
  .when(Get.class).then(b -> a.reply(m).tell(new GetResponse(s.sum)))
  .otherwise(a::messageFault)
)
.ingress(AdderTest::addAndVerify)
.shutdown();

This time we've assembled the actor using an overloaded cue(Supplier<S> state, TriConsumer<Activation, Message, S>) method, where S is an implicit generic type corresponding to the type of the state object. The activation lambda has been simplified - rather than returning a CompletableFuture, it simply returns the object factory - the no-arg constructor of IntegerState. Happy days!

Lambdas vs interfaces - the approach to take

Neither of the above approaches to defining actors is superior to the other, but could be more suitable depending on the use case. Consider using the Actor interface when -

  • The actor definition is quite complex, such that it likely warrants its own class. And while one could still place pure functions into a dedicated class as static methods, and reference these from the lambda actor, a class with instance methods and attributes is arguably a more natural fit.
  • The actor is subject to multiple reuse within, or export outside of the project where it was defined. Packaging the actor up as class, preferably with a factory method, is more convenient than sharing multiple functions.
  • Inheritance may be a requirement, i.e. being able to define a base actor with one or more subclasses for more specialised use cases.
  • Porting from a different actor framework, where the actor code has already been implemented in a similar way (e.g. Akka uses class inheritance).
  • You are, for whatever reason, constrained from using lambdas in your project. However, bear in mind that the Indigo Activation API still heavily promotes the use of lambdas; using anonymous inner classes for things like callbacks would be a step backward.

Conversely, use lambdas when -

  • Writing short and succinct actor implementations.
  • Where it is useful to see multiple actor definitions together in the same class. This typically happens when two or more actors collectively solve a specific problem, such that the logic is split among each of the actors' definitions. Perhaps this used to be a larger actor that was subsequently partitioned to extract more fine-grained parallelism.
  • When you require the power and flexibility of lambdas, such as functional composition, or in any other case where there may be a strong preference towards functional programming.

The best of both worlds

To touch on the last point, the lambda actor APIs don't preclude one from packaging the lambdas in a dedicated class, along with a state object (in the stateful scenario). If taking this path, it's best to include a factory method for assembling the actor, delegating to StatefulLambdaActor.Builder or StatelessLambdaActor.Builder as appropriate.

Summary

Actors are active objects that communicate through message passing. Each actor has a dedicated mailbox and processes messages sequentially in the order of their arrival. An actor may have local state, which isn't shared with others. By eliminating shared state and blocking, concurrent applications are easier to write and are less prone to deadlocks and race conditions.

Plan out the actor's public message contract before settling on the implementation. If you expect the actor to be invoked from outside of your project, consider packaging the message contract into a separate class, as you would with a regular interface.

The Actor interface is the cornerstone of all actor implementations, including lambda actors. It has two optional methods - activated() and passivated() - used for handling life-cycle events - initialising state, loading the actor from stable storage, persisting the actor, cleaning up, and even signalling to other actors that might care. An Actor also has a single mandatory act() method, which defines the actor behaviour with respect to the messages it receives.

Use Message.select() to cleanly branch the actor's behaviour depending on the message (body) type. If the branches become large, break them out into separate methods. Get in the habit of including an otherwise(a::messageFault) to signal errors when facing unexpected messages.

To avoid wreaking havoc on the actor system, all interactions with actors must follow two fundamental rules:

  • The First Rule states that all interactions must occur from within the actor system. Any external (non-actor) code wishing to send and receive messages must be run from an ingress block.
  • The Second Rule forbids all direct interactions with actor objects - all interactions with actors must be through message passing.

Note: There's a somewhat less important third rule as well, as we'll discover in the next chapter.

An ActorRef is an opaque reference to an actor, consisting of a mandatory role and an optional key. By convention, an actor addressed only by a role is known as a singleton.

When functional programming is on the agenda, for virtually unrestricted power and flexibility use the builder pattern of StatefulLambdaActor and StatelessLambdaActor to assemble a working actor from its constituent parts. The actor definition can appear inline (with the ActorSystem definition) or packaged into a separate class for ease of distribution.

The short form lambda actor API is useful for assembling transient actors inline, with no builder boilerplate. It's somewhat less flexible and isn't intended to replace the builder, but works well in simple cases.

Use drain() to await all pending messages that are yet to be processed within the actor system. Use shutdown() to perform both a drain and to terminate the actor system.

Next: Activation