diff --git a/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractActor.java b/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractActor.java index 2f9062ca7..b698d042a 100644 --- a/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractActor.java +++ b/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractActor.java @@ -36,7 +36,7 @@ public String getAddress() { } @Override - public boolean send(T message) { + public boolean offer(T message) { if (mailbox.offerUserMessage(message)) { dispatch(); return true; @@ -73,14 +73,15 @@ public ActorState getActorState() { @Override public void run() { while (true) { - handleSystemMessages(); + boolean systemMailboxEmpty = handleSystemMessages(); if (actorState == ActorState.PAUSE) { break; } - boolean empty = handleUserMessages(); - if (empty) { + boolean userMailboxEmpty = handleUserMessages(); + + if (systemMailboxEmpty && userMailboxEmpty) { break; } } @@ -124,7 +125,14 @@ private boolean handleUserMessages() { return empty; } - private void handleSystemMessages() { + protected void pauseInMessageProcessThread() { + if (actorState == ActorState.RUNNING) { + actorState = ActorState.PAUSE; + } + } + + private boolean handleSystemMessages() { + boolean empty; SystemMessage systemMessage = mailbox.pollSystemMessage(); if (systemMessage != null) { switch (systemMessage) { @@ -147,7 +155,11 @@ private void handleSystemMessages() { } break; } + empty = false; + } else { + empty = true; } + return empty; } private void sendSystemMessage(SystemMessage message) { @@ -221,8 +233,6 @@ public void dispatch(Runnable runnable) { public static class MailboxImpl implements Mailbox { private final Queue userMessageQueue; private final Queue systemMessageQueue; - private final AtomicInteger unhandledUserMessageCount = new AtomicInteger(0); - private final AtomicInteger unhandledSystemMessageCount = new AtomicInteger(0); public MailboxImpl(Queue userMessageQueue, Queue systemMessageQueue) { this.userMessageQueue = userMessageQueue; @@ -231,48 +241,32 @@ public MailboxImpl(Queue userMessageQueue, Queue systemMessage @Override public AbstractActor.SystemMessage pollSystemMessage() { - AbstractActor.SystemMessage systemMessage = systemMessageQueue.poll(); - if (systemMessage != null) { - unhandledSystemMessageCount.decrementAndGet(); - } - return systemMessage; + return systemMessageQueue.poll(); } @Override public boolean offerSystemMessage(AbstractActor.SystemMessage systemMessage) { - boolean success = systemMessageQueue.offer(systemMessage); - if (success) { - unhandledSystemMessageCount.incrementAndGet(); - } - return success; + return systemMessageQueue.offer(systemMessage); } @Override public boolean hasSystemMessage() { - return unhandledSystemMessageCount.get() > 0; + return systemMessageQueue.peek() != null; } @Override public T pollUserMessage() { - T message = userMessageQueue.poll(); - if (message != null) { - unhandledUserMessageCount.decrementAndGet(); - } - return message; + return userMessageQueue.poll(); } @Override public boolean offerUserMessage(T userMessage) { - boolean success = userMessageQueue.offer(userMessage); - if (success) { - unhandledUserMessageCount.incrementAndGet(); - } - return success; + return userMessageQueue.offer(userMessage); } @Override public boolean hasUserMessage() { - return unhandledUserMessageCount.get() > 0; + return userMessageQueue.peek() != null; } } } diff --git a/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractAsyncActor.java b/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractAsyncActor.java index ffafaa672..cd170c17b 100644 --- a/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractAsyncActor.java +++ b/firefly-common/src/main/java/com/fireflysource/common/actor/AbstractAsyncActor.java @@ -16,7 +16,7 @@ public AbstractAsyncActor(String address, Dispatcher dispatcher, Mailbox { resume(); return Result.DONE; diff --git a/firefly-common/src/main/java/com/fireflysource/common/actor/Actor.java b/firefly-common/src/main/java/com/fireflysource/common/actor/Actor.java index 8ad049c91..131fb9613 100644 --- a/firefly-common/src/main/java/com/fireflysource/common/actor/Actor.java +++ b/firefly-common/src/main/java/com/fireflysource/common/actor/Actor.java @@ -15,11 +15,11 @@ public interface Actor { String getAddress(); /** - * Send message to this actor. + * Offer message to this actor's mailbox. * * @param message The message. - * @return If true, send message success. + * @return If true, offer message success. */ - boolean send(T message); + boolean offer(T message); } diff --git a/firefly-common/src/test/java/com/fireflysource/common/actor/TestActor.java b/firefly-common/src/test/java/com/fireflysource/common/actor/TestActor.java index 5b48c6750..ede25fdac 100644 --- a/firefly-common/src/test/java/com/fireflysource/common/actor/TestActor.java +++ b/firefly-common/src/test/java/com/fireflysource/common/actor/TestActor.java @@ -30,7 +30,7 @@ void test() throws Exception { log("Today sales amount: " + amount); return null; })); - store.send(closeMessage); + store.offer(closeMessage); CompletableFuture.allOf(results.stream().map(r -> r.handle((ignore, throwable) -> { Optional.ofNullable(throwable).map(Throwable::getMessage).ifPresent(System.out::println); @@ -41,13 +41,13 @@ void test() throws Exception { private void stock(StoreActor store, String name, long price, int count) { IntStream.range(0, count).parallel() - .forEach(i -> store.send(new StoreActor.StockMessage(new StoreActor.Product(name, price)))); + .forEach(i -> store.offer(new StoreActor.StockMessage(new StoreActor.Product(name, price)))); } private List> purchase(StoreActor store, String name, long price, int count) { return IntStream.range(0, count).parallel().boxed().map(i -> { StoreActor.PurchaseMessage purchaseMessage = new StoreActor.PurchaseMessage(new StoreActor.Product(name, price)); - store.send(purchaseMessage); + store.offer(purchaseMessage); return purchaseMessage.result.thenAccept(ignore -> log("purchase " + name + " success.")); }).collect(Collectors.toList()); } diff --git a/pom.xml b/pom.xml index 98bc7642f..cfeddfeb3 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ official - 1.9.10 + 1.9.20 true 8 @@ -324,7 +324,7 @@ org.jacoco jacoco-maven-plugin - 0.8.11 + 0.8.12 org.apache.maven.reporting