Skip to content

Commit

Permalink
Merge pull request #904 from EmmaRenauld/tests_dwi
Browse files Browse the repository at this point in the history
Finish cleaning the main of dwi_ scripts, to allow eventual unit tests
  • Loading branch information
arnaudbore authored Feb 19, 2024
2 parents 065a43a + c65e446 commit e39da13
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 70 deletions.
87 changes: 87 additions & 0 deletions scilpy/dwi/operations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import logging
import math
import pprint

import numpy as np

from scilpy.gradients.bvec_bval_tools import identify_shells, \
round_bvals_to_shell, DEFAULT_B0_THRESHOLD


def apply_bias_field(dwi_data, bias_field_data, mask_data):
"""
Expand Down Expand Up @@ -131,3 +137,84 @@ def compute_dwi_attenuation(dwi_weights: np.ndarray, b0: np.ndarray):
dwi_attenuation[np.logical_not(np.isfinite(dwi_attenuation))] = 0.

return dwi_attenuation


def detect_volume_outliers(data, bvecs, bvals, std_scale, verbose,
b0_thr=DEFAULT_B0_THRESHOLD):
"""
Parameters
----------
data: np.ndarray
The dwi data.
bvecs: np.ndarray
The bvecs
bvals: np.array
The b-values vector.
std_scale: float
How many deviation from the mean are required to be considered an
outlier.
verbose: bool
If True, print even more stuff.
b0_thr: float
Value below which b-values are considered as b0.
"""
results_dict = {}
shells_to_extract = identify_shells(bvals, b0_thr, sort=True)[0]
bvals = round_bvals_to_shell(bvals, shells_to_extract)
for bval in shells_to_extract[shells_to_extract > b0_thr]:
shell_idx = np.where(bvals == bval)[0]
shell = bvecs[shell_idx]
results_dict[bval] = np.ones((len(shell), 3)) * -1
for i, vec in enumerate(shell):
if np.linalg.norm(vec) < 0.001:
continue

dot_product = np.clip(np.tensordot(shell, vec, axes=1), -1, 1)
angle = np.arccos(dot_product) * 180 / math.pi
angle[np.isnan(angle)] = 0
idx = np.argpartition(angle, 4).tolist()
idx.remove(i)

avg_angle = np.average(angle[idx[:3]])
corr = np.corrcoef([data[..., shell_idx[i]].ravel(),
data[..., shell_idx[idx[0]]].ravel(),
data[..., shell_idx[idx[1]]].ravel(),
data[..., shell_idx[idx[2]]].ravel()])
results_dict[bval][i] = [shell_idx[i], avg_angle,
np.average(corr[0, 1:])]

for key in results_dict.keys():
avg_angle = np.round(np.average(results_dict[key][:, 1]), 4)
std_angle = np.round(np.std(results_dict[key][:, 1]), 4)

avg_corr = np.round(np.average(results_dict[key][:, 2]), 4)
std_corr = np.round(np.std(results_dict[key][:, 2]), 4)

outliers_angle = np.argwhere(
results_dict[key][:, 1] < avg_angle - (std_scale * std_angle))
outliers_corr = np.argwhere(
results_dict[key][:, 2] < avg_corr - (std_scale * std_corr))

print('Results for shell {} with {} directions:'
.format(key, len(results_dict[key])))
print('AVG and STD of angles: {} +/- {}'
.format(avg_angle, std_angle))
print('AVG and STD of correlations: {} +/- {}'
.format(avg_corr, std_corr))

if len(outliers_angle) or len(outliers_corr):
print('Possible outliers ({} STD below or above average):'
.format(std_scale))
print('Outliers based on angle [position (4D), value]')
for i in outliers_angle:
print(results_dict[key][i, :][0][0:2])
print('Outliers based on correlation [position (4D), value]')
for i in outliers_corr:
print(results_dict[key][i, :][0][0::2])
else:
print('No outliers detected.')

if verbose:
print('Shell with b-value {}'.format(key))
pprint.pprint(results_dict[key])
print()
Empty file added scilpy/dwi/tests/__init__.py
Empty file.
13 changes: 13 additions & 0 deletions scilpy/dwi/tests/test_operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-


def test_apply_bias_field():
pass


def test_compute_dwi_attenuation():
pass


def test_detect_volume_outliers():
pass
9 changes: 9 additions & 0 deletions scilpy/dwi/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-


def test_extract_dwi_shell():
pass


def test_extract_b0():
pass
4 changes: 3 additions & 1 deletion scilpy/io/varian_fdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,9 @@ def correct_procpar_intensity(dwi_data, dwi_path, b0_path):
P1 = 10**(Ldb/20)
"""

