Skip to content

Commit

Permalink
initial SQS Archive Client implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
elmiomar committed Jun 6, 2024
1 parent 21c3d6f commit 83c19b8
Show file tree
Hide file tree
Showing 16 changed files with 907 additions and 0 deletions.
9 changes: 9 additions & 0 deletions python/nistoar/pdr/preserv/archive/.env.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# sample .env file
AWS_ACCESS_KEY_ID="XXX"
AWS_SECRET_ACCESS_KEY="XXX"
AWS_REGION="us-east-1"
SQS_REQUEST_QUEUE_URL="https://sqs.us-east-1.amazonaws.com/1234567890/request-queue.fifo"
SQS_COMPLETION_QUEUE_URL="https://sqs.us-east-1.amazonaws.com/1234567890/completion-queue.fifo"
SCHEMA_FILE="schema/message_schema.json"
MAX_NUMBER_OF_MESSAGES=10
WAIT_TIME_SECONDS=20
6 changes: 6 additions & 0 deletions python/nistoar/pdr/preserv/archive/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

config/*
!config/config.sample.json
logs/*
.env
!.env.sample
94 changes: 94 additions & 0 deletions python/nistoar/pdr/preserv/archive/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# SQS Archive Client

This package contains Python scripts and modules designed to interact with AWS SQS for archiving purposes. The system includes components for sending messages to an SQS queue, polling messages from the queue, and processing those messages based on a specific message schema.

## Components

### AbstractArchiveClient

An abstract base class for implementing various archive clients.

### SQSArchiveClient

An implementation of the `AbstractArchiveClient` that interacts with AWS SQS to send and receive messages.

### MockArchiveClient

A mock implementation of the `AbstractArchiveClient` for testing purposes without actual AWS SQS communication.

### CLI Program

A command-line interface program that configures an `SQSArchiveClient` to send formatted messages to the SQS request queue.

### process_message.py

A script used to process messages retrieved from the SQS queue. It handles each message according to the specified schema and performs necessary archive operations.

### poll_messages.py

A script that continuously polls the SQS queue for new messages and uses `process_message.py` to process each message.




## Usage

### Sending Messages

Before running the CLI, set up these environment variables:

- `AWS_ACCESS_KEY_ID`: AWS access key.
- `AWS_SECRET_ACCESS_KEY`: AWS secret access key.
- `AWS_REGION`: AWS region.
- `SQS_REQUEST_QUEUE_URL`: URL of the SQS request queue.
- `SQS_COMPLETION_QUEUE_URL`: URL of the SQS completion queue.

Use `export` command or a `.env` to set them.

To send messages to the SQS queue using the CLI program, run a command similar to the following:

```sh
python cli.py --profile local send --aipid "mds2909" --filenames "trial1.txt" "trial2.txt"
```

- `--profile local`: specifies which configuration profile to use. In this case, local is the profile name, which points to specific configurations suitable for a local dev inside the `config` directory.

- `send`: a subcommand of `cli.py`. It is used to send a message to the configured **request** queue. The CLI program also supports a `receive` subcommand for receiving messages from the **completion** queue.

- `--aipid "mds2909"`: specifies the Archive Information Package Identifier (AIP ID).

- `--filenames "trial1.txt" "trial2.txt"`: specifies the filenames to be included in the message sent to SQS. This `filenames` option might be used to identify specific files to be archived. This will also depend on the receiver, who will decide what to do with the filenames list.

## Scripts

### poll_messages.py

This script polls an AWS SQS queue for messages, validates them against a predefined JSON schema, and processes each message asynchronously.

- **Key Functions**:

- `handle_message()`: processes individual messages by calling an external script (`process_message.py`) via a subprocess (system call), passing the message data and handling subprocess execution.
- `poll_messages()`: retrieves messages from the SQS queue, handles them based on the validator's response, and manages asynchronous processing of each message using threads.
- `process_messages_async()`: manages the asynchronous execution of message processing tasks, using Python's `ThreadPoolExecutor` for concurrent processing.

### process_message.py

This script is used to process individual messages based on the data received from `poll_messages.py`, and it is executed as a subprocess.

- **Key Functions**:

- `process_message()`: takes message data and an AIP ID as input, logs processing details, and performs specified actions based on the message content. This function is designed to be called with command-line arguments specifying the AIP ID and receives the message content via stdin. AIPID here is used as placehoder for monitoring purposes.
- `main()`: uses `argparser` to accept the AIP ID, reads the message data from stdin, and calls `process_message()` with the appropriate arguments.

### Workflow

- `poll_messages.py` is executed, typically as a cron job, and enters a polling loop where it retrieves messages from an SQS queue; each message is validated against a JSON schema. Valid messages are passed to `process_message.py` for further processing.

- `process_message.py` processes each message in a separate subprocess to ensure that the processing of each message does not block the polling loop.


## Notes

- Make sure that the AWS credentials are set up correctly in the config file.
- Adjust the thread pool size in `poll_messages.py` based on the expected load and system capabilities.
- The logging paths, configurations paths/profiles, and schemas paths may need to be adjusted based on the deployment environment.
46 changes: 46 additions & 0 deletions python/nistoar/pdr/preserv/archive/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from .client import SQSArchiveClient, MockArchiveClient


def create_client(config):
"""Create and return an archive client based on the specified configuration."""
client_type = config.get("client_type")
if client_type == "sqs":
return SQSArchiveClient(config)
elif client_type == "mock":
return MockArchiveClient()
else:
raise UnsupportedClientTypeException(client_type)


class ConfigurationException(Exception):
"""
Exception raised for errors that are related to the application configuration.
Attributes:
message -- explanation of the error
"""

def __init__(self, message):
self.message = message
super().__init__(self.message)


class UnsupportedClientTypeException(ConfigurationException):
"""Exception raised when an unsupported client type is specified."""

def __init__(self, client_type, message=None):
if message is None:
message = (
f"Unsupported client type '{client_type}'. Expected 'sqs' or 'mock'."
)
super().__init__(message)


class SQSException(Exception):
"""Custom exception for handling SQS specific errors."""

def __init__(self, original_exception, message=None):
self.original_exception = original_exception
if message is None:
message = f"An error occurred with SQS: {str(original_exception)}"
super().__init__(message)
128 changes: 128 additions & 0 deletions python/nistoar/pdr/preserv/archive/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import argparse
import json
import os
import logging
from client import SQSArchiveClient
from datetime import datetime, timezone

LOG_FILE = "logs/sqs_messages.log"
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

# Configure logging
# logging.basicConfig(
# level=logging.INFO,
# filename=LOG_FILE,
# format="%(asctime)s - %(levelname)s - %(message)s",
# )


def send_message(client, aipid, filenames):
"""Construct and send a message to the SQS queue."""
message_dict = {
"action": "archive",
"aipid": aipid,
"filenames": filenames,
"timestamp": datetime.now(timezone.utc).isoformat(), # ISO 8601 format
"priority": "low",
}
logging.info(
f"Sending request message to archive AIP ID {aipid} with filenames {filenames}"
)
try:
response = client.send_archive_request(message_dict)
logging.info(f"Request message sent. SQS response: {response}")
except Exception as e:
logging.error(f"Failed to send message due to: {e}")
raise


def receive_message(client):
"""Receive a message from the SQS queue."""
logging.info("Receiving message from the SQS queue.")
response = client.receive_completion_message()
logging.info(f"Message received: {response}")


def setup_logging(level):
"""Setup logging configuration."""

# Set logging level based on a user input
numeric_level = getattr(logging, level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Invalid log level: {level}")

logging.basicConfig(
level=numeric_level,
filename=LOG_FILE,
format=LOG_FORMAT,
)


def load_config(profile=None):
"""Load configuration from a specified profile in the config directory."""
default_profile = "local"
profile = profile or default_profile
config_path = f"config/config.{profile}.json"

if not os.path.exists(config_path):
logging.error(
f"No configuration file found for the profile '{profile}' at '{config_path}'"
)
raise FileNotFoundError(
f"No configuration file found for the profile '{profile}' at '{config_path}'"
)

logging.info(f"Loading configuration from {config_path}")
with open(config_path, "r") as file:
return json.load(file)


def create_client(config):
"""Create an SQSArchiveClient instance from configuration."""
logging.info("Creating an SQS client with the provided configuration.")
return SQSArchiveClient(config)


def main():
parser = argparse.ArgumentParser(
description="CLI for interacting with SQS via the SQSArchiveClient."
)
parser.add_argument(
"--profile", type=str, help="Configuration profile to use", default="local"
)
parser.add_argument(
"--log-level",
type=str,
default="INFO",
help="Set the logging level (e.g., DEBUG, INFO)",
)

subparsers = parser.add_subparsers(dest="command", help="Commands", required=True)

# Subparser for sending messages
send_parser = subparsers.add_parser("send", help="Send a message to the queue")
send_parser.add_argument("--aipid", type=str, required=True, help="AIP identifier")
send_parser.add_argument(
"--filenames", nargs="+", required=True, help="List of filenames to be archived"
)
send_parser.set_defaults(func=send_message)

# Subparser for receiving messages
receive_parser = subparsers.add_parser(
"receive", help="Receive a message from the queue"
)
receive_parser.set_defaults(func=receive_message)

args = parser.parse_args()
setup_logging(args.log_level)
config = load_config(args.profile)
client = create_client(config)

try:
args.func(client, **vars(args))
except Exception as e:
logging.error(f"An error occurred: {e}")


if __name__ == "__main__":
main()
Loading

0 comments on commit 83c19b8

Please sign in to comment.