diff --git a/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingModule.java b/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingModule.java index e33b50846..e067e9c35 100644 --- a/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingModule.java +++ b/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingModule.java @@ -4,7 +4,9 @@ import dagger.Provides; import io.dropwizard.lifecycle.setup.LifecycleEnvironment; import java.nio.file.Path; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import javax.inject.Named; import javax.inject.Provider; import javax.inject.Singleton; @@ -41,10 +43,11 @@ GradingRequestPoller gradingRequestPoller( ExecutorService executorService = lifecycleEnv.executorService("grading-worker-%d") .maxThreads(config.getNumWorkerThreads()) .minThreads(config.getNumWorkerThreads()) + .workQueue(new ArrayBlockingQueue<>(config.getNumWorkerThreads())) .build(); return new GradingRequestPoller( - executorService, + (ThreadPoolExecutor) executorService, config.getGradingRequestQueueName(), messageClient, workerFactory); diff --git a/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingRequestPoller.java b/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingRequestPoller.java index 06ceabfd8..980bbd701 100644 --- a/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingRequestPoller.java +++ b/judgels-backends/judgels-grader-app/src/main/java/judgels/gabriel/grading/GradingRequestPoller.java @@ -1,9 +1,8 @@ package judgels.gabriel.grading; -import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import javax.inject.Provider; import judgels.messaging.MessageClient; import judgels.messaging.api.Message; @@ -13,15 +12,13 @@ public class GradingRequestPoller implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(GradingRequestPoller.class); - private static final Duration POLLING_DELAY = Duration.ofSeconds(2); - - private final ExecutorService executorService; + private final ThreadPoolExecutor executorService; private final String queueName; private final MessageClient messageClient; private final Provider workerFactory; public GradingRequestPoller( - ExecutorService executorService, + ThreadPoolExecutor executorService, String queueName, MessageClient messageClient, Provider workerFactory) { @@ -36,13 +33,14 @@ public GradingRequestPoller( public void run() { while (true) { try { + if (executorService.getQueue().remainingCapacity() == 0) { + sleep(2 * 1000); + continue; + } + Optional maybeMessage = messageClient.receiveMessage(queueName); - if (!maybeMessage.isPresent()) { - try { - Thread.sleep(POLLING_DELAY.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + if (maybeMessage.isEmpty()) { + sleep(2 * 1000); continue; } @@ -55,14 +53,18 @@ public void run() { return null; }); - try { - Thread.sleep((int) (Math.random() * 1000)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + sleep((long) (Math.random() * 1000)); } catch (Throwable e) { LOGGER.error("Failed to run grading request poller", e); } } } + + private void sleep(long milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } }