Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Jan 10, 2024
1 parent ca84364 commit fe68d61
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, bool
if (!this.runQueue[queueId].offer(dag)) {
throw new IOException("Could not add dag" + dagId + "to queue");
}
if (setStatus) {
DagManagerUtils.submitAndSet(dag, this.eventSubmitter);
if (setStatus && this.eventSubmitter.isPresent()) {
DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter.get());
}
}

Expand Down Expand Up @@ -655,15 +655,15 @@ private void beginResumingDag(DagId dagIdToResume) throws IOException {
*/
private void finishResumingDags() throws IOException {
for (Map.Entry<String, Dag<JobExecutionPlan>> dag : this.resumingDags.entrySet()) {
JobStatus flowStatus = DagManagerUtils.pollFlowStatus(dag.getValue(), this.jobStatusRetriever, this.jobStatusPolledTimer);
if (flowStatus == null || !flowStatus.getEventName().equals(PENDING_RESUME.name())) {
Optional<JobStatus> flowStatus = DagManagerUtils.pollFlowStatus(dag.getValue(), this.jobStatusRetriever, this.jobStatusPolledTimer);
if (!flowStatus.isPresent() || !flowStatus.get().getEventName().equals(PENDING_RESUME.name())) {
continue;
}

boolean dagReady = true;
for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
JobStatus jobStatus = DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, this.jobStatusPolledTimer);
if (jobStatus == null || jobStatus.getEventName().equals(FAILED.name()) || jobStatus.getEventName().equals(CANCELLED.name())) {
Optional<JobStatus> jobStatus = DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, this.jobStatusPolledTimer);
if (!jobStatus.isPresent() || jobStatus.get().getEventName().equals(FAILED.name()) || jobStatus.get().getEventName().equals(CANCELLED.name())) {
dagReady = false;
break;
}
Expand Down Expand Up @@ -795,7 +795,7 @@ private void pollAndAdvanceDag() {
try {
boolean slaKilled = slaKillIfNeeded(node);

JobStatus jobStatus = DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, this.jobStatusPolledTimer);
Optional<JobStatus> jobStatus = DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, this.jobStatusPolledTimer);

boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);

Expand Down Expand Up @@ -830,9 +830,9 @@ private void pollAndAdvanceDag() {
break;
}

if (jobStatus != null && jobStatus.isShouldRetry()) {
if (jobStatus.isPresent() && jobStatus.get().isShouldRetry()) {
log.info("Retrying job: {}, current attempts: {}, max attempts: {}", DagManagerUtils.getFullyQualifiedJobName(node),
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
jobStatus.get().getCurrentAttempts(), jobStatus.get().getMaxAttempts());
this.jobToDag.get(node).setFlowEvent(null);
submitJob(node);
}
Expand Down Expand Up @@ -865,14 +865,14 @@ private void pollAndAdvanceDag() {
* @return true if the total time that the job remains in the ORCHESTRATED state exceeds
* {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
*/
private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, Optional<JobStatus> jobStatus)
throws ExecutionException, InterruptedException {
if (jobStatus == null) {
if (!jobStatus.isPresent()) {
return false;
}
ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
ExecutionStatus executionStatus = valueOf(jobStatus.get().getEventName());
long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, this.defaultJobStartSlaTimeMillis);
long jobOrchestratedTime = jobStatus.getOrchestratedTime();
long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
DagManagerUtils.getJobName(node),
Expand All @@ -890,14 +890,14 @@ private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobS
}
}

private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean killOrphanFlow, JobStatus jobStatus) {
private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean killOrphanFlow, Optional<JobStatus> jobStatus) {
if (slaKilled || killOrphanFlow) {
return CANCELLED;
} else {
if (jobStatus == null) {
if (!jobStatus.isPresent()) {
return PENDING;
} else {
return valueOf(jobStatus.getEventName());
return valueOf(jobStatus.get().getEventName());
}
}
}
Expand Down Expand Up @@ -1129,8 +1129,8 @@ private void cleanUp() {
for (Iterator<String> dagIdIterator = this.dagIdstoClean.iterator(); dagIdIterator.hasNext();) {
String dagId = dagIdIterator.next();
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
JobStatus flowStatus = DagManagerUtils.pollFlowStatus(dag, this.jobStatusRetriever, this.jobStatusPolledTimer);
if (flowStatus != null && FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
Optional<JobStatus> flowStatus = DagManagerUtils.pollFlowStatus(dag, this.jobStatusRetriever, this.jobStatusPolledTimer);
if (flowStatus.isPresent() && FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.get().getEventName())) {
FlowId flowId = DagManagerUtils.getFlowId(dag);
switch(dag.getFlowEvent()) {
case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,21 +390,19 @@ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
}
}

public static void submitAndSet(Dag<JobExecutionPlan> dag, Optional<EventSubmitter> eventSubmitter) {
eventSubmitter.toJavaUtil().ifPresent(es -> {
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
jobExecutionPlan.setExecutionStatus(PENDING);
}
});
public static void submitPendingExecStatus(Dag<JobExecutionPlan> dag, EventSubmitter eventSubmitter) {
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
jobExecutionPlan.setExecutionStatus(PENDING);
}
}

/**
* Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
*/
public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
public static Optional<JobStatus> pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
Expand All @@ -418,9 +416,9 @@ public static JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode, JobStat
/**
* Retrieve the flow's {@link JobStatus} (i.e. the job status with {@link JobStatusRetriever#NA_KEY} as job name/group) from a dag.
* No status is returned when the dag is null or empty.
*/
public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
public static Optional<JobStatus> pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
if (dag == null || dag.isEmpty()) {
return null;
return Optional.absent();
}
Config jobConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();
String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
Expand All @@ -433,17 +431,17 @@ public static JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag, JobStatusRetr
/**
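* Retrieve the first {@link JobStatus} that the {@link JobStatusRetriever} reports for the given flow execution and
* job name/group (if any), and update the timer if jobStatusPolledTimer is present.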
*/
public static JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName,
public static Optional<JobStatus> pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName,
JobStatusRetriever jobStatusRetriever, Optional<Timer> jobStatusPolledTimer) {
long pollStartTime = System.nanoTime();
Iterator<JobStatus> jobStatusIterator =
jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
Instrumented.updateTimer(jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);

if (jobStatusIterator.hasNext()) {
return jobStatusIterator.next();
return Optional.of(jobStatusIterator.next());
} else {
return null;
return Optional.absent();
}
}

Expand Down

0 comments on commit fe68d61

Please sign in to comment.