Skip to content

Commit

Permalink
add debug log
Browse files Browse the repository at this point in the history
  • Loading branch information
BePPPower committed Jan 24, 2025
1 parent 993eeef commit 7459eb7
Showing 1 changed file with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
Expand All @@ -50,6 +52,7 @@

@Slf4j
public class ExportTaskExecutor implements TransientTaskExecutor {
private static final Logger LOG = LogManager.getLogger(ExportTaskExecutor.class);

List<StatementBase> selectStmtLists;

Expand Down Expand Up @@ -78,22 +81,32 @@ public Long getId() {

@Override
public void execute() throws JobException {
LOG.debug("[Export Task] taskId: {} starting execution", taskId);
if (isCanceled.get()) {
LOG.debug("[Export Task] taskId: {} was already canceled before execution", taskId);
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
LOG.debug("[Export Task] taskId: {} updating state to EXPORTING", taskId);
exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
List<OutfileInfo> outfileInfoList = Lists.newArrayList();
for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
LOG.debug("[Export Task] taskId: {} processing statement {}/{}",
taskId, idx + 1, selectStmtLists.size());
if (isCanceled.get()) {
LOG.debug("[Export Task] taskId: {} canceled during execution at statement {}", taskId, idx + 1);
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
// check the version of tablets, skip if the consistency is in partition level.
if (exportJob.getExportTable().isManagedTable() && !exportJob.isPartitionConsistency()) {
LOG.debug("[Export Task] taskId: {} checking tablet versions for statement {}", taskId, idx + 1);
try {
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
LOG.debug("[Export Lock] taskId: {}, table: {} about to acquire readLock",
taskId, table.getName());
table.readLock();
LOG.debug("[Export Lock] taskId: {}, table: {} acquired readLock", taskId, table.getName());
try {
List<Long> tabletIds;
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmtLists.get(idx);
Expand All @@ -108,18 +121,26 @@ public void execute() throws JobException {
long nowVersion = partition.getVisibleVersion();
long oldVersion = exportJob.getPartitionToVersion().get(partition.getName());
if (nowVersion != oldVersion) {
LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock"
+ "due to version mismatch", taskId, table.getName());
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed");
throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}"
+ ", now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
}
}
} catch (Exception e) {
LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock"
+ "due to exception: {}", taskId, table.getName(), e.getMessage());
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
} finally {
LOG.debug("[Export Lock] taskId: {}, table: {} releasing readLock in finally block",
taskId, table.getName());
table.readUnlock();
LOG.debug("[Export Lock] taskId: {}, table: {} released readLock successfully",
taskId, table.getName());
}
} catch (AnalysisException e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
Expand All @@ -129,26 +150,39 @@ public void execute() throws JobException {
}

try (AutoCloseConnectContext r = buildConnectContext()) {
LOG.debug("[Export Task] taskId: {} executing statement {}", taskId, idx + 1);
stmtExecutor = new StmtExecutor(r.connectContext, selectStmtLists.get(idx));
stmtExecutor.execute();
if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
LOG.debug("[Export Task] taskId: {} failed with MySQL error: {}", taskId,
r.connectContext.getState().getErrorMessage());
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage());
return;
}
LOG.debug("[Export Task] taskId: {} statement {} executed successfully", taskId, idx + 1);
OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
LOG.debug("[Export Task] taskId: {} got outfile info for statement {}:"
+ "fileNumber={}, totalRows={}, fileSize={}",
taskId, idx + 1, outfileInfo.getFileNumber(),
outfileInfo.getTotalRows(), outfileInfo.getFileSize());
outfileInfoList.add(outfileInfo);
} catch (Exception e) {
LOG.debug("[Export Task] taskId: {} failed with exception during statement {}: {}",
taskId, idx + 1, e.getMessage(), e);
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
}
}
if (isCanceled.get()) {
LOG.debug("[Export Task] taskId: {} canceled after processing all statements", taskId);
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
LOG.debug("[Export Task] taskId: {} completed successfully, updating state to FINISHED", taskId);
exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null);
isFinished.getAndSet(true);
LOG.debug("[Export Task] taskId: {} execution completed", taskId);
}

@Override
Expand Down

0 comments on commit 7459eb7

Please sign in to comment.