Skip to content

Commit

Permalink
kagkarlsson#515: improve unresolved task deletion job
Browse files Browse the repository at this point in the history
  • Loading branch information
kamko committed Jan 4, 2025
1 parent c21b620 commit b21da31
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static java.util.stream.Collectors.toList;

import com.github.kagkarlsson.scheduler.SchedulerState.SettableSchedulerState;
import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask;
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
import com.github.kagkarlsson.scheduler.event.SchedulerListener;
import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType;
Expand All @@ -34,7 +35,9 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -354,8 +357,33 @@ public List<CurrentlyExecuting> getCurrentlyExecutingWithStaleHeartbeat() {
@SuppressWarnings({"rawtypes", "unchecked"})
protected void detectDeadExecutions() {
LOG.debug("Deleting executions with unresolved tasks.");

Map<String, Instant> unresolvedTaskToNewestExecution = new HashMap<>();
schedulerTaskRepository
.getScheduledExecutions(
ScheduledExecutionsFilter.all(),
execution -> unresolvedTaskToNewestExecution.merge(
execution.taskInstance.getTaskName(),
execution.executionTime,
(oldValue, newValue) -> oldValue.isAfter(newValue) ? oldValue : newValue
));

taskResolver
.getUnresolvedTaskNames(deleteUnresolvedAfter)
.getUnresolved()
.stream()
.map(UnresolvedTask::getTaskName)
.filter(taskName -> {
Instant newestExecution = unresolvedTaskToNewestExecution.get(taskName);

if (newestExecution == null) {
// probably deleted by other node
return true;
}

Duration age = Duration.between(newestExecution, clock.now());

return age.compareTo(deleteUnresolvedAfter) >= 0;
})
.forEach(
taskName -> {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,6 @@ public List<UnresolvedTask> getUnresolved() {
return new ArrayList<>(unresolvedTasks.values());
}

public List<String> getUnresolvedTaskNames(Duration unresolvedFor) {
return unresolvedTasks.values().stream()
.filter(
unresolved ->
Duration.between(unresolved.firstUnresolved, clock.now()).toMillis()
> unresolvedFor.toMillis())
.map(UnresolvedTask::getTaskName)
.collect(Collectors.toList());
}

public void clearUnresolved(String taskName) {
unresolvedTasks.remove(taskName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.github.kagkarlsson.scheduler.stats.StatsRegistryAdapter;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -70,6 +71,11 @@ public ManualSchedulerBuilder pollingStrategy(PollingStrategyConfig pollingStrat
return this;
}

public ManualSchedulerBuilder deleteUnresolvedAfter(Duration deleteUnresolvedAfter) {
super.deleteUnresolvedAfter = deleteUnresolvedAfter;
return this;
}

public ManualScheduler build() {
final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks);
final JdbcTaskRepository schedulerTaskRepository =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat(
assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(2));
}


public static class NonCompletingTask<T> extends OneTimeTask<T> {
private final VoidExecutionHandler<T> handler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -41,28 +42,35 @@ public void setUp() {
@Test
public void should_delete_executions_with_old_unresolved_tasknames() {

OneTimeTask<Void> onetime = Tasks.oneTime("onetime").execute(TestTasks.DO_NOTHING);
OneTimeTask<Void> first = Tasks.oneTime("onetime_first").execute(TestTasks.DO_NOTHING);
OneTimeTask<Void> second = Tasks.oneTime("onetime_second").execute(TestTasks.DO_NOTHING);

TestableRegistry testableRegistry = new TestableRegistry(false, Collections.emptyList());
// Missing task with name 'onetime'
// Missing tasks with name 'onetime_first' and 'onetime_second'
ManualScheduler scheduler =
TestHelper.createManualScheduler(postgres.getDataSource())
.clock(clock)
.statsRegistry(testableRegistry)
.deleteUnresolvedAfter(Duration.ofDays(5))
.build();

scheduler.schedule(onetime.instance("id1"), clock.now());
scheduler.schedule(first.instance("id_f"), clock.now());
scheduler.schedule(second.instance("id_s"), clock.now().minus(1, ChronoUnit.DAYS));
assertEquals(0, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK));

scheduler.runAnyDueExecutions();
assertEquals(1, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK));
assertEquals(2, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK));

assertEquals(1, DbUtils.countExecutions(postgres.getDataSource()));
assertEquals(2, DbUtils.countExecutions(postgres.getDataSource()));

scheduler.runDeadExecutionDetection();
assertEquals(2, DbUtils.countExecutions(postgres.getDataSource()));

clock.tick(Duration.ofDays(4));
scheduler.runDeadExecutionDetection();
assertEquals(1, DbUtils.countExecutions(postgres.getDataSource()));

clock.set(clock.now().plus(Duration.ofDays(30)));
clock.tick(Duration.ofDays(1));
scheduler.runDeadExecutionDetection();
assertEquals(0, DbUtils.countExecutions(postgres.getDataSource()));

Expand Down

0 comments on commit b21da31

Please sign in to comment.