
[GOBBLIN-1947] Send WorkUnitChangeEvent when Helix tasks consistently fail #3832

@@ -17,10 +17,14 @@

package org.apache.gobblin.cluster;

import com.google.common.eventbus.Subscribe;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -30,6 +34,14 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import java.util.stream.Collectors;
import org.apache.gobblin.runtime.SourceDecorator;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -78,6 +90,8 @@
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.SerializationUtils;

import static org.apache.gobblin.util.JobLauncherUtils.*;


/**
* An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework.
@@ -131,7 +145,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
private final long workFlowExpiryTimeSeconds;
private final long helixJobStopTimeoutSeconds;
private final long helixWorkflowSubmissionTimeoutSeconds;
private Map<String, TaskConfig> workUnitToHelixConfig;
private Map<String, TaskConfig> helixIdTaskConfigMap;
private Retryer<Boolean> taskRetryer;

public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir,
@@ -185,7 +199,7 @@ public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixMana
this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository());

this.helixMetrics = helixMetrics;
this.workUnitToHelixConfig = new HashMap<>();
this.helixIdTaskConfigMap = new HashMap<>();
this.taskRetryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfException()
.withStopStrategy(StopStrategies.stopAfterAttempt(3)).build();
@@ -254,6 +268,76 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
}
}

/**
* The implementation of this method assumes that a work unit change should never remove work units without adding
* new ones, since that would cause starvation. It therefore processes a {@link WorkUnitChangeEvent} in two scenarios:
* 1. The event contains only old tasks and no new tasks: recalculate new work units through the Kafka source and
* pack them with a minimum container setting.
* 2. The event contains both valid old and new work units: respect that information and directly remove the old
* tasks and start the new work units.
*
* @param workUnitChangeEvent event posted via the EventBus specifying the old tasks to remove and the new tasks to run
* @throws InvocationTargetException
*/
@Override
@Subscribe
public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
throws InvocationTargetException {
log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
final JobState jobState = this.jobContext.getJobState();
List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
// Use old task Id to recalculate new work units
if(workUnits == null || workUnits.isEmpty()) {
Contributor: In which scenario will we have no new work units here?

Contributor Author: When we send the WorkUnitChangeEvent from YarnAutoScalingManager. The YARN service only has the Helix information, so it is not easy to pre-calculate the new work units, since that process needs the KafkaSource, which YARN isn't aware of.
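A minimal sketch of that autoscaling path, for illustration only — the `AutoScalerSketch` class and the `WorkUnitChangeEvent(List<String>, List<WorkUnit>)` constructor shape are assumptions, not code from this PR:

```java
import java.util.Arrays;
import java.util.Collections;

import com.google.common.eventbus.EventBus;

import org.apache.gobblin.stream.WorkUnitChangeEvent;

public class AutoScalerSketch {
  // A component that only knows Helix task ids (e.g. an autoscaler) posts the event
  // with no replacement work units, so handleWorkUnitChangeEvent falls back to
  // recalculateWorkUnit(...) and asks the KafkaSource to repartition.
  public static void requestRepartition(EventBus eventBus) {
    eventBus.post(new WorkUnitChangeEvent(
        Arrays.asList("helixTask_1", "helixTask_2"), // stuck Helix task ids
        Collections.emptyList()));                   // no precomputed new work units
  }
}
```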

workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
// If no new valid work units can be generated, dismiss the WorkUnitChangeEvent
if(workUnits == null || workUnits.isEmpty()) {
log.info("Not able to update work unit meaningfully, dismiss the WorkUnitChangeEvent");
return;
}
}

// Follow how AbstractJobLauncher handles work units to ensure consistent behaviour
WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
// For the streaming use case, this step may be necessary to find the dataset-specific namespace so that each workUnit
// can create staging and temp directories with the correctly determined shard path
workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
workUnitStream = this.processWorkUnitStream(workUnitStream, jobState);
workUnits = materializeWorkUnitList(workUnitStream);
try {
this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
this.addTasksToCurrentJob(workUnits);
} catch (Exception e) {
// TODO: emit an event to indicate there was an error handling this event, which may cause starvation
log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
workUnitChangeEvent.getOldTaskIds(), workUnits, e);
throw new InvocationTargetException(e);
Contributor: Did we test the behavior of throwing this exception? Are we able to catch it, fail the whole application, and restart directly? Or will it eventually fail silently and starve?

Also curious why we throw InvocationTargetException.

Contributor Author: Throwing the error actually won't fail the application, so we rely on the Retryer and the replanner (if the Retryer also fails) here. I've run the test for a week and didn't see any errors thrown, but I agree this may be hard to debug. I've tried restarting the whole workflow, but it's not very straightforward.

Contributor Author: As for the InvocationTargetException, it's actually inherited from the superclass, which was written by you :)
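For reference, a sketch of why the throw cannot fail the application when the handler is invoked through Guava's EventBus (which the @Subscribe annotation suggests): the bus catches whatever a subscriber throws and routes it to a SubscriberExceptionHandler, which by default only logs it. Wiring in an explicit handler — a hypothetical addition, not part of this PR — would at least surface the potential starvation:

```java
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.SubscriberExceptionHandler;

public class EventBusSketch {
  // Exceptions thrown by @Subscribe methods never propagate back to the poster; the
  // EventBus hands them to this handler instead, so the application keeps running.
  public static EventBus newLoudEventBus() {
    SubscriberExceptionHandler handler = (exception, context) ->
        System.err.println("Subscriber " + context.getSubscriber().getClass().getSimpleName()
            + " failed on event " + context.getEvent() + ": " + exception);
    return new EventBus(handler);
  }
}
```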

}
}

private List<WorkUnit> recalculateWorkUnit(List<String> oldHelixTaskIDs) {
JobState jobState = this.jobContext.getJobState();
Map<String, List<Integer>> filteredTopicPartition = new HashMap<>();
for(String id : oldHelixTaskIDs) {
WorkUnit workUnit = getWorkUnitFromStateStoreByHelixId(id);
String topicName = workUnit.getProp(KafkaSource.TOPIC_NAME);
List<Integer> partitions = filteredTopicPartition.getOrDefault(topicName,
new LinkedList<>());
partitions.addAll(KafkaUtils.getPartitions(workUnit).stream().map(KafkaPartition::getId).collect(Collectors.toList()));
filteredTopicPartition.put(topicName, partitions);
}
// If a topic contains fewer than 2 filtered partitions, it can't be split further, so remove it from the map
filteredTopicPartition.values().removeIf(list -> list == null || list.size() < 2);
if(filteredTopicPartition.isEmpty()) {
return new ArrayList<>();
}
KafkaSource<?, ?> source = (KafkaSource<?, ?>) ((SourceDecorator<?, ?>) this.jobContext.getSource()).getSource();
// TODO: use a smarter way to calculate the new work unit size instead of the current static approach of simply doubling it
int newWorkUnitSize = oldHelixTaskIDs.size() * 2;
return source.getWorkunitsForFilteredPartitions(jobState,
com.google.common.base.Optional.of(filteredTopicPartition), com.google.common.base.Optional.of(newWorkUnitSize));
}

@Override
protected void executeCancellation() {
if (this.jobSubmitted) {
@@ -275,31 +359,31 @@ protected void executeCancellation() {
}
}

protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) throws IOException, ExecutionException,
protected void removeTasksFromCurrentJob(List<String> helixTaskIdsToRemove) throws IOException, ExecutionException,
RetryException {
String jobName = this.jobContext.getJobId();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
for (String workUnitId : workUnitIdsToRemove) {
for (String helixTaskId : helixTaskIdsToRemove) {
taskRetryer.call(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
String taskId = workUnitToHelixConfig.get(workUnitId).getId();
String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
boolean remove =
HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, taskId, helixTaskDriver);
HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName, helixTaskId, helixTaskDriver);
if (remove) {
log.info(String.format("Removed helix task %s with gobblin task id %s from helix job %s:%s ", taskId,
log.info(String.format("Removed helix task %s with gobblin task id %s from helix job %s:%s ", helixTaskId,
workUnitId, helixWorkFlowName, jobName));
} else {
throw new IOException(
String.format("Cannot remove task %s from helix job %s:%s", workUnitId,
String.format("Cannot remove Helix task %s from helix job %s:%s", helixTaskId,
helixWorkFlowName, jobName));
}
return true;
}
});
deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner);
log.info(String.format("remove task state for %s in state store", workUnitId));
this.workUnitToHelixConfig.remove(workUnitId);
deleteWorkUnitFromStateStoreByHelixId(helixTaskId, stateSerDeRunner);
log.info(String.format("Removed task state for Helix task %s in state store", helixTaskId));
this.helixIdTaskConfigMap.remove(helixTaskId);
}
}
}
@@ -513,7 +597,7 @@ private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner stateSerDeRun
rawConfigMap.put(ConfigurationKeys.TASK_ID_KEY, workUnit.getId());
rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, "true");
TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
helixIdTaskConfigMap.put(taskConfig.getId(), taskConfig);
return taskConfig;
}

@@ -526,12 +610,36 @@ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, Map
taskConfigMap.put(workUnit.getId(), getTaskConfig(workUnit, stateSerDeRunner));
}

/**
* get a single {@link WorkUnit} (flattened) from state store.
*/
private WorkUnit getWorkUnitFromStateStoreByHelixId(String helixTaskId) {
String workUnitFilePath =
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
final StateStore stateStore;
Path workUnitFile = new Path(workUnitFilePath);
String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
stateStore = stateStores.getMwuStateStore();
} else {
stateStore = stateStores.getWuStateStore();
}
try {
return (WorkUnit) stateStore.get(storeName, fileName, workUnitId);
} catch (IOException ioException) {
log.error("Failed to fetch workUnit for helix task {} from path {}", helixTaskId, workUnitFilePath);
}
return null;
}

/**
* Delete a single {@link WorkUnit} (flattened) from state store.
*/
private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner stateSerDeRunner) {
private void deleteWorkUnitFromStateStoreByHelixId(String helixTaskId, ParallelRunner stateSerDeRunner) {
String workUnitFilePath =
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
Path workUnitFile = new Path(workUnitFilePath);
final String fileName = workUnitFile.getName();
final String storeName = workUnitFile.getParent().getName();
@@ -275,13 +275,14 @@ public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps)
Properties combinedProps = new Properties();
combinedProps.putAll(properties);
combinedProps.putAll(jobProps);

return new GobblinHelixJobLauncher(combinedProps,
GobblinHelixJobLauncher gobblinHelixJobLauncher = new GobblinHelixJobLauncher(combinedProps,
this.jobHelixManager,
this.appWorkDir,
this.metadataTags,
this.jobRunningMap,
Optional.of(this.helixMetrics));
this.eventBus.register(gobblinHelixJobLauncher);
return gobblinHelixJobLauncher;
}

public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) {
@@ -17,6 +17,7 @@

package org.apache.gobblin.cluster;

import com.google.common.eventbus.EventBus;
import java.io.File;
import java.util.Optional;
import java.util.Properties;
@@ -59,8 +60,8 @@ public void testBuildJobLauncher()
MutableJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
SchedulerService schedulerService = new SchedulerService(new Properties());
Path appWorkDir = new Path(TMP_DIR);
GobblinHelixJobScheduler jobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), Optional.empty(), null,
appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
GobblinHelixJobScheduler jobScheduler = new GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), Optional.empty(),
new EventBus("Test"), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
GobblinHelixJobLauncher jobLauncher = HelixRetriggeringJobCallable.buildJobLauncherForCentralizedMode(jobScheduler, getDummyJob());
String jobId = jobLauncher.getJobId();
Assert.assertNotNull(jobId);
@@ -244,10 +244,17 @@ public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,

Collection<KafkaTopic> topics;
if(filteredTopicPartition.isPresent()) {
// If filteredTopicPartition present, use it to construct the whitelist pattern while leave blacklist empty
topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
if(filteredTopicPartition.get().isEmpty()) {
// Return an empty list, as filteredTopicPartition is present but contains no valid entries
return new ArrayList<>();
} else {
// If filteredTopicPartition is present, use it to construct the whitelist pattern while leaving the blacklist empty
topics = this.kafkaConsumerClient.get()
.getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
}
} else {
// get topics based on job level config
topics = getValidTopics(getFilteredTopics(state), state);
}
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());
Expand All @@ -263,6 +270,8 @@ public String apply(KafkaTopic topic) {

int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
// No need to allocate more threads than the number of topics, but allocate at least 1
numOfThreads = Math.max(Math.min(numOfThreads, topics.size()), 1);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

@@ -277,7 +286,7 @@ public String apply(KafkaTopic topic) {
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

for (KafkaTopic topic : topics) {
LOG.info("Discovered topic " + topic);
LOG.info("Discovered topic {} with {} number of partitions", topic.getName(), topic.getPartitions().size());
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
@@ -343,6 +352,8 @@ public String apply(KafkaTopic topic) {
protected void populateClientPool(int count,
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory,
Config config) {
// Clear the pool first, as the clients within may already be closed
kafkaConsumerClientPool.clear();
for (int i = 0; i < count; i++) {
kafkaConsumerClientPool.offer(kafkaConsumerClientFactory.create(config));
}
@@ -25,6 +25,7 @@
import java.util.Map;

import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.hadoop.fs.Path;

Expand Down Expand Up @@ -135,6 +136,10 @@ public enum ContainerCapacityComputationStrategy {

private static final String NUM_CONTAINERS_EVENT_NAME = "NumContainers";

// Id appended to the task output directory to make it unique, to avoid multiple flush publishers attempting to move
// the same file. Made static and atomic because packing can happen multiple times simultaneously at runtime
private static AtomicInteger uniqueId = new AtomicInteger(0);

private final long packingStartTimeMillis;
private final double minimumContainerCapacity;
private final Optional<StateStoreBasedWatermarkStorage> watermarkStorage;
@@ -305,7 +310,8 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t

private Double getDefaultWorkUnitSize() {
return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) /
state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
}

/**
Expand Down Expand Up @@ -339,9 +345,6 @@ protected List<WorkUnit> squeezeMultiWorkUnits(List<MultiWorkUnit> multiWorkUnit
if (state.getPropAsBoolean(INDEXING_ENABLED, DEFAULT_INDEXING_ENABLED)) {
List<WorkUnit> indexedWorkUnitList = new ArrayList<>();

// id to append to the task output directory to make it unique to avoid multiple flush publishers
// attempting to move the same file.
int uniqueId = 0;
for (MultiWorkUnit mwu : multiWorkUnits) {
// Select a sample WU.
WorkUnit indexedWorkUnit = mwu.getWorkUnits().get(0);
@@ -355,7 +358,8 @@

// Need to make the task output directory unique to file move conflicts in the flush publisher.
String outputDir = state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR);
indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(outputDir, Integer.toString(uniqueId++)));
indexedWorkUnit.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR,
new Path(outputDir, Integer.toString(uniqueId.getAndIncrement())));
indexedWorkUnitList.add(indexedWorkUnit);
}
return indexedWorkUnitList;