From 8d7a544d0c4188bf2968408736014712434d67f3 Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Tue, 23 May 2023 12:19:58 -0700 Subject: [PATCH] Adding task cancellation timestamp in task API (#7445) * Adding task cancellation time in task API Signed-off-by: Sagar Upadhyaya * Fixing unit tests and addressing comments Signed-off-by: Sagar Upadhyaya * Adding change log for unreleased 2.x Signed-off-by: Sagar Upadhyaya * Removing running time cancel info from task API Signed-off-by: Sagar Upadhyaya * Replacing long primitive with Long object Signed-off-by: Sagar Upadhyaya * Making cancelledAt field human readable Signed-off-by: Sagar Upadhyaya * Fixing failing test Signed-off-by: Sagar Upadhyaya * Removing the feature from unreleased 3.x Signed-off-by: Sagar Upadhyaya * Fixing ListTasksResponseTests failure Signed-off-by: Sagar Upadhyaya * Test failure fix Signed-off-by: Sagar Upadhyaya * Changing naming convention to cancellationStartTime Signed-off-by: Sagar Upadhyaya --------- Signed-off-by: Sagar Upadhyaya Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> Co-authored-by: Andrew Ross --- CHANGELOG.md | 1 + .../core/tasks/GetTaskResponseTests.java | 7 ++- .../admin/cluster/node/tasks/TasksIT.java | 11 ++++ .../org/opensearch/tasks/CancellableTask.java | 18 ++++++ .../main/java/org/opensearch/tasks/Task.java | 10 ++- .../java/org/opensearch/tasks/TaskInfo.java | 61 ++++++++++++++++++- .../admin/cluster/node/tasks/TaskTests.java | 5 +- .../tasks/ListTasksResponseTests.java | 7 ++- .../org/opensearch/tasks/TaskInfoTests.java | 57 +++++++++++++---- 9 files changed, 155 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 549dc30e2184e..bf32ec511ea4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) +- Add task cancellation timestamp in task API ([#7455](https://github.com/opensearch-project/OpenSearch/pull/7455)) - Adds ExtensionsManager.lookupExtensionSettingsById ([#7466](https://github.com/opensearch-project/OpenSearch/pull/7466)) - SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394)) - Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550)) diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java index 3c0250c23ccae..a9b3591c08330 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/core/tasks/GetTaskResponseTests.java @@ -95,6 +95,10 @@ static TaskInfo randomTaskInfo() { boolean cancellable = randomBoolean(); boolean cancelled = cancellable == true ? randomBoolean() : false; TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); + Long cancellationStartTime = null; + if (cancelled) { + cancellationStartTime = randomNonNegativeLong(); + } Map headers = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); @@ -110,7 +114,8 @@ static TaskInfo randomTaskInfo() { cancelled, parentTaskId, headers, - randomResourceStats() + randomResourceStats(), + cancellationStartTime ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java index 4d297555c3acc..67e52529ae86b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java @@ -511,6 +511,17 @@ public void testTasksCancellation() throws Exception { .get(); assertEquals(1, cancelTasksResponse.getTasks().size()); + // Tasks are marked as cancelled at this point but not yet completed. + List taskInfoList = client().admin() + .cluster() + .prepareListTasks() + .setActions(TestTaskPlugin.TestTaskAction.NAME + "*") + .get() + .getTasks(); + for (TaskInfo taskInfo : taskInfoList) { + assertTrue(taskInfo.isCancelled()); + assertNotNull(taskInfo.getCancellationStartTime()); + } future.get(); logger.info("--> checking that test tasks are not running"); diff --git a/server/src/main/java/org/opensearch/tasks/CancellableTask.java b/server/src/main/java/org/opensearch/tasks/CancellableTask.java index 336f5b1f4c244..d32d04f006bbd 100644 --- a/server/src/main/java/org/opensearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/opensearch/tasks/CancellableTask.java @@ -50,6 +50,14 @@ public abstract class CancellableTask extends Task { private volatile String reason; private final AtomicBoolean cancelled = new AtomicBoolean(false); private final TimeValue cancelAfterTimeInterval; + /** + * The time this task was cancelled as a wall clock time since epoch ({@link System#currentTimeMillis()} style). + */ + private Long cancellationStartTime = null; + /** + * The time this task was cancelled as a relative time ({@link System#nanoTime()} style). + */ + private Long cancellationStartTimeNanos = null; public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); @@ -74,6 +82,8 @@ public CancellableTask( public void cancel(String reason) { assert reason != null; if (cancelled.compareAndSet(false, true)) { + this.cancellationStartTime = System.currentTimeMillis(); + this.cancellationStartTimeNanos = System.nanoTime(); this.reason = reason; onCancelled(); } @@ -87,6 +97,14 @@ public boolean cancelOnParentLeaving() { return true; } + public Long getCancellationStartTime() { + return cancellationStartTime; + } + + public Long getCancellationStartTimeNanos() { + return cancellationStartTimeNanos; + } + /** * Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled. */ diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index 638c9c7ab41bd..23266dca45efa 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -192,6 +192,11 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status * Build a proper {@link TaskInfo} for this task. */ protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) { + boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled(); + Long cancellationStartTime = null; + if (cancelled) { + cancellationStartTime = ((CancellableTask) this).getCancellationStartTime(); + } return new TaskInfo( new TaskId(localNodeId, getId()), getType(), @@ -201,10 +206,11 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status startTime, System.nanoTime() - startTimeNanos, this instanceof CancellableTask, - this instanceof CancellableTask && ((CancellableTask) this).isCancelled(), + cancelled, parentTask, headers, - resourceStats + resourceStats, + cancellationStartTime ); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskInfo.java b/server/src/main/java/org/opensearch/tasks/TaskInfo.java index 63bc46f8cfca6..2e5415279d804 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/opensearch/tasks/TaskInfo.java @@ -85,6 +85,8 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final boolean cancelled; + private final Long cancellationStartTime; + private final TaskId parentTaskId; private final Map headers; @@ -104,6 +106,38 @@ public TaskInfo( TaskId parentTaskId, Map headers, TaskResourceStats resourceStats + ) { + this( + taskId, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers, + resourceStats, + null + ); + } + + public TaskInfo( + TaskId taskId, + String type, + String action, + String description, + Task.Status status, + long startTime, + long runningTimeNanos, + boolean cancellable, + boolean cancelled, + TaskId parentTaskId, + Map headers, + TaskResourceStats resourceStats, + Long cancellationStartTime ) { if (cancellable == false && cancelled == true) { throw new IllegalArgumentException("task cannot be cancelled"); @@ -120,6 +154,7 @@ public TaskInfo( this.parentTaskId = parentTaskId; this.headers = headers; this.resourceStats = resourceStats; + this.cancellationStartTime = cancellationStartTime; } /** @@ -150,6 +185,11 @@ public TaskInfo(StreamInput in) throws IOException { } else { resourceStats = null; } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + cancellationStartTime = in.readOptionalLong(); + } else { + cancellationStartTime = null; + } } @Override @@ -170,6 +210,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_1_0)) { out.writeOptionalWriteable(resourceStats); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalLong(cancellationStartTime); + } } public TaskId getTaskId() { @@ -228,6 +271,10 @@ public boolean isCancelled() { return cancelled; } + public Long getCancellationStartTime() { + return cancellationStartTime; + } + /** * Returns the parent task id */ @@ -281,6 +328,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws resourceStats.toXContent(builder, params); builder.endObject(); } + if (cancellationStartTime != null) { + builder.humanReadableField("cancellation_time_millis", "cancellation_time", new TimeValue(cancellationStartTime)); + } return builder; } @@ -308,6 +358,7 @@ public static TaskInfo fromXContent(XContentParser parser) { } @SuppressWarnings("unchecked") TaskResourceStats resourceStats = (TaskResourceStats) a[i++]; + Long cancellationStartTime = (Long) a[i++]; RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); return new TaskInfo( @@ -322,7 +373,8 @@ public static TaskInfo fromXContent(XContentParser parser) { cancelled, parentTaskId, headers, - resourceStats + resourceStats, + cancellationStartTime ); }); static { @@ -341,6 +393,7 @@ public static TaskInfo fromXContent(XContentParser parser) { PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats")); + PARSER.declareLong(optionalConstructorArg(), new ParseField("cancellation_time_millis")); } @Override @@ -366,7 +419,8 @@ public boolean equals(Object obj) { && Objects.equals(cancelled, other.cancelled) && Objects.equals(status, other.status) && Objects.equals(headers, other.headers) - && Objects.equals(resourceStats, other.resourceStats); + && Objects.equals(resourceStats, other.resourceStats) + && Objects.equals(cancellationStartTime, other.cancellationStartTime); } @Override @@ -383,7 +437,8 @@ public int hashCode() { cancelled, status, headers, - resourceStats + resourceStats, + cancellationStartTime ); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java index 45db94577f15f..5ef99bae698a3 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java @@ -89,6 +89,7 @@ public void testCancellableOptionWhenCancelledTrue() { long taskId = randomIntBetween(0, 100000); long startTime = randomNonNegativeLong(); long runningTime = randomNonNegativeLong(); + long cancellationStartTime = randomNonNegativeLong(); boolean cancellable = true; boolean cancelled = true; TaskInfo taskInfo = new TaskInfo( @@ -103,12 +104,14 @@ public void testCancellableOptionWhenCancelledTrue() { cancelled, TaskId.EMPTY_TASK_ID, Collections.singletonMap("foo", "bar"), - randomResourceStats(randomBoolean()) + randomResourceStats(randomBoolean()), + cancellationStartTime ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); assertEquals(map.get("cancellable"), cancellable); assertEquals(map.get("cancelled"), cancelled); + assertEquals(map.get("cancellation_time_millis"), cancellationStartTime); } public void testCancellableOptionWhenCancelledFalse() { diff --git a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java index d0002eb04914a..343f5b79a25a8 100644 --- a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java @@ -78,7 +78,8 @@ public void testNonEmptyToString() { { put("dummy-type1", new TaskResourceUsage(100, 100)); } - }) + }), + 0L ); ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); assertEquals( @@ -105,7 +106,9 @@ public void testNonEmptyToString() { + " \"cpu_time_in_nanos\" : 100,\n" + " \"memory_in_bytes\" : 100\n" + " }\n" - + " }\n" + + " },\n" + + " \"cancellation_time\" : \"0s\",\n" + + " \"cancellation_time_millis\" : 0\n" + " }\n" + " ]\n" + "}", diff --git a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java index c263972c9cb97..359b6898fa6b9 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java @@ -83,7 +83,7 @@ protected Predicate getRandomFieldsExcludeFilter() { @Override protected TaskInfo mutateInstance(TaskInfo info) { - switch (between(0, 10)) { + switch (between(0, 11)) { case 0: TaskId taskId = new TaskId(info.getTaskId().getNodeId() + randomAlphaOfLength(5), info.getTaskId().getId()); return new TaskInfo( @@ -98,7 +98,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 1: return new TaskInfo( @@ -113,7 +114,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 2: return new TaskInfo( @@ -128,7 +130,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 3: return new TaskInfo( @@ -143,7 +146,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 4: Task.Status newStatus = randomValueOtherThan(info.getStatus(), TaskInfoTests::randomRawTaskStatus); @@ -159,7 +163,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 5: return new TaskInfo( @@ -174,7 +179,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 6: return new TaskInfo( @@ -189,7 +195,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 7: return new TaskInfo( @@ -204,7 +211,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { false, info.getParentTaskId(), info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 8: TaskId parentId = new TaskId(info.getParentTaskId().getNodeId() + randomAlphaOfLength(5), info.getParentTaskId().getId()); @@ -220,7 +228,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), parentId, info.getHeaders(), - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 9: Map headers = info.getHeaders(); @@ -242,7 +251,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.isCancelled(), info.getParentTaskId(), headers, - info.getResourceStats() + info.getResourceStats(), + info.getCancellationStartTime() ); case 10: Map resourceUsageMap; @@ -266,6 +276,22 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getHeaders(), new TaskResourceStats(resourceUsageMap) ); + case 11: + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + true, + true, + info.getParentTaskId(), + info.getHeaders(), + info.getResourceStats(), + randomNonNegativeLong() + ); default: throw new IllegalStateException(); } @@ -285,6 +311,10 @@ static TaskInfo randomTaskInfo(boolean detailed) { long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); boolean cancelled = cancellable == true ? randomBoolean() : false; + Long cancellationStartTime = null; + if (cancelled) { + cancellationStartTime = randomNonNegativeLong(); + } TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); Map headers = randomBoolean() ? Collections.emptyMap() @@ -301,7 +331,8 @@ static TaskInfo randomTaskInfo(boolean detailed) { cancelled, parentTaskId, headers, - randomResourceStats(detailed) + randomResourceStats(detailed), + cancellationStartTime ); } @@ -312,7 +343,7 @@ private static TaskId randomTaskId() { private static RawTaskStatus randomRawTaskStatus() { try (XContentBuilder builder = XContentBuilder.builder(Requests.INDEX_CONTENT_TYPE.xContent())) { builder.startObject(); - int fields = between(0, 10); + int fields = between(0, 11); for (int f = 0; f < fields; f++) { builder.field(randomAlphaOfLength(5), randomAlphaOfLength(5)); }