Feat 37 delete docdb (#45)
* delete records, refactor

* unit tests

* scraps refactor, linters

* fixes logging str mismatch
mekhlakapoor authored May 1, 2024
1 parent 130b312 commit a7d8cb9
Showing 4 changed files with 157 additions and 36 deletions.
6 changes: 3 additions & 3 deletions src/aind_data_asset_indexer/s3_crawler.py
@@ -1,4 +1,4 @@
"""Module to crawl through s3"""
"""Module to crawl through s3 and create redshift table"""

import os
import subprocess
@@ -32,7 +32,7 @@ class MetadataAnalyticsTableRow:
}


class AnalyticsJobRunner:
class AnalyticsTableJobRunner:
"""Class to handle creating metadata analytics table in redshift"""

def __init__(
@@ -215,7 +215,7 @@ def run_job(self, folders_filepath, metadata_directory):


if __name__ == "__main__":
job_runner = AnalyticsJobRunner(
job_runner = AnalyticsTableJobRunner(
redshift_secrets_name=REDSHIFT_SECRETS_NAME,
buckets=BUCKETS,
table_name=TABLE_NAME,
47 changes: 42 additions & 5 deletions src/aind_data_asset_indexer/update_docdb.py
@@ -3,7 +3,7 @@
import logging
import os
from dataclasses import dataclass, field
from typing import Dict
from typing import Dict, List, Optional

import boto3
from pymongo import MongoClient
@@ -88,9 +88,14 @@ def read_metadata_files(self) -> Dict:
json_data_dict[prefix] = json.load(file)
return json_data_dict

def bulk_write_records(self):
"""Updates DocDB collection with metadata files"""
json_data = self.read_metadata_files()
def bulk_write_records(self, json_data: Optional[Dict]):
"""
Updates DocDB collection with metadata files
Parameters
----------
json_data: Dict
Dictionary of records in s3.
"""
if json_data:
bulk_operations = []
for prefix, data in json_data.items():
@@ -112,6 +117,38 @@ def bulk_write_records(self):
)
return None

def delete_records(self, s3_prefixes: List[str]):
"""
Cross-checks the names of records in docDB with the ones in
s3 and deletes records from docdb if not in s3.
Parameters
----------
s3_prefixes: List[str]
The names of records in s3.
"""
docdb_prefixes = self.collection.distinct("name")
prefixes_to_delete = set(docdb_prefixes) - set(s3_prefixes)

if prefixes_to_delete:
self.collection.delete_many(
{"s3_prefix": {"$in": list(prefixes_to_delete)}}
)
logger.info(
f"Deleted {len(prefixes_to_delete)} records from "
f"DocDB collection."
)
else:
logger.info("Records in S3 and DocDB are synced.")

return None

def run_sync_records_job(self):
"""Syncs records in DocDB to S3. """
json_data = self.read_metadata_files()
s3_prefixes = list(json_data.keys())
self.bulk_write_records(json_data)
self.delete_records(s3_prefixes=s3_prefixes)


if __name__ == "__main__":
mongo_configs = get_mongo_credentials(
@@ -120,4 +157,4 @@ def bulk_write_records(self):
job_runner = DocDBUpdater(
metadata_dir=METADATA_DIR, mongo_configs=mongo_configs
)
job_runner.bulk_write_records()
job_runner.run_sync_records_job()
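
Note: the sketch below is not part of the commit; it is a minimal, standalone illustration of the reconciliation that delete_records performs above. The helper name stale_prefix_filter and the prefix values are hypothetical; only the set difference and the shape of the delete_many filter mirror the diffed code.

from typing import Dict, List

def stale_prefix_filter(docdb_prefixes: List[str], s3_prefixes: List[str]) -> Dict:
    """Build the delete_many filter for records present in DocDB but missing from S3."""
    prefixes_to_delete = set(docdb_prefixes) - set(s3_prefixes)
    return {"s3_prefix": {"$in": sorted(prefixes_to_delete)}}

# Illustrative prefixes (hypothetical):
filter_doc = stale_prefix_filter(
    docdb_prefixes=["ecephys_a", "ecephys_b", "ecephys_stale"],
    s3_prefixes=["ecephys_a", "ecephys_b"],
)
print(filter_doc)  # {'s3_prefix': {'$in': ['ecephys_stale']}}
# When the set difference is non-empty, the job calls
# collection.delete_many(filter_doc); otherwise it logs that S3 and DocDB
# are already synced.

As the diff shows, run_sync_records_job chains bulk_write_records (upserts keyed on "name") with this deletion step, so the records in S3 remain the source of truth for the DocDB collection.
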
112 changes: 96 additions & 16 deletions tests/test_docdb_updater.py
@@ -1,15 +1,16 @@
"""Test module for docdb updater"""
import os
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch

from pymongo.operations import UpdateMany

from aind_data_asset_indexer.update_docdb import (
DocDBUpdater,
MongoConfigs,
get_mongo_credentials,
)
from pathlib import Path
from pymongo.operations import UpdateMany

TEST_DIR = Path(os.path.dirname(os.path.realpath(__file__)))
METADATA_DIR = TEST_DIR / "resources" / "metadata_dir"
@@ -86,10 +87,7 @@ def test_read_metadata_files(self):
result["ecephys_test_2"]["metadata_status"], "Invalid"
)

@patch(
"aind_data_asset_indexer.update_docdb.DocDBUpdater.read_metadata_files"
)
def test_bulk_write_records(self, mock_read_metadata_files):
def test_bulk_write_records(self):
"""Tests write records successfully as expected."""
mock_collection = MagicMock()
mock_db = MagicMock()
@@ -102,8 +100,7 @@ def test_bulk_write_records(self, mock_read_metadata_files):
)
docdb_updater.mongo_client = mock_mongo_client
docdb_updater.collection = mock_collection
mock_read_metadata_files.return_value = {"data": "test_data"}
docdb_updater.bulk_write_records()
docdb_updater.bulk_write_records(json_data={"data": "test_data"})

mock_collection.bulk_write.assert_called_once_with(
[
@@ -118,13 +115,8 @@ def bulk_write_records(self):
]
)

@patch(
"aind_data_asset_indexer.update_docdb.DocDBUpdater.read_metadata_files"
)
@patch("aind_data_asset_indexer.update_docdb.logger.error")
def test_bulk_write_records_empty_dir(
self, mock_logging_error, mock_read_metadata_files
):
def test_bulk_write_records_empty_dir(self, mock_logging_error):
"""Tests write records fails as expected."""
mock_collection = MagicMock()
mock_db = MagicMock()
@@ -137,14 +129,102 @@ def test_bulk_write_records_empty_dir(
)
docdb_updater.mongo_client = mock_mongo_client
docdb_updater.collection = mock_collection
mock_read_metadata_files.return_value = None
docdb_updater.bulk_write_records()
docdb_updater.bulk_write_records(json_data=None)

mock_collection.insert_many.assert_not_called()
mock_logging_error.assert_called_once_with(
"No JSON files found in the directory empty_dir."
)

@patch("aind_data_asset_indexer.update_docdb.logger.info")
def test_delete_records(self, mock_logging_info):
"""Tests that records are deleted from docdb collection as expected."""
mock_collection = MagicMock()
mock_db = MagicMock()
mock_db.__getitem__.return_value = mock_collection
mock_mongo_client = MagicMock()
mock_mongo_client.__getitem__.return_value = mock_db
s3_prefixes = ["prefix1", "prefix2", "prefix3"]
docdb_prefixes = ["prefix1", "prefix2", "prefix3", "prefix4"]
expected_prefixes_to_delete = {"prefix4"}

docdb_updater = DocDBUpdater(
metadata_dir="test_dir", mongo_configs=self.expected_configs
)
docdb_updater.mongo_client = mock_mongo_client
docdb_updater.collection = mock_collection
docdb_updater.collection.distinct.return_value = docdb_prefixes
docdb_updater.delete_records(s3_prefixes)
docdb_updater.collection.delete_many.assert_called_once_with(
{"s3_prefix": {"$in": list(expected_prefixes_to_delete)}}
)
mock_logging_info.assert_called_once_with(
"Deleted 1 records from DocDB collection."
)

@patch("aind_data_asset_indexer.update_docdb.logger.info")
def test_delete_records_nothing_to_delete(self, mock_logging_info):
"""Tests that records are deleted from docdb collection as expected."""
mock_collection = MagicMock()
mock_db = MagicMock()
mock_db.__getitem__.return_value = mock_collection
mock_mongo_client = MagicMock()
mock_mongo_client.__getitem__.return_value = mock_db
s3_prefixes = ["prefix1", "prefix2", "prefix3"]
docdb_prefixes = ["prefix1", "prefix2", "prefix3"]

docdb_updater = DocDBUpdater(
metadata_dir="test_dir", mongo_configs=self.expected_configs
)
docdb_updater.mongo_client = mock_mongo_client
docdb_updater.collection = mock_collection
docdb_updater.collection.distinct.return_value = docdb_prefixes
docdb_updater.delete_records(s3_prefixes)
docdb_updater.collection.delete_many.assert_not_called()
mock_logging_info.assert_called_once_with(
"Records in S3 and DocDB are synced."
)

@patch("aind_data_asset_indexer.update_docdb.logger.info")
@patch(
"aind_data_asset_indexer.update_docdb.DocDBUpdater.read_metadata_files"
)
def test_run_sync_records_job(
self, mock_read_metadata_files, mock_logging_info
):
"""Runs job to sync records from s3 to docdb"""
mock_collection = MagicMock()
mock_db = MagicMock()
mock_db.__getitem__.return_value = mock_collection
mock_mongo_client = MagicMock()
mock_mongo_client.__getitem__.return_value = mock_db

docdb_updater = DocDBUpdater(
metadata_dir="test_dir", mongo_configs=self.expected_configs
)
docdb_updater.mongo_client = mock_mongo_client
docdb_updater.collection = mock_collection
mock_read_metadata_files.return_value = {"data": "test_data"}
docdb_updater.run_sync_records_job()

# assert that bulk write is called
mock_collection.bulk_write.assert_called_once_with(
[
UpdateMany(
{"name": "data"},
{"$set": "test_data"},
True,
None,
None,
None,
)
]
)
mock_collection.delete_many.assert_not_called()
mock_logging_info.assert_called_with(
"Records in S3 and DocDB are synced."
)


if __name__ == "__main__":
unittest.main()
28 changes: 16 additions & 12 deletions tests/test_s3_crawler.py
@@ -1,4 +1,4 @@
"""Tests methods AnalyticsJobRunner class"""
"""Tests methods AnalyticsTableJobRunner class"""

import builtins
import os
@@ -10,7 +10,7 @@
from aind_data_access_api.rds_tables import RDSCredentials

from aind_data_asset_indexer.s3_crawler import (
AnalyticsJobRunner,
AnalyticsTableJobRunner,
MetadataAnalyticsTableRow,
)

@@ -25,8 +25,8 @@ def __init__(self, name, is_dir):
self.configure_mock(is_dir=MagicMock(return_value=is_dir))


class TestAnalyticsJobRunner(unittest.TestCase):
"""Test methods in AnalyticsJobRunner class"""
class TestAnalyticsTableJobRunner(unittest.TestCase):
"""Test methods in AnalyticsTableJobRunner class"""

sample_rds_credentials = RDSCredentials(
username="some_rds_user",
@@ -52,10 +52,10 @@ class TestAnalyticsJobRunner(unittest.TestCase):
)
@patch("aind_data_asset_indexer.s3_crawler.RDSCredentials")
def setUp(self, mock_rds_credentials):
"""Constructs AnalyticsJobRunner with mock creds"""
"""Constructs AnalyticsTableJobRunner with mock creds"""
self.mock_credentials = Mock(spec=RDSCredentials)
mock_rds_credentials.return_value = self.sample_rds_credentials
self.runner = AnalyticsJobRunner(
self.runner = AnalyticsTableJobRunner(
redshift_secrets_name=os.getenv("REDSHIFT_SECRETS_NAME"),
buckets=os.getenv("BUCKETS"),
table_name=os.getenv("TABLE_NAME"),
@@ -189,11 +189,15 @@ def test_join_dataframes(self):
result_df = self.runner._join_dataframes(df1, df2, bucket_name)
pd.testing.assert_frame_equal(result_df, expected_df)

@patch.object(AnalyticsJobRunner, "_get_list_of_folders")
@patch.object(AnalyticsJobRunner, "_download_metadata_files")
@patch.object(AnalyticsJobRunner, "_create_dataframe_from_list_of_folders")
@patch.object(AnalyticsJobRunner, "_create_dataframe_from_metadata_files")
@patch.object(AnalyticsJobRunner, "_join_dataframes")
@patch.object(AnalyticsTableJobRunner, "_get_list_of_folders")
@patch.object(AnalyticsTableJobRunner, "_download_metadata_files")
@patch.object(
AnalyticsTableJobRunner, "_create_dataframe_from_list_of_folders"
)
@patch.object(
AnalyticsTableJobRunner, "_create_dataframe_from_metadata_files"
)
@patch.object(AnalyticsTableJobRunner, "_join_dataframes")
def test_crawl_s3_buckets(
self,
mock_join_dataframes,
@@ -276,7 +280,7 @@ def test_crawl_s3_buckets(
pd.testing.assert_frame_equal(result_df, expected_df)

@patch(
"aind_data_asset_indexer.s3_crawler.AnalyticsJobRunner."
"aind_data_asset_indexer.s3_crawler.AnalyticsTableJobRunner."
"_crawl_s3_buckets"
)
def test_run_job(self, mock_crawl_s3_buckets):
