# aidial_adapter_bedrock/llm/model/claude/v1_v2/adapter.py
from typing import Any, AsyncIterator, Dict

import anthropic
from anthropic._tokenizers import async_get_tokenizer
from tokenizers import Tokenizer

import aidial_adapter_bedrock.utils.stream as stream_utils
from aidial_adapter_bedrock.bedrock import Bedrock
from aidial_adapter_bedrock.deployments import ChatCompletionDeployment
from aidial_adapter_bedrock.dial_api.request import ModelParameters
from aidial_adapter_bedrock.dial_api.token_usage import TokenUsage
from aidial_adapter_bedrock.llm.chat_emulator import (
BasicChatEmulator,
ChatEmulator,
CueMapping,
)
from aidial_adapter_bedrock.llm.chat_model import (
PseudoChatModel,
trivial_partitioner,
)
from aidial_adapter_bedrock.llm.consumer import Consumer
from aidial_adapter_bedrock.llm.message import BaseMessage, SystemMessage
from aidial_adapter_bedrock.llm.model.conf import DEFAULT_MAX_TOKENS_ANTHROPIC
from aidial_adapter_bedrock.llm.tools.claude_emulator import (
legacy_tools_emulator,
)
from aidial_adapter_bedrock.llm.tools.default_emulator import (
default_tools_emulator,
)


# NOTE: See https://docs.anthropic.com/claude/reference/complete_post
def convert_params(params: ModelParameters) -> Dict[str, Any]:
ret = {}
if params.max_tokens is not None:
ret["max_tokens_to_sample"] = params.max_tokens
else:
        # The max tokens parameter is required for Anthropic models,
        # so fall back to a reasonable default when none is given.
ret["max_tokens_to_sample"] = DEFAULT_MAX_TOKENS_ANTHROPIC
if params.stop:
ret["stop_sequences"] = params.stop
if params.temperature is not None:
ret["temperature"] = params.temperature
if params.top_p is not None:
ret["top_p"] = params.top_p
return ret
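

# For example (illustrative values), ModelParameters(stop=["\n\nHuman:"],
# temperature=0.0) converts to:
#
#     {
#         "max_tokens_to_sample": DEFAULT_MAX_TOKENS_ANTHROPIC,
#         "stop_sequences": ["\n\nHuman:"],
#         "temperature": 0.0,
#     }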


def create_request(prompt: str, params: Dict[str, Any]) -> Dict[str, Any]:
    return {"prompt": prompt, **params}


async def chunks_to_stream(
    chunks: AsyncIterator[dict],
) -> AsyncIterator[str]:
    # Bedrock streams Claude v1/v2 output as JSON chunks, each carrying
    # a "completion" delta.
    async for chunk in chunks:
        yield chunk["completion"]


async def response_to_stream(response: dict) -> AsyncIterator[str]:
    # Wrap a non-streaming response into a one-element stream so that
    # Adapter.predict can treat both code paths uniformly.
    yield response["completion"]


def get_anthropic_emulator(is_system_message_supported: bool) -> ChatEmulator:
    def add_cue(message: BaseMessage, idx: int) -> bool:
        # A leading system message is emitted without a cue when the model
        # supports bare system prompts (Claude 2.1); otherwise it is
        # prefixed with the human cue like any other message.
        if (
            idx == 0
            and isinstance(message, SystemMessage)
            and is_system_message_supported
        ):
            return False
        return True

    return BasicChatEmulator(
        prelude_template=None,
        add_cue=add_cue,
        add_invitation_cue=True,
        fallback_to_completion=False,
        cues=CueMapping(
            system=anthropic.HUMAN_PROMPT.strip(),
            human=anthropic.HUMAN_PROMPT.strip(),
            ai=anthropic.AI_PROMPT.strip(),
        ),
        separator="\n\n",
    )
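

# A sketch of the emulated prompt for Claude 2.1, assuming the SDK constants
# anthropic.HUMAN_PROMPT == "\n\nHuman:" and anthropic.AI_PROMPT == "\n\nAssistant:":
#
#     You are a helpful assistant.    <- system message, no cue
#
#     Human: Hello!
#
#     Assistant: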


class Adapter(PseudoChatModel):
model: str
client: Bedrock
tokenizer: Tokenizer
is_claude_v2_1: bool

    @classmethod
async def create(cls, client: Bedrock, model: str):
is_claude_v2_1 = (
model == ChatCompletionDeployment.ANTHROPIC_CLAUDE_V2_1.value
)
chat_emulator = get_anthropic_emulator(
is_system_message_supported=is_claude_v2_1
)
tools_emulator = (
legacy_tools_emulator if is_claude_v2_1 else default_tools_emulator
)
        # The Claude v1/v2 tokenizer bundled with the anthropic SDK is used
        # both for prompt truncation (tokenize_string) and usage accounting.
        tokenizer = await async_get_tokenizer()
return cls(
client=client,
model=model,
tokenize_string=lambda text: len(tokenizer.encode(text).ids),
chat_emulator=chat_emulator,
tools_emulator=tools_emulator,
partitioner=trivial_partitioner,
is_claude_v2_1=is_claude_v2_1,
tokenizer=tokenizer,
)

    async def predict(
self, consumer: Consumer, params: ModelParameters, prompt: str
):
args = create_request(prompt, convert_params(params))
if params.stream:
chunks = self.client.ainvoke_streaming(self.model, args)
stream = chunks_to_stream(chunks)
else:
response, _headers = await self.client.ainvoke_non_streaming(
self.model, args
)
stream = response_to_stream(response)
        # Claude tends to open its completion with a space after the
        # trailing "Assistant:" cue, so strip leading whitespace.
        stream = stream_utils.lstrip(stream)
completion = ""
async for content in stream:
completion += content
consumer.append_content(content)
consumer.close_content()
consumer.add_usage(self._compute_usage(prompt, completion))

    def _compute_usage(self, prompt: str, completion: str) -> TokenUsage:
batch = self.tokenizer.encode_batch([prompt, completion])
return TokenUsage(
prompt_tokens=len(batch[0].ids),
completion_tokens=len(batch[1].ids),
)
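

# A minimal usage sketch (illustrative; `make_bedrock_client` is a stand-in
# for however the Bedrock client is constructed elsewhere in the adapter):
#
#     client: Bedrock = await make_bedrock_client()
#     model = ChatCompletionDeployment.ANTHROPIC_CLAUDE_V2_1.value
#     adapter = await Adapter.create(client, model)
#     await adapter.predict(consumer, ModelParameters(stream=True), prompt)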