-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdataStructure.py
executable file
·291 lines (244 loc) · 11.5 KB
/
dataStructure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
'''Created by Chen yizhi. 2017.??.??
'''
import numpy as np
import tensorflow as tf
import os
import SimpleITK
from scipy.ndimage import binary_dilation
from scipy.ndimage import generate_binary_structure
class Polyp_data:
    '''Data object for one polyp: a cubic crop of a CT volume plus its masks.

    The crop is either cut out of a larger volume with Crop_Polyp_CTData() or
    read back from disk with read_data_from_raw_file(). Attributes that have
    not been filled yet hold the integer 0, which is the "unset" sentinel the
    I/O methods compare against.
    '''
    # File names used inside self.base_dir.
    raw_CT_name = "CT.nii.gz"
    raw_mask_name = "mask.nii.gz"
    colon_mask_name = "colon_mask.nii.gz"
    dilated_colon_mask_name = "dilated_colon_mask.nii.gz"
    raw_tf_name = "data.tf"  # volume: int16, masks: uint8 (see save_data_into_tf_file)

    def __init__(self):
        # Paths; consulted by read_data_from_raw_file() and the write methods.
        self.base_dir = 0
        self.raw_CT_dir = 0
        self.raw_mask_dir = 0
        self.colon_mask_dir = 0
        self.dilated_colon_mask_dir = 0
        # Variables containing 3D data.
        self.crop_CT_data = 0      # int16
        self.crop_polyp_mask = 0   # uint8
        self.crop_colon_mask = 0

    def calculate_propety(self, polyp_mask, polyp_label):
        '''Record the label, voxel count and centroid of one labelled polyp.

        polyp_mask:  3-D label array.
        polyp_label: label value that identifies the polyp inside polyp_mask.

        Sets self.label_in_polypmask, self.pixelsize_of_polyp (number of
        voxels equal to polyp_label) and self.coord_of_center (float array of
        the mean index along each axis). If the label does not occur, the
        means are taken over empty index lists and come out as NaN.
        '''
        self.label_in_polypmask = polyp_label
        dots = np.where(polyp_mask == polyp_label)
        self.pixelsize_of_polyp = len(dots[0])
        self.coord_of_center = [np.average(nums) for nums in dots]
        # One index array per axis: anything other than 3 means the mask is not 3-D.
        assert len(self.coord_of_center) == 3
        self.coord_of_center = np.array(self.coord_of_center)

    def Crop_Polyp_CTData(self, volume_data, crop_size, coord_of_center):
        '''Cut a crop_size^3 cube centred on coord_of_center out of volume_data.

        volume_data:     object exposing the 3-D arrays CT_data, colon_mask
                         and polyp_mask (assumed to share one shape -- only
                         CT_data.shape is consulted).
        crop_size:       edge length of the cube; must be an even int.
        coord_of_center: three coordinates (index order of the arrays).
                         Fractional values are floored to a voxel index.

        Fills self.crop_CT_data (int16, padded with -999), and
        self.crop_colon_mask / self.crop_polyp_mask (uint8, padded with 0).
        Parts of the cube that fall outside the volume keep the padding value.
        '''
        assert isinstance(crop_size, (int, np.integer)), "crop_size must be an int"
        assert crop_size % 2 == 0, "crop_size must be even"
        crop_size = int(crop_size)
        # Integer half-width: "/" would yield a float under Python 3 and float
        # slice bounds are rejected by numpy.
        half = crop_size // 2

        CT_data = volume_data.CT_data
        colon_mask = volume_data.colon_mask
        polyp_mask = volume_data.polyp_mask

        cube = (crop_size, crop_size, crop_size)
        self.crop_CT_data = np.full(cube, -999, dtype=np.int16)
        self.crop_colon_mask = np.zeros(cube, dtype=np.uint8)
        self.crop_polyp_mask = np.zeros(cube, dtype=np.uint8)

        # Centroids from calculate_propety() are floats; snap to a voxel index.
        center = np.floor(np.asarray(coord_of_center, dtype=np.float64)).astype(np.int64)
        extent = np.asarray(CT_data.shape[:3], dtype=np.int64)

        # Requested window in volume coordinates (may stick out on either side).
        want_ini = center - half
        want_end = center + half
        # Part of the window that actually exists in the volume. Both ends are
        # clamped independently, so a cube larger than the volume is handled on
        # the low and the high side at once.
        copy_ini = np.clip(want_ini, 0, extent)
        copy_end = np.clip(want_end, 0, extent)
        # Where that part lands inside the cube.
        paste_ini = copy_ini - want_ini
        paste_end = paste_ini + (copy_end - copy_ini)

        src = tuple(slice(int(a), int(b)) for a, b in zip(copy_ini, copy_end))
        dst = tuple(slice(int(a), int(b)) for a, b in zip(paste_ini, paste_end))

        self.crop_CT_data[dst] = CT_data[src]
        self.crop_colon_mask[dst] = colon_mask[src]
        # Polyp mask corresponding to the cropped data.
        self.crop_polyp_mask[dst] = polyp_mask[src]

    def _bytes_feature(self, value):
        '''Wrap a bytes object in a tf.train.Feature holding a one-element BytesList.'''
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

    def save_data_into_tf_file(self):
        '''Serialise the crop and both masks into one TFRecord in self.base_dir.

        The record is written to base_dir/raw_tf_name and contains the raw
        bytes of the volume (as int16) and of the polyp and colon masks (as
        uint8) under the keys 'volume', 'polyp_mask' and 'colon_mask'.
        '''
        assert self.base_dir != 0
        # Build the example before opening the file so that a failure here
        # leaves neither an open writer nor an empty record file behind.
        # tobytes() replaces the deprecated, byte-identical tostring().
        features = {
            'volume': self._bytes_feature(self.crop_CT_data.astype(np.int16).tobytes()),
            'polyp_mask': self._bytes_feature(self.crop_polyp_mask.astype(np.uint8).tobytes()),
            'colon_mask': self._bytes_feature(self.crop_colon_mask.astype(np.uint8).tobytes()),
        }
        example = tf.train.Example(features=tf.train.Features(feature=features))
        # NOTE(review): tf.python_io is the TensorFlow 1.x location of the
        # writer; confirm the TensorFlow version before porting to tf.io.
        writer = tf.python_io.TFRecordWriter(os.path.join(self.base_dir, self.raw_tf_name))
        try:
            writer.write(example.SerializeToString())
        finally:
            writer.close()

    def colon_mask_dilation(self, iterations=8):
        '''Dilate the union of the cropped colon and polyp masks.

        iterations: number of dilation passes.

        Stores the result (a boolean array, as returned by binary_dilation)
        in self.dilated_crop_colon_mask.
        Raises ValueError if the colon mask has not been set or is empty.
        '''
        colon = np.asarray(self.crop_colon_mask)
        # ndim == 0 catches the "unset" sentinel 0; size == 0 an empty mask.
        if colon.ndim == 0 or colon.size == 0:
            raise ValueError("crop_colon_mask has not been set")
        # Combine with a logical OR rather than "+": adding two uint8 masks
        # wraps around (e.g. 255 + 1 == 0) and would silently drop voxels.
        data = ((colon != 0) | (np.asarray(self.crop_polyp_mask) != 0)).astype(np.uint8)
        structure = generate_binary_structure(3, 2)
        self.dilated_crop_colon_mask = binary_dilation(data, structure=structure,
                                                       iterations=iterations)

    #####################################################################################################################
    def read_data_from_raw_file(self):
        '''Load every array whose path attribute has been set (is not 0).

        raw_CT_dir -> crop_CT_data, raw_mask_dir -> crop_polyp_mask,
        colon_mask_dir -> crop_colon_mask. Paths left at 0 are skipped.
        '''
        if self.raw_CT_dir != 0:
            imageSitk = SimpleITK.ReadImage(self.raw_CT_dir)
            self.crop_CT_data = SimpleITK.GetArrayFromImage(imageSitk)
        if self.raw_mask_dir != 0:
            imageSitk = SimpleITK.ReadImage(self.raw_mask_dir)
            self.crop_polyp_mask = SimpleITK.GetArrayFromImage(imageSitk)
        if self.colon_mask_dir != 0:
            imageSitk = SimpleITK.ReadImage(self.colon_mask_dir)
            self.crop_colon_mask = SimpleITK.GetArrayFromImage(imageSitk)

    def write_data_into_raw_file(self):
        '''Write the CT crop and both masks as image files under self.base_dir.

        The three target paths are always rebuilt from base_dir and the class
        level file names, replacing whatever raw_CT_dir, raw_mask_dir and
        colon_mask_dir held before, and all three arrays are written.
        '''
        self.raw_CT_dir = os.path.join(self.base_dir, self.raw_CT_name)
        self.raw_mask_dir = os.path.join(self.base_dir, self.raw_mask_name)
        self.colon_mask_dir = os.path.join(self.base_dir, self.colon_mask_name)
        outputs = (
            (self.crop_CT_data, self.raw_CT_dir),
            (self.crop_polyp_mask, self.raw_mask_dir),
            (self.crop_colon_mask, self.colon_mask_dir),
        )
        for array, path in outputs:
            img = SimpleITK.GetImageFromArray(array)
            SimpleITK.WriteImage(img, path)
#####################################################################################################################
class Volume_Data:
    '''Structure containing all data for a CT Volume.

    Make sure that all data of a volume are in the same folder, self.base_dir.
    The data here is stored as NIfTI (.nii.gz) files, so you may want to
    implement your own LOAD functions. Arrays that have not been loaded hold
    the integer 0.
    '''
    #TODO: Make it a base class. I\O functions should be implemented in another class.

    def __init__(self):
        self.base_dir = 0
        self.patient_uid = None
        self.volume_uid = None
        self.shape = 0
        self.CT_data = 0
        self.colon_mask = 0
        self.polyp_mask = 0
        self.dilated_colon_mask = 0
        self.score_map = 0

    def Set_Directory(self, direc):
        '''Set base_dir and derive patient_dir, patient_uid and volume_uid from it.

        base_dir keeps direc exactly as given. After trailing '/' characters
        are dropped, the last two path components form volume_uid, everything
        before them is patient_dir, and patient_uid is the last component of
        patient_dir. For '/data/p1/s/v/' this gives patient_dir '/data/p1',
        patient_uid 'p1' and volume_uid 's/v'.

        A path with fewer than two separators has no patient part:
        patient_dir and patient_uid become '' and volume_uid is the whole
        (trimmed) path.
        '''
        self.base_dir = direc
        trimmed = direc.rstrip('/')
        # Strip two levels. rsplit avoids the rfind() == -1 slicing accidents
        # that occur when the path contains fewer than two separators.
        parts = trimmed.rsplit('/', 2)
        if len(parts) == 3:
            self.patient_dir = parts[0]
            self.volume_uid = parts[1] + '/' + parts[2]
        else:
            self.patient_dir = ''
            self.volume_uid = trimmed
        self.patient_uid = self.patient_dir.rsplit('/', 1)[-1]

    def clear_volume_data(self):
        '''Reset every loaded array back to the "unset" sentinel 0.'''
        self.CT_data = 0
        self.colon_mask = 0
        self.dilated_colon_mask = 0
        self.polyp_mask = 0
        self.score_map = 0

    def _read_image(self, name):
        '''Read base_dir/name with SimpleITK; return None if the file is missing.'''
        path = os.path.join(self.base_dir, name)
        if not os.path.exists(path):
            return None
        return SimpleITK.ReadImage(path)

    def load_volume_data(self):
        '''Load CT_data.nii.gz into self.CT_data; return 1 on success, 0 if missing.

        Values below -999 are replaced by -999. Also stores the image spacing
        reported by SimpleITK in self.spacing and the array shape in
        self.shape.
        '''
        image = self._read_image("CT_data.nii.gz")
        if image is None:
            return 0
        data = SimpleITK.GetArrayFromImage(image)
        # Clamp from below at -999: the first term supplies -999 where the
        # value is smaller, the second keeps every other value unchanged.
        self.CT_data = (data < -999) * -999 + (data >= -999) * data
        self.spacing = image.GetSpacing()
        self.shape = self.CT_data.shape
        return 1

    def load_polyp_mask(self):
        '''Load polyp_mask.nii.gz into self.polyp_mask; return 1, or 0 if missing.'''
        image = self._read_image("polyp_mask.nii.gz")
        if image is None:
            return 0
        self.polyp_mask = SimpleITK.GetArrayFromImage(image)
        return 1

    def load_colon_mask(self):
        '''Load colon_mask.nii.gz into self.colon_mask; return 1, or 0 if missing.'''
        image = self._read_image("colon_mask.nii.gz")
        if image is None:
            return 0
        self.colon_mask = SimpleITK.GetArrayFromImage(image)
        return 1

    def colon_mask_dilation(self, iterations=8):
        '''Dilate self.colon_mask and store the uint8 result in self.dilated_colon_mask.

        iterations: number of dilation passes.
        Raises ValueError if the colon mask has not been loaded.
        '''
        # A 0-d value is the "unset" sentinel 0; testing ndim avoids the
        # identity comparison against an int literal ("is 0").
        if np.ndim(self.colon_mask) == 0:
            raise ValueError("colon_mask has not been loaded")
        structure = generate_binary_structure(3, 2)
        dilated = binary_dilation(self.colon_mask, structure=structure, iterations=iterations)
        self.dilated_colon_mask = dilated.astype(np.uint8)

    def load_score_map(self, fold=0):
        '''Load score_map.nii.gz from the sub-directory `fold` of base_dir.

        For nested cross validation, score maps are stored in the cross_1,
        cross_2, cross_3... directories under the volume unit directory.

        fold: name of that sub-directory. It is converted with str(), so the
              integer default no longer makes os.path.join raise TypeError.
              NOTE(review): whether an integer fold should map to
              'cross_<n>' instead of '<n>' cannot be told from this file.

        Returns the score map array on success, 0 if the file does not exist.
        '''
        name = 'score_map.nii.gz'
        base_path = os.path.join(self.base_dir, str(fold), name)
        if not os.path.exists(base_path):
            return 0
        image = SimpleITK.ReadImage(base_path)
        self.score_map = SimpleITK.GetArrayFromImage(image)
        return self.score_map

    def load_colon_dilation(self):
        '''Load dilated_colon_mask.nii.gz into self.dilated_colon_mask; return 1, or 0 if missing.'''
        image = self._read_image("dilated_colon_mask.nii.gz")
        if image is None:
            return 0
        self.dilated_colon_mask = SimpleITK.GetArrayFromImage(image)
        return 1
