[draft] Dag refactor #3825
My initial thoughts on the PR; I will go in depth later to check everything.
// public void addDag(Dag<JobExecutionPlan> dag); | ||
|
||
public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); | ||
|
||
public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); | ||
|
||
public boolean hasRunningJobs(String dagId); | ||
|
||
public void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException; | ||
|
Put a TODO on the commented-out methods?
DagActionStore.DagAction dagAction = DagManagerUtils.createDagAction(flowGroup, flowName, String.valueOf(flowExecutionId), DagActionStore.FlowActionType.KILL); | ||
this.dagProcessingEngine.get().addDagAction(dagAction); |
At this point we should also (or instead) store the new dag action in the dagAction store so that the changeMonitor receives it on all hosts. Remember that we want all actions persisted in MySQL first, and then coordinated across hosts.
return; | ||
} | ||
// todo - find next dag node to run | ||
Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(new JobExecutionPlan(JobSpec.builder().build(), null)); |
We should use dagId to retrieve DagNode from DagManagementStateStore (the next dagNode without a status)
@Slf4j | ||
public abstract class DagProc<T extends DagTask> { | ||
protected final DagProcFactory dagProcFactory; | ||
protected final UserQuotaManager quotaManager; |
Should the quota check happen in DagProc or in TaskStream? (DagProc needs the dagNode to check.)
Can we make the quota check a util function? We may need to reuse it for launch, advance, and retry/resume.
//flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis()); | ||
//if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) { | ||
// can it return null? is this iterator allowed to return null? | ||
return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis())); |
For now we are spoofing the LeaseObtainedStatus since there is no handler at the moment, but we should have a while loop that keeps polling until we get a DagTask for which we obtain the lease. Let's still encode that logic.
while lease not obtained {
  // poll dag action queue
  // attempt lease on it
}
Adding that will create a findBugsMain error.
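For reference, the polling loop described in this thread could look roughly like the sketch below. Everything in it is a hypothetical stand-in: a String plays the role of a dag action, the `leaseAttempt` function plays the role of the lease arbiter, and `LeasedTask` is not a Gobblin type. It only illustrates "poll, attempt lease, repeat until a lease is obtained", and it never returns null.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

/** Sketch only: String stands in for a dag action, Long for a lease timestamp. */
final class LeasePollingSketch {
  /** A dag action paired with the lease under which this host may process it. */
  static final class LeasedTask {
    final String dagAction;
    final long leaseTimestampMillis;

    LeasedTask(String dagAction, long leaseTimestampMillis) {
      this.dagAction = dagAction;
      this.leaseTimestampMillis = leaseTimestampMillis;
    }
  }

  private final BlockingQueue<String> dagActionQueue;
  // Hypothetical arbiter: returns a lease timestamp, or empty if another host holds the lease.
  private final Function<String, Optional<Long>> leaseAttempt;

  LeasePollingSketch(BlockingQueue<String> dagActionQueue,
      Function<String, Optional<Long>> leaseAttempt) {
    this.dagActionQueue = dagActionQueue;
    this.leaseAttempt = leaseAttempt;
  }

  /** Blocks until the lease for some queued action is obtained; never returns null. */
  LeasedTask next() {
    while (true) {
      String dagAction;
      try {
        dagAction = dagActionQueue.take(); // poll the dag action queue
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for a dag action", e);
      }
      Optional<Long> lease = leaseAttempt.apply(dagAction); // attempt lease on it
      if (lease.isPresent()) {
        return new LeasedTask(dagAction, lease.get());
      }
      // Lease not obtained: skip this action and keep polling.
    }
  }
}
```

Blocking on `take()` avoids a busy loop while the queue is empty; whether a blocking `next()` is acceptable for the real stream is a design decision for the PR author.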
public class NewDagManager implements DagManagement { | ||
public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager."; | ||
|
||
private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10; |
Is this NewDagManager copy needed, or can the existing DagManager be reused? What are the key differences here? The main change we want is an abstraction around obtaining/reading dags through the DagManagementStateStore, so that we can interchangeably use in-memory versus MySQL dag storage, right?
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>(); | ||
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>(); | ||
private static final long DAG_FLOW_STATUS_TOLERANCE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); | ||
|
||
|
||
private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); | ||
private final Map<String, Dag.DagNode<JobExecutionPlan>> dagNodes = new HashMap<>(); | ||
private final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); | ||
final Map<String, Long> dagToSLA = new HashMap<>(); | ||
DagManager.DagManagerThread[] dagManagerThreads; |
DagManagement has an API to use DagManagementStateStore, and we should remove these in-memory references.
long flowExecutionId = ConfigUtils.getLong(jobConfig, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 0L); | ||
String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); | ||
String jobGroup = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_GROUP_KEY, ""); |
Can we codify the defaults used for job name/group and execution id if none are provided in ConfigurationKeys? We may need them again in other locations.
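One way to codify these defaults is a small holder class with named constants and accessors, sketched below. The class name, the key strings, and the use of a plain `Map` are assumptions made for illustration; the real code reads a Typesafe `Config` through `ConfigUtils` and takes its keys from `ConfigurationKeys`. Only the default values (0L and the empty string) come from the diff above.

```java
import java.util.Map;

/** Sketch only: the key strings are placeholders, not the real ConfigurationKeys values. */
final class JobConfigDefaultsSketch {
  static final String FLOW_EXECUTION_ID_KEY = "example.flow.executionId";
  static final String JOB_NAME_KEY = "example.job.name";
  static final String JOB_GROUP_KEY = "example.job.group";

  // Defaults taken from the snippet under review.
  static final long DEFAULT_FLOW_EXECUTION_ID = 0L;
  static final String DEFAULT_JOB_NAME = "";
  static final String DEFAULT_JOB_GROUP = "";

  private JobConfigDefaultsSketch() {
  }

  static long flowExecutionId(Map<String, String> config) {
    String raw = config.get(FLOW_EXECUTION_ID_KEY);
    return raw == null ? DEFAULT_FLOW_EXECUTION_ID : Long.parseLong(raw);
  }

  static String jobName(Map<String, String> config) {
    return config.getOrDefault(JOB_NAME_KEY, DEFAULT_JOB_NAME);
  }

  static String jobGroup(Map<String, String> config) {
    return config.getOrDefault(JOB_GROUP_KEY, DEFAULT_JOB_GROUP);
  }
}
```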
void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); | ||
void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); |
Can you add a brief Javadoc to explain how these are used? I expect these methods to be defined in DagStateStore, with these being wrappers around them that are meant to be used only internally and should be private/protected.
addJobState is called during dag initialization. It was used privately because DagManager did dag initialization. But now that we are moving things away, DagProc does dag initialization and uses it from outside.
void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode); | ||
|
||
/** | ||
* defines what to do when a job (dag node) finishes |
Nit: capitalize the first word here and in other docs.
Map<String, Set<Dag.DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException; | ||
void removeDagActionFromStore(DagManager.DagId dagIdToResume, DagActionStore.FlowActionType flowActionType) | ||
throws IOException; | ||
|
||
void handleJobStatusEvent(JobStatusEvent jobStatusEvent); | ||
void handleKillFlowEvent(KillFlowEvent killFlowEvent); |
Let's be consistent with spacing; this is a bit hard to read. Can you add sections for the methods? i.e.:
// High Level flow actions
void handleKillFlowEvent...
case RESUME: | ||
case LAUNCH: | ||
case ADVANCE: |
Add TODOs to complete later.
//dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter()); | ||
} | ||
|
||
public void addDagAction(DagActionStore.DagAction dagAction) { |
Clarify the name: "addDagActionToQueue".
boolean flowKilled = enforceFlowStartDeadline(node); | ||
boolean jobKilled = false; | ||
|
||
if (!flowKilled) { | ||
JobStatus jobStatus = pollJobStatus(node); | ||
try { | ||
jobKilled = enforceJobCompletionDeadline(node, jobStatus); |
Do we try to enforce the SLA kills first even when we receive an API request? I was not aware this is the norm. Is this not supposed to kill the dag because of the API request, or is it used differently?
private JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) { | ||
long pollStartTime = System.nanoTime(); | ||
Iterator<JobStatus> jobStatusIterator = | ||
this.dagManager.getJobStatusRetriever().getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup); | ||
Instrumented.updateTimer(this.dagManager.getJobStatusPolledTimer(), System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS); | ||
|
||
if (jobStatusIterator.hasNext()) { | ||
return jobStatusIterator.next(); | ||
} else { | ||
return null; | ||
} | ||
} |
This method seems like it would be used by other action threads too. Can we put it into a util class?
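A shared helper for the "first status or nothing" part of this method could be as small as the sketch below. The class and method names are hypothetical, and returning `Optional` instead of `null` is a swapped-in design choice, not what the PR does; the timer instrumentation from the original method is left out.

```java
import java.util.Iterator;
import java.util.Optional;

/** Sketch only: a generic helper, independent of the real JobStatus type. */
final class JobStatusPollingSketch {
  private JobStatusPollingSketch() {
  }

  /** Returns the first element of the iterator, or empty when the iterator has none. */
  static <T> Optional<T> firstStatus(Iterator<T> statuses) {
    if (statuses.hasNext()) {
      return Optional.ofNullable(statuses.next());
    }
    return Optional.empty();
  }
}
```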
try { | ||
quotaManager.checkQuota(Collections.singleton(dagNode)); | ||
|
||
producer = DagManagerUtils.getSpecProducer(dagNode); | ||
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get(). | ||
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null; | ||
|
||
// Increment job count before submitting the job onto the spec producer, in case that throws an exception. | ||
// By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release. | ||
// Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED. | ||
// When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented | ||
// Ensure that we do not double increment for flows that are retried | ||
if (dagNode.getValue().getCurrentAttempts() == 1) { | ||
dagManagerMetrics.incrementRunningJobMetrics(dagNode); | ||
} |
Repeated code here; it seems we have scope to consolidate.
*/ | ||
@Slf4j | ||
@Alpha | ||
// todo - maybe reload need different treatment from other operations, because these are already present in the dag store |
When is reload used? It would be helpful to describe why we need reload. Is this only on startup?
Yes, this is only on startup.
Do we still need this? What is being loaded here? Every time we start up, we only need to initialize the scheduler; the scheduler should then take care of triggering launch tasks. If there are unfinished tasks in the dagActionStore, those will be dealt with separately by the startup sequence.
@Alpha | ||
public final class KillDagProc extends DagProc<KillDagTask> { | ||
|
||
// should dag task be a part of dag proc? |
Do we already have access to the KillDagTask if we're extending it? It does make sense to me to have access to the original task associated with the Proc.
@@ -154,9 +165,15 @@ protected void processMessage(DecodeableKafkaRecord message) { | |||
log.info("DagAction change ({}) received for flow: {}", dagActionType, dagAction); | |||
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) { | |||
dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId)); | |||
if (isMultiLeaderDagManagerEnabled) { |
Should we remove this for now if we are also not doing launch? Do you want to test only kill, or kill and resume, in the first iteration?
|
||
|
||
/** | ||
* NewDagManager manages dags in memory and various mappings. |
The job of the newDagManager is actually only to add new work ("tasks") to the dagTaskStream. Let's update the description.
This should be a lightweight class that only adds new tasks. Do we want all this logic for retrieving status and cleaning up after a task finishes? We need to decide who polls for job status and, after a job completes, cleans up or submits the next job.
} | ||
if (dagActionStore.isPresent()) { | ||
Collection<DagActionStore.DagAction> dagActions = dagActionStore.get().getDagActions(); | ||
for (DagActionStore.DagAction action : dagActions) { |
This moved in a recent PR (#3841), if you rebase. All of these are now in DagActionStoreChangeMonitor, and we should call the DagManagement interface to handle them.
@@ -138,7 +146,8 @@ public <V> void onRetry(Attempt<V> attempt) { | |||
} | |||
} | |||
})); | |||
this.eventProducer = observabilityEventProducer; | |||
this.eventProducer = observabilityEventProducer; | |||
this.dagProcessingEngine = dagProcessingEngine; |
Why are we interacting with dagProcessingEngine? We should be passing work to newDagManager through the event bus, right? Also, can you clarify in a comment that it is being passed to newDagManager?
This should be a good start; I will get to the last few files a bit later.
@@ -94,6 +100,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by | |||
@Getter | |||
private final StateStore<org.apache.gobblin.configuration.State> stateStore; | |||
private final ScheduledExecutorService scheduledExecutorService; | |||
@Getter(AccessLevel.PUBLIC) |
I'm not sure whether I'm having trouble with the diff, but I can't find anyone accessing this. Correspondingly, I can't tell the intention behind it. That really deserves comments to document it, plus a name that indicates the purpose of this EventBus (and perhaps also the type of its payload), as opposed to merely repeating its type name.
if (isMultiLeaderDagManagerEnabled) { | ||
this.dagProcessingEngine.addDagAction(dagAction); | ||
} |
The Null Object pattern is cleaner than checking a separate boolean every time, i.e. define an interface where one impl has an addDagAction that actually does real processing, and another "no-op" impl where addDagAction does nothing. Then instantiate the no-op impl if isMultiActive == false.
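A minimal sketch of that suggestion follows. All type names here (`DagActionSink`, `ProcessingDagActionSink`, `NoOpDagActionSink`, `DagActionSinks`) are hypothetical, and a String stands in for `DagActionStore.DagAction`; the point is only that the boolean is consulted once, at construction time, and callers invoke `addDagAction` unconditionally.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: the common interface both implementations share. */
interface DagActionSink {
  void addDagAction(String dagAction);
}

/** The "real" implementation; here it just records actions for later processing. */
final class ProcessingDagActionSink implements DagActionSink {
  private final List<String> pending = new ArrayList<>();

  @Override
  public void addDagAction(String dagAction) {
    pending.add(dagAction);
  }

  List<String> pending() {
    return pending;
  }
}

/** The null object: safe to call, intentionally does nothing. */
final class NoOpDagActionSink implements DagActionSink {
  @Override
  public void addDagAction(String dagAction) {
    // Intentionally empty.
  }
}

final class DagActionSinks {
  private DagActionSinks() {
  }

  /** The only place the flag is checked. */
  static DagActionSink forMode(boolean isMultiActive) {
    return isMultiActive ? new ProcessingDagActionSink() : new NoOpDagActionSink();
  }
}
```

With this in place, a call site such as the one in the diff would shrink to a single unconditional `addDagAction(dagAction)` call.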
*/ | ||
@Alpha | ||
@Slf4j | ||
public abstract class DagProc<T extends DagTask> { |
I don't see where you're using the type param, T. The specific DagTask derived type can probably be passed into the ctor for the DagProc derived type.
Generics would come in handy though for:
a. the return type of initialize() (which is the param type of act())
b. the return type of act() (which is the param type of commit())
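To make points (a) and (b) concrete, here is a rough sketch of a base class whose type parameters describe the data flowing between the three phases. The names `TypedDagProcSketch` and `KillProcSketch`, the `process()` driver, and the toy payload types are all assumptions for illustration, not the PR's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: S is what initialize() loads; R is what act() produces and commit() persists. */
abstract class TypedDagProcSketch<S, R> {
  protected abstract S initialize();

  protected abstract R act(S state);

  protected abstract void commit(R result);

  /** Runs the three phases in order, threading each phase's output into the next. */
  public final void process() {
    S state = initialize();
    R result = act(state);
    commit(result);
  }
}

/** Toy subclass: "loads" job names, counts them as cancelled, and records the count. */
final class KillProcSketch extends TypedDagProcSketch<List<String>, Integer> {
  // The task-specific input arrives through the constructor, not through a type parameter.
  private final String dagId;
  final List<Integer> committed = new ArrayList<>();

  KillProcSketch(String dagId) {
    this.dagId = dagId;
  }

  @Override
  protected List<String> initialize() {
    List<String> jobs = new ArrayList<>();
    jobs.add(dagId + "/job1");
    jobs.add(dagId + "/job2");
    return jobs;
  }

  @Override
  protected Integer act(List<String> jobs) {
    return jobs.size();
  }

  @Override
  protected void commit(Integer cancelledCount) {
    committed.add(cancelledCount);
  }
}
```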
public DagProc host(DagTaskVisitor<DagProc> visitor) { | ||
return visitor.meet(this); | ||
} |
Doesn't this work:
public <T extends DagProc> T host(DagTaskVisitor<T> visitor) {
  return visitor.meet(this);
}
?
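A self-contained sketch of a visitor with a generic `host` method is below, for comparison. The `*Sketch` types and `DescribingVisitor` are hypothetical stand-ins for DagTask, DagTaskVisitor, and the proc factory, and the bound `T extends DagProc` is dropped so the example compiles on its own.

```java
/** Sketch only: one meet() overload per task type; T is whatever the visitor produces. */
interface TaskVisitorSketch<T> {
  T meet(KillTaskSketch task);

  T meet(ResumeTaskSketch task);
}

abstract class TaskSketch {
  /** Generic on the method, so each call site gets back the visitor's own result type. */
  abstract <T> T host(TaskVisitorSketch<T> visitor);
}

final class KillTaskSketch extends TaskSketch {
  @Override
  <T> T host(TaskVisitorSketch<T> visitor) {
    return visitor.meet(this); // static dispatch to the KillTaskSketch overload
  }
}

final class ResumeTaskSketch extends TaskSketch {
  @Override
  <T> T host(TaskVisitorSketch<T> visitor) {
    return visitor.meet(this); // static dispatch to the ResumeTaskSketch overload
  }
}

/** Example visitor producing a String; a real one might produce a proc object instead. */
final class DescribingVisitor implements TaskVisitorSketch<String> {
  @Override
  public String meet(KillTaskSketch task) {
    return "kill";
  }

  @Override
  public String meet(ResumeTaskSketch task) {
    return "resume";
  }
}
```

Because `T` is inferred from the visitor argument, the caller needs no cast on the returned value.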
remove changes not related to dag manager refactoring
commit before preemption
use visitor pattern
store dag action instead of dag task in dag task stream
use dag management store in NewDagManager
merge conflicts
some cleaning
fix tests
We can remove this task type as well as the Proc, because we load up tasks as needed when there is a dagAction pending. A steady-state active host (or a newly starting one) will load the dag from the dag state store after getting the lease for the dagTask (that was added to the dagTaskStream).
See comment on ReloadDagTask.
} | ||
this.dagStateStore.writeCheckpoint(this.dag); | ||
for (Dag.DagNode<JobExecutionPlan> dagNode : this.dag.getStartNodes()) { | ||
this.dagProcFactory.dagProcessingEngine.addAdvanceDagAction(dagNode); |
Overall flow: dagAction stored in dagActionStore -> dagActionStoreMonitor receives the action and adds a task to dagTaskStream -> dagProcessingEngine iterates through tasks and "acts" on each one to do work via dagProc.
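The three stages of that flow can be sketched in miniature as follows. This is an in-memory toy under stated assumptions: `DagFlowSketch` and its nested types are hypothetical, there is no persistence or lease arbitration, and the "work" done per action is just a log entry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Sketch only: store monitor -> task stream -> processing engine, all in memory. */
final class DagFlowSketch {
  enum ActionType { LAUNCH, KILL, RESUME }

  static final class DagAction {
    final String flowName;
    final ActionType type;

    DagAction(String flowName, ActionType type) {
      this.flowName = flowName;
      this.type = type;
    }
  }

  /** The stream of pending work that the engine consumes. */
  static final class TaskStream {
    private final Queue<DagAction> pending = new ArrayDeque<>();

    void add(DagAction action) {
      pending.add(action);
    }

    DagAction poll() {
      return pending.poll(); // null when nothing is pending
    }
  }

  /** Called when the store reports a newly persisted action; forwards it to the stream. */
  static final class StoreMonitor {
    private final TaskStream stream;

    StoreMonitor(TaskStream stream) {
      this.stream = stream;
    }

    void onActionStored(DagAction action) {
      stream.add(action);
    }
  }

  /** Drains the stream and "acts" on each task, recording what it did. */
  static final class ProcessingEngine {
    final List<String> workLog = new ArrayList<>();

    void drain(TaskStream stream) {
      DagAction action;
      while ((action = stream.poll()) != null) {
        workLog.add(act(action));
      }
    }

    private String act(DagAction action) {
      switch (action.type) {
        case LAUNCH:
          return "launch " + action.flowName;
        case KILL:
          return "kill " + action.flowName;
        case RESUME:
          return "resume " + action.flowName;
        default:
          throw new IllegalArgumentException("Unknown action type: " + action.type);
      }
    }
  }
}
```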
return; | ||
} | ||
this.dagStateStore.writeCheckpoint(this.dag); | ||
for (Dag.DagNode<JobExecutionPlan> dagNode : this.dag.getStartNodes()) { |
Two options for how to deal with a multi-hop flow:
- the way you have it here:
  - update status for dagStateStore
  - then create separate dagActions for each dagNode
- launch 1 dagNode at a time; upon completion, check for more nodes to launch and add them one by one, committing each time
* An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states | ||
* and allows add/delete and other functions. |
Add some detail to the Javadoc: load/update/save any state relating to a dag; used by dagProcs.
void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException; | ||
void handleJobStatusEvent(JobStatusEvent jobStatusEvent); | ||
void handleKillFlowEvent(KillFlowEvent killFlowEvent); | ||
void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) throws IOException; |
These are the core functionality of dagManagement (handling a new dag action, enforcing SLA/resume/launch/kill ... and adding to the stream). The other methods below should be in dagManagementStateStore, and dagProc should interact with dagManagementStateStore.
* e) load {@link Dag}s on service-start / set-active. | ||
*/ | ||
@Slf4j | ||
public class NewDagManager implements DagManagement { |
If these methods already exist elsewhere, we should remove them or see if we can abstract away unnecessary methods and functionality. We can clean up after ironing out the interface for everything else. For now let's keep b).
Instead of c), for start/kill SLA enforcement we need to send some action to revisit and check whether the flow started/completed, but all hosts need to know of it, so we need to durably store it somewhere (a timer). Then, only if the SLA is breached, we add a dagAction to kill. launchDagAction -> durably store setStartSlaTimer AND setCompletionSlaTimer somewhere and set the timer for yourself (on restart we can load these in, or have another stream for them). When all host timers go off, do lease arbitration to enforce, and emit a cancelBCStartSlaBreached/CompletionSlaBreached action.
import org.apache.gobblin.service.monitoring.event.JobStatusEvent; | ||
|
||
|
||
public interface DagManagement { |
Shouldn't we have a method to handle a launch?
@Override | ||
public Object meet(ReloadDagTask reloadDagTask, DagProcessingEngine dagProcessingEngine) { | ||
return new ReloadDagProc(reloadDagTask, dagProcessingEngine); | ||
} |
Can we get rid of this one, after the previous discussion about not needing to reload preemptively? We only access dagStateStore at time of execution.
@Slf4j | ||
@AllArgsConstructor | ||
// change to iterable | ||
public class DagTaskStream implements Iterator<DagTask>{ |
The Javadoc mentions implementing DagManagement, but after the recent conversation, and seeing that NewDagManager is the one that implements it, the code looks fine but update ja
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Tests
Commits