Skip to content

Commit

Permalink
IGNITE-23696 Fix return result for remote execution (#4736)
Browse files Browse the repository at this point in the history
  • Loading branch information
valepakh authored Nov 20, 2024
1 parent eca117a commit 8373582
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand All @@ -69,6 +71,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Base integration tests for Compute functionality. To add new compute job for testing both in embedded and standalone mode, add the
Expand Down Expand Up @@ -469,24 +472,72 @@ void executeMapReduce() {
assertThat(result, is(sumOfNodeNamesLengths));
}

@Test
void cancelsJobLocally() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void cancelsJob(boolean local) {
Ignite entryNode = node(0);
Ignite executeNode = local ? node(0) : node(1);

// This job catches the interruption and throws a RuntimeException
JobDescriptor<Long, Void> job = JobDescriptor.builder(SleepJob.class).units(units()).build();
JobExecution<Void> execution = entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, Long.MAX_VALUE);
JobExecution<Void> execution = entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, Long.MAX_VALUE);

await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

assertThat(execution.cancelAsync(), willBe(true));

CompletionException completionException = assertThrows(CompletionException.class, () -> execution.resultAsync().join());

// Unwrap CompletionException, ComputeException should be the cause thrown from the API
assertThat(completionException.getCause(), instanceOf(ComputeException.class));
ComputeException computeException = (ComputeException) completionException.getCause();

// ComputeException should be caused by the RuntimeException thrown from the SleepJob
assertThat(computeException.getCause(), instanceOf(RuntimeException.class));
RuntimeException runtimeException = (RuntimeException) computeException.getCause();

// RuntimeException is thrown when SleepJob catches the InterruptedException
assertThat(runtimeException.getCause(), instanceOf(InterruptedException.class));
assertThat(runtimeException.getCause().getCause(), is(nullValue()));

await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));
}

@Test
void cancelsQueuedJobLocally() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void cancelsNotCancellableJob(boolean local) {
Ignite entryNode = node(0);
var nodes = JobTarget.node(clusterNode(entryNode));
Ignite executeNode = local ? node(0) : node(1);

// This job catches the interruption and returns normally
JobDescriptor<Long, Void> job = JobDescriptor.builder(SilentSleepJob.class).units(units()).build();
JobExecution<Void> execution = entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, Long.MAX_VALUE);

await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

assertThat(execution.cancelAsync(), willBe(true));

CompletionException completionException = assertThrows(CompletionException.class, () -> execution.resultAsync().join());

// Unwrap CompletionException, ComputeException should be the cause thrown from the API
assertThat(completionException.getCause(), instanceOf(ComputeException.class));
ComputeException computeException = (ComputeException) completionException.getCause();

// ComputeException should be caused by the CancellationException thrown by the executor when it detects that the job has completed
// even though it had previously been cancelled
assertThat(computeException.getCause(), instanceOf(CancellationException.class));
CancellationException cancellationException = (CancellationException) computeException.getCause();
assertThat(cancellationException.getCause(), is(nullValue()));

await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void cancelsQueuedJob(boolean local) {
Ignite entryNode = node(0);
Ignite executeNode = local ? node(0) : node(1);
var nodes = JobTarget.node(clusterNode(executeNode));

JobDescriptor<Long, Void> job = JobDescriptor.builder(SleepJob.class).units(units()).build();

Expand All @@ -510,38 +561,14 @@ void cancelsQueuedJobLocally() {
await().until(execution1::stateAsync, willBe(jobStateWithStatus(CANCELED)));
}

@Test
void cancelsJobRemotely() {
Ignite entryNode = node(0);

JobDescriptor<Long, Void> job = JobDescriptor.builder(SleepJob.class).units(units()).build();
JobExecution<Void> execution = entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, Long.MAX_VALUE);

await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

assertThat(execution.cancelAsync(), willBe(true));

await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));
}

@Test
void changeExecutingJobPriorityLocally() {
Ignite entryNode = node(0);

JobDescriptor<Long, Void> job = JobDescriptor.builder(SleepJob.class).units(units()).build();
JobExecution<Void> execution = entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, Long.MAX_VALUE);
await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

assertThat(execution.changePriorityAsync(2), willBe(false));
assertThat(execution.cancelAsync(), willBe(true));
}

@Test
void changeExecutingJobPriorityRemotely() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void changeExecutingJobPriority(boolean local) {
Ignite entryNode = node(0);
Ignite executeNode = local ? node(0) : node(1);

JobDescriptor<Long, Void> job = JobDescriptor.builder(SleepJob.class).units(units()).build();
JobExecution<Void> execution = entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, Long.MAX_VALUE);
JobExecution<Void> execution = entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, Long.MAX_VALUE);
await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

assertThat(execution.changePriorityAsync(2), willBe(false));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.compute;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;

/**
 * Compute job that sleeps for the number of seconds passed in the argument and completes successfully in any case, even when the
 * sleep is interrupted.
 */
public class SilentSleepJob implements ComputeJob<Long, Void> {
    /**
     * Sleeps for {@code timeout} seconds and returns normally, whether the sleep ran to completion or was interrupted.
     *
     * @param jobExecutionContext Job execution context; not used by this job.
     * @param timeout Sleep duration in seconds.
     * @return Always {@code null}; all the work is done synchronously before returning.
     */
    @Override
    public CompletableFuture<Void> executeAsync(JobExecutionContext jobExecutionContext, Long timeout) {
        try {
            // The argument is interpreted as seconds, not milliseconds.
            TimeUnit.SECONDS.sleep(timeout);
        } catch (InterruptedException ignored) {
            // Deliberately swallowed: this job must return normally when interrupted instead of failing.
        }
        return null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;

/** Compute job that sleeps for a number of milliseconds passed in the argument. */
/** Compute job that sleeps for a number of milliseconds passed in the argument and throws a {@link RuntimeException} if interrupted. */
public class SleepJob implements ComputeJob<Long, Void> {
@Override
public CompletableFuture<Void> executeAsync(JobExecutionContext jobExecutionContext, Long timeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ public CompletableFuture<T> resultAsync() {

@Override
public CompletableFuture<@Nullable Boolean> cancelAsync() {
resultFuture.cancel(false);
return runningJobExecution.get().cancelAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -186,10 +187,11 @@ private void run() {
} else {
if (queueEntry.isInterrupted()) {
stateMachine.cancelJob(jobId);
result.completeExceptionally(new CancellationException());
} else {
stateMachine.completeJob(jobId);
result.complete(r);
}
result.complete(r);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void taskCatchesInterruption() {
execution::state,
jobStateWithStatusAndCreateTimeStartTime(CANCELED, executingState.createTime(), executingState.startTime())
);
assertThat(execution.resultAsync(), willBe(0));
assertThat(execution.resultAsync(), willThrow(CancellationException.class));
}

@Test
Expand Down

0 comments on commit 8373582

Please sign in to comment.