Skip to content

Commit

Permalink
Update the lut linear interpolation to be used by different lookup ta…
Browse files Browse the repository at this point in the history
…ble files (#981)

* First idea for beam center, may move compute to plans

* Actually make it a plan

* Fix the tests

* Tidy up docstring

* Reword comments
  • Loading branch information
noemifrisina authored and stan-dot committed Jan 17, 2025
1 parent 83a5d7a commit 687ef29
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/dodal/beamlines/i24.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
DISPLAY_CONFIG = "/dls_sw/i24/software/gda_versions/var/display.configuration"


BL = get_beamline_name("s24")
set_log_beamline(BL)
set_utils_beamline(BL)
Expand Down
2 changes: 1 addition & 1 deletion src/dodal/devices/i24/beam_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@

class DetectorBeamCenter(StandardReadable):
def __init__(self, prefix: str, name: str = "") -> None:
self.beam_x = epics_signal_rw(float, prefix + "BeamX")
self.beam_x = epics_signal_rw(float, prefix + "BeamX") # in pixels
self.beam_y = epics_signal_rw(float, prefix + "BeamY")
super().__init__(name)
24 changes: 14 additions & 10 deletions src/dodal/devices/util/lookup_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,32 @@ async def energy_distance_table(lookup_table_path: str) -> np.ndarray:
return loadtxt(StringIO(raw_table), comments=["#", "Units"])


def linear_interpolation_lut(filename: str) -> Callable[[float], float]:
def parse_lookup_table(filename: str) -> list[Sequence]:
"""Parse a generic lookup table with a number of columns >= 2 and return a list \
in column major order of the values in it."""
LOGGER.info(f"Parsing lookup table file {filename}")

lut_vals = zip(*loadtxt(filename, comments=["#", "Units"]), strict=False)
return list(lut_vals)


def linear_interpolation_lut(
s_values: Sequence, t_values: Sequence
) -> Callable[[float], float]:
"""Returns a callable that converts values by linear interpolation of lookup table
values.
If the value falls outside the lookup table then the closest value will be used."""
LOGGER.info(f"Using lookup table {filename}")
s_and_t_vals = zip(*loadtxt(filename, comments=["#", "Units"]), strict=False)

s_values: Sequence
t_values: Sequence
s_values, t_values = s_and_t_vals

# numpy interp expects x-values to be increasing
if not np.all(np.diff(s_values) > 0):
LOGGER.info(
f"Configuration file {filename} values are not ascending, trying reverse order..."
"Configuration values in the lookup table are not ascending, trying reverse order..."
)
s_values = list(reversed(s_values))
t_values = list(reversed(t_values))
if not np.all(np.diff(s_values) > 0):
raise AssertionError(
f"Configuration file {filename} lookup table does not monotonically increase or decrease."
"Configuration lookup table does not monotonically increase or decrease."
)

def s_to_t2(s: float) -> float:
Expand Down
34 changes: 22 additions & 12 deletions tests/devices/unit_tests/util/test_lookup_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@
from dodal.devices.util.lookup_tables import (
energy_distance_table,
linear_interpolation_lut,
parse_lookup_table,
)

TEST_DCM_LUT = "tests/test_data/test_beamline_dcm_roll_converter.txt"
TEST_DCM_LUT_REVERSED = "tests/test_data/test_beamline_dcm_roll_converter_reversed.txt"
TEST_BAD_DCM_LUT = "tests/test_data/test_beamline_dcm_roll_converter_non_monotonic.txt"
TEST_DET_DIST_CONVERTER_LUT = "tests/test_data/test_det_dist_converter.txt"


async def test_energy_to_distance_table_correct_format():
table = await energy_distance_table(
Expand All @@ -16,36 +22,40 @@ async def test_energy_to_distance_table_correct_format():
assert table.shape == (50, 2)


@mark.parametrize(
"lut_path, num_columns", [(TEST_DCM_LUT, 2), (TEST_DET_DIST_CONVERTER_LUT, 3)]
)
def test_parse_lookup_table_returns_list_of_the_same_length_as_num_of_columns(
lut_path, num_columns
):
lut_values = parse_lookup_table(lut_path)

assert isinstance(lut_values, list) and len(lut_values) == num_columns


@mark.parametrize("s, expected_t", [(2.0, 1.0), (3.0, 1.5), (5.0, 4.0), (5.25, 6.0)])
def test_linear_interpolation(s, expected_t):
lut_converter = linear_interpolation_lut(
"tests/test_data/test_beamline_dcm_roll_converter.txt"
)
lut_converter = linear_interpolation_lut(*parse_lookup_table(TEST_DCM_LUT))
assert lut_converter(s) == expected_t


@mark.parametrize(
"s, expected_t", [(2.0, 1.0), (3.0, 1.5), (5.0, 4.0), (5.25, 6.0), (5.5, 8.0)]
)
def test_linear_interpolation_reverse_order(s, expected_t):
lut_converter = linear_interpolation_lut(
"tests/test_data/test_beamline_dcm_roll_converter_reversed.txt"
)
lut_converter = linear_interpolation_lut(*parse_lookup_table(TEST_DCM_LUT_REVERSED))
actual_t = lut_converter(s)
assert actual_t == expected_t, f"actual {actual_t} != expected {expected_t}"


@mark.parametrize("s, expected_t", [(1.0, 1.0), (7.0, 8.0)])
def test_linear_interpolation_extrapolates_returning_the_last_value(s, expected_t):
lut_converter = linear_interpolation_lut(
"tests/test_data/test_beamline_dcm_roll_converter.txt"
)
lut_converter = linear_interpolation_lut(*parse_lookup_table(TEST_DCM_LUT))
actual_t = lut_converter(s)
assert actual_t == expected_t, f"actual {actual_t} != expected {expected_t}"


def test_linear_interpolation_rejects_non_monotonic_increasing():
test_s, test_t = parse_lookup_table(TEST_BAD_DCM_LUT)
with pytest.raises(AssertionError):
linear_interpolation_lut(
"tests/test_data/test_beamline_dcm_roll_converter_non_monotonic.txt"
)
linear_interpolation_lut(test_s, test_t)

0 comments on commit 687ef29

Please sign in to comment.