Skip to content

Commit

Permalink
add vision client
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfeil committed Feb 11, 2025
1 parent 215f795 commit 154160c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,59 @@
import threading
from concurrent.futures import ThreadPoolExecutor, Future
import numpy as np # Import numpy
from typing import Union
from typing import Union, Literal

HAS_IMPORTS = True
try:
from PIL import Image
import numpy as np

except ImportError:
HAS_IMPORTS = False

try:
import requests
from requests.adapters import HTTPAdapter, Retry
except ImportError:
HAS_IMPORTS = False


class InfinityVisionAPI:
def __init__(self, url: str = "https://infinity-multimodal.modal.michaelfeil.eu", format="base64") -> None:
def __init__(
self,
url: str = "https://infinity-multimodal.modal.michaelfeil.eu",
format: Literal["base64", "float"] = "base64",
model: str = "michaelfeil/colqwen2-v0.1",
) -> None:
"""client usage for infinity multimodal api
Args:
url (str, optional): url of the deployment. Defaults to "https://infinity-multimodal.modal.michaelfeil.eu".
format (str, optional): base. Defaults to "base64".
model (str, optional): served_model_name in the deployment. Defaults to "michaelfeil/colqwen2-v0.1".
"""
req = requests.post(
url + "/embeddings",
json={ # get shape of output
"model": "michaelfeil/colqwen2-v0.1",
json={ # get shape of output by sending a float request
"model": model,
"input": ["test"],
"encoding_format": "float",
"modality": "text"
}
"modality": "text",
},
)
req.raise_for_status()
self.url = url
self.hidden_dim = np.array(req.json()["data"][0]["embedding"]).shape[-1]
self.format = format
self.tp = ThreadPoolExecutor()
self.tp.__enter__()

self.sem = threading.Semaphore(64)
self.session = requests.Session()
adapter = HTTPAdapter(max_retries=Retry(total=10, backoff_factor=0.5))
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)

def _image_payload(self, images: list["Image.Image"]) -> list[str]:
if not HAS_IMPORTS:
raise ImportError("PIL is required to use this class")
Expand All @@ -54,15 +68,15 @@ def _image_payload(self, images: list["Image.Image"]) -> list[str]:
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
b64_strs.append(f"data:image/jpeg;base64,{img_str}")
return b64_strs

def _text_payload(self, texts: list[str]) -> list[str]:
return texts

def health(self) -> bool:
req = self.session.get(f"{self.url}/health")
req.raise_for_status()
return req.status_code == 200

def _request(self, model: str, images_or_text: list[Union["Image.Image", str]]) -> dict:
if all(hasattr(item, "save") for item in images_or_text):
payload = self._image_payload(images_or_text)
Expand All @@ -75,45 +89,37 @@ def _request(self, model: str, images_or_text: list[Union["Image.Image", str]])

embeddings_req = self.session.post(
f"{self.url}/embeddings",
json={
"model": model,
"input": payload,
"encoding_format": self.format,
"modality": modality
}
json={"model": model, "input": payload, "encoding_format": self.format, "modality": modality},
)
embeddings_req.raise_for_status()
embeddings = embeddings_req.json()

if self.format == "base64":
embeddings_decoded = [
np.frombuffer(
base64.b64decode(e["embedding"]), dtype=np.float32
).reshape(-1, self.hidden_dim)
np.frombuffer(base64.b64decode(e["embedding"]), dtype=np.float32).reshape(-1, self.hidden_dim)
for e in embeddings["data"]
]
else:
embeddings_decoded = [
np.array(e["embedding"])
for e in embeddings["data"]
]
embeddings_decoded = [np.array(e["embedding"]) for e in embeddings["data"]]
return embeddings_decoded, embeddings["usage"]["total_tokens"]

def embed(self, model: str, sentences: list[str]) -> Future[list]:
self.health()
with self.sem:
return self.tp.submit(self._request, model=model, images_or_text=sentences)

def image_embed(self, model: str, images: list["Image.Image"]) -> Future[list]:
self.health() # Call once instead of per image
with self.sem:
return self.tp.submit(self._request, model=model, images_or_text=images)



def test_colpali():
colpali = InfinityVisionAPI()
future = colpali.embed("michaelfeil/colqwen2-v0.1", ["test"])
embeddings, total_tokens = future.result()
print(embeddings, total_tokens)



if __name__ == "__main__":
test_colpali()
test_colpali()
2 changes: 1 addition & 1 deletion libs/client_infinity/run_generate_with_hook.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ for i in {1..10}; do
done

