Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenEphysBinaryRawIO: Separate Neural and Non-Neural Data into Distinct Streams #1624

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
81 changes: 68 additions & 13 deletions neo/rawio/openephysbinaryrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def _parse_header(self):
else:
event_stream_names = []

self._num_of_signal_streams = len(sig_stream_names)

# first loop to reassign stream by "stream_index" instead of "stream_name"
self._sig_streams = {}
self._evt_streams = {}
Expand All @@ -121,8 +123,9 @@ def _parse_header(self):
# signals zone
# create signals channel map: several channel per stream
signal_channels = []

for stream_index, stream_name in enumerate(sig_stream_names):
# stream_index is the index in vector sytream names
# stream_index is the index in vector stream names
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :)

stream_id = str(stream_index)
buffer_id = stream_id
info = self._sig_streams[0][0][stream_index]
Expand All @@ -132,37 +135,66 @@ def _parse_header(self):
if "SYNC" in chan_id and not self.load_sync_channel:
# the channel is removed from stream but not the buffer
stream_id = ""

if chan_info["units"] == "":
# in some cases for some OE version the unit is "", but the gain is to "uV"
units = "uV"
else:
units = chan_info["units"]

if "ADC" in chan_id:
# These are non-neural channels and their stream should be separated
# We defined their stream_id as the stream_index of neural data plus the number of neural streams
# This is to not break backwards compatbility with the stream_id numbering
stream_id = str(stream_index + len(sig_stream_names))
# For ADC channels multiplying by the bit_volts when units are not provided converts to Volts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean here?

the way it is written sounds like if units are provided then we SHOULD NOT worry about using the gain,? So is the gain 1 in this case and our code is fine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's not very clear without context. More context: When the units are not provided the units of ADC are Volts. This is in opposition to the neural channels where if the units are not provided the units are microvolts.

https://open-ephys.github.io/gui-docs/User-Manual/Recording-data/Binary-format.html#continuous

But the units are always the units of the data AFTER you use the gain. Only the unitless(units=="") case differs between neural and non-neural channels. Microvolts for the former, volts for the latter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, yeah that makes sense in the context (and is the same with intan so ADC being volts and neural being microvolts seems common enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-wrote the whole thing to separate unit determination from stream determination and I think it reads better now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree it makes it clearer to me at least!

units = "V" if units == "" else units

gain = chan_info["bit_volts"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a safe float? or do we need to convert it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean safe? as in numpy?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I mean is it a numpy scalar (I should have written that better :) ) or is it a python float?

Copy link
Contributor Author

@h-mayorquin h-mayorquin Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For floats, there is no difference I think right? We only have to be concerned about integers. I think np.float is the same thing as float in python (from my memory).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it anyway I guess null casting should have no cost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I looked it up mostly from stackoverflow discussion
https://stackoverflow.com/questions/40726490/overflow-error-in-pythons-numpy-exp-function
and reddit
https://www.reddit.com/r/learnpython/comments/feakn2/numpy_overflow_help_needed/?rdt=48979
And seems like it can overflow, but honestly the issue of float overflow seems to be more benign and casting to python seems to cost precision that honestly shouldn't really matter right? But float precision is so OS dependent that I think requiring that level of precision can't really be expected from floats. Appreciate the casting though, but in general your intuition/memory of this being an int thing seems to be true.

offset = 0.0
new_channels.append(
(
chan_info["channel_name"],
chan_id,
float(info["sample_rate"]),
info["dtype"],
units,
chan_info["bit_volts"],
0.0,
gain,
offset,
stream_id,
buffer_id,
)
)
signal_channels.extend(new_channels)

signal_channels = np.array(signal_channels, dtype=_signal_channel_dtype)

signal_streams = []
signal_buffers = []
for stream_index, stream_name in enumerate(sig_stream_names):
stream_id = str(stream_index)
buffer_id = str(stream_index)
signal_buffers.append((stream_name, buffer_id))

unique_streams_ids = np.unique(signal_channels["stream_id"])
for stream_id in unique_streams_ids:
# Handle special case of Synch channel having stream_id empty
if stream_id == "":
continue
stream_index = int(stream_id)
# Neural signal
if stream_index < self._num_of_signal_streams:
stream_name = sig_stream_names[stream_index]
buffer_id = stream_id
# We add the buffers here as both the neural and the ADC channels are in the same buffer
signal_buffers.append((stream_name, buffer_id))
else: # This names the ADC streams
neural_stream_index = stream_index - self._num_of_signal_streams
neural_stream_name = sig_stream_names[neural_stream_index]
stream_name = f"{neural_stream_name}_ADC"
buffer_id = str(neural_stream_index)
signal_streams.append((stream_name, stream_id, buffer_id))

signal_streams = np.array(signal_streams, dtype=_signal_stream_dtype)
signal_buffers = np.array(signal_buffers, dtype=_signal_buffer_dtype)


# create memmap for signals
self._buffer_descriptions = {}
self._stream_buffer_slice = {}
Expand Down Expand Up @@ -191,12 +223,30 @@ def _parse_header(self):
raise ValueError(
"SYNC channel is not present in the recording. " "Set load_sync_channel to False"
)

if has_sync_trace and not self.load_sync_channel:
self._stream_buffer_slice[stream_id] = slice(None, -1)

num_neural_channels = sum(1 for ch in info["channels"] if "ADC" not in ch["channel_name"])
num_non_neural_channels = sum(1 for ch in info["channels"] if "ADC" in ch["channel_name"])


if num_non_neural_channels == 0:
if has_sync_trace and not self.load_sync_channel:
self._stream_buffer_slice[stream_id] = slice(None, -1)
else:
self._stream_buffer_slice[stream_id] = None
else:
self._stream_buffer_slice[stream_id] = None

# For ADC channels, we remove the last channel which is the Synch channel
stream_id_neural = stream_id
stream_id_non_neural = str(int(stream_id) + self._num_of_signal_streams)

# Note this implementation assumes that the neural channels come before the non-neural channels
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here guys, these are the strongest assumptions that the reader makes. I wanted to check with @alejoe91 if he knows about this:

  • Is it true that the synch trace will always be the last channel even if you have non-neural channels? I could not find this in the documentation.
  • Are the neural channels and non-neural always in order and not interleaved?

If the first case is not True, we can extract the specific index from the structure.oebin. That's a bigger change but I think doable.

For the second case, if it is not true, I am afraid we will have to error somehow as the current data model does not support non-contigious streams across the buffer with the buffer slice mechanism (@samuelgarcia )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am adding error messages for this advising people to open an issue if they have interleaved neural and non-neural or synch trace that is not at the end.


self._stream_buffer_slice[stream_id_neural] = slice(0, num_neural_channels)
self._stream_buffer_slice[stream_id_non_neural] = slice(num_neural_channels, None)

# It also assumes that the synch channel is the last channel in the buffer
if has_sync_trace and not self.load_sync_channel:
self._stream_buffer_slice[stream_id_non_neural] = slice(num_neural_channels, -1)

# events zone
# channel map: one channel one stream
event_channels = []
Expand Down Expand Up @@ -431,7 +481,12 @@ def _channels_to_group_id(self, channel_indexes):
return group_id

def _get_signal_t_start(self, block_index, seg_index, stream_index):
t_start = self._sig_streams[block_index][seg_index][stream_index]["t_start"]
if stream_index < self._num_of_signal_streams:
_sig_stream_index = stream_index
else:
_sig_stream_index = stream_index - self._num_of_signal_streams

t_start = self._sig_streams[block_index][seg_index][_sig_stream_index]["t_start"]
return t_start

def _spike_count(self, block_index, seg_index, unit_index):
Expand Down
11 changes: 11 additions & 0 deletions neo/test/rawiotest/test_openephysbinaryrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TestOpenEphysBinaryRawIO(BaseTestRawIO, unittest.TestCase):
"openephysbinary/v0.6.x_neuropixels_multiexp_multistream",
"openephysbinary/v0.6.x_neuropixels_with_sync",
"openephysbinary/v0.6.x_neuropixels_missing_folders",
"openephysbinary/neural_and_non_neural_data_mixed"
]

def test_sync(self):
Expand Down Expand Up @@ -78,6 +79,16 @@ def test_multiple_ttl_events_parsing(self):
assert np.allclose(ttl_events["durations"][ttl_events["labels"] == "6"], 0.025, atol=0.001)
assert np.allclose(ttl_events["durations"][ttl_events["labels"] == "7"], 0.016666, atol=0.001)

def test_separating_stream_for_non_neural_data(self):
rawio = OpenEphysBinaryRawIO(
self.get_local_path("openephysbinary/neural_and_non_neural_data_mixed"), load_sync_channel=False
)
rawio.parse_header()
# Check that the non-neural data stream is correctly separated
assert len(rawio.header["signal_streams"]["name"]) == 2
assert rawio.header["signal_streams"]["name"].tolist() == ["'Rhythm_FPGA-100.0", "'Rhythm_FPGA-100.0_ADC"]



if __name__ == "__main__":
unittest.main()
Loading