Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TrainKNN Runner/Operation for Benchmarking Approximate KNN Algorithms #556

Merged
merged 9 commits into from
Jul 18, 2024
75 changes: 73 additions & 2 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from osbenchmark.utils import convert
from osbenchmark.client import RequestContextHolder
# Mapping from operation type to specific runner
from osbenchmark.utils.parse import parse_int_parameter, parse_string_parameter
from osbenchmark.utils.parse import parse_int_parameter, parse_string_parameter, parse_float_parameter

__RUNNERS = {}

Expand Down Expand Up @@ -105,7 +105,7 @@ def register_default_runners():
register_runner(workload.OperationType.DeleteMlModel, Retry(DeleteMlModel()), async_runner=True)
register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True)
register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True)

register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True)

def runner_for(operation_type):
try:
Expand Down Expand Up @@ -652,6 +652,77 @@ def __repr__(self, *args, **kwargs):
return "bulk-index"


class TrainKnnModel(Runner):
    """
    Trains model named model_id until training is complete or retries are exhausted.
    """

    NAME = "train-knn-model"

    async def __call__(self, opensearch, params):
        """
        Create and train one model named model_id.

        :param opensearch: The OpenSearch client.
        :param params: A hash with all parameters. See below for details.
        :return: None. Success is signalled by returning without raising.
        :raises TimeoutError: if the model is still training after ``retries`` polls.
        :raises Exception: if training fails or the status response is malformed.

        It expects a parameter dict with the following mandatory keys:

        * ``body``: containing parameters to pass on to the train engine.
           See https://opensearch.org/docs/latest/search-plugins/knn/api/#train-a-model for information.
        * ``retries``: Maximum number of retries allowed for the training to complete.
        * ``poll_period``: Polling interval to see if the model has been trained yet (seconds).
        * ``model_id``: ID of the model to train.
        """
        body = params["body"]
        model_id = parse_string_parameter("model_id", params)
        max_retries = parse_int_parameter("retries", params)
        poll_period = parse_float_parameter("poll_period", params)

        method = "POST"
        model_uri = f"/_plugins/_knn/models/{model_id}"
        # One request-context span covers the kickoff POST plus all status polls:
        # started here, ended exactly once on every terminal path below.
        request_context_holder.on_client_request_start()
        await opensearch.transport.perform_request(method, f"{model_uri}/_train", body=body)

        # NOTE(review): each GET poll below reuses the span opened above rather than
        # calling on_client_request_start() per request — confirm this matches the
        # intended client-timing semantics.
        current_number_retries = 0
        while True:
            model_response = await opensearch.transport.perform_request("GET", model_uri)

            # A response without 'state' means model creation itself failed
            # (e.g. an error body was returned instead of a status document).
            if 'state' not in model_response.keys():
                request_context_holder.on_client_request_end()
                self.logger.error(
                    "Failed to create model [%s] with error response: [%s]", model_id, model_response)
                raise Exception(
                    f"Failed to create model {model_id} with error response: {model_response}")

            if current_number_retries > max_retries:
                request_context_holder.on_client_request_end()
                self.logger.error(
                    "Failed to create model [%s] within [%i] retries.", model_id, max_retries)
                raise TimeoutError(
                    f'Failed to create model: {model_id} within {max_retries} retries')

            if model_response['state'] == 'training':
                current_number_retries += 1
                await asyncio.sleep(poll_period)
                continue

            # At this point, training either failed or finished.
            request_context_holder.on_client_request_end()
            if model_response['state'] == 'created':
                self.logger.info(
                    "Training model [%s] was completed successfully.", model_id)
                return

            # Training failed, or the plugin reported a state this runner does not
            # recognize — treat both as failure so new k-NN states are surfaced.
            self.logger.error(
                "Training for model [%s] failed. Response: [%s]", model_id, model_response)
            raise Exception(f"Failed to create model: {model_response}")

    def __repr__(self, *args, **kwargs):
        return self.NAME

# TODO: Add retry logic to BulkIndex, so that we can remove BulkVectorDataSet and use BulkIndex.
class BulkVectorDataSet(Runner):
"""
Expand Down
3 changes: 3 additions & 0 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ class OperationType(Enum):
ListAllPointInTime = 16
VectorSearch = 17
BulkVectorDataSet = 18
TrainKnnModel = 19

# administrative actions
ForceMerge = 1001
Expand Down Expand Up @@ -746,6 +747,8 @@ def from_hyphenated_string(cls, v):
return OperationType.RegisterMlModel
elif v == "deploy-ml-model":
return OperationType.DeployMlModel
elif v == "train-knn-model":
return OperationType.TrainKnnModel
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
177 changes: 176 additions & 1 deletion tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import opensearchpy
import pytest

from osbenchmark import client, exceptions
from osbenchmark.worker_coordinator import runner
from tests import run_async, as_future
Expand Down Expand Up @@ -2259,6 +2258,182 @@ async def test_search_pipeline_using_request_params(self, opensearch, on_client_
opensearch.clear_scroll.assert_not_called()


class TrainKnnModelRunnerTests(TestCase):
    """Unit tests for runner.TrainKnnModel: success, failed state, error response, and timeout."""

    # Shared fixture values; also embedded in ``request`` below.
    model_id = "test-model-id"
    retries = 120
    poll_period = 0.5  # seconds

    # Parameter dict handed to the runner under test; mirrors the keys the
    # runner parses (model_id, poll_period, retries, body).
    request = {
        "index": "unittest",
        "operation-type": "train-knn-model",
        "model_id": model_id,
        "poll_period": poll_period,
        "retries": retries,
        "body": {
            "training_index": "test_train_index_name",
            "training_field": "test_train_index_name",
            "search_size": 500,
            "dimension": 10,
            "max_training_vector_count": 100,

            "method": {
                "name": "ivf",
                "engine": "faiss",
                "space_type": "l2",
                "parameters": {
                    "nlist": 10,
                    "nprobes": 5
                }
            }
        }
    }
    # Canned response for the initial POST .../_train call; every test queues
    # this first in perform_request.side_effect.
    train_status_check_response = {

        'weight': 1, 'unit': 'ops', 'success': True
    }

    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
    @mock.patch("asyncio.sleep", return_value=as_future())
    @mock.patch("opensearchpy.OpenSearch")
    @run_async
    async def test_train_success(self, opensearch, sleep, on_client_request_start, on_client_request_end):
        # Status poll reports state 'created': the runner must complete without raising.

        train_api_status_response = {
            "model_id": "1",
            "model_blob": "",
            "state": "created",
            "timestamp": "2024-06-17T23:03:02.475277Z",
            "description": "My model description",
            "space_type": "l2",
            "dimension": 10,
            "engine": "faiss",
            "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w",
            "model_definition": {
                "name": "ivf",
                "parameters": {
                    "nprobes": 5,
                    "nlist": 10
                }
            }
        }

        # First call: POST _train kickoff; second call: GET model status.
        train_api_first_mock = as_future(self.train_status_check_response)
        train_api_status_mock = as_future(train_api_status_response)

        opensearch.transport.perform_request.side_effect = [
            train_api_first_mock, train_api_status_mock]

        runner_under_test = runner.TrainKnnModel()
        async with runner_under_test:
            await runner_under_test(opensearch, self.request)

    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
    @mock.patch("asyncio.sleep", return_value=as_future())
    @mock.patch("opensearchpy.OpenSearch")
    @run_async
    async def test_train_failure(self, opensearch, sleep, on_client_request_start, on_client_request_end):
        # Status poll reports state 'failed': the runner must raise with the response body.
        train_api_status_response = {
            "model_id": self.model_id,
            "model_blob": "",
            "state": "failed",
            "timestamp": "2024-06-17T23:03:02.475277Z",
            "description": "My model description",
            "space_type": "l2",
            "dimension": 10,
            "engine": "faiss",
            "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w",
            "model_definition": {
                "name": "ivf",
                "parameters": {
                    "nprobes": 5,
                    "nlist": 10
                }
            }
        }

        train_api_first_mock = as_future(self.train_status_check_response)
        train_api_status_mock = as_future(train_api_status_response)

        opensearch.transport.perform_request.side_effect = [
            train_api_first_mock, train_api_status_mock]
        runner_under_test = runner.TrainKnnModel()

        with self.assertRaisesRegex(Exception, f"Failed to create model: {train_api_status_response}"):
            await runner_under_test(opensearch, self.request)

    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
    @mock.patch("asyncio.sleep", return_value=as_future())
    @mock.patch("opensearchpy.OpenSearch")
    @run_async
    async def test_train_error_response(self, opensearch, sleep, on_client_request_start, on_client_request_end):
        # Status poll returns an error document with no 'state' key: the runner must raise.
        error_response = {
            "error": {
                "root_cause":
                {
                    "type": "index_not_found_exception",
                    "reason": "no such index [.opensearch-knn-models]",
                    "index": ".opensearch-knn-models",
                    "resource.id": ".opensearch-knn-models",
                    "resource.type": "index_expression",
                    "index_uuid": "_na_"
                },
                "type": "index_not_found_exception",
                "reason": "no such index [.opensearch-knn-models]",
                "index": ".opensearch-knn-models",
                "resource.id": ".opensearch-knn-models",
                "resource.type": "index_expression",
                "index_uuid": "_na_"
            },
            "status": 404
        }

        side_effect_list = [
            as_future(self.train_status_check_response), as_future(error_response)]
        opensearch.transport.perform_request.side_effect = side_effect_list
        runner_under_test = runner.TrainKnnModel()

        with self.assertRaises(Exception):
            await runner_under_test(opensearch, self.request)

    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
    @mock.patch("asyncio.sleep", return_value=as_future())
    @mock.patch("opensearchpy.OpenSearch")
    @run_async
    async def test_train_timeout(self, opensearch, sleep, on_client_request_start, on_client_request_end):
        # Every poll reports 'training'; after ``retries`` polls the runner must time out.

        still_training_response = ({
            "model_id": self.model_id,
            "model_blob": "",
            "state": "training",
            "timestamp": "2024-06-17T23:03:02.475277Z",
            "description": "My model description",
            "space_type": "l2",
            "dimension": 10,
            "engine": "faiss",
            "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w",
            "model_definition": {
                "name": "ivf",
                "parameters": {
                    "nprobes": 5,
                    "nlist": 10
                }
            }
        })

        # Queue one kickoff response plus retries + 2 'training' polls so the
        # side-effect list is never exhausted before the timeout fires.
        side_effect_list = [as_future(self.train_status_check_response)] + [
            as_future(still_training_response) for _ in range(self.retries + 2)]
        opensearch.transport.perform_request.side_effect = side_effect_list
        runner_under_test = runner.TrainKnnModel()

        # Set model state = Training.
        with self.assertRaisesRegex(TimeoutError, f'Failed to create model: {self.model_id} within {self.retries} retries'):
            await runner_under_test(opensearch, self.request)


class VectorSearchQueryRunnerTests(TestCase):
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
Expand Down
Loading