Skip to content

Commit

Permalink
initial commit for refactoring dag manager
Browse files Browse the repository at this point in the history
remove changes not related to dag manager refactoring
commit before preemption
  • Loading branch information
Meeth Gala authored and arjun4084346 committed Dec 4, 2023
1 parent 694ed37 commit 3845841
Show file tree
Hide file tree
Showing 42 changed files with 3,052 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ServiceConfigKeys {
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveDagManager.enabled";
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveScheduler.enabled";
// If true, will mark up/down d2 servers on leadership so that all requests will be routed to the leader node
public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.monitoring.GaaSObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
Expand Down Expand Up @@ -775,7 +776,7 @@ class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, GaaSObservabilityEventProducer producer)
throws IOException, ReflectiveOperationException {
super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer);
super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer, mock(DagProcessingEngine.class));
shouldThrowFakeExceptionInParseJobStatus = shouldThrowFakeExceptionInParseJobStatusToggle;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ public Iterator<JobStatus> getLatestJobStatusByFlowNameAndGroup(String flowName,
* @return deserialize {@link State} into a {@link JobStatus}.
*/
protected JobStatus getJobStatus(State jobState) {
  // Populate every field that can be read straight off the state; issues are attached below.
  JobStatus.JobStatusBuilder statusBuilder = createJobStatusBuilderFromState(jobState);

  final String issueContextId = TroubleshooterUtils.getContextIdForJob(jobState.getProperties());

  // The issue lookup is wrapped in a memoizing supplier, so the repository is only queried
  // when the supplier is first asked for its value.
  Supplier<List<Issue>> lazyIssues = Suppliers.memoize(() -> {
    try {
      return issueRepository.getAll(issueContextId);
    } catch (TroubleshooterException e) {
      // Best effort: a failed lookup is logged and reported as "no issues" rather than propagated.
      log.warn("Cannot retrieve job issues", e);
      return Collections.<Issue>emptyList();
    }
  });

  return statusBuilder.issues(lazyIssues).build();
}

public static JobStatus.JobStatusBuilder createJobStatusBuilderFromState(State jobState) {
String flowGroup = getFlowGroup(jobState);
String flowName = getFlowName(jobState);
long flowExecutionId = getFlowExecutionId(jobState);
Expand All @@ -130,48 +150,35 @@ protected JobStatus getJobStatus(State jobState) {
int progressPercentage = jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
long lastProgressEventTime = jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);

String contextId = TroubleshooterUtils.getContextIdForJob(jobState.getProperties());

Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
List<Issue> issues;
try {
issues = issueRepository.getAll(contextId);
} catch (TroubleshooterException e) {
log.warn("Cannot retrieve job issues", e);
issues = Collections.emptyList();
}
return issues;
});

return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
issues(jobIssues).build();
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).jobName(jobName)
.jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).lowWatermark(lowWatermark)
.highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime)
.message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts)
.currentGeneration(currentGeneration).shouldRetry(shouldRetry).progressPercentage(progressPercentage)
.lastProgressEventTime(lastProgressEventTime);
}

