diff --git a/README.md b/README.md index c859ce6..bbccb51 100644 --- a/README.md +++ b/README.md @@ -8,30 +8,35 @@ This bundle provides interfaces for registering and handle scheduled tasks withi [![License](https://poser.okvpn.org/okvpn/cron-bundle/license)](https://packagist.org/packages/okvpn/cron-bundle) ## Purpose -This is a more simpler alternative of existing cron bundle without doctrine deps. +This is a simpler alternative of existing cron bundle without doctrine deps. Here also added support middleware for customization handling cron jobs across a cluster install: (Send jobs to message queue, like Symfony Messenger; locking; etc.). -This allow to limit the number of parallel running processes and prioritized it. +This allows to limit the number of parallel running processes and prioritized it. Features -------- - Not need doctrine/database. - Docker friendly, runs as background command without `crond`. +- Schedule tasks with one-millisecond precision. +- More ways to randomize crons with `@random 3600` and `jitter`. - Integration with Symfony Messenger. - Load a cron job from a different storage (config.yml, tagged services, commands). -- Support many engines to run cron (in parallel process, message queue, consistently), allow to use `random` expression. +- Support many engines to run cron (in parallel process, message queue, consistently). - Support many types of cron handlers/command: (services, symfony commands, UNIX shell commands). +- Can be used along with timers, subscriber and async I/O with React EventLoop, like Redis subscriber [clue/redis-react](https://github.com/clue/reactphp-redis). - Middleware and customization. ## Table of Contents - [Install](#install) + - [Commands](#commands) - [Registration a new scheduled task](#registration-a-new-scheduled-task) - [Configuration](#full-configuration-reference) - [Symfony Messenger Integration](#handle-cron-jobs-via-symfony-messenger) - [Your own Scheduled Tasks Loader](#your-own-scheduled-tasks-loaders) - [Handling cron jobs across a cluster](#handling-cron-jobs-across-a-cluster-or-custom-message-queue) + - [Use ReactPHP EventLoop](#use-reactphp-eventloop) Install ------ @@ -54,7 +59,7 @@ return [ ## Quick Usage -You can use `AsCron` attribute for autoconfigure. +You can use `AsCron` or `AsPeriodicTask` attribute for autoconfigure. ```php addOption('command', null, InputOption::VALUE_OPTIONAL, 'Run only selected command') ->addOption('demand', null, InputOption::VALUE_NONE, 'Start cron scheduler every one minute without exit') ->addOption('group', null, InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Run schedules for specific groups.') - ->addOption('time-limit', null, InputOption::VALUE_OPTIONAL, 'Run cron scheduler during this time (sec.)'); + ->addOption('time-limit', null, InputOption::VALUE_OPTIONAL, 'Run cron scheduler during this time (sec.)') + ->addOption('dry-run', null, InputOption::VALUE_NONE, 'Debug periodical tasks without execution it.'); } /** @@ -83,33 +84,38 @@ protected function executeLoop(InputInterface $input, OutputInterface $output): { $output->writeln('Run scheduler without exit'); + $loop = $this->scheduleLoop; if (($timeLimit = $input->getOption('time-limit')) > 0) { - $this->scheduleLoop->addTimer((int)$timeLimit, function () { - $this->scheduleLoop->stop(); + $loop->addTimer((int)$timeLimit, static function () use ($loop) { + $loop->stop(); }); } - $schedulerRunner = function () use ($input, $output) { + $test = 0; + $schedulerRunner = function () use ($input, $output, $loop, &$test) { $runAt = \microtime(true); - if ($this->scheduleLoop instanceof ReactLoopAdapter) { - $this->scheduleLoop->setDefaultLoopTime($this->getCurrentDate()); + if ($loop instanceof ReactLoopAdapter) { + $loop->setDefaultLoopTime($this->getCurrentDate()); } + if ($runAt - $test < 2) { + $output->writeln("ERROR"); + } + $test = $runAt; + $this->scheduler($input, $output); - $output->writeln(sprintf('All schedule tasks completed in %.3f seconds', \microtime(true) - $runAt), OutputInterface::VERBOSITY_VERBOSE); - if ($this->scheduleLoop instanceof ReactLoopAdapter) { - $this->scheduleLoop->setDefaultLoopTime(); + $output->writeln(sprintf('[%s] All schedule tasks completed in %.3f seconds', $this->getCurrentDate()->format('Y-m-d H:i:s.u'), \microtime(true) - $runAt), OutputInterface::VERBOSITY_VERBOSE); + if ($loop instanceof ReactLoopAdapter) { + $loop->setDefaultLoopTime(); } }; - if (null !== $this->dispatcher) { - $this->dispatcher->dispatch(new StartLoopEvent($this->scheduleLoop), StartLoopEvent::START_LOOP); - } + $this->dispatchLoopEvent(LoopEvent::LOOP_INIT); - $delayRun = 60 - fmod((float)$this->getCurrentDate()->format('U.u'), 60.0); - $this->scheduleLoop->addTimer($delayRun, function () use ($schedulerRunner) { - $this->scheduleLoop->futureTick($schedulerRunner); - $this->scheduleLoop->addPeriodicTimer(60, $schedulerRunner); + $delayRun = 60.0 - fmod((float)$this->getCurrentDate()->format('U.u'), 60.0); + $loop->addTimer($delayRun, static function () use ($schedulerRunner, $loop) { + $loop->futureTick($schedulerRunner); + $loop->addPeriodicTimer(60, $schedulerRunner); }); $this->scheduleLoop->run(); @@ -132,12 +138,14 @@ protected function scheduler(InputInterface $input, OutputInterface $output): vo $now = $this->getCurrentDate(); $roundTime = (int)(round($now->getTimestamp()/60)*60); - $options['now'] = new \DateTimeImmutable('@'.$roundTime, $now->getTimezone()); $options['demand'] = $input->getOption('demand'); + $options['dry-run'] = $input->getOption('dry-run'); - $envStamp = new EnvironmentStamp($options); + $envStamp = new EnvironmentStamp($options + ['now' => new \DateTimeImmutable('@'.$roundTime, $now->getTimezone()), 'dispatch-loop' => null !== $this->dispatcher]); $loggerStamp = $this->createLoggerStamp($output); + $this->dispatchLoopEvent(LoopEvent::LOOP_START); + foreach ($this->loader->getSchedules($options) as $schedule) { if (null !== $command && $schedule->getCommand() !== $command) { continue; @@ -150,6 +158,8 @@ protected function scheduler(InputInterface $input, OutputInterface $output): vo $this->scheduleRunner->execute($schedule); } + + $this->dispatchLoopEvent(LoopEvent::LOOP_END); } protected function createLoggerStamp(OutputInterface $output) @@ -166,4 +176,11 @@ protected function getCurrentDate(): \DateTimeImmutable return $now; } + + protected function dispatchLoopEvent(string $name): void + { + if (null !== $this->dispatcher && null !== $this->scheduleLoop) { + $this->dispatcher->dispatch(new LoopEvent($this->scheduleLoop), $name); + } + } } diff --git a/src/Event/StartLoopEvent.php b/src/Event/LoopEvent.php similarity index 50% rename from src/Event/StartLoopEvent.php rename to src/Event/LoopEvent.php index 3d00cda..2a7bff8 100644 --- a/src/Event/StartLoopEvent.php +++ b/src/Event/LoopEvent.php @@ -7,9 +7,22 @@ use Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface; use Symfony\Contracts\EventDispatcher\Event; -class StartLoopEvent extends Event +class LoopEvent extends Event { - public const START_LOOP = 'startLoop'; + /** + * Dispatch on init event loop, before $loop->run() + */ + public const LOOP_INIT = 'loopInit'; + + /** + * Dispatch before running event loops. Executed every minutes + */ + public const LOOP_START = 'loopStart'; + + /** + * Dispatch after running event loops. Executed every minutes + */ + public const LOOP_END = 'loopEnd'; private $loop; diff --git a/src/Middleware/CronMiddlewareEngine.php b/src/Middleware/CronMiddlewareEngine.php index 15652aa..4d54d4f 100644 --- a/src/Middleware/CronMiddlewareEngine.php +++ b/src/Middleware/CronMiddlewareEngine.php @@ -23,7 +23,6 @@ final class CronMiddlewareEngine implements MiddlewareEngineInterface /** @var ScheduleLoopInterface|null */ private $scheduleLoop; - private $lastLoopId = null; private $lastLoopTasks = []; private $timers; @@ -41,59 +40,88 @@ public function __construct(CronChecker $checker, string $timeZone = null, PsrCl */ public function handle(ScheduleEnvelope $envelope, StackInterface $stack): ScheduleEnvelope { + $env = $envelope->has(EnvironmentStamp::class) ? $envelope->get(EnvironmentStamp::class)->toArray() : []; + // For testing usage. drops next middlewares + if ($dryRun = ($env['dry-run'] ?? false)) { + $stack->end(); + } + if (!$stamp = $envelope->get(PeriodicalStampInterface::class)) { - ET::info($envelope, "{{ task }} > Run schedule task."); + $dryRun ? null : ET::info($envelope, "{{ task }} > Run schedule task."); return $stack->next()->handle($envelope, $stack); } - $env = $envelope->get(EnvironmentStamp::class); - $useDemand = $env && $env->get('demand'); - $noLoop = $env ? $env->get('no-loop') ?? false : false; - $now = $env ? $env->get('now') : null; - $now = $now instanceof \DateTimeInterface ? $now : $this->getNow(); + $useDemand = $env['demand'] ?? false; + $noLoop = $env['no-loop'] ?? false; + $now = ($env['now'] ?? null) instanceof \DateTimeInterface ? $env['now'] : $this->getNow(); return true === $useDemand && null !== $this->scheduleLoop && false === $noLoop ? $this->handleDemand($now, $envelope, $stamp, $stack) : $this->handleNoDemand($now, $envelope, $stamp, $stack); } - private function handleDemand(\DateTimeInterface $loopTime, ScheduleEnvelope $envelope, PeriodicalStampInterface $stamp, StackInterface $stack): ScheduleEnvelope + private function handleDemand(\DateTimeInterface $now, ScheduleEnvelope $envelope, PeriodicalStampInterface $stamp, StackInterface $stack): ScheduleEnvelope { - $loopId = (int)(60 * floor($loopTime->getTimestamp()/60)); - if ($this->lastLoopId !== $loopId) { - $this->cancelOrphanTasks(); - $this->lastLoopId = $loopId; - } + $this->lastLoopTasks[$hash = ET::calculateHash($envelope)] = 1; + if ($this->timers->hasTimer($hash)) { + list($timer, $prevEnvelope) = $this->timers->getTimer($hash); + if ((string)$prevEnvelope->get(PeriodicalStampInterface::class) !== (string) $stamp) { + $this->timers->remove($hash); + $this->scheduleLoop->cancelTimer($timer); - $taskHash = ET::calculateHash($envelope); - $this->lastLoopTasks[$taskHash] = 1; - if ($this->timers->hasTimer($taskHash)) { - return $stack->end()->handle($envelope, $stack); + ET::notice($envelope, "{{ task }} > Cron expression has been changed."); + } else { + $this->timers->refreshEnvelope($envelope); + return $stack->end()->handle($envelope, $stack); + } } + $prevEnvelope = $envelope; + $timers = $this->timers; $loop = $this->scheduleLoop; + $nextTime = $stamp->getNextRunDate($now = $loop->now()); - $this->timers->attach($envelope, $runner = static function () use ($envelope, $stack, $stamp, $loop, &$runner) { + $timers->attach($envelope, $runner = static function (/* $periodical = true*/) use ($timers, $prevEnvelope, $hash, $stack, $stamp, $loop, &$runner): ScheduleEnvelope { + if (null === ($envelope = $timers->findByHash($hash))) { + ET::notice($prevEnvelope, "{{ task }} > Task canceled. Someone detached an envelope from timers storage"); + return ($clone = clone $stack)->end()->handle($envelope->without(PeriodicalStampInterface::class), $clone); + } + ET::info($envelope, "{{ task }} > Run schedule task."); - ($clone = clone $stack)->next()->handle($envelope->without(PeriodicalStampInterface::class), $clone); + try { + $result = ($clone = clone $stack)->next()->handle($envelope->without(PeriodicalStampInterface::class), $clone); + } catch (\Throwable $e) { + $result = $envelope; + ET::error($envelope, "{{ task }} > Task ERRORED. {$e->getMessage()}", ['e' => $e]); + } - $nextTime = $stamp->getNextRunDate($now = $loop->now()); - $delay = (float) $nextTime->format('U.u') - (float) $now->format('U.u'); - $loop->addTimer($delay, $runner); + if (false !== (\func_get_args()[0] ?? null)) { + $nextTime = $stamp->getNextRunDate($now = $loop->now()); + $delay = (float) $nextTime->format('U.u') - (float) $now->format('U.u'); - ET::debug($envelope, "{{ task }} > was scheduled with delay $delay sec."); + $loop->addTimer($delay, $runner); + ET::debug($envelope, \sprintf("{{ task }} > was scheduled with delay %.6f sec.", $delay)); + } + + return $result; }); $delay = (float) $nextTime->format('U.u') - (float) $now->format('U.u'); - ET::debug($envelope, "{{ task }} > was scheduled with delay $delay sec."); + ET::debug($envelope, \sprintf("{{ task }} > was scheduled with delay %.6f sec.", $delay)); $loop->addTimer($delay, $runner); return ($clone = clone $stack)->end()->handle($envelope, $clone); } + public function onLoopEnd(): void + { + $this->cancelOrphanTasks(); + $this->lastLoopTasks = []; + } + private function handleNoDemand(\DateTimeInterface $now, ScheduleEnvelope $envelope, PeriodicalStampInterface $stamp, StackInterface $stack): ScheduleEnvelope { if ($stamp instanceof ScheduleStamp) { @@ -104,11 +132,11 @@ private function handleNoDemand(\DateTimeInterface $now, ScheduleEnvelope $envel return $stack->end()->handle($envelope, $stack); } } else { - $currentTime = (int)(60 * floor($now->getTimestamp()/60)); + $currentTime = (int)(60 * \floor($now->getTimestamp()/60)); $now = new \DateTimeImmutable('@'.($currentTime-1), $now->getTimezone()); $nextRun = $stamp->getNextRunDate($now); - $nextRun = (int)(60 * floor($nextRun->getTimestamp()/60)); + $nextRun = (int)(60 * \floor($nextRun->getTimestamp()/60)); $isDue = $nextRun === $currentTime; } @@ -132,8 +160,6 @@ private function cancelOrphanTasks(): void $this->timers->remove($hash); } } - - $this->lastLoopTasks = []; } private function getNow(): \DateTimeImmutable diff --git a/src/Resources/config/services.yml b/src/Resources/config/services.yml index 8a4fed9..6d7fa26 100644 --- a/src/Resources/config/services.yml +++ b/src/Resources/config/services.yml @@ -31,6 +31,7 @@ services: - '@Okvpn\Bundle\CronBundle\Runner\TimerStorage' tags: - { name: okvpn_cron.middleware, priority: 20 } + - { name: kernel.event_listener, event: 'loopEnd' } okvpn_cron.middleware.execute_service: class: Okvpn\Bundle\CronBundle\Middleware\ServiceInvokeEngine diff --git a/src/Runner/StandaloneLoop.php b/src/Runner/StandaloneLoop.php index a18d977..f061559 100644 --- a/src/Runner/StandaloneLoop.php +++ b/src/Runner/StandaloneLoop.php @@ -24,7 +24,7 @@ public function __construct(ClockInterface $clock = null, string $timeZone = nul */ public function addTimer(float $interval, \Closure $callback): void { - $this->timers[] = [$callback, $interval+$this->getUnix(), 0]; + $this->timers[] = [$callback, $interval + $this->getUnix(), 0]; $this->needSort = true; } @@ -33,7 +33,7 @@ public function addTimer(float $interval, \Closure $callback): void */ public function addPeriodicTimer(float $interval, \Closure $callback): void { - $this->timers[] = [$callback, $interval+$this->getUnix(), $interval]; + $this->timers[] = [$callback, $interval + $this->getUnix(), $interval]; $this->needSort = true; } @@ -73,7 +73,7 @@ public function run(): void foreach ($this->timers as $i => $timer) { $unix = $this->getUnix(); if ($timer[1] < $unix) { - \call_user_func($timer[0]); + $this->execute($timer); if ($timer[2] === 0) { unset($this->timers[$i]); } else { @@ -136,4 +136,9 @@ protected function sleep(float $seconds): void { $this->clock->sleep($seconds); } + + protected function execute(array $timer): void + { + \call_user_func($timer[0]); + } } diff --git a/src/Runner/TimerStorage.php b/src/Runner/TimerStorage.php index 542d35c..9151889 100644 --- a/src/Runner/TimerStorage.php +++ b/src/Runner/TimerStorage.php @@ -22,10 +22,20 @@ public function attach(ScheduleEnvelope $envelope, \Closure $runner): void $this->timers[ET::calculateHash($envelope)] = [$runner, $envelope]; } + public function refreshEnvelope(ScheduleEnvelope $envelope): void + { + if ($this->hasTimer($hash = ET::calculateHash($envelope))) { + $this->timers[$hash][1] = $envelope; + } + } + public function remove($envelope): void { $envelope = $envelope instanceof ScheduleEnvelope ? ET::calculateHash($envelope) : $envelope; - unset($this->timers[$envelope]); + + if (\is_string($envelope)) { + unset($this->timers[$envelope]); + } } /** @@ -53,7 +63,12 @@ public function hasTimer($envelope): bool { $envelope = $envelope instanceof ScheduleEnvelope ? ET::calculateHash($envelope) : $envelope; - return isset($this->timers[$envelope]); + return $envelope && isset($this->timers[$envelope]); + } + + public function findByHash(string $hash): ?ScheduleEnvelope + { + return $this->timers[$hash][1] ?? null; } public function find(string $command, /* array|string */ $args = null): ?ScheduleEnvelope