Skip to content

Commit

Permalink
Fix accidental wrong error msg
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 authored Oct 25, 2024
1 parent f94865e commit 0db4ec4
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ public enum TSStatusCode {
EXPLAIN_ANALYZE_FETCH_ERROR(716),
TOO_MANY_CONCURRENT_QUERIES_ERROR(717),

OPERATOR_NOT_FOUND(716),
OPERATOR_NOT_FOUND(718),

QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719),
QUERY_TIMEOUT(720),

// Arithmetic
NUMERIC_VALUE_OUT_OF_RANGE(750),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.iotdb.db.queryengine.exception;

public class MemoryNotEnoughException extends RuntimeException {
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;

import static org.apache.iotdb.rpc.TSStatusCode.QUERY_EXECUTION_MEMORY_NOT_ENOUGH;

public class MemoryNotEnoughException extends IoTDBRuntimeException {

public MemoryNotEnoughException(String message) {
super(message);
super(message, QUERY_EXECUTION_MEMORY_NOT_ENOUGH.getStatusCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ private ListenableFuture<?> processInternal() {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw new RuntimeException(t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
}

// Driver thread was interrupted which should only happen if the task is already finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ public FragmentInstanceContext getFragmentInstanceContext() {
}

public void failed(Throwable cause) {
fragmentInstanceContext.failed(cause);
finished.set(true);
if (finished.compareAndSet(false, true)) {
fragmentInstanceContext.failed(cause);
finished.set(true);
}
}

public void finished() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ public SessionInfo getSessionInfo() {
}

public Optional<Throwable> getFailureCause() {
return Optional.ofNullable(stateMachine.getFailureCauses().peek());
return Optional.ofNullable(
stateMachine.getFailureCauses().stream()
.filter(e -> e instanceof IoTDBException || e instanceof IoTDBRuntimeException)
.findFirst()
.orElse(stateMachine.getFailureCauses().peek()));
}

public Filter getGlobalTimeFilter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private void initialize(IDriverScheduler scheduler, boolean isExplainAnalyze) {
context.releaseMemoryReservationManager();

if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
scheduler.abortFragmentInstance(instanceId, context.getFailureCause().orElse(null));
}
} catch (Throwable t) {
try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public void run() {
new SetThreadName(next.getDriver().getDriverTaskId().getFullId())) {
Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
if (rootCause instanceof IoTDBRuntimeException) {
next.setAbortCause(e.getMessage());
next.setAbortCause(rootCause);
} else {
logger.warn("[ExecuteFailed]", e);
next.setAbortCause(getAbortCause(e));
next.setAbortCause(getAbortCause(e, next.getDriverTaskId().getFullId()));
}
scheduler.toAborted(next);
}
Expand Down Expand Up @@ -122,11 +122,12 @@ public void close() throws IOException {
closed = true;
}

private String getAbortCause(final Exception e) {
private Throwable getAbortCause(final Exception e, String fullId) {
Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
if (rootCause instanceof MemoryNotEnoughException) {
return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH;
return rootCause;
}
return DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED;
return new DriverTaskAbortedException(
fullId, DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,6 +59,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -301,7 +301,10 @@ public void abortQuery(QueryId queryId) {
for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
if (fragmentRelatedTasks != null) {
for (DriverTask task : fragmentRelatedTasks) {
task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
task.setAbortCause(
new DriverTaskAbortedException(
task.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED));
clearDriverTask(task);
}
}
Expand All @@ -310,7 +313,7 @@ public void abortQuery(QueryId queryId) {
}

@Override
public void abortFragmentInstance(FragmentInstanceId instanceId) {
public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable t) {
Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
queryMap.get(instanceId.getQueryId());
if (queryRelatedTasks != null) {
Expand All @@ -321,7 +324,12 @@ public void abortFragmentInstance(FragmentInstanceId instanceId) {
if (task == null) {
return;
}
task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
task.setAbortCause(
t == null
? new DriverTaskAbortedException(
task.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED)
: t);
clearDriverTask(task);
}
}
Expand Down Expand Up @@ -380,12 +388,9 @@ private void clearDriverTask(DriverTask task) {
}
try {
task.lock();
if (task.getAbortCause() != null) {
if (task.getAbortCause().isPresent()) {
try {
task.getDriver()
.failed(
new DriverTaskAbortedException(
task.getDriver().getDriverTaskId().getFullId(), task.getAbortCause()));
task.getDriver().failed(task.getAbortCause().get());
} catch (Exception e) {
logger.error("Clear DriverTask failed", e);
}
Expand Down Expand Up @@ -586,7 +591,7 @@ public void toAborted(DriverTask task) {
task.unlock();
}
clearDriverTask(task);
String abortCause = task.getAbortCause();
Optional<Throwable> abortCause = task.getAbortCause();
QueryId queryId = task.getDriverTaskId().getQueryId();
Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId);
if (queryRelatedTasks != null) {
Expand All @@ -598,9 +603,10 @@ public void toAborted(DriverTask task) {
continue;
}
otherTask.setAbortCause(
StringUtils.isEmpty(abortCause)
? DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED
: abortCause);
abortCause.orElse(
new DriverTaskAbortedException(
otherTask.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED)));
clearDriverTask(otherTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class DriverTaskAbortedException extends Exception {

public static final String BY_TIMEOUT = "timeout";
public static final String BY_FRAGMENT_ABORT_CALLED = " called";
public static final String BY_FRAGMENT_ABORT_CALLED = "called";
public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
public static final String BY_INTERNAL_ERROR_SCHEDULED = "internal error scheduled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ public void execute(DriverTask task) throws InterruptedException {
ListenableFuture<?> future = driver.processFor(timeSlice);
// If the future is cancelled, the task is in an error and should be thrown.
if (future.isCancelled()) {
task.setAbortCause(DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED);
task.setAbortCause(
new DriverTaskAbortedException(
task.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED));
scheduler.toAborted(task);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.commons.exception.QueryTimeoutException;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;

Expand Down Expand Up @@ -84,7 +85,7 @@ public void execute(DriverTask task) throws InterruptedException {
"[DriverTaskTimeout] Current time is {}, ddl of task is {}",
System.currentTimeMillis(),
task.getDDL());
task.setAbortCause(DriverTaskAbortedException.BY_TIMEOUT);
task.setAbortCause(new QueryTimeoutException());
scheduler.toAborted(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ void submitDrivers(
*
* @param instanceId the id of the fragment instance to be aborted.
*/
void abortFragmentInstance(FragmentInstanceId instanceId);
void abortFragmentInstance(FragmentInstanceId instanceId, Throwable t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airlift.units.Duration;

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -52,7 +53,7 @@ public class DriverTask implements IDIndexedAccessible {

private final boolean isHighestPriority;

private String abortCause;
private Throwable abortCause;

private final AtomicReference<Priority> priority;

Expand Down Expand Up @@ -149,11 +150,11 @@ public boolean equals(Object o) {
return o instanceof DriverTask && ((DriverTask) o).getDriverTaskId().equals(getDriverTaskId());
}

public String getAbortCause() {
return abortCause;
public Optional<Throwable> getAbortCause() {
return Optional.ofNullable(abortCause);
}

public void setAbortCause(String abortCause) {
public void setAbortCause(Throwable abortCause) {
this.abortCause = abortCause;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -161,8 +160,6 @@ private static TSStatus tryCatchQueryException(Exception e) {
return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), t.getMessage());
} else if (t instanceof ModelException) {
return RpcUtils.getStatus(((ModelException) t).getStatusCode(), rootCause.getMessage());
} else if (t instanceof MemoryNotEnoughException) {
return RpcUtils.getStatus(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH, rootCause.getMessage());
}

if (t instanceof RuntimeException && rootCause instanceof IoTDBException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testManagingDriver() throws CpuNotEnoughException, MemoryNotEnoughEx
// Abort one FragmentInstance
Mockito.reset(mockDriver1);
Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
manager.abortFragmentInstance(instanceId1);
manager.abortFragmentInstance(instanceId1, null);
Mockito.verify(mockMPPDataExchangeManager, Mockito.times(1))
.forceDeregisterFragmentInstance(Mockito.any());
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Expand All @@ -146,7 +146,9 @@ public void testManagingDriver() throws CpuNotEnoughException, MemoryNotEnoughEx
Assert.assertEquals(DriverTaskStatus.READY, task3.getStatus());
Assert.assertEquals(DriverTaskStatus.READY, task4.getStatus());
Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any());
Assert.assertEquals(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED, task1.getAbortCause());
Assert.assertEquals(
"DriverTask test.0.inst-0.0 is aborted by called",
task1.getAbortCause().get().getMessage());

// Abort the whole query
Mockito.reset(mockMPPDataExchangeManager);
Expand All @@ -173,9 +175,11 @@ public void testManagingDriver() throws CpuNotEnoughException, MemoryNotEnoughEx
Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any());
Mockito.verify(mockDriver4, Mockito.never()).failed(Mockito.any());
Assert.assertEquals(
DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED, task2.getAbortCause());
"DriverTask test.0.inst-1.0 is aborted by query cascading aborted",
task2.getAbortCause().get().getMessage());
Assert.assertEquals(
DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED, task3.getAbortCause());
Assert.assertNull(task4.getAbortCause());
"DriverTask test.0.inst-2.0 is aborted by query cascading aborted",
task3.getAbortCause().get().getMessage());
Assert.assertFalse(task4.getAbortCause().isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testHandleInvalidStateTask() throws ExecutionException, InterruptedE
executor.execute(testTask);
Assert.assertEquals(DriverTaskStatus.BLOCKED, testTask.getStatus());
Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
Assert.assertFalse(testTask.getAbortCause().isPresent());
Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
Expand Down Expand Up @@ -139,7 +139,8 @@ public void testHandleTaskByCancelledInstance() throws ExecutionException, Inter
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertEquals(
DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED, testTask.getAbortCause());
"DriverTask test.0.inst-0.0 is aborted by already being cancelled",
testTask.getAbortCause().get().getMessage());
Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
Expand Down Expand Up @@ -179,7 +180,7 @@ public void testHandleTaskByFinishedInstance() throws ExecutionException, Interr
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null, 0, false);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
Assert.assertFalse(testTask.getAbortCause().isPresent());
Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
Expand Down Expand Up @@ -229,7 +230,7 @@ public void testHandleTaskByBlockedInstance() throws ExecutionException, Interru
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null, 0, false);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
Assert.assertFalse(testTask.getAbortCause().isPresent());
Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler, Mockito.times(1)).runningToBlocked(Mockito.any(), Mockito.any());
Expand Down Expand Up @@ -280,7 +281,7 @@ public void testHandleTaskByReadyInstance() throws ExecutionException, Interrupt
DriverTask testTask = new DriverTask(mockDriver, 100L, DriverTaskStatus.READY, null, 0, false);
executor.execute(testTask);
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertNull(testTask.getAbortCause());
Assert.assertFalse(testTask.getAbortCause().isPresent());
Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
Mockito.verify(mockScheduler, Mockito.times(1)).runningToReady(Mockito.any(), Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
Expand Down Expand Up @@ -323,7 +324,8 @@ public void testHandleTaskWithInternalError() {
executor.run(); // Here we use run() instead of start() to execute the task in the same thread
Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
Assert.assertEquals(
DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED, testTask.getAbortCause());
"DriverTask test.0.inst-0.0 is aborted by internal error scheduled",
testTask.getAbortCause().get().getMessage());
Assert.assertEquals(0, taskQueue.size());
Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
Mockito.verify(mockScheduler, Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
Expand Down
Loading

0 comments on commit 0db4ec4

Please sign in to comment.