Skip to content

Commit

Permalink
Fix wrong error msg
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 committed Oct 24, 2024
1 parent 4e79200 commit 567df30
Show file tree
Hide file tree
Showing 15 changed files with 73 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ public enum TSStatusCode {
EXPLAIN_ANALYZE_FETCH_ERROR(716),
TOO_MANY_CONCURRENT_QUERIES_ERROR(717),

OPERATOR_NOT_FOUND(716),
OPERATOR_NOT_FOUND(718),

QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719),
QUERY_TIMEOUT(720),

// Arithmetic
NUMERIC_VALUE_OUT_OF_RANGE(750),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.iotdb.db.queryengine.exception;

public class MemoryNotEnoughException extends RuntimeException {
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;

import static org.apache.iotdb.rpc.TSStatusCode.QUERY_EXECUTION_MEMORY_NOT_ENOUGH;

public class MemoryNotEnoughException extends IoTDBRuntimeException {

public MemoryNotEnoughException(String message) {
super(message);
super(message, QUERY_EXECUTION_MEMORY_NOT_ENOUGH.getStatusCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ private ListenableFuture<?> processInternal() {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw new RuntimeException(t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
}

// Driver thread was interrupted which should only happen if the task is already finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ public FragmentInstanceContext getFragmentInstanceContext() {
}

public void failed(Throwable cause) {
fragmentInstanceContext.failed(cause);
finished.set(true);
if (finished.compareAndSet(false, true)) {
fragmentInstanceContext.failed(cause);
finished.set(true);
}
}

public void finished() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ public SessionInfo getSessionInfo() {
}

public Optional<Throwable> getFailureCause() {
return Optional.ofNullable(stateMachine.getFailureCauses().peek());
return Optional.ofNullable(
stateMachine.getFailureCauses().stream()
.filter(e -> e instanceof IoTDBException || e instanceof IoTDBRuntimeException)
.findFirst()
.orElse(stateMachine.getFailureCauses().peek()));
}

public Filter getGlobalTimeFilter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private void initialize(IDriverScheduler scheduler, boolean isExplainAnalyze) {
context.releaseMemoryReservationManager();

if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
scheduler.abortFragmentInstance(instanceId, context.getFailureCause().orElse(null));
}
} catch (Throwable t) {
try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public void run() {
new SetThreadName(next.getDriver().getDriverTaskId().getFullId())) {
Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
if (rootCause instanceof IoTDBRuntimeException) {
next.setAbortCause(e.getMessage());
next.setAbortCause(rootCause);
} else {
logger.warn("[ExecuteFailed]", e);
next.setAbortCause(getAbortCause(e));
next.setAbortCause(getAbortCause(e, next.getDriverTaskId().getFullId()));
}
scheduler.toAborted(next);
}
Expand Down Expand Up @@ -122,11 +122,12 @@ public void close() throws IOException {
closed = true;
}

private String getAbortCause(final Exception e) {
private Throwable getAbortCause(final Exception e, String fullId) {
Throwable rootCause = ErrorHandlingUtils.getRootCause(e);
if (rootCause instanceof MemoryNotEnoughException) {
return DriverTaskAbortedException.BY_MEMORY_NOT_ENOUGH;
return rootCause;
}
return DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED;
return new DriverTaskAbortedException(
fullId, DriverTaskAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,6 +59,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -301,7 +301,10 @@ public void abortQuery(QueryId queryId) {
for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
if (fragmentRelatedTasks != null) {
for (DriverTask task : fragmentRelatedTasks) {
task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
task.setAbortCause(
new DriverTaskAbortedException(
task.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED));
clearDriverTask(task);
}
}
Expand All @@ -310,7 +313,7 @@ public void abortQuery(QueryId queryId) {
}

@Override
public void abortFragmentInstance(FragmentInstanceId instanceId) {
public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable t) {
Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
queryMap.get(instanceId.getQueryId());
if (queryRelatedTasks != null) {
Expand All @@ -321,7 +324,12 @@ public void abortFragmentInstance(FragmentInstanceId instanceId) {
if (task == null) {
return;
}
task.setAbortCause(DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED);
task.setAbortCause(
t == null
? new DriverTaskAbortedException(
task.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_FRAGMENT_ABORT_CALLED)
: t);
clearDriverTask(task);
}
}
Expand Down Expand Up @@ -380,12 +388,9 @@ private void clearDriverTask(DriverTask task) {
}
try {
task.lock();
if (task.getAbortCause() != null) {
if (task.getAbortCause().isPresent()) {
try {
task.getDriver()
.failed(
new DriverTaskAbortedException(
task.getDriver().getDriverTaskId().getFullId(), task.getAbortCause()));
task.getDriver().failed(task.getAbortCause().get());
} catch (Exception e) {
logger.error("Clear DriverTask failed", e);
}
Expand Down Expand Up @@ -586,7 +591,7 @@ public void toAborted(DriverTask task) {
task.unlock();
}
clearDriverTask(task);
String abortCause = task.getAbortCause();
Optional<Throwable> abortCause = task.getAbortCause();
QueryId queryId = task.getDriverTaskId().getQueryId();
Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId);
if (queryRelatedTasks != null) {
Expand All @@ -598,9 +603,10 @@ public void toAborted(DriverTask task) {
continue;
}
otherTask.setAbortCause(
StringUtils.isEmpty(abortCause)
? DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED
: abortCause);
abortCause.orElse(
new DriverTaskAbortedException(
otherTask.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED)));
clearDriverTask(otherTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ public void execute(DriverTask task) throws InterruptedException {
ListenableFuture<?> future = driver.processFor(timeSlice);
// If the future is cancelled, the task is in an error and should be thrown.
if (future.isCancelled()) {
task.setAbortCause(DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED);
task.setAbortCause(
new DriverTaskAbortedException(
task.getDriverTaskId().getFullId(),
DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED));
scheduler.toAborted(task);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.execution.schedule;

import org.apache.iotdb.commons.exception.QueryTimeoutException;
import org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTask;

Expand Down Expand Up @@ -84,7 +85,7 @@ public void execute(DriverTask task) throws InterruptedException {
"[DriverTaskTimeout] Current time is {}, ddl of task is {}",
System.currentTimeMillis(),
task.getDDL());
task.setAbortCause(DriverTaskAbortedException.BY_TIMEOUT);
task.setAbortCause(new QueryTimeoutException());
scheduler.toAborted(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ void submitDrivers(
*
* @param instanceId the id of the fragment instance to be aborted.
*/
void abortFragmentInstance(FragmentInstanceId instanceId);
void abortFragmentInstance(FragmentInstanceId instanceId, Throwable t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airlift.units.Duration;

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -52,7 +53,7 @@ public class DriverTask implements IDIndexedAccessible {

private final boolean isHighestPriority;

private String abortCause;
private Throwable abortCause;

private final AtomicReference<Priority> priority;

Expand Down Expand Up @@ -149,11 +150,11 @@ public boolean equals(Object o) {
return o instanceof DriverTask && ((DriverTask) o).getDriverTaskId().equals(getDriverTaskId());
}

public String getAbortCause() {
return abortCause;
public Optional<Throwable> getAbortCause() {
return Optional.ofNullable(abortCause);
}

public void setAbortCause(String abortCause) {
public void setAbortCause(Throwable abortCause) {
this.abortCause = abortCause;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -161,8 +160,6 @@ private static TSStatus tryCatchQueryException(Exception e) {
return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), t.getMessage());
} else if (t instanceof ModelException) {
return RpcUtils.getStatus(((ModelException) t).getStatusCode(), rootCause.getMessage());
} else if (t instanceof MemoryNotEnoughException) {
return RpcUtils.getStatus(TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH, rootCause.getMessage());
}

if (t instanceof RuntimeException && rootCause instanceof IoTDBException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testManagingDriver() throws CpuNotEnoughException, MemoryNotEnoughEx
// Abort one FragmentInstance
Mockito.reset(mockDriver1);
Mockito.when(mockDriver1.getDriverTaskId()).thenReturn(driverTaskId1);
manager.abortFragmentInstance(instanceId1);
manager.abortFragmentInstance(instanceId1, null);
Mockito.verify(mockMPPDataExchangeManager, Mockito.times(1))
.forceDeregisterFragmentInstance(Mockito.any());
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.iotdb.commons.exception;

import static org.apache.iotdb.rpc.TSStatusCode.QUERY_TIMEOUT;

public class QueryTimeoutException extends IoTDBRuntimeException {

public QueryTimeoutException() {
super("Query execution is time out", QUERY_TIMEOUT.getStatusCode(), true);
}
}

0 comments on commit 567df30

Please sign in to comment.