From f8c65449fa9cb25a3875dcb2e2a7dc5bfe1b9be8 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Tue, 29 Dec 2020 16:12:12 -0500 Subject: [PATCH 1/4] deduplication step 1 - build out `_read_chunks` to read all data but retain stream-headers-only for resolve_streams. --- pyxdf/pyxdf.py | 323 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 223 insertions(+), 100 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 0727f53..7701384 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -199,27 +199,30 @@ def load_xdf( if verbose is not None: logger.setLevel(logging.DEBUG if verbose else logging.WARNING) + if isinstance(select_streams, int): + select_streams = [select_streams] + logger.info("Importing XDF file %s..." % filename) # if select_streams is an int or a list of int, load only streams # associated with the corresponding stream IDs # if select_streams is a list of dicts, use this to query and load streams # associated with these properties - if select_streams is None: - pass - elif isinstance(select_streams, int): - select_streams = [select_streams] - elif all([isinstance(elem, dict) for elem in select_streams]): - select_streams = match_streaminfos( - resolve_streams(filename), select_streams - ) - if not select_streams: # no streams found - raise ValueError("No matching streams found.") - elif not all([isinstance(elem, int) for elem in select_streams]): - raise ValueError( - "Argument 'select_streams' must be an int, a list of ints or a " - "list of dicts." - ) + # if select_streams is None: + # pass + # elif isinstance(select_streams, int): + # select_streams = [select_streams] + # elif all([isinstance(elem, dict) for elem in select_streams]): + # select_streams = match_streaminfos( + # resolve_streams(filename), select_streams + # ) + # if not select_streams: # no streams found + # raise ValueError("No matching streams found.") + # elif not all([isinstance(elem, int) for elem in select_streams]): + # raise ValueError( + # "Argument 'select_streams' must be an int, a list of ints or a " + # "list of dicts." + # ) # dict of returned streams, in order of appearance, indexed by stream id streams = OrderedDict() @@ -231,62 +234,32 @@ def load_xdf( with open_xdf(filename) as f: # for each chunk while True: - # noinspection PyBroadException - try: - # read [NumLengthBytes], [Length] - chunklen = _read_varlen_int(f) - except EOFError: - break - except Exception: - logger.exception("Error reading chunk length") - # If there's more data available (i.e. a read() succeeds), - # find the next boundary chunk - if f.read(1): - logger.warning( - "got zero-length chunk, scanning forward to next " - "boundary chunk." - ) - # move the stream position one byte back - f.seek(-1, 1) - if _scan_forward(f): - continue - logger.info(" reached end of file.") + # read [NumLengthBytes], [Length] + chunklen, err = _read_chunk_length(f) + if err == 1: + continue + elif err == -1: break # read [Tag] tag = struct.unpack(" Date: Tue, 29 Dec 2020 18:14:38 -0500 Subject: [PATCH 2/4] deduplication step 2: * main load func uses shared _read_chunks * support stream_headers_only * support select_streams on first pass --- pyxdf/pyxdf.py | 255 ++++++++++++++++++++++--------------------------- 1 file changed, 115 insertions(+), 140 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index 7701384..f922443 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -46,6 +46,8 @@ def __init__(self, xml): self.srate = float(xml["info"]["nominal_srate"][0]) # format string (int8, int16, int32, float32, double64, string) self.fmt = xml["info"]["channel_format"][0] + # Whether or not to skip processing this chunk + self.skip = False # list of time-stamp chunks (each an ndarray, in seconds) self.time_stamps = [] # list of time-series chunks (each an ndarray or list of lists) @@ -81,6 +83,7 @@ def load_xdf( clock_reset_threshold_offset_seconds=1, clock_reset_threshold_offset_stds=10, winsor_threshold=0.0001, + stream_headers_only=False, verbose=None ): """Import an XDF file. @@ -113,6 +116,9 @@ def load_xdf( matching either the type *or* the name will be loaded. - None: load all streams (default). + stream_headers_only: Passing True will cause all non-StreamHeader chunks to be skipped. + Keyword arguments other than select_streams and verbose will be ignored. + verbose : Passing True will set logging level to DEBUG, False will set it to WARNING, and None will use root logger level. (default: None) @@ -199,126 +205,42 @@ def load_xdf( if verbose is not None: logger.setLevel(logging.DEBUG if verbose else logging.WARNING) - if isinstance(select_streams, int): - select_streams = [select_streams] - logger.info("Importing XDF file %s..." % filename) - # if select_streams is an int or a list of int, load only streams - # associated with the corresponding stream IDs - # if select_streams is a list of dicts, use this to query and load streams - # associated with these properties - # if select_streams is None: - # pass - # elif isinstance(select_streams, int): - # select_streams = [select_streams] - # elif all([isinstance(elem, dict) for elem in select_streams]): - # select_streams = match_streaminfos( - # resolve_streams(filename), select_streams - # ) - # if not select_streams: # no streams found - # raise ValueError("No matching streams found.") - # elif not all([isinstance(elem, int) for elem in select_streams]): - # raise ValueError( - # "Argument 'select_streams' must be an int, a list of ints or a " - # "list of dicts." - # ) - # dict of returned streams, in order of appearance, indexed by stream id streams = OrderedDict() # dict of per-stream temporary data (StreamData), indexed by stream id - temp = {} + temp_stream_data = {} # XML content of the file header chunk fileheader = None with open_xdf(filename) as f: - # for each chunk - while True: - # read [NumLengthBytes], [Length] - chunklen, err = _read_chunk_length(f) - if err == 1: + for chunk in _read_chunks(f, select_streams=select_streams, + stream_headers_only=stream_headers_only, skip_desc=False, + on_Samples_chunk=on_chunk): + # _read_chunks generator took care of most of the hard work involved in reading the data and error handling. + # Here we simply build up our streams dictionary and temporary data holders. + if chunk is None: continue - elif err == -1: - break - - # read [Tag] - tag = struct.unpack(" Date: Tue, 29 Dec 2020 21:02:23 -0500 Subject: [PATCH 3/4] Ensure parsed stream header retains stream_id. --- pyxdf/pyxdf.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index f922443..0411343 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -751,10 +751,13 @@ def parse_stream_header_chunks(chunks): for chunk in chunks: if chunk is not None and chunk["tag"] == 2: # stream header chunk # stream header info already normalized. Here we need to convert len=1 lists to single items. - streams.append({ + flatted_stream_header = { k: v[0] if isinstance(v, list) and (len(v) == 1) else v for k, v in chunk["info"].items() - }) + } + if "stream_id" in chunk: + flatted_stream_header["stream_id"] = chunk["stream_id"] + streams.append(flatted_stream_header) return streams From c037ae5e1ccfc5734149a2b86a057351d652865b Mon Sep 17 00:00:00 2001 From: Clemens Brunner Date: Thu, 1 Apr 2021 19:56:48 +0200 Subject: [PATCH 4/4] Revert parse_chunks renaming --- pyxdf/pyxdf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyxdf/pyxdf.py b/pyxdf/pyxdf.py index e55fade..00d2a71 100644 --- a/pyxdf/pyxdf.py +++ b/pyxdf/pyxdf.py @@ -727,7 +727,7 @@ def resolve_streams(fname): stream_infos : list of dicts List of dicts containing information on each stream. """ - return parse_stream_header_chunks(parse_xdf(fname, stream_headers_only=True)) + return parse_chunks(parse_xdf(fname, stream_headers_only=True)) def parse_xdf(fname, stream_headers_only=False): @@ -750,7 +750,7 @@ def parse_xdf(fname, stream_headers_only=False): return chunks -def parse_stream_header_chunks(chunks): +def parse_chunks(chunks): """Parse stream header chunks and extract information on individual streams.""" streams = [] for chunk in chunks: