diff --git a/src/actipy/reader.py b/src/actipy/reader.py index d5ba60e..0048692 100644 --- a/src/actipy/reader.py +++ b/src/actipy/reader.py @@ -59,7 +59,12 @@ def read_device(input_file, # NOTE: Using process() increases data ref count by 1, which increases # memory. So instead we just do everything here. - sample_rate = info['SampleRate'] + # Basic quality control + timer.start("Quality control...") + data, info_qc = qc(data, info['SampleRate']) + info_qc['ReadErrors'] += info['ReadErrors'] + info.update(info_qc) + timer.stop() if len(data) == 0: print("File is empty. No data to process.") @@ -67,7 +72,7 @@ def read_device(input_file, if lowpass_hz not in (None, False): timer.start("Lowpass filter...") - data, info_lowpass = P.lowpass(data, sample_rate, lowpass_hz) + data, info_lowpass = P.lowpass(data, info['SampleRate'], lowpass_hz) info.update(info_lowpass) timer.stop() @@ -86,7 +91,7 @@ def read_device(input_file, if resample_hz not in (None, False): timer.start("Resampling...") if resample_hz in ('uniform', True): - data, info_resample = P.resample(data, sample_rate) + data, info_resample = P.resample(data, info['SampleRate']) else: data, info_resample = P.resample(data, resample_hz) info.update(info_resample) @@ -164,6 +169,54 @@ def process(data, sample_rate, return data, info +def qc(data, sample_rate): + """ Basic quality control. Returns a dict with general info. Also checks and + corrects for non-increasing timestamps. + """ + + info = {} + + if len(data) == 0: + info['NumTicks'] = 0 + info['StartTime'] = None + info['EndTime'] = None + info['WearTime(days)'] = 0 + info['NumInterrupts'] = 0 + info['ReadErrors'] = 0 + return info + + # Start/end times, wear time, interrupts + tol = pd.Timedelta('1s') + tdiff = data.index.to_series().diff() # Note: Index.diff() was only added in pandas 2.1 + total_wear = tdiff[tdiff < tol].sum().total_seconds() + num_interrupts = (tdiff > tol).sum() + time_format = "%Y-%m-%d %H:%M:%S" + info['NumTicks'] = len(data) + info['StartTime'] = data.index[0].strftime(time_format) + info['EndTime'] = data.index[-1].strftime(time_format) + info['WearTime(days)'] = total_wear / (60 * 60 * 24) + info['NumInterrupts'] = num_interrupts + + # Check for non-increasing timestamps. This is rare but can happen with + # buggy devices. TODO: Parser should do this. + errs = (tdiff <= pd.Timedelta(0)).sum() + del tdiff # we're done with this + if errs > 0: + print("Found non-increasing data timestamps. Fixing...") + data = data[data + .index + .to_series() + .cummax() + .diff() + .fillna(pd.Timedelta(1)) + .gt(pd.Timedelta(0))] + info['ReadErrors'] = int(np.ceil(errs / sample_rate)) + else: + info['ReadErrors'] = 0 + + return data, info + + def _read_device(input_file, verbose=True): """ Internal function that interfaces with the Java parser to read the device file. Returns parsed data as a pandas dataframe, and a dict with @@ -204,44 +257,7 @@ def _read_device(input_file, verbose=True): data_mmap = np.load(os.path.join(tmpdir, "data.npy"), mmap_mode='r') data = {c: np.asarray(data_mmap[c]) for c in data_mmap.dtype.names} data = pd.DataFrame(data, copy=False) - - if len(data) == 0: - info['NumTicks'] = 0 - info['StartTime'] = None - info['EndTime'] = None - info['WearTime(days)'] = 0 - info['NumInterrupts'] = 0 - data.set_index('time', inplace=True) - timer.stop() - return data, info - - # Check for non-increasing timestamps. This is rare but can happen with - # buggy devices. TODO: Parser should do this. - errs = (data['time'].diff() <= pd.Timedelta(0)).sum() - if errs > 0: - print("Found non-increasing data timestamps. Fixing...") - data = data[data['time'] - .cummax() - .diff() - .fillna(pd.Timedelta(1)) - > pd.Timedelta(0)] - info['ReadErrors'] += int(np.ceil(errs / info['SampleRate'])) - - # Start/end times, wear time, interrupts - t = data['time'] - tol = pd.Timedelta('1s') - total_wear = (t.diff().pipe(lambda x: x[x < tol].sum()).total_seconds()) - num_interrupts = (t.diff() > tol).sum() - strftime = "%Y-%m-%d %H:%M:%S" - info['NumTicks'] = len(data) - info['StartTime'] = t.iloc[0].strftime(strftime) - info['EndTime'] = t.iloc[-1].strftime(strftime) - info['WearTime(days)'] = total_wear / (60 * 60 * 24) - info['NumInterrupts'] = num_interrupts - del t - data.set_index('time', inplace=True) - timer.stop() return data, info