Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bounds_transformer could bypass global_bounds due to the test logic within _trim function in domain_reduction.py #441

Merged
merged 15 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 103 additions & 31 deletions bayes_opt/domain_reduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
from .target_space import TargetSpace
from warnings import warn


class DomainTransformer():
Expand Down Expand Up @@ -36,7 +37,10 @@ def __init__(
self.minimum_window_value = minimum_window

def initialize(self, target_space: TargetSpace) -> None:
"""Initialize all of the parameters"""
"""Initialize all of the parameters.
"""

# Set the original bounds
self.original_bounds = np.copy(target_space.bounds)
self.bounds = [self.original_bounds]

Expand All @@ -47,6 +51,7 @@ def initialize(self, target_space: TargetSpace) -> None:
else:
self.minimum_window = [self.minimum_window_value] * len(target_space.bounds)

# Set initial values
self.previous_optimal = np.mean(target_space.bounds, axis=1)
self.current_optimal = np.mean(target_space.bounds, axis=1)
self.r = target_space.bounds[:, 1] - target_space.bounds[:, 0]
Expand All @@ -72,14 +77,13 @@ def initialize(self, target_space: TargetSpace) -> None:
self._window_bounds_compatibility(self.original_bounds)

def _update(self, target_space: TargetSpace) -> None:

""" Updates contraction rate, window size, and window center.
"""
# setting the previous
self.previous_optimal = self.current_optimal
self.previous_d = self.current_d

self.current_optimal = target_space.params[
np.argmax(target_space.target)
]

self.current_optimal = self._windowed_max(target_space)

self.current_d = 2.0 * (self.current_optimal -
self.previous_optimal) / self.r
Expand All @@ -95,34 +99,101 @@ def _update(self, target_space: TargetSpace) -> None:
np.abs(self.current_d) * (self.gamma - self.eta)

self.r = self.contraction_rate * self.r

def _windowed_max(self, target_space: TargetSpace) -> np.array:
perezed00 marked this conversation as resolved.
Show resolved Hide resolved
"""Returns the parameters that produce the greatest target value within its bounds.
"""
# extract the components we need from the target space
params = np.copy(target_space.params.T)
target = np.copy(target_space.target)
bounds = np.copy(target_space.bounds)

# create a mask by checking each params against its bounds
mask = np.zeros_like(target)
for n, row in enumerate(params):
lower_bound = bounds[n, 0]
upper_bound = bounds[n, 1]
mask += (row <= lower_bound).astype(int)
mask += (row >= upper_bound).astype(int)
mask = mask<1

# apply the mask
params = params.T[mask]
targets = target[mask]

best_params = params[np.argmax(targets)]

return best_params

def _trim(self, new_bounds: np.array, global_bounds: np.array) -> np.array:
for i, variable in enumerate(new_bounds):
if variable[0] < global_bounds[i, 0]:
variable[0] = global_bounds[i, 0]
if variable[1] > global_bounds[i, 1]:
variable[1] = global_bounds[i, 1]
for i, entry in enumerate(new_bounds):
if entry[0] > entry[1]:
new_bounds[i, 0] = entry[1]
new_bounds[i, 1] = entry[0]
window_width = abs(entry[0] - entry[1])
if window_width < self.minimum_window[i]:
dw = (self.minimum_window[i] - window_width) / 2.0
left_expansion_space = abs(global_bounds[i, 0] - entry[0]) # should be non-positive
right_expansion_space = abs(global_bounds[i, 1] - entry[1]) # should be non-negative
# conservative
dw_l = min(dw, left_expansion_space)
dw_r = min(dw, right_expansion_space)
# this crawls towards the edge
ddw_r = dw_r + max(dw - dw_l, 0)
ddw_l = dw_l + max(dw - dw_r, 0)
new_bounds[i, 0] -= ddw_l
new_bounds[i, 1] += ddw_r
"""
Adjust the new_bounds and verify that they adhere to global_bounds and minimum_window.

Parameters:
-----------
new_bounds : np.array
The proposed new_bounds that (may) need adjustment.

global_bounds : np.array
The maximum allowable bounds for each parameter.

Returns:
--------
new_bounds : np.array
The adjusted bounds after enforcing constraints.
"""

#sort bounds
new_bounds = np.sort(new_bounds)

# Validate each parameter's bounds against the global_bounds
for i, pbounds in enumerate(new_bounds):
# If the lower bound of the parameter is outside the global bounds, reset the lower bound
if (pbounds[0] < global_bounds[i, 0] or pbounds[0] > global_bounds[i, 1]):
pbounds[0] = global_bounds[i, 0]
warn("""Domain Reduction Warning:
A parameter's lower bound has exceeded its global limit.
The offensive boundary has been reset, but be cautious of optimizer convergence.""")
perezed00 marked this conversation as resolved.
Show resolved Hide resolved

# If the upper bound bound of the parameter is outside the global bounds, reset the lower bound
if (pbounds[1] > global_bounds[i, 1] or pbounds[1] < global_bounds[i, 0]):
pbounds[1] = global_bounds[i, 1]
till-m marked this conversation as resolved.
Show resolved Hide resolved
warn("""Domain reduction warning:
A parameter's lower bound has exceeded its global limit.
The offensive boundary has been reset, but be cautious of optimizer convergence.""")
perezed00 marked this conversation as resolved.
Show resolved Hide resolved

# Adjust new_bounds to ensure they respect the minimum window width for each parameter
for i, pbounds in enumerate(new_bounds):
current_window_width = abs(pbounds[0] - pbounds[1])

# If the window width is less than the minimum allowable width, adjust it
# Note that minimum_window < width of the global bounds one side always has more space than required
if current_window_width < self.minimum_window[i]:
width_deficit = (self.minimum_window[i] - current_window_width) / 2.0
available_left_space = abs(global_bounds[i, 0] - pbounds[0])
available_right_space = abs(global_bounds[i, 1] - pbounds[1])

# determine how much to expand on the left and right
expand_left = min(width_deficit, available_left_space)
expand_right = min(width_deficit, available_right_space)

# calculate the deficit on each side
expand_left_deficit = width_deficit - expand_left
expand_right_deficit = width_deficit - expand_right

# shift the deficit to the side with more space
adjust_left = expand_left + max(expand_right_deficit, 0)
adjust_right = expand_right + max(expand_left_deficit, 0)

# adjust the bounds
pbounds[0] -= adjust_left
pbounds[1] += adjust_right
perezed00 marked this conversation as resolved.
Show resolved Hide resolved

return new_bounds

def _window_bounds_compatibility(self, global_bounds: np.array) -> bool:
"""Checks if global bounds are compatible with the minimum window sizes."""
"""Checks if global bounds are compatible with the minimum window sizes.
"""
for i, entry in enumerate(global_bounds):
global_window_width = abs(entry[1] - entry[0])
if global_window_width < self.minimum_window[i]:
Expand All @@ -133,7 +204,8 @@ def _create_bounds(self, parameters: dict, bounds: np.array) -> dict:
return {param: bounds[i, :] for i, param in enumerate(parameters)}

def transform(self, target_space: TargetSpace) -> dict:

"""Reduces the bounds of the target space.
"""
self._update(target_space)

new_bounds = np.array(
Expand All @@ -143,6 +215,6 @@ def transform(self, target_space: TargetSpace) -> dict:
]
).T

