diff --git a/tests/test_Inference.py b/tests/test_Inference.py
index 39431f5..2a2a212 100644
--- a/tests/test_Inference.py
+++ b/tests/test_Inference.py
@@ -1,11 +1,7 @@
-""" Create artificial test data or use the Wine Quality Dataset from the UCI Machine Learning Repository to test the
-Inference class. The Inference class is used to train and test different regression models (e.g., Ridge, MLPRegressor,
-SNPE) for parameter estimation.
-
-"""
+""" This script generates artificial electrophysiological data and tests the functionality of the Inference class. """
 import os
 import sys
-
 import matplotlib.pyplot as plt
 import numpy as np
 import pandas as pd
@@ -39,24 +35,21 @@ def cohen_d(x, y):
     dof = nx + ny - 2
     return (np.mean(x) - np.mean(y)) / np.sqrt(((nx-1)*np.var(x) + (ny-1)*np.var(y)) / dof)
 
-def create_artificial_data(dataset_type=0, n_samples=1000):
-    """ Create artificial data for testing.
-
-    Create a Pandas DataFrame with the following columns: 'ID', 'Group', 'Epoch', 'Sensor' and 'Data', and the following
-    attributes: 'Recording' and 'fs'. The 'ID' column contains the unique identifier of the subject or animal. The
-    'Group' column defines the group in which the subject/animal has been classified: for example, 'Control' or 'AD'.
-    The 'Epoch' column contains the epoch number. The 'Sensor' column contains the sensor number. The 'Data' column
-    contains the time-series data values. The 'Recording' attribute contains the type of recording (e.g., 'LFP') and
-    the 'fs' attribute contains the sampling frequency of the data.
+def create_artificial_data(n_samples=1000, len_data=1000):
+    """ Generate artificial data with dynamic properties resembling those of electrophysiological signals
+    for testing purposes. Create a Pandas DataFrame with the following columns: 'ID', 'Group', 'Epoch', 'Sensor' and
+    'Data', and the following attributes: 'Recording' and 'fs'. The 'ID' column contains the unique identifier of the
+    subject or animal. The 'Group' column defines the group to which the subject/animal has been assigned: in this
+    script, 'Control' or 'Experiment'. The 'Epoch' column contains the epoch number. The 'Sensor' column contains the
+    sensor identifier. The 'Data' column contains the time-series data values. The 'Recording' attribute contains the
+    type of recording (e.g., 'LFP' or 'EEG') and the 'fs' attribute contains the sampling frequency of the data.
 
     Parameters
     ----------
-    dataset_type: int
-        Type of dataset to generate. The following types are available:
-        - 0: Generate a dataset with time-series data.
-        - 1: Use the Wine Quality Dataset from the UCI Machine Learning Repository.
     n_samples : int
-        Number of samples to generate (only used for dataset_type=0).
+        Number of samples (epochs) to generate. Should be divisible by 10, the number of subjects/animals.
+    len_data : int
+        Length of each time-series epoch, in samples.
 
     Returns
     -------
@@ -64,46 +57,27 @@ def create_artificial_data(dataset_type=0, n_samples=1000):
         Pandas DataFrame containing the artificial data
     """
 
-    if dataset_type == 0:
-        # Create a list of unique identifiers (there is a total of 10 subjects/animals).
-        ID = np.repeat(np.arange(1, 11), int(n_samples/10))
-        # Create a list of groups. There are two groups: 'Control' and 'Experiment'.
-        Group = np.repeat(['Control', 'Experiment'], int(n_samples/2))
-        # Create a list of epochs. There are n_samples/10 epochs for each subject/animal.
-        Epoch = np.tile(np.arange(1, int(n_samples/10)+1), 10)
-        # Create a list of sensors (there is just one sensor).
-        Sensor = np.ones(n_samples)
-
-        # Create a list of time-series data values. The data values are generated by convolving a Poisson spike train with
-        # an exponential kernel, and adding Gaussian noise. The length of the time-series data is 1000 samples.
-        Data = []
-        for i in range(n_samples):
-            spike_train = np.random.poisson(0.1, 1500)
-            if Group[i] == 'Control':
-                kernel = np.exp(-np.arange(0, 1500)/10.0)
-            elif Group[i] == 'Experiment':
-                kernel = np.exp(-np.arange(0, 1500)/20.0)
-            data = np.convolve(spike_train, kernel, mode='full')[250:1250] + np.random.normal(0, 0.1, 1000)
-            Data.append(data)
-
-    elif dataset_type == 1:
-        # Download the Wine Quality Dataset
-        print('Downloading the Wine Quality Dataset...')
-        wq_df = pd.read_csv(
-            'https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv',
-            sep=';')
-        # There are as many subjects as samples in the Wine Quality Dataset
-        ID = np.arange(1, wq_df.shape[0]+1)
-        # There are two groups: 'Control' (Quality < 6.) and 'Experiment' (Quality >= 6.)
-        Group = np.where(wq_df['quality'] < 6., 'Control', 'Experiment')
-        # There is just one epoch per subject
-        Epoch = np.ones(wq_df.shape[0])
-        # There is just one sensor
-        Sensor = np.ones(wq_df.shape[0])
-        # The data values are the features of the Wine Quality Dataset (excluding the 'quality' column)
-        Data = wq_df.drop('quality', axis=1).values
-        # Transform Data to a list of lists
-        Data = [list(Data[i]) for i in range(Data.shape[0])]
+    # Create a list of unique identifiers. In this example, there is a total of 10 subjects/animals.
+    ID = np.repeat(np.arange(1, 11), int(n_samples/10))
+    # Create a list of groups. There are two groups: 'Control' and 'Experiment'.
+    Group = np.repeat(['Control', 'Experiment'], int(n_samples/2))
+    # Create a list of epochs. There are n_samples/10 epochs for each subject/animal.
+    Epoch = np.tile(np.arange(1, int(n_samples/10)+1), 10)
+    # Create a list of sensors (there is just one sensor).
+    Sensor = np.ones(n_samples)
+
+    # Create a list of time-series data values. The data values are generated by convolving a Poisson spike train with
+    # an exponential kernel and adding Gaussian noise. The two groups differ only in the kernel's decay time constant.
+    Data = []
+    for i in range(n_samples):
+        spike_train = np.random.poisson(0.1, len_data+500)
+        if Group[i] == 'Control':
+            kernel = np.exp(-np.arange(0, len_data+500)/10.0)
+        elif Group[i] == 'Experiment':
+            kernel = np.exp(-np.arange(0, len_data+500)/14.0)
+        data = (np.convolve(spike_train, kernel, mode='full')[250:len_data+250] +
+                np.random.normal(0, 0.1, len_data))
+        Data.append(data)
 
     # Create the Pandas DataFrame.
     df = pd.DataFrame({'ID': ID, 'Group': Group, 'Epoch': Epoch, 'Sensor': Sensor, 'Data': Data})
@@ -114,24 +88,31 @@ def create_artificial_data(dataset_type=0, n_samples=1000):
 
 if __name__ == '__main__':
+    # Set the seed
+    np.random.seed(0)
+
     # Create the dataset
-    data = create_artificial_data(dataset_type=0,n_samples=5000)
+    data = create_artificial_data(n_samples=2000, len_data=1000)
 
     # Compute features
-    if len(data['Data'][0]) > 100:
-        features = ncpi.Features(method='catch22')
-        data = features.compute_features(data)
-    # If the data is already in feature format (Wine Quality Dataset), just rename the 'Data' column to 'Features'
-    else:
-        data['Features'] = data['Data']
-
-    # # Plot a random feature as a function of the group
-    # rand_feat = np.random.randint(0, len(data['Features'][0]))
-    # rand_feat_pd = pd.DataFrame({'Group': data['Group'],
-    #                              'Feature': data['Features'].apply(lambda x: x[rand_feat])})
-    # sns.boxplot(x='Group', y='Feature', data=rand_feat_pd)
-    # plt.title(f'Feature {rand_feat}')
-    # plt.show()
+    features = ncpi.Features(method='catch22')
+    data = features.compute_features(data)
+
+    # Plot all features as a function of the group
+    plt.figure(dpi=300)
+    plt.rc('font', size=8)
+    plt.rc('font', family='Arial')
+
+    for i in range(len(data['Features'][0])):
+        plt.subplot(5, 5, i + 1)
+        sns.boxplot(x='Group', y='Feature', data=pd.DataFrame({'Group': data['Group'],
+                                                               'Feature': data['Features'].apply(lambda x: x[i])}),
+                    palette='Set2', legend=False, hue='Group')
+        plt.title(f'Feature {i}')
+        plt.gca().set_xticklabels([])
+        plt.xlabel('')
+        plt.ylabel('')
+    plt.tight_layout()
 
     # Split data into training and testing sets
     train_data = data.sample(frac=0.9)
@@ -148,24 +129,16 @@ def create_artificial_data(dataset_type=0, n_samples=1000):
     # Test different regression models
     models = ['Ridge', 'MLPRegressor','SNPE']
     # Hyperparameters to be optimized
-    hyperparams_opt = {'Ridge': [{'alpha': 0.1}, {'alpha': 1.0}, {'alpha': 10.0}, {'alpha': 100.0}],
+    hyperparams_opt = {'Ridge': [{'alpha': 0.01}, {'alpha': 0.1}, {'alpha': 1.}, {'alpha': 10.}, {'alpha': 100.}],
                        'MLPRegressor':
-                       [{'hidden_layer_sizes': (25,), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (50,), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (100,), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (25,25), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (50,50), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (100,100), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (25,25,25), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (50,50,50), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4},
-                        {'hidden_layer_sizes': (100,100,100), 'max_iter': 1000, 'tol': 1e-2, 'n_iter_no_change': 4}],
+                       [{'hidden_layer_sizes': (25, 25, 25), 'max_iter': 1000, 'tol': 1e-2},
+                        {'hidden_layer_sizes': (50, 50, 50), 'max_iter': 1000, 'tol': 1e-2},
+                        {'hidden_layer_sizes': (100, 100, 100), 'max_iter': 1000, 'tol': 1e-2}],
                        'SNPE':
-                       [{'prior': None, 'density_estimator': {
-                           'model':"maf", 'hidden_features':2, 'num_transforms':1}},
-                        {'prior': None, 'density_estimator': {
-                            'model':"maf", 'hidden_features':5, 'num_transforms':1}},
-                        {'prior': None, 'density_estimator': {
-                            'model':"maf", 'hidden_features':10, 'num_transforms':1}}
+                       [{'prior': None, 'density_estimator': {'model': "maf", 'hidden_features': 2,
+                                                              'num_transforms': 1}},
+                        {'prior': None, 'density_estimator': {'model': "maf", 'hidden_features': 2,
+                                                              'num_transforms': 2}}
                        ]}
 
     predictions = {}
@@ -190,14 +163,16 @@ def create_artificial_data(dataset_type=0, n_samples=1000):
             else:
                 inference.train(param_grid=hyperparams_opt[model], n_splits=5, n_repeats=1)
 
-            # Test the model
+            # Compute predictions. For SNPE, keep only the first element of each returned prediction.
             print(f'\nComputing predictions for {model}...')
-            predictions[model+'_'+mode] = inference.predict(np.array(test_data['Features'].tolist()))
+            preds = inference.predict(np.array(test_data['Features'].tolist()))
+            if model == 'SNPE':
+                predictions[model+'_'+mode] = [preds[i][0] for i in range(len(preds))]
+            else:
+                predictions[model+'_'+mode] = preds
 
     # Plot results
     plt.figure(dpi = 300)
-    plt.rc('font', size=8)
-    plt.rc('font', family='Arial')
 
     for i,model in enumerate(models):
        for j,label in enumerate(['no_param_grid', 'param_grid']):
@@ -212,7 +187,7 @@ def create_artificial_data(dataset_type=0, n_samples=1000):
                      horizontalalignment='center', verticalalignment='center',
                      transform=plt.gca().transAxes)
 
-            # Title and labels
+
             plt.title(f'{model} ({label})')
             plt.ylabel('')
             plt.xlabel('')
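
Reviewer note: the following is a minimal, self-contained sketch of the signal model that the new create_artificial_data() implements: a Poisson spike train convolved with an exponential kernel, plus additive Gaussian noise, where the two groups differ only in the kernel's decay time constant. The make_epoch helper and the use of np.random.default_rng are illustrative assumptions made for this note; the test script inlines the same logic and seeds the legacy global generator with np.random.seed(0) instead.

    import numpy as np

    def make_epoch(tau, len_data=1000, rate=0.1, noise_sd=0.1, rng=None):
        """One artificial epoch: Poisson spikes convolved with an exponential kernel, plus noise."""
        rng = np.random.default_rng() if rng is None else rng
        n = len_data + 500                      # padded length; the slice below trims edge effects
        spike_train = rng.poisson(rate, n)      # sparse Poisson spike train
        kernel = np.exp(-np.arange(n) / tau)    # exponential decay with time constant tau (in samples)
        sig = np.convolve(spike_train, kernel, mode='full')[250:len_data + 250]
        return sig + rng.normal(0, noise_sd, len_data)

    rng = np.random.default_rng(0)
    control = make_epoch(tau=10.0, rng=rng)     # decay constant used for the 'Control' group
    experiment = make_epoch(tau=14.0, rng=rng)  # decay constant used for the 'Experiment' group
    assert control.shape == experiment.shape == (1000,)

Because the spike rate and the noise level are shared between the groups, any separation later picked up by the catch22 features can only come from the kernel's decay time constant.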