From 3d138cb6acdb2ece525d7b1499030a1dfc3a5ef9 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Wed, 19 Jun 2024 15:02:43 -0700 Subject: [PATCH 1/9] added TrainKNN runner + tests Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 70 +++++- osbenchmark/workload/workload.py | 3 + tests/worker_coordinator/runner_test.py | 265 ++++++++++++++++++++++- 3 files changed, 335 insertions(+), 3 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 1941c90ac..ccd6d43d4 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -47,7 +47,7 @@ from osbenchmark.utils import convert from osbenchmark.client import RequestContextHolder # Mapping from operation type to specific runner -from osbenchmark.utils.parse import parse_int_parameter, parse_string_parameter +from osbenchmark.utils.parse import parse_int_parameter, parse_string_parameter, parse_float_parameter __RUNNERS = {} @@ -105,7 +105,7 @@ def register_default_runners(): register_runner(workload.OperationType.DeleteMlModel, Retry(DeleteMlModel()), async_runner=True) register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True) register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) - + register_runner(workload.OperationType.TrainKNNModel, Retry(TrainKNNModel()), async_runner=True) def runner_for(operation_type): try: @@ -651,6 +651,72 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" +class TrainKNNModel(Runner): + """ + Trains model named model_id until training is complete or timeout is exceeded. + """ + + NAME = "train-knn-model" + + async def __call__(self, opensearch, params): + """ + Create and train one model named model_id. + + :param opensearch: The OpenSearch client. + :param params: A hash with all parameters. See below for details. + :return: A hash with meta data for this bulk operation. See below for details. + :raises: Exception if training fails, times out, or a different error occurs. + It expects a parameter dict with the following mandatory keys: + + * ``body``: containing parameters to pass on to the train engine. + See https://opensearch.org/docs/latest/search-plugins/knn/api/#train-a-model for information. + * ``retries``: Maximum number of retries allowed for the training to complete (seconds). + * ``polling-interval``: Polling interval to see if the model has been trained yet (seconds). + * ``model_id``: ID of the model to train. + """ + body = params["body"] + model_id = parse_string_parameter("model_id", params) + max_retries = parse_int_parameter("retries", params) + poll_period = parse_float_parameter("poll_period", params) + + method = "POST" + model_uri = "/_plugins/_knn/models/{}".format(model_id) + + await opensearch.transport.perform_request(method, "{}/_train".format(model_uri), body=body) + + current_number_retries = 0 + while True: + model_response = await opensearch.transport.perform_request("GET", model_uri) + + if 'state' not in model_response.keys() or current_number_retries > max_retries: + request_context_holder.on_client_request_end() + self.logger.error(f"Failed to create model {model_id} within {max_retries} retries.") + raise Exception('Failed to create model: {} within {} retries' + .format(model_id, max_retries)) + + if model_response['state'] == 'training': + current_number_retries += 1 + await asyncio.sleep(poll_period) + continue + + request_context_holder.on_client_request_end() # at this point, training either failed or finished. + if model_response['state'] == 'created': + self.logger.info(f"Training model {model_id} was completed successfully.") + break + else: + # training failed. + self.logger.error(f"Training for model {model_id} failed. Response: {model_response}") + raise Exception("Failed to create model: {}".format(model_response)) + + def inspect_model_response(model_response): + if model_response['state'] == 'created': + return 1, "models_trained" + + if model_response['state'] == 'failed': + raise Exception("Failed to create model: {}".format(model_response)) + + def __repr__(self, *args, **kwargs): + return self.NAME # TODO: Add retry logic to BulkIndex, so that we can remove BulkVectorDataSet and use BulkIndex. class BulkVectorDataSet(Runner): diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 3e401d1e6..9aaedcc36 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -601,6 +601,7 @@ class OperationType(Enum): ListAllPointInTime = 16 VectorSearch = 17 BulkVectorDataSet = 18 + TrainKNNModel = 19 # administrative actions ForceMerge = 1001 @@ -746,6 +747,8 @@ def from_hyphenated_string(cls, v): return OperationType.RegisterMlModel elif v == "deploy-ml-model": return OperationType.DeployMlModel + elif v == "train-k-n-n-model": + return OperationType.TrainKNNModel else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 6969937c1..5647809cf 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -31,7 +31,6 @@ import opensearchpy import pytest - from osbenchmark import client, exceptions from osbenchmark.worker_coordinator import runner from tests import run_async, as_future @@ -2259,6 +2258,270 @@ async def test_search_pipeline_using_request_params(self, opensearch, on_client_ opensearch.clear_scroll.assert_not_called() +class TrainKNNModelRunnerTests(TestCase): + model_id = "test-model-id" + retries = 120 + poll_period = 0.5 # seconds + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("asyncio.sleep", return_value=as_future()) + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_train_success(self, opensearch, sleep, on_client_request_start, on_client_request_end): + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": self.model_id, + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + + train_api_status_response = { + "model_id": "1", + "model_blob": "", + "state": "created", + "timestamp": "2024-06-17T23:03:02.475277Z", + "description": "My model description", + "space_type": "l2", + "dimension": 10, + "engine": "faiss", + "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w", + "model_definition": { + "name": "ivf", + "parameters": { + "nprobes": 5, + "nlist": 10 + } + } + } + + train_status_check_response = { + + 'weight': 1, 'unit': 'ops', 'success': True + } + + train_api_first_mock = as_future(train_status_check_response) + train_api_status_mock =as_future(train_api_status_response) + + opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] + + runner_under_test = runner.TrainKNNModel() + async with runner_under_test: + result = await runner_under_test(opensearch, request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("asyncio.sleep", return_value=as_future()) + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_train_failure(self, opensearch, sleep, on_client_request_start, on_client_request_end): + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id":self.model_id, + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + + train_api_status_response = { + "model_id": self.model_id, + "model_blob": "", + "state": "failed", + "timestamp": "2024-06-17T23:03:02.475277Z", + "description": "My model description", + "space_type": "l2", + "dimension": 10, + "engine": "faiss", + "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w", + "model_definition": { + "name": "ivf", + "parameters": { + "nprobes": 5, + "nlist": 10 + } + } + } + + train_status_check_response = { + + 'weight': 1, 'unit': 'ops', 'success': True + } + + train_api_first_mock = as_future(train_status_check_response) + train_api_status_mock =as_future(train_api_status_response) + + opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] + + + runner_under_test = runner.TrainKNNModel() + + with self.assertRaisesRegex(Exception, f"Failed to create model: {train_api_status_response}"): + await runner_under_test(opensearch, request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("asyncio.sleep", return_value=as_future()) + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_train_error_response(self,opensearch, sleep, on_client_request_start, on_client_request_end): + error_response = { + "error": { + "root_cause": + { + "type": "index_not_found_exception", + "reason": "no such index [.opensearch-knn-models]", + "index": ".opensearch-knn-models", + "resource.id": ".opensearch-knn-models", + "resource.type": "index_expression", + "index_uuid": "_na_" + } + , + "type": "index_not_found_exception", + "reason": "no such index [.opensearch-knn-models]", + "index": ".opensearch-knn-models", + "resource.id": ".opensearch-knn-models", + "resource.type": "index_expression", + "index_uuid": "_na_" + }, + "status": 404 + } + + + + train_status_check_response = ({ + + 'weight': 1, 'unit': 'ops', 'success': True + }) + side_effect_list = [as_future(train_status_check_response), as_future(error_response)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKNNModel() + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": self.model_id, + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + + with self.assertRaises(Exception): + await runner_under_test(opensearch, request) + + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("asyncio.sleep", return_value=as_future()) + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_train_timeout(self,opensearch, sleep, on_client_request_start, on_client_request_end): + + + still_training_response = ({ + "model_id": self.model_id, + "model_blob": "", + "state": "training", + "timestamp": "2024-06-17T23:03:02.475277Z", + "description": "My model description", + "space_type": "l2", + "dimension": 10, + "engine": "faiss", + "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w", + "model_definition": { + "name": "ivf", + "parameters": { + "nprobes": 5, + "nlist": 10 + } + } + }) + + train_status_check_response = ({ + + 'weight': 1, 'unit': 'ops', 'success': True + }) + side_effect_list = [as_future(train_status_check_response)] + [as_future(still_training_response) for _ in range(self.retries + 2)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKNNModel() + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": self.model_id, + "poll_period": self.poll_period, + "retries": self.retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + # Set model state = Training. + with self.assertRaisesRegex(Exception, f'Failed to create model: {self.model_id} within {self.retries} retries'): + await runner_under_test(opensearch, request) + + class VectorSearchQueryRunnerTests(TestCase): @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') From 13da7b8c177e8313342a8806e58467413a20d885 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Wed, 19 Jun 2024 17:28:12 -0700 Subject: [PATCH 2/9] Addressed Vijay's feedback, cleaned up tests, added TimeoutError Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 55 +++--- osbenchmark/workload/workload.py | 6 +- tests/worker_coordinator/runner_test.py | 222 +++++++---------------- 3 files changed, 100 insertions(+), 183 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index ccd6d43d4..e7695bf1c 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -105,7 +105,7 @@ def register_default_runners(): register_runner(workload.OperationType.DeleteMlModel, Retry(DeleteMlModel()), async_runner=True) register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True) register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) - register_runner(workload.OperationType.TrainKNNModel, Retry(TrainKNNModel()), async_runner=True) + register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True) def runner_for(operation_type): try: @@ -651,13 +651,14 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" -class TrainKNNModel(Runner): + +class TrainKnnModel(Runner): """ - Trains model named model_id until training is complete or timeout is exceeded. + Trains model named model_id until training is complete or retries are exhausted. """ NAME = "train-knn-model" - + async def __call__(self, opensearch, params): """ Create and train one model named model_id. @@ -680,41 +681,45 @@ async def __call__(self, opensearch, params): poll_period = parse_float_parameter("poll_period", params) method = "POST" - model_uri = "/_plugins/_knn/models/{}".format(model_id) + model_uri = f"/_plugins/_knn/models/{model_id}" + request_context_holder.on_client_request_start() + await opensearch.transport.perform_request(method, f"{model_uri}/_train", body=body) - await opensearch.transport.perform_request(method, "{}/_train".format(model_uri), body=body) - current_number_retries = 0 - while True: + while True: model_response = await opensearch.transport.perform_request("GET", model_uri) - if 'state' not in model_response.keys() or current_number_retries > max_retries: + if 'state' not in model_response.keys(): request_context_holder.on_client_request_end() - self.logger.error(f"Failed to create model {model_id} within {max_retries} retries.") - raise Exception('Failed to create model: {} within {} retries' - .format(model_id, max_retries)) - + self.logger.error( + "Failed to create model [%s] with error response: [%s]", model_id, model_response) + raise Exception( + f"Failed to create model {model_id} with error response: {model_response}") + + if current_number_retries > max_retries: + request_context_holder.on_client_request_end() + self.logger.error( + "Failed to create model [%s] within [%i] retries.", model_id, max_retries) + raise TimeoutError( + f'Failed to create model: {model_id} within {max_retries} retries') + if model_response['state'] == 'training': current_number_retries += 1 await asyncio.sleep(poll_period) continue - request_context_holder.on_client_request_end() # at this point, training either failed or finished. + # at this point, training either failed or finished. + request_context_holder.on_client_request_end() if model_response['state'] == 'created': - self.logger.info(f"Training model {model_id} was completed successfully.") + self.logger.info( + "Training model [%s] was completed successfully.", model_id) break - else: - # training failed. - self.logger.error(f"Training for model {model_id} failed. Response: {model_response}") - raise Exception("Failed to create model: {}".format(model_response)) - def inspect_model_response(model_response): - if model_response['state'] == 'created': - return 1, "models_trained" + # training failed. + self.logger.error( + "Training for model [%s] failed. Response: [%s]", model_id, model_response) + raise Exception(f"Failed to create model: {model_response}") - if model_response['state'] == 'failed': - raise Exception("Failed to create model: {}".format(model_response)) - def __repr__(self, *args, **kwargs): return self.NAME diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 9aaedcc36..5fcee1067 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -601,7 +601,7 @@ class OperationType(Enum): ListAllPointInTime = 16 VectorSearch = 17 BulkVectorDataSet = 18 - TrainKNNModel = 19 + TrainKnnModel = 19 # administrative actions ForceMerge = 1001 @@ -747,8 +747,8 @@ def from_hyphenated_string(cls, v): return OperationType.RegisterMlModel elif v == "deploy-ml-model": return OperationType.DeployMlModel - elif v == "train-k-n-n-model": - return OperationType.TrainKNNModel + elif v == "train-knn-model": + return OperationType.TrainKnnModel else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 5647809cf..3ffe19f1d 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2258,10 +2258,39 @@ async def test_search_pipeline_using_request_params(self, opensearch, on_client_ opensearch.clear_scroll.assert_not_called() -class TrainKNNModelRunnerTests(TestCase): +class TrainKnnModelRunnerTests(TestCase): model_id = "test-model-id" retries = 120 - poll_period = 0.5 # seconds + poll_period = 0.5 # seconds + + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": model_id, + "poll_period": poll_period, + "retries": retries, + "body": { + "training_index": "test_train_index_name", + "training_field": "test_train_index_name", + "search_size": 500, + "dimension": 10, + "max_training_vector_count": 100, + + "method": { + "name": "ivf", + "engine": "faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } + } + } + train_status_check_response = { + + 'weight': 1, 'unit': 'ops', 'success': True + } @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @@ -2269,30 +2298,6 @@ class TrainKNNModelRunnerTests(TestCase): @mock.patch("opensearchpy.OpenSearch") @run_async async def test_train_success(self, opensearch, sleep, on_client_request_start, on_client_request_end): - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } train_api_status_response = { "model_id": "1", @@ -2313,51 +2318,22 @@ async def test_train_success(self, opensearch, sleep, on_client_request_start, o } } - train_status_check_response = { - - 'weight': 1, 'unit': 'ops', 'success': True - } + train_api_first_mock = as_future(self.train_status_check_response) + train_api_status_mock = as_future(train_api_status_response) - train_api_first_mock = as_future(train_status_check_response) - train_api_status_mock =as_future(train_api_status_response) - - opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] + opensearch.transport.perform_request.side_effect = [ + train_api_first_mock, train_api_status_mock] - runner_under_test = runner.TrainKNNModel() + runner_under_test = runner.TrainKnnModel() async with runner_under_test: - result = await runner_under_test(opensearch, request) - + await runner_under_test(opensearch, self.request) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async async def test_train_failure(self, opensearch, sleep, on_client_request_start, on_client_request_end): - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id":self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } - train_api_status_response = { "model_id": self.model_id, "model_blob": "", @@ -2377,95 +2353,57 @@ async def test_train_failure(self, opensearch, sleep, on_client_request_start, o } } - train_status_check_response = { - - 'weight': 1, 'unit': 'ops', 'success': True - } + train_api_first_mock = as_future(self.train_status_check_response) + train_api_status_mock = as_future(train_api_status_response) - train_api_first_mock = as_future(train_status_check_response) - train_api_status_mock =as_future(train_api_status_response) - - opensearch.transport.perform_request.side_effect = [ train_api_first_mock, train_api_status_mock ] + opensearch.transport.perform_request.side_effect = [ + train_api_first_mock, train_api_status_mock] + runner_under_test = runner.TrainKnnModel() - - runner_under_test = runner.TrainKNNModel() - with self.assertRaisesRegex(Exception, f"Failed to create model: {train_api_status_response}"): - await runner_under_test(opensearch, request) + await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_train_error_response(self,opensearch, sleep, on_client_request_start, on_client_request_end): + async def test_train_error_response(self, opensearch, sleep, on_client_request_start, on_client_request_end): error_response = { "error": { - "root_cause": - { - "type": "index_not_found_exception", + "root_cause": + { + "type": "index_not_found_exception", "reason": "no such index [.opensearch-knn-models]", "index": ".opensearch-knn-models", "resource.id": ".opensearch-knn-models", "resource.type": "index_expression", "index_uuid": "_na_" - } - , + }, "type": "index_not_found_exception", "reason": "no such index [.opensearch-knn-models]", "index": ".opensearch-knn-models", "resource.id": ".opensearch-knn-models", "resource.type": "index_expression", "index_uuid": "_na_" - }, - "status": 404 + }, + "status": 404 } + side_effect_list = [ + as_future(self.train_status_check_response), as_future(error_response)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKnnModel() - - train_status_check_response = ({ - - 'weight': 1, 'unit': 'ops', 'success': True - }) - side_effect_list = [as_future(train_status_check_response), as_future(error_response)] - opensearch.transport.perform_request.side_effect = side_effect_list - runner_under_test = runner.TrainKNNModel() - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } - with self.assertRaises(Exception): - await runner_under_test(opensearch, request) - + await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("asyncio.sleep", return_value=as_future()) @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_train_timeout(self,opensearch, sleep, on_client_request_start, on_client_request_end): - + async def test_train_timeout(self, opensearch, sleep, on_client_request_start, on_client_request_end): still_training_response = ({ "model_id": self.model_id, @@ -2486,40 +2424,14 @@ async def test_train_timeout(self,opensearch, sleep, on_client_request_start, on } }) - train_status_check_response = ({ - - 'weight': 1, 'unit': 'ops', 'success': True - }) - side_effect_list = [as_future(train_status_check_response)] + [as_future(still_training_response) for _ in range(self.retries + 2)] - opensearch.transport.perform_request.side_effect = side_effect_list - runner_under_test = runner.TrainKNNModel() - request = { - "index": "unittest", - "operation-type": "train-knn-model", - "model_id": self.model_id, - "poll_period": self.poll_period, - "retries": self.retries, - "body": { - "training_index": "test_train_index_name", - "training_field": "test_train_index_name", - "search_size": 500, - "dimension": 10, - "max_training_vector_count": 100, - - "method": { - "name":"ivf", - "engine":"faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } - } - } - # Set model state = Training. - with self.assertRaisesRegex(Exception, f'Failed to create model: {self.model_id} within {self.retries} retries'): - await runner_under_test(opensearch, request) + side_effect_list = [as_future(self.train_status_check_response)] + [ + as_future(still_training_response) for _ in range(self.retries + 2)] + opensearch.transport.perform_request.side_effect = side_effect_list + runner_under_test = runner.TrainKnnModel() + + # Set model state = Training. + with self.assertRaisesRegex(TimeoutError, f'Failed to create model: {self.model_id} within {self.retries} retries'): + await runner_under_test(opensearch, self.request) class VectorSearchQueryRunnerTests(TestCase): From 9b206c0923d98644c69f99528d3f53f59684e6bb Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Fri, 21 Jun 2024 14:55:21 -0700 Subject: [PATCH 3/9] Added delete-knn-model runner Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 31 ++++++++++ osbenchmark/worker_coordinator/test.py | 42 ++++++++++++++ osbenchmark/workload/workload.py | 3 + tests/worker_coordinator/runner_test.py | 72 ++++++++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 osbenchmark/worker_coordinator/test.py diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index e7695bf1c..7ec943429 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -106,6 +106,7 @@ def register_default_runners(): register_runner(workload.OperationType.RegisterMlModel, Retry(RegisterMlModel()), async_runner=True) register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True) + register_runner(workload.OperationType.DeleteKnnModel, Retry(DeleteKnnModel()), async_runner=True) def runner_for(operation_type): try: @@ -651,7 +652,37 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" +class DeleteKnnModel(Runner): + """ + Deletes the K-NN model named model_id. + """ + NAME = "delete-knn-model" + + async def __call__(self, opensearch, params): + model_id = parse_string_parameter("model_id", params) + + method = "DELETE" + model_uri = f"/_plugins/_knn/models/{model_id}" + + request_context_holder.on_client_request_start() + response = await opensearch.transport.perform_request(method, model_uri) + + request_context_holder.on_client_request_end() + + if "error" in response.keys() and response["status"] == 404: + self.logger.debug("Model [%s] does not already exist, skipping delete.", model_id) + return + + if "error" in response.keys(): + self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) + raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") + + self.logger.debug("Model [%s] deleted successfully.", model_id) + + def __repr__(self, *args, **kwargs): + return self.NAME + class TrainKnnModel(Runner): """ Trains model named model_id until training is complete or retries are exhausted. diff --git a/osbenchmark/worker_coordinator/test.py b/osbenchmark/worker_coordinator/test.py new file mode 100644 index 000000000..215287699 --- /dev/null +++ b/osbenchmark/worker_coordinator/test.py @@ -0,0 +1,42 @@ +import requests +import json + +# Define the endpoint and payload for the first POST request +url_1 = "http://localhost:9200/_plugins/_knn/models/3/_train" +payload_1 = { + "training_index": "train-index", + "training_field": "train_field_name", + "dimension": 10, + "description": "My model description", + "method": { + "name": "ivf", + "engine": "faiss", + "space_type": "l2", + "parameters": { + "nlist": 10, + "nprobes": 5 + } + } +} + +headers_1 = { + "Content-Type": "application/json" +} + +# Send the first POST request +url_2 = "http://localhost:9200/_plugins/_knn/models/3" + +response_1 = requests.post(url_1, headers=headers_1, data=json.dumps(payload_1)) +# print("First response status:", response_1.status_code) +# print(response_1.text) + +# # Define the endpoint for the second GET request + +# # Send the second GET request +response_2 = requests.get(url_2) +print("Second response status:", response_2.status_code) + +# Print the response from the second request +print("Second response content:") +print(response_2.text) +print(response_2) \ No newline at end of file diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 5fcee1067..95545fb59 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -602,6 +602,7 @@ class OperationType(Enum): VectorSearch = 17 BulkVectorDataSet = 18 TrainKnnModel = 19 + DeleteKnnModel = 20 # administrative actions ForceMerge = 1001 @@ -749,6 +750,8 @@ def from_hyphenated_string(cls, v): return OperationType.DeployMlModel elif v == "train-knn-model": return OperationType.TrainKnnModel + elif v == "delete-knn-model": + return OperationType.DeleteKnnModel else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 3ffe19f1d..82bda9d26 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2258,6 +2258,78 @@ async def test_search_pipeline_using_request_params(self, opensearch, on_client_ opensearch.clear_scroll.assert_not_called() +class DeleteKnnModelRunnerTests(TestCase): + model_id = "test-model-id" + + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": model_id + } + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_success(self, opensearch, on_client_request_start, on_client_request_end): + response = { + "model_id": "test-model", + "result": "deleted" + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: + await runner_under_test(opensearch, self.request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_404(self, opensearch, on_client_request_start, on_client_request_end): + + response = { + "error": { + "root_cause": [ + { + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + } + ], + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + }, + "status": 404 + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: + await runner_under_test(opensearch, self.request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_400(self, opensearch, on_client_request_start, on_client_request_end): + + response = { + "error": { + "root_cause": [ + { + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + } + ], + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + }, + "status": 400 + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + with self.assertRaises(Exception): + await runner_under_test(opensearch, self.request) + + class TrainKnnModelRunnerTests(TestCase): model_id = "test-model-id" retries = 120 From 9fac86265e23229ffac4382f61c3cec69d1309d9 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Fri, 21 Jun 2024 16:51:55 -0700 Subject: [PATCH 4/9] Addressed Vijay feedback #2 Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 17 ++++++---- osbenchmark/worker_coordinator/test.py | 42 ------------------------ tests/worker_coordinator/runner_test.py | 39 +++++++++++++++++++++- 3 files changed, 48 insertions(+), 50 deletions(-) delete mode 100644 osbenchmark/worker_coordinator/test.py diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 7ec943429..c5eb04c97 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -673,11 +673,11 @@ async def __call__(self, opensearch, params): if "error" in response.keys() and response["status"] == 404: self.logger.debug("Model [%s] does not already exist, skipping delete.", model_id) return - + if "error" in response.keys(): self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") - + self.logger.debug("Model [%s] deleted successfully.", model_id) def __repr__(self, *args, **kwargs): @@ -744,12 +744,15 @@ async def __call__(self, opensearch, params): if model_response['state'] == 'created': self.logger.info( "Training model [%s] was completed successfully.", model_id) - break + return + + if model_response['state'] == 'failed': + self.logger.error( + "Training for model [%s] failed. Response: [%s]", model_id, model_response) + raise Exception(f"Failed to create model {model_id}: {model_response}") - # training failed. - self.logger.error( - "Training for model [%s] failed. Response: [%s]", model_id, model_response) - raise Exception(f"Failed to create model: {model_response}") + self.logger.error("Model [%s] in unknown state [%s], response: [%s]", model_id, model_response["state"], model_response) + raise Exception(f"Model {model_id} in unknown state {model_response['state']}, response: {model_response}") def __repr__(self, *args, **kwargs): return self.NAME diff --git a/osbenchmark/worker_coordinator/test.py b/osbenchmark/worker_coordinator/test.py deleted file mode 100644 index 215287699..000000000 --- a/osbenchmark/worker_coordinator/test.py +++ /dev/null @@ -1,42 +0,0 @@ -import requests -import json - -# Define the endpoint and payload for the first POST request -url_1 = "http://localhost:9200/_plugins/_knn/models/3/_train" -payload_1 = { - "training_index": "train-index", - "training_field": "train_field_name", - "dimension": 10, - "description": "My model description", - "method": { - "name": "ivf", - "engine": "faiss", - "space_type": "l2", - "parameters": { - "nlist": 10, - "nprobes": 5 - } - } -} - -headers_1 = { - "Content-Type": "application/json" -} - -# Send the first POST request -url_2 = "http://localhost:9200/_plugins/_knn/models/3" - -response_1 = requests.post(url_1, headers=headers_1, data=json.dumps(payload_1)) -# print("First response status:", response_1.status_code) -# print(response_1.text) - -# # Define the endpoint for the second GET request - -# # Send the second GET request -response_2 = requests.get(url_2) -print("Second response status:", response_2.status_code) - -# Print the response from the second request -print("Second response content:") -print(response_2.text) -print(response_2) \ No newline at end of file diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 82bda9d26..6cbaacabe 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2432,7 +2432,44 @@ async def test_train_failure(self, opensearch, sleep, on_client_request_start, o train_api_first_mock, train_api_status_mock] runner_under_test = runner.TrainKnnModel() - with self.assertRaisesRegex(Exception, f"Failed to create model: {train_api_status_response}"): + with self.assertRaisesRegex(Exception, f"Failed to create model {self.model_id}: {train_api_status_response}"): + await runner_under_test(opensearch, self.request) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("asyncio.sleep", return_value=as_future()) + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_train_illegal_model_state(self, opensearch, sleep, on_client_request_start, on_client_request_end): + illegal_state = "dummy state that is not supported" + train_api_status_response = { + "model_id": self.model_id, + "model_blob": "", + "state": "dummy state that is not supported", + "timestamp": "2024-06-17T23:03:02.475277Z", + "description": "My model description", + "space_type": "l2", + "dimension": 10, + "engine": "faiss", + "training_node_assignment": "4QQIfIL3RzSWlPPf9K8b9w", + "model_definition": { + "name": "ivf", + "parameters": { + "nprobes": 5, + "nlist": 10 + } + } + } + + train_api_first_mock = as_future(self.train_status_check_response) + train_api_status_mock = as_future(train_api_status_response) + + opensearch.transport.perform_request.side_effect = [ + train_api_first_mock, train_api_status_mock] + runner_under_test = runner.TrainKnnModel() + + with self.assertRaisesRegex(Exception, + f"Model {self.model_id} in unknown state {illegal_state}, response: {train_api_status_response}"): await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') From 16e493b6554ac39ed3420114d0ba6c129ef79612 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Mon, 24 Jun 2024 16:57:25 -0700 Subject: [PATCH 5/9] Added request ignore on 404; removed local testing file Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index c5eb04c97..e7cbc3f45 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -666,15 +666,12 @@ async def __call__(self, opensearch, params): request_context_holder.on_client_request_start() - response = await opensearch.transport.perform_request(method, model_uri) + # 404 indicates the model has not been created. + response = await opensearch.transport.perform_request(method, model_uri, params={"ignore": [404]}) request_context_holder.on_client_request_end() - - if "error" in response.keys() and response["status"] == 404: - self.logger.debug("Model [%s] does not already exist, skipping delete.", model_id) - return - - if "error" in response.keys(): + + if "error" in response.keys() and response["status"] != 404: self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") @@ -682,7 +679,7 @@ async def __call__(self, opensearch, params): def __repr__(self, *args, **kwargs): return self.NAME - + class TrainKnnModel(Runner): """ Trains model named model_id until training is complete or retries are exhausted. From b43cbc601e76d98bc801244ff723518addb5695e Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Tue, 25 Jun 2024 15:37:59 -0700 Subject: [PATCH 6/9] Fix linting errors Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 91 ++++++++++++++++-------- tests/worker_coordinator/runner_test.py | 4 +- 2 files changed, 65 insertions(+), 30 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index e7cbc3f45..2a27659e1 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -652,10 +652,12 @@ def error_description(self, error_details): def __repr__(self, *args, **kwargs): return "bulk-index" + class DeleteKnnModel(Runner): """ - Deletes the K-NN model named model_id. + Deletes the K-NN model named model_id. """ + NAME = "delete-knn-model" async def __call__(self, opensearch, params): @@ -667,41 +669,50 @@ async def __call__(self, opensearch, params): request_context_holder.on_client_request_start() # 404 indicates the model has not been created. - response = await opensearch.transport.perform_request(method, model_uri, params={"ignore": [404]}) + response = await opensearch.transport.perform_request( + method, model_uri, params={"ignore": [404]} + ) request_context_holder.on_client_request_end() - + if "error" in response.keys() and response["status"] != 404: - self.logger.error("Request to delete model [%s] failed with error: with error response: [%s]", model_id, response) - raise Exception(f"Request to delete model {model_id} failed with error: with error response: {response}") + self.logger.error( + "Request to delete model [%s] failed with error: with error response: [%s]", + model_id, + response, + ) + raise Exception( + f"Request to delete model {model_id} failed with error: with error response: {response}" + ) self.logger.debug("Model [%s] deleted successfully.", model_id) def __repr__(self, *args, **kwargs): return self.NAME + class TrainKnnModel(Runner): """ - Trains model named model_id until training is complete or retries are exhausted. + Trains model named model_id until training is complete or retries are exhausted. """ NAME = "train-knn-model" async def __call__(self, opensearch, params): """ - Create and train one model named model_id. - + Create and train one model named model_id. + :param opensearch: The OpenSearch client. :param params: A hash with all parameters. See below for details. :return: A hash with meta data for this bulk operation. See below for details. - :raises: Exception if training fails, times out, or a different error occurs. + :raises: Exception if training fails, times out, or a different error occurs. It expects a parameter dict with the following mandatory keys: - * ``body``: containing parameters to pass on to the train engine. - See https://opensearch.org/docs/latest/search-plugins/knn/api/#train-a-model for information. + * ``body``: containing parameters to pass on to the train engine. + See https://opensearch.org/docs/latest/search-plugins/knn/api/#train-a-model for information. * ``retries``: Maximum number of retries allowed for the training to complete (seconds). - * ``polling-interval``: Polling interval to see if the model has been trained yet (seconds). - * ``model_id``: ID of the model to train. + * ``polling-interval``: Polling interval to see if the model has been trained yet (seconds). + * ``model_id``: ID of the model to train. """ body = params["body"] model_id = parse_string_parameter("model_id", params) @@ -711,49 +722,73 @@ async def __call__(self, opensearch, params): method = "POST" model_uri = f"/_plugins/_knn/models/{model_id}" request_context_holder.on_client_request_start() - await opensearch.transport.perform_request(method, f"{model_uri}/_train", body=body) + await opensearch.transport.perform_request( + method, f"{model_uri}/_train", body=body + ) current_number_retries = 0 while True: - model_response = await opensearch.transport.perform_request("GET", model_uri) + model_response = await opensearch.transport.perform_request( + "GET", model_uri + ) - if 'state' not in model_response.keys(): + if "state" not in model_response.keys(): request_context_holder.on_client_request_end() self.logger.error( - "Failed to create model [%s] with error response: [%s]", model_id, model_response) + "Failed to create model [%s] with error response: [%s]", + model_id, + model_response, + ) raise Exception( - f"Failed to create model {model_id} with error response: {model_response}") + f"Failed to create model {model_id} with error response: {model_response}" + ) if current_number_retries > max_retries: request_context_holder.on_client_request_end() self.logger.error( - "Failed to create model [%s] within [%i] retries.", model_id, max_retries) + "Failed to create model [%s] within [%i] retries.", + model_id, + max_retries, + ) raise TimeoutError( - f'Failed to create model: {model_id} within {max_retries} retries') + f"Failed to create model: {model_id} within {max_retries} retries" + ) - if model_response['state'] == 'training': + if model_response["state"] == "training": current_number_retries += 1 await asyncio.sleep(poll_period) continue # at this point, training either failed or finished. request_context_holder.on_client_request_end() - if model_response['state'] == 'created': + if model_response["state"] == "created": self.logger.info( - "Training model [%s] was completed successfully.", model_id) + "Training model [%s] was completed successfully.", model_id + ) return - if model_response['state'] == 'failed': + if model_response["state"] == "failed": self.logger.error( - "Training for model [%s] failed. Response: [%s]", model_id, model_response) + "Training for model [%s] failed. Response: [%s]", + model_id, + model_response, + ) raise Exception(f"Failed to create model {model_id}: {model_response}") - self.logger.error("Model [%s] in unknown state [%s], response: [%s]", model_id, model_response["state"], model_response) - raise Exception(f"Model {model_id} in unknown state {model_response['state']}, response: {model_response}") + self.logger.error( + "Model [%s] in unknown state [%s], response: [%s]", + model_id, + model_response["state"], + model_response, + ) + raise Exception( + f"Model {model_id} in unknown state {model_response['state']}, response: {model_response}" + ) def __repr__(self, *args, **kwargs): return self.NAME + # TODO: Add retry logic to BulkIndex, so that we can remove BulkVectorDataSet and use BulkIndex. class BulkVectorDataSet(Runner): """ diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 6cbaacabe..ad67a344f 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2505,7 +2505,7 @@ async def test_train_error_response(self, opensearch, sleep, on_client_request_s runner_under_test = runner.TrainKnnModel() with self.assertRaises(Exception): - await runner_under_test(opensearch, self.request) + await runner_under_test(opensearch, self.request) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @@ -2540,7 +2540,7 @@ async def test_train_timeout(self, opensearch, sleep, on_client_request_start, o # Set model state = Training. with self.assertRaisesRegex(TimeoutError, f'Failed to create model: {self.model_id} within {self.retries} retries'): - await runner_under_test(opensearch, self.request) + await runner_under_test(opensearch, self.request) class VectorSearchQueryRunnerTests(TestCase): From a78351e49cc5681e1f06c57e99683253f0ad4171 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Wed, 26 Jun 2024 14:27:43 -0700 Subject: [PATCH 7/9] Change DeleteKnnRunner behavior to return success status instead of exception Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 29 +++++++++++---- tests/worker_coordinator/runner_test.py | 46 +++++++++++++++++++++--- 2 files changed, 64 insertions(+), 11 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 2a27659e1..db90c69a3 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -662,6 +662,7 @@ class DeleteKnnModel(Runner): async def __call__(self, opensearch, params): model_id = parse_string_parameter("model_id", params) + should_ignore_if_model_DNE = params.get("ignore-if-model-does-not-exist", False) method = "DELETE" model_uri = f"/_plugins/_knn/models/{model_id}" @@ -675,17 +676,29 @@ async def __call__(self, opensearch, params): request_context_holder.on_client_request_end() + if ( + "error" in response.keys() + and response["status"] == 404 + and not should_ignore_if_model_DNE + ): + self.logger.error( + "Request to delete model [%s] failed because the model does not exist "\ + "and ignore-if-model-does-not-exist was set to True. Response: [%s]", + model_id, + response, + ) + return {"success": False} + if "error" in response.keys() and response["status"] != 404: self.logger.error( "Request to delete model [%s] failed with error: with error response: [%s]", model_id, response, ) - raise Exception( - f"Request to delete model {model_id} failed with error: with error response: {response}" - ) + return {"success": False} self.logger.debug("Model [%s] deleted successfully.", model_id) + return {"success": True} def __repr__(self, *args, **kwargs): return self.NAME @@ -697,6 +710,8 @@ class TrainKnnModel(Runner): """ NAME = "train-knn-model" + DEFAULT_RETRIES = 1000 + DEFAULT_POLL_PERIOD = 0.5 async def __call__(self, opensearch, params): """ @@ -716,8 +731,10 @@ async def __call__(self, opensearch, params): """ body = params["body"] model_id = parse_string_parameter("model_id", params) - max_retries = parse_int_parameter("retries", params) - poll_period = parse_float_parameter("poll_period", params) + max_retries = parse_int_parameter("retries", params, self.DEFAULT_RETRIES) + poll_period = parse_float_parameter( + "poll_period", params, self.DEFAULT_POLL_PERIOD + ) method = "POST" model_uri = f"/_plugins/_knn/models/{model_id}" diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index ad67a344f..4f4b303b2 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2278,15 +2278,48 @@ async def test_delete_knn_success(self, opensearch, on_client_request_start, on_ } opensearch.transport.perform_request.return_value = as_future(response) runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: - await runner_under_test(opensearch, self.request) + result = await runner_under_test(opensearch, self.request) + + self.assertEqual(True, result["success"]) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_delete_knn_404(self, opensearch, on_client_request_start, on_client_request_end): + async def test_delete_knn_404_success_when_ignore_if_model_DNE(self, opensearch, on_client_request_start, on_client_request_end): + request = { + "index": "unittest", + "operation-type": "train-knn-model", + "model_id": self.model_id, + "ignore-if-model-does-not-exist": True + } + response = { + "error": { + "root_cause": [ + { + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + } + ], + "type": "resource_not_found_exception", + "reason": "Unable to delete model [test-model]. Model does not exist" + }, + "status": 404 + } + opensearch.transport.perform_request.return_value = as_future(response) + runner_under_test = runner.DeleteKnnModel() + async with runner_under_test: + result = await runner_under_test(opensearch, request) + self.assertEqual(True, result["success"]) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_knn_404_fails_if_model_DNE(self, opensearch, on_client_request_start, on_client_request_end): response = { "error": { "root_cause": [ @@ -2303,7 +2336,9 @@ async def test_delete_knn_404(self, opensearch, on_client_request_start, on_clie opensearch.transport.perform_request.return_value = as_future(response) runner_under_test = runner.DeleteKnnModel() async with runner_under_test: - await runner_under_test(opensearch, self.request) + result = await runner_under_test(opensearch, self.request) + + self.assertEqual(False, result["success"]) @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @@ -2326,9 +2361,10 @@ async def test_delete_knn_400(self, opensearch, on_client_request_start, on_clie } opensearch.transport.perform_request.return_value = as_future(response) runner_under_test = runner.DeleteKnnModel() - with self.assertRaises(Exception): - await runner_under_test(opensearch, self.request) + async with runner_under_test: + result = await runner_under_test(opensearch, self.request) + self.assertEqual(False, result["success"]) class TrainKnnModelRunnerTests(TestCase): model_id = "test-model-id" From 95d445e3ed6f7a488bd55f805f9fad4e2cf5af2c Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Thu, 27 Jun 2024 16:18:26 -0700 Subject: [PATCH 8/9] Vijay feedback & reordered conditionals Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 63 +++++++++++++++++------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index db90c69a3..697bf17fc 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -659,46 +659,71 @@ class DeleteKnnModel(Runner): """ NAME = "delete-knn-model" + MODEL_DOES_NOT_EXIST_STATUS_CODE = 404 async def __call__(self, opensearch, params): model_id = parse_string_parameter("model_id", params) - should_ignore_if_model_DNE = params.get("ignore-if-model-does-not-exist", False) + ignore_if_model_does_not_exist = params.get( + "ignore-if-model-does-not-exist", False + ) method = "DELETE" model_uri = f"/_plugins/_knn/models/{model_id}" request_context_holder.on_client_request_start() - # 404 indicates the model has not been created. + # 404 indicates the model has not been created. The runner's response depends on ignore_if_model_does_not_exist. response = await opensearch.transport.perform_request( - method, model_uri, params={"ignore": [404]} + method, + model_uri, + params={"ignore": [self.MODEL_DOES_NOT_EXIST_STATUS_CODE]}, ) request_context_holder.on_client_request_end() - if ( - "error" in response.keys() - and response["status"] == 404 - and not should_ignore_if_model_DNE - ): - self.logger.error( - "Request to delete model [%s] failed because the model does not exist "\ - "and ignore-if-model-does-not-exist was set to True. Response: [%s]", + # success condition. + if "result" in response.keys() and response["result"] == "deleted": + self.logger.debug("Model [%s] deleted successfully.", model_id) + return {"weight": 1, "unit": "ops", "success": True} + + if "error" not in response.keys(): + self.logger.warning( + "Request to delete model [%s] failed but no error, response: [%s]", model_id, response, ) - return {"success": False} + return {"weight": 1, "unit": "ops", "success": False} - if "error" in response.keys() and response["status"] != 404: - self.logger.error( - "Request to delete model [%s] failed with error: with error response: [%s]", + if response["status"] != self.MODEL_DOES_NOT_EXIST_STATUS_CODE: + self.logger.warning( + "Request to delete model [%s] failed with status [%s] and response: [%s]", model_id, + response["status"], response, ) - return {"success": False} + return {"weight": 1, "unit": "ops", "success": False} + + if ignore_if_model_does_not_exist: + self.logger.debug( + ( + "Model [%s] does not exist so it could not be deleted, " + "however ignore-if-model-does-not-exist is True so the " + "DeleteKnnModel operation succeeded." + ), + model_id, + ) - self.logger.debug("Model [%s] deleted successfully.", model_id) - return {"success": True} + return {"weight": 1, "unit": "ops", "success": True} + + self.logger.warning( + ( + "Request to delete model [%s] failed because the model does not exist " + "and ignore-if-model-does-not-exist was set to False. Response: [%s]" + ), + model_id, + response, + ) + return {"weight": 1, "unit": "ops", "success": False} def __repr__(self, *args, **kwargs): return self.NAME From 904ae3fa9a9b62da00abb1e180c590602190525d Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Thu, 27 Jun 2024 16:21:58 -0700 Subject: [PATCH 9/9] Revert autoformatter changes to comment at top of runner.py Signed-off-by: Finn Roblin --- osbenchmark/worker_coordinator/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 697bf17fc..3a2d463e4 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -672,7 +672,7 @@ async def __call__(self, opensearch, params): request_context_holder.on_client_request_start() - # 404 indicates the model has not been created. The runner's response depends on ignore_if_model_does_not_exist. + # 404 indicates the model has not been created. In that case, the runner's response depends on ignore_if_model_does_not_exist. response = await opensearch.transport.perform_request( method, model_uri,