diff --git a/aeon/dj_pipeline/__init__.py b/aeon/dj_pipeline/__init__.py
index b319f55b..72e57718 100644
--- a/aeon/dj_pipeline/__init__.py
+++ b/aeon/dj_pipeline/__init__.py
@@ -44,6 +44,7 @@ def fetch_stream(query, drop_pk=True):
         df.drop(columns=cols2drop, inplace=True, errors="ignore")
         df.rename(columns={"timestamps": "time"}, inplace=True)
         df.set_index("time", inplace=True)
+        df.sort_index(inplace=True)
     return df
 
 
diff --git a/aeon/dj_pipeline/acquisition.py b/aeon/dj_pipeline/acquisition.py
index 9823435d..b20c1a0c 100644
--- a/aeon/dj_pipeline/acquisition.py
+++ b/aeon/dj_pipeline/acquisition.py
@@ -385,18 +385,13 @@ def ingest_chunks(cls, experiment_name):
                 continue
 
             chunk_start = chunk.name
+            chunk_start = max(chunk_start, epoch_start)  # first chunk of the epoch starts at epoch_start
             chunk_end = chunk_start + datetime.timedelta(hours=io_api.CHUNK_DURATION)
 
             if EpochEnd & epoch_key:
                 epoch_end = (EpochEnd & epoch_key).fetch1("epoch_end")
                 chunk_end = min(chunk_end, epoch_end)
 
-            if chunk_start in chunk_starts:
-                # handle cases where two chunks with identical start_time
-                # (starts in the same hour) but from 2 consecutive epochs
-                # using epoch_start as chunk_start in this case
-                chunk_start = epoch_start
-
             # --- insert to Chunk ---
             chunk_key = {"experiment_name": experiment_name, "chunk_start": chunk_start}
diff --git a/aeon/dj_pipeline/analysis/block_analysis.py b/aeon/dj_pipeline/analysis/block_analysis.py
index 517ff373..e49cf71d 100644
--- a/aeon/dj_pipeline/analysis/block_analysis.py
+++ b/aeon/dj_pipeline/analysis/block_analysis.py
@@ -1,5 +1,4 @@
 import json
-
 import datajoint as dj
 import numpy as np
 import pandas as pd
@@ -58,7 +57,7 @@ def make(self, key):
         )
 
         block_query = acquisition.Environment.BlockState & chunk_restriction
-        block_df = fetch_stream(block_query).sort_index()[previous_block_start:chunk_end]
+        block_df = fetch_stream(block_query)[previous_block_start:chunk_end]
 
         block_ends = block_df[block_df.pellet_ct.diff() < 0]
 
@@ -87,7 +86,7 @@ def make(self, key):
                 block_entries[-1]["block_end"] = block_end
             block_entries.append({**exp_key, "block_start": block_end, "block_end": None})
 
-        Block.insert(block_entries)
+        Block.insert(block_entries, skip_duplicates=True)
 
         self.insert1(key)
 
@@ -155,10 +154,9 @@ def make(self, key):
         )
         for streams_table in streams_tables:
             if len(streams_table & chunk_keys) < len(streams_table.key_source & chunk_keys):
-                logger.info(
-                    f"{streams_table.__name__} not yet fully ingested for block: {key}. Skip BlockAnalysis (to retry later)..."
+                raise ValueError(
+                    f"BlockAnalysis Not Ready - {streams_table.__name__} not yet fully ingested for block: {key}. Skipping (to retry later)..."
                 )
-                return
 
         self.insert1({**key, "block_duration": (block_end - block_start).total_seconds() / 3600})
@@ -323,7 +321,6 @@ def make(self, key):
         subject_names = [s["subject_name"] for s in block_subjects]
         # Construct subject position dataframe
         subjects_positions_df = pd.concat(
-
             [
                 pd.DataFrame(
                     {"subject_name": [s["subject_name"]] * len(s["position_timestamps"])}
diff --git a/aeon/dj_pipeline/populate/worker.py b/aeon/dj_pipeline/populate/worker.py
index c1a8f86c..e538c8cb 100644
--- a/aeon/dj_pipeline/populate/worker.py
+++ b/aeon/dj_pipeline/populate/worker.py
@@ -64,7 +64,7 @@ def ingest_environment_visits():
 acquisition_worker(acquisition.EpochConfig)
 acquisition_worker(acquisition.Environment)
 # acquisition_worker(ingest_environment_visits)
-acquisition_worker(block_analysis.BlockDetection)
+# acquisition_worker(block_analysis.BlockDetection)
 
 # configure a worker to handle pyrat sync
 pyrat_worker = DataJointWorker(
@@ -87,6 +87,7 @@ def ingest_environment_visits():
     db_prefix=db_prefix,
     max_idled_cycle=50,
     sleep_duration=60,
+    autoclear_error_patterns=["%BlockAnalysis Not Ready%"],
 )
 
 for attr in vars(streams).values():
diff --git a/aeon/dj_pipeline/utils/load_metadata.py b/aeon/dj_pipeline/utils/load_metadata.py
index 56993880..f2639c22 100644
--- a/aeon/dj_pipeline/utils/load_metadata.py
+++ b/aeon/dj_pipeline/utils/load_metadata.py
@@ -201,10 +201,9 @@ def ingest_epoch_metadata(experiment_name, devices_schema, metadata_yml_filepath
             if not (streams.Device & device_key):
                 logger.warning(
-                    f"Device {device_name} (serial number: {device_sn}) is not yet registered in streams.Device. Skipping..."
+                    f"Device {device_name} (serial number: {device_sn}) is not yet registered in streams.Device.\nThis should not happen - check if metadata.yml and schemas dotmap are consistent. Skipping..."
                 )
                 # skip if this device (with a serial number) is not yet inserted in streams.Device
-                # this should not happen - check if metadata.yml and schemas dotmap are consistent
                 continue
 
             device_list.append(device_key)
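Note on how the two BlockAnalysis changes fit together: the `raise ValueError(f"BlockAnalysis Not Ready - ...")` in `block_analysis.py` and the `autoclear_error_patterns=["%BlockAnalysis Not Ready%"]` argument in `worker.py` act as a pair. The error message carries a fixed prefix, and the worker clears any job whose logged error matches that pattern, so the key is retried on a later populate cycle instead of staying permanently in the error state. Below is a minimal sketch of the matching contract only, assuming `%` acts as the SQL LIKE wildcard (as in DataJoint jobs-table restrictions); `StreamsTable` and the key repr are placeholders, and this is not the datajoint-utilities `DataJointWorker` implementation.

    # Illustration only: how a SQL-LIKE autoclear pattern recognizes the
    # "BlockAnalysis Not Ready" error raised by BlockAnalysis.make().
    # Assumptions: '%' is the LIKE wildcard; table/key names are placeholders.
    import re

    def like_match(pattern: str, text: str) -> bool:
        """Return True if `text` matches a SQL LIKE `pattern` ('%' = any run of characters)."""
        regex = "^" + ".*".join(re.escape(part) for part in pattern.split("%")) + "$"
        return re.match(regex, text) is not None

    error_message = (
        "BlockAnalysis Not Ready - StreamsTable not yet fully ingested for block: "
        "{'experiment_name': ...}. Skipping (to retry later)..."
    )
    assert like_match("%BlockAnalysis Not Ready%", error_message)

A job entry matching such a pattern would then presumably be deleted from the schema's jobs table (e.g. via an `error_message LIKE "%BlockAnalysis Not Ready%"` restriction), which is what allows the next populate cycle to re-attempt the key.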