###################################################################
## Deprecated
'''
def load_colon_mask(self):
name = 'colonMask.raw'
if self.base_dir == 0:
print("Base dir not provided!")
raise IOError
if self.shape == 0:
print("Shape not provided!")
raise ValueError
if os.path.exists(os.path.join(self.base_dir, name)):
self.colon_mask_dir = os.path.join(self.base_dir, name)
else:
print("Cannot find colon mask data.")
raise IOError
with open(self.colon_mask_dir, "rb") as f:
data = f.read()
data = np.fromstring(data, dtype=np.uint8)
data = (data!=0).astype(np.uint8) # In case some map of polyp mask may have labels of 255.
self.colon_mask = np.reshape(data, self.shape)
def load_volume_data(self):
self.volume_data_dir = 0
names=['oriInterpolatedCTData.raw', 'InterpolatedCTData.raw']
for name in names:
if os.path.exists(os.path.join(self.base_dir, name)):
self.volume_data_dir = os.path.join(self.base_dir, name)
break
assert self.volume_data_dir != 0
ds = dicom.read_file(self.volume_data_dir)
data = ds.pixel_array
self.CT_data = (data<-999)*-999 + (data>=-999)*data
self.CT_data = self.CT_data.astype(np.int16)
self.shape = ds.pixel_array.shape
def load_polyp_mask(self):
name = 'polyp_mask.nrrd'
if not os.path.exists(os.path.join(self.base_dir, name)):
return 0
image = SimpleITK.ReadImage(os.path.join(self.base_dir, name))
self.polyp_mask = SimpleITK.GetArrayFromImage(image)
return 1
'''