Skip to content

Commit

Permalink
Run health checks on the I/O pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanLennox committed Mar 1, 2024
1 parent 877b475 commit 56599de
Showing 1 changed file with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class JvbDoctor
/**
* Health check tasks map.
*/
private final Map<Bridge, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
private final Map<Bridge, PeriodicHealthCheckTask> tasks = new ConcurrentHashMap<>();

private final HealthCheckListener listener;

Expand Down Expand Up @@ -95,7 +95,7 @@ synchronized public void shutdown()
@Override
public void bridgeRemoved(Bridge bridge)
{
ScheduledFuture<?> healthTask = tasks.remove(bridge);
PeriodicHealthCheckTask healthTask = tasks.remove(bridge);
if (healthTask == null)
{
logger.warn("Trying to remove a bridge that does not exist anymore: " + bridge);
Expand All @@ -104,7 +104,7 @@ public void bridgeRemoved(Bridge bridge)

logger.info("Stopping health-check task for: " + bridge);

healthTask.cancel(true);
healthTask.cancel();
}

@Override
Expand All @@ -120,14 +120,10 @@ public void bridgeAdded(Bridge bridge)
? new HealthCheckPresenceTask(bridge)
: new HealthCheckTask(bridge);

ScheduledFuture<?> healthTask
= TaskPools.getScheduledPool().scheduleAtFixedRate(
task,
healthCheckInterval,
healthCheckInterval,
TimeUnit.MILLISECONDS);
PeriodicHealthCheckTask periodicTask
= new PeriodicHealthCheckTask(task, healthCheckInterval);

tasks.put(bridge, healthTask);
tasks.put(bridge, periodicTask);

logger.info("Scheduled health-check task for: " + bridge);
}
Expand All @@ -137,6 +133,44 @@ public void bridgeIsShuttingDown(@NotNull Bridge bridge)
{
}

private static class PeriodicHealthCheckTask implements Runnable
{
private Runnable innerTask;

private final ScheduledFuture<?> future;
private Future<?> innerFuture;
private final Object lock = new Object();

private PeriodicHealthCheckTask(Runnable task, long healthCheckInterval)
{
innerTask = task;
future = TaskPools.getScheduledPool().scheduleAtFixedRate(
innerTaskRunnable,
healthCheckInterval,
healthCheckInterval,
TimeUnit.MILLISECONDS);
}

@Override
public void run()
{
innerFuture = TaskPools.getIoPool().submit(innerTaskRunnable);
}

private final Runnable innerTaskRunnable = () -> {
synchronized (lock) {
innerTask.run();
}
};

private void cancel() {
future.cancel(true);
if (innerFuture != null) {
innerFuture.cancel(true);
}
}
}

private class HealthCheckTask extends AbstractHealthCheckTask
{
private HealthCheckTask(Bridge bridge)
Expand Down

0 comments on commit 56599de

Please sign in to comment.