[GOBBLIN-1982] Show a consistent flowExecutionId btwn Compilation & Execution #3854
@@ -309,7 +309,7 @@ public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException,
  Thread.sleep(MORE_THAN_EPSILON);
  // Now have a reminder event check-in on the completed lease
  LeaseAttemptStatus attemptStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true, false);
Probably want to add one test where the replacement boolean is set to true and validate that
Forgot to commit changes, see test below now
@@ -95,6 +95,7 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
  }

  addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, jobExecutionPlanDagOptional.get());
+ // TODO: emit flow compilation metric
the line below already emits a metric, unless you mean like a count?
this was also left in by mistake, updated
Codecov Report
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3854      +/-   ##
============================================
+ Coverage     47.57%   47.58%   +0.01%
+ Complexity    11073    11068       -5
============================================
  Files          2160     2160
  Lines         85564    85563       -1
  Branches       9507     9509       +2
============================================
+ Hits          40703    40717      +14
+ Misses        41153    41136      -17
- Partials       3708     3710       +2
…n to the user. * removes flow compilation (and event emission) done before lease arbitration for multi-active scheduler
2bc4f32
to
3b479ae
critical change here, and glad to see it's relatively straight-forward and self-contained to plumb in
@@ -53,10 +53,14 @@ public interface MultiActiveLeaseArbiter {
   * @param flowAction uniquely identifies the flow and the present action upon it
   * @param eventTimeMillis is the time this flow action was triggered
   * @param isReminderEvent true if the flow action event we're checking on is a reminder event
+  * @param skipFlowExecutionIdReplacement if true then does not replace the flowExecutionId in the flowAction returned
naming "skip... replacement" seems indirect. better might be to reverse the sense and call it `replaceFlowExecId`. even better might be `adoptConsensusFlowExecutionId`
renamed to the latter and updated tests
  _log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
      flowAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
} else {
  Optional<TimingEvent> flowCompilationTimer =
      this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
  Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
nit: I'd clarify the sense of optional by calling it `compiledDag` or `validatedDag`
@@ -463,7 +461,7 @@ private void deleteFromExecutor(Spec spec, Properties headers) {
   Deletes spec from flowCatalog if it is an adhoc flow (not containing a job schedule)
   */
  private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
-   if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+   if (!flowSpec.isScheduled()) {
big improvement! :)
did we have this method all along, but just weren't using it here?
Yes, I went to add this util method thinking it would be useful and found it already exists
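For readers following this thread, here is a minimal sketch of the kind of check that `isScheduled()` encapsulates, based only on the `hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)` call it replaces in the hunk above. `FlowSpecSketch` is a hypothetical stand-in and not the real `FlowSpec`; it uses `java.util.Properties` rather than the project's config type, and the key string `job.schedule` is an assumption.

```java
// Hypothetical stand-in for FlowSpec; NOT the real Gobblin class.
final class FlowSpecSketch {
    // Assumed value of the schedule key; the real constant lives in ConfigurationKeys.
    static final String JOB_SCHEDULE_KEY = "job.schedule";

    private final java.util.Properties config;

    FlowSpecSketch(java.util.Properties config) {
        this.config = config;
    }

    /** A flow counts as scheduled when its config carries a job schedule. */
    boolean isScheduled() {
        return config.getProperty(JOB_SCHEDULE_KEY) != null;
    }

    public static void main(String[] args) {
        java.util.Properties adhoc = new java.util.Properties();
        java.util.Properties scheduled = new java.util.Properties();
        scheduled.setProperty(JOB_SCHEDULE_KEY, "0 0 * * * ?");

        System.out.println("adhoc scheduled? " + new FlowSpecSketch(adhoc).isScheduled());
        System.out.println("cron scheduled?  " + new FlowSpecSketch(scheduled).isScheduled());
    }
}
```

Naming the predicate lets call sites such as `deleteSpecFromCatalogIfAdhoc` read as intent (`!flowSpec.isScheduled()`) instead of repeating the config lookup.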
Re-running tests on my branch (https://github.com/umustafi/gobblin/pull/16/checks); the one that failed here looks flaky, as it ran for 120 min before timing out.
-  * @param skipFlowExecutionIdReplacement if true then does not replace the flowExecutionId in the flowAction returned
-  *        in LeaseAttemptStatuses
+  * @param adoptConsensusFlowExecutionId if true then replaces the flowAction flowExecutionId returned in
+  *        LeaseAttemptStatuses with the consensual eventTime
"consensus" seems the more canonical term and can be used as an adj
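The semantics described in the javadoc above can be sketched roughly as follows. This is an illustration under stated assumptions, not the arbiter's actual code: `ConsensusIdSketch`, its `FlowAction` class, and `resolveFlowAction` are hypothetical names, and the consensus event time is passed in directly rather than read from the database.

```java
// Hypothetical sketch; none of these types are the real Gobblin classes.
final class ConsensusIdSketch {

    /** Simplified stand-in for a DagAction: a flow identity plus its execution id. */
    static final class FlowAction {
        final String flowGroup;
        final String flowName;
        final String flowExecutionId;

        FlowAction(String flowGroup, String flowName, String flowExecutionId) {
            this.flowGroup = flowGroup;
            this.flowName = flowName;
            this.flowExecutionId = flowExecutionId;
        }
    }

    /**
     * Chooses the flow action to report in the lease attempt status.
     * consensusEventTimeMillis stands in for the event time all hosts agree on.
     */
    static FlowAction resolveFlowAction(FlowAction original, long consensusEventTimeMillis,
                                        boolean adoptConsensusFlowExecutionId) {
        if (!adoptConsensusFlowExecutionId) {
            // Adhoc flow: keep the id that was already returned to the user.
            return original;
        }
        // Scheduled flow: every host adopts the agreed-upon event time as the id.
        return new FlowAction(original.flowGroup, original.flowName,
                String.valueOf(consensusEventTimeMillis));
    }

    public static void main(String[] args) {
        FlowAction action = new FlowAction("group", "flow", "1700000000000");
        System.out.println(resolveFlowAction(action, 1700000000123L, false).flowExecutionId);
        System.out.println(resolveFlowAction(action, 1700000000123L, true).flowExecutionId);
    }
}
```

With the positive sense of the flag, adhoc callers pass `false` to keep the id the user already holds, and scheduled callers pass `true` to pick up the shared value.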
…xecution (apache#3854)
* Use existing FlowExecutionId for non-scheduled flows to match id given to the user.
* removes flow compilation (and event emission) done before lease arbitration for multi-active scheduler
* Rename fields to make more readable
* Update javadoc
---------
Co-authored-by: Urmi Mustafi <[email protected]>
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
The problem addressed in this issue is to determine a unique ID per execution that is agreed upon by all hosts and computed before any information (about compilation or execution) is returned to the user.
Upon receiving the request for an adhoc flow, the recipient host creates a `flowExecutionId` when initializing the `FlowSpec` from `config` for non-scheduled flows (see code). This `flowExecutionId` is returned to the user for tracking the flow status, so it should not change later on.
Scheduled flows are fired on each host at a different system clock time, so they need a consensus mechanism to coordinate between hosts. During `multiActiveLeaseArbitration` we update the `flowExecutionId` of a `DagAction` with an agreed-upon value from the database to gain this consistency. However, this should only be done for scheduled flows, and we should not emit any information externally about the `flowExecutionId` until later.
To address the problems above we
- skip `flowExecutionId` replacement for adhoc flows
- remove the `GTE` emission that happened before the consensus on `flowExecutionId`.
There's no significant impact of removing this check. It will result in `dagActions` being created for flows that may fail compilation later (after lease arbitration and before execution). Since we already compile the flow on accepting it, we are okay with a slight delay in failing a flow.
Tests
The `flowExecutionId` replacement is tested by the existing unit test `testAcquireLeaseSingleParticipant`, and the new functionality is tested by `testSkipFlowExecutionIdReplacement`.
Commits