Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Releases/2.14 #1099

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'
workflow_dispatch:
Expand All @@ -19,7 +20,7 @@ on:

# Setting MAX_VERSIONS_TO_TEST, this can be a configurable value or if no input is provided, it can be a default value.
env:
MAX_VERSIONS_TO_TEST: ${{ github.event.inputs.max_versions_to_test || 3 }}
MAX_VERSIONS_TO_TEST: ${{ github.event.inputs.max_versions_to_test || 5 }}

jobs:
check-if-image-exists:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY vespa .
RUN mvn clean package

# Stage 2: Base image for Python setup
FROM marqoai/marqo-base:46 as base_image
FROM marqoai/marqo-base:47 as base_image

# Allow mounting volume containing data and configs for vespa
VOLUME /opt/vespa/var
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _cleanup_container_config(self):
"""
container_element = self._ensure_only_one('container')
for child in container_element.findall('*'):
if child.tag in ['document-api', 'search']:
if child.tag in ['document-api', 'document-processing', 'search']:
child.clear()
elif child.tag != 'nodes':
container_element.remove(child)
Expand Down
22 changes: 16 additions & 6 deletions src/marqo/core/inference/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,30 @@ def cuda_device_health_check(self) -> None:

oom_errors = []
for device in self.cuda_devices:
memory_stats = None
try:
cuda_device = torch.device(f'cuda:{device.id}')
memory_stats = torch.cuda.memory_stats(cuda_device)
logger.debug(f'CUDA device {device.full_name} with total memory {device.total_memory}. '
f'Memory stats: {str(memory_stats)}')

torch.randn(3, device=cuda_device)
# Marqo usually allocates 20MiB cuda memory at a time when processing media files. When OOM happens,
# there are usually a few MiB free in reserved memory. To consistently trigger OOM from this health
# check request, we try to create a tensor that won't fit in the reserved memory to always trigger
# another allocation. Please note this is not leaky since we don't store the reference to this tensor.
# Once it's out of scope, the memory it takes will be returned to reserved space. Our assumption is
# that this method will not be called too often or with high concurrency (only used by liveness check
# for now, which runs once every few seconds).
tensor_size = int(20 * 1024 * 1024 / 4) # 20MiB, float32 (4 bytes)
torch.randn(tensor_size, device=cuda_device)
except RuntimeError as e:
if 'out of memory' in str(e).lower():
logger.error(f'CUDA device {device.full_name} is out of memory. Total memory: {device.total_memory}. '
f'Memory stats: {str(memory_stats)}')
allocated_mem = memory_stats.get("allocated.all.current", None) if memory_stats else None
oom_errors.append(f'CUDA device {device.full_name} is out of memory:'
f' ({allocated_mem}/{device.total_memory})')
logger.error(f'CUDA device {device.full_name} is out of memory. Original error: {str(e)}. '
f'Memory stats: {str(memory_stats)}.')
allocated_mem = memory_stats.get("allocated_bytes.all.current", None) if memory_stats else None
reserved_mem = memory_stats.get("reserved_bytes.all.current", None) if memory_stats else None
oom_errors.append(f'CUDA device {device.full_name} is out of memory (reserved: {reserved_mem}, '
f'allocated: {allocated_mem}, total: {device.total_memory})')
else:
# Log out a warning message when encounter other transient errors.
logger.error(f'Encountered issue inspecting CUDA device {device.full_name}: {str(e)}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,12 @@ def to_marqo_document(self, marqo_index: SemiStructuredMarqoIndex) -> Dict[str,
marqo_document[key] = []
marqo_document[key].append(value)

# marqo_document.update(self.fixed_fields.short_string_fields)
# Add int and float fields back
# Please note that int-map and float-map fields are flattened in the result. The correct behaviour would be to
# convert them back to the map format in which they were indexed. We keep the behaviour as is to avoid breaking changes.
marqo_document.update(self.fixed_fields.int_fields)
marqo_document.update(self.fixed_fields.float_fields)

marqo_document.update({k: bool(v) for k, v in self.fixed_fields.bool_fields.items()})
marqo_document[index_constants.MARQO_DOC_ID] = self.fixed_fields.marqo__id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,23 @@ def to_marqo_document(self, vespa_document: Dict[str, Any], return_highlights: b
def to_vespa_query(self, marqo_query: MarqoQuery) -> Dict[str, Any]:
# Verify attributes to retrieve, if defined
if marqo_query.attributes_to_retrieve is not None:
if len(marqo_query.attributes_to_retrieve) > 0:
# Retrieve static fields content to extract non-string values from combined fields
marqo_query.attributes_to_retrieve.extend([
common.STRING_ARRAY,
common.INT_FIELDS,
common.FLOAT_FIELDS,
common.BOOL_FIELDS,
])

marqo_query.attributes_to_retrieve.append(common.VESPA_FIELD_ID)

# add chunk field names for tensor fields
marqo_query.attributes_to_retrieve.extend(
[self.get_marqo_index().tensor_field_map[att].chunk_field_name
for att in marqo_query.attributes_to_retrieve
if att in self.get_marqo_index().tensor_field_map]
)

# Hybrid must be checked first since it is a subclass of Tensor and Lexical
if isinstance(marqo_query, MarqoHybridQuery):
return StructuredVespaIndex._to_vespa_hybrid_query(self, marqo_query)
Expand Down
17 changes: 11 additions & 6 deletions src/marqo/core/unstructured_vespa_index/unstructured_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,19 @@ def from_vespa_document(cls, document: Dict) -> "UnstructuredVespaDocument":
if unstructured_common.VESPA_DOC_HYBRID_RAW_TENSOR_SCORE in fields else None

return cls(id=document[cls._VESPA_DOC_ID],
raw_tensor_score = raw_tensor_score,
raw_lexical_score = raw_lexical_score,
fields=UnstructuredVespaDocumentFields(**fields))
raw_tensor_score=raw_tensor_score,
raw_lexical_score=raw_lexical_score,
fields=UnstructuredVespaDocumentFields(**fields))

@classmethod
def from_marqo_document(cls, document: Dict, filter_string_max_length: int) -> "UnstructuredVespaDocument":
"""Instantiate an UnstructuredVespaDocument from a valid Marqo document from
add_documents"""

if index_constants.MARQO_DOC_ID not in document:
raise MarqoDocumentParsingError(f"Unstructured Marqo document does not have a {index_constants.MARQO_DOC_ID} field. "
f"This should be assigned for a valid document")
raise MarqoDocumentParsingError(f"Unstructured Marqo document does not have a "
f"{index_constants.MARQO_DOC_ID} field. "
f"This should be assigned for a valid document")

doc_id = document[index_constants.MARQO_DOC_ID]
instance = cls(id=doc_id, fields=UnstructuredVespaDocumentFields(marqo__id=doc_id))
Expand Down Expand Up @@ -123,7 +124,8 @@ def from_marqo_document(cls, document: Dict, filter_string_max_length: int) -> "
instance.fields.score_modifiers_fields[f"{key}.{k}"] = v
else:
raise MarqoDocumentParsingError(f"In document {doc_id}, field {key} has an "
f"unsupported type {type(value)} which has not been validated in advance.")
f"unsupported type {type(value)} which has not been "
f"validated in advance.")

instance.fields.vespa_multimodal_params = document.get(unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS, {})
instance.fields.vespa_embeddings = document.get(index_constants.MARQO_DOC_EMBEDDINGS, {})
Expand Down Expand Up @@ -152,8 +154,11 @@ def to_marqo_document(self, return_highlights: bool = False) -> Dict[str, Any]:
marqo_document[key].append(value)

# Add int and float fields back
# Please note that int-map and float-map fields are flattened in the result. The correct behaviour would be to
# convert them back to the map format in which they were indexed. We keep the behaviour as is to avoid breaking changes.
marqo_document.update(self.fields.int_fields)
marqo_document.update(self.fields.float_fields)

marqo_document.update({k: bool(v) for k, v in self.fields.bool_fields.items()})
marqo_document[index_constants.MARQO_DOC_ID] = self.fields.marqo__id

Expand Down
9 changes: 7 additions & 2 deletions src/marqo/tensor_search/tensor_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,8 @@ def gather_documents_from_response(response: QueryResult, marqo_index: MarqoInde
marqo_doc = vespa_index.to_marqo_document(doc.dict(), return_highlights=highlights)
marqo_doc['_score'] = doc.relevance

if marqo_index.type == IndexType.Unstructured and attributes_to_retrieve is not None:
if (marqo_index.type in [IndexType.Unstructured, IndexType.SemiStructured] and
attributes_to_retrieve is not None):
# For unstructured and semi-structured indexes, we apply the attributes_to_retrieve filtering after search
marqo_doc = unstructured_index_attributes_to_retrieve(marqo_doc, attributes_to_retrieve)

Expand All @@ -1798,7 +1799,11 @@ def unstructured_index_attributes_to_retrieve(marqo_doc: Dict[str, Any], attribu
str, Any]:
# attributes_to_retrieve should already be validated at the start of search
attributes_to_retrieve = list(set(attributes_to_retrieve).union({"_id", "_score", "_highlights"}))
return {k: v for k, v in marqo_doc.items() if k in attributes_to_retrieve}
return {k: v for k, v in marqo_doc.items() if k in attributes_to_retrieve or
# Please note that numeric map fields are flattened for unstructured or semi-structured indexes.
# Therefore, when filtering on attributes_to_retrieve, we need to also include flattened map fields
# with the specified attributes as prefixes. We keep this behaviour only for compatibility reasons.
any([k.startswith(attribute + ".") for attribute in attributes_to_retrieve])}


def assign_query_to_vector_job(
Expand Down
2 changes: 1 addition & 1 deletion src/marqo/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.14.0"
__version__ = "2.14.1"

def get_version() -> str:
return f"{__version__}"
19 changes: 11 additions & 8 deletions tests/backwards_compatibility_tests/scripts/generate_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
import subprocess
import sys

def generate_versions(to_version: str, num_versions: int = 3) -> list:
def generate_versions(to_version: str, num_minor_versions_to_test: int = 3) -> list:
"""
Generate a list of previous versions based on the target version.

