Skip to content

Commit

Permalink
refactor(reader): extract data qc logic to new function
Browse files Browse the repository at this point in the history
- Move qc code out of _read_device and into new function qc()
- Merge calculate_wear_coverage into qc()
  • Loading branch information
chanshing committed Oct 22, 2024
1 parent f8b2e1d commit fd8e171
Showing 1 changed file with 61 additions and 56 deletions.
117 changes: 61 additions & 56 deletions src/actipy/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,20 @@ def read_device(input_file,
# NOTE: Using process() increases data ref count by 1, which increases
# memory. So instead we just do everything here.

sample_rate = info['SampleRate']
# Basic quality control
timer.start("Quality control...")
data, info_qc = qc(data, info['SampleRate'])
info_qc['ReadErrors'] += info['ReadErrors']
info.update(info_qc)
timer.stop()

if len(data) == 0:
print("File is empty. No data to process.")
return data, info

if lowpass_hz not in (None, False):
timer.start("Lowpass filter...")
data, info_lowpass = P.lowpass(data, sample_rate, lowpass_hz)
data, info_lowpass = P.lowpass(data, info['SampleRate'], lowpass_hz)
info.update(info_lowpass)
timer.stop()

Expand All @@ -86,15 +91,12 @@ def read_device(input_file,
if resample_hz not in (None, False):
timer.start("Resampling...")
if resample_hz in ('uniform', True):
data, info_resample = P.resample(data, sample_rate)
data, info_resample = P.resample(data, info['SampleRate'])
else:
data, info_resample = P.resample(data, resample_hz)
info.update(info_resample)
timer.stop()

info_wear_coverage = calculate_wear_coverage(data)
info.update(info_wear_coverage)

return data, info


Expand Down Expand Up @@ -164,6 +166,59 @@ def process(data, sample_rate,
return data, info


def qc(data, sample_rate):
""" Basic quality control. Returns a dict with general info. Also checks and
corrects for non-increasing timestamps.
"""

info = {}

if len(data) == 0:
info['NumTicks'] = 0
info['StartTime'] = None
info['EndTime'] = None
info['WearTime(days)'] = 0
info['NumInterrupts'] = 0
info['ReadErrors'] = 0
return info

# Start/end times, wear time, interrupts
tol = pd.Timedelta('1s')
tdiff = data.index.to_series().diff() # Note: Index.diff() was only added in pandas 2.1
total_wear = tdiff[tdiff < tol].sum().total_seconds()
num_interrupts = (tdiff > tol).sum()
time_format = "%Y-%m-%d %H:%M:%S"
info['NumTicks'] = len(data)
info['StartTime'] = data.index[0].strftime(time_format)
info['EndTime'] = data.index[-1].strftime(time_format)
info['WearTime(days)'] = total_wear / (60 * 60 * 24)
info['NumInterrupts'] = num_interrupts

# Check for non-increasing timestamps. This is rare but can happen with
# buggy devices. TODO: Parser should do this.
errs = (tdiff <= pd.Timedelta(0)).sum()
del tdiff # we're done with this
if errs > 0:
print("Found non-increasing data timestamps. Fixing...")
data = data[data
.index
.to_series()
.cummax()
.diff()
.fillna(pd.Timedelta(1))
.gt(pd.Timedelta(0))]
info['ReadErrors'] = int(np.ceil(errs / sample_rate))
else:
info['ReadErrors'] = 0

# Check if data covers all 24 hours of the day
coverage = data.notna().any(axis=1).groupby(data.index.hour).mean()
info['Covers24hOK'] = int(len(coverage) == 24 and np.min(coverage) >= 0.01)
del coverage

return data, info


def _read_device(input_file, verbose=True):
""" Internal function that interfaces with the Java parser to read the
device file. Returns parsed data as a pandas dataframe, and a dict with
Expand Down Expand Up @@ -204,44 +259,7 @@ def _read_device(input_file, verbose=True):
data_mmap = np.load(os.path.join(tmpdir, "data.npy"), mmap_mode='r')
data = {c: np.asarray(data_mmap[c]) for c in data_mmap.dtype.names}
data = pd.DataFrame(data, copy=False)

if len(data) == 0:
info['NumTicks'] = 0
info['StartTime'] = None
info['EndTime'] = None
info['WearTime(days)'] = 0
info['NumInterrupts'] = 0
data.set_index('time', inplace=True)
timer.stop()
return data, info

# Check for non-increasing timestamps. This is rare but can happen with
# buggy devices. TODO: Parser should do this.
errs = (data['time'].diff() <= pd.Timedelta(0)).sum()
if errs > 0:
print("Found non-increasing data timestamps. Fixing...")
data = data[data['time']
.cummax()
.diff()
.fillna(pd.Timedelta(1))
> pd.Timedelta(0)]
info['ReadErrors'] += int(np.ceil(errs / info['SampleRate']))

# Start/end times, wear time, interrupts
t = data['time']
tol = pd.Timedelta('1s')
total_wear = (t.diff().pipe(lambda x: x[x < tol].sum()).total_seconds())
num_interrupts = (t.diff() > tol).sum()
strftime = "%Y-%m-%d %H:%M:%S"
info['NumTicks'] = len(data)
info['StartTime'] = t.iloc[0].strftime(strftime)
info['EndTime'] = t.iloc[-1].strftime(strftime)
info['WearTime(days)'] = total_wear / (60 * 60 * 24)
info['NumInterrupts'] = num_interrupts
del t

data.set_index('time', inplace=True)

timer.stop()

return data, info
Expand Down Expand Up @@ -408,19 +426,6 @@ def fix_nonincr_time(data):
return data, errs


def calculate_wear_coverage(data):
""" Check device wear covers all 24 hours of the day """
info = {}
coverage = data.notna().any(axis=1).groupby(data.index.hour).agg(lambda x: x.notna().mean())

if len(coverage) < 24 or np.min(coverage) < 0.01:
info['Covers24hOK'] = 0
else:
info['Covers24hOK'] = 1

return info


class Timer:
def __init__(self, verbose=True):
self.verbose = verbose
Expand Down

0 comments on commit fd8e171

Please sign in to comment.