aidial_adapter_openai/embeddings/azure_ai_vision.py

""" Adapter for multi-modal embeddings provided by Azure AI Vision service. 1. Conceptual overview: https://aka.ms/image-retrieval 2. How-to article: https://learn.microsoft.com/en-us/azure/ai-services/computer-vision/how-to/image-retrieval?tabs=python 3. REST API (image url, binary image, text): https://learn.microsoft.com/en-gb/rest/api/computervision/image-retrieval?view=rest-computervision-v4.0-preview%20(2023-04-01) 4. A plug-in for Azure Search service: https://learn.microsoft.com/en-gb/azure/search/vector-search-vectorizer-ai-services-vision 5. Example of usage in a RAG: https://github.com/Azure-Samples/azure-search-openai-demo/blob/0946893fe904cab1e89de2a38c4421e38d508608/app/backend/prepdocslib/embeddings.py#L226-L260 Note that currently there is no Python SDK for this API. There is SDK for Image Analysis 4.0 API, but it doesn't cover the multi-modal embeddings API: https://learn.microsoft.com/en-us/azure/ai-services/computer-vision/how-to/call-analyze-image-40?pivots=programming-language-python Input requirements: 1. The file size of the image must be less than 20 megabytes (MB). 2. The dimensions of the image must be greater than 10 x 10 pixels and less than 16,000 x 16,000 pixels. 3. The text string must be between (inclusive) one word and 70 words. 4. Supported media types: "application/octet-stream", "image/jpeg", "image/gif", "image/tiff", "image/bmp", "image/png" Output characteristics: 1. The vector embeddings are normalized. 2. Image and text vector embeddings have 1024 dimensions. Limitations: 1. Batching isn't supported. Note that when both "url" and "text" fields are sent in a request, the "text" field is ignored. """ import asyncio from typing import AsyncIterator, List, assert_never import aiohttp from aidial_sdk.chat_completion.request import Attachment from aidial_sdk.embeddings.request import EmbeddingsRequest from aidial_sdk.embeddings.response import Embedding, EmbeddingResponse, Usage from aidial_sdk.exceptions import HTTPException as DialException from pydantic import BaseModel from aidial_adapter_openai.dial_api.embedding_inputs import ( collect_embedding_inputs, ) from aidial_adapter_openai.dial_api.resource import AttachmentResource from aidial_adapter_openai.dial_api.storage import FileStorage from aidial_adapter_openai.utils.auth import OpenAICreds from aidial_adapter_openai.utils.resource import Resource # The latest Image Analysis API offers two models: # * version 2023-04-15 which supports text search in many languages, # * the legacy 2022-04-11 model which supports only English. 
_VERSION_PARAMS = { "api-version": "2024-02-01", "model-version": "2023-04-15", } def _get_auth_headers(creds: OpenAICreds) -> dict[str, str]: if "api_key" in creds: return {"Ocp-Apim-Subscription-Key": creds["api_key"]} if "azure_ad_token" in creds: return {"Authorization": f"Bearer {creds['azure_ad_token']}"} raise ValueError("Invalid credentials") class VectorizeResponse(BaseModel): class Config: extra = "allow" vector: List[float] async def embeddings( creds: OpenAICreds, deployment: str, endpoint: str, file_storage: FileStorage | None, data: dict, ) -> EmbeddingResponse: input = EmbeddingsRequest.parse_obj(data) async def on_text(text: str) -> str: return text async def on_attachment(attachment: Attachment) -> Resource: return await AttachmentResource(attachment=attachment).download( file_storage ) inputs_iter: AsyncIterator[str | Resource] = collect_embedding_inputs( input, on_text=on_text, on_attachment=on_attachment, ) inputs: List[str | Resource] = [input async for input in inputs_iter] async def _get_embedding( session: aiohttp.ClientSession, input: str | Resource ) -> VectorizeResponse: if isinstance(input, str): return await _get_text_embedding(session, endpoint, input) elif isinstance(input, Resource): return await _get_image_embedding(session, endpoint, input) else: assert_never(input) async with aiohttp.ClientSession( raise_for_status=_error_handler, headers=_get_auth_headers(creds), ) as session: tasks = [ asyncio.create_task(_get_embedding(session, input_)) for input_ in inputs ] responses = await asyncio.gather(*tasks) vectors = [ Embedding(embedding=r.vector, index=idx) for idx, r in enumerate(responses) ] n = len(vectors) usage = Usage(prompt_tokens=n, total_tokens=n) return EmbeddingResponse(model=deployment, data=vectors, usage=usage) async def _get_image_embedding( session: aiohttp.ClientSession, endpoint: str, resource: Resource, ) -> VectorizeResponse: resp = await session.post( url=endpoint.rstrip("/") + "/computervision/retrieval:vectorizeImage", params=_VERSION_PARAMS, headers={"content-type": resource.type}, data=resource.data, ) return VectorizeResponse.parse_obj(await resp.json()) async def _get_text_embedding( session: aiohttp.ClientSession, endpoint: str, text: str, ) -> VectorizeResponse: resp = await session.post( url=endpoint.rstrip("/") + "/computervision/retrieval:vectorizeText", params=_VERSION_PARAMS, json={"text": text}, ) return VectorizeResponse.parse_obj(await resp.json()) async def _error_handler(response: aiohttp.ClientResponse) -> None: # The Azure AI Vision service returns error responses in a format similar to the OpenAI error format if not response.ok: body = await response.json() error = body.get("error") or {} message = error.get("message") or response.reason or "Unknown Error" code = error.get("code") type = error.get("type") param = error.get("param") raise DialException( message=message, status_code=response.status, type=type, param=param, code=code, )
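# A minimal usage sketch, not part of the adapter itself. The endpoint, key,
# deployment name, and input are placeholders (assumptions, not values from
# this repository). With file_storage=None, attachments would need inline
# data or absolute URLs, so this demo embeds a single text input; any other
# field accepted by aidial_sdk's EmbeddingsRequest may be added to "data".

if __name__ == "__main__":

    async def _demo() -> None:
        response = await embeddings(
            creds={"api_key": "<azure-ai-vision-key>"},  # placeholder key
            deployment="azure-ai-vision",  # hypothetical deployment name
            endpoint="https://<resource>.cognitiveservices.azure.com",
            file_storage=None,
            data={"input": ["a photo of a dog on a beach"]},
        )
        # Each embedding has 1024 dimensions per the service contract.
        for item in response.data:
            print(item.index, len(item.embedding))

    asyncio.run(_demo())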