diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index b7eb436fa0e..d666543da4d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -27,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; @@ -42,6 +43,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.exception.RuntimeExceptionWithoutStackTrace; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; import org.apache.gobblin.runtime.troubleshooter.Issue; @@ -50,6 +52,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.measurement.GrowthMilestoneTracker; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ParallelRunner; @@ -252,6 +255,8 @@ public boolean apply(String input) { } final Queue taskStateQueue = Queues.newConcurrentLinkedQueue(); + AtomicLong numStateStoreMissing = new AtomicLong(0L); + GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker(); try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) { for (final String taskStateName : taskStateNames) { log.debug("Found output task state file " + taskStateName); @@ -259,15 +264,23 @@ public boolean apply(String input) { stateSerDeRunner.submitCallable(new Callable() { @Override public Void call() throws Exception { - TaskState taskState = taskStateStore.getAll(taskStateTableName, taskStateName).get(0); - taskStateQueue.add(taskState); + List matchingTaskStates = taskStateStore.getAll(taskStateTableName, taskStateName); + if (matchingTaskStates.isEmpty()) { + long currNumMissing = numStateStoreMissing.incrementAndGet(); + // only log selective milestones to avoid flooding log w/ O(100k) stacktraces + if (growthTracker.isAnotherMilestone(currNumMissing)) { + throw new RuntimeExceptionWithoutStackTrace("missing task state [running total: " + currNumMissing + "] - " + taskStateName); + } + return null; // otherwise, when not a milestone, silently skip + } + taskStateQueue.add(matchingTaskStates.get(0)); taskStateStore.delete(taskStateTableName, taskStateName); return null; } }, "Deserialize state for " + taskStateName); } } catch (IOException ioe) { - log.error("Could not read all task state files due to", ioe); + log.error("Could not read all task state files [missing final total: " + numStateStoreMissing.get() + "] - ", ioe); } log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), taskStateTableName)); return Optional.of(taskStateQueue); diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/exception/RuntimeExceptionWithoutStackTrace.java b/gobblin-utility/src/main/java/org/apache/gobblin/exception/RuntimeExceptionWithoutStackTrace.java new file mode 100644 index 00000000000..3f9ea8d1766 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/exception/RuntimeExceptionWithoutStackTrace.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.exception; + + +/** {@link RuntimeException} that omits the stack trace to streamline both instantiation cost and log volume */ +public class RuntimeExceptionWithoutStackTrace extends RuntimeException { + public RuntimeExceptionWithoutStackTrace(String message) { + super(message); + } + + public RuntimeExceptionWithoutStackTrace(String message, Throwable cause) { + super(message, cause); + } + + /** Secret sauce: no-op */ + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } +}