forked from Project-MONAI/tutorials
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathunet_evaluation_dict.py
112 lines (97 loc) · 4.17 KB
/
unet_evaluation_dict.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# Copyright (c) MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import tempfile
from glob import glob
import nibabel as nib
import numpy as np
import torch
from torch.utils.data import DataLoader
import monai
from monai.data import create_test_image_3d, list_data_collate, decollate_batch
from monai.engines import get_devices_spec
from monai.inferers import sliding_window_inference
from monai.metrics import DiceMetric
from monai.networks.nets import UNet
from monai.transforms import (
Activations,
EnsureChannelFirstd,
AsDiscrete,
Compose,
LoadImaged,
SaveImage,
ScaleIntensityd,
)
def main(tempdir):
monai.config.print_config()
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
print(f"generating synthetic data to {tempdir} (this may take a while)")
for i in range(5):
im, seg = create_test_image_3d(128, 128, 128, num_seg_classes=1, channel_dim=-1)
n = nib.Nifti1Image(im, np.eye(4))
nib.save(n, os.path.join(tempdir, f"im{i:d}.nii.gz"))
n = nib.Nifti1Image(seg, np.eye(4))
nib.save(n, os.path.join(tempdir, f"seg{i:d}.nii.gz"))
images = sorted(glob(os.path.join(tempdir, "im*.nii.gz")))
segs = sorted(glob(os.path.join(tempdir, "seg*.nii.gz")))
val_files = [{"img": img, "seg": seg} for img, seg in zip(images, segs)]
# define transforms for image and segmentation
val_transforms = Compose(
[
LoadImaged(keys=["img", "seg"]),
EnsureChannelFirstd(keys=["img", "seg"]),
ScaleIntensityd(keys="img"),
]
)
val_ds = monai.data.Dataset(data=val_files, transform=val_transforms)
# sliding window inference need to input 1 image in every iteration
val_loader = DataLoader(val_ds, batch_size=1, num_workers=4, collate_fn=list_data_collate)
dice_metric = DiceMetric(include_background=True, reduction="mean", get_not_nans=False)
post_trans = Compose([Activations(sigmoid=True), AsDiscrete(threshold=0.5)])
saver = SaveImage(output_dir="./output", output_ext=".nii.gz", output_postfix="seg")
# try to use all the available GPUs
devices = [torch.device("cuda" if torch.cuda.is_available() else "cpu")]
# devices = get_devices_spec(None)
model = UNet(
spatial_dims=3,
in_channels=1,
out_channels=1,
channels=(16, 32, 64, 128, 256),
strides=(2, 2, 2, 2),
num_res_units=2,
).to(devices[0])
model.load_state_dict(torch.load("best_metric_model_segmentation3d_dict.pth"))
# if we have multiple GPUs, set data parallel to execute sliding window inference
if len(devices) > 1:
model = torch.nn.DataParallel(model, device_ids=devices)
model.eval()
with torch.no_grad():
for val_data in val_loader:
val_images, val_labels = val_data["img"].to(devices[0]), val_data["seg"].to(devices[0])
# define sliding window size and batch size for windows inference
roi_size = (96, 96, 96)
sw_batch_size = 4
val_outputs = sliding_window_inference(val_images, roi_size, sw_batch_size, model)
val_outputs = [post_trans(i) for i in decollate_batch(val_outputs)]
val_labels = decollate_batch(val_labels)
# compute metric for current iteration
dice_metric(y_pred=val_outputs, y=val_labels)
for val_output in val_outputs:
saver(val_output)
# aggregate the final mean dice result
print("evaluation metric:", dice_metric.aggregate().item())
# reset the status
dice_metric.reset()
if __name__ == "__main__":
with tempfile.TemporaryDirectory() as tempdir:
main(tempdir)