-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathpysolo_sleep_fun.py
282 lines (209 loc) · 7.08 KB
/
pysolo_sleep_fun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
#####
## ARRAY MANIPULATIVE FUNCTIONS
#####
## The current numpy/scipy version seem to have some problem manipulating some
## masked arrays or nan arrays so here we overcome with custom made functions
## hopefully they soon will be obsolete
import numpy as np
import scipy.stats.stats as stats
from numpy.ma import *
def hasMasked(a, axis=None):
'''
Return True if there are Masked values along the given axis.
'''
a, axis = _chk_asarray (a, axis)
if axis == None:
a = a.ravel()
axis = 0
return (np.ma.getmask(a).sum(axis) != 0 )
def permute(listPerm):
'''
Return all possible and different permutations of a list
'''
np = []
for i in range(0,len(listPerm)):
for p in range (i, len(listPerm)):
if i != p:
np.append((listPerm[i], listPerm[p]))
return np
def hasNaN(a):
'''
return True if the target is a NaN or if it's an array containing NaNs
'''
a = np.asarray(a)
return np.isnan(a).any()
def std(a, axis=None):
'''
Return std value along the given axis.
It is able to discriminate between masked and unmaked array and
behave accordingly
'''
a, axis = _chk_asarray (a, axis)
if axis == None:
a = a.ravel()
if np.shape(a) == ():
axis = None
if not np.isscalar(a):
std_out = a.std(axis)
else:
std_out = 0.
return std_out
def stde(a, axis=None):
'''
Calculates the standard error of a distribution
'''
if axis == None:
a = a.ravel()
axis = 0
if np.shape(a) == ():
stde_out = 0.0
else:
masked = np.ma.getmask(a).any()
if masked:
stde_out = (a.std(axis) / np.sqrt(a.count(axis)))
else:
stde_out = ( std(a, axis) / np.sqrt(a.shape[axis]))
return stde_out
## Sleep Specific Functions
def isAllMasked(a):
'''
Return true if all the contents of the given array are masked objects
'''
return np.ma.getmask(a).all()
def SleepAmountByFly(s5, t0=None, t1=None ):
'''
SleepAmountByFly(s5, t0=None, t1=None )
Takes the 3D array s5 and returns a 2D array containing
the amount of sleep for each fly, for each day between the time interval t0, t1
'''
return s5[:,:,t0:t1].sum(axis=2)
def ActivityIndexByFly(ax, s5, t0=None, t1=None):
'''
ActivityIndexByFly(ax, s5, t0=None, t1=None)
Takes the 3D arrays ax and s5 and returns a 2D array containing
the AI for each fly, for each day between the time interval t0, t1
'''
tot_activity = ax[:,:,t0:t1].sum(axis=2)
tot_wake_time = float(ax.shape[2]) - s5[:,:,t0:t1].sum(axis=2)
return tot_activity / tot_wake_time
def number_sleep_episodes(s5, t0 = None, t1 = None):
'''
Returns the length of all sleep episodes in the given interval (t0,t1)
'''
d = len(s5.shape)
if d == 1: s5 = s5.reshape((1,1,-1))
if d == 2: s5 = s5.reshape((1,-1,-1))
s5_1 = s5[:,:,t0:t1]
s5_1[:,:,0]=0 #WE NEED TO DO THIS
s5_2 = np.roll(s5_1, -1, axis=2) #copy and roll everything 1 position on the left
# subtract the two.
trans = s5_1 - s5_2
# in the resulting array
# 0 indicates continuity
# -1 indicates W->S transition
# +1 indicates S->W transition
w2s = (trans == -1)
return w2s.sum(axis = 2)
def sleep_latency(s5, lightsoff=720):
"""
returns sleep latency in minutes, that is time between lights off and first recorded sleep episode of at least 5 minutes
"""
grid = np.indices(s5.shape)[-1] # compile a grid containing the indices of the array
k = grid * s5 # keep only the indices where s5 is 1
sl = np.argmax(k > lightsoff, axis=-1) - lightsoff
return np.ma.masked_less(sl, 0)
def all_sleep_episodes(s5, t0 = None, t1 = None):
'''
Returns the length of all sleep episodes in the given interval (t0,t1)
'''
d = len(s5.shape)
if d == 1: s5 = s5.reshape((1,1,-1))
if d == 2: s5 = s5.reshape((1,-1,-1))
s5_1 = s5[:,:,t0:t1]
s5_1[:,:,0]=0 #WE NEED TO DO THIS
s5_2 = np.roll(s5_1, -1, axis=2) #copy and roll everything 1 position on the left
# subtract the two.
trans = s5_1 - s5_2
# in the resulting array
# 0 indicates continuity
# -1 indicates W->S transition
# +1 indicates S->W transition
# now takes the coordinates on the third dimension of where the transition w2s occurred
w2s = np.ma.where(trans == -1)[2]
#do the same on the other transition
s2w = np.ma.where(trans == 1)[2]
#subtract one from the other to know length in minutes
return (s2w - w2s)
def ba(s5, t0 = None, t1 = None):
'''
Compute brief awakenings analysis.
A brief awakening is defined as 1 minute of activity between two stretches of sleep.
returns an array "ba". the number of ba can be obtained as ba.sum(axis=2)
'''
s5_1 = s5[:,:,t0:t1]
s5_2 = np.roll(s5_1, -1, axis=2)
s5_3 = np.roll(s5_1, +1, axis=2)
# we subtract the value of position +1 and -1 frm position 0
ba_1 = s5_1 - (s5_2 + s5_3)
ba_2 = (ba_1 == -2)
# lastly we convert the data_type from boolean to integer and we return it
dt = s5.dtype
return ba_2.astype(dt)
def concatenate(arrays, axis=0):
'''
We need to bypass the crash in case the user is doing a concatenation he should not
this functions try to understand on which axis do the concatenation
axis can be the number referring to the axis or 'auto'
'''
try:
return np.ma.concatenate (arrays, axis)
except:
return arrays[0]
def _chk_asarray(a, axis = None):
a = np.ma.asarray(a)
if axis is None:
a = a.ravel()
outaxis = 0
else:
outaxis = axis
return a, outaxis
def _chk2_asarray(a, b, axis = None):
if axis is None:
a = a.ravel()
b = b.ravel()
outaxis = 0
else:
a = np.ma.asarray(a)
b = np.ma.asarray(b)
outaxis = axis
return a, b, outaxis
def ttest_ind(a, b, axis=0):
"""Calculates the t-obtained T-test on TWO INDEPENDENT samples of scores
a, and b. From Numerical Recipies, p.483. Axis can equal None (ravel
array first), or an integer (the axis over which to operate on a and b).
Returns: t-value, two-tailed p-value
"""
a, b, axis = _chk2_asarray(a, b, axis)
x1 = np.average( a, axis)
x2 = np.average( b, axis)
v1 = a.var(axis)
v2 = b.var(axis)
if np.ma.getmask(a).any():
n1 = a.shape[axis] - a.mask.sum(axis)
else:
n1 = a.shape[axis]
if np.ma.getmask(b).any():
n2 = b.shape[axis] - b.mask.sum(axis)
else:
n2 = b.shape[axis]
df = n1+n2-2
svar = ((n1-1.0)*v1+(n2-1.0)*v2) / df
zerodivproblem = svar == 0
t = (x1-x2)/np.sqrt(svar*(1.0/n1 + 1.0/n2)) # N-D COMPUTATION HERE!!!!!!
t = np.where(zerodivproblem, 1.0, t) # replace NaN t-values with 1.0
probs = stats.betai(0.5*df, 0.5, df /(df+t*t))
if not np.isscalar(t):
probs = probs.reshape(t.shape)
if not np.isscalar(probs) and len(probs) == 1:
probs = probs[0]
return t, probs