[GOBBLIN-1947] Send workUnitChange event when helix task consistently fail #3832
Conversation
Codecov Report
Attention:
Additional details and impacted files

@@             Coverage Diff              @@
##             master    #3832      +/-   ##
============================================
- Coverage     47.53%   45.96%    -1.57%
+ Complexity    11035     2181     -8854
============================================
  Files          2156      416     -1740
  Lines         85377    18044    -67333
  Branches       9491     2199     -7292
============================================
- Hits          40581     8294    -32287
+ Misses        41099     8865    -32234
+ Partials       3697      885     -2812

View full report in Codecov by Sentry.
final JobState jobState = this.jobContext.getJobState();
List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
// Use old task Id to recalculate new work units
if (workUnits == null || workUnits.isEmpty()) {
In which scenario would we have no new work units here?
When we send the workUnitChangeEvent from YarnAutoScalingManager. The YARN service only has Helix-level information, so it is not easy to pre-calculate the new work units there, because that calculation needs the KafkaSource, which the YARN side isn't aware of.
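A minimal, self-contained sketch of that fallback path; the event and recalculation helper below are simplified stand-ins for illustration, not the actual Gobblin classes:

import java.util.Collections;
import java.util.List;

public class WorkUnitChangeFallbackSketch {

  // Simplified stand-in for Gobblin's WorkUnitChangeEvent; the real event
  // carries List<WorkUnit>, plain strings are used here to keep it runnable.
  static class WorkUnitChangeEvent {
    private final List<String> oldTaskIds;
    private final List<String> newWorkUnits;

    WorkUnitChangeEvent(List<String> oldTaskIds, List<String> newWorkUnits) {
      this.oldTaskIds = oldTaskIds;
      this.newWorkUnits = newWorkUnits;
    }

    List<String> getOldTaskIds() { return oldTaskIds; }
    List<String> getNewWorkUnits() { return newWorkUnits; }
  }

  static List<String> resolveWorkUnits(WorkUnitChangeEvent event) {
    List<String> workUnits = event.getNewWorkUnits();
    if (workUnits == null || workUnits.isEmpty()) {
      // The YARN side only knows Helix task ids, so the launcher recomputes the
      // work units itself (via the source, e.g. KafkaSource, in the real code).
      workUnits = recomputeFromOldTasks(event.getOldTaskIds());
    }
    return workUnits;
  }

  // Hypothetical placeholder for the source-specific recalculation.
  static List<String> recomputeFromOldTasks(List<String> oldTaskIds) {
    return Collections.emptyList();
  }
}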
// todo: emit some event to indicate there is an error handling this event that may cause starvation
log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
    workUnitChangeEvent.getOldTaskIds(), workUnits, e);
throw new InvocationTargetException(e);
Did we test the behavior when this exception is thrown? Are we able to catch it, fail the whole application, and restart directly? Or will it eventually fail silently and starve?
Also curious: why do we throw InvocationTargetException?
Throwing the error actually won't fail the application, so we rely on the Retryer here, and on the replanner if the Retryer also fails. I've run the test for a week and didn't see any errors thrown, but I agree this may be hard to debug.
I've tried restarting the whole workflow, but it's not very straightforward.
For the InvocationTargetException, the signature is actually inherited from the super class, which was written by you :)
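For context on the Retryer mentioned above, here is a minimal sketch of wrapping a removal call in a guava-retrying Retryer; the attempt count and the body of the callable are illustrative assumptions, not the values used in this PR:

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public class TaskRemovalRetrySketch {

  public static void main(String[] args) throws ExecutionException, RetryException {
    // Retry the removal on any exception, giving up after 3 attempts (illustrative).
    Retryer<Boolean> taskRetryer = RetryerBuilder.<Boolean>newBuilder()
        .retryIfException()
        .withStopStrategy(StopStrategies.stopAfterAttempt(3))
        .build();

    String helixTaskId = "helixTask_0"; // hypothetical task id
    taskRetryer.call(new Callable<Boolean>() {
      @Override
      public Boolean call() {
        // In the real code this would delete the Helix task / work-unit state.
        System.out.println("removing " + helixTaskId);
        return true;
      }
    });
    // If every attempt fails, call() throws RetryException; the launcher then
    // surfaces the failure (wrapped) instead of failing the whole application.
  }
}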
RetryException {
  String jobName = this.jobContext.getJobId();
  try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
    for (String workUnitId : workUnitIdsToRemove) {
    for (String helixTaskId : helixTaskIdsToRemove) {
      String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
      taskRetryer.call(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          String taskId = workUnitToHelixConfig.get(workUnitId).getId();
do you still need this?
removed
@@ -514,6 +599,7 @@ private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner stateSerDeRun
  rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, "true");
  TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
  workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
Same here, do we still need this? I think you want to use the Helix task id instead of the work-unit id here.
Basically, helixIdTaskConfigMap and workUnitToHelixConfig seem similar; can we just use one of them to reduce complexity?
removed workUnitToHelixConfig
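A small sketch of that consolidation, assuming the Helix TaskConfig's config map still carries the work-unit id under the task-id key; plain maps and literal keys stand in for the Helix and Gobblin types:

import java.util.HashMap;
import java.util.Map;

public class SingleTaskMapSketch {

  public static void main(String[] args) {
    // One map keyed by Helix task id; "task.id" stands in for ConfigurationKeys.TASK_ID_KEY.
    Map<String, Map<String, String>> helixIdTaskConfigMap = new HashMap<>();

    Map<String, String> configMap = new HashMap<>();
    configMap.put("task.id", "workunit_123"); // hypothetical work-unit id
    helixIdTaskConfigMap.put("helixTask_0", configMap);

    // The work-unit id is recovered from the single map instead of keeping a
    // second workUnitToHelixConfig map keyed by work-unit id.
    String workUnitId = helixIdTaskConfigMap.get("helixTask_0").get("task.id");
    System.out.println(workUnitId);
  }
}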
Boolean canCleanUp = this.canCleanStagingData(this.jobContext.getJobState());
workUnitStream = closer.register(new DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter))
    .executeHandlers(workUnitStream);
this.canCleanUpStagingData = this.canCleanStagingData(this.jobContext.getJobState());
Is this from another change? It seems unrelated to this PR.
This is actually needed and is easily overlooked. It's the step that makes sure the work unit can handle shards for the target directory. It took me a while to figure out and debug when testing in the cluster...
I'm a little confused about the issue we are trying to solve here. Can you add a comment in the code to explain it?
+1
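A sketch of the kind of inline comment being asked for, attached to the lines from the diff above; the wording is only a suggestion based on the explanation in this thread:

// The DestinationDatasetHandlerService pass is still required when work units are
// regenerated at runtime: it prepares each work unit to handle shards for the
// target directory, and skipping it only surfaces as failures in the cluster.
Boolean canCleanUp = this.canCleanStagingData(this.jobContext.getJobState());
workUnitStream = closer.register(new DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter))
    .executeHandlers(workUnitStream);
this.canCleanUpStagingData = this.canCleanStagingData(this.jobContext.getJobState());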
… fail (apache#3832)
* Send WorkUnitChangeEvent when helix task consistently fail
* make launcher and scheduler correctly process work unit change event
* change back pack config key
* correctly process workunit stream before run
* only use helix task map
* update WorkUnitPreparator for job launcher
* update log
* use workunit id for state store
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
When YarnAutoScalingManager detects that a Helix task consistently fails, this change adds an option to send a WorkUnitChangeEvent so that GobblinHelixJobLauncher can handle the event and split the work unit at runtime. This helps resolve consistently failing containers (e.g. due to OOM) during runtime instead of relying on the replanner to restart the whole pipeline.
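A high-level sketch of the flow described above; the event-bus wiring, failure threshold, and class shapes are simplified assumptions for illustration, not the actual Gobblin wiring:

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.util.Collections;
import java.util.List;

public class WorkUnitChangeFlowSketch {

  // Simplified stand-in for the WorkUnitChangeEvent, carrying only old task ids;
  // the launcher recalculates and splits the work units when it receives it.
  static class WorkUnitChangeEvent {
    final List<String> oldTaskIds;
    WorkUnitChangeEvent(List<String> oldTaskIds) { this.oldTaskIds = oldTaskIds; }
  }

  // Stand-in for the launcher side: handle the event and split the work unit.
  static class LauncherSide {
    @Subscribe
    public void handleWorkUnitChangeEvent(WorkUnitChangeEvent event) {
      System.out.println("splitting work units for failed tasks: " + event.oldTaskIds);
    }
  }

  public static void main(String[] args) {
    EventBus eventBus = new EventBus();
    eventBus.register(new LauncherSide());

    // Stand-in for the autoscaler side: once a task has failed more consecutive
    // runs than a threshold, emit the event instead of waiting for a full replan.
    int consecutiveFailures = 4; // hypothetical observed count
    int failureThreshold = 3;    // hypothetical config value
    if (consecutiveFailures > failureThreshold) {
      eventBus.post(new WorkUnitChangeEvent(Collections.singletonList("helixTask_0")));
    }
  }
}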
Tests
Updated test cases and tested in the cluster
Commits