# Not really an io function: does not load and save! But it is the only
# method we have concerning varian_fdf. Kept here, as decided by guru
# Arnaud.
dwi_gain = get_gain(dwi_path)
b0_gain = get_gain(b0_path)

Expand Down
75 changes: 6 additions & 69 deletions scripts/scil_dwi_detect_volume_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
"""

import argparse
import pprint

from dipy.io.gradients import read_bvals_bvecs
import nibabel as nib
import numpy as np

from scilpy.dwi.operations import detect_volume_outliers
from scilpy.io.utils import (assert_inputs_exist,
add_force_b0_arg,
add_verbose_arg)
from scilpy.gradients.bvec_bval_tools import (check_b0_threshold,
identify_shells,
normalize_bvecs,
round_bvals_to_shell)
import math
normalize_bvecs)


def _build_arg_parser():
Expand All @@ -49,7 +45,7 @@ def _build_arg_parser():
'diffusion weighting. [%(default)s]')
p.add_argument('--std_scale', type=float, default=2.0,
help='How many deviation from the mean are required to be '
'considered an outliers. [%(default)s]')
'considered an outlier. [%(default)s]')

add_force_b0_arg(p)
add_verbose_arg(p)
Expand All @@ -64,73 +60,14 @@ def main():
assert_inputs_exist(parser, [args.in_dwi, args.in_bval, args.in_bvec])

bvals, bvecs = read_bvals_bvecs(args.in_bval, args.in_bvec)
dwi = nib.load(args.in_dwi)
data = dwi.get_fdata()
data = nib.load(args.in_dwi).get_fdata()

b0_thr = check_b0_threshold(args.force_b0_threshold,
bvals.min(), args.b0_thr)
bvecs = normalize_bvecs(bvecs)

results_dict = {}
shells_to_extract = identify_shells(bvals, b0_thr, sort=True)[0]
bvals = round_bvals_to_shell(bvals, shells_to_extract)
for bval in shells_to_extract[shells_to_extract > args.b0_thr]:
shell_idx = np.where(bvals == bval)[0]
shell = bvecs[shell_idx]
results_dict[bval] = np.ones((len(shell), 3)) * -1
for i, vec in enumerate(shell):
if np.linalg.norm(vec) < 0.001:
continue

dot_product = np.clip(np.tensordot(shell, vec, axes=1), -1, 1)
angle = np.arccos(dot_product) * 180 / math.pi
angle[np.isnan(angle)] = 0
idx = np.argpartition(angle, 4).tolist()
idx.remove(i)

avg_angle = np.average(angle[idx[:3]])
corr = np.corrcoef([data[..., shell_idx[i]].ravel(),
data[..., shell_idx[idx[0]]].ravel(),
data[..., shell_idx[idx[1]]].ravel(),
data[..., shell_idx[idx[2]]].ravel()])
results_dict[bval][i] = [shell_idx[i], avg_angle,
np.average(corr[0, 1:])]

for key in results_dict.keys():
avg_angle = np.round(np.average(results_dict[key][:, 1]), 4)
std_angle = np.round(np.std(results_dict[key][:, 1]), 4)

avg_corr = np.round(np.average(results_dict[key][:, 2]), 4)
std_corr = np.round(np.std(results_dict[key][:, 2]), 4)

outliers_angle = np.argwhere(
results_dict[key][:, 1] < avg_angle-(args.std_scale*std_angle))
outliers_corr = np.argwhere(
results_dict[key][:, 2] < avg_corr-(args.std_scale*std_corr))

print('Results for shell {} with {} directions:'.format(
key, len(results_dict[key])))
print('AVG and STD of angles: {} +/- {}'.format(
avg_angle, std_angle))
print('AVG and STD of correlations: {} +/- {}'.format(
avg_corr, std_corr))

if len(outliers_angle) or len(outliers_corr):
print('Possible outliers ({} STD below or above average):'.format(
args.std_scale))
print('Outliers based on angle [position (4D), value]')
for i in outliers_angle:
print(results_dict[key][i, :][0][0:2])
print('Outliers based on correlation [position (4D), value]')
for i in outliers_corr:
print(results_dict[key][i, :][0][0::2])
else:
print('No outliers detected.')

if args.verbose:
print('Shell with b-value {}'.format(key))
pprint.pprint(results_dict[key])
print()
detect_volume_outliers(data, bvecs, bvals, args.std_scale,
args.verbose, b0_thr)


if __name__ == "__main__":
Expand Down

0 comments on commit e39da13

Please sign in to comment.