Skip to content

Commit

Permalink
updated client API and added new exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
elmiomar committed Jun 12, 2024
1 parent 83c19b8 commit f6b285a
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 66 deletions.
157 changes: 97 additions & 60 deletions python/nistoar/pdr/preserv/archive/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from urllib.parse import urlparse

import boto3
from botocore.exceptions import ClientError
from .exceptions import SQSException

import json
from jsonschema import ValidationError
from urllib.parse import urlparse
from nistoar.pdr.config import ConfigurationException

from .exceptions import SQSException, ValidationException

logger = logging.getLogger("archive.client")

DEFAULT_MAX_MESSAGES = 10 # Max number of messages to be polled
DEFAULT_WAIT_TIME = 20 # 20s for long polling


class AbstractArchiveClient(ABC):
class ArchiveClient(ABC):
"""
An abstract base class that defines the interface for an archive client.
Expand All @@ -33,7 +34,7 @@ class AbstractArchiveClient(ABC):
"""

@abstractmethod
def send_archive_request(self, message):
def request_archive(self, aipid, filenames, priority=None):
"""
Sends an archive request to the queue system.
Expand Down Expand Up @@ -68,9 +69,9 @@ def delete_message(self, receipt_handle):
pass


class MockArchiveClient(AbstractArchiveClient):
class MockArchiveClient(ArchiveClient):
"""
A mock implementation of the AbstractArchiveClient for testing purposes.
A mock implementation of the ArchiveClient for testing purposes.
This client simulates interactions with a message queue system without performing
any real network operations. It stores messages in-memory and allows for basic
operations like sending and receiving messages, mimicking the behavior expected
Expand All @@ -88,7 +89,7 @@ def __init__(self):
self.messages = []
self.completed_messages = []

def send_archive_request(self, message):
def request_archive(self, aipid, filenames, priority=None):
"""
Simulates sending an archive request by adding the message to the 'messages' list.
Generates a unique hash ID based on the message content.
Expand All @@ -98,6 +99,9 @@ def send_archive_request(self, message):
:return dict: A mock response dictionary indicating successful message sending, which includes a unique message ID
based on the hash of the message content and HTTP status code.
"""
message = json.dumps(
{"aipid": aipid, "filenames": filenames, "priority": priority or "medium"}
)
# Generate a hash ID based on the message content
message_id = hashlib.sha256(message.encode("utf-8")).hexdigest()
self.messages.append(message)
Expand Down Expand Up @@ -126,17 +130,17 @@ def delete_message(self, receipt_handle):
pass


class SQSArchiveClient(AbstractArchiveClient):
class SQSArchiveClient(ArchiveClient):
"""
SQSArchiveClient provides a concrete implementation of the AbstractArchiveClient,
SQSArchiveClient provides a concrete implementation of the ArchiveClient,
specifically for interacting with AWS Simple Queue Service (SQS). This class handles
sending messages to an SQS queue, receiving messages from it, and deleting messages
from the queue.
:attr sqs (boto3.client): The boto3 SQS client.
:attr request_queue_url (str): URL of the SQS queue to which archive requests are sent.
:attr completion_queue_url (str): URL of the SQS queue from which completion messages are received.
:attr validator (Validator): The validator object used to validate messages against a schema.
:attr sqs boto3.client: The boto3 SQS client.
:attr request_queue_url str: URL of the SQS queue to which archive requests are sent.
:attr completion_queue_url str: URL of the SQS queue from which completion messages are received.
:attr validator Validator: The validator object used to validate messages against a schema.
"""

