From f5b77dc6ef961ab56f63c2677f21eb803fc6d952 Mon Sep 17 00:00:00 2001 From: Nicky Hochmuth Date: Fri, 22 Mar 2024 14:21:51 +0100 Subject: [PATCH] Publish_L2 / RID LUT Manager (#385) * bump all product versions to 2 * RID LUT data manager update and use in processing for trigger descaling * Update stixcore/io/RidLutManager.py * fixed API mock via static files on pup99 --------- Co-authored-by: Shane Maloney --- stixcore/__init__.py | 5 + stixcore/data/test.py | 9 + ...00000-9999999999_V02_2312148821-53879.fits | 3 + stixcore/data/test/publish/rid_lut.csv | 4 +- stixcore/data/test/publish/update_rid_lut.csv | 4 +- stixcore/io/RidLutManager.py | 187 ++++++++++++++++++ stixcore/io/fits/processors.py | 10 +- stixcore/io/tests/test_rid_lut_manager.py | 34 ++++ stixcore/processing/LBtoL0.py | 4 + stixcore/processing/find.py | 160 +++++++++++++++ stixcore/processing/pipeline.py | 16 ++ stixcore/processing/pipeline_cli.py | 12 ++ stixcore/processing/pipeline_status.py | 4 +- stixcore/processing/publish.py | 95 +-------- stixcore/processing/tests/test_processing.py | 53 +---- stixcore/products/level0/scienceL0.py | 65 ++++-- stixcore/products/level2/housekeepingL2.py | 7 +- stixcore/products/product.py | 8 +- stixcore/util/scripts/ddpd.py | 12 +- stixcore/version_conf.py | 12 ++ 20 files changed, 542 insertions(+), 162 deletions(-) create mode 100644 stixcore/data/test/products/solo_LB_stix-21-6-21_0000000000-9999999999_V02_2312148821-53879.fits create mode 100644 stixcore/io/RidLutManager.py create mode 100644 stixcore/io/tests/test_rid_lut_manager.py create mode 100644 stixcore/processing/find.py create mode 100644 stixcore/version_conf.py diff --git a/stixcore/__init__.py b/stixcore/__init__.py index 25994517..04745e1d 100644 --- a/stixcore/__init__.py +++ b/stixcore/__init__.py @@ -6,4 +6,9 @@ except ImportError: __version__ = "unknown" +try: + from .version_conf import __version_conf__ +except ImportError: + __version_conf__ = "unknown" + logger = get_logger(__name__) diff --git a/stixcore/data/test.py b/stixcore/data/test.py index c72ab5cd..b3c64919 100644 --- a/stixcore/data/test.py +++ b/stixcore/data/test.py @@ -34,6 +34,7 @@ def __init__(self, data_dir): self.DIR / "solo_L1_stix-ql-lightcurve_20210116_V01.fits"] self.L1_fits = list(self.DIR.glob('solo_L1_stix-*.fits')) self.LB_21_6_30_fits = self.DIR / "solo_LB_stix-21-6-30_0664156800_V01.fits" + self.LB_21_6_21_fits = self.DIR / "solo_LB_stix-21-6-21_0000000000-9999999999_V02_2312148821-53879.fits" # noqa self.__doc__ = "\n".join([f'{str(k)}: {repr(v)}\n\n' for k, v in self.__dict__.items()]) @@ -59,6 +60,13 @@ def __init__(self, data_dir): self.__doc__ = "\n".join([f'{str(k)}: {repr(v)}\n\n' for k, v in self.__dict__.items()]) +class RidLutTestData: + def __init__(self, data_dir): + self.PUB_DIR = data_dir / "publish" + self.RID_LUT = self.PUB_DIR / "rid_lut.csv" + self.RID_LUT_UPDATE = self.PUB_DIR / "update_rid_lut.csv" + + class TestData: def __init__(self, data_dir): self.ephemeris = EphemerisTestData(data_dir) @@ -67,6 +75,7 @@ def __init__(self, data_dir): self.products = IDBTestProduct(data_dir) self.io = IOTestData(data_dir) self.soop = SOOPTestData(data_dir) + self.rid_lut = RidLutTestData(data_dir) self.__doc__ = "\n".join([f"{k}\n******************\n\n{v.__doc__}\n\n\n" for k, v in self.__dict__.items()]) diff --git a/stixcore/data/test/products/solo_LB_stix-21-6-21_0000000000-9999999999_V02_2312148821-53879.fits b/stixcore/data/test/products/solo_LB_stix-21-6-21_0000000000-9999999999_V02_2312148821-53879.fits new file mode 100644 index 00000000..f73bbcac --- /dev/null +++ b/stixcore/data/test/products/solo_LB_stix-21-6-21_0000000000-9999999999_V02_2312148821-53879.fits @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a0b96723472bdf9505ecff302c25b2bde141c587601b58907959d5fb9297e35d +size 843840 diff --git a/stixcore/data/test/publish/rid_lut.csv b/stixcore/data/test/publish/rid_lut.csv index 5da0185d..933f9af1 100644 --- a/stixcore/data/test/publish/rid_lut.csv +++ b/stixcore/data/test/publish/rid_lut.csv @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:630a0e14c0edfd69daa3c8dbf0425b63059c55dd16caa740fa7c709a22d4f7cc -size 650 +oid sha256:9aed9734a1e809e241520ba31eba3d20aab5e7a6c75d99e8d64d4c93863018aa +size 888 diff --git a/stixcore/data/test/publish/update_rid_lut.csv b/stixcore/data/test/publish/update_rid_lut.csv index 351d69bc..933f9af1 100644 --- a/stixcore/data/test/publish/update_rid_lut.csv +++ b/stixcore/data/test/publish/update_rid_lut.csv @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:1c9a15ea071003c7a0afc9fcff95fa9071f1ee894e4d56a99e2dbd98e821ec77 -size 365 +oid sha256:9aed9734a1e809e241520ba31eba3d20aab5e7a6c75d99e8d64d4c93863018aa +size 888 diff --git a/stixcore/io/RidLutManager.py b/stixcore/io/RidLutManager.py new file mode 100644 index 00000000..337dc574 --- /dev/null +++ b/stixcore/io/RidLutManager.py @@ -0,0 +1,187 @@ +import sys +import time +import tempfile +import urllib.request +from datetime import date, datetime, timedelta + +import numpy as np + +from astropy.io import ascii +from astropy.table import Table +from astropy.table.operations import unique, vstack + +from stixcore.config.config import CONFIG +from stixcore.util.logging import get_logger +from stixcore.util.singleton import Singleton + +__all__ = ['RidLutManager'] + +logger = get_logger(__name__) + + +class RidLutManager(metaclass=Singleton): + """Manages metadata for BSD requests + + The rid is used for a lookup in a csv table file where additional data + connected to a BSD request is stored. Such as a description of the request + purpose or state dependent configurations that are not part of the TM data. + Most important the trigger scaling factor that was used if the trigger scaling + schema is active. + + The data of th LUT is required over the the API endpoint: + https://datacenter.stix.i4ds.net/api/bsd/info/ + """ + + def __init__(self, file, update=False): + """Creates the manager by pointing to the LUT files and setting the update strategy. + + Parameters + ---------- + file : Path + points to the LUT file + update : bool, optional + Update strategy: is the LUT file updated via API?, by default False + """ + self.file = file + self.update = update + self.rid_lut = RidLutManager.read_rid_lut(self.file, self.update) + + def __str__(self) -> str: + return f"file: {self.file} update: {self.update} size: {len(self.rid_lut)}" + + def update_lut(self): + """Updates the LUT file via api request. + + Will create a new file if not available or do a incremental update otherwise, + using the last entry time stamp. + """ + self.rid_lut = RidLutManager.read_rid_lut(self.file, update=self.update) + + def get_reason(self, rid): + """Gets the verbal description of the request purpose by combining several descriptive columns. + + Parameters + ---------- + rid : int + the BSD request id + + Returns + ------- + str + verbal description of the request purpose + """ + request = self.rid_lut.loc[rid] + reason = " ".join(np.atleast_1d(request['description'])) + return reason + + def get_scaling_factor(self, rid): + """Gets the trigger descaling factor connected to the BSD request. + + Parameters + ---------- + rid : int + the BSD request id + + Returns + ------- + int + the proposed trigger descaling factor to use for the BSD processing + + Raises + ------ + ValueError + if no or to many entries found for the given rid + """ + try: + request = self.rid_lut.loc[rid] + except KeyError: + raise ValueError("can't get scaling factor: no request founds for rid: {rid}") + scaling_factor = np.atleast_1d(request['scaling_factor']) + if len(scaling_factor) > 1: + raise ValueError("can't get scaling factor: to many request founds for rid: {rid}") + scf = scaling_factor[0].strip() + return 30 if scf == '' else int(float(scf)) + + @classmethod + def read_rid_lut(cls, file, update=False): + """Reads or creates the LUT of all BSD RIDs and the request reason comment. + + On creation or update an api endpoint from the STIX data center is used + to get the information and persists as a LUT locally. + + Parameters + ---------- + file : Path + path the to LUT file. + update : bool, optional + should the LUT be updated at start up?, by default False + + Returns + ------- + Table + the LUT od RIDs and request reasons. + """ + converters = {'_id': np.uint, 'unique_id': np.uint, 'start_utc': datetime, + 'duration': np.uint, 'type': str, 'subject': str, + 'purpose': str, 'scaling_factor': str, 'ior_id': str, 'comment': str} + + if update or not file.exists(): + rid_lut = Table(names=converters.keys(), dtype=converters.values()) + # the api is limited to batch sizes of a month. in order to get the full table we have + # to ready each month after the start of STIX + last_date = date(2019, 1, 1) + today = date.today() + if file.exists(): + rid_lut = ascii.read(file, delimiter=",", converters=converters, + guess=False, quotechar='"') + mds = rid_lut['start_utc'].max() + try: + last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S').date() + except ValueError: + last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S.%f').date() + + if not file.parent.exists(): + logger.info(f'path not found to rid lut file dir: {file.parent} creating dir') + file.parent.mkdir(parents=True, exist_ok=True) + rid_lut_file_update_url = CONFIG.get('Publish', 'rid_lut_file_update_url') + + try: + while (last_date < today): + last_date_1m = last_date + timedelta(days=30) + ldf = last_date.strftime('%Y%m%d') + ld1mf = last_date_1m.strftime('%Y%m%d') + update_url = f"{rid_lut_file_update_url}{ldf}/{ld1mf}" + logger.info(f'download publish lut file: {update_url}') + last_date = last_date_1m + updatefile = tempfile.NamedTemporaryFile().name + urllib.request.urlretrieve(update_url, updatefile) + update_lut = ascii.read(updatefile, delimiter=",", converters=converters, + guess=False, quotechar='"') + + if len(update_lut) < 1: + continue + logger.info(f'found {len(update_lut)} entries') + rid_lut = vstack([rid_lut, update_lut]) + # the stix datacenter API is throttled to 2 calls per second + time.sleep(0.5) + except Exception: + logger.error("RID API ERROR", exc_info=True) + + rid_lut = unique(rid_lut, silent=True) + ascii.write(rid_lut, file, overwrite=True, delimiter=",", quotechar='"') + logger.info(f'write total {len(rid_lut)} entries to local storage') + else: + logger.info(f"read rid-lut from {file}") + rid_lut = ascii.read(file, delimiter=",", converters=converters) + + rid_lut['description'] = [", ".join(r.values()) for r in + rid_lut['subject', 'purpose', 'comment'].filled()] + rid_lut.add_index('unique_id') + + return rid_lut + + +if 'pytest' in sys.modules: + # only set the global in test scenario + from stixcore.data.test import test_data + RidLutManager.instance = RidLutManager(test_data.rid_lut.RID_LUT, update=False) diff --git a/stixcore/io/fits/processors.py b/stixcore/io/fits/processors.py index f350be6a..cfb433e5 100644 --- a/stixcore/io/fits/processors.py +++ b/stixcore/io/fits/processors.py @@ -11,7 +11,7 @@ import stixcore from stixcore.ephemeris.manager import Spice from stixcore.products.level0.scienceL0 import Aspect -from stixcore.products.product import Product +from stixcore.products.product import FitsHeaderMixin, Product from stixcore.soop.manager import SOOPManager, SoopObservationType from stixcore.time.datetime import SEC_IN_DAY from stixcore.util.logging import get_logger @@ -135,7 +135,8 @@ def generate_common_header(cls, filename, product, *, version=0): ('ORIGIN', 'STIX Team, FHNW', 'FHNW'), ('CREATOR', 'stixcore', 'FITS creation software'), ('VERS_SW', str(stixcore.__version__), 'Version of SW that provided FITS file'), - # ('VERS_CAL', '', 'Version of the calibration pack'), + ('VERS_CFG', str(stixcore.__version_conf__), + 'Version of the common instrument configuration package'), ('VERSION', version_format(version), 'Version of data product'), ('OBSRVTRY', 'Solar Orbiter', 'Satellite name'), ('TELESCOP', 'SOLO/STIX', 'Telescope/Sensor name'), @@ -558,6 +559,9 @@ def write_fits(self, product, path=None, *, version=0): primary_hdu = fits.PrimaryHDU() primary_hdu.header.update(primary_header) + if isinstance(product, FitsHeaderMixin): + primary_hdu.header.update(product.get_additional_header_keywords()) + # Add comment and history [primary_hdu.header.add_comment(com) for com in prod.comment] [primary_hdu.header.add_history(com) for com in prod.history] @@ -965,6 +969,8 @@ def generate_primary_header(self, filename, product, *, version=0): # Name, Value, Comment ('LEVEL', 'L2', 'Processing level of the data'), ('VERS_SW', str(stixcore.__version__), 'Version of SW that provided FITS file'), + ('VERS_CFG', str(stixcore.__version_conf__), + 'Version of the common instrument configuration package'), ('HISTORY', 'Processed by STIXCore L2'), ) diff --git a/stixcore/io/tests/test_rid_lut_manager.py b/stixcore/io/tests/test_rid_lut_manager.py new file mode 100644 index 00000000..dbc3d25d --- /dev/null +++ b/stixcore/io/tests/test_rid_lut_manager.py @@ -0,0 +1,34 @@ +import pytest + +from stixcore.io.RidLutManager import RidLutManager + + +def test_singleton(): + assert RidLutManager.instance + + +def test_get_reason(): + r = RidLutManager.instance.get_reason(1) + assert r == 'subject, purpose1, r1' + + +def test_get_reason_multi(): + r = RidLutManager.instance.get_reason(223) + assert r == 'subject, purpose, r223 , c2 subject, purpose_again, r223 , c2' + + +def test_get_scaling_factor(): + sf = RidLutManager.instance.get_scaling_factor(1) + assert sf == 1234 + + +def test_get_scaling_factor_not_found(): + with pytest.raises(ValueError) as e: + RidLutManager.instance.get_scaling_factor(123344) + assert str(e.value).startswith("can't get scaling factor") + + +def test_get_scaling_factor_to_many(): + with pytest.raises(ValueError) as e: + RidLutManager.instance.get_scaling_factor(223) + assert str(e.value).startswith("can't get scaling factor") diff --git a/stixcore/processing/LBtoL0.py b/stixcore/processing/LBtoL0.py index 33b259c0..eebfa188 100644 --- a/stixcore/processing/LBtoL0.py +++ b/stixcore/processing/LBtoL0.py @@ -8,6 +8,7 @@ from stixcore.ephemeris.manager import Spice, SpiceKernelManager from stixcore.idb.manager import IDBManager from stixcore.io.fits.processors import FitsL0Processor +from stixcore.io.RidLutManager import RidLutManager from stixcore.products.level0.scienceL0 import NotCombineException from stixcore.products.product import Product from stixcore.util.logging import get_logger @@ -73,6 +74,9 @@ def process_tm_type(files, tm_type, processor, spice_kernel_path, config, idbm): IDBManager.instance = idbm CONFIG = config + RidLutManager.instance = RidLutManager(Path(CONFIG.get('Publish', 'rid_lut_file')), + update=False) + # Stand alone packet data if (tm_type[0] == 21 and tm_type[-2] not in {20, 21, 22, 23, 24, 42}) or tm_type[0] != 21: for file in files: diff --git a/stixcore/processing/find.py b/stixcore/processing/find.py new file mode 100644 index 00000000..3d48dc5d --- /dev/null +++ b/stixcore/processing/find.py @@ -0,0 +1,160 @@ +import sys +import logging +import argparse +from pathlib import Path + +from astropy.io import fits + +from stixcore.config.config import CONFIG +from stixcore.time.datetime import SCETime +from stixcore.util.logging import get_logger + +__all__ = ['find_fits'] + +logger = get_logger(__name__) + + +def find_fits(args): + """CLI STIX find fits file by properties + + Parameters + ---------- + args : list + see -h for details + + Returns + ------- + list + list of fits files meeting the criteria + """ + + parser = argparse.ArgumentParser(description='STIX find fits files by properties', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument("-v", "--include_versions", + help="what versions should be found", + default=CONFIG.get('Publish', 'include_versions', fallback="*"), type=str) + + parser.add_argument("-l", "--include_levels", + help="what levels should be found", type=str, + default=CONFIG.get('Publish', 'include_levels', fallback="LB, L0, L1, L2")) + + parser.add_argument("-p", "--include_products", + help="what products should be found", type=str, + default=CONFIG.get('Publish', 'include_products', + fallback="21,ql,hk,sci,aux,cal")) + + parser.add_argument("-s", "--start_obt", + help="start onboard time", + default=0, type=int) + + parser.add_argument("-e", "--end_obt", + help="end onboard time", + default=2 ^ 32, type=int) + + parser.add_argument("-f", "--fits_dir", + help="input FITS directory for files to publish ", + default=CONFIG.get('Publish', 'fits_dir'), type=str) + + parser.add_argument("--log_level", + help="the level of logging", + default=None, type=str, + choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"], + dest='log_level') + + args = parser.parse_args(args) + + if args.log_level: + logger.setLevel(logging.getLevelName(args.log_level)) + + fits_dir = Path(args.fits_dir) + + include_levels = dict([(level, 1) for level in + args.include_levels.lower().replace(" ", "").split(",")]) + + if args.include_versions == "*": + include_all_versions = True + else: + include_all_versions = False + include_versions = dict([(int(version), 1) for version in + args.include_versions.lower().replace(" ", "") + .replace("v", "").split(",")]) + + if args.include_products == "*": + include_all_products = True + else: + include_all_products = False + include_products = dict([(prod, 1) for prod in + args.include_products.lower().replace(" ", "").split(",")]) + + candidates = fits_dir.rglob("solo_*.fits") + found = list() + + n_candidates = 0 + + args.start_obt = SCETime(args.start_obt) + args.end_obt = SCETime(args.end_obt) + + logger.info(f'include_all_products: {include_all_products}') + logger.info(f'include_products: {include_products}') + logger.info(f'include_all_versions: {include_all_versions}') + logger.info(f'include_versions: {include_all_products}') + logger.info(f'include_levels: {include_levels}') + logger.info(f'fits_dir: {fits_dir}') + logger.info(f'start obt: {args.start_obt}') + logger.info(f'end obt: {args.end_obt}') + + logger.info("start searching fits files") + + for c in candidates: + n_candidates += 1 + + if n_candidates % 1000 == 0: + logger.info(f'tested: {n_candidates} found {len(found)}') + + parts = c.name[:-5].split('_') + level = parts[1].lower() + + # version filter + if not include_all_versions: + version = int(parts[4].lower().replace("v", "")) + if version not in include_versions: + continue + + # level filter + if level not in include_levels: + continue + + # product filter + if not include_all_products: + product = str(parts[2].lower().split("-")[1]) + if product not in include_products: + continue + + # prod = Product(c) + obt_beg = SCETime.from_string(fits.getval(c, 'OBT_BEG')) + obt_end = SCETime.from_string(fits.getval(c, 'OBT_END')) + + # obt filter overlapping + if not ((obt_beg >= args.start_obt and obt_end <= args.end_obt) or + (obt_beg >= args.start_obt and args.end_obt < obt_end) or + (obt_beg < args.start_obt and obt_end > args.start_obt)): + continue + + found.append(c) + + logger.info(f'#candidates: {n_candidates}') + logger.info(f'#found: {len(found)}') + + for f in found: + print(str(f)) + + return found + + +def main(): + find_fits(sys.argv[1:]) + + +if __name__ == '__main__': + main() diff --git a/stixcore/processing/pipeline.py b/stixcore/processing/pipeline.py index 215f9850..24163fec 100644 --- a/stixcore/processing/pipeline.py +++ b/stixcore/processing/pipeline.py @@ -9,6 +9,7 @@ import logging import smtplib import warnings +import importlib import threading from queue import Queue from pprint import pformat @@ -24,6 +25,7 @@ from stixcore.config.config import CONFIG from stixcore.ephemeris.manager import Spice, SpiceKernelManager from stixcore.idb.manager import IDBManager +from stixcore.io.RidLutManager import RidLutManager from stixcore.io.soc.manager import SOCPacketFile from stixcore.processing.L0toL1 import Level1 from stixcore.processing.L1toL2 import Level2 @@ -33,6 +35,7 @@ from stixcore.soop.manager import SOOPManager from stixcore.util.logging import STX_LOGGER_DATE_FORMAT, STX_LOGGER_FORMAT, get_logger from stixcore.util.singleton import Singleton +from stixcore.version_conf import get_conf_version __all__ = ['GFTSFileHandler', 'process_tm', 'PipelineErrorReport', 'PipelineStatus'] @@ -201,12 +204,21 @@ def log_result(self, gen_files): def process_tm(path, **args): with PipelineErrorReport(path) as error_report: + # update the rid LUT file from the API and read in again + RidLutManager.instance.update_lut() + # set the latest spice kernel files for each run if ((args['spm'].get_latest_mk()[0] not in Spice.instance.meta_kernel_path) or (args['spm'].get_latest_mk_pred()[0] not in Spice.instance.meta_kernel_path)): Spice.instance = Spice(args['spm'].get_latest_mk_and_pred()) logger.info("new spice kernels detected and loaded") + # update version of common config it might have changed + if get_conf_version() != stixcore.__version_conf__: + importlib.reload(stixcore.version_conf) + importlib.reload(stixcore) + logger.info(f"new common conf detected new version is: {stixcore.__version_conf__}") + lb_files = process_tmtc_to_levelbinary([SOCPacketFile(path)]) logger.info(f"generated LB files: \n{pformat(lb_files)}") @@ -261,6 +273,7 @@ def get_singletons(): s = io.StringIO() s.write("\nSINGLETONS\n\n") s.write(f"SOOPManager: {SOOPManager.instance.data_root}\n") + s.write(f"RidLutManager: {RidLutManager.instance}\n") s.write(f"SPICE: {Spice.instance.meta_kernel_path}\n") s.write(f"IDBManager: {IDBManager.instance.data_root}\n" f"Versions:\n{IDBManager.instance.get_versions()}\n" @@ -274,6 +287,7 @@ def get_version(): s = io.StringIO() s.write("\nPIPELINE VERSION\n\n") s.write(f"Version: {str(stixcore.__version__)}\n") + s.write(f"Common instrument config version: {str(stixcore.__version_conf__)}\n") s.write("PROCESSING VERSIONS\n\n") for p in Product.registry: s.write(f"Prod: {p.__name__}\n File: {inspect.getfile(p)}\n" @@ -385,6 +399,8 @@ def main(): spm = SpiceKernelManager(Path(CONFIG.get("Paths", "spice_kernels"))) Spice.instance = Spice(spm.get_latest_mk_and_pred()) + RidLutManager.instance = RidLutManager(Path(CONFIG.get('Publish', 'rid_lut_file')), update=True) + logging_handler = LoggingEventHandler(logger=logger) soop_manager = SOOPManager(soop_path) diff --git a/stixcore/processing/pipeline_cli.py b/stixcore/processing/pipeline_cli.py index 4e88ee3c..ff19b95b 100755 --- a/stixcore/processing/pipeline_cli.py +++ b/stixcore/processing/pipeline_cli.py @@ -11,6 +11,7 @@ from stixcore.config.config import CONFIG from stixcore.ephemeris.manager import Spice, SpiceKernelManager +from stixcore.io.RidLutManager import RidLutManager from stixcore.io.soc.manager import SOCManager, SOCPacketFile from stixcore.processing.L0toL1 import Level1 from stixcore.processing.L1toL2 import Level2 @@ -136,6 +137,15 @@ def main(): help="clean all files from first", default=False, type=bool, const=True, nargs="?") + parser.add_argument("-r", "--rid_lut_file", + help=("Path to the rid LUT file"), + default=CONFIG.get('Publish', 'rid_lut_file'), type=str) + + parser.add_argument("--update_rid_lut", + help="update rid lut file before publishing", + default=False, + action='store_true', dest='update_rid_lut') + args = parser.parse_args() # pathes @@ -172,6 +182,8 @@ def main(): _spm = SpiceKernelManager(Path(CONFIG.get('Paths', 'spice_kernels'))) spicemeta = _spm.get_latest_mk_and_pred() + RidLutManager.instance = RidLutManager(Path(args.rid_lut_file), update=args.update_rid_lut) + Spice.instance = Spice(spicemeta) soc = SOCManager(Path(CONFIG.get('Paths', 'tm_archive'))) diff --git a/stixcore/processing/pipeline_status.py b/stixcore/processing/pipeline_status.py index cd4e02e6..fb9a35ce 100755 --- a/stixcore/processing/pipeline_status.py +++ b/stixcore/processing/pipeline_status.py @@ -24,11 +24,13 @@ def get_status(msg, port=12346): sock.sendall(msg) sock.sendall(b'\n') server = sock.makefile("rb") + response = '' while True: line = server.readline() if not line: break - return f"{line.decode().rstrip()}" + response += line.decode() + return f"{response.rstrip()}" finally: sock.close() diff --git a/stixcore/processing/publish.py b/stixcore/processing/publish.py index 77cac2b4..68fff6dd 100644 --- a/stixcore/processing/publish.py +++ b/stixcore/processing/publish.py @@ -1,21 +1,18 @@ import os import sys -import time import shutil import logging import smtplib import sqlite3 import argparse import tempfile -import urllib.request from io import StringIO from enum import Enum from pprint import pformat from pathlib import Path -from datetime import date, datetime, timedelta +from datetime import date, datetime from collections import defaultdict -import numpy as np from paramiko import SSHClient from scp import SCPClient @@ -23,10 +20,10 @@ from astropy.io import ascii, fits from astropy.io.fits.diff import HDUDiff from astropy.table import Table -from astropy.table.operations import unique, vstack from stixcore.config.config import CONFIG from stixcore.ephemeris.manager import Spice, SpiceKernelManager +from stixcore.io.RidLutManager import RidLutManager from stixcore.processing.pipeline_status import pipeline_status from stixcore.products.product import Product from stixcore.util.logging import get_logger @@ -387,15 +384,13 @@ def update_ephemeris_headers(fits_file, spice): logger.info(f"updated ephemeris headers of {fits_file}") -def add_BSD_comment(p, rid_lut): +def add_BSD_comment(p): """Adds a comment in the FITS header in case of BSD with the request ID and reason. Parameters ---------- p : Path Path to FITS file - rid_lut : Table - the LUT with all RIDs and reasons Returns ------- @@ -407,8 +402,9 @@ def add_BSD_comment(p, rid_lut): if len(parts) > 5: rid = parts[5].replace(".fits", "") rid = int(rid.split('-')[0]) - reqeust = rid_lut.loc[rid] - reason = " ".join(np.atleast_1d(reqeust['description'])) + + reason = RidLutManager.instance.get_reason(rid) + c_entry = f"BSD request id: '{rid}' reason: '{reason}'" fits.setval(p, "COMMENT", value=c_entry) return c_entry @@ -467,81 +463,6 @@ def copy_file(scp, p, target_dir, add_history_entry=True): return (False, e) -def read_rid_lut(file, update=False): - """Reads or creates the LUT of all BSD RIDs and the request reason comment. - - On creation or update an api endpoint from the STIX data center is used - to get the information and persists as a LUT locally. - - Parameters - ---------- - file : Path - path the to LUT file. - update : bool, optional - should the LUT be updated at start up?, by default False - - Returns - ------- - Table - the LUT od RIDs and request reasons. - """ - converters = {'_id': np.uint, 'unique_id': np.uint, 'start_utc': datetime, - 'duration': np.uint, 'type': str, 'subject': str, - 'purpose': str, 'comment': str} - - if update or not file.exists(): - rid_lut = Table(names=converters.keys(), dtype=converters.values()) - last_date = date(2018, 1, 1) - today = date.today() - if file.exists(): - rid_lut = ascii.read(file, delimiter=",", converters=converters) - mds = rid_lut['start_utc'].max() - try: - last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S').date() - except ValueError: - last_date = datetime.strptime(mds, '%Y-%m-%dT%H:%M:%S.%f').date() - - if not file.parent.exists(): - logger.info(f'path not found to rid lut file dir: {file.parent} creating dir') - file.parent.mkdir(parents=True, exist_ok=True) - rid_lut_file_update_url = CONFIG.get('Publish', 'rid_lut_file_update_url') - - try: - while (last_date < today): - last_date_1m = last_date + timedelta(days=30) - ldf = last_date.strftime('%Y%m%d') - ld1mf = last_date_1m.strftime('%Y%m%d') - update_url = f"{rid_lut_file_update_url}{ldf}/{ld1mf}" - logger.info(f'download publish lut file: {update_url}') - last_date = last_date_1m - updatefile = tempfile.NamedTemporaryFile().name - urllib.request.urlretrieve(update_url, updatefile) - update_lut = ascii.read(updatefile, delimiter=",", converters=converters) - - if len(update_lut) < 1: - continue - logger.info(f'found {len(update_lut)} entries') - rid_lut = vstack([rid_lut, update_lut]) - # the stix datacenter API is throttled to 2 calls per second - time.sleep(0.5) - except Exception as e: - logger.error("RID API ERROR") - logger.error(e) - - rid_lut = unique(rid_lut, silent=True) - ascii.write(rid_lut, file, overwrite=True, delimiter=",") - logger.info(f'write total {len(rid_lut)} entries to local storage') - else: - logger.info(f"read rid-lut from {file}") - rid_lut = ascii.read(file, delimiter=",", converters=converters) - - rid_lut['description'] = [", ".join(r.values()) for r in - rid_lut['subject', 'purpose', 'comment'].filled()] - rid_lut.add_index('unique_id') - - return rid_lut - - def publish_fits_to_esa(args): """CLI STIX publish to ESA processing step @@ -662,7 +583,7 @@ def publish_fits_to_esa(args): spice = Spice(spicemeta) Spice.instance = spice - rid_lut = read_rid_lut(Path(args.rid_lut_file), update=args.update_rid_lut) + RidLutManager.instance = RidLutManager(Path(args.rid_lut_file), update=args.update_rid_lut) scp = None if args.target_host != 'localhost': @@ -822,7 +743,7 @@ def publish_fits_to_esa(args): published = defaultdict(list) for p in to_publish: try: - comment = add_BSD_comment(p, rid_lut) + comment = add_BSD_comment(p) if is_incomplete_file_name(p.name): p = p.rename(p.parent / get_complete_file_name(p.name)) diff --git a/stixcore/processing/tests/test_processing.py b/stixcore/processing/tests/test_processing.py index dc26bc72..0d07cde7 100644 --- a/stixcore/processing/tests/test_processing.py +++ b/stixcore/processing/tests/test_processing.py @@ -9,6 +9,7 @@ import numpy as np import pytest +from astropy.io import fits from astropy.io.fits.diff import FITSDiff from stixcore.config.config import CONFIG @@ -25,7 +26,7 @@ from stixcore.products.level0.quicklookL0 import LightCurve from stixcore.products.product import Product from stixcore.soop.manager import SOOPManager -from stixcore.tmtc.packets import TMTC, GenericTMPacket +from stixcore.tmtc.packets import GenericTMPacket from stixcore.util.logging import get_logger logger = get_logger(__name__) @@ -58,51 +59,18 @@ def packet(): return packet -@pytest.mark.skip(reason="will be replaces with end2end test soon") -def test_level_b(soc_manager, out_dir): - files_to_process = list(soc_manager.get_files(TMTC.TM)) - res = process_tmtc_to_levelbinary(files_to_process=files_to_process[0:1], archive_path=out_dir) - assert len(res) == 1 - fits = res.pop() - diff = FITSDiff(test_data.products.DIR / fits.name, fits, - ignore_keywords=['CHECKSUM', 'DATASUM', 'DATE', 'VERS_SW']) - if not diff.identical: - print(diff.report()) - assert diff.identical - - -@pytest.mark.skip(reason="will be replaces with end2end test soon") -def test_level_0(out_dir): - lb = test_data.products.LB_21_6_30_fits +def test_level_0_descaling_trigger(out_dir): + lb = test_data.products.LB_21_6_21_fits l0 = Level0(out_dir / 'LB', out_dir) res = l0.process_fits_files(files=[lb]) - assert len(res) == 2 - for fits in res: - diff = FITSDiff(test_data.products.DIR / fits.name, fits, - ignore_keywords=['CHECKSUM', 'DATASUM', 'DATE', 'VERS_SW']) - if not diff.identical: - print(diff.report()) - assert diff.identical - - -@pytest.mark.skip(reason="will be replaces with end2end test soon") -def test_level_1(out_dir): - SOOPManager.instance = SOOPManager(Path(__file__).parent.parent.parent - / 'data' / 'test' / 'soop') - - l0 = test_data.products.L0_LightCurve_fits - l1 = Level1(out_dir / 'LB', out_dir) - res = l1.process_fits_files(files=l0) assert len(res) == 1 - for fits in res: - diff = FITSDiff(test_data.products.DIR / fits.name, fits, - ignore_keywords=['CHECKSUM', 'DATASUM', 'DATE', 'VERS_SW']) - if not diff.identical: - print(diff.report()) - assert diff.identical + hist = fits.getval(res[0], 'HISTORY') + factor = fits.getval(res[0], 'TRIG_SCA') + assert "trigger descaled with 30" in hist + assert factor == 30 -@pytest.mark.skip(reason="needs proper spize pointing kernels") +@pytest.mark.skip(reason="needs proper spice pointing kernels") def test_level_2(out_dir, spicekernelmanager): SOOPManager.instance = SOOPManager(Path(__file__).parent.parent.parent / 'data' / 'test' / 'soop') @@ -120,7 +88,7 @@ def test_level_2(out_dir, spicekernelmanager): assert pl2.parent[0] in input_names -@pytest.mark.skip(reason="needs proper spize pointing kernels") +@pytest.mark.skip(reason="needs proper spice pointing kernels") def test_level_2_auxiliary(out_dir, spicekernelmanager): SOOPManager.instance = SOOPManager(Path(__file__).parent.parent.parent / 'data' / 'test' / 'soop') @@ -275,7 +243,6 @@ def test_single_vs_batch(out_dir): def test_pipeline_logging(spicekernelmanager, out_dir): - CONTINUE_ON_ERROR = CONFIG.getboolean('Logging', 'stop_on_error', fallback=False) FITS_ARCHIVE = CONFIG.get('Paths', 'fits_archive') LOG_LEVEL = CONFIG.get('Pipeline', 'log_level') diff --git a/stixcore/products/level0/scienceL0.py b/stixcore/products/level0/scienceL0.py index eaaee22e..169b7a57 100644 --- a/stixcore/products/level0/scienceL0.py +++ b/stixcore/products/level0/scienceL0.py @@ -6,6 +6,7 @@ import astropy.units as u from stixcore.config.reader import read_energy_channels +from stixcore.io.RidLutManager import RidLutManager from stixcore.products.common import ( _get_compression_scheme, _get_detector_mask, @@ -13,7 +14,13 @@ get_min_uint, unscale_triggers, ) -from stixcore.products.product import ControlSci, Data, EnergyChannelsMixin, GenericProduct +from stixcore.products.product import ( + ControlSci, + Data, + EnergyChannelsMixin, + FitsHeaderMixin, + GenericProduct, +) from stixcore.time import SCETime, SCETimeRange from stixcore.time.datetime import SCETimeDelta from stixcore.util.logging import get_logger @@ -64,7 +71,7 @@ class NotCombineException(Exception): pass -class ScienceProduct(GenericProduct, EnergyChannelsMixin): +class ScienceProduct(GenericProduct, EnergyChannelsMixin, FitsHeaderMixin): """Generic science data product class composed of control and data.""" def __init__(self, *, service_type, service_subtype, ssid, control, data, **kwargs): """Create a generic science data product composed of control and data. @@ -303,6 +310,9 @@ def from_levelb(cls, levelb, parent=''): t_skm, t_skm_meta = _get_compression_scheme(packets, 'NIX00242') control.add_data('compression_scheme_triggers_skm', (t_skm[0].reshape(1, 3), t_skm_meta)) + header_history = [] + additional_header_keywords = [] + data = Data() try: data['delta_time'] = np.uint32(packets.get_value('NIX00441').to(u.ds)) @@ -347,10 +357,13 @@ def from_levelb(cls, levelb, parent=''): for i in range(242, 258)]) if control['compression_scheme_triggers_skm'].tolist() == [[0, 0, 7]]: - logger.debug('Unscaling trigger ') + factor = RidLutManager.instance.get_scaling_factor(control['request_id'][0]) + header_history.append(f"trigger descaled with {factor}") + logger.debug(f'Unscaling trigger: {factor}') + additional_header_keywords.append(("TRIG_SCA", factor, 'used trigger descale factor')) triggers, triggers_var = unscale_triggers( triggers, integration=data['integration_time'], - detector_masks=data['detector_masks'], ssid=levelb.ssid) + detector_masks=data['detector_masks'], ssid=levelb.ssid, factor=factor) data['triggers'] = triggers.T.astype(get_min_uint(triggers)) data['triggers'].meta = {'NIXS': [f'NIX00{i}' for i in range(242, 258)]} @@ -510,13 +523,17 @@ def from_levelb(cls, levelb, parent=''): 'counts', 'counts_comp_err'] data['control_index'] = np.ubyte(0) - return cls(service_type=packets.service_type, + prod = cls(service_type=packets.service_type, service_subtype=packets.service_subtype, ssid=packets.ssid, control=control, data=data, idb_versions=idb_versions, - packets=packets) + packets=packets, + history=header_history) + + prod.add_additional_header_keywords(additional_header_keywords) + return prod @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): @@ -564,6 +581,9 @@ def from_levelb(cls, levelb, parent=''): control.add_data('compression_scheme_triggers_skm', (t_skm[0].reshape(1, 3), t_skm_meta)) data = Data() + header_history = [] + additional_header_keywords = [] + try: data['delta_time'] = packets.get_value('NIX00441') data.add_meta(name='delta_time', nix='NIX00441', packets=packets) @@ -594,10 +614,13 @@ def from_levelb(cls, levelb, parent=''): triggers = np.array(triggers).reshape(-1, 16).T if control['compression_scheme_triggers_skm'].tolist() == [[0, 0, 7]]: - logger.debug('Unscaling trigger ') + factor = RidLutManager.instance.get_scaling_factor(control['request_id'][0]) + header_history.append(f"trigger descaled with {factor}") + logger.debug(f'Unscaling trigger: {factor}') + additional_header_keywords.append(("TRIG_SCA", factor, 'used trigger descale factor')) triggers, triggers_var = unscale_triggers( triggers, integration=SCETimeDelta(data['integration_time']), - detector_masks=data['detector_masks'], ssid=levelb.ssid) + detector_masks=data['detector_masks'], ssid=levelb.ssid, factor=factor) data['triggers'] = triggers.T data['triggers'].meta = {'NIXS': [f'NIX00{i}' for i in range(242, 258)]} # , @@ -640,13 +663,17 @@ def from_levelb(cls, levelb, parent=''): + data['delta_time'] + data['integration_time'] / 2) data['timedel'] = SCETimeDelta(data['integration_time']) - return cls(service_type=packets.service_type, + prod = cls(service_type=packets.service_type, service_subtype=packets.service_subtype, ssid=packets.ssid, control=control, data=data, idb_versions=idb_versions, - packets=packets) + packets=packets, + history=header_history) + + prod.add_additional_header_keywords(additional_header_keywords) + return prod @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): @@ -677,6 +704,9 @@ def from_levelb(cls, levelb, parent=''): t_skm, t_skm_meta = _get_compression_scheme(packets, 'NIX00267') control.add_data('compression_scheme_triggers_skm', (t_skm[0].reshape(1, 3), t_skm_meta)) + header_history = [] + additional_header_keywords = [] + control['detector_masks'] = np.unique(_get_detector_mask(packets)[0], axis=0) control['detector_masks'] = fix_detector_mask(control, control['detector_masks']) control.add_meta(name='detector_masks', nix='NIX00407', packets=packets) @@ -770,10 +800,13 @@ def from_levelb(cls, levelb, parent=''): triggers_var = packets.get_value('NIX00267', attr='error') if control['compression_scheme_triggers_skm'].tolist() == [[0, 0, 7]]: - logger.debug('Unscaling trigger ') + factor = RidLutManager.instance.get_scaling_factor(control['request_id'][0]) + header_history.append(f"trigger descaled with {factor}") + additional_header_keywords.append(("TRIG_SCA", factor, 'used trigger descale factor')) + logger.debug(f'Unscaling trigger: {factor}') triggers, triggers_var = unscale_triggers( triggers, integration=deltas, - detector_masks=control['detector_masks'], ssid=levelb.ssid) + detector_masks=control['detector_masks'], ssid=levelb.ssid, factor=factor) # Data data = Data() @@ -796,13 +829,17 @@ def from_levelb(cls, levelb, parent=''): data['counts_comp_err'] = np.float32(np.sqrt(counts_var) * u.ct) data['control_index'] = np.ubyte(0) - return cls(service_type=packets.service_type, + prod = cls(service_type=packets.service_type, service_subtype=packets.service_subtype, ssid=packets.ssid, control=control, data=data, idb_versions=idb_versions, - packets=packets) + packets=packets, + history=header_history) + + prod.add_additional_header_keywords(additional_header_keywords) + return prod @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): diff --git a/stixcore/products/level2/housekeepingL2.py b/stixcore/products/level2/housekeepingL2.py index fbeb1588..a993f4a4 100644 --- a/stixcore/products/level2/housekeepingL2.py +++ b/stixcore/products/level2/housekeepingL2.py @@ -310,10 +310,10 @@ def postprocessing(self, result, fits_processor): aux = Ephemeris(control=control, data=data, idb_versions=HK.idb_versions) - aux.add_additional_header_keywords( + aux.add_additional_header_keyword( ('STX_GSW', result.idlgswversion[0].decode(), 'Version of STX-GSW that provided data')) - aux.add_additional_header_keywords( + aux.add_additional_header_keyword( ('HISTORY', 'aspect data processed by STX-GSW', '')) files.extend(fits_processor.write_fits(aux)) else: @@ -323,7 +323,8 @@ def postprocessing(self, result, fits_processor): class Ephemeris(HKProduct, L2Mixin): - """Aspect auxiliary data. + """Ephemeris data, including spacecraft attitude and coordinates as well as STIX + pointing with respect to Sun center as derived from the STIX aspect system. In level 2 format. """ diff --git a/stixcore/products/product.py b/stixcore/products/product.py index 4af09098..01185aa6 100644 --- a/stixcore/products/product.py +++ b/stixcore/products/product.py @@ -781,11 +781,17 @@ def get_additional_header_keywords(self): return self._additional_header_keywords if hasattr(self, '_additional_header_keywords')\ else None - def add_additional_header_keywords(self, keyword): + def add_additional_header_keyword(self, keyword): if not hasattr(self, '_additional_header_keywords'): setattr(self, '_additional_header_keywords', list()) self._additional_header_keywords.append(keyword) + def add_additional_header_keywords(self, keywords): + if not hasattr(self, '_additional_header_keywords'): + setattr(self, '_additional_header_keywords', list()) + for keyword in keywords: + self._additional_header_keywords.append(keyword) + class L1Mixin(FitsHeaderMixin): @property diff --git a/stixcore/util/scripts/ddpd.py b/stixcore/util/scripts/ddpd.py index 015dff96..cf9b769c 100644 --- a/stixcore/util/scripts/ddpd.py +++ b/stixcore/util/scripts/ddpd.py @@ -299,7 +299,6 @@ def product(file_in): "L0/21/6/30/solo_L0_stix-ql-lightcurve_0684892800_V02U.fits", "L0/21/6/33/solo_L0_stix-ql-variance_0687484800_V02U.fits", "L0/21/6/32/solo_L0_stix-ql-spectra_0680400000_V02U.fits", - "L0/21/6/41/solo_L0_stix-cal-energy_0683856000_V02U.fits", # HK "L0/3/25/2/solo_L0_stix-hk-maxi_0647913600_V02U.fits", "L0/3/25/1/solo_L0_stix-hk-mini_0643507200_V02U.fits", @@ -315,12 +314,11 @@ def product(file_in): "L1/2021/10/13/SCI/solo_L1_stix-sci-aspect-burst_20211013T034959-20211013T055031_V02_2110130059.fits", # noqa "L1/2021/06/28/SCI/solo_L1_stix-sci-xray-spec_20210628T230132-20210628T234123_V02_2106280041-54988.fits", # noqa # QL - "L1/2020/06/16/QL/solo_L1_stix-ql-background_20200616_V02U.fits", - "L1/2020/06/16/QL/solo_L1_stix-ql-flareflag_20200616_V02U.fits", - "L1/2020/06/16/QL/solo_L1_stix-ql-lightcurve_20200616_V02U.fits", - "L1/2020/06/16/QL/solo_L1_stix-ql-variance_20200616_V02U.fits", - "L1/2021/11/16/QL/solo_L1_stix-ql-spectra_20211116_V02U.fits", - "L1/2021/11/16/CAL/solo_L1_stix-cal-energy_20211116_V02U.fits", + "L1/2020/06/16/QL/solo_L1_stix-ql-background_20200616_V02.fits", + "L1/2020/06/16/QL/solo_L1_stix-ql-flareflag_20200616_V02.fits", + "L1/2020/06/16/QL/solo_L1_stix-ql-lightcurve_20200616_V02.fits", + "L1/2020/06/16/QL/solo_L1_stix-ql-variance_20200616_V02.fits", + "L1/2021/11/16/QL/solo_L1_stix-ql-spectra_20211116_V02.fits", # HK "L1/2020/06/16/HK/solo_L1_stix-hk-maxi_20200616_V02U.fits", "L1/2021/09/20/HK/solo_L1_stix-hk-mini_20210920_V02U.fits", diff --git a/stixcore/version_conf.py b/stixcore/version_conf.py new file mode 100644 index 00000000..1cf04707 --- /dev/null +++ b/stixcore/version_conf.py @@ -0,0 +1,12 @@ +from pathlib import Path + + +def get_conf_version(): + try: + with open(Path(__file__).parent / "config" / "data" / "common" / "VERSION.TXT") as f: + return f.readline().strip() + except Exception: + return 'v0.1.3' + + +__version_conf__ = get_conf_version()