diff --git a/src/marqo/core/index_management/index_management.py b/src/marqo/core/index_management/index_management.py index c2a2cb6bc..d01e68f53 100644 --- a/src/marqo/core/index_management/index_management.py +++ b/src/marqo/core/index_management/index_management.py @@ -86,15 +86,26 @@ def bootstrap_vespa(self) -> bool: Returns: True if Vespa was bootstrapped, False if it was already up-to-date """ - with self._vespa_deployment_lock(): - vespa_app = self._get_vespa_application(check_configured=False, need_binary_file_support=True) - to_version = version.get_version() - from_version = vespa_app.get_marqo_config().version if vespa_app.is_configured else None + # We skip the Vespa convergence check here so that Marqo instance can be bootstrapped even when Vespa is + # not converged. + to_version = version.get_version() + vespa_app_for_version_check = self._get_vespa_application(check_configured=False, need_binary_file_support=True, + check_for_application_convergence=False) + from_version = vespa_app_for_version_check.get_marqo_config().version \ + if vespa_app_for_version_check.is_configured else None + + if from_version and semver.VersionInfo.parse(from_version) >= semver.VersionInfo.parse(to_version): + # skip bootstrapping if already bootstrapped to this version or later + return False - if from_version and semver.VersionInfo.parse(from_version) >= semver.VersionInfo.parse(to_version): - # skip bootstrapping if already bootstrapped to this version or later - return False + with self._vespa_deployment_lock(): + # Initialise another session based on the latest active Vespa session. The reason we do this again while + # holding the distributed lock is that the Vespa application might be changed by other operations when + # we wait for the lock. This time, we error out if the Vespa application is not converged, which reduces + # the chance of running into race conditions. 
+ vespa_app = self._get_vespa_application(check_configured=False, need_binary_file_support=True, + check_for_application_convergence=True) # Only retrieving existing index when the vespa app is not configured and the index settings schema exists existing_indexes = self._get_existing_indexes() if not vespa_app.is_configured and \ @@ -105,8 +116,12 @@ def bootstrap_vespa(self) -> bool: return True def rollback_vespa(self) -> None: + """ + Roll back Vespa application package to the previous version backed up in the current app package. + """ with self._vespa_deployment_lock(): - self._get_vespa_application(need_binary_file_support=True).rollback(version.get_version()) + vespa_app = self._get_vespa_application(need_binary_file_support=True) + vespa_app.rollback(version.get_version()) def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex: """ @@ -274,8 +289,8 @@ def get_marqo_version(self) -> str: """ return self._get_vespa_application().get_marqo_config().version - def _get_vespa_application(self, check_configured: bool = True, need_binary_file_support: bool = False) \ - -> VespaApplicationPackage: + def _get_vespa_application(self, check_configured: bool = True, need_binary_file_support: bool = False, + check_for_application_convergence: bool = True) -> VespaApplicationPackage: """ Retrieve a Vespa application package. Depending on whether we need to handle binary files and the Vespa version, it uses different implementation of VespaApplicationStore. @@ -283,6 +298,8 @@ def _get_vespa_application(self, check_configured: bool = True, need_binary_file Args: check_configured: if set to True, it checks whether the application package is configured or not. need_binary_file_support: indicates whether the support for binary file is needed. + check_for_application_convergence: whether we check convergence of the Vespa app package. If set to true and + Vespa is not converged, this process will fail with a VespaError raised. 
Returns: The VespaApplicationPackage instance we can use to do bootstrapping/rollback and any index operations. @@ -314,13 +331,15 @@ def _get_vespa_application(self, check_configured: bool = True, need_binary_file application_package_store = VespaApplicationFileStore( vespa_client=self.vespa_client, deploy_timeout=self._deployment_timeout_seconds, - wait_for_convergence_timeout=self._convergence_timeout_seconds + wait_for_convergence_timeout=self._convergence_timeout_seconds, + check_for_application_convergence=check_for_application_convergence ) else: application_package_store = ApplicationPackageDeploymentSessionStore( vespa_client=self.vespa_client, deploy_timeout=self._deployment_timeout_seconds, - wait_for_convergence_timeout=self._convergence_timeout_seconds + wait_for_convergence_timeout=self._convergence_timeout_seconds, + check_for_application_convergence=check_for_application_convergence ) application = VespaApplicationPackage(application_package_store) diff --git a/src/marqo/core/index_management/vespa_application_package.py b/src/marqo/core/index_management/vespa_application_package.py index cd463f3bd..275b669f0 100644 --- a/src/marqo/core/index_management/vespa_application_package.py +++ b/src/marqo/core/index_management/vespa_application_package.py @@ -425,9 +425,11 @@ class VespaApplicationFileStore(VespaApplicationStore): more details. This is the only viable option to deploy changes of binary files before Vespa version 8.382.22. We implement this approach to support bootstrapping and rollback for Vespa version prior to 8.382.22. 
""" - def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int): + def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int, + check_for_application_convergence: bool = True): super().__init__(vespa_client, deploy_timeout, wait_for_convergence_timeout) - self._app_root_path = vespa_client.download_application(check_for_application_convergence=True) + self._app_root_path = vespa_client.download_application( + check_for_application_convergence=check_for_application_convergence) def _full_path(self, *paths: str) -> str: return os.path.join(self._app_root_path, *paths) @@ -483,9 +485,11 @@ class ApplicationPackageDeploymentSessionStore(VespaApplicationStore): See https://docs.vespa.ai/en/reference/deploy-rest-api-v2.html#create-session for more details. However, this approach does not support binary files for Vespa version prior to 8.382.22. """ - def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int): + def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int, + check_for_application_convergence: bool = True): super().__init__(vespa_client, deploy_timeout, wait_for_convergence_timeout) - self._content_base_url, self._prepare_url = vespa_client.create_deployment_session() + self._content_base_url, self._prepare_url = vespa_client.create_deployment_session( + check_for_application_convergence) self._all_contents = vespa_client.list_contents(self._content_base_url) def file_exists(self, *paths: str) -> bool: diff --git a/src/marqo/vespa/vespa_client.py b/src/marqo/vespa/vespa_client.py index a5b04f4ab..2f4310a34 100644 --- a/src/marqo/vespa/vespa_client.py +++ b/src/marqo/vespa/vespa_client.py @@ -95,9 +95,12 @@ def deploy_application(self, application: str, timeout: int = 60) -> None: self._raise_for_status(response) - def create_deployment_session(self) -> Tuple[str, str]: + def 
create_deployment_session(self, check_for_application_convergence: bool = True) -> Tuple[str, str]: """ Create a Vespa deployment session. + Args: + check_for_application_convergence: check for the application to converge before creating a deployment session. + Returns: Tuple[str, str]: - content_base_url is the base url for contents in this session @@ -107,7 +110,9 @@ def create_deployment_session(self) -> Tuple[str, str]: via Zookeeper. Following requests should use content_base_url and prepare_url to make sure it can hit the right config server that this session is created on. """ - self.check_for_application_convergence() + if check_for_application_convergence: + self.check_for_application_convergence() + res = self._create_deploy_session(self.http_client) content_base_url = res['content'] prepare_url = res['prepared'] @@ -193,7 +198,8 @@ def wait_for_application_convergence(self, timeout: int = 120) -> None: except (httpx.TimeoutException, httpcore.TimeoutException): logger.error("Marqo timed out waiting for Vespa application to converge. Will retry.") - raise VespaError(f"Vespa application did not converge within {timeout} seconds") + raise VespaError(f"Vespa application did not converge within {timeout} seconds. 
" + f"The convergence status is {self._get_convergence_status()}") def query(self, yql: str, hits: int = 10, ranking: str = None, model_restrict: str = None, query_features: Dict[str, Any] = None, timeout: float = None, **kwargs) -> QueryResult: diff --git a/tests/core/index_management/test_index_management.py b/tests/core/index_management/test_index_management.py index 1d3119689..87d550028 100644 --- a/tests/core/index_management/test_index_management.py +++ b/tests/core/index_management/test_index_management.py @@ -42,7 +42,7 @@ def setUp(self): zookeeper_client=self.zookeeper_client, enable_index_operations=True, deployment_timeout_seconds=30, - convergence_timeout_seconds=60) + convergence_timeout_seconds=120) # this resets the application package to a clean state self._test_dir = str(os.path.dirname(os.path.abspath(__file__))) self._deploy_initial_app_package() @@ -243,6 +243,36 @@ def test_rollback_should_succeed(self): os.path.join(latest_version, *file) ) + @patch('marqo.vespa.vespa_client.VespaClient.check_for_application_convergence') + def test_bootstrap_and_rollback_should_skip_convergence_check(self, mock_check_convergence): + self.index_management.bootstrap_vespa() + mock_check_convergence.assert_not_called() + + mock_check_convergence.reset_mock() + + try: + self.index_management.rollback_vespa() + except ApplicationRollbackError: + pass + mock_check_convergence.assert_not_called() + + @patch('marqo.vespa.vespa_client.VespaClient.check_for_application_convergence') + @patch('marqo.vespa.vespa_client.VespaClient.get_vespa_version') + def test_bootstrap_and_rollback_should_not_skip_convergence_check_for_older_vespa_version(self, mock_vespa_version, + mock_check_convergence): + mock_vespa_version.return_value = '8.382.21' + + self.index_management.bootstrap_vespa() + mock_check_convergence.assert_called_once() + + mock_check_convergence.reset_mock() + + try: + self.index_management.rollback_vespa() + except ApplicationRollbackError: + pass + 
mock_check_convergence.assert_called_once() + def test_rollback_should_fail_when_target_version_is_current_version(self): self.index_management.bootstrap_vespa() with self.assertRaisesStrict(ApplicationRollbackError) as e: @@ -304,45 +334,45 @@ def test_rollback_should_fail_when_admin_config_is_changed(self): self.assertEqual("Aborting rollback. Reason: Vector store config has been changed since the last backup.", str(e.exception)) - def test_index_operation_methods_should_raise_error_if_index_operation_is_disabled(self): - # Create an index management instance with index operation disabled (by default) - self.index_management = IndexManagement(self.vespa_client, zookeeper_client=None) + def _index_operations(self, index_management: IndexManagement): index_request_1 = self.structured_marqo_index_request( fields=[FieldRequest(name='title', type=FieldType.Text)], tensor_fields=['title'] ) index_request_2 = self.unstructured_marqo_index_request() - with self.assertRaisesStrict(InternalError): - self.index_management.create_index(index_request_1) - - with self.assertRaisesStrict(InternalError): - self.index_management.batch_create_indexes([index_request_1, index_request_2]) + return [ + ('create single index', lambda: index_management.create_index(index_request_1)), + ('batch create indexes', lambda: index_management.batch_create_indexes([index_request_1, index_request_2])), + ('delete single index', lambda: index_management.delete_index_by_name(index_request_1.name)), + ('batch delete indexes', lambda: index_management.batch_delete_indexes_by_name([index_request_1.name, index_request_2.name])), + ] - with self.assertRaisesStrict(InternalError): - self.index_management.delete_index_by_name(index_request_1.name) + def test_index_operation_methods_should_raise_error_if_index_operation_is_disabled(self): + index_management_without_zookeeper = IndexManagement(self.vespa_client, zookeeper_client=None) - with self.assertRaisesStrict(InternalError): - 
self.index_management.batch_delete_indexes_by_name([index_request_1.name, index_request_2.name]) + for test_case, index_operation in self._index_operations(index_management_without_zookeeper): + with self.subTest(test_case): + with self.assertRaisesStrict(InternalError): + index_operation() def test_index_operation_methods_should_raise_error_if_marqo_is_not_bootstrapped(self): - index_request_1 = self.structured_marqo_index_request( - fields=[FieldRequest(name='title', type=FieldType.Text)], - tensor_fields=['title'] - ) - index_request_2 = self.unstructured_marqo_index_request() - - with self.assertRaisesStrict(ApplicationNotInitializedError): - self.index_management.create_index(index_request_1) - - with self.assertRaisesStrict(ApplicationNotInitializedError): - self.index_management.batch_create_indexes([index_request_1, index_request_2]) - - with self.assertRaisesStrict(ApplicationNotInitializedError): - self.index_management.delete_index_by_name(index_request_1.name) - - with self.assertRaisesStrict(ApplicationNotInitializedError): - self.index_management.batch_delete_indexes_by_name([index_request_1.name, index_request_2.name]) + for test_case, index_operation in self._index_operations(self.index_management): + with self.subTest(test_case): + with self.assertRaisesStrict(ApplicationNotInitializedError): + index_operation() + + @patch('marqo.vespa.vespa_client.VespaClient.check_for_application_convergence') + def test_index_operation_methods_should_check_convergence(self, mock_check_convergence): + for test_case, index_operation in self._index_operations(self.index_management): + with self.subTest(test_case): + try: + index_operation() + except ApplicationNotInitializedError: + pass + + mock_check_convergence.assert_called_once() + mock_check_convergence.reset_mock() def test_create_and_delete_index_should_succeed(self): # merge batch create and delete happy path to save some testing time