aidial_adapter_vertexai/chat/consumer.py (107 lines of code) (raw):
from abc import ABC, abstractmethod
from types import TracebackType
from typing import Optional
from aidial_sdk.chat_completion import (
Attachment,
Choice,
FinishReason,
Response,
Stage,
)
from aidial_adapter_vertexai.dial_api.token_usage import TokenUsage
class Consumer(ABC):
@abstractmethod
async def append_content(self, content: str):
pass
@abstractmethod
async def create_function_call(self, name: str, arguments: str | None):
pass
@abstractmethod
async def create_tool_call(self, id: str, name: str, arguments: str | None):
pass
@abstractmethod
async def add_attachment(self, attachment: Attachment):
pass
@abstractmethod
async def set_usage(self, usage: TokenUsage):
pass
@abstractmethod
async def set_finish_reason(self, finish_reason: FinishReason):
pass
@abstractmethod
def is_empty(self) -> bool:
pass
@abstractmethod
async def create_stage(self, name: str) -> Stage:
pass
class ChoiceConsumer(Consumer):
response: Response
_choice: Optional[Choice]
usage: TokenUsage
finish_reason: Optional[FinishReason]
empty: bool
"""
Whether the consumer has sent something to the choice or not.
"""
def __init__(self, response: Response):
self.response = response
self._choice = None
self.empty = True
self.usage = TokenUsage()
self.finish_reason = None
@property
def choice(self) -> Choice:
if self._choice is None:
# Delay opening a choice to the very last moment
# so as to give opportunity for exceptions to bubble up to
# the level of HTTP response (instead of error objects in a stream).
choice = self._choice = self.response.create_choice()
choice.open()
return choice
else:
return self._choice
@property
def choice_idx(self) -> int | None:
if self._choice is None:
return None
return self._choice.index
def __enter__(self):
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> bool | None:
if exc is None and self._choice is not None:
self._choice.close()
return False
def is_empty(self) -> bool:
return self.empty
async def create_function_call(self, name: str, arguments: str | None):
self.empty = False
await self.set_finish_reason(FinishReason.FUNCTION_CALL)
self.choice.create_function_call(name, arguments)
async def create_tool_call(self, id: str, name: str, arguments: str | None):
self.empty = False
await self.set_finish_reason(FinishReason.TOOL_CALLS)
self.choice.create_function_tool_call(id, name, arguments)
async def append_content(self, content: str):
self.empty = self.empty and content == ""
self.choice.append_content(content)
async def add_attachment(self, attachment: Attachment):
self.empty = False
self.choice.add_attachment(attachment)
async def set_usage(self, usage: TokenUsage):
self.usage = usage
async def set_finish_reason(self, finish_reason: FinishReason):
if finish_reason == FinishReason.STOP and self.finish_reason in [
FinishReason.FUNCTION_CALL,
FinishReason.TOOL_CALLS,
]:
return
if (
self.finish_reason is not None
and self.finish_reason != finish_reason
):
raise RuntimeError(
"finish_reason was set twice with different values: "
f"{self.finish_reason}, {finish_reason}"
)
self.finish_reason = finish_reason
async def create_stage(self, name) -> Stage:
return self.choice.create_stage(name)