Skip to content

Commit

Permalink
change parallel processing to batch by SCI product (#259)
Browse files Browse the repository at this point in the history
* change parallel processing to batch by SCI product

* typo in desc.

* fix test

* fixes for end2end

* fixes for set data type

* fix dtype of packet id information (#261)

* fix dtype of packet id information

* dtype fix for trigger as well

* shring the end2end test files the a minimum (#262)

* End2end test small (#263)

* shring the end2end test files the a minimum

* enable main

* End2end test small (#264)

* shring the end2end test files the a minimum

* enable main

* add logging into same dir

* End2end test small (#265)

* shring the end2end test files the a minimum

* enable main

* add logging into same dir

* add logging into same dir

* add logging into same dir

* change parallel processing to batch by SCI product

* typo in desc.

* fix test

* fixes for end2end

* update idb references (#267)

* update idb references

* fix test for latest idb .36

* change parallel processing to batch by SCI product

* typo in desc.

* fix test

* fixes for end2end

* change parallel processing to batch by SCI product

* fixes for end2end

* End2end fix aka order independend TM processing (#270)

* update idb references

* fix test for latest idb .36

* add pipeline config adn setup logging

* add pipeline config adn setup logging

* make TMprocessing independend of order of input

* remove fix idb version

* Update stixcore.ini

* refactor BSD LB to files per request (#244)

* refactor BSD LB to files per request

* more testing

* fix testing

* fix testing

* fix #251 #256 #234

* fix #255

* change IDB lookup to interval tree
add script for getting ASW update times

* remove testing fragment

* adress #254

* fix #254

* define order of test XML files as differ in OS

* fix  #252

* change order of DPDD extensions

* better error logging in LB2L0 stepp

* Update stixcore/util/scripts/idb_history.py

Co-authored-by: Shane Maloney <[email protected]>

* fixes

* fixes

* fix dtype of packet id information (#261)

* fix dtype of packet id information

* dtype fix for trigger as well

* shring the end2end test files the a minimum (#262)

* End2end test small (#263)

* shring the end2end test files the a minimum

* enable main

* End2end test small (#264)

* shring the end2end test files the a minimum

* enable main

* add logging into same dir

* End2end test small (#265)

* shring the end2end test files the a minimum

* enable main

* add logging into same dir

* add logging into same dir

* add logging into same dir

* update idb references (#267)

* update idb references

* fix test for latest idb .36

* refactor BSD LB to files per request

* more testing

* fix testing

* fix testing

* fix #251 #256 #234

* fix #255

* change IDB lookup to interval tree
add script for getting ASW update times

* remove testing fragment

* adress #254

* fix #254

* define order of test XML files as differ in OS

* fix  #252

* change order of DPDD extensions

* better error logging in LB2L0 stepp

* fixes

* Update stixcore/util/scripts/idb_history.py

Co-authored-by: Shane Maloney <[email protected]>

* fixes

* End2end fix aka order independend TM processing (#270)

* update idb references

* fix test for latest idb .36

* add pipeline config adn setup logging

* add pipeline config adn setup logging

* make TMprocessing independend of order of input

* remove fix idb version

* Update stixcore.ini

* refactor BSD LB to files per request

* more testing

* fix testing

* fix testing

* fix #251 #256 #234

* fix #255

* change IDB lookup to interval tree
add script for getting ASW update times

* remove testing fragment

* adress #254

* fix #254

* define order of test XML files as differ in OS

* fix  #252

* change order of DPDD extensions

* better error logging in LB2L0 stepp

* fixes

* Update stixcore/util/scripts/idb_history.py

Co-authored-by: Shane Maloney <[email protected]>

* fixes

* change IDB lookup to interval tree
add script for getting ASW update times

* remove testing fragment

* fix test for oldest IDB version

Co-authored-by: Shane Maloney <[email protected]>

* change parallel processing to batch by SCI product

* typo in desc.

* fix test

* fixes for end2end

* change parallel processing to batch by SCI product

* fixes for end2end

* change parallel processing to batch by SCI product

* fixes for end2end

* change parallel processing to batch by SCI product

* supress FITS warning only in pipelines

Co-authored-by: Shane Maloney <[email protected]>
  • Loading branch information
nicHoch and Shane Maloney authored Aug 30, 2022
1 parent cc21dc2 commit f06f619
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 12 deletions.
4 changes: 4 additions & 0 deletions stixcore/data/stixcore.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ stop_on_error = False
[IDLBridge]
enabled = False
gsw_path =
[Pipeline]
parallel_batchsize_L0 = 300
parallel_batchsize_L1 = 300
parallel_batchsize_L2 = 100
3 changes: 3 additions & 0 deletions stixcore/data/test/stixcore.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ error_mail_receivers = [email protected]
error_mail_smpt_host = localhost
error_mail_smpt_port = 25
error_mail_sender = [email protected]
parallel_batchsize_L0 = 3
parallel_batchsize_L1 = 3
parallel_batchsize_L2 = 3
34 changes: 30 additions & 4 deletions stixcore/processing/L0toL1.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@


class Level1:
"""Processing step from L0 to L1.
Input are L0 Fits files and the result is written to FITS as well. Daily products will be added
to existing FITS files if already present.
Groups all files into parallel processes by product type if daily products. For by request
products it is also grouped by type but additional into batches of a given size.
A priority heuristic starts with ql data as this takes longest.
"""
def __init__(self, source_dir, output_dir):
self.source_dir = Path(source_dir)
self.output_dir = Path(output_dir)
Expand All @@ -28,15 +37,31 @@ def process_fits_files(self, files=None):
all_files = list()
if files is None:
files = self.level0_files

product_types = defaultdict(list)
product_types_batch = defaultdict(int)
batch_size = CONFIG.getint('Pipeline', 'parallel_batchsize_L1', fallback=150)

for file in files:
# group by service,subservice,ssid example: 'L0/21/6/30'
product_types[str(file.parent)].append(file)
# group by service,subservice, ssid example: 'L0/21/6/30' as default
# or (prio, service, subservice, [SSID], [BATCH]) if all data is available
batch = 0
prio = 3
product_type = str(file.parent)
if 'L0' in file._parts:
product_type = tuple(map(int, file._parts[file._parts.index('L0')+1:-1]))
if (product_type[0] == 21 and
product_type[-1] in {20, 21, 22, 23, 24, 42}): # sci data
product_types_batch[product_type] += 1
prio = 2
elif product_type[0] == 21: # ql data
prio = 1
batch = product_types_batch[product_type] // batch_size
product_types[(prio, ) + product_type + (batch, )].append(file)

jobs = []
with ProcessPoolExecutor() as executor:
for pt, files in product_types.items():
# simple heuristic that the daily QL data takes longest so we start early
for pt, files in sorted(product_types.items()):
jobs.append(executor.submit(process_type, files,
processor=FitsL1Processor(self.output_dir),
soopmanager=SOOPManager.instance,
Expand All @@ -61,6 +86,7 @@ def process_type(files, *, processor, soopmanager, spice_kernel_path, config):

for file in files:
l0 = Product(file)
logger.info(f"processing file: {file}")
try:
tmp = Product._check_registered_widget(level='L1', service_type=l0.service_type,
service_subtype=l0.service_subtype,
Expand Down
16 changes: 15 additions & 1 deletion stixcore/processing/L1toL2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@


class Level2:
"""Processing step from L1 to L2.
Input are L1 Fits files and the result is written to FITS as well. Daily products will be added
to existing FITS files if already present.
Groups all files into parallel processes by product type if daily products. For by request
products it is also grouped by type but additional into batches of a given size.
"""
def __init__(self, source_dir, output_dir):
self.source_dir = Path(source_dir)
self.output_dir = Path(output_dir)
Expand All @@ -31,11 +39,17 @@ def process_fits_files(self, files=None):
files = self.level1_files

product_types = defaultdict(list)
product_types_batch = defaultdict(int)
batch_size = CONFIG.getint('Pipeline', 'parallel_batchsize_L2', fallback=100)

for file in files:
# group by product: '(HK,maxi)'
mission, level, identifier, *_ = file.name.split('_')
tm_type = tuple(identifier.split('-')[1:])
product_types[tm_type].append(file)
if tm_type[0] == 'sci':
product_types_batch[tm_type] += 1
batch = product_types_batch[tm_type] // batch_size
product_types[tm_type + (batch, )].append(file)

jobs = []
with ProcessPoolExecutor() as executor:
Expand Down
17 changes: 14 additions & 3 deletions stixcore/processing/LBtoL0.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@


class Level0:
"""
"""Processing step from LB to L0.
Input are LB Fits files and the result is written to FITS as well. Daily products will be added
to existing FITS files if already present.
Groups all files into parallel processes by product type if daily products. For by request
products it is also grouped by type but additional into batches of a given size.
"""
def __init__(self, source_dir, output_dir):
self.source_dir = Path(source_dir)
Expand All @@ -28,13 +33,18 @@ def __init__(self, source_dir, output_dir):
def process_fits_files(self, files=None):
all_files = list()
tm = defaultdict(list)
tm_batch = defaultdict(int)
batch_size = CONFIG.getint('Pipeline', 'parallel_batchsize_L0', fallback=300)
if files is None:
files = self.levelb_files
# Create list of file by type
for file in files:
mission, level, identifier, *_ = file.name.split('_')
tm_type = tuple(map(int, identifier.split('-')[1:]))
tm[tm_type].append(file)
if (tm_type[0] == 21 and tm_type[-1] in {20, 21, 22, 23, 24, 42}):
tm_batch[tm_type] += 1
batch = tm_batch[tm_type] // batch_size
tm[tm_type + (batch, )].append(file)

# For each type
with ProcessPoolExecutor() as executor:
Expand Down Expand Up @@ -64,9 +74,10 @@ def process_tm_type(files, tm_type, processor, spice_kernel_path, config, idbm):
CONFIG = config

# Stand alone packet data
if (tm_type[0] == 21 and tm_type[-1] not in {20, 21, 22, 23, 24, 42}) or tm_type[0] != 21:
if (tm_type[0] == 21 and tm_type[-2] not in {20, 21, 22, 23, 24, 42}) or tm_type[0] != 21:
for file in files:
levelb = Product(file)
logger.info(f"processing file: {file}")
tmp = Product._check_registered_widget(
level='L0', service_type=levelb.service_type,
service_subtype=levelb.service_subtype, ssid=levelb.ssid,
Expand Down
3 changes: 3 additions & 0 deletions stixcore/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import shutil
import logging
import smtplib
import warnings
import threading
from queue import Queue
from pprint import pformat
Expand All @@ -28,6 +29,8 @@
'log_singletons']

logger = get_logger(__name__)
warnings.filterwarnings('ignore', module='astropy.io.fits.card')
warnings.filterwarnings('ignore', module='astropy.utils.metadata')

TM_REGEX = re.compile(r'.*PktTmRaw.*.xml$')

Expand Down
4 changes: 4 additions & 0 deletions stixcore/processing/stix-pipeline
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import sys
import shutil
import logging
import argparse
import warnings
from enum import Enum
from pathlib import Path
from collections import defaultdict
Expand All @@ -19,6 +20,9 @@ from stixcore.processing.TMTCtoLB import process_tmtc_to_levelbinary
from stixcore.soop.manager import SOOPManager
from stixcore.tmtc.packets import TMTC

warnings.filterwarnings('ignore', module='astropy.io.fits.card')
warnings.filterwarnings('ignore', module='astropy.utils.metadata')


def clear_dir(dir):
for files in os.listdir(dir):
Expand Down
4 changes: 0 additions & 4 deletions stixcore/util/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
"""
import logging
import warnings

STX_LOGGER_FORMAT = '%(asctime)s %(levelname)s %(name)s %(lineno)s: %(message)s'
STX_LOGGER_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

warnings.filterwarnings('ignore', module='astropy.io.fits.card')
warnings.filterwarnings('ignore', module='astropy.utils.metadata')


def get_logger(name, level=logging.INFO):
"""
Expand Down

0 comments on commit f06f619

Please sign in to comment.