Skip to content

Commit

Permalink
Miscellaneous Changes
Browse files Browse the repository at this point in the history
* asdf2salvus  now reads a collection inventories from a folder
* Added checks improving fault-tolerance against network connectivity errors
  • Loading branch information
geojunky committed Nov 14, 2023
1 parent ef83434 commit 2e607fd
Showing 1 changed file with 124 additions and 94 deletions.
218 changes: 124 additions & 94 deletions seismic/ASDFdatabase/asdf2salvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
LastUpdate: dd/mm/yyyy Who Optional description
"""

import os, shutil
import os, shutil, time

from collections import defaultdict
import numpy as np
Expand All @@ -23,7 +23,7 @@
from mpi4py import MPI
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
from seismic.ASDFdatabase.utils import remove_comments
from seismic.misc import split_list
from seismic.misc import split_list, recursive_glob
import click
from shapely.geometry.polygon import Polygon, Point
from tqdm import tqdm
Expand Down Expand Up @@ -128,7 +128,7 @@ def has_masked_values(data_stream):
return stream
# end func

DEBUG = True
DEBUG = False
def extract_data_for_event(fds:FederatedASDFDataSet,
domain:Domain, event:dict,
inventory:Inventory,
Expand Down Expand Up @@ -251,7 +251,7 @@ def is_preferred_component(cha: str):
assert len(event) == 1, 'Invalid event dict {}. Must contain a single event. Aborting..'.format(event)

tag = 'raw_recording'
nets = set({'AU'})
nets = set({'G'})

# make folders
ename = list(event.keys())[0]
Expand Down Expand Up @@ -309,7 +309,7 @@ def is_preferred_component(cha: str):

receivers_dict['{}.{}.{}'.format(net, sta, loc)] = \
{"class_name": "salvus.flow.simple_config.receiver.seismology.SideSetPoint3D",
"salvus_version": "0.12.8",
"salvus_version": "0.12.13",
"arguments": {
"depth_in_m": 0.0,
"radius_of_sphere_in_m": 6371000.0,
Expand All @@ -335,92 +335,109 @@ def is_preferred_component(cha: str):
###############################################################
# Now download additional data from IRIS
###############################################################
client = Client("IRIS")
for net in inventory.networks:
for sta in net.stations:
netsta = '{}.{}'.format(net.code, sta.code)
MAX_ATTEMPTS = 10
SLEEP_SECONDS = 5
retry_count = 1
while(retry_count < MAX_ATTEMPTS):
client = None
try:
client = Client("IRIS")
except Exception as e:
print('Failed to connect to IRIS with exception: {}'.format(e))
print('Retrying after 5 seconds..')
time.sleep(SLEEP_SECONDS)
retry_count += 1
continue
# end try

already_added = False
for seed_id in data_added_set:
if (netsta in seed_id):
already_added = True
break
# end if
# end for
if(already_added): continue
for net in inventory.networks:
#if (DEBUG and net.code not in nets): continue
for sta in net.stations:
netsta = '{}.{}'.format(net.code, sta.code)

stream = []
try:
stream = client.get_waveforms(net.code, sta.code, '*', 'BH?,HH?',
st-1, et+1)
stream.trim(st, et)
except:
pass
# end try
already_added = False
for seed_id in data_added_set:
if (netsta in seed_id):
already_added = True
break
# end if
# end for
if(already_added): continue

stream = []
try:
stream = client.get_waveforms(net.code, sta.code, '*', 'BH?,HH?',
st-1, et+1)
stream.trim(st, et)
except:
pass
# end try

if (len(stream)):

rows = []
for tr in stream: rows.append([tr.stats.network,
tr.stats.station,
tr.stats.location,
tr.stats.channel,
None, None, None])
rows = filter_channels_locations(rows)

for row in rows:
tr = stream.select(network=row[0],
station=row[1],
location=row[2],
channel=row[3])
tr = tr[0]
if((tr.stats.endtime - tr.stats.starttime) < (et - st)): continue
print('Adding trace: {}'.format(tr.id))

resp = None
try:
resp = inventory.get_response(tr.id, tr.stats.starttime)
except Exception as e:
continue
# end try

if(resp):
if (len(stream)):

rows = []
for tr in stream: rows.append([tr.stats.network,
tr.stats.station,
tr.stats.location,
tr.stats.channel,
None, None, None])
rows = filter_channels_locations(rows)

for row in rows:
tr = stream.select(network=row[0],
station=row[1],
location=row[2],
channel=row[3])
tr = tr[0]
if((tr.stats.endtime - tr.stats.starttime) < (et - st)): continue
print('Adding trace: {}'.format(tr.id))

resp = None
try:
oinv = inventory.select(network=tr.stats.network,
station=tr.stats.station,
location=tr.stats.location,
channel=tr.stats.channel)
oinv = remove_comments(oinv)
ods.add_stationxml(oinv)

ods.add_waveforms(tr, tag)

receivers_dict['{}.{}.{}'.format(tr.stats.network,
tr.stats.station,
tr.stats.location)] = \
{"class_name": "salvus.flow.simple_config.receiver.seismology.SideSetPoint3D",
"salvus_version": "0.12.8",
"arguments": {
"depth_in_m": 0.0,
"radius_of_sphere_in_m": 6371000.0,
"network_code": tr.stats.network,
"location_code": tr.stats.location,
"side_set_name": "r1",
"latitude": sta.latitude,
"longitude": sta.longitude,
"station_code": tr.stats.station,
"fields": [receiver_fields]}}

data_added_set.add(tr.id)
resp = inventory.get_response(tr.id, tr.stats.starttime)
except Exception as e:
print('Failed to add inventory/waveform with error: {}. Moving along..'.format(str(e)))
continue
# end try
# end if
# end for
#if (DEBUG): break
# end if

if(resp):
try:
oinv = inventory.select(network=tr.stats.network,
station=tr.stats.station,
location=tr.stats.location,
channel=tr.stats.channel)
oinv = remove_comments(oinv)
ods.add_stationxml(oinv)

ods.add_waveforms(tr, tag)

receivers_dict['{}.{}.{}'.format(tr.stats.network,
tr.stats.station,
tr.stats.location)] = \
{"class_name": "salvus.flow.simple_config.receiver.seismology.SideSetPoint3D",
"salvus_version": "0.12.13",
"arguments": {
"depth_in_m": 0.0,
"radius_of_sphere_in_m": 6371000.0,
"network_code": tr.stats.network,
"location_code": tr.stats.location,
"side_set_name": "r1",
"latitude": sta.latitude,
"longitude": sta.longitude,
"station_code": tr.stats.station,
"fields": [receiver_fields]}}

data_added_set.add(tr.id)
except Exception as e:
print('Failed to add inventory/waveform with error: {}. Moving along..'.format(str(e)))
# end try
# end if
# end for
#if (DEBUG): break
# end if
# end for
# end for
# end for
break # download completed successfully
# wend

json.dump(receivers_dict, open(receivers_ofn, 'w+'), indent=4)
del ods
Expand Down Expand Up @@ -457,7 +474,7 @@ def trim_inventory(inv:Inventory, dom:Domain):
type=click.Path(exists=True))
@click.argument('salvus-events-file', required=True,
type=click.Path(exists=True))
@click.argument('iris_inventory', required=True,
@click.argument('inventory-folder', required=True,
type=click.Path(exists=True))
@click.argument('output-folder', required=True,
type=click.Path(exists=True))
Expand All @@ -470,16 +487,35 @@ def trim_inventory(inv:Inventory, dom:Domain):
@click.option('--receiver-fields', type=str, default='displacement', show_default=True,
help="Name of data folder within Salvus' expected folder hierarchy")
def process(asdf_source, salvus_domain_file, salvus_events_file,
iris_inventory, output_folder,
inventory_folder, output_folder,
seconds_before, seconds_after, data_name, receiver_fields):
"""
ASDF_SOURCE: Path to text file containing paths to ASDF files\n
SALVUS_DOMAIN_FILE: Path to Salvus domain file in json format\n
SALVUS_EVENTS_FILE: Path to Salvus events file in json format\n
IRIS_INVENTORY: Path to complete channel-level IRIS inventory, containing instrument responses\n
INVENTORY_FOLDER: Path to folder containing complete channel-level inventories,
containing instrument responses. The folder is recursively scanned
for station-xml files\n
OUTPUT_FOLDER: Output folder \n
"""
def _read_inventories(inventory_folder):
inv_files = recursive_glob(inventory_folder, '*.xml')

# Read inventories
inv = None
for inv_file in inv_files:
try:
print('Reading inventory file: {}'.format(inv_file))
if(inv is None): inv = read_inventory(inv_file)
else: inv += read_inventory(inv_file)
except Exception as e:
print(e)
assert 0, 'Failed to read inventory file: {}'.format(inv_file)
# end try
# end for
return inv
# end func

fds = None
dom = None
Expand Down Expand Up @@ -516,13 +552,7 @@ def process(asdf_source, salvus_domain_file, salvus_events_file,
tempdir = None
trimmed_inventory_fn = None
if(rank == 0):
try:
print('Reading inventory file with responses: {}'.format(iris_inventory))
inv = read_inventory(iris_inventory)
except Exception as e:
print(str(e))
assert 0, 'Failed to read inventory file: {}'.format(iris_inventory)
# end try
inv = _read_inventories(inventory_folder)

# trim inventory by Domain
inv = trim_inventory(inv, dom)
Expand Down

0 comments on commit 2e607fd

Please sign in to comment.