#!/usr/bin/env python
"""
Python implementation of the Adaptable Seismic Data Format (ASDF).
:copyright:
Lion Krischer ([email protected]), 2013-2021
:license:
BSD 3-Clause ("BSD New" or "BSD Simplified")
"""
# Import ObsPy first, as importing h5py first on some machines will
# sometimes reset paths and lxml can no longer be loaded afterwards...
import obspy
import collections
import copy
import io
import itertools
import math
import multiprocessing
import os
import re
import sys
import time
import traceback
import uuid
import warnings
import dill
import h5py
import lxml.etree
import numpy as np
import packaging.version
import prov
import prov.model
from .exceptions import (
ASDFException,
ASDFWarning,
ASDFValueError,
NoStationXMLForStation,
)
from .header import (
COMPRESSIONS,
FORMAT_NAME,
SUPPORTED_FORMAT_VERSIONS,
MSG_TAGS,
MAX_MEMORY_PER_WORKER_IN_MB,
POISON_PILL,
PROV_FILENAME_REGEX,
TAG_REGEX,
VALID_SEISMOGRAM_DTYPES,
AUXILIARY_DATA_PATH_PART_PATTERN,
)
from .query import Query, merge_query_functions
from .utils import (
is_mpi_env,
StationAccessor,
sizeof_fmt,
ReceivedMessage,
pretty_receiver_log,
pretty_sender_log,
JobQueueHelper,
StreamBuffer,
AuxiliaryDataGroupAccessor,
AuxiliaryDataContainer,
ProvenanceAccessor,
split_qualified_name,
_read_string_array,
FilteredWaveformAccessor,
label2string,
labelstring2list,
AuxiliaryDataAccessor,
wf_name2tag,
to_list_of_resource_identifiers,
)
from .inventory_utils import isolate_and_merge_station, merge_inventories
# Always raise ASDF warnings as they are important for the user.
warnings.simplefilter("always", ASDFWarning)
class ASDFDataSet:
"""
Object dealing with Adaptable Seismic Data Format (ASDF).
Central object of this Python package.
"""
q = Query()
def __init__(
self,
filename,
compression="gzip-3",
shuffle=True,
debug=False,
mpi=None,
mode="a",
single_item_read_limit_in_mb=4096.0,
format_version=None,
):
"""
:type filename: str
:param filename: The filename of the HDF5 file (to be).
:type compression: str
:param compression: The compression to use. Defaults to
``"gzip-3"`` which yielded good results in the past. Will
only be applied to newly created data sets. Existing ones are not
touched. Using parallel I/O will also disable compression as it
is not possible to use both at the same time.
**Available compression choices (all of them are lossless):**
* ``None``: No compression
* ``"gzip-0"`` - ``"gzip-9"``: Gzip compression level 0 (worst
but fast) to 9 (best but slow)
* ``"lzf"``: LZF compression
* ``"szip-ec-8"``: szip compression
* ``"szip-ec-10"``: szip compression
* ``"szip-nn-8"``: szip compression
* ``"szip-nn-10"``: szip compression
:type shuffle: bool
:param shuffle: Turn the shuffle filter on or off. Turning it on
oftentimes increases the compression ratio at the expense of
some speed.
:type debug: bool
:param debug: If True, print debug messages. Potentially very verbose.
:param mpi: Force MPI on/off. Don't touch this unless you have a
reason.
:type mpi: bool
:param mode: The mode the file is opened in. Passed to the
underlying :class:`h5py.File` constructor. pyasdf expects to be
able to write to files for many operations so it might result in
strange errors if a read-only mode is used. Nonetheless this is
quite useful for some use cases as long as one is aware of the
potential repercussions.
:type mode: str
:type single_item_read_limit_in_mb: float
:param single_item_read_limit_in_mb: This limits the amount of waveform
data that can be read with a simple attribute or dictionary
access. Some ASDF files can get very big and this raises an
error if one tries to access more than the specified value. This
is mainly to guard against accidentally filling one's memory on
the interactive command line when just exploring an ASDF data
set. There are other ways to still access data and even this
setting can be overridden. This check will only be triggered when
the file is bigger than this limit in the first place, as the check
is fairly expensive.
:type format_version: str
:param format_version: The version of ASDF to use. If not given,
the most recent version (currently 1.0.3) will be used if the
ASDF file does not yet exist, or the version specified in the file.
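.. rubric:: Example
A minimal sketch of opening a data set; the file name ``example.h5``
and the chosen keyword values are purely illustrative.
>>> import pyasdf
>>> ds = pyasdf.ASDFDataSet("example.h5", compression="gzip-3",
...     single_item_read_limit_in_mb=1024.0)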
"""
if format_version and format_version not in SUPPORTED_FORMAT_VERSIONS:
raise ASDFValueError(
"ASDF version '%s' is not supported. Supported versions: %s"
% (format_version, ", ".join(SUPPORTED_FORMAT_VERSIONS))
)
self.__force_mpi = mpi
self.debug = debug
# The limit on how much data can be read with a single item access.
self.single_item_read_limit_in_mb = single_item_read_limit_in_mb
# Deal with compression settings.
if compression not in COMPRESSIONS:
msg = (
"Unknown compressions '%s'. Available compressions: \n\t%s"
% (
compression,
"\n\t".join(sorted([str(i) for i in COMPRESSIONS.keys()])),
)
)
raise Exception(msg)
self.__compression = COMPRESSIONS[compression]
self.__shuffle = shuffle
# Turn off compression for parallel I/O. Any already written
# compressed data will be fine. No need to warn if the file is
# opened in read-only mode.
if self.__compression[0] and self.mpi and mode != "r":
msg = (
"Compression will be disabled as parallel HDF5 does not "
"support compression"
)
warnings.warn(msg, ASDFWarning)
self.__compression = COMPRESSIONS[None]
self.__shuffle = False
# No need to shuffle if no compression is used.
elif self.__compression[0] is None:
self.__shuffle = False
# Open file or take an already open HDF5 file object.
if not self.mpi:
self.__file = h5py.File(filename, mode=mode)
else:
self.__file = h5py.File(
filename, mode=mode, driver="mpio", comm=self.mpi.comm
)
# Workaround for HDF5 only storing the relative path by default.
self.__original_filename = os.path.abspath(filename)
# Write the file format header to the file.
if "file_format" in self.__file.attrs:
if self.__file.attrs["file_format"].decode() != FORMAT_NAME:
msg = "Not a '%s' file." % FORMAT_NAME
raise ASDFException(msg)
# Raise a warning as this is a bit fishy.
if "file_format_version" not in self.__file.attrs:
msg = (
"No file format version given in file '%s'. The "
"program will continue but the result is undefined."
% self.filename
)
warnings.warn(msg, ASDFWarning)
else:
self.__file.attrs["file_format"] = self._zeropad_ascii_string(
FORMAT_NAME
)
# Deal with the file format version. `format_version` is either None
# or valid (this is checked above).
__most_recent_version = "1.0.3"
# Case 1: Already some kind of format version in the file.
if "file_format_version" in self.__file.attrs:
version_in_file = self.__file.attrs["file_format_version"].decode()
if version_in_file not in SUPPORTED_FORMAT_VERSIONS:
self.asdf_format_version = (
format_version or __most_recent_version
)
msg = (
"The file claims an ASDF version of %s. This "
"version of pyasdf only supports versions: %s. All "
"following write operations will use version %s - "
"other tools might not be able to read the files "
"again - proceed with caution."
% (
version_in_file,
", ".join(SUPPORTED_FORMAT_VERSIONS),
self.asdf_format_version,
)
)
warnings.warn(msg, ASDFWarning)
else:
if format_version and format_version != version_in_file:
msg = (
"You are forcing ASDF version %s but the version "
"of the file is %s. Please proceed with caution "
"as other tools might not be able to read the "
"file again." % (format_version, version_in_file)
)
warnings.warn(msg, ASDFWarning)
self.asdf_format_version = format_version
else:
self.asdf_format_version = version_in_file
# Case 2: Format version not in file yet.
else:
# If not given, use the most recent one.
self.asdf_format_version = format_version or __most_recent_version
self.__file.attrs[
"file_format_version"
] = self._zeropad_ascii_string(self.asdf_format_version)
# Just a final safety check - should not be able to fail!
assert self.asdf_format_version in SUPPORTED_FORMAT_VERSIONS
# Useful for programmatic checks.
self._loose_asdf_format_version = packaging.version.parse(
self.asdf_format_version
)
# Create the waveform and provenance groups if mode is not "r".
if "Waveforms" not in self.__file and mode != "r":
self.__file.create_group("Waveforms")
if "Provenance" not in self.__file and mode != "r":
self.__file.create_group("Provenance")
if "AuxiliaryData" not in self.__file and mode != "r":
self.__file.create_group("AuxiliaryData")
# Easy access to the waveforms.
self.waveforms = StationAccessor(self)
self.auxiliary_data = AuxiliaryDataGroupAccessor(self)
self.provenance = ProvenanceAccessor(self)
# Force synchronous init if run in an MPI environment.
if self.mpi:
self.mpi.comm.barrier()
def __del__(self):
"""
Cleanup. Force flushing and close the file.
If called with MPI this will also enable MPI to cleanly shutdown.
"""
try:
self.flush()
self._close()
# We never want the __del__ method to raise but at least a warning
# should be printed.
except Exception as e:
msg = f"Failed to flush and close the file due to: {e}"
warnings.warn(msg, ASDFWarning)
def __eq__(self, other):
"""
More or less comprehensive equality check. Potentially quite slow as
it checks all data.
:type other: :class:`~pyasdf.asdf_data_set.ASDFDataSet`
"""
if type(self) is not type(other):
return False
if self._waveform_group.keys() != other._waveform_group.keys():
return False
if self._provenance_group.keys() != other._provenance_group.keys():
return False
if self.events != other.events:
return False
for station, group in self._waveform_group.items():
other_group = other._waveform_group[station]
for tag, data_set in group.items():
other_data_set = other_group[tag]
try:
if tag == "StationXML":
np.testing.assert_array_equal(
data_set[()], other_data_set[()]
)
else:
np.testing.assert_allclose(
data_set[()], other_data_set[()]
)
except AssertionError:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def __enter__(self):
"""
Enable usage as a context manager.
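A short sketch of the intended pattern; the file name is illustrative.
>>> with pyasdf.ASDFDataSet("example.h5") as ds:
...     print(ds.waveform_tags)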
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Enable usage as a context manager.
"""
self.__del__()
return False
def __delattr__(self, key):
# Only the events can be deleted.
if key == "events":
del self.__file["QuakeML"]
# Otherwise try to get the item and if that succeeds, raise an error
# that it cannot be deleted.
else:
# Triggers an AttributeError if the attribute does not exist.
getattr(self, key)
raise AttributeError("Attribute '%s' cannot be deleted." % key)
def flush(self):
"""
Flush the underlying HDF5 file.
"""
# A boolean check returns False if the file has already been closed
# and thus cannot be flushed.
# There is also a strange interaction with capturing stdout/stderr
# where the __file attribute no longer exists. But it should
# be pretty safe to assume that we then also don't have to flush.
if not getattr(self, "_ASDFDataSet__file", False):
return
self.__file.flush()
def _close(self):
"""
Attempt to close the underlying HDF5 file.
"""
try:
self.__file.close()
except Exception:
pass
def _zeropad_ascii_string(self, text):
"""
Returns a zero-padded ASCII string in the most compatible way possible.
Might later need to handle bytes/unicode.
:param text: The text to be converted.
"""
return np.string_((text + "\x00").encode())
@property
def _waveform_group(self):
return self.__file["Waveforms"]
@property
def _provenance_group(self):
return self.__file["Provenance"]
@property
def _auxiliary_data_group(self):
return self.__file["AuxiliaryData"]
@property
def asdf_format_version_in_file(self):
"""
Returns the ASDF format version specified in the file.
"""
return self.__file.attrs["file_format_version"].decode()
@property
def filename(self):
"""
Get the path of the underlying file on the filesystem. Works in most
circumstances.
"""
return self.__original_filename
@property
def mpi(self):
"""
Returns a named tuple with ``comm``, ``rank``, ``size``, and ``MPI``
if run with MPI and ``False`` otherwise.
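A small sketch of how this might look when started through ``mpirun``
(assuming h5py was compiled with parallel support); the numbers are
illustrative.
>>> ds.mpi.rank, ds.mpi.size
(0, 4)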
"""
# Simple cache as this is potentially accessed a lot.
if hasattr(self, "__is_mpi"):
return self.__is_mpi
if self.__force_mpi is True:
self.__is_mpi = True
elif self.__force_mpi is False:
self.__is_mpi = False
else:
self.__is_mpi = is_mpi_env()
# If it actually is an mpi environment, set the communicator and the
# rank.
if self.__is_mpi:
# Check if HDF5 has been compiled with parallel I/O.
c = h5py.get_config()
if not hasattr(c, "mpi") or not c.mpi:
is_parallel = False
else:
is_parallel = True
if not is_parallel:
msg = (
"Running under MPI requires HDF5/h5py to be complied "
"with support for parallel I/O."
)
raise RuntimeError(msg)
import mpi4py
# This is not needed on most mpi4py installations.
if not mpi4py.MPI.Is_initialized():
mpi4py.MPI.Init()
# Set the mpi tuple for easy class-wide access.
mpi_ns = collections.namedtuple(
"mpi_ns", ["comm", "rank", "size", "MPI"]
)
comm = mpi4py.MPI.COMM_WORLD
self.__is_mpi = mpi_ns(
comm=comm, rank=comm.rank, size=comm.size, MPI=mpi4py.MPI
)
return self.__is_mpi
@property
def events(self):
"""
Get all events stored in the data set.
:rtype: An ObsPy :class:`~obspy.core.event.Catalog` object.
"""
if "QuakeML" not in self.__file:
return obspy.core.event.Catalog()
data = self.__file["QuakeML"]
if not len(data[()]):
return obspy.core.event.Catalog()
with io.BytesIO(_read_string_array(data)) as buf:
try:
cat = obspy.read_events(buf, format="quakeml")
except Exception:
# ObsPy is not able to read empty QuakeML files but they are
# still valid QuakeML files.
buf.seek(0, 0)
result = None
try:
result = obspy.core.quakeml._validate(buf)
except Exception:
pass
# If validation is successful, but the initial read failed,
# it must have been an empty QuakeML object.
if result is True:
cat = obspy.core.event.Catalog()
else:
# Re-raise exception
raise
return cat
@events.setter
def events(self, event):
"""
Set the events of the dataset.
:param event: One or more events. Will replace all existing ones.
:type event: :class:`~obspy.core.event.Event` or
:class:`~obspy.core.event.Catalog`
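A short sketch; the QuakeML path is illustrative.
>>> import obspy
>>> ds.events = obspy.read_events("/path/to/catalog.xml")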
"""
if isinstance(event, obspy.core.event.Event):
cat = obspy.core.event.Catalog(events=[event])
elif isinstance(event, obspy.core.event.Catalog):
cat = event
else:
raise TypeError("Must be an ObsPy event or catalog instance")
with io.BytesIO() as buf:
cat.write(buf, format="quakeml")
buf.seek(0, 0)
data = np.frombuffer(buf.read(), dtype=np.dtype("byte"))
# Create the QuakeML data set if it does not exist.
if "QuakeML" not in self.__file:
self.__file.create_dataset(
"QuakeML",
dtype=np.dtype("byte"),
compression=self.__compression[0],
compression_opts=self.__compression[1],
shuffle=self.__shuffle,
shape=(0,),
maxshape=(None,),
fletcher32=not bool(self.mpi),
)
self.__file["QuakeML"].resize(data.shape)
self.__file["QuakeML"][:] = data
@property
def waveform_tags(self):
"""
Returns a set with all tags in the dataset.
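A small sketch of a possible result; the tag names are illustrative.
>>> ds.waveform_tags
{'raw_recording', 'synthetic'}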
"""
return set(
itertools.chain.from_iterable(
i.get_waveform_tags() for i in self.waveforms
)
)
def add_auxiliary_data_file(
self, filename_or_object, path, parameters=None, provenance_id=None
):
"""
Special function adding a file or file-like object as an auxiliary
data object denoting a file. This is very useful for storing arbitrary
files in ASDF.
:param filename_or_object: Filename, open file, or file-like object.
The file should really be opened in binary mode, but this is not
checked.
:param path: The path of the file. Has the same limitations as normal
tags.
:param parameters: Any additional options, as a Python dictionary.
:param provenance_id: The id of the provenance of this data. The
provenance information itself must be added separately. Must be
given as a qualified name, e.g. ``'{namespace_uri}id'``.
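A minimal sketch; the file name, path, and parameters are illustrative.
>>> ds.add_auxiliary_data_file(
...     "image.png", path="some_image",
...     parameters={"description": "A plot of the final results."})
>>> ds.auxiliary_data.Files.some_image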
"""
if hasattr(filename_or_object, "read"):
data = np.frombuffer(
filename_or_object.read(), dtype=np.dtype("byte")
)
else:
with open(filename_or_object, "rb") as fh:
data = np.frombuffer(fh.read(), dtype=np.dtype("byte"))
if parameters is None:
parameters = {}
self.add_auxiliary_data(
data=data,
data_type="Files",
path=path,
parameters=parameters,
provenance_id=provenance_id,
)
def add_auxiliary_data(
self, data, data_type, path, parameters, provenance_id=None
):
"""
Adds auxiliary data to the file.
:param data: The actual data as an n-dimensional numpy array.
:param data_type: The type of data, think of it like a subfolder.
:param path: The path of the data. Must be unique per data_type. Can
be a path separated by forward slashes at which point it will be
stored in a nested structure.
:param parameters: Any additional options, as a Python dictionary.
:param provenance_id: The id of the provenance of this data. The
provenance information itself must be added separately. Must be
given as a qualified name, e.g. ``'{namespace_uri}id'``.
The data type is the category of the data and the path the name of
that particular piece of data within that category.
>>> ds.add_auxiliary_data(numpy.random.random(10),
... data_type="RandomArrays",
... path="test_data",
... parameters={"a": 1, "b": 2})
>>> ds.auxiliary_data.RandomArrays.test_data
Auxiliary Data of Type 'RandomArrays'
Path: 'test_data'
Data shape: '(10, )', dtype: 'float64'
Parameters:
a: 1
b: 2
The path can contain forward slashes to create a nested hierarchy of
auxiliary data.
>>> ds.add_auxiliary_data(numpy.random.random(10),
... data_type="RandomArrays",
... path="some/nested/path/test_data",
... parameters={"a": 1, "b": 2})
>>> ds.auxiliary_data.RandomArrays.some.nested.path.test_data
Auxiliary Data of Type 'RandomArrays'
Path: 'some/nested/path/test_data'
Data shape: '(10, )', dtype: 'float64'
Parameters:
a: 1
b: 2
"""
# Assert the data type name.
pattern = AUXILIARY_DATA_PATH_PART_PATTERN[self.asdf_format_version]
if re.match(pattern, data_type) is None:
raise ASDFValueError(
"Data type name '{name}' is invalid. It must validate "
"against the regular expression '{pattern}' in ASDF version "
"'{version}'.".format(
name=data_type,
pattern=pattern,
version=self.asdf_format_version,
)
)
# Split the path.
tag_path = path.strip("/").split("/")
for path in tag_path:
if re.match(pattern, path) is None:
raise ASDFValueError(
"Path part name '{name}' is invalid. It must validate "
"against the regular expression '{pattern}' in ASDF "
"version '{version}'.".format(
name=path,
pattern=pattern,
version=self.asdf_format_version,
)
)
if provenance_id is not None:
# Will raise an error if not a valid qualified name.
split_qualified_name(provenance_id)
# Complicated multi-step process but it enables one to use
# parallel I/O with the same functions.
info = self._add_auxiliary_data_get_collective_information(
data=data,
data_type=data_type,
tag_path=tag_path,
parameters=parameters,
provenance_id=provenance_id,
)
if info is None:
return
self._add_auxiliary_data_write_collective_information(info=info)
self._add_auxiliary_data_write_independent_information(
info=info, data=data
)
def _add_auxiliary_data_get_collective_information(
self, data, data_type, tag_path, parameters, provenance_id=None
):
"""
The information required for the collective part of adding some
auxiliary data.
This will extract the group name, the parameters of the dataset to
be created, and the attributes of the dataset.
"""
if "provenance_id" in parameters:
raise ASDFValueError(
"'provenance_id' is a reserved parameter "
"and cannot be used as an arbitrary "
"parameters."
)
# If the provenance id is set, add it to the parameters. At this
# point it is assumed that the id is valid.
if provenance_id is not None:
parameters = copy.deepcopy(parameters)
parameters.update(
{"provenance_id": self._zeropad_ascii_string(provenance_id)}
)
group_name = "%s/%s" % (data_type, "/".join(tag_path))
if group_name in self._auxiliary_data_group:
msg = (
"Data '%s' already exists in file. Will not be added!"
% group_name
)
warnings.warn(msg, ASDFWarning)
return
# XXX: Figure out why this is necessary. It should work according to
# the specs.
if self.mpi:
fletcher32 = False
else:
fletcher32 = True
info = {
"data_name": group_name,
"data_type": data_type,
"dataset_creation_params": {
"name": "/".join(tag_path),
"shape": data.shape,
"dtype": data.dtype,
"compression": self.__compression[0],
"compression_opts": self.__compression[1],
"shuffle": self.__shuffle,
"fletcher32": fletcher32,
"maxshape": tuple([None] * len(data.shape)),
},
"dataset_attrs": parameters,
}
return info
def _add_auxiliary_data_write_independent_information(self, info, data):
"""
Writes the independent part of auxiliary data to the file.
"""
self._auxiliary_data_group[info["data_name"]][:] = data
def _add_auxiliary_data_write_collective_information(self, info):
"""
Writes the collective part of auxiliary data to the file.
"""
data_type = info["data_type"]
if data_type not in self._auxiliary_data_group:
self._auxiliary_data_group.create_group(data_type)
group = self._auxiliary_data_group[data_type]
# workaround for a bug in hdf5 1.10 and PPC64
info["dataset_creation_params"]["name"] in group
ds = group.create_dataset(**info["dataset_creation_params"])
for key, value in info["dataset_attrs"].items():
ds.attrs[key] = value
def add_quakeml(self, event):
"""
Adds a QuakeML file or an existing ObsPy event to the data set.
An exception will be raised if one attempts to add an event that
already exists within the data set. Duplicates are detected based
on the public ids of the events.
:param event: Filename or existing ObsPy event object.
:type event: :class:`~obspy.core.event.Event` or
:class:`~obspy.core.event.Catalog`
:raises: ValueError
.. rubric:: Example
For now we will create a new ASDF file but one can also use an
existing one.
>>> import pyasdf
>>> import obspy
>>> ds = pyasdf.ASDFDataSet("new_file.h5")
One can add an event either by passing a filename ...
>>> ds.add_quakeml("/path/to/quake.xml")
... or by passing an existing event or catalog object.
>>> cat = obspy.read_events("/path/to/quakeml.xml")
>>> ds.add_quakeml(cat)
"""
if isinstance(event, obspy.core.event.Event):
cat = obspy.core.event.Catalog(events=[event])
elif isinstance(event, obspy.core.event.Catalog):
cat = event
else:
cat = obspy.read_events(event, format="quakeml")
old_cat = self.events
existing_resource_ids = set([_i.resource_id.id for _i in old_cat])
new_resource_ids = set([_i.resource_id.id for _i in cat])
intersection = existing_resource_ids.intersection(new_resource_ids)
if intersection:
msg = (
"Event id(s) %s already present in ASDF file. Adding "
"events cancelled"
)
raise ValueError(msg % ", ".join(intersection))
old_cat.extend(cat)
self.events = old_cat
def get_data_for_tag(self, station_name, tag):
"""
Returns the waveform and station data for the requested station and
path.
:param station_name: A string with network id and station id,
e.g. ``"IU.ANMO"``
:type station_name: str
:param tag: The path of the waveform.
:type tag: str
:return: tuple of the waveform and the inventory.
:rtype: (:class:`~obspy.core.stream.Stream`,
:class:`~obspy.core.inventory.inventory.Inventory`)
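A short sketch; the station name and tag are illustrative.
>>> st, inv = ds.get_data_for_tag("IU.ANMO", "raw_recording")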
"""
station_name = station_name.replace(".", "_")
station = getattr(self.waveforms, station_name)
st = getattr(station, tag)
inv = (
getattr(station, "StationXML") if "StationXML" in station else None
)
return st, inv
def _get_idx_and_size_estimate(self, waveform_name, starttime, endtime):
network, station = waveform_name.split(".")[:2]
data = self.__file["Waveforms"]["%s.%s" % (network, station)][
waveform_name
]
idx_start = 0
idx_end = data.shape[0]
dt = 1.0 / data.attrs["sampling_rate"]
# Starttime is a timestamp in nanoseconds.
# Get time of first and time of last sample.
data_starttime = obspy.UTCDateTime(
float(data.attrs["starttime"]) / 1.0e9
)
data_endtime = data_starttime + float((idx_end - 1) * dt)
# Modify the data indices to restrict the data if necessary.
if starttime is not None and starttime > data_starttime:
offset = max(0, int((starttime - data_starttime) // dt))
idx_start = offset
# Also modify the data_starttime here as it changes the data that is
# actually read.
data_starttime += offset * dt
if endtime is not None and endtime < data_endtime:
offset = max(0, int((data_endtime - endtime) // dt))
idx_end -= offset
# Check the size against the limit.
array_size_in_mb = (
(idx_end - idx_start) * data.dtype.itemsize / 1024.0 / 1024.0
)
del data
return idx_start, idx_end, data_starttime, array_size_in_mb
def _get_waveform(self, waveform_name, starttime=None, endtime=None):
"""
Retrieves the waveform for a certain path name as a Trace object. For
internal use only, use the dot accessors for outside access.
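From the outside one would typically use something like the following
instead; the station and tag names are illustrative.
>>> ds.waveforms.IU_ANMO.raw_recording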
"""
network, station, location, channel = waveform_name.split(".")[:4]
channel = channel[: channel.find("__")]
data = self.__file["Waveforms"]["%s.%s" % (network, station)][
waveform_name
]
# Read once upfront to optimize a bit.
attrs = dict(data.attrs)
# Only do the checks if the file size actually warrants it, as they are
# fairly expensive, or if starttime or endtime are given.
if (
(self.filesize > self.single_item_read_limit_in_mb * 1024**2)
or starttime is not None
or endtime is not None
):
(
idx_start,
idx_end,
data_starttime,
array_size_in_mb,
) = self._get_idx_and_size_estimate( # NOQA
waveform_name, starttime, endtime
)
if array_size_in_mb > self.single_item_read_limit_in_mb:
msg = (
"The current selection would read %.2f MB into memory "
"from '%s'. The current limit is %.2f MB. Adjust by "
"setting 'ASDFDataSet.single_item_read_limit_in_mb' or "
"use a different method to read the waveform data."
% (
array_size_in_mb,
waveform_name,
self.single_item_read_limit_in_mb,
)
)
raise ASDFValueError(msg)
else:
idx_start = 0
idx_end = data.shape[0]
data_starttime = obspy.UTCDateTime(
float(attrs["starttime"]) / 1.0e9
)
tr = obspy.Trace(data=data[idx_start:idx_end])
tr.stats.starttime = data_starttime
tr.stats.sampling_rate = attrs["sampling_rate"]
tr.stats.network = network
tr.stats.station = station
tr.stats.location = location
tr.stats.channel = channel
# Set some format specific details.
tr.stats._format = FORMAT_NAME
details = obspy.core.util.AttribDict()
setattr(tr.stats, FORMAT_NAME.lower(), details)
details.format_version = self.asdf_format_version
# Read all the ids if they are there.
ids = ["event_id", "origin_id", "magnitude_id", "focal_mechanism_id"]
for name in ids:
if name in attrs:
setattr(
details,
name + "s",
[
obspy.core.event.ResourceIdentifier(_i)
for _i in attrs[name].tobytes().decode().split(",")
],
)
if "provenance_id" in attrs:
details.provenance_id = attrs["provenance_id"].tobytes().decode()
if "labels" in attrs:
details.labels = labelstring2list(attrs["labels"])
# Add the tag to the stats dictionary.
details.tag = wf_name2tag(waveform_name)
return tr
def _get_auxiliary_data(self, data_type, tag):
group = self._auxiliary_data_group[data_type][tag]
if isinstance(group, h5py.Group):
return AuxiliaryDataAccessor(
auxiliary_data_type=re.sub(
r"^/AuxiliaryData/", "", group.name
),
asdf_data_set=self,
)
return AuxiliaryDataContainer(
data=group,
data_type=data_type,
tag=tag,
parameters={i: j for i, j in group.attrs.items()},
)
@property
def filesize(self):
"""
Return the current size of the file.
"""
return os.path.getsize(self.filename)
@property
def pretty_filesize(self):
"""
Return a pretty string representation of the size of the file.
"""
return sizeof_fmt(self.filesize)
def __str__(self):
"""