diff --git a/classes/local/execution/engine.php b/classes/local/execution/engine.php
index 85d52752..5902b11a 100644
--- a/classes/local/execution/engine.php
+++ b/classes/local/execution/engine.php
@@ -575,6 +575,12 @@ public function abort(?\Throwable $reason = null) {
if ($status !== self::STATUS_FINISHED && !in_array($status, self::STATUS_TERMINATORS)) {
$this->set_current_step($enginestep);
$enginestep->abort();
+ } else {
+ // We need to signal to finished steps that the dataflow is aborted.
+ // This may require handling seperate to the step abort.
+ // This is done seperate to the finalise hook so that concerns are seperated for finalised vs aborted runs.
+ $this->set_current_step($enginestep);
+ $enginestep->dataflow_abort();
}
}
foreach ($this->flowcaps as $enginestep) {
diff --git a/classes/local/execution/engine_step.php b/classes/local/execution/engine_step.php
index 9c79d0c9..b1523cb8 100644
--- a/classes/local/execution/engine_step.php
+++ b/classes/local/execution/engine_step.php
@@ -144,6 +144,13 @@ public function abort() {
$this->steptype->on_abort();
}
+ /**
+ * Signal handler for a full dataflow abort.
+ */
+ public function dataflow_abort() {
+ $this->steptype->on_dataflow_abort();
+ }
+
/**
* Attempt to execute the step.
*
diff --git a/classes/local/scheduler.php b/classes/local/scheduler.php
index 7cf8eb02..37387795 100644
--- a/classes/local/scheduler.php
+++ b/classes/local/scheduler.php
@@ -54,7 +54,7 @@ public static function get_scheduled_times(int $stepid) {
public static function set_scheduled_times(int $dataflowid, int $stepid, int $newtime, ?int $oldtime = null) {
global $DB;
- $obj = (object) ['nextruntime' => $newtime, 'dataflowid' => $dataflowid, 'stepid' => $stepid];
+ $obj = (object) ['nextruntime' => $newtime, 'dataflowid' => $dataflowid, 'stepid' => $stepid, 'retrycount' => 0];
if (!is_null($oldtime)) {
$obj->lastruntime = $oldtime;
}
@@ -67,6 +67,43 @@ public static function set_scheduled_times(int $dataflowid, int $stepid, int $ne
}
}
+ /**
+ * Schedule a retry run. If the maximum retry count is reached, set to regular scheduled time and no retry count.
+ *
+ * @param int $dataflowid the flow id.
+ * @param int $stepid the step trigger id.
+ * @param int $retrytime when to run next on a retry.
+ * @param int $scheduledtime when to run next if allowed retries are exhausted.
+ * @param int $retriesallowed the amount of retries allowed before resuming regular schedule.
+ */
+ public static function set_scheduled_retry(
+ int $dataflowid,
+ int $stepid,
+ int $retrytime,
+ int $scheduledtime,
+ int $retriesallowed) {
+
+ global $DB;
+ $schedule = $DB->get_record(self::TABLE, ['dataflowid' => $dataflowid, 'stepid' => $stepid]);
+
+ if (!$schedule) {
+ // This method has been called incorrectly for a schedule that has never run or doesn't exist.
+ throw new \coding_exception("Dataflow retry attempted on a trigger with no step.");
+ }
+
+ if ($schedule->retrycount >= $retriesallowed) {
+ // Allowed retries are exhausted. Set to regular schedule and no retries.
+ $schedule->retrycount = 0;
+ $schedule->nextruntime = $scheduledtime;
+ } else {
+ // Increment retry counter, and schedule the retry time.
+ $schedule->retrycount += 1;
+ $schedule->nextruntime = $retrytime;
+ }
+
+ $DB->update_record(self::TABLE, $schedule);
+ }
+
/**
* Gets a list of dataflows and timestamps that are due to run based on the given reference time.
*
diff --git a/classes/local/step/base_step.php b/classes/local/step/base_step.php
index 482e9db9..be8f1ebb 100644
--- a/classes/local/step/base_step.php
+++ b/classes/local/step/base_step.php
@@ -587,6 +587,12 @@ public function on_initialise() {
public function on_abort() {
}
+ /**
+ * Hook function that gets called when a dataflow has been aborted, at conclusion.
+ */
+ public function on_dataflow_abort() {
+ }
+
/**
* Hook function that gets called when an engine step has been finalised.
*/
diff --git a/classes/local/step/trigger_cron.php b/classes/local/step/trigger_cron.php
index 252ca71a..20b12466 100644
--- a/classes/local/step/trigger_cron.php
+++ b/classes/local/step/trigger_cron.php
@@ -42,6 +42,8 @@ public static function form_define_fields(): array {
'day' => ['type' => PARAM_TEXT],
'month' => ['type' => PARAM_TEXT],
'dayofweek' => ['type' => PARAM_TEXT],
+ 'retryinterval' => ['type' => PARAM_INT],
+ 'retrycount' => ['type' => PARAM_INT],
'disabled' => ['type' => PARAM_TEXT],
];
}
@@ -60,6 +62,9 @@ public function form_get_default_data(\stdClass $data): \stdClass {
$data->{"config_$field"} = '*';
}
}
+ $data->config_retryinterval ??= 0;
+ $data->config_retrycount ??= 0;
+
return $data;
}
@@ -128,6 +133,13 @@ public function form_add_custom_inputs(\MoodleQuickForm &$mform) {
$mform->addGroup($crontab, 'crontab', get_string('trigger_cron:crontab', 'tool_dataflows'), ' ', false);
$mform->addElement('static', 'crontab_desc', '', get_string('trigger_cron:crontab_desc', 'tool_dataflows'));
+
+ // Retry configurations.
+ $mform->addElement('duration', 'config_retryinterval', get_string('trigger_cron:retryinterval', 'tool_dataflows'));
+ $mform->setType('retryinterval', PARAM_INT);
+ $mform->addElement('text', 'config_retrycount', get_string('trigger_cron:retrycount', 'tool_dataflows'));
+ $mform->setType('retrycount', PARAM_INT);
+ $mform->setDefault('retrycount', 0);
}
/**
@@ -143,6 +155,13 @@ public function validate_config($config) {
return ['crontab' => get_string('trigger_cron:invalid', 'tool_dataflows', '', true)];
}
}
+ if ($config->retryinterval < 0) {
+ return ['config_retryinterval' => get_string('trigger_cron:positive_retryinterval', 'tool_dataflows', null, true)];
+ }
+ if ($config->retrycount < 0) {
+ return ['config_retrycount' => get_string('trigger_cron:positive_retryinterval', 'tool_dataflows', null, true)];
+ }
+
return true;
}
@@ -276,8 +295,18 @@ public function on_finalise() {
*/
public function on_abort() {
if (!$this->stepdef->dataflow->is_concurrency_enabled()) {
- // Reschedule on aborts.
- $this->reschedule();
+ // Reschedule a retry on aborts.
+ $this->reschedule_retry();
+ }
+ }
+
+ /**
+ * Hook function that gets called when When the dataflow engine is aborting.
+ */
+ public function on_dataflow_abort() {
+ if (!$this->stepdef->dataflow->is_concurrency_enabled()) {
+ // Reschedule a retry on aborts.
+ $this->reschedule_retry();
}
}
@@ -295,6 +324,26 @@ protected function reschedule() {
$newtime,
$config->nextruntime ?? null
);
+ $this->log("Rescheduling dataflow to configured schedule.");
}
}
+
+ /**
+ * Schedule a retry for this flow. If the maximum retries are reached, the regular schedule will be used.
+ */
+ public function reschedule_retry() {
+ $config = $this->get_variables()->get('config');
+ $scheduledtime = $this->get_next_scheduled_time($config);
+ $retrytime = time() + $config->retryinterval;
+
+ scheduler::set_scheduled_retry(
+ $this->stepdef->dataflowid,
+ $this->stepdef->id,
+ $retrytime,
+ $scheduledtime,
+ $config->retrycount
+ );
+
+ $this->log("Rescheduling dataflow to retry.");
+ }
}
diff --git a/db/install.xml b/db/install.xml
index a71a972e..7f5be5d8 100644
--- a/db/install.xml
+++ b/db/install.xml
@@ -68,6 +68,7 @@
+
diff --git a/db/upgrade.php b/db/upgrade.php
index 83af26f0..57834d32 100644
--- a/db/upgrade.php
+++ b/db/upgrade.php
@@ -23,6 +23,8 @@
* @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
*/
+use Symfony\Component\Yaml\Yaml;
+
/**
* Function to upgrade tool_dataflows.
*
@@ -305,20 +307,40 @@ function xmldb_tool_dataflows_upgrade($oldversion) {
}
if ($oldversion < 2024030200) {
-
- // Define field loghandlers to be added to tool_dataflows.
+ // Define field notifyonabort to be added to tool_dataflows.
$table = new xmldb_table('tool_dataflows');
$field = new xmldb_field('notifyonabort', XMLDB_TYPE_CHAR, '255', null, null, null, '', 'confighash');
- // Conditionally launch add field loghandlers.
+ // Conditionally launch add field notifyonabort.
if (!$dbman->field_exists($table, $field)) {
$dbman->add_field($table, $field);
}
- // Dataflows savepoint reached.
upgrade_plugin_savepoint(true, 2024030200, 'tool', 'dataflows');
}
+ if ($oldversion < 2024030201) {
+ // Define field retrycount to be added to tool_dataflows_schedule.
+ $table = new xmldb_table('tool_dataflows_schedule');
+ $field = new xmldb_field('retrycount', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, null, 0, 'nextruntime');
+
+ // Conditionally launch add field retrycount.
+ if (!$dbman->field_exists($table, $field)) {
+ $dbman->add_field($table, $field);
+ }
+
+ // Also upgrade any cron_trigger step.
+ $type = \tool_dataflows\local\step\trigger_cron::class;
+ $newfields = [
+ 'retryinterval' => 0,
+ 'retrycount' => 0
+ ];
+ xmldb_tool_dataflows_step_config_helper($type, $newfields);
+
+ // Dataflows savepoint reached.
+ upgrade_plugin_savepoint(true, 2024030201, 'tool', 'dataflows');
+ }
+
return true;
}
@@ -348,3 +370,24 @@ function xmldb_tool_dataflows_logfile_rename_helper(string $path, string $patter
}
}
}
+
+/**
+ * Upgrade step helper function. Appends fields into existing configuration.
+ *
+ * @param string $type the classname of the step to upgrade.
+ * @param array $newfields new fields to append into the configuration array. String => primitive scalar.
+ */
+function xmldb_tool_dataflows_step_config_helper(string $type, array $newfields) {
+ global $DB;
+
+ $steps = $DB->get_records('tool_dataflows_steps', ['type' => $type]);
+ $transaction = $DB->start_delegated_transaction();
+ foreach ($steps as $step) {
+ $config = Yaml::parse($step->config);
+ $updatedconf = array_merge($config, $newfields);
+ $step->config = Yaml::dump($updatedconf);
+ $DB->update_record('tool_dataflows_steps', $step);
+ }
+ $transaction->allow_commit();
+}
+
diff --git a/lang/en/tool_dataflows.php b/lang/en/tool_dataflows.php
index e14a2466..9c2b3d36 100644
--- a/lang/en/tool_dataflows.php
+++ b/lang/en/tool_dataflows.php
@@ -404,6 +404,10 @@
$string['trigger_cron:crontab_desc'] = 'The schedule is edited as five values: minute, hour, day, month and day of month, in that order. The values are in crontab format.';
$string['trigger_cron:strftime_datetime'] = '%d %b %Y, %H:%M';
$string['trigger_cron:next_run_time'] = 'Next run time: {$a}';
+$string['trigger_cron:retryinterval'] = 'Retry interval';
+$string['trigger_cron:retrycount'] = 'Number of retries';
+$string['trigger_cron:positive_retrycount'] = 'Number of retries must be positive or 0';
+$string['trigger_cron:positive_retryinterval'] = 'Retry interval must be positive or 0';
// Email notification.
$string['connector_email:message'] = 'Message';
diff --git a/tests/fixtures/sample-cron.yml b/tests/fixtures/sample-cron.yml
index 7d53f584..1dab43ab 100644
--- a/tests/fixtures/sample-cron.yml
+++ b/tests/fixtures/sample-cron.yml
@@ -10,6 +10,8 @@ steps:
day: '*'
month: '*'
dayofweek: '*'
+ retryinterval: 0
+ retrycount: 0
sql_reader:
name: 'SQL reader'
depends_on: cron
@@ -21,4 +23,4 @@ steps:
debugging_writer:
name: 'Debugging writer'
depends_on: sql_reader
- type: tool_dataflows\local\step\writer_debugging
\ No newline at end of file
+ type: tool_dataflows\local\step\writer_debugging
diff --git a/tests/tool_dataflows_ad_hoc_task_test.php b/tests/tool_dataflows_ad_hoc_task_test.php
index 059ce74a..76fcddbb 100644
--- a/tests/tool_dataflows_ad_hoc_task_test.php
+++ b/tests/tool_dataflows_ad_hoc_task_test.php
@@ -95,7 +95,7 @@ private function create_dataflow() {
$cron = new step();
$cron->name = 'cron';
$cron->type = 'tool_dataflows\local\step\trigger_cron';
- $cron->config = "minute: '*'\nhour: '*'\nday: '*'\nmonth: '*'\ndayofweek: '*'";
+ $cron->config = "minute: '*'\nhour: '*'\nday: '*'\nmonth: '*'\ndayofweek: '*'\nretryinterval: 0\nretrycount: 0";
$dataflow->add_step($cron);
$reader = new step();
diff --git a/tests/tool_dataflows_scheduler_test.php b/tests/tool_dataflows_scheduler_test.php
index 6443f430..86251031 100644
--- a/tests/tool_dataflows_scheduler_test.php
+++ b/tests/tool_dataflows_scheduler_test.php
@@ -16,6 +16,7 @@
namespace tool_dataflows;
+use coding_exception;
use tool_dataflows\local\scheduler;
/**
@@ -70,6 +71,60 @@ public function test_update_next_scheduled_time() {
$this->assertEquals((object) ['lastruntime' => 160, 'nextruntime' => 220], scheduler::get_scheduled_times(12));
}
+ /**
+ * Tests the scheduling of retry run on an invalid dataflow.
+ *
+ * @covers \tool_dataflows\local\scheduler::set_scheduled_retry
+ */
+ public function test_set_scheduled_retry_invalid() {
+ // Retry cannot be called for a run that hasn't been regularly scheduled.
+ $this->expectException(coding_exception::class);
+ $this->expectExceptionMessage(
+ 'Coding error detected, it must be fixed by a programmer: Dataflow retry attempted on a trigger with no step.'
+ );
+ scheduler::set_scheduled_retry(1, 1, 1, 1, 1);
+ }
+
+ /**
+ * Tests the scheduling of retry runs.
+ *
+ * @covers \tool_dataflows\local\scheduler::set_scheduled_retry
+ */
+ public function test_set_scheduled_retry() {
+ global $DB;
+
+ // Default 0.
+ scheduler::set_scheduled_times(1, 1, 123, 1);
+ $this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
+
+ // Schedule a retry when none are allowed.
+ $regulartime = 555;
+ $retrytime = 444;
+ scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 0);
+ $this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $regulartime], scheduler::get_scheduled_times(1));
+
+ // Schedule a retry when retries are permitted.
+ scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
+ $this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));
+
+ // Now run again and confirm counter has been incremented twice.
+ scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
+ $this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));
+ $this->assertEquals(2, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
+
+ // Now attempt to schedule another retry. The counter should reset and go to regular time.
+ scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
+ $this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $regulartime], scheduler::get_scheduled_times(1));
+ $this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
+
+ // Now confirm that if a successful run is registered while there are still retries left, the counter is reset.
+ scheduler::set_scheduled_retry(1, 1, $retrytime, $regulartime, 2);
+ $this->assertEquals((object) ['lastruntime' => 1, 'nextruntime' => $retrytime], scheduler::get_scheduled_times(1));
+ $this->assertEquals(1, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
+ scheduler::set_scheduled_times(1, 1, $regulartime);
+ $this->assertEquals(0, $DB->get_field(scheduler::TABLE, 'retrycount', ['stepid' => 1]));
+ }
+
/**
* Tests the get_due_dataflows() function.
*
diff --git a/tests/tool_dataflows_upgrade_test.php b/tests/tool_dataflows_upgrade_test.php
new file mode 100644
index 00000000..a955017e
--- /dev/null
+++ b/tests/tool_dataflows_upgrade_test.php
@@ -0,0 +1,71 @@
+.
+
+namespace tool_dataflows;
+use Symfony\Component\Yaml\Yaml;
+
+defined('MOODLE_INTERNAL') || die();
+require_once(__DIR__.'/../db/upgrade.php');
+
+/**
+ * Unit test for the dataflows upgrade steps.
+ *
+ * @package tool_dataflows
+ * @author Peter Burnett
+ * @copyright 2024, Catalyst IT
+ * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
+ */
+class tool_dataflows_upgrade_test extends \advanced_testcase {
+ /**
+ * Test config upgrade helper.
+ *
+ * @covers \tool_dataflows\local\step\trigger_event::validate_config
+ */
+ public function test_step_config_helper() {
+ global $DB;
+ $this->resetAfterTest();
+
+ $dataflow = new dataflow();
+ $dataflow->name = 'testupgrade';
+ $dataflow->enabled = true;
+ $dataflow->save();
+
+ $step = new step();
+ $step->name = 'cron';
+ $step->type = 'tool_dataflows\local\step\trigger_cron';
+ $config = [
+ 'minute' => '*',
+ 'hour' => '*',
+ 'day' => '*',
+ 'month' => '*',
+ 'dayofweek' => '*',
+ 'retryinterval' => '0',
+ 'retrycount' => '0',
+ ];
+ $step->config = Yaml::dump($config);
+ $dataflow->add_step($step);
+
+ // Regular Additional config.
+ $existingstep = $DB->get_record('tool_dataflows_steps', ['type' => $step->type]);
+ $this->assertEquals($config, Yaml::parse($existingstep->config));
+
+ $config['newstep'] = 'upgrade';
+ $newfields = ['newstep' => 'upgrade'];
+ xmldb_tool_dataflows_step_config_helper($step->type, $newfields);
+ $updatedrec = $DB->get_record('tool_dataflows_steps', ['type' => $step->type]);
+ $this->assertEquals($config, Yaml::parse($updatedrec->config));
+ }
+}
diff --git a/version.php b/version.php
index d6dd6f0d..d3de945f 100644
--- a/version.php
+++ b/version.php
@@ -25,8 +25,8 @@
defined('MOODLE_INTERNAL') || die();
-$plugin->version = 2024030200;
-$plugin->release = 2024030200;
+$plugin->version = 2024030201;
+$plugin->release = 2024030201;
$plugin->requires = 2022112800; // Our lowest supported Moodle (3.3.0).
$plugin->supported = [400, 402];
// TODO $plugin->incompatible = ; // Available as of Moodle 3.9.0 or later.