Skip to content

Commit

Permalink
Create Marqo settings schema at startup (#730)
Browse files Browse the repository at this point in the history
* Create Marqo settings schema in on-start script
* Flush welcome message right away
* Terminate API process for SIGINT and SIGTERM
  • Loading branch information
farshidz authored Jan 24, 2024
1 parent da275df commit 4ccc4a4
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 44 deletions.
42 changes: 23 additions & 19 deletions run_marqo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,28 @@ VESPA_IS_INTERNAL=False
# Vespa local run
if ([ -n "$VESPA_QUERY_URL" ] || [ -n "$VESPA_DOCUMENT_URL" ] || [ -n "$VESPA_CONFIG_URL" ]) && \
([ -z "$VESPA_QUERY_URL" ] || [ -z "$VESPA_DOCUMENT_URL" ] || [ -z "$VESPA_CONFIG_URL" ]); then
echo "Error: Partial VESPA environment variables set. Please provide all or none of the VESPA_QUERY_URL, VESPA_DOCUMENT_URL, VESPA_CONFIG_URL."
echo "Error: Partial external vector store configuration detected. \
Please provide all or none of the VESPA_QUERY_URL, VESPA_DOCUMENT_URL, VESPA_CONFIG_URL. \
See https://docs.marqo.ai/2.0.0/Guides/Advanced-Usage/configuration/ for more information"
exit 1

elif [ -z "$VESPA_QUERY_URL" ] && [ -z "$VESPA_DOCUMENT_URL" ] && [ -z "$VESPA_CONFIG_URL" ]; then
# Start local vespa
echo "Running Vespa Locally"
echo "External vector store not configured. Using local vector store"
tmux new-session -d -s vespa "bash /usr/local/bin/start_vespa.sh"

echo "Waiting for Vespa to start"
echo "Waiting for vector store to start"
for i in {1..5}; do
echo -ne "Waiting... $i seconds\r"
sleep 1
if [ $i -eq 1 ]; then
suffix="second"
else
suffix="seconds"
fi
echo -ne "Waiting... $i $suffix\r"
sleep 1
done
echo -e "\nDone waiting."

# Try to deploy the application and branch on the output
echo "Setting up Marqo local vector search application..."
END_POINT="http://localhost:19071/application/v2/tenant/default/application/default"
MAX_RETRIES=10
RETRY_COUNT=0
Expand All @@ -66,35 +71,34 @@ elif [ -z "$VESPA_QUERY_URL" ] && [ -z "$VESPA_DOCUMENT_URL" ] && [ -z "$VESPA_C

# Check for the specific "not found" error response which indicates there is no application package deployed
if echo "$RESPONSE" | grep -q '"error-code":"NOT_FOUND"'; then
echo "Marqo does not find an existing index"
echo "Marqo is deploying the application and waiting for the response from document API to start..."
echo "Marqo did not find an existing vector store. Setting up vector store..."
# Deploy a dummy application package
vespa deploy /app/scripts/vespa_dummy_app --wait 300 >/dev/null 2>&1

until curl -f -X GET http://localhost:8080 >/dev/null 2>&1; do
echo " Waiting for Vespa document API to be available..."
echo " Waiting for vector store to be available..."
sleep 10
done
echo " Vespa document API is available. Local Vespa setup complete."
echo " Vector store is available. Vector store setup complete"
break

# Check for the "generation" success response which indicates there is an existing application package deployed
elif echo "$RESPONSE" | grep -q '"generation":'; then
echo "Marqo found an existing index. Waiting for the response from document API to start Marqo..."
echo "Marqo found an existing vector store. Waiting for vector store to be available..."

until curl -f -X GET http://localhost:8080 >/dev/null 2>&1; do
echo " Waiting for Vespa document API to be available..."
echo " Waiting for vector store to be available..."
sleep 10
done
echo " Vespa document API is available. Local Vespa setup complete."
echo " Vector store is available. Vector store setup complete"
break
fi
((RETRY_COUNT++))
sleep 5
done

if [ $RETRY_COUNT -eq $MAX_RETRIES ]; then
echo "Warning: Marqo didn't configure local vector . Marqo is still starting but unexpected error may happen."
echo "Warning: Failed to configure local vector store. Marqo may not function correctly"
fi

export VESPA_QUERY_URL="http://localhost:8080"
Expand All @@ -103,7 +107,7 @@ elif [ -z "$VESPA_QUERY_URL" ] && [ -z "$VESPA_DOCUMENT_URL" ] && [ -z "$VESPA_C
export VESPA_IS_INTERNAL=True

else
echo "All VESPA environment variables provided. Skipping local Vespa setup."
echo "External vector store configured. Using external vector store"
fi

# Start up redis
Expand All @@ -123,7 +127,7 @@ if [ "$MARQO_ENABLE_THROTTLING" != "FALSE" ]; then
elapsed_time=$(expr $current_time - $start_time)
if [ $elapsed_time -ge 2000 ]; then
# Expected start time should be < 30ms in reality.
echo "redis-server failed to start within 2s. skipping."
echo "Marqo throttling server failed to start within 2s. skipping"
break
fi
sleep 0.1
Expand All @@ -132,7 +136,7 @@ if [ "$MARQO_ENABLE_THROTTLING" != "FALSE" ]; then
echo "Marqo throttling is now running"

else
echo "Throttling has been disabled. Skipping Marqo throttling start."
echo "Throttling has been disabled. Skipping Marqo throttling start"
fi

# set the default value to info and convert to lower case
Expand All @@ -142,7 +146,7 @@ MARQO_LOG_LEVEL=`echo "$MARQO_LOG_LEVEL" | tr '[:upper:]' '[:lower:]'`
# Start the tensor search web app in the background
cd /app/src/marqo/tensor_search || exit
uvicorn api:app --host 0.0.0.0 --port 8882 --timeout-keep-alive 75 --log-level $MARQO_LOG_LEVEL &
api_pid=$!
export api_pid=$!
wait "$api_pid"


Expand Down
7 changes: 4 additions & 3 deletions scripts/shutdown.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/bin/bash
echo "stopping marqo..."
redis-cli shutdown 2>/dev/null || true
/opt/vespa/bin/vespa-stop-services >/dev/null 2>&1
echo "Stopping Marqo..."
kill -SIGINT $api_pid >/dev/null 2>&1 || true
redis-cli shutdown >/dev/null 2>&1 || true
/opt/vespa/bin/vespa-stop-services >/dev/null 2>&1 || true
23 changes: 20 additions & 3 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ class IndexManagement:
def __init__(self, vespa_client: VespaClient):
self.vespa_client = vespa_client

def create_settings_schema(self) -> bool:
"""
Create the Marqo settings schema if it does not exist.
Returns:
True if schema was created, False if it already existed
"""
app = self.vespa_client.download_application()
settings_schema_created = self._add_marqo_settings_schema(app)

if settings_schema_created:
self.vespa_client.deploy_application(app)
self.vespa_client.wait_for_application_convergence()
return True

return False

def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
"""
Create a Marqo index.
Expand All @@ -53,7 +70,7 @@ def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
InvalidVespaApplicationError: If Vespa application is invalid after applying the index
"""
app = self.vespa_client.download_application()
settings_schema_created = self._create_marqo_settings_schema(app)
settings_schema_created = self._add_marqo_settings_schema(app)

if not settings_schema_created and self.index_exists(marqo_index_request.name):
raise IndexExistsError(f"Index {marqo_index_request.name} already exists")
Expand Down Expand Up @@ -88,7 +105,7 @@ def batch_create_indexes(self, marqo_index_requests: List[MarqoIndexRequest]) ->
InvalidVespaApplicationError: If Vespa application is invalid after applying the indexes
"""
app = self.vespa_client.download_application()
settings_schema_created = self._create_marqo_settings_schema(app)
settings_schema_created = self._add_marqo_settings_schema(app)

if not settings_schema_created:
for index in marqo_index_requests:
Expand Down Expand Up @@ -224,7 +241,7 @@ def index_exists(self, index_name: str) -> bool:
except IndexNotFoundError:
return False

def _create_marqo_settings_schema(self, app: str) -> bool:
def _add_marqo_settings_schema(self, app: str) -> bool:
"""
Create the Marqo settings schema if it does not exist.
Args:
Expand Down
16 changes: 16 additions & 0 deletions src/marqo/documentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import marqo.version

version = marqo.version.__version__
base_url = 'https://docs.marqo.ai'


def _build_url(path):
return f'{base_url}/{version}/{path}'


def configuring_marqo():
return _build_url('Guides/Advanced-Usage/configuration/')


def create_index():
return _build_url('API-Reference/Indexes/create_index/')
11 changes: 6 additions & 5 deletions src/marqo/tensor_search/index_meta_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time
from typing import Dict

from marqo import documentation
from marqo.api import exceptions
from marqo.config import Config
from marqo.core.exceptions import IndexNotFoundError
Expand Down Expand Up @@ -106,14 +107,14 @@ def refresh():
if isinstance(e, VespaStatusError) and e.status_code == 400:
# This can happen when settings schema doesn't exist
logger.warn(
'Failed to populate index cache due to 400 error from Vespa. This is expected if you '
f'have not created an index yet. Error: {e}'
'Failed to populate index cache due to 400 error from vector store. This can happen '
'if Marqo settings schema does not exist. Error: {e}'
)
else:
logger.warn(
"Failed to connect to Vespa Document API. If you are using an external Vespa instance, "
"ensure that the VESPA_DOCUMENT_URL environment variable is set and your Vespa "
f"instance is running and healthy. Error: {e}"
"Failed to connect to vector store. If you are using an external vector store, "
"ensure that Marqo is configured properly for this. See "
f"{documentation.configuring_marqo()} for more details. Error: {e}"
)
except Exception as e:
logger.error(f'Unexpected error in index cache refresh thread: {e}')
Expand Down
40 changes: 33 additions & 7 deletions src/marqo/tensor_search/on_start_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,28 @@

import torch

from marqo import config
from marqo import config, documentation
from marqo.api import exceptions
from marqo.connections import redis_driver
from marqo.s2_inference.s2_inference import vectorise
# we need to import backend before index_meta_cache to prevent circular import error:
from marqo.tensor_search import index_meta_cache, utils
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.tensor_search_logging import get_logger
from marqo.vespa.exceptions import VespaError

logger = get_logger(__name__)


def on_start(config: config.Config):
to_run_on_start = (
CreateSettingsSchema(config),
PopulateCache(config),
DownloadStartText(),
CUDAAvailable(),
SetBestAvailableDevice(),
ModelsForCacheing(),
InitializeRedis("localhost", 6379), # TODO, have these variable
InitializeRedis("localhost", 6379),
DownloadFinishText(),
MarqoWelcome(),
MarqoPhrase(),
Expand All @@ -33,13 +35,36 @@ def on_start(config: config.Config):
thing_to_start.run()


class CreateSettingsSchema:
"""Create the Marqo settings schema on Vespa"""

def __init__(self, config: config.Config):
self.config = config

def run(self):
try:
logger.debug('Creating Marqo settings schema')
created = self.config.index_management.create_settings_schema()
if created:
logger.debug('Marqo settings schema created')
else:
logger.debug('Marqo settings schema already exists. Skipping')
except VespaError as e:
logger.warn(
f"Could not create Marqo settings schema. If you are using an external vector store, "
"ensure that Marqo is configured properly for this. See "
f"{documentation.configuring_marqo()} for more details. Error: {e}"
)


class PopulateCache:
"""Populates the cache on start"""

def __init__(self, config: config.Config):
self.config = config

def run(self):
logger.debug('Starting index cache refresh thread')
index_meta_cache.start_refresh_thread(self.config)


Expand All @@ -64,7 +89,7 @@ def id_to_device(id):
for device_id in device_ids:
device_names.append({'id': device_id, 'name': id_to_device(device_id)})

self.logger.info(f"found devices {device_names}")
self.logger.info(f"Found devices {device_names}")


class SetBestAvailableDevice:
Expand Down Expand Up @@ -181,6 +206,7 @@ def __init__(self, host: str, port: int):
self.port = port

def run(self):
logger.debug('Initializing Redis')
# Can be turned off with MARQO_ENABLE_THROTTLING = 'FALSE'
if utils.read_env_vars_and_defaults(EnvVars.MARQO_ENABLE_THROTTLING) == "TRUE":
redis_driver.init_from_app(self.host, self.port)
Expand All @@ -195,7 +221,7 @@ def run(self):
print("###### STARTING DOWNLOAD OF MARQO ARTEFACTS################")
print("###########################################################")
print("###########################################################")
print('\n')
print('\n', flush=True)


class DownloadFinishText:
Expand All @@ -207,7 +233,7 @@ def run(self):
print("###### !!COMPLETED SUCCESSFULLY!!! ################")
print("###########################################################")
print("###########################################################")
print('\n')
print('\n', flush=True)


class MarqoPhrase:
Expand All @@ -222,7 +248,7 @@ def run(self):
"""

print(message)
print(message, flush=True)


class MarqoWelcome:
Expand All @@ -238,4 +264,4 @@ def run(self):
\_/\_/ |_____||_____|\____| \___/ |___|___||_____| |__| \___/ |___|___||__|__||__|\_|\__,_| \___/ |__|
"""
print(message)
print(message, flush=True)
Loading

0 comments on commit 4ccc4a4

Please sign in to comment.