protected final String getFlowGroup(State jobState) {
/**
 * Reads the flow group recorded on a job state.
 *
 * @param jobState state to read from
 * @return the property stored under {@code TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD}
 */
protected static final String getFlowGroup(State jobState) {
  final String flowGroup = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
  return flowGroup;
}

protected final String getFlowName(State jobState) {
/**
 * Reads the flow name recorded on a job state.
 *
 * @param jobState state to read from
 * @return the property stored under {@code TimingEvent.FlowEventConstants.FLOW_NAME_FIELD}
 */
protected static final String getFlowName(State jobState) {
  final String flowName = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
  return flowName;
}

protected final long getFlowExecutionId(State jobState) {
/**
 * Reads the flow execution id recorded on a job state and parses it as a {@code long}.
 *
 * @param jobState state to read from
 * @return the parsed value of {@code TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD}
 * @throws NumberFormatException if the stored value is {@code null} or not a parsable long
 */
protected static final long getFlowExecutionId(State jobState) {
  // No default is supplied for this property, unlike the job execution id.
  final String rawFlowExecutionId = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
  return Long.parseLong(rawFlowExecutionId);
}

protected final String getJobGroup(State jobState) {
/**
 * Reads the job group recorded on a job state.
 *
 * @param jobState state to read from
 * @return the property stored under {@code TimingEvent.FlowEventConstants.JOB_GROUP_FIELD}
 */
protected static final String getJobGroup(State jobState) {
  final String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
  return jobGroup;
}

protected final String getJobName(State jobState) {
/**
 * Reads the job name recorded on a job state.
 *
 * @param jobState state to read from
 * @return the property stored under {@code TimingEvent.FlowEventConstants.JOB_NAME_FIELD}
 */
protected static final String getJobName(State jobState) {
  final String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
  return jobName;
}

protected final long getJobExecutionId(State jobState) {
/**
 * Reads the job execution id recorded on a job state and parses it as a {@code long}.
 *
 * @param jobState state to read from
 * @return the parsed value of {@code TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD}
 * @throws NumberFormatException if the stored value is not a parsable long
 */
protected static final long getJobExecutionId(State jobState) {
  // "0" is passed to getProp as the fallback value for this property.
  final String rawJobExecutionId = jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0");
  return Long.parseLong(rawJobExecutionId);
}

Expand All @@ -183,7 +190,9 @@ protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> fl
return flowExecutionGroupings.stream().map(exec -> {
List<JobStatus> jobStatuses = ImmutableList.copyOf(asJobStatuses(exec.getJobStates().stream().sorted(
// rationalized order, to facilitate test assertions
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
Comparator.comparing(JobStatusRetriever::getJobGroup)
.thenComparing(JobStatusRetriever::getJobName)
.thenComparing(JobStatusRetriever::getJobExecutionId)
).collect(Collectors.toList())));
return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(), jobStatuses.iterator(),
getFlowStatusFromJobStatuses(dagManagerEnabled, jobStatuses.iterator()));
Expand All @@ -201,10 +210,8 @@ protected static class FlowExecutionJobStateGrouping {

protected List<FlowExecutionJobStateGrouping> groupByFlowExecutionAndRetainLatest(
String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
jobStatusStates.stream().collect(Collectors.groupingBy(
this::getFlowName,
Collectors.groupingBy(this::getFlowExecutionId)));
Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName = jobStatusStates.stream().collect(
Collectors.groupingBy(JobStatusRetriever::getFlowName, Collectors.groupingBy(JobStatusRetriever::getFlowExecutionId)));

return statesByFlowExecutionIdByName.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(flowNameEntry -> {
String flowName = flowNameEntry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import lombok.ToString;

import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.util.ConfigUtils;


Expand Down Expand Up @@ -79,6 +80,9 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean onlyAnnounceLeader;

@Getter
private final boolean isDagProcessingEngineEnabled;

@Getter
private final Config innerConfig;

Expand Down Expand Up @@ -124,5 +128,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
this.isTopologySpecFactoryEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
this.onlyAnnounceLeader = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER, false);
this.isDagProcessingEngineEnabled = ConfigUtils.getBoolean(config, DagProcessingEngine.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.InMemoryDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
Expand Down Expand Up @@ -168,16 +173,26 @@ public void configure(Binder binder) {

OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
OptionalBinder.newOptionalBinder(binder, FlowTriggerHandler.class);
OptionalBinder.newOptionalBinder(binder, DagProcFactory.class);
OptionalBinder.newOptionalBinder(binder, DagProcessingEngine.class);
OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
OptionalBinder.newOptionalBinder(binder, DagTaskStream.class);

if (serviceConfig.isMultiActiveSchedulerEnabled()) {
binder.bind(MultiActiveLeaseArbiter.class).to(MysqlMultiActiveLeaseArbiter.class);
binder.bind(FlowTriggerHandler.class);
if(serviceConfig.isDagProcessingEngineEnabled()) {
binder.bind(DagManagementStateStore.class).to(InMemoryDagManagementStateStore.class);
binder.bind(DagProcFactory.class).in(Singleton.class);
binder.bind(DagProcessingEngine.class).in(Singleton.class);
binder.bind(DagTaskStream.class).in(Singleton.class);
}
}

binder.bind(FlowConfigsResource.class);
binder.bind(FlowConfigsV2Resource.class);
binder.bind(FlowStatusResource.class);
binder.bind(FlowExecutionResource.class);

binder.bind(FlowConfigResourceLocalHandler.class);
binder.bind(FlowConfigV2ResourceLocalHandler.class);
binder.bind(FlowExecutionResourceLocalHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,20 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;


/**
* An implementation of Dag. Assumes that nodes have unique values. Nodes with duplicate values will produce
Expand Down Expand Up @@ -255,10 +262,12 @@ public static class DagNode<T> {
private T value;
//List of parent Nodes that are dependencies of this Node.
private List<DagNode<T>> parentNodes;
private DagNodeId id;

//Constructor
/**
 * Creates a node wrapping {@code value}.
 *
 * <p>The node id is derived from the job spec config, which only a {@link JobExecutionPlan}
 * payload provides. For any other payload type the id is left {@code null} instead of failing
 * with a {@link ClassCastException}, so {@code DagNode<T>} stays usable for arbitrary {@code T}.
 *
 * @param value the payload held by this node
 */
public DagNode(T value) {
  this.value = value;
  // DagNode is generic: an unconditional cast to JobExecutionPlan would throw for other payloads.
  if (value instanceof JobExecutionPlan) {
    this.id = createId(((JobExecutionPlan) value).getJobSpec().getConfig());
  }
}

public void addParentNode(DagNode<T> node) {
Expand All @@ -285,6 +294,32 @@ public boolean equals(Object o) {
public int hashCode() {
  // The node's hash is exactly the hash of the value it wraps.
  final T wrappedValue = this.getValue();
  return wrappedValue.hashCode();
}

/**
 * Builds the {@link DagNodeId} for a node from its job config.
 *
 * <p>Reads the flow group, flow name, flow execution id, job name and job group keys from
 * {@link ConfigurationKeys}, in that order.
 * NOTE(review): the Typesafe Config getters are expected to throw when a key is absent — confirm.
 *
 * @param jobConfig config of the job spec backing the node
 * @return an id composed of the five values read from {@code jobConfig}
 */
private static DagNodeId createId(Config jobConfig) {
  final String group = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
  final String name = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
  final long executionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
  // Job name is read before job group, but the DagNodeId constructor takes group first.
  final String nameOfJob = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
  final String groupOfJob = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);

  return new DagNodeId(group, name, executionId, groupOfJob, nameOfJob);
}

/**
 * Identifier of a dag node, made of the flow coordinates (group, name, execution id) and the
 * job coordinates (group, name). Accessors, equality/hashing and the all-args constructor are
 * generated by Lombok.
 */
@Getter
@EqualsAndHashCode
@AllArgsConstructor
public static class DagNodeId {
  // Field order is the argument order of the generated all-args constructor.
  String flowGroup;
  String flowName;
  long flowExecutionId;
  String jobGroup;
  String jobName;

  /**
   * @return the five components joined with {@code "_"}, in field declaration order
   */
  @Override
  public String toString() {
    final Joiner underscoreJoiner = Joiner.on("_");
    return underscoreJoiner.join(flowGroup, flowName, flowExecutionId, jobGroup, jobName);
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;

import com.typesafe.config.Config;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
import org.apache.gobblin.service.monitoring.event.JobStatusEvent;


/**
* Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
* and flow completion deadlines.
*
* <p>This interface only declares the operations; all behavior is supplied by implementations.
* NOTE(review): the {@code String} keys used throughout are presumably dag ids — confirm against the
* implementing class.
*/
@Alpha
public interface DagManagement {

/**
* Creates a {@link DagStateStore}.
* @param config configuration handed to the store
* @param topologySpecMap topology specs keyed by URI
* @return the created store
*/
DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap);
/**
* Deletes the job state of a dag node.
* @param dagId id of the dag the node is associated with
* @param dagNode node whose job state is deleted
*/
void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
/**
* Adds the job state of a dag node.
* @param dagId id of the dag the node is associated with
* @param dagNode node whose job state is added
*/
void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);

/**
* defines what to do when a job (dag node) finishes
* @param dagNode dag node that finished
* @return next set of DagNodes to run (a map to sets of nodes; presumably keyed by dag id — TODO confirm)
* @throws IOException declared for implementations; the failure conditions are not visible here
*/
Map<String, Set<Dag.DagNode<JobExecutionPlan>>> onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException;

/**
* submit next dag nodes to run
* @param dagId dag id for which next dag nodes to run
* @return a set of dag nodes that were submitted to run by this method
* @throws IOException declared for implementations; the failure conditions are not visible here
*/
// todo : maybe return just a set
Map<String, Set<Dag.DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException;
/**
* Removes a dag action from the store.
* @param dagIdToResume id of the dag whose action is removed (NOTE(review): the name mentions "resume"
* although any {@code flowActionType} can be passed — confirm intent)
* @param flowActionType type of the action to remove
* @throws IOException declared for implementations; the failure conditions are not visible here
*/
void removeDagActionFromStore(NewDagManager.DagId dagIdToResume, DagActionStore.FlowActionType flowActionType)
throws IOException;

// Event handlers, one per kind of event or action.
/** Handles a job status event. */
void handleJobStatusEvent(JobStatusEvent jobStatusEvent);
/** Handles a kill-flow event. */
void handleKillFlowEvent(KillFlowEvent killFlowEvent);
/** Handles a launch action taken from the dag action store. */
void handleLaunchFlowEvent(DagActionStore.DagAction launchAction);
/**
* Handles a resume-flow event.
* @throws IOException declared for implementations; the failure conditions are not visible here
*/
void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) throws IOException;

// Accessors for the state tracked by an implementation. Whether the returned collections are live
// views or copies is not specified here.
/** @return dags by {@code String} key */
Map<String, Dag<JobExecutionPlan>> getDags();
/** @return dags that are resuming, by {@code String} key */
Map<String, Dag<JobExecutionPlan>> getResumingDags();
/** @return mapping from a job (dag node) to its dag */
Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> getJobToDag();
/** @return dag nodes by {@code String} key */
Map<String, Dag.DagNode<JobExecutionPlan>> getDagNodes();
/** @return mapping from a dag key to a list of its jobs (dag nodes) */
Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> getDagToJobs();
/** @return mapping from a dag key to its SLA (unit not visible here — TODO confirm) */
Map<String, Long> getDagToSLA();
/** @return ids of failed dags */
Set<String> getFailedDagIds();
/** @return the state store for failed dags */
DagStateStore getFailedDagStateStore();
/** @return the dag state store */
DagStateStore getDagStateStore();


/**
* Sets this instance active.
* @throws IOException declared for implementations; the failure conditions are not visible here
*/
void setActive() throws IOException;
}
Loading

0 comments on commit 3845841

Please sign in to comment.