Skip to content

Commit

Permalink
refactor(reader): extract qc code out of _read_device into own function
Browse files Browse the repository at this point in the history
  • Loading branch information
chanshing committed Oct 22, 2024
1 parent 0f452db commit 5a74fc2
Showing 1 changed file with 56 additions and 40 deletions.
96 changes: 56 additions & 40 deletions src/actipy/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,20 @@ def read_device(input_file,
# NOTE: Using process() increases data ref count by 1, which increases
# memory. So instead we just do everything here.

sample_rate = info['SampleRate']
# Basic quality control
timer.start("Quality control...")
data, info_qc = qc(data, info['SampleRate'])
info_qc['ReadErrors'] += info['ReadErrors']
info.update(info_qc)
timer.stop()

if len(data) == 0:
print("File is empty. No data to process.")
return data, info

if lowpass_hz not in (None, False):
timer.start("Lowpass filter...")
data, info_lowpass = P.lowpass(data, sample_rate, lowpass_hz)
data, info_lowpass = P.lowpass(data, info['SampleRate'], lowpass_hz)
info.update(info_lowpass)
timer.stop()

Expand All @@ -86,7 +91,7 @@ def read_device(input_file,
if resample_hz not in (None, False):
timer.start("Resampling...")
if resample_hz in ('uniform', True):
data, info_resample = P.resample(data, sample_rate)
data, info_resample = P.resample(data, info['SampleRate'])
else:
data, info_resample = P.resample(data, resample_hz)
info.update(info_resample)
Expand Down Expand Up @@ -164,6 +169,54 @@ def process(data, sample_rate,
return data, info


def qc(data, sample_rate):
""" Basic quality control. Returns a dict with general info. Also checks and
corrects for non-increasing timestamps.
"""

info = {}

if len(data) == 0:
info['NumTicks'] = 0
info['StartTime'] = None
info['EndTime'] = None
info['WearTime(days)'] = 0
info['NumInterrupts'] = 0
info['ReadErrors'] = 0
return info

# Start/end times, wear time, interrupts
tol = pd.Timedelta('1s')
tdiff = data.index.to_series().diff() # Note: Index.diff() was only added in pandas 2.1
total_wear = tdiff[tdiff < tol].sum().total_seconds()
num_interrupts = (tdiff > tol).sum()
time_format = "%Y-%m-%d %H:%M:%S"
info['NumTicks'] = len(data)
info['StartTime'] = data.index[0].strftime(time_format)
info['EndTime'] = data.index[-1].strftime(time_format)
info['WearTime(days)'] = total_wear / (60 * 60 * 24)
info['NumInterrupts'] = num_interrupts

# Check for non-increasing timestamps. This is rare but can happen with
# buggy devices. TODO: Parser should do this.
errs = (tdiff <= pd.Timedelta(0)).sum()
del tdiff # we're done with this
if errs > 0:
print("Found non-increasing data timestamps. Fixing...")
data = data[data
.index
.to_series()
.cummax()
.diff()
.fillna(pd.Timedelta(1))
.gt(pd.Timedelta(0))]
info['ReadErrors'] = int(np.ceil(errs / sample_rate))
else:
info['ReadErrors'] = 0

return data, info


def _read_device(input_file, verbose=True):
""" Internal function that interfaces with the Java parser to read the
device file. Returns parsed data as a pandas dataframe, and a dict with
Expand Down Expand Up @@ -204,44 +257,7 @@ def _read_device(input_file, verbose=True):
data_mmap = np.load(os.path.join(tmpdir, "data.npy"), mmap_mode='r')
data = {c: np.asarray(data_mmap[c]) for c in data_mmap.dtype.names}
data = pd.DataFrame(data, copy=False)

if len(data) == 0:
info['NumTicks'] = 0
info['StartTime'] = None
info['EndTime'] = None
info['WearTime(days)'] = 0
info['NumInterrupts'] = 0
data.set_index('time', inplace=True)
timer.stop()
return data, info

# Check for non-increasing timestamps. This is rare but can happen with
# buggy devices. TODO: Parser should do this.
errs = (data['time'].diff() <= pd.Timedelta(0)).sum()
if errs > 0:
print("Found non-increasing data timestamps. Fixing...")
data = data[data['time']
.cummax()
.diff()
.fillna(pd.Timedelta(1))
> pd.Timedelta(0)]
info['ReadErrors'] += int(np.ceil(errs / info['SampleRate']))

# Start/end times, wear time, interrupts
t = data['time']
tol = pd.Timedelta('1s')
total_wear = (t.diff().pipe(lambda x: x[x < tol].sum()).total_seconds())
num_interrupts = (t.diff() > tol).sum()
strftime = "%Y-%m-%d %H:%M:%S"
info['NumTicks'] = len(data)
info['StartTime'] = t.iloc[0].strftime(strftime)
info['EndTime'] = t.iloc[-1].strftime(strftime)
info['WearTime(days)'] = total_wear / (60 * 60 * 24)
info['NumInterrupts'] = num_interrupts
del t

data.set_index('time', inplace=True)

timer.stop()

return data, info
Expand Down

0 comments on commit 5a74fc2

Please sign in to comment.