Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GetUnityTaskEvents — feature: UnityBase submodule support #1

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
85 changes: 85 additions & 0 deletions AsType.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging
import numpy as np
from neuropype.engine import *
from neuropype.utilities import cache


logger = logging.getLogger(__name__)


class AsType(Node):
    """Change the numeric dtype and/or the backing container class of chunk data."""

    # --- Input/output ports ---
    data = DataPort(Packet, "Data to process.", mutating=False)

    # --- Properties ---
    dtype = EnumPort('none', domain=['float64', 'float32', 'float16', 'int64', 'int32', 'int16', 'int8', 'none'],
                     help="""The new dtype. Use 'none' for no change.""")
    # Target container class for block._data; 'none' keeps the current class.
    data_class = EnumPort('none', domain=['DatasetView', 'ndarray', 'none'])
    use_caching = BoolPort(False, """Enable caching.""", expert=True)

    def __init__(self,
                 dtype: Union[str, None, Type[Keep]] = Keep,
                 data_class: Union[str, None, Type[Keep]] = Keep,
                 use_caching: Union[bool, None, Type[Keep]] = Keep,
                 **kwargs):
        """Create a new node. All arguments are forwarded to the matching ports."""
        super().__init__(dtype=dtype, data_class=data_class, use_caching=use_caching, **kwargs)

    @classmethod
    def description(cls):
        """Declare descriptive information about the node."""
        return Description(name='As Type',
                           description="""\
                           Change the dtype of the chunk data,
                           and/or change the block._data class.
                           """,
                           version='0.1.0', status=DevStatus.alpha)

    @data.setter
    def data(self, pkt):
        """Convert each non-empty chunk's data to the requested dtype/container.

        The converted packet is stored in ``self._data``. Results are cached
        (keyed on the input packet) when ``use_caching`` is enabled.
        """
        import lazy_ops

        # Try to read a previously-computed result from the cache.
        record = cache.try_lookup(context=self, enabled=self.use_caching,
                                  verbose=True, data=pkt, state=None)

        if record.success():
            self._data = record.data
            return

        out_chunks = {}
        for n, chunk in enumerate_chunks(pkt, nonempty=True):

            out_axes = deepcopy_most(chunk.block.axes)

            # 'none' keeps the block's current dtype unchanged.
            dtype = {'float64': np.float64, 'float32': np.float32, 'float16': np.float16,
                     'int64': np.int64, 'int32': np.int32, 'int16': np.int16, 'int8': np.int8,
                     'none': chunk.block._data.dtype}[self.dtype]

            if self.data_class == 'DatasetView' or\
                    (isinstance(chunk.block._data, lazy_ops.DatasetView) and self.data_class == 'none'):
                # Create a new DatasetView backed by a tempfile, carrying over the
                # source dataset's HDF5 chunk-cache settings and chunk shape.
                cache_settings = chunk.block._data._dataset.file.id.get_access_plist().get_cache()
                file_kwargs = {'rdcc_nbytes': cache_settings[2],
                               'rdcc_nslots': cache_settings[1]}
                data = lazy_ops.create_with_tempfile(chunk.block.shape, dtype=dtype,
                                                     chunks=chunk.block._data._dataset.chunks,
                                                     **file_kwargs)
                data[:] = chunk.block._data

            elif self.data_class == 'ndarray' or\
                    (isinstance(chunk.block._data, np.ndarray) and self.data_class == 'none'):
                data = np.array(chunk.block._data).astype(dtype)

            else:
                # No branch applies (e.g., data_class == 'none' with a block._data
                # container that is neither a DatasetView nor an ndarray). Fail
                # loudly here rather than with an opaque NameError below.
                raise ValueError(
                    f"AsType cannot convert data of type "
                    f"{type(chunk.block._data).__name__} "
                    f"with data_class={self.data_class!r}")

            out_chunks[n] = Chunk(block=Block(data=data, axes=out_axes),
                                  props=deepcopy_most(chunk.props))

        pkt = Packet(chunks=out_chunks)
        record.writeback(data=pkt)
        self._data = pkt

    def on_port_assigned(self):
        """Callback to reset internal state when a value was assigned to a
        port (unless the port's setter has been overridden)."""
        self.signal_changed(True)
34 changes: 0 additions & 34 deletions FixChannames.py

This file was deleted.

94 changes: 94 additions & 0 deletions GetNonzeroData.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import logging
import numpy as np

from neuropype.engine import *
from neuropype.utilities import cache


logger = logging.getLogger(__name__)


class GetNonzeroData(Node):
    """Get a copy of a continuous timeseries including only samples where at
    least one channel was nonzero."""

    # --- Input/output ports ---
    data = DataPort(Packet, "Data to process.", mutating=False)

    # Time window (seconds) around each sparse event assumed to hold waveform data.
    waveform_window = ListPort([0, 0], float, """Time window (seconds) around
        each event that is assumed to have non-zero waveform data.
        """, verbose_name='waveform window')
    use_caching = BoolPort(False, """Enable caching.""", expert=True)

    def __init__(self,
                 waveform_window: Union[List[float], None, Type[Keep]] = Keep,
                 use_caching: Union[bool, None, Type[Keep]] = Keep,
                 **kwargs):
        """Create a new node. All arguments are forwarded to the matching ports."""
        super().__init__(waveform_window=waveform_window, use_caching=use_caching, **kwargs)

    @classmethod
    def description(cls):
        """Declare descriptive information about the node."""
        return Description(name='Drop Blank Times',
                           description="""\
                           Drop samples (time axis) where all channels were value == 0
                           """,
                           version='0.1.0', status=DevStatus.alpha)

    @data.setter
    def data(self, pkt):
        """Build a copy of the dense signals chunk keeping only samples flagged
        as nonzero (via the sparse event train when present, otherwise via a
        per-channel scan), and store it in ``self._data``.
        """

        # Try to read a previously-computed result from the cache.
        record = cache.try_lookup(context=self, enabled=self.use_caching,
                                  verbose=True, data=pkt, state=None)
        if record.success():
            self._data = record.data
            return

        # Get the sparse event train, if present.
        evt_name, evt_chunk = find_first_chunk(pkt, with_axes=(space, time),
                                               with_flags=Flags.is_sparse,
                                               allow_markers=False)

        # Get the dense signals chunk, if present.
        sig_name, sig_chunk = find_first_chunk(pkt, with_axes=(space, time),
                                               without_flags=Flags.is_sparse)

        # Nothing to do without a signals chunk; self._data is left unassigned.
        if sig_name is None:
            return

        # Boolean mask over the time axis; True = sample is kept.
        b_keep = np.zeros((len(sig_chunk.block.axes['time']),), dtype=bool)
        if evt_name is not None:
            # If evt_chunk is present then we can use that to identify which
            # samples were covered by a waveform.
            spk_blk = evt_chunk.block
            spike_inds = np.sort(np.unique(spk_blk._data.indices))
            # Convert the window from seconds to sample counts.
            wf_samps = [int(_ * sig_chunk.block.axes[time].nominal_rate) for _ in self.waveform_window]
            # NOTE(review): wf_samps[0] appears intended to be negative (pre-event
            # samples, given the arange below). If so, the 'spike_inds > wf_samps[0]'
            # guard does not prevent negative dat_inds (which would wrap to the end
            # of b_keep) — confirm the window sign convention.
            spike_inds = spike_inds[np.logical_and(spike_inds > wf_samps[0], spike_inds < (len(b_keep) - wf_samps[1]))]
            # Expand each event index into its window of sample indices.
            dat_inds = np.unique(spike_inds[:, None] + np.arange(wf_samps[0], wf_samps[1], dtype=int)[None, :])
            b_keep[dat_inds] = True
        else:
            # Else, scan the data: keep samples where any channel is nonzero.
            # This is probably slower than above.
            for ch_ix in range(len(sig_chunk.block.axes[space])):
                b_keep = np.logical_or(b_keep,
                                       sig_chunk.block[space[ch_ix], ...].data[0] != 0)

        logger.info(f"Copying {np.sum(b_keep)} / {len(b_keep)} samples ({100.*np.sum(b_keep)/len(b_keep):.2f} %)...")

        # Create output block that is a copy of the input, with the time axis
        # restricted to the kept samples.
        out_axes = list(sig_chunk.block[space, time].axes)
        out_axes[-1] = TimeAxis(times=out_axes[-1].times[b_keep], nominal_rate=out_axes[-1].nominal_rate)
        out_blk = Block(data=sig_chunk.block._data, axes=out_axes, data_only_for_type=True)
        for ch_ix in range(len(sig_chunk.block.axes[space])):
            # Non-slice and non-scalar indexing of long block axes is quite slow,
            # so get full time then slice that.
            out_blk[ch_ix:ch_ix+1, :].data = sig_chunk.block[ch_ix:ch_ix+1, :].data[:, b_keep]

        # Create a new packet using only nonzero samples. Note this uses an
        # ndarray, not DatasetView.
        self._data = Packet(chunks={sig_name: Chunk(
            block=out_blk,
            props=deepcopy_most(sig_chunk.props)
        )})

        record.writeback(data=self._data)

    def on_port_assigned(self):
        """Callback to reset internal state when a value was assigned to a
        port (unless the port's setter has been overridden)."""
        self.signal_changed(True)
Loading