From fd8e1719ac0a9f1889d72f6bebeb564a13424d8d Mon Sep 17 00:00:00 2001 From: Shing Chan Date: Tue, 22 Oct 2024 18:16:39 +0100 Subject: [PATCH] refactor(reader): extract data qc logic to new function - Move qc code out of _read_device and into new function qc() - Merge calculate_wear_coverage into qc() --- src/actipy/reader.py | 117 ++++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 56 deletions(-) diff --git a/src/actipy/reader.py b/src/actipy/reader.py index 9effe51..5b13297 100644 --- a/src/actipy/reader.py +++ b/src/actipy/reader.py @@ -59,7 +59,12 @@ def read_device(input_file, # NOTE: Using process() increases data ref count by 1, which increases # memory. So instead we just do everything here. - sample_rate = info['SampleRate'] + # Basic quality control + timer.start("Quality control...") + data, info_qc = qc(data, info['SampleRate']) + info_qc['ReadErrors'] += info['ReadErrors'] + info.update(info_qc) + timer.stop() if len(data) == 0: print("File is empty. No data to process.") @@ -67,7 +72,7 @@ def read_device(input_file, if lowpass_hz not in (None, False): timer.start("Lowpass filter...") - data, info_lowpass = P.lowpass(data, sample_rate, lowpass_hz) + data, info_lowpass = P.lowpass(data, info['SampleRate'], lowpass_hz) info.update(info_lowpass) timer.stop() @@ -86,15 +91,12 @@ def read_device(input_file, if resample_hz not in (None, False): timer.start("Resampling...") if resample_hz in ('uniform', True): - data, info_resample = P.resample(data, sample_rate) + data, info_resample = P.resample(data, info['SampleRate']) else: data, info_resample = P.resample(data, resample_hz) info.update(info_resample) timer.stop() - info_wear_coverage = calculate_wear_coverage(data) - info.update(info_wear_coverage) - return data, info @@ -164,6 +166,59 @@ def process(data, sample_rate, return data, info +def qc(data, sample_rate): + """ Basic quality control. Returns a dict with general info. Also checks and + corrects for non-increasing timestamps. + """ + + info = {} + + if len(data) == 0: + info['NumTicks'] = 0 + info['StartTime'] = None + info['EndTime'] = None + info['WearTime(days)'] = 0 + info['NumInterrupts'] = 0 + info['ReadErrors'] = 0 + return info + + # Start/end times, wear time, interrupts + tol = pd.Timedelta('1s') + tdiff = data.index.to_series().diff() # Note: Index.diff() was only added in pandas 2.1 + total_wear = tdiff[tdiff < tol].sum().total_seconds() + num_interrupts = (tdiff > tol).sum() + time_format = "%Y-%m-%d %H:%M:%S" + info['NumTicks'] = len(data) + info['StartTime'] = data.index[0].strftime(time_format) + info['EndTime'] = data.index[-1].strftime(time_format) + info['WearTime(days)'] = total_wear / (60 * 60 * 24) + info['NumInterrupts'] = num_interrupts + + # Check for non-increasing timestamps. This is rare but can happen with + # buggy devices. TODO: Parser should do this. + errs = (tdiff <= pd.Timedelta(0)).sum() + del tdiff # we're done with this + if errs > 0: + print("Found non-increasing data timestamps. Fixing...") + data = data[data + .index + .to_series() + .cummax() + .diff() + .fillna(pd.Timedelta(1)) + .gt(pd.Timedelta(0))] + info['ReadErrors'] = int(np.ceil(errs / sample_rate)) + else: + info['ReadErrors'] = 0 + + # Check if data covers all 24 hours of the day + coverage = data.notna().any(axis=1).groupby(data.index.hour).mean() + info['Covers24hOK'] = int(len(coverage) == 24 and np.min(coverage) >= 0.01) + del coverage + + return data, info + + def _read_device(input_file, verbose=True): """ Internal function that interfaces with the Java parser to read the device file. Returns parsed data as a pandas dataframe, and a dict with @@ -204,44 +259,7 @@ def _read_device(input_file, verbose=True): data_mmap = np.load(os.path.join(tmpdir, "data.npy"), mmap_mode='r') data = {c: np.asarray(data_mmap[c]) for c in data_mmap.dtype.names} data = pd.DataFrame(data, copy=False) - - if len(data) == 0: - info['NumTicks'] = 0 - info['StartTime'] = None - info['EndTime'] = None - info['WearTime(days)'] = 0 - info['NumInterrupts'] = 0 - data.set_index('time', inplace=True) - timer.stop() - return data, info - - # Check for non-increasing timestamps. This is rare but can happen with - # buggy devices. TODO: Parser should do this. - errs = (data['time'].diff() <= pd.Timedelta(0)).sum() - if errs > 0: - print("Found non-increasing data timestamps. Fixing...") - data = data[data['time'] - .cummax() - .diff() - .fillna(pd.Timedelta(1)) - > pd.Timedelta(0)] - info['ReadErrors'] += int(np.ceil(errs / info['SampleRate'])) - - # Start/end times, wear time, interrupts - t = data['time'] - tol = pd.Timedelta('1s') - total_wear = (t.diff().pipe(lambda x: x[x < tol].sum()).total_seconds()) - num_interrupts = (t.diff() > tol).sum() - strftime = "%Y-%m-%d %H:%M:%S" - info['NumTicks'] = len(data) - info['StartTime'] = t.iloc[0].strftime(strftime) - info['EndTime'] = t.iloc[-1].strftime(strftime) - info['WearTime(days)'] = total_wear / (60 * 60 * 24) - info['NumInterrupts'] = num_interrupts - del t - data.set_index('time', inplace=True) - timer.stop() return data, info @@ -408,19 +426,6 @@ def fix_nonincr_time(data): return data, errs -def calculate_wear_coverage(data): - """ Check device wear covers all 24 hours of the day """ - info = {} - coverage = data.notna().any(axis=1).groupby(data.index.hour).agg(lambda x: x.notna().mean()) - - if len(coverage) < 24 or np.min(coverage) < 0.01: - info['Covers24hOK'] = 0 - else: - info['Covers24hOK'] = 1 - - return info - - class Timer: def __init__(self, verbose=True): self.verbose = verbose