Skip to content

Commit

Permalink
Add FFmpeg-cuda Support (#1030)
Browse files Browse the repository at this point in the history
  • Loading branch information
wanliAlex authored Nov 27, 2024
1 parent 606b13c commit 42a7f52
Show file tree
Hide file tree
Showing 12 changed files with 538 additions and 190 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY vespa .
RUN mvn clean package

# Stage 2: Base image for Python setup
FROM marqoai/marqo-base:44 as base_image
FROM marqoai/marqo-base:46 as base_image

# Allow mounting volume containing data and configs for vespa
VOLUME /opt/vespa/var
Expand Down
1 change: 1 addition & 0 deletions src/marqo/api/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ def default_env_vars() -> dict:
EnvVars.MARQO_MAX_LEXICAL_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT: 5, # index operations acquire this distributed lock with a timeout
EnvVars.ZOOKEEPER_CONNECTION_TIMEOUT: 15,
EnvVars.MARQO_ENABLE_VIDEO_GPU_ACCELERATION: None # on_start_script will determine this.
}
18 changes: 7 additions & 11 deletions src/marqo/s2_inference/s2_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,28 @@
"""
import datetime
import threading
from typing import List, Dict, Optional

import numpy as np
import torch
from PIL import UnidentifiedImageError
from PIL.Image import Image
from torch import Tensor
from torchvision.transforms import Compose
from typing import List, Dict, Any, Optional

from marqo import marqo_docs
from marqo.api.configs import EnvVars
from marqo.api.exceptions import ModelCacheManagementError, ConfigurationError, InternalError
from marqo.inference.inference_cache.marqo_inference_cache import MarqoInferenceCache
from marqo.s2_inference import constants
from marqo.core.inference.embedding_models.open_clip_model import OPEN_CLIP
from marqo.s2_inference.clip_utils import CLIP
from marqo.s2_inference.configs import get_default_normalization, get_default_seq_length
from marqo.s2_inference.errors import (
VectoriseError, InvalidModelPropertiesError, ModelLoadError,
UnknownModelError, ModelNotInCacheError, ModelDownloadError)
from marqo.inference.inference_cache.marqo_inference_cache import MarqoInferenceCache
from marqo.s2_inference.logger import get_logger
from marqo.s2_inference.model_registry import load_model_properties
from marqo.s2_inference.models.model_type import ModelType
from marqo.s2_inference.types import *
from marqo.s2_inference.multimodal_model_load import *
from marqo.api.configs import EnvVars
from marqo.s2_inference.types import *
from marqo.tensor_search.enums import AvailableModelsKey
from marqo.tensor_search.models.preprocessors_model import Preprocessors
from marqo.tensor_search.models.private_models import ModelAuth
from marqo.tensor_search.utils import read_env_vars_and_defaults, generate_batches, read_env_vars_and_defaults_ints

Expand Down Expand Up @@ -214,7 +210,7 @@ def load_multimodal_model_and_get_preprocessors(model_name: str, model_propertie
device: Optional[str] = None,
model_auth: Optional[ModelAuth] = None,
normalize_embeddings: bool = get_default_normalization()) \
-> Tuple[Any, Dict[str, Optional[Compose]]]:
-> Tuple[Any, Preprocessors]:
"""Load the model and return preprocessors for different modalities.
Args:
Expand Down Expand Up @@ -252,7 +248,7 @@ def load_multimodal_model_and_get_preprocessors(model_name: str, model_propertie
"text": None # Future preprocessor
}

return model, preprocessors
return model, Preprocessors(**preprocessors)


def _get_max_vectorise_batch_size() -> int:
Expand Down
74 changes: 32 additions & 42 deletions src/marqo/tensor_search/add_docs.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
"""Functions used to fulfill the add_documents endpoint"""
import concurrent
import copy
import logging
import math
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import ContextManager
import threading
import torch
import ffmpeg

import logging

import numpy as np
import PIL
import ffmpeg
import numpy as np
import torch
from PIL.ImageFile import ImageFile
from torchvision.transforms import Compose

import marqo.exceptions as base_exceptions
from marqo.core.models.add_docs_params import AddDocsParams
from marqo.core.models.marqo_index import *
from marqo.exceptions import InternalError
from marqo.s2_inference import clip_utils
from marqo.s2_inference.errors import UnsupportedModalityError, S2InferenceError, MediaMismatchError, MediaDownloadError
from marqo.s2_inference.models.model_type import ModelType
from marqo.s2_inference.s2_inference import is_preprocess_image_model, load_multimodal_model_and_get_preprocessors, \
infer_modality, Modality
from marqo.s2_inference.errors import UnsupportedModalityError, S2InferenceError, MediaMismatchError, MediaDownloadError
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.streaming_media_processor import StreamingMediaProcessor
from marqo.tensor_search import enums
from marqo.tensor_search import utils
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.models.preprocessors_model import Preprocessors
from marqo.tensor_search.models.private_models import ModelAuth
from marqo.tensor_search.streaming_media_processor import StreamingMediaProcessor
from marqo.tensor_search.telemetry import RequestMetricsStore, RequestMetrics
from marqo.tensor_search.models.preprocessors_model import Preprocessors

from marqo.s2_inference.models.model_type import ModelType

logger = logging.getLogger(__name__)

Expand All @@ -43,7 +42,7 @@ def threaded_download_and_preprocess_content(allocated_docs: List[dict],
media_field_types_mapping: Optional[Dict[str, FieldType]] = None,
media_download_headers: Optional[Dict] = None,
metric_obj: Optional[RequestMetrics] = None,
preprocessors: Optional[Dict[str, Compose]] = None,
preprocessors: Optional[Preprocessors] = None,
marqo_index_type: Optional[IndexType] = None,
marqo_index_model: Optional[Model] = None,
audio_preprocessing: Optional[AudioPreProcessing] = None,
Expand All @@ -55,7 +54,7 @@ def threaded_download_and_preprocess_content(allocated_docs: List[dict],
Args:
allocated_docs: docs with images to be downloaded by this thread,
image_repo: dictionary that will be mutated by this thread. It will add PIL images
media_repo: dictionary that will be mutated by this thread. It will add media
as values and the URLs as keys
tensor_fields: A tuple of tensor_fields. Images will be downloaded for these fields only.
media_download_headers: A dict of headers for image download. Can be used
Expand All @@ -77,7 +76,6 @@ def threaded_download_and_preprocess_content(allocated_docs: List[dict],
# Determine index type
is_structured_index = marqo_index_type == IndexType.Structured
is_unstructured_index = marqo_index_type in [IndexType.Unstructured, IndexType.SemiStructured]

# Generate pseudo-unique ID for thread metrics.
_id = f'image_download.{threading.get_ident()}'
TIMEOUT_SECONDS = 3
Expand Down Expand Up @@ -126,11 +124,11 @@ def threaded_download_and_preprocess_content(allocated_docs: List[dict],
metric_obj.increment_counter(f"{doc.get(field, '')}.UnidentifiedImageError")
continue
# preprocess image to tensor
if preprocessors is not None and preprocessors['image'] is not None:
if preprocessors is not None and preprocessors.image is not None:
if not device or not isinstance(device, str):
raise ValueError("Device must be provided for preprocessing images")
try:
media_repo[doc[field]] = preprocessors['image'](media_repo[doc[field]]).to(device)
media_repo[doc[field]] = preprocessors.image(media_repo[doc[field]]).to(device)
except OSError as e:
if "image file is truncated" in str(e):
media_repo[doc[field]] = e
Expand Down Expand Up @@ -167,7 +165,6 @@ def threaded_download_and_preprocess_content(allocated_docs: List[dict],
try:
processed_chunks = download_and_chunk_media(
url=doc[field], device=device, modality=inferred_modality,
marqo_index_type=marqo_index_type, marqo_index_model=marqo_index_model,
preprocessors=preprocessors, audio_preprocessing=audio_preprocessing,
video_preprocessing=video_preprocessing, media_download_headers=media_download_headers
)
Expand All @@ -180,36 +177,21 @@ def threaded_download_and_preprocess_content(allocated_docs: List[dict],
media_repo[doc[field]] = S2InferenceError(f"Error processing media file {doc}, detected as text, expected a {media_field_types_mapping[field]} pointer")
else:
pass

# For multimodal tensor combination
elif isinstance(doc[field], dict):
for sub_field in list(doc[field].values()):
if isinstance(sub_field, str) and clip_utils._is_image(sub_field):
if sub_field in media_repo:
continue
try:
media_repo[sub_field] = clip_utils.load_image_from_path(
sub_field,
media_download_headers,
timeout=TIMEOUT_SECONDS,
metrics_obj=metric_obj
)
except PIL.UnidentifiedImageError as e:
media_repo[sub_field] = e
metric_obj.increment_counter(f"{doc.get(field, '')}.UnidentifiedImageError")
continue
else:
raise InternalError(f"Invalid field type for {field} to be added in media repo. Must be a string "
f"but {type(field)}.")


def download_and_chunk_media(url: str, device: str, modality: Modality, marqo_index_type: IndexType, marqo_index_model: Model,
def download_and_chunk_media(url: str, device: str, modality: Modality,
preprocessors: Preprocessors, audio_preprocessing: AudioPreProcessing = None,
video_preprocessing: VideoPreProcessing = None,
media_download_headers: Optional[Dict] = None) -> List[Dict[str, torch.Tensor]]:
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB in bytes

processor = StreamingMediaProcessor(
url=url, device=device, modality=modality, marqo_index_type=marqo_index_type, marqo_index_model=marqo_index_model,
preprocessors=preprocessors, audio_preprocessing=audio_preprocessing, video_preprocessing=video_preprocessing,
media_download_headers=media_download_headers
url=url, device=device, modality=modality, preprocessors=preprocessors,
audio_preprocessing=audio_preprocessing, video_preprocessing=video_preprocessing,
media_download_headers=media_download_headers, enable_video_gpu_acceleration=_enable_video_gpu_acceleration()
)

if processor.total_size > MAX_FILE_SIZE:
Expand All @@ -219,6 +201,14 @@ def download_and_chunk_media(url: str, device: str, modality: Modality, marqo_in
return processor.process_media()


def _enable_video_gpu_acceleration() -> bool:
"""A helper function to determine if the video decoding should be done on the GPU.
The environment variable MARQO_ENABLE_VIDEO_GPU_ACCELERATION is set on marqo start_on script.
"""
return utils.read_env_vars_and_defaults(EnvVars.MARQO_ENABLE_VIDEO_GPU_ACCELERATION) == 'TRUE'


@contextmanager
def download_and_preprocess_multimedia_content(
docs: List[Dict[str, str]],
Expand Down Expand Up @@ -364,7 +354,7 @@ def process_batch(
)

if not is_preprocess_image_model(model_properties) or patch_method_exists:
preprocessors['image'] = None
preprocessors.image = None

media_repo = {}
m = [RequestMetrics() for i in range(thread_count)]
Expand Down
1 change: 1 addition & 0 deletions src/marqo/tensor_search/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class EnvVars:
MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT = "MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT"
ZOOKEEPER_HOSTS = "ZOOKEEPER_HOSTS"
ZOOKEEPER_CONNECTION_TIMEOUT = "ZOOKEEPER_CONNECTION_TIMEOUT"
MARQO_ENABLE_VIDEO_GPU_ACCELERATION = "MARQO_ENABLE_VIDEO_GPU_ACCELERATION" # The value will be determined in the Marqo startup script


class RequestType:
Expand Down
31 changes: 21 additions & 10 deletions src/marqo/tensor_search/models/preprocessors_model.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
from pydantic import BaseModel
from torchvision.transforms import Compose
from typing import Optional
from typing import Optional, Any

from marqo.base_model import MarqoBaseModel
from marqo.s2_inference.multimodal_model_load import Modality

class Preprocessors(BaseModel):
image: Optional[Compose] = None
text: Optional[Compose] = None
video: Optional[Compose] = None
audio: Optional[Compose] = None

class Config:
arbitrary_types_allowed = True
class Preprocessors(MarqoBaseModel):
"""The type of preprocessors is unknown, so we use Any."""
image: Optional[Any] = None
text: Optional[Any] = None
video: Optional[Any] = None
audio: Optional[Any] = None

def get_preprocessor(self, modality: Modality):
if modality == Modality.IMAGE:
return self.image
elif modality == Modality.TEXT:
return self.text
elif modality == Modality.VIDEO:
return self.video
elif modality == Modality.AUDIO:
return self.audio
else:
raise ValueError(f"Unknown modality {modality}")
69 changes: 68 additions & 1 deletion src/marqo/tensor_search/on_start_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
from marqo.tensor_search import index_meta_cache, utils
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.tensor_search_logging import get_logger
from marqo import marqo_docs
import subprocess
import nltk



logger = get_logger(__name__)

Expand All @@ -30,8 +35,9 @@ def on_start(config: config.Config):
DownloadStartText(),
CUDAAvailable(),
SetBestAvailableDevice(),
CacheModels(),
SetEnableVideoGPUAcceleration(),
CheckNLTKTokenizers(),
CacheModels(),
InitializeRedis("localhost", 6379),
CachePatchModels(),
DownloadFinishText(),
Expand Down Expand Up @@ -259,6 +265,67 @@ def run(self):
self.logger.info(message)
self.logger.info("completed prewarming patch models")

class SetEnableVideoGPUAcceleration:

logger = get_logger('SetVideoProcessingDevice')

def run(self):
"""This method will set the env var MARQO_ENABLE_VIDEO_GPU_ACCELERATION to TRUE or FALSE."""
env_value = utils.read_env_vars_and_defaults(EnvVars.MARQO_ENABLE_VIDEO_GPU_ACCELERATION)
if env_value is None:
try:
self._check_video_gpu_acceleration_availability()
os.environ[EnvVars.MARQO_ENABLE_VIDEO_GPU_ACCELERATION] = "TRUE"
except exceptions.StartupSanityCheckError as e:
self.logger.debug(f"Failed to use GPU acceleration for video processing. We will disable it. "
f"Original error message: {e}")
os.environ[EnvVars.MARQO_ENABLE_VIDEO_GPU_ACCELERATION] = "FALSE"
elif env_value == "TRUE":
self._check_video_gpu_acceleration_availability()
elif env_value == "FALSE":
pass
else:
raise exceptions.EnvVarError(
f"Invalid value for {EnvVars.MARQO_ENABLE_VIDEO_GPU_ACCELERATION}. "
f"Please set it to either 'TRUE' or 'FALSE'."
)

def _check_video_gpu_acceleration_availability(self):
"""Check if the required dependencies are available for video processing with GPU acceleration for ffmpeg.
Raises:
exceptions.StartupSanityCheckError: If the required dependencies are not available.
"""
ffmpeg_command_gpu_check = [
'ffmpeg',
'-v', 'error', # Suppress output
'-hwaccel', 'cuda', # Use CUDA for hardware acceleration
'-f', 'lavfi', # Input format is a lavfi (FFmpeg's built-in filter)
'-i', 'nullsrc=s=200x100', # Generate a blank video source of 200x100 resolution
'-vframes', '1', # Process only 1 frame
'-c:v', 'h264_nvenc', # Use NVENC encoder
'-f', 'null', # Output to null (discard the output)
'-' # Output to stdout (discarded)
]
try:
_ = subprocess.run(
ffmpeg_command_gpu_check, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True,
text=True, timeout=10
)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
raise exceptions.StartupSanityCheckError(
f"Failed to use GPU acceleration for video processing. "
f"Ensure that your system has the required dependencies installed. "
f"You can set 'MARQO_ENABLE_VIDEO_GPU_ACCELERATION=FALSE' to disable GPU acceleration. "
f"Check {marqo_docs.configuring_marqo()} for more information. "
f"Original error message: {e.stderr}"
) from e
except (ValueError, OSError) as e:
raise exceptions.StartupSanityCheckError(
f"Marqo failed to run the ffmpeg sanity check. Your ffmepeg installation might be broken. "
f"Original error: {e}"
) from e


class CheckNLTKTokenizers:
"""Check if NLTK tokenizers are available, if not, download them.
Expand Down
Loading

0 comments on commit 42a7f52

Please sign in to comment.