Skip to content

Commit

Permalink
update for python 2.7 compatibility and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
elmiomar committed Jun 21, 2024
1 parent ba92c59 commit 37d5e68
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 92 deletions.
133 changes: 81 additions & 52 deletions python/nistoar/pdr/preserv/archive/cli.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,29 @@
#!/usr/bin/python
import argparse
import json
import os
import logging
from client import SQSArchiveClient
from datetime import datetime, timezone

LOG_FILE = "logs/sqs_messages.log"
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

# Configure logging
# logging.basicConfig(
# level=logging.INFO,
# filename=LOG_FILE,
# format="%(asctime)s - %(levelname)s - %(message)s",
# )
import sys
import os

# Current directory
current_dir = os.path.dirname(os.path.abspath(__file__))

def send_message(client, aipid, filenames):
"""Construct and send a message to the SQS queue."""
message_dict = {
"action": "archive",
"aipid": aipid,
"filenames": filenames,
"timestamp": datetime.now(timezone.utc).isoformat(), # ISO 8601 format
"priority": "low",
}
logging.info(
f"Sending request message to archive AIP ID {aipid} with filenames {filenames}"
)
try:
response = client.send_archive_request(message_dict)
logging.info(f"Request message sent. SQS response: {response}")
except Exception as e:
logging.error(f"Failed to send message due to: {e}")
raise
# Add the root directory to PYTHONPATH
sys.path.append(os.path.abspath(os.path.join(current_dir, "../../..")))

from nistoar.pdr.preserv.archive.client import SQSArchiveClient
from nistoar.pdr.preserv.archive.validators import JSONSchemaValidator

def receive_message(client):
"""Receive a message from the SQS queue."""
logging.info("Receiving message from the SQS queue.")
response = client.receive_completion_message()
logging.info(f"Message received: {response}")
LOG_FILE = os.path.join(current_dir, "logs", "sqs_cli.log")
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"


# Configure logging
def setup_logging(level):
"""Setup logging configuration."""

