[GOBBLIN-1947] Send WorkUnitChangeEvent when a Helix task consistently fails (apache#3832)

* Send WorkUnitChangeEvent when a Helix task consistently fails

* Make launcher and scheduler correctly process work unit change events

* Change back pack config key

* Correctly process workunit stream before run

* Only use Helix task map

* Update WorkUnitPreparator for job launcher

* Update log

* Use workunit id for state store
hanghangliu authored and arjun4084346 committed Jan 10, 2024
1 parent 7bc94d3 commit e285952
Showing 11 changed files with 239 additions and 62 deletions.
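For illustration only, a minimal sketch (not part of this commit) of the flow the change enables: when a Helix task is judged to be consistently failing, the detecting side posts a WorkUnitChangeEvent on the shared Guava EventBus, and the GobblinHelixJobLauncher registered on that bus handles it in handleWorkUnitChangeEvent below. The failure threshold, the posting site, and the WorkUnitChangeEvent constructor arguments shown here are assumptions inferred from the getters used in the diff.

import java.util.Arrays;
import java.util.List;

import com.google.common.eventbus.EventBus;

import org.apache.gobblin.stream.WorkUnitChangeEvent;

public class WorkUnitChangeEventSketch {
  private static final int MAX_CONSECUTIVE_FAILURES = 3; // assumed threshold, not from the diff

  public static void main(String[] args) {
    EventBus eventBus = new EventBus("GobblinHelixJobScheduler");
    // In the commit, the scheduler registers the launcher so that its @Subscribe-annotated
    // handleWorkUnitChangeEvent(...) receives posted events:
    //   eventBus.register(gobblinHelixJobLauncher);

    List<String> failingHelixTaskIds = Arrays.asList("helixTask_1", "helixTask_2");
    int consecutiveFailures = 3; // stand-in for the real failure bookkeeping

    if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
      // Constructor arguments assumed from getOldTaskIds()/getNewWorkUnits() in the diff:
      // passing no new work units asks the launcher to recalculate them from the Kafka source.
      eventBus.post(new WorkUnitChangeEvent(failingHelixTaskIds, null));
    }
  }
}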
@@ -17,10 +17,14 @@

package org.apache.gobblin.cluster;

import com.google.common.eventbus.Subscribe;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -30,6 +34,14 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import java.util.stream.Collectors;
import org.apache.gobblin.runtime.SourceDecorator;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -78,6 +90,8 @@
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.SerializationUtils;

import static org.apache.gobblin.util.JobLauncherUtils.*;


/**
* An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework.
@@ -131,7 +145,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
private final long workFlowExpiryTimeSeconds;
private final long helixJobStopTimeoutSeconds;
private final long helixWorkflowSubmissionTimeoutSeconds;
private Map<String, TaskConfig> workUnitToHelixConfig;
private Map<String, TaskConfig> helixIdTaskConfigMap;
private Retryer<Boolean> taskRetryer;

public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir,
@@ -185,7 +199,7 @@ public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixMana
this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository());

this.helixMetrics = helixMetrics;
this.workUnitToHelixConfig = new HashMap<>();
this.helixIdTaskConfigMap = new HashMap<>();
this.taskRetryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfException()
.withStopStrategy(StopStrategies.stopAfterAttempt(3)).build();
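As a side note, a small self-contained sketch of how the guava-retrying Retryer built above behaves when used later in removeTasksFromCurrentJob: the wrapped Callable is re-invoked on any exception and abandoned after three attempts, at which point call() surfaces a RetryException. The class name and the simulated failure are illustrative only.

import java.util.concurrent.atomic.AtomicInteger;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;

public class TaskRetryerSketch {
  public static void main(String[] args) throws Exception {
    Retryer<Boolean> taskRetryer = RetryerBuilder.<Boolean>newBuilder()
        .retryIfException()
        .withStopStrategy(StopStrategies.stopAfterAttempt(3))
        .build();

    AtomicInteger attempts = new AtomicInteger();
    try {
      taskRetryer.call(() -> {
        // Fail the first two attempts, succeed on the third.
        if (attempts.incrementAndGet() < 3) {
          throw new RuntimeException("transient Helix failure");
        }
        return true;
      });
    } catch (RetryException e) {
      // Only reached if all three attempts fail.
    }
    System.out.println("attempts = " + attempts.get()); // prints 3
  }
}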
@@ -254,6 +268,76 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
}
}

/**
* The implementation of this method assumes that a work unit change should never delete work units without adding
* new ones, which would cause starvation. It therefore processes a {@link WorkUnitChangeEvent} in two scenarios:
* 1. The event contains only old tasks and no new tasks: recalculate new work units through the Kafka source and
* pack them with a minimum container setting.
* 2. The event contains both valid old tasks and new work units: respect that information and directly remove the
* old tasks and start the new work units.
*
* @param workUnitChangeEvent event posted on the EventBus specifying old tasks to be removed and new tasks to be run
* @throws InvocationTargetException
*/
@Override
@Subscribe
public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
throws InvocationTargetException {
log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
final JobState jobState = this.jobContext.getJobState();
List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
// Use old task Id to recalculate new work units
if(workUnits == null || workUnits.isEmpty()) {
workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
// If no new valid work units can be generated, dismiss the WorkUnitChangeEvent
if(workUnits == null || workUnits.isEmpty()) {
log.info("Not able to update work unit meaningfully, dismiss the WorkUnitChangeEvent");
return;
}
}

// Follow how AbstractJobLauncher handles work units to ensure consistent behaviour
WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
// For the streaming use case, this may be a necessary step to find the dataset-specific namespace so that each
// workUnit can create staging and temp directories with the correct shard path
workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
workUnitStream = this.processWorkUnitStream(workUnitStream, jobState);
workUnits = materializeWorkUnitList(workUnitStream);
try {
this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
this.addTasksToCurrentJob(workUnits);
} catch (Exception e) {
// TODO: emit an event to indicate there was an error handling this event, which may cause starvation
log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
workUnitChangeEvent.getOldTaskIds(), workUnits, e);
throw new InvocationTargetException(e);
}
}

private List<WorkUnit> recalculateWorkUnit(List<String> oldHelixTaskIDs) {
JobState jobState = this.jobContext.getJobState();
Map<String, List<Integer>> filteredTopicPartition = new HashMap<>();
for(String id : oldHelixTaskIDs) {
WorkUnit workUnit = getWorkUnitFromStateStoreByHelixId(id);
String topicName = workUnit.getProp(KafkaSource.TOPIC_NAME);
List<Integer> partitions = filteredTopicPartition.getOrDefault(topicName,
new LinkedList<>());
partitions.addAll(KafkaUtils.getPartitions(workUnit).stream().map(KafkaPartition::getId).collect(Collectors.toList()));
filteredTopicPartition.put(topicName, partitions);
}
// If a topic contains fewer than 2 filtered partitions, it cannot be split further, so remove it from the map
filteredTopicPartition.values().removeIf(list -> list == null || list.size() < 2);
if(filteredTopicPartition.isEmpty()) {
return new ArrayList<>();
}
KafkaSource<?, ?> source = (KafkaSource<?, ?>) ((SourceDecorator<?, ?>) this.jobContext.getSource()).getSource();
// TODO: find a smarter way to calculate the new work unit size, replacing the current static approach of simply doubling
int newWorkUnitSize = oldHelixTaskIDs.size() * 2;
return source.getWorkunitsForFilteredPartitions(jobState,
com.google.common.base.Optional.of(filteredTopicPartition), com.google.common.base.Optional.of(newWorkUnitSize));
}
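A self-contained sketch of the grouping and filtering step performed above (topic names and partition ids invented for illustration): partitions from all failing tasks are grouped by topic, and any topic left with fewer than two partitions is dropped because it cannot be split further.

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class FilteredTopicPartitionSketch {
  public static void main(String[] args) {
    Map<String, List<Integer>> filteredTopicPartition = new HashMap<>();

    // Two failing tasks both read topic "pageviews"; a third reads only partition 7 of "clicks".
    addPartitions(filteredTopicPartition, "pageviews", Arrays.asList(0, 1));
    addPartitions(filteredTopicPartition, "pageviews", Arrays.asList(2, 3));
    addPartitions(filteredTopicPartition, "clicks", Arrays.asList(7));

    // Same filter as in recalculateWorkUnit: topics with fewer than 2 partitions cannot be split.
    filteredTopicPartition.values().removeIf(list -> list == null || list.size() < 2);

    System.out.println(filteredTopicPartition); // {pageviews=[0, 1, 2, 3]}
  }

  private static void addPartitions(Map<String, List<Integer>> map, String topic, List<Integer> partitions) {
    map.computeIfAbsent(topic, k -> new LinkedList<>()).addAll(partitions);
  }
}

In recalculateWorkUnit above, the surviving map is then handed to the Kafka source along with a target work unit count of twice the number of old tasks.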

@Override
protected void executeCancellation() {
if (this.jobSubmitted) {
@@ -275,31 +359,31 @@ protected void executeCancellation() {
}
}

protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) throws IOException, ExecutionException,
protected void removeTasksFromCurrentJob(List<String> helixTaskIdsToRemove) throws IOException, ExecutionException,
RetryException {
String jobName = this.jobContext.getJobId();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
for (String workUnitId : workUnitIdsToRemove) {
for (String helixTaskId : helixTaskIdsToRemove) {
taskRetryer.call(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
String taskId = workUnitToHelixConfig.get(workUnitId).getId();
String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
boolean remove =
HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, taskId, helixTaskDriver);
HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, helixTaskId, helixTaskDriver);
if (remove) {
log.info(String.format("Removed helix task %s with gobblin task id %s from helix job %s:%s ", taskId,
log.info(String.format("Removed helix task %s with gobblin task id %s from helix job %s:%s ", helixTaskId,
workUnitId, helixWorkFlowName, jobName));
} else {
throw new IOException(
String.format("Cannot remove task %s from helix job %s:%s", workUnitId,
String.format("Cannot remove Helix task %s from helix job %s:%s", helixTaskId,
helixWorkFlowName, jobName));
}
return true;
}
});
deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner);
log.info(String.format("remove task state for %s in state store", workUnitId));
this.workUnitToHelixConfig.remove(workUnitId);
deleteWorkUnitFromStateStoreByHelixId(helixTaskId, stateSerDeRunner);
log.info(String.format("Removed task state for Helix task %s in state store", helixTaskId));
this.helixIdTaskConfigMap.remove(helixTaskId);
}
}
}
@@ -513,7 +597,7 @@ private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner stateSerDeRun
rawConfigMap.put(ConfigurationKeys.TASK_ID_KEY, workUnit.getId());
rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, "true");
TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
helixIdTaskConfigMap.put(taskConfig.getId(), taskConfig);
return taskConfig;
}

@@ -526,12 +610,36 @@ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, Map
taskConfigMap.put(workUnit.getId(), getTaskConfig(workUnit, stateSerDeRunner));
}

/**
* Get a single {@link WorkUnit} (flattened) from state store.
*/
private WorkUnit getWorkUnitFromStateStoreByHelixId(String helixTaskId) {
String workUnitFilePath =
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
final StateStore stateStore;
Path workUnitFile = new Path(workUnitFilePath);
String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
stateStore = stateStores.getMwuStateStore();
} else {
stateStore = stateStores.getWuStateStore();
}
try {
return (WorkUnit) stateStore.get(storeName, fileName, workUnitId);
} catch (IOException ioException) {
log.error("Failed to fetch workUnit for helix task {} from path {}", helixTaskId, workUnitFilePath);
}
return null;
}

/**
* Delete a single {@link WorkUnit} (flattened) from state store.
*/
private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner stateSerDeRunner) {
private void deleteWorkUnitFromStateStoreByHelixId(String helixTaskId, ParallelRunner stateSerDeRunner) {
String workUnitFilePath =
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
Path workUnitFile = new Path(workUnitFilePath);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
@@ -275,13 +275,14 @@ public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps)
Properties combinedProps = new Properties();
combinedProps.putAll(properties);
combinedProps.putAll(jobProps);

return new GobblinHelixJobLauncher(combinedProps,
GobblinHelixJobLauncher gobblinHelixJobLauncher = new GobblinHelixJobLauncher(combinedProps,
this.jobHelixManager,
this.appWorkDir,
this.metadataTags,
this.jobRunningMap,
Optional.of(this.helixMetrics));
this.eventBus.register(gobblinHelixJobLauncher);
return gobblinHelixJobLauncher;
}

public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) {
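To illustrate why the new this.eventBus.register(gobblinHelixJobLauncher) call matters, here is a minimal, self-contained Guava EventBus example (the demo classes are stand-ins, not Gobblin types): only objects registered on the bus have their @Subscribe methods invoked, and dispatch is resolved by the runtime type of the posted event.

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class EventBusRegistrationSketch {
  // Stand-in for WorkUnitChangeEvent.
  static class DemoEvent {
    final String payload;
    DemoEvent(String payload) { this.payload = payload; }
  }

  // Stand-in for GobblinHelixJobLauncher.
  static class DemoLauncher {
    @Subscribe
    public void handle(DemoEvent event) {
      System.out.println("received: " + event.payload);
    }
  }

  public static void main(String[] args) {
    EventBus eventBus = new EventBus("SchedulerEventBus");
    DemoLauncher launcher = new DemoLauncher();

    eventBus.post(new DemoEvent("dropped"));   // no subscriber registered yet, becomes a dead event
    eventBus.register(launcher);               // mirrors this.eventBus.register(gobblinHelixJobLauncher)
    eventBus.post(new DemoEvent("delivered")); // prints "received: delivered"
  }
}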
@@ -17,6 +17,7 @@

package org.apache.gobblin.cluster;

import com.google.common.eventbus.EventBus;
import java.io.File;
import java.util.Optional;
import java.util.Properties;
@@ -59,8 +60,8 @@ public void testBuildJobLauncher()
MutableJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
SchedulerService schedulerService = new SchedulerService(new Properties());
Path appWorkDir = new Path(TMP_DIR);
GobblinHelixJobScheduler jobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), Optional.empty(), null,
appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
GobblinHelixJobScheduler jobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), Optional.empty(),
new EventBus("Test"), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
GobblinHelixJobLauncher jobLauncher = HelixRetriggeringJobCallable.buildJobLauncherForCentralizedMode(jobScheduler, getDummyJob());
String jobId = jobLauncher.getJobId();
Assert.assertNotNull(jobId);
@@ -244,10 +244,17 @@ public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,

Collection<KafkaTopic> topics;
if(filteredTopicPartition.isPresent()) {
// If filteredTopicPartition present, use it to construct the whitelist pattern while leave blacklist empty
topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
if(filteredTopicPartition.get().isEmpty()) {
// return an empty list as filteredTopicPartition is present but contains no valid entry
return new ArrayList<>();
} else {
// If filteredTopicPartition is present, use it to construct the whitelist patterns while leaving the blacklist empty
topics = this.kafkaConsumerClient.get()
.getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
}
} else {
// get topics based on job level config
topics = getValidTopics(getFilteredTopics(state), state);
}
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());
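A self-contained sketch (topic names invented) of the whitelist construction used in the branch above: each already-known topic name is compiled into a pattern and used as the whitelist while the blacklist stays empty, so only those topics survive discovery. It assumes plain alphanumeric topic names; names containing regex metacharacters would need quoting.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class WhitelistPatternSketch {
  public static void main(String[] args) {
    Set<String> filteredTopicNames = new HashSet<>(Arrays.asList("pageviews", "clicks"));

    // Same construction as in the diff: one compiled pattern per topic name, empty blacklist.
    List<Pattern> whitelist =
        filteredTopicNames.stream().map(Pattern::compile).collect(Collectors.toList());

    // Stand-in for the consumer client's topic discovery followed by whitelist filtering.
    List<String> discoveredTopics = Arrays.asList("pageviews", "clicks", "heartbeats");
    List<String> kept = discoveredTopics.stream()
        .filter(name -> whitelist.stream().anyMatch(p -> p.matcher(name).matches()))
        .collect(Collectors.toList());

    System.out.println(kept); // [pageviews, clicks]
  }
}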
@@ -263,6 +270,8 @@ public String apply(KafkaTopic topic) {

int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
// No need to allocate more threads than there are topics, but the minimum should be 1
numOfThreads = Math.max(Math.min(numOfThreads, topics.size()), 1);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

@@ -277,7 +286,7 @@ public String apply(KafkaTopic topic) {
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

for (KafkaTopic topic : topics) {
LOG.info("Discovered topic " + topic);
LOG.info("Discovered topic {} with {} number of partitions", topic.getName(), topic.getPartitions().size());
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
@@ -343,6 +352,8 @@ public String apply(KafkaTopic topic) {
protected void populateClientPool(int count,
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
Config config) {
// Clear the pool first, as the clients in it may already be closed
kafkaConsumerClientPool.clear();
for (int i = 0; i < count; i++) {
kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
}
@@ -25,6 +25,7 @@
import java.util.Map;

import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.hadoop.fs.Path;

@@ -135,6 +136,10 @@ public enum ContainerCapacityComputationStrategy {

private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";

// Id appended to the task output directory to make it unique, to avoid multiple flush publishers attempting to
// move the same file. It is static and atomic because packing can happen multiple times concurrently at runtime.
private static AtomicInteger uniqueId = new AtomicInteger(0);

private final long packingStartTimeMillis;
private final double minimumContainerCapacity;
private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
@@ -305,7 +310,8 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t

private Double getDefaultWorkUnitSize() {
return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) /
state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
}

/**
@@ -339,9 +345,6 @@ protected List<WorkUnit> squeezeMultiWorkUnits(List<MultiWorkUnit> multiWorkUnit
if (state.getPropAsBoolean(INDEXING_ENABLED, DEFAULT_INDEXING_ENABLED)) {
List<WorkUnit> indexedWorkUnitList = new ArrayList<>();

// id to append to the task output directory to make it unique to avoid multiple flush publishers
// attempting to move the same file.
int uniqueId = 0;
for (MultiWorkUnit mwu : multiWorkUnits) {
// Select a sample WU.
WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0);
@@ -355,7 +358,8 @@ protected List<WorkUnit> squeezeMultiWorkUnits(List<MultiWorkUnit> multiWorkUnit

// Need to make the task output directory unique to avoid file move conflicts in the flush publisher.
String outputDir = state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR);
indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(outputDir, Integer.toString(uniqueId++)));
indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR,
new Path(outputDir, Integer.toString(uniqueId.getAndIncrement())));
indexedWorkUnitList.add(indexedWorkUnit);
}
return indexedWorkUnitList;
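A small self-contained sketch of the design choice behind the static AtomicInteger above: because packing can run several times concurrently, a shared atomic counter guarantees that every squeezed work unit gets a distinct output-directory suffix, which the old method-local int uniqueId = 0 could not guarantee across invocations.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqueIdSketch {
  // Shared across all concurrent "packing" runs, like the packer's static field.
  private static final AtomicInteger uniqueId = new AtomicInteger(0);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      // Each submission models one work unit being assigned an output-directory suffix.
      futures.add(pool.submit((Callable<Integer>) uniqueId::getAndIncrement));
    }

    Set<Integer> suffixes = new HashSet<>();
    for (Future<Integer> f : futures) {
      suffixes.add(f.get());
    }
    pool.shutdown();
    System.out.println(suffixes.size()); // 8 distinct suffixes, no collisions
  }
}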