def __init__(self, config, validator):
Expand All @@ -148,83 +152,116 @@ def __init__(self, config, validator):
and 'completion_queue_url'.
:param validator MessageValidator: The validator to use for validating messages.
"""
self.sqs = boto3.client(
"sqs",
region_name=config["region"],
aws_access_key_id=config["aws_access_key_id"],
aws_secret_access_key=config["aws_secret_access_key"],
# aws_session_token=config["aws_session_token"], # uncomment if using temporary credentials
)
# This is to replace the KeyError exception, this fails faster and provides more user feedback
required_keys = [
"region",
"aws_access_key_id",
"aws_secret_access_key",
"request_queue_url",
"completion_queue_url",
]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise ConfigurationException(
f"Missing required configuration keys: {', '.join(missing_keys)}"
)

try:
self.sqs = boto3.client(
"sqs",
region_name=config["region"],
aws_access_key_id=config["aws_access_key_id"],
aws_secret_access_key=config["aws_secret_access_key"],
)
except Exception as e:
raise ConfigurationException(f"Error initializing AWS SQS client: {e}")

self.request_queue_url = config["request_queue_url"]
self.completion_queue_url = config["completion_queue_url"]
self.validator = validator

def request_archive(self, aipid, filenames, priority="medium"):
    """
    Sends an archive request for an AIP to the configured SQS request queue.

    The request is serialized as a JSON object with the keys ``action``
    (always ``"archive"``), ``aipid``, ``filenames``, ``priority`` and
    ``timestamp`` (``datetime.now()`` in ISO format), and is sent with the
    message group ID ``"archive-" + aipid``.

    :param aipid str: The identifier of the AIP to archive; it is concatenated
                      into the message group ID, so it must be a string.
    :param filenames: The file names to archive; the value is placed in the
                      message as-is and must be JSON-serializable.
    :param priority str: The priority of the request. ``None`` (the default
                         declared by the abstract ``ArchiveClient`` interface)
                         or an empty value is sent as ``"medium"``.

    :return dict: The response returned by the SQS ``send_message`` call.
    :raises SQSException: If the SQS service reports a client error; the
                          original ``ClientError`` is chained as the cause.
    """
    message = {
        "action": "archive",
        "aipid": aipid,
        "filenames": filenames,
        # The abstract interface and the mock client default priority to None;
        # normalise it so a null priority is never put on the queue.
        "priority": priority or "medium",
        # NOTE(review): this is a naive local timestamp (no UTC offset) --
        # confirm that consumers of the queue do not expect a timezone.
        "timestamp": datetime.now().isoformat(),
    }
    message_json = json.dumps(message)
    message_group_id = "archive-" + aipid
    try:
        return self.sqs.send_message(
            QueueUrl=self.request_queue_url,
            MessageBody=message_json,
            MessageGroupId=message_group_id,
        )
    except ClientError as e:
        logger.error(f"Failed to send archive request for AIP ID {aipid}: {e}")
        raise SQSException(
            e, message=f"Failed to send archive request for AIP ID {aipid}."
        ) from e

def receive_completion_message(self):
    """
    Polls the configured SQS completion queue and returns the first message body.

    The queue is polled with ``MaxNumberOfMessages`` and ``WaitTimeSeconds``
    taken from the ``max_number_of_messages`` and ``wait_time_seconds``
    configuration values when a ``config`` mapping is available on the
    instance, and from DEFAULT_MAX_MESSAGES / DEFAULT_WAIT_TIME otherwise.

    :return dict: The JSON-decoded body of the first received message, after it
                  has passed validation, or None if the response contained no
                  messages. Only the body is returned, not the SQS envelope.
    :raises ValidationException: If the first message body is not valid JSON or
                                 is rejected by the validator.
    :raises SQSException: If the SQS service reports a client error.
    """
    # The visible __init__ does not store the configuration on the instance,
    # so reading self.config directly raised AttributeError; fall back to an
    # empty mapping, which selects the module defaults below.
    config = getattr(self, "config", None) or {}
    max_messages = int(config.get("max_number_of_messages", DEFAULT_MAX_MESSAGES))
    wait_time_seconds = int(config.get("wait_time_seconds", DEFAULT_WAIT_TIME))

    try:
        response = self.sqs.receive_message(
            QueueUrl=self.completion_queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_time_seconds,
        )
    except ClientError as e:
        logger.error(f"Failed to receive completion message: {e}")
        raise SQSException(e, message="Failed to receive completion message.") from e

    # NOTE(review): the loop always exits on the first message (return or
    # raise), so any further messages in the batch are received but never
    # handed to the caller; the receipt handle needed by delete_message() is
    # not returned either -- confirm this is intended.
    for msg in response.get("Messages", []):
        try:
            message_body = json.loads(msg["Body"])
        except json.JSONDecodeError as je:
            # A body that is not JSON is a validation failure too; report it
            # through the archiving exception hierarchy instead of leaking a
            # raw decoding error.
            logger.error(f"Message validation failed: {je}")
            raise ValidationException(
                "Message validation failed.", errors=[str(je)]
            ) from je
        try:
            self.validator.validate(message_body)
        except ValidationError as ve:
            logger.error(f"Message validation failed: {ve}")
            raise ValidationException(
                "Message validation failed.", errors=[str(ve)]
            ) from ve
        return message_body
    return None

def delete_message(self, receipt_handle):
    """
    Deletes a message from the configured completion SQS queue using the provided receipt handle.

    :param receipt_handle str: The receipt handle of the message to be deleted.

    :return None: Nothing is returned; a successful deletion is recorded in the log.
    :raises SQSException: If the SQS service reports a client error; the original
                          ``ClientError`` is chained as the cause.
    """
    try:
        self.sqs.delete_message(
            QueueUrl=self.completion_queue_url, ReceiptHandle=receipt_handle
        )
    except ClientError as e:
        logger.error(f"Failed to delete message from SQS: {e}")
        raise SQSException(
            e,
            message=f"Failed to delete message receipt handle {receipt_handle} from SQS",
        ) from e
    # Reached only when the SQS call above did not raise.
    logger.info(f"Deleted message with receipt handle: {receipt_handle}")
72 changes: 66 additions & 6 deletions python/nistoar/pdr/preserv/archive/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,68 @@
class SQSException(Exception):
"""Custom exception for handling SQS specific errors."""
import sys
from nistoar.pdr.preserv import PreservationException


class ArchivingException(PreservationException):
    """
    Root of the exception hierarchy used by the archiving client.

    Every error raised specifically by archiving operations derives from this
    class, so callers can handle all archive-related failures with a single
    ``except ArchivingException`` clause.
    """

    def __init__(self, msg=None, errors=None, cause=None):
        """
        Create the exception from a message, optional details, and an optional cause.

        :param msg str: A message describing the error.
        :param errors list: A list of specific error messages with details.
        :param cause Exception: An underlying cause in the form of an Exception.
        """
        # All three values are handed unchanged to PreservationException.
        super(ArchivingException, self).__init__(
            msg=msg,
            errors=errors,
            cause=cause,
        )


class SQSException(ArchivingException):
    """
    Specific exception for handling errors related to interactions with AWS SQS.

    This subclass captures exceptions that are specific to SQS operations. The
    exception raised by AWS is passed to the base class as the cause and is also
    available as the ``original_exception`` attribute.
    """

    def __init__(self, original_exception, message=None):
        """
        Initialize the SQSException with the original exception thrown by AWS SQS and an optional custom message.

        :param original_exception Exception: The original exception thrown by AWS SQS.
        :param message str: Optional custom message to provide additional context about the error.
                            When omitted or empty, a default message that embeds
                            ``str(original_exception)`` is used.
        """
        # Earlier versions of this class exposed the wrapped error under this
        # name; keep it so existing callers that read it continue to work.
        self.original_exception = original_exception
        default_message = f"An error occurred with SQS: {str(original_exception)}"
        super().__init__(
            msg=message if message else default_message, cause=original_exception
        )


class ValidationException(ArchivingException):
    """
    Raised when data handled by the archiving process fails validation against
    its predefined schemas or rules.

    It signals that the data being processed does not meet the required
    standards or expectations, such as format, completeness, or logical
    consistency.
    """

    def __init__(self, message, errors=None, cause=None):
        """
        Create a new ValidationException.

        :param message str: A general message describing the validation failure.
        :param errors list: A list of specific error messages providing
                            detailed information about what failed during
                            validation.
        :param cause Exception: An underlying cause in the form of an
                                Exception instance, providing more context
                                about the source of the failure.
        """
        # The base class names its first argument ``msg``; the rest pass through.
        super(ValidationException, self).__init__(
            msg=message,
            errors=errors,
            cause=cause,
        )

0 comments on commit f6b285a

Please sign in to comment.