Skip to content

Commit

Permalink
CTP-3867 Digest SORA SQS messages
Browse files Browse the repository at this point in the history
  • Loading branch information
aydevworks committed Nov 26, 2024
1 parent d403436 commit 9b800cc
Show file tree
Hide file tree
Showing 12 changed files with 455 additions and 20 deletions.
239 changes: 239 additions & 0 deletions classes/extension/aws_queue_processor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
<?php
// This file is part of Moodle - http://moodle.org/
//
// Moodle is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Moodle is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Moodle. If not, see <http://www.gnu.org/licenses/>.

namespace local_sitsgradepush\extension;

use local_sitsgradepush\aws\sqs;
use local_sitsgradepush\logger;

/**
* Parent class for queue processors.
*
* @package local_sitsgradepush
* @copyright 2024 onwards University College London {@link https://www.ucl.ac.uk/}
* @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
* @author Alex Yeung <[email protected]>
*/
abstract class aws_queue_processor {

/** @var int Maximum number of messages to fetch per call. 10 is the highest number, limited by AWS */
const MAX_MESSAGES = 10;

/** @var int Visibility timeout in seconds */
const VISIBILITY_TIMEOUT = 60;

/** @var int Wait time in seconds */
const WAIT_TIME_SECONDS = 5;

/** @var string Message status - processed */
const STATUS_PROCESSED = 'processed';

/** @var string Message status - failed */
const STATUS_FAILED = 'failed';

/** @var int Maximum number of batches */
const MAX_BATCHES = 30;

/** @var int Maximum number of messages to fetch */
const MAX_MESSAGES_TO_PROCESS = 300;

/** @var int Maximum execution time in seconds */
const MAX_EXECUTION_TIME = 1800; // 30 minutes

/**
* Get the queue URL.
*
* @return string
*/
abstract protected function get_queue_url(): string;

/**
* Process the message.
*
* @param array $messagebody AWS SQS message body
* @return void
*/
abstract protected function process_message(array $messagebody): void;

/**
* Fetch messages from the queue.
*
* @param int $maxmessages Maximum number of messages to fetch
* @param int $visibilitytimeout Visibility timeout in seconds
* @param int $waittimeseconds Wait time in seconds
* @return array
*/
protected function fetch_messages(
int $maxmessages = self::MAX_MESSAGES,
int $visibilitytimeout = self::VISIBILITY_TIMEOUT,
int $waittimeseconds = self::WAIT_TIME_SECONDS
): array {
$sqs = new sqs();
$result = $sqs->get_client()->receiveMessage([
'QueueUrl' => $this->get_queue_url(),
'MaxNumberOfMessages' => $maxmessages,
'VisibilityTimeout' => $visibilitytimeout,
'WaitTimeSeconds' => $waittimeseconds,
]);

return $result->get('Messages') ?? [];
}

/**
* Check if message is already processed.
*
* @param string $messageid AWS SQS Message ID
* @return bool True if message is processed already, false otherwise
* @throws \dml_exception
*/
protected function is_processed_message(string $messageid): bool {
global $DB;

try {
// Allow processing if message has not been processed successfully.
return $DB->record_exists(
'local_sitsgradepush_aws_log',
['messageid' => $messageid, 'status' => self::STATUS_PROCESSED]
);
} catch (\Exception $e) {
logger::log($e->getMessage(), null, 'Failed to check message status');
return false;
}
}

/**
* Execute the queue processor with batch processing support
*
* @return void
* @throws \Exception
*/
public function execute(): void {
try {
$processedcount = 0;
$batchnumber = 0;
$starttime = time();

do {
// Check safety limits.
if ($batchnumber >= self::MAX_BATCHES) {
mtrace("Maximum batch limit (" . self::MAX_BATCHES . ") reached");
break;
}

if ($processedcount >= self::MAX_MESSAGES_TO_PROCESS) {
mtrace("Maximum message limit (" . self::MAX_MESSAGES_TO_PROCESS . ") reached");
break;
}

$elapsedtime = time() - $starttime;
if ($elapsedtime >= self::MAX_EXECUTION_TIME) {
mtrace("Maximum execution time (" . self::MAX_EXECUTION_TIME . " seconds) reached");
break;
}

// Fetch messages from the queue.
$messages = $this->fetch_messages();
if (empty($messages)) {
if ($batchnumber === 0) {
mtrace('No messages found.');
}
break;
}

$batchnumber++;
mtrace(sprintf('Processing batch %d with %d messages...', $batchnumber, count($messages)));

foreach ($messages as $message) {
try {
if ($this->is_processed_message($message['MessageId'])) {
mtrace("Skipping processed message: {$message['MessageId']}");
continue;
}
$data = json_decode($message['Body'], true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception('Invalid JSON data: ' . json_last_error_msg());
}
$this->process_message($data);
$this->save_message_record($message);
$this->delete_message($message['ReceiptHandle']);
$processedcount++;
} catch (\Exception $e) {
logger::log($e->getMessage(), null, static::class . ' Processing Error');
$this->save_message_record($message, self::STATUS_FAILED, $e->getMessage());
}
}

} while (!empty($messages));

mtrace(sprintf('Completed processing %d messages in %d batches (%.2f seconds)',
$processedcount,
$batchnumber,
time() - $starttime
));
} catch (\Exception $e) {
logger::log($e->getMessage(), null, static::class . ' Queue Error');
throw $e;
}
}

/**
* Delete the message from the queue.
*
* @param string $receipthandle
* @return void
*/
protected function delete_message(string $receipthandle): void {
$sqs = new sqs();
$sqs->get_client()->deleteMessage([
'QueueUrl' => $this->get_queue_url(),
'ReceiptHandle' => $receipthandle,
]);
}

/**
* Save message processing details to database
*
* @param array $message SQS message data
* @param string $status Processing status
* @param string|null $error Error message if any
* @return bool|int Returns record ID on success, false on failure
* @throws \dml_exception
*/
protected function save_message_record(
array $message,
string $status = self::STATUS_PROCESSED,
?string $error = null
): bool|int {
global $DB, $USER;

try {
$record = new \stdClass();
$record->messageid = $message['MessageId'];
$record->receipthandle = $message['ReceiptHandle'];
$record->queueurl = $this->get_queue_url();
$record->status = $status;
$record->payload = $message['Body'];
$record->error_message = $error;
$record->timecreated = time();
$record->usermodified = $USER->id;

return $DB->insert_record('local_sitsgradepush_aws_log', $record);
} catch (\Exception $e) {
logger::log($e->getMessage(), null, 'Failed to save message record');
return false;
}
}
}
6 changes: 3 additions & 3 deletions classes/extension/ec.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ public function process_extension(): void {
* Set the EC properties from the AWS EC update message.
* Note: The AWS EC update message is not yet developed, will implement this when the message is available.
*
* @param string $message
* @param string $messagebody
* @return void
* @throws \dml_exception|\moodle_exception
*/
public function set_properties_from_aws_message(string $message): void {
public function set_properties_from_aws_message(string $messagebody): void {
// Decode the JSON message.
$messagedata = $this->parse_event_json($message);
$messagedata = $this->parse_event_json($messagebody);

// Set the user ID of the student.
$this->set_userid($messagedata->student_code);
Expand Down
22 changes: 19 additions & 3 deletions classes/extension/extension.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ abstract class extension implements iextension {
/**
* Set properties from JSON message like SORA / EC update message from AWS.
*
* @param string $message
* @param string $messagebody
* @return void
*/
abstract public function set_properties_from_aws_message(string $message): void;
abstract public function set_properties_from_aws_message(string $messagebody): void;

/**
* Set properties from get students API.
Expand Down Expand Up @@ -71,6 +71,19 @@ public function get_mab_identifier(): string {
return $this->mabidentifier;
}

/**
* Check if the module type is supported.
*
* @param string|null $module
* @return bool
*/
public static function is_module_supported(?string $module): bool {
if (empty($module)) {
return false;
}
return in_array($module, self::SUPPORTED_MODULE_TYPES);
}

/**
* Get all the assessment mappings by MAB identifier.
*
Expand Down Expand Up @@ -157,8 +170,11 @@ protected function get_mappings_by_userid(int $userid): array {
*/
protected function parse_event_json(string $message): \stdClass {
$messageobject = json_decode($message);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new \Exception(get_string('error:invalid_json_data', 'local_sitsgradepush', json_last_error_msg()));
}
if (empty($messageobject)) {
throw new \Exception('Invalid message data');
throw new \Exception(get_string('error:empty_json_data', 'local_sitsgradepush'));
}
return $messageobject;
}
Expand Down
28 changes: 17 additions & 11 deletions classes/extension/sora.php
Original file line number Diff line number Diff line change
Expand Up @@ -125,32 +125,38 @@ public function get_extension_group_name(): string {
/**
* Set properties from AWS SORA update message.
*
* @param string $message
* @param string $messagebody
* @return void
* @throws \dml_exception
* @throws \moodle_exception
*/
public function set_properties_from_aws_message(string $message): void {
public function set_properties_from_aws_message(string $messagebody): void {

// Decode the JSON message.
$messagedata = $this->parse_event_json($message);
// Decode the JSON message body.
$messagedata = $this->parse_event_json($messagebody);

// Check the message is valid.
if (empty($messagedata->entity->person_sora->sora[0])) {
throw new \moodle_exception('error:invalid_message', 'local_sitsgradepush', '', null, $message);
throw new \moodle_exception('error:invalid_message', 'local_sitsgradepush', '', null, $messagebody);
}

$soradata = $messagedata->entity->person_sora->sora[0];

// Set the user ID of the student.
$this->set_userid($soradata->person->student_code);

// Set properties.
$this->extraduration = (int) $soradata->extra_duration;
$this->restduration = (int) $soradata->rest_duration;
$this->extraduration = (int) $soradata->extra_duration ?? 0;
$this->restduration = (int) $soradata->rest_duration ?? 0;

// A SORA update message must have at least one of the durations.
if ($this->extraduration == 0 && $this->restduration == 0) {
throw new \moodle_exception('error:invalid_duration', 'local_sitsgradepush');
}

// Calculate and set the time extension in seconds.
$this->timeextension = $this->calculate_time_extension($this->get_extra_duration(), $this->get_rest_duration());
$this->timeextension = $this->calculate_time_extension($this->extraduration, $this->restduration);

// Set the user ID of the student.
$this->set_userid($soradata->person->student_code);

$this->dataisset = true;
}

Expand Down
Loading

0 comments on commit 9b800cc

Please sign in to comment.