[Cloud Deployment IVb] Rclone in AWS on EFS (#1085)
Co-authored-by: Heberto Mayorquin <[email protected]>
CodyCBakerPhD and h-mayorquin authored Nov 28, 2024
1 parent 006184a commit c4afad3
Showing 9 changed files with 361 additions and 11 deletions.
@@ -11,7 +11,6 @@ concurrency: # Cancel previous workflows on the same pull request
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}

jobs:
run:
@@ -36,8 +35,8 @@ jobs:
git config --global user.email "[email protected]"
git config --global user.name "CI Almighty"
- name: Install full requirements
- name: Install AWS requirements
run: pip install .[aws,test]

- name: Run subset of tests that use S3 live services
run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools.py
- name: Run generic AWS tests
run: pytest -rsx -n auto tests/test_minimal/test_tools/aws_tools_tests.py
46 changes: 46 additions & 0 deletions .github/workflows/rclone_aws_tests.yml
@@ -0,0 +1,46 @@
name: Rclone AWS Tests
on:
  schedule:
    - cron: "0 16 * * 2" # Weekly at noon on Tuesday
  workflow_dispatch:

concurrency: # Cancel previous workflows on the same pull request
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

env:
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }}
  RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }}
  RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }}
  DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}

jobs:
  run:
    name: ${{ matrix.os }} Python ${{ matrix.python-version }}
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.12"]
        os: [ubuntu-latest]
    steps:
      - uses: actions/checkout@v4
      - run: git fetch --prune --unshallow --tags
      - name: Setup Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Global Setup
        run: |
          python -m pip install -U pip # Official recommended way
          git config --global user.email "[email protected]"
          git config --global user.name "CI Almighty"
      - name: Install AWS requirements
        run: pip install .[aws,test]

      - name: Run RClone on AWS tests
        run: pytest -rsx -n auto tests/test_on_data/test_yaml/yaml_aws_tools_tests.py
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
# Upcoming

## Features
* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085)



## v0.6.4

## Deprecations
* Completely removed compression settings from most places [PR #1126](https://github.com/catalystneuro/neuroconv/pull/1126)

@@ -37,6 +44,8 @@
* Avoid running link test when the PR is on draft [PR #1093](https://github.com/catalystneuro/neuroconv/pull/1093)
* Centralize gin data preparation in a github action [PR #1095](https://github.com/catalystneuro/neuroconv/pull/1095)



# v0.6.4 (September 17, 2024)

## Bug Fixes
3 changes: 2 additions & 1 deletion src/neuroconv/tools/aws/__init__.py
@@ -1,3 +1,4 @@
from ._submit_aws_batch_job import submit_aws_batch_job
from ._rclone_transfer_batch_job import rclone_transfer_batch_job

__all__ = ["submit_aws_batch_job"]
__all__ = ["submit_aws_batch_job", "rclone_transfer_batch_job"]
113 changes: 113 additions & 0 deletions src/neuroconv/tools/aws/_rclone_transfer_batch_job.py
@@ -0,0 +1,113 @@
"""Collection of helper functions for assessing and performing automated data transfers related to AWS."""

import pathlib
import warnings
from typing import Optional

from pydantic import FilePath, validate_call

from ._submit_aws_batch_job import submit_aws_batch_job


@validate_call
def rclone_transfer_batch_job(
    *,
    rclone_command: str,
    job_name: str,
    efs_volume_name: str,
    rclone_config_file_path: Optional[FilePath] = None,
    status_tracker_table_name: str = "neuroconv_batch_status_tracker",
    compute_environment_name: str = "neuroconv_batch_environment",
    job_queue_name: str = "neuroconv_batch_queue",
    job_definition_name: Optional[str] = None,
    minimum_worker_ram_in_gib: int = 4,
    minimum_worker_cpus: int = 4,
    submission_id: Optional[str] = None,
    region: Optional[str] = None,
) -> dict[str, str]:
"""
Submit a job to AWS Batch for processing.
Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables.
Parameters
----------
rclone_command : str
The command to pass directly to Rclone running on the EC2 instance.
E.g.: "rclone copy my_drive:testing_rclone /mnt/efs"
Must move data from or to '/mnt/efs'.
job_name : str
The name of the job to submit.
efs_volume_name : str
The name of an EFS volume to be created and attached to the job.
The path exposed to the container will always be `/mnt/efs`.
rclone_config_file_path : FilePath, optional
The path to the Rclone configuration file to use for the job.
If unspecified, method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
The name of the DynamoDB table to use for tracking job status.
compute_environment_name : str, default: "neuroconv_batch_environment"
The name of the compute environment to use for the job.
job_queue_name : str, default: "neuroconv_batch_queue"
The name of the job queue to use for the job.
job_definition_name : str, optional
The name of the job definition to use for the job.
If unspecified, a name starting with 'neuroconv_batch_' will be generated.
minimum_worker_ram_in_gib : int, default: 4
The minimum amount of base worker memory required to run this job.
Determines the EC2 instance type selected by the automatic 'best fit' selector.
Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
minimum_worker_cpus : int, default: 4
The minimum number of CPUs required to run this job.
A minimum of 4 is required, even if only one will be used in the actual process.
submission_id : str, optional
The unique ID to pair with this job submission when tracking the status via DynamoDB.
Defaults to a random UUID4.
region : str, optional
The AWS region to use for the job.
If not provided, we will attempt to load the region from your local AWS configuration.
If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive.
Returns
-------
info : dict
A dictionary containing information about this AWS Batch job.
info["job_submission_info"] is the return value of `boto3.client.submit_job` which contains the job ID.
info["table_submission_info"] is the initial row data inserted into the DynamoDB status tracking table.
"""
    docker_image = "ghcr.io/catalystneuro/rclone_with_config:latest"

    if "/mnt/efs" not in rclone_command:
        message = (
            f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs'. "
            "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space."
        )
        warnings.warn(message=message, stacklevel=2)

    rclone_config_file_path = rclone_config_file_path or pathlib.Path.home() / ".rclone" / "rclone.conf"
    if not rclone_config_file_path.exists():
        raise FileNotFoundError(
            f"Rclone configuration file not found at: {rclone_config_file_path}! "
            "Please check that `rclone config` successfully created the file."
        )
    with open(file=rclone_config_file_path, mode="r") as io:
        rclone_config_file_stream = io.read()

    region = region or "us-east-2"

    info = submit_aws_batch_job(
        job_name=job_name,
        docker_image=docker_image,
        environment_variables={"RCLONE_CONFIG": rclone_config_file_stream, "RCLONE_COMMAND": rclone_command},
        efs_volume_name=efs_volume_name,
        status_tracker_table_name=status_tracker_table_name,
        compute_environment_name=compute_environment_name,
        job_queue_name=job_queue_name,
        job_definition_name=job_definition_name,
        minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
        minimum_worker_cpus=minimum_worker_cpus,
        submission_id=submission_id,
        region=region,
    )

    return info
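For orientation, a minimal usage sketch of the new helper. The remote name, job name, and EFS volume name below are hypothetical placeholders, and the call assumes AWS credentials are configured and that `rclone config` has already written `~/.rclone/rclone.conf`:

```python
from neuroconv.tools.aws import rclone_transfer_batch_job

# Hypothetical example: copy a Google Drive folder onto the EFS volume mounted at /mnt/efs.
info = rclone_transfer_batch_job(
    rclone_command="rclone copy my_drive:testing_rclone /mnt/efs --verbose --progress",
    job_name="example_rclone_transfer",
    efs_volume_name="example_efs_volume",
)

# info["job_submission_info"] is the boto3 `submit_job` response, which includes the Batch job ID.
print(info["job_submission_info"]["jobId"])
```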
15 changes: 10 additions & 5 deletions src/neuroconv/tools/aws/_submit_aws_batch_job.py
@@ -171,7 +171,9 @@ def submit_aws_batch_job(
job_dependencies = job_dependencies or []
container_overrides = dict()
if environment_variables is not None:
container_overrides["environment"] = [{key: value} for key, value in environment_variables.items()]
container_overrides["environment"] = [
{"name": key, "value": value} for key, value in environment_variables.items()
]
if commands is not None:
container_overrides["command"] = commands

@@ -294,7 +296,7 @@ def _ensure_compute_environment_exists(
The AWS Batch client to use for the job.
max_retries : int, default: 12
If the compute environment does not already exist, then this is the maximum number of times to synchronously
check for its successful creation before erroring.
check for its successful creation before raising an error.
This is essential for a clean setup of the entire pipeline, or else later steps might error because they tried
to launch before the compute environment was ready.
"""
@@ -530,7 +532,11 @@ def _generate_job_definition_name(
"""
docker_tags = docker_image.split(":")[1:]
docker_tag = docker_tags[0] if len(docker_tags) > 1 else None
parsed_docker_image_name = docker_image.replace(":", "-") # AWS Batch does not allow colons in job definition names

# AWS Batch does not allow colons, slashes, or periods in job definition names
parsed_docker_image_name = str(docker_image)
for disallowed_character in [":", r"/", "."]:
parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-")

job_definition_name = f"neuroconv_batch"
job_definition_name += f"_{parsed_docker_image_name}-image"
@@ -540,7 +546,6 @@
job_definition_name += f"_{efs_id}"
if docker_tag is None or docker_tag == "latest":
date = datetime.now().strftime("%Y-%m-%d")
job_definition_name += f"_created-on-{date}"

return job_definition_name
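For reference, a short sketch of what the sanitization loop above produces for the default image used by `rclone_transfer_batch_job` (the RAM, CPU, and EFS suffixes appended elsewhere in the function are omitted here):

```python
docker_image = "ghcr.io/catalystneuro/rclone_with_config:latest"

# Strip characters that AWS Batch does not allow in job definition names.
parsed_docker_image_name = str(docker_image)
for disallowed_character in [":", "/", "."]:
    parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-")

print(parsed_docker_image_name)
# ghcr-io-catalystneuro-rclone_with_config-latest
```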

@@ -641,7 +646,7 @@ def _ensure_job_definition_exists_and_get_arn(
]
mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}]

# batch_client.register_job_definition() is not synchronous and so we need to wait a bit afterwards
# batch_client.register_job_definition is not synchronous and so we need to wait a bit afterwards
batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type="container",
3 changes: 2 additions & 1 deletion tests/docker_rclone_with_config_cli.py
@@ -61,7 +61,8 @@ def test_direct_usage_of_rclone_with_config(self):

os.environ["RCLONE_CONFIG"] = rclone_config_file_stream
os.environ["RCLONE_COMMAND"] = (
f"rclone copy test_google_drive_remote:testing_rclone_with_config {self.test_folder} --verbose --progress --config ./rclone.conf"
f"rclone copy test_google_drive_remote:testing_rclone_with_config {self.test_folder} "
"--verbose --progress --config ./rclone.conf"
)

command = (
File renamed without changes.
