
Add advanced Optuna plugin authoring methods #3050

Closed
wants to merge 35 commits into from

Conversation

granthamtaylor
Contributor

@granthamtaylor granthamtaylor commented Jan 11, 2025

Fully Parallelized Wrapper Around Optuna Using Flyte

Overview

This documentation provides a guide to a fully parallelized Flyte plugin for Optuna. This wrapper leverages Flyte's scalable and distributed workflow orchestration capabilities to parallelize Optuna's hyperparameter optimization across multiple trials efficiently.

Features

  • Ease of Use: This plugin requires no external data storage or experiment tracking.
  • Parallelized Trial Execution: Enables concurrent execution of Optuna trials, dramatically speeding up optimization tasks.
  • Scalability: Leverages Flyte’s ability to scale horizontally to handle large-scale hyperparameter tuning jobs.
  • Flexible Integration: Compatible with various machine learning frameworks and training pipelines.
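
The parallelized execution can be pictured as a bounded worker pool: at most a fixed number of trials (the concurrency) are in flight at once until all n_trials have completed. Below is a minimal stdlib-only sketch of that pattern; it is an illustration of the concept, not the plugin's actual internals, and `one_trial` is a hypothetical stand-in for a remote Flyte task.

```python
import asyncio
import random

async def run_trials(n_trials: int, concurrency: int) -> list:
    # Bound the number of in-flight trials, as the Optimizer does conceptually.
    semaphore = asyncio.Semaphore(concurrency)

    async def one_trial(i: int) -> float:
        async with semaphore:
            await asyncio.sleep(0)   # stand-in for awaiting a remote Flyte task
            return random.random()   # stand-in for the objective value

    return await asyncio.gather(*(one_trial(i) for i in range(n_trials)))

results = asyncio.run(run_trials(n_trials=10, concurrency=2))
print(len(results))  # → 10
```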

Installation

  • Install flytekit: pip install flytekit
  • Install the plugin: pip install flytekitplugins-optuna

Getting Started

Prerequisites

  • A Flyte deployment configured and running.
  • Python 3.9 or later.
  • Familiarity with Flyte and asynchronous programming.

Define the Objective Function

The objective function defines the problem to be optimized. It should include the hyperparameters to be tuned and return a value to minimize or maximize.

import math

import flytekit as fl

image = fl.ImageSpec(packages=["flytekitplugins.optuna"])

@fl.task(container_image=image)
async def objective(x: float, y: int, z: int, power: int) -> float:
    return math.log((((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2)) ** power
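
Outside of Flyte, the task body above is ordinary async Python, which makes it easy to sanity-check. The sketch below re-implements it without the `@fl.task` decorator (so it runs anywhere, with no deployment) and evaluates it at one point; `objective_fn` is a hypothetical name used only for this illustration.

```python
import asyncio
import math

# Plain-Python equivalent of the task body above; the @fl.task decorator is
# omitted so the sketch runs without a Flyte deployment.
async def objective_fn(x: float, y: int, z: int, power: int) -> float:
    return math.log(((x - 5) ** 2) + (y + 4) ** 4 + ((3 * z - 3) ** 2)) ** power

# At x=6, y=-3, z=2 the inner sum is 1 + 1 + 9 = 11, so the result is log(11) ** 2.
value = asyncio.run(objective_fn(x=6.0, y=-3, z=2, power=2))
print(round(value, 3))  # → 5.75
```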

Configure the Flyte Workflow

The Flyte workflow orchestrates the parallel execution of Optuna trials. Below is an example:

import flytekit as fl
from flytekitplugins.optuna import Optimizer, suggest

@fl.eager(container_image=image)
async def train(concurrency: int, n_trials: int) -> float:

    optimizer = Optimizer(objective=objective, concurrency=concurrency, n_trials=n_trials)

    await optimizer(
        x = suggest.float(low=-10, high=10),
        y = suggest.integer(low=-10, high=10),
        z = suggest.category([-5, 0, 3, 6, 9]),
        power = 2,
    )

    print(optimizer.study.best_value)

Register and Execute the Workflow

Submit the workflow to Flyte for execution:

pyflyte register files .
pyflyte run --name train

Monitor Progress

You can monitor the progress of the trials via the Flyte Console. Each trial runs as a separate task, and the results are aggregated by the Optuna wrapper.

You may access the optuna.Study like so: optimizer.study.

Therefore, with plotly installed, you may create Flyte Decks of the study like so:

import optuna
import plotly

fig = optuna.visualization.plot_timeline(optimizer.study)
fl.Deck("timeline", plotly.io.to_html(fig))

Advanced Configuration

Custom Dictionary Inputs

Suggestions may also be defined inside nested dictionaries:

import flytekit as fl
import optuna
from flytekitplugins.optuna import Optimizer, suggest

image = fl.ImageSpec(packages=["flytekitplugins.optuna"])


@fl.task(container_image=image)
async def objective(params: dict[str, int | float | str]) -> float:
    ...


@fl.eager(container_image=image)
async def train(concurrency: int, n_trials: int):

    study = optuna.create_study(direction="maximize")

    optimizer = Optimizer(objective=objective, concurrency=concurrency, n_trials=n_trials, study=study)

    params = {
        "lambda": suggest.float(1e-8, 1.0, log=True),
        "alpha": suggest.float(1e-8, 1.0, log=True),
        "subsample": suggest.float(0.2, 1.0),
        "colsample_bytree": suggest.float(0.2, 1.0),
        "max_depth": suggest.integer(3, 9, step=2),
        "objective": "binary:logistic",
        "tree_method": "exact",
        "booster": "dart",
    }

    await optimizer(params=params)
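
Conceptually, a suggestion dictionary like the one above is resolved per trial by walking it recursively: entries that are suggestion placeholders become concrete suggested values, and plain values pass through unchanged. Below is a rough stdlib-only sketch of that idea; `FloatSuggestion` and `resolve` are hypothetical names for illustration, not the plugin's actual implementation, and the lambda stands in for optuna's `trial.suggest_float`.

```python
from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical placeholder standing in for suggest.float(...).
@dataclass
class FloatSuggestion:
    low: float
    high: float

def resolve(params: dict, suggest_float: Callable[[str, float, float], float]) -> dict:
    """Recursively replace suggestion placeholders with concrete values."""
    out: dict = {}
    for key, value in params.items():
        if isinstance(value, dict):
            out[key] = resolve(value, suggest_float)  # recurse into nested dicts
        elif isinstance(value, FloatSuggestion):
            out[key] = suggest_float(key, value.low, value.high)
        else:
            out[key] = value  # fixed values pass through unchanged
    return out

# A fake suggester that always returns the midpoint of the range.
resolved = resolve(
    {"lr": FloatSuggestion(0.0, 1.0), "booster": "dart", "head": {"dropout": FloatSuggestion(0.0, 0.5)}},
    lambda name, low, high: (low + high) / 2,
)
print(resolved)  # → {'lr': 0.5, 'booster': 'dart', 'head': {'dropout': 0.25}}
```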

Custom Callbacks

In some cases, you may need to define the suggestions programmatically. This may be done with a callback function:

import flytekit as fl
import optuna
from flytekitplugins.optuna import optimize

image = fl.ImageSpec(packages=["flytekitplugins.optuna"])

@fl.task(container_image=image)
async def objective(params: dict[str, int | float | str]) -> float:
    ...

@fl.eager(container_image=image)
async def train(concurrency: int, n_trials: int):

    study = optuna.create_study(direction="maximize")

    @optimize(n_trials=n_trials, concurrency=concurrency, study=study)
    def optimizer(trial: optuna.Trial, verbosity: int, tree_method: str):

        params = {
            "verbosity:": verbosity,
            "tree_method": tree_method,
            "objective": "binary:logistic",
            # defines booster, gblinear for linear functions.
            "booster": trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"]),
            # sampling according to each tree.
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
        }

        if params["booster"] in ["gbtree", "dart"]:
            # maximum depth of the tree, signifies complexity of the tree.
            params["max_depth"] = trial.suggest_int("max_depth", 3, 9, step=2)

        if params["booster"] == "dart":
            params["sample_type"] = trial.suggest_categorical("sample_type", ["uniform", "weighted"])
            params["normalize_type"] = trial.suggest_categorical("normalize_type", ["tree", "forest"])

        return objective(params)

    await optimizer(verbosity=0, tree_method="exact")

Troubleshooting

Resource Constraints: Ensure sufficient compute resources are allocated for the number of parallel jobs specified.

Flyte Errors: Refer to the Flyte logs and documentation to debug workflow execution issues.

Summary by Bito

This PR enhances the Optuna plugin by implementing parameter bundling capabilities and improving validation. Key improvements include nested parameter suggestions, enhanced type handling, flexible configuration options, and support for multi-objective optimization. The implementation includes delay parameter control and supports both direct function calls and decorator syntax. The test suite has been comprehensively enhanced with simplified objective functions and new test cases.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 3

@flyte-bot
Contributor

flyte-bot commented Jan 11, 2025

Code Review Agent Run #0054cc

Actionable Suggestions - 4
  • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py - 3
    • Consider impact of removing workflow type · Line 59-59
    • Consider adding delay parameter validation · Line 63-63
    • Consider adding type check for suggesters · Line 174-175
  • plugins/flytekit-optuna/setup.py - 1
    • Consider adding upper bound for flytekit · Line 7-7
Additional Suggestions - 1
  • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py - 1
    • Consider simplifying execution mode handling · Line 138-143
Review Details
  • Files reviewed - 7 · Commit Range: 4298d66..9589dfa
    • plugins/flytekit-optuna/flytekitplugins/optuna/__init__.py
    • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
    • plugins/flytekit-optuna/setup.py
    • plugins/flytekit-optuna/tests/test_callback.py
    • plugins/flytekit-optuna/tests/test_decorator.py
    • plugins/flytekit-optuna/tests/test_imperative.py
    • plugins/flytekit-optuna/tests/test_optimizer.py
  • Files skipped - 1
    • plugins/flytekit-optuna/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito

@granthamtaylor granthamtaylor changed the title Grantham/add optuna param bundle Add advanced Optuna plugin authoring methods Jan 11, 2025
@flyte-bot
Contributor

flyte-bot commented Jan 11, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Feature Improvement - Enhanced Optuna Plugin Core Functionality

__init__.py - Added optimize function to public API

optimizer.py - Implemented advanced parameter bundling and callback support with improved type handling

Testing - Comprehensive Test Coverage

test_callback.py - Added tests for callback functionality

test_decorator.py - Added tests for decorator pattern usage

test_imperative.py - Added tests for imperative API usage

test_validation.py - Added validation tests for optimizer parameters

test_optimizer.py - Removed old test file in favor of more specific test modules

Other Improvements - Dependency Updates

setup.py - Added typing-extensions dependency


codecov bot commented Jan 11, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 92.08%. Comparing base (f634d53) to head (734c2cf).
Report is 5 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3050      +/-   ##
==========================================
+ Coverage   82.79%   92.08%   +9.29%     
==========================================
  Files           3      112     +109     
  Lines         186     4601    +4415     
==========================================
+ Hits          154     4237    +4083     
- Misses         32      364     +332     


@flyte-bot
Contributor

flyte-bot commented Jan 11, 2025

Code Review Agent Run #0bfefd

Actionable Suggestions - 5
  • plugins/flytekit-optuna/tests/test_validation.py - 1
    • Consider improving test objective function coverage · Line 8-9
  • plugins/flytekit-optuna/tests/test_decorator.py - 1
    • Consider extracting optimization logic to helper · Line 75-114
  • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py - 3
    • Mismatch between validation and error message · Line 82-83
    • Duplicate float return type check · Line 92-93
    • Consider adding parameter validation checks · Line 154-159
Review Details
  • Files reviewed - 3 · Commit Range: 9589dfa..d7ebaba
    • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
    • plugins/flytekit-optuna/tests/test_decorator.py
    • plugins/flytekit-optuna/tests/test_validation.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


Comment on lines +8 to +9
async def objective(x: float, y: int, z: int, power: int) -> float:
    return 1.0

Consider improving test objective function coverage

The objective function returns a hardcoded value 1.0 which may not provide meaningful test coverage for validation. Consider implementing test cases that validate different input combinations and their impact on the optimization process.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-async def objective(x: float, y: int, z: int, power: int) -> float:
-    return 1.0
+async def objective(x: float, y: int, z: int, power: int) -> float:
+    return x + y + z + power  # Example computation using all parameters

Code Review Run #0bfefd



Comment on lines +75 to +114
def test_unparameterized_callback():

    @fl.task
    async def objective(letter: str, number: Union[float, int], other: str, fixed: str) -> float:

        loss = len(letter) + number + len(other) + len(fixed)

        return float(loss)

    @optimize
    def optimizer(trial: optuna.Trial, fixed: str):

        letter = trial.suggest_categorical("booster", ["A", "B", "BLAH"])

        if letter == "A":
            number = trial.suggest_int("number_A", 1, 10)
        elif letter == "B":
            number = trial.suggest_float("number_B", 10., 20.)
        else:
            number = 10

        other = trial.suggest_categorical("other", ["Something", "another word", "a phrase"])

        return objective(letter, number, other, fixed)

    @fl.eager
    async def train(concurrency: int, n_trials: int) -> float:

        optimizer.n_trials = n_trials
        optimizer.concurrency = concurrency

        await optimizer(fixed="hello!")

        return float(optimizer.study.best_value)

    loss = asyncio.run(train(concurrency=2, n_trials=10))

    assert isinstance(loss, float)

Consider extracting optimization logic to helper

Consider extracting the optimization logic from test_unparameterized_callback() into a separate helper function to improve test readability and maintainability. The test contains complex optimization setup that could be reused.

Code suggestion
Check the AI-generated fix before applying
 @@ -75,40 +75,45 @@
  def test_unparameterized_callback():
 +    optimizer = create_test_optimizer()
 +    loss = asyncio.run(train(optimizer, concurrency=2, n_trials=10))
 +    assert isinstance(loss, float)
 +
 def create_test_optimizer():
      @fl.task
      async def objective(letter: str, number: Union[float, int], other: str, fixed: str) -> float:
          loss = len(letter) + number + len(other) + len(fixed)
          return float(loss)

      @optimize
      def optimizer(trial: optuna.Trial, fixed: str):
          letter = trial.suggest_categorical("booster", ["A", "B", "BLAH"])
          if letter == "A":
              number = trial.suggest_int("number_A", 1, 10)
          elif letter == "B":
              number = trial.suggest_float("number_B", 10., 20.)
          else:
              number = 10
          other = trial.suggest_categorical("other", ["Something", "another word", "a phrase"])
          return objective(letter, number, other, fixed)
      return optimizer

Code Review Run #0bfefd



Comment on lines 82 to 83
if not isinstance(self.delay, int) or (self.delay < 0):
    raise ValueError("delay must be an integer greater than 0")

Mismatch between validation and error message

The error message states 'greater than 0' but the validation allows 0 (delay < 0). Consider aligning the validation with the error message.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-if not isinstance(self.delay, int) or (self.delay < 0):
-    raise ValueError("delay must be an integer greater than 0")
+if not isinstance(self.delay, int) or (self.delay <= 0):
+    raise ValueError("delay must be an integer greater than 0")

Code Review Run #0bfefd



Comment on lines 92 to 93
elif signature.return_annotation is float:
    args = signature.return_annotation.__args__

Duplicate float return type check

The condition signature.return_annotation is float appears to be duplicated from line 88, which may lead to unreachable code. Consider removing the duplicate condition or updating it to check for tuple return type.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-elif signature.return_annotation is float:
-    args = signature.return_annotation.__args__
+elif hasattr(signature.return_annotation, '__args__'):
+    args = signature.return_annotation.__args__

Code Review Run #0bfefd



Comment on lines +154 to +159
def optimize(
    objective: Optional[Union[CallbackType, PythonFunctionTask]] = None,
    concurrency: int = 1,
    n_trials: int = 1,
    study: Optional[optuna.Study] = None,
):

Consider adding parameter validation checks

Consider adding validation for concurrency and n_trials parameters to ensure they are positive integers. These parameters control optimization behavior and should be validated early.

Code suggestion
Check the AI-generated fix before applying
Suggested change
 def optimize(
     objective: Optional[Union[CallbackType, PythonFunctionTask]] = None,
     concurrency: int = 1,
     n_trials: int = 1,
     study: Optional[optuna.Study] = None,
 ):
+    if concurrency < 1:
+        raise ValueError('concurrency must be a positive integer')
+    if n_trials < 1:
+        raise ValueError('n_trials must be a positive integer')

Code Review Run #0bfefd



@flyte-bot
Contributor

flyte-bot commented Jan 11, 2025

Code Review Agent Run #f1b6b3

Actionable Suggestions - 2
  • plugins/flytekit-optuna/tests/test_imperative.py - 2
    • Consider breaking down complex expressions · Line 42-43
    • Unused parameter in objective function calculation · Line 76-76
Review Details
  • Files reviewed - 3 · Commit Range: d7ebaba..734c2cf
    • plugins/flytekit-optuna/flytekitplugins/optuna/optimizer.py
    • plugins/flytekit-optuna/tests/test_decorator.py
    • plugins/flytekit-optuna/tests/test_imperative.py
  • Files skipped - 1
    • plugins/flytekit-optuna/README.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


Comment on lines +42 to +43
y0 = (((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2) ** power
y1 = (((x - 2) ** 4) + (y + 1) ** 2 + (4 * z - 1))

Consider breaking down complex expressions

Consider extracting the complex mathematical expressions into separate variables or helper functions to improve readability and maintainability. The expressions (((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2) ** power and (((x - 2) ** 4) + (y + 1) ** 2 + (4 * z - 1)) could be broken down into smaller parts.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-y0 = (((x - 5) ** 2) + (y + 4) ** 4 + (3 * z - 3) ** 2) ** power
-y1 = (((x - 2) ** 4) + (y + 1) ** 2 + (4 * z - 1))
+def calc_y0(x, y, z, power):
+    term1 = (x - 5) ** 2
+    term2 = (y + 4) ** 4
+    term3 = (3 * z - 3) ** 2
+    return (term1 + term2 + term3) ** power
+
+def calc_y1(x, y, z):
+    term1 = (x - 2) ** 4
+    term2 = (y + 1) ** 2
+    term3 = (4 * z - 1)
+    return term1 + term2 + term3
+
+y0 = calc_y0(x, y, z, power)
+y1 = calc_y1(x, y, z)

Code Review Run #f1b6b3




x, y = suggestions["x"], suggestions["y"]

return (((x - 5) ** 2) + (y + 4) ** 4) ** power

Unused parameter in objective function calculation

The objective function appears to have been simplified by removing the z parameter from the calculation, but it is still being passed as an argument. Consider either using the z parameter in the calculation or removing it from the function signature if it's not needed.

Code suggestion
Check the AI-generated fix before applying
 -    async def objective(suggestions: dict[str, Union[int, float]], z: int, power: int) -> float:
 +    async def objective(suggestions: dict[str, Union[int, float]], power: int) -> float:

Code Review Run #f1b6b3


