Skip to content

Commit

Permalink
Merge branch 'master' into rescheduleOrCreate
Browse files Browse the repository at this point in the history
  • Loading branch information
beilCrxmarkets committed Nov 28, 2024
2 parents 6e31bab + ddfa556 commit aceb60a
Show file tree
Hide file tree
Showing 76 changed files with 954 additions and 259 deletions.
72 changes: 57 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ See also [why not Quartz?](#why-db-scheduler-when-there-is-quartz)
<dependency>
<groupId>com.github.kagkarlsson</groupId>
<artifactId>db-scheduler</artifactId>
<version>14.0.3</version>
<version>15.0.0</version>
</dependency>
```

Expand Down Expand Up @@ -125,25 +125,32 @@ An instance of a _one-time_ task has a single execution-time some time in the fu
Define a _one-time_ task and start the scheduler:

```java
OneTimeTask<MyTaskData> myAdhocTask = Tasks.oneTime("my-typed-adhoc-task", MyTaskData.class)
TaskDescriptor<MyTaskData> MY_TASK =
TaskDescriptor.of("my-onetime-task", MyTaskData.class);

OneTimeTask<MyTaskData> myTaskImplementation =
Tasks.oneTime(MY_TASK)
.execute((inst, ctx) -> {
System.out.println("Executed! Custom data, Id: " + inst.getData().id);
System.out.println("Executed! Custom data, Id: " + inst.getData().id);
});

final Scheduler scheduler = Scheduler
.create(dataSource, myAdhocTask)
.registerShutdownHook()
.build();
.create(dataSource, myTaskImplementation)
.registerShutdownHook()
.build();

scheduler.start();

```

... and then at some point (at runtime), an execution is scheduled using the `SchedulerClient`:

```java
// Schedule the task for execution a certain time in the future and optionally provide custom data for the execution
scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.now().plusSeconds(5));
scheduler.schedule(
MY_TASK
.instanceWithId("1045")
.data(new MyTaskData(1001L))
.scheduledTo(Instant.now().plusSeconds(5)));
```

### More examples
Expand All @@ -158,7 +165,7 @@ scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.
| [ExponentialBackoffWithMaxRetriesMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/ExponentialBackoffWithMaxRetriesMain.java) | How to use exponential backoff as retry strategy **and** a hard limit on the maximum number of retries. |
| [TrackingProgressRecurringTaskMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/TrackingProgressRecurringTaskMain.java) | Recurring jobs may store `task_data` as a way of persisting state across executions. This example shows how. |
| [SpawningOtherTasksMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SpawningOtherTasksMain.java) | Demonstrates on task scheduling instances of another by using the `executionContext.getSchedulerClient()`. |
| [SchedulerClientMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java) | Demonstates some of the `SchedulerClient`'s capabilities. Scheduling, fetching scheduled executions etc. |
| [SchedulerClientMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/SchedulerClientMain.java) | Demonstrates some of the `SchedulerClient`'s capabilities. Scheduling, fetching scheduled executions etc. |
| [RecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/RecurringTaskWithPersistentScheduleMain.java) | Multi-instance recurring jobs where the `Schedule` is stored as part of the `task_data`. For example suitable for multi-tenant applications where each tenent should have a recurring task. |
| [StatefulRecurringTaskWithPersistentScheduleMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/StatefulRecurringTaskWithPersistentScheduleMain.java) | |
| [JsonSerializerMain.java](./examples/features/src/main/java/com/github/kagkarlsson/examples/JsonSerializerMain.java) | Overrides serialization of `task_data` from Java-serialization (default) to JSON. |
Expand All @@ -178,7 +185,7 @@ scheduler.schedule(myAdhocTask.instance("1045", new MyTaskData(1001L)), Instant.
| [TransactionallyStagedJob](./examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/TransactionallyStagedJobConfiguration.java) | Example of [transactionally staging a job](https://brandur.org/job-drain), i.e. making sure the background job runs **iff** the transaction commits (along with other db-modifications). |
| [LongRunningJob](./examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/LongRunningJobConfiguration.java) | Long-running jobs need to **survive application restarts** and avoid restarting from the beginning. This example demonstrates how to **persisting progress** on shutdown and additionally a technique for limiting the job to run nightly. |
| [RecurringStateTracking](./examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/RecurringStateTrackingConfiguration.java) | A recurring task with state that can be modified after each run. |
| [ParallellJobSpawner](./examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/ParallellJobConfiguration.java) | Demonstrates how to use a recurring job to spawn one-time jobs, e.g. for parallelization. |
| [ParallelJobSpawner](./examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/ParallellJobConfiguration.java) | Demonstrates how to use a recurring job to spawn one-time jobs, e.g. for parallelization. |
| [JobChaining](./examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/JobChainingConfiguration.java) | A one-time job with **multiple steps**. The next step is scheduled after the previous one completes. |
| [MultiInstanceRecurring](./examples/spring-boot-example/src/main/java/com/github/kagkarlsson/examples/boot/config/MultiInstanceRecurringConfiguration.java) | Demonstrates how to achieve **multiple recurring jobs** of the same type, but potentially differing schedules and data. |

Expand Down Expand Up @@ -225,10 +232,36 @@ How long the scheduler will wait before interrupting executor-service threads. I
consider if it is possible to instead regularly check `executionContext.getSchedulerState().isShuttingDown()`
in the ExecutionHandler and abort long-running task. Default `30min`.

:gear: `.enablePriority()`<br/>
It is possible to define a priority for executions which determines the order in which due executions
are fetched from the database. An execution with a higher value for priority will run before an
execution with a lower value (technically, the ordering will be `order by priority desc, execution_time asc`).
Consider using priorities in the range 0-32000 as the field is defined as a `SMALLINT`. If you need a larger value,
modify the schema. For now, this feature is **opt-in**, and column `priority` is only needed by users who choose to
enable priority via this config setting.

Set the priority per instance using the `TaskInstance.Builder`:

```java
scheduler.schedule(
MY_TASK
.instance("1")
.priority(100)
.scheduledTo(Instant.now()));
```

**Note:**
* When enabling this feature, make sure you have the new necessary indexes defined. If you
regularly have a state with large amounts of executions both due and future, it might be beneficial
to add an index on `(execution_time asc, priority desc)` (replacing the old `execution_time asc`).
* This feature is not recommended for users of **MySQL** and **MariaDB** below version 8.x,
as they do not support descending indexes.
* Value `null` for priority may be interpreted differently depending on database (low or high).

#### Polling strategy

If you are running >1000 executions/s you might want to use the `lock-and-fetch` polling-strategy for lower overhead
and higher througput ([read more](#polling-strategy-lock-and-fetch)). If not, the default `fetch-and-lock-on-execute` will be fine.
and higher throughput ([read more](#polling-strategy-lock-and-fetch)). If not, the default `fetch-and-lock-on-execute` will be fine.

:gear: `.pollUsingFetchAndLockOnExecute(double, double)`<br/>
Use default polling strategy `fetch-and-lock-on-execute`.<br/>
Expand All @@ -249,7 +282,7 @@ Fetched executions are already locked/picked for this scheduler-instance thus sa
<br/>For high throughput
(i.e. keep threads busy), set to for example `1.0, 4.0`. Currently hearbeats are not updated for picked executions
in queue (applicable if `upperLimitFractionOfThreads > 1.0`). If they stay there for more than
`4 * hearbeat-interval` (default `20m`), not starting execution, they will be detected as _dead_ and likely be
`4 * heartbeat-interval` (default `20m`), not starting execution, they will be detected as _dead_ and likely be
unlocked again (determined by `DeadExecutionHandler`). Currently supported by **postgres**. **sql-server** also supports
this, but testing has shown this is prone to deadlocks and thus not recommended until understood/resolved.

Expand Down Expand Up @@ -385,7 +418,7 @@ For Spring Boot applications, there is a starter `db-scheduler-spring-boot-start
<dependency>
<groupId>com.github.kagkarlsson</groupId>
<artifactId>db-scheduler-spring-boot-starter</artifactId>
<version>14.0.3</version>
<version>15.0.0</version>
</dependency>
```
**NOTE**: This includes the db-scheduler dependency itself.
Expand All @@ -408,6 +441,7 @@ db-scheduler.table-name=scheduled_tasks
db-scheduler.immediate-execution-enabled=false
db-scheduler.scheduler-name=
db-scheduler.threads=10
db-scheduler.priority-enabled=false

# Ignored if a custom DbSchedulerStarter bean is defined
db-scheduler.delay-startup-until-context-ready=false
Expand Down Expand Up @@ -579,6 +613,14 @@ There are a number of users that are using db-scheduler for high throughput use-

See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release-notes.

**Upgrading to 15.x**
* Priority is a new opt-in feature. To be able to use it, column `priority` and index `priority_execution_time_idx`
must be added to the database schema. See table definitions for
[postgresql](./db-scheduler/src/test/resources/postgresql_tables.sql),
[oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or
[mysql](./db-scheduler/src/test/resources/mysql_tables.sql).
At some point, this column will be made mandatory. This will be made clear in future release/upgrade-notes.

**Upgrading to 8.x**
* Custom Schedules must implement a method `boolean isDeterministic()` to indicate whether they will always produce the same instants or not.

Expand All @@ -587,10 +629,10 @@ See [releases](https://github.com/kagkarlsson/db-scheduler/releases) for release

**Upgrading to 3.x**
* No schema changes
* Task creation are preferrably done through builders in `Tasks` class
* Task creation are preferably done through builders in `Tasks` class

**Upgrading to 2.x**
* Add column `task_data` to the database schema. See table definitions for [postgresql](./b-scheduler/src/test/resources/postgresql_tables.sql), [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](./db-scheduler/src/test/resources/mysql_tables.sql).
* Add column `task_data` to the database schema. See table definitions for [postgresql](./db-scheduler/src/test/resources/postgresql_tables.sql), [oracle](./db-scheduler/src/test/resources/oracle_tables.sql) or [mysql](./db-scheduler/src/test/resources/mysql_tables.sql).

## Building the source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ public Scheduler scheduler(DbSchedulerCustomizer customizer, StatsRegistry regis
builder.enableImmediateExecution();
}

if (config.isPriorityEnabled()) {
builder.enablePriority();
}

// Use custom executor service if provided
customizer.executorService().ifPresent(builder::executorService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public class DbSchedulerProperties {
/** Whether or not to log the {@link Throwable} that caused a task to fail. */
private boolean failureLoggerLogStackTrace = SchedulerBuilder.LOG_STACK_TRACE_ON_FAILURE;

/** Whether or executions are ordered by priority */
private boolean priorityEnabled = false;

public boolean isEnabled() {
return enabled;
}
Expand Down Expand Up @@ -243,4 +246,12 @@ public boolean isAlwaysPersistTimestampInUtc() {
public void setAlwaysPersistTimestampInUtc(boolean alwaysPersistTimestampInUTC) {
this.alwaysPersistTimestampInUtc = alwaysPersistTimestampInUTC;
}

public boolean isPriorityEnabled() {
return priorityEnabled;
}

public void setPriorityEnabled(boolean priorityEnabled) {
this.priorityEnabled = priorityEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ public void it_should_start_when_the_context_is_ready() {
});
}

@Test
public void it_should_enable_priority() {
ctxRunner
.withPropertyValues("db-scheduler.priority-enabled=true")
.run(
(AssertableApplicationContext ctx) -> {
assertThat(ctx).hasSingleBean(DataSource.class);
assertThat(ctx).hasSingleBean(Scheduler.class);

DbSchedulerProperties props = ctx.getBean(DbSchedulerProperties.class);
assertThat(props.isPriorityEnabled()).isTrue();
});
}

@Test
public void it_should_support_custom_starting_strategies() {
ctxRunner
Expand Down
1 change: 1 addition & 0 deletions db-scheduler-boot-starter/src/test/resources/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ create table if not exists scheduled_tasks (
consecutive_failures INT,
last_heartbeat TIMESTAMP WITH TIME ZONE,
version BIGINT,
priority INT,
PRIMARY KEY (task_name, task_instance)
);
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class SchedulerBuilder {
protected PollingStrategyConfig pollingStrategyConfig = DEFAULT_POLLING_STRATEGY;
protected LogLevel logLevel = DEFAULT_FAILURE_LOG_LEVEL;
protected boolean logStackTrace = LOG_STACK_TRACE_ON_FAILURE;
protected boolean enablePriority = false;
private boolean registerShutdownHook = false;
private int numberOfMissedHeartbeatsBeforeDead = DEFAULT_MISSED_HEARTBEATS_LIMIT;
private boolean alwaysPersistTimestampInUTC = false;
Expand Down Expand Up @@ -230,6 +231,11 @@ public SchedulerBuilder registerShutdownHook() {
return this;
}

public SchedulerBuilder enablePriority() {
this.enablePriority = true;
return this;
}

public Scheduler build() {
if (schedulerName == null) {
schedulerName = new SchedulerName.Hostname();
Expand All @@ -249,6 +255,7 @@ public Scheduler build() {
taskResolver,
schedulerName,
serializer,
enablePriority,
clock);
final JdbcTaskRepository clientTaskRepository =
new JdbcTaskRepository(
Expand All @@ -259,6 +266,7 @@ public Scheduler build() {
taskResolver,
schedulerName,
serializer,
enablePriority,
clock);

ExecutorService candidateExecutorService = executorService;
Expand Down Expand Up @@ -287,11 +295,12 @@ public Scheduler build() {
}

LOG.info(
"Creating scheduler with configuration: threads={}, pollInterval={}s, heartbeat={}s enable-immediate-execution={}, table-name={}, name={}",
"Creating scheduler with configuration: threads={}, pollInterval={}s, heartbeat={}s, enable-immediate-execution={}, enable-priority={}, table-name={}, name={}",
executorThreads,
waiter.getWaitDuration().getSeconds(),
heartbeatInterval.getSeconds(),
enableImmediateExecution,
enablePriority,
tableName,
schedulerName.getName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class Builder {
private Serializer serializer = Serializer.DEFAULT_JAVA_SERIALIZER;
private String tableName = JdbcTaskRepository.DEFAULT_TABLE_NAME;
private JdbcCustomization jdbcCustomization;
private boolean priority = false;

private Builder(DataSource dataSource, List<Task<?>> knownTasks) {
this.dataSource = dataSource;
Expand All @@ -265,6 +266,12 @@ public Builder tableName(String tableName) {
return this;
}

/** Will cause getScheduledExecutions(..) to return executions in priority order. */
public Builder enablePriority() {
this.priority = true;
return this;
}

public Builder jdbcCustomization(JdbcCustomization jdbcCustomization) {
this.jdbcCustomization = jdbcCustomization;
return this;
Expand All @@ -287,6 +294,7 @@ public SchedulerClient build() {
taskResolver,
new SchedulerClientName(),
serializer,
priority,
clock);

return new StandardSchedulerClient(taskRepository, clock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public boolean supportsSingleStatementLockAndFetch() {

@Override
public List<Execution> lockAndFetchSingleStatement(
JdbcTaskRepositoryContext ctx, Instant now, int limit) {
return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit);
JdbcTaskRepositoryContext ctx, Instant now, int limit, boolean orderByPriority) {
return jdbcCustomization.lockAndFetchSingleStatement(ctx, now, limit, orderByPriority);
}

@Override
Expand All @@ -143,14 +143,15 @@ public boolean supportsGenericLockAndFetch() {

@Override
public String createGenericSelectForUpdateQuery(
String tableName, int limit, String requiredAndCondition) {
String tableName, int limit, String requiredAndCondition, boolean orderByPriority) {
return jdbcCustomization.createGenericSelectForUpdateQuery(
tableName, limit, requiredAndCondition);
tableName, limit, requiredAndCondition, orderByPriority);
}

@Override
public String createSelectDueQuery(String tableName, int limit, String andCondition) {
return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition);
public String createSelectDueQuery(
String tableName, int limit, String andCondition, boolean orderByPriority) {
return jdbcCustomization.createSelectDueQuery(tableName, limit, andCondition, orderByPriority);
}

@Override
Expand Down
Loading

0 comments on commit aceb60a

Please sign in to comment.