From c3eace61a7bb514d89e7626ea9ff696d249ccc4f Mon Sep 17 00:00:00 2001 From: Szymon Rzepka Date: Sat, 16 Nov 2024 15:54:31 +0100 Subject: [PATCH] feat: Adding the ability to prioritize tasks (#519) This change introduces the possibility of using a task prioritization mechanism. This mechanism is disabled by default, and can be enabled using the `enablePrioritization()` method. ## Reminders - [x] Added/ran automated tests - [x] Update README and/or examples - [x] Ran `mvn spotless:apply` --- Co-authored-by: Gustav Karlsson --- README.md | 56 +++++- .../DbSchedulerAutoConfiguration.java | 4 + .../boot/config/DbSchedulerProperties.java | 11 ++ .../DbSchedulerAutoConfigurationTest.java | 14 ++ .../src/test/resources/schema.sql | 1 + .../scheduler/SchedulerBuilder.java | 11 +- .../scheduler/SchedulerClient.java | 8 + .../jdbc/AutodetectJdbcCustomization.java | 13 +- .../jdbc/DefaultJdbcCustomization.java | 10 +- .../scheduler/jdbc/JdbcCustomization.java | 7 +- .../scheduler/jdbc/JdbcTaskRepository.java | 55 ++++-- .../jdbc/MariaDBJdbcCustomization.java | 3 +- .../jdbc/MssqlJdbcCustomization.java | 8 +- .../jdbc/MySQL8JdbcCustomization.java | 3 +- .../jdbc/OracleJdbcCustomization.java | 3 +- .../jdbc/PostgreSqlJdbcCustomization.java | 15 +- .../kagkarlsson/scheduler/jdbc/Queries.java | 9 +- .../scheduler/task/AbstractTask.java | 12 +- .../kagkarlsson/scheduler/task/Priority.java | 21 +++ .../scheduler/task/SchedulableInstance.java | 46 ++++- .../kagkarlsson/scheduler/task/Task.java | 8 +- .../scheduler/task/TaskDescriptor.java | 14 +- .../scheduler/task/TaskInstance.java | 80 ++++++-- .../task/TaskWithDataDescriptor.java | 9 +- .../task/TaskWithoutDataDescriptor.java | 9 +- .../scheduler/task/helper/CustomTask.java | 7 +- .../scheduler/task/helper/OneTimeTask.java | 5 +- .../scheduler/task/helper/RecurringTask.java | 5 + .../RecurringTaskWithPersistentSchedule.java | 2 +- .../scheduler/testhelper/ManualScheduler.java | 3 +- .../scheduler/testhelper/TestHelper.java | 5 +- .../scheduler/CustomTableNameTest.java | 1 + .../github/kagkarlsson/scheduler/DbUtils.java | 2 +- .../scheduler/DeadExecutionsTest.java | 1 + .../kagkarlsson/scheduler/ExecutionTest.java | 1 - .../scheduler/JdbcTaskRepositoryTest.java | 76 +++++--- .../compatibility/CompatibilityTest.java | 112 ++++++++++-- .../ZoneSpecificJdbcCustomization.java | 14 +- .../functional/ImmediateExecutionTest.java | 1 - .../functional/PriorityExecutionTest.java | 172 ++++++++++++++++++ .../JdbcTaskRepositoryExceptionsTest.java | 7 +- .../scheduler/postgresql_custom_tablename.sql | 3 +- .../src/test/resources/hsql_tables.sql | 1 + .../src/test/resources/mariadb_tables.sql | 8 +- .../src/test/resources/mssql_tables.sql | 8 +- .../src/test/resources/mysql_tables.sql | 8 +- .../src/test/resources/oracle_tables.sql | 6 + .../src/test/resources/postgresql_tables.sql | 8 +- .../CheckForNewBatchDirectlyMain.java | 8 +- .../examples/DeletingUnresolvedTasksMain.java | 17 +- .../EnableImmediateExecutionMain.java | 13 +- .../examples/ExponentialBackoffMain.java | 9 +- .../ExponentialBackoffWithMaxRetriesMain.java | 8 +- .../examples/HeartbeatMonitoringMain.java | 8 +- .../JobChainingUsingSeparateTasksMain.java | 20 +- .../JobChainingUsingTaskDataMain.java | 13 +- .../examples/JsonSerializerMain.java | 12 +- .../kagkarlsson/examples/MaxRetriesMain.java | 7 +- .../kagkarlsson/examples/OneTimeTaskMain.java | 17 +- ...curringTaskWithPersistentScheduleMain.java | 18 +- .../examples/SchedulerClientMain.java | 8 +- .../examples/SerializingExperimentMain.java | 2 +- .../examples/SpawningOtherTasksMain.java | 9 +- ...curringTaskWithPersistentScheduleMain.java | 12 +- .../src/main/resources/hsql_tables.sql | 1 + .../config/BasicExamplesConfiguration.java | 13 +- .../boot/config/JobChainingConfiguration.java | 21 ++- .../config/LongRunningJobConfiguration.java | 13 +- .../MultiInstanceRecurringConfiguration.java | 12 +- .../config/ParallellJobConfiguration.java | 14 +- .../RecurringStateTrackingConfiguration.java | 14 +- ...TransactionallyStagedJobConfiguration.java | 12 +- .../src/main/resources/schema.sql | 1 + test/benchmark/infra/notes.txt | 4 +- 74 files changed, 930 insertions(+), 242 deletions(-) create mode 100644 db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Priority.java create mode 100644 db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java diff --git a/README.md b/README.md index 74af727b..974e4c73 100644 --- a/README.md +++ b/README.md @@ -125,25 +125,32 @@ An instance of a _one-time_ task has a single execution-time some time in the fu Define a _one-time_ task and start the scheduler: ```java -OneTimeTask myAdhocTask = Tasks.oneTime("my-typed-adhoc-task", MyTaskData.class) +TaskDescriptor MY_TASK = + TaskDescriptor.of("my-onetime-task", MyTaskData.class); + +OneTimeTask myTaskImplementation = + Tasks.oneTime(MY_TASK) .execute((inst, ctx) -> { - System.out.println("Executed! Custom data, Id: " + inst.getData().id); + System.out.println("Executed! Custom data, Id: " + inst.getData().id); }); final Scheduler scheduler = Scheduler - .create(dataSource, myAdhocTask) - .registerShutdownHook() - .build(); + .create(dataSource, myTaskImplementation) + .registerShutdownHook() + .build(); scheduler.start(); - ``` ... and then at some point (at runtime), an execution is scheduled using the `SchedulerClient`: ```java // Schedule the task for execution a certain time in the future and optionally provide custom data for the execution -scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.now().plusSeconds(5)); +scheduler.schedule( + MY_TASK + .instanceWithId("1045") + .data(new MyTaskData(1001L)) + .scheduledTo(Instant.now().plusSeconds(5))); ``` ### More examples @@ -225,6 +232,32 @@ How long the scheduler will wait before interrupting executor-service threads. I consider if it is possible to instead regularly check `executionContext.getSchedulerState().isShuttingDown()` in the ExecutionHandler and abort long-running task. Default `30min`. +:gear: `.enablePriority()`
+It is possible to define a priority for executions which determines the order in which due executions +are fetched from the database. An execution with a higher value for priority will run before an +execution with a lower value (technically, the ordering will be `order by priority desc, execution_time asc`). +Consider using priorities in the range 0-32000 as the field is defined as a `SMALLINT`. If you need a larger value, +modify the schema. For now, this feature is **opt-in**, and column `priority` is only needed by users who choose to +enable priority via this config setting. + +Set the priority per instance using the `TaskInstance.Builder`: + +```java + scheduler.schedule( + MY_TASK + .instance("1") + .priority(100) + .scheduledTo(Instant.now())); +``` + +**Note:** +* When enabling this feature, make sure you have the new necessary indexes defined. If you +regularly have a state with large amounts of executions both due and future, it might be beneficial +to add an index on `(execution_time asc, priority desc)` (replacing the old `execution_time asc`). +* This feature is not recommended for users of **MySQL** and **MariaDB** below version 8.x, +as they do not support descending indexes. +* Value `null` for priority may be interpreted differently depending on database (low or high). + #### Polling strategy If you are running >1000 executions/s you might want to use the `lock-and-fetch` polling-strategy for lower overhead @@ -408,6 +441,7 @@ db-scheduler.table-name=scheduled_tasks db-scheduler.immediate-execution-enabled=false db-scheduler.scheduler-name= db-scheduler.threads=10 +db-scheduler.priority-enabled=false # Ignored if a custom DbSchedulerStarter bean is defined db-scheduler.delay-startup-until-context-ready=false @@ -579,6 +613,14 @@ There are a number of users that are using db-scheduler for high throughput use- See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release-notes. +**Upgrading to 15.x** +* Priority is a new opt-in feature. To be able to use it, column `priority` and index `priority_execution_time_idx` + must be added to the database schema. See table definitions for + [postgresql](./b-scheduler/src/test/resources/postgresql_tables.sql), + [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or + [mysql](./db-scheduler/src/test/resources/mysql_tables.sql). + At some point, this column will be made mandatory. This will be made clear in future release/upgrade-notes. + **Upgrading to 8.x** * Custom Schedules must implement a method `boolean isDeterministic()` to indicate whether they will always produce the same instants or not. diff --git a/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfiguration.java b/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfiguration.java index 9198648f..3fdb7822 100644 --- a/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfiguration.java +++ b/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfiguration.java @@ -164,6 +164,10 @@ public Scheduler scheduler(DbSchedulerCustomizer customizer, StatsRegistry regis builder.enableImmediateExecution(); } + if (config.isPriorityEnabled()) { + builder.enablePriority(); + } + // Use custom executor service if provided customizer.executorService().ifPresent(builder::executorService); diff --git a/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/config/DbSchedulerProperties.java b/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/config/DbSchedulerProperties.java index 14e787aa..bb039cd2 100644 --- a/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/config/DbSchedulerProperties.java +++ b/db-scheduler-boot-starter/src/main/java/com/github/kagkarlsson/scheduler/boot/config/DbSchedulerProperties.java @@ -114,6 +114,9 @@ public class DbSchedulerProperties { /** Whether or not to log the {@link Throwable} that caused a task to fail. */ private boolean failureLoggerLogStackTrace = SchedulerBuilder.LOG_STACK_TRACE_ON_FAILURE; + /** Whether or executions are ordered by priority */ + private boolean priorityEnabled = false; + public boolean isEnabled() { return enabled; } @@ -243,4 +246,12 @@ public boolean isAlwaysPersistTimestampInUtc() { public void setAlwaysPersistTimestampInUtc(boolean alwaysPersistTimestampInUTC) { this.alwaysPersistTimestampInUtc = alwaysPersistTimestampInUTC; } + + public boolean isPriorityEnabled() { + return priorityEnabled; + } + + public void setPriorityEnabled(boolean priorityEnabled) { + this.priorityEnabled = priorityEnabled; + } } diff --git a/db-scheduler-boot-starter/src/test/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfigurationTest.java b/db-scheduler-boot-starter/src/test/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfigurationTest.java index 985585c4..4df55c89 100644 --- a/db-scheduler-boot-starter/src/test/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfigurationTest.java +++ b/db-scheduler-boot-starter/src/test/java/com/github/kagkarlsson/scheduler/boot/autoconfigure/DbSchedulerAutoConfigurationTest.java @@ -182,6 +182,20 @@ public void it_should_start_when_the_context_is_ready() { }); } + @Test + public void it_should_enable_priority() { + ctxRunner + .withPropertyValues("db-scheduler.priority-enabled=true") + .run( + (AssertableApplicationContext ctx) -> { + assertThat(ctx).hasSingleBean(DataSource.class); + assertThat(ctx).hasSingleBean(Scheduler.class); + + DbSchedulerProperties props = ctx.getBean(DbSchedulerProperties.class); + assertThat(props.isPriorityEnabled()).isTrue(); + }); + } + @Test public void it_should_support_custom_starting_strategies() { ctxRunner diff --git a/db-scheduler-boot-starter/src/test/resources/schema.sql b/db-scheduler-boot-starter/src/test/resources/schema.sql index 6242da80..f92d9918 100644 --- a/db-scheduler-boot-starter/src/test/resources/schema.sql +++ b/db-scheduler-boot-starter/src/test/resources/schema.sql @@ -10,5 +10,6 @@ create table if not exists scheduled_tasks ( consecutive_failures INT, last_heartbeat TIMESTAMP WITH TIME ZONE, version BIGINT, + priority INT, PRIMARY KEY (task_name, task_instance) ); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java index 25177927..0b60f621 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerBuilder.java @@ -74,6 +74,7 @@ public class SchedulerBuilder { protected PollingStrategyConfig pollingStrategyConfig = DEFAULT_POLLING_STRATEGY; protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL; protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE; + protected boolean enablePriority = false; private boolean registerShutdownHook = false; private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT; private boolean alwaysPersistTimestampInUTC = false; @@ -230,6 +231,11 @@ public SchedulerBuilder registerShutdownHook() { return this; } + public SchedulerBuilder enablePriority() { + this.enablePriority = true; + return this; + } + public Scheduler build() { if (schedulerName == null) { schedulerName = new SchedulerName.Hostname(); @@ -249,6 +255,7 @@ public Scheduler build() { taskResolver, schedulerName, serializer, + enablePriority, clock); final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository( @@ -259,6 +266,7 @@ public Scheduler build() { taskResolver, schedulerName, serializer, + enablePriority, clock); ExecutorService candidateExecutorService = executorService; @@ -287,11 +295,12 @@ public Scheduler build() { } LOG.info( - "Creating scheduler with configuration: threads={}, pollInterval={}s, heartbeat={}s enable-immediate-execution={}, table-name={}, name={}", + "Creating scheduler with configuration: threads={}, pollInterval={}s, heartbeat={}s, enable-immediate-execution={}, enable-priority={}, table-name={}, name={}", executorThreads, waiter.getWaitDuration().getSeconds(), heartbeatInterval.getSeconds(), enableImmediateExecution, + enablePriority, tableName, schedulerName.getName()); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java index c28895b0..c1832564 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/SchedulerClient.java @@ -213,6 +213,7 @@ class Builder { private Serializer serializer = Serializer.DEFAULT_JAVA_SERIALIZER; private String tableName = JdbcTaskRepository.DEFAULT_TABLE_NAME; private JdbcCustomization jdbcCustomization; + private boolean priority = false; private Builder(DataSource dataSource, List> knownTasks) { this.dataSource = dataSource; @@ -237,6 +238,12 @@ public Builder tableName(String tableName) { return this; } + /** Will cause getScheduledExecutions(..) to return executions in priority order. */ + public Builder enablePriority() { + this.priority = true; + return this; + } + public Builder jdbcCustomization(JdbcCustomization jdbcCustomization) { this.jdbcCustomization = jdbcCustomization; return this; @@ -259,6 +266,7 @@ public SchedulerClient build() { taskResolver, new SchedulerClientName(), serializer, + priority, clock); return new StandardSchedulerClient(taskRepository, clock); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java index 0d2f3b0f..9b7c9c42 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/AutodetectJdbcCustomization.java @@ -132,8 +132,8 @@ public boolean supportsSingleStatementLockAndFetch() { @Override public List lockAndFetchSingleStatement( - JdbcTaskRepositoryContext ctx, Instant now, int limit) { - return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit); + JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority) { + return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit, orderByPriority); } @Override @@ -143,14 +143,15 @@ public boolean supportsGenericLockAndFetch() { @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { return jdbcCustomization.createGenericSelectForUpdateQuery( - tableName, limit, requiredAndCondition); + tableName, limit, requiredAndCondition, orderByPriority); } @Override - public String createSelectDueQuery(String tableName, int limit, String andCondition) { - return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition); + public String createSelectDueQuery( + String tableName, int limit, String andCondition, boolean orderByPriority) { + return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition, orderByPriority); } @Override diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/DefaultJdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/DefaultJdbcCustomization.java index 68075126..ca1b1f56 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/DefaultJdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/DefaultJdbcCustomization.java @@ -26,6 +26,7 @@ import java.util.TimeZone; public class DefaultJdbcCustomization implements JdbcCustomization { + public static final Calendar UTC = GregorianCalendar.getInstance(TimeZone.getTimeZone("UTC")); private final boolean persistTimestampInUTC; @@ -88,7 +89,7 @@ public boolean supportsSingleStatementLockAndFetch() { @Override public List lockAndFetchSingleStatement( - JdbcTaskRepositoryContext ctx, Instant now, int limit) { + JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority) { throw new UnsupportedOperationException( "lockAndFetch not supported for " + this.getClass().getName()); } @@ -100,19 +101,20 @@ public boolean supportsGenericLockAndFetch() { @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { throw new UnsupportedOperationException( "method must be implemented when supporting generic lock-and-fetch"); } @Override - public String createSelectDueQuery(String tableName, int limit, String andCondition) { + public String createSelectDueQuery( + String tableName, int limit, String andCondition, boolean orderByPriority) { final String explicitLimit = supportsExplicitQueryLimitPart() ? getQueryLimitPart(limit) : ""; return "select * from " + tableName + " where picked = ? and execution_time <= ? " + andCondition - + " order by execution_time asc " + + Queries.ansiSqlOrderPart(orderByPriority) + explicitLimit; } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcCustomization.java index 36d0e192..4f482e12 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcCustomization.java @@ -39,12 +39,13 @@ public interface JdbcCustomization { boolean supportsSingleStatementLockAndFetch(); List lockAndFetchSingleStatement( - JdbcTaskRepositoryContext ctx, Instant now, int limit); + JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority); boolean supportsGenericLockAndFetch(); String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition); + String tableName, int limit, String requiredAndCondition, boolean orderByPriority); - String createSelectDueQuery(String tableName, int limit, String andCondition); + String createSelectDueQuery( + String tableName, int limit, String andCondition, boolean orderByPriority); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java index e39d8476..f9791e38 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepository.java @@ -62,6 +62,7 @@ public class JdbcTaskRepository implements TaskRepository { private final Serializer serializer; private final String tableName; private final JdbcCustomization jdbcCustomization; + private final boolean orderByPriority; private final Clock clock; public JdbcTaskRepository( @@ -70,6 +71,7 @@ public JdbcTaskRepository( String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, + boolean orderByPriority, Clock clock) { this( dataSource, @@ -79,6 +81,7 @@ public JdbcTaskRepository( taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER, + orderByPriority, clock); } @@ -89,6 +92,7 @@ public JdbcTaskRepository( String tableName, TaskResolver taskResolver, SchedulerName schedulerSchedulerName, + boolean orderByPriority, Clock clock) { this( dataSource, @@ -98,6 +102,7 @@ public JdbcTaskRepository( taskResolver, schedulerSchedulerName, Serializer.DEFAULT_JAVA_SERIALIZER, + orderByPriority, clock); } @@ -109,6 +114,7 @@ public JdbcTaskRepository( TaskResolver taskResolver, SchedulerName schedulerSchedulerName, Serializer serializer, + boolean orderByPriority, Clock clock) { this( jdbcCustomization, @@ -117,6 +123,7 @@ public JdbcTaskRepository( schedulerSchedulerName, serializer, new JdbcRunner(dataSource, commitWhenAutocommitDisabled), + orderByPriority, clock); } @@ -127,6 +134,7 @@ protected JdbcTaskRepository( SchedulerName schedulerSchedulerName, Serializer serializer, JdbcRunner jdbcRunner, + boolean orderByPriority, Clock clock) { this.tableName = tableName; this.taskResolver = taskResolver; @@ -134,6 +142,7 @@ protected JdbcTaskRepository( this.jdbcRunner = jdbcRunner; this.serializer = serializer; this.jdbcCustomization = jdbcCustomization; + this.orderByPriority = orderByPriority; this.clock = clock; } @@ -150,17 +159,29 @@ public boolean createIfNotExists(SchedulableInstance instance) { return false; } + // FIXLATER: replace with some sort of generic SQL-builder, possibly extend micro-jdbc + // with execute(query-and-pss) and have builder return that.. jdbcRunner.execute( "insert into " + tableName - + "(task_name, task_instance, task_data, execution_time, picked, version) values(?, ?, ?, ?, ?, ?)", + + "(task_name, task_instance, task_data, execution_time, picked, version" + + (orderByPriority ? ", priority" : "") + + ") values(?, ?, ?, ?, ?, ? " + + (orderByPriority ? ", ?" : "") + + ")", (PreparedStatement p) -> { - p.setString(1, taskInstance.getTaskName()); - p.setString(2, taskInstance.getId()); - jdbcCustomization.setTaskData(p, 3, serializer.serialize(taskInstance.getData())); - jdbcCustomization.setInstant(p, 4, instance.getNextExecutionTime(clock.now())); - p.setBoolean(5, false); - p.setLong(6, 1L); + int parameterIndex = 1; + p.setString(parameterIndex++, taskInstance.getTaskName()); + p.setString(parameterIndex++, taskInstance.getId()); + jdbcCustomization.setTaskData( + p, parameterIndex++, serializer.serialize(taskInstance.getData())); + jdbcCustomization.setInstant( + p, parameterIndex++, instance.getNextExecutionTime(clock.now())); + p.setBoolean(parameterIndex++, false); + p.setLong(parameterIndex++, 1L); + if (orderByPriority) { + p.setInt(parameterIndex++, taskInstance.getPriority()); + } }); return true; @@ -246,7 +267,7 @@ public void getScheduledExecutions( ScheduledExecutionsFilter filter, Consumer consumer) { UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved()); - QueryBuilder q = queryForFilter(filter); + QueryBuilder q = queryForFilter(filter, orderByPriority); if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) { q.andCondition(unresolvedFilter); } @@ -262,7 +283,7 @@ public void getScheduledExecutions( ScheduledExecutionsFilter filter, String taskName, Consumer consumer) { UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved()); - QueryBuilder q = queryForFilter(filter); + QueryBuilder q = queryForFilter(filter, orderByPriority); if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) { q.andCondition(unresolvedFilter); } @@ -279,7 +300,8 @@ public List getDue(Instant now, int limit) { LOG.trace("Using generic fetch-then-lock query"); final UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved()); String selectDueQuery = - jdbcCustomization.createSelectDueQuery(tableName, limit, unresolvedFilter.andCondition()); + jdbcCustomization.createSelectDueQuery( + tableName, limit, unresolvedFilter.andCondition(), orderByPriority); return jdbcRunner.query( selectDueQuery, @@ -303,7 +325,7 @@ public List lockAndFetchGeneric(Instant now, int limit) { new UnresolvedFilter(taskResolver.getUnresolved()); String selectForUpdateQuery = jdbcCustomization.createGenericSelectForUpdateQuery( - tableName, limit, unresolvedFilter.andCondition()); + tableName, limit, unresolvedFilter.andCondition(), orderByPriority); List candidates = txRunner.query( selectForUpdateQuery, @@ -389,7 +411,8 @@ private List updateToPicked( public List lockAndGetDue(Instant now, int limit) { if (jdbcCustomization.supportsSingleStatementLockAndFetch()) { LOG.trace("Using single-statement lock-and-fetch"); - return jdbcCustomization.lockAndFetchSingleStatement(getTaskRespositoryContext(), now, limit); + return jdbcCustomization.lockAndFetchSingleStatement( + getTaskRespositoryContext(), now, limit, orderByPriority); } else if (jdbcCustomization.supportsGenericLockAndFetch()) { LOG.trace("Using generic transaction-based lock-and-fetch"); return lockAndFetchGeneric(now, limit); @@ -688,7 +711,7 @@ private JdbcTaskRepositoryContext getTaskRespositoryContext() { () -> new ExecutionResultSetMapper(false, true)); } - private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter) { + private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter, boolean orderByPriority) { final QueryBuilder q = QueryBuilder.selectFromTable(tableName); filter @@ -698,7 +721,7 @@ private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter) { q.andCondition(new PickedCondition(value)); }); - q.orderBy("execution_time asc"); + q.orderBy(orderByPriority ? "priority desc, execution_time asc" : "execution_time asc"); return q; } @@ -776,6 +799,8 @@ public Void map(ResultSet rs) throws SQLException { Instant lastHeartbeat = jdbcCustomization.getInstant(rs, "last_heartbeat"); long version = rs.getLong("version"); + int priority = orderByPriority ? rs.getInt("priority") : 0; + Supplier dataSupplier = memoize( () -> { @@ -789,7 +814,7 @@ public Void map(ResultSet rs) throws SQLException { this.consumer.accept( new Execution( executionTime, - new TaskInstance(taskName, instanceId, dataSupplier), + new TaskInstance(taskName, instanceId, dataSupplier, priority), picked, pickedBy, lastSuccess, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MariaDBJdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MariaDBJdbcCustomization.java index ce82a9d0..36c1c289 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MariaDBJdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MariaDBJdbcCustomization.java @@ -43,9 +43,10 @@ public boolean supportsGenericLockAndFetch() { @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { return selectForUpdate( tableName, + Queries.ansiSqlOrderPart(orderByPriority), Queries.postgresSqlLimitPart(limit), requiredAndCondition, " FOR UPDATE SKIP LOCKED ", diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MssqlJdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MssqlJdbcCustomization.java index 21e96cd1..2bb58641 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MssqlJdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MssqlJdbcCustomization.java @@ -47,22 +47,24 @@ public boolean supportsGenericLockAndFetch() { } @Override - public String createSelectDueQuery(String tableName, int limit, String andCondition) { + public String createSelectDueQuery( + String tableName, int limit, String andCondition, boolean orderByPriority) { return "SELECT " + " * FROM " + tableName // try reading past locked rows to see if that helps on deadlock-warnings + " WITH (READPAST) WHERE picked = ? AND execution_time <= ? " + andCondition - + " ORDER BY execution_time ASC " + + Queries.ansiSqlOrderPart(orderByPriority) + getQueryLimitPart(limit); } @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { return selectForUpdate( tableName, + Queries.ansiSqlOrderPart(orderByPriority), Queries.ansiSqlLimitPart(limit), requiredAndCondition, null, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MySQL8JdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MySQL8JdbcCustomization.java index acdebb1a..8b86447e 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MySQL8JdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/MySQL8JdbcCustomization.java @@ -43,9 +43,10 @@ public boolean supportsGenericLockAndFetch() { @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { return selectForUpdate( tableName, + Queries.ansiSqlOrderPart(orderByPriority), Queries.postgresSqlLimitPart(limit), requiredAndCondition, " FOR UPDATE SKIP LOCKED ", diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/OracleJdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/OracleJdbcCustomization.java index c5bd2058..6a2e921d 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/OracleJdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/OracleJdbcCustomization.java @@ -34,9 +34,10 @@ public boolean supportsGenericLockAndFetch() { @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { return selectForUpdate( tableName, + Queries.ansiSqlOrderPart(orderByPriority), Queries.ansiSqlLimitPart(limit), requiredAndCondition, " FOR UPDATE SKIP LOCKED ", diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/PostgreSqlJdbcCustomization.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/PostgreSqlJdbcCustomization.java index fce03b1d..5961ed6b 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/PostgreSqlJdbcCustomization.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/PostgreSqlJdbcCustomization.java @@ -21,6 +21,7 @@ import java.util.List; public class PostgreSqlJdbcCustomization extends DefaultJdbcCustomization { + private final boolean useGenericLockAndFetch; public PostgreSqlJdbcCustomization( @@ -51,9 +52,10 @@ public boolean supportsGenericLockAndFetch() { @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { return selectForUpdate( tableName, + Queries.ansiSqlOrderPart(orderByPriority), getQueryLimitPart(limit), requiredAndCondition, " FOR UPDATE SKIP LOCKED ", @@ -62,12 +64,12 @@ public String createGenericSelectForUpdateQuery( @Override public List lockAndFetchSingleStatement( - JdbcTaskRepositoryContext ctx, Instant now, int limit) { + JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority) { final JdbcTaskRepository.UnresolvedFilter unresolvedFilter = new JdbcTaskRepository.UnresolvedFilter(ctx.taskResolver.getUnresolved()); String selectForUpdateQuery = - " UPDATE " + " WITH locked_executions as (UPDATE " + ctx.tableName + " st1 SET picked = ?, picked_by = ?, last_heartbeat = ?, version = version + 1 " + " WHERE (st1.task_name, st1.task_instance) IN " @@ -76,10 +78,13 @@ public List lockAndFetchSingleStatement( + " st2 " + " WHERE picked = ? and execution_time <= ? " + unresolvedFilter.andCondition() - + " ORDER BY execution_time ASC FOR UPDATE SKIP LOCKED " + + Queries.ansiSqlOrderPart(orderByPriority) + + " FOR UPDATE SKIP LOCKED " + getQueryLimitPart(limit) + ")" - + " RETURNING st1.*"; + + " RETURNING st1.*) " + + " SELECT * FROM locked_executions " + + Queries.ansiSqlOrderPart(orderByPriority); return ctx.jdbcRunner.query( selectForUpdateQuery, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/Queries.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/Queries.java index 544f2d63..0dd7f97a 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/Queries.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/jdbc/Queries.java @@ -19,6 +19,7 @@ public class Queries { public static String selectForUpdate( String tableName, + String orderPart, String limitPart, String requiredAndCondition, String postgresOracleStyleForUpdate, @@ -28,7 +29,7 @@ public static String selectForUpdate( + Optional.ofNullable(sqlServerStyleForUpdate).orElse("") + " WHERE picked = ? AND execution_time <= ? " + requiredAndCondition - + " ORDER BY execution_time ASC " + + orderPart + Optional.ofNullable(postgresOracleStyleForUpdate).orElse("") + limitPart; } @@ -40,4 +41,10 @@ public static String postgresSqlLimitPart(int limit) { public static String ansiSqlLimitPart(int limit) { return " OFFSET 0 ROWS FETCH FIRST " + limit + " ROWS ONLY "; } + + public static String ansiSqlOrderPart(boolean orderByPriority) { + return orderByPriority + ? " ORDER BY priority DESC, execution_time ASC " + : " ORDER BY execution_time ASC "; + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AbstractTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AbstractTask.java index cc6caa65..3cf9c252 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AbstractTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/AbstractTask.java @@ -13,7 +13,10 @@ */ package com.github.kagkarlsson.scheduler.task; +import com.github.kagkarlsson.scheduler.task.TaskInstance.Builder; + public abstract class AbstractTask implements Task { + protected final String name; private final FailureHandler failureHandler; private final DeadExecutionHandler deadExecutionHandler; @@ -42,12 +45,17 @@ public Class getDataClass() { @Override public TaskInstance instance(String id) { - return new TaskInstance<>(this.name, id); + return instanceBuilder(id).build(); } @Override public TaskInstance instance(String id, T data) { - return new TaskInstance<>(this.name, id, data); + return instanceBuilder(id).priority(getDefaultPriority()).data(data).build(); + } + + @Override + public TaskInstance.Builder instanceBuilder(String id) { + return new Builder<>(this.name, id); } @Override diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Priority.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Priority.java new file mode 100644 index 00000000..734abb96 --- /dev/null +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Priority.java @@ -0,0 +1,21 @@ +/* + * Copyright (C) Gustav Karlsson + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.kagkarlsson.scheduler.task; + +public class Priority { + + public static final int HIGH = 90; + public static final int MEDIUM = 50; + public static final int LOW = 10; +} diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableInstance.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableInstance.java index cc083718..012384a5 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableInstance.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/SchedulableInstance.java @@ -14,9 +14,19 @@ package com.github.kagkarlsson.scheduler.task; import java.time.Instant; +import java.util.function.Supplier; public interface SchedulableInstance extends TaskInstanceId { + static SchedulableInstance of(TaskInstance taskInstance, Instant executionTime) { + return new SchedulableTaskInstance(taskInstance, executionTime); + } + + static SchedulableInstance of( + TaskInstance taskInstance, NextExecutionTime executionTime) { + return new SchedulableTaskInstance(taskInstance, executionTime); + } + TaskInstance getTaskInstance(); Instant getNextExecutionTime(Instant currentTime); @@ -29,12 +39,36 @@ default String getId() { return getTaskInstance().getId(); } - static SchedulableInstance of(TaskInstance taskInstance, Instant executionTime) { - return new SchedulableTaskInstance(taskInstance, executionTime); - } + class Builder { - static SchedulableInstance of( - TaskInstance taskInstance, NextExecutionTime executionTime) { - return new SchedulableTaskInstance(taskInstance, executionTime); + private final String taskName; + private final String id; + private Supplier dataSupplier = () -> (T) null; + private int priority = Priority.MEDIUM; + + public Builder(String taskName, String id) { + this.id = id; + this.taskName = taskName; + } + + public SchedulableInstance.Builder data(Supplier dataSupplier) { + this.dataSupplier = dataSupplier; + return this; + } + + public SchedulableInstance.Builder data(T data) { + this.dataSupplier = () -> (T) data; + return this; + } + + public SchedulableInstance.Builder priority(int priority) { + this.priority = priority; + return this; + } + + public SchedulableInstance scheduledTo(Instant executionTime) { + TaskInstance taskInstance = new TaskInstance<>(taskName, id, dataSupplier, priority); + return new SchedulableTaskInstance<>(taskInstance, executionTime); + } } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java index 0bd1b2bf..aa3ccf2c 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/Task.java @@ -14,6 +14,7 @@ package com.github.kagkarlsson.scheduler.task; public interface Task extends ExecutionHandler, HasTaskName { + String getName(); Class getDataClass(); @@ -22,10 +23,11 @@ public interface Task extends ExecutionHandler, HasTaskName { TaskInstance instance(String id, T data); + TaskInstance.Builder instanceBuilder(String id); + default TaskInstanceId instanceId(String id) { return TaskInstanceId.of(getName(), id); } - ; SchedulableInstance schedulableInstance(String id); @@ -39,4 +41,8 @@ default TaskInstanceId instanceId(String id) { default String getTaskName() { return getName(); } + + default int getDefaultPriority() { + return Priority.MEDIUM; + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java index 61ddb6d4..e4530ede 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java @@ -19,8 +19,20 @@ public interface TaskDescriptor extends HasTaskName { Class getDataClass(); + static TaskDescriptor of(String name) { + return TaskDescriptor.of(name, Void.class); + } + static TaskDescriptor of(String name, Class dataClass) { - return new TaskDescriptor.SimpleTaskDescriptor(name, dataClass); + return new TaskDescriptor.SimpleTaskDescriptor<>(name, dataClass); + } + + default TaskInstance.Builder instance(String id) { + return new TaskInstance.Builder<>(getTaskName(), id); + } + + default TaskInstanceId instanceId(String id) { + return TaskInstanceId.of(getTaskName(), id); } class SimpleTaskDescriptor implements TaskDescriptor { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskInstance.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskInstance.java index 2f3864fc..d32bc94f 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskInstance.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskInstance.java @@ -13,26 +13,31 @@ */ package com.github.kagkarlsson.scheduler.task; +import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; +import java.time.Instant; +import java.util.Objects; import java.util.function.Supplier; -public final class TaskInstance implements TaskInstanceId { +public class TaskInstance implements TaskInstanceId { private final String taskName; private final String id; private final Supplier dataSupplier; + private final int priority; public TaskInstance(String taskName, String id) { this(taskName, id, (T) null); } public TaskInstance(String taskName, String id, T data) { - this(taskName, id, () -> data); + this(taskName, id, () -> data, Priority.MEDIUM); } - public TaskInstance(String taskName, String id, Supplier dataSupplier) { + public TaskInstance(String taskName, String id, Supplier dataSupplier, int priority) { this.taskName = taskName; this.id = id; this.dataSupplier = dataSupplier; + this.priority = priority; } public String getTaskAndInstance() { @@ -52,26 +57,79 @@ public T getData() { return dataSupplier.get(); } + public int getPriority() { + return priority; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - TaskInstance that = (TaskInstance) o; - - if (!taskName.equals(that.taskName)) return false; - return id.equals(that.id); + return priority == that.priority + && Objects.equals(taskName, that.taskName) + && Objects.equals(id, that.id); } @Override public int hashCode() { - int result = taskName.hashCode(); - result = 31 * result + id.hashCode(); - return result; + return Objects.hash(taskName, id, priority); } @Override public String toString() { - return "TaskInstance: " + "task=" + taskName + ", id=" + id; + return "TaskInstance: " + "task=" + taskName + ", id=" + id + ", priority=" + priority; + } + + public static class Builder { + + private final String taskName; + private final String id; + private Supplier dataSupplier = () -> (T) null; + private int priority = Priority.MEDIUM; + + public Builder(String taskName, String id) { + this.id = id; + this.taskName = taskName; + } + + public Builder dataSupplier(Supplier dataSupplier) { + this.dataSupplier = dataSupplier; + return this; + } + + public Builder data(T data) { + this.dataSupplier = () -> (T) data; + ; + return this; + } + + public Builder priority(int priority) { + this.priority = priority; + return this; + } + + public TaskInstance build() { + return new TaskInstance<>(taskName, id, dataSupplier, priority); + } + + public SchedulableInstance scheduledTo(Instant executionTime) { + TaskInstance taskInstance = new TaskInstance<>(taskName, id, dataSupplier, priority); + return new SchedulableTaskInstance<>(taskInstance, executionTime); + } + + public SchedulableInstance scheduledAccordingToData() { + TaskInstance taskInstance = new TaskInstance<>(taskName, id, dataSupplier, priority); + T data = dataSupplier.get(); + if (!(data instanceof ScheduleAndData)) { + throw new RuntimeException( + "To be able to use method 'scheduledAccordingToData()', dataClass must implement ScheduleAndData interface and contain a Schedule"); + } + + ScheduleAndData scheduleAndData = (ScheduleAndData) data; + + return new SchedulableTaskInstance<>( + taskInstance, scheduleAndData.getSchedule()::getInitialExecutionTime); + } } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithDataDescriptor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithDataDescriptor.java index 20ebaa76..177a27f3 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithDataDescriptor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithDataDescriptor.java @@ -13,7 +13,10 @@ */ package com.github.kagkarlsson.scheduler.task; -/** Experimental */ +/** + * @deprecated use {@link TaskDescriptor} directly instead. + */ +@Deprecated public class TaskWithDataDescriptor implements TaskDescriptor { private final String taskName; @@ -24,10 +27,6 @@ public TaskWithDataDescriptor(String taskName, Class dataClass) { // TODO: no this.dataClass = dataClass; } - public TaskInstance instance(String id, T data) { - return new TaskInstance<>(taskName, id, data); - } - @Override public String getTaskName() { return taskName; diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithoutDataDescriptor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithoutDataDescriptor.java index 80c647a3..fe1a85a6 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithoutDataDescriptor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskWithoutDataDescriptor.java @@ -13,7 +13,10 @@ */ package com.github.kagkarlsson.scheduler.task; -/** Experimental */ +/** + * @deprecated use {@link TaskDescriptor} directly instead. + */ +@Deprecated public class TaskWithoutDataDescriptor implements TaskDescriptor { private final String taskName; @@ -22,10 +25,6 @@ public TaskWithoutDataDescriptor(String taskName) { this.taskName = taskName; } - public TaskInstance instance(String id) { - return new TaskInstance<>(taskName, id); - } - @Override public String getTaskName() { return taskName; diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java index 48d35a5e..6dcaa0f0 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/CustomTask.java @@ -22,13 +22,12 @@ import com.github.kagkarlsson.scheduler.task.OnStartup; import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; -import com.github.kagkarlsson.scheduler.task.TaskInstance; import java.time.Instant; import java.util.function.Function; public abstract class CustomTask extends AbstractTask implements OnStartup { - private ScheduleOnStartup scheduleOnStartup; private final NextExecutionTime defaultExecutionTime; + private ScheduleOnStartup scheduleOnStartup; public CustomTask( String name, @@ -44,13 +43,13 @@ public CustomTask( @Override public SchedulableInstance schedulableInstance(String id) { - return new SchedulableTaskInstance<>(new TaskInstance<>(getName(), id), defaultExecutionTime); + return new SchedulableTaskInstance<>(instanceBuilder(id).build(), defaultExecutionTime); } @Override public SchedulableInstance schedulableInstance(String id, T data) { return new SchedulableTaskInstance<>( - new TaskInstance<>(getName(), id, data), defaultExecutionTime); + instanceBuilder(id).data(data).build(), defaultExecutionTime); } @Override diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java index 5b809e26..d59dbafa 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java @@ -43,14 +43,13 @@ public OneTimeTask( @Override public SchedulableInstance schedulableInstance(String id) { - return new SchedulableTaskInstance<>( - new TaskInstance<>(getName(), id), (currentTime) -> currentTime); + return new SchedulableTaskInstance<>(instanceBuilder(id).build(), (currentTime) -> currentTime); } @Override public SchedulableInstance schedulableInstance(String id, T data) { return new SchedulableTaskInstance<>( - new TaskInstance<>(getName(), id, data), (currentTime) -> currentTime); + instanceBuilder(id).data(data).build(), (currentTime) -> currentTime); } @Override diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java index 1e0e3b73..9c642363 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java @@ -71,6 +71,11 @@ public RecurringTask( this.scheduleOnStartup = scheduleOnStartup; } + @Override + public int getDefaultPriority() { + return Priority.HIGH; + } + @Override public SchedulableInstance schedulableInstance(String id) { return new SchedulableTaskInstance<>( diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTaskWithPersistentSchedule.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTaskWithPersistentSchedule.java index dc2bdd50..ba66dee1 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTaskWithPersistentSchedule.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTaskWithPersistentSchedule.java @@ -47,7 +47,7 @@ public SchedulableInstance schedulableInstance(String id) { @Override public SchedulableInstance schedulableInstance(String id, T data) { return new SchedulableTaskInstance<>( - new TaskInstance<>(getName(), id, data), data.getSchedule()::getInitialExecutionTime); + instanceBuilder(id).data(data).build(), data.getSchedule()::getInitialExecutionTime); } @Override diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java index 8f3c8687..c79e4fe3 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/ManualScheduler.java @@ -48,7 +48,8 @@ public class ManualScheduler extends Scheduler { boolean logStackTrace, List onStartup, ExecutorService dueExecutor, - ScheduledExecutorService houseKeeperExecutor) { + ScheduledExecutorService houseKeeperExecutor, + boolean priorityEnabled) { super( clock, schedulerTaskRepository, diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java index 88899cd8..62b0438b 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/testhelper/TestHelper.java @@ -81,6 +81,7 @@ public ManualScheduler build() { taskResolver, new SchedulerName.Fixed("manual"), serializer, + enablePriority, clock); final JdbcTaskRepository clientTaskRepository = new JdbcTaskRepository( @@ -91,6 +92,7 @@ public ManualScheduler build() { taskResolver, new SchedulerName.Fixed("manual"), serializer, + enablePriority, clock); return new ManualScheduler( @@ -111,7 +113,8 @@ public ManualScheduler build() { true, startTasks, new ThrowingScheduledExecutorService(), - new ThrowingScheduledExecutorService()); + new ThrowingScheduledExecutorService(), + false); } public ManualScheduler start() { diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java index 5af440e9..3c577f09 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/CustomTableNameTest.java @@ -41,6 +41,7 @@ public void setUp() { CUSTOM_TABLENAME, new TaskResolver(StatsRegistry.NOOP, knownTasks), new SchedulerName.Fixed(SCHEDULER_NAME), + false, new SystemClock()); DbUtils.runSqlResource("postgresql_custom_tablename.sql").accept(DB.getDataSource()); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DbUtils.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DbUtils.java index 74e17ed5..e958d161 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DbUtils.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DbUtils.java @@ -35,7 +35,7 @@ public static Consumer runSqlResource(String resource, boolean split new InputStreamReader(DbUtils.class.getResourceAsStream(resource))); if (splitStatements) { for (String statement : statements.split(";")) { - if (!statement.trim().isEmpty()) { + if (!statement.trim().isEmpty() && !statement.trim().startsWith("--")) { jdbcRunner.execute(statement, NOOP); } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index 61668d69..f6804439 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -56,6 +56,7 @@ public void setUp() { DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), + false, settableClock); scheduler = diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java index ab741bc6..79da09ef 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java @@ -9,7 +9,6 @@ import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay; import java.time.Duration; import java.time.Instant; -import org.junit.jupiter.api.Assertions.*; import org.junit.jupiter.api.Test; public class ExecutionTest { diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java index 1e1628c9..f80888a8 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/JdbcTaskRepositoryTest.java @@ -4,6 +4,7 @@ import static com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter.onlyResolved; import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -29,7 +30,9 @@ import java.util.List; import java.util.Optional; import java.util.Random; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -61,14 +64,7 @@ public void setUp() { knownTasks.add(alternativeOneTimeTask); testableRegistry = new TestableRegistry(true, Collections.emptyList()); taskResolver = new TaskResolver(testableRegistry, knownTasks); - taskRepository = - new JdbcTaskRepository( - DB.getDataSource(), - false, - DEFAULT_TABLE_NAME, - taskResolver, - new SchedulerName.Fixed(SCHEDULER_NAME), - new SystemClock()); + taskRepository = createRepository(taskResolver, false); } @Test @@ -141,6 +137,36 @@ public void get_due_should_be_sorted() { assertThat(due, is(sortedDue)); } + @Test + public void get_due_should_be_sorted_by_priority_when_enabled() { + JdbcTaskRepository taskRespositoryWithPriority = createRepository(taskResolver, true); + Instant now = TimeHelper.truncatedInstantNow(); + SchedulableTaskInstance id1 = + new SchedulableTaskInstance<>( + oneTimeTask.instanceBuilder("id1").priority(1).build(), now.minus(Duration.ofDays(1))); + SchedulableTaskInstance id2 = + new SchedulableTaskInstance<>( + oneTimeTask.instanceBuilder("id2").priority(10).build(), now.minus(Duration.ofDays(2))); + SchedulableTaskInstance id3 = + new SchedulableTaskInstance<>( + oneTimeTask.instanceBuilder("id3").priority(5).build(), now.minus(Duration.ofDays(3))); + + Stream.of(id1, id2, id3).forEach(taskRespositoryWithPriority::createIfNotExists); + + List orderedByPriority = + taskRespositoryWithPriority.getDue(now, POLLING_LIMIT).stream() + .map(Execution::getId) + .collect(Collectors.toList()); + assertThat(orderedByPriority, contains("id2", "id3", "id1")); + + // default task-repository should have no order + List orderedByExecutionTime = + taskRepository.getDue(now, POLLING_LIMIT).stream() + .map(Execution::getId) + .collect(Collectors.toList()); + assertThat(orderedByExecutionTime, contains("id3", "id2", "id1")); + } + @Test public void get_due_should_not_include_previously_unresolved() { Instant now = TimeHelper.truncatedInstantNow(); @@ -363,13 +389,6 @@ public void get_scheduled_by_task_name() { assertThat(getScheduledExecutions(all().withPicked(false), "non-existing"), empty()); } - private List getScheduledExecutions( - ScheduledExecutionsFilter filter, String taskName) { - List alternativeTasks = new ArrayList<>(); - taskRepository.getScheduledExecutions(filter, taskName, alternativeTasks::add); - return alternativeTasks; - } - @Test public void get_dead_executions_should_not_include_previously_unresolved() { Instant now = TimeHelper.truncatedInstantNow(); @@ -379,14 +398,7 @@ public void get_dead_executions_should_not_include_previously_unresolved() { createDeadExecution(oneTimeTask.instance("id1"), timeDied); TaskResolver taskResolverMissingTask = new TaskResolver(testableRegistry); - JdbcTaskRepository repoMissingTask = - new JdbcTaskRepository( - DB.getDataSource(), - false, - DEFAULT_TABLE_NAME, - taskResolverMissingTask, - new SchedulerName.Fixed(SCHEDULER_NAME), - new SystemClock()); + JdbcTaskRepository repoMissingTask = createRepository(taskResolverMissingTask, false); assertThat(taskResolverMissingTask.getUnresolved(), hasSize(0)); assertEquals(0, testableRegistry.getCount(SchedulerStatsEvent.UNRESOLVED_TASK)); @@ -464,6 +476,24 @@ public void lockAndFetchGeneric_happy() { assertThat(taskRepository.pick(picked.get(0), now), OptionalMatchers.empty()); } + private List getScheduledExecutions( + ScheduledExecutionsFilter filter, String taskName) { + List alternativeTasks = new ArrayList<>(); + taskRepository.getScheduledExecutions(filter, taskName, alternativeTasks::add); + return alternativeTasks; + } + + private JdbcTaskRepository createRepository(TaskResolver taskResolver, boolean orderByPriority) { + return new JdbcTaskRepository( + DB.getDataSource(), + false, + DEFAULT_TABLE_NAME, + taskResolver, + new SchedulerName.Fixed(SCHEDULER_NAME), + orderByPriority, + new SystemClock()); + } + private void createDeadExecution(TaskInstance taskInstance, Instant timeDied) { taskRepository.createIfNotExists(new SchedulableTaskInstance<>(taskInstance, timeDied)); final Execution due = getSingleExecution(); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java index 1c821c70..5be88b2a 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/CompatibilityTest.java @@ -1,10 +1,11 @@ package com.github.kagkarlsson.scheduler.compatibility; import static com.github.kagkarlsson.scheduler.SchedulerBuilder.UPPER_LIMIT_FRACTION_OF_THREADS_FOR_FETCH; +import static com.github.kagkarlsson.scheduler.helper.TimeHelper.truncatedInstantNow; import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME; import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -14,9 +15,9 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import co.unruly.matchers.OptionalMatchers; -import com.github.kagkarlsson.scheduler.*; import com.github.kagkarlsson.scheduler.DbUtils; import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.SchedulerBuilder; import com.github.kagkarlsson.scheduler.SchedulerName; import com.github.kagkarlsson.scheduler.StopSchedulerExtension; import com.github.kagkarlsson.scheduler.SystemClock; @@ -25,7 +26,6 @@ import com.github.kagkarlsson.scheduler.TestTasks.DoNothingHandler; import com.github.kagkarlsson.scheduler.helper.ExecutionCompletedCondition; import com.github.kagkarlsson.scheduler.helper.TestableRegistry; -import com.github.kagkarlsson.scheduler.helper.TimeHelper; import com.github.kagkarlsson.scheduler.jdbc.AutodetectJdbcCustomization; import com.github.kagkarlsson.scheduler.jdbc.JdbcCustomization; import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository; @@ -33,6 +33,7 @@ import com.github.kagkarlsson.scheduler.task.Execution; import com.github.kagkarlsson.scheduler.task.SchedulableInstance; import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.TaskInstance; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.RecurringTask; @@ -47,7 +48,9 @@ import java.util.List; import java.util.Optional; import java.util.TimeZone; +import java.util.function.Function; import javax.sql.DataSource; +import org.hamcrest.Matchers; import org.hamcrest.collection.IsCollectionWithSize; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -60,6 +63,7 @@ public abstract class CompatibilityTest { private static final int POLLING_LIMIT = 10_000; + private static final TaskDescriptor ONETIME = TaskDescriptor.of("oneTime", String.class); private Logger LOG = LoggerFactory.getLogger(getClass()); private final boolean supportsSelectForUpdate; private final boolean shouldHavePersistentTimezone; @@ -97,7 +101,9 @@ public void setUp() { delayingHandlerOneTime = new TestTasks.CountingHandler<>(Duration.ofMillis(200)); delayingHandlerRecurring = new TestTasks.CountingHandler<>(Duration.ofMillis(200)); - oneTime = TestTasks.oneTimeWithType("oneTime", String.class, delayingHandlerOneTime); + oneTime = + TestTasks.oneTimeWithType( + ONETIME.getTaskName(), ONETIME.getDataClass(), delayingHandlerOneTime); recurring = TestTasks.recurring("recurring", FixedDelay.ofMillis(10), delayingHandlerRecurring); recurringWithData = TestTasks.recurringWithData( @@ -200,6 +206,70 @@ public void test_jdbc_repository_select_for_update_compatibility() { return; } + final JdbcTaskRepository jdbcTaskRepository = createJdbcTaskRepository(false); + + final Instant now = truncatedInstantNow(); + + jdbcTaskRepository.createIfNotExists( + ONETIME.instance("future1").scheduledTo(now.plusSeconds(10))); + jdbcTaskRepository.createIfNotExists(ONETIME.instance("id1").scheduledTo(now)); + List picked = jdbcTaskRepository.lockAndGetDue(now, POLLING_LIMIT); + assertThat(picked, IsCollectionWithSize.hasSize(1)); + + assertThat(jdbcTaskRepository.pick(picked.get(0), now), OptionalMatchers.empty()); + } + + @Test + public void test_jdbc_repository_get_due_correct_order_with_priority() { + assertCorrectOrder(true, (r) -> r.getDue(truncatedInstantNow(), POLLING_LIMIT)); + } + + @Test + public void test_jdbc_repository_get_due_correct_order_without_priority() { + assertCorrectOrder(false, (r) -> r.getDue(truncatedInstantNow(), POLLING_LIMIT)); + } + + @Test + public void test_jdbc_repository_lock_and_get_due_correct_order_with_priority() { + if (!supportsSelectForUpdate) { + return; + } + + assertCorrectOrder(true, (r) -> r.lockAndGetDue(truncatedInstantNow(), POLLING_LIMIT)); + } + + @Test + public void test_jdbc_repository_lock_and_get_due_correct_order_without_priority() { + if (!supportsSelectForUpdate) { + return; + } + + assertCorrectOrder(false, (r) -> r.lockAndGetDue(truncatedInstantNow(), POLLING_LIMIT)); + } + + private void assertCorrectOrder( + boolean orderByPriority, Function> methodUnderTest) { + final JdbcTaskRepository jdbcTaskRepository = createJdbcTaskRepository(orderByPriority); + final Instant now = truncatedInstantNow(); + + jdbcTaskRepository.createIfNotExists( + ONETIME.instance("1").priority(1).scheduledTo(now.minusSeconds(10))); + jdbcTaskRepository.createIfNotExists( + ONETIME.instance("2").priority(2).scheduledTo(now.minusSeconds(30))); + jdbcTaskRepository.createIfNotExists( + ONETIME.instance("3").priority(3).scheduledTo(now.minusSeconds(20))); + + List picked = methodUnderTest.apply(jdbcTaskRepository); + List ids = picked.stream().map(Execution::getId).collect(toList()); + + if (orderByPriority) { + assertThat("Contained: " + ids, ids, Matchers.contains("3", "2", "1")); + } else { + assertThat("Contained: " + ids, ids, Matchers.contains("2", "3", "1")); + } + } + + private JdbcTaskRepository createJdbcTaskRepository(boolean orderByPriority) { TaskResolver taskResolver = new TaskResolver(StatsRegistry.NOOP, new ArrayList<>()); taskResolver.addTask(oneTime); @@ -214,18 +284,9 @@ public void test_jdbc_repository_select_for_update_compatibility() { DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), + orderByPriority, new SystemClock()); - - final Instant now = TimeHelper.truncatedInstantNow(); - - jdbcTaskRepository.createIfNotExists( - SchedulableInstance.of(oneTime.instance("future1"), now.plusSeconds(10))); - jdbcTaskRepository.createIfNotExists( - new SchedulableTaskInstance<>(oneTime.instance("id1"), now)); - List picked = jdbcTaskRepository.lockAndGetDue(now, POLLING_LIMIT); - assertThat(picked, IsCollectionWithSize.hasSize(1)); - - assertThat(jdbcTaskRepository.pick(picked.get(0), now), OptionalMatchers.empty()); + return jdbcTaskRepository; } @Test @@ -262,9 +323,20 @@ private void doJDBCRepositoryCompatibilityTestUsingData(String data) { DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), + false, new SystemClock()); - final Instant now = TimeHelper.truncatedInstantNow(); + final JdbcTaskRepository jdbcTaskRepositoryWithPriority = + new JdbcTaskRepository( + dataSource, + commitWhenAutocommitDisabled(), + DEFAULT_TABLE_NAME, + taskResolver, + new SchedulerName.Fixed("scheduler1"), + true, + new SystemClock()); + + final Instant now = truncatedInstantNow(); final TaskInstance taskInstance = oneTime.instance("id1", data); final SchedulableTaskInstance newExecution = @@ -273,6 +345,10 @@ private void doJDBCRepositoryCompatibilityTestUsingData(String data) { Execution storedExecution = (jdbcTaskRepository.getExecution(taskInstance)).get(); assertThat(storedExecution.getExecutionTime(), is(now)); + // priority=true + assertThat(jdbcTaskRepositoryWithPriority.getDue(now, POLLING_LIMIT), hasSize(1)); + + // priority=false final List due = jdbcTaskRepository.getDue(now, POLLING_LIMIT); assertThat(due, hasSize(1)); final Optional pickedExecution = jdbcTaskRepository.pick(due.get(0), now); @@ -311,9 +387,10 @@ public void test_jdbc_repository_compatibility_set_data() { DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"), + false, new SystemClock()); - final Instant now = TimeHelper.truncatedInstantNow(); + final Instant now = truncatedInstantNow(); final TaskInstance taskInstance = recurringWithData.instance("id1", 1); final SchedulableTaskInstance newExecution = @@ -386,6 +463,7 @@ private JdbcTaskRepository createRepositoryForJdbcCustomization( DEFAULT_TABLE_NAME, defaultTaskResolver, new SchedulerName.Fixed("scheduler1"), + false, new SystemClock()); } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/ZoneSpecificJdbcCustomization.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/ZoneSpecificJdbcCustomization.java index 89c6bbaa..b49fe94d 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/ZoneSpecificJdbcCustomization.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/compatibility/ZoneSpecificJdbcCustomization.java @@ -65,8 +65,8 @@ public boolean supportsSingleStatementLockAndFetch() { @Override public List lockAndFetchSingleStatement( - JdbcTaskRepositoryContext ctx, Instant now, int limit) { - return delegate.lockAndFetchSingleStatement(ctx, now, limit); + JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority) { + return delegate.lockAndFetchSingleStatement(ctx, now, limit, orderByPriority); } @Override @@ -76,12 +76,14 @@ public boolean supportsGenericLockAndFetch() { @Override public String createGenericSelectForUpdateQuery( - String tableName, int limit, String requiredAndCondition) { - return delegate.createGenericSelectForUpdateQuery(tableName, limit, requiredAndCondition); + String tableName, int limit, String requiredAndCondition, boolean orderByPriority) { + return delegate.createGenericSelectForUpdateQuery( + tableName, limit, requiredAndCondition, orderByPriority); } @Override - public String createSelectDueQuery(String tableName, int limit, String andCondition) { - return delegate.createSelectDueQuery(tableName, limit, andCondition); + public String createSelectDueQuery( + String tableName, int limit, String andCondition, boolean orderByPriority) { + return delegate.createSelectDueQuery(tableName, limit, andCondition, orderByPriority); } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java index 6a7b247a..3d897a44 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/ImmediateExecutionTest.java @@ -3,7 +3,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import co.unruly.matchers.TimeMatchers; diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java new file mode 100644 index 00000000..4c834945 --- /dev/null +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/PriorityExecutionTest.java @@ -0,0 +1,172 @@ +package com.github.kagkarlsson.scheduler.functional; + +import static java.time.Duration.ofMinutes; +import static java.time.Instant.now; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +import com.github.kagkarlsson.scheduler.EmbeddedPostgresqlExtension; +import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.SchedulerName.Fixed; +import com.github.kagkarlsson.scheduler.StopSchedulerExtension; +import com.github.kagkarlsson.scheduler.TestTasks; +import com.github.kagkarlsson.scheduler.helper.TestableRegistry; +import com.github.kagkarlsson.scheduler.task.ExecutionComplete; +import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class PriorityExecutionTest { + @RegisterExtension + public EmbeddedPostgresqlExtension postgres = new EmbeddedPostgresqlExtension(); + + @RegisterExtension public StopSchedulerExtension stopScheduler = new StopSchedulerExtension(); + + private final OneTimeTask oneTimeTask = + TestTasks.oneTime("onetime-a", Void.class, TestTasks.DO_NOTHING); + + @Test + public void test_when_priority_is_enabled() { + TestableRegistry.Condition condition = TestableRegistry.Conditions.completed(4); + TestableRegistry registry = TestableRegistry.create().waitConditions(condition).build(); + + Scheduler scheduler = + Scheduler.create(postgres.getDataSource(), oneTimeTask) + .threads(1) // 1 thread to force being sequential + .pollingInterval(ofMinutes(1)) + .schedulerName(new Fixed("test")) + .statsRegistry(registry) + .enablePriority() + .build(); + + stopScheduler.register(scheduler); + + // no matter when they are scheduled, the highest priority should always be executed first + scheduler.schedule( + oneTimeTask.instanceBuilder("three").priority(3).build(), + Instant.parse("2020-01-01T20:00:00Z")); + + scheduler.schedule( + oneTimeTask.instanceBuilder("one").priority(1).build(), + Instant.parse("2020-01-01T18:00:00Z")); + + scheduler.schedule( + oneTimeTask.instanceBuilder("zero").priority(0).build(), + Instant.parse("2020-01-01T10:00:00Z")); + + scheduler.schedule( + oneTimeTask.instanceBuilder("two").priority(2).build(), + Instant.parse("2020-01-01T09:00:00Z")); + + assertTimeoutPreemptively( + Duration.ofSeconds(5), + () -> { + scheduler.start(); + condition.waitFor(); + + List completed = registry.getCompleted(); + assertThat(completed, hasSize(4)); + + // when priority is enabled + // the order should be maintained according to the priorities of tasks + List orderOfPriorities = + completed.stream() + .map(ExecutionComplete::getExecution) + .map(e -> e.taskInstance.getPriority()) + .collect(Collectors.toList()); + + assertThat(orderOfPriorities, contains(3, 2, 1, 0)); + + // and executions times should come second in the order + List orderOfExecutionTimes = + completed.stream() + .map(ExecutionComplete::getExecution) + .map(e -> e.executionTime) + .collect(Collectors.toList()); + assertThat( + orderOfExecutionTimes, + contains( + Instant.parse("2020-01-01T20:00:00Z"), + Instant.parse("2020-01-01T09:00:00Z"), + Instant.parse("2020-01-01T18:00:00Z"), + Instant.parse("2020-01-01T10:00:00Z"))); + + registry.assertNoFailures(); + }); + } + + @Test + public void test_when_priority_is_disabled() { + TestableRegistry.Condition condition = TestableRegistry.Conditions.completed(4); + TestableRegistry registry = TestableRegistry.create().waitConditions(condition).build(); + + Scheduler scheduler = + Scheduler.create(postgres.getDataSource(), oneTimeTask) + .threads(1) // 1 thread to force being sequential + .pollingInterval(ofMinutes(1)) + .schedulerName(new Fixed("test")) + .statsRegistry(registry) + .build(); + + stopScheduler.register(scheduler); + + // when priority is disabled priorities should be ignored + scheduler.schedule( + oneTimeTask.instanceBuilder("three").priority(3).build(), + Instant.parse("2020-01-01T20:00:00Z")); + + scheduler.schedule( + oneTimeTask.instanceBuilder("one").priority(1).build(), + Instant.parse("2020-01-01T18:00:00Z")); + + scheduler.schedule( + oneTimeTask.instanceBuilder("zero").priority(0).build(), + Instant.parse("2020-01-01T10:00:00Z")); + + scheduler.schedule( + oneTimeTask.instanceBuilder("two").priority(2).build(), + Instant.parse("2020-01-01T09:00:00Z")); + + scheduler.schedule(oneTimeTask.instanceBuilder("two").priority(2).build(), now()); + + assertTimeoutPreemptively( + Duration.ofSeconds(5), + () -> { + scheduler.start(); + condition.waitFor(); + + List completed = registry.getCompleted(); + assertThat(completed, hasSize(4)); + + // when priority is disabled + // the order should be maintained according to the execution times of tasks + List orderOfExecutionTimes = + completed.stream() + .map(ExecutionComplete::getExecution) + .map(e -> e.executionTime) + .collect(Collectors.toList()); + assertThat( + orderOfExecutionTimes, + contains( + Instant.parse("2020-01-01T09:00:00Z"), + Instant.parse("2020-01-01T10:00:00Z"), + Instant.parse("2020-01-01T18:00:00Z"), + Instant.parse("2020-01-01T20:00:00Z"))); + + // and priorities should be ignored + List orderOfPriorities = + completed.stream() + .map(ExecutionComplete::getExecution) + .map(e -> e.taskInstance.getPriority()) + .collect(Collectors.toList()); + + registry.assertNoFailures(); + }); + } +} diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java index 47a40c71..6fb5b915 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/jdbc/JdbcTaskRepositoryExceptionsTest.java @@ -42,7 +42,7 @@ public void setup() { expectedTableName = randomAlphanumeric(5); jdbcTaskRepository = new JdbcTaskRepository( - null, expectedTableName, null, null, null, mockJdbcRunner, new SystemClock()); + null, expectedTableName, null, null, null, mockJdbcRunner, false, new SystemClock()); } @Test @@ -57,10 +57,7 @@ public void createIfNotExistsFailsToAddNewTask() { .thenReturn(emptyList()); SQLRuntimeException rootCause = new SQLRuntimeException("SQL GO BOOM!!!"); when(mockJdbcRunner.execute( - ArgumentMatchers.eq( - "insert into " - + expectedTableName - + "(task_name, task_instance, task_data, execution_time, picked, version) values(?, ?, ?, ?, ?, ?)"), + ArgumentMatchers.startsWith("insert into " + expectedTableName), any(PreparedStatementSetter.class))) .thenThrow(rootCause); diff --git a/db-scheduler/src/test/resources/com/github/kagkarlsson/scheduler/postgresql_custom_tablename.sql b/db-scheduler/src/test/resources/com/github/kagkarlsson/scheduler/postgresql_custom_tablename.sql index 91021a74..88fae862 100644 --- a/db-scheduler/src/test/resources/com/github/kagkarlsson/scheduler/postgresql_custom_tablename.sql +++ b/db-scheduler/src/test/resources/com/github/kagkarlsson/scheduler/postgresql_custom_tablename.sql @@ -9,5 +9,6 @@ create table custom_tablename ( last_failure timestamp with time zone, last_heartbeat timestamp with time zone, version BIGINT not null, + priority INT, PRIMARY KEY (task_name, task_instance) -) \ No newline at end of file +) diff --git a/db-scheduler/src/test/resources/hsql_tables.sql b/db-scheduler/src/test/resources/hsql_tables.sql index bb7ee8cc..a018d949 100644 --- a/db-scheduler/src/test/resources/hsql_tables.sql +++ b/db-scheduler/src/test/resources/hsql_tables.sql @@ -10,5 +10,6 @@ create table scheduled_tasks ( consecutive_failures INT, last_heartbeat TIMESTAMP WITH TIME ZONE, version BIGINT, + priority SMALLINT, PRIMARY KEY (task_name, task_instance) ) diff --git a/db-scheduler/src/test/resources/mariadb_tables.sql b/db-scheduler/src/test/resources/mariadb_tables.sql index 21c28a0c..42259295 100644 --- a/db-scheduler/src/test/resources/mariadb_tables.sql +++ b/db-scheduler/src/test/resources/mariadb_tables.sql @@ -11,7 +11,13 @@ create table test.scheduled_tasks ( consecutive_failures INT, last_heartbeat timestamp(6) null, version BIGINT not null, + priority SMALLINT, PRIMARY KEY (task_name, task_instance), INDEX execution_time_idx (execution_time), - INDEX last_heartbeat_idx (last_heartbeat) + INDEX last_heartbeat_idx (last_heartbeat), + INDEX priority_execution_time_idx (priority desc, execution_time asc) ) + +-- an optimization for users of priority might be to add priority to the execution_time_idx +-- this _might_ save reads as the priority-value is already in the index +-- INDEX priority_execution_time_idx (execution_time asc, priority desc) diff --git a/db-scheduler/src/test/resources/mssql_tables.sql b/db-scheduler/src/test/resources/mssql_tables.sql index 2968a53a..f0f497fb 100644 --- a/db-scheduler/src/test/resources/mssql_tables.sql +++ b/db-scheduler/src/test/resources/mssql_tables.sql @@ -11,7 +11,13 @@ create table scheduled_tasks consecutive_failures int, last_heartbeat datetimeoffset, [version] bigint not null, + priority smallint, primary key (task_name, task_instance), index execution_time_idx (execution_time), - index last_heartbeat_idx (last_heartbeat) + index last_heartbeat_idx (last_heartbeat), + index priority_execution_time_idx (priority desc, execution_time asc) ) + +-- an optimization for users of priority might be to add priority to the priority_execution_time_idx +-- this _might_ save reads as the priority-value is already in the index +-- index priority_execution_time_idx (execution_time asc, priority desc) diff --git a/db-scheduler/src/test/resources/mysql_tables.sql b/db-scheduler/src/test/resources/mysql_tables.sql index 5ec9cd61..f6274785 100644 --- a/db-scheduler/src/test/resources/mysql_tables.sql +++ b/db-scheduler/src/test/resources/mysql_tables.sql @@ -10,7 +10,13 @@ create table test.scheduled_tasks ( consecutive_failures INT, last_heartbeat timestamp(6) null, version BIGINT not null, + priority SMALLINT, PRIMARY KEY (task_name, task_instance), INDEX execution_time_idx (execution_time), - INDEX last_heartbeat_idx (last_heartbeat) + INDEX last_heartbeat_idx (last_heartbeat), + INDEX priority_execution_time_idx (priority desc, execution_time asc) ) + +-- an optimization for users of priority might be to add priority to the execution_time_idx +-- this _might_ save reads as the priority-value is already in the index +-- CREATE INDEX execution_time_idx ON scheduled_tasks (execution_time asc, priority desc); diff --git a/db-scheduler/src/test/resources/oracle_tables.sql b/db-scheduler/src/test/resources/oracle_tables.sql index 3f79d9dc..4a6b06eb 100644 --- a/db-scheduler/src/test/resources/oracle_tables.sql +++ b/db-scheduler/src/test/resources/oracle_tables.sql @@ -11,8 +11,14 @@ create table scheduled_tasks consecutive_failures NUMBER(19, 0), last_heartbeat TIMESTAMP(6) WITH TIME ZONE, version NUMBER(19, 0), + priority SMALLINT, PRIMARY KEY (task_name, task_instance) ); CREATE INDEX scheduled_tasks_execution_time_idx on scheduled_tasks(execution_time); CREATE INDEX scheduled_tasks_last_heartbeat_idx on scheduled_tasks(last_heartbeat); +CREATE INDEX scheduled_tasks_priority_execution_time_idx on scheduled_tasks(priority desc, execution_time asc); + +-- an optimization for users of priority might be to add priority to the scheduled_tasks__execution_time__idx +-- this _might_ save reads as the priority-value is already in the index +-- CREATE INDEX scheduled_tasks_execution_time_idx on scheduled_tasks(execution_time asc, priority desc) diff --git a/db-scheduler/src/test/resources/postgresql_tables.sql b/db-scheduler/src/test/resources/postgresql_tables.sql index ce322004..6ff4d56f 100644 --- a/db-scheduler/src/test/resources/postgresql_tables.sql +++ b/db-scheduler/src/test/resources/postgresql_tables.sql @@ -10,8 +10,14 @@ create table scheduled_tasks ( consecutive_failures INT, last_heartbeat timestamp with time zone, version BIGINT not null, + priority SMALLINT, PRIMARY KEY (task_name, task_instance) ); CREATE INDEX execution_time_idx ON scheduled_tasks (execution_time); -CREATE INDEX last_heartbeat_idx ON scheduled_tasks (last_heartbeat); \ No newline at end of file +CREATE INDEX last_heartbeat_idx ON scheduled_tasks (last_heartbeat); +CREATE INDEX priority_execution_time_idx on scheduled_tasks (priority desc, execution_time asc); + +-- an optimization for users of priority might be to add priority to the execution_time_idx +-- this _might_ save reads as the priority-value is already in the index +-- CREATE INDEX execution_time_idx ON scheduled_tasks (execution_time asc, priority desc); diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java index 8ec3f0ae..8389a8f5 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/CheckForNewBatchDirectlyMain.java @@ -15,6 +15,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Duration; @@ -27,10 +28,13 @@ public static void main(String[] args) { new CheckForNewBatchDirectlyMain().runWithDatasource(); } + public static final TaskDescriptor MY_TASK = TaskDescriptor.of("my_task"); + @Override public void run(DataSource dataSource) { + OneTimeTask onetimeTask = - Tasks.oneTime("my_task") + Tasks.oneTime(MY_TASK) .execute( (taskInstance, executionContext) -> { System.out.println("Executed!"); @@ -48,7 +52,7 @@ public void run(DataSource dataSource) { sleep(2); System.out.println("Scheduling 100 task-instances."); for (int i = 0; i < 100; i++) { - scheduler.schedule(onetimeTask.instance(String.valueOf(i)), Instant.now()); + scheduler.schedule(MY_TASK.instance(String.valueOf(i)).scheduledTo(Instant.now())); } } } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java index cfd68f20..7e6d5759 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/DeletingUnresolvedTasksMain.java @@ -16,6 +16,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.SchedulerClient; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.RecurringTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.task.schedule.Schedules; @@ -26,6 +27,9 @@ public class DeletingUnresolvedTasksMain extends Example { + public static final TaskDescriptor UNRESOLVED_TASK_1 = TaskDescriptor.of("unresolved1"); + public static final TaskDescriptor UNRESOLVED_TASK_2 = TaskDescriptor.of("unresolved2"); + public static void main(String[] args) { new DeletingUnresolvedTasksMain().runWithDatasource(); } @@ -33,22 +37,25 @@ public static void main(String[] args) { @Override public void run(DataSource dataSource) { RecurringTask unresolvedTask = - Tasks.recurring("unresolved1", Schedules.fixedDelay(Duration.ofSeconds(1))) + Tasks.recurring(UNRESOLVED_TASK_1, Schedules.fixedDelay(Duration.ofSeconds(1))) .execute( (taskInstance, executionContext) -> { System.out.println("Ran"); }); RecurringTask unresolvedTask2 = - Tasks.recurring("unresolved2", Schedules.fixedDelay(Duration.ofSeconds(1))) + Tasks.recurring(UNRESOLVED_TASK_2, Schedules.fixedDelay(Duration.ofSeconds(1))) .execute( (taskInstance, executionContext) -> { System.out.println("Ran"); }); SchedulerClient client = SchedulerClient.Builder.create(dataSource).build(); - client.schedule(unresolvedTask.instance(RecurringTask.INSTANCE), Instant.now()); - client.schedule( - unresolvedTask2.instance(RecurringTask.INSTANCE), Instant.now().plusSeconds(10)); + client.scheduleIfNotExists( + UNRESOLVED_TASK_1.instance(RecurringTask.INSTANCE).scheduledTo(Instant.now())); + client.scheduleIfNotExists( + UNRESOLVED_TASK_2 + .instance(RecurringTask.INSTANCE) + .scheduledTo(Instant.now().plusSeconds(10))); final Scheduler scheduler = Scheduler.create(dataSource) diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java index dd61b5bd..212f20b2 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/EnableImmediateExecutionMain.java @@ -15,6 +15,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Duration; @@ -27,11 +28,13 @@ public static void main(String[] args) { new EnableImmediateExecutionMain().runWithDatasource(); } + public static final TaskDescriptor DESCRIPTOR = TaskDescriptor.of("my_task"); + @Override public void run(DataSource dataSource) { OneTimeTask onetimeTask = - Tasks.oneTime("my_task") + Tasks.oneTime(DESCRIPTOR) .execute( (taskInstance, executionContext) -> { System.out.println("Executed!"); @@ -48,8 +51,10 @@ public void run(DataSource dataSource) { sleep(2000); System.out.println("Scheduling task to executed immediately."); - scheduler.schedule(onetimeTask.instance("1"), Instant.now()); - // scheduler.triggerCheckForDueExecutions(); // another option for triggering execution - // directly + scheduler.schedule(DESCRIPTOR.instance("1").scheduledTo(Instant.now())); + + // scheduler.triggerCheckForDueExecutions(); + // another option for triggering execution directly + } } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java index 2127a0ae..784bcf04 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffMain.java @@ -18,6 +18,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.task.FailureHandler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Instant; @@ -29,10 +30,14 @@ public static void main(String[] args) { new ExponentialBackoffMain().runWithDatasource(); } + public static final TaskDescriptor MY_TASK = + TaskDescriptor.of("exponential_backoff_task", Void.class); + @Override public void run(DataSource dataSource) { + OneTimeTask failingTask = - Tasks.oneTime("exponential_backoff_task") + Tasks.oneTime(MY_TASK) .onFailure(new FailureHandler.ExponentialBackoffFailureHandler<>(ofSeconds(1))) .execute( (taskInstance, executionContext) -> { @@ -45,7 +50,7 @@ public void run(DataSource dataSource) { .registerShutdownHook() .build(); - scheduler.schedule(failingTask.instance("1"), Instant.now()); + scheduler.schedule(MY_TASK.instance("1").scheduledTo(Instant.now())); scheduler.start(); } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java index 5d31b3a7..90714035 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java @@ -18,6 +18,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.task.FailureHandler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Instant; @@ -29,10 +30,13 @@ public static void main(String[] args) { new ExponentialBackoffWithMaxRetriesMain().runWithDatasource(); } + public static final TaskDescriptor EXPONENTIAL_BACKOFF_TASK = + TaskDescriptor.of("exponential_backoff_with_max_retries_task"); + @Override public void run(DataSource dataSource) { OneTimeTask failingTask = - Tasks.oneTime("exponential_backoff_with_max_retries_task") + Tasks.oneTime(EXPONENTIAL_BACKOFF_TASK) .onFailure( new FailureHandler.MaxRetriesFailureHandler<>( 6, new FailureHandler.ExponentialBackoffFailureHandler<>(ofSeconds(1), 2))) @@ -47,7 +51,7 @@ public void run(DataSource dataSource) { .registerShutdownHook() .build(); - scheduler.schedule(failingTask.instance("1"), Instant.now()); + scheduler.schedule(EXPONENTIAL_BACKOFF_TASK.instance("1").scheduledTo(Instant.now())); scheduler.start(); } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/HeartbeatMonitoringMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/HeartbeatMonitoringMain.java index f5a0771f..7dc24155 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/HeartbeatMonitoringMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/HeartbeatMonitoringMain.java @@ -19,6 +19,7 @@ import com.github.kagkarlsson.jdbc.JdbcRunner; import com.github.kagkarlsson.scheduler.HeartbeatState; import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Duration; @@ -31,11 +32,14 @@ public static void main(String[] args) { new HeartbeatMonitoringMain().runWithDatasource(); } + public static final TaskDescriptor WAIT_FOR_STALE_HEARTBEAT_TASK = + TaskDescriptor.of("wait-for-stale-heartbeat-task"); + @Override public void run(DataSource dataSource) { OneTimeTask waitForStaleHeartbeatTask = - Tasks.oneTime("wait-for-stale-heartbeat-task", Void.class) + Tasks.oneTime(WAIT_FOR_STALE_HEARTBEAT_TASK) .execute( (inst, ctx) -> { System.out.println("Running!"); @@ -57,7 +61,7 @@ public void run(DataSource dataSource) { scheduler.start(); - scheduler.schedule(waitForStaleHeartbeatTask.instance("1045"), Instant.now()); + scheduler.schedule(WAIT_FOR_STALE_HEARTBEAT_TASK.instance("1045").scheduledTo(Instant.now())); sleep(4000); JdbcRunner jdbcRunner = new JdbcRunner(dataSource); diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingSeparateTasksMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingSeparateTasksMain.java index 46de554f..6b9f60b8 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingSeparateTasksMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingSeparateTasksMain.java @@ -25,6 +25,11 @@ public class JobChainingUsingSeparateTasksMain extends Example { + public static final TaskDescriptor STEP1_TASK = + TaskDescriptor.of("job-step-1", JobId.class); + public static final TaskDescriptor STEP2_TASK = + TaskDescriptor.of("job-step-2", JobId.class); + public static void main(String[] args) { new JobChainingUsingSeparateTasksMain().runWithDatasource(); } @@ -33,15 +38,15 @@ public static void main(String[] args) { public void run(DataSource dataSource) { final CustomTask jobStep1 = - Tasks.custom("job-step-1", JobId.class) + Tasks.custom(STEP1_TASK) .execute( (taskInstance, executionContext) -> { System.out.println("Step1 ran. Job: " + taskInstance.getData()); - return new OnCompleteRemoveAndCreateNextStep("job-step-2"); + return new OnCompleteRemoveAndCreateNextStep(STEP2_TASK.getTaskName()); }); final CustomTask jobStep2 = - Tasks.custom("job-step-2", JobId.class) + Tasks.custom(STEP2_TASK) .execute( (taskInstance, executionContext) -> { System.out.println( @@ -60,13 +65,13 @@ public void run(DataSource dataSource) { sleep(1_000); // Schedule a multistep job. Simulate some instance-specific data, id=507 + // both steps will run directly scheduler.schedule( - jobStep1.instance("job-507", new JobId(507)), - Instant.now()); // both steps will run directly + STEP1_TASK.instance("job-507").data(new JobId(507)).scheduledTo(Instant.now())); } public static class JobId implements Serializable { - private static long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; public int id; public JobId(int id) { @@ -79,7 +84,8 @@ public String toString() { } } - class OnCompleteRemoveAndCreateNextStep implements CompletionHandler { + @SuppressWarnings("rawtypes") + static class OnCompleteRemoveAndCreateNextStep implements CompletionHandler { private final String newTaskName; public OnCompleteRemoveAndCreateNextStep(String newTaskName) { diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingTaskDataMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingTaskDataMain.java index 5ae3fb72..425ce3e8 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingTaskDataMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/JobChainingUsingTaskDataMain.java @@ -16,6 +16,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.task.CompletionHandler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.CustomTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.io.Serializable; @@ -25,6 +26,9 @@ public class JobChainingUsingTaskDataMain extends Example { + public static final TaskDescriptor JOB_CHAIN_TASK = + TaskDescriptor.of("job-chain-poc", JobState.class); + public static void main(String[] args) { new JobChainingUsingTaskDataMain().runWithDatasource(); } @@ -33,7 +37,7 @@ public static void main(String[] args) { public void run(DataSource dataSource) { final CustomTask chainingTask = - Tasks.custom("job-chain-poc", JobState.class) + Tasks.custom(JOB_CHAIN_TASK) .execute( (taskInstance, executionContext) -> { @@ -80,7 +84,10 @@ public void run(DataSource dataSource) { // Schedule a multistep job. Simulate some instance-specific data, id=507 scheduler.schedule( - chainingTask.instance("job-507", JobState.newJob(507)), Instant.now().plusSeconds(1)); + JOB_CHAIN_TASK + .instance("job-507") + .data(JobState.newJob(507)) + .scheduledTo(Instant.now().plusSeconds(1))); } public enum Step { @@ -90,7 +97,7 @@ public enum Step { } public static class JobState implements Serializable { - private static long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; public Step currentStep; public int id; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/JsonSerializerMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/JsonSerializerMain.java index b741f7b5..4bb0f4b6 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/JsonSerializerMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/JsonSerializerMain.java @@ -16,6 +16,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.serializer.GsonSerializer; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Duration; @@ -29,11 +30,14 @@ public static void main(String[] args) { new JsonSerializerMain().runWithDatasource(); } + public static final TaskDescriptor JSON_TASK = + TaskDescriptor.of("json-task", JsonData.class); + @Override public void run(DataSource dataSource) { OneTimeTask myAdhocTask = - Tasks.oneTime("json-task", JsonData.class) + Tasks.oneTime(JSON_TASK) .execute( (inst, ctx) -> { System.out.println("Executed! Custom data: " + inst.getData()); @@ -49,8 +53,10 @@ public void run(DataSource dataSource) { scheduler.start(); scheduler.schedule( - myAdhocTask.instance("id1", new JsonData(1001L, Instant.now())), - Instant.now().plusSeconds(1)); + JSON_TASK + .instance("id1") + .data(new JsonData(1001L, Instant.now())) + .scheduledTo(Instant.now().plusSeconds(1))); } public static class JsonData { diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java index 549d68f6..40975bab 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/MaxRetriesMain.java @@ -16,6 +16,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.task.FailureHandler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Duration; @@ -28,11 +29,13 @@ public static void main(String[] args) { new MaxRetriesMain().runWithDatasource(); } + public static final TaskDescriptor MAX_RETRIES_TASK = TaskDescriptor.of("max_retries_task"); + @Override public void run(DataSource dataSource) { OneTimeTask failingTask = - Tasks.oneTime("max_retries_task") + Tasks.oneTime(MAX_RETRIES_TASK) .onFailure( new FailureHandler.MaxRetriesFailureHandler<>( 3, @@ -56,7 +59,7 @@ public void run(DataSource dataSource) { .registerShutdownHook() .build(); - scheduler.schedule(failingTask.instance("1"), Instant.now()); + scheduler.schedule(MAX_RETRIES_TASK.instance("1").scheduledTo(Instant.now())); scheduler.start(); } diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java index 400b1e94..7198b8ab 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/OneTimeTaskMain.java @@ -15,6 +15,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.io.Serializable; @@ -23,6 +24,9 @@ public class OneTimeTaskMain extends Example { + public static final TaskDescriptor MY_TASK = + TaskDescriptor.of("my-onetime-task", MyTaskData.class); + public static void main(String[] args) { new OneTimeTaskMain().runWithDatasource(); } @@ -30,24 +34,29 @@ public static void main(String[] args) { @Override public void run(DataSource dataSource) { - OneTimeTask myAdhocTask = - Tasks.oneTime("my-typed-adhoc-task", MyTaskData.class) + OneTimeTask taskImplementation = + Tasks.oneTime(MY_TASK) .execute( (inst, ctx) -> { System.out.println("Executed! Custom data, Id: " + inst.getData().id); }); - final Scheduler scheduler = Scheduler.create(dataSource, myAdhocTask).threads(5).build(); + final Scheduler scheduler = + Scheduler.create(dataSource, taskImplementation).registerShutdownHook().build(); scheduler.start(); // Schedule the task for execution a certain time in the future and optionally provide custom // data for the execution scheduler.schedule( - myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.now().plusSeconds(5)); + MY_TASK + .instance("1045") + .data(new MyTaskData(1001L)) + .scheduledTo(Instant.now().plusSeconds(5))); } public static class MyTaskData implements Serializable { + private static final long serialVersionUID = 1L; public final long id; public MyTaskData(long id) { diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java index 027a867f..7f8684e6 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java @@ -15,6 +15,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.RecurringTaskWithPersistentSchedule; import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; import com.github.kagkarlsson.scheduler.task.helper.Tasks; @@ -24,6 +25,9 @@ public class RecurringTaskWithPersistentScheduleMain extends Example { + public static final TaskDescriptor DYNAMIC_RECURRING_TASK = + TaskDescriptor.of("dynamic-recurring-task", ScheduleAndNoData.class); + public static void main(String[] args) { new RecurringTaskWithPersistentScheduleMain().runWithDatasource(); } @@ -32,7 +36,7 @@ public static void main(String[] args) { public void run(DataSource dataSource) { final RecurringTaskWithPersistentSchedule task = - Tasks.recurringWithPersistentSchedule("dynamic-recurring-task", ScheduleAndNoData.class) + Tasks.recurringWithPersistentSchedule(DYNAMIC_RECURRING_TASK) .execute( (taskInstance, executionContext) -> { System.out.println( @@ -52,11 +56,15 @@ public void run(DataSource dataSource) { sleep(2_000); scheduler.schedule( - task.schedulableInstance( - "id1", new ScheduleAndNoData(Schedules.fixedDelay(Duration.ofSeconds(1))))); + DYNAMIC_RECURRING_TASK + .instance("id1") + .data(new ScheduleAndNoData(Schedules.fixedDelay(Duration.ofSeconds(1)))) + .scheduledAccordingToData()); scheduler.schedule( - task.schedulableInstance( - "id2", new ScheduleAndNoData(Schedules.fixedDelay(Duration.ofSeconds(6))))); + DYNAMIC_RECURRING_TASK + .instance("id2") + .data(new ScheduleAndNoData(Schedules.fixedDelay(Duration.ofSeconds(6)))) + .scheduledAccordingToData()); } public static class ScheduleAndNoData implements ScheduleAndData { diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java index 079f79f7..5be2450d 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java @@ -17,6 +17,7 @@ import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter; import com.github.kagkarlsson.scheduler.SchedulerClient; import com.github.kagkarlsson.scheduler.serializer.JacksonSerializer; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Instant; @@ -28,11 +29,13 @@ public static void main(String[] args) { new SchedulerClientMain().runWithDatasource(); } + public static final TaskDescriptor MY_TASK = TaskDescriptor.of("task-a", Integer.class); + @Override public void run(DataSource dataSource) { final OneTimeTask task = - Tasks.oneTime("task-a", Integer.class) + Tasks.oneTime(MY_TASK) .execute( (taskInstance, executionContext) -> { System.out.println("Task a executed"); @@ -45,7 +48,8 @@ public void run(DataSource dataSource) { final Instant now = Instant.now(); for (int i = 0; i < 5; i++) { - clientWithTypeInformation.schedule(task.instance("id" + i, i), now.plusSeconds(i)); + clientWithTypeInformation.scheduleIfNotExists( + MY_TASK.instance("id" + i).data(i).scheduledTo(now.plusSeconds(i))); } System.out.println("Listing scheduled executions"); diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SerializingExperimentMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SerializingExperimentMain.java index b3e18dbc..f4319b88 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SerializingExperimentMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SerializingExperimentMain.java @@ -33,7 +33,7 @@ docker run -d --name my_postgres -v my_dbdata:/var/lib/postgresql/data -p 54320:5432 -e POSTGRES_PASSWORD=my_password postgres:13 psql -h localhost -p 54320 postgres postgres - create table scheduled_tasks ( task_name text not null, task_instance text not null, task_data bytea, execution_time timestamp with time zone not null, picked BOOLEAN not null, picked_by text, last_success timestamp with time zone, last_failure timestamp with time zone, consecutive_failures INT, last_heartbeat timestamp with time zone, version BIGINT not null, PRIMARY KEY (task_name, task_instance)); + create table scheduled_tasks ( task_name text not null, task_instance text not null, task_data bytea, execution_time timestamp with time zone not null, picked BOOLEAN not null, picked_by text, last_success timestamp with time zone, last_failure timestamp with time zone, consecutive_failures INT, last_heartbeat timestamp with time zone, version BIGINT not null, priority INT, PRIMARY KEY (task_name, task_instance)); get json data select convert_from(task_data, 'UTF-8') from scheduled_tasks ; diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java index 82af8b33..2c9e65bd 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java @@ -16,6 +16,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; import com.github.kagkarlsson.scheduler.SchedulerClient; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask; import com.github.kagkarlsson.scheduler.task.helper.RecurringTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; @@ -30,11 +31,14 @@ public static void main(String[] args) { new SpawningOtherTasksMain().runWithDatasource(); } + public static final TaskDescriptor PRINTER_TASK = + TaskDescriptor.of("printer", Integer.class); + @Override public void run(DataSource dataSource) { final OneTimeTask printer = - Tasks.oneTime("printer", Integer.class) + Tasks.oneTime(PRINTER_TASK) .execute( (taskInstance, executionContext) -> { System.out.println("Printer: " + taskInstance.getData()); @@ -49,7 +53,8 @@ public void run(DataSource dataSource) { System.out.println("Scheduling printer executions."); for (int i = 0; i < 5; i++) { - client.schedule(printer.instance("print" + id + i, i), Instant.now()); + client.scheduleIfNotExists( + PRINTER_TASK.instance("print" + id + i).data(i).scheduledTo(Instant.now())); } }); diff --git a/examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java b/examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java index e8537754..cdd3a3a5 100644 --- a/examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java +++ b/examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java @@ -15,6 +15,7 @@ import com.github.kagkarlsson.examples.helpers.Example; import com.github.kagkarlsson.scheduler.Scheduler; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.RecurringTaskWithPersistentSchedule; import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; import com.github.kagkarlsson.scheduler.task.helper.Tasks; @@ -29,11 +30,14 @@ public static void main(String[] args) { new StatefulRecurringTaskWithPersistentScheduleMain().runWithDatasource(); } + public static final TaskDescriptor DYNAMIC_RECURRING_TASK = + TaskDescriptor.of("dynamic-recurring-task", ScheduleAndInteger.class); + @Override public void run(DataSource dataSource) { final RecurringTaskWithPersistentSchedule task = - Tasks.recurringWithPersistentSchedule("dynamic-recurring-task", ScheduleAndInteger.class) + Tasks.recurringWithPersistentSchedule(DYNAMIC_RECURRING_TASK) .executeStateful( (taskInstance, executionContext) -> { System.out.printf( @@ -54,8 +58,10 @@ public void run(DataSource dataSource) { sleep(2_000); scheduler.schedule( - task.schedulableInstance( - "id1", new ScheduleAndInteger(Schedules.fixedDelay(Duration.ofSeconds(3)), 1))); + DYNAMIC_RECURRING_TASK + .instance("id1") + .data(new ScheduleAndInteger(Schedules.fixedDelay(Duration.ofSeconds(3)), 1)) + .scheduledAccordingToData()); } public static class ScheduleAndInteger implements ScheduleAndData { diff --git a/examples/features/src/main/resources/hsql_tables.sql b/examples/features/src/main/resources/hsql_tables.sql index bb7ee8cc..1332d6b0 100644 --- a/examples/features/src/main/resources/hsql_tables.sql +++ b/examples/features/src/main/resources/hsql_tables.sql @@ -10,5 +10,6 @@ create table scheduled_tasks ( consecutive_failures INT, last_heartbeat TIMESTAMP WITH TIME ZONE, version BIGINT, + priority INT, PRIMARY KEY (task_name, task_instance) ) diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/BasicExamplesConfiguration.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/BasicExamplesConfiguration.java index f5d1b3e2..e19eef8b 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/BasicExamplesConfiguration.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/BasicExamplesConfiguration.java @@ -18,7 +18,7 @@ import com.github.kagkarlsson.examples.boot.CounterService; import com.github.kagkarlsson.examples.boot.ExampleContext; import com.github.kagkarlsson.scheduler.task.Task; -import com.github.kagkarlsson.scheduler.task.TaskWithoutDataDescriptor; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Duration; import java.time.Instant; @@ -31,10 +31,10 @@ @Configuration public class BasicExamplesConfiguration { - public static final TaskWithoutDataDescriptor BASIC_ONE_TIME_TASK = - new TaskWithoutDataDescriptor("sample-one-time-task"); - public static final TaskWithoutDataDescriptor BASIC_RECURRING_TASK = - new TaskWithoutDataDescriptor("recurring-sample-task"); + public static final TaskDescriptor BASIC_ONE_TIME_TASK = + TaskDescriptor.of("sample-one-time-task"); + public static final TaskDescriptor BASIC_RECURRING_TASK = + TaskDescriptor.of("recurring-sample-task"); private static final Logger log = LoggerFactory.getLogger(BasicExamplesConfiguration.class); private static int ID = 1; @@ -44,7 +44,8 @@ public static void triggerOneTime(ExampleContext ctx) { "Scheduling a basic one-time task to run 'Instant.now()+seconds'. If seconds=0, the scheduler will pick " + "these up immediately since it is configured with 'immediate-execution-enabled=true'"); - ctx.schedulerClient.schedule(BASIC_ONE_TIME_TASK.instance(String.valueOf(ID++)), Instant.now()); + ctx.schedulerClient.scheduleIfNotExists( + BASIC_ONE_TIME_TASK.instance(String.valueOf(ID++)).scheduledTo(Instant.now())); } /** diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/JobChainingConfiguration.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/JobChainingConfiguration.java index a846fb57..c1346b45 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/JobChainingConfiguration.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/JobChainingConfiguration.java @@ -15,7 +15,7 @@ import com.github.kagkarlsson.examples.boot.ExampleContext; import com.github.kagkarlsson.scheduler.task.CompletionHandler; -import com.github.kagkarlsson.scheduler.task.TaskWithDataDescriptor; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.helper.CustomTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.task.schedule.Schedules; @@ -29,12 +29,12 @@ @Configuration public class JobChainingConfiguration { - public static final TaskWithDataDescriptor CHAINED_STEP_1_TASK = - new TaskWithDataDescriptor<>("chained-step-1", JobState.class); - public static final TaskWithDataDescriptor CHAINED_STEP_2_TASK = - new TaskWithDataDescriptor<>("chained-step-2", JobState.class); - public static final TaskWithDataDescriptor CHAINED_STEP_3_TASK = - new TaskWithDataDescriptor<>("chained-step-3", JobState.class); + public static final TaskDescriptor CHAINED_STEP_1_TASK = + TaskDescriptor.of("chained-step-1", JobState.class); + public static final TaskDescriptor CHAINED_STEP_2_TASK = + TaskDescriptor.of("chained-step-2", JobState.class); + public static final TaskDescriptor CHAINED_STEP_3_TASK = + TaskDescriptor.of("chained-step-3", JobState.class); private static int CHAINED_JOB_ID = 1; /** Start the example */ @@ -42,8 +42,11 @@ public static void start(ExampleContext ctx) { ctx.log("Scheduling a chained one-time task to run."); int id = CHAINED_JOB_ID++; - ctx.schedulerClient.schedule( - CHAINED_STEP_1_TASK.instance("chain-" + id, new JobState(id, 0)), Instant.now()); + ctx.schedulerClient.scheduleIfNotExists( + CHAINED_STEP_1_TASK + .instance("chain-" + id) + .data(new JobState(id, 0)) + .scheduledTo(Instant.now())); } /** Bean definition */ diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/LongRunningJobConfiguration.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/LongRunningJobConfiguration.java index 94e6d794..a3e5c6ad 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/LongRunningJobConfiguration.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/LongRunningJobConfiguration.java @@ -16,8 +16,8 @@ import com.github.kagkarlsson.examples.boot.ExampleContext; import com.github.kagkarlsson.scheduler.task.CompletionHandler; import com.github.kagkarlsson.scheduler.task.ExecutionContext; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.TaskInstance; -import com.github.kagkarlsson.scheduler.task.TaskWithDataDescriptor; import com.github.kagkarlsson.scheduler.task.helper.CustomTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.task.schedule.Schedules; @@ -31,8 +31,8 @@ @Configuration public class LongRunningJobConfiguration { - public static final TaskWithDataDescriptor LONG_RUNNING_TASK = - new TaskWithDataDescriptor<>("long-running-task", PrimeGeneratorState.class); + public static final TaskDescriptor LONG_RUNNING_TASK = + TaskDescriptor.of("long-running-task", PrimeGeneratorState.class); /** Start the example */ public static void start(ExampleContext ctx) { @@ -43,8 +43,11 @@ public static void start(ExampleContext ctx) { + "has found all prime-numbers smaller than 1.000.000."); PrimeGeneratorState initialState = new PrimeGeneratorState(0, 0); - ctx.schedulerClient.schedule( - LONG_RUNNING_TASK.instance("prime-generator", initialState), Instant.now()); + ctx.schedulerClient.scheduleIfNotExists( + LONG_RUNNING_TASK + .instance("prime-generator") + .data(initialState) + .scheduledTo(Instant.now())); } /** Bean definition */ diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/MultiInstanceRecurringConfiguration.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/MultiInstanceRecurringConfiguration.java index deeb7a85..ac621343 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/MultiInstanceRecurringConfiguration.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/MultiInstanceRecurringConfiguration.java @@ -16,13 +16,12 @@ import com.github.kagkarlsson.examples.boot.ExampleContext; import com.github.kagkarlsson.scheduler.task.ExecutionContext; import com.github.kagkarlsson.scheduler.task.Task; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.TaskInstance; -import com.github.kagkarlsson.scheduler.task.TaskWithDataDescriptor; import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.task.schedule.CronSchedule; import java.io.Serializable; -import java.time.Instant; import java.util.Random; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -31,8 +30,8 @@ @Configuration public class MultiInstanceRecurringConfiguration { - public static final TaskWithDataDescriptor MULTI_INSTANCE_RECURRING_TASK = - new TaskWithDataDescriptor<>("multi-instance-recurring-task", ScheduleAndCustomer.class); + public static final TaskDescriptor MULTI_INSTANCE_RECURRING_TASK = + TaskDescriptor.of("multi-instance-recurring-task", ScheduleAndCustomer.class); /** Start the example */ public static void start(ExampleContext ctx) { @@ -46,9 +45,8 @@ public static void start(ExampleContext ctx) { + " with data: " + data); - ctx.schedulerClient.schedule( - MULTI_INSTANCE_RECURRING_TASK.instance(customer.id, data), - cron.getInitialExecutionTime(Instant.now())); + ctx.schedulerClient.scheduleIfNotExists( + MULTI_INSTANCE_RECURRING_TASK.instance(customer.id).data(data).scheduledAccordingToData()); } /** Bean definition */ diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/ParallellJobConfiguration.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/ParallellJobConfiguration.java index b34844ad..afad0ac4 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/ParallellJobConfiguration.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/ParallellJobConfiguration.java @@ -30,10 +30,10 @@ @Configuration public class ParallellJobConfiguration { - public static final TaskWithoutDataDescriptor PARALLEL_JOB_SPAWNER = - new TaskWithoutDataDescriptor("parallel-job-spawner"); - public static final TaskWithDataDescriptor PARALLEL_JOB = - new TaskWithDataDescriptor<>("parallel-job", Integer.class); + public static final TaskDescriptor PARALLEL_JOB_SPAWNER = + TaskDescriptor.of("parallel-job-spawner"); + public static final TaskDescriptor PARALLEL_JOB = + TaskDescriptor.of("parallel-job", Integer.class); private TransactionTemplate tx; public ParallellJobConfiguration(TransactionTemplate tx) { @@ -68,7 +68,11 @@ public Task parallelJobSpawner() { // dependency executionContext .getSchedulerClient() - .schedule(PARALLEL_JOB.instance("q" + quarter, quarter), Instant.now()); + .scheduleIfNotExists( + PARALLEL_JOB + .instance("q" + quarter) + .data(quarter) + .scheduledTo(Instant.now())); } }); EventLogger.logTask( diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/RecurringStateTrackingConfiguration.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/RecurringStateTrackingConfiguration.java index 5ad98d7b..eac1b012 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/RecurringStateTrackingConfiguration.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/RecurringStateTrackingConfiguration.java @@ -16,8 +16,8 @@ import com.github.kagkarlsson.examples.boot.ExampleContext; import com.github.kagkarlsson.scheduler.task.ExecutionContext; import com.github.kagkarlsson.scheduler.task.Task; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.TaskInstance; -import com.github.kagkarlsson.scheduler.task.TaskWithDataDescriptor; import com.github.kagkarlsson.scheduler.task.helper.RecurringTask; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import com.github.kagkarlsson.scheduler.task.schedule.Schedules; @@ -29,8 +29,8 @@ @Configuration public class RecurringStateTrackingConfiguration { - public static final TaskWithDataDescriptor STATE_TRACKING_RECURRING_TASK = - new TaskWithDataDescriptor<>("state-tracking-recurring-task", Integer.class); + public static final TaskDescriptor STATE_TRACKING_RECURRING_TASK = + TaskDescriptor.of("state-tracking-recurring-task", Integer.class); /** Start the example */ public static void start(ExampleContext ctx) { @@ -42,9 +42,11 @@ public static void start(ExampleContext ctx) { + data + ". Initial execution-time will be now (deviating from defined schedule)."); - ctx.schedulerClient.schedule( - STATE_TRACKING_RECURRING_TASK.instance(RecurringTask.INSTANCE, data), - Instant.now() // start-time, will run according to schedule after this + ctx.schedulerClient.scheduleIfNotExists( + STATE_TRACKING_RECURRING_TASK + .instance(RecurringTask.INSTANCE) + .data(data) + .scheduledTo(Instant.now()) // start-time, will run according to schedule after this ); } diff --git a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/TransactionallyStagedJobConfiguration.java b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/TransactionallyStagedJobConfiguration.java index 346f3e99..1d7a04e9 100644 --- a/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/TransactionallyStagedJobConfiguration.java +++ b/examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/TransactionallyStagedJobConfiguration.java @@ -16,8 +16,8 @@ import com.github.kagkarlsson.examples.boot.ExampleContext; import com.github.kagkarlsson.scheduler.task.ExecutionContext; import com.github.kagkarlsson.scheduler.task.Task; +import com.github.kagkarlsson.scheduler.task.TaskDescriptor; import com.github.kagkarlsson.scheduler.task.TaskInstance; -import com.github.kagkarlsson.scheduler.task.TaskWithoutDataDescriptor; import com.github.kagkarlsson.scheduler.task.helper.Tasks; import java.time.Instant; import java.util.Random; @@ -29,8 +29,8 @@ @Configuration public class TransactionallyStagedJobConfiguration { - public static final TaskWithoutDataDescriptor TRANSACTIONALLY_STAGED_TASK = - new TaskWithoutDataDescriptor("transactionally-staged-task"); + public static final TaskDescriptor TRANSACTIONALLY_STAGED_TASK = + TaskDescriptor.of("transactionally-staged-task"); private static int ID = 1; /** Start the example */ @@ -44,8 +44,10 @@ public static void start(ExampleContext ctx) { // Since it is scheduled in a transaction, the scheduler will not run it until the tx // commits // If the tx rolls back, the insert of the new job will also roll back, i.e. not run. - ctx.schedulerClient.schedule( - TRANSACTIONALLY_STAGED_TASK.instance(String.valueOf(ID++)), Instant.now()); + ctx.schedulerClient.scheduleIfNotExists( + TRANSACTIONALLY_STAGED_TASK + .instance(String.valueOf(ID++)) + .scheduledTo(Instant.now())); // Do additional database-operations here diff --git a/examples/spring-boot-example/src/main/resources/schema.sql b/examples/spring-boot-example/src/main/resources/schema.sql index fd5de87c..a35827d5 100644 --- a/examples/spring-boot-example/src/main/resources/schema.sql +++ b/examples/spring-boot-example/src/main/resources/schema.sql @@ -12,5 +12,6 @@ create table if not exists scheduled_tasks ( consecutive_failures INT, last_heartbeat TIMESTAMP WITH TIME ZONE, version BIGINT, + priority INT, PRIMARY KEY (task_name, task_instance) ); diff --git a/test/benchmark/infra/notes.txt b/test/benchmark/infra/notes.txt index 40e55c53..78c5e102 100644 --- a/test/benchmark/infra/notes.txt +++ b/test/benchmark/infra/notes.txt @@ -21,8 +21,8 @@ EOF # create test-data psql bench gustavkarlsson <