Skip to content

Commit

Permalink
Overload function and add more documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Urmi Mustafi committed Nov 14, 2023
1 parent 3d80b3e commit 36162db
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@
* than epsilon and encapsulate executor communication latency including retry attempts
*
* The `event_timestamp` is the time of the flow_action event request.
* --- Note ---
* --- Database event_timestamp laundering ---
* We only use the participant's local event_timestamp internally to identify the particular flow_action event, but
* after interacting with the database utilize the CURRENT_TIMESTAMP of the database to insert or keep
* track of our event. This is to avoid any discrepancies due to clock drift between participants as well as
* variation in local time and database time for future comparisons.
* ---Event consolidation---
* track of our event, "laundering" or replacing the local timestamp with the database one. This is to avoid any
* discrepancies due to clock drift between participants as well as variation in local time and database time for
* future comparisons.
* --- Event consolidation ---
* Note that for the sake of simplification, we only allow one event associated with a particular flow's flow_action
* (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH, KILL, & RESUME for flow FOO at once) during
* the time it takes to execute the flow action. In most cases, the execution time should be so negligible that this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
return;
}
Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(),
jobExecutionPlanDagOptional.get());
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());

// If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving event from DagActionStoreChangeMonitor later
Expand Down Expand Up @@ -286,8 +285,7 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);

FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(),
jobExecutionPlanDag);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
if (flowCompilationTimer.isPresent()) {
flowCompilationTimer.get().stop(flowMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public final class FlowCompilationValidationHelper {
* flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
* caller.
* @param flowSpec
* @param optionalFlowExecutionId provided for executions of scheduled events which should use a consistent id
* @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass the ID "laundered" via the DB;
* see: {@link MysqlMultiActiveLeaseArbiter javadoc section titled
* `Database event_timestamp laundering`}
* @return jobExecutionPlan dag if one can be constructed for the given flowSpec
*/
public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec,
Expand Down Expand Up @@ -178,6 +180,15 @@ public static void populateFlowCompilationFailedEventMessage(Optional<EventSubmi
}
}

/**
* If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is
* successful, retrieve flowExecutionId from the JobSpec.
*/
public static void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
Dag<JobExecutionPlan> jobExecutionPlanDag) {
addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), jobExecutionPlanDag);
}

/**
* If it is a scheduled flow (which does not have flowExecutionId in the FlowSpec) and the flow compilation is
* successful, add a flowExecutionId using the optional parameter if it exists otherwise retrieve it from the JobSpec.
Expand Down

0 comments on commit 36162db

Please sign in to comment.