Skip to content

Commit

Permalink
Added ReactPHP EventLoop and standalone cron runner engine.
Browse files Browse the repository at this point in the history
  • Loading branch information
vtsykun committed Aug 19, 2023
1 parent 72b84b7 commit cc9e758
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 60 deletions.
54 changes: 48 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,35 @@ This bundle provides interfaces for registering and handle scheduled tasks withi
[![License](https://poser.okvpn.org/okvpn/cron-bundle/license)](https://packagist.org/packages/okvpn/cron-bundle)

## Purpose
This is a more simpler alternative of existing cron bundle without doctrine deps.
This is a simpler alternative of existing cron bundle without doctrine deps.
Here also added support middleware for customization handling cron jobs across a cluster install:
(Send jobs to message queue, like Symfony Messenger; locking; etc.).
This allow to limit the number of parallel running processes and prioritized it.
This allows to limit the number of parallel running processes and prioritized it.

Features
--------

- Not need doctrine/database.
- Docker friendly, runs as background command without `crond`.
- Schedule tasks with one-millisecond precision.
- More ways to randomize crons with `@random 3600` and `jitter`.
- Integration with Symfony Messenger.
- Load a cron job from a different storage (config.yml, tagged services, commands).
- Support many engines to run cron (in parallel process, message queue, consistently), allow to use `random` expression.
- Support many engines to run cron (in parallel process, message queue, consistently).
- Support many types of cron handlers/command: (services, symfony commands, UNIX shell commands).
- Can be used along with timers, subscriber and async I/O with React EventLoop, like Redis subscriber [clue/redis-react](https://github.com/clue/reactphp-redis).
- Middleware and customization.

## Table of Contents

- [Install](#install)
- [Commands](#commands)
- [Registration a new scheduled task](#registration-a-new-scheduled-task)
- [Configuration](#full-configuration-reference)
- [Symfony Messenger Integration](#handle-cron-jobs-via-symfony-messenger)
- [Your own Scheduled Tasks Loader](#your-own-scheduled-tasks-loaders)
- [Handling cron jobs across a cluster](#handling-cron-jobs-across-a-cluster-or-custom-message-queue)
- [Use ReactPHP EventLoop](#use-reactphp-eventloop)

Install
------
Expand All @@ -54,26 +59,36 @@ return [

## Quick Usage

You can use `AsCron` attribute for autoconfigure.
You can use `AsCron` or `AsPeriodicTask` attribute for autoconfigure.

```php
<?php declare(strict_types=1);

namespace App\Service;

use Okvpn\Bundle\CronBundle\Attribute\AsCron;
use Okvpn\Bundle\CronBundle\Attribute\AsPeriodicTask;

#[AsCron('*/5 * * * *')]
#[AsCron('*/5 * * * *', messenger: true)]
class SyncAppWorker
{
public function __invoke(array $arguments = []): void
{
// code
}
}

#[AsCron('*/10 * * * *', jitter: 60)]
class Sync2AppWorker { /* ... */ } // Run each 10 minutes with 60 sec random delay

#[AsCron('@random 3600')]
class Sync3AppWorker { /* ... */ } // Run with random 0-3600 sec

#[AsPeriodicTask('30 seconds', jitter: 5)]
class Sync4AppWorker { /* ... */ } // Run each 30 sec with 5 sec random delay.
```

### Commands
## Commands

Runs the current cron schedule

Expand All @@ -98,6 +113,14 @@ php bin/console okvpn:debug:cron --execute-one=7

![debug](docs/image1.png)

#### Dry run cron tasks.

```
php bin/console okvpn:cron --dry-run --demand -vvv
```

![debug](docs/img2.png)

### Cron Expression

A CRON expression syntax was take from lib [dragonmantank/cron-expressions](https://github.com/dragonmantank/cron-expression#cron-expressions)
Expand Down Expand Up @@ -235,6 +258,12 @@ okvpn_cron:
cron: "*/30 * * * *"
async: true
arguments: { '--transport': 15 } # command arguments or options
jitter: 60 # 60 sec random delay

-
command: 'app:cron:wrfda-grib2' # run the command with 20 sec interval and 10 sec random delay
interval: "20 seconds"
jitter: 10
```
## Full Configuration Reference
Expand All @@ -257,6 +286,9 @@ okvpn_cron:
# Stamps it's markers that will add to each tasks.
with_stamps:
- 'Packagist\WebBundle\Cron\WorkerStamp'

# service name for run cron in demand (Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface)
loop_engine: ~

tasks: # Defined tasks via configuration
-
Expand Down Expand Up @@ -417,6 +449,16 @@ See example of customization
[one](https://github.com/vtsykun/packeton/tree/master/src/Cron/WorkerMiddleware.php),
[two](https://github.com/vtsykun/packeton/tree/master/src/Cron/CronWorker.php)


## Use ReactPHP EventLoop

You can add your own periodic tasks directly to `Loop`.
The bundle uses a simple wrapper `Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface` for the library

```php

```

License
-------

Expand Down
Binary file added docs/img2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
55 changes: 36 additions & 19 deletions src/Command/CronCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Okvpn\Bundle\CronBundle\Command;

use Okvpn\Bundle\CronBundle\Event\StartLoopEvent;
use Okvpn\Bundle\CronBundle\Event\LoopEvent;
use Okvpn\Bundle\CronBundle\Loader\ScheduleLoaderInterface;
use Okvpn\Bundle\CronBundle\Logger\CronConsoleLogger;
use Okvpn\Bundle\CronBundle\Model\EnvironmentStamp;
Expand Down Expand Up @@ -62,7 +62,8 @@ protected function configure(): void
->addOption('command', null, InputOption::VALUE_OPTIONAL, 'Run only selected command')
->addOption('demand', null, InputOption::VALUE_NONE, 'Start cron scheduler every one minute without exit')
->addOption('group', null, InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Run schedules for specific groups.')
->addOption('time-limit', null, InputOption::VALUE_OPTIONAL, 'Run cron scheduler during this time (sec.)');
->addOption('time-limit', null, InputOption::VALUE_OPTIONAL, 'Run cron scheduler during this time (sec.)')
->addOption('dry-run', null, InputOption::VALUE_NONE, 'Debug periodical tasks without execution it.');
}

/**
Expand All @@ -83,33 +84,38 @@ protected function executeLoop(InputInterface $input, OutputInterface $output):
{
$output->writeln('Run scheduler without exit');

$loop = $this->scheduleLoop;
if (($timeLimit = $input->getOption('time-limit')) > 0) {
$this->scheduleLoop->addTimer((int)$timeLimit, function () {
$this->scheduleLoop->stop();
$loop->addTimer((int)$timeLimit, static function () use ($loop) {
$loop->stop();
});
}

$schedulerRunner = function () use ($input, $output) {
$test = 0;
$schedulerRunner = function () use ($input, $output, $loop, &$test) {
$runAt = \microtime(true);
if ($this->scheduleLoop instanceof ReactLoopAdapter) {
$this->scheduleLoop->setDefaultLoopTime($this->getCurrentDate());
if ($loop instanceof ReactLoopAdapter) {
$loop->setDefaultLoopTime($this->getCurrentDate());
}

if ($runAt - $test < 2) {
$output->writeln("ERROR");
}
$test = $runAt;

$this->scheduler($input, $output);
$output->writeln(sprintf('All schedule tasks completed in %.3f seconds', \microtime(true) - $runAt), OutputInterface::VERBOSITY_VERBOSE);
if ($this->scheduleLoop instanceof ReactLoopAdapter) {
$this->scheduleLoop->setDefaultLoopTime();
$output->writeln(sprintf('[%s] All schedule tasks completed in %.3f seconds', $this->getCurrentDate()->format('Y-m-d H:i:s.u'), \microtime(true) - $runAt), OutputInterface::VERBOSITY_VERBOSE);
if ($loop instanceof ReactLoopAdapter) {
$loop->setDefaultLoopTime();
}
};

if (null !== $this->dispatcher) {
$this->dispatcher->dispatch(new StartLoopEvent($this->scheduleLoop), StartLoopEvent::START_LOOP);
}
$this->dispatchLoopEvent(LoopEvent::LOOP_INIT);

$delayRun = 60 - fmod((float)$this->getCurrentDate()->format('U.u'), 60.0);
$this->scheduleLoop->addTimer($delayRun, function () use ($schedulerRunner) {
$this->scheduleLoop->futureTick($schedulerRunner);
$this->scheduleLoop->addPeriodicTimer(60, $schedulerRunner);
$delayRun = 60.0 - fmod((float)$this->getCurrentDate()->format('U.u'), 60.0);
$loop->addTimer($delayRun, static function () use ($schedulerRunner, $loop) {
$loop->futureTick($schedulerRunner);
$loop->addPeriodicTimer(60, $schedulerRunner);
});

$this->scheduleLoop->run();
Expand All @@ -132,12 +138,14 @@ protected function scheduler(InputInterface $input, OutputInterface $output): vo

$now = $this->getCurrentDate();
$roundTime = (int)(round($now->getTimestamp()/60)*60);
$options['now'] = new \DateTimeImmutable('@'.$roundTime, $now->getTimezone());
$options['demand'] = $input->getOption('demand');
$options['dry-run'] = $input->getOption('dry-run');

$envStamp = new EnvironmentStamp($options);
$envStamp = new EnvironmentStamp($options + ['now' => new \DateTimeImmutable('@'.$roundTime, $now->getTimezone()), 'dispatch-loop' => null !== $this->dispatcher]);
$loggerStamp = $this->createLoggerStamp($output);

$this->dispatchLoopEvent(LoopEvent::LOOP_START);

foreach ($this->loader->getSchedules($options) as $schedule) {
if (null !== $command && $schedule->getCommand() !== $command) {
continue;
Expand All @@ -150,6 +158,8 @@ protected function scheduler(InputInterface $input, OutputInterface $output): vo

$this->scheduleRunner->execute($schedule);
}

$this->dispatchLoopEvent(LoopEvent::LOOP_END);
}

protected function createLoggerStamp(OutputInterface $output)
Expand All @@ -166,4 +176,11 @@ protected function getCurrentDate(): \DateTimeImmutable

return $now;
}

protected function dispatchLoopEvent(string $name): void
{
if (null !== $this->dispatcher && null !== $this->scheduleLoop) {
$this->dispatcher->dispatch(new LoopEvent($this->scheduleLoop), $name);
}
}
}
17 changes: 15 additions & 2 deletions src/Event/StartLoopEvent.php → src/Event/LoopEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,22 @@
use Okvpn\Bundle\CronBundle\Runner\ScheduleLoopInterface;
use Symfony\Contracts\EventDispatcher\Event;

class StartLoopEvent extends Event
class LoopEvent extends Event
{
public const START_LOOP = 'startLoop';
/**
* Dispatch on init event loop, before $loop->run()
*/
public const LOOP_INIT = 'loopInit';

/**
* Dispatch before running event loops. Executed every minutes
*/
public const LOOP_START = 'loopStart';

/**
* Dispatch after running event loops. Executed every minutes
*/
public const LOOP_END = 'loopEnd';

private $loop;

Expand Down
Loading

0 comments on commit cc9e758

Please sign in to comment.