diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java index 222b41c3474..30e4a31922f 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java @@ -37,12 +37,14 @@ import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -69,6 +71,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; /** * Base integration tests for Compute functionality. To add new compute job for testing both in embedded and standalone mode, add the @@ -469,24 +472,72 @@ void executeMapReduce() { assertThat(result, is(sumOfNodeNamesLengths)); } - @Test - void cancelsJobLocally() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void cancelsJob(boolean local) { Ignite entryNode = node(0); + Ignite executeNode = local ? node(0) : node(1); + // This job catches the interruption and throws a RuntimeException JobDescriptor job = JobDescriptor.builder(SleepJob.class).units(units()).build(); - JobExecution execution = entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, Long.MAX_VALUE); + JobExecution execution = entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, Long.MAX_VALUE); await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); assertThat(execution.cancelAsync(), willBe(true)); + CompletionException completionException = assertThrows(CompletionException.class, () -> execution.resultAsync().join()); + + // Unwrap CompletionException, ComputeException should be the cause thrown from the API + assertThat(completionException.getCause(), instanceOf(ComputeException.class)); + ComputeException computeException = (ComputeException) completionException.getCause(); + + // ComputeException should be caused by the RuntimeException thrown from the SleepJob + assertThat(computeException.getCause(), instanceOf(RuntimeException.class)); + RuntimeException runtimeException = (RuntimeException) computeException.getCause(); + + // RuntimeException is thrown when SleepJob catches the InterruptedException + assertThat(runtimeException.getCause(), instanceOf(InterruptedException.class)); + assertThat(runtimeException.getCause().getCause(), is(nullValue())); + await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); } - @Test - void cancelsQueuedJobLocally() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void cancelsNotCancellableJob(boolean local) { Ignite entryNode = node(0); - var nodes = JobTarget.node(clusterNode(entryNode)); + Ignite executeNode = local ? node(0) : node(1); + + // This job catches the interruption and returns normally + JobDescriptor job = JobDescriptor.builder(SilentSleepJob.class).units(units()).build(); + JobExecution execution = entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, Long.MAX_VALUE); + + await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); + + assertThat(execution.cancelAsync(), willBe(true)); + + CompletionException completionException = assertThrows(CompletionException.class, () -> execution.resultAsync().join()); + + // Unwrap CompletionException, ComputeException should be the cause thrown from the API + assertThat(completionException.getCause(), instanceOf(ComputeException.class)); + ComputeException computeException = (ComputeException) completionException.getCause(); + + // ComputeException should be caused by the CancellationException thrown from the executor which detects that the job completes, + // but was previously cancelled + assertThat(computeException.getCause(), instanceOf(CancellationException.class)); + CancellationException cancellationException = (CancellationException) computeException.getCause(); + assertThat(cancellationException.getCause(), is(nullValue())); + + await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void cancelsQueuedJob(boolean local) { + Ignite entryNode = node(0); + Ignite executeNode = local ? node(0) : node(1); + var nodes = JobTarget.node(clusterNode(executeNode)); JobDescriptor job = JobDescriptor.builder(SleepJob.class).units(units()).build(); @@ -510,38 +561,14 @@ void cancelsQueuedJobLocally() { await().until(execution1::stateAsync, willBe(jobStateWithStatus(CANCELED))); } - @Test - void cancelsJobRemotely() { - Ignite entryNode = node(0); - - JobDescriptor job = JobDescriptor.builder(SleepJob.class).units(units()).build(); - JobExecution execution = entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, Long.MAX_VALUE); - - await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); - - assertThat(execution.cancelAsync(), willBe(true)); - - await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED))); - } - - @Test - void changeExecutingJobPriorityLocally() { - Ignite entryNode = node(0); - - JobDescriptor job = JobDescriptor.builder(SleepJob.class).units(units()).build(); - JobExecution execution = entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, Long.MAX_VALUE); - await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); - - assertThat(execution.changePriorityAsync(2), willBe(false)); - assertThat(execution.cancelAsync(), willBe(true)); - } - - @Test - void changeExecutingJobPriorityRemotely() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void changeExecutingJobPriority(boolean local) { Ignite entryNode = node(0); + Ignite executeNode = local ? node(0) : node(1); JobDescriptor job = JobDescriptor.builder(SleepJob.class).units(units()).build(); - JobExecution execution = entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, Long.MAX_VALUE); + JobExecution execution = entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, Long.MAX_VALUE); await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING))); assertThat(execution.changePriorityAsync(2), willBe(false)); diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java new file mode 100644 index 00000000000..b942c644c5b --- /dev/null +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobExecutionContext; + +/** Compute job that sleeps for a number of milliseconds passed in the argument and completes successfully in any case. */ +public class SilentSleepJob implements ComputeJob { + @Override + public CompletableFuture executeAsync(JobExecutionContext jobExecutionContext, Long timeout) { + try { + TimeUnit.SECONDS.sleep(timeout); + } catch (InterruptedException e) { + // no op. + } + return null; + } +} diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java index 605baa7929a..d2914691833 100644 --- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java @@ -22,7 +22,7 @@ import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; -/** Compute job that sleeps for a number of milliseconds passed in the argument. */ +/** Compute job that sleeps for a number of milliseconds passed in the argument and throws a {@link RuntimeException} if interrupted. */ public class SleepJob implements ComputeJob { @Override public CompletableFuture executeAsync(JobExecutionContext jobExecutionContext, Long timeout) { diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java index 26cc364721f..ffa7db4f54e 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java @@ -158,7 +158,6 @@ public CompletableFuture resultAsync() { @Override public CompletableFuture<@Nullable Boolean> cancelAsync() { - resultFuture.cancel(false); return runningJobExecution.get().cancelAsync(); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java index 21a9329ad3f..ea28dd9e9b5 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java @@ -21,6 +21,7 @@ import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -186,10 +187,11 @@ private void run() { } else { if (queueEntry.isInterrupted()) { stateMachine.cancelJob(jobId); + result.completeExceptionally(new CancellationException()); } else { stateMachine.completeJob(jobId); + result.complete(r); } - result.complete(r); } }); } diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java index e623649e0d7..a2ac45186c5 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java @@ -245,7 +245,7 @@ void taskCatchesInterruption() { execution::state, jobStateWithStatusAndCreateTimeStartTime(CANCELED, executingState.createTime(), executingState.startTime()) ); - assertThat(execution.resultAsync(), willBe(0)); + assertThat(execution.resultAsync(), willThrow(CancellationException.class)); } @Test