Skip to content

Commit

Permalink
[Releases 2.15] Add the missing patches in 2.14.1 (#1097)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanliAlex authored Jan 24, 2025
1 parent 7c21efc commit 9fcce96
Show file tree
Hide file tree
Showing 14 changed files with 566 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _cleanup_container_config(self):
"""
container_element = self._ensure_only_one('container')
for child in container_element.findall('*'):
if child.tag in ['document-api', 'search']:
if child.tag in ['document-api', 'document-processing', 'search']:
child.clear()
elif child.tag != 'nodes':
container_element.remove(child)
Expand Down
22 changes: 16 additions & 6 deletions src/marqo/core/inference/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,30 @@ def cuda_device_health_check(self) -> None:

oom_errors = []
for device in self.cuda_devices:
memory_stats = None
try:
cuda_device = torch.device(f'cuda:{device.id}')
memory_stats = torch.cuda.memory_stats(cuda_device)
logger.debug(f'CUDA device {device.full_name} with total memory {device.total_memory}. '
f'Memory stats: {str(memory_stats)}')

torch.randn(3, device=cuda_device)
# Marqo usually allocates 20MiB cuda memory at a time when processing media files. When OOM happens,
# there are usually a few MiB free in reserved memory. To consistently trigger OOM from this health
# check request, we try to create a tensor that won't fit in the reserved memory to always trigger
# another allocation. Please note this is not leaky since we don't store the reference to this tensor.
# Once it's out of scope, the memory it takes will be returned to reserved space. Our assumption is
# that this method will not be called too often or with high concurrency (only used by liveness check
# for now which run once every few seconds).
tensor_size = int(20 * 1024 * 1024 / 4) # 20MiB, float32 (4 bytes)
torch.randn(tensor_size, device=cuda_device)
except RuntimeError as e:
if 'out of memory' in str(e).lower():
logger.error(f'CUDA device {device.full_name} is out of memory. Total memory: {device.total_memory}. '
f'Memory stats: {str(memory_stats)}')
allocated_mem = memory_stats.get("allocated.all.current", None) if memory_stats else None
oom_errors.append(f'CUDA device {device.full_name} is out of memory:'
f' ({allocated_mem}/{device.total_memory})')
logger.error(f'CUDA device {device.full_name} is out of memory. Original error: {str(e)}. '
f'Memory stats: {str(memory_stats)}.')
allocated_mem = memory_stats.get("allocated_bytes.all.current", None) if memory_stats else None
reserved_mem = memory_stats.get("reserved_bytes.all.current", None) if memory_stats else None
oom_errors.append(f'CUDA device {device.full_name} is out of memory (reserved: {reserved_mem}, '
f'allocated: {allocated_mem}, total: {device.total_memory})')
else:
# Log out a warning message when encounter other transient errors.
logger.error(f'Encountered issue inspecting CUDA device {device.full_name}: {str(e)}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,12 @@ def to_marqo_document(self, marqo_index: SemiStructuredMarqoIndex) -> Dict[str,
marqo_document[key] = []
marqo_document[key].append(value)

# marqo_document.update(self.fixed_fields.short_string_fields)
# Add int and float fields back
# Please note that int-map and float-map fields are flattened in the result. The correct behaviour is to convert
# them back to the format when they are indexed. We will keep the behaviour as is to avoid breaking changes.
marqo_document.update(self.fixed_fields.int_fields)
marqo_document.update(self.fixed_fields.float_fields)

marqo_document.update({k: bool(v) for k, v in self.fixed_fields.bool_fields.items()})
marqo_document[index_constants.MARQO_DOC_ID] = self.fixed_fields.marqo__id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,23 @@ def to_marqo_document(self, vespa_document: Dict[str, Any], return_highlights: b
def to_vespa_query(self, marqo_query: MarqoQuery) -> Dict[str, Any]:
# Verify attributes to retrieve, if defined
if marqo_query.attributes_to_retrieve is not None:
if len(marqo_query.attributes_to_retrieve) > 0:
# Retrieve static fields content to extract non-string values from combined fields
marqo_query.attributes_to_retrieve.extend([
common.STRING_ARRAY,
common.INT_FIELDS,
common.FLOAT_FIELDS,
common.BOOL_FIELDS,
])

marqo_query.attributes_to_retrieve.append(common.VESPA_FIELD_ID)

# add chunk field names for tensor fields
marqo_query.attributes_to_retrieve.extend(
[self.get_marqo_index().tensor_field_map[att].chunk_field_name
for att in marqo_query.attributes_to_retrieve
if att in self.get_marqo_index().tensor_field_map]
)

# Hybrid must be checked first since it is a subclass of Tensor and Lexical
if isinstance(marqo_query, MarqoHybridQuery):
return StructuredVespaIndex._to_vespa_hybrid_query(self, marqo_query)
Expand Down
17 changes: 11 additions & 6 deletions src/marqo/core/unstructured_vespa_index/unstructured_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,19 @@ def from_vespa_document(cls, document: Dict) -> "UnstructuredVespaDocument":
if unstructured_common.VESPA_DOC_HYBRID_RAW_TENSOR_SCORE in fields else None

return cls(id=document[cls._VESPA_DOC_ID],
raw_tensor_score = raw_tensor_score,
raw_lexical_score = raw_lexical_score,
fields=UnstructuredVespaDocumentFields(**fields))
raw_tensor_score=raw_tensor_score,
raw_lexical_score=raw_lexical_score,
fields=UnstructuredVespaDocumentFields(**fields))

@classmethod
def from_marqo_document(cls, document: Dict, filter_string_max_length: int) -> "UnstructuredVespaDocument":
"""Instantiate an UnstructuredVespaDocument from a valid Marqo document from
add_documents"""

if index_constants.MARQO_DOC_ID not in document:
raise MarqoDocumentParsingError(f"Unstructured Marqo document does not have a {index_constants.MARQO_DOC_ID} field. "
f"This should be assigned for a valid document")
raise MarqoDocumentParsingError(f"Unstructured Marqo document does not have a "
f"{index_constants.MARQO_DOC_ID} field. "
f"This should be assigned for a valid document")

doc_id = document[index_constants.MARQO_DOC_ID]
instance = cls(id=doc_id, fields=UnstructuredVespaDocumentFields(marqo__id=doc_id))
Expand Down Expand Up @@ -123,7 +124,8 @@ def from_marqo_document(cls, document: Dict, filter_string_max_length: int) -> "
instance.fields.score_modifiers_fields[f"{key}.{k}"] = v
else:
raise MarqoDocumentParsingError(f"In document {doc_id}, field {key} has an "
f"unsupported type {type(value)} which has not been validated in advance.")
f"unsupported type {type(value)} which has not been "
f"validated in advance.")

instance.fields.vespa_multimodal_params = document.get(unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS, {})
instance.fields.vespa_embeddings = document.get(index_constants.MARQO_DOC_EMBEDDINGS, {})
Expand Down Expand Up @@ -152,8 +154,11 @@ def to_marqo_document(self, return_highlights: bool = False) -> Dict[str, Any]:
marqo_document[key].append(value)

# Add int and float fields back
# Please note that int-map and float-map fields are flattened in the result. The correct behaviour is to convert
# them back to the format when they are indexed. We will keep the behaviour as is to avoid breaking changes.
marqo_document.update(self.fields.int_fields)
marqo_document.update(self.fields.float_fields)

marqo_document.update({k: bool(v) for k, v in self.fields.bool_fields.items()})
marqo_document[index_constants.MARQO_DOC_ID] = self.fields.marqo__id

Expand Down
9 changes: 7 additions & 2 deletions src/marqo/tensor_search/tensor_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1795,7 +1795,8 @@ def gather_documents_from_response(response: QueryResult, marqo_index: MarqoInde
marqo_doc = vespa_index.to_marqo_document(doc.dict(), return_highlights=highlights)
marqo_doc['_score'] = doc.relevance

if marqo_index.type == IndexType.Unstructured and attributes_to_retrieve is not None:
if (marqo_index.type in [IndexType.Unstructured, IndexType.SemiStructured] and
attributes_to_retrieve is not None):
# For an unstructured index, we do the attributes_to_retrieve after search
marqo_doc = unstructured_index_attributes_to_retrieve(marqo_doc, attributes_to_retrieve)

Expand All @@ -1811,7 +1812,11 @@ def unstructured_index_attributes_to_retrieve(marqo_doc: Dict[str, Any], attribu
str, Any]:
# attributes_to_retrieve should already be validated at the start of search
attributes_to_retrieve = list(set(attributes_to_retrieve).union({"_id", "_score", "_highlights"}))
return {k: v for k, v in marqo_doc.items() if k in attributes_to_retrieve}
return {k: v for k, v in marqo_doc.items() if k in attributes_to_retrieve or
# Please note that numeric map fields are flattened for unstructured or semi-structured indexes.
# Therefore, when filtering on attributes_to_retrieve, we need to also include flattened map fields
# with the specified attributes as prefixes. We keep this behaviour only for compatibility reasons.
any([k.startswith(attribute + ".") for attribute in attributes_to_retrieve])}


def assign_query_to_vector_job(
Expand Down
2 changes: 1 addition & 1 deletion src/marqo/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.15.1"


def get_version() -> str:
    """Return the current Marqo package version string.

    Returns:
        The semantic version declared in this module, e.g. ``"2.15.1"``.
    """
    # __version__ is already a str literal; wrapping it in an f-string
    # (the previous implementation) added nothing, so return it directly.
    return __version__
65 changes: 65 additions & 0 deletions tests/core/index_management/test_services_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,71 @@ def test_config_components(self):
"""
self._assertStringsEqualIgnoringWhitespace(expected_xml, service_xml.to_xml())

def test_config_components_should_preserve_document_processing_nodes(self):
    """Verify that config_components keeps `document-processing` in `container`.

    The `document-processing` node in the `container` section is usually
    referenced from the `content` section. Marqo never rewrites the `content`
    section, so removing the node from `container` would leave a dangling
    reference — it must therefore be preserved.
    """
    original_xml = """<?xml version="1.0" encoding="utf-8" ?>
<services version="1.0" xmlns:deploy="vespa" xmlns:preprocess="properties">
<container id="default" version="1.0">
<document-api/>
<document-processing/>
<search/>
<nodes>
<node hostalias="node1"/>
</nodes>
</container>
<content id="content_default" version="1.0">
<documents>
<document type="marqo__existing_00index" mode="index"/>
<document-processing cluster="default"/>
</documents>
</content>
</services>
"""

    parsed_services = ServicesXml(original_xml)
    parsed_services.config_components()

    # Expected outcome of config_components():
    #   * random/custom elements stripped from the container element
    #   * Marqo searcher chain, handler and index-settings component added
    #   * <nodes> inside container untouched
    #   * the whole <content> section untouched (incl. document-processing)
    expected_xml = """<?xml version="1.0" encoding="utf-8" ?>
<services version="1.0" xmlns:deploy="vespa" xmlns:preprocess="properties">
<container id="default" version="1.0">
<document-api/>
<document-processing/>
<search>
<chain id="marqo" inherits="vespa">
<searcher id="ai.marqo.search.HybridSearcher" bundle="marqo-custom-searchers"/>
</chain>
</search>
<nodes>
<node hostalias="node1"/>
</nodes>
<handler id="ai.marqo.index.IndexSettingRequestHandler" bundle="marqo-custom-searchers">
<binding>http://*/index-settings/*</binding>
<binding>http://*/index-settings</binding>
</handler>
<component id="ai.marqo.index.IndexSettings" bundle="marqo-custom-searchers">
<config name="ai.marqo.index.index-settings">
<indexSettingsFile>marqo_index_settings.json</indexSettingsFile>
<indexSettingsHistoryFile>marqo_index_settings_history.json</indexSettingsHistoryFile>
</config>
</component>
</container>
<content id="content_default" version="1.0">
<documents>
<document type="marqo__existing_00index" mode="index"/>
<document-processing cluster="default"/>
</documents>
</content>
</services>
"""
    self._assertStringsEqualIgnoringWhitespace(expected_xml, parsed_services.to_xml())

def _assertStringsEqualIgnoringWhitespace(self, s1: str, s2: str):
"""Custom assertion to compare strings ignoring whitespace."""

Expand Down
24 changes: 17 additions & 7 deletions tests/core/inference/test_device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ def _device_manager_with_multiple_cuda_devices(self, total_memory: int = 1_000_0

return DeviceManager()

def _mem_stats(self, allocated: int, reserved: int) -> OrderedDict:
return OrderedDict({
"allocated_bytes.all.current": allocated,
"reserved_bytes.all.current": reserved
})

def test_init_with_cpu(self):
device_manager = self._device_manager_without_cuda()

Expand Down Expand Up @@ -79,35 +85,39 @@ def test_cuda_health_check_should_fail_when_cuda_device_is_out_of_memory(self):

with mock.patch("torch.cuda.is_available", return_value=True), \
mock.patch("torch.randn", side_effect=RuntimeError("CUDA error: out of memory")), \
mock.patch("torch.cuda.memory_stats", return_value=OrderedDict({"allocated.all.current": 900_000})):
mock.patch("torch.cuda.memory_stats", return_value=self._mem_stats(900_000, 950_000)):
with self.assertRaises(CudaOutOfMemoryError) as err:
device_manager.cuda_device_health_check()

self.assertEqual(str(err.exception), "CUDA device cuda:0(Tesla T4) is out of memory: (900000/1000000)")
self.assertEqual(str(err.exception), "CUDA device cuda:0(Tesla T4) is out of memory "
"(reserved: 950000, allocated: 900000, total: 1000000)")

def test_cuda_health_check_should_fail_when_any_cuda_device_is_out_of_memory(self):
device_manager = self._device_manager_with_multiple_cuda_devices(total_memory=1_000_000)

with mock.patch("torch.cuda.is_available", return_value=True), \
mock.patch("torch.randn", side_effect=[torch.tensor([1, 2, 3]), RuntimeError("CUDA error: out of memory")]), \
mock.patch("torch.cuda.memory_stats", return_value=OrderedDict({"allocated.all.current": 900_000})):
mock.patch("torch.cuda.memory_stats", return_value=self._mem_stats(900_000, 950_000)):
with self.assertRaises(CudaOutOfMemoryError) as err:
device_manager.cuda_device_health_check()

self.assertEqual(str(err.exception), "CUDA device cuda:1(Tesla H200) is out of memory: (900000/1000000)")
self.assertEqual(str(err.exception), "CUDA device cuda:1(Tesla H200) is out of memory "
"(reserved: 950000, allocated: 900000, total: 1000000)")

def test_cuda_health_check_should_check_if_all_cuda_devices_are_out_of_memory(self):
device_manager = self._device_manager_with_multiple_cuda_devices(total_memory=1_000_000)

with mock.patch("torch.cuda.is_available", return_value=True), \
mock.patch("torch.randn",
side_effect=[RuntimeError("CUDA error: out of memory"), RuntimeError("CUDA error: out of memory")]), \
mock.patch("torch.cuda.memory_stats", return_value=OrderedDict({"allocated.all.current": 900_000})):
mock.patch("torch.cuda.memory_stats", return_value=self._mem_stats(900_000, 950_000)):
with self.assertRaises(CudaOutOfMemoryError) as err:
device_manager.cuda_device_health_check()

self.assertEqual(str(err.exception), "CUDA device cuda:0(Tesla T4) is out of memory: (900000/1000000);"
"CUDA device cuda:1(Tesla H200) is out of memory: (900000/1000000)")
self.assertEqual(str(err.exception), "CUDA device cuda:0(Tesla T4) is out of memory "
"(reserved: 950000, allocated: 900000, total: 1000000);"
"CUDA device cuda:1(Tesla H200) is out of memory "
"(reserved: 950000, allocated: 900000, total: 1000000)")

def test_cuda_health_check_should_pass_and_log_error_message_when_cuda_calls_encounter_issue_other_than_oom(self):
device_manager = self._device_manager_with_multiple_cuda_devices()
Expand Down
Loading

0 comments on commit 9fcce96

Please sign in to comment.