Skip to content

Commit

Permalink
feat: batch scheduling (#595)
Browse files Browse the repository at this point in the history
  • Loading branch information
geirsagberg committed Jan 12, 2025
1 parent 541bf20 commit 4b3c9a7
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 27 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ scheduler.schedule(
.scheduledTo(Instant.now().plusSeconds(5)));
```

### Batch scheduling

It is possible to schedule a batch of executions at once. This is useful when scheduling a large number of executions for better performance.

```java
Stream<TaskInstance<?>> taskInstances = Stream.of(
MY_TASK.instance("my-task-1", 1),
MY_TASK.instance("my-task-2", 2),
MY_TASK.instance("my-task-3", 3));

scheduler.scheduleBatch(taskInstances, Instant.now());
```

**Note:** If any of the executions already exist, the scheduling will fail and an exception (`TaskBatchException`) will be thrown.

### More examples

#### Plain Java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -264,6 +265,16 @@ public <T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstanc
return this.delegate.scheduleIfNotExists(schedulableInstance);
}

/**
* Schedules a batch of task instances for one common execution time by forwarding the call
* unchanged to the wrapped delegate.
*
* @param taskInstances the task instances to schedule
* @param executionTime the instant passed on to the delegate for all instances
*/
@Override
public void scheduleBatch(Stream<TaskInstance<?>> taskInstances, Instant executionTime) {
this.delegate.scheduleBatch(taskInstances, executionTime);
}

/**
* Schedules a batch of schedulable instances by forwarding the call unchanged to the wrapped
* delegate.
*
* @param schedulableInstances the instances to schedule
*/
@Override
public void scheduleBatch(Stream<SchedulableInstance<?>> schedulableInstances) {
this.delegate.scheduleBatch(schedulableInstances);
}

@Override
public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
this.delegate.schedule(taskInstance, executionTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.SchedulableInstance;
import com.github.kagkarlsson.scheduler.task.SchedulableTaskInstance;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
Expand All @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -116,6 +118,26 @@ <T> boolean schedule(
*/
<T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstance);

/**
* Schedule a batch of executions that all share the same execution-time. If any of the
* executions already exists, the scheduling will fail and an exception will be thrown.
*
* @param taskInstances Task-instances to schedule, optionally with data
* @param executionTime Instant the executions should run
* @see java.time.Instant
* @see com.github.kagkarlsson.scheduler.task.TaskInstance
*/
void scheduleBatch(Stream<TaskInstance<?>> taskInstances, Instant executionTime);

/**
* Schedule a batch of executions. If any of the executions already exists, the scheduling will
* fail and an exception will be thrown.
*
* @param schedulableInstances Task-instances with individual schedules
* @see com.github.kagkarlsson.scheduler.task.SchedulableInstance
*/
void scheduleBatch(Stream<SchedulableInstance<?>> schedulableInstances);

/**
* Update an existing execution to a new execution-time. If the execution does not exist or if it
* is currently running, an exception is thrown.
Expand Down Expand Up @@ -355,6 +377,8 @@ class StandardSchedulerClient implements SchedulerClient {
private final Clock clock;
private final SchedulerListeners schedulerListeners;

// Maximum number of executions handed to the task repository in a single createBatch call.
// Declared static: it is a compile-time constant, not per-client state.
private static final int BATCH_SIZE = 1000;

/**
* Convenience constructor: delegates to the three-argument constructor, passing {@code
* SchedulerListeners.NOOP} as the listeners argument.
*
* @param taskRepository repository forwarded to the three-argument constructor
* @param clock clock forwarded to the three-argument constructor
*/
StandardSchedulerClient(TaskRepository taskRepository, Clock clock) {
this(taskRepository, SchedulerListeners.NOOP, clock);
}
Expand Down Expand Up @@ -389,6 +413,20 @@ public <T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstanc
schedulableInstance.getNextExecutionTime(clock.now()));
}

/**
 * Schedules every task instance of the stream for the same {@code executionTime}.
 *
 * <p>Each instance is wrapped in a {@link SchedulableTaskInstance} and the resulting stream is
 * handed to {@link #scheduleBatch(Stream)}.
 *
 * @param taskInstances the task instances to schedule
 * @param executionTime the execution time applied to every instance
 */
@Override
public void scheduleBatch(Stream<TaskInstance<?>> taskInstances, Instant executionTime) {
  // Pair every task instance with the shared execution time, then reuse the generic batch path.
  final Stream<SchedulableInstance<?>> withExecutionTime =
      taskInstances.map(instance -> new SchedulableTaskInstance<>(instance, executionTime));
  scheduleBatch(withExecutionTime);
}

/**
 * Schedules the given instances in chunks of at most {@code BATCH_SIZE} elements, passing each
 * chunk to {@code taskRepository.createBatch}.
 *
 * @param schedulableInstances the instances to schedule
 */
@Override
public void scheduleBatch(Stream<SchedulableInstance<?>> schedulableInstances) {
  final Stream<List<SchedulableInstance<?>>> chunks =
      StreamUtils.chunkStream(schedulableInstances, BATCH_SIZE);
  // One repository call per chunk.
  chunks.forEach(taskRepository::createBatch);
}

@Override
public <T> void schedule(SchedulableInstance<T> schedulableInstance) {
schedule(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.kagkarlsson.scheduler;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Static helpers for working with {@link Stream}s. */
public class StreamUtils {

  private StreamUtils() {
    // Static helpers only; not meant to be instantiated.
  }

  /**
   * Lazily splits {@code stream} into consecutive lists of at most {@code chunkSize} elements.
   *
   * <p>Every chunk except possibly the last holds exactly {@code chunkSize} elements, and no
   * empty chunk is ever emitted. The returned stream is sequential and ordered. Elements are
   * pulled from the source only when a chunk is requested. Closing the returned stream closes
   * {@code stream}.
   *
   * @param stream the source stream; it is consumed by the returned stream
   * @param chunkSize maximum number of elements per chunk, must be greater than 0
   * @param <T> element type
   * @return a stream of non-empty chunks in encounter order
   * @throws IllegalArgumentException if {@code chunkSize} is not greater than 0
   * @throws NullPointerException if {@code stream} is null
   */
  public static <T> Stream<List<T>> chunkStream(Stream<T> stream, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("Chunk size must be greater than 0");
    }
    Objects.requireNonNull(stream, "stream");

    Iterator<T> source = stream.iterator();
    Iterator<List<T>> chunks =
        new Iterator<List<T>>() {
          @Override
          public boolean hasNext() {
            return source.hasNext();
          }

          @Override
          public List<T> next() {
            if (!source.hasNext()) {
              throw new NoSuchElementException();
            }
            List<T> chunk = new ArrayList<>();
            for (int i = 0; i < chunkSize && source.hasNext(); i++) {
              chunk.add(source.next());
            }
            return chunk;
          }
        };

    // An ORDERED spliterator gives a well-defined encounter order. The previous
    // Stream.generate(...).takeWhile(...) ran takeWhile on an unordered stream, for which the
    // API allows any subset of matching elements to be taken.
    return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(
                chunks, Spliterator.ORDERED | Spliterator.NONNULL),
            false)
        .onClose(stream::close);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public interface TaskRepository {

List<Execution> getDue(Instant now, int limit);

/**
* Creates the given executions as one batch.
*
* <p>The JDBC implementation inserts them with a single batch statement and throws {@code
* TaskBatchException} when that insert fails.
*
* @param executions the instances to create executions for
*/
void createBatch(List<SchedulableInstance<?>> executions);

Instant replace(Execution toBeReplaced, SchedulableInstance<?> newInstance);

void getScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<Execution> consumer);
Expand Down Expand Up @@ -71,7 +73,6 @@ boolean reschedule(
/**
* Looks up an execution using the task name and id carried by the given {@code TaskInstanceId};
* delegates to the two-argument {@code getExecution} overload.
*
* @param taskInstance identifier supplying the task name and instance id
* @return whatever the two-argument overload returns for that name and id
*/
default Optional<Execution> getExecution(TaskInstanceId taskInstance) {
return getExecution(taskInstance.getTaskName(), taskInstance.getId());
}
;

int removeExecutions(String taskName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.kagkarlsson.scheduler.exceptions;

public abstract class DbSchedulerException extends RuntimeException {
static final long serialVersionUID = -2132850112553296790L;
private static final long serialVersionUID = -2132850112553296790L;

public DbSchedulerException(String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.kagkarlsson.scheduler.exceptions;

/** Signals that a batch of executions could not be created. */
public class TaskBatchException extends DbSchedulerException {
  private static final long serialVersionUID = -2132850112553296792L;

  /**
   * @param message description of the failure
   * @param cause the underlying exception that made the batch fail
   */
  public TaskBatchException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @param message description of the failure
   */
  public TaskBatchException(String message) {
    super(message);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.kagkarlsson.scheduler.exceptions;

public class TaskInstanceException extends DbSchedulerException {
static final long serialVersionUID = -2132850112553296790L;
private static final long serialVersionUID = -2132850112553296791L;
private static final String TASK_NAME_INSTANCE_MESSAGE_PART = " (task name: %s, instance id: %s)";

private final String taskName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask;
import com.github.kagkarlsson.scheduler.exceptions.ExecutionException;
import com.github.kagkarlsson.scheduler.exceptions.TaskBatchException;
import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException;
import com.github.kagkarlsson.scheduler.serializer.Serializer;
import com.github.kagkarlsson.scheduler.task.Execution;
Expand Down Expand Up @@ -144,8 +145,11 @@ protected JdbcTaskRepository(
this.jdbcCustomization = jdbcCustomization;
this.orderByPriority = orderByPriority;
this.clock = clock;
this.insertQuery = getInsertQuery(tableName);
}

private final String insertQuery;

@Override
public boolean createIfNotExists(SchedulableInstance instance) {
final TaskInstance taskInstance = instance.getTaskInstance();
Expand All @@ -158,30 +162,7 @@ public boolean createIfNotExists(SchedulableInstance instance) {
return false;
}

// FIXLATER: replace with some sort of generic SQL-builder, possibly extend micro-jdbc
// with execute(query-and-pss) and have builder return that..
jdbcRunner.execute(
"insert into "
+ tableName
+ "(task_name, task_instance, task_data, execution_time, picked, version"
+ (orderByPriority ? ", priority" : "")
+ ") values(?, ?, ?, ?, ?, ? "
+ (orderByPriority ? ", ?" : "")
+ ")",
(PreparedStatement p) -> {
int parameterIndex = 1;
p.setString(parameterIndex++, taskInstance.getTaskName());
p.setString(parameterIndex++, taskInstance.getId());
jdbcCustomization.setTaskData(
p, parameterIndex++, serializer.serialize(taskInstance.getData()));
jdbcCustomization.setInstant(
p, parameterIndex++, instance.getNextExecutionTime(clock.now()));
p.setBoolean(parameterIndex++, false);
p.setLong(parameterIndex++, 1L);
if (orderByPriority) {
p.setInt(parameterIndex++, taskInstance.getPriority());
}
});
jdbcRunner.execute(insertQuery, (PreparedStatement p) -> setInsertParameters(instance, p));
return true;

} catch (SQLRuntimeException e) {
Expand All @@ -196,6 +177,43 @@ public boolean createIfNotExists(SchedulableInstance instance) {
}
}

/**
 * Inserts all given executions with one batch statement built from {@code insertQuery}.
 *
 * @param executions the instances to insert
 * @throws TaskBatchException if the batch insert raises a {@code SQLRuntimeException}
 */
@Override
public void createBatch(List<SchedulableInstance<?>> executions) {
  try {
    jdbcRunner.executeBatch(insertQuery, executions, this::setInsertParameters);
  } catch (SQLRuntimeException batchFailure) {
    // Logged at debug only; the failure is surfaced to the caller through the rethrown exception.
    LOG.debug("Failed to create all executions. Some might already exist.", batchFailure);
    throw new TaskBatchException("Failed to create all executions.", batchFailure);
  }
}

/**
 * Binds the parameters of the insert statement for one instance, in column order: task name,
 * instance id, serialized task data, execution time, picked ({@code false}), version ({@code 1})
 * and, only when {@code orderByPriority} is set, priority.
 *
 * @param value the instance whose values are bound
 * @param ps the prepared insert statement
 * @throws SQLException if binding a parameter fails
 */
private void setInsertParameters(SchedulableInstance<?> value, PreparedStatement ps)
    throws SQLException {
  final TaskInstance<?> taskInstance = value.getTaskInstance();
  int parameterIndex = 1;
  ps.setString(parameterIndex++, taskInstance.getTaskName());
  ps.setString(parameterIndex++, taskInstance.getId());
  jdbcCustomization.setTaskData(
      ps, parameterIndex++, serializer.serialize(taskInstance.getData()));
  jdbcCustomization.setInstant(ps, parameterIndex++, value.getNextExecutionTime(clock.now()));
  ps.setBoolean(parameterIndex++, false);
  ps.setLong(parameterIndex++, 1L);
  if (orderByPriority) {
    ps.setInt(parameterIndex, taskInstance.getPriority());
  }
}

/**
 * Builds the insert statement for the given table. The {@code priority} column and its
 * placeholder are included only when {@code orderByPriority} is set.
 *
 * @param tableName name of the table to insert into
 * @return the parameterized insert statement
 */
private String getInsertQuery(String tableName) {
  // FIXLATER: replace with some sort of generic SQL-builder, possibly extend micro-jdbc
  // with execute(query-and-pss) and have builder return that..
  final String priorityColumn = orderByPriority ? ", priority" : "";
  final String priorityPlaceholder = orderByPriority ? ", ?" : "";
  return "insert into "
      + tableName
      + "(task_name, task_instance, task_data, execution_time, picked, version"
      + priorityColumn
      + ") values(?, ?, ?, ?, ?, ? "
      + priorityPlaceholder
      + ")";
}

/**
* Instead of doing delete+insert, we allow updating an existing execution will all new fields
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import co.unruly.matchers.OptionalMatchers;
import com.github.kagkarlsson.scheduler.exceptions.TaskBatchException;
import com.github.kagkarlsson.scheduler.helper.TestableRegistry;
import com.github.kagkarlsson.scheduler.helper.TimeHelper;
import com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository;
Expand Down Expand Up @@ -80,6 +82,36 @@ public void test_createIfNotExists() {
assertTrue(taskRepository.createIfNotExists(new SchedulableTaskInstance<>(instance2, now)));
}

/** A batch of two new executions is created and both can be read back afterwards. */
@Test
public void test_createBatch() {
  final Instant executionTime = TimeHelper.truncatedInstantNow();
  final TaskInstance<Void> first = oneTimeTask.instance("id1");
  final TaskInstance<Void> second = oneTimeTask.instance("id2");
  final List<SchedulableInstance<?>> batch =
      List.of(
          new SchedulableTaskInstance<>(first, executionTime),
          new SchedulableTaskInstance<>(second, executionTime));

  taskRepository.createBatch(batch);

  // Both executions of the batch must be retrievable.
  assertTrue(taskRepository.getExecution(first).isPresent());
  assertTrue(taskRepository.getExecution(second).isPresent());
}

/** Creating a batch that contains an already existing execution raises TaskBatchException. */
@Test
public void test_createBatch_fails_if_any_execution_already_exists() {
  final Instant executionTime = TimeHelper.truncatedInstantNow();
  final TaskInstance<Void> fresh = oneTimeTask.instance("id1");
  final TaskInstance<Void> alreadyStored = oneTimeTask.instance("id2");

  // Store one of the two executions up front so the batch collides with it.
  taskRepository.createIfNotExists(new SchedulableTaskInstance<>(alreadyStored, executionTime));

  final List<SchedulableInstance<?>> batch =
      List.of(
          new SchedulableTaskInstance<>(fresh, executionTime),
          new SchedulableTaskInstance<>(alreadyStored, executionTime));

  assertThrows(TaskBatchException.class, () -> taskRepository.createBatch(batch));
}

@Test
public void test_replace() {
Instant now = TimeHelper.truncatedInstantNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -99,6 +100,17 @@ public void client_should_be_able_to_schedule_executions() {
assertThat(onetimeTaskHandlerA.timesExecuted.get(), CoreMatchers.is(2));
}

// Schedules two instances of oneTimeTaskA in one batch at the settable clock's current time,
// runs due executions, and expects the handler to have been executed twice.
@Test
public void client_should_be_able_to_schedule_batch_executions() {
SchedulerClient client = create(DB.getDataSource()).build();

client.scheduleBatch(
Stream.of(oneTimeTaskA.instance("1"), oneTimeTaskA.instance("2")), settableClock.now());
scheduler.runAnyDueExecutions();

// One handler invocation per batched instance.
assertThat(onetimeTaskHandlerA.timesExecuted.get(), CoreMatchers.is(2));
}

@Test
public void should_be_able_to_schedule_other_executions_from_an_executionhandler() {
scheduler.schedule(scheduleAnotherTask.instance("1"), settableClock.now());
Expand Down

0 comments on commit 4b3c9a7

Please sign in to comment.