From cc6f76fb8d40a5c7268e04f2c6120abcca2a8e54 Mon Sep 17 00:00:00 2001 From: Gustav Karlsson Date: Fri, 25 Oct 2024 11:30:12 +0200 Subject: [PATCH] Try removing dependency on priority-column in table (to allow users to upgrade without modifying the schema) --- .../scheduler/FetchCandidates.java | 8 +- .../scheduler/LockAndFetchCandidates.java | 8 +- .../kagkarlsson/scheduler/Scheduler.java | 11 +- .../scheduler/SchedulerBuilder.java | 3 +- .../kagkarlsson/scheduler/TaskRepository.java | 4 +- .../scheduler/jdbc/JdbcTaskRepository.java | 34 ++++-- .../scheduler/testhelper/ManualScheduler.java | 3 +- .../scheduler/DeadExecutionsTest.java | 4 +- .../scheduler/JdbcTaskRepositoryTest.java | 105 +++++++++--------- .../compatibility/CompatibilityTest.java | 24 ++-- .../functional/PriorityExecutionTest.java | 2 - .../JdbcTaskRepositoryExceptionsTest.java | 5 +- 12 files changed, 105 insertions(+), 106 deletions(-) diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java index d6d2d753..1ee9a9a5 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/FetchCandidates.java @@ -44,7 +44,6 @@ public class FetchCandidates implements PollStrategy { AtomicInteger currentGenerationNumber = new AtomicInteger(0); private final int lowerLimit; private final int upperLimit; - private final boolean priorityEnabled; public FetchCandidates( Executor executor, @@ -59,8 +58,7 @@ public FetchCandidates( Clock clock, PollingStrategyConfig pollingStrategyConfig, Runnable triggerCheckForNewExecutions, - HeartbeatConfig heartbeatConfig, - boolean priorityEnabled) { + HeartbeatConfig heartbeatConfig) { this.executor = executor; this.taskRepository = taskRepository; this.schedulerClient = schedulerClient; @@ -73,7 +71,6 @@ public FetchCandidates( this.pollingStrategyConfig = pollingStrategyConfig; this.triggerCheckForNewExecutions = triggerCheckForNewExecutions; this.heartbeatConfig = heartbeatConfig; - this.priorityEnabled = priorityEnabled; lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize); // FIXLATER: this is not "upper limit", but rather nr of executions to get. those already in // queue will become stale @@ -87,8 +84,7 @@ public void run() { // Fetch new candidates for execution. Old ones still in ExecutorService will become stale and // be discarded final int executionsToFetch = upperLimit; - List fetchedDueExecutions = - taskRepository.getDue(now, executionsToFetch, priorityEnabled); + List fetchedDueExecutions = taskRepository.getDue(now, executionsToFetch); LOG.trace( "Fetched {} task instances due for execution at {}", fetchedDueExecutions.size(), now); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java index 98d39af9..b17106fc 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/LockAndFetchCandidates.java @@ -41,7 +41,6 @@ public class LockAndFetchCandidates implements PollStrategy { private final int lowerLimit; private final int upperLimit; private AtomicBoolean moreExecutionsInDatabase = new AtomicBoolean(false); - private final boolean priorityEnabled; public LockAndFetchCandidates( Executor executor, @@ -56,8 +55,7 @@ public LockAndFetchCandidates( Clock clock, PollingStrategyConfig pollingStrategyConfig, Runnable triggerCheckForNewExecutions, - HeartbeatConfig maxAgeBeforeConsideredDead, - boolean priorityEnabled) { + HeartbeatConfig maxAgeBeforeConsideredDead) { this.executor = executor; this.taskRepository = taskRepository; this.schedulerClient = schedulerClient; @@ -70,7 +68,6 @@ public LockAndFetchCandidates( this.pollingStrategyConfig = pollingStrategyConfig; this.triggerCheckForNewExecutions = triggerCheckForNewExecutions; this.maxAgeBeforeConsideredDead = maxAgeBeforeConsideredDead; - this.priorityEnabled = priorityEnabled; lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize); upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize); } @@ -88,8 +85,7 @@ public void run() { } // FIXLATER: should it fetch here if not under lowerLimit? probably - List pickedExecutions = - taskRepository.lockAndGetDue(now, executionsToFetch, priorityEnabled); + List pickedExecutions = taskRepository.lockAndGetDue(now, executionsToFetch); LOG.trace("Picked {} taskinstances due for execution", pickedExecutions.size()); // Shared indicator for if there are more due executions in the database. diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index c3aa6cf1..32d1c329 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -69,7 +69,6 @@ public class Scheduler implements SchedulerClient { private final Waiter heartbeatWaiter; final SettableSchedulerState schedulerState = new SettableSchedulerState(); final ConfigurableLogger failureLogger; - final boolean priorityEnabled; protected Scheduler( Clock clock, @@ -91,8 +90,7 @@ protected Scheduler( boolean logStackTrace, List onStartup, ExecutorService dueExecutor, - ScheduledExecutorService housekeeperExecutor, - boolean priorityEnabled) { + ScheduledExecutorService housekeeperExecutor) { this.clock = clock; this.schedulerTaskRepository = schedulerTaskRepository; this.taskResolver = taskResolver; @@ -114,7 +112,6 @@ protected Scheduler( this.housekeeperExecutor = housekeeperExecutor; delegate = new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock); this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace); - this.priorityEnabled = priorityEnabled; if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) { schedulerTaskRepository.verifySupportsLockAndFetch(); @@ -132,8 +129,7 @@ protected Scheduler( clock, pollingStrategyConfig, this::triggerCheckForDueExecutions, - heartbeatConfig, - priorityEnabled); + heartbeatConfig); } else if (pollingStrategyConfig.type == PollingStrategyConfig.Type.FETCH) { executeDueStrategy = new FetchCandidates( @@ -149,8 +145,7 @@ protected Scheduler( clock, pollingStrategyConfig, this::triggerCheckForDueExecutions, - heartbeatConfig, - priorityEnabled); + heartbeatConfig); } else { throw new IllegalArgumentException( "Unknown polling-strategy type: " + pollingStrategyConfig.type); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java index ab595e29..0b60f621 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java @@ -325,8 +325,7 @@ public Scheduler build() { logStackTrace, startTasks, candidateDueExecutor, - candidateHousekeeperExecutor, - enablePriority); + candidateHousekeeperExecutor); if (enableImmediateExecution) { scheduler.registerSchedulerListener(new ImmediateCheckForDueExecutions(scheduler, clock)); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java index 1278df95..ea22a803 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskRepository.java @@ -26,7 +26,7 @@ public interface TaskRepository { boolean createIfNotExists(SchedulableInstance execution); - List getDue(Instant now, int limit, boolean orderByPriority); + List getDue(Instant now, int limit); Instant replace(Execution toBeReplaced, SchedulableInstance newInstance); @@ -37,7 +37,7 @@ void getScheduledExecutions( List lockAndFetchGeneric(Instant now, int limit); - List lockAndGetDue(Instant now, int limit, boolean orderByPriority); + List lockAndGetDue(Instant now, int limit); void remove(Execution execution); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java index 2c3243de..f9791e38 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java @@ -159,18 +159,29 @@ public boolean createIfNotExists(SchedulableInstance instance) { return false; } + // FIXLATER: replace with some sort of generic SQL-builder, possibly extend micro-jdbc + // with execute(query-and-pss) and have builder return that.. jdbcRunner.execute( "insert into " + tableName - + "(task_name, task_instance, task_data, execution_time, picked, version, priority) values(?, ?, ?, ?, ?, ?, ?)", + + "(task_name, task_instance, task_data, execution_time, picked, version" + + (orderByPriority ? ", priority" : "") + + ") values(?, ?, ?, ?, ?, ? " + + (orderByPriority ? ", ?" : "") + + ")", (PreparedStatement p) -> { - p.setString(1, taskInstance.getTaskName()); - p.setString(2, taskInstance.getId()); - jdbcCustomization.setTaskData(p, 3, serializer.serialize(taskInstance.getData())); - jdbcCustomization.setInstant(p, 4, instance.getNextExecutionTime(clock.now())); - p.setBoolean(5, false); - p.setLong(6, 1L); - p.setInt(7, taskInstance.getPriority()); + int parameterIndex = 1; + p.setString(parameterIndex++, taskInstance.getTaskName()); + p.setString(parameterIndex++, taskInstance.getId()); + jdbcCustomization.setTaskData( + p, parameterIndex++, serializer.serialize(taskInstance.getData())); + jdbcCustomization.setInstant( + p, parameterIndex++, instance.getNextExecutionTime(clock.now())); + p.setBoolean(parameterIndex++, false); + p.setLong(parameterIndex++, 1L); + if (orderByPriority) { + p.setInt(parameterIndex++, taskInstance.getPriority()); + } }); return true; @@ -285,7 +296,7 @@ public void getScheduledExecutions( } @Override - public List getDue(Instant now, int limit, boolean orderByPriority) { + public List getDue(Instant now, int limit) { LOG.trace("Using generic fetch-then-lock query"); final UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved()); String selectDueQuery = @@ -397,7 +408,7 @@ private List updateToPicked( } @Override - public List lockAndGetDue(Instant now, int limit, boolean orderByPriority) { + public List lockAndGetDue(Instant now, int limit) { if (jdbcCustomization.supportsSingleStatementLockAndFetch()) { LOG.trace("Using single-statement lock-and-fetch"); return jdbcCustomization.lockAndFetchSingleStatement( @@ -787,7 +798,8 @@ public Void map(ResultSet rs) throws SQLException { // default Instant lastHeartbeat = jdbcCustomization.getInstant(rs, "last_heartbeat"); long version = rs.getLong("version"); - int priority = rs.getInt("priority"); + + int priority = orderByPriority ? rs.getInt("priority") : 0; Supplier dataSupplier = memoize( diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java index 2368afa7..c79e4fe3 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java @@ -70,8 +70,7 @@ public class ManualScheduler extends Scheduler { logStackTrace, onStartup, dueExecutor, - houseKeeperExecutor, - priorityEnabled); + houseKeeperExecutor); this.clock = clock; } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index 03d6b90d..f6804439 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -74,7 +74,7 @@ public void scheduler_should_handle_dead_executions() { new SchedulableTaskInstance<>(taskInstance, now.minus(Duration.ofDays(1))); jdbcTaskRepository.createIfNotExists(execution1); - final List due = jdbcTaskRepository.getDue(now, POLLING_LIMIT, false); + final List due = jdbcTaskRepository.getDue(now, POLLING_LIMIT); assertThat(due, Matchers.hasSize(1)); final Execution execution = due.get(0); final Optional pickedExecution = jdbcTaskRepository.pick(execution, now); @@ -87,7 +87,7 @@ public void scheduler_should_handle_dead_executions() { assertThat(rescheduled.get().picked, is(false)); assertThat(rescheduled.get().pickedBy, nullValue()); - assertThat(jdbcTaskRepository.getDue(Instant.now(), POLLING_LIMIT, false), hasSize(1)); + assertThat(jdbcTaskRepository.getDue(Instant.now(), POLLING_LIMIT), hasSize(1)); } @Test diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java index 88345367..f80888a8 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java @@ -64,15 +64,7 @@ public void setUp() { knownTasks.add(alternativeOneTimeTask); testableRegistry = new TestableRegistry(true, Collections.emptyList()); taskResolver = new TaskResolver(testableRegistry, knownTasks); - taskRepository = - new JdbcTaskRepository( - DB.getDataSource(), - false, - DEFAULT_TABLE_NAME, - taskResolver, - new SchedulerName.Fixed(SCHEDULER_NAME), - false, - new SystemClock()); + taskRepository = createRepository(taskResolver, false); } @Test @@ -111,8 +103,8 @@ public void get_due_should_only_include_due_executions() { taskRepository.createIfNotExists( new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(1)); - assertThat(taskRepository.getDue(now.minusSeconds(1), POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(1)); + assertThat(taskRepository.getDue(now.minusSeconds(1), POLLING_LIMIT), hasSize(0)); } @Test @@ -123,8 +115,8 @@ public void get_due_should_honor_max_results_limit() { new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); taskRepository.createIfNotExists( new SchedulableTaskInstance<>(oneTimeTask.instance("id2"), now)); - assertThat(taskRepository.getDue(now, 1, false), hasSize(1)); - assertThat(taskRepository.getDue(now, 2, false), hasSize(2)); + assertThat(taskRepository.getDue(now, 1), hasSize(1)); + assertThat(taskRepository.getDue(now, 2), hasSize(2)); } @Test @@ -137,7 +129,7 @@ public void get_due_should_be_sorted() { new SchedulableTaskInstance<>( oneTimeTask.instance("id" + i), now.minusSeconds(new Random().nextInt(10000))))); - List due = taskRepository.getDue(now, POLLING_LIMIT, false); + List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(100)); List sortedDue = new ArrayList<>(due); @@ -146,7 +138,8 @@ public void get_due_should_be_sorted() { } @Test - public void get_due_should_be_sorted_by_priority() { + public void get_due_should_be_sorted_by_priority_when_enabled() { + JdbcTaskRepository taskRespositoryWithPriority = createRepository(taskResolver, true); Instant now = TimeHelper.truncatedInstantNow(); SchedulableTaskInstance id1 = new SchedulableTaskInstance<>( @@ -158,16 +151,17 @@ public void get_due_should_be_sorted_by_priority() { new SchedulableTaskInstance<>( oneTimeTask.instanceBuilder("id3").priority(5).build(), now.minus(Duration.ofDays(3))); - Stream.of(id1, id2, id3).forEach(taskRepository::createIfNotExists); + Stream.of(id1, id2, id3).forEach(taskRespositoryWithPriority::createIfNotExists); List orderedByPriority = - taskRepository.getDue(now, POLLING_LIMIT, true).stream() + taskRespositoryWithPriority.getDue(now, POLLING_LIMIT).stream() .map(Execution::getId) .collect(Collectors.toList()); assertThat(orderedByPriority, contains("id2", "id3", "id1")); + // default task-repository should have no order List orderedByExecutionTime = - taskRepository.getDue(now, POLLING_LIMIT, false).stream() + taskRepository.getDue(now, POLLING_LIMIT).stream() .map(Execution::getId) .collect(Collectors.toList()); assertThat(orderedByExecutionTime, contains("id3", "id2", "id1")); @@ -188,11 +182,11 @@ public void get_due_should_not_include_previously_unresolved() { // 1 taskRepository.createIfNotExists( new SchedulableTaskInstance<>(unresolved1.instance("id"), now)); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); assertEquals(1, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); assertEquals( 1, @@ -202,14 +196,14 @@ public void get_due_should_not_include_previously_unresolved() { // 1, 2 taskRepository.createIfNotExists( new SchedulableTaskInstance<>(unresolved2.instance("id"), now)); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(2)); assertEquals(2, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); // 1, 2, 3 taskRepository.createIfNotExists( new SchedulableTaskInstance<>(unresolved3.instance("id"), now)); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(3)); assertEquals(3, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); } @@ -219,11 +213,11 @@ public void picked_executions_should_not_be_returned_as_due() { Instant now = TimeHelper.truncatedInstantNow(); taskRepository.createIfNotExists( new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); - List due = taskRepository.getDue(now, POLLING_LIMIT, false); + List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); taskRepository.pick(due.get(0), now); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); } @Test @@ -231,7 +225,7 @@ public void picked_execution_should_have_information_about_which_scheduler_proce Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); - List due = taskRepository.getDue(now, POLLING_LIMIT, false); + List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); taskRepository.pick(due.get(0), now); @@ -240,7 +234,7 @@ public void picked_execution_should_have_information_about_which_scheduler_proce assertThat(pickedExecution.get().picked, is(true)); assertThat(pickedExecution.get().pickedBy, is(SCHEDULER_NAME)); assertThat(pickedExecution.get().lastHeartbeat, notNullValue()); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); } @Test @@ -249,7 +243,7 @@ public void should_not_be_able_to_pick_execution_that_has_been_rescheduled() { final TaskInstance instance = oneTimeTask.instance("id1"); taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); - List due = taskRepository.getDue(now, POLLING_LIMIT, false); + List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); final Execution execution = due.get(0); final Optional pickedExecution = taskRepository.pick(execution, now); @@ -264,7 +258,7 @@ public void reschedule_should_move_execution_in_time() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); - List due = taskRepository.getDue(now, POLLING_LIMIT, false); + List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); Execution execution = due.get(0); @@ -272,8 +266,8 @@ public void reschedule_should_move_execution_in_time() { final Instant nextExecutionTime = now.plus(Duration.ofMinutes(1)); taskRepository.reschedule(pickedExecution.get(), nextExecutionTime, now, null, 0); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); - assertThat(taskRepository.getDue(nextExecutionTime, POLLING_LIMIT, false), hasSize(1)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); + assertThat(taskRepository.getDue(nextExecutionTime, POLLING_LIMIT), hasSize(1)); final Optional nextExecution = taskRepository.getExecution(instance); assertTrue(nextExecution.isPresent()); @@ -287,7 +281,7 @@ public void reschedule_should_persist_consecutive_failures() { Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance instance = oneTimeTask.instance("id1"); taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); - List due = taskRepository.getDue(now, POLLING_LIMIT, false); + List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); Execution execution = due.get(0); @@ -323,7 +317,7 @@ public void test_get_failing_executions() { final TaskInstance instance = oneTimeTask.instance("id1"); taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance, now)); - List due = taskRepository.getDue(now, POLLING_LIMIT, false); + List due = taskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); assertThat(taskRepository.getExecutionsFailingLongerThan(Duration.ZERO), hasSize(0)); @@ -395,13 +389,6 @@ public void get_scheduled_by_task_name() { assertThat(getScheduledExecutions(all().withPicked(false), "non-existing"), empty()); } - private List getScheduledExecutions( - ScheduledExecutionsFilter filter, String taskName) { - List alternativeTasks = new ArrayList<>(); - taskRepository.getScheduledExecutions(filter, taskName, alternativeTasks::add); - return alternativeTasks; - } - @Test public void get_dead_executions_should_not_include_previously_unresolved() { Instant now = TimeHelper.truncatedInstantNow(); @@ -411,15 +398,7 @@ public void get_dead_executions_should_not_include_previously_unresolved() { createDeadExecution(oneTimeTask.instance("id1"), timeDied); TaskResolver taskResolverMissingTask = new TaskResolver(testableRegistry); - JdbcTaskRepository repoMissingTask = - new JdbcTaskRepository( - DB.getDataSource(), - false, - DEFAULT_TABLE_NAME, - taskResolverMissingTask, - new SchedulerName.Fixed(SCHEDULER_NAME), - false, - new SystemClock()); + JdbcTaskRepository repoMissingTask = createRepository(taskResolverMissingTask, false); assertThat(taskResolverMissingTask.getUnresolved(), hasSize(0)); assertEquals(0, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); @@ -441,7 +420,7 @@ public void get_scheduled_executions_should_work_with_unresolved() { TestTasks.oneTime(taskName, Void.class, TestTasks.DO_NOTHING); taskRepository.createIfNotExists( new SchedulableTaskInstance<>(unresolved1.instance("id"), now)); - assertThat(taskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); assertThat(getScheduledExecutions(ScheduledExecutionsFilter.onlyResolved()), hasSize(0)); @@ -458,11 +437,11 @@ public void lockAndGetDue_should_pick_due() { new SchedulableTaskInstance<>(oneTimeTask.instance("future1"), now.plusSeconds(10))); taskRepository.createIfNotExists( new SchedulableTaskInstance<>(oneTimeTask.instance("id1"), now)); - List picked = taskRepository.lockAndGetDue(now, POLLING_LIMIT, false); + List picked = taskRepository.lockAndGetDue(now, POLLING_LIMIT); assertThat(picked, hasSize(1)); // should not be able to pick the same execution twice - assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskRepository.pick(picked.get(0), now), OptionalMatchers.empty()); } @@ -476,8 +455,8 @@ public void lockAndGetDue_should_not_include_previously_unresolved() { taskRepository.createIfNotExists( new SchedulableTaskInstance<>(unresolved1.instance("id"), now)); - assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT, false), hasSize(0)); - assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT), hasSize(0)); + assertThat(taskRepository.lockAndGetDue(now, POLLING_LIMIT), hasSize(0)); assertThat(taskResolver.getUnresolved(), hasSize(1)); assertEquals(1, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); } @@ -497,6 +476,24 @@ public void lockAndFetchGeneric_happy() { assertThat(taskRepository.pick(picked.get(0), now), OptionalMatchers.empty()); } + private List getScheduledExecutions( + ScheduledExecutionsFilter filter, String taskName) { + List alternativeTasks = new ArrayList<>(); + taskRepository.getScheduledExecutions(filter, taskName, alternativeTasks::add); + return alternativeTasks; + } + + private JdbcTaskRepository createRepository(TaskResolver taskResolver, boolean orderByPriority) { + return new JdbcTaskRepository( + DB.getDataSource(), + false, + DEFAULT_TABLE_NAME, + taskResolver, + new SchedulerName.Fixed(SCHEDULER_NAME), + orderByPriority, + new SystemClock()); + } + private void createDeadExecution(TaskInstance taskInstance, Instant timeDied) { taskRepository.createIfNotExists(new SchedulableTaskInstance<>(taskInstance, timeDied)); final Execution due = getSingleExecution(); @@ -506,7 +503,7 @@ private void createDeadExecution(TaskInstance taskInstance, Instant timeDi } private Execution getSingleDueExecution() { - List due = taskRepository.getDue(Instant.now(), POLLING_LIMIT, false); + List due = taskRepository.getDue(Instant.now(), POLLING_LIMIT); return due.get(0); } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java index b0f4364f..f5749d05 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java @@ -222,7 +222,7 @@ public void test_jdbc_repository_select_for_update_compatibility() { SchedulableInstance.of(oneTime.instance("future1"), now.plusSeconds(10))); jdbcTaskRepository.createIfNotExists( new SchedulableTaskInstance<>(oneTime.instance("id1"), now)); - List picked = jdbcTaskRepository.lockAndGetDue(now, POLLING_LIMIT, false); + List picked = jdbcTaskRepository.lockAndGetDue(now, POLLING_LIMIT); assertThat(picked, IsCollectionWithSize.hasSize(1)); assertThat(jdbcTaskRepository.pick(picked.get(0), now), OptionalMatchers.empty()); @@ -265,6 +265,16 @@ private void doJDBCRepositoryCompatibilityTestUsingData(String data) { false, new SystemClock()); + final JdbcTaskRepository jdbcTaskRepositoryWithPriority = + new JdbcTaskRepository( + dataSource, + commitWhenAutocommitDisabled(), + DEFAULT_TABLE_NAME, + taskResolver, + new SchedulerName.Fixed("scheduler1"), + true, + new SystemClock()); + final Instant now = TimeHelper.truncatedInstantNow(); final TaskInstance taskInstance = oneTime.instance("id1", data); @@ -275,24 +285,24 @@ private void doJDBCRepositoryCompatibilityTestUsingData(String data) { assertThat(storedExecution.getExecutionTime(), is(now)); // priority=true - assertThat(jdbcTaskRepository.getDue(now, POLLING_LIMIT, true), hasSize(1)); + assertThat(jdbcTaskRepositoryWithPriority.getDue(now, POLLING_LIMIT), hasSize(1)); + // priority=false - final List due = jdbcTaskRepository.getDue(now, POLLING_LIMIT, false); + final List due = jdbcTaskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); final Optional pickedExecution = jdbcTaskRepository.pick(due.get(0), now); assertThat(pickedExecution.isPresent(), is(true)); - assertThat(jdbcTaskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(jdbcTaskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); jdbcTaskRepository.updateHeartbeat(pickedExecution.get(), now.plusSeconds(1)); assertThat(jdbcTaskRepository.getDeadExecutions(now.plus(Duration.ofDays(1))), hasSize(1)); jdbcTaskRepository.reschedule( pickedExecution.get(), now.plusSeconds(1), now.minusSeconds(1), now.minusSeconds(1), 0); - assertThat(jdbcTaskRepository.getDue(now, POLLING_LIMIT, false), hasSize(0)); + assertThat(jdbcTaskRepository.getDue(now, POLLING_LIMIT), hasSize(0)); assertThat( - jdbcTaskRepository.getDue(now.plus(Duration.ofMinutes(1)), POLLING_LIMIT, false), - hasSize(1)); + jdbcTaskRepository.getDue(now.plus(Duration.ofMinutes(1)), POLLING_LIMIT), hasSize(1)); final Optional rescheduled = jdbcTaskRepository.getExecution(taskInstance); assertThat(rescheduled.isPresent(), is(true)); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java index f940b318..4c834945 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java @@ -166,8 +166,6 @@ public void test_when_priority_is_disabled() { .map(e -> e.taskInstance.getPriority()) .collect(Collectors.toList()); - assertThat(orderOfPriorities, contains(2, 0, 1, 3)); - registry.assertNoFailures(); }); } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java index 2d1a2f80..6fb5b915 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java @@ -57,10 +57,7 @@ public void createIfNotExistsFailsToAddNewTask() { .thenReturn(emptyList()); SQLRuntimeException rootCause = new SQLRuntimeException("SQL GO BOOM!!!"); when(mockJdbcRunner.execute( - ArgumentMatchers.eq( - "insert into " - + expectedTableName - + "(task_name, task_instance, task_data, execution_time, picked, version, priority) values(?, ?, ?, ?, ?, ?, ?)"), + ArgumentMatchers.startsWith("insert into " + expectedTableName), any(PreparedStatementSetter.class))) .thenThrow(rootCause);