diff --git a/python/mspasspy/db/database.py b/python/mspasspy/db/database.py index 26105bb00..1d4f2bbbe 100755 --- a/python/mspasspy/db/database.py +++ b/python/mspasspy/db/database.py @@ -47,7 +47,6 @@ import uuid from mspasspy.ccore.io import _mseed_file_indexer, _fwrite_to_file, _fread_from_file -from mspasspy.util.converter import Trace2TimeSeries from mspasspy.ccore.seismic import ( TimeSeries, @@ -166,7 +165,7 @@ def __setstate__(self, data): # The following is also needed for this object to be serialized correctly # with dask distributed. Otherwise, the deserialized codec_options # will become a different type unrecognized by pymongo. Not sure why... - + from bson.codec_options import CodecOptions, TypeRegistry, DatetimeConversion from bson.binary import UuidRepresentation @@ -1053,6 +1052,7 @@ def save_data( data_tag=None, cremate=False, save_history=True, + normalizing_collections=["channel", "site", "source"], alg_name="save_data", alg_id="0", ): @@ -1419,6 +1419,19 @@ def save_data( with body as input. That creates a document for each dead datum either in the "cemetery" or "abortions" collection. See above for more deails. + :param normalizing_collections: list of collection names dogmatically treated + as normalizing collection names. The keywords in the list are used + to always (i.e. for all modes) erase any attribute with a key name + of the form `collection_attribute where `collection` is one of the collection + names in this list and attribute is any string. Attribute names with the "_" + separator are saved unless the collection field matches one one of the + strings (e.g. "channel_vang" will be erased before saving to the + wf collection while "foo_bar" will not be erased.) This list should + ONLY be changed if a different schema than the default mspass schema + is used and different names are used for normalizing collections. + (e.g. if one added a "shot" collection to the schema the list would need + to be changed to at least add "shot".) + :type normalizing_collection: list if strings defining collection names. :param save_history: When True the optional history data will be saved to the database if it was actually enabled in the workflow. If the history container is empty will silently do nothing. @@ -1506,6 +1519,7 @@ def save_data( save_history, data_tag, storage_mode, + normalizing_collections, alg_name, alg_id, ) @@ -1523,6 +1537,7 @@ def save_data( save_history, data_tag, storage_mode, + normalizing_collections, alg_name, alg_id, ) @@ -2630,6 +2645,7 @@ def update_metadata( mode="cautious", exclude_keys=None, force_keys=None, + normalizing_collections=["channel", "site", "source"], alg_name="Database.update_metadata", ): """ @@ -2768,29 +2784,8 @@ def update_metadata( if not str(copied_metadata[k]).strip(): copied_metadata.erase(k) - # remove any defined items in exclude list - for k in exclude_keys: - if k in copied_metadata: - copied_metadata.erase(k) - # Now remove any readonly data - for k in copied_metadata.keys(): - if k == "_id": - continue - if save_schema.is_defined(k): - if save_schema.readonly(k): - if k in changed_key_list: - newkey = "READONLYERROR_" + k - copied_metadata.change_key(k, newkey) - mspass_object.elog.log_error( - "Database.update_metadata", - "readonly attribute with key=" - + k - + " was improperly modified. 
Saved changed value with key=" - + newkey, - ErrorSeverity.Complaint, - ) - else: - copied_metadata.erase(k) + # always exclude normalization data defined by names like site_lat + copied_metadata = _erase_normalized(copied_metadata, normalizing_collections) # Done editing, now we convert copied_metadata to a python dict # using this Metadata method or the long version when in cautious or pedantic mode insertion_dict = dict() @@ -2801,6 +2796,26 @@ def update_metadata( if k in copied_metadata: insertion_dict[k] = copied_metadata[k] else: + # first handle readonly constraints + for k in copied_metadata.keys(): + if k == "_id": + continue + if save_schema.is_defined(k): + if save_schema.readonly(k): + if k in changed_key_list: + newkey = "READONLYERROR_" + k + copied_metadata.change_key(k, newkey) + mspass_object.elog.log_error( + "Database.update_metadata", + "readonly attribute with key=" + + k + + " was improperly modified. Saved changed value with key=" + + newkey, + ErrorSeverity.Complaint, + ) + else: + copied_metadata.erase(k) + # Other modes have to test every key and type of value # before continuing. pedantic logs an error for all problems # Both attempt to fix type mismatches before update. Cautious @@ -2928,6 +2943,7 @@ def update_data( exclude_keys=None, force_keys=None, data_tag=None, + normalizing_collections=["channel", "site", "source"], alg_id="0", alg_name="Database.update_data", ): @@ -2979,6 +2995,19 @@ def update_data( also will drop any key-value pairs where the value cannot be converted to the type defined in the schema. :type mode: :class:`str` + :param normalizing_collections: list of collection names dogmatically treated + as normalizing collection names. The keywords in the list are used + to always (i.e. for all modes) erase any attribute with a key name + of the form `collection_attribute where `collection` is one of the collection + names in this list and attribute is any string. Attribute names with the "_" + separator are saved unless the collection field matches one one of the + strings (e.g. "channel_vang" will be erased before saving to the + wf collection while "foo_bar" will not be erased.) This list should + ONLY be changed if a different schema than the default mspass schema + is used and different names are used for normalizing collections. + (e.g. if one added a "shot" collection to the schema the list would need + to be changed to at least add "shot".) + :type normalizing_collection: list if strings defining collection names. :param alg_name: alg_name is the name the func we are gonna save while preserving the history. 
(defaults to 'Database.update_data' and should not normally need to be changed) :type alg_name: :class:`str` @@ -3006,6 +3035,7 @@ def update_data( mode=mode, exclude_keys=exclude_keys, force_keys=force_keys, + normalizing_collections=normalizing_collections, alg_name=alg_name, ) except: @@ -6417,6 +6447,7 @@ def _atomic_save_all_documents( save_history, data_tag, storage_mode, + normalizing_collections, alg_name, alg_id, ): @@ -6434,7 +6465,11 @@ def _atomic_save_all_documents( """ self._sync_metadata_before_update(mspass_object) insertion_dict, aok, elog = md2doc( - mspass_object, save_schema, exclude_keys=exclude_keys, mode=mode + mspass_object, + save_schema, + exclude_keys=exclude_keys, + mode=mode, + normalizing_collections=normalizing_collections, ) # exclude_keys edits insertion_dict but we need to do the same to mspass_object # to assure whem data is returned it is identical to what would @@ -7423,7 +7458,13 @@ def index_mseed_file_parallel(db, *arg, **kwargs): return ret -def md2doc(md, save_schema, exclude_keys=None, mode="promiscuous") -> []: +def md2doc( + md, + save_schema, + exclude_keys=None, + mode="promiscuous", + normalizing_collections=["channel", "site", "source"], +) -> {}: """ Converts a Metadata container to a python dict applying a schema constraints. @@ -7531,7 +7572,7 @@ def md2doc(md, save_schema, exclude_keys=None, mode="promiscuous") -> []: if k in copied_metadata: copied_metadata.erase(k) # the special mongodb key _id is currently set readonly in - # the mspass schema. It would be cleard in the following loop + # the mspass schema. It would be cleared in the following loop # but it is better to not depend on that external constraint. # The reason is the insert_one used below for wf collections # will silently update an existing record if the _id key @@ -7540,24 +7581,13 @@ def md2doc(md, save_schema, exclude_keys=None, mode="promiscuous") -> []: # we make sure we clear it if "_id" in copied_metadata: copied_metadata.erase("_id") - # Now remove any readonly data - for k in copied_metadata.keys(): - if save_schema.is_defined(k): - if save_schema.readonly(k): - if k in changed_key_list: - newkey = "READONLYERROR_" + k - copied_metadata.change_key(k, newkey) - elog.log_error( - "Database.save_data", - "readonly attribute with key=" - + k - + " was improperly modified. Saved changed value with key=" - + newkey, - ErrorSeverity.Complaint, - ) - else: - copied_metadata.erase(k) + # always strip normalizing data from standard collections + # Note this usage puts the definition of "standard collection" + # to the default of the argument normalizing_collection of this + # function. May want to allow callers to set this and add a + # value for the list to args of this function + copied_metadata = _erase_normalized(copied_metadata, normalizing_collections) # this section creates a python dict from the metadata container. # it applies safties based on mode argument (see user's manual) if mode == "promiscuous": @@ -7565,6 +7595,23 @@ def md2doc(md, save_schema, exclude_keys=None, mode="promiscuous") -> []: # the way the bindings were defined insertion_doc = dict(copied_metadata) else: + for k in copied_metadata.keys(): + if save_schema.is_defined(k): + if save_schema.readonly(k): + if k in changed_key_list: + newkey = "READONLYERROR_" + k + copied_metadata.change_key(k, newkey) + elog.log_error( + "Database.save_data", + "readonly attribute with key=" + + k + + " was improperly modified. 
Saved changed value with key=" + + newkey, + ErrorSeverity.Complaint, + ) + else: + copied_metadata.erase(k) + # Other modes have to test every key and type of value # before continuing. pedantic kills data with any problems # Cautious tries to fix the problem first @@ -8078,3 +8125,45 @@ class pointing to the named collection. The other allowed normalizer_list.append(this_normalizer) return normalizer_list + + +def _erase_normalized( + md, normalizing_collections=["channel", "site", "source"] +) -> Metadata: + """ + Erases data from a Metadata container assumed to come from normalization. + + In MsPASS attributes loaded with data from normalizing collections + are always of the form collection_key where collection is the name of the + normalizing collection and key is a simpler named used to store that + attribute in the normalizing collection. e.g. the "lat" latitude of + an entry in the "site" collection would be posted to a waveform + Metadata as "site_lat". One does not normally want to copy such + attributes back to the database when saving as it defeats the purpose of + normalization and can create confusions about which copy is definitive. + For that reason Database such attributes are erased before saving + unless overridden. This simple function standardizes that process. + + This function is mainly for internal use and has no safties. + + :param md: input Metadata container to use. Note this can be a + MsPASS seismic data object that inherits metadata and the function + will work as it copies the content to a Metadata container. + :type md: :class:`mspasspy.ccore.utility.Metadata` or a C++ data + object that inherits Metadata (that means all MsPASS seismid data objects) + :param normalizing_collection: list of standard collection names that + are defined for normalization. These are an argument only to put them + in a standard place. They should not be changed unless a new + normalizing collection name is added or a different schema is used + that has different names. + """ + # make a copy - not essential but small cost for stability + # make a copy - not essential but small cost for stability + mdout = Metadata(md) + for k in mdout.keys(): + split_list = k.split("_") + if len(split_list) >= 2: # gt allows things like channel_foo_bar + if split_list[1] != "id": # never erase any of the form word_id + if split_list[0] in normalizing_collections: + mdout.erase(k) + return mdout diff --git a/python/mspasspy/io/distributed.py b/python/mspasspy/io/distributed.py index 6a2cde11c..cb12f48d2 100644 --- a/python/mspasspy/io/distributed.py +++ b/python/mspasspy/io/distributed.py @@ -18,19 +18,8 @@ from mspasspy.db.client import DBClient -from mspasspy.ccore.seismic import ( - TimeSeries, - Seismogram, - TimeSeriesEnsemble, - SeismogramEnsemble, -) from mspasspy.ccore.utility import ( - AtomicType, - Metadata, - MsPASSError, ErrorLogger, - ErrorSeverity, - ProcessingHistory, ) import dask @@ -985,6 +974,7 @@ def _save_ensemble_wfdocs( exclude_keys, mode, undertaker, + normalizing_collections, cremate=False, post_elog=True, save_history=False, @@ -1042,6 +1032,7 @@ def _save_ensemble_wfdocs( User's manual for description of this common argument. :poram undertaker: Instance of :class:`mspasspy.util.Undertaker` to handle dead data (see above) + :param normalizing_collections: see docstring for `write_distributed_data`. 
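Editorial illustration (not part of the patch): a minimal sketch of the key-erasure rule implemented by the new `_erase_normalized` helper shown above, assuming the default `normalizing_collections=["channel", "site", "source"]` and that the module-private helper is in scope (e.g., inside database.py). The specific keys and values are hypothetical.

# Sketch of _erase_normalized behavior as documented above.
from mspasspy.ccore.utility import Metadata

md = Metadata()
md["site_lat"] = 35.0        # "site" is a normalizing collection -> erased
md["channel_foo_bar"] = 1    # leading token "channel" matches the list -> erased
md["site_id"] = "abc"        # keys of the form collection_id are never erased
md["foo_bar"] = 2            # "foo" is not a normalizing collection -> kept
md["npts"] = 255             # keys without the "_" separator are untouched

clean = _erase_normalized(md)
assert "site_lat" not in clean
assert "channel_foo_bar" not in clean
assert "site_id" in clean and "foo_bar" in clean and "npts" in clean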
:param cremate: tells Undertaker how to handle dead data (see above) :param post_elog: see above :param save_history: see above @@ -1073,7 +1064,11 @@ def _save_ensemble_wfdocs( # md2doc failure signaled with aok False for d in ensemble_data.member: doc, aok, elog = md2doc( - d, save_schema=save_schema, exclude_keys=exclude_keys, mode=mode + d, + save_schema=save_schema, + exclude_keys=exclude_keys, + mode=mode, + normalizing_collections=normalizing_collections, ) if aok: if data_tag: @@ -1127,6 +1122,7 @@ def _atomic_extract_wf_document( save_schema, exclude_keys, mode, + normalizing_collections, post_elog=True, elog_key="error_log", post_history=False, @@ -1156,9 +1152,8 @@ def _atomic_extract_wf_document( Error log and history data handling are as described in the docstring for `write_distributed_data` with which this function is - intimately linked. - - + intimately linked. Similarly all the argument descriptions can + be found in that docstring. :return: python dict translation of Metadata container of input datum d. Note the return always has a boolean value associated @@ -1177,6 +1172,7 @@ def _atomic_extract_wf_document( save_schema=save_schema, exclude_keys=exclude_keys, mode=mode, + normalizing_collections=normalizing_collections, ) # cremate or bury dead data. # both return an edited data object reduced to ashes or a skeleton @@ -1236,6 +1232,7 @@ def write_distributed_data( save_history=False, post_history=False, cremate=False, + normalizing_collections=["channel", "site", "source"], alg_name="write_distributed_data", alg_id="0", ) -> list: @@ -1470,6 +1467,21 @@ class method `save_data`. See the docstring for details but the `bury` method will be called instead which saves a skeleton (error log and Metadata content) of the results in the "cemetery" collection. + + :param normalizing_collections: list of collection names dogmatically treated + as normalizing collection names. The keywords in the list are used + to always (i.e. for all modes) erase any attribute with a key name + of the form `collection_attribute where `collection` is one of the collection + names in this list and attribute is any string. Attribute names with the "_" + separator are saved unless the collection field matches one one of the + strings (e.g. "channel_vang" will be erased before saving to the + wf collection while "foo_bar" will not be erased.) This list should + ONLY be changed if a different schema than the default mspass schema + is used and different names are used for normalizing collections. + (e.g. if one added a "shot" collection to the schema the list would need + to be changed to at least add "shot".) + :type normalizing_collection: list if strings defining collection names. + :param alg_name: do not change :param alg_id: algorithm id for object-level history. Normally assigned by global history manager. @@ -1557,6 +1569,7 @@ class method `save_data`. See the docstring for details but the save_schema, exclude_keys, mode, + normalizing_collections, post_elog=post_elog, save_history=save_history, post_history=post_history, @@ -1592,6 +1605,7 @@ class method `save_data`. See the docstring for details but the exclude_keys, mode, stedronsky, + normalizing_collections, cremate=cremate, post_elog=post_elog, save_history=save_history, @@ -1619,6 +1633,7 @@ class method `save_data`. See the docstring for details but the save_schema, exclude_keys, mode, + normalizing_collections, post_elog=post_elog, save_history=save_history, post_history=post_history, @@ -1653,6 +1668,7 @@ class method `save_data`. 
See the docstring for details but the exclude_keys, mode, stedronsky, + normalizing_collections, cremate=cremate, post_elog=post_elog, save_history=save_history, @@ -1744,13 +1760,6 @@ def read_to_dataframe( :type retrieve_history_record: :class:`bool` """ collection = cursor.collection.name - try: - wf_collection = db.database_schema.default_name(collection) - except MsPASSError as err: - raise MsPASSError( - "collection {} is not defined in database schema".format(collection), - "Invalid", - ) from err dbschema = db.database_schema mdschema = db.metadata_schema this_elog = ErrorLogger() diff --git a/python/tests/db/test_database.py b/python/tests/db/test_database.py index 34bf38efe..1c04e2c25 100644 --- a/python/tests/db/test_database.py +++ b/python/tests/db/test_database.py @@ -52,6 +52,7 @@ from mspasspy.db.database import Database from mspasspy.db.client import DBClient + class TestDatabase: def setup_class(self): client = DBClient("localhost") @@ -66,17 +67,13 @@ def setup_class(self): # this is may not be necessary but is useful to be sure # state is clear self.test_ts.clear_modified() - # this is used to test automatic dropping of any value + # this is used to test automatic dropping of any value # that is all all spaces - self.test_ts["test"] = " " + self.test_ts["test"] = " " self.test_ts["extra1"] = "extra1" self.test_ts["extra2"] = "extra2" # exclude - self.test_ts.elog.log_error( - "alg", str("message"), ErrorSeverity.Informational - ) - self.test_ts.elog.log_error( - "alg", str("message"), ErrorSeverity.Informational - ) + self.test_ts.elog.log_error("alg", str("message"), ErrorSeverity.Informational) + self.test_ts.elog.log_error("alg", str("message"), ErrorSeverity.Informational) # this is used to test aliases self.test_ts.erase("starttime") self.test_ts["t0"] = datetime.utcnow().timestamp() @@ -86,9 +83,9 @@ def setup_class(self): # this is may not be necessary but is useful to be sure # state is clear self.test_ts.clear_modified() - # this is used to test automatic dropping of any value + # this is used to test automatic dropping of any value # that is all all spaces - self.test_seis["test"] = " " + self.test_seis["test"] = " " self.test_seis["extra1"] = "extra1" self.test_seis["extra2"] = "extra2" # exclude self.test_seis.elog.log_error( @@ -97,11 +94,10 @@ def setup_class(self): self.test_seis.elog.log_error( "alg", str("message"), ErrorSeverity.Informational ) - + self.test_seis.erase("starttime") self.test_seis["t0"] = datetime.utcnow().timestamp() - site_id = ObjectId() channel_id = ObjectId() source_id = ObjectId() @@ -214,9 +210,7 @@ def test_save_and_read_data(self): tmp_seis_2 = Seismogram() tmp_seis_2.npts = 255 self.db._read_data_from_dfile(tmp_seis_2, dir, dfile, foff) - assert all( - a.any() == b.any() for a, b in zip(tmp_seis.data, tmp_seis_2.data) - ) + assert all(a.any() == b.any() for a, b in zip(tmp_seis.data, tmp_seis_2.data)) tmp_ts = get_live_timeseries() ts_return = self.db._save_sample_data_to_file(tmp_ts, dir, dfile) @@ -258,16 +252,14 @@ def test_save_and_read_data(self): nbytes = seis_return["nbytes"] assert seis_return["format"] == "mseed" assert seis_return["storage_mode"] == "file" - + tmp_seis_2 = Seismogram() # this method is a bit weird in acting like a subroutine # result is returned in tmp_seis_2 self.db._read_data_from_dfile( tmp_seis_2, dir, dfile, foff, nbytes, format="mseed" ) - assert all( - a.any() == b.any() for a, b in zip(tmp_seis.data,tmp_seis_2.data) - ) + assert all(a.any() == b.any() for a, b in zip(tmp_seis.data, 
tmp_seis_2.data)) tmp_ts = get_live_timeseries() ts_return = self.db._save_sample_data_to_file( @@ -285,7 +277,7 @@ def test_save_and_read_data(self): nbytes = ts_return["nbytes"] assert ts_return["format"] == "mseed" assert ts_return["storage_mode"] == "file" - + tmp_ts_2 = TimeSeries() self.db._read_data_from_dfile( tmp_ts_2, dir, dfile, foff, nbytes, format="mseed" @@ -302,9 +294,7 @@ def test_save_and_read_gridfs(self): tmp_seis_2 = Seismogram() tmp_seis_2.npts = 255 self.db._read_data_from_gridfs(tmp_seis_2, gridfs_id) - assert all( - a.any() == b.any() for a, b in zip(tmp_seis.data, tmp_seis_2.data) - ) + assert all(a.any() == b.any() for a, b in zip(tmp_seis.data, tmp_seis_2.data)) with pytest.raises(KeyError, match="npts is not defined"): tmp_seis_2.erase("npts") @@ -373,23 +363,24 @@ def test_save_load_history(self): assert res assert "save_uuid" in res assert res["save_stage"] == 2 - # when ot given an alg_name and alg_id _save_history extracts it - # from the end of the history chain. This tests that is done + # when ot given an alg_name and alg_id _save_history extracts it + # from the end of the history chain. This tests that is done assert res["alg_name"] == "dummy_func_2" assert res["alg_id"] == "2" assert "processing_history" in res - - # We tested _test_history above in isolation. Now - # use save_data and verify we get the same history - # chain saved but with a wf id cross reference in the + # We tested _test_history above in isolation. Now + # use save_data and verify we get the same history + # chain saved but with a wf id cross reference in the # saved document ts = TimeSeries(ts0) - ts_saved = self.db.save_data(ts,save_history=True,alg_id="testid",return_data=True) + ts_saved = self.db.save_data( + ts, save_history=True, alg_id="testid", return_data=True + ) assert ts_saved.live assert "history_object_id" in ts_saved hid = ts_saved["history_object_id"] - doc = self.db.history_object.find_one({"_id" : hid}) + doc = self.db.history_object.find_one({"_id": hid}) assert doc assert "save_uuid" in doc # save_data adds it's stamp to history with tag "save_data" @@ -401,22 +392,24 @@ def test_save_load_history(self): # only difference from direct _save_history call are these assert "wf_TimeSeries_id" in doc assert ts_saved.is_origin() - - # Now simulate taking the output of save and passing it to - # a differnt algorithm and saving it. - # save_data clears the history chain after saving and sets + + # Now simulate taking the output of save and passing it to + # a differnt algorithm and saving it. + # save_data clears the history chain after saving and sets # ts_saved to define the result as an "origin" (assert immediatley above) # after simulting the algorithm we save and verify that result logging_helper.info(ts_saved, "3", "dummy_func_3") nodes = ts_saved.get_nodes() assert ts_saved.number_of_stages() == 1 - #changing alg_id in this context would not be normal. Done + # changing alg_id in this context would not be normal. 
Done # here to assure test below is unique for the saved document - ts_saved = self.db.save_data(ts_saved,save_history=True,alg_id="testid2",return_data=True) + ts_saved = self.db.save_data( + ts_saved, save_history=True, alg_id="testid2", return_data=True + ) assert ts_saved.live assert "history_object_id" in ts_saved hid = ts_saved["history_object_id"] - doc = self.db.history_object.find_one({"_id" : hid}) + doc = self.db.history_object.find_one({"_id": hid}) assert doc assert "save_uuid" in doc # save_data adds it's stamp to history with tag "save_data" @@ -431,17 +424,17 @@ def test_save_load_history(self): # Next test the private _load_history method by itself # We retrieve the document last saved into a - # TimeSeries container. Content tests match those - # immediately above, but we use the ProcessingHistory + # TimeSeries container. Content tests match those + # immediately above, but we use the ProcessingHistory # api instead of the document retrieved with find_one # ts is copied as a convenience to get a live datum. - # note use of ts_saved is a dependence from above that is + # note use of ts_saved is a dependence from above that is # critical for this test ts_2 = TimeSeries(ts) - history_object_id = ts_saved['history_object_id'] + history_object_id = ts_saved["history_object_id"] self.db._load_history( - ts_2, - history_object_id, + ts_2, + history_object_id, alg_name="test_load_history", alg_id="tlh_0", ) @@ -449,60 +442,55 @@ def test_save_load_history(self): assert ts_2.current_nodedata().algorithm == "test_load_history" assert ts_2.current_nodedata().algid == "tlh_0" - # repeat with null id field # In that case result should be set as origin - # and there should be no elog entry as that state is + # and there should be no elog entry as that state is # treated as a signal to define this as an origin ts_3 = TimeSeries(ts) self.db._load_history( ts_3, - None, - alg_name="test_alg_name", - alg_id="test_alg_id", + None, + alg_name="test_alg_name", + alg_id="test_alg_id", ) assert ts_3.is_origin() nd = ts_3.current_nodedata() assert nd.algorithm == "test_alg_name" assert nd.algid == "test_alg_id" - assert ts_3.elog.size()==0 - - # Repeat above with an invalid ObjectId. Result - # shouldd be similar to that with None but there + assert ts_3.elog.size() == 0 + + # Repeat above with an invalid ObjectId. 
Result + # shouldd be similar to that with None but there # should be an elog message we verify here - bad_hid=ObjectId() + bad_hid = ObjectId() ts_3 = TimeSeries(ts) self.db._load_history( ts_3, - bad_hid, - alg_name="test_alg_name", - alg_id="test_alg_id", + bad_hid, + alg_name="test_alg_name", + alg_id="test_alg_id", ) assert ts_3.is_origin() nd = ts_3.current_nodedata() assert nd.algorithm == "test_alg_name" assert nd.algid == "test_alg_id" - assert ts_3.elog.size()==1 - + assert ts_3.elog.size() == 1 + # test with read_data using and defining load_history True ts_4 = self.db.read_data( - ts_3['_id'], - load_history=True, - alg_id="testreader_fakeid" - ) + ts_3["_id"], load_history=True, alg_id="testreader_fakeid" + ) assert ts_4.live assert ts_4.is_origin() - assert ts_4.number_of_stages()==3 + assert ts_4.number_of_stages() == 3 nd = ts_4.current_nodedata() - assert nd.algorithm=="read_data" - assert nd.algid=="testreader_fakeid" - + assert nd.algorithm == "read_data" + assert nd.algid == "testreader_fakeid" def test_update_metadata(self): - ts = copy.deepcopy(self.test_ts) - logging_helper.info(ts, "1", "deepcopy") + ts = TimeSeries(self.test_ts) exclude = ["extra2"] - # insert ts into the database + # insert document into wf_TimeSeries - use this id for later updates wfid = ( self.db["wf_TimeSeries"] .insert_one( @@ -526,11 +514,12 @@ def test_update_metadata(self): res_ts = self.db.update_metadata(123) # test dead object - ts.live = False + ts.kill() res_ts = self.db.update_metadata(ts) assert not res_ts - ts.set_live() + # could use set_live but safer to make a fresh copy to avoid side effects of kill + ts = TimeSeries(self.test_ts) # test mode that not in promiscuous, cautious and pedantic with pytest.raises( MsPASSError, @@ -550,168 +539,188 @@ def test_update_metadata(self): ts["_id"] = wfid # test promiscuous ts["extra1"] = "extra1+" - ts["net"] = "Asia" + # compound keys implying normalization should always be deleted + # even in promiscuous mode + ts["site_net"] = "Asia" ts["npts"] = 255 logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( ts, mode="promiscuous", exclude_keys=exclude, force_keys=["extra3"] ) assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - # test read-only attribute - assert "net" not in res - assert "READONLYERROR_net" not in res - assert ( - ts.elog.get_error_log()[-1].message - == "readonly attribute with key=net was improperly modified. Saved changed value with key=READONLYERROR_net" - ) - assert len(ts.elog.get_error_log()) == 3 - ts.erase("net") - # test default update - assert res["extra1"] == "extra1+" - assert "source_id" in res - assert "site_id" in res - assert "channel_id" in res - assert res["npts"] == 255 + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + # test that normalized attribute defined key was deleted + assert "site_net" not in doc + # same - update_metadata should not add an error + # weirdness in this test is that the parent (self.test_ts) has two informational + # elog entries so the size of elog to begin is 2. 
Hence we test for 2 not 0 + assert ts.elog.size() == 2 + # test updates are as expected + assert doc["extra1"] == "extra1+" + assert "source_id" in doc + assert "site_id" in doc + assert "channel_id" in doc + assert doc["npts"] == 255 # test exclude keys - assert "extra2" not in res + assert "extra2" not in doc # test clear alias - assert "t0" not in res - assert "starttime" in res + assert "t0" not in doc + assert "starttime" in doc # test empty keys - assert "test" not in res + assert "test" not in doc # test sync_metadata_before_update - assert "utc_convertible" in res - assert "time_standard" in res + assert "utc_convertible" in doc + assert "time_standard" in doc # test force_keys(but extra3 is not in metadata) - assert "extra3" not in res + assert "extra3" not in doc # test cautious(required key) -> fail + ts = TimeSeries(self.test_ts) old_npts = ts["npts"] ts.put_string("npts", "xyz") logging_helper.info(ts, "2", "update_metadata") + # adding id needed since we are testing update + ts["_id"] = wfid res_ts = self.db.update_metadata( ts, exclude_keys=["extra1", "extra2", "utc_convertible"] ) - # object is killed - assert not res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - # attr value remains the same - assert res["npts"] == old_npts - # add one elog entry - assert len(ts.elog.get_error_log()) == 4 + + assert res_ts.dead() + # attain parent had elog size of 2. The above should add an entry so + # expect 3 + assert len(res_ts.elog.get_error_log()) == 3 # test cautious(required key) -> success - ts.live = True - ts.put_string("npts", "123") - logging_helper.info(ts, "2", "update_metadata") + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid + ts.put_string("npts", "100") res_ts = self.db.update_metadata( - ts, exclude_keys=["extra1", "extra2", "utc_convertible"] + ts, mode="cautious", exclude_keys=["extra1", "extra2", "utc_convertible"] ) assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) # attr value remains the same - assert res["npts"] == 123 - # add one elog entry - assert len(ts.elog.get_error_log()) == 5 - ts.put("npts", 255) + npts = doc["npts"] + assert isinstance(npts, int) + assert doc["npts"] == 100 + # leaves datum live but should add one elog entry + assert res_ts.elog.size() == 3 # test cautious(normal key) -> fail + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid old_sampling_rate = ts["sampling_rate"] ts.put_string("sampling_rate", "xyz") - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( - ts, exclude_keys=["extra1", "extra2", "utc_convertible"] + ts, mode="cautious", exclude_keys=["extra1", "extra2", "utc_convertible"] ) assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) # attr value remains the same - assert res["sampling_rate"] == old_sampling_rate + assert doc["sampling_rate"] == old_sampling_rate # add one elog entry - assert len(ts.elog.get_error_log()) == 6 + assert res_ts.elog.size() == 3 # test cautious(normal key) -> success + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid ts.put_string("sampling_rate", "1.0") - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( - ts, exclude_keys=["extra1", "extra2", "utc_convertible"] + ts, mode="cautious", 
exclude_keys=["extra1", "extra2", "utc_convertible"] ) assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) # attr value remains the same - assert res["sampling_rate"] == 1.0 - # add one elog entry - assert len(ts.elog.get_error_log()) == 7 - ts.put("sampling_rate", 1.0) + assert doc["sampling_rate"] == 1.0 + # adds one elog entry + assert res_ts.elog.size() == 3 # test cautious(schema undefined key) + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid ts["extra3"] = "123" - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( - ts, exclude_keys=["extra1", "extra2", "utc_convertible"] + ts, mode="cautious", exclude_keys=["extra1", "extra2", "utc_convertible"] ) assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - # can add attribute to the database - assert res["extra3"] == "123" - # add 1 more log error to the elog - assert len(ts.elog.get_error_log()) == 8 - ts.erase("extra3") + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + # Verify new attribute was properly added + assert doc["extra3"] == "123" + # adds 1 more log error to the elog + assert res_ts.elog.size() == 3 # test pedantic(required key) -> fail - old_npts = ts["npts"] + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid + # earlier edits of a common record alter npts so we have to + # fetch that value not the one from ts + doc = self.db["wf_TimeSeries"].find_one({"_id": wfid}) + old_npts = doc["npts"] ts.put_string("npts", "xyz") - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( ts, mode="pedantic", exclude_keys=["extra1", "extra2", "utc_convertible"], ) - assert not res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + assert res_ts.dead() + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) # attr value remains the same - assert res["npts"] == old_npts - # add one elog entry - assert len(ts.elog.get_error_log()) == 10 + assert doc["npts"] == old_npts + # That update should add two elog entries + assert res_ts.elog.size() == 4 - # test pedantic(required key) -> success - ts.live = True + # test pedantic(required key) -> does auto repair and returns live + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid ts.put_string("npts", "123") - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( ts, mode="pedantic", exclude_keys=["extra1", "extra2", "utc_convertible"], ) assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) # attr value remains the same - assert res["npts"] == 123 - # add one elog entry - assert len(ts.elog.get_error_log()) == 12 - ts.put("npts", 255) + assert doc["npts"] == 123 + # This update problem adds 2 elog entries + assert res_ts.elog.size() == 4 - # test pedantic(normal key) -> fail - old_sampling_rate = ts["sampling_rate"] + # test pedantic(normal key) that fails and causes a kill + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid + # earlier updates may change sampling rate stored for the + # document we've been altering so we pull it from there + # not ts + doc = self.db.wf_TimeSeries.find_one({"_id": wfid}) + old_sampling_rate = doc["sampling_rate"] 
ts.put_string("sampling_rate", "xyz") - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( ts, mode="pedantic", exclude_keys=["extra1", "extra2", "utc_convertible"], ) - # this test probably should be testing if ts is dead - assert not res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - # attr value remains the same - assert res["sampling_rate"] == old_sampling_rate + assert res_ts.dead() + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + # verify value did not change in database document even if + # the update failed + assert doc["sampling_rate"] == old_sampling_rate # add two more log error to the elog - assert len(ts.elog.get_error_log()) == 14 + assert res_ts.elog.size() == 4 - # test pedantic(normal key) -> success + # test pedantic(normal key) where the auto repairs work + # will leave datum returned live but adds elog entries + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid ts.live = True ts.put_string("sampling_rate", "5.0") - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( ts, mode="pedantic", @@ -719,40 +728,46 @@ def test_update_metadata(self): ) # this test probably should be testing if ts is dead assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) # attr value remains the same - assert res["sampling_rate"] == 5.0 + assert doc["sampling_rate"] == 5.0 # add two more log error to the elog - assert len(ts.elog.get_error_log()) == 16 - ts.put("sampling_rate", 20.0) + assert res_ts.elog.size() == 4 - # test pedantic(schema undefined key) + # test pedantic mode with a key not defined in the schema + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update + ts["_id"] = wfid ts["extra4"] = "123" - logging_helper.info(ts, "2", "update_metadata") res_ts = self.db.update_metadata( ts, mode="pedantic", exclude_keys=["extra1", "extra2", "utc_convertible"], ) assert res_ts.live - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - # can add attribute to the database - assert "extra4" not in res - # add 1 more log error to the elog - assert len(ts.elog.get_error_log()) == 17 - - # test _id which we can't find the corresponding document in database + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + # in pedantic mode update will be refused so the test + # attribute should not be present + assert "extra4" not in doc + # This error adds only one elog entry + assert res_ts.elog.size() == 3 + + # test trying to do an update with an id not in the db + # should add a new document with the new objectid + ts = TimeSeries(self.test_ts) + # adding id needed since we are testing update ts["_id"] = ObjectId() res_ts = self.db.update_metadata(ts) # should insert a document into wf collection - res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - assert ts.live - assert res + doc = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) + assert res_ts.live + assert doc is not None # test tmatrix attribute when update seismogram - test_seis = get_live_seismogram() - logging_helper.info(test_seis, "1", "deepcopy") - # insert ts into the database + test_seis0 = get_live_seismogram() + # work with a copy to avoid side effects + test_seis = Seismogram(test_seis0) + # insert template for updates into the database wfid = ( self.db["wf_Seismogram"] .insert_one( @@ -777,7 +792,9 @@ def test_update_metadata(self): assert 
"orthogonal" in res and res["orthogonal"] assert res["tmatrix"] == [1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0] # change tmatrix - logging_helper.info(test_seis, "2", "update_metadata") + # refresh with a copy + test_seis = Seismogram(test_seis0) + test_seis["_id"] = wfid test_seis.tmatrix = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] res_seis = self.db.update_metadata(test_seis, mode="promiscuous") res = self.db["wf_Seismogram"].find_one({"_id": test_seis["_id"]}) @@ -789,10 +806,12 @@ def test_update_data(self): ts = copy.deepcopy(self.test_ts) logging_helper.info(ts, "1", "deepcopy") # insert ts into the database - res_ts = self.db.save_data(ts, mode="cautious", storage_mode="gridfs",return_data=True) + res_ts = self.db.save_data( + ts, mode="cautious", storage_mode="gridfs", return_data=True + ) assert ts.live # This test was reversed in api change mid 2023. Now always set storage_mode so reverse test - #assert not "storage_mode" in ts + # assert not "storage_mode" in ts assert "storage_mode" in ts # change read only attribute to create a elog entry ts["net"] = "test_net" @@ -814,10 +833,11 @@ def test_update_data(self): # Changes for V2 modified the output of this test due to # an implementation detail. V1 had the following: # should add 3 more elog entries(one in update_metadata, two in update_data) - #assert len(ts.elog.get_error_log()) == old_elog_size + 3 - # Revision for V2 removed use of update_metadata and only 2 errors - # are logged by update_data. Hence the following revision - assert len(ts.elog.get_error_log()) == old_elog_size + 2 + # assert len(ts.elog.get_error_log()) == old_elog_size + 3 + # Revision for V2 update_data does not add an elog entry so this + # assertion was dropped. Retained commented out in the event + # this gets changed later. + # assert en(ts.elog.get_error_log()) == old_elog_size + 2 # check history_object collection and elog_id collection wf_res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) elog_res = self.db["elog"].find_one({"_id": ts["elog_id"]}) @@ -852,23 +872,23 @@ def test_update_data(self): def test_save_read_data_timeseries(self): """ This method tests various permutations of read and write - of TimeSeries data objects. It probably should be broken into - about a dozen different functions but for now think of it - as a series of tests of read_data and write_data and - associated baggage. Instead we have comments that enumerate + of TimeSeries data objects. It probably should be broken into + about a dozen different functions but for now think of it + as a series of tests of read_data and write_data and + associated baggage. Instead we have comments that enumerate blocks testing specific features. It has a companion - below called "tes_save_read_data_seismogram" that is - nearly identical but for Seismogram objects. - The main differences is noremalization with channel instead of + below called "tes_save_read_data_seismogram" that is + nearly identical but for Seismogram objects. + The main differences is noremalization with channel instead of site. 
""" - # Test 0 - nothing to read with a random object id + # Test 0 - nothing to read with a random object id # returns a default constructed dead datum with an elog entry fail_ts = self.db.read_data( ObjectId(), mode="cautious", normalize=["channel", "source"] ) assert fail_ts.dead() - assert fail_ts.elog.size()>0 + assert fail_ts.elog.size() > 0 # first create valid simulation data for tests promiscuous_ts0 = copy.deepcopy(self.test_ts) @@ -877,17 +897,17 @@ def test_save_read_data_timeseries(self): logging_helper.info(promiscuous_ts0, "1", "deepcopy") logging_helper.info(cautious_ts0, "1", "deepcopy") logging_helper.info(pedantic_ts0, "1", "deepcopy") - # We use the C++ copy constructor here to build working + # We use the C++ copy constructor here to build working # copies. It doesn't alter history so they are pure clones # of 0 version with stage 1 of history "deepcopy" - promiscuous_ts=TimeSeries(promiscuous_ts0) + promiscuous_ts = TimeSeries(promiscuous_ts0) cautious_ts = TimeSeries(cautious_ts0) pedantic_ts = TimeSeries(pedantic_ts0) # Test 1 - # initial test of a basic save in promiscuous mode and all + # initial test of a basic save in promiscuous mode and all # else defaulted - promiscuous is actually the default but set - # here for emphasis. Well, actually return_data is needed + # here for emphasis. Well, actually return_data is needed # to validate but default is false res_ts = self.db.save_data( promiscuous_ts, @@ -895,15 +915,15 @@ def test_save_read_data_timeseries(self): return_data=True, ) assert res_ts.live - # read it back in and compare the metadata entries. + # read it back in and compare the metadata entries. # they will not match completely because t0 is an alias - # that is converted in all modes to starttime. The original - # promiscuous_ts saved has a "test" key that is empty and + # that is converted in all modes to starttime. 
The original + # promiscuous_ts saved has a "test" key that is empty and # is also automatically dropped in all modes - all spaces # is dogmatically treated as null - res_ts_read = self.db.read_data(res_ts['_id'],collection="wf_TimeSeries") - skipkeys=['t0','test', 'is_abortion'] - for k in res_ts: + res_ts_read = self.db.read_data(res_ts["_id"], collection="wf_TimeSeries") + skipkeys = ["t0", "test", "is_abortion"] + for k in res_ts: if k not in skipkeys: assert res_ts_read[k] == res_ts[k] # this verifies the alias @@ -914,12 +934,12 @@ def test_save_read_data_timeseries(self): assert "is_abortion" in res_ts_read assert not res_ts_read["is_abortion"] # We uses this to retrieve this record for later test - basic_save_ts_id=res_ts['_id'] - - # + basic_save_ts_id = res_ts["_id"] + + # # Test 2 # same as above but add exclude_keys to verify that works - # use a fresh copy of initial datum + # use a fresh copy of initial datum promiscuous_ts = TimeSeries(promiscuous_ts0) res_ts = self.db.save_data( promiscuous_ts, @@ -928,10 +948,10 @@ def test_save_read_data_timeseries(self): return_data=True, ) assert res_ts.live - # check extra2 was excluded - res_ts_read = self.db.read_data(res_ts['_id'],collection="wf_TimeSeries") - skipkeys=['t0','test', 'is_abortion'] - for k in res_ts: + # check extra2 was excluded + res_ts_read = self.db.read_data(res_ts["_id"], collection="wf_TimeSeries") + skipkeys = ["t0", "test", "is_abortion"] + for k in res_ts: if k not in skipkeys: assert res_ts_read[k] == res_ts[k] # this verifies the alias @@ -942,8 +962,8 @@ def test_save_read_data_timeseries(self): assert "is_abortion" in res_ts_read assert not res_ts_read["is_abortion"] # We uses this to retrieve this record for later test - basic_save_ts_id=res_ts['_id'] - + basic_save_ts_id = res_ts["_id"] + # test 3 # here we validate the history data was saved correctly # this actualy somewhat duplicates tests in test_save_load_history @@ -972,7 +992,7 @@ def test_save_read_data_timeseries(self): cautious_ts = TimeSeries(cautious_ts0) cautious_ts.put_string("npts", "xyz") res_ts = self.db.save_data( - cautious_ts, mode="cautious", storage_mode="gridfs",return_data=True + cautious_ts, mode="cautious", storage_mode="gridfs", return_data=True ) assert res_ts.dead() # save kills both original and return because data is passed by reference @@ -983,21 +1003,21 @@ def test_save_read_data_timeseries(self): # for invalid required args cautious and pedantic act the same # here we use sampling_rate wrong instead of npts # i.e. 
this is a comparable test to previous - pedantic_ts=TimeSeries(pedantic_ts0) + pedantic_ts = TimeSeries(pedantic_ts0) pedantic_ts.put_string("sampling_rate", "xyz") res_ts = self.db.save_data( - pedantic_ts, mode="pedantic", storage_mode="gridfs",return_data=True + pedantic_ts, mode="pedantic", storage_mode="gridfs", return_data=True ) assert res_ts.dead() assert pedantic_ts.dead() # the cemetery should contain two documents from each of the # above saves at this point - n_dead=self.db.cemetery.count_documents({}) + n_dead = self.db.cemetery.count_documents({}) assert n_dead == 2 # test 6 # Valitdate normalziation works correctly - # the following uses the id of the basic save done near the + # the following uses the id of the basic save done near the # top of this test function # in this mode the retrieved @@ -1008,24 +1028,28 @@ def test_save_read_data_timeseries(self): normalize=["channel", "source"], collection="wf_TimeSeries", ) - assert '_id' in promiscuous_ts2 + assert "_id" in promiscuous_ts2 # 2 should have the same metadata as original plus _id # hence loop is driven by datum prior to being saved # 2 should also contain the normalizatoin data which we test after this loop - skipkeys=['t0','test', 'is_abortion'] + skipkeys = ["t0", "test", "is_abortion"] for k in promiscuous_ts: if k not in skipkeys: assert promiscuous_ts[k] == promiscuous_ts2[k] assert not promiscuous_ts2["is_abortion"] # normalize should set these - channellist=['channel_lat', 'channel_lon', 'channel_elev', 'channel_starttime', - 'channel_endtime'] - sourcelist=['source_lat', 'source_lon', 'source_depth', 'source_time'] + channellist = [ + "channel_lat", + "channel_lon", + "channel_elev", + "channel_starttime", + "channel_endtime", + ] + sourcelist = ["source_lat", "source_lon", "source_depth", "source_time"] for k in channellist: assert k in promiscuous_ts2 for k in sourcelist: - assert k in promiscuous_ts2 - + assert k in promiscuous_ts2 # test 7 # test exclude_keys feature of read_data - above tested save_data @@ -1040,7 +1064,6 @@ def test_save_read_data_timeseries(self): for k in exclude_list: assert k not in exclude_promiscuous_ts2 - # test 8 # This test saves a datum with an invalid value in promiscuous # mode. It is then read back cautious and we validate @@ -1049,7 +1072,7 @@ def test_save_read_data_timeseries(self): # add the error as the above changes it. That makes this # test more stable and less confusing than the previous version cautious_ts = TimeSeries(cautious_ts0) - cautious_ts["npts"]="foobar" + cautious_ts["npts"] = "foobar" res_ts = self.db.save_data( cautious_ts, mode="promiscuous", @@ -1064,7 +1087,10 @@ def test_save_read_data_timeseries(self): # previous save has invalid npts which makes it impossible to # reconstruct the datum cautious_ts2 = self.db.read_data( - res_ts["_id"], mode="cautious", normalize=["channel", "source"],collection="wf_TimeSeries" + res_ts["_id"], + mode="cautious", + normalize=["channel", "source"], + collection="wf_TimeSeries", ) # This next test could change if the schema changes. 
# currently it issues a complaint for keys history_id, elog_id, and extra1 @@ -1075,61 +1101,76 @@ def test_save_read_data_timeseries(self): assert cautious_ts2_elog[1].badness == ErrorSeverity.Invalid assert "data" not in cautious_ts2 - assert cautious_ts2['is_abortion'] - + assert cautious_ts2["is_abortion"] + # test 9 # Now do the same thing but set npts to a string that can # be converted to an int - note 255 is magic from the seis generator cautious_ts = TimeSeries(cautious_ts0) cautious_ts.put_string("npts", "255") res_ts = self.db.save_data( - cautious_ts, mode="promiscuous", storage_mode="gridfs",return_data=True,collection="wf_TimeSeries" + cautious_ts, + mode="promiscuous", + storage_mode="gridfs", + return_data=True, + collection="wf_TimeSeries", ) assert res_ts.live assert cautious_ts.live cautious_ts2 = self.db.read_data( - res_ts["_id"], mode="cautious", + res_ts["_id"], + mode="cautious", ) assert cautious_ts2.live assert cautious_ts2["npts"] == 255 - assert not cautious_ts2['is_abortion'] - # in pedantic mode the same read should cause the datum + assert not cautious_ts2["is_abortion"] + # in pedantic mode the same read should cause the datum # to be killed pedantic_ts2 = self.db.read_data( - res_ts["_id"], mode="pedantic",collection="wf_TimeSeries" + res_ts["_id"], mode="pedantic", collection="wf_TimeSeries" ) assert pedantic_ts2.dead() assert pedantic_ts2["is_abortion"] - # test 10 # test save with non exist id under cautious mode - cautious_ts=TimeSeries(cautious_ts0) + cautious_ts = TimeSeries(cautious_ts0) non_exist_id = ObjectId() cautious_ts["_id"] = non_exist_id logging_helper.info(cautious_ts, "3", "save_data") - res_ts = self.db.save_data(cautious_ts, mode="cautious",return_data=True) + res_ts = self.db.save_data(cautious_ts, mode="cautious", return_data=True) assert res_ts.live assert cautious_ts.live assert "_id" in cautious_ts assert not cautious_ts["_id"] == non_exist_id - # Test 11 # Test new normalize method using list of matchers # use both cached and db version to test both - from mspasspy.db.normalize import ObjectIdMatcher,ObjectIdDBMatcher - chankeylist=['_id', 'lat', 'lon', 'elev','starttime','endtime',"hang","vang"] - channel_matcher=ObjectIdMatcher(self.db, - collection='channel', - attributes_to_load=chankeylist, - load_if_defined=['net','sta','loc'], - ) - source_matcher=ObjectIdDBMatcher(self.db, - collection='source', - attributes_to_load=['_id', 'lat', 'lon', 'depth','time'], - load_if_defined=['magnitude'], - ) + from mspasspy.db.normalize import ObjectIdMatcher, ObjectIdDBMatcher + + chankeylist = [ + "_id", + "lat", + "lon", + "elev", + "starttime", + "endtime", + "hang", + "vang", + ] + channel_matcher = ObjectIdMatcher( + self.db, + collection="channel", + attributes_to_load=chankeylist, + load_if_defined=["net", "sta", "loc"], + ) + source_matcher = ObjectIdDBMatcher( + self.db, + collection="source", + attributes_to_load=["_id", "lat", "lon", "depth", "time"], + load_if_defined=["magnitude"], + ) # basic_save_ts_id is defined earlier in this function # is the stock datum saved with defaults in promiscuous mode promiscuous_ts2 = self.db.read_data( @@ -1138,9 +1179,9 @@ def test_save_read_data_timeseries(self): normalize=[channel_matcher, source_matcher], collection="wf_TimeSeries", ) - # this is a looser list than what we used earlier that - # was carried forward in major revision aug 2023. 
- # minor issue - the major point of this is after we + # this is a looser list than what we used earlier that + # was carried forward in major revision aug 2023. + # minor issue - the major point of this is after we # verify these match wf_keys = [ @@ -1170,14 +1211,13 @@ def test_save_read_data_timeseries(self): assert "extra2" not in promiscuous_ts2 # now verify normalization worked correctly - res = self.db["channel"].find_one({"_id": promiscuous_ts["channel_id"]}) for k in chankeylist: - if k=="_id": - k2="channel_id" + if k == "_id": + k2 = "channel_id" else: k2 = "channel_{}".format(k) - assert promiscuous_ts2[k2]==res[k] + assert promiscuous_ts2[k2] == res[k] # TODO: should handle load_if_defined properly res = self.db["source"].find_one({"_id": promiscuous_ts["source_id"]}) @@ -1189,22 +1229,23 @@ def test_save_read_data_timeseries(self): # Necessary to avoid state problems with other tests self.db.drop_collection("cemetery") self.db.drop_collection("abortions") + def test_save_read_data_seismogram(self): """ This method tests various permutations of read and write - of Seismogram data objects. It probably should be broken into - about a dozen different functions but for now think of it - as a series of tests of read_data and write_data and - associated baggage. Instead we have comments that enumerate + of Seismogram data objects. It probably should be broken into + about a dozen different functions but for now think of it + as a series of tests of read_data and write_data and + associated baggage. Instead we have comments that enumerate blocks testing specific features. """ - # Test 0 - nothing to read with a random object id + # Test 0 - nothing to read with a random object id # returns a default constructed dead datum with an elog entry fail_seis = self.db.read_data( ObjectId(), mode="cautious", normalize=["site", "source"] ) assert fail_seis.dead() - assert fail_seis.elog.size()>0 + assert fail_seis.elog.size() > 0 # tests for Seismogram # first create valid simulation data for tests @@ -1214,17 +1255,17 @@ def test_save_read_data_seismogram(self): logging_helper.info(promiscuous_seis0, "1", "deepcopy") logging_helper.info(cautious_seis0, "1", "deepcopy") logging_helper.info(pedantic_seis0, "1", "deepcopy") - # We use the C++ copy constructor here to build working + # We use the C++ copy constructor here to build working # copies. It doesn't alter history so they are pure clones # of 0 version with stage 1 of history "deepcopy" - promiscuous_seis=Seismogram(promiscuous_seis0) + promiscuous_seis = Seismogram(promiscuous_seis0) cautious_seis = Seismogram(cautious_seis0) pedantic_seis = Seismogram(pedantic_seis0) # Test 1 - # initial test of a basic save in promiscuous mode and all + # initial test of a basic save in promiscuous mode and all # else defaulted - promiscuous is actually the default but set - # here for emphasis. Well, actually return_data is needed + # here for emphasis. Well, actually return_data is needed # to validate but default is false res_seis = self.db.save_data( promiscuous_seis, @@ -1232,15 +1273,15 @@ def test_save_read_data_seismogram(self): return_data=True, ) assert res_seis.live - # read it back in and compare the metadata entries. + # read it back in and compare the metadata entries. # they will not match completely because t0 is an alias - # that is converted in all modes to starttime. The original - # promiscuous_seis saved has a "test" key that is empty and + # that is converted in all modes to starttime. 
The original + # promiscuous_seis saved has a "test" key that is empty and # is also automatically dropped in all modes - all spaces # is dogmatically treated as null - res_seis_read = self.db.read_data(res_seis['_id'],collection="wf_Seismogram") - skipkeys=['t0','test', 'is_abortion'] - for k in res_seis: + res_seis_read = self.db.read_data(res_seis["_id"], collection="wf_Seismogram") + skipkeys = ["t0", "test", "is_abortion"] + for k in res_seis: if k not in skipkeys: assert res_seis_read[k] == res_seis[k] # this verifies the alias @@ -1251,12 +1292,12 @@ def test_save_read_data_seismogram(self): assert "is_abortion" in res_seis_read assert not res_seis_read["is_abortion"] # We uses this to retrieve this record for later test - basic_save_seis_id=res_seis['_id'] - - # + basic_save_seis_id = res_seis["_id"] + + # # Test 2 # same as above but add exclude_keys to verify that works - # use a fresh copy of initial datum + # use a fresh copy of initial datum promiscuous_seis = Seismogram(promiscuous_seis0) res_seis = self.db.save_data( promiscuous_seis, @@ -1265,10 +1306,10 @@ def test_save_read_data_seismogram(self): return_data=True, ) assert res_seis.live - # check extra2 was excluded - res_seis_read = self.db.read_data(res_seis['_id'],collection="wf_Seismogram") - skipkeys=['t0','test', 'is_abortion'] - for k in res_seis: + # check extra2 was excluded + res_seis_read = self.db.read_data(res_seis["_id"], collection="wf_Seismogram") + skipkeys = ["t0", "test", "is_abortion"] + for k in res_seis: if k not in skipkeys: assert res_seis_read[k] == res_seis[k] # this verifies the alias @@ -1279,8 +1320,8 @@ def test_save_read_data_seismogram(self): assert "is_abortion" in res_seis_read assert not res_seis_read["is_abortion"] # We uses this to retrieve this record for later test - basic_save_seis_id=res_seis['_id'] - + basic_save_seis_id = res_seis["_id"] + # test 3 # here we validate the history data was saved correctly # this actualy somewhat duplicates tests in test_save_load_history @@ -1309,7 +1350,7 @@ def test_save_read_data_seismogram(self): cautious_seis = Seismogram(cautious_seis0) cautious_seis.put_string("npts", "xyz") res_seis = self.db.save_data( - cautious_seis, mode="cautious", storage_mode="gridfs",return_data=True + cautious_seis, mode="cautious", storage_mode="gridfs", return_data=True ) assert res_seis.dead() # save kills both original and return because data is passed by reference @@ -1320,21 +1361,21 @@ def test_save_read_data_seismogram(self): # for invalid required args cautious and pedantic act the same # here we use sampling_rate wrong instead of npts # i.e. 
this is a comparable test to previous - pedantic_seis=Seismogram(pedantic_seis0) + pedantic_seis = Seismogram(pedantic_seis0) pedantic_seis.put_string("sampling_rate", "xyz") res_seis = self.db.save_data( - pedantic_seis, mode="pedantic", storage_mode="gridfs",return_data=True + pedantic_seis, mode="pedantic", storage_mode="gridfs", return_data=True ) assert res_seis.dead() assert pedantic_seis.dead() # the cemetery should contain two documents from each of the # above saves at this point - n_dead=self.db.cemetery.count_documents({}) + n_dead = self.db.cemetery.count_documents({}) assert n_dead == 2 # test 6 # Valitdate normalziation works correctly - # the following uses the id of the basic save done near the + # the following uses the id of the basic save done near the # top of this test function # in this mode the retrieved @@ -1345,24 +1386,28 @@ def test_save_read_data_seismogram(self): normalize=["site", "source"], collection="wf_Seismogram", ) - assert '_id' in promiscuous_seis2 + assert "_id" in promiscuous_seis2 # 2 should have the same metadata as original plus _id # hence loop is driven by datum prior to being saved # 2 should also contain the normalizatoin data which we test after this loop - skipkeys=['t0','test', 'is_abortion'] + skipkeys = ["t0", "test", "is_abortion"] for k in promiscuous_seis: if k not in skipkeys: assert promiscuous_seis[k] == promiscuous_seis2[k] assert not promiscuous_seis2["is_abortion"] # normalize should set these - sitelist=['site_lat', 'site_lon', 'site_elev', 'site_starttime', - 'site_endtime'] - sourcelist=['source_lat', 'source_lon', 'source_depth', 'source_time'] + sitelist = [ + "site_lat", + "site_lon", + "site_elev", + "site_starttime", + "site_endtime", + ] + sourcelist = ["source_lat", "source_lon", "source_depth", "source_time"] for k in sitelist: assert k in promiscuous_seis2 for k in sourcelist: - assert k in promiscuous_seis2 - + assert k in promiscuous_seis2 # test 7 # test exclude_keys feature of read_data - above tested save_data @@ -1377,7 +1422,6 @@ def test_save_read_data_seismogram(self): for k in exclude_list: assert k not in exclude_promiscuous_seis2 - # test 8 # This test saves a datum with an invalid value in promiscuous # mode. It is then read back cautious and we validate @@ -1386,7 +1430,7 @@ def test_save_read_data_seismogram(self): # add the error as the above changes it. That makes this # test more stable and less confusing than the previous version cautious_seis = Seismogram(cautious_seis0) - cautious_seis["npts"]="foobar" + cautious_seis["npts"] = "foobar" res_seis = self.db.save_data( cautious_seis, mode="promiscuous", @@ -1401,7 +1445,10 @@ def test_save_read_data_seismogram(self): # previous save has invalid npts which makes it impossible to # reconstruct the datum cautious_seis2 = self.db.read_data( - res_seis["_id"], mode="cautious", normalize=["site", "source"],collection="wf_Seismogram" + res_seis["_id"], + mode="cautious", + normalize=["site", "source"], + collection="wf_Seismogram", ) # This next test could change if the schema changes. 
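# Illustrative sketch, not part of the patch: how the read modes react to a
# required attribute stored with the wrong type, as tests 8 and 9 above
# demonstrate.  Assumes `db` is a mspasspy Database handle and `wfid` the
# ObjectId of a wf_Seismogram document saved in promiscuous mode.
#   - promiscuous: the document is loaded as-is with no type enforcement
#   - cautious: a convertible value (e.g. npts == "255") is repaired;
#     an unconvertible one (e.g. npts == "foobar") yields a dead abortion
#   - pedantic: a type mismatch on a required key kills the datum
d = db.read_data(wfid, mode="cautious", collection="wf_Seismogram")
if d.dead():
    assert d["is_abortion"]
    assert d.elog.size() > 0  # the reason is recorded in the error log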
# currently it issues a complaint for keys history_id, elog_id, and extra1 @@ -1412,60 +1459,66 @@ def test_save_read_data_seismogram(self): assert cautious_seis2_elog[1].badness == ErrorSeverity.Invalid assert "data" not in cautious_seis2 - assert cautious_seis2['is_abortion'] - + assert cautious_seis2["is_abortion"] + # test 9 # Now do the same thing but set npts to a string that can # be converted to an int - note 255 is magic from the seis generator cautious_seis = Seismogram(cautious_seis0) cautious_seis.put_string("npts", "255") res_seis = self.db.save_data( - cautious_seis, mode="promiscuous", storage_mode="gridfs",return_data=True,collection="wf_Seismogram" + cautious_seis, + mode="promiscuous", + storage_mode="gridfs", + return_data=True, + collection="wf_Seismogram", ) assert res_seis.live assert cautious_seis.live cautious_seis2 = self.db.read_data( - res_seis["_id"], mode="cautious", + res_seis["_id"], + mode="cautious", ) assert cautious_seis2.live assert cautious_seis2["npts"] == 255 - assert not cautious_seis2['is_abortion'] - # in pedantic mode the same read should cause the datum + assert not cautious_seis2["is_abortion"] + # in pedantic mode the same read should cause the datum # to be killed pedantic_seis2 = self.db.read_data( - res_seis["_id"], mode="pedantic",collection="wf_Seismogram" + res_seis["_id"], mode="pedantic", collection="wf_Seismogram" ) assert pedantic_seis2.dead() assert pedantic_seis2["is_abortion"] - # test 10 # test save with non exist id under cautious mode - cautious_seis=Seismogram(cautious_seis0) + cautious_seis = Seismogram(cautious_seis0) non_exist_id = ObjectId() cautious_seis["_id"] = non_exist_id logging_helper.info(cautious_seis, "3", "save_data") - res_seis = self.db.save_data(cautious_seis, mode="cautious",return_data=True) + res_seis = self.db.save_data(cautious_seis, mode="cautious", return_data=True) assert res_seis.live assert cautious_seis.live assert "_id" in cautious_seis assert not cautious_seis["_id"] == non_exist_id - # Test 11 # Test new normalize method using list of matchers # use both cached and db version to test both - from mspasspy.db.normalize import ObjectIdMatcher,ObjectIdDBMatcher - site_matcher=ObjectIdMatcher(self.db, - collection='site', - attributes_to_load=['_id', 'lat', 'lon', 'elev','starttime','endtime'], - load_if_defined=['net','sta','loc'], - ) - source_matcher=ObjectIdDBMatcher(self.db, - collection='source', - attributes_to_load=['_id', 'lat', 'lon', 'depth','time'], - load_if_defined=['magnitude'], - ) + from mspasspy.db.normalize import ObjectIdMatcher, ObjectIdDBMatcher + + site_matcher = ObjectIdMatcher( + self.db, + collection="site", + attributes_to_load=["_id", "lat", "lon", "elev", "starttime", "endtime"], + load_if_defined=["net", "sta", "loc"], + ) + source_matcher = ObjectIdDBMatcher( + self.db, + collection="source", + attributes_to_load=["_id", "lat", "lon", "depth", "time"], + load_if_defined=["magnitude"], + ) # basic_save_seis_id is defined earlier in this function # is the stock datum saved with defaults in promiscuous mode promiscuous_seis2 = self.db.read_data( @@ -1474,9 +1527,9 @@ def test_save_read_data_seismogram(self): normalize=[site_matcher, source_matcher], collection="wf_Seismogram", ) - # this is a looser list than what we used earlier that - # was carried forward in major revision aug 2023. - # minor issue - the major point of this is after we + # this is a looser list than what we used earlier that + # was carried forward in major revision aug 2023. 
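# Illustrative sketch, not part of the patch: read_data accepts either
# collection names or matcher instances in the normalize list, the two forms
# exercised above.  The variable names `db` and `wfid` are assumptions.
from mspasspy.db.normalize import ObjectIdMatcher

site_matcher = ObjectIdMatcher(
    db,
    collection="site",
    attributes_to_load=["_id", "lat", "lon", "elev", "starttime", "endtime"],
    load_if_defined=["net", "sta", "loc"],
)
# name-based form used by the earlier tests
d1 = db.read_data(wfid, normalize=["site", "source"], collection="wf_Seismogram")
# matcher-based form introduced by the newer normalize interface
d2 = db.read_data(wfid, normalize=[site_matcher], collection="wf_Seismogram")
# loaded attributes appear with collection-prefixed keys, e.g. "site_lat"
assert "site_lat" in d1 and "site_lat" in d2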
+ # minor issue - the major point of this is after we # verify these match wf_keys = [ @@ -1527,7 +1580,6 @@ def test_save_read_data_seismogram(self): self.db.drop_collection("cemetery") self.db.drop_collection("abortions") - def test_index_mseed_file(self): dir = "python/tests/data/" dfile = "3channels.mseed" @@ -1546,7 +1598,11 @@ def test_delete_wf(self): ts = copy.deepcopy(self.test_ts) logging_helper.info(ts, "1", "deepcopy") save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) assert save_res.live @@ -1571,17 +1627,13 @@ def test_delete_wf(self): gfsh = gridfs.GridFS(self.db) assert gfsh.exists(res["gridfs_id"]) # insert a dummy elog document with wf_TimeSeries_id equals to ts['_id'] - self.db["elog"].insert_one( - {"_id": ObjectId(), "wf_TimeSeries_id": ts["_id"]} - ) + self.db["elog"].insert_one({"_id": ObjectId(), "wf_TimeSeries_id": ts["_id"]}) # grid_fs delete(clear_history, clear_elog) self.db.delete_data(ts["_id"], "TimeSeries") assert not self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) assert not gfsh.exists(res["gridfs_id"]) - assert not self.db["history_object"].find_one( - {"_id": res["history_object_id"]} - ) + assert not self.db["history_object"].find_one({"_id": res["history_object_id"]}) assert not self.db["elog"].find_one({"_id": res["elog_id"]}) assert self.db["elog"].count_documents({"wf_TimeSeries_id": ts["_id"]}) == 0 @@ -1602,9 +1654,7 @@ def test_delete_wf(self): assert save_res.live self.db.delete_data(ts["_id"], "TimeSeries") assert not self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - assert not self.db["history_object"].find_one( - {"_id": res["history_object_id"]} - ) + assert not self.db["history_object"].find_one({"_id": res["history_object_id"]}) assert not self.db["elog"].find_one({"_id": res["elog_id"]}) assert self.db["elog"].count_documents({"wf_TimeSeries_id": ts["_id"]}) == 0 # file still exists @@ -1639,9 +1689,7 @@ def test_delete_wf(self): assert save_res.live self.db.delete_data(ts["_id"], "TimeSeries", remove_unreferenced_files=True) assert not self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) - assert not self.db["history_object"].find_one( - {"_id": res["history_object_id"]} - ) + assert not self.db["history_object"].find_one({"_id": res["history_object_id"]}) assert not self.db["elog"].find_one({"_id": res["elog_id"]}) assert self.db["elog"].count_documents({"wf_TimeSeries_id": ts["_id"]}) == 0 # file still exists, because another wf doc is using it @@ -1650,17 +1698,13 @@ def test_delete_wf(self): res2 = self.db["wf_TimeSeries"].find_one({"_id": ts2["_id"]}) assert save_res2.live - self.db.delete_data( - ts2["_id"], "TimeSeries", remove_unreferenced_files=True - ) + self.db.delete_data(ts2["_id"], "TimeSeries", remove_unreferenced_files=True) assert not self.db["wf_TimeSeries"].find_one({"_id": ts2["_id"]}) assert not self.db["history_object"].find_one( {"_id": res2["history_object_id"]} ) assert not self.db["elog"].find_one({"_id": res2["elog_id"]}) - assert ( - self.db["elog"].count_documents({"wf_TimeSeries_id": ts2["_id"]}) == 0 - ) + assert self.db["elog"].count_documents({"wf_TimeSeries_id": ts2["_id"]}) == 0 # file not exists fname = os.path.join(res2["dir"], res2["dfile"]) assert not os.path.exists(fname) @@ -1670,9 +1714,7 @@ def test_clean_collection(self): 
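# Illustrative sketch, not part of the patch: the delete_data cascade that
# test_delete_wf above verifies.  Deleting a waveform document also removes
# the associated gridfs payload (or, optionally, a file no other wf document
# references) plus the linked history_object and elog documents.  Assumes
# `db` is a Database handle and `ts` a previously saved TimeSeries.
wfid = ts["_id"]
doc = db["wf_TimeSeries"].find_one({"_id": wfid})
db.delete_data(wfid, "TimeSeries", remove_unreferenced_files=True)
assert db["wf_TimeSeries"].find_one({"_id": wfid}) is None
assert db["history_object"].find_one({"_id": doc["history_object_id"]}) is None
assert db["elog"].count_documents({"wf_TimeSeries_id": wfid}) == 0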
self.db["wf_TimeSeries"].delete_many({}) # test non exist document in the database - fixed_cnt = self.db.clean_collection( - "wf_TimeSeries", query={"_id": ObjectId()} - ) + fixed_cnt = self.db.clean_collection("wf_TimeSeries", query={"_id": ObjectId()}) assert not fixed_cnt # test fixed_out @@ -1688,11 +1730,19 @@ def test_clean_collection(self): ts2["starttime"] = "123" save_res = self.db.save_data( - ts1, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts1, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live save_res = self.db.save_data( - ts2, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts2, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live @@ -1707,7 +1757,6 @@ def test_clean(self, capfd): # from previous tests self.db.drop_collection("wf_TimeSeries") - self.db.database_schema.set_default("wf_TimeSeries", "wf") ts = copy.deepcopy(self.test_ts) logging_helper.info(ts, "1", "deepcopy") @@ -1730,18 +1779,22 @@ def test_clean(self, capfd): self.db.clean(ObjectId(), required_xref_list="123") # Test 2: - # erase a required field in TimeSeries. In promiscuous mode that + # erase a required field in TimeSeries. In promiscuous mode that # will work but produce a wf cocument that is invalid # later tests are to handle that problem datum ts.erase("npts") ts["starttime_shift"] = 1.0 save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live assert ts.live # save this for later test - bad_datum_id = save_res['_id'] + bad_datum_id = save_res["_id"] # test nonexist document nonexist_id = ObjectId() @@ -1777,18 +1830,22 @@ def test_clean(self, capfd): ) ) - # Test 3: + # Test 3: # test check_xref and delete required xref_keys missing document ts = copy.deepcopy(self.test_ts) logging_helper.info(ts, "1", "deepcopy") ts["starttime_shift"] = 1.0 ts.erase("site_id") save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live # The document with this id has the missing site_id value - bad_xref_wfid = save_res['_id'] + bad_xref_wfid = save_res["_id"] fixes_cnt = self.db.clean( bad_xref_wfid, verbose=True, @@ -1816,7 +1873,11 @@ def test_clean(self, capfd): ts["npts"] = "123" ts["starttime_shift"] = 1.0 save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live fixes_cnt = self.db.clean(ts["_id"], verbose=True) @@ -1841,7 +1902,11 @@ def test_clean(self, capfd): ts["npts"] = "xyz" ts["starttime_shift"] = 1.0 save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) fixes_cnt = self.db.clean(ts["_id"], verbose=True) res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) @@ -1886,7 +1951,11 @@ def test_clean(self, capfd): logging_helper.info(ts, "1", "deepcopy") ts["starttime_shift"] = 1.0 
save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) @@ -1905,7 +1974,11 @@ def test_clean(self, capfd): logging_helper.info(ts, "1", "deepcopy") ts["starttime_shift"] = 1.0 save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) @@ -1949,7 +2022,11 @@ def test_verify(self): # mismatch type ts["delta"] = "123" save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) @@ -1977,9 +2054,7 @@ def test_verify(self): and "xref" in problematic_keys["site_id"] and "type" in problematic_keys["site_id"] ) - assert "npts" in problematic_keys and problematic_keys["npts"] == [ - "undefined" - ] + assert "npts" in problematic_keys and problematic_keys["npts"] == ["undefined"] assert "delta" in problematic_keys and problematic_keys["delta"] == ["type"] def test_check_xref_key(self): @@ -2057,9 +2132,7 @@ def test_check_undefined_keys(self): return_data=True, ) assert save_res.live - self.db["wf_TimeSeries"].update_one( - {"_id": ts["_id"]}, {"$set": {"t0": 1.0}} - ) + self.db["wf_TimeSeries"].update_one({"_id": ts["_id"]}, {"$set": {"t0": 1.0}}) res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) assert "starttime" not in res assert "npts" not in res @@ -2097,7 +2170,11 @@ def test_delete_attributes(self): logging_helper.info(ts, "1", "deepcopy") ts["starttime_shift"] = 1.0 save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"],return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) @@ -2112,9 +2189,7 @@ def test_delete_attributes(self): } res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) assert ( - not "delta" in res - and not "sampling_rate" in res - and not "starttime" in res + not "delta" in res and not "sampling_rate" in res and not "starttime" in res ) def test_rename_attributes(self): @@ -2124,7 +2199,11 @@ def test_rename_attributes(self): logging_helper.info(ts, "1", "deepcopy") ts["starttime_shift"] = 1.0 save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) @@ -2143,9 +2222,7 @@ def test_rename_attributes(self): } res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) assert ( - not "delta" in res - and not "sampling_rate" in res - and not "starttime" in res + not "delta" in res and not "sampling_rate" in res and not "starttime" in res ) assert "dt" in res and "sr" in res and "st" in res assert ( @@ -2163,7 +2240,11 @@ def test_fix_attribute_types(self): ts["delta"] = "123" ts["sampling_rate"] = "123" 
save_res = self.db.save_data( - ts, mode="promiscuous", storage_mode="gridfs", exclude_keys=["extra2"], return_data=True + ts, + mode="promiscuous", + storage_mode="gridfs", + exclude_keys=["extra2"], + return_data=True, ) assert save_res.live res = self.db["wf_TimeSeries"].find_one({"_id": ts["_id"]}) @@ -2285,9 +2366,7 @@ def test_check_links(self): xref_key=["site_id", "source_id"], collection="wf" ) assert len(bad_id_list) == 2 - assert set(bad_id_list) == set( - [bad_site_id_ts["_id"], bad_source_id_ts["_id"]] - ) + assert set(bad_id_list) == set([bad_site_id_ts["_id"], bad_source_id_ts["_id"]]) assert len(missing_id_list) == 1 assert missing_id_list == [missing_site_id_ts["_id"]] @@ -2413,7 +2492,7 @@ def test_update_ensemble_metadata(self): logging_helper.info(ts1, "1", "deepcopy") logging_helper.info(ts2, "1", "deepcopy") logging_helper.info(ts3, "1", "deepcopy") - self.db.save_data(ts1, storage_mode="gridfs",return_data=True) + self.db.save_data(ts1, storage_mode="gridfs", return_data=True) self.db.save_data(ts2, storage_mode="gridfs", return_data=True) self.db.save_data(ts3, storage_mode="gridfs", return_data=True) @@ -2421,7 +2500,7 @@ def test_update_ensemble_metadata(self): ts1.t0 = time ts1["tst"] = time ts2.t0 = time - ts3.t0 = time+5.0 + ts3.t0 = time + 5.0 ts_ensemble = TimeSeriesEnsemble() ts_ensemble.member.append(ts1) ts_ensemble.member.append(ts2) @@ -2433,18 +2512,18 @@ def test_update_ensemble_metadata(self): logging_helper.info(ts_ensemble.member[2], "2", "update_data") # Test this section is a temporary to see if save_ensemble_data # resolves the _id problem: - res=self.db.save_data(ts_ensemble,return_data=True) + res = self.db.save_data(ts_ensemble, return_data=True) # This test needs to be moved and/or changed. It is failing with # and error that says it needs the _id to do an update. # I'm commenting out the next 3 asserts because they will fail until # that is resolved - self.db.update_ensemble_metadata(ts_ensemble, mode='promiscuous') - doc = self.db['wf_TimeSeries'].find_one({'_id': res.member[0]['_id']}) - assert doc['starttime'] == time - doc = self.db['wf_TimeSeries'].find_one({'_id': res.member[1]['_id']}) - assert doc['starttime'] == time - doc = self.db['wf_TimeSeries'].find_one({'_id': res.member[2]['_id']}) - assert doc['starttime'] != time + self.db.update_ensemble_metadata(ts_ensemble, mode="promiscuous") + doc = self.db["wf_TimeSeries"].find_one({"_id": res.member[0]["_id"]}) + assert doc["starttime"] == time + doc = self.db["wf_TimeSeries"].find_one({"_id": res.member[1]["_id"]}) + assert doc["starttime"] == time + doc = self.db["wf_TimeSeries"].find_one({"_id": res.member[2]["_id"]}) + assert doc["starttime"] != time time_new = datetime.utcnow().timestamp() ts_ensemble.member[0]["tst"] = time + 1 @@ -2516,15 +2595,15 @@ def test_update_ensemble_metadata(self): def test_save_ensemble_data(self): """ - Tests writer for ensembles. In v1 this was done with - different methods. From v2 forward the recommended use is - to run with save_data. These tests were modified to make - that change Sept 2023. Also cleaned up some internal - state dependencies in earlier version that carried data forward - inside this function in confusing ways that caused maintenance - issues. - - Note retained compatibility tests for old api. If and when + Tests writer for ensembles. In v1 this was done with + different methods. From v2 forward the recommended use is + to run with save_data. These tests were modified to make + that change Sept 2023. 
Also cleaned up some internal + state dependencies in earlier version that carried data forward + inside this function in confusing ways that caused maintenance + issues. + + Note retained compatibility tests for old api. If and when those are completely deprecated those sections will need to be changed. """ @@ -2563,7 +2642,7 @@ def test_save_ensemble_data(self): dir_list=dir_list, exclude_objects=[1], ) - assert len(ts_ensemble.member)==2 + assert len(ts_ensemble.member) == 2 # Test atomic read of members just written # Note exclude_objects now deletes data excluded from member vector # so we don't get here without passing the assert immediately above @@ -2577,7 +2656,6 @@ def test_save_ensemble_data(self): assert res.live assert np.isclose(ts_ensemble.member[i].data, res.data).all() - # Repeat same test as immediately above using gridfs storage mode # and adding a second history node ts_ensemble = copy.deepcopy(ts_ensemble0) @@ -2595,10 +2673,10 @@ def test_save_ensemble_data(self): self.db.database_schema.set_default("wf_TimeSeries", "wf") for i in range(len(ts_ensemble.member)): res = self.db.read_data( - ts_ensemble.member[i]["_id"], - mode="promiscuous", - normalize=["site", "source", "channel"], - ) + ts_ensemble.member[i]["_id"], + mode="promiscuous", + normalize=["site", "source", "channel"], + ) assert res.live assert np.isclose(ts_ensemble.member[i].data, res.data).all() @@ -2615,20 +2693,19 @@ def test_save_ensemble_data(self): return_data=True, ) assert ts_ensemble.live - assert len(ts_ensemble.member)==3 + assert len(ts_ensemble.member) == 3 self.db.database_schema.set_default("wf_TimeSeries", "wf") # using atomic reader for now - ensemble reader is tested # in test_read_ensemble_data for i in range(len(ts_ensemble.member)): res = self.db.read_data( - ts_ensemble.member[i]["_id"], - mode="promiscuous", - normalize=["site", "source", "channel"], - ) + ts_ensemble.member[i]["_id"], + mode="promiscuous", + normalize=["site", "source", "channel"], + ) assert res.live assert np.isclose(ts_ensemble.member[i].data, res.data).all() - # Repeat same test as immediately above using gridfs storage mode # and adding a second history node ts_ensemble = copy.deepcopy(ts_ensemble0) @@ -2641,16 +2718,16 @@ def test_save_ensemble_data(self): storage_mode="gridfs", return_data=True, ) - assert len(ts_ensemble.member)==3 + assert len(ts_ensemble.member) == 3 self.db.database_schema.set_default("wf_TimeSeries", "wf") # using atomic reader for now - ensemble reader is tested # in test_read_ensemble_data for i in range(len(ts_ensemble.member)): res = self.db.read_data( - ts_ensemble.member[i]["_id"], - mode="promiscuous", - normalize=["site", "source", "channel"], - ) + ts_ensemble.member[i]["_id"], + mode="promiscuous", + normalize=["site", "source", "channel"], + ) assert res.live assert np.isclose(ts_ensemble.member[i].data, res.data).all() @@ -2688,17 +2765,17 @@ def test_save_ensemble_data(self): dir_list=dir_list, exclude_objects=[1], ) - assert len(seis_ensemble.member)==2 + assert len(seis_ensemble.member) == 2 # Test atomic read of members just written # Note exclude_objects now deletes exlcuded from member vector # so we don't get here without passing the assert immediately above for i in range(len(seis_ensemble.member)): res = self.db.read_data( - seis_ensemble.member[i]["_id"], - mode="promiscuous", - normalize=["site", "source"], - collection="wf_Seismogram", - ) + seis_ensemble.member[i]["_id"], + mode="promiscuous", + normalize=["site", "source"], + collection="wf_Seismogram", + ) 
assert res.live assert np.isclose(seis_ensemble.member[i].data, res.data).all() @@ -2739,7 +2816,7 @@ def test_save_ensemble_data(self): return_data=True, ) assert seis_ensemble.live - assert len(seis_ensemble.member)==3 + assert len(seis_ensemble.member) == 3 self.db.database_schema.set_default("wf_Seismogram", "wf") # using atomic reader for now - ensemble reader is tested # in test_read_ensemble_data @@ -2752,7 +2829,6 @@ def test_save_ensemble_data(self): assert res.live assert np.isclose(seis_ensemble.member[i].data, res.data).all() - # Repeat same test as immediately above using gridfs storage mode # and adding a second history node seis_ensemble = copy.deepcopy(seis_ensemble0) @@ -2765,28 +2841,27 @@ def test_save_ensemble_data(self): storage_mode="gridfs", return_data=True, ) - assert len(seis_ensemble.member)==3 + assert len(seis_ensemble.member) == 3 self.db.database_schema.set_default("wf_Seismogram", "wf") # using atomic reader for now - ensemble reader is tested # in test_read_ensemble_data for i in range(len(seis_ensemble.member)): res = self.db.read_data( - seis_ensemble.member[i]["_id"], - mode="promiscuous", - normalize=["site", "source"], + seis_ensemble.member[i]["_id"], + mode="promiscuous", + normalize=["site", "source"], ) assert res.live assert np.isclose(seis_ensemble.member[i].data, res.data).all() - def test_read_ensemble_data(self): """ - Test function for reading ensembles. This file has multiple - tests for different features. It currently contains - legacy tests of methods that may be depcrecated in the - future when only read_data and read_distributed_data + Test function for reading ensembles. This file has multiple + tests for different features. It currently contains + legacy tests of methods that may be depcrecated in the + future when only read_data and read_distributed_data will be the mspass readers. Be warned this set of tests - will need to be altered when and if the old ensmeble + will need to be altered when and if the old ensmeble functions are removed. """ self.db.drop_collection("wf_TimeSeries") @@ -2803,15 +2878,15 @@ def test_read_ensemble_data(self): ts_ensemble.member.append(ts2) ts_ensemble.member.append(ts3) ts_ensemble.set_live() - + self.db.database_schema.set_default("wf_TimeSeries", "wf") - # as a legacy function wrapper for this method sets optional - # return_data True. + # as a legacy function wrapper for this method sets optional + # return_data True. ensemble_saved = self.db.save_ensemble_data( ts_ensemble, mode="promiscuous", storage_mode="gridfs" ) # Test legacy reader with cursor - # this works because there is only one previous write to + # this works because there is only one previous write to # wf_TimeSeries cursor = self.db.wf_TimeSeries.find({}) res = self.db.read_ensemble_data( @@ -2826,16 +2901,14 @@ def test_read_ensemble_data(self): # test ensemble_metadata ts_ensemble_metadata = Metadata(res) assert ( - "key1" in ts_ensemble_metadata - and ts_ensemble_metadata["key1"] == "value1" + "key1" in ts_ensemble_metadata and ts_ensemble_metadata["key1"] == "value1" ) assert ( - "key2" in ts_ensemble_metadata - and ts_ensemble_metadata["key2"] == "value2" + "key2" in ts_ensemble_metadata and ts_ensemble_metadata["key2"] == "value2" ) - # test legacy use of list of ObjectIds. + # test legacy use of list of ObjectIds. 
# That should now generate a TypeError exception we test here - with pytest.raises(TypeError,match="for arg0"): + with pytest.raises(TypeError, match="for arg0"): res = self.db.read_ensemble_data( [ ensemble_saved.member[0]["_id"], @@ -2849,7 +2922,7 @@ def test_read_ensemble_data(self): # test read_data method with cursor which is the read_data signal for ensemble # hence we use read_data not the legacy function - # With current implementation this is identical to call + # With current implementation this is identical to call # above with read_ensmble_data cursor = self.db["wf_TimeSeries"].find({}) res = self.db.read_data( @@ -2865,15 +2938,13 @@ def test_read_ensemble_data(self): # test ensemble_metadata ts_ensemble_metadata = Metadata(res) assert ( - "key1" in ts_ensemble_metadata - and ts_ensemble_metadata["key1"] == "value1" + "key1" in ts_ensemble_metadata and ts_ensemble_metadata["key1"] == "value1" ) assert ( - "key2" in ts_ensemble_metadata - and ts_ensemble_metadata["key2"] == "value2" + "key2" in ts_ensemble_metadata and ts_ensemble_metadata["key2"] == "value2" ) - # repeat for Seismogram - these tests are a near copy + # repeat for Seismogram - these tests are a near copy # of above for TimeSeries seis1 = copy.deepcopy(self.test_seis) seis2 = copy.deepcopy(self.test_seis) @@ -2887,13 +2958,13 @@ def test_read_ensemble_data(self): seis_ensemble.member.append(seis3) seis_ensemble.set_live() self.db.database_schema.set_default("wf_Seismogram", "wf") - # as a legacy function wrapper for this method sets optional - # return_data True. + # as a legacy function wrapper for this method sets optional + # return_data True. ensemble_saved = self.db.save_ensemble_data( seis_ensemble, mode="promiscuous", storage_mode="gridfs" ) # Test legacy reader with cursor - # this works because there is only one previous write to + # this works because there is only one previous write to # wf_TimeSeries cursor = self.db.wf_Seismogram.find({}) res = self.db.read_ensemble_data( @@ -2915,9 +2986,9 @@ def test_read_ensemble_data(self): "key2" in seis_ensemble_metadata and seis_ensemble_metadata["key2"] == "value2" ) - # test legacy use of list of ObjectIds. + # test legacy use of list of ObjectIds. 
# That should now generate a TypeError exception we test here - with pytest.raises(TypeError,match="for arg0"): + with pytest.raises(TypeError, match="for arg0"): res = self.db.read_ensemble_data( [ ensemble_saved.member[0]["_id"], @@ -2931,7 +3002,7 @@ def test_read_ensemble_data(self): # test read_data method with cursor which is the read_data signal for ensemble # hence we use read_data not the legacy function - # With current implementation this is identical to call + # With current implementation this is identical to call # above with read_ensmble_data cursor = self.db["wf_Seismogram"].find({}) res = self.db.read_data( @@ -2954,8 +3025,6 @@ def test_read_ensemble_data(self): "key2" in seis_ensemble_metadata and seis_ensemble_metadata["key2"] == "value2" ) - - def test_get_response(self): inv = obspy.read_inventory("python/tests/data/TA.035A.xml") @@ -2969,13 +3038,11 @@ def test_get_response(self): assert r == r0 with pytest.raises(MsPASSError, match="missing one of required arguments"): self.db.get_response() - assert ( - self.db.get_response(net="TA", sta="036A", chan="BHE", time=time) - is None - ) + assert self.db.get_response(net="TA", sta="036A", chan="BHE", time=time) is None def teardown_class(self): import glob + try: os.remove("python/tests/data/test_db_output") os.remove("python/tests/data/test_mseed_output") @@ -2983,10 +3050,10 @@ def teardown_class(self): pass client = DBClient("localhost") client.drop_database("dbtest") - # version 2 tests have random file name creation we handle this - # way with glob - filelist=glob.glob("./*-binary") # binary format writes to undefine dfile - filelist2=glob.glob("./*.ms") # mseed writes to undefined to here + # version 2 tests have random file name creation we handle this + # way with glob + filelist = glob.glob("./*-binary") # binary format writes to undefine dfile + filelist2 = glob.glob("./*.ms") # mseed writes to undefined to here try: for f in filelist: os.remove(f) @@ -3120,9 +3187,7 @@ def test_index_and_read_s3_continuous(self): src_mseed_doc["storage_mode"] = "s3_continuous" src_mseed_doc["format"] = "mseed" - mseed_upload_key = ( - "continuous_waveforms/2017/2017_005/CICAC__HNZ___2017005.ms" - ) + mseed_upload_key = "continuous_waveforms/2017/2017_005/CICAC__HNZ___2017005.ms" s3_client.upload_file( Filename=mseed_path, Bucket=src_bucket, Key=mseed_upload_key ) @@ -3235,9 +3300,7 @@ def mock_make_api_call(self, operation_name, kwarg): self, operation_name, kwarg ) - with patch( - "botocore.client.BaseClient._make_api_call", new=mock_make_api_call - ): + with patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call): ts = self.db._download_windowed_mseed_file( "fake_access_key", "fake_secret_key", @@ -3270,9 +3333,7 @@ def mock_make_api_call(self, operation_name, kwarg): ts = TimeSeries(src_mseed_doc, np.ndarray([0], dtype=np.float64)) ts.npts = src_mseed_doc["npts"] - with patch( - "botocore.client.BaseClient._make_api_call", new=mock_make_api_call - ): + with patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call): self.db._read_data_from_s3_lambda( mspass_object=ts, aws_access_key_id="fake_access_key",