# Set the logging level based on user input
numeric_level = getattr(logging, level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Invalid log level: {level}")
raise ValueError("Invalid log level: {}".format(level))

logging.basicConfig(
level=numeric_level,
Expand All @@ -62,25 +36,67 @@ def load_config(profile=None):
"""Load configuration from a specified profile in the config directory."""
default_profile = "local"
profile = profile or default_profile
config_path = f"config/config.{profile}.json"
config_path = os.path.join(current_dir, "config", "config.{}.json".format(profile))

if not os.path.exists(config_path):
logging.error(
f"No configuration file found for the profile '{profile}' at '{config_path}'"
"No configuration file found for the profile '{}' at '{}'".format(
profile, config_path
)
)
raise FileNotFoundError(
f"No configuration file found for the profile '{profile}' at '{config_path}'"
raise IOError(
"No configuration file found for the profile '{}' at '{}'".format(
profile, config_path
)
)

logging.info(f"Loading configuration from {config_path}")
logging.info("Loading configuration from {}".format(config_path))
with open(config_path, "r") as file:
return json.load(file)


def create_client(config):
"""Create an SQSArchiveClient instance from configuration."""
logging.info("Creating an SQS client with the provided configuration.")
return SQSArchiveClient(config)
validator = JSONSchemaValidator(config.get("schema_file"))
return SQSArchiveClient(config, validator)


def send_message(client, aipid, filenames):
    """Construct and send a message to the SQS queue.

    :param client: archive client exposing ``request_archive(aipid, filenames)``.
    :param aipid str: The AIP identifier to request archiving for.
    :param filenames list: The filenames to be archived.
    :return: Whatever ``client.request_archive`` returns (per the client's
             documentation, the response from the SQS service), so callers
             can inspect it instead of having to read the log.
    :raises Exception: Any error raised by the client is logged and re-raised
                       unchanged.
    """
    logging.info(
        "Sending request message to archive AIP ID %s with filenames %s",
        aipid,
        filenames,
    )
    # Keep only the client call inside the try so that the handler reports
    # failures of the request itself and nothing else.
    try:
        response = client.request_archive(aipid, filenames)
    except Exception as e:
        logging.error("Failed to send message due to: %s", e)
        raise
    logging.info("Request message sent. SQS response: %s", response)
    return response


def receive_message(client):
    """Receive a message from the SQS queue.

    :param client: archive client exposing ``receive_completion_message()``.
    :return: The value returned by ``client.receive_completion_message()``.
             It is passed through unchanged (including ``None``) so that
             callers can act on the message rather than only seeing it logged.
    :raises Exception: Any error raised by the client is logged and re-raised
                       unchanged.
    """
    logging.info("Receiving message from the SQS queue.")
    # Only the client call is guarded; the success log runs after it returns.
    try:
        message = client.receive_completion_message()
    except Exception as e:
        logging.error("Failed to receive message due to: %s", e)
        raise
    logging.info("Message received: %s", message)
    return message


def delete_message(client, receipt_handle):
    """Remove a message from the SQS queue.

    :param client: archive client exposing ``delete_message(receipt_handle)``.
    :param receipt_handle str: Receipt handle identifying the message to delete.
    :raises Exception: Any error raised while deleting is logged and re-raised
                       unchanged.
    """
    announcement = "Deleting message with receipt handle {}".format(receipt_handle)
    logging.info(announcement)
    try:
        client.delete_message(receipt_handle)
        logging.info("Message deleted successfully.")
    except Exception as err:
        failure = "Failed to delete message due to: {}".format(err)
        logging.error(failure)
        raise


def main():
Expand All @@ -97,31 +113,44 @@ def main():
help="Set the logging level (e.g., DEBUG, INFO)",
)

subparsers = parser.add_subparsers(dest="command", help="Commands", required=True)
subparsers = parser.add_subparsers(dest="command", help="Commands")

# Subparser for sending messages
send_parser = subparsers.add_parser("send", help="Send a message to the queue")
send_parser.add_argument("--aipid", type=str, required=True, help="AIP identifier")
send_parser.add_argument(
"--filenames", nargs="+", required=True, help="List of filenames to be archived"
)
send_parser.set_defaults(func=send_message)

# Subparser for receiving messages
receive_parser = subparsers.add_parser(
"receive", help="Receive a message from the queue"
)
receive_parser.set_defaults(func=receive_message)

# Subparser for deleting messages
delete_parser = subparsers.add_parser(
"delete", help="Delete a message from the queue"
)
delete_parser.add_argument(
"--receipt-handle",
type=str,
required=True,
help="Receipt handle of the message to delete",
)

args = parser.parse_args()
setup_logging(args.log_level)
config = load_config(args.profile)
client = create_client(config)

try:
args.func(client, **vars(args))
except Exception as e:
logging.error(f"An error occurred: {e}")
if args.command == "send":
send_message(client, args.aipid, args.filenames)
elif args.command == "receive":
receive_message(client)
elif args.command == "delete":
delete_message(client, args.receipt_handle)
else:
parser.print_help()


if __name__ == "__main__":
Expand Down
102 changes: 75 additions & 27 deletions python/nistoar/pdr/preserv/archive/client.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,46 @@
import hashlib
import json
import logging
from abc import ABC, abstractmethod
import os
import sys
from datetime import datetime
from urllib.parse import urlparse

import boto3
from botocore.exceptions import ClientError
# Add the root directory of the project to the PYTHONPATH
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../"))
)

from abc import ABCMeta, abstractmethod


from jsonschema import ValidationError
from nistoar.pdr.config import ConfigurationException
from nistoar.pdr.exceptions import ConfigurationException

from .exceptions import SQSException, ValidationException
from nistoar.pdr.preserv.archive.exceptions import SQSException, ValidationException

logger = logging.getLogger("archive.client")


try:
import boto3
except ImportError:
logger.warning("No module named botocore or boto3. You may need to install boto3")
sys.exit(1)

boto3.compat.filter_python_deprecation_warnings()

from botocore.exceptions import (
ClientError,
NoCredentialsError,
PartialCredentialsError,
ParamValidationError,
)

DEFAULT_MAX_MESSAGES = 10 # Max number of messages to be polled
DEFAULT_WAIT_TIME = 20 # 20s for long polling


class ArchiveClient(ABC):
class ArchiveClient(object):
"""
An abstract base class that defines the interface for an archive client.
Expand All @@ -33,6 +55,8 @@ class ArchiveClient(ABC):
unit testing by allowing the use of a mock implementation.
"""

__metaclass__ = ABCMeta

@abstractmethod
def request_archive(self, aipid, filenames, priority=None):
"""
Expand Down Expand Up @@ -152,35 +176,39 @@ def __init__(self, config, validator):
and 'completion_queue_url'.
:param validator MessageValidator: The validator to use for validating messages.
"""
# This is to replace the KeyError exception, this fails faster and provides more user feedback
required_keys = [
"region",
"aws_access_key_id",
"aws_secret_access_key",
"request_queue_url",
"completion_queue_url",
]

# Check for the required keys up front instead of relying on a KeyError: this fails faster and gives the user clearer feedback.
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise ConfigurationException(
f"Missing required configuration keys: {', '.join(missing_keys)}"
"Missing required configuration keys: %s" % ", ".join(missing_keys)
)

self.sqs = boto3.client(
"sqs",
region_name=config["region"],
aws_access_key_id=config["aws_access_key_id"],
aws_secret_access_key=config["aws_secret_access_key"],
region_name=config.get("region"),
aws_access_key_id=config.get("aws_access_key_id"),
aws_secret_access_key=config.get("aws_secret_access_key"),
)
self.request_queue_url = config["request_queue_url"]
self.completion_queue_url = config["completion_queue_url"]
self.request_queue_url = config.get("request_queue_url")
self.completion_queue_url = config.get("completion_queue_url")

self.validator = validator

def request_archive(self, aipid, filenames, priority="medium"):
"""
Sends an archive request to the configured SQS request queue.
:param message dict: The message content in dictionary format to be sent to the queue.
:param aipid str: The AIP ID for the archive request.
:param filenames list: The list of filenames to be archived.
:param priority str: The priority of the request (default is "medium").
:return dict: A dictionary containing the response from the SQS service, which includes
details such as the message ID and other metadata.
Expand All @@ -202,9 +230,11 @@ def request_archive(self, aipid, filenames, priority="medium"):
)
return response
except ClientError as e:
logger.error(f"Failed to send archive request for AIP ID {aipid}: {e}")
logger.error(
"Failed to send archive request for AIP ID {}: {}".format(aipid, str(e))
)
raise SQSException(
e, message=f"Failed to send archive request for AIP ID {aipid}."
e, message="Failed to send archive request for AIP ID {}.".format(aipid)
)

def receive_completion_message(self):
Expand All @@ -214,10 +244,19 @@ def receive_completion_message(self):
:return dict: A dictionary representing the message received from SQS, which may include
the message body and receipt handle among other details.
"""
max_messages = int(
self.config.get("max_number_of_messages", DEFAULT_MAX_MESSAGES)
)
wait_time_seconds = int(self.config.get("wait_time_seconds", DEFAULT_WAIT_TIME))
try:
max_messages = int(
self.config.get("max_number_of_messages", DEFAULT_MAX_MESSAGES)
)
except ValueError:
raise ConfigurationException("Invalid value for max_number_of_messages")

try:
wait_time_seconds = int(
self.config.get("wait_time_seconds", DEFAULT_WAIT_TIME)
)
except ValueError:
raise ConfigurationException("Invalid value for wait_time_seconds")

try:
response = self.sqs.receive_message(
Expand All @@ -232,13 +271,18 @@ def receive_completion_message(self):
self.validator.validate(message_body)
return message_body
except ValidationError as ve:
logger.error(f"Message validation failed: {ve}")
logger.error("Message validation failed: {}".format(str(ve)))
raise ValidationException(
"Message validation failed.", errors=[str(ve)]
)
return None
except ClientError as e:
logger.error(f"Failed to receive completion message: {e}")
except (
ClientError,
NoCredentialsError,
PartialCredentialsError,
ParamValidationError,
) as e:
logger.error("Failed to receive completion message: {}".format(str(e)))
raise SQSException(e, message="Failed to receive completion message.")

def delete_message(self, receipt_handle):
Expand All @@ -254,10 +298,14 @@ def delete_message(self, receipt_handle):
self.sqs.delete_message(
QueueUrl=self.completion_queue_url, ReceiptHandle=receipt_handle
)
logger.info(f"Deleted message with receipt handle: {receipt_handle}")
logger.info(
"Deleted message with receipt handle: {}".format(receipt_handle)
)
except ClientError as e:
logger.error(f"Failed to delete message from SQS: {e}")
logger.error("Failed to delete message from SQS: {}".format(str(e)))
raise SQSException(
e,
message=f"Failed to delete message receipt handle {receipt_handle} from SQS",
message="Failed to delete message receipt handle {} from SQS".format(
receipt_handle
),
)
6 changes: 4 additions & 2 deletions python/nistoar/pdr/preserv/archive/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ def __init__(self, original_exception, message=None):
:param original_exception Exception: The original exception thrown by AWS SQS.
:param message str: Optional custom message to provide additional context about the error.
"""
default_message = f"An error occurred with SQS: {str(original_exception)}"
super().__init__(
default_message = "An error occurred with SQS: {}".format(
str(original_exception)
)
super(SQSException, self).__init__(
msg=message if message else default_message, cause=original_exception
)

Expand Down
Loading

0 comments on commit 37d5e68

Please sign in to comment.