Skip to content

Commit

Permalink
Merge pull request #876 from catalyst/retry-handler
Browse files Browse the repository at this point in the history
feat: Smart retries with delay + retry count for cron trigger
  • Loading branch information
keevan authored Mar 5, 2024
2 parents 60d50dd + 533e149 commit eb25aa3
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 11 deletions.
6 changes: 6 additions & 0 deletions classes/local/execution/engine.php
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@ public function abort(?\Throwable $reason = null) {
if ($status !== self::STATUS_FINISHED && !in_array($status, self::STATUS_TERMINATORS)) {
$this->set_current_step($enginestep);
$enginestep->abort();
} else {
// We need to signal to finished steps that the dataflow is aborted.
// This may require handling seperate to the step abort.
// This is done seperate to the finalise hook so that concerns are seperated for finalised vs aborted runs.
$this->set_current_step($enginestep);
$enginestep->dataflow_abort();
}
}
foreach ($this->flowcaps as $enginestep) {
Expand Down
7 changes: 7 additions & 0 deletions classes/local/execution/engine_step.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ public function abort() {
$this->steptype->on_abort();
}

/**
* Signal handler for a full dataflow abort.
*/
public function dataflow_abort() {
$this->steptype->on_dataflow_abort();
}

/**
* Attempt to execute the step.
*
Expand Down
39 changes: 38 additions & 1 deletion classes/local/scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static function get_scheduled_times(int $stepid) {
public static function set_scheduled_times(int $dataflowid, int $stepid, int $newtime, ?int $oldtime = null) {
global $DB;

$obj = (object) ['nextruntime' => $newtime, 'dataflowid' => $dataflowid, 'stepid' => $stepid];
$obj = (object) ['nextruntime' => $newtime, 'dataflowid' => $dataflowid, 'stepid' => $stepid, 'retrycount' => 0];
if (!is_null($oldtime)) {
$obj->lastruntime = $oldtime;
}
Expand All @@ -67,6 +67,43 @@ public static function set_scheduled_times(int $dataflowid, int $stepid, int $ne
}
}

/**
* Schedule a retry run. If the maximum retry count is reached, set to regular scheduled time and no retry count.
*
* @param int $dataflowid the flow id.
* @param int $stepid the step trigger id.
* @param int $retrytime when to run next on a retry.
* @param int $scheduledtime when to run next if allowed retries are exhausted.
* @param int $retriesallowed the amount of retries allowed before resuming regular schedule.
*/
public static function set_scheduled_retry(
int $dataflowid,
int $stepid,
int $retrytime,
int $scheduledtime,
int $retriesallowed) {

global $DB;
$schedule = $DB->get_record(self::TABLE, ['dataflowid' => $dataflowid, 'stepid' => $stepid]);

if (!$schedule) {
// This method has been called incorrectly for a schedule that has never run or doesn't exist.
throw new \coding_exception("Dataflow retry attempted on a trigger with no step.");
}

if ($schedule->retrycount >= $retriesallowed) {
// Allowed retries are exhausted. Set to regular schedule and no retries.
$schedule->retrycount = 0;
$schedule->nextruntime = $scheduledtime;
} else {
// Increment retry counter, and schedule the retry time.
$schedule->retrycount += 1;
$schedule->nextruntime = $retrytime;
}

$DB->update_record(self::TABLE, $schedule);
}

/**
* Gets a list of dataflows and timestamps that are due to run based on the given reference time.
*
Expand Down
6 changes: 6 additions & 0 deletions classes/local/step/base_step.php
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,12 @@ public function on_initialise() {
public function on_abort() {
}

/**
* Hook function that gets called when a dataflow has been aborted, at conclusion.
*/
public function on_dataflow_abort() {
}

/**
* Hook function that gets called when an engine step has been finalised.
*/
Expand Down
53 changes: 51 additions & 2 deletions classes/local/step/trigger_cron.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public static function form_define_fields(): array {
'day' => ['type' => PARAM_TEXT],
'month' => ['type' => PARAM_TEXT],
'dayofweek' => ['type' => PARAM_TEXT],
'retryinterval' => ['type' => PARAM_INT],
'retrycount' => ['type' => PARAM_INT],
'disabled' => ['type' => PARAM_TEXT],
];
}
Expand All @@ -60,6 +62,9 @@ public function form_get_default_data(\stdClass $data): \stdClass {
$data->{"config_$field"} = '*';
}
}
$data->config_retryinterval ??= 0;
$data->config_retrycount ??= 0;

return $data;
}

