Skip to content

Commit

Permalink
refactor/transfermech_and_fct/clip_scale_offset (#3171)
Browse files Browse the repository at this point in the history
• transferfunctions.py:
  - TransferFunction:  add _set_bounds() to be compatible with any assignments to scale and/or offset
  - Logistic(): modify offset to be vertical vs. horizontal

• transfermechanism.py:
  - clip: add setter, which checks against bounds of function

• test_transfer_mechanism.py:
  - add test_clip_with_respect_to_fct_bounds()

---------

Co-authored-by: jdcpni <pniintel55>
  • Loading branch information
jdcpni authored Jan 26, 2025
1 parent d05a63a commit ef11a7c
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 112 deletions.
165 changes: 111 additions & 54 deletions psyneulink/core/components/functions/nonstateful/transferfunctions.py

Large diffs are not rendered by default.

137 changes: 85 additions & 52 deletions psyneulink/core/components/mechanisms/processing/transfermechanism.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,54 @@ def _integrator_mode_setter(value, owning_component=None, context=None, *, compi

return value

def _clip_setter(value, owning_component=None, context=None):

if (value is not None
and owning_component.function
and hasattr(owning_component.function, 'bounds')
and owning_component.function.bounds is not None
and isinstance(owning_component.function.bounds, tuple)
and owning_component.function.bounds != (None, None)):
bounds = owning_component.function.bounds
if bounds[0] is None:
lower_bound = -np.inf
else:
lower_bound = bounds[0]
if bounds[1] is None:
upper_bound = np.inf
else:
upper_bound = bounds[1]
if value[0] is None:
lower_clip = -np.inf
else:
lower_clip = value[0]
if value[1] is None:
upper_clip = np.inf
else:
upper_clip = value[1]
clip = (lower_clip, upper_clip)
lower_bad = clip[0] < lower_bound
upper_bad = clip[1] > upper_bound
lower_msg = (f"lower value of clip for '{owning_component.name}' ({value[0]}) is "
f"below its function's lower bound ({lower_bound})") if lower_bad else ""
if lower_bad and upper_bad:
val_msg = "clip"
upper_msg = (f" and its upper value ({value[1]}) is above the function's upper bound "
f"({upper_bound})") if upper_bad else ""
else:
val_msg = "it"
upper_msg = (f"upper value of clip for '{owning_component.name}' ({value[1]}) is above "
f"its function's upper bound ({upper_bound})") if upper_bad else ""
if lower_bad or upper_bad:
# Avoid duplicate warnings:
warning_msg = (f"The {lower_msg}{upper_msg}, so {val_msg} will not have an effect.")
if (not hasattr(owning_component, "clip_warning_msg")
or owning_component.clip_warning_msg != warning_msg):
warnings.warn(warning_msg)
owning_component.clip_warning_msg = warning_msg
return clip
return value


# IMPLEMENTATION NOTE: IMPLEMENTS OFFSET PARAM BUT IT IS NOT CURRENTLY BEING USED
class TransferMechanism(ProcessingMechanism_Base):
Expand Down Expand Up @@ -1023,7 +1071,12 @@ class TransferMechanism(ProcessingMechanism_Base):
determines the allowable range for all elements of the result of `function <Mechanism_Base.function>`.
The 1st item (index 0) determines the minimum allowable value of the result, and the 2nd item (index 1)
determines the maximum allowable value; any element of the result that exceeds the specified minimum or
maximum value is set to the value of clip that it exceeds.
maximum value is set to the value of clip that it exceeds. If either item is `None`, no clipping is
performed for that item. If the `function <Mechanism_Base.function>` returns an array, the clip is applied
elementwise (i.e., the clip is applied to each element of the array independently). If either item is outside
the interval of the function's `bounds <TransferFunction.bounds>` after its `scale <TransferFunction.scale>` and
`offset <TransferFunction.offset>` have been applied (i.e., :math:`function.bounds(lower,upper) * scale +
offset`), a warning is issued and that item of the clip is ignored.
integrator_mode : bool
determines whether the TransferMechanism uses its `integrator_function <TransferMechanism.integrator_function>`
Expand Down Expand Up @@ -1216,7 +1269,7 @@ class Parameters(ProcessingMechanism_Base.Parameters):
function = Parameter(Linear, stateful=False, loggable=False, dependencies='integrator_function')
integrator_function_value = Parameter([[0]], read_only=True)
on_resume_integrator_mode = Parameter(CURRENT_VALUE, stateful=False, loggable=False)
clip = None
clip = Parameter(None, setter=_clip_setter)#, modulable=False, stateful=False, loggable=False)
noise = FunctionParameter(0.0, function_name='integrator_function')
termination_measure = Parameter(
Distance(metric=MAX_ABS_DIFF),
Expand Down Expand Up @@ -1470,29 +1523,40 @@ def _validate_noise(self, noise):
raise MechanismError(f"Noise parameter ({noise}) for {self.name} must be a float, "
f"function, or array/list of these.")

def _instantiate_parameter_ports(self, function=None, context=None):
def _instantiate_attributes_after_function(self, context=None):
"""Determine number of items expected by termination_measure and check clip if specified"""
super()._instantiate_attributes_after_function(context)

# If function is a logistic, and clip has not been specified, bound it between 0 and 1
if (
(
isinstance(function, Logistic)
or (
inspect.isclass(function)
and issubclass(function, Logistic)
)
)
and self.clip is None
):
self.clip = (0,1)
measure = self.termination_measure

super()._instantiate_parameter_ports(function=function, context=context)
if isinstance(measure, TimeScale):
self._termination_measure_num_items_expected = 0
self.parameters.termination_comparison_op._set(GREATER_THAN_OR_EQUAL, context)
return

def _instantiate_attributes_before_function(self, function=None, context=None):
super()._instantiate_attributes_before_function(function=function, context=context)
try:
# If measure is a Function, use its default_variable to determine expected number of items
self._termination_measure_num_items_expected = len(measure.parameters.variable.default_value)
except:
# Otherwise, use "duck typing"
try:
# Try a single item first (only uses value, and not previous_value)
measure(np.array([0,0]))
self._termination_measure_num_items_expected = 1
except:
try:
# termination_measure takes two arguments -- value and previous_value -- (e.g., Distance)
measure(np.array([[0,0],[0,0]]))
self._termination_measure_num_items_expected = 2
except:
assert False, f"PROGRAM ERROR: Unable to determine length of input for" \
f" {repr(TERMINATION_MEASURE)} arg of {self.name}"

if self.parameters.initial_value._get(context) is None:
self.defaults.initial_value = copy.deepcopy(self.defaults.variable)
self.parameters.initial_value._set(copy.deepcopy(self.defaults.variable), context)
self.parameters.value.history_min_length = self._termination_measure_num_items_expected - 1

# Force call to _clip_setter in order to check against bounds (now that any function bounds are known)
if self.clip is not None:
self.clip = self.clip

def _instantiate_output_ports(self, context=None):
# If user specified more than one item for variable, but did not specify any custom OutputPorts,
Expand Down Expand Up @@ -1725,37 +1789,6 @@ def _parse_function_variable(self, variable, context=None):
else:
return self._get_instantaneous_function_input(variable, noise, context)

def _instantiate_attributes_after_function(self, context=None):
"""Determine numberr of items expected by termination_measure"""
super()._instantiate_attributes_after_function(context)

measure = self.termination_measure

if isinstance(measure, TimeScale):
self._termination_measure_num_items_expected = 0
self.parameters.termination_comparison_op._set(GREATER_THAN_OR_EQUAL, context)
return

try:
# If measure is a Function, use its default_variable to determine expected number of items
self._termination_measure_num_items_expected = len(measure.parameters.variable.default_value)
except:
# Otherwise, use "duck typing"
try:
# Try a single item first (only uses value, and not previous_value)
measure(np.array([0,0]))
self._termination_measure_num_items_expected = 1
except:
try:
# termination_measure takes two arguments -- value and previous_value -- (e.g., Distance)
measure(np.array([[0,0],[0,0]]))
self._termination_measure_num_items_expected = 2
except:
assert False, f"PROGRAM ERROR: Unable to determine length of input for" \
f" {repr(TERMINATION_MEASURE)} arg of {self.name}"

self.parameters.value.history_min_length = self._termination_measure_num_items_expected - 1

def _report_mechanism_execution(self, input, params=None, output=None, context=None):
"""Override super to report previous_input rather than input, and selected params
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ def __init__(self,
prefs=prefs,
**kwargs
)
assert True

def _parse_function_variable(self, variable, context=None):
return self._kwta_scale(variable, context=context)
Expand Down
2 changes: 1 addition & 1 deletion tests/functions/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
gaussian_helper = RAND3 * gaussian_helper + RAND4

relu_helper = np.maximum(RAND1 * (test_var - RAND2), RAND3 * RAND1 *(test_var - RAND2))
logistic_helper = RAND4 / (1 + np.exp(-(RAND1 * (test_var - RAND2)) + RAND3))
logistic_helper = RAND4 / (1 + np.exp(-(RAND1 * (test_var - RAND2)))) + RAND3

def gaussian_distort_helper(seed):
state = np.random.RandomState([seed])
Expand Down
2 changes: 1 addition & 1 deletion tests/log/test_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ def test_multilayer(self):

middle_weights.log.print_entries()

# Test Programatic logging
# Test Programmatic logging
hidden_layer_2.log.log_values(pnl.VALUE, comp)
log_val = hidden_layer_2.log.nparray(header=False)
expected_log_val = np.array(
Expand Down
4 changes: 2 additions & 2 deletions tests/mechanisms/test_kwta.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def test_kwta_log_offset(self):
k_value=2
)
val = K.execute(input=[1, 2, 3])
np.testing.assert_allclose(val, [[0.425557483188341, 0.6681877721681662, 0.84553473491646523]])
np.testing.assert_allclose(val, [[0.17754067, 0.42245933, 0.61757448]])

# the inhibition would have to be positive in order to get the desired activity level: thus, inhibition is set to 0
def test_kwta_log_gain_offset(self):
Expand All @@ -132,7 +132,7 @@ def test_kwta_log_gain_offset(self):
k_value=1
)
val = K.execute(input = [.1, -4])
np.testing.assert_allclose(val, [[0.017636340339722684, 0.039165722796764356]])
np.testing.assert_allclose(val, [[4.49500017, 4.68997448]])

def test_kwta_linear(self): # inhibition would be positive: so instead it is set to zero
K = KWTAMechanism(
Expand Down
4 changes: 2 additions & 2 deletions tests/mechanisms/test_recurrent_transfer_mechanism.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def test_recurrent_mech_function_logistic(self):
function=Logistic(gain=2, offset=1)
)
val = R.execute(np.ones(10))
np.testing.assert_allclose(val, [np.full(10, 0.7310585786300049)])
np.testing.assert_allclose(val, [np.full(10, 1.880797)])

def test_recurrent_mech_function_psyneulink(self):

Expand All @@ -490,7 +490,7 @@ def test_recurrent_mech_function_psyneulink(self):
function=a
)
val = R.execute(np.zeros(7))
np.testing.assert_allclose(val, [np.full(7, 0.2689414213699951)])
np.testing.assert_allclose(val, [np.full(7, 1.5)])

def test_recurrent_mech_function_custom(self):
# I don't know how to do this at the moment but it seems highly important.
Expand Down
32 changes: 32 additions & 0 deletions tests/mechanisms/test_transfer_mechanism.py
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,38 @@ def test_clip_2d_array(self, mech_mode):
np.testing.assert_allclose(EX([[-5.0, -1.0, 5.0], [5.0, -5.0, 1.0], [1.0, 5.0, 5.0]]),
[[-2.0, -1.0, 2.0], [2.0, -2.0, 1.0], [1.0, 2.0, 2.0]])

test_params = [
# test_for clip scale offset input expected warning_msg
["no clip", None, 2, 1, 1.5, 2.63514895, None],
["ok clip", (1.0, 3.0), 2, 1, 1.5, 2.63514895, None],
["clip lower", (1.0, 3.0), 2, -1, 1.5, 1.0, None],
["clip upper", (1.0, 3.0), 2, 2, 1.5, 3.0, None],
["warning lower", (-1.0, 2.0), 2, 0, 1.5, 1.63514895, ("The lower value of clip for 'MECH' (-1.0) "
"is below its function's lower bound (0), "
"so it will not have an effect.")],
["warning upper", (-1.0, 3.0), 2, -1, 1.5, 0.63514895, ("The upper value of clip for 'MECH' (3.0) "
"is above its function's upper bound (1), "
"so it will not have an effect.")],
["warning both", (1.0, 5.0), 2, 2, 1.5, 3.63514895, ("The lower value of clip for 'MECH' (1.0) "
"is below its function's lower bound (2) "
"and its upper value (5.0) is above the "
"function's upper bound (4), so clip will "
"not have an effect.")],
]
arg_names = "test_for, clip, scale, offset, input, expected, warning"

@pytest.mark.parametrize(arg_names, test_params, ids=[x[0] for x in test_params])
def test_clip_with_respect_to_fct_bounds(self, test_for, clip, scale, offset, input, expected, warning):
"""Test for clip that falls outside range of function"""
if warning:
with pytest.warns(UserWarning) as warnings:
mech = TransferMechanism(name='MECH', clip=clip, function=Logistic(scale=scale, offset=offset))
assert warning in str(warnings.list[0].message)
else:
mech = TransferMechanism(clip=clip, function=Logistic(scale=scale, offset=offset))
assert mech.clip == clip
np.testing.assert_allclose(mech.execute(input), expected)


class TestOutputPorts:
def test_output_ports_match_input_ports(self):
Expand Down

0 comments on commit ef11a7c

Please sign in to comment.