self._trim(new_bounds, self.original_bounds)
new_bounds = self._trim(new_bounds, self.original_bounds)
self.bounds.append(new_bounds)
return self._create_bounds(target_space.keys, new_bounds)
39 changes: 38 additions & 1 deletion tests/test_seq_domain_red.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,41 @@ def test_exceeded_bounds():
verbose=0,
random_state=1,
bounds_transformer=bounds_transformer
)
)

def test_trim_both_new_bounds_beyond_global_bounds():
"""Test if the global bounds are respected when both new bounds for a given parameter
are beyond the global bounds."""

# initialize a bounds transformer
bounds_transformer = SequentialDomainReductionTransformer(minimum_window=10)
pbounds = {'x': (-10, 10),'y': (-10, 10)}
target_sp = TargetSpace(target_func=black_box_function, pbounds=pbounds)
bounds_transformer.initialize(target_sp)
global_bounds = np.asarray(list(pbounds.values()))

def verify_bounds_in_range(new_bounds, global_bounds):
"""Check if the new bounds are within the global bounds."""
test = True
for i, pbounds in enumerate(new_bounds):
if (pbounds[0] < global_bounds[i, 0] or pbounds[0] > global_bounds[i, 1]):
test = False
if (pbounds[1] > global_bounds[i, 1] or pbounds[1] < global_bounds[i, 0]):
test = False
return test

# test if the sorting of the bounds is correct
new_bounds = np.array( [[5, -5], [-10, 10]] )
trimmed_bounds = bounds_transformer._trim(new_bounds, global_bounds)
assert (trimmed_bounds == np.array( [[-5, 5], [-10, 10]] )).all()

# test if both bounds for one parameter are beyond the global bounds
new_bounds = np.array( [[-50, -20], [-10, 10]] )
trimmed_bounds = bounds_transformer._trim(new_bounds, global_bounds)
assert verify_bounds_in_range(trimmed_bounds, global_bounds)

# test if both bounds for one parameter are beyond the global bounds
# while they are out of order
new_bounds = np.array( [[-20, -50], [-10, 10]] )
trimmed_bounds = bounds_transformer._trim(new_bounds, global_bounds)
assert verify_bounds_in_range(trimmed_bounds, global_bounds)