From 567df307321a9ec39f0bbe7708c6275e71a04556 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Thu, 24 Oct 2024 17:53:32 +0800 Subject: [PATCH 1/3] Fix wrong error msg --- .../org/apache/iotdb/rpc/TSStatusCode.java | 5 ++- .../exception/MemoryNotEnoughException.java | 8 +++-- .../queryengine/execution/driver/Driver.java | 6 +++- .../execution/driver/DriverContext.java | 6 ++-- .../fragment/FragmentInstanceContext.java | 6 +++- .../fragment/FragmentInstanceExecution.java | 2 +- .../schedule/AbstractDriverThread.java | 11 ++++--- .../execution/schedule/DriverScheduler.java | 32 +++++++++++-------- .../execution/schedule/DriverTaskThread.java | 5 ++- .../DriverTaskTimeoutSentinelThread.java | 3 +- .../execution/schedule/IDriverScheduler.java | 2 +- .../execution/schedule/task/DriverTask.java | 9 +++--- .../iotdb/db/utils/ErrorHandlingUtils.java | 3 -- .../schedule/DriverSchedulerTest.java | 2 +- .../exception/QueryTimeoutException.java | 10 ++++++ 15 files changed, 73 insertions(+), 37 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 0f091451e5bc..70175e3ec359 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -131,7 +131,10 @@ public enum TSStatusCode { EXPLAIN_ANALYZE_FETCH_ERROR(716), TOO_MANY_CONCURRENT_QUERIES_ERROR(717), - OPERATOR_NOT_FOUND(716), + OPERATOR_NOT_FOUND(718), + + QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719), + QUERY_TIMEOUT(720), // Arithmetic NUMERIC_VALUE_OUT_OF_RANGE(750), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java index c0911254cb7e..769399d51abd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/exception/MemoryNotEnoughException.java @@ -19,9 +19,13 @@ package org.apache.iotdb.db.queryengine.exception; -public class MemoryNotEnoughException extends RuntimeException { +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; + +import static org.apache.iotdb.rpc.TSStatusCode.QUERY_EXECUTION_MEMORY_NOT_ENOUGH; + +public class MemoryNotEnoughException extends IoTDBRuntimeException { public MemoryNotEnoughException(String message) { - super(message); + super(message, QUERY_EXECUTION_MEMORY_NOT_ENOUGH.getStatusCode()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java index 812c84298fc6..248cce28240f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java @@ -246,7 +246,11 @@ private ListenableFuture processInternal() { List interrupterStack = exclusiveLock.getInterrupterStack(); if (interrupterStack == null) { driverContext.failed(t); - throw new RuntimeException(t); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } } // Driver thread was interrupted which should only happen if the task is already finished. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java index 79cb4acf3b93..0c3fa448c64f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DriverContext.java @@ -121,8 +121,10 @@ public FragmentInstanceContext getFragmentInstanceContext() { } public void failed(Throwable cause) { - fragmentInstanceContext.failed(cause); - finished.set(true); + if (finished.compareAndSet(false, true)) { + fragmentInstanceContext.failed(cause); + finished.set(true); + } } public void finished() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 94917b12cd63..7771358e9de5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -395,7 +395,11 @@ public SessionInfo getSessionInfo() { } public Optional getFailureCause() { - return Optional.ofNullable(stateMachine.getFailureCauses().peek()); + return Optional.ofNullable( + stateMachine.getFailureCauses().stream() + .filter(e -> e instanceof IoTDBException || e instanceof IoTDBRuntimeException) + .findFirst() + .orElse(stateMachine.getFailureCauses().peek())); } public Filter getGlobalTimeFilter() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index d1c8cc8ff6a3..fbca6d8442b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -327,7 +327,7 @@ private void initialize(IDriverScheduler scheduler, boolean isExplainAnalyze) { context.releaseMemoryReservationManager(); if (newState.isFailed()) { - scheduler.abortFragmentInstance(instanceId); + scheduler.abortFragmentInstance(instanceId, context.getFailureCause().orElse(null)); } } catch (Throwable t) { try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java index 889eed8675f0..58cbfb13db76 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/AbstractDriverThread.java @@ -81,10 +81,10 @@ public void run() { new SetThreadName(next.getDriver().getDriverTaskId().getFullId())) { Throwable rootCause = ErrorHandlingUtils.getRootCause(e); if (rootCause instanceof IoTDBRuntimeException) { - next.setAbortCause(e.getMessage()); + next.setAbortCause(rootCause); } else { logger.warn("[ExecuteFailed]", e); - next.setAbortCause(getAbortCause(e)); + next.setAbortCause(getAbortCause(e, next.getDriverTaskId().getFullId())); } scheduler.toAborted(next); } @@ -122,11 +122,12 @@ public void close() throws IOException { closed = true; } - private String getAbortCause(final Exception e) { + private Throwable getAbortCause(final Exception e, String fullId) { Throwable rootCause = ErrorHandlingUtils.getRootCause(e); if (rootCause instanceof MemoryNotEnoughException) { - return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH; + return rootCause; } - return DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED; + return new DriverTaskAbortedException( + fullId, DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java index 09dab3d7819e..dfe4fc9a742e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java @@ -50,7 +50,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -301,7 +301,10 @@ public void abortQuery(QueryId queryId) { for (Set fragmentRelatedTasks : queryRelatedTasks.values()) { if (fragmentRelatedTasks != null) { for (DriverTask task : fragmentRelatedTasks) { - task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED); + task.setAbortCause( + new DriverTaskAbortedException( + task.getDriverTaskId().getFullId(), + DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED)); clearDriverTask(task); } } @@ -310,7 +313,7 @@ public void abortQuery(QueryId queryId) { } @Override - public void abortFragmentInstance(FragmentInstanceId instanceId) { + public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable t) { Map> queryRelatedTasks = queryMap.get(instanceId.getQueryId()); if (queryRelatedTasks != null) { @@ -321,7 +324,12 @@ public void abortFragmentInstance(FragmentInstanceId instanceId) { if (task == null) { return; } - task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED); + task.setAbortCause( + t == null + ? new DriverTaskAbortedException( + task.getDriverTaskId().getFullId(), + DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED) + : t); clearDriverTask(task); } } @@ -380,12 +388,9 @@ private void clearDriverTask(DriverTask task) { } try { task.lock(); - if (task.getAbortCause() != null) { + if (task.getAbortCause().isPresent()) { try { - task.getDriver() - .failed( - new DriverTaskAbortedException( - task.getDriver().getDriverTaskId().getFullId(), task.getAbortCause())); + task.getDriver().failed(task.getAbortCause().get()); } catch (Exception e) { logger.error("Clear DriverTask failed", e); } @@ -586,7 +591,7 @@ public void toAborted(DriverTask task) { task.unlock(); } clearDriverTask(task); - String abortCause = task.getAbortCause(); + Optional abortCause = task.getAbortCause(); QueryId queryId = task.getDriverTaskId().getQueryId(); Map> queryRelatedTasks = queryMap.remove(queryId); if (queryRelatedTasks != null) { @@ -598,9 +603,10 @@ public void toAborted(DriverTask task) { continue; } otherTask.setAbortCause( - StringUtils.isEmpty(abortCause) - ? DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED - : abortCause); + abortCause.orElse( + new DriverTaskAbortedException( + otherTask.getDriverTaskId().getFullId(), + DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED))); clearDriverTask(otherTask); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskThread.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskThread.java index 8cbbb537670a..132c11dadc17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskThread.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskThread.java @@ -83,7 +83,10 @@ public void execute(DriverTask task) throws InterruptedException { ListenableFuture future = driver.processFor(timeSlice); // If the future is cancelled, the task is in an error and should be thrown. if (future.isCancelled()) { - task.setAbortCause(DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED); + task.setAbortCause( + new DriverTaskAbortedException( + task.getDriverTaskId().getFullId(), + DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED)); scheduler.toAborted(task); return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java index 9fb4e59dda2f..69f7e3020b7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThread.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.schedule; +import org.apache.iotdb.commons.exception.QueryTimeoutException; import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue; import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask; @@ -84,7 +85,7 @@ public void execute(DriverTask task) throws InterruptedException { "[DriverTaskTimeout] Current time is {}, ddl of task is {}", System.currentTimeMillis(), task.getDDL()); - task.setAbortCause(DriverTaskAbortedException.BY_TIMEOUT); + task.setAbortCause(new QueryTimeoutException()); scheduler.toAborted(task); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/IDriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/IDriverScheduler.java index faa33ce112b1..1569870bd493 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/IDriverScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/IDriverScheduler.java @@ -57,5 +57,5 @@ void submitDrivers( * * @param instanceId the id of the fragment instance to be aborted. */ - void abortFragmentInstance(FragmentInstanceId instanceId); + void abortFragmentInstance(FragmentInstanceId instanceId, Throwable t); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java index acbbcde6fab5..fcfef17db1d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/task/DriverTask.java @@ -37,6 +37,7 @@ import io.airlift.units.Duration; import java.util.Comparator; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -52,7 +53,7 @@ public class DriverTask implements IDIndexedAccessible { private final boolean isHighestPriority; - private String abortCause; + private Throwable abortCause; private final AtomicReference priority; @@ -149,11 +150,11 @@ public boolean equals(Object o) { return o instanceof DriverTask && ((DriverTask) o).getDriverTaskId().equals(getDriverTaskId()); } - public String getAbortCause() { - return abortCause; + public Optional getAbortCause() { + return Optional.ofNullable(abortCause); } - public void setAbortCause(String abortCause) { + public void setAbortCause(Throwable abortCause) { this.abortCause = abortCause; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index b3b173ce37da..76fdbb04010c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.thrift.OperationType; -import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -161,8 +160,6 @@ private static TSStatus tryCatchQueryException(Exception e) { return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), t.getMessage()); } else if (t instanceof ModelException) { return RpcUtils.getStatus(((ModelException) t).getStatusCode(), rootCause.getMessage()); - } else if (t instanceof MemoryNotEnoughException) { - return RpcUtils.getStatus(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH, rootCause.getMessage()); } if (t instanceof RuntimeException && rootCause instanceof IoTDBException) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java index 4b9e6d7d8c5a..7af659397921 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java @@ -133,7 +133,7 @@ public void testManagingDriver() throws CpuNotEnoughException, MemoryNotEnoughEx // Abort one FragmentInstance Mockito.reset(mockDriver1); Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1); - manager.abortFragmentInstance(instanceId1); + manager.abortFragmentInstance(instanceId1, null); Mockito.verify(mockMPPDataExchangeManager, Mockito.times(1)) .forceDeregisterFragmentInstance(Mockito.any()); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java new file mode 100644 index 000000000000..e3c80bbfdb74 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java @@ -0,0 +1,10 @@ +package org.apache.iotdb.commons.exception; + +import static org.apache.iotdb.rpc.TSStatusCode.QUERY_TIMEOUT; + +public class QueryTimeoutException extends IoTDBRuntimeException { + + public QueryTimeoutException() { + super("Query execution is time out", QUERY_TIMEOUT.getStatusCode(), true); + } +} From 0ce498cc58d4604210c27ed8ee5e03db718c3cb5 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Thu, 24 Oct 2024 18:03:04 +0800 Subject: [PATCH 2/3] add license --- .../exception/QueryTimeoutException.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java index e3c80bbfdb74..61569a838268 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java @@ -1,3 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.iotdb.commons.exception; import static org.apache.iotdb.rpc.TSStatusCode.QUERY_TIMEOUT; From 3d1f9e310c7804fe3cc289c897694d851eeabaa5 Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Thu, 24 Oct 2024 19:42:55 +0800 Subject: [PATCH 3/3] fix ut --- .../schedule/DriverTaskAbortedException.java | 2 +- .../execution/schedule/DriverSchedulerTest.java | 12 ++++++++---- .../DriverTaskTimeoutSentinelThreadTest.java | 14 ++++++++------ 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java index aac6d9fdebed..3fc00cd559b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskAbortedException.java @@ -25,7 +25,7 @@ public class DriverTaskAbortedException extends Exception { public static final String BY_TIMEOUT = "timeout"; - public static final String BY_FRAGMENT_ABORT_CALLED = " called"; + public static final String BY_FRAGMENT_ABORT_CALLED = "called"; public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted"; public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled"; public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled"; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java index 7af659397921..b651b62441ed 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverSchedulerTest.java @@ -146,7 +146,9 @@ public void testManagingDriver() throws CpuNotEnoughException, MemoryNotEnoughEx Assert.assertEquals(DriverTaskStatus.READY, task3.getStatus()); Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus()); Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any()); - Assert.assertEquals(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED, task1.getAbortCause()); + Assert.assertEquals( + "DriverTask test.0.inst-0.0 is aborted by called", + task1.getAbortCause().get().getMessage()); // Abort the whole query Mockito.reset(mockMPPDataExchangeManager); @@ -173,9 +175,11 @@ public void testManagingDriver() throws CpuNotEnoughException, MemoryNotEnoughEx Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any()); Mockito.verify(mockDriver4, Mockito.never()).failed(Mockito.any()); Assert.assertEquals( - DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED, task2.getAbortCause()); + "DriverTask test.0.inst-1.0 is aborted by query cascading aborted", + task2.getAbortCause().get().getMessage()); Assert.assertEquals( - DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED, task3.getAbortCause()); - Assert.assertNull(task4.getAbortCause()); + "DriverTask test.0.inst-2.0 is aborted by query cascading aborted", + task3.getAbortCause().get().getMessage()); + Assert.assertFalse(task4.getAbortCause().isPresent()); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java index 85575b136a14..22aa7dfc27a2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverTaskTimeoutSentinelThreadTest.java @@ -99,7 +99,7 @@ public void testHandleInvalidStateTask() throws ExecutionException, InterruptedE executor.execute(testTask); Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus()); Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any()); - Assert.assertNull(testTask.getAbortCause()); + Assert.assertFalse(testTask.getAbortCause().isPresent()); Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any()); @@ -139,7 +139,8 @@ public void testHandleTaskByCancelledInstance() throws ExecutionException, Inter executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); Assert.assertEquals( - DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED, testTask.getAbortCause()); + "DriverTask test.0.inst-0.0 is aborted by already being cancelled", + testTask.getAbortCause().get().getMessage()); Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); @@ -179,7 +180,7 @@ public void testHandleTaskByFinishedInstance() throws ExecutionException, Interr DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null, 0, false); executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); - Assert.assertNull(testTask.getAbortCause()); + Assert.assertFalse(testTask.getAbortCause().isPresent()); Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); @@ -229,7 +230,7 @@ public void testHandleTaskByBlockedInstance() throws ExecutionException, Interru DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null, 0, false); executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); - Assert.assertNull(testTask.getAbortCause()); + Assert.assertFalse(testTask.getAbortCause().isPresent()); Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.times(1)).runningToBlocked(Mockito.any(), Mockito.any()); @@ -280,7 +281,7 @@ public void testHandleTaskByReadyInstance() throws ExecutionException, Interrupt DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null, 0, false); executor.execute(testTask); Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); - Assert.assertNull(testTask.getAbortCause()); + Assert.assertFalse(testTask.getAbortCause().isPresent()); Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any()); Mockito.verify(mockScheduler, Mockito.times(1)).runningToReady(Mockito.any(), Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any()); @@ -323,7 +324,8 @@ public void testHandleTaskWithInternalError() { executor.run(); // Here we use run() instead of start() to execute the task in the same thread Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any()); Assert.assertEquals( - DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED, testTask.getAbortCause()); + "DriverTask test.0.inst-0.0 is aborted by internal error scheduled", + testTask.getAbortCause().get().getMessage()); Assert.assertEquals(0, taskQueue.size()); Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any()); Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());