Merge pull request #107 from AllenNeuralDynamics/release-v0.13.0
Release v0.13.0
helen-m-lin authored Nov 25, 2024
2 parents a82387b + 27bd3e0 commit 8dc615c
Showing 12 changed files with 416 additions and 362 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -20,10 +20,10 @@ dependencies = [
"boto3",
"boto3-stubs[s3]",
"pydantic-settings>=2.0",
"pydantic>=2.7,<2.9",
"pydantic>=2.10",
"pymongo==4.3.3",
"dask==2023.5.0",
"aind-data-schema==1.0.0",
"aind-data-schema==1.2.0",
"aind-codeocean-api==0.5.0",
]

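For reference, this release raises the pydantic floor to 2.10 and pins aind-data-schema to 1.2.0. A quick, hypothetical sanity check after reinstalling (not part of this diff) is to read the installed versions back with importlib.metadata:

from importlib.metadata import version

# Confirm the environment picked up the new pins from pyproject.toml.
print(version("pydantic"))          # expect >= 2.10
print(version("aind-data-schema"))  # expect 1.2.0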
2 changes: 1 addition & 1 deletion src/aind_data_asset_indexer/__init__.py
@@ -1,3 +1,3 @@
"""Package"""

__version__ = "0.12.0"
__version__ = "0.13.0"
21 changes: 13 additions & 8 deletions src/aind_data_asset_indexer/aind_bucket_indexer.py
@@ -11,6 +11,7 @@

import boto3
import dask.bag as dask_bag
+ from aind_data_schema.base import is_dict_corrupt
from mypy_boto3_s3 import S3Client
from mypy_boto3_s3.type_defs import CopySourceTypeDef
from pymongo import MongoClient
@@ -30,7 +31,6 @@
get_dict_of_file_info,
get_s3_bucket_and_prefix,
get_s3_location,
- is_dict_corrupt,
is_prefix_valid,
is_record_location_valid,
iterate_through_top_level,
@@ -215,15 +215,15 @@ def _resolve_schema_information(
The fields in the DocDb record that will require updating.
"""
docdb_record_fields_to_update = dict()
- for core_schema_file_name in core_schema_file_names:
-     field_name = core_schema_file_name.replace(".json", "")
+ for (
+     field_name,
+     core_schema_file_name,
+ ) in core_schema_file_names.items():
is_in_record = docdb_record.get(field_name) is not None
is_in_root = (
core_schema_info_in_root.get(core_schema_file_name) is not None
)
- is_in_copy_subdir = (
-     core_schema_file_name in list_of_schemas_in_copy_subdir
- )
+ is_in_copy_subdir = field_name in list_of_schemas_in_copy_subdir
# To avoid copying and pasting the same arguments, we'll keep it
# them in a dict
common_kwargs = {
@@ -580,8 +580,9 @@ def _process_prefix(
collection = db[
self.job_settings.doc_db_collection_name
]
if "_id" in json_contents:
# TODO: check is_dict_corrupt(json_contents)
if "_id" in json_contents and not is_dict_corrupt(
json_contents
):
logging.info(
f"Adding record to docdb for: {location}"
)
@@ -602,6 +603,10 @@
self.job_settings.copy_original_md_subdir
),
)
elif "_id" in json_contents:
logging.warning(
f"Metadata record for {location} is corrupt!"
)
else:
logging.warning(
f"Metadata record for {location} "
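The guard above now uses is_dict_corrupt from aind_data_schema.base (the local copy in utils.py is removed further down), so records whose keys DocDB would reject are skipped with a warning instead of upserted. A minimal sketch with made-up records, assuming the upstream helper behaves like the removed local one (keys containing "$" or "." at any nesting level mark the record corrupt):

from aind_data_schema.base import is_dict_corrupt

good_record = {"_id": "abc-123", "subject": {"subject_id": "123456"}}
bad_record = {"_id": "abc-123", "subject": {"genotype.full": "wt"}}

print(is_dict_corrupt(good_record))  # False -> record is added to DocDB
print(is_dict_corrupt(bad_record))   # True  -> record is skipped and a warning is logged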
142 changes: 47 additions & 95 deletions src/aind_data_asset_indexer/utils.py
@@ -11,8 +11,12 @@

from aind_codeocean_api.codeocean import CodeOceanClient
from aind_data_schema.core.data_description import DataLevel, DataRegex
- from aind_data_schema.core.metadata import ExternalPlatforms, Metadata
- from aind_data_schema.utils.json_writer import SchemaWriter
+ from aind_data_schema.core.metadata import CORE_FILES as CORE_SCHEMAS
+ from aind_data_schema.core.metadata import (
+     ExternalPlatforms,
+     Metadata,
+     create_metadata_json,
+ )
from botocore.exceptions import ClientError
from mypy_boto3_s3 import S3Client
from mypy_boto3_s3.type_defs import (
@@ -23,12 +27,9 @@

metadata_filename = Metadata.default_filename()

- # TODO: This would be better if it was available in aind-data-schema
- core_schema_file_names = [
-     s.default_filename()
-     for s in SchemaWriter.get_schemas()
-     if s.default_filename() != metadata_filename
- ]
+ core_schema_file_names = {
+     field_name: f"{field_name}.json" for field_name in CORE_SCHEMAS
+ }


def create_object_key(prefix: str, filename: str) -> str:
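The module-level list of core schema file names above is replaced by a mapping from Metadata field names to file names, keyed on CORE_FILES from aind-data-schema. A rough illustration of what the new mapping evaluates to (the exact set of keys depends on the installed aind-data-schema version):

from aind_data_schema.core.metadata import CORE_FILES as CORE_SCHEMAS

core_schema_file_names = {
    field_name: f"{field_name}.json" for field_name in CORE_SCHEMAS
}
# e.g. {"subject": "subject.json", "procedures": "procedures.json", ...}
print(core_schema_file_names["subject"])  # subject.json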
@@ -328,11 +329,10 @@ def does_s3_metadata_copy_exist(
Bucket=bucket, Prefix=copy_prefix, Delimiter="/"
)
if "Contents" in response:
- core_schemas = [s.replace(".json", "") for s in core_schema_file_names]
pattern = re.escape(copy_prefix) + r"([a-zA-Z0-9_]+)\.\d{8}\.json$"
for obj in response["Contents"]:
m = re.match(pattern, obj["Key"])
- if m is not None and m.group(1) in core_schemas:
+ if m is not None and m.group(1) in CORE_SCHEMAS:
return True
return False
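The copy-detection above matches date-stamped file names under the copy prefix and checks the captured stem against CORE_SCHEMAS. A small sketch with a hypothetical prefix and key:

import re

copy_prefix = "ecephys_123456_2024-11-25_00-00-00/original_metadata/"
pattern = re.escape(copy_prefix) + r"([a-zA-Z0-9_]+)\.\d{8}\.json$"

# Copied core jsons carry a YYYYMMDD date stamp before the extension.
key = copy_prefix + "subject.20241125.json"
m = re.match(pattern, key)
print(m.group(1))  # "subject" -> in CORE_SCHEMAS, so a metadata copy exists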

@@ -359,8 +359,8 @@ def list_metadata_copies(
Returns
-------
List[str]
- A list of the core schemas in the copy_subdir without timestamp, e..g,
- ["subject.json", "procedures.json", "processing.json"]
+ A list of the core schemas in the copy_subdir without timestamp, e.g,
+ ["subject", "procedures", "processing"]
"""
# Use trailing slash and delimiter to get top-level objects in copy_subdir
copy_prefix = create_object_key(prefix, copy_subdir.strip("/") + "/")
@@ -369,12 +369,11 @@
)
files = []
if "Contents" in response:
- core_schemas = [s.replace(".json", "") for s in core_schema_file_names]
pattern = re.escape(copy_prefix) + r"([a-zA-Z0-9_]+)\.\d{8}\.json$"
for obj in response["Contents"]:
m = re.match(pattern, obj["Key"])
- if m is not None and m.group(1) in core_schemas:
-     files.append(f"{m.group(1)}.json")
+ if m is not None and m.group(1) in CORE_SCHEMAS:
+     files.append(m.group(1))
return files


@@ -440,12 +439,10 @@ def get_dict_of_core_schema_file_info(
...
}
"""
- key_map = dict(
-     [
-         (create_object_key(prefix=prefix, filename=s), s)
-         for s in core_schema_file_names
-     ]
- )
+ key_map = {
+     create_object_key(prefix=prefix, filename=file_name): file_name
+     for file_name in core_schema_file_names.values()
+ }
file_info = get_dict_of_file_info(
s3_client=s3_client, bucket=bucket, keys=list(key_map.keys())
)
@@ -491,32 +488,6 @@ def iterate_through_top_level(
]


- def is_dict_corrupt(input_dict: dict) -> bool:
-     """
-     Checks that all the keys, included nested keys, don't contain '$' or '.'
-     Parameters
-     ----------
-     input_dict : dict
-     Returns
-     -------
-     bool
-       True if input_dict is not a dict, or if nested dictionary keys contain
-       forbidden characters. False otherwise.
-     """
-     if not isinstance(input_dict, dict):
-         return True
-     for key, value in input_dict.items():
-         if "$" in key or "." in key:
-             return True
-         elif isinstance(value, dict):
-             if is_dict_corrupt(value):
-                 return True
-     return False


def download_json_file_from_s3(
s3_client: S3Client, bucket: str, object_key: str
) -> Optional[dict]:
@@ -578,44 +549,34 @@ def build_metadata_record_from_prefix(
there are issues with Metadata construction.
"""
- file_keys = [
-     create_object_key(prefix=prefix, filename=file_name)
-     for file_name in core_schema_file_names
- ]
- s3_file_responses = get_dict_of_file_info(
-     s3_client=s3_client, bucket=bucket, keys=file_keys
+ core_files_infos = get_dict_of_core_schema_file_info(
+     s3_client=s3_client, bucket=bucket, prefix=prefix
)
record_name = prefix.strip("/") if optional_name is None else optional_name
try:
- metadata_dict = {
-     "name": record_name,
-     "location": get_s3_location(bucket=bucket, prefix=prefix),
- }
- if optional_created is not None:
-     metadata_dict["created"] = optional_created
- if optional_external_links is not None:
-     metadata_dict["external_links"] = optional_external_links
- for object_key, response_data in s3_file_responses.items():
+ core_jsons = dict()
+ for field_name, file_name in core_schema_file_names.items():
+     response_data = core_files_infos.get(file_name)
if response_data is not None:
- field_name = object_key.split("/")[-1].replace(".json", "")
+ object_key = create_object_key(prefix, file_name)
json_contents = download_json_file_from_s3(
s3_client=s3_client, bucket=bucket, object_key=object_key
)
if json_contents is not None:
- is_corrupt = is_dict_corrupt(input_dict=json_contents)
- if not is_corrupt:
-     metadata_dict[field_name] = json_contents
- # TODO: We should handle constructing the Metadata file in a better way
- # in aind-data-schema. By using model_validate, a lot of info from the
- # original files get removed. For now, we can use model_construct
- # until a better method is implemented in aind-data-schema. This will
- # mark all the initial files as metadata_status=Unknown
- metadata_dict = Metadata.model_construct(
-     **metadata_dict
- ).model_dump_json(warnings=False, by_alias=True)
+ core_jsons[field_name] = json_contents
+ # Construct Metadata file using core schema jsons
+ # Validation and de/serialization are handled in aind-data-schema
+ metadata_dict = create_metadata_json(
+     name=record_name,
+     location=get_s3_location(bucket=bucket, prefix=prefix),
+     core_jsons=core_jsons,
+     optional_created=optional_created,
+     optional_external_links=optional_external_links,
+ )
+ metadata_str = json.dumps(metadata_dict)
except Exception:
- metadata_dict = None
- return metadata_dict
+ metadata_str = None
+ return metadata_str
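build_metadata_record_from_prefix now delegates record assembly to create_metadata_json from aind-data-schema instead of calling Metadata.model_construct itself, and returns a JSON string. A rough usage sketch with made-up name, location, and core jsons, assuming (as the indexer does) that create_metadata_json tolerates incomplete or invalid core files:

import json

from aind_data_schema.core.metadata import create_metadata_json

core_jsons = {"subject": {"subject_id": "123456"}}  # hypothetical, incomplete contents
metadata_dict = create_metadata_json(
    name="ecephys_123456_2024-11-25_00-00-00",
    location="s3://example-bucket/ecephys_123456_2024-11-25_00-00-00",
    core_jsons=core_jsons,
    optional_created=None,
    optional_external_links=None,
)
metadata_str = json.dumps(metadata_dict)  # what the indexer stores and compares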


def cond_copy_then_sync_core_json_files(
@@ -705,17 +666,13 @@ def copy_core_json_files(
"""
tgt_copy_subdir = copy_original_md_subdir.strip("/")
tgt_copy_prefix = create_object_key(prefix, tgt_copy_subdir)
- core_files_keys = [
-     create_object_key(prefix=prefix, filename=s)
-     for s in core_schema_file_names
- ]
- core_files_infos = get_dict_of_file_info(
-     s3_client=s3_client, bucket=bucket, keys=core_files_keys
+ core_files_infos = get_dict_of_core_schema_file_info(
+     s3_client=s3_client, bucket=bucket, prefix=prefix
)
- for file_name in core_schema_file_names:
+ for file_name in core_schema_file_names.values():
source = create_object_key(prefix, file_name)
source_location = get_s3_location(bucket=bucket, prefix=source)
- source_file_info = core_files_infos[source]
+ source_file_info = core_files_infos[file_name]
if source_file_info is not None:
date_stamp = source_file_info["last_modified"].strftime("%Y%m%d")
target = create_object_key(
@@ -766,16 +723,11 @@ def sync_core_json_files(
None
"""
md_record_json = json.loads(metadata_json)
- core_files_keys = [
-     create_object_key(prefix=prefix, filename=s)
-     for s in core_schema_file_names
- ]
- core_files_infos = get_dict_of_file_info(
-     s3_client=s3_client, bucket=bucket, keys=core_files_keys
+ core_files_infos = get_dict_of_core_schema_file_info(
+     s3_client=s3_client, bucket=bucket, prefix=prefix
)
- for file_name in core_schema_file_names:
+ for field_name, file_name in core_schema_file_names.items():
object_key = create_object_key(prefix, file_name)
- field_name = file_name.replace(".json", "")
location = get_s3_location(bucket=bucket, prefix=object_key)
if (
field_name in md_record_json
@@ -785,7 +737,7 @@
field_contents_str = json.dumps(field_contents)
# Core schema jsons are created if they don't already exist.
# Otherwise, they are only updated if their contents are outdated.
- if core_files_infos[object_key] is None:
+ if core_files_infos[file_name] is None:
logging.info(f"Uploading new {field_name} to {location}")
response = upload_json_str_to_s3(
bucket=bucket,
@@ -795,7 +747,7 @@
)
logging.debug(response)
else:
- s3_object_hash = core_files_infos[object_key]["e_tag"].strip(
+ s3_object_hash = core_files_infos[file_name]["e_tag"].strip(
'"'
)
core_field_md5_hash = compute_md5_hash(field_contents_str)
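The sync step compares the S3 ETag of each existing core json against an MD5 of the corresponding metadata field to decide whether an overwrite is needed. A simplified sketch of that comparison, assuming a non-multipart upload (where the ETag is the hex MD5 of the object body) and made-up values:

import hashlib
import json

field_contents = {"subject_id": "123456"}  # hypothetical core field from metadata.nd.json
field_contents_str = json.dumps(field_contents)

local_md5 = hashlib.md5(field_contents_str.encode("utf-8")).hexdigest()
s3_etag = '"9e107d9d372bb6826bd81d3542a419d6"'  # made-up e_tag from head_object
needs_upload = local_md5 != s3_etag.strip('"')  # True -> upload_json_str_to_s3 overwrites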
@@ -818,7 +770,7 @@
else:
# If a core field is None but the core json exists,
# delete the core json.
- if core_files_infos[object_key] is not None:
+ if core_files_infos[file_name] is not None:
logging.info(
f"{field_name} not found in metadata.nd.json for "
f"{prefix} but {location} exists! Deleting."
(Diffs for the remaining 8 changed files are not rendered on this page.)
