diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index f0fcc258f24..15fff870b60 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -17,10 +17,14 @@ package org.apache.gobblin.cluster; +import com.google.common.eventbus.Subscribe; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -30,6 +34,14 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.apache.gobblin.runtime.SourceDecorator; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils; +import org.apache.gobblin.source.workunit.BasicWorkUnitStream; +import org.apache.gobblin.source.workunit.WorkUnitStream; +import org.apache.gobblin.stream.WorkUnitChangeEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -78,6 +90,8 @@ import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.util.SerializationUtils; +import static org.apache.gobblin.util.JobLauncherUtils.*; + /** * An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework. @@ -131,7 +145,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final long workFlowExpiryTimeSeconds; private final long helixJobStopTimeoutSeconds; private final long helixWorkflowSubmissionTimeoutSeconds; - private Map workUnitToHelixConfig; + private Map helixIdTaskConfigMap; private Retryer taskRetryer; public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir, @@ -185,7 +199,7 @@ public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixMana this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository()); this.helixMetrics = helixMetrics; - this.workUnitToHelixConfig = new HashMap<>(); + this.helixIdTaskConfigMap = new HashMap<>(); this.taskRetryer = RetryerBuilder.newBuilder() .retryIfException() .withStopStrategy(StopStrategies.stopAfterAttempt(3)).build(); @@ -254,6 +268,76 @@ protected void runWorkUnits(List workUnits) throws Exception { } } + /** + * This implementation assumes that a work unit change never deletes work units without adding new + * ones, since doing so would cause starvation. Thus, it processes a {@link WorkUnitChangeEvent} for two scenarios: + * 1. workUnitChangeEvent contains only old tasks and no new tasks: recalculate new work units through the Kafka + * source and pack them with a minimum container setting. + * 2.
workUnitChangeEvent contains both valid old and new work units: respect that information and directly remove the + * old tasks and start the new work units + * + * @param workUnitChangeEvent Event posted via EventBus specifying the old tasks to be removed and the new tasks to be run + * @throws InvocationTargetException + */ + @Override + @Subscribe + public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent) + throws InvocationTargetException { + log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}", + workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits()); + final JobState jobState = this.jobContext.getJobState(); + List workUnits = workUnitChangeEvent.getNewWorkUnits(); + // Use old task IDs to recalculate new work units + if(workUnits == null || workUnits.isEmpty()) { + workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds()); + // If no new valid work units can be generated, dismiss the WorkUnitChangeEvent + if(workUnits == null || workUnits.isEmpty()) { + log.info("Not able to update work units meaningfully, dismissing the WorkUnitChangeEvent"); + return; + } + } + + // Follow how AbstractJobLauncher handles work units to ensure consistent behavior + WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build(); + // For the streaming use case, this may be a necessary step to find the dataset-specific namespace so that each workUnit + // can create staging and temp directories under the correct shard path + workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService); + workUnitStream = this.processWorkUnitStream(workUnitStream, jobState); + workUnits = materializeWorkUnitList(workUnitStream); + try { + this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds()); + this.addTasksToCurrentJob(workUnits); + } catch (Exception e) { + //TODO: emit an event to indicate there was an error handling this event, which may cause starvation + log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.", + workUnitChangeEvent.getOldTaskIds(), workUnits, e); + throw new InvocationTargetException(e); + } + } + + private List recalculateWorkUnit(List oldHelixTaskIDs) { + JobState jobState = this.jobContext.getJobState(); + Map> filteredTopicPartition = new HashMap<>(); + for(String id : oldHelixTaskIDs) { + WorkUnit workUnit = getWorkUnitFromStateStoreByHelixId(id); + String topicName = workUnit.getProp(KafkaSource.TOPIC_NAME); + List partitions = filteredTopicPartition.getOrDefault(topicName, + new LinkedList<>()); + partitions.addAll(KafkaUtils.getPartitions(workUnit).stream().map(KafkaPartition::getId).collect(Collectors.toList())); + filteredTopicPartition.put(topicName, partitions); + } + // If a topic contains fewer than 2 filtered partitions, it cannot be split further, so remove it from the map + filteredTopicPartition.values().removeIf(list -> list == null || list.size() < 2); + if(filteredTopicPartition.isEmpty()) { + return new ArrayList<>(); + } + KafkaSource source = (KafkaSource) ((SourceDecorator) this.jobContext.getSource()).getSource(); + //TODO: find a smarter way to calculate the new work unit size instead of the current static approach of simply doubling + int newWorkUnitSize = oldHelixTaskIDs.size() * 2; + return source.getWorkunitsForFilteredPartitions(jobState, + com.google.common.base.Optional.of(filteredTopicPartition), com.google.common.base.Optional.of(newWorkUnitSize)); + } + @Override protected void executeCancellation() { if
(this.jobSubmitted) { @@ -275,31 +359,31 @@ protected void executeCancellation() { } } - protected void removeTasksFromCurrentJob(List workUnitIdsToRemove) throws IOException, ExecutionException, + protected void removeTasksFromCurrentJob(List helixTaskIdsToRemove) throws IOException, ExecutionException, RetryException { String jobName = this.jobContext.getJobId(); try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) { - for (String workUnitId : workUnitIdsToRemove) { + for (String helixTaskId : helixTaskIdsToRemove) { taskRetryer.call(new Callable() { @Override public Boolean call() throws Exception { - String taskId = workUnitToHelixConfig.get(workUnitId).getId(); + String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY); boolean remove = - HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, taskId, helixTaskDriver); + HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, helixTaskId, helixTaskDriver); if (remove) { - log.info(String.format("Removed helix task %s with gobblin task id %s from helix job %s:%s ", taskId, + log.info(String.format("Removed helix task %s with gobblin task id %s from helix job %s:%s ", helixTaskId, workUnitId, helixWorkFlowName, jobName)); } else { throw new IOException( - String.format("Cannot remove task %s from helix job %s:%s", workUnitId, + String.format("Cannot remove Helix task %s from helix job %s:%s", helixTaskId, helixWorkFlowName, jobName)); } return true; } }); - deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner); - log.info(String.format("remove task state for %s in state store", workUnitId)); - this.workUnitToHelixConfig.remove(workUnitId); + deleteWorkUnitFromStateStoreByHelixId(helixTaskId, stateSerDeRunner); + log.info(String.format("Removed task state for Helix task %s in state store", helixTaskId)); + this.helixIdTaskConfigMap.remove(helixTaskId); } } } @@ -513,7 +597,7 @@ private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner stateSerDeRun rawConfigMap.put(ConfigurationKeys.TASK_ID_KEY, workUnit.getId()); rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, "true"); TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap); - workUnitToHelixConfig.put(workUnit.getId(), taskConfig); + helixIdTaskConfigMap.put(taskConfig.getId(), taskConfig); return taskConfig; } @@ -526,12 +610,36 @@ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, Map taskConfigMap.put(workUnit.getId(), getTaskConfig(workUnit, stateSerDeRunner)); } + /** + * get a single {@link WorkUnit} (flattened) from state store. 
+ */ + private WorkUnit getWorkUnitFromStateStoreByHelixId(String helixTaskId) { + String workUnitFilePath = + helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH); + final StateStore stateStore; + Path workUnitFile = new Path(workUnitFilePath); + String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY); + final String fileName = workUnitFile.getName(); + final String storeName = workUnitFile.getParent().getName(); + if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) { + stateStore = stateStores.getMwuStateStore(); + } else { + stateStore = stateStores.getWuStateStore(); + } + try { + return (WorkUnit) stateStore.get(storeName, fileName, workUnitId); + } catch (IOException ioException) { + log.error("Failed to fetch workUnit for helix task {} from path {}", helixTaskId, workUnitFilePath); + } + return null; + } + /** * Delete a single {@link WorkUnit} (flattened) from state store. */ - private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner stateSerDeRunner) { + private void deleteWorkUnitFromStateStoreByHelixId(String helixTaskId, ParallelRunner stateSerDeRunner) { String workUnitFilePath = - workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH); + helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH); Path workUnitFile = new Path(workUnitFilePath); final String fileName = workUnitFile.getName(); final String storeName = workUnitFile.getParent().getName(); diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index d30554d2bb3..4049ee3d14e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -275,13 +275,14 @@ public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps) Properties combinedProps = new Properties(); combinedProps.putAll(properties); combinedProps.putAll(jobProps); - - return new GobblinHelixJobLauncher(combinedProps, + GobblinHelixJobLauncher gobblinHelixJobLauncher = new GobblinHelixJobLauncher(combinedProps, this.jobHelixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap, Optional.of(this.helixMetrics)); + this.eventBus.register(gobblinHelixJobLauncher); + return gobblinHelixJobLauncher; } public Future scheduleJobImmediately(Properties jobProps, JobListener jobListener) { diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java index 779e7dd0dd1..f2572d18f17 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.cluster; +import com.google.common.eventbus.EventBus; import java.io.File; import java.util.Optional; import java.util.Properties; @@ -59,8 +60,8 @@ public void testBuildJobLauncher() MutableJobCatalog jobCatalog = new NonObservingFSJobCatalog(config); SchedulerService schedulerService = new SchedulerService(new Properties()); Path appWorkDir = new Path(TMP_DIR); - GobblinHelixJobScheduler 
jobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), Optional.empty(), null, - appWorkDir, Lists.emptyList(), schedulerService, jobCatalog); + GobblinHelixJobScheduler jobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), Optional.empty(), + new EventBus("Test"), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog); GobblinHelixJobLauncher jobLauncher = HelixRetriggeringJobCallable.buildJobLauncherForCentralizedMode(jobScheduler, getDummyJob()); String jobId = jobLauncher.getJobId(); Assert.assertNotNull(jobId); diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 80ef8c09d5e..3eb0967658a 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -244,10 +244,17 @@ public List getWorkunitsForFilteredPartitions(SourceState state, Collection topics; if(filteredTopicPartition.isPresent()) { - // If filteredTopicPartition present, use it to construct the whitelist pattern while leave blacklist empty - topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(), - filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList())); + if(filteredTopicPartition.get().isEmpty()) { + // Return an empty list as filteredTopicPartition is present but contains no valid entry + return new ArrayList<>(); + } else { + // If filteredTopicPartition is present, use it to construct the whitelist pattern while leaving the blacklist empty + topics = this.kafkaConsumerClient.get() + .getFilteredTopics(Collections.emptyList(), + filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList())); + } } else { + // Get topics based on job-level config topics = getValidTopics(getFilteredTopics(state), state); } this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet()); @@ -263,6 +270,8 @@ public String apply(KafkaTopic topic) { int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT); + // No need to allocate more threads than the number of topics, but the minimum should be 1 + numOfThreads = Math.max(Math.min(numOfThreads, topics.size()), 1); ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG))); @@ -277,7 +286,7 @@ public String apply(KafkaTopic topic) { Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted(); for (KafkaTopic topic : topics) { - LOG.info("Discovered topic " + topic); + LOG.info("Discovered topic {} with {} partitions", topic.getName(), topic.getPartitions().size()); if (topic.getTopicSpecificState().isPresent()) { topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State()) .addAllIfNotExist(topic.getTopicSpecificState().get()); @@ -343,6 +352,8 @@ public String apply(KafkaTopic topic) { protected void populateClientPool(int count, GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory, Config config) { + // Clear the pool first as the clients within may already be closed + kafkaConsumerClientPool.clear(); for (int i = 0; i < count; i++)
{ kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config)); } diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java index 9c22d47cc36..764eeae0988 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.apache.hadoop.fs.Path; @@ -135,6 +136,10 @@ public enum ContainerCapacityComputationStrategy { private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers"; + // Id to append to the task output directory to make it unique, to avoid multiple flush publishers attempting to move + // the same file. Make it static and atomic because packing can happen multiple times concurrently at runtime + private static AtomicInteger uniqueId = new AtomicInteger(0); + private final long packingStartTimeMillis; private final double minimumContainerCapacity; private final Optional watermarkStorage; @@ -305,7 +310,8 @@ private void addStatsToWorkUnits(Map> workUnitsByTopic) t private Double getDefaultWorkUnitSize() { return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY, - KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER); + KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / + state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER); } /** @@ -339,9 +345,6 @@ protected List squeezeMultiWorkUnits(List multiWorkUnit if (state.getPropAsBoolean(INDEXING_ENABLED, DEFAULT_INDEXING_ENABLED)) { List indexedWorkUnitList = new ArrayList<>(); - // id to append to the task output directory to make it unique to avoid multiple flush publishers - // attempting to move the same file. - int uniqueId = 0; for (MultiWorkUnit mwu : multiWorkUnits) { // Select a sample WU. WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0); @@ -355,7 +358,8 @@ protected List squeezeMultiWorkUnits(List multiWorkUnit // Need to make the task output directory unique to file move conflicts in the flush publisher.
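// uniqueId below refers to the class-level static AtomicInteger introduced above; getAndIncrement() keeps the
// generated sub-directory suffixes unique even when several packings run concurrently, which the removed
// method-local counter (restarting at 0 on each squeezeMultiWorkUnits call) could not guarantee.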
String outputDir = state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR); - indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(outputDir, Integer.toString(uniqueId++))); + indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, + new Path(outputDir, Integer.toString(uniqueId.getAndIncrement()))); indexedWorkUnitList.add(indexedWorkUnit); } return indexedWorkUnitList; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index bf86e628e45..fbd4294deec 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -32,6 +32,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +147,9 @@ public abstract class AbstractJobLauncher implements JobLauncher { // This contains all job context information protected final JobContext jobContext; + // Helper to prepare WorkUnit with necessary information. This final object can make sure the uniqueness of task IDs + protected final WorkUnitPreparator workUnitPreparator; + // This (optional) JobLock is used to prevent the next scheduled run // of the job from starting if the current run has not finished yet protected Optional jobLockOptional = Optional.absent(); @@ -171,6 +176,7 @@ public abstract class AbstractJobLauncher implements JobLauncher { protected final EventSubmitter eventSubmitter; // This is for dispatching events related to job launching and execution to registered subscribers + @Getter protected final EventBus eventBus = new EventBus(AbstractJobLauncher.class.getSimpleName()); // A list of JobListeners that will be injected into the user provided JobListener @@ -183,6 +189,9 @@ public abstract class AbstractJobLauncher implements JobLauncher { protected final GobblinJobMetricReporter gobblinJobMetricsReporter; + protected Boolean canCleanUpStagingData = false; + protected DestinationDatasetHandlerService destDatasetHandlerService; + public AbstractJobLauncher(Properties jobProps, List> metadataTags) throws Exception { this(jobProps, metadataTags, null); @@ -221,6 +230,7 @@ public AbstractJobLauncher(Properties jobProps, List> metadataT this.jobContext = new JobContext(this.jobProps, LOG, instanceBroker, troubleshooter.getIssueRepository()); this.eventBus.register(this.jobContext); + this.workUnitPreparator = new WorkUnitPreparator(this.jobContext.getJobId()); this.cancellationExecutor = Executors.newSingleThreadExecutor( ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG), Optional.of("CancellationExecutor"))); @@ -507,9 +517,10 @@ public void apply(JobListener jobListener, JobContext jobContext) } // Perform work needed before writing is done - Boolean canCleanUp = this.canCleanStagingData(this.jobContext.getJobState()); - workUnitStream = closer.register(new DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter)) - .executeHandlers(workUnitStream); + this.canCleanUpStagingData = this.canCleanStagingData(this.jobContext.getJobState()); + this.destDatasetHandlerService = new DestinationDatasetHandlerService(jobState, canCleanUpStagingData, this.eventSubmitter); + closer.register(this.destDatasetHandlerService); + workUnitStream = 
this.executeHandlers(workUnitStream, this.destDatasetHandlerService); //Initialize writer and converter(s) closer.register(WriterInitializerFactory.newInstace(jobState, workUnitStream)).initialize(); @@ -541,19 +552,7 @@ public void apply(JobListener jobListener, JobContext jobContext) TimingEvent workUnitsPreparationTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION); - // Add task ids - workUnitStream = prepareWorkUnits(workUnitStream, jobState); - // Remove skipped workUnits from the list of work units to execute. - workUnitStream = workUnitStream.filter(new SkippedWorkUnitsFilter(jobState)); - // Add surviving tasks to jobState - workUnitStream = workUnitStream.transform(new MultiWorkUnitForEach() { - @Override - public void forWorkUnit(WorkUnit workUnit) { - jobState.incrementTaskCount(); - jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState))); - } - }); - + workUnitStream = processWorkUnitStream(workUnitStream, jobState); // If it is a streaming source, workunits cannot be counted this.jobContext.getJobState().setProp(NUM_WORKUNITS, workUnitStream.isSafeToMaterialize() ? workUnitStream.getMaterializedWorkUnitCollection().size() : 0); @@ -711,6 +710,26 @@ private void executeUnfinishedCommitSequences(String jobName) } } + protected WorkUnitStream executeHandlers (WorkUnitStream workUnitStream, DestinationDatasetHandlerService datasetHandlerService){ + return datasetHandlerService.executeHandlers(workUnitStream); + } + + protected WorkUnitStream processWorkUnitStream(WorkUnitStream workUnitStream, JobState jobState) { + // Add task ids + workUnitStream = prepareWorkUnits(workUnitStream); + // Remove skipped workUnits from the list of work units to execute. + workUnitStream = workUnitStream.filter(new SkippedWorkUnitsFilter(jobState)); + // Add surviving tasks to jobState + workUnitStream = workUnitStream.transform(new MultiWorkUnitForEach() { + @Override + public void forWorkUnit(WorkUnit workUnit) { + jobState.incrementTaskCount(); + jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState))); + } + }); + return workUnitStream; + } + /** * Subclasses can override this method to do whatever processing on the {@link TaskState}s, * e.g., aggregate task-level metrics into job-level metrics. @@ -817,7 +836,7 @@ protected void runWorkUnitStream(WorkUnitStream workUnitStream) throws Exception /** * Materialize a {@link WorkUnitStream} into an in-memory list. Note that infinite work unit streams cannot be materialized. */ - private List materializeWorkUnitList(WorkUnitStream workUnitStream) { + protected List materializeWorkUnitList(WorkUnitStream workUnitStream) { if (!workUnitStream.isFiniteStream()) { throw new UnsupportedOperationException("Cannot materialize an infinite work unit stream."); } @@ -888,8 +907,8 @@ public void run() { /** * Prepare the flattened {@link WorkUnit}s for execution by populating the job and task IDs. 
*/ - private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits, JobState jobState) { - return workUnits.transform(new WorkUnitPreparator(this.jobContext.getJobId())); + private WorkUnitStream prepareWorkUnits(WorkUnitStream workUnits) { + return workUnits.transform(workUnitPreparator); } private static abstract class MultiWorkUnitForEach implements Function { @@ -912,13 +931,13 @@ public WorkUnit apply(WorkUnit input) { @RequiredArgsConstructor private static class WorkUnitPreparator extends MultiWorkUnitForEach { - private int taskIdSequence = 0; + private final AtomicInteger taskIdSequence = new AtomicInteger(0); private final String jobId; @Override protected void forWorkUnit(WorkUnit workUnit) { workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, this.jobId); - String taskId = JobLauncherUtils.newTaskId(this.jobId, this.taskIdSequence++); + String taskId = JobLauncherUtils.newTaskId(this.jobId, taskIdSequence.getAndIncrement()); workUnit.setId(taskId); workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, taskId); workUnit.setProp(ConfigurationKeys.TASK_KEY_KEY, Long.toString(Id.Task.parse(taskId).getSequence())); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java index 658b308b79a..8fbe8ac9385 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java @@ -296,7 +296,7 @@ public Optional getJobMetricsOptional() { * * @return an instance of the {@link Source} class specified in the job configuration */ - Source getSource() { + public Source getSource() { return this.source; } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java index f1e8928bee0..2dc167c0a46 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; +import lombok.Getter; import org.apache.gobblin.source.InfiniteSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,7 @@ public class SourceDecorator implements WorkUnitStreamSource, Decorator { private static final Logger LOG = LoggerFactory.getLogger(SourceDecorator.class); + @Getter private final Source source; private final String jobId; private final Logger logger; diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index c447af99ecc..21fc47c4a46 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -18,6 +18,7 @@ package org.apache.gobblin.yarn; import java.util.ArrayDeque; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -31,6 +32,7 @@ import java.util.stream.Collectors; import org.apache.commons.compress.utils.Sets; +import org.apache.gobblin.stream.WorkUnitChangeEvent; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -71,7 +73,10 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final String AUTO_SCALING_PREFIX = 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling."; private final String AUTO_SCALING_POLLING_INTERVAL_SECS = AUTO_SCALING_PREFIX + "pollingIntervalSeconds"; - private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20; + private final String TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX + "taskAttemptsThreshold"; + private final int DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD = 20; + private final String SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD = AUTO_SCALING_PREFIX + "splitWorkUnitReachThreshold"; + private final boolean DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD = false; private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60; // Only one container will be requested for each N partitions of work private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = AUTO_SCALING_PREFIX + "partitionsPerContainer"; @@ -85,6 +90,8 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay"; private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60; + private int taskAttemptsThreshold; + private final boolean splitWorkUnitReachThreshold; private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize"; @@ -125,6 +132,10 @@ public YarnAutoScalingManager(GobblinApplicationMaster appMaster) { GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG); this.defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); this.defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY); + this.taskAttemptsThreshold = ConfigUtils.getInt(this.config, TASK_NUMBER_OF_ATTEMPTS_THRESHOLD, + DEFAULT_TASK_NUMBER_OF_ATTEMPTS_THRESHOLD); + this.splitWorkUnitReachThreshold = ConfigUtils.getBoolean(this.config, SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD, + DEFAULT_SPLIT_WORKUNIT_REACH_ATTEMPTS_THRESHOLD); } @Override @@ -139,7 +150,7 @@ protected void startUp() { this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager), this.yarnService, this.partitionsPerContainer, this.overProvisionFactor, this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags, - this.defaultContainerMemoryMbs, this.defaultContainerCores), + this.defaultContainerMemoryMbs, this.defaultContainerCores, this.taskAttemptsThreshold, this.splitWorkUnitReachThreshold), initialDelay, scheduleInterval, TimeUnit.SECONDS); } @@ -166,6 +177,8 @@ static class YarnAutoScalingRunnable implements Runnable { private final String defaultHelixInstanceTags; private final int defaultContainerMemoryMbs; private final int defaultContainerCores; + private final int taskAttemptsThreshold; + private final boolean splitWorkUnitReachThreshold; /** * A static map that keep track of an idle instance and its latest beginning idle time. 
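The two auto-scaling keys introduced above are read with ConfigUtils and fall back to 20 attempts and false. A minimal sketch of setting and resolving them, assuming GOBBLIN_YARN_PREFIX expands to "gobblin.yarn." (the constant's value is not shown in this diff):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.util.ConfigUtils;

public class AutoScalingThresholdConfigSketch {
  public static void main(String[] args) {
    // Hypothetical job config; the literal "gobblin.yarn." prefix is an assumption.
    Config config = ConfigFactory.parseString(
        "gobblin.yarn.autoScaling.taskAttemptsThreshold=30\n"
            + "gobblin.yarn.autoScaling.splitWorkUnitReachThreshold=true");

    // Mirrors how YarnAutoScalingManager resolves the values, with the defaults from this patch.
    int taskAttemptsThreshold =
        ConfigUtils.getInt(config, "gobblin.yarn.autoScaling.taskAttemptsThreshold", 20);
    boolean splitWorkUnitReachThreshold =
        ConfigUtils.getBoolean(config, "gobblin.yarn.autoScaling.splitWorkUnitReachThreshold", false);

    System.out.println(taskAttemptsThreshold + " / " + splitWorkUnitReachThreshold);
  }
}
```

With splitWorkUnitReachThreshold left at its default of false, a task exceeding the threshold only triggers the warning log, matching the previous hard-coded THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING of 20.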
@@ -185,10 +198,17 @@ public void run() { } private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) { - if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) { + if (jobContext.getPartitionNumAttempts(partition) > taskAttemptsThreshold) { log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better", jobContext.getTaskIdForPartition(partition), jobContext.getPartitionNumAttempts(partition)); + if(splitWorkUnitReachThreshold) { + String helixTaskID = jobContext.getTaskIdForPartition(partition); + log.info("Sending WorkUnitChangeEvent to split helix task:{}", helixTaskID); + this.yarnService.getEventBus().post(new WorkUnitChangeEvent( + Collections.singletonList(helixTaskID), null)); + } } + if (!UNUSUAL_HELIX_TASK_STATES.contains(jobContext.getPartitionState(partition))) { return jobContext.getAssignedParticipant(partition); } diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java index e1da50d94ee..370b8ba87fe 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java @@ -134,6 +134,7 @@ public class YarnService extends AbstractIdleService { private final Config config; + @Getter private final EventBus eventBus; private final Configuration yarnConfiguration; diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java index 10563c2bbef..d21d530d7b5 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java @@ -78,7 +78,8 @@ public void testOneJob() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable.run(); ArgumentCaptor argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class); @@ -108,7 +109,8 @@ public void testTwoJobs() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable.run(); @@ -145,7 +147,8 @@ public void testTwoWorkflows() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable.run(); @@ -184,7 +187,8 @@ public void testNotInProgressOrBeingDeleted() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores); + 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable.run(); @@ -211,7 +215,8 @@ public void testMultiplePartitionsPerContainer() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 2, - 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable.run(); @@ -234,7 +239,8 @@ public void testOverprovision() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 1.2, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 1.2, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable1.run(); @@ -246,7 +252,8 @@ public void testOverprovision() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 0.1, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 0.1, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable2.run(); @@ -258,7 +265,8 @@ public void testOverprovision() { Mockito.reset(mockYarnService); YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 6.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 6.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable3.run(); @@ -384,7 +392,8 @@ public void testFlowsWithHelixTags() { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + 1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); runnable.run(); @@ -469,7 +478,8 @@ private static class TestYarnAutoScalingRunnable extends YarnAutoScalingManager. public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer, HelixDataAccessor helixDataAccessor) { super(taskDriver, yarnService, partitionsPerContainer, 1.0, - noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores); + noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, + defaultContainerCores, 20, false); } @Override
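Taken together, the patch wires work-unit splitting through Guava's EventBus: when a Helix task partition exceeds taskAttemptsThreshold and splitWorkUnitReachThreshold is enabled, YarnAutoScalingRunnable posts a WorkUnitChangeEvent carrying only the old Helix task id on the YarnService event bus, while GobblinHelixJobScheduler now registers each GobblinHelixJobLauncher on its event bus so that the launcher's handleWorkUnitChangeEvent can recalculate Kafka work units and swap the Helix tasks. A minimal sketch of that publish/subscribe handshake, using a stand-in subscriber and a made-up task id rather than the real launcher:

```java
import java.util.Collections;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.apache.gobblin.stream.WorkUnitChangeEvent;

public class WorkUnitChangeEventFlowSketch {

  // Stand-in for GobblinHelixJobLauncher#handleWorkUnitChangeEvent.
  static class SplitHandler {
    @Subscribe
    public void handleWorkUnitChangeEvent(WorkUnitChangeEvent event) {
      // The real launcher recalculates work units for the old tasks' topic partitions
      // (since no new work units are supplied), removes the old Helix tasks and adds the new ones.
      System.out.println("Splitting old Helix tasks: " + event.getOldTaskIds());
    }
  }

  public static void main(String[] args) {
    EventBus eventBus = new EventBus("Test");
    // GobblinHelixJobScheduler#buildJobLauncher now registers the launcher in this fashion.
    eventBus.register(new SplitHandler());
    // YarnAutoScalingRunnable posts with only old task ids; "helixTask_0" is a made-up id.
    eventBus.post(new WorkUnitChangeEvent(Collections.singletonList("helixTask_0"), null));
  }
}
```

Because the event carries no new work units, the handler falls into the recalculation path, which doubles the work-unit count for the affected topic partitions via KafkaSource.getWorkunitsForFilteredPartitions.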