Skip to content

Commit

Permalink
Try removing dependency on priority-column in table (to allow users t…
Browse files Browse the repository at this point in the history
…o upgrade without modifying the schema)
  • Loading branch information
kagkarlsson committed Oct 25, 2024
1 parent 397d70a commit cc6f76f
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class FetchCandidates implements PollStrategy {
AtomicInteger currentGenerationNumber = new AtomicInteger(0);
private final int lowerLimit;
private final int upperLimit;
private final boolean priorityEnabled;

public FetchCandidates(
Executor executor,
Expand All @@ -59,8 +58,7 @@ public FetchCandidates(
Clock clock,
PollingStrategyConfig pollingStrategyConfig,
Runnable triggerCheckForNewExecutions,
HeartbeatConfig heartbeatConfig,
boolean priorityEnabled) {
HeartbeatConfig heartbeatConfig) {
this.executor = executor;
this.taskRepository = taskRepository;
this.schedulerClient = schedulerClient;
Expand All @@ -73,7 +71,6 @@ public FetchCandidates(
this.pollingStrategyConfig = pollingStrategyConfig;
this.triggerCheckForNewExecutions = triggerCheckForNewExecutions;
this.heartbeatConfig = heartbeatConfig;
this.priorityEnabled = priorityEnabled;
lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize);
// FIXLATER: this is not "upper limit", but rather nr of executions to get. those already in
// queue will become stale
Expand All @@ -87,8 +84,7 @@ public void run() {
// Fetch new candidates for execution. Old ones still in ExecutorService will become stale and
// be discarded
final int executionsToFetch = upperLimit;
List<Execution> fetchedDueExecutions =
taskRepository.getDue(now, executionsToFetch, priorityEnabled);
List<Execution> fetchedDueExecutions = taskRepository.getDue(now, executionsToFetch);
LOG.trace(
"Fetched {} task instances due for execution at {}", fetchedDueExecutions.size(), now);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class LockAndFetchCandidates implements PollStrategy {
private final int lowerLimit;
private final int upperLimit;
private AtomicBoolean moreExecutionsInDatabase = new AtomicBoolean(false);
private final boolean priorityEnabled;

public LockAndFetchCandidates(
Executor executor,
Expand All @@ -56,8 +55,7 @@ public LockAndFetchCandidates(
Clock clock,
PollingStrategyConfig pollingStrategyConfig,
Runnable triggerCheckForNewExecutions,
HeartbeatConfig maxAgeBeforeConsideredDead,
boolean priorityEnabled) {
HeartbeatConfig maxAgeBeforeConsideredDead) {
this.executor = executor;
this.taskRepository = taskRepository;
this.schedulerClient = schedulerClient;
Expand All @@ -70,7 +68,6 @@ public LockAndFetchCandidates(
this.pollingStrategyConfig = pollingStrategyConfig;
this.triggerCheckForNewExecutions = triggerCheckForNewExecutions;
this.maxAgeBeforeConsideredDead = maxAgeBeforeConsideredDead;
this.priorityEnabled = priorityEnabled;
lowerLimit = pollingStrategyConfig.getLowerLimit(threadpoolSize);
upperLimit = pollingStrategyConfig.getUpperLimit(threadpoolSize);
}
Expand All @@ -88,8 +85,7 @@ public void run() {
}

// FIXLATER: should it fetch here if not under lowerLimit? probably
List<Execution> pickedExecutions =
taskRepository.lockAndGetDue(now, executionsToFetch, priorityEnabled);
List<Execution> pickedExecutions = taskRepository.lockAndGetDue(now, executionsToFetch);
LOG.trace("Picked {} taskinstances due for execution", pickedExecutions.size());

// Shared indicator for if there are more due executions in the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class Scheduler implements SchedulerClient {
private final Waiter heartbeatWaiter;
final SettableSchedulerState schedulerState = new SettableSchedulerState();
final ConfigurableLogger failureLogger;
final boolean priorityEnabled;

protected Scheduler(
Clock clock,
Expand All @@ -91,8 +90,7 @@ protected Scheduler(
boolean logStackTrace,
List<OnStartup> onStartup,
ExecutorService dueExecutor,
ScheduledExecutorService housekeeperExecutor,
boolean priorityEnabled) {
ScheduledExecutorService housekeeperExecutor) {
this.clock = clock;
this.schedulerTaskRepository = schedulerTaskRepository;
this.taskResolver = taskResolver;
Expand All @@ -114,7 +112,6 @@ protected Scheduler(
this.housekeeperExecutor = housekeeperExecutor;
delegate = new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock);
this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);
this.priorityEnabled = priorityEnabled;

if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
schedulerTaskRepository.verifySupportsLockAndFetch();
Expand All @@ -132,8 +129,7 @@ protected Scheduler(
clock,
pollingStrategyConfig,
this::triggerCheckForDueExecutions,
heartbeatConfig,
priorityEnabled);
heartbeatConfig);
} else if (pollingStrategyConfig.type == PollingStrategyConfig.Type.FETCH) {
executeDueStrategy =
new FetchCandidates(
Expand All @@ -149,8 +145,7 @@ protected Scheduler(
clock,
pollingStrategyConfig,
this::triggerCheckForDueExecutions,
heartbeatConfig,
priorityEnabled);
heartbeatConfig);
} else {
throw new IllegalArgumentException(
"Unknown polling-strategy type: " + pollingStrategyConfig.type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,7 @@ public Scheduler build() {
logStackTrace,
startTasks,
candidateDueExecutor,
candidateHousekeeperExecutor,
enablePriority);
candidateHousekeeperExecutor);

if (enableImmediateExecution) {
scheduler.registerSchedulerListener(new ImmediateCheckForDueExecutions(scheduler, clock));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface TaskRepository {

boolean createIfNotExists(SchedulableInstance execution);

List<Execution> getDue(Instant now, int limit, boolean orderByPriority);
List<Execution> getDue(Instant now, int limit);

Instant replace(Execution toBeReplaced, SchedulableInstance newInstance);

Expand All @@ -37,7 +37,7 @@ void getScheduledExecutions(

List<Execution> lockAndFetchGeneric(Instant now, int limit);

List<Execution> lockAndGetDue(Instant now, int limit, boolean orderByPriority);
List<Execution> lockAndGetDue(Instant now, int limit);

void remove(Execution execution);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,29 @@ public boolean createIfNotExists(SchedulableInstance instance) {
return false;
}

// FIXLATER: replace with some sort of generic SQL-builder, possibly extend micro-jdbc
// with execute(query-and-pss) and have builder return that..
jdbcRunner.execute(
"insert into "
+ tableName
+ "(task_name, task_instance, task_data, execution_time, picked, version, priority) values(?, ?, ?, ?, ?, ?, ?)",
+ "(task_name, task_instance, task_data, execution_time, picked, version"
+ (orderByPriority ? ", priority" : "")
+ ") values(?, ?, ?, ?, ?, ? "
+ (orderByPriority ? ", ?" : "")
+ ")",
(PreparedStatement p) -> {
p.setString(1, taskInstance.getTaskName());
p.setString(2, taskInstance.getId());
jdbcCustomization.setTaskData(p, 3, serializer.serialize(taskInstance.getData()));
jdbcCustomization.setInstant(p, 4, instance.getNextExecutionTime(clock.now()));
p.setBoolean(5, false);
p.setLong(6, 1L);
p.setInt(7, taskInstance.getPriority());
int parameterIndex = 1;
p.setString(parameterIndex++, taskInstance.getTaskName());
p.setString(parameterIndex++, taskInstance.getId());
jdbcCustomization.setTaskData(
p, parameterIndex++, serializer.serialize(taskInstance.getData()));
jdbcCustomization.setInstant(
p, parameterIndex++, instance.getNextExecutionTime(clock.now()));
p.setBoolean(parameterIndex++, false);
p.setLong(parameterIndex++, 1L);
if (orderByPriority) {
p.setInt(parameterIndex++, taskInstance.getPriority());
}
});
return true;

Expand Down Expand Up @@ -285,7 +296,7 @@ public void getScheduledExecutions(
}

@Override
public List<Execution> getDue(Instant now, int limit, boolean orderByPriority) {
public List<Execution> getDue(Instant now, int limit) {
LOG.trace("Using generic fetch-then-lock query");
final UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved());
String selectDueQuery =
Expand Down Expand Up @@ -397,7 +408,7 @@ private List<Execution> updateToPicked(
}

@Override
public List<Execution> lockAndGetDue(Instant now, int limit, boolean orderByPriority) {
public List<Execution> lockAndGetDue(Instant now, int limit) {
if (jdbcCustomization.supportsSingleStatementLockAndFetch()) {
LOG.trace("Using single-statement lock-and-fetch");
return jdbcCustomization.lockAndFetchSingleStatement(
Expand Down Expand Up @@ -787,7 +798,8 @@ public Void map(ResultSet rs) throws SQLException {
// default
Instant lastHeartbeat = jdbcCustomization.getInstant(rs, "last_heartbeat");
long version = rs.getLong("version");
int priority = rs.getInt("priority");

int priority = orderByPriority ? rs.getInt("priority") : 0;

Supplier dataSupplier =
memoize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public class ManualScheduler extends Scheduler {
logStackTrace,
onStartup,
dueExecutor,
houseKeeperExecutor,
priorityEnabled);
houseKeeperExecutor);
this.clock = clock;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void scheduler_should_handle_dead_executions() {
new SchedulableTaskInstance<>(taskInstance, now.minus(Duration.ofDays(1)));
jdbcTaskRepository.createIfNotExists(execution1);

final List<Execution> due = jdbcTaskRepository.getDue(now, POLLING_LIMIT, false);
final List<Execution> due = jdbcTaskRepository.getDue(now, POLLING_LIMIT);
assertThat(due, Matchers.hasSize(1));
final Execution execution = due.get(0);
final Optional<Execution> pickedExecution = jdbcTaskRepository.pick(execution, now);
Expand All @@ -87,7 +87,7 @@ public void scheduler_should_handle_dead_executions() {
assertThat(rescheduled.get().picked, is(false));
assertThat(rescheduled.get().pickedBy, nullValue());

assertThat(jdbcTaskRepository.getDue(Instant.now(), POLLING_LIMIT, false), hasSize(1));
assertThat(jdbcTaskRepository.getDue(Instant.now(), POLLING_LIMIT), hasSize(1));
}

@Test
Expand Down
Loading

0 comments on commit cc6f76f

Please sign in to comment.