# Run the tests
pip install openapi-python-client==0.21.1
python -m pip install openapi-python-client==0.21.1 && \
openapi-python-client generate \
--url http://0.0.0.0:7993/openapi.json \
--config client_config.yaml \
Expand Down
70 changes: 39 additions & 31 deletions libs/client_infinity/template/vision_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,59 @@
import threading
from concurrent.futures import ThreadPoolExecutor, Future
import numpy as np # Import numpy
from typing import Union
from typing import Union, Literal

HAS_IMPORTS = True
try:
from PIL import Image
import numpy as np

except ImportError:
HAS_IMPORTS = False

try:
import requests
from requests.adapters import HTTPAdapter, Retry
except ImportError:
HAS_IMPORTS = False


class InfinityVisionAPI:
def __init__(self, url: str = "https://infinity-multimodal.modal.michaelfeil.eu", format="base64") -> None:
def __init__(
self,
url: str = "https://infinity-multimodal.modal.michaelfeil.eu",
format: Literal["base64", "float"] = "base64",
model: str = "michaelfeil/colqwen2-v0.1",
) -> None:
"""client usage for infinity multimodal api
Args:
url (str, optional): url of the deployment. Defaults to "https://infinity-multimodal.modal.michaelfeil.eu".
format (str, optional): base. Defaults to "base64".
model (str, optional): served_model_name in the deployment. Defaults to "michaelfeil/colqwen2-v0.1".
"""
req = requests.post(
url + "/embeddings",
json={ # get shape of output
"model": "michaelfeil/colqwen2-v0.1",
json={ # get shape of output by sending a float request
"model": model,
"input": ["test"],
"encoding_format": "float",
"modality": "text"
}
"modality": "text",
},
)
req.raise_for_status()
self.url = url
self.hidden_dim = np.array(req.json()["data"][0]["embedding"]).shape[-1]
self.format = format
self.tp = ThreadPoolExecutor()
self.tp.__enter__()

self.sem = threading.Semaphore(64)
self.session = requests.Session()
adapter = HTTPAdapter(max_retries=Retry(total=10, backoff_factor=0.5))
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)

def _image_payload(self, images: list["Image.Image"]) -> list[str]:
if not HAS_IMPORTS:
raise ImportError("PIL is required to use this class")
Expand All @@ -54,15 +68,15 @@ def _image_payload(self, images: list["Image.Image"]) -> list[str]:
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
b64_strs.append(f"data:image/jpeg;base64,{img_str}")
return b64_strs

def _text_payload(self, texts: list[str]) -> list[str]:
return texts

def health(self) -> bool:
req = self.session.get(f"{self.url}/health")
req.raise_for_status()
return req.status_code == 200

def _request(self, model: str, images_or_text: list[Union["Image.Image", str]]) -> dict:
if all(hasattr(item, "save") for item in images_or_text):
payload = self._image_payload(images_or_text)
Expand All @@ -75,45 +89,39 @@ def _request(self, model: str, images_or_text: list[Union["Image.Image", str]])

embeddings_req = self.session.post(
f"{self.url}/embeddings",
json={
"model": model,
"input": payload,
"encoding_format": self.format,
"modality": modality
}
json={"model": model, "input": payload, "encoding_format": self.format, "modality": modality},
)
embeddings_req.raise_for_status()
embeddings = embeddings_req.json()

if self.format == "base64":
embeddings_decoded = [
np.frombuffer(
base64.b64decode(e["embedding"]), dtype=np.float32
).reshape(-1, self.hidden_dim)
np.frombuffer(base64.b64decode(e["embedding"]), dtype=np.float32).reshape(-1, self.hidden_dim)
for e in embeddings["data"]
]
else:
embeddings_decoded = [
np.array(e["embedding"])
for e in embeddings["data"]
]
embeddings_decoded = [np.array(e["embedding"]) for e in embeddings["data"]]
return embeddings_decoded, embeddings["usage"]["total_tokens"]

def embed(self, model: str, sentences: list[str]) -> Future[list]:
self.health()
with self.sem:
return self.tp.submit(self._request, model=model, images_or_text=sentences)

def image_embed(self, model: str, images: list["Image.Image"]) -> Future[list]:
self.health() # Call once instead of per image
with self.sem:
return self.tp.submit(self._request, model=model, images_or_text=images)



def test_colpali():
"""example usage"""
colpali = InfinityVisionAPI()
assert colpali.health()
future = colpali.embed("michaelfeil/colqwen2-v0.1", ["test"])
embeddings, total_tokens = future.result()
print(embeddings, total_tokens)



if __name__ == "__main__":
test_colpali()
test_colpali()

0 comments on commit 154160c

Please sign in to comment.