From b21da313d6388c8c444f4eb6311abd79f67ff067 Mon Sep 17 00:00:00 2001 From: Kamil Janecek Date: Sat, 4 Jan 2025 23:14:51 +0100 Subject: [PATCH] #515: improve unresolved task deletion job --- .../kagkarlsson/scheduler/Scheduler.java | 30 ++++++++++++++++++- .../kagkarlsson/scheduler/TaskResolver.java | 10 ------- .../scheduler/testhelper/TestHelper.java | 6 ++++ .../scheduler/DeadExecutionsTest.java | 1 + .../functional/DeleteUnresolvedTest.java | 20 +++++++++---- 5 files changed, 50 insertions(+), 17 deletions(-) diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index 678ee2f7..9c8ff9f3 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -17,6 +17,7 @@ import static java.util.stream.Collectors.toList; import com.github.kagkarlsson.scheduler.SchedulerState.SettableSchedulerState; +import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask; import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor; import com.github.kagkarlsson.scheduler.event.SchedulerListener; import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType; @@ -34,7 +35,9 @@ import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -354,8 +357,33 @@ public List getCurrentlyExecutingWithStaleHeartbeat() { @SuppressWarnings({"rawtypes", "unchecked"}) protected void detectDeadExecutions() { LOG.debug("Deleting executions with unresolved tasks."); + + Map unresolvedTaskToNewestExecution = new HashMap<>(); + schedulerTaskRepository + .getScheduledExecutions( + ScheduledExecutionsFilter.all(), + execution -> unresolvedTaskToNewestExecution.merge( + execution.taskInstance.getTaskName(), + execution.executionTime, + (oldValue, newValue) -> oldValue.isAfter(newValue) ? oldValue : newValue + )); + taskResolver - .getUnresolvedTaskNames(deleteUnresolvedAfter) + .getUnresolved() + .stream() + .map(UnresolvedTask::getTaskName) + .filter(taskName -> { + Instant newestExecution = unresolvedTaskToNewestExecution.get(taskName); + + if (newestExecution == null) { + // probably deleted by other node + return true; + } + + Duration age = Duration.between(newestExecution, clock.now()); + + return age.compareTo(deleteUnresolvedAfter) >= 0; + }) .forEach( taskName -> { LOG.warn( diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java index e294ef6a..9a918890 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java @@ -79,16 +79,6 @@ public List getUnresolved() { return new ArrayList<>(unresolvedTasks.values()); } - public List getUnresolvedTaskNames(Duration unresolvedFor) { - return unresolvedTasks.values().stream() - .filter( - unresolved -> - Duration.between(unresolved.firstUnresolved, clock.now()).toMillis() - > unresolvedFor.toMillis()) - .map(UnresolvedTask::getTaskName) - .collect(Collectors.toList()); - } - public void clearUnresolved(String taskName) { unresolvedTasks.remove(taskName); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java index 62b0438b..3feae312 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java @@ -24,6 +24,7 @@ import com.github.kagkarlsson.scheduler.stats.StatsRegistryAdapter; import com.github.kagkarlsson.scheduler.task.OnStartup; import com.github.kagkarlsson.scheduler.task.Task; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -70,6 +71,11 @@ public ManualSchedulerBuilder pollingStrategy(PollingStrategyConfig pollingStrat return this; } + public ManualSchedulerBuilder deleteUnresolvedAfter(Duration deleteUnresolvedAfter) { + super.deleteUnresolvedAfter = deleteUnresolvedAfter; + return this; + } + public ManualScheduler build() { final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks); final JdbcTaskRepository schedulerTaskRepository = diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index f6804439..23c7fa12 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -118,6 +118,7 @@ public void scheduler_should_detect_dead_execution_that_never_updated_heartbeat( assertThat(nonCompletingExecutionHandler.timesExecuted.get(), is(2)); } + public static class NonCompletingTask extends OneTimeTask { private final VoidExecutionHandler handler; diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeleteUnresolvedTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeleteUnresolvedTest.java index b384a160..e148b093 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeleteUnresolvedTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeleteUnresolvedTest.java @@ -17,6 +17,7 @@ import java.time.LocalTime; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.util.Collections; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,28 +42,35 @@ public void setUp() { @Test public void should_delete_executions_with_old_unresolved_tasknames() { - OneTimeTask onetime = Tasks.oneTime("onetime").execute(TestTasks.DO_NOTHING); + OneTimeTask first = Tasks.oneTime("onetime_first").execute(TestTasks.DO_NOTHING); + OneTimeTask second = Tasks.oneTime("onetime_second").execute(TestTasks.DO_NOTHING); TestableRegistry testableRegistry = new TestableRegistry(false, Collections.emptyList()); - // Missing task with name 'onetime' + // Missing tasks with name 'onetime_first' and 'onetime_second' ManualScheduler scheduler = TestHelper.createManualScheduler(postgres.getDataSource()) .clock(clock) .statsRegistry(testableRegistry) + .deleteUnresolvedAfter(Duration.ofDays(5)) .build(); - scheduler.schedule(onetime.instance("id1"), clock.now()); + scheduler.schedule(first.instance("id_f"), clock.now()); + scheduler.schedule(second.instance("id_s"), clock.now().minus(1, ChronoUnit.DAYS)); assertEquals(0, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK)); scheduler.runAnyDueExecutions(); - assertEquals(1, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK)); + assertEquals(2, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK)); - assertEquals(1, DbUtils.countExecutions(postgres.getDataSource())); + assertEquals(2, DbUtils.countExecutions(postgres.getDataSource())); + + scheduler.runDeadExecutionDetection(); + assertEquals(2, DbUtils.countExecutions(postgres.getDataSource())); + clock.tick(Duration.ofDays(4)); scheduler.runDeadExecutionDetection(); assertEquals(1, DbUtils.countExecutions(postgres.getDataSource())); - clock.set(clock.now().plus(Duration.ofDays(30))); + clock.tick(Duration.ofDays(1)); scheduler.runDeadExecutionDetection(); assertEquals(0, DbUtils.countExecutions(postgres.getDataSource()));