This function generates a list of previous versions for a given target version.
It includes the previous patch version of the same minor version if applicable,
and the latest patch versions for preceding minor versions.
It includes all the previous patch versions of the same minor version if applicable,
and the latest patch versions for preceding minor versions of up to num_minor_versions_to_test.

Args:
to_version (str): The target version to generate previous versions for.
num_versions (int): The number of previous versions to generate. Defaults to 3.
num_minor_versions_to_test (int): The number of previous minor versions to generate. Defaults to 3.

Returns:
list: A list of previous versions as strings.
Expand All @@ -24,13 +24,16 @@ def generate_versions(to_version: str, num_versions: int = 3) -> list:

# If this is a patch release, add the previous patch version of the same minor version
if target_version.patch > 0:
prev_patch_version = f"{target_version.major}.{target_version.minor}.{target_version.patch - 1}"
versions.append(prev_patch_version)
versions.extend(
f"{target_version.major}.{target_version.minor}.{i}"
for i in range(target_version.patch - 1, -1, -1)
)

# Gather the latest patch version for each preceding minor version
minor = target_version.minor - 1
while len(versions) < num_versions and minor >= 0:
# Get all tags for the given minor version, sort, and pick the latest patch
for _ in range(num_minor_versions_to_test):
if minor < 0:
break
tags = subprocess.check_output(
["git", "tag", "--list", f"{target_version.major}.{minor}.*"],
text=True
Expand Down
65 changes: 65 additions & 0 deletions tests/core/index_management/test_services_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,71 @@ def test_config_components(self):
"""
self._assertStringsEqualIgnoringWhitespace(expected_xml, service_xml.to_xml())

def test_config_components_should_preserve_document_processing_nodes(self):
    """
    Test that the `document-processing` node in the `container` section is preserved when configuring
    components. It must be kept because it is referenced by the `document-processing` element in the
    `content` section, and config_components does not modify the content section; removing it would
    leave a dangling cluster reference.
    """
    # Input mirrors a deployed services.xml: a container cluster with document-api,
    # document-processing and search enabled, plus a content cluster that references
    # the container's document-processing cluster via cluster="default".
    xml = """<?xml version="1.0" encoding="utf-8" ?>
    <services version="1.0" xmlns:deploy="vespa" xmlns:preprocess="properties">
    <container id="default" version="1.0">
    <document-api/>
    <document-processing/>
    <search/>
    <nodes>
    <node hostalias="node1"/>
    </nodes>
    </container>
    <content id="content_default" version="1.0">
    <documents>
    <document type="marqo__existing_00index" mode="index"/>
    <document-processing cluster="default"/>
    </documents>
    </content>
    </services>
    """

    service_xml = ServicesXml(xml)
    service_xml.config_components()

    # Expected outcome of config_components():
    # - removed all random elements and custom components from the container element
    # - added the Marqo searcher chain, handler and other custom components to the container element
    # - kept the nodes element in the container as is
    # - kept the content element (and its document-processing reference) as is
    expected_xml = """<?xml version="1.0" encoding="utf-8" ?>
    <services version="1.0" xmlns:deploy="vespa" xmlns:preprocess="properties">
    <container id="default" version="1.0">
    <document-api/>
    <document-processing/>
    <search>
    <chain id="marqo" inherits="vespa">
    <searcher id="ai.marqo.search.HybridSearcher" bundle="marqo-custom-searchers"/>
    </chain>
    </search>
    <nodes>
    <node hostalias="node1"/>
    </nodes>
    <handler id="ai.marqo.index.IndexSettingRequestHandler" bundle="marqo-custom-searchers">
    <binding>http://*/index-settings/*</binding>
    <binding>http://*/index-settings</binding>
    </handler>
    <component id="ai.marqo.index.IndexSettings" bundle="marqo-custom-searchers">
    <config name="ai.marqo.index.index-settings">
    <indexSettingsFile>marqo_index_settings.json</indexSettingsFile>
    <indexSettingsHistoryFile>marqo_index_settings_history.json</indexSettingsHistoryFile>
    </config>
    </component>
    </container>
    <content id="content_default" version="1.0">
    <documents>
    <document type="marqo__existing_00index" mode="index"/>
    <document-processing cluster="default"/>
    </documents>
    </content>
    </services>
    """
    self._assertStringsEqualIgnoringWhitespace(expected_xml, service_xml.to_xml())

def _assertStringsEqualIgnoringWhitespace(self, s1: str, s2: str):
"""Custom assertion to compare strings ignoring whitespace."""

Expand Down
Loading
Loading