Expand Down Expand Up @@ -128,6 +133,13 @@ public function form_add_custom_inputs(\MoodleQuickForm &$mform) {

$mform->addGroup($crontab, 'crontab', get_string('trigger_cron:crontab', 'tool_dataflows'), ' ', false);
$mform->addElement('static', 'crontab_desc', '', get_string('trigger_cron:crontab_desc', 'tool_dataflows'));

// Retry configurations.
$mform->addElement('duration', 'config_retryinterval', get_string('trigger_cron:retryinterval', 'tool_dataflows'));
$mform->setType('retryinterval', PARAM_INT);
$mform->addElement('text', 'config_retrycount', get_string('trigger_cron:retrycount', 'tool_dataflows'));
$mform->setType('retrycount', PARAM_INT);
$mform->setDefault('retrycount', 0);
}

/**
Expand All @@ -143,6 +155,13 @@ public function validate_config($config) {
return ['crontab' => get_string('trigger_cron:invalid', 'tool_dataflows', '', true)];
}
}
if ($config->retryinterval < 0) {
return ['config_retryinterval' => get_string('trigger_cron:positive_retryinterval', 'tool_dataflows', null, true)];
}
if ($config->retrycount < 0) {
return ['config_retrycount' => get_string('trigger_cron:positive_retryinterval', 'tool_dataflows', null, true)];
}

return true;
}

Expand Down Expand Up @@ -276,8 +295,18 @@ public function on_finalise() {
*/
public function on_abort() {
if (!$this->stepdef->dataflow->is_concurrency_enabled()) {
// Reschedule on aborts.
$this->reschedule();
// Reschedule a retry on aborts.
$this->reschedule_retry();
}
}

/**
* Hook function that gets called when When the dataflow engine is aborting.
*/
public function on_dataflow_abort() {
if (!$this->stepdef->dataflow->is_concurrency_enabled()) {
// Reschedule a retry on aborts.
$this->reschedule_retry();
}
}

Expand All @@ -295,6 +324,26 @@ protected function reschedule() {
$newtime,
$config->nextruntime ?? null
);
$this->log("Rescheduling dataflow to configured schedule.");
}
}

/**
* Schedule a retry for this flow. If the maximum retries are reached, the regular schedule will be used.
*/
public function reschedule_retry() {
$config = $this->get_variables()->get('config');
$scheduledtime = $this->get_next_scheduled_time($config);
$retrytime = time() + $config->retryinterval;

scheduler::set_scheduled_retry(
$this->stepdef->dataflowid,
$this->stepdef->id,
$retrytime,
$scheduledtime,
$config->retrycount
);

$this->log("Rescheduling dataflow to retry.");
}
}
1 change: 1 addition & 0 deletions db/install.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<FIELD NAME="stepid" TYPE="int" LENGTH="10" NOTNULL="true" SEQUENCE="false"/>
<FIELD NAME="lastruntime" TYPE="int" LENGTH="10" NOTNULL="true" DEFAULT="0" SEQUENCE="false" COMMENT="The time the dataflow was last scheduled to be run"/>
<FIELD NAME="nextruntime" TYPE="int" LENGTH="10" NOTNULL="true" SEQUENCE="false" COMMENT="The time the dataflow is next scheduled to be run"/>
<FIELD NAME="retrycount" TYPE="int" LENGTH="10" NOTNULL="true" DEFAULT="0" SEQUENCE="false" COMMENT="Count of attempted retries on the current dataflow. Reset on scheduling a fresh run."/>
</FIELDS>
<KEYS>
<KEY NAME="primary" TYPE="primary" FIELDS="id"/>
Expand Down
51 changes: 47 additions & 4 deletions db/upgrade.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
* @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
*/

use Symfony\Component\Yaml\Yaml;

/**
* Function to upgrade tool_dataflows.
*
Expand Down Expand Up @@ -305,20 +307,40 @@ function xmldb_tool_dataflows_upgrade($oldversion) {
}

if ($oldversion < 2024030200) {

// Define field loghandlers to be added to tool_dataflows.
// Define field notifyonabort to be added to tool_dataflows.
$table = new xmldb_table('tool_dataflows');
$field = new xmldb_field('notifyonabort', XMLDB_TYPE_CHAR, '255', null, null, null, '', 'confighash');

// Conditionally launch add field loghandlers.
// Conditionally launch add field notifyonabort.
if (!$dbman->field_exists($table, $field)) {
$dbman->add_field($table, $field);
}

// Dataflows savepoint reached.
upgrade_plugin_savepoint(true, 2024030200, 'tool', 'dataflows');
}

if ($oldversion < 2024030201) {
// Define field retrycount to be added to tool_dataflows_schedule.
$table = new xmldb_table('tool_dataflows_schedule');
$field = new xmldb_field('retrycount', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, null, 0, 'nextruntime');

// Conditionally launch add field retrycount.
if (!$dbman->field_exists($table, $field)) {
$dbman->add_field($table, $field);
}

// Also upgrade any cron_trigger step.
$type = \tool_dataflows\local\step\trigger_cron::class;
$newfields = [
'retryinterval' => 0,
'retrycount' => 0
];
xmldb_tool_dataflows_step_config_helper($type, $newfields);

// Dataflows savepoint reached.
upgrade_plugin_savepoint(true, 2024030201, 'tool', 'dataflows');
}

return true;
}

Expand Down Expand Up @@ -348,3 +370,24 @@ function xmldb_tool_dataflows_logfile_rename_helper(string $path, string $patter
}
}
}

/**
* Upgrade step helper function. Appends fields into existing configuration.
*
* @param string $type the classname of the step to upgrade.
* @param array $newfields new fields to append into the configuration array. String => primitive scalar.
*/
function xmldb_tool_dataflows_step_config_helper(string $type, array $newfields) {
global $DB;

$steps = $DB->get_records('tool_dataflows_steps', ['type' => $type]);
$transaction = $DB->start_delegated_transaction();
foreach ($steps as $step) {
$config = Yaml::parse($step->config);
$updatedconf = array_merge($config, $newfields);
$step->config = Yaml::dump($updatedconf);
$DB->update_record('tool_dataflows_steps', $step);
}
$transaction->allow_commit();
}

4 changes: 4 additions & 0 deletions lang/en/tool_dataflows.php
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@
$string['trigger_cron:crontab_desc'] = 'The schedule is edited as five values: minute, hour, day, month and day of month, in that order. The values are in crontab format.';
$string['trigger_cron:strftime_datetime'] = '%d %b %Y, %H:%M';
$string['trigger_cron:next_run_time'] = 'Next run time: {$a}';
$string['trigger_cron:retryinterval'] = 'Retry interval';
$string['trigger_cron:retrycount'] = 'Number of retries';
$string['trigger_cron:positive_retrycount'] = 'Number of retries must be positive or 0';
$string['trigger_cron:positive_retryinterval'] = 'Retry interval must be positive or 0';

// Email notification.
$string['connector_email:message'] = 'Message';
Expand Down
4 changes: 3 additions & 1 deletion tests/fixtures/sample-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ steps:
day: '*'
month: '*'
dayofweek: '*'
retryinterval: 0
retrycount: 0
sql_reader:
name: 'SQL reader'
depends_on: cron
Expand All @@ -21,4 +23,4 @@ steps:
debugging_writer:
name: 'Debugging writer'
depends_on: sql_reader
type: tool_dataflows\local\step\writer_debugging
type: tool_dataflows\local\step\writer_debugging
2 changes: 1 addition & 1 deletion tests/tool_dataflows_ad_hoc_task_test.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private function create_dataflow() {
$cron = new step();
$cron->name = 'cron';
$cron->type = 'tool_dataflows\local\step\trigger_cron';
$cron->config = "minute: '*'\nhour: '*'\nday: '*'\nmonth: '*'\ndayofweek: '*'";
$cron->config = "minute: '*'\nhour: '*'\nday: '*'\nmonth: '*'\ndayofweek: '*'\nretryinterval: 0\nretrycount: 0";
$dataflow->add_step($cron);

$reader = new step();
Expand Down
55 changes: 55 additions & 0 deletions tests/tool_dataflows_scheduler_test.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

namespace tool_dataflows;

use coding_exception;
use tool_dataflows\local\scheduler;

/**
Expand Down Expand Up @@ -70,6 +71,60 @@ public function test_update_next_scheduled_time() {
$this->assertEquals((object) ['lastruntime' => 160, 'nextruntime' => 220], scheduler::get_scheduled_times(12));
}

/**
* Tests the scheduling of retry run on an invalid dataflow.
*
* @covers \tool_dataflows\local\scheduler::set_scheduled_retry
*/
public function test_set_scheduled_retry_invalid() {
// Retry cannot be called for a run that hasn't been regularly scheduled.
$this->expectException(coding_exception::class);
$this->expectExceptionMessage(
'Coding error detected, it must be fixed by a programmer: Dataflow retry attempted on a trigger with no step.'
);
scheduler::set_scheduled_retry(1, 1, 1, 1, 1);
}

/**
* Tests the scheduling of retry runs.
*
* @covers \tool_dataflows\local\scheduler::set_scheduled_retry
*/
public function test_set_scheduled_retry() {
global $DB;

// Default 0.
scheduler::set_scheduled_times(1, 1, 123, 1);
$this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));

// Schedule a retry when none are allowed.
$regulartime = 555;
$retrytime = 444;
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 0);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $regulartime], scheduler::get_scheduled_times(1));

// Schedule a retry when retries are permitted.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));

// Now run again and confirm counter has been incremented twice.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));
$this->assertEquals(2, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));

// Now attempt to schedule another retry. The counter should reset and go to regular time.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $regulartime], scheduler::get_scheduled_times(1));
$this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));

// Now confirm that if a successful run is registered while there are still retries left, the counter is reset.
scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
$this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));
$this->assertEquals(1, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
scheduler::set_scheduled_times(1, 1, $regulartime);
$this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
}

/**
* Tests the get_due_dataflows() function.
*
Expand Down
Loading

0 comments on commit eb25aa3

Please sign in to comment.