From d18ca6cb7233d5a065291978c22ef0d28baa08d8 Mon Sep 17 00:00:00 2001
From: JoeZiminski
Date: Thu, 31 Oct 2024 17:07:22 +0000
Subject: [PATCH 01/14] Factor out covariance matrix computation, ensure eps
 parameter is exposed.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/spikeinterface/preprocessing/whiten.py | 68 +++++++++++++---------
 1 file changed, 42 insertions(+), 26 deletions(-)

diff --git a/src/spikeinterface/preprocessing/whiten.py b/src/spikeinterface/preprocessing/whiten.py
index 57400c1199..2dbad9ee79 100644
--- a/src/spikeinterface/preprocessing/whiten.py
+++ b/src/spikeinterface/preprocessing/whiten.py
@@ -188,31 +188,9 @@ def compute_whitening_matrix(
         The "mean" matrix

     """
-    random_data = get_random_data_chunks(recording, concatenated=True, return_scaled=False, **random_chunk_kwargs)
-    random_data = random_data.astype(np.float32)
-
-    regularize_kwargs = regularize_kwargs if regularize_kwargs is not None else {"method": "GraphicalLassoCV"}
-
-    if apply_mean:
-        M = np.mean(random_data, axis=0)
-        M = M[None, :]
-        data = random_data - M
-    else:
-        M = None
-        data = random_data
-
-    if not regularize:
-        cov = data.T @ data
-        cov = cov / data.shape[0]
-    else:
-        import sklearn.covariance
-
-        method = regularize_kwargs.pop("method")
-        regularize_kwargs["assume_centered"] = True
-        estimator_class = getattr(sklearn.covariance, method)
-        estimator = estimator_class(**regularize_kwargs)
-        estimator.fit(data)
-        cov = estimator.covariance_
+    data, cov, M = compute_covariance_matrix(
+        recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs
+    )

     # Here we determine eps used below to avoid division by zero.
     # Typically we can assume that data is either unscaled integers or in units of
     # uV, but this is not always the case. When data
     # is float type and scaled down to a very small level, then the
     # default eps=1e-8 can be too large, resulting in incorrect
     # whitening. We therefore check if the data is float
     # type and we estimate a more reasonable eps in the case
     # where the data is on a scale less than 1.
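    # (Illustrative arithmetic, not part of the patch: float data on a ±1e-4
    # scale gives median(data**2) ≈ 1e-8, so eps = max(1e-16, 1e-8 * 1e-3) = 1e-11,
    # far below the default 1e-8, which would otherwise swamp the tiny
    # singular values S during the 1 / sqrt(S + eps) step below.)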
if eps is None: eps = 1e-8 + if data.dtype.kind == "f": median_data_sqr = np.median(data**2) # use the square because cov (and hence S) scales as the square if median_data_sqr < 1 and median_data_sqr > 0: - eps = max(1e-16, median_data_sqr * 1e-3) # use a small fraction of the median of the squared data + if eps is None: + eps = max(1e-16, median_data_sqr * 1e-3) # use a small fraction of the median of the squared data if mode == "global": U, S, Ut = np.linalg.svd(cov, full_matrices=True) W = (U @ np.diag(1 / np.sqrt(S + eps))) @ Ut + elif mode == "local": assert radius_um is not None n = cov.shape[0] @@ -247,3 +228,38 @@ def compute_whitening_matrix( raise ValueError(f"compute_whitening_matrix : wrong mode {mode}") return W, M + + +def compute_covariance_matrix(recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs): # TODO: check order + """ + """ + random_data = get_random_data_chunks(recording, concatenated=True, return_scaled=False, **random_chunk_kwargs) + random_data = random_data.astype(np.float32) + + regularize_kwargs = regularize_kwargs if regularize_kwargs is not None else {"method": "GraphicalLassoCV"} + + if apply_mean: + M = np.mean(random_data, axis=0) + M = M[None, :] + data = random_data - M + else: + M = None + data = random_data + + if not regularize: + cov = data.T @ data + cov = cov / data.shape[0] + else: + import sklearn.covariance + + method = regularize_kwargs.pop("method") + regularize_kwargs["assume_centered"] = True + estimator_class = getattr(sklearn.covariance, method) + estimator = estimator_class(**regularize_kwargs) + estimator.fit(data) + cov = estimator.covariance_ + + return data, cov, M # TODO: rename data + + + From b9d3cd93407ede5a879861889dd307ddfb2b3ea6 Mon Sep 17 00:00:00 2001 From: JoeZiminski Date: Thu, 31 Oct 2024 17:20:02 +0000 Subject: [PATCH 02/14] Start extending whitening tests. --- .../tests/test_detect_bad_channels.py | 1 + .../preprocessing/tests/test_whiten.py | 156 +++++++++++++++++- 2 files changed, 151 insertions(+), 6 deletions(-) diff --git a/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py b/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py index 4622be1440..ba6679132c 100644 --- a/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py +++ b/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py @@ -247,6 +247,7 @@ def add_dead_channels(recording, is_dead): data[:, is_dead] = np.random.normal( mean, std * 0.1, size=(is_dead.size, recording.get_num_samples(segment_index)) ).T + breakpoint() recording._recording_segments[segment_index]._traces = data diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index b40627d836..0f4053add5 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -2,11 +2,160 @@ import numpy as np from spikeinterface.core import generate_recording - +from spikeinterface.core import BaseRecording, BaseRecordingSegment from spikeinterface.preprocessing import whiten, scale, compute_whitening_matrix +from spikeinterface.preprocessing.whiten import compute_covariance_matrix from pathlib import Path +import spikeinterface.full as si # TOOD: is this a bad idea? remove! 
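For orientation, the `mode == "global"` branch kept in patch 01 above computes a
ZCA whitening matrix from the covariance. A minimal self-contained sketch of that
computation, with illustrative values (not part of the patch itself):

    import numpy as np

    # ZCA whitening: for covariance cov = U @ diag(S) @ U.T, the whitening
    # matrix is W = U @ diag(1 / sqrt(S + eps)) @ U.T.
    cov = np.array([[1.0, 0.5], [0.5, 2.0]])
    eps = 1e-8
    U, S, Ut = np.linalg.svd(cov, full_matrices=True)
    W = (U @ np.diag(1 / np.sqrt(S + eps))) @ Ut

    # Applying W to data with this covariance yields (near) identity covariance.
    assert np.allclose(W @ cov @ W.T, np.eye(2), atol=1e-6)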
+ + +class CustomRecording(BaseRecording): + """ + + """ + def __init__(self, durations, num_channels, channel_ids, sampling_frequency, dtype): + BaseRecording.__init__(self, sampling_frequency=sampling_frequency, channel_ids=channel_ids, dtype=dtype) + + num_samples = [dur * sampling_frequency for dur in durations] + + for sample_num in num_samples: + rec_segment = CustomRecordingSegment( + sample_num, + num_channels, + sampling_frequency, + ) + self.add_recording_segment(rec_segment) + + self._kwargs = { + "num_channels": num_channels, + "durations": durations, + "sampling_frequency": sampling_frequency, + } + +# TODO +# 1) save covariance matrix +# 2) + + +class CustomRecordingSegment(BaseRecordingSegment): + """ + """ + def __init__(self, num_samples, num_channels, sampling_frequency): + self.num_samples = num_samples + self.num_channels = num_channels + self.sampling_frequency = sampling_frequency + self.data = np.zeros((num_samples, num_channels)) + + self.t_start = None + self.time_vector = None + + def set_data(self, data): + # TODO: do some checks + self.data = data + + def get_traces( + self, + start_frame: int | None = None, + end_frame: int | None = None, + channel_indices: list | None = None, + ): + return self.data[start_frame:end_frame, channel_indices] + + def get_num_samples(self): + return self.num_samples + +# TODO: return random cghuns scaled vs unscled + +@pytest.mark.parametrize("eps", [1e-8, 1]) +@pytest.mark.parametrize("num_segments", [1, 2]) +@pytest.mark.parametrize("dtype", [np.float32]) # np.int16 +def test_compute_whitening_matrix(eps, num_segments, dtype): + """ + """ + num_channels = 3 + + recording = CustomRecording( + durations=[10, 10] if num_segments == 2 else [10], # will auto-fill zeros + num_channels=num_channels, + channel_ids=np.arange(num_channels), + sampling_frequency=30000, + dtype=dtype + ) + num_samples = recording.get_num_samples(segment_index=0) + + # 1) setup the data with known mean and covariance. + mean_1 = mean_2 = np.zeros(num_channels) # TODO: diferent tests + # mean_2 = np.arange(num_channels) + + # Covariances for simulated data. Limit off-diagonals larger than variances + # for realism + stability / PSD. Actually, a better way is to just get + # some random data and compute x.T@x# + cov_1 = np.array( + [[1, 0.5, 0], + [0.5, 1, -0.25], + [0, -0.25, 1]] + ) + seg_1_data = np.random.multivariate_normal(mean_1, cov_1, recording.get_num_samples(segment_index=0)) + seg_1_data = seg_1_data.astype(dtype) + + recording._recording_segments[0].set_data(seg_1_data) + assert np.array_equal(recording.get_traces(segment_index=0), seg_1_data), "segment 1 test setup did not work." 
+ + if num_segments == 2: + recording._recording_segments[1].set_data( + np.zeros((num_samples, num_channels)) + ) + + _, test_cov, test_mean = compute_covariance_matrix( + recording, + apply_mean=True, + regularize=False, + regularize_kwargs={}, + random_chunk_kwargs=dict( + num_chunks_per_segment=1, + chunk_size=recording.get_num_samples(segment_index=0)-1, + ) + ) + + if num_segments == 1: + assert np.allclose(test_cov, cov_1, rtol=0, atol=0.01) + else: + assert np.allclose(test_cov, cov_1 / 2, rtol=0, atol=0.01) + + # test_cov + # TOOD: own test for mean + + whitened_recording = si.whiten( + recording, + apply_mean=True, + regularize=False, + regularize_kwargs={}, + num_chunks_per_segment=1, + chunk_size=recording.get_num_samples(segment_index=0) - 1, + eps=eps + ) + + W = whitened_recording._kwargs["W"] + U, S, Vt = np.linalg.svd(W) + S_ = (1 / S) ** 2 - eps + P = U @ np.diag(S_) @ Vt + + if num_segments == 1: + assert np.allclose(P, cov_1, rtol=0, atol=0.01) + else: + assert np.allclose(P, cov_1 / 2, rtol=0, atol=0.01) + + # TODO: + # 1) test int16, MVN is not going to work. Completely new test that just tests against X.T@T/n + # 2) test apply mean on / off and means + # 3) make clear eps is tested above + # 4) test regularisation (use existing approach). Maybe test directly against sklearn function + # 5 )test local vs. global + # 6) monkeypatch regularisation and random kwargs to check they are passed correctly. + # 7) test radius and int scale in the simplest way + # 8) test W, M are saved correctly def test_whiten(create_cache_folder): cache_folder = create_cache_folder @@ -46,8 +195,3 @@ def test_whiten(create_cache_folder): # test regularization : norm should be smaller W2, M = compute_whitening_matrix(rec, "global", random_chunk_kwargs, apply_mean=False, regularize=True) assert np.linalg.norm(W1) > np.linalg.norm(W2) - - -if __name__ == "__main__": - cache_folder = Path(__file__).resolve().parents[4] / "cache_folder" - test_whiten(cache_folder) From 382e2fffe3ce837858d1a2c7083cd657e7b2a552 Mon Sep 17 00:00:00 2001 From: JoeZiminski Date: Thu, 7 Nov 2024 19:07:55 +0000 Subject: [PATCH 03/14] Continue expanding tests. --- .../preprocessing/tests/test_whiten.py | 293 ++++++++++++------ src/spikeinterface/preprocessing/whiten.py | 68 ++-- 2 files changed, 247 insertions(+), 114 deletions(-) diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index 0f4053add5..8f03f0577c 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -4,16 +4,15 @@ from spikeinterface.core import generate_recording from spikeinterface.core import BaseRecording, BaseRecordingSegment from spikeinterface.preprocessing import whiten, scale, compute_whitening_matrix -from spikeinterface.preprocessing.whiten import compute_covariance_matrix +from spikeinterface.preprocessing.whiten import compute_sklearn_covariance_matrix from pathlib import Path import spikeinterface.full as si # TOOD: is this a bad idea? remove! 
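The tests in this file draw multivariate-normal data with a known covariance and
compare it against the empirical estimate. A standalone sketch of that estimator
(seed and tolerance here are illustrative, not from the patch):

    import numpy as np

    rng = np.random.default_rng(0)
    true_cov = np.array([[1.0, 0.5, 0.0], [0.5, 1.0, -0.25], [0.0, -0.25, 1.0]])
    X = rng.multivariate_normal(np.zeros(3), true_cov, size=100_000)

    # For zero-mean data the empirical covariance is X.T @ X / n, which
    # converges to the covariance used to generate the samples.
    emp_cov = X.T @ X / X.shape[0]
    assert np.allclose(emp_cov, true_cov, atol=0.02)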
class CustomRecording(BaseRecording): - """ + """ """ - """ def __init__(self, durations, num_channels, channel_ids, sampling_frequency, dtype): BaseRecording.__init__(self, sampling_frequency=sampling_frequency, channel_ids=channel_ids, dtype=dtype) @@ -33,14 +32,10 @@ def __init__(self, durations, num_channels, channel_ids, sampling_frequency, dty "sampling_frequency": sampling_frequency, } -# TODO -# 1) save covariance matrix -# 2) - class CustomRecordingSegment(BaseRecordingSegment): - """ - """ + """ """ + def __init__(self, num_samples, num_channels, sampling_frequency): self.num_samples = num_samples self.num_channels = num_channels @@ -68,94 +63,208 @@ def get_num_samples(self): # TODO: return random cghuns scaled vs unscled -@pytest.mark.parametrize("eps", [1e-8, 1]) -@pytest.mark.parametrize("num_segments", [1, 2]) -@pytest.mark.parametrize("dtype", [np.float32]) # np.int16 -def test_compute_whitening_matrix(eps, num_segments, dtype): - """ - """ - num_channels = 3 - - recording = CustomRecording( - durations=[10, 10] if num_segments == 2 else [10], # will auto-fill zeros - num_channels=num_channels, - channel_ids=np.arange(num_channels), - sampling_frequency=30000, - dtype=dtype - ) - num_samples = recording.get_num_samples(segment_index=0) - - # 1) setup the data with known mean and covariance. - mean_1 = mean_2 = np.zeros(num_channels) # TODO: diferent tests - # mean_2 = np.arange(num_channels) - - # Covariances for simulated data. Limit off-diagonals larger than variances - # for realism + stability / PSD. Actually, a better way is to just get - # some random data and compute x.T@x# - cov_1 = np.array( - [[1, 0.5, 0], - [0.5, 1, -0.25], - [0, -0.25, 1]] - ) - seg_1_data = np.random.multivariate_normal(mean_1, cov_1, recording.get_num_samples(segment_index=0)) - seg_1_data = seg_1_data.astype(dtype) - - recording._recording_segments[0].set_data(seg_1_data) - assert np.array_equal(recording.get_traces(segment_index=0), seg_1_data), "segment 1 test setup did not work." - - if num_segments == 2: + +class TestWhiten: + + def get_float_test_data(self, num_segments, dtype, mean=None, covar=None): + """ + mention the segment thing + """ + num_channels = 3 + dtype = np.float32 + + if mean is None: + mean = np.zeros(num_channels) + + if covar is None: + covar = np.array([[1, 0.5, 0], [0.5, 1, -0.25], [0, -0.25, 1]]) + + recording = self.get_empty_custom_recording(num_segments, num_channels, dtype) + + seg_1_data = np.random.multivariate_normal( + mean, covar, recording.get_num_samples(segment_index=0) # TODO: RENAME! + ) + if dtype == np.float32: + seg_1_data = seg_1_data.astype(dtype) + elif dtype == np.int16: + seg_1_data = np.round(seg_1_data * 32767).astype(np.int16) + else: + raise ValueError("dtype must be float32 or int16") + + recording._recording_segments[0].set_data(seg_1_data) + assert np.array_equal(recording.get_traces(segment_index=0), seg_1_data), "segment 1 test setup did not work." + + return mean, covar, recording + + def get_empty_custom_recording(self, num_segments, num_channels, dtype): + """ """ + return CustomRecording( + durations=[10, 10] if num_segments == 2 else [10], # will auto-fill zeros + num_channels=num_channels, + channel_ids=np.arange(num_channels), + sampling_frequency=30000, + dtype=dtype, + ) + + def covar_from_whitening_mat(self, whitened_recording, eps): + """ + The whitening matrix is computed as the + inverse square root of the covariance matrix + (Sigma, 'S' below + some eps for regularising. 
+ + Here the inverse process is performed to compute + the covariance matrix from the whitening matrix + for testing purposes. This allows the entire + workflow to be tested rather than subfunction only. + """ + W = whitened_recording._kwargs["W"] + U, D, Vt = np.linalg.svd(W) + D_new = (1 / D) ** 2 - eps + S = U @ np.diag(D_new) @ Vt + + return S + + @pytest.mark.parametrize("eps", [1e-8, 1]) + @pytest.mark.parametrize("dtype", [np.float32, np.int16]) + def test_compute_covariance_matrix(self, dtype, eps): + """ """ + eps = 1e-8 + mean, covar, recording = self.get_float_test_data(num_segments=1, dtype=dtype) + + whitened_recording = si.whiten( + recording, + apply_mean=True, + regularize=False, + num_chunks_per_segment=1, + chunk_size=recording.get_num_samples(segment_index=0) - 1, + eps=eps, + ) + + test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + + assert np.allclose(test_covar, covar, rtol=0, atol=0.01) + + if eps != 1: + X = whitened_recording.get_traces() + X = X - np.mean(X, axis=0) + S = X.T @ X / X.shape[0] + + assert np.allclose(S, np.eye(recording.get_num_channels()), rtol=0, atol=1e-4) + + def test_compute_covariance_matrix_float_2_segments(self): + """ """ + eps = 1e-8 + mean, covar, recording = self.get_float_test_data(num_segments=2, dtype=np.float32) + recording._recording_segments[1].set_data( - np.zeros((num_samples, num_channels)) + np.zeros((recording.get_num_samples(segment_index=0), recording.get_num_channels())) ) - _, test_cov, test_mean = compute_covariance_matrix( - recording, - apply_mean=True, - regularize=False, - regularize_kwargs={}, - random_chunk_kwargs=dict( + whitened_recording = si.whiten( + recording, + apply_mean=True, + regularize=False, + regularize_kwargs={}, num_chunks_per_segment=1, - chunk_size=recording.get_num_samples(segment_index=0)-1, + chunk_size=recording.get_num_samples(segment_index=0) - 1, + eps=eps, ) - ) - - if num_segments == 1: - assert np.allclose(test_cov, cov_1, rtol=0, atol=0.01) - else: - assert np.allclose(test_cov, cov_1 / 2, rtol=0, atol=0.01) - - # test_cov - # TOOD: own test for mean - - whitened_recording = si.whiten( - recording, - apply_mean=True, - regularize=False, - regularize_kwargs={}, - num_chunks_per_segment=1, - chunk_size=recording.get_num_samples(segment_index=0) - 1, - eps=eps - ) - - W = whitened_recording._kwargs["W"] - U, S, Vt = np.linalg.svd(W) - S_ = (1 / S) ** 2 - eps - P = U @ np.diag(S_) @ Vt - - if num_segments == 1: - assert np.allclose(P, cov_1, rtol=0, atol=0.01) - else: - assert np.allclose(P, cov_1 / 2, rtol=0, atol=0.01) - - # TODO: - # 1) test int16, MVN is not going to work. Completely new test that just tests against X.T@T/n - # 2) test apply mean on / off and means - # 3) make clear eps is tested above - # 4) test regularisation (use existing approach). Maybe test directly against sklearn function - # 5 )test local vs. global - # 6) monkeypatch regularisation and random kwargs to check they are passed correctly. 
- # 7) test radius and int scale in the simplest way - # 8) test W, M are saved correctly + + test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + + assert np.allclose(test_covar, covar / 2, rtol=0, atol=0.01) + + @pytest.mark.parametrize("apply_mean", [True, False]) + def test_apply_mean(self, apply_mean): + + means = np.array([10, 20, 30]) + + eps = 1e-8 + mean, covar, recording = self.get_float_test_data(num_segments=1, dtype=np.float32, mean=means) + + whitened_recording = si.whiten( + recording, + apply_mean=apply_mean, + regularize=False, + regularize_kwargs={}, + num_chunks_per_segment=1, + chunk_size=recording.get_num_samples(segment_index=0) - 1, + eps=eps, + ) + + test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + + if apply_mean: + assert np.allclose(test_covar, covar, rtol=0, atol=0.01) + else: + assert np.allclose(np.diag(test_covar), means**2, rtol=0, atol=5) + + breakpoint() # TODO: check whitened data is cov identity even when apply_mean=False... + + def test_compute_sklearn_covariance_matrix(self): + """ + TODO: assume centered is fixed to True + Test some random stuff + + # TODO: this is not appropraite for all covariance functions. only one with the fit method! e.g. does not work with leodit_wolf + """ + from sklearn import covariance + + X = np.random.randn(100, 100) + + test_cov = compute_sklearn_covariance_matrix( + X, {"method": "GraphicalLasso", "alpha": 1, "enet_tol": 0.04} + ) # RENAME test_cov + cov = covariance.GraphicalLasso(alpha=1, enet_tol=0.04, assume_centered=True).fit(X).covariance_ + assert np.allclose(test_cov, cov) + + test_cov = compute_sklearn_covariance_matrix( + X, {"method": "ShrunkCovariance", "shrinkage": 0.3} + ) # RENAME test_cov + cov = covariance.ShrunkCovariance(shrinkage=0.3, assume_centered=True).fit(X).covariance_ + assert np.allclose(test_cov, cov) + + def test_whiten_regularisation_norm(self): + """ """ + from sklearn import covariance + + _, _, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) + + whitened_recording = si.whiten( + recording, + regularize=True, + regularize_kwargs={"method": "ShrunkCovariance", "shrinkage": 0.3}, + apply_mean=True, + num_chunks_per_segment=1, + chunk_size=recording.get_num_samples(segment_index=0) - 1, + eps=1e-8, + ) + + test_covar = self.covar_from_whitening_mat(whitened_recording, eps=1e-8) + + X = recording.get_traces()[:-1, :] + X = X - np.mean(X, axis=0) + + covar = covariance.ShrunkCovariance(shrinkage=0.3, assume_centered=True).fit(X).covariance_ + + assert np.allclose(test_covar, covar, rtol=0, atol=1e-4) + + def test_local_vs_global_whiten(self): + # Make test data with 4 channels, known covar between all + # do with radius = 2. compute manually. Will need to change channel locations + # check matches well. 
+ pass + + def test_passed_W_and_M(self): + pass + + def test_all_kwargs(self): + pass + + def test_saved_to_disk(self): + # check attributes are saved properly + pass + def test_whiten(create_cache_folder): cache_folder = create_cache_folder diff --git a/src/spikeinterface/preprocessing/whiten.py b/src/spikeinterface/preprocessing/whiten.py index 2dbad9ee79..b2d741c166 100644 --- a/src/spikeinterface/preprocessing/whiten.py +++ b/src/spikeinterface/preprocessing/whiten.py @@ -7,7 +7,6 @@ from ..core import get_random_data_chunks, get_channel_distances from .filter import fix_dtype -from ..core.globals import get_global_job_kwargs class WhitenRecording(BasePreprocessor): @@ -76,9 +75,12 @@ def __init__( dtype_ = fix_dtype(recording, dtype) if dtype_.kind == "i": - assert ( - int_scale is not None - ), "For recording with dtype=int you must set the output dtype to float OR set a int_scale" + assert int_scale is not None, ( + "For recording with dtype=int you must set the output dtype to float OR set a int_scale" + ) + + if not apply_mean and regularize: + raise ValueError("`apply_mean` must be `True` if regularising. `assume_centered` is fixed to `True`.") if W is not None: W = np.asarray(W) @@ -145,10 +147,6 @@ def get_traces(self, start_frame, end_frame, channel_indices): return whiten_traces.astype(self.dtype) -# function for API -whiten = define_function_from_class(source_class=WhitenRecording, name="whiten") - - def compute_whitening_matrix( recording, mode, random_chunk_kwargs, apply_mean, radius_um=None, eps=None, regularize=False, regularize_kwargs=None ): @@ -188,9 +186,7 @@ def compute_whitening_matrix( The "mean" matrix """ - data, cov, M = compute_covariance_matrix( - recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs - ) + data, cov, M = compute_covariance_matrix(recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs) # Here we determine eps used below to avoid division by zero. # Typically we can assume that data is either unscaled integers or in units of @@ -230,9 +226,10 @@ def compute_whitening_matrix( return W, M -def compute_covariance_matrix(recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs): # TODO: check order - """ - """ +def compute_covariance_matrix( + recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs +): # TODO: check order + """ """ random_data = get_random_data_chunks(recording, concatenated=True, return_scaled=False, **random_chunk_kwargs) random_data = random_data.astype(np.float32) @@ -250,16 +247,43 @@ def compute_covariance_matrix(recording, apply_mean, regularize, regularize_kwar cov = data.T @ data cov = cov / data.shape[0] else: - import sklearn.covariance - - method = regularize_kwargs.pop("method") - regularize_kwargs["assume_centered"] = True - estimator_class = getattr(sklearn.covariance, method) - estimator = estimator_class(**regularize_kwargs) - estimator.fit(data) - cov = estimator.covariance_ + cov = compute_sklearn_covariance_matrix(data, regularize_kwargs) + breakpoint() + # import sklearn.covariance + # method = regularize_kwargs.pop("method") + # regularize_kwargs["assume_centered"] = True + # estimator_class = getattr(sklearn.covariance, method) + # estimator = estimator_class(**regularize_kwargs) + # estimator.fit(data) + # cov = estimator.covariance_ return data, cov, M # TODO: rename data +# TODO: do we want to fix assume centered here or directly use `apply_mean`? 
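As context for the helper defined next, the class-based sklearn covariance
estimators all follow the same fit/`covariance_` pattern; a minimal sketch
(estimator and parameters chosen arbitrarily for illustration):

    import numpy as np
    from sklearn.covariance import ShrunkCovariance

    data = np.random.randn(1000, 4).astype(np.float32)

    # Any sklearn estimator class with a .fit() method and a .covariance_
    # attribute works this way; function-style estimators (e.g. ledoit_wolf)
    # do not fit this pattern.
    estimator = ShrunkCovariance(shrinkage=0.3, assume_centered=True)
    cov = estimator.fit(data).covariance_  # (4, 4) regularised covariance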
+ +def compute_sklearn_covariance_matrix(data, regularize_kwargs): + + import sklearn.covariance + + if "assume_centered" in regularize_kwargs and not regularize_kwargs["assume_centered"]: + raise ValueError("Cannot use `assume_centered=False` for `regularize_kwargs`. " "Fixing to `True`.") + + method = regularize_kwargs.pop("method") + regularize_kwargs["assume_centered"] = True + estimator_class = getattr(sklearn.covariance, method) + estimator = estimator_class(**regularize_kwargs) + estimator.fit(data) + cov = estimator.covariance_ + + return cov + + +# 1) factor out to own function +# 2) test function is simple way +# 3) monkeypatch compute covariance to check function is called and returned +# 4) check norm is smaller + +# function for API +whiten = define_function_from_class(source_class=WhitenRecording, name="whiten") From 1c77e2b520b47e7de2243596d4414dce4d65fce6 Mon Sep 17 00:00:00 2001 From: JoeZiminski Date: Mon, 11 Nov 2024 19:47:36 +0000 Subject: [PATCH 04/14] Continue extending whitening tests. --- .../preprocessing/tests/test_whiten.py | 220 ++++++++++++++---- src/spikeinterface/preprocessing/whiten.py | 3 +- 2 files changed, 172 insertions(+), 51 deletions(-) diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index 8f03f0577c..93ecf73d87 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -9,6 +9,9 @@ from pathlib import Path import spikeinterface.full as si # TOOD: is this a bad idea? remove! +################################################# +# Custom Recording - TODO: get feedback and move +################################################# class CustomRecording(BaseRecording): """ """ @@ -32,9 +35,25 @@ def __init__(self, durations, num_channels, channel_ids, sampling_frequency, dty "sampling_frequency": sampling_frequency, } + def set_data(self, data, segment_index=0): + + if data.shape[0] != self.get_num_samples(segment_index=segment_index): + raise ValueError("The first dimension must be the same size as" + "the number of samples.") + + if data.shape[1] != self.get_num_channels(): + raise ValueError("The second dimension of the data be the same" + "size as the number of channels.") + + if data.dtype != self.dtype: + raise ValueError("The dtype of the data must match the recording dtype.") + + self._recording_segments[segment_index].data = data + class CustomRecordingSegment(BaseRecordingSegment): - """ """ + """ + """ def __init__(self, num_samples, num_channels, sampling_frequency): self.num_samples = num_samples @@ -45,10 +64,6 @@ def __init__(self, num_samples, num_channels, sampling_frequency): self.t_start = None self.time_vector = None - def set_data(self, data): - # TODO: do some checks - self.data = data - def get_traces( self, start_frame: int | None = None, @@ -60,18 +75,21 @@ def get_traces( def get_num_samples(self): return self.num_samples +################################################# +# Test Class +################################################# -# TODO: return random cghuns scaled vs unscled +class TestWhiten: + """ -class TestWhiten: + """ def get_float_test_data(self, num_segments, dtype, mean=None, covar=None): """ mention the segment thing """ num_channels = 3 - dtype = np.float32 if mean is None: mean = np.zeros(num_channels) @@ -91,7 +109,7 @@ def get_float_test_data(self, num_segments, dtype, mean=None, covar=None): else: raise ValueError("dtype must be float32 or int16") - 
recording._recording_segments[0].set_data(seg_1_data) + recording.set_data(seg_1_data) assert np.array_equal(recording.get_traces(segment_index=0), seg_1_data), "segment 1 test setup did not work." return mean, covar, recording @@ -118,46 +136,79 @@ def covar_from_whitening_mat(self, whitened_recording, eps): workflow to be tested rather than subfunction only. """ W = whitened_recording._kwargs["W"] + U, D, Vt = np.linalg.svd(W) D_new = (1 / D) ** 2 - eps S = U @ np.diag(D_new) @ Vt return S - @pytest.mark.parametrize("eps", [1e-8, 1]) + ################################################################################### + # Tests + ################################################################################### + @pytest.mark.parametrize("dtype", [np.float32, np.int16]) - def test_compute_covariance_matrix(self, dtype, eps): - """ """ + def test_compute_covariance_matrix(self, dtype): + """ + + """ eps = 1e-8 mean, covar, recording = self.get_float_test_data(num_segments=1, dtype=dtype) whitened_recording = si.whiten( recording, - apply_mean=True, + apply_mean=False, regularize=False, num_chunks_per_segment=1, chunk_size=recording.get_num_samples(segment_index=0) - 1, eps=eps, + dtype=np.float32, ) - test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + if dtype == np.float32: + test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + assert np.allclose(test_covar, covar, rtol=0, atol=0.01) - assert np.allclose(test_covar, covar, rtol=0, atol=0.01) + # TODO: OWN FUNCTION + X = whitened_recording.get_traces() + X = X - np.mean(X, axis=0) + S = X.T @ X / X.shape[0] - if eps != 1: - X = whitened_recording.get_traces() - X = X - np.mean(X, axis=0) - S = X.T @ X / X.shape[0] + assert np.allclose(S, np.eye(recording.get_num_channels()), rtol=0, atol=1e-4) - assert np.allclose(S, np.eye(recording.get_num_channels()), rtol=0, atol=1e-4) + def test_non_default_eps(self): + """ + + """ + eps = 1 + mean, covar, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) + + whitened_recording = si.whiten( + recording, + apply_mean=False, + regularize=False, + num_chunks_per_segment=1, + chunk_size=recording.get_num_samples(segment_index=0) - 1, + eps=eps, + ) + test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + assert np.allclose(test_covar, covar, rtol=0, atol=0.01) def test_compute_covariance_matrix_float_2_segments(self): - """ """ + """ + + """ eps = 1e-8 mean, covar, recording = self.get_float_test_data(num_segments=2, dtype=np.float32) - recording._recording_segments[1].set_data( - np.zeros((recording.get_num_samples(segment_index=0), recording.get_num_channels())) + all_zero_data = np.zeros( + (recording.get_num_samples(segment_index=0), recording.get_num_channels()), + dtype=np.float32, + ) + + recording.set_data( + all_zero_data, + segment_index=1, ) whitened_recording = si.whiten( @@ -176,7 +227,9 @@ def test_compute_covariance_matrix_float_2_segments(self): @pytest.mark.parametrize("apply_mean", [True, False]) def test_apply_mean(self, apply_mean): + """ + """ means = np.array([10, 20, 30]) eps = 1e-8 @@ -199,14 +252,16 @@ def test_apply_mean(self, apply_mean): else: assert np.allclose(np.diag(test_covar), means**2, rtol=0, atol=5) - breakpoint() # TODO: check whitened data is cov identity even when apply_mean=False... + # TODO: insert test cov is white function + # breakpoint() # TODO: check whitened data is cov identity even when apply_mean=False... 
def test_compute_sklearn_covariance_matrix(self): """ TODO: assume centered is fixed to True Test some random stuff - # TODO: this is not appropraite for all covariance functions. only one with the fit method! e.g. does not work with leodit_wolf + # TODO: this is not appropraite for all covariance functions. + only one with the fit method! e.g. does not work with leodit_wolf """ from sklearn import covariance @@ -225,7 +280,9 @@ def test_compute_sklearn_covariance_matrix(self): assert np.allclose(test_cov, cov) def test_whiten_regularisation_norm(self): - """ """ + """ + + """ from sklearn import covariance _, _, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) @@ -249,41 +306,105 @@ def test_whiten_regularisation_norm(self): assert np.allclose(test_covar, covar, rtol=0, atol=1e-4) + # TODO: insert test whitened recording is white + def test_local_vs_global_whiten(self): # Make test data with 4 channels, known covar between all # do with radius = 2. compute manually. Will need to change channel locations # check matches well. - pass + + _, _, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) + + y_dist = 2 + recording.set_channel_locations([ + [0.0, 0], + [0.0, y_dist * 1], + [0.0, y_dist * 2], + ]) + + results = {"global": None, "local": None} + + for mode in ["global", "local"]: + whitened_recording = si.whiten( + recording, + apply_mean=True, + num_chunks_per_segment=1, + chunk_size=recording.get_num_samples(segment_index=0) - 1, + eps=1e-8, + mode=mode, + radius_um=y_dist + 1e-01, + ) + results[mode] = whitened_recording + + assert results["local"]._kwargs["W"][0][2] == 0.0 + assert results["global"]._kwargs["W"][0][2] != 0.0 + + # TEST + whitened_data = results["local"].get_traces() + + set_1 = whitened_data[:, :2] - np.mean(whitened_data[:, :2], axis=0) + set_2 = whitened_data[:, 1:] - np.mean(whitened_data[:, 1:], axis=0) + + assert np.allclose( + np.eye(2), set_1.T@set_1 / set_1.shape[0], + rtol=0, atol=1e-2 + ) + assert np.allclose( + np.eye(2), set_2.T@set_2 / set_2.shape[0], + rtol=0, atol=1e-2 + ) + # TODO: own function + X = whitened_data - np.mean(whitened_data, axis=0) + covar_ = X.T@X - X.shape[0] + assert not np.allclose(np.eye(3), covar_, rtol=0, atol=1e-2) def test_passed_W_and_M(self): - pass + """ + TODO: Need options make clear same whitening matrix for all segments. Is this realistic? 
+ """ + num_chan = 4 + recording = self.get_empty_custom_recording(2, num_chan, dtype=np.float32) - def test_all_kwargs(self): - pass + test_W = np.random.normal(size=(num_chan, num_chan)) + test_M = np.random.normal(size=num_chan) - def test_saved_to_disk(self): - # check attributes are saved properly - pass + whitened_recording = si.whiten( + recording, + W=test_W, + M=test_M + ) + for seg_idx in [0, 1]: + assert np.array_equal( + whitened_recording._recording_segments[seg_idx].W, + test_W + ) + assert np.array_equal( + whitened_recording._recording_segments[seg_idx].M, + test_M + ) + + assert whitened_recording._kwargs["W"] == test_W.tolist() + assert whitened_recording._kwargs["M"] == test_M.tolist() + + def test_whiten_general(self, create_cache_folder): + """ + """ + cache_folder = create_cache_folder + rec = generate_recording(num_channels=4, seed=2205) -def test_whiten(create_cache_folder): - cache_folder = create_cache_folder - rec = generate_recording(num_channels=4, seed=2205) + random_chunk_kwargs = {} + W1, M = compute_whitening_matrix(rec, "global", random_chunk_kwargs, apply_mean=False, radius_um=None) - print(rec.get_channel_locations()) - random_chunk_kwargs = {"seed": 2205} - W1, M = compute_whitening_matrix(rec, "global", random_chunk_kwargs, apply_mean=False, radius_um=None) - # print(W) - # print(M) + with pytest.raises(AssertionError): + W, M = compute_whitening_matrix(rec, "local", random_chunk_kwargs, apply_mean=False, radius_um=None) + W, M = compute_whitening_matrix(rec, "local", random_chunk_kwargs, apply_mean=False, radius_um=25) - with pytest.raises(AssertionError): - W, M = compute_whitening_matrix(rec, "local", random_chunk_kwargs, apply_mean=False, radius_um=None) - W, M = compute_whitening_matrix(rec, "local", random_chunk_kwargs, apply_mean=False, radius_um=25) - # W must be sparse - np.sum(W == 0) == 6 + # W must be sparse + np.sum(W == 0) == 6 - rec2 = whiten(rec) - rec2.save(verbose=False) + rec2 = whiten(rec) + rec2.save(verbose=False) # test dtype rec_int = scale(rec2, dtype="int16") @@ -296,7 +417,7 @@ def test_whiten(create_cache_folder): np.testing.assert_array_equal(rec3.get_traces(segment_index=0), rec_par.get_traces(segment_index=0)) with pytest.raises(AssertionError): - rec4 = whiten(rec_int, dtype=None) + rec4 = whiten(rec_int, dtype=None) # int_scale should be applied rec4 = whiten(rec_int, dtype=None, int_scale=256) assert rec4.get_dtype() == "int16" assert rec4._kwargs["M"] is None @@ -304,3 +425,4 @@ def test_whiten(create_cache_folder): # test regularization : norm should be smaller W2, M = compute_whitening_matrix(rec, "global", random_chunk_kwargs, apply_mean=False, regularize=True) assert np.linalg.norm(W1) > np.linalg.norm(W2) + diff --git a/src/spikeinterface/preprocessing/whiten.py b/src/spikeinterface/preprocessing/whiten.py index b2d741c166..7d9d4e8bba 100644 --- a/src/spikeinterface/preprocessing/whiten.py +++ b/src/spikeinterface/preprocessing/whiten.py @@ -213,7 +213,7 @@ def compute_whitening_matrix( assert radius_um is not None n = cov.shape[0] distances = get_channel_distances(recording) - W = np.zeros((n, n), dtype="float64") + W = np.zeros((n, n), dtype="float64") # TODO: should fix to float32 for consistency for c in range(n): (inds,) = np.nonzero(distances[c, :] <= radius_um) cov_local = cov[inds, :][:, inds] @@ -248,7 +248,6 @@ def compute_covariance_matrix( cov = cov / data.shape[0] else: cov = compute_sklearn_covariance_matrix(data, regularize_kwargs) - breakpoint() # import sklearn.covariance # method = 
regularize_kwargs.pop("method")
         # regularize_kwargs["assume_centered"] = True
         # estimator_class = getattr(sklearn.covariance, method)
         # estimator = estimator_class(**regularize_kwargs)
         # estimator.fit(data)
         # cov = estimator.covariance_

From 14c83af103c2d2e17a462b1f760b571c20e3a236 Mon Sep 17 00:00:00 2001
From: JoeZiminski
Date: Tue, 12 Nov 2024 13:07:51 +0000
Subject: [PATCH 05/14] Finalise tests, tidy up, add docs, final eps change.

---
 .../tests/test_detect_bad_channels.py         |   1 -
 .../preprocessing/tests/test_whiten.py        | 340 +++++++++++-------
 src/spikeinterface/preprocessing/whiten.py    |  72 ++--
 3 files changed, 245 insertions(+), 168 deletions(-)

diff --git a/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py b/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py
index ba6679132c..4622be1440 100644
--- a/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py
+++ b/src/spikeinterface/preprocessing/tests/test_detect_bad_channels.py
@@ -247,7 +247,6 @@ def add_dead_channels(recording, is_dead):
         data[:, is_dead] = np.random.normal(
             mean, std * 0.1, size=(is_dead.size, recording.get_num_samples(segment_index))
         ).T
-        breakpoint()

         recording._recording_segments[segment_index]._traces = data

diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py
index 93ecf73d87..01c9051298 100644
--- a/src/spikeinterface/preprocessing/tests/test_whiten.py
+++ b/src/spikeinterface/preprocessing/tests/test_whiten.py
@@ -6,15 +6,24 @@
 from spikeinterface.preprocessing import whiten, scale, compute_whitening_matrix
 from spikeinterface.preprocessing.whiten import compute_sklearn_covariance_matrix

-from pathlib import Path
-import spikeinterface.full as si  # TOOD: is this a bad idea? remove!
+try:
+    from sklearn import covariance as sklearn_covariance
+
+    HAS_SKLEARN = True
+except ImportError:
+    HAS_SKLEARN = False
+

 #################################################
 # Custom Recording - TODO: get feedback and move
 #################################################

+
 class CustomRecording(BaseRecording):
-    """ """
+    """
+    A convenience class for adding custom data to
+    a recording for test purposes.
+    """

     def __init__(self, durations, num_channels, channel_ids, sampling_frequency, dtype):
         BaseRecording.__init__(self, sampling_frequency=sampling_frequency, channel_ids=channel_ids, dtype=dtype)
@@ -36,14 +45,16 @@ def __init__(self, durations, num_channels, channel_ids, sampling_frequency, dty
         }

     def set_data(self, data, segment_index=0):
-
+        """
+        Set the `data` on the segment of index `segment_index`.
+        `data` must be the same size (num_samples, num_channels)
+        and dtype as the recording.
+        """
         if data.shape[0] != self.get_num_samples(segment_index=segment_index):
-            raise ValueError("The first dimension must be the same size as" "the number of samples.")
+            raise ValueError("The first dimension must be the same size as the number of samples.")

         if data.shape[1] != self.get_num_channels():
-            raise ValueError("The second dimension of the data be the same" "size as the number of channels.")
+            raise ValueError("The second dimension of the data must be the same size as the number of channels.")

         if data.dtype != self.dtype:
             raise ValueError("The dtype of the data must match the recording dtype.")

         self._recording_segments[segment_index].data = data


 class CustomRecordingSegment(BaseRecordingSegment):
-    """
-    """
+    """
+    Segment for the `CustomRecording` which simply returns
+    the set data when `get_traces()` is called. See
+    `CustomRecording.set_data()` for details on setting the data.
""" def __init__(self, num_samples, num_channels, sampling_frequency): @@ -66,65 +80,96 @@ def __init__(self, num_samples, num_channels, sampling_frequency): def get_traces( self, - start_frame: int | None = None, - end_frame: int | None = None, - channel_indices: list | None = None, + start_frame=None, + end_frame=None, + channel_indices=None, ): return self.data[start_frame:end_frame, channel_indices] def get_num_samples(self): return self.num_samples + ################################################# # Test Class ################################################# + class TestWhiten: """ + Test the whitening preprocessing step. - + The strategy is to generate a recording that has data + with a known covariance matrix, then testing that the + covariance matrix is computed properly and that the + returned data is indeed white. """ - def get_float_test_data(self, num_segments, dtype, mean=None, covar=None): + def get_float_test_data(self, num_segments, dtype, means=None): """ - mention the segment thing + Generate a set of test data with known covariance matrix and mean. + Test data is drawn from a multivariate Gaussian distribute with + means `mean` and covariance matrix `cov_mat`. + + A fixture is not used because we often want to change the options, + and it is very quick to generate this test data. + + The number of channels (3) and covariance matrix is fixed + and directly tested against in below tests. + + Parameters + ---------- + + num_segments : int + Number of segments for the recording. Note that only the first + segment is filled with data. Data for other segments must be + set manually. + + dtype : np.float32 | np.int16 + Datatype of the generated recording. + + means : None | np.ndarray + The `means` should be an array of length 3 (num samples) + or `None`. If `None`, means will be zero. """ num_channels = 3 - if mean is None: - mean = np.zeros(num_channels) + if means is None: + means = np.zeros(num_channels) - if covar is None: - covar = np.array([[1, 0.5, 0], [0.5, 1, -0.25], [0, -0.25, 1]]) + cov_mat = np.array([[1, 0.5, 0], [0.5, 1, -0.25], [0, -0.25, 1]]) + # Generate recording and multivariate Gaussian data to set recording = self.get_empty_custom_recording(num_segments, num_channels, dtype) - seg_1_data = np.random.multivariate_normal( - mean, covar, recording.get_num_samples(segment_index=0) # TODO: RENAME! - ) + seg_1_data = np.random.multivariate_normal(means, cov_mat, recording.get_num_samples(segment_index=0)) + + # Set the dtype, if `int16`, first scale to +/- 1 then cast to int16 range. if dtype == np.float32: seg_1_data = seg_1_data.astype(dtype) + elif dtype == np.int16: - seg_1_data = np.round(seg_1_data * 32767).astype(np.int16) + seg_1_data /= seg_1_data.max() + seg_1_data = np.round((seg_1_data) * 32767).astype(np.int16) else: raise ValueError("dtype must be float32 or int16") + # Set the data on the recording and return recording.set_data(seg_1_data) assert np.array_equal(recording.get_traces(segment_index=0), seg_1_data), "segment 1 test setup did not work." 
- return mean, covar, recording + return means, cov_mat, recording def get_empty_custom_recording(self, num_segments, num_channels, dtype): - """ """ return CustomRecording( - durations=[10, 10] if num_segments == 2 else [10], # will auto-fill zeros + durations=[10 for _ in range(num_segments)], num_channels=num_channels, channel_ids=np.arange(num_channels), sampling_frequency=30000, dtype=dtype, ) - def covar_from_whitening_mat(self, whitened_recording, eps): + def cov_mat_from_whitening_mat(self, whitened_recording, eps): """ The whitening matrix is computed as the inverse square root of the covariance matrix @@ -140,7 +185,24 @@ def covar_from_whitening_mat(self, whitened_recording, eps): U, D, Vt = np.linalg.svd(W) D_new = (1 / D) ** 2 - eps S = U @ np.diag(D_new) @ Vt + return S + + def assert_recording_is_white(self, recording): + """ + Compute the covariance matrix of the recording, + and assert that it is close to identity. + """ + X = recording.get_traces() + S = self.compute_cov_mat(X) + assert np.allclose(S, np.eye(recording.get_num_channels()), rtol=0, atol=1e-4) + def compute_cov_mat(self, X): + """ + Estimate the covariance matrix from data + using the standard linear algebra approach. + """ + X = X - np.mean(X, axis=0) + S = X.T @ X / X.shape[0] return S ################################################################################### @@ -150,12 +212,16 @@ def covar_from_whitening_mat(self, whitened_recording, eps): @pytest.mark.parametrize("dtype", [np.float32, np.int16]) def test_compute_covariance_matrix(self, dtype): """ - + Test that the covariance matrix is computed as expected and + data is white after whitening step. Test against float32 and + int16, testing int16 is important to ensure data + is cast to float before computing the covariance matrix, + otherwise it can overflow. """ eps = 1e-8 - mean, covar, recording = self.get_float_test_data(num_segments=1, dtype=dtype) + _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=dtype) - whitened_recording = si.whiten( + whitened_recording = whiten( recording, apply_mean=False, regularize=False, @@ -165,25 +231,27 @@ def test_compute_covariance_matrix(self, dtype): dtype=np.float32, ) - if dtype == np.float32: - test_covar = self.covar_from_whitening_mat(whitened_recording, eps) - assert np.allclose(test_covar, covar, rtol=0, atol=0.01) + test_cov_mat = self.cov_mat_from_whitening_mat(whitened_recording, eps) - # TODO: OWN FUNCTION - X = whitened_recording.get_traces() - X = X - np.mean(X, axis=0) - S = X.T @ X / X.shape[0] + # If the data is in `int16` the covariance matrix will be scaled up + # as data is set to +/32767 range before cast. + if dtype == np.int16: + test_cov_mat /= test_cov_mat[0][0] + assert np.allclose(test_cov_mat, cov_mat, rtol=0, atol=0.01) - assert np.allclose(S, np.eye(recording.get_num_channels()), rtol=0, atol=1e-4) + self.assert_recording_is_white(whitened_recording) def test_non_default_eps(self): """ - + Try a new non-default eps and check that it is correctly + propagated to the matrix computation. The test is that + the `cov_mat_from_whitening_mat` should recovery exctly + the cov mat if the correct eps is used. 
""" eps = 1 - mean, covar, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) + _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) - whitened_recording = si.whiten( + whitened_recording = whiten( recording, apply_mean=False, regularize=False, @@ -191,15 +259,20 @@ def test_non_default_eps(self): chunk_size=recording.get_num_samples(segment_index=0) - 1, eps=eps, ) - test_covar = self.covar_from_whitening_mat(whitened_recording, eps) - assert np.allclose(test_covar, covar, rtol=0, atol=0.01) + test_cov_mat = self.cov_mat_from_whitening_mat(whitened_recording, eps) + assert np.allclose(test_cov_mat, cov_mat, rtol=0, atol=0.01) - def test_compute_covariance_matrix_float_2_segments(self): + def test_compute_covariance_matrix_2_segments(self): """ - + Check that the covariance marix is estimated from across + segments in a multi-segment recording. This is done simply + by setting the second segment as all zeros and checking the + estimated covariances are all halved. This makes sense as + the zeros do not affect the covariance estimation + but the covariance matrix is scaled by 1 / N. """ eps = 1e-8 - mean, covar, recording = self.get_float_test_data(num_segments=2, dtype=np.float32) + _, cov_mat, recording = self.get_float_test_data(num_segments=2, dtype=np.float32) all_zero_data = np.zeros( (recording.get_num_samples(segment_index=0), recording.get_num_channels()), @@ -211,7 +284,7 @@ def test_compute_covariance_matrix_float_2_segments(self): segment_index=1, ) - whitened_recording = si.whiten( + whitened_recording = whiten( recording, apply_mean=True, regularize=False, @@ -221,21 +294,25 @@ def test_compute_covariance_matrix_float_2_segments(self): eps=eps, ) - test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + test_cov_mat = self.cov_mat_from_whitening_mat(whitened_recording, eps) - assert np.allclose(test_covar, covar / 2, rtol=0, atol=0.01) + assert np.allclose(test_cov_mat, cov_mat / 2, rtol=0, atol=0.01) @pytest.mark.parametrize("apply_mean", [True, False]) def test_apply_mean(self, apply_mean): """ - + Test that the `apply_mean` argument is propagated correctly. + Note that in the case `apply_mean=False`, the covariance matrix + is in unusual scaling and so the varainces alone are checked. + Also, the data is not as well whitened and so this is not + tested against. """ means = np.array([10, 20, 30]) eps = 1e-8 - mean, covar, recording = self.get_float_test_data(num_segments=1, dtype=np.float32, mean=means) + _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=np.float32, means=means) - whitened_recording = si.whiten( + whitened_recording = whiten( recording, apply_mean=apply_mean, regularize=False, @@ -245,49 +322,53 @@ def test_apply_mean(self, apply_mean): eps=eps, ) - test_covar = self.covar_from_whitening_mat(whitened_recording, eps) + test_cov_mat = self.cov_mat_from_whitening_mat(whitened_recording, eps) if apply_mean: - assert np.allclose(test_covar, covar, rtol=0, atol=0.01) + assert np.allclose(test_cov_mat, cov_mat, rtol=0, atol=0.01) else: - assert np.allclose(np.diag(test_covar), means**2, rtol=0, atol=5) + assert np.allclose(np.diag(test_cov_mat), means**2, rtol=0, atol=5) - # TODO: insert test cov is white function - # breakpoint() # TODO: check whitened data is cov identity even when apply_mean=False... + # Note the recording is typically not white if the mean is + # not removed prior to covariance matrix estimation. 
+        if apply_mean:
+            self.assert_recording_is_white(whitened_recording)

+    @pytest.mark.skipif(not HAS_SKLEARN, reason="sklearn must be installed.")
     def test_compute_sklearn_covariance_matrix(self):
         """
-        TODO: assume centered is fixed to True
-        Test some random stuff
-
-        # TODO: this is not appropraite for all covariance functions.
-        only one with the fit method! e.g. does not work with leodit_wolf
+        A basic check that the `compute_sklearn_covariance_matrix`
+        function from `whiten.py` computes the same matrix
+        as using the sklearn function directly, for some
+        arbitrarily chosen methods / parameters.
+
+        Note that the function-style sklearn covariance
+        methods (e.g. `ledoit_wolf`) are not supported.
         """
-        from sklearn import covariance
-
         X = np.random.randn(100, 100)

         test_cov = compute_sklearn_covariance_matrix(
             X, {"method": "GraphicalLasso", "alpha": 1, "enet_tol": 0.04}
         )
-        cov = covariance.GraphicalLasso(alpha=1, enet_tol=0.04, assume_centered=True).fit(X).covariance_
+        cov = sklearn_covariance.GraphicalLasso(alpha=1, enet_tol=0.04, assume_centered=True).fit(X).covariance_
         assert np.allclose(test_cov, cov)

         test_cov = compute_sklearn_covariance_matrix(
             X, {"method": "ShrunkCovariance", "shrinkage": 0.3}
         )
-        cov = covariance.ShrunkCovariance(shrinkage=0.3, assume_centered=True).fit(X).covariance_
+        cov = sklearn_covariance.ShrunkCovariance(shrinkage=0.3, assume_centered=True).fit(X).covariance_
         assert np.allclose(test_cov, cov)

+    @pytest.mark.skipif(not HAS_SKLEARN, reason="sklearn must be installed.")
     def test_whiten_regularisation_norm(self):
         """
-
+        Check that the covariance matrix estimated by the
+        whitening preprocessing is the same as the one
+        computed with sklearn directly when regularisation
+        kwargs are given.
         """
-        from sklearn import covariance
-
         _, _, recording = self.get_float_test_data(num_segments=1, dtype=np.float32)

         whitened_recording = whiten(
             recording,
             regularize=True,
             regularize_kwargs={"method": "ShrunkCovariance", "shrinkage": 0.3},
             apply_mean=True,
             num_chunks_per_segment=1,
             chunk_size=recording.get_num_samples(segment_index=0) - 1,
             eps=1e-8,
         )

         test_cov_mat = self.cov_mat_from_whitening_mat(whitened_recording, eps=1e-8)

+        # Compute the covariance matrix using sklearn directly and compare.
         X = recording.get_traces()[:-1, :]
         X = X - np.mean(X, axis=0)
+        cov_mat = sklearn_covariance.ShrunkCovariance(shrinkage=0.3, assume_centered=True).fit(X).covariance_

-        covar = covariance.ShrunkCovariance(shrinkage=0.3, assume_centered=True).fit(X).covariance_
-
-        assert np.allclose(test_covar, covar, rtol=0, atol=1e-4)
-
-        # TODO: insert test whitened recording is white
+        assert np.allclose(test_cov_mat, cov_mat, rtol=0, atol=1e-4)

     def test_local_vs_global_whiten(self):
-        # Make test data with 4 channels, known covar between all
-        # do with radius = 2. compute manually. Will need to change channel locations
-        # check matches well.
+        """
+        Generate a set of channels each separated by y_dist. Set the
+        radius_um to just above y_dist such that only neighbouring
+        channels are considered for whitening. Test that whitening
+        is correct for the first pair and last pair.
+ """ _, _, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) y_dist = 2 - recording.set_channel_locations([ - [0.0, 0], - [0.0, y_dist * 1], - [0.0, y_dist * 2], - ]) + recording.set_channel_locations( + [ + [0.0, 0], + [0.0, y_dist * 1], + [0.0, y_dist * 2], + ] + ) results = {"global": None, "local": None} for mode in ["global", "local"]: - whitened_recording = si.whiten( + whitened_recording = whiten( recording, apply_mean=True, num_chunks_per_segment=1, @@ -339,28 +422,27 @@ def test_local_vs_global_whiten(self): assert results["local"]._kwargs["W"][0][2] == 0.0 assert results["global"]._kwargs["W"][0][2] != 0.0 - # TEST - whitened_data = results["local"].get_traces() + # Parse out the data into two pairs of channels + # from which the local variance was computed. + whitened_data = results["local"].get_traces() set_1 = whitened_data[:, :2] - np.mean(whitened_data[:, :2], axis=0) set_2 = whitened_data[:, 1:] - np.mean(whitened_data[:, 1:], axis=0) - assert np.allclose( - np.eye(2), set_1.T@set_1 / set_1.shape[0], - rtol=0, atol=1e-2 - ) - assert np.allclose( - np.eye(2), set_2.T@set_2 / set_2.shape[0], - rtol=0, atol=1e-2 - ) - # TODO: own function - X = whitened_data - np.mean(whitened_data, axis=0) - covar_ = X.T@X - X.shape[0] - assert not np.allclose(np.eye(3), covar_, rtol=0, atol=1e-2) + # Check that the pairs of close channels + # whitened together are white + assert np.allclose(np.eye(2), self.compute_cov_mat(set_1), rtol=0, atol=1e-2) + assert np.allclose(np.eye(2), self.compute_cov_mat(set_2), rtol=0, atol=1e-2) + + # Check that the data overall is not white + assert not np.allclose(np.eye(3), self.compute_cov_mat(whitened_data), rtol=0, atol=1e-2) def test_passed_W_and_M(self): """ - TODO: Need options make clear same whitening matrix for all segments. Is this realistic? + Check that passing W (whitening matrix) and M (means) is + sucessfully propagated to the relevant segments and stored + on the kwargs. It is assumed if this is true, they will + be used for the actual whitening computation. """ num_chan = 4 recording = self.get_empty_custom_recording(2, num_chan, dtype=np.float32) @@ -368,32 +450,28 @@ def test_passed_W_and_M(self): test_W = np.random.normal(size=(num_chan, num_chan)) test_M = np.random.normal(size=num_chan) - whitened_recording = si.whiten( - recording, - W=test_W, - M=test_M - ) + whitened_recording = whiten(recording, W=test_W, M=test_M) for seg_idx in [0, 1]: - assert np.array_equal( - whitened_recording._recording_segments[seg_idx].W, - test_W - ) - assert np.array_equal( - whitened_recording._recording_segments[seg_idx].M, - test_M - ) + assert np.array_equal(whitened_recording._recording_segments[seg_idx].W, test_W) + assert np.array_equal(whitened_recording._recording_segments[seg_idx].M, test_M) assert whitened_recording._kwargs["W"] == test_W.tolist() assert whitened_recording._kwargs["M"] == test_M.tolist() def test_whiten_general(self, create_cache_folder): """ + Perform some general tests on the whitening functionality. + + First, perform smoke test that `compute_whitening_matrix` is running, + check recording output datatypes are as expected. Check that + saving preseves datatype, `int_scale` is propagated, and + regularisation reduces the norm. 
""" cache_folder = create_cache_folder rec = generate_recording(num_channels=4, seed=2205) - random_chunk_kwargs = {} + random_chunk_kwargs = {"seed": 2205} W1, M = compute_whitening_matrix(rec, "global", random_chunk_kwargs, apply_mean=False, radius_um=None) with pytest.raises(AssertionError): @@ -406,23 +484,23 @@ def test_whiten_general(self, create_cache_folder): rec2 = whiten(rec) rec2.save(verbose=False) - # test dtype - rec_int = scale(rec2, dtype="int16") - rec3 = whiten(rec_int, dtype="float16") - rec3 = rec3.save(folder=cache_folder / "rec1") - assert rec3.get_dtype() == "float16" + # test dtype + rec_int = scale(rec2, dtype="int16") + rec3 = whiten(rec_int, dtype="float16") + rec3 = rec3.save(folder=cache_folder / "rec1") + assert rec3.get_dtype() == "float16" - # test parallel - rec_par = rec3.save(folder=cache_folder / "rec_par", n_jobs=2) - np.testing.assert_array_equal(rec3.get_traces(segment_index=0), rec_par.get_traces(segment_index=0)) - - with pytest.raises(AssertionError): - rec4 = whiten(rec_int, dtype=None) # int_scale should be applied - rec4 = whiten(rec_int, dtype=None, int_scale=256) - assert rec4.get_dtype() == "int16" - assert rec4._kwargs["M"] is None - - # test regularization : norm should be smaller - W2, M = compute_whitening_matrix(rec, "global", random_chunk_kwargs, apply_mean=False, regularize=True) - assert np.linalg.norm(W1) > np.linalg.norm(W2) + # test parallel + rec_par = rec3.save(folder=cache_folder / "rec_par", n_jobs=2) + np.testing.assert_array_equal(rec3.get_traces(segment_index=0), rec_par.get_traces(segment_index=0)) + with pytest.raises(AssertionError): + rec4 = whiten(rec_int, dtype=None) # int_scale should be applied + rec4 = whiten(rec_int, dtype=None, int_scale=256) + assert rec4.get_dtype() == "int16" + assert rec4._kwargs["M"] is None + + # test regularization : norm should be smaller + if HAS_SKLEARN: + W2, M = compute_whitening_matrix(rec, "global", random_chunk_kwargs, apply_mean=False, regularize=True) + assert np.linalg.norm(W1) > np.linalg.norm(W2) diff --git a/src/spikeinterface/preprocessing/whiten.py b/src/spikeinterface/preprocessing/whiten.py index 7d9d4e8bba..8e5c761012 100644 --- a/src/spikeinterface/preprocessing/whiten.py +++ b/src/spikeinterface/preprocessing/whiten.py @@ -47,7 +47,8 @@ class WhitenRecording(BasePreprocessor): of sklearn, specified in regularize_kwargs. Default is GraphicalLassoCV regularize_kwargs : {'method' : 'GraphicalLassoCV'} Dictionary of the parameters that could be provided to the method of sklearn, if - the covariance matrix needs to be regularized. + the covariance matrix needs to be regularized. Note that sklearn covariance methods + that are implemented as functions, not classes, are not supported. **random_chunk_kwargs : Keyword arguments for `spikeinterface.core.get_random_data_chunk()` function Returns @@ -75,9 +76,9 @@ def __init__( dtype_ = fix_dtype(recording, dtype) if dtype_.kind == "i": - assert int_scale is not None, ( - "For recording with dtype=int you must set the output dtype to float OR set a int_scale" - ) + assert ( + int_scale is not None + ), "For recording with dtype=int you must set the output dtype to float OR set a int_scale" if not apply_mean and regularize: raise ValueError("`apply_mean` must be `True` if regularising. `assume_centered` is fixed to `True`.") @@ -197,28 +198,25 @@ def compute_whitening_matrix( # type and we estimate a more reasonable eps in the case # where the data is on a scale less than 1. 
if eps is None:
-        eps = 1e-8
-
-    if data.dtype.kind == "f":
-        median_data_sqr = np.median(data**2)  # use the square because cov (and hence S) scales as the square
-        if median_data_sqr < 1 and median_data_sqr > 0:
-            if eps is None:
+        if data.dtype.kind == "f":
+            median_data_sqr = np.median(data**2)  # use the square because cov (and hence S) scales as the square
+            if median_data_sqr < 1 and median_data_sqr > 0:
                 eps = max(1e-16, median_data_sqr * 1e-3)  # use a small fraction of the median of the squared data
+        else:
+            eps = 1e-8
 
     if mode == "global":
-        U, S, Ut = np.linalg.svd(cov, full_matrices=True)
-        W = (U @ np.diag(1 / np.sqrt(S + eps))) @ Ut
+        W = compute_whitening_from_covariance(cov, eps)
 
     elif mode == "local":
         assert radius_um is not None
         n = cov.shape[0]
         distances = get_channel_distances(recording)
-        W = np.zeros((n, n), dtype="float64")  # TODO: should fix to float32 for consistency
+        W = np.zeros((n, n), dtype="float32")
         for c in range(n):
             (inds,) = np.nonzero(distances[c, :] <= radius_um)
             cov_local = cov[inds, :][:, inds]
-            U, S, Ut = np.linalg.svd(cov_local, full_matrices=True)
-            W_local = (U @ np.diag(1 / np.sqrt(S + eps))) @ Ut
+            W_local = compute_whitening_from_covariance(cov_local, eps)
             W[inds, c] = W_local[c == inds]
     else:
         raise ValueError(f"compute_whitening_matrix : wrong mode {mode}")
@@ -226,10 +224,22 @@ def compute_whitening_matrix(
     return W, M
 
 
-def compute_covariance_matrix(
-    recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs
-):  # TODO: check order
-    """ """
+def compute_whitening_from_covariance(cov, eps):
+    """
+    Compute the whitening matrix from the covariance
+    matrix using the ZCA whitening approach. Note that `eps`
+    guards against division by zero and acts as a regulariser.
+    """
+    U, S, Ut = np.linalg.svd(cov, full_matrices=True)
+    W = (U @ np.diag(1 / np.sqrt(S + eps))) @ Ut
+    return W
+
+
+def compute_covariance_matrix(recording, apply_mean, regularize, regularize_kwargs, random_chunk_kwargs):
+    """
+    Compute the covariance matrix from randomly sampled data chunks.
+    See `compute_whitening_matrix()` for parameters.
+    """
     random_data = get_random_data_chunks(recording, concatenated=True, return_scaled=False, **random_chunk_kwargs)
     random_data = random_data.astype(np.float32)
 
@@ -248,26 +258,21 @@ def compute_covariance_matrix(
         cov = cov / data.shape[0]
     else:
         cov = compute_sklearn_covariance_matrix(data, regularize_kwargs)
-        # import sklearn.covariance
 
-        # method = regularize_kwargs.pop("method")
-        # regularize_kwargs["assume_centered"] = True
-        # estimator_class = getattr(sklearn.covariance, method)
-        # estimator = estimator_class(**regularize_kwargs)
-        # estimator.fit(data)
-        # cov = estimator.covariance_
-
-    return data, cov, M  # TODO: rename data
-
-# TODO: do we want to fix assume centered here or directly use `apply_mean`?
+    return data, cov, M
 
 
 def compute_sklearn_covariance_matrix(data, regularize_kwargs):
+    """
+    Estimate the covariance matrix using scikit-learn functions.
+    Note that sklearn covariance methods that are implemented
+    as functions, not classes, are not supported.
+    """
    import sklearn.covariance
 
     if "assume_centered" in regularize_kwargs and not regularize_kwargs["assume_centered"]:
-        raise ValueError("Cannot use `assume_centered=False` for `regularize_kwargs`. " "Fixing to `True`.")
+        raise ValueError("Cannot use `assume_centered=False` for `regularize_kwargs`. 
Fixing to `True`.") method = regularize_kwargs.pop("method") regularize_kwargs["assume_centered"] = True @@ -279,10 +284,5 @@ def compute_sklearn_covariance_matrix(data, regularize_kwargs): return cov -# 1) factor out to own function -# 2) test function is simple way -# 3) monkeypatch compute covariance to check function is called and returned -# 4) check norm is smaller - # function for API whiten = define_function_from_class(source_class=WhitenRecording, name="whiten") From 1d84e5de9249b77552b9cd9c87243326ad739f70 Mon Sep 17 00:00:00 2001 From: JoeZiminski Date: Tue, 12 Nov 2024 14:57:54 +0000 Subject: [PATCH 06/14] Amend eps further. --- .../preprocessing/tests/test_whiten.py | 12 ++++++------ src/spikeinterface/preprocessing/whiten.py | 9 ++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index 01c9051298..1abed61a79 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -218,7 +218,7 @@ def test_compute_covariance_matrix(self, dtype): is cast to float before computing the covariance matrix, otherwise it can overflow. """ - eps = 1e-8 + eps = 1e-16 _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=dtype) whitened_recording = whiten( @@ -271,7 +271,7 @@ def test_compute_covariance_matrix_2_segments(self): the zeros do not affect the covariance estimation but the covariance matrix is scaled by 1 / N. """ - eps = 1e-8 + eps = 1e-16 _, cov_mat, recording = self.get_float_test_data(num_segments=2, dtype=np.float32) all_zero_data = np.zeros( @@ -309,7 +309,7 @@ def test_apply_mean(self, apply_mean): """ means = np.array([10, 20, 30]) - eps = 1e-8 + eps = 1e-16 _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=np.float32, means=means) whitened_recording = whiten( @@ -375,10 +375,10 @@ def test_whiten_regularisation_norm(self): apply_mean=True, num_chunks_per_segment=1, chunk_size=recording.get_num_samples(segment_index=0) - 1, - eps=1e-8, + eps=1e-16, ) - test_cov_mat = self.cov_mat_from_whitening_mat(whitened_recording, eps=1e-8) + test_cov_mat = self.cov_mat_from_whitening_mat(whitened_recording, eps=1e-16) # Compute covariance matrix using sklearn directly and compare. X = recording.get_traces()[:-1, :] @@ -413,7 +413,7 @@ def test_local_vs_global_whiten(self): apply_mean=True, num_chunks_per_segment=1, chunk_size=recording.get_num_samples(segment_index=0) - 1, - eps=1e-8, + eps=1e-16, mode=mode, radius_um=y_dist + 1e-01, ) diff --git a/src/spikeinterface/preprocessing/whiten.py b/src/spikeinterface/preprocessing/whiten.py index 8e5c761012..03e56ae20b 100644 --- a/src/spikeinterface/preprocessing/whiten.py +++ b/src/spikeinterface/preprocessing/whiten.py @@ -198,12 +198,11 @@ def compute_whitening_matrix( # type and we estimate a more reasonable eps in the case # where the data is on a scale less than 1. 
if eps is None: - if data.dtype.kind == "f": - median_data_sqr = np.median(data**2) # use the square because cov (and hence S) scales as the square - if median_data_sqr < 1 and median_data_sqr > 0: - eps = max(1e-16, median_data_sqr * 1e-3) # use a small fraction of the median of the squared data + median_data_sqr = np.median(data**2) # use the square because cov (and hence S) scales as the square + if median_data_sqr < 1 and median_data_sqr > 0: + eps = max(1e-16, median_data_sqr * 1e-3) # use a small fraction of the median of the squared data else: - eps = 1e-8 + eps = 1e-16 if mode == "global": W = compute_whitening_from_covariance(cov, eps) From e7ac4522f2bc842dfcfb60077e2493605aa6a98d Mon Sep 17 00:00:00 2001 From: JoeZiminski Date: Fri, 15 Nov 2024 10:34:23 +0000 Subject: [PATCH 07/14] Use NumpyRecording and delete CustomRecording --- src/spikeinterface/core/numpyextractors.py | 18 ++++ .../preprocessing/tests/test_whiten.py | 102 +++--------------- 2 files changed, 31 insertions(+), 89 deletions(-) diff --git a/src/spikeinterface/core/numpyextractors.py b/src/spikeinterface/core/numpyextractors.py index f4790817a8..406f1372b6 100644 --- a/src/spikeinterface/core/numpyextractors.py +++ b/src/spikeinterface/core/numpyextractors.py @@ -38,6 +38,7 @@ class NumpyRecording(BaseRecording): """ def __init__(self, traces_list, sampling_frequency, t_starts=None, channel_ids=None): + if isinstance(traces_list, list): all_elements_are_list = all(isinstance(e, list) for e in traces_list) if all_elements_are_list: @@ -103,6 +104,23 @@ def from_recording(source_recording, **job_kwargs): ) return recording + def update_traces(self, traces, segment_index=0): + """ + Set the `traces` on on the segment of index `segment_index`. + `traces` must be the same size (num_samples, num_channels) + and dtype as the recording. + """ + if traces.shape[0] != self.get_num_samples(segment_index=segment_index): + raise ValueError("The first dimension must be the same size as" "the number of samples.") + + if traces.shape[1] != self.get_num_channels(): + raise ValueError("The second dimension of the data be the same" "size as the number of channels.") + + if traces.dtype != self.dtype: + raise ValueError("The dtype of the data must match the recording dtype.") + + self._recording_segments[segment_index]._traces = traces + class NumpyRecordingSegment(BaseRecordingSegment): def __init__(self, traces, sampling_frequency, t_start): diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index 1abed61a79..0a594f97a3 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -3,6 +3,7 @@ from spikeinterface.core import generate_recording from spikeinterface.core import BaseRecording, BaseRecordingSegment +from spikeinterface.core.numpyextractors import NumpyRecording from spikeinterface.preprocessing import whiten, scale, compute_whitening_matrix from spikeinterface.preprocessing.whiten import compute_sklearn_covariance_matrix @@ -14,82 +15,6 @@ HAS_SKLEARN = False -################################################# -# Custom Recording - TODO: get feedback and move -################################################# - - -class CustomRecording(BaseRecording): - """ - A convenience class for adding custom data to - a recording for test purposes. 
- """ - - def __init__(self, durations, num_channels, channel_ids, sampling_frequency, dtype): - BaseRecording.__init__(self, sampling_frequency=sampling_frequency, channel_ids=channel_ids, dtype=dtype) - - num_samples = [dur * sampling_frequency for dur in durations] - - for sample_num in num_samples: - rec_segment = CustomRecordingSegment( - sample_num, - num_channels, - sampling_frequency, - ) - self.add_recording_segment(rec_segment) - - self._kwargs = { - "num_channels": num_channels, - "durations": durations, - "sampling_frequency": sampling_frequency, - } - - def set_data(self, data, segment_index=0): - """ - Set the `data` on on the segment of index `segment_index`. - `data` must be the same size (num_samples, num_channels) - and dtype as the reocrding. - """ - if data.shape[0] != self.get_num_samples(segment_index=segment_index): - raise ValueError("The first dimension must be the same size as" "the number of samples.") - - if data.shape[1] != self.get_num_channels(): - raise ValueError("The second dimension of the data be the same" "size as the number of channels.") - - if data.dtype != self.dtype: - raise ValueError("The dtype of the data must match the recording dtype.") - - self._recording_segments[segment_index].data = data - - -class CustomRecordingSegment(BaseRecordingSegment): - """ - Segment for the `CustomRecording` which simply returns - the set data when `get_traces()` is called. See - `CustomRecording.set_data()` for details on the set data. - """ - - def __init__(self, num_samples, num_channels, sampling_frequency): - self.num_samples = num_samples - self.num_channels = num_channels - self.sampling_frequency = sampling_frequency - self.data = np.zeros((num_samples, num_channels)) - - self.t_start = None - self.time_vector = None - - def get_traces( - self, - start_frame=None, - end_frame=None, - channel_indices=None, - ): - return self.data[start_frame:end_frame, channel_indices] - - def get_num_samples(self): - return self.num_samples - - ################################################# # Test Class ################################################# @@ -155,18 +80,21 @@ def get_float_test_data(self, num_segments, dtype, means=None): raise ValueError("dtype must be float32 or int16") # Set the data on the recording and return - recording.set_data(seg_1_data) + recording.update_traces(seg_1_data) assert np.array_equal(recording.get_traces(segment_index=0), seg_1_data), "segment 1 test setup did not work." 
return means, cov_mat, recording def get_empty_custom_recording(self, num_segments, num_channels, dtype): - return CustomRecording( - durations=[10 for _ in range(num_segments)], - num_channels=num_channels, - channel_ids=np.arange(num_channels), + + sampling_frequency = 30000 + num_samples = int(10 * sampling_frequency) + + traces_list = [np.zeros((num_samples, num_channels), dtype=dtype) for _ in range(num_segments)] + + return NumpyRecording( + traces_list, sampling_frequency=30000, - dtype=dtype, ) def cov_mat_from_whitening_mat(self, whitened_recording, eps): @@ -279,7 +207,7 @@ def test_compute_covariance_matrix_2_segments(self): dtype=np.float32, ) - recording.set_data( + recording.update_traces( all_zero_data, segment_index=1, ) @@ -347,15 +275,11 @@ def test_compute_sklearn_covariance_matrix(self): """ X = np.random.randn(100, 100) - test_cov = compute_sklearn_covariance_matrix( - X, {"method": "GraphicalLasso", "alpha": 1, "enet_tol": 0.04} - ) # RENAME test_cov + test_cov = compute_sklearn_covariance_matrix(X, {"method": "GraphicalLasso", "alpha": 1, "enet_tol": 0.04}) cov = sklearn_covariance.GraphicalLasso(alpha=1, enet_tol=0.04, assume_centered=True).fit(X).covariance_ assert np.allclose(test_cov, cov) - test_cov = compute_sklearn_covariance_matrix( - X, {"method": "ShrunkCovariance", "shrinkage": 0.3} - ) # RENAME test_cov + test_cov = compute_sklearn_covariance_matrix(X, {"method": "ShrunkCovariance", "shrinkage": 0.3}) cov = sklearn_covariance.ShrunkCovariance(shrinkage=0.3, assume_centered=True).fit(X).covariance_ assert np.allclose(test_cov, cov) From 4a5ff585a45bfe34cdfaa07a99c816135bfefb10 Mon Sep 17 00:00:00 2001 From: JoeZiminski Date: Fri, 15 Nov 2024 11:02:27 +0000 Subject: [PATCH 08/14] Rework to avoid introducing a 'update_traces' function. --- src/spikeinterface/core/numpyextractors.py | 18 ----- .../preprocessing/tests/test_whiten.py | 74 +++++++++---------- 2 files changed, 34 insertions(+), 58 deletions(-) diff --git a/src/spikeinterface/core/numpyextractors.py b/src/spikeinterface/core/numpyextractors.py index 406f1372b6..f4790817a8 100644 --- a/src/spikeinterface/core/numpyextractors.py +++ b/src/spikeinterface/core/numpyextractors.py @@ -38,7 +38,6 @@ class NumpyRecording(BaseRecording): """ def __init__(self, traces_list, sampling_frequency, t_starts=None, channel_ids=None): - if isinstance(traces_list, list): all_elements_are_list = all(isinstance(e, list) for e in traces_list) if all_elements_are_list: @@ -104,23 +103,6 @@ def from_recording(source_recording, **job_kwargs): ) return recording - def update_traces(self, traces, segment_index=0): - """ - Set the `traces` on on the segment of index `segment_index`. - `traces` must be the same size (num_samples, num_channels) - and dtype as the recording. 
- """ - if traces.shape[0] != self.get_num_samples(segment_index=segment_index): - raise ValueError("The first dimension must be the same size as" "the number of samples.") - - if traces.shape[1] != self.get_num_channels(): - raise ValueError("The second dimension of the data be the same" "size as the number of channels.") - - if traces.dtype != self.dtype: - raise ValueError("The dtype of the data must match the recording dtype.") - - self._recording_segments[segment_index]._traces = traces - class NumpyRecordingSegment(BaseRecordingSegment): def __init__(self, traces, sampling_frequency, t_start): diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index 0a594f97a3..7aa3bf6705 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -30,7 +30,7 @@ class TestWhiten: returned data is indeed white. """ - def get_float_test_data(self, num_segments, dtype, means=None): + def get_test_recording(self, num_segments, dtype, means=None): """ Generate a set of test data with known covariance matrix and mean. Test data is drawn from a multivariate Gaussian distribute with @@ -57,6 +57,17 @@ def get_float_test_data(self, num_segments, dtype, means=None): The `means` should be an array of length 3 (num samples) or `None`. If `None`, means will be zero. """ + sampling_frequency = 30000 + num_samples = int(10 * sampling_frequency) # 10 s recording + + means, cov_mat, data = self.get_test_data_with_known_distribution(num_samples, dtype, means) + + recording = NumpyRecording([data], sampling_frequency) + + return means, cov_mat, recording + + def get_test_data_with_known_distribution(self, num_samples, dtype, means=None): + """ """ num_channels = 3 if means is None: @@ -64,38 +75,19 @@ def get_float_test_data(self, num_segments, dtype, means=None): cov_mat = np.array([[1, 0.5, 0], [0.5, 1, -0.25], [0, -0.25, 1]]) - # Generate recording and multivariate Gaussian data to set - recording = self.get_empty_custom_recording(num_segments, num_channels, dtype) - - seg_1_data = np.random.multivariate_normal(means, cov_mat, recording.get_num_samples(segment_index=0)) + data = np.random.multivariate_normal(means, cov_mat, num_samples) # Set the dtype, if `int16`, first scale to +/- 1 then cast to int16 range. if dtype == np.float32: - seg_1_data = seg_1_data.astype(dtype) + data = data.astype(dtype) elif dtype == np.int16: - seg_1_data /= seg_1_data.max() - seg_1_data = np.round((seg_1_data) * 32767).astype(np.int16) + data /= data.max() + data = np.round((data) * 32767).astype(np.int16) else: raise ValueError("dtype must be float32 or int16") - # Set the data on the recording and return - recording.update_traces(seg_1_data) - assert np.array_equal(recording.get_traces(segment_index=0), seg_1_data), "segment 1 test setup did not work." - - return means, cov_mat, recording - - def get_empty_custom_recording(self, num_segments, num_channels, dtype): - - sampling_frequency = 30000 - num_samples = int(10 * sampling_frequency) - - traces_list = [np.zeros((num_samples, num_channels), dtype=dtype) for _ in range(num_segments)] - - return NumpyRecording( - traces_list, - sampling_frequency=30000, - ) + return means, cov_mat, data def cov_mat_from_whitening_mat(self, whitened_recording, eps): """ @@ -147,7 +139,7 @@ def test_compute_covariance_matrix(self, dtype): otherwise it can overflow. 
""" eps = 1e-16 - _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=dtype) + _, cov_mat, recording = self.get_test_recording(num_segments=1, dtype=dtype) whitened_recording = whiten( recording, @@ -177,7 +169,7 @@ def test_non_default_eps(self): the cov mat if the correct eps is used. """ eps = 1 - _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) + _, cov_mat, recording = self.get_test_recording(num_segments=1, dtype=np.float32) whitened_recording = whiten( recording, @@ -200,17 +192,14 @@ def test_compute_covariance_matrix_2_segments(self): but the covariance matrix is scaled by 1 / N. """ eps = 1e-16 - _, cov_mat, recording = self.get_float_test_data(num_segments=2, dtype=np.float32) + sampling_frequency = 30000 + num_samples = 10 * sampling_frequency - all_zero_data = np.zeros( - (recording.get_num_samples(segment_index=0), recording.get_num_channels()), - dtype=np.float32, - ) + _, cov_mat, data = self.get_test_data_with_known_distribution(num_samples, np.float32) - recording.update_traces( - all_zero_data, - segment_index=1, - ) + traces_list = [data, np.zeros_like(data)] + + recording = NumpyRecording(traces_list, sampling_frequency) whitened_recording = whiten( recording, @@ -238,7 +227,7 @@ def test_apply_mean(self, apply_mean): means = np.array([10, 20, 30]) eps = 1e-16 - _, cov_mat, recording = self.get_float_test_data(num_segments=1, dtype=np.float32, means=means) + _, cov_mat, recording = self.get_test_recording(num_segments=1, dtype=np.float32, means=means) whitened_recording = whiten( recording, @@ -290,7 +279,7 @@ def test_whiten_regularisation_norm(self): whitening preprocessing is the same as the one computed from sklearn when regularise kwargs are given. """ - _, _, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) + _, _, recording = self.get_test_recording(num_segments=1, dtype=np.float32) whitened_recording = whiten( recording, @@ -318,7 +307,7 @@ def test_local_vs_global_whiten(self): channels are considered for whitening. Test that whitening is correct for the first pair and last pair. """ - _, _, recording = self.get_float_test_data(num_segments=1, dtype=np.float32) + _, _, recording = self.get_test_recording(num_segments=1, dtype=np.float32) y_dist = 2 recording.set_channel_locations( @@ -369,7 +358,12 @@ def test_passed_W_and_M(self): be used for the actual whitening computation. """ num_chan = 4 - recording = self.get_empty_custom_recording(2, num_chan, dtype=np.float32) + num_samples = 10000 + + recording = NumpyRecording( + [np.zeros((num_samples, num_chan))] * 2, + sampling_frequency=30000, + ) test_W = np.random.normal(size=(num_chan, num_chan)) test_M = np.random.normal(size=num_chan) From 0f97f46b05a11426ada6d2e5602c9551a6310127 Mon Sep 17 00:00:00 2001 From: JoeZiminski Date: Fri, 15 Nov 2024 11:11:12 +0000 Subject: [PATCH 09/14] More tidy ups. --- .../preprocessing/tests/test_whiten.py | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index 7aa3bf6705..22297839f1 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -15,11 +15,6 @@ HAS_SKLEARN = False -################################################# -# Test Class -################################################# - - class TestWhiten: """ Test the whitening preprocessing step. 
@@ -30,7 +25,7 @@ class TestWhiten:
     returned data is indeed white.
     """
 
-    def get_test_recording(self, num_segments, dtype, means=None):
+    def get_test_recording(self, dtype, means=None):
         """
         Generate a set of test data with known covariance matrix and mean.
         Test data is drawn from a multivariate Gaussian distribute with
@@ -45,11 +40,6 @@ class TestWhiten:
 
         Parameters
         ----------
-        num_segments : int
-            Number of segments for the recording. Note that only the first
-            segment is filled with data. Data for other segments must be
-            set manually.
-
         dtype : np.float32 | np.int16
             Datatype of the generated recording.
 
@@ -67,7 +57,10 @@ class TestWhiten:
         return means, cov_mat, recording
 
     def get_test_data_with_known_distribution(self, num_samples, dtype, means=None):
-        """ """
+        """
+        Create multivariate normal data with known means and covariance matrices.
+        If `dtype` is int16, scale to the full range of int16 before casting.
+        """
         num_channels = 3
 
         if means is None:
@@ -118,8 +111,7 @@ class TestWhiten:
 
     def compute_cov_mat(self, X):
         """
-        Estimate the covariance matrix from data
-        using the standard linear algebra approach.
+        Estimate the covariance matrix as the sample covariance.
         """
         X = X - np.mean(X, axis=0)
         S = X.T @ X / X.shape[0]
@@ -205,7 +197,6 @@ class TestWhiten:
             recording,
             apply_mean=True,
             regularize=False,
-            regularize_kwargs={},
             num_chunks_per_segment=1,
             chunk_size=recording.get_num_samples(segment_index=0) - 1,
             eps=eps,
@@ -233,7 +224,6 @@ class TestWhiten:
             recording,
             apply_mean=apply_mean,
             regularize=False,
-            regularize_kwargs={},
             num_chunks_per_segment=1,
             chunk_size=recording.get_num_samples(segment_index=0) - 1,
             eps=eps,
@@ -332,6 +322,8 @@ class TestWhiten:
             )
             results[mode] = whitened_recording
 
+        # In local, parts of the covariance matrix are exactly zero
+        # (when pairs of channels are not in the same group).
         assert results["local"]._kwargs["W"][0][2] == 0.0
         assert results["global"]._kwargs["W"][0][2] != 0.0
 
From 6dc1f72469624f0c2fe5326a407e8db97cd103dc Mon Sep 17 00:00:00 2001
From: JoeZiminski
Date: Fri, 15 Nov 2024 11:55:20 +0000
Subject: [PATCH 10/14] Fix tests.

---
 src/spikeinterface/preprocessing/tests/test_whiten.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py
index 22297839f1..e3d0d6c2e3 100644
--- a/src/spikeinterface/preprocessing/tests/test_whiten.py
+++ b/src/spikeinterface/preprocessing/tests/test_whiten.py
@@ -131,7 +131,7 @@ class TestWhiten:
         otherwise it can overflow.
         """
         eps = 1e-16
-        _, cov_mat, recording = self.get_test_recording(num_segments=1, dtype=dtype)
+        _, cov_mat, recording = self.get_test_recording(dtype=dtype)
 
         whitened_recording = whiten(
             recording,
@@ -161,7 +161,7 @@ class TestWhiten:
         the cov mat if the correct eps is used.
""" eps = 1 - _, cov_mat, recording = self.get_test_recording(num_segments=1, dtype=np.float32) + _, cov_mat, recording = self.get_test_recording(dtype=np.float32) whitened_recording = whiten( recording, @@ -218,7 +218,7 @@ def test_apply_mean(self, apply_mean): means = np.array([10, 20, 30]) eps = 1e-16 - _, cov_mat, recording = self.get_test_recording(num_segments=1, dtype=np.float32, means=means) + _, cov_mat, recording = self.get_test_recording(dtype=np.float32, means=means) whitened_recording = whiten( recording, @@ -269,7 +269,7 @@ def test_whiten_regularisation_norm(self): whitening preprocessing is the same as the one computed from sklearn when regularise kwargs are given. """ - _, _, recording = self.get_test_recording(num_segments=1, dtype=np.float32) + _, _, recording = self.get_test_recording(dtype=np.float32) whitened_recording = whiten( recording, @@ -297,7 +297,7 @@ def test_local_vs_global_whiten(self): channels are considered for whitening. Test that whitening is correct for the first pair and last pair. """ - _, _, recording = self.get_test_recording(num_segments=1, dtype=np.float32) + _, _, recording = self.get_test_recording(dtype=np.float32) y_dist = 2 recording.set_channel_locations( From e751e0a88a6e6f48a4f3c01e068cc1a5e4934eb5 Mon Sep 17 00:00:00 2001 From: Alessio Buccino Date: Mon, 3 Feb 2025 11:20:00 +0100 Subject: [PATCH 11/14] Update src/spikeinterface/preprocessing/tests/test_whiten.py --- src/spikeinterface/preprocessing/tests/test_whiten.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spikeinterface/preprocessing/tests/test_whiten.py b/src/spikeinterface/preprocessing/tests/test_whiten.py index e3d0d6c2e3..d2b9a93a6a 100644 --- a/src/spikeinterface/preprocessing/tests/test_whiten.py +++ b/src/spikeinterface/preprocessing/tests/test_whiten.py @@ -86,7 +86,7 @@ def cov_mat_from_whitening_mat(self, whitened_recording, eps): """ The whitening matrix is computed as the inverse square root of the covariance matrix - (Sigma, 'S' below + some eps for regularising. + (Sigma, 'S' below + some eps for regularising.) 
Here the inverse process is performed to compute the covariance matrix from the whitening matrix From 2eb7116ef0e945756c488f99c9af76b1e6f2b585 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 3 Feb 2025 10:22:50 +0000 Subject: [PATCH 12/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/spikeinterface/sortingcomponents/motion/motion_utils.py | 1 - .../motion/tests/test_motion_interpolation.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/spikeinterface/sortingcomponents/motion/motion_utils.py b/src/spikeinterface/sortingcomponents/motion/motion_utils.py index 5c02646497..3186d5ba07 100644 --- a/src/spikeinterface/sortingcomponents/motion/motion_utils.py +++ b/src/spikeinterface/sortingcomponents/motion/motion_utils.py @@ -632,6 +632,5 @@ def ensure_time_bins(time_bin_centers_s=None, time_bin_edges_s=None): return time_bin_centers_s, time_bin_edges_s - def ensure_time_bin_edges(time_bin_centers_s=None, time_bin_edges_s=None): return ensure_time_bins(time_bin_centers_s, time_bin_edges_s)[1] diff --git a/src/spikeinterface/sortingcomponents/motion/tests/test_motion_interpolation.py b/src/spikeinterface/sortingcomponents/motion/tests/test_motion_interpolation.py index 807b8e6c9e..616c4fcbf2 100644 --- a/src/spikeinterface/sortingcomponents/motion/tests/test_motion_interpolation.py +++ b/src/spikeinterface/sortingcomponents/motion/tests/test_motion_interpolation.py @@ -12,6 +12,7 @@ from spikeinterface.sortingcomponents.tests.common import make_dataset from spikeinterface.core import generate_ground_truth_recording + def make_fake_motion(rec): # make a fake motion object @@ -25,7 +26,7 @@ def make_fake_motion(rec): seg_time_bins = np.arange(0.5, duration - 0.49, 0.5) seg_disp = np.zeros((seg_time_bins.size, spatial_bins.size)) seg_disp[:, :] = np.linspace(-30, 30, seg_time_bins.size)[:, None] - + temporal_bins.append(seg_time_bins) displacement.append(seg_disp) @@ -204,7 +205,6 @@ def test_InterpolateMotionRecording(): seed=2205, ) - motion = make_fake_motion(rec) rec2 = InterpolateMotionRecording(rec, motion, border_mode="force_extrapolate") From 25dc000277806c6df09fc5652f06d47527deccfc Mon Sep 17 00:00:00 2001 From: Alessio Buccino Date: Mon, 3 Feb 2025 12:00:18 +0100 Subject: [PATCH 13/14] Update src/spikeinterface/preprocessing/whiten.py --- src/spikeinterface/preprocessing/whiten.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spikeinterface/preprocessing/whiten.py b/src/spikeinterface/preprocessing/whiten.py index 03e56ae20b..4576fd7480 100644 --- a/src/spikeinterface/preprocessing/whiten.py +++ b/src/spikeinterface/preprocessing/whiten.py @@ -211,7 +211,7 @@ def compute_whitening_matrix( assert radius_um is not None n = cov.shape[0] distances = get_channel_distances(recording) - W = np.zeros((n, n), dtype="float32") + W = np.zeros((n, n), dtype="float64") for c in range(n): (inds,) = np.nonzero(distances[c, :] <= radius_um) cov_local = cov[inds, :][:, inds] From 4cb48e1697afcadaed49208d3c6195cb58f47727 Mon Sep 17 00:00:00 2001 From: Alessio Buccino Date: Mon, 3 Feb 2025 12:19:28 +0100 Subject: [PATCH 14/14] Fix type mismatch in sklearn covariance --- src/spikeinterface/preprocessing/whiten.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/spikeinterface/preprocessing/whiten.py b/src/spikeinterface/preprocessing/whiten.py index 4576fd7480..00c454f8f3 100644 --- 
a/src/spikeinterface/preprocessing/whiten.py +++ b/src/spikeinterface/preprocessing/whiten.py @@ -211,7 +211,7 @@ def compute_whitening_matrix( assert radius_um is not None n = cov.shape[0] distances = get_channel_distances(recording) - W = np.zeros((n, n), dtype="float64") + W = np.zeros((n, n), dtype="float32") for c in range(n): (inds,) = np.nonzero(distances[c, :] <= radius_um) cov_local = cov[inds, :][:, inds] @@ -257,6 +257,7 @@ def compute_covariance_matrix(recording, apply_mean, regularize, regularize_kwar cov = cov / data.shape[0] else: cov = compute_sklearn_covariance_matrix(data, regularize_kwargs) + cov = cov.astype("float32") return data, cov, M @@ -277,7 +278,7 @@ def compute_sklearn_covariance_matrix(data, regularize_kwargs): regularize_kwargs["assume_centered"] = True estimator_class = getattr(sklearn.covariance, method) estimator = estimator_class(**regularize_kwargs) - estimator.fit(data) + estimator.fit(data.astype("float64")) # sklearn covariance methods require float64 cov = estimator.covariance_ return cov
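
For reference, the ZCA transform that this series factors into
compute_whitening_from_covariance can be reproduced outside of
SpikeInterface. The sketch below assumes only NumPy; the seed, the
sample count, and the reuse of the 3-channel covariance matrix from
TestWhiten are illustrative choices, not part of the patches
themselves.

    import numpy as np

    rng = np.random.default_rng(seed=42)

    # Draw correlated data with a known covariance (the matrix used in TestWhiten)
    cov_true = np.array([[1, 0.5, 0], [0.5, 1, -0.25], [0, -0.25, 1]])
    data = rng.multivariate_normal(np.zeros(3), cov_true, size=100_000).astype(np.float32)

    # Sample covariance of the mean-centred data (the apply_mean=True path)
    M = data.mean(axis=0)
    centred = data - M
    cov = centred.T @ centred / centred.shape[0]

    # ZCA whitening matrix: W = U @ diag(1 / sqrt(S + eps)) @ U.T, as in
    # compute_whitening_from_covariance; eps guards the division
    eps = 1e-16
    U, S, Ut = np.linalg.svd(cov, full_matrices=True)
    W = (U @ np.diag(1 / np.sqrt(S + eps))) @ Ut

    # Whitened traces should have near-identity covariance, which is
    # the property the reworked tests assert via compute_cov_mat
    whitened = centred @ W
    cov_white = whitened.T @ whitened / whitened.shape[0]
    assert np.allclose(cov_white, np.eye(3), rtol=0, atol=1e-2)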
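
The regularised covariance path can be sketched in the same standalone
way. This assumes scikit-learn is installed; GraphicalLassoCV is the
default method named in the WhitenRecording docstring, and forcing
assume_centered=True mirrors compute_sklearn_covariance_matrix above.
The random input array is an illustrative stand-in for the sampled
chunks.

    import numpy as np
    import sklearn.covariance

    rng = np.random.default_rng(seed=42)
    data = rng.standard_normal((5_000, 4))

    # Resolve the estimator class by name and force assume_centered=True,
    # as compute_sklearn_covariance_matrix does
    regularize_kwargs = {"method": "GraphicalLassoCV"}
    method = regularize_kwargs.pop("method")
    regularize_kwargs["assume_centered"] = True
    estimator = getattr(sklearn.covariance, method)(**regularize_kwargs)
    estimator.fit(data.astype("float64"))  # sklearn covariance methods require float64
    cov = estimator.covariance_.astype("float32")

Because the penalised estimate is better conditioned than the raw
sample covariance, the whitening matrix built from it has a smaller
norm, which is what the regularisation check in test_whiten_general
asserts.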