diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java index 86b6d32f7638..624a0ffe91b7 100644 --- a/src/java/org/apache/cassandra/concurrent/Stage.java +++ b/src/java/org/apache/cassandra/concurrent/Stage.java @@ -170,25 +170,52 @@ else if (statement instanceof ModificationStatement || statement instanceof Batc } // Convenience functions to execute on this stage - public void execute(Runnable command) { executor().execute(withTimeMeasurement(command)); } + public void execute(Runnable command) + { + long enqueueStartTime = System.nanoTime(); + executor().execute(withTimeMeasurement(command, enqueueStartTime)); + } + + public void execute(Runnable command, ExecutorLocals locals) + { + long enqueueStartTime = System.nanoTime(); + executor().execute(withTimeMeasurement(command, enqueueStartTime), locals); + } - public void execute(Runnable command, ExecutorLocals locals) { executor().execute(withTimeMeasurement(command), locals); } - public void maybeExecuteImmediately(Runnable command) { executor().maybeExecuteImmediately(withTimeMeasurement(command)); } - public CompletableFuture submit(Callable task) { return CompletableFuture.supplyAsync(() -> { + public void maybeExecuteImmediately(Runnable command) + { + long enqueueStartTime = System.nanoTime(); + executor().maybeExecuteImmediately(withTimeMeasurement(command, enqueueStartTime)); + } + + public CompletableFuture submit(Callable task) + { + long enqueueStartTime = System.nanoTime(); + return CompletableFuture.supplyAsync(() -> { try { - return withTimeMeasurement(task).call(); + return withTimeMeasurement(task, enqueueStartTime).call(); } catch (Exception e) { throw Throwables.unchecked(e); } }, executor()); } - public CompletableFuture submit(Runnable task) { return CompletableFuture.runAsync(withTimeMeasurement(task), executor()); } - public CompletableFuture submit(Runnable task, T result) { return CompletableFuture.supplyAsync(() -> { - withTimeMeasurement(task).run(); - return result; - }, executor()); } + + public CompletableFuture submit(Runnable task) + { + long enqueueStartTime = System.nanoTime(); + return CompletableFuture.runAsync(withTimeMeasurement(task, enqueueStartTime), executor()); + } + + public CompletableFuture submit(Runnable task, T result) + { + long enqueueStartTime = System.nanoTime(); + return CompletableFuture.supplyAsync(() -> { + withTimeMeasurement(task, enqueueStartTime).run(); + return result; + }, executor()); + } private LocalAwareExecutorService executor() { @@ -409,9 +436,8 @@ public int getPendingTaskCount() } } - private Runnable withTimeMeasurement(Runnable command) + private Runnable withTimeMeasurement(Runnable command, long queueStartTime) { - long queueStartTime = System.nanoTime(); return () -> { long executionStartTime = System.nanoTime(); try @@ -426,17 +452,18 @@ private Runnable withTimeMeasurement(Runnable command) }; } - private Callable withTimeMeasurement(Callable command) + private Callable withTimeMeasurement(Callable command, long queueStartTime) { return () -> { - long startTime = System.nanoTime(); + long executionStartTime = System.nanoTime(); try { + TaskExecutionCallback.instance.onDequeue(this, executionStartTime - queueStartTime); return command.call(); } finally { - TaskExecutionCallback.instance.onCompleted(this, System.nanoTime() - startTime); + TaskExecutionCallback.instance.onCompleted(this, System.nanoTime() - executionStartTime); } }; } diff --git a/test/unit/org/apache/cassandra/concurrent/StageTimeMeasurementTest.java b/test/unit/org/apache/cassandra/concurrent/StageTimeMeasurementTest.java new file mode 100644 index 000000000000..2c6c84677a0e --- /dev/null +++ b/test/unit/org/apache/cassandra/concurrent/StageTimeMeasurementTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.awaitility.Awaitility; + +import static org.junit.Assert.assertEquals; + +public class StageTimeMeasurementTest +{ + private static final Logger logger = LoggerFactory.getLogger(StageTimeMeasurementTest.class); + + public static final Stage TESTED_STAGE = Stage.READ; + private static final int MAX_CONCURRENCY = 2; + private static final long TASK_DURATION_NANOS = TimeUnit.MILLISECONDS.toNanos(100); + static TestTaskExecutionCallback callback; + + @BeforeClass + public static void setup() + { + CassandraRelevantProperties.CUSTOM_TASK_EXECUTION_CALLBACK_CLASS.setString(TestTaskExecutionCallback.class.getName()); + callback = (TestTaskExecutionCallback) TaskExecutionCallback.instance; + DatabaseDescriptor.daemonInitialization(); + Stage.READ.setMaximumPoolSize(MAX_CONCURRENCY); + + // prime the stage, so that the first task doesn't have to wait for the stage to be initialized + for (int i = 0; i < MAX_CONCURRENCY; i++) + { + TESTED_STAGE.execute(new LongRunnable()); + } + Awaitility.await().until(() -> callback.executionTimes.size() == MAX_CONCURRENCY); + } + + @Before + public void reset() + { + callback.executionTimes.clear(); + callback.enqueuedTimes.clear(); + } + + @Test + public void executionAndQueueTimeAreCountedOnExecute() + { + testExecutionAndQueueTimeAreCounted(TESTED_STAGE::execute); + } + + @Test + public void executionAndQueueTimeAreCountedOnExecuteWithLocals() + { + testExecutionAndQueueTimeAreCounted(r -> TESTED_STAGE.execute(r, null)); + } + + @Test + public void executionAndQueueTimeAreCountedOnMaybeExecuteImmediately() + { + testExecutionAndQueueTimeAreCounted(TESTED_STAGE::maybeExecuteImmediately); + } + + @Test + public void executionAndQueueTimeAreCountedOnSubmit() + { + testExecutionAndQueueTimeAreCounted(TESTED_STAGE::submit); + } + + @Test + public void executionAndQueueTimeAreCountedOnSubmitWithResult() + { + testExecutionAndQueueTimeAreCounted(r -> TESTED_STAGE.submit(r, null)); + } + + @Test + public void executionAndQueueTimeAreCountedOnSubmitCallable() + { + testExecutionAndQueueTimeAreCounted(r -> TESTED_STAGE.submit(() -> { r.run(); return null; })); + } + + public void testExecutionAndQueueTimeAreCounted(Consumer runnableRunner) + { + int NUM_TASKS = 10; + + for (int i = 0; i < NUM_TASKS; i++) + { + ForkJoinPool.commonPool().execute(() -> runnableRunner.accept(new LongRunnable())); + } + + Awaitility.await().until(() -> callback.executionTimes.size() == NUM_TASKS); + + logger.info("Completed tasks: {}", TESTED_STAGE.getCompletedTaskCount()); + logger.info("Execution times: {}", callback.executionTimes); + logger.info("Queue times: {}", callback.enqueuedTimes); + + final double MAX_ACCEPTABLE_MEASUREMENT_ERROR = 0.1 * TASK_DURATION_NANOS; + + for (int i = 0; i < NUM_TASKS; i++) + { + // expect each task takes roughly TASK_DURATION_MS + assertEquals(TASK_DURATION_NANOS, callback.executionTimes.get(i), MAX_ACCEPTABLE_MEASUREMENT_ERROR); + } + for (int i = 0; i < NUM_TASKS; i += MAX_CONCURRENCY) + { + // expect in each iteration tasks are enqueued for TASK_DURATION_NANOS more + for (int concurrentTask = 0; concurrentTask < MAX_CONCURRENCY; concurrentTask++) + { + assertEquals((double) i / MAX_CONCURRENCY * TASK_DURATION_NANOS, callback.enqueuedTimes.get(i + concurrentTask), MAX_ACCEPTABLE_MEASUREMENT_ERROR); + } + } + } + + public static class TestTaskExecutionCallback implements TaskExecutionCallback + { + private final List executionTimes = new CopyOnWriteArrayList<>(); + private final List enqueuedTimes = new CopyOnWriteArrayList<>(); + + @Override + public void onCompleted(Stage stage, long executionDurationNanos) + { + assertEquals(TESTED_STAGE, stage); + executionTimes.add(executionDurationNanos); + } + + @Override + public void onDequeue(Stage stage, long enqueuedDurationNanos) + { + assertEquals(TESTED_STAGE, stage); + enqueuedTimes.add(enqueuedDurationNanos); + } + } + + private static class LongRunnable implements Runnable + { + @Override + public void run() + { + Uninterruptibles.sleepUninterruptibly(TASK_DURATION_NANOS, TimeUnit.NANOSECONDS); + } + } +} \ No newline at end of file