#!/usr/bin/env python
"""
Class SpecImage that holds info for a set of spectra (usually from an image file)
Assumes the image file has the format "x y i1 i2 ... iN" (that is, one whole pixel spectrum per line) and that the first line holds the wavelengths.
It also assumes that the wavelengths are ordered (increasing or decreasing, but not random).
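
A minimal usage sketch (the file name below is hypothetical; it assumes a whitespace-separated
image file in the format described above):

    img = SpecImage().read_image("cells_map.txt.gz", is_zipped=True)
    img.remove_cosmic_spikes()        # clean cosmic spikes before further preprocessing
    img.baseline_correction_als()     # subtract an estimated baseline from every spectrum
    img.save("cells_map.mat")         # write wl/xy/spc/idx/description to an HDF5 file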
"""
import matplotlib
matplotlib.use('Agg') # do not use the X11 as default, e.g. on HPC (must be before any other module)
import gzip
import pickle
import re # regular expressions, perl-style
import os, string, sys
import math
import numpy as np
import statsmodels.api as sm
from matplotlib import colors
from matplotlib.collections import LineCollection
import matplotlib.pyplot as plt
from scipy import interpolate, ndimage, signal, sparse, spatial, optimize, stats
from sklearn import manifold, metrics, cluster, neighbors, decomposition, preprocessing
from collections import Counter
global_vars = [True] ## used by multiprocessing.Pool
class SpecImage:
"""
* wl is a 1D array with the wave numbers (in cm-1)
    * spc is a 2D array with the spectra. Its size may differ from the size of XY since spc stores only valid and distinct spectra
    * xy are the _original_ image coordinates (that is, before subsampling, removal, etc.)
    * xy_null is a list of coordinates for all removed/invalid spectra
    * idx is a mapping between XY and SPC, with the same size as XY (since several pixels may point to the same spectrum).
"""
def __init__(self, copyfrom=None, filename=None):
self.xy_null = None
if filename:
import h5py
mlab=h5py.File(filename,"r")
self.wl = np.array(mlab["wl"],dtype=float)
self.xy = np.array(mlab["xy"],dtype=float)
self.spc = np.array(mlab["spc"],dtype=float)
self.idx = np.array(mlab["idx"],dtype=int)
self.description = str(mlab["description"])
if "xy_null" in mlab:
self.xy_null = np.array(mlab["xy_null"],dtype=int)
mlab.close()
else:
if copyfrom is None:
self.wl = np.array([],dtype=float)
self.xy = np.array([],dtype=float).reshape(-1,2)
self.spc = np.array([[]],dtype=float)
self.idx = np.array([],dtype=int) # ID wrt the original file
self.description = ""
else:
self.wl = np.array(copyfrom.wl, copy=True)
self.xy = np.array(copyfrom.xy, copy=True)
self.spc = np.array(copyfrom.spc, copy=True)
self.idx = np.array(copyfrom.idx, copy=True)
self.description = copyfrom.description
try:
self.xy_null = copyfrom.xy_null
except AttributeError:
# print "SpecImage object is from old module version, which does not have xy_null parameter. Which is fine by the way."
pass
self.remove_nan_pixels()
def save(self, filename=None):
import h5py
if filename is None:
filename = "spc" + self.description.strip() + ".mat"
mlab=h5py.File(filename,"w")
mlab.create_dataset("wl", data=self.wl, compression="gzip")
mlab.create_dataset("xy", data=self.xy, compression="gzip")
mlab.create_dataset("spc", data=self.spc, compression="gzip")
mlab.create_dataset("idx", data=self.idx, compression="gzip")
mlab.create_dataset("description", data=self.description) # scalars can't be compressed
try: ## if self.xy_null is not None:
## http://stackoverflow.com/questions/610883/how-to-know-if-an-object-has-an-attribute-in-python
mlab.create_dataset("xy_null", data=self.xy_null, compression="gzip")
        except Exception: # xy_null may be missing (older objects) or None
pass
mlab.close()
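    # Hedged usage sketch of the save/load round trip (file name is hypothetical):
    #   img.save("experiment01.mat")                    # writes an HDF5 file via h5py
    #   img2 = SpecImage(filename="experiment01.mat")   # reloads wl, xy, spc, idx and description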
def read_from_matlab(self, mfilename=None, varname=None):
if (mfilename is None):
print ("I need the name of matlab file\n")
return
import scipy.io
mlab=scipy.io.loadmat(mfilename)
if (varname is None):
varname = "Data_rr" # guess
varname = str(varname)
img=mlab[varname]
if (len(img.shape) == 2): # no dimensional info; must guess (closest to a square)
            n_x = int(factorise(img.shape[0])[0]) # first element is closest to sqrt(n)
n_y = int(img.shape[0]/n_x)
else: # img.shape == 3 dimensions
n_x = img.shape[1]
n_y = img.shape[0]
#EXAMPLE# img = np.transpose(img,(1,0,2)) # invert X and Y leaving wl untouched
self.xy = np.array([[n_x-1-x,y] for y in range(n_y) for x in range(n_x)], dtype=float)
self.wl = np.array(mlab["xaxis"][:,0], dtype=float) # it's an Nx1 matrix, we want the "first" (and only) column
        self.spc = np.array(img,dtype=float).reshape(n_x * n_y, -1) # minus one tells numpy to infer this dimension; it amounts to the number of wavelengths
self.idx = np.arange(0, n_x * n_y)
if self.wl[0] > self.wl[-1]:
self.wl = self.wl[::-1] #same as self.wl.reverse()
            self.spc = np.fliplr(self.spc) # fliplr() == [::-1] == inverts the order left-right
self.remove_nan_pixels()
self.description=varname
return self
def read_witec(self, filename=None, excitation_wavelength=None, is_zipped=None):
"""
        Import WiTec text files. Three files must be present, with suffixes "Header", "X-Axis" and "Y-Axis". The Y-Axis
        file may be zipped, since it tends to be very large.
"""
if is_zipped is None:
            is_zipped = False
if filename is None:
print ("I need the (single) common prefix for the 3 filenames (before the \"(Header).txt\" etc.)\n")
return
for line in open (filename + "(Header).txt", "r"):
if "SizeGraph" in line: # redundant: "X-Axis" file will also give this info
size_graph = int(line.split(" ")[-1].strip()) # store last column using string.split()
if "SizeX" in line:
n_x = int(line.split(" ")[-1].strip()) # store last column using string.split()
if "SizeY" in line:
n_y = int(line.split(" ")[-1].strip()) # store last column using string.split()
self.wl = np.array([float(line) for line in open (filename + "(X-Axis).txt", "r")], dtype=float) # assumes ONE column file
if excitation_wavelength:
self.wl = (1./float(excitation_wavelength) - 1./self.wl) * 10**7 # transform wavelengths (nm) into wave numbers (cm-1)
if size_graph != self.wl.shape[0]:
print ("Wave length 'SizeGraph' described by Header (= " + size_graph + ") disagrees with X-Axis file (= " + \
self.wl.shape[0] + ")\n")
if is_zipped:
# minus one on reshape -> divide total size by new size (will lead to wl size)
self.spc = np.array([float(line) for line in gzip.open (filename + "(Y-Axis).txt.gz", "r")], dtype=float).reshape(n_x * n_y, -1)
else:
self.spc = np.array([float(line) for line in open (filename + "(Y-Axis).txt", "r")], dtype=float).reshape(n_x * n_y, -1)
self.idx = np.arange(0, n_x * n_y)
self.xy = np.array([[x,y] for y in range(n_y) for x in range(n_x)], dtype=float)
self.description = filename
return self
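    # Hedged usage sketch (the prefix is hypothetical): for the files "scan01 (Header).txt",
    # "scan01 (X-Axis).txt" and "scan01 (Y-Axis).txt.gz", one would call something like
    #   img = SpecImage().read_witec("scan01 ", excitation_wavelength=532, is_zipped=True)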
def read_image(self, filename, is_zipped=None, remove_zero_wl=None):
if is_zipped is None:
is_zipped=True
if is_zipped:
ifile = gzip.open(filename, "r")
else:
ifile = open(filename, "r")
if remove_zero_wl is None:
remove_zero_wl = True
is_reversed = False
self.description = str(filename)
line = ifile.readline().rstrip("\n\t ") # first line are wavelengths
        self.wl = np.array([float(i) for i in re.split(r"\s+", line.lstrip()) if i])
        # check if wave numbers are in increasing order
if self.wl[0] > self.wl[-1]:
is_reversed = True
self.wl = self.wl[::-1] #same as self.wl.reverse()
i_idx = 2; f_idx = len(self.wl) + 2 # plus two since dlines[] will also have XY info
if remove_zero_wl:# find smallest index larger than "zero" (~10) wave number to define slice
zero_idx = next(i for i,j in enumerate(self.wl) if j > 10)
self.wl = self.wl[zero_idx:]
if is_reversed:
f_idx -= zero_idx
else:
i_idx += zero_idx
# now read XY and SPC values
line = ifile.readline().rstrip("\n\t ")
dlines = []
while line: # first read whole file in memory as list (much faster than updating spc)
            dlines.append([float(i) for i in re.split(r"\s+", line.lstrip().rstrip())])
line = ifile.readline().rstrip("\n\t ")
if not len(dlines)%1000:
sys.stdout.write("."); sys.stdout.flush()
sys.stdout.write("*"); sys.stdout.flush()
ifile.close()
self.xy = np.array([x[:2] for x in dlines], dtype=float) # first elements are X,Y
self.spc = np.array([x[i_idx:f_idx] for x in dlines], dtype=float) # other elements are intensities at this location
self.idx = np.arange(len(self.xy))
if is_reversed:
            self.spc = np.fliplr(self.spc) # fliplr() == [::-1] == inverts the order left-right
self.remove_nan_pixels()
sys.stdout.write("+\n");
return self
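    # Sketch of the expected text layout for read_image() (values are made up; see the module docstring):
    #   1801.2 1799.4 1797.6 ...      <- first line: wave numbers
    #   0 0 123.4 120.1 118.9 ...     <- following lines: x y followed by the intensities of that pixel
    #   1 0 130.2 128.8 126.0 ...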
    def change_resolution(self, z, resolution=None, null_zvalue=None):
        """
        Interpolate one value per pixel (z) onto a regular grid with the given resolution, using the
        stored XY coordinates; removed pixels (xy_null) are filled with null_zvalue. Returns the gridded
        matrix Z and its extent (e.g. for plt.imshow()).
        """
        if resolution is None:
            resolution = 200
        xy = np.array(self.xy, dtype=float)
        z = np.array(z, dtype=float)
        xy_null = self.xy_null
        if xy_null is not None:
            xy = np.concatenate((xy, xy_null),0)
            if null_zvalue is None:
                null_zvalue = z.max() + 0.1 * np.std(z)
            z = np.concatenate((z, np.repeat(null_zvalue, xy_null.shape[0])),0)
# from http://stackoverflow.com/questions/15586919/inputs-x-and-y-must-be-1d-or-2d-error-in-matplotlib
rangeX = max(xy[:,0]) - min(xy[:,0])
rangeY = max(xy[:,1]) - min(xy[:,1])
rmax = max(rangeX,rangeY)
rangeX /= rmax
rangeY /= rmax
xi = np.linspace(min(xy[:,0]), max(xy[:,0]), int(resolution * rangeX))
yi = np.linspace(min(xy[:,1]), max(xy[:,1]), int(resolution * rangeY))
X, Y = np.meshgrid(xi, yi)
Z = interpolate.griddata((xy[:,0], xy[:,1]), np.array(z), (X, Y), method='nearest')
extent=[min(xy[:,0]),max(xy[:,0]),min(xy[:,1]),max(xy[:,1])]
return Z, extent
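    # Hedged usage sketch (assumes the signature above, with one intensity value per XY pixel):
    #   z = img.spc[img.idx, wl_index]            # e.g. intensity at one wave number, per pixel
    #   Z, extent = img.change_resolution(z, resolution=300)
    #   plt.imshow(Z, extent=extent)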
def set_valid_pixels(self, valid_pixels, are_invalid = None):
"""
        Reduces self.spc to only those elements in valid_pixels[]; IDX is updated with the new indexes, and the temporary value "-1"
        marks XY and IDX entries that should be removed. valid_pixels[] can also be the list of invalid indexes, if
        the boolean are_invalid is set.
"""
if are_invalid is True:
valid_pixels = np.array(list( set(range(self.spc.shape[0])) - set(valid_pixels) )) # complement to invalid list
valid_pixels = np.array(valid_pixels)
        if (valid_pixels.shape[0] == self.spc.shape[0]):
            return self ## nothing to do
valid_pix = [int(i) for i in sorted(set(valid_pixels))] # although set had sorted elements already
self.spc = np.array([self.spc[i] for i in valid_pix], dtype=float)
valid_dict = {j:i for i,j in enumerate(valid_pix)}
valid = []
invalid = []
for i in range(len(self.idx)):
try:
self.idx[i] = valid_dict[self.idx[i]] # new value is simply index in spc[] (same location as valid[])
valid.append(i)
except KeyError:
self.idx[i] = -1
invalid.append(i)
try:
self.xy_null = np.concatenate((self.xy_null,self.xy[invalid]),0)
        except (AttributeError, ValueError): # xy_null missing, None, or with incompatible shape
            self.xy_null = np.array(self.xy[invalid],dtype=float)
self.idx = self.idx[valid]
self.xy = self.xy[valid]
return self
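    # Illustrative sketch of the remapping above (made-up values):
    #   before: idx = [0, 1, 2, 3] and valid_pixels = [1, 3]
    #   after:  spc keeps former rows 1 and 3, the surviving pixels get idx = [0, 1],
    #           and the XY coordinates of the dropped pixels are appended to xy_null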
def quick_remove_pixels_list(self, invalid = None):
"""
Quick-and-dirty solution to remove pixels before anything else is done. Work-around for cosmic spike removal, for
instance.
        OBS: Does NOT work if an index list is used (that is, if any mapping/simplification was applied), since it assumes the XY and IDX
        lists have the same size (the number of pixels in SPC).
"""
if invalid is None:
return self
valid_pixels = np.array(list( set(range(self.spc.shape[0])) - set(invalid) )) # complement to invalid list
self.spc = self.spc[valid_pixels]
##self.idx = self.idx[valid_pixels]
self.idx = np.arange(valid_pixels.shape[0])
self.xy = self.xy[valid_pixels]
return self
def remove_nan_pixels(self):
invalid_bool = [math.isnan(np.mean(spec)) for spec in self.spc]
if not any(invalid_bool):
return self
self.set_valid_pixels([i for i in range(len(invalid_bool)) if not invalid_bool[i]])
return self
def remove_negative_valued_pixels(self, tolerance=None):
if tolerance is None:
tolerance = 0.01
tolerance = int(tolerance * self.spc.shape[1]) # how many wave numbers
invalid_bool = [(np.sum(spec<0) > tolerance) for spec in self.spc]
if not any(invalid_bool):
return self
self.set_valid_pixels([i for i in range(len(invalid_bool)) if not invalid_bool[i]])
return self
def remove_noisy_pixels(self, valid_regions=None, debug=None, find_only=None):
"""
Remove spectra that look like white noise, based on the p-values of the autocorrelation function (ACF). Also
remove spectra that are too similar to a constant value (since the ACF cannot handle constant series.)
"""
if valid_regions is None:
valid_regions = [[100,3000]] # 1800~2700 is silent region but we want whole series (apart from extremes)
valid = [] # notice that include[] is a list, not an np.array
for vreg in valid_regions:
valid += [i for i in range(len(self.wl)) if (self.wl[i] >= vreg[0] and self.wl[i] <= vreg[1])]
valid_pixels = []
invalid = []
for i in range(self.spc.shape[0]):
# baseline correction using 20% of points
spec = ndimage.morphology.white_tophat(self.spc[i][valid], None, np.repeat([1], int(self.wl.size * 0.2) + 1))
spec = spec - np.min(spec)
if spec.sum() > 1e-4: # do not run ACF if all zeros
_, _, pvalue = sm.tsa.stattools.acf(spec,nlags=20,qstat=True) # 3 arrays: acf, qstats and pvalue
sum_pvalue = sum(pvalue[:20])
else:
sum_pvalue = 1
if sum_pvalue < 0.1:
valid_pixels.append(i)
else:
invalid.append(i)
if debug and len(invalid):
sys.stdout.write("DEBUG: list of noise pixels: " + str(invalid) + "\n")
if find_only is not True:
self.set_valid_pixels(valid_pixels=valid_pixels)
return self
else: # just return a list of valid and invalid indexes
return valid_pixels, invalid
def remove_cosmic_spikes(self, sd_threshold=None, n_iter=None, debug=None, neighbours=None, mode=None):
"""
Find cosmic spikes based on standardized derivatives over pixels per wavenumber, and over wavenumbers of each
        spectrum. sd_threshold is the number of standard deviations away (from a N(0,1)) needed to flag a peak. n_iter controls
        for how many iterations it should look for peaks, shuffling the pixel locations and reducing the SD threshold at each iteration.
Based on idea from doi:10.1366/12-06660 (Mozharov et al. Appl Spectro 2012, p.1326), with important differences:
(1) z-score computation to find outliers, removing conditional tests (since spikes _can_ leak to neighboring pixels)
(2) complete hyperspec scan over wavenumbers and pixels
(3) spike removal by baseline-type interpolation, assuming closest neighbors
(4) iterative permutation over pixels
"debug" produces a longer output, with [iteration, pixel, wl] list for, resp: scan over WL, scan over PX, true
peaks (found in both) and false peaks (found in only one of scans)
"""
if mode is None:
mode = 0
        if neighbours is None:
            neighbours = 1
        if neighbours > 10:
            neighbours = 10
if sd_threshold is None or sd_threshold < 2: # S.D. threshold for assigning a spike
sd_threshold = 6
if n_iter is None or n_iter < 1:
n_iter = 2
sd = np.linspace (2 * sd_threshold, sd_threshold, n_iter)
if (n_iter < 2):
sd = [sd_threshold] # only one iteration
no_px = list(range(self.spc.shape[0])) # reordering of pixels (spectra)
no_wl = list(range(self.spc.shape[1])) # reordering of wavenumbers
if debug:
truepeak = np.array([[]],dtype=int).reshape(0,3)
falspeak = np.array([[]],dtype=int).reshape(0,3)
wl_peak = np.array([[]],dtype=int).reshape(0,3)
px_peak = np.array([[]],dtype=int).reshape(0,3)
for iteration in range(n_iter):
np.random.shuffle(no_px) # in-place reordering
pixel_wl_1 = np.array([], dtype=int).reshape(0,2)
pixel_wl_2 = np.array([], dtype=int).reshape(0,2)
for wl in no_wl:
dif = np.diff(self.spc[no_px,wl]) # vector of pixels, in random order, for this wavenumber wl
dif = (dif - np.mean(dif))/np.std(dif)
                list_of_pixels = np.where(dif < - sd[iteration])[0] # list of indexes with extreme values (potential spikes)
# create a pair [pixel, wl] for each peak at this wavenumber and add to overall list
if list_of_pixels.shape[0]:
pixel_wl_1 = np.concatenate((pixel_wl_1, np.array([[no_px[i],wl] for i in list_of_pixels])))
for px in no_px: # it can be in natural order
# remove obvious trends, and possibly use random order of wavenumbers, before differentiating
dif = np.diff(ndimage.morphology.white_tophat(self.spc[px,no_wl], None, np.repeat([1], int(self.wl.size * 0.1) + 1)))
dif = (dif - np.mean(dif))/np.std(dif)
                list_of_wl = np.where(dif < - sd[iteration])[0] # list of wavenumbers with extreme values (potential spikes)
# create a pair [pixel, wl] for each peak from this spectrum and add to overall list
if list_of_wl.shape[0]:
pixel_wl_2 = np.concatenate((pixel_wl_2, np.array([[px,no_wl[i]] for i in list_of_wl])))
# http://stackoverflow.com/questions/9269681/intersection-of-2d-numpy-ndarrays
p1_view = pixel_wl_1.view([('', int)] * pixel_wl_1.shape[1]) # pairs [pixel,wl] become tuples
p2_view = pixel_wl_2.view([('', int)] * pixel_wl_2.shape[1]) # btw, we know that pixel_wl_2.shape[1] is 2
if mode == 0: # peaks are only those found by both methods
pixel_wl = np.intersect1d(p1_view, p2_view)
elif mode == 1: # peaks are only those found for a given wl across pixels
pixel_wl = p1_view
elif mode == 2: # peaks are only those found for a given spectrum across wavenumbers
pixel_wl = p2_view
else: # peaks found by any method will be replaced
pixel_wl = np.union1d(p1_view, p2_view)
pixel_wl = pixel_wl.view(int).reshape(-1, pixel_wl_1.shape[1])
if debug: # add column with iter number to pixel_wl[], and then concatenate to big table
# table with all peaks found scanning over WL and pixels
thistab = np.hstack((np.repeat(iteration,pixel_wl_1.shape[0]).reshape(pixel_wl_1.shape[0],-1),pixel_wl_1))
wl_peak = np.concatenate((wl_peak, thistab), axis=0)
thistab = np.hstack((np.repeat(iteration,pixel_wl_2.shape[0]).reshape(pixel_wl_2.shape[0],-1),pixel_wl_2))
px_peak = np.concatenate((px_peak, thistab), axis=0)
# table with only peaks found by both ways
thistab = np.hstack((np.repeat(iteration,pixel_wl.shape[0]).reshape(pixel_wl.shape[0],-1),pixel_wl))
truepeak = np.concatenate((truepeak, thistab), axis=0)
# we also store peaks that turned out to be false since appeared in only one
false_wl = np.setxor1d(p1_view, p2_view) # points that were found in only one of the scans
false_wl = false_wl.view(int).reshape(-1, pixel_wl_1.shape[1])
thistab = np.hstack((np.repeat(iteration,false_wl.shape[0]).reshape(false_wl.shape[0],-1),false_wl))
falspeak = np.concatenate((falspeak, thistab), axis=0)
for px in set(pixel_wl[:,0]): ## pixels where there is a spike
peak_list = np.array(sorted(set(pixel_wl[np.where(pixel_wl[:,0] == px)[0],1])))
for i in range(neighbours):
peak_list = np.concatenate((peak_list, peak_list - 1, peak_list + 1)) # more neighbors
peak_list = np.array(sorted(set(peak_list)))
peak_list = peak_list[ np.where(peak_list >= 0)[0] ] # remove negative values
peak_list = peak_list[ np.where(peak_list < self.wl.shape[0])[0] ] # remove values exceeding largest WL
dif = (self.spc[px] - np.mean(self.spc[px]))/np.std(self.spc[px])
exclude_interpol = np.concatenate((peak_list, np.where(dif > 8)[0])) # remove all extreme values from interpolation
valid = np.array(list( set(range(self.wl.shape[0])) - set(exclude_interpol) )) # array, not list (safer for slices)
if valid.shape[0] > 2 * peak_list.shape[0]: # whole spectrum can be outlier if intensity larger than neighbours
#interpol = interpolate.InterpolatedUnivariateSpline(self.wl[valid], self.spc[px,valid]) # spline OO function
#interpol = interpolate.interp1d(self.wl[valid], self.spc[px,valid], kind="cubic") # cubic OO function
#self.spc[px,peak_list] = interpol(self.wl[peak_list]) # replace peaks by interpolated values
self.spc[px] = np.interp(self.wl, self.wl[valid], self.spc[px,valid]) ## much much faster than OO functions
if debug:
return self, wl_peak, px_peak, truepeak, falspeak
else:
return self
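    # Hedged sketch of the spike criterion used above: the first difference along one axis is
    # standardised to a z-score, and values several SDs away from zero are flagged as candidates.
    #   dif = np.diff(intensities)
    #   dif = (dif - dif.mean()) / dif.std()
    #   candidates = np.where(dif < -sd_threshold)[0]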
def remove_cosmic_spikes_rescaled(self, sd_threshold=None, n_iter=None, neighbours=None):
"""
Find cosmic spikes based on standardized derivatives over pixels per wavenumber, and over wavenumbers of each
        spectrum. sd_threshold is the number of standard deviations away (from a N(0,1)) needed to flag a peak. n_iter controls
        for how many iterations it should look for peaks, shuffling the pixel locations and reducing the SD threshold at each iteration.
Based on idea from doi:10.1366/12-06660 (Mozharov et al. Appl Spectro 2012, p.1326), with important differences:
(1) z-score computation to find outliers, removing conditional tests (since spikes _can_ leak to neighboring pixels)
(2) complete hyperspec scan over wavenumbers and pixels
(3) spike removal by baseline-type interpolation, assuming closest neighbors
(4) iterative permutation over pixels
Alternative where reshuffling takes place first, and all spectra are rescaled to the z-score
"""
        if neighbours is None:
            neighbours = 1
        if neighbours > 10:
            neighbours = 10
if sd_threshold is None or sd_threshold < 2: # S.D. threshold for assigning a spike
sd_threshold = 4
if n_iter is None or n_iter < 1:
n_iter = 5
        no_px = list(range(self.spc.shape[0])) # reordering of pixels (spectra); a list, so it can be shuffled in-place
meanval = np.array([np.mean(spec) for spec in self.spc])
sdval = np.array([np.std(spec) for spec in self.spc])
self.spc = np.array([(spec - mu)/sd for spec,mu,sd in zip(self.spc,meanval,sdval)],dtype=float)
pixel_wl_1 = np.array([], dtype=int).reshape(0,2)
for iteration in range(n_iter):
np.random.shuffle(no_px) # in-place reordering
for wl in range(self.spc.shape[1]):
dif = np.diff(self.spc[no_px,wl]) # vector of pixels, in random order, for this wavenumber wl
dif = (dif - np.mean(dif))/np.std(dif)
                list_of_pixels = np.where(dif < - sd_threshold)[0] # list of indexes with extreme values (potential spikes)
# create a pair [pixel, wl] for each peak at this wavenumber and add to overall list
if list_of_pixels.shape[0]:
pixel_wl_1 = np.concatenate((pixel_wl_1, np.array([[no_px[i],wl] for i in list_of_pixels])))
# array -> tuples followed by Counter().items() for frequencies
pixel_wl = np.array([i for i,j in Counter(tuple([tuple(i) for i in pixel_wl_1])).items() if (j>n_iter/2)],dtype=int)
for px in no_px:
peak_list = np.array(sorted(set(pixel_wl[np.where(pixel_wl[:,0] == px)[0],1])), dtype=int)
for i in range(neighbours):
                peak_list = np.concatenate((peak_list, peak_list - 1, peak_list + 1)) # add neighbours on each side
peak_list = np.array(sorted(set(peak_list)),dtype=int)
peak_list = peak_list[ np.where(peak_list >= 0)[0] ] # remove negative values
peak_list = peak_list[ np.where(peak_list < self.wl.shape[0])[0] ] # remove values exceeding largest WL
dif = np.diff(ndimage.morphology.white_tophat(self.spc[px], None, np.repeat([1], int(self.wl.size * 0.2) + 1)))
dif = (dif - np.mean(dif))/np.std(dif)
peak_list = np.concatenate((peak_list, np.where(dif > 2 * sd_threshold)[0])) # remove all extreme values from interpolation
valid = np.array(list( set(range(self.wl.shape[0])) - set(peak_list) ),dtype=int) # array, not list (safer for slices)
if valid.shape[0] > 2 * peak_list.shape[0]: # whole spectrum can be outlier if intensity larger than neighbours
interpol = interpolate.InterpolatedUnivariateSpline(self.wl[valid], self.spc[px,valid], k=5) # spline OO function
for i in peak_list: # replace peak by interpolated value
self.spc[px,i] = interpol(self.wl[i])
# self.spc[px,i] = -20
# revert spectra to original scale
self.spc = np.array([spec * sd + mu for spec,mu,sd in zip(self.spc,meanval,sdval)],dtype=float)
return self
def set_valid_area(self, x=None, y=None):
if x is None:
x = False
if y is None:
y = False
valid_xy = [i for i in range(self.xy.shape[0]) if
(((not x) or (x[0] <= self.xy[i,0] <= x[1])) and ((not y) or (y[0] <= self.xy[i,1] <= y[1])))]
# valid_xy is indexed by xy (or idx) arrays; must be mapped to spc array (some repetitions maybe):
self.set_valid_pixels(self.idx[valid_xy]) # also removes elements from XY (and IDX) outside area (all map to pixel "-1")
return self
def flip_xy(self):
"""
        change X and Y information (for when we mistakenly read the transpose of the image, mixing X and Y dimensions)
"""
self.xy = np.array([[x,y] for x in sorted(set(self.xy[:,1])) for y in sorted(set(self.xy[:,0]))])
def set_subsample(self, sample=None):
if sample is None or sample >= 1.:
sample = 0.1
valid = np.random.choice(self.spc.shape[0], int(sample * self.spc.shape[0]), replace=False)
self.set_valid_pixels(valid)
return self
def set_wavelength_interval(self, lower, upper):
valid=[i for i in range(len(self.wl)) if (self.wl[i] > lower) and (self.wl[i] < upper)]
self.wl = self.wl[valid]
self.spc = self.spc[:,valid]
return self
def set_wavelength_intervals(self, valid_regions=None):
"""
Matrix of regions (in wave number units) to be maintained. Others (silent region, extremes) are excluded.
Notice that this is a newer version of the set_wavelength_interval(), which allowed only extremes...
"""
if valid_regions is None:
valid_regions = [[100,1900],[2600,3200]] # 1800~2700 is silent region, but we can keep a few values
valid = [] # notice that include[] is a list, not an np.array
for vreg in valid_regions:
valid += [i for i in range(len(self.wl)) if (self.wl[i] >= vreg[0] and self.wl[i] <= vreg[1])]
self.wl = self.wl[valid]
self.spc = self.spc[:,valid]
return self
def interpolate_spline(self, frac=None, spline=None, interval=None):
        if frac is None or frac < 0.01:
frac=1
if spline is None:
spline=False
if interval is None:
interval = [self.wl.min(),self.wl.max()]
if interval[0] < self.wl.min():
interval[0] = self.wl.min()
if interval[1] > self.wl.max():
interval[1] = self.wl.max()
new_wl=np.linspace(interval[0],interval[1],int(frac * len(self.wl)))
if spline:
tck=[interpolate.splrep(self.wl,row) for row in self.spc] ## it's linear iff s=0 explicitly
self.spc=np.array([interpolate.splev(new_wl,i,der=0) for i in tck],dtype=float)
else:
self.spc=np.array([np.interp(new_wl,self.wl, row) for row in self.spc],dtype=float)
self.wl=new_wl
return self
def interpolate_savgol(self, n=None, order=None, deriv=None):
"""
Interpolation followed by Savitzky-Golay filter. Notice that the SavGol filter must be applied on equally-spaced
points, and therefore this function may change the wavelengths. If further simplification (subsampling over
        wavelengths) is needed, then run interpolate_spline() *after* applying the SavGol filter. "n" and "order" are the parameters
        for the SavGol sliding window, while "deriv" is the order of the derivative (zero meaning no derivative calculation)
"""
if n is None:
n = 5
if not n%2:
n += 1
if order is None:
order = (n+2)/2
order = int(order)
if deriv is None or deriv is False:
deriv=0
if deriv is True or deriv > order: # deriv should be a number, but user might mistakenly use it as bool
deriv=2
new_wl=np.linspace(min(self.wl),max(self.wl),len(self.wl))
self.spc=np.array([signal.savgol_filter(np.interp(new_wl,self.wl, row), window_length=n, polyorder=order, deriv=deriv) \
for row in self.spc],dtype=float)
self.wl=new_wl
return self
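    # Hedged usage sketch: an 11-point, 3rd-order Savitzky-Golay first derivative, followed by a
    # coarser spline resampling of the (now equally spaced) wave numbers.
    #   img.interpolate_savgol(n=11, order=3, deriv=1)
    #   img.interpolate_spline(frac=0.5)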
def moving_average_smoothing(self, n=None, power=None):
"""
Smooths all spectra as well as wavelength using a vector of powered triangular weights in a moving average
        (sliding window). A power lower than one generates a flat (uniform) weight for all 2n-1 points, while higher
values give much more weight to central points.
Notice that this function will change the wave number values and even vector size (since they must be smoothed as well)
"""
if n is None or n < 2:
n = 3
if power is None:
power = 2.
n = int(n)
weights = np.power( np.convolve (np.repeat(1./float(n), n), np.repeat(1./float(n), n)), power ) # exponentiated triangular weights
weights /= sum(weights)
self.wl = moving_average(self.wl, weights=weights)
self.spc=np.array([moving_average(row, weights=weights) for row in self.spc],dtype=float)
return self
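    # Numeric sketch of the weights built above, for n=3 and power=1:
    #   np.convolve([1/3]*3, [1/3]*3) gives [1, 2, 3, 2, 1]/9, i.e. a 2n-1 point triangular kernel,
    #   which is then raised element-wise to "power" and renormalised to sum to one.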
def lowess(self, frac=None):
        if frac is None or frac > 0.9:
frac=0.01
self.spc=np.array([sm.nonparametric.lowess(row, self.wl, frac=frac, is_sorted=True, return_sorted=False) for row in self.spc],dtype=float)
return self
def rescale_01(self):
# for spec in np.nditer(self.spc, op_flags=['readwrite']): # overwrite values as we iterate --> does not work, goes element-wise
self.spc = np.array([rescale_spectrum_01(spec) for spec in self.spc],dtype=float)
return self
def rescale_zero(self):
self.spc = np.array([spec - min(spec) for spec in self.spc],dtype=float)
return self
def rescale_sum(self):
self.spc = np.array([spec/np.sum(spec) for spec in self.spc],dtype=float)
return self
def rescale_mean(self):
self.spc = np.array([spec/np.mean(spec) for spec in self.spc],dtype=float)
return self
def rescale_zscore(self):
self.spc = np.array([(spec - np.mean(spec))/np.std(spec) for spec in self.spc],dtype=float)
return self
def background_correction_medpolish(self, max_iter=None): # not actually a "background correction"
"""
Replaces the signals by the residuals of a two-way table with an additive model, which can be found
through the Median polish algorithm. The rows and columns correspond to pixels and wave numbers, respectively. The
additive model is given by y_ij = x + a_i + b_j + e_ij (the median polish maintains all these values at all
iterations).
"""
if max_iter is None:
max_iter = 2
self.spc, pixel_effect, wl_effect = median_polish(self.spc, max_iter=max_iter)
return self, pixel_effect, wl_effect
def simplify_spectral_matrix(self, npoints=None, n_neighbors=None, smooth_fraction=None, smooth_size=None, valid_regions=None):
"""
        Removes noisy spectra and merges the remaining similar ones.
Also creates updated list of IDX: an index mapping pixels before and after simplification, which may be used as clustering label.
All spectra are baseline-corrected (tophat) for analysis, and the indexing/classification is based on a rough
interpolation. Furthermore only non-noise spectra are maintained.
If "n_neighbors" is an integer greater than zero, than clustering is constrained to pixels at this vicinity.
"""
self.remove_noisy_pixels(valid_regions=valid_regions)
if npoints is None:
npoints = 512
        if npoints >= self.spc.shape[0]:
            return self # more clusters than objects
        if n_neighbors and n_neighbors > 0.5 * self.spc.shape[0]:
            n_neighbors = int(0.5 * self.spc.shape[0])
if n_neighbors and n_neighbors < 3:
n_neighbors = 3
if smooth_fraction is None:
smooth_fraction = 0.2
if smooth_size is None:
smooth_size=10
if valid_regions is None:
valid_regions = [[100,1900],[2600,3000]] # 1800~2700 is silent region, but we can keep a few values
simple = SpecImage() # temporary specs therefore don't need wl, idx, or description
simple.xy = np.array(self.xy) # copy
simple.idx = np.repeat(-1, self.idx.shape[0]) # index with same size as *original* (not simplified) SpecImage
valid_idx = [] # indexes from *original* specs that should be maintained after simplification; others are purged
location_map = [] # maps idx from simplified to original spc
sm_weights = np.repeat(1./float(smooth_size), smooth_size) # uniform weights; default is triangular
smooth_wl = moving_average(self.wl, weights=sm_weights) # used for rough spectra (quick clustering)
rough_wl = np.linspace(smooth_wl[0], smooth_wl[-1], int(smooth_fraction * len(smooth_wl)))
if valid_regions:
include_rough=[]
for vreg in valid_regions:
include_rough += [i for i in range(len(rough_wl)) if (rough_wl[i] >= vreg[0] and rough_wl[i] <= vreg[1])]
rough_wl = np.sort([rough_wl[i] for i in include_rough])
for i in range(self.spc.shape[0]):
corrected_spc = ndimage.morphology.white_tophat(self.spc[i], None, np.repeat([1], int(self.wl.size * 0.2) + 1))
spectrum = moving_average(corrected_spc, weights=sm_weights) # spectrum points are associated to smooth_wl values
rough_spec = np.interp(rough_wl, smooth_wl, spectrum) # some regions are excluded
if simple.spc.size:
simple.spc = np.vstack([simple.spc, rough_spec])
else:
simple.spc = np.array(rough_spec, dtype=float)
simple.idx[np.where(self.idx == i)[0]] = i # simple.idx has original size but points to elements in new spc[]
location_map.append(i)
valid=[i for i,j in enumerate(simple.idx) if j > -1] # remove XY/IDX info about excluded spectra
simple.idx = simple.idx[valid]
simple.xy = simple.xy[valid]
if n_neighbors:
connect = kneighbors_graph_by_idx (simple.xy, simple.idx, n_neighbors=n_neighbors)
kmod=cluster.AgglomerativeClustering(n_clusters=npoints,connectivity=connect,affinity="cosine",linkage="average").fit(simple.spc)
else:
dists = metrics.pairwise_distances(simple.spc,metric="cosine",n_jobs=-1)
kmod=cluster.AgglomerativeClustering(n_clusters=npoints,affinity="precomputed",linkage="average").fit(dists)
# from split_by_cluster()
clusters = list(set(kmod.labels_)) # use labels_ to create a list of SpecImages
cluster_idx=[] # will hold a list of indexes for each cluster
for c_id in clusters: # enumerate returns a tuple (index_of_value, value)
cluster_idx.append([i for i,j in enumerate(kmod.labels_) if j==c_id])
for i in range(len(cluster_idx)): # cluster_idx[i] is a list (can be used to slice np.arrays, NOT lists)
# map_idx[cluster_idx[i]] = cluster_idx[i][0] # map_idx is an np.array -> all elems map to first idx, per cluster
valid_idx.append(location_map[cluster_idx[i][0]]) # location_map has idx from original SPC
self.spc = np.array([self.spc[i] for i in valid_idx], dtype=float)
self.idx = kmod.labels_[simple.idx] # simple.idx now has cluster ID for each new spectrum
self.xy = simple.xy
return self
def simplify_spectral_matrix_pca(self, npoints=None, fraction_dims=None):
"""
merge similar spectra based on their PCA dimensions (as a fraction of the total number of wave numbers).
Unlike simplify_spectral_matrix(), no preprocessing (like tophat correction, interpolation and smoothing) is
applied to spectra, and no removal of noise-only spectra is attempted.
Updates IDX list: an index mapping pixels before and after simplification, which may be used as clustering label.
"""
# self.xy is not touched; self.idx will have several duplicates; self.spc will have averages.
if npoints is None:
npoints = int(self.spc.shape[0]/10)
if npoints < 2:
npoints = 2
        if npoints >= self.spc.shape[0]:
            return self # more clusters than objects
if fraction_dims is None:
fraction_dims = 0.05
n_dims = int(float(self.wl.shape[0]) * fraction_dims)
if n_dims < 2:
n_dims = 2
transf=decomposition.PCA(n_components=n_dims).fit_transform(self.spc)
dists = metrics.pairwise_distances(transf,metric="cosine",n_jobs=-1)
kmod=cluster.AgglomerativeClustering(n_clusters=npoints,affinity="precomputed",linkage="average").fit(dists)
# from split_by_cluster()
clusters = sorted(set(kmod.labels_))
cluster_idx=[] # will hold a list of indexes for each cluster
avge_spec = []
for c_id in clusters: # enumerate returns a tuple (index_of_value, value)
cluster_idx.append([i for i,j in enumerate(kmod.labels_) if j==c_id])
for i in range(len(cluster_idx)): # cluster_idx[i] is a list (can be used to slice np.arrays, NOT lists)
avge_spec.append(np.average(self.spc[cluster_idx[i]], axis=0)) # list of arrays
self.spc = np.array(avge_spec, dtype=float)
self.idx = kmod.labels_[self.idx] # spec.idx may be already larger than original spc (previous simplifications)
return self
def subtract_background_spectra(self, k_neighbours=None, weighted=None, strength=None, smooth_fraction=None, mode=None):
"""
Finds all background spectra and creates a Voronoi diagram for them, such that each non-background spectrum uses
only the background closer to it. This algorithm uses an efficient KDTree algorithm to find closest neighbours,
and generally works with wavenumbers below 3000 cm-1. If the results are not satisfactory, try changing "mode"
(between 1 and 4)
OBS: This function currently does NOT work with arbitrary IDX vectors ("sparse spectra"). It will also in the
future allow for smoothing the background, and dynamically calculating the background concentration (like SIS).
"""
## FIXME: weighted=True seems to be problematic
if self.idx.shape[0] != self.spc.shape[0]:
print ("Unfortunately the background subtraction cannot be applied to sparse spectra (that is, with arbitrary \
pixel<->spectra mappings)")
return
if k_neighbours is None or k_neighbours < 1:
k_neighbours = 1
if weighted is None:
weighted = False
if smooth_fraction is None:
smooth_fraction = 0.5
if mode is None:
mode = 3
if strength is None or strength > 1 or strength < 0.0001:
strength = 0.5
water_idx = min(range(self.wl.shape[0]), key=lambda i: abs(self.wl[i]-3000)) # index of wl closest to 3000
stride = int(1./smooth_fraction)
if water_idx/stride < 10:
            stride = water_idx // 10 # keep stride an integer, since it is used below as a slice step
if stride < 1:
stride = 1
# spectral matrix is smoothed, which will lead to the first clustering (using only wl<water and skipping stride elems)
spec = np.array([moving_average(row[:water_idx], n=stride)[::stride] for row in self.spc])
if mode != 2:
# find which cluster, based on roughly smoothed spectra, represents background (=lower intensities excluding water region)
kmod1 = cluster.AgglomerativeClustering(n_clusters=2, affinity='cosine', linkage='complete').fit(spec)
back1 = np.where(kmod1.labels_ == 0)[0]; tmp = np.where(kmod1.labels_ == 1)[0]
avge_back1 = np.average(spec[back1],1).min(); avge_tmp = np.average(spec[tmp],1).min()
if avge_back1 > avge_tmp: # cluster "1" has member with lower intensities than cluster "0"
back1 = tmp; avge_back1 = avge_tmp # we will use the average values later
if mode > 1:
# another clustering, based on rescaled, baseline-corrected and a different distance (Euclidean)
spec = np.array([ndimage.morphology.white_tophat(row, None, np.repeat([1], int(self.wl.size * 0.1) + 1)) for row in spec])
spec = np.array([rescale_spectrum_01(row) for row in spec],dtype=float)
kmod2 = cluster.AgglomerativeClustering(n_clusters=2, linkage='ward').fit(spec) # Ward only works with Euclidean distances
# background RESCALED spectra are those with HIGHER intensities over region (signal-rich spectra have a few close to one)
back2 = np.where(kmod2.labels_ == 0)[0]; tmp = np.where(kmod2.labels_ == 1)[0]
avge_back2 = np.average(spec[back2],1).min(); avge_tmp = np.average(spec[tmp],1).min()
if avge_back2 < avge_tmp: # cluster "1" has higher intensities (more noisy) than cluster "0"
back2 = tmp; avge_back2 = avge_tmp # we will use the average values later
if mode == 1:
back_idx = back1
elif mode == 2:
back_idx = back2
elif mode == 3:
back_idx = np.array(sorted( set.intersection(set(back1),set(back2)) ),dtype=int) # background spectra according to both clusterings
if back_idx.shape[0] == 0: # No elements in common between clusterings: choose the lowest values based on ORIGINAL spectra
avge_back1 = np.average(self.spc[back1,:water_idx]); # average over flattened matrix (all values)
avge_back2 = np.average(self.spc[back2,:water_idx]);
if avge_back1 > avge_back2:
back_idx = back2
else:
back_idx = back1
elif mode == 4:
if back1.shape[0] > back2.shape[0]:
back_idx = back1
else:
back_idx = back2
# quick search for location of nearby background spectra ( http://stackoverflow.com/a/23772423/204903 )
tree=spatial.cKDTree(self.xy[back_idx]) # FIXME: works only when IDX is not needed
spec = np.array(self.spc,copy=True) # new matrix, to avoid overwritting
for i in range(self.spc.shape[0]): # includes cell *and* background
dist, indx = tree.query(self.xy[i], k=k_neighbours)
if (k_neighbours > 1): # dist and indx are lists
dist = np.array(dist[np.where(indx < tree.n)[0]]) # tree.n (=self.xy[back_idx].shape[0]) means that neighbour wasn't found
indx = np.array(indx[np.where(indx < tree.n)[0]]) # tree.n (=self.xy[back_idx].shape[0]) means that neighbour wasn't found
else: # dist and indx are single variables
if (indx < tree.n):
dist=np.array([dist])
indx=np.array([indx])
else:
dist=np.array([])
indx=np.array([])
if indx.shape[0]: # at least one neighbour was found; otherwise don't do anything
if weighted: # indx is relative to the back_idx list, not to the whole image!
                    background = np.average(self.spc[back_idx[indx]], weights = 1./(np.asarray(dist, dtype=float)+1e-2), axis=0) # to avoid NaN
else:
background = np.average(self.spc[back_idx[indx]], axis=0)
spec[i] = self.spc[i] - strength * background
spec[i] -= min(spec[i]) - min(background) # maintains original scale
# else (if indx.shape[0] is zero) then do nothing: spec[i] already has spectrum from self.spc[i]
        self.spc = spec
        return self
def baseline_correction_polynomial(self, order=None, noise_std=None, zero=None, n_iter=None):
"""
Finds the baseline for each spectrum by iteratively fitting a polynomial or spline and excluding values higher
        than the fit. Finally, it subtracts this baseline signal.
Spline smoothing is achieved by a polynomial of order zero. (not true?)
Alternative would be to find local minima, and/or using them in weighted splines -- see splrep() function)
"""
if order is None:
order=5
elif order > 15:
order=15
if noise_std is None:
noise_std = 0.1
if zero is None:
zero = False
if n_iter is None:
n_iter = len(self.wl)
min_points = max(int(len(self.wl)/20), int(3 * order + 1)) # from spc.fit.poly.below() in hyperSpec
for s in range(len(self.spc)):
valid, baseline = baseline_points_by_polynomial(self.wl, self.spc[s], order=order, noise_std=noise_std,
min_points=min_points, n_iter = n_iter)
self.spc[s] = baseline_subtraction(self.spc[s], baseline, zero=zero)
return self
def baseline_correction_rubberband (self, band_size=None, noise_std=None, zero=None):
"""
Baseline correction based on spline smoothing through the rubberband minimum points.
"""
if band_size is None:
band_size = 1.
if noise_std is None:
noise_std = 1.
if zero is None:
zero = False
for s in range(len(self.spc)):
valid, baseline = baseline_points_by_rubberband (self.wl, self.spc[s], band_size=band_size, noise_std=noise_std)
# discard rubberband baseline above, which is just piecewise minima.
baseline = interpolate.splev(self.wl, interpolate.splrep(self.wl[valid],self.spc[s,valid]), der=0)
self.spc[s] = baseline_subtraction(self.spc[s], baseline, zero=zero)
return self
def baseline_correction_als(self, smooth=None, asym=None, n_iter=None, zero=None):
"""
http://stackoverflow.com/questions/29156532/python-baseline-correction-library
"Asymmetric Least Squares Smoothing" by P. Eilers and H. Boelens in 2005.
"smooth" is log(lambda) and "asym" is p in the paper (that is, lambda between 7 and 20 are equiv to 10^3 ~ 10^9)
"""
if (smooth is None) or (smooth < 1):
smooth = 10
if (asym is None) or (asym > 0.999) or (asym < 1e-5):
asym = 0.01
if n_iter is None:
n_iter = 20
if zero is None:
zero = False
L = len(self.wl)
D = sparse.csc_matrix(np.diff(np.eye(L), 2))
for s in range(len(self.spc)):
valid, baseline = baseline_points_by_als (self.wl, self.spc[s], smooth=smooth, asym=asym, n_iter=n_iter)
self.spc[s] = baseline_subtraction(self.spc[s], baseline, zero=zero)
return self
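    # Hedged sketch of the asymmetric least squares (AsLS) idea referenced above (Eilers & Boelens 2005),
    # not necessarily the exact helper used here: iteratively solve a penalised least-squares fit in which
    # points above the current baseline get weight "asym" and points below get weight 1-asym.
    #   lam = np.exp(smooth)                                  # docstring above: "smooth" is log(lambda)
    #   D = sparse.csc_matrix(np.diff(np.eye(len(y)), 2))
    #   w = np.ones(len(y))
    #   for _ in range(n_iter):
    #       W = sparse.spdiags(w, 0, len(y), len(y))
    #       z = sparse.linalg.spsolve(W + lam * D.dot(D.T), w * y)
    #       w = asym * (y > z) + (1 - asym) * (y < z)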
def baseline_correction_tophat(self, f = None):
        if (f is None) or (f < 0.000001) or (f > 1.):
footp = np.repeat([1], int(self.wl.size * 0.4) + 1)
else:
footp = np.repeat([1], int(self.wl.size * f) + 1)
min_spec = self.spc.min(1) # 1 = min() per row; 0 = min() per col || tophat forces min() to zero
self.spc = np.array([ndimage.morphology.white_tophat(spec, None, footp) + z for spec,z in zip(self.spc,min_spec)],dtype=float)
return self
def background_correction_remove_percentile(self, percentile=None, flatten=None):
"""
        Calculate the lower x-percentile intensity over all pixels at each wavelength, and then subtract these
        values from every spectrum -- that is, create a spectrum of "minima", representing the background, that should be discounted
Warning: here "percentile" is a single parameter
"""
if percentile is None:
percentile = 1
if flatten is None:
flatten = False
background = np.percentile(self.spc,q=percentile,axis=0) # axis=0 -> over wavelengths
self.spc=np.array([spec - background for spec in self.spc],dtype=float)
if flatten:
self.spc[ self.spc < 0.] = 0.
return self
def background_correction_remove_minimum(self):
background = np.min(self.spc,axis=0) # axis=0 -> over wavelengths
self.spc=np.array([spec - background for spec in self.spc],dtype=float)
return self
def background_correction_emsc(self, order=None, fit=None):
"""Extended Multiplicative Scatter Correction (Ref H. Martens)
order - order of polynomial
        fit - if None then use the average spectrum, otherwise provide a spectrum
              as a column vector to which all others are fitted
-- imported from PyChem http://pychem.sourceforge.net/ (Roger Jarvis, GPL)
"""
if fit is None: # fitting spectrum
mx = np.mean(self.spc,axis=0)[:,np.newaxis]
else:
mx = fit
if order is None:
order = 2
corr = np.zeros(self.spc.shape)
for i in range(len(self.spc)): # do fitting
b,f,r = pychem_mlr (mx, self.spc[i,:][:,np.newaxis], order)
corr[i,:] = np.reshape((r/b[0,0]) + mx, (corr.shape[1],))
self.spc = corr
return self
def background_correction_pca(self, n_components=None, approximate=None):
"""
Based on PCA Noise Reduction technique, that aims at re-weighting wavelengths based on how much they contribute to
variance (PCA-based dimensionality reduction). This function also finds non-informative pixels, besides wavelengths.
n_components[] determine how many PCs are used over wavelengths [0] and over pixels [1], with values too large or
zero indicating to skip this step.
Must use carefully since assumes conserved values are not informative (for this image), which might remove signal
relevant when comparing images.
"""
if n_components is None:
n_components = [5,5]
if n_components[0] < self.spc.shape[1] and n_components[0] > 0:
if approximate is None or approximate is False:
pca=decomposition.PCA(n_components=n_components[0])
else:
pca=decomposition.RandomizedPCA(n_components=n_components[0])
Y = pca.fit_transform(self.spc) # dimensions are wavelengths and samples are pixels
self.spc = pca.inverse_transform(Y) # reconstruct original data using only "n_components" projections
# do same thing across pixels instead of wavelengths
if n_components[1] < self.spc.shape[0] and n_components[1] > 0:
if approximate is None or approximate is False:
pca=decomposition.PCA(n_components=n_components[1])
else:
pca=decomposition.RandomizedPCA(n_components=n_components[1])
Y = pca.fit_transform(self.spc.T) # dimensions are now the pixels and "samples" are the wavelengths
self.spc = pca.inverse_transform(Y).T # transpose again, to have correct matrix dimension
return self
def find_endmembers(self, n_members=None, method=None):
"""
Extraction of endmembers (a.k.a. pure spectra) using pysptools library (http://pysptools.sourceforge.net/eea_fn.html)
Possible methods are "nfindr", "ppi" (pixel purity index), "fippi"(Fast Iterative Pixel Purity Index), and
"atgp"(Automatic Target Generation Process).
This function will return a numpy array with n_members endmembers and a vector with indexes of such endmembers,
when available (that is, for all except PPI.)
OBS: FIPPI method does not respect n_members, can give larger value!
"""
try:
from pysptools.eea import eea, nfindr
except ImportError:
sys.stderr.write("You don't seem to have http://pysptools.sourceforge.net/ properly installed. Sorry.\n")
return np.array([]), np.array([])
if n_members is None:
n_members = 4
        if method == "atgp":
            em, idx = eea.ATGP (self.spc, n_members)
        elif method == "fippi":
            em, idx = eea.FIPPI (self.spc, n_members)
        elif method == "ppi":
em = eea.PPI (self.spc, n_members, 1000) # suggested numSkewers is 10k
idx = []
else: # "nfindr" or None
em, _, idx, _ = nfindr.NFINDR (self.spc, n_members)
return em, idx
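    # Hedged usage sketch (requires pysptools): extract four candidate pure spectra, then compute
    # their per-pixel abundances with the method below.
    #   em, em_idx = img.find_endmembers(n_members=4, method="nfindr")
    #   maps = img.find_abundance_maps(endmembers=em, method="nnls")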
def find_abundance_maps(self, endmembers=None, method=None, normalize=None, closure=None, reorder=None):
"""
Abundance maps (contribution of each endmember) for each spectrum using pysptools library
(http://pysptools.sourceforge.net/abundance_maps_fn.html)
Possible methods are "fcls" (fully constrained least squares), "nnls" (non-negative constrained least squares),