Skip to content

Commit

Permalink
[GOBBLIN-1968] Temporal commit step integration (#3829)
Browse files Browse the repository at this point in the history
Add commit step to Gobblin temporal workflow for job publish
  • Loading branch information
Will-Lo authored Dec 15, 2023
1 parent cb36e2c commit 18fba9e
Show file tree
Hide file tree
Showing 10 changed files with 396 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(
* Data should be committed by the job if either {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY} is set to "full",
* or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
*/
private static boolean shouldCommitDataInJob(State state) {
public static boolean shouldCommitDataInJob(State state) {
boolean jobCommitPolicyIsFull =
JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
*/
@RequiredArgsConstructor
@Slf4j
final class SafeDatasetCommit implements Callable<Void> {
public final class SafeDatasetCommit implements Callable<Void> {

private static final Object GLOBAL_LOCK = new Object();

Expand Down Expand Up @@ -319,8 +319,8 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
datasetState.setState(JobState.RunningState.FAILED);
datasetState.incrementJobFailures();
Optional<String> taskStateException = taskState.getTaskFailureException();
log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
+ (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
log.warn("Failed task state for {} At least one task did not get committed successfully. Setting dataset state to FAILED. {}" ,
taskState.getWorkunit().getOutputFilePath(), taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.Callable;
Expand All @@ -29,10 +30,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -68,8 +66,6 @@
@Slf4j
public class TaskStateCollectorService extends AbstractScheduledService {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);

private final JobState jobState;

private final EventBus eventBus;
Expand Down Expand Up @@ -145,7 +141,7 @@ public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBu
throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe);
}
} else {
optionalTaskCollectorHandler = Optional.absent();
optionalTaskCollectorHandler = Optional.empty();
}

isJobProceedOnCollectorServiceFailure =
Expand All @@ -166,13 +162,13 @@ protected Scheduler scheduler() {

@Override
protected void startUp() throws Exception {
LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
log.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
super.startUp();
}

@Override
protected void shutDown() throws Exception {
LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
try {
runOneIteration();
} finally {
Expand All @@ -193,42 +189,14 @@ protected void shutDown() throws Exception {
* @throws IOException if it fails to collect the output {@link TaskState}s
*/
private void collectOutputTaskStates() throws IOException {
List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
@Override
public boolean apply(String input) {
return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
&& !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
}});

if (taskStateNames == null || taskStateNames.size() == 0) {
LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
final Optional<Queue<TaskState>> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads);
if (!taskStateQueue.isPresent()) {
return;
}

final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
for (final String taskStateName : taskStateNames) {
LOGGER.debug("Found output task state file " + taskStateName);
// Deserialize the TaskState and delete the file
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
taskStateQueue.add(taskState);
taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
return null;
}
}, "Deserialize state for " + taskStateName);
}
} catch (IOException ioe) {
LOGGER.warn("Could not read all task state files.");
}

LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));

// Add the TaskStates of completed tasks to the JobState so when the control
// returns to the launcher, it sees the TaskStates of all completed tasks.
for (TaskState taskState : taskStateQueue) {
for (TaskState taskState : taskStateQueue.get()) {
consumeTaskIssues(taskState);
taskState.setJobState(this.jobState);
this.jobState.addTaskState(taskState);
Expand All @@ -241,22 +209,68 @@ public Void call() throws Exception {
// Finish any additional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
if (optionalTaskCollectorHandler.isPresent()) {
LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks");

try {
optionalTaskCollectorHandler.get().handle(taskStateQueue);
optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
} catch (Throwable t) {
if (isJobProceedOnCollectorServiceFailure) {
log.error("Failed to commit dataset while job proceeds", t);
SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
} else {
throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
}
}
}

// Notify the listeners for the completion of the tasks
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
}

/**
* Reads in a {@link StateStore} and deserializes all task states found in the provided table name
* Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers)
* @param taskStateStore
* @param taskStateTableName
* @param numDeserializerThreads
* @return Queue of TaskStates, optional if no task states are found in the provided state store
* @throws IOException
*/
public static Optional<Queue<TaskState>> deserializeTaskStatesFromFolder(StateStore<TaskState> taskStateStore, String taskStateTableName,
int numDeserializerThreads) throws IOException {
List<String> taskStateNames = taskStateStore.getTableNames(taskStateTableName, new Predicate<String>() {
@Override
public boolean apply(String input) {
return input != null
&& input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
&& !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
}});

if (taskStateNames == null || taskStateNames.isEmpty()) {
log.warn("No output task state files found in " + taskStateTableName);
return Optional.empty();
}

final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
for (final String taskStateName : taskStateNames) {
log.debug("Found output task state file " + taskStateName);
// Deserialize the TaskState and delete the file
stateSerDeRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
TaskState taskState = taskStateStore.getAll(taskStateTableName, taskStateName).get(0);
taskStateQueue.add(taskState);
taskStateStore.delete(taskStateTableName, taskStateName);
return null;
}
}, "Deserialize state for " + taskStateName);
}
} catch (IOException ioe) {
log.error("Could not read all task state files due to", ioe);
}
log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), taskStateTableName));
return Optional.of(taskStateQueue);
}

/**
Expand All @@ -267,15 +281,15 @@ public Void call() throws Exception {
private void reportJobProgress(TaskState taskState) {
String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE);
if (stringSize == null) {
LOGGER.warn("Expected to report job progress but work unit byte size property null");
log.warn("Expected to report job progress but work unit byte size property null");
return;
}

Long taskByteSize = Long.parseLong(stringSize);

// If progress reporting is enabled, value should be present
if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
LOGGER.warn("Expected to report job progress but total bytes to copy property null");
log.warn("Expected to report job progress but total bytes to copy property null");
return;
}
this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
Expand All @@ -287,7 +301,7 @@ private void reportJobProgress(TaskState taskState) {
this.workUnitsCompletedSoFar += 1;

if (this.totalNumWorkUnits == 0) {
LOGGER.warn("Expected to report job progress but work units are not countable");
log.warn("Expected to report job progress but work units are not countable");
return;
}
newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
Expand All @@ -307,7 +321,7 @@ private void reportJobProgress(TaskState taskState) {
Map<String, String> progress = new HashMap<>();
progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));

LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%");
log.info("Sending copy progress event with percentage " + percentageToReport + "%");
new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;


/** Activity for reading the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} by
* reading in a {@link WUProcessingSpec} to determine the location of the output task states */
@ActivityInterface
public interface CommitActivity {
/**
* Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
* @param workSpec
* @return number of workunits committed
*/
@ActivityMethod
int commit(WUProcessingSpec workSpec);
}
Loading

0 comments on commit 18fba9e

Please sign in to comment.