diff --git a/.github/workflows/backwards_compatibility_marqo_execution.yml b/.github/workflows/backwards_compatibility_marqo_execution.yml new file mode 100644 index 000000000..2b1a82f38 --- /dev/null +++ b/.github/workflows/backwards_compatibility_marqo_execution.yml @@ -0,0 +1,147 @@ +# Execution workflow +name: Marqo Compatibility Tests Execution + +on: + workflow_call: + # from_version: Used as: the identifier for a workflow call, for logging purposes and for pulling image from DockerHub. We need to pass a version here: ex: 2.11.1 + # to_version: Used as: the identifier for a workflow call and for logging purposes. We cannot use this to pull images from ECR or DockerHub (as opposed to from_version) since the to_version image has not been released yet. We need to pass a version here: ex: 2.11.5 + # to_version_image_identifier: A unique identifier of the to_version image uploaded to ECR. Can either be the tag or the digest of the "To be released" image. This is specifically used to pull images from ECR. We need to pass a full qualified docker image name with tag or digest here, example: marqoai/marqo:abcd1234 or marqoai/marqo@sha256:1234567890abcdef resp. + inputs: + from_version: + description: 'Source Marqo version. This is calculated in backwards_compatibility_marqo_orchestrator.yml and passed to this workflow' + required: true + type: string + to_version: + description: 'Target Marqo version. This is used for logging purposes, to identify the target version of Marqo being tested and to calculate the from_versions in the backwards_compatibility_marqo_orchestrator.yml. It is NOT used to pull images from ECR.' + required: true + type: string + to_version_image_identifier: + description: 'To version image identifier is a unique identifier for the target Marqo image, which can either be a tag or a digest. It should contain complete qualified image name with tag or digest. For example: marqoai/marqo:abcd1234 or marqoai/marqo@sha256:1234567890abcdef. This is used to pull images from ECR.' + required: true + type: string + workflow_dispatch: + # from_version: Used as: the identifier for a workflow call, for logging purposes and for pulling image from DockerHub. We need to pass a version here: ex: 2.11.1 + # to_version: Used as: the identifier for a workflow call and for logging purposes. We cannot use this to pull images from ECR or DockerHub (as opposed to from_version) since the to_version image has not been released yet. We need to pass a version here: ex: 2.11.5 + # to_version_image_identifier: A unique identifier of the to_version image uploaded to ECR. Can either be the tag or the digest of the "To be released" image. This is specifically used to pull images from ECR. We need to pass a full qualified docker image name with tag or digest here, example: marqoai/marqo:abcd1234 or marqoai/marqo@sha256:1234567890abcdef resp. + + # If running manually, just specify the from_version, to_version and the fully qualified marqo image name with tag or digest in same format of the examples given above + inputs: + from_version: + description: 'Source Marqo version. This is used to pull the image from DockerHub and for logging purposes.' + required: true + type: string + to_version: + description: 'Target Marqo version. This is used for logging purposes and to identify the target version of Marqo being tested.' 
+ required: true + type: string + to_version_image_identifier: + description: 'To version image identifier is a unique identifier for the target Marqo image, which can either be a tag or a digest. It should contain complete qualified image name with tag or digest. For example: marqoai/marqo:abcd1234 or marqoai/marqo@sha256:1234567890abcdef. This is used to pull images from ECR.' + required: true + type: string + +jobs: + Start-Runner: + permissions: write-all + name: Start self-hosted EC2 runner + runs-on: ubuntu-latest + outputs: + label: ${{ steps.start-ec2-runner.outputs.label }} + ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.MARQO_WORKFLOW_TESTS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.MARQO_WORKFLOW_TESTS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + - name: Start EC2 runner + id: start-ec2-runner + uses: machulav/ec2-github-runner@v2 + with: + mode: start + github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} + ec2-image-id: ${{ secrets.MARQO_CPU_AMD64_TESTS_INSTANCE_AMI }} + ec2-instance-type: m6i.xlarge + subnet-id: ${{ secrets.MARQO_WORKFLOW_TESTS_SUBNET_ID }} + security-group-id: ${{ secrets.MARQO_WORKFLOW_TESTS_SECURITY_GROUP_ID }} + aws-resource-tags: > # optional, requires additional permissions + [ + {"Key": "Name", "Value": "marqo-compatibility-test-runner-${{ github.run_id }}"}, + {"Key": "GitHubRepo", "Value": "${{ github.repository }}"}, + {"Key": "WorkflowName", "Value": "${{ github.workflow }}"}, + {"Key": "WorkflowRunId", "Value": "${{ github.run_id }}"}, + {"Key": "WorlflowURL", "Value": "${{ github.event.repository.html_url }}/actions/runs/${{ github.run_id }}"}, + {"Key": "PoloRole", "Value": "testing"} + ] + + backwards_compatibility: + # This job runs on the newly created runner + runs-on: ${{ needs.start-runner.outputs.label }} + needs: Start-Runner + steps: + # Step to check out the Marqo repository + - name: checkout marqo repo + uses: actions/checkout@v3 + with: + repository: ${{ github.repository }} #Check out the repository that contains this action since the tests exist in the same repository + fetch-depth: 0 + + # Step to set up Python 3.9 + - name: Set up Python 3.9 + uses: actions/setup-python@v3 + with: + python-version: "3.9" + cache: "pip" + # Step to install dependencies from requirements.txt + - name: Install Dependencies + run: | + pip install -r tests/backwards_compatibility_tests/requirements.txt + + # Step to configure AWS credentials + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.ECR_PUSHER_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.ECR_PUSHER_AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + + # Step to login to Amazon ECR + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + + # Step to run the backwards compatibility test + - name: Run backwards_compatibility test + env: + FROM_VERSION: ${{ inputs.from_version || github.event.inputs.from_version }} + TO_VERSION: ${{ inputs.to_version || github.event.inputs.to_version }} + TO_VERSION_IMAGE_IDENTIFIER: ${{ inputs.to_version_image_identifier || github.event.inputs.to_version_image_identifier }} + run: | + export PYTHONPATH=${{ github.workspace }}:$PYTHONPATH + python tests/backwards_compatibility_tests/compatibility_test_runner.py \ + --mode=backwards_compatibility \ + --from_version 
"$FROM_VERSION" \ + --to_version "$TO_VERSION" \ + --to_version_image_identifier "$TO_VERSION_IMAGE_IDENTIFIER" \ + + Stop-Runner: + name: Stop self-hosted EC2 runner + needs: + - Start-Runner # required to get output from the start-runner job + - backwards_compatibility # required to wait when the main job is done + runs-on: ubuntu-latest + if: ${{ always() }} # required to stop the runner even if the error happened in the previous jobs + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.MARQO_WORKFLOW_TESTS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.MARQO_WORKFLOW_TESTS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + - name: Stop EC2 runner + uses: machulav/ec2-github-runner@v2 + with: + mode: stop + github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }} + label: ${{ needs.start-runner.outputs.label }} + ec2-instance-id: ${{ needs.start-runner.outputs.ec2-instance-id }} diff --git a/.github/workflows/backwards_compatibility_marqo_orchestrator.yml b/.github/workflows/backwards_compatibility_marqo_orchestrator.yml new file mode 100644 index 000000000..035af0be6 --- /dev/null +++ b/.github/workflows/backwards_compatibility_marqo_orchestrator.yml @@ -0,0 +1,154 @@ +# Orchestrator workflow +name: Marqo Compatibility Tests Orchestrator + +on: + push: + branches: + - mainline + paths-ignore: + - '**.md' + workflow_dispatch: + inputs: + to_version: + description: 'Target Marqo version' + required: true + max_versions_to_test: + description: 'Max versions to test' + required: false + #TODO: Add input for specifying py_marqo branch (https://github.com/marqo-ai/marqo/pull/1024#discussion_r1841556872) + +# Setting MAX_VERSIONS_TO_TEST, this can be a configurable value or if no input is provided, it can be a default value. +env: + MAX_VERSIONS_TO_TEST: ${{ github.event.inputs.max_versions_to_test || 3 }} + +jobs: + check-if-image-exists: + # Responsible for deciding if we should invoke build_push_img.yml GitHub actions workflow in the same repo. + # We do not want to build and push the image if it already exists in the ECR registry, which will be the case if this is a manual developer initiated re-run using the same commit. + name: Check if image already exists in ECR + runs-on: ubuntu-latest + environment: marqo-test-suite + outputs: + image_exists: ${{ steps.check-image.outputs.image_exists }} + image_identifier: ${{ steps.check-image.outputs.image_identifier }} + steps: + - name: Checkout marqo repo + uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.ECR_PUSHER_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.ECR_PUSHER_AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + + # step to check for image existence - it uses aws cli to check if the image exists in the ECR registry "marqo-compatibility-tests" + - name: Check image existence and get identifier + id: check-image + run: | + echo "Checking for image existence" + if IMAGE_DETAILS=$(aws ecr describe-images --repository-name marqo-compatibility-tests --image-ids imageTag=${{ github.sha }} 2>/dev/null); then + echo "image_exists=true" >> $GITHUB_OUTPUT + echo "Image already exists in ECR, will not build and push again. 
Will be using the image digest from existing image"
+
+ IMAGE_IDENTIFIER=$(echo "$IMAGE_DETAILS" | jq -r '.imageDetails[0].imageDigest')
+ REGISTRY_ID="424082663841.dkr.ecr.us-east-1.amazonaws.com"
+ FULL_IDENTIFIER="${REGISTRY_ID}/marqo-compatibility-tests@${IMAGE_IDENTIFIER}"
+ echo "image_identifier=${FULL_IDENTIFIER}" >> $GITHUB_OUTPUT
+ else
+ echo "image_exists=false" >> $GITHUB_OUTPUT
+ echo "Image doesn't exist"
+ fi
+
+ build-and-push-image:
+ # Job to actually build and push the image to the ECR registry. This job is only triggered if the image does not already exist in the ECR registry.
+ name: Build and Push Image
+ needs: check-if-image-exists
+ if: needs.check-if-image-exists.outputs.image_exists == 'false'
+ uses: ./.github/workflows/build_push_img.yml
+ secrets: inherit
+ with:
+ marqo_ref: "${{ github.sha }}"
+ push_to: "ECR"
+ image_repo: "marqo-compatibility-tests"
+ image_tag: "${{ github.sha }}"
+
+
+ orchestrate:
+ # Job to orchestrate backwards compatibility test execution. Primarily responsible for determining to_version and for generating the list of from_version(s) to test against.
+ name: Orchestrate backwards compatibility test execution
+ runs-on: ubuntu-latest
+ needs: [check-if-image-exists, build-and-push-image]
+ if: always() && (needs.check-if-image-exists.result == 'success')
+ outputs:
+ list: ${{ steps.generate-versions.outputs.list }}
+ to_version: ${{ steps.get-to-version.outputs.to_version }}
+ environment: marqo-test-suite
+ steps:
+ # Step to check out the Marqo repository
+ - name: Checkout marqo repo
+ uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
+
+ # Step to set up Python 3.9
+ - name: Set up Python 3.9
+ uses: actions/setup-python@v3
+ with:
+ python-version: '3.9'
+ cache: "pip"
+
+ # Step to install the semver package
+ - name: Install semver
+ run: |
+ pip install semver
+
+ # Step to determine the target version
+ - name: Determine to_version
+ id: get-to-version
+ run: |
+ if [ "${{ github.event_name }}" == "workflow_dispatch" ] && [ -n "${{ github.event.inputs.to_version }}" ]; then
+ VERSION="${{ github.event.inputs.to_version }}"
+ else
+ VERSION=$(python tests/backwards_compatibility_tests/scripts/determine_to_version.py ${{ github.sha }})
+ fi
+ echo "to_version=${VERSION}" >> $GITHUB_OUTPUT
+ # Step to generate the list of versions to test
+ - name: Generate version list #this step generates the from_version list and stores it in the "list" output
+ id: generate-versions
+ run: |
+ # Run the Python script and capture its output
+ VERSION_LIST=$(python tests/backwards_compatibility_tests/scripts/generate_versions.py ${{ steps.get-to-version.outputs.to_version }} ${{ env.MAX_VERSIONS_TO_TEST }})
+ echo "list=${VERSION_LIST}" >> $GITHUB_OUTPUT
+ # Step to display the versions to test
+ - name: Display versions
+ run: |
+ echo "Versions to test: ${{ steps.generate-versions.outputs.list }} against to_version: ${{ steps.get-to-version.outputs.to_version }}"
+
+ - name: Configure AWS credentials
+ uses: aws-actions/configure-aws-credentials@v4
+ with:
+ aws-access-key-id: ${{ secrets.ECR_PUSHER_AWS_ACCESS_KEY_ID }}
+ aws-secret-access-key: ${{ secrets.ECR_PUSHER_AWS_SECRET_ACCESS_KEY }}
+ aws-region: us-east-1
+
+ run-execution-workflow:
+ # Job to trigger execution workflows for each version combination
+ name: Run all execution workflows
+ needs: [orchestrate, check-if-image-exists, build-and-push-image]
+ if: always() && (needs.orchestrate.result == 'success')
+ strategy:
+ matrix:
+ from_version: ${{
fromJson(needs.orchestrate.outputs.list) }} + uses: ./.github/workflows/backwards_compatibility_marqo_execution.yml + secrets: inherit + with: + from_version: ${{ matrix.from_version }} + to_version: ${{ needs.orchestrate.outputs.to_version }} + # Pass the image_identifier to the execution workflow. By image_identifier, we refer to the + # complete qualified image name with the image digest (i.e 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef). + # The image_identifier can either come from the check-if-image-exists (i.e in case the image already exists in ECR) job or the build-and-push-image (i.e in case the image was built and pushed to ECR) job. + to_version_image_identifier: ${{ needs.check-if-image-exists.outputs.image_exists == 'true' && needs.check-if-image-exists.outputs.image_identifier + || needs.build-and-push-image.outputs.image_identifier }} \ No newline at end of file diff --git a/.github/workflows/build_push_img.yml b/.github/workflows/build_push_img.yml index 1076822d7..285da94d7 100644 --- a/.github/workflows/build_push_img.yml +++ b/.github/workflows/build_push_img.yml @@ -1,7 +1,36 @@ name: Build and Push Marqo Docker Image +# TODO: Add concurrency for these tasks (https://s2search.atlassian.net/browse/MQ-274) (https://github.com/marqo-ai/marqo/pull/1024#discussion_r1841652402) on: - workflow_dispatch: + workflow_call: + inputs: + marqo_ref: + description: 'Marqo branch-name, commit SHA or tag' + required: false + type: string + default: 'mainline' + push_to: + description: 'image destination. Options: "ECR" or "DockerHub" (Can be added in the future)' + required: true + type: string + default: 'ECR' + image_repo: + description: 'Image repository' + required: true + type: string + default: 'marqo' + image_tag: + description: 'Image tag' + required: true + type: string + outputs: + image_identifier: + # By image_identifier, we refer to the + # complete qualified image name with the image digest (i.e 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef). 
image_identifier: + description: "The image_identifier (ex: 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef)" + value: ${{ jobs.Docker-Build.outputs.image_identifier }} + + workflow_dispatch: #for manual triggers inputs: marqo_ref: description: 'Marqo branch-name, commit SHA or tag' @@ -20,12 +49,6 @@ on: image_tag: description: 'Image tag' required: true - dockerhub_username: - description: 'DockerHub username' - required: false - dockerhub_password: - description: 'DockerHub password' - required: false jobs: Start-Runner: @@ -41,6 +64,7 @@ jobs: with: aws-access-key-id: ${{ secrets.MARQO_WORKFLOW_TESTS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.MARQO_WORKFLOW_TESTS_SECRET_ACCESS_KEY }} + #TODO: create a runner in us-east-1 and use it for the build https://github.com/marqo-ai/marqo/pull/1024#discussion_r1841644166 aws-region: us-west-2 - name: Start EC2 runner id: start-ec2-runner @@ -67,15 +91,24 @@ jobs: name: Build docker image needs: Start-Runner # required to start the main job when the runner is ready runs-on: ${{ needs.start-runner.outputs.label }} # run the job on the newly created runner - + outputs: + image_identifier: ${{ steps.set-image-identifier-in-output.outputs.image_identifier }} environment: marqo-build-environment steps: + - name: Set input variables + id: set-inputs + run: | + echo "MARQO_REF=${{ inputs.marqo_ref || github.event.inputs.marqo_ref }}" >> $GITHUB_ENV + echo "PUSH_TO=${{ inputs.push_to || github.event.inputs.push_to }}" >> $GITHUB_ENV + echo "IMAGE_REPO=${{ inputs.image_repo || github.event.inputs.image_repo }}" >> $GITHUB_ENV + echo "IMAGE_TAG=${{ inputs.image_tag || github.event.inputs.image_tag }}" >> $GITHUB_ENV + - name: Checkout Marqo uses: actions/checkout@v3 with: repository: marqo-ai/marqo - ref: ${{ github.event.inputs.marqo_ref }} + ref: ${{ env.MARQO_REF }} - name: Set up QEMU uses: docker/setup-qemu-action@v2 @@ -83,43 +116,37 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - - name: Login to Docker Hub - uses: docker/login-action@v2 - if: github.event.inputs.push_to == 'DockerHub' - with: - username: ${{ github.event.inputs.dockerhub_username }} - password: ${{ github.event.inputs.dockerhub_password }} - - - name: Configure AWS Credentials - uses: aws-actions/configure-aws-credentials@v1 - if: github.event.inputs.push_to == 'ECR' - with: - aws-access-key-id: ${{ secrets.ECR_PUSHER_AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.ECR_PUSHER_AWS_SECRET_ACCESS_KEY }} - aws-region: us-east-1 - - name: Login to ECR uses: docker/login-action@v2 - if: github.event.inputs.push_to == 'ECR' + if: env.PUSH_TO == 'ECR' with: - registry: 424082663841.dkr.ecr.us-east-1.amazonaws.com/${{ github.event.inputs.image_repo }} + registry: 424082663841.dkr.ecr.us-east-1.amazonaws.com + username: ${{ secrets.ECR_PUSHER_AWS_ACCESS_KEY_ID }} + password: ${{ secrets.ECR_PUSHER_AWS_SECRET_ACCESS_KEY }} - name: Set registry and image repo id: prepare run: | - if [[ "${{ github.event.inputs.push_to }}" == "ECR" ]]; then - echo "::set-output name=registry::424082663841.dkr.ecr.us-east-1.amazonaws.com" + if [[ "${{ env.PUSH_TO }}" == "ECR" ]]; then + echo "registry=424082663841.dkr.ecr.us-east-1.amazonaws.com" >> $GITHUB_OUTPUT else - echo "::set-output name=registry::marqoai" + echo "registry=marqoai" >> $GITHUB_OUTPUT fi - name: Build and push + id: build-and-push uses: docker/build-push-action@v4 with: context: . 
push: true platforms: linux/amd64,linux/arm64 - tags: ${{ steps.prepare.outputs.registry }}/${{ github.event.inputs.image_repo }}:${{ github.event.inputs.image_tag }} + tags: ${{ steps.prepare.outputs.registry }}/${{ env.IMAGE_REPO }}:${{ env.IMAGE_TAG }} + + - name: Output image_identifier + # By image_identifier, we refer to the complete qualified image name with the image digest (i.e 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef) + id: set-image-identifier-in-output + run: | + echo "image_identifier=${{ steps.prepare.outputs.registry }}/${{ env.IMAGE_REPO }}@${{ steps.build-and-push.outputs.digest }}" >> $GITHUB_OUTPUT Stop-Runner: name: Stop self-hosted EC2 runner diff --git a/.github/workflows/largemodel_unit_test_CI.yml b/.github/workflows/largemodel_unit_test_CI.yml index 9373f4bf3..6a0285ee4 100644 --- a/.github/workflows/largemodel_unit_test_CI.yml +++ b/.github/workflows/largemodel_unit_test_CI.yml @@ -162,7 +162,7 @@ jobs: export PRIVATE_MODEL_TESTS_HF_TOKEN=${{ secrets.PRIVATE_MODEL_TESTS_HF_TOKEN }} export PYTHONPATH="./marqo/tests:./marqo/src:./marqo" - pytest marqo/tests --largemodel --ignore=marqo/tests/test_documentation.py + pytest marqo/tests --largemodel --ignore=marqo/tests/test_documentation.py --ignore=tests/backwards_compatibility_tests Stop-Runner: name: Stop self-hosted EC2 runner diff --git a/.github/workflows/unit_test_200gb_CI.yml b/.github/workflows/unit_test_200gb_CI.yml index c0d9b8b39..6d9c7d19c 100644 --- a/.github/workflows/unit_test_200gb_CI.yml +++ b/.github/workflows/unit_test_200gb_CI.yml @@ -164,7 +164,7 @@ jobs: cd marqo export PYTHONPATH="./tests:./src:." - pytest --ignore=tests/test_documentation.py --durations=100 --cov=src --cov-branch --cov-context=test --cov-report=html:cov_html --cov-report=lcov:lcov.info tests + pytest --ignore=tests/test_documentation.py --ignore=tests/backwards_compatibility_tests --durations=100 --cov=src --cov-branch --cov-context=test --cov-report=html:cov_html --cov-report=lcov:lcov.info tests - name: Upload Test Report uses: actions/upload-artifact@v4 diff --git a/tests/backwards_compatibility_tests/__init__.py b/tests/backwards_compatibility_tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/backwards_compatibility_tests/base_compatibility_test_case.py b/tests/backwards_compatibility_tests/base_compatibility_test_case.py new file mode 100644 index 000000000..637216941 --- /dev/null +++ b/tests/backwards_compatibility_tests/base_compatibility_test_case.py @@ -0,0 +1,80 @@ +import json +import logging +from abc import abstractmethod, ABC +from pathlib import Path +from marqo_test import MarqoTestCase + + +class BaseCompatibilityTestCase(MarqoTestCase, ABC): + """ + Base class for backwards compatibility tests. Contains a prepare method that should be implemented by subclasses to + add documents / prepare marqo state. Also contains methods to save and load results to/from a file so that + test results can be compared across versions. 
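+
+ A minimal subclass sketch (illustrative only -- the index name, version and fields below are
+ hypothetical, not taken from an existing test):
+
+     @pytest.mark.marqo_version('2.11.0')
+     class MyFeatureCompatibilityTest(BaseCompatibilityTestCase):
+         indexes_to_delete = ["my_test_index"]
+
+         def prepare(self):
+             self.create_indexes([{"indexName": "my_test_index", "type": "unstructured"}])
+             self.client.index("my_test_index").add_documents(
+                 [{"_id": "1", "text_field": "hello"}], tensor_fields=["text_field"])
+             self.save_results_to_file(self.client.index("my_test_index").search("hello"))
+
+         def test_search(self):
+             assert len(self.client.index("my_test_index").search("hello")["hits"]) > 0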
+ """
+ indexes_to_delete = []
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ super().setUpClass()
+ if not hasattr(cls, 'logger'):
+ cls.logger = logging.getLogger(cls.__name__)
+ if not cls.logger.hasHandlers():
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(filename)s:%(lineno)d | %(message)s')
+ handler.setFormatter(formatter)
+ cls.logger.addHandler(handler)
+ cls.logger.setLevel(logging.INFO)
+
+ @classmethod
+ def get_results_file_path(cls):
+ """Dynamically generate a unique file path based on the class name."""
+ return Path(f"{cls.__qualname__}_stored_results.json")
+
+ @classmethod
+ def tearDownClass(cls) -> None:
+ # A function that will be automatically called once after all tests in the class have run.
+ # This removes all the loaded models. It will also remove all the indexes inside a marqo instance.
+ # Be sure to populate the indexes_to_delete list in the test class with the indexes you want to delete.
+ cls.removeAllModels()
+ if cls.indexes_to_delete:
+ cls.delete_indexes(cls.indexes_to_delete)
+ cls.logger.debug(f"Deleting indexes {cls.indexes_to_delete}")
+
+ @classmethod
+ def save_results_to_file(cls, results):
+ """Save results to a JSON file."""
+ filepath = cls.get_results_file_path()
+ with filepath.open('w') as f:
+ json.dump(results, f, indent=4)
+ cls.logger.debug(f"Results saved to {filepath}")
+
+ @classmethod
+ def load_results_from_file(cls):
+ """Load results from a JSON file."""
+ filepath = cls.get_results_file_path()
+ with filepath.open('r') as f:
+ results = json.load(f)
+ cls.logger.debug(f"Results loaded from {filepath}")
+ return results
+
+ @classmethod
+ def delete_file(cls):
+ """Delete the results file."""
+ filepath = cls.get_results_file_path()
+ filepath.unlink()
+ cls.logger.debug(f"Results file deleted: {filepath}")
+
+ @abstractmethod
+ def prepare(self):
+ """Prepare marqo state like adding documents"""
+ pass
+
+ @classmethod
+ def set_logging_level(cls, level: str):
+ log_level = getattr(logging, level.upper(), None)
+ if log_level is None:
+ raise ValueError(f"Invalid log level: {level}")
+ cls.logger.setLevel(log_level)
+ for handler in cls.logger.handlers:
+ handler.setLevel(log_level)
+ cls.logger.info(f"Logging level changed to
{level.upper()}") \ No newline at end of file diff --git a/tests/backwards_compatibility_tests/compatibility_test_logger.py b/tests/backwards_compatibility_tests/compatibility_test_logger.py new file mode 100644 index 000000000..a106abdc9 --- /dev/null +++ b/tests/backwards_compatibility_tests/compatibility_test_logger.py @@ -0,0 +1,7 @@ +import logging + +def get_logger(name): + logging.basicConfig(format='%(asctime)s | %(levelname)s | %(filename)s:%(lineno)d | %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG) + logger = logging.getLogger(name) + + return logger \ No newline at end of file diff --git a/tests/backwards_compatibility_tests/compatibility_test_runner.py b/tests/backwards_compatibility_tests/compatibility_test_runner.py new file mode 100755 index 000000000..7741be07a --- /dev/null +++ b/tests/backwards_compatibility_tests/compatibility_test_runner.py @@ -0,0 +1,611 @@ +import argparse +import importlib +import pkgutil +import time + +import pytest +from typing import Optional, Set +import subprocess +import sys +import requests +import semver +import traceback + +from compatibility_test_logger import get_logger + +marqo_transfer_state_version = semver.VersionInfo.parse("2.9.0") + +from base_compatibility_test_case import BaseCompatibilityTestCase + +# Keep track of containers that need cleanup +containers_to_cleanup: Set[str] = set() +volumes_to_cleanup: Set[str] = set() + +logger = get_logger(__name__) + +def load_all_subclasses(package_name): + package = importlib.import_module(package_name) + package_path = package.__path__ + + for _, module_name, _ in pkgutil.iter_modules(package_path): + importlib.import_module(f"{package_name}.{module_name}") + + +#TODO: Explore using docker python SDK docker-py to replace the subprocess call, https://github.com/marqo-ai/marqo/pull/1024#discussion_r1841689970 +def pull_remote_image_from_ecr(image_identifier: str): + """ + Pulls a Docker image from Amazon ECR using the image_identifier and optionally retags it locally. + + Args: + image_identifier (str): The unique identifier for a to_version image. It can be either be the fully qualified image name with the tag + (ex: 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests:abcdefgh1234) + or the fully qualified image name with the digest (ex: 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef). + This is constructed in build_push_image.yml workflow and will be the qualified image name with digest for an automatically triggered workflow. + + Returns: + str: The local tag of the pulled and retagged Docker image. + + Raises: + Exception: If there is an error during the Docker image pull or retagging process. 
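+
+ Example (illustrative values, not a real image):
+     pull_remote_image_from_ecr(
+         "424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef")
+     pulls that image and retags it locally as "marqo-ai/marqo:1234567890abcdef",
+     which is the value returned.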
+ """ + ecr_registry = "424082663841.dkr.ecr.us-east-1.amazonaws.com" + image_repo = "marqo-compatibility-tests" + + try: + # Log in to ECR + login_password = subprocess.run( + ["aws", "ecr", "get-login-password", "--region", "us-east-1"], + check=True, + stdout=subprocess.PIPE + ).stdout.decode('utf-8') + subprocess.run( + ["docker", "login", "--username", "AWS", "--password-stdin", ecr_registry], + input=login_password.encode('utf-8'), + check=True + ) + # Pull the Docker image from ECR + image_full_name = image_identifier + logger.debug(f"Pulling image: {image_full_name}") + subprocess.run(["docker", "pull", image_full_name], check=True) + + # Optionally retag the image locally to marqo-ai/marqo + hash_part = image_identifier.split(":")[1] if ":" in image_identifier else image_identifier + local_tag = f"marqo-ai/marqo:{hash_part}" #it should now be called marqo-ai/marqo:sha-token for image with image digest or marqo-ai/marqo:github.sha for an image with image tag + logger.debug(f"Retagging image to: {local_tag}") + subprocess.run(["docker", "tag", image_full_name, local_tag], check=True) + return local_tag + except subprocess.CalledProcessError as e: + logger.debug(f"Command '{e.cmd}' failed with return code {e.returncode}") + logger.debug("Error output:", e.output.decode() if e.output else "No output") + traceback.print_exc() # Print the full stack trace for debugging + raise Exception(f"Failed to pull Docker image {image_identifier}: {e}") + except Exception as e: + logger.debug("An unexpected error occurred while pulling the Docker image.") + traceback.print_exc() # Print full stack trace for debugging + raise e + +def pull_marqo_image(image_identifier: str, source: str): + """ + Pull the specified Marqo Docker image. + + Args: + image_identifier (str): The identifier with which to pull the docker image. + It can simply be the image name if pulling from DockerHub, + or it can be the image digest if pulling from ECR + source (str): The source from which to pull the image. + It can be either 'docker' for Docker Hub or 'ECR' for Amazon ECR. + + Returns: + str: The name of the pulled Docker image. + + Raises: + Exception: If there is an error during the Docker image pull process. + """ + try: + if source == "docker": + logger.debug(f"pulling this image: {image_identifier} from Dockerhub") + subprocess.run(["docker", "pull", image_identifier], check=True) + return image_identifier + elif source == "ECR": + return pull_remote_image_from_ecr(image_identifier) + except subprocess.CalledProcessError as e: + raise Exception(f"Failed to pull Docker image {image_identifier}: {e}") + + +def start_marqo_from_version_container(from_version: str, from_version_volume, env_vars: Optional[list] = None): + """ + Start a Marqo container after pulling the required image from docker and creating a volume. + The volume is mounted to a specific point such that it can be later used to transfer state to a different version of Marqo. + + Args: + from_version (str): The version of the Marqo container to start. + from_version_volume: The volume to use for the container. + from_version_image (Optional[str]): The specific image to use for the container. Defaults to None. + env_vars (Optional[list]): A list of environment variables to set in the container. Defaults to None. 
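+
+ Example (illustrative): start_marqo_from_version_container("2.12.0", "marqo_2_12_0_volume")
+ pulls marqoai/marqo:2.12.0 from DockerHub and roughly runs:
+     docker run -d --name marqo-2.12.0 -p 8882:8882 \
+         -e MARQO_ENABLE_BATCH_APIS=TRUE -e MARQO_MAX_CPU_MODEL_MEMORY=1.6 \
+         -v marqo_2_12_0_volume:/opt/vespa/var marqoai/marqo:2.12.0
+ (the volume is mounted at /opt/vespa/var because 2.12.0 >= 2.9.0).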
+ """ + + source = "docker" #The source for from_version image would always be docker because it's supposed to be an already released docker image + + logger.debug(f"Starting Marqo container with from_version: {from_version}, from_version_volume: {from_version_volume}, source: {source}") + from_version_image = f"marqoai/marqo:{from_version}" + container_name = f"marqo-{from_version}" + + logger.debug(f"Using image: {from_version_image} with container name: {container_name}") + + # Pull the image before starting the container + pull_marqo_image(from_version_image, source) + + # Stop and remove the container if it exists + try: + subprocess.run(["docker", "rm", "-f", container_name], check=True) + except Exception as e: + logger.debug(f"Container {container_name} not found, skipping removal.") + + # Prepare the docker run command + cmd = [ + "docker", "run", "-d", + "--name", container_name, + "-p", "8882:8882", + "-e", "MARQO_ENABLE_BATCH_APIS=TRUE", + "-e", "MARQO_MAX_CPU_MODEL_MEMORY=1.6" + ] + + # Append environment variables passed via the method + if env_vars: + for var in env_vars: + cmd.extend(["-e", var]) + + # Handle version-specific volume mounting + + # Mounting volumes for Marqo >= 2.9 + # Use the provided volume for state transfer + from_version_volume = create_volume_for_marqo_version(from_version, from_version_volume) + logger.debug(f"from_version volume = {from_version_volume}") + if from_version >= marqo_transfer_state_version: + # setting volume to be mounted at /opt/vespa/var because starting from 2.9, the state is stored in /opt/vespa/var + cmd.extend(["-v", f"{from_version_volume}:/opt/vespa/var"]) + else: + # setting volume to be mounted at /opt/vespa because before 2.9, the state was stored in /opt/vespa + cmd.extend(["-v", f"{from_version_volume}:/opt/vespa"]) # volume name will be marqo_2_12_0_volume + + # Append the image + cmd.append(from_version_image) + logger.debug(f"Running command: {' '.join(cmd)}") + + try: + # Run the docker command + subprocess.run(cmd, check=True) + containers_to_cleanup.add(container_name) + + # Follow docker logs + log_cmd = ["docker", "logs", "-f", container_name] + log_process = subprocess.Popen(log_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + # Wait for the Marqo service to start + logger.debug("Waiting for Marqo to start...") + while True: + try: + response = requests.get("http://localhost:8882", verify=False) + if "Marqo" in response.text: + logger.debug("Marqo started successfully.") + break + except requests.ConnectionError: + pass + output = log_process.stdout.readline() + if output: + logger.debug(output.strip()) + time.sleep(0.1) + + # Stop following logs after Marqo starts + log_process.terminate() + log_process.wait() + logger.debug("Stopped following docker logs.") + + except subprocess.CalledProcessError as e: + logger.debug(f"Failed to start Docker container {container_name}: {e}") + raise + + # Show the running containers + try: + subprocess.run(["docker", "ps"], check=True) + except subprocess.CalledProcessError as e: + logger.debug(f"Failed to list Docker containers: {e}") + raise + +def start_marqo_to_version_container(to_version: str, from_version: str, from_version_volume: str, + to_version_identifier: str, env_vars: Optional[list] = None): + """ + Start a Marqo container for the specified to_version, transferring state from the from_version container. 
+ The state is transferred by copying the state from the from_version container to the to_version container, by re-using the + from_version_volume created when starting from_version container. + Args: + to_version (str): The target version of the Marqo container to start. + from_version (str): The source version of the Marqo container. The from_version parameter is later used to determine how we transfer state. + from_version_volume (str): The volume to use for the container. + to_version_identifier (str): The unique identifier for a to_version image. It can be either be the fully qualified image name with the tag (ex: 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests:abcdefgh1234) + or the fully qualified image name with the digest (ex: 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef). + This is constructed in build_push_image.yml workflow and will be the qualified image name with digest for an automatically triggered workflow. + env_vars (Optional[list]): A list of environment variables to set in the container. Defaults to None. + """ + source = "ECR" #Source of a to_version image will always be ECR because we build and push unpublished & to be tested images to ECR + logger.debug( + f"Starting Marqo container with to_version: {to_version}, " + f"from_version: {from_version} " + f"from_version_volume: {from_version_volume}, to_version_identifier: {to_version_identifier}, source: {source}") + container_name = f"marqo-{to_version}" + to_version = semver.VersionInfo.parse(to_version) + from_version = semver.VersionInfo.parse(from_version) + + logger.debug(f"Using image: {to_version_identifier} with container name: {container_name}") + + # Pull the image before starting the container + to_version_image_name = pull_marqo_image(to_version_identifier, source) + logger.debug(f" Printing image name {to_version_image_name}") + try: + subprocess.run(["docker", "rm", "-f", container_name], check=True) + except subprocess.CalledProcessError: + logger.debug(f"Container {container_name} not found, skipping removal.") + + # Prepare the docker run command + cmd = [ + "docker", "run", "-d", + "--name", container_name, + "-p", "8882:8882", + "-e", "MARQO_ENABLE_BATCH_APIS=TRUE", + "-e", "MARQO_MAX_CPU_MODEL_MEMORY=1.6" + ] + # Append environment variables passed via the method + if env_vars: + for var in env_vars: + cmd.extend(["-e", var]) + + + if from_version >= marqo_transfer_state_version and to_version >= marqo_transfer_state_version: + # Use the provided volume for state transfer + cmd.extend(["-v", f"{from_version_volume}:/opt/vespa/var"]) #setting volume to be mounted at /opt/vespa/var because starting from 2.9, the state is stored in /opt/vespa/var + elif from_version < marqo_transfer_state_version and to_version < marqo_transfer_state_version: + cmd.extend(["-v", f"{from_version_volume}:/opt/vespa"]) #setting volume to be mounted at /opt/vespa because before 2.9, the state was stored in /opt/vespa + elif from_version < marqo_transfer_state_version <= to_version: # Case when from_version is <2.9 and to_version is >=2.9 + # Here you need to explicitly copy + to_version_volume = create_volume_for_marqo_version(str(to_version), None) + copy_state_from_container(from_version_volume, to_version_volume, to_version_image_name) + cmd.extend(["-v", f"{to_version_volume}:/opt/vespa/var"]) + + cmd.append(to_version_image_name) + + logger.debug(f"Running command: {' '.join(cmd)}") + + try: + # Run the docker command + subprocess.run(cmd, 
check=True) + containers_to_cleanup.add(container_name) + + # Follow docker logs + log_cmd = ["docker", "logs", "-f", container_name] + log_process = subprocess.Popen(log_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + # Wait for the Marqo service to start + logger.debug("Waiting for Marqo to start...") + while True: + try: + response = requests.get("http://localhost:8882", verify=False) + if "Marqo" in response.text: + logger.debug("Marqo started successfully.") + break + except requests.ConnectionError: + pass + output = log_process.stdout.readline() + if output: + logger.debug(output.strip()) + time.sleep(0.1) + + # Stop following logs after Marqo starts + log_process.terminate() + log_process.wait() + logger.debug("Stopped following docker logs.") + + except subprocess.CalledProcessError as e: + logger.debug(f"Failed to start Docker container {container_name}: {e}") + raise + + # Show the running containers + try: + subprocess.run(["docker", "ps"], check=True) + except subprocess.CalledProcessError as e: + logger.debug(f"Failed to list Docker containers: {e}") + raise + + +def stop_marqo_container(version: str): + """ + Stop a Marqo container but don't remove it yet. + + Args: + version (str): The version of the Marqo container to stop. + """ + container_name = f"marqo-{version}" + logger.debug(f"Stopping container with container name {container_name}") + try: + subprocess.run(["docker", "stop", container_name], check=True) + logger.debug(f"Successfully stopped container {container_name}") + except subprocess.CalledProcessError as e: + logger.debug(f"Warning: Failed to stop container {container_name}: {e}") + + +def cleanup_containers(): + """ + Remove all containers that were created during the test. + + This function iterates over the set of containers to clean up and attempts to remove each one using the Docker CLI. + If a container cannot be removed, a warning message is printed. + + Raises: + subprocess.CalledProcessError: If there is an error during the container removal process. + """ + for container_name in containers_to_cleanup: + try: + subprocess.run(["docker", "rm", "-f", container_name], check=True) + except subprocess.CalledProcessError as e: + logger.debug(f"Warning: Failed to remove container {container_name}: {e}") + containers_to_cleanup.clear() + +def cleanup_volumes(): + """ + Remove all Docker volumes that were created during the test. + + This function iterates over the set of volumes to clean up and attempts to remove each one using the Docker CLI. + If a volume cannot be removed, a warning message is printed. + + Raises: + subprocess.CalledProcessError: If there is an error during the volume removal process. + """ + for volume_name in volumes_to_cleanup: + try: + subprocess.run(["docker", "volume", "rm", volume_name], check=True) + except subprocess.CalledProcessError as e: + logger.debug(f"Warning: Failed to remove volume {volume_name}: {e}") + volumes_to_cleanup.clear() + +def backwards_compatibility_test(from_version: str, to_version: str, to_version_image_idenfitifer: str): + """ + Perform a backwards compatibility test between two versions of Marqo. + + This function starts a container with the from_version, runs tests in prepare mode, stops the container, + starts a container with the to_version by transferring state from from_version container, and runs tests in test mode. + + Args: + from_version (str): The source version of the Marqo container. + to_version (str): The target version of the Marqo container. 
+ to_version_image_idenfitifer (str): The unique identifier for a to_version image. It can either be the fully qualified image name with the tag
+ (ex: 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests:abcdefgh1234)
+ or the fully qualified image name with the digest (ex: 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef).
+ This is constructed in the build_push_img.yml workflow and will be the qualified image name with digest for an automatically triggered workflow.
+
+ Raises:
+ ValueError: If the major versions of from_version and to_version are incompatible.
+ Exception: If there is an error during the test process.
+ """
+ try:
+ # Step 1: Start from_version container and run tests in prepare mode
+ logger.debug(f"Starting backwards compatibility tests with from_version: {from_version}, to_version: {to_version}, to_version_image_idenfitifer: {to_version_image_idenfitifer}")
+ # Check for version compatibility
+ from_major_version = int(from_version.split('.')[0])
+ logger.debug(f"from major version = {from_major_version}")
+ to_major_version = int(to_version.split('.')[0])
+ if from_major_version != to_major_version:
+ logger.debug(f"from_version {from_version} and to_version {to_version} have different major versions and cannot be tested for backwards compatibility")
+ raise ValueError("Cannot transfer state between incompatible major versions of Marqo.")
+ logger.debug(f"Transferring state from version {from_version} to {to_version}")
+
+ from_version_volume = get_volume_name_from_marqo_version(from_version)
+ start_marqo_from_version_container(from_version, from_version_volume)
+ logger.debug(f"Started marqo container {from_version}")
+
+ try:
+ run_tests_across_versions("prepare", from_version, to_version)
+ except Exception as e:
+ logger.error(f"Error running tests across versions in 'prepare' mode: {e}")
+ raise e
+ # Step 2: Stop from_version container (but don't remove it)
+ stop_marqo_container(from_version)
+
+ # Step 3: Start to_version container, transferring state
+ start_marqo_to_version_container(to_version, from_version, from_version_volume, to_version_image_idenfitifer)
+ logger.debug(f"Started marqo to_version: {to_version} container by transferring state")
+ # Step 4: Run tests
+ try:
+ run_tests_across_versions("test", from_version, to_version)
+ except Exception as e:
+ logger.error(f"Error running tests across versions in 'test' mode: {e}")
+ raise e
+ logger.debug("Finished running tests in Test mode")
+ # Step 5: Do a full test run which includes running tests in prepare and test mode on the same container
+ try:
+ full_test_run(to_version)
+ except Exception as e:
+ logger.error(f"Error running tests in full test run mode: {e}")
+ raise e
+ except Exception as e:
+ logger.debug(f"An error occurred while executing backwards compatibility tests: {e}")
+ raise e
+ finally:
+ # Stop the to_version container (but don't remove it yet)
+ logger.debug("Calling stop_marqo_container with " + str(to_version))
+ stop_marqo_container(to_version)
+ # Clean up all containers at the end
+ cleanup_containers()
+ cleanup_volumes()
+
+
+
+ def rollback_test(to_version: str, from_version: str, to_version_digest, from_image: Optional[str] = None,
+ to_image: Optional[str] = None):
+ """
+ Perform a rollback test between two versions of Marqo.
+
+ This function first performs a backwards compatibility test from the from_version to the to_version.
+ Then, it stops the to_version container, starts the from_version container again, and runs tests in test mode.
+ + Args: + to_version (str): The target version of the Marqo container. + from_version (str): The source version of the Marqo container. + to_version_digest: The specific image digest to use for the to_version container. + from_image (Optional[str]): The specific image to use for the from_version container. Defaults to None. + to_image (Optional[str]): The specific image to use for the to_version container. Defaults to None. + """ + try: + backwards_compatibility_test(from_version, to_version, None) + + stop_marqo_container(to_version) + + start_marqo_from_version_container(from_version, None) + + run_tests_across_versions("test", from_version, to_version) + finally: + # Stop the final container (but don't remove it yet) + stop_marqo_container(from_version) + # Clean up all containers at the end + cleanup_containers() + +def run_tests_across_versions(mode: str, from_version: str, to_version: str): + """ + This method will run tests across two Marqo versions, meaning it will run prepare on a Marqo from_version instance, + and run tests on a Marqo to_version instance. + """ + logger.debug(f"Running tests across versions with mode: {mode}, from_version: {from_version}, to_version: {to_version}") + + if mode == "prepare": + run_prepare_mode(from_version) + elif mode == "test": + run_test_mode(from_version) + +def full_test_run(to_version: str): + """ + This method will run tests on a single marqo version container, which means it will run both prepare and tests on the + to_version Marqo container. Note that to_version Marqo container has been created by transferring instance from a + previous from_version Marqo container. + """ + logger.debug(f"Running full_test_run with to_version: {to_version}") + #Step 1: Run tests in prepare mode + run_prepare_mode(to_version) + #Step 2: Run tests in test mode + run_test_mode(to_version) + +def run_prepare_mode(version_to_test_against: str): + load_all_subclasses("tests.backwards_compatibility_tests") + # Get all subclasses of `BaseCompatibilityTestCase` that match the `version_to_test_against` criterion + # The below condition also checks if the test class is not marked to be skipped + tests = [test_class for test_class in BaseCompatibilityTestCase.__subclasses__() + if (getattr(test_class, 'marqo_version', '0') <= version_to_test_against and getattr(test_class, 'skip', False) == False)] + for test_class in tests: + test_class.setUpClass() #setUpClass will be used to create Marqo client + test_instance = test_class() + test_instance.prepare() #Prepare method will be used to create index and add documents + +def construct_pytest_arguments(version_to_test_against): + pytest_args = [ + f"--version_to_compare_against={version_to_test_against}", + "-m", f"marqo_version", + "tests/backwards_compatibility_tests" + ] + return pytest_args + +def run_test_mode(version_to_test_against): + pytest_args = construct_pytest_arguments(version_to_test_against) + pytest.main(pytest_args) + +def create_volume_for_marqo_version(version: str, volume_name: str): + """ + Create a Docker volume for the specified Marqo version. + + This function replaces dots with underscores in the version string to format the volume name. + If no volume name is provided, it generates one based on the version. + + Args: + version (str): The version of the Marqo container. + volume_name (str): The name of the Docker volume to create. If None, a name is generated based on the version. + + Returns: + str: The name of the created Docker volume. 
+ + Raises: + subprocess.CalledProcessError: If there is an error during the Docker volume creation process. + """ + # Replace dots with underscores to format the volume name + if volume_name is None: + volume_name = get_volume_name_from_marqo_version(version) + + # Create the Docker volume using the constructed volume name + try: + subprocess.run(["docker", "volume", "create", "--name", volume_name], check=True) + logger.debug(f"Successfully created volume: {volume_name}") + volumes_to_cleanup.add(volume_name) + return volume_name + except subprocess.CalledProcessError as e: + logger.debug(f"Failed to create volume: {volume_name}. Error: {e}") + + + #TODO: Make it compatible for when you directly pass and image and no version is passed. +def get_volume_name_from_marqo_version(version): + """ + Generate a Docker volume name based on the Marqo version. + + This function replaces dots with underscores in the version string to format the volume name. + + Args: + version (str): The version of the Marqo container. + + Returns: + str: The formatted Docker volume name. + """ + volume_name = f"marqo_{version.replace('.', '_')}_volume" + return volume_name + + +def copy_state_from_container( + from_version_volume: str, to_version_volume: str, image: str): + """ + Copy the state from one Docker volume to another using a specified Docker image. + + This function runs a Docker container with the specified image, mounts the source and target volumes, + and copies the contents from the source volume to the target volume. It is specifically used + in case when from_version is <2.9 and to_version is >=2.9. + + Args: + from_version_volume (str): The name of the source Docker volume. + to_version_volume (str): The name of the target Docker volume. + image (str): The Docker image to use for the container. + + Raises: + subprocess.CalledProcessError: If there is an error during the Docker run or copy process. + """ + + cmd = ["docker", "run", "--rm", "-it", "--entrypoint=''", + "-v", f"{from_version_volume}:/opt/vespa_old", + "-v", f"{to_version_volume}:/opt/vespa/var", + f"{image}", + "sh", "-c", 'cd /opt/vespa_old && cp -a . /opt/vespa/var'] + try: + subprocess.run(cmd, check=True) + logger.debug(f"Successfully copied state from {from_version_volume} to {to_version_volume}") + except subprocess.CalledProcessError as e: + logger.debug(f"Failed to copy state from {from_version_volume} to {to_version_volume}. Error: {e}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Marqo Testing Runner") + parser.add_argument("--mode", choices=["backwards_compatibility", "rollback"], required=True) + parser.add_argument("--from_version", required=True) + parser.add_argument("--to_version", required=True) + parser.add_argument("--to_version_image_identifier", required=True) + args = parser.parse_args() + + from_version = semver.VersionInfo.parse(args.from_version) + to_version = semver.VersionInfo.parse(args.to_version) + if from_version >= to_version: + logger.debug("from_version should be less than to_version") + sys.exit(0) # TODO: figure out if we should just quit. 
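+ # Example invocation (mirrors the execution workflow; version numbers and the image
+ # identifier below are illustrative):
+ #   python tests/backwards_compatibility_tests/compatibility_test_runner.py \
+ #     --mode=backwards_compatibility --from_version 2.11.1 --to_version 2.11.5 \
+ #     --to_version_image_identifier 424082663841.dkr.ecr.us-east-1.amazonaws.com/marqo-compatibility-tests@sha256:1234567890abcdef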
+ + if args.mode == "backwards_compatibility": + backwards_compatibility_test(args.from_version, args.to_version, args.to_version_image_identifier) + elif args.mode == "rollback": + rollback_test(args.to_version, args.from_version, args.to_version_image_identifier) diff --git a/tests/backwards_compatibility_tests/conftest.py b/tests/backwards_compatibility_tests/conftest.py new file mode 100644 index 000000000..86b2656cd --- /dev/null +++ b/tests/backwards_compatibility_tests/conftest.py @@ -0,0 +1,32 @@ +import pytest +import semver +from tests.backwards_compatibility_tests.compatibility_test_logger import get_logger + +logger = get_logger(__name__) + +def pytest_addoption(parser): + parser.addoption("--version_to_compare_against", action="store", default="2.7", help="version to start from") + +@pytest.fixture +def version_to_compare_against(request): + return request.config.getoption("--version_to_compare_against") + + +def pytest_collection_modifyitems(config, items): + version = config.getoption("--version_to_compare_against") # This version will help us determine which test to skip v/s which test to collect. + # The actual value inside the version can be from_version value (in case of test run where we run prepare on a from_version marqo instance, + # and tests on a to_version marqo instance) or a to_version value (in case of a full test run where we run prepare and test on the same Marqo instance) + + for item in items: + test_case_version_marker = item.get_closest_marker("marqo_version") + + if test_case_version_marker: + test_case_version = test_case_version_marker.args[0] #test_case_version is the version defined as the argument in the "marqo_version" marker above each compatibility test + logger.debug(f"Checking test: {item.name} with version: {test_case_version}") + # Compare the test's required version with the version + logger.debug(f"Test version: {test_case_version}, v/s version supplied in pytest arguments: {version}") + test_case_version = semver.VersionInfo.parse(test_case_version) + if test_case_version > version: + item.add_marker(pytest.mark.skip(reason=f"Test requires marqo_version {test_case_version} which is not greater than version supplied {version}. Skipping.")) + + logger.debug(f"Total collected tests: {len(items)}") diff --git a/tests/backwards_compatibility_tests/marqo_test.py b/tests/backwards_compatibility_tests/marqo_test.py new file mode 100644 index 000000000..f7ab0f29f --- /dev/null +++ b/tests/backwards_compatibility_tests/marqo_test.py @@ -0,0 +1,89 @@ +"""This test class requires you to have a running Marqo instance to test against! + +Pass its settings to local_marqo_settings. +""" +from typing import List, Dict +import json + +import unittest +from marqo.utils import construct_authorized_url +from marqo import Client +from marqo.errors import MarqoWebError +import requests + + +class MarqoTestCase(unittest.TestCase): + + indexes_to_delete = [] + _MARQO_URL = "http://localhost:8882" + + @classmethod + def setUpClass(cls) -> None: + local_marqo_settings = { + "url": cls._MARQO_URL + } + cls.client_settings = local_marqo_settings + cls.authorized_url = cls.client_settings["url"] + # A list with index names to be cleared in each setUp call and to be deleted in tearDownClass call + cls.indexes_to_delete: List[str] = [] + cls.client = Client(**cls.client_settings) + + @classmethod + def tearDownClass(cls) -> None: + # A function that will be automatically called after each test call + # This removes all the loaded models to save memory space. 
+ cls.removeAllModels() + if cls.indexes_to_delete: + cls.delete_indexes(cls.indexes_to_delete) + + def setUp(self) -> None: + if self.indexes_to_delete: + self.clear_indexes(self.indexes_to_delete) + + @classmethod + def create_indexes(cls, index_settings_with_name: List[Dict]): + """A function to call the internal Marqo API to create a batch of indexes. + Use camelCase for the keys. + """ + + r = requests.post(f"{cls._MARQO_URL}/batch/indexes/create", data=json.dumps(index_settings_with_name)) + + try: + r.raise_for_status() + except requests.exceptions.HTTPError as e: + raise MarqoWebError(e) + + @classmethod + def delete_indexes(cls, index_names: List[str]): + r = requests.post(f"{cls._MARQO_URL}/batch/indexes/delete", data=json.dumps(index_names)) + + try: + r.raise_for_status() + except requests.exceptions.HTTPError as e: + raise MarqoWebError(e) + + @classmethod + def clear_indexes(cls, index_names: List[str]): + for index_name in index_names: + r = requests.delete(f"{cls._MARQO_URL}/indexes/{index_name}/documents/delete-all") + try: + r.raise_for_status() + except requests.exceptions.HTTPError as e: + raise MarqoWebError(e) + + + @classmethod + def removeAllModels(cls) -> None: + # A function that can be called to remove loaded models in Marqo. + # Use it whenever you think there is a risk of OOM problem. + # E.g., add it into the `tearDown` function to remove models between test cases. + client = Client(**cls.client_settings) + index_names_list: List[str] = [item["indexName"] for item in client.get_indexes()["results"]] + for index_name in index_names_list: + loaded_models = client.index(index_name).get_loaded_models().get("models", []) + for model in loaded_models: + try: + client.index(index_name).eject_model(model_name=model["model_name"], model_device=model["model_device"]) + except MarqoWebError: + pass + diff --git a/tests/backwards_compatibility_tests/pytest.ini b/tests/backwards_compatibility_tests/pytest.ini new file mode 100644 index 000000000..ca0646122 --- /dev/null +++ b/tests/backwards_compatibility_tests/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + marqo_version(version): marker for specifying Marqo version requirement in tests \ No newline at end of file diff --git a/tests/backwards_compatibility_tests/requirements.txt b/tests/backwards_compatibility_tests/requirements.txt new file mode 100755 index 000000000..e0e4f96b6 --- /dev/null +++ b/tests/backwards_compatibility_tests/requirements.txt @@ -0,0 +1,6 @@ +pytest==7.4.3 +pillow==9.3.0 +numpy==1.23.4 +marqo +semver==3.0.2 +uvicorn[STANDARD] \ No newline at end of file diff --git a/tests/backwards_compatibility_tests/scripts/determine_to_version.py b/tests/backwards_compatibility_tests/scripts/determine_to_version.py new file mode 100644 index 000000000..f602167bd --- /dev/null +++ b/tests/backwards_compatibility_tests/scripts/determine_to_version.py @@ -0,0 +1,45 @@ +import os +import subprocess +import sys + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../../src')) +from marqo.version import __version__ +import semver + +def determine_to_version(run_commit_hash: str, current_marqo_version: str): + """ + This function determines the to_version. + It does so by looking at version.py file. However there can be times where active development is going on and + Marqo developers have not yet updated the version.py file with the next (i.e To be released) version. In such cases, we need to + determine the to version by looking at the latest tag in the git repository. 
+
+    If a tag t exists for version v, it could mean that version v has already been released and the developers are working on version v+1.
+    We determine whether this is the case by comparing the commit hash of tag t with the commit hash of the GitHub workflow run. If they differ, we
+    conclude that version v is already released, and to test backwards compatibility we need to test against version v as well. Thus we set to_version = v+1.
+
+    If the commit hash of tag t and the commit hash of the GitHub workflow run are the same, we conclude that this may be a re-run. Similarly,
+    if the tag t for version v doesn't exist yet, we conclude that version v is the upcoming (to-be-released) version. In both cases we set to_version = v.
+    """
+    tag = subprocess.check_output(["git", "tag", "--list", f"{current_marqo_version}"],
+                                  text=True).splitlines()  # Determine if a tag exists for the current_marqo_version picked from the version.py file
+    if tag:  # A tag already exists for current_marqo_version, so this version is already released and we are working towards the next release; treat this commit as a commit of the next version.
+        try:
+            tag_commit_hash = subprocess.check_output(  # Determine the commit hash of the tag
+                ["git", "rev-list", "-n", "1", tag[0]],
+                text=True
+            ).strip()
+            if tag_commit_hash != run_commit_hash:  # The commit hashes don't match, so this commit belongs to the next version; set to_version to version.bump_patch().
+                to_version = semver.VersionInfo.parse(current_marqo_version).bump_patch()
+                return str(to_version)
+            elif tag_commit_hash == run_commit_hash:  # The commit hashes are the same, so this could be a manual re-run; no need to bump the patch version.
+                return current_marqo_version
+        except subprocess.CalledProcessError as e:
+            print(f"Error while determining to_version: {e}")
+    else:  # No tag exists, so this commit is for a new version whose tag is yet to be released; to_version is the version picked up from version.py
+        return current_marqo_version
+
+if __name__ == "__main__":
+    commit_hash = sys.argv[1]  # Commit hash of the workflow run, passed on the command line
+    current_marqo_version = __version__
+    to_version = determine_to_version(commit_hash, current_marqo_version)
+    print(to_version)  # Output the determined to_version
diff --git a/tests/backwards_compatibility_tests/scripts/generate_versions.py b/tests/backwards_compatibility_tests/scripts/generate_versions.py
new file mode 100644
index 000000000..83614749a
--- /dev/null
+++ b/tests/backwards_compatibility_tests/scripts/generate_versions.py
@@ -0,0 +1,50 @@
+import json
+
+import semver
+import subprocess
+import sys
+
+def generate_versions(to_version: str, num_versions: int = 3) -> list:
+    """
+    Generate a list of previous versions based on the target version.
+
+    This function generates a list of previous versions for a given target version.
+    It includes the previous patch version of the same minor version if applicable,
+    and the latest patch versions for preceding minor versions.
+
+    Args:
+        to_version (str): The target version to generate previous versions for.
+        num_versions (int): The number of previous versions to generate. Defaults to 3.
+
+    Returns:
+        list: A list of previous versions as strings.
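+
+    Example (illustrative, assuming tags 2.11.2, 2.10.3 and 2.9.0 exist in the repository):
+        generate_versions("2.11.3") -> ["2.11.2", "2.10.3", "2.9.0"]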
+ """ + target_version = semver.VersionInfo.parse(to_version) + versions = [] + + # If this is a patch release, add the previous patch version of the same minor version + if target_version.patch > 0: + prev_patch_version = f"{target_version.major}.{target_version.minor}.{target_version.patch - 1}" + versions.append(prev_patch_version) + + # Gather the latest patch version for each preceding minor version + minor = target_version.minor - 1 + while len(versions) < num_versions and minor >= 0: + # Get all tags for the given minor version, sort, and pick the latest patch + tags = subprocess.check_output( + ["git", "tag", "--list", f"{target_version.major}.{minor}.*"], + text=True + ).splitlines() + + # Filter and find the latest patch version tag + if tags: + latest_patch = max(tags, key=semver.VersionInfo.parse) + versions.append(latest_patch.lstrip("v")) + minor -= 1 + return versions + +if __name__ == "__main__": + to_version = sys.argv[1] # Get to version from the command line + num_versions = sys.argv[2] # Get number of versions to generate + versions = generate_versions(to_version, int(num_versions)) + print(json.dumps(versions)) # Output versions as Json diff --git a/tests/backwards_compatibility_tests/test_general_compatibility.py b/tests/backwards_compatibility_tests/test_general_compatibility.py new file mode 100644 index 000000000..f22547a01 --- /dev/null +++ b/tests/backwards_compatibility_tests/test_general_compatibility.py @@ -0,0 +1,203 @@ +import pytest + +from base_compatibility_test_case import BaseCompatibilityTestCase +from marqo_test import MarqoTestCase +import marqo + + +@pytest.mark.marqo_version('2.11.0') #TODO: Check this again +class GeneralCompatibilityTest(BaseCompatibilityTestCase): + + image_model = 'open_clip/ViT-B-32/laion2b_s34b_b79k' + multimodal_weights = {"image_field": 0.9, "text_field": 0.1} + mappings = { + "multimodal_field": { + "type": "multimodal_combination", + "weights": multimodal_weights, + } + } + tensor_fields = ["multimodal_field", "text_field", "image_field"] + structured_index_metadata = { + "indexName": "structured-index-2-11", + "type": "structured", + "vectorNumericType": "float", + "model": image_model, + "normalizeEmbeddings": True, + "textPreprocessing": { + "splitLength": 2, + "splitOverlap": 0, + "splitMethod": "sentence", + }, + "imagePreprocessing": {"patchMethod": None}, + "allFields": [ + {"name": "text_field", "type": "text", "features": ["lexical_search"]}, + {"name": "caption", "type": "text", "features": ["lexical_search", "filter"]}, + {"name": "tags", "type": "array", "features": ["filter"]}, + {"name": "image_field", "type": "image_pointer"}, + {"name": "my_int", "type": "int", "features": ["score_modifier"]}, + # this field maps the above image field and text fields into a multimodal combination. 
+ { + "name": "multimodal_field", + "type": "multimodal_combination", + "dependentFields": multimodal_weights, + }, + ], + "tensorFields": tensor_fields, + "annParameters": { + "spaceType": "prenormalized-angular", + "parameters": {"efConstruction": 512, "m": 16}, + }, + } + + unstructured_index_metadata = { + "indexName": "unstructured-index-2-11", + "model": image_model, + "treatUrlsAndPointersAsImages": True, + } + + hybrid_search_params = { + "retrievalMethod": "disjunction", + "rankingMethod": "rrf", + "alpha": 0.3, + "rrfK": 60, + "searchableAttributesLexical": ["text_field"], + "searchableAttributesTensor": ['image_field', 'multimodal_field'], + "scoreModifiersTensor": { + "add_to_score": [{"field_name": "my_int", "weight": 0.01}] + }, + "scoreModifiersLexical": { + "add_to_score": [{"field_name": "my_int", "weight": 0.01}] + }, + } + + docs = [ + { + '_id': 'example_doc_1', + 'text_field': 'Man riding a horse', + 'image_field': 'https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image1.jpg', + 'tags': ['man', 'horse'], + 'my_int': 1 + }, + { + "_id": f"example_doc_2", + "text_field": "Flying Plane", + "image_field": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image2.jpg", + 'tags': ['plane'], + 'my_int': 2 + }, + { + "_id": f"example_doc_3", + "text_field": "Traffic light", + "image_field": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image3.jpg", + 'tags': ['light'], + 'caption': 'example_doc_3' + }, + { + "_id": f"example_doc_4", + "text_field": "Red Bus", + "image_field": "https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image4.jpg", + 'tags': ['bus', 'red'], + 'caption': 'example_doc_4' + } + ] + + extra_docs = [ + { + '_id': 'example_doc_5', + 'text_field': 'Woman looking at phone', + 'image_field': 'https://raw.githubusercontent.com/marqo-ai/marqo/mainline/examples/ImageSearchGuide/data/image0.jpg', + 'tags': ['women', 'phone'], + 'my_int': 3 + }, + { + "_id": f"example_doc_6", + "text_field": "Woman skiing", + "image_field": "https://raw.githubusercontent.com/marqo-ai/marqo-clip-onnx/main/examples/coco.jpg", + 'tags': ['ski'], + 'caption': 'example_doc_6' + }, + ] + indexes_to_test_on = [structured_index_metadata, unstructured_index_metadata] + queries = ["travel", "horse light", "travel with plane"] + search_methods = ["TENSOR", "LEXICAL", "HYBRID"] + result_keys = search_methods # Set the result keys to be the same as search methods for easy comparison + searchable_attributes = {"TENSOR": ['image_field', 'multimodal_field'], "LEXICAL": ['text_field']} + + # We need to set indexes_to_delete variable in an overriden tearDownClass() method + # So that when the test method has finished running, pytest is able to delete the indexes added in + # prepare method of this class + @classmethod + def tearDownClass(cls) -> None: + cls.indexes_to_delete = [index['indexName'] for index in cls.indexes_to_test_on] + super().tearDownClass() + + @classmethod + def setUpClass(cls) -> None: + cls.indexes_to_delete = [index['indexName'] for index in cls.indexes_to_test_on] + super().setUpClass() + + def prepare(self): + """ + Prepare the indexes and add documents for the test. + Also store the search results for later comparison. 
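+        The results are persisted via save_results_to_file() so that test_search(), which may run against a
+        different (upgraded) Marqo version, can later load and compare them.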
+ """ + self.logger.debug(f"Creating indexes {self.indexes_to_test_on}") + self.create_indexes(self.indexes_to_test_on) + try: + self.logger.debug(f'Feeding documents to {self.indexes_to_test_on}') + for index in self.indexes_to_test_on: + if index.get("type") is not None and index.get('type') == 'structured': + self.client.index(index_name=index['indexName']).add_documents(documents=self.docs) + else: + self.client.index(index_name=index['indexName']).add_documents(documents=self.docs, + mappings=self.mappings, + tensor_fields=self.tensor_fields) + self.logger.debug(f'Ran prepare method for {self.indexes_to_test_on} inside test class {self.__class__.__name__}') + except Exception as e: + self.logger.error(f"Exception occurred while adding documents {e}") + raise e + + all_results = {} + # Loop through queries, search methods, and result keys to populate unstructured_results + for index in self.indexes_to_test_on: + index_name = index['indexName'] + all_results[index_name] = {} + + # For each index, store results for different search methods + for query, search_method, result_key in zip(self.queries, self.search_methods, self.result_keys): + if index.get("type") is not None and index.get("type") == 'structured': + if search_method == 'HYBRID': + result = self.client.index(index_name).search(q=query, search_method=search_method, hybrid_parameters=self.hybrid_search_params) + else: + result = self.client.index(index_name).search(q=query, search_method=search_method, searchable_attributes=self.searchable_attributes[search_method]) + else: + result = self.client.index(index_name).search(q=query, search_method=search_method) + all_results[index_name][result_key] = result + + # store the result of search across all structured & unstructured indexes + self.save_results_to_file(all_results) + + def test_search(self): + """Run search queries and compare the results with the stored results.""" + + stored_results = self.load_results_from_file() + for index in self.indexes_to_test_on: + index_name = index['indexName'] + + # For each index, search for different queries and compare results + for query, search_method, result_key in zip(self.queries, self.search_methods, self.result_keys): + if index.get("type") is not None and index.get("type") == 'structured': + if search_method == 'HYBRID': + result = self.client.index(index_name).search(q=query, search_method=search_method, hybrid_parameters=self.hybrid_search_params) + else: + result = self.client.index(index_name).search(q=query, search_method=search_method, searchable_attributes=self.searchable_attributes[search_method]) + else: + result = self.client.index(index_name).search(q=query, search_method=search_method) + + self._compare_search_results(stored_results[index_name][result_key], result) + + + def _compare_search_results(self, expected_result, actual_result): + """Compare two search results and assert if they match.""" + # We compare just the hits because the result contains other fields like processingTime which changes in every search API call. + self.assertEqual(expected_result.get("hits"), actual_result.get("hits"), f"Results do not match. 
Expected: {expected_result}, Got: {actual_result}") \ No newline at end of file diff --git a/tests/backwards_compatibility_tests/test_vector_normalisation.py b/tests/backwards_compatibility_tests/test_vector_normalisation.py new file mode 100644 index 000000000..1d29e3eb6 --- /dev/null +++ b/tests/backwards_compatibility_tests/test_vector_normalisation.py @@ -0,0 +1,90 @@ +import pytest + +from base_compatibility_test_case import BaseCompatibilityTestCase +from marqo_test import MarqoTestCase +import marqo + + +@pytest.mark.marqo_version('2.13.0') +class CompatibilityTestVectorNormalisation(BaseCompatibilityTestCase): + text_index_with_normalize_embeddings_true = "add_doc_api_test_structured_index_with_normalize_embeddings_true" + + DEFAULT_DIMENSIONS = 384 + custom_vector = [1.0 for _ in range(DEFAULT_DIMENSIONS)] + expected_custom_vector_after_normalization = [0.05103103816509247 for _ in range(DEFAULT_DIMENSIONS)] + index_metadata = { + "indexName": text_index_with_normalize_embeddings_true, + "type": "structured", + "model": "sentence-transformers/all-MiniLM-L6-v2", + "normalizeEmbeddings": True, + "allFields": [ + {"name": "title", "type": "text"}, + {"name": "content", "type": "text"}, + {"name": "int_field_1", "type": "int"}, + {"name": "float_field_1", "type": "float"}, + {"name": "long_field_1", "type": "long"}, + {"name": "double_field_1", "type": "double"}, + {"name": "array_int_field_1", "type": "array"}, + {"name": "array_float_field_1", "type": "array"}, + {"name": "array_long_field_1", "type": "array"}, + {"name": "array_double_field_1", "type": "array"}, + {"name": "custom_vector_field_1", "type": "custom_vector", + "features": ["lexical_search", "filter"]}, + ], + "tensorFields": ["title", "content", "custom_vector_field_1"], + } + + indexes_to_test_on = [text_index_with_normalize_embeddings_true] + + # We need to set indexes_to_delete variable in an overriden tearDownClass() method + # So that when the test method has finished running, pytest is able to delete the indexes added in + # prepare method of this class + @classmethod + def tearDownClass(cls) -> None: + cls.indexes_to_delete = [cls.text_index_with_normalize_embeddings_true] + super().tearDownClass() + + def prepare(self): + # Create structured and unstructured indexes and add some documents, set normalise embeddings to true + # Add documents + self.logger.debug(f"Creating indexes {self.text_index_with_normalize_embeddings_true}") + self.create_indexes([self.index_metadata]) + + try: + add_docs_res_normalized = self.client.index(index_name=self.text_index_with_normalize_embeddings_true).add_documents( + documents=[ + { + "custom_vector_field_1": { + "content": "custom vector text", + "vector": self.custom_vector, + }, + "content": "normal text", + "_id": "doc1", + }, + { + "content": "second doc", + "_id": "doc2" + } + ]) + self.logger.debug(f"Added documents to index: {add_docs_res_normalized}") + self.logger.debug(f'Ran prepare mode test for {self.text_index_with_normalize_embeddings_true} inside test class {self.__class__.__name__}') + except Exception as e: + self.logger.error(f"Exception occurred while adding documents {e}") + raise e + + def test_custom_vector_doc_in_normalized_embedding_true(self): + # This runs on to_version + get_indexes = self.client.get_indexes() + self.logger.debug(f"Got these indexes {get_indexes}") + + for index_name in self.indexes_to_test_on: + self.logger.debug(f"Processing index: {index_name}") + try: + doc_res_normalized = self.client.index(index_name).get_document( + 
document_id="doc1",
+                    expose_facets=True)
+                self.assertEqual(doc_res_normalized["custom_vector_field_1"], "custom vector text")
+                self.assertEqual(doc_res_normalized['_tensor_facets'][0]["custom_vector_field_1"], "custom vector text")
+                self.assertEqual(doc_res_normalized['_tensor_facets'][0]['_embedding'], self.expected_custom_vector_after_normalization)
+            except Exception as e:
+                self.logger.error(f"Got an exception while trying to query index: {e}")
\ No newline at end of file
diff --git a/tox.ini b/tox.ini
index 09b67a864..8afd71611 100644
--- a/tox.ini
+++ b/tox.ini
@@ -25,4 +25,4 @@ commands =
 [testenv:clean]
 deps = coverage
 skip_install = true
-commands = coverage erase
+commands = coverage erase
\ No newline at end of file
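A minimal way to run these compatibility tests locally (an illustrative sketch, assuming a Marqo instance is already running on http://localhost:8882, the repository root is on PYTHONPATH, and the dependencies in tests/backwards_compatibility_tests/requirements.txt are installed):

    pytest tests/backwards_compatibility_tests --version_to_compare_against 2.11.1

Tests whose marqo_version marker is newer than the supplied version are skipped by the collection hook in conftest.py.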