# aidial_analytics_realtime/analytics.py
from datetime import datetime
from decimal import Decimal
from enum import Enum
from logging import Logger
from typing import Awaitable, Callable
from uuid import uuid4

from influxdb_client import Point
from langid.langid import LanguageIdentifier, model
from typing_extensions import assert_never

from aidial_analytics_realtime.dial import (
get_chat_completion_request_contents,
get_chat_completion_response_contents,
get_embeddings_request_contents,
)
from aidial_analytics_realtime.rates import RatesCalculator
from aidial_analytics_realtime.topic_model import TopicModel
from aidial_analytics_realtime.utils.concurrency import (
run_in_cpu_tasks_executor,
)

# Shared langid identifier; norm_probs=True normalizes the confidence
# returned by classify() to a probability in [0, 1].
identifier = LanguageIdentifier.from_modelstring(model, norm_probs=True)


class RequestType(Enum):
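    """Type of request an analytics point is built for."""
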
CHAT_COMPLETION = 1
EMBEDDING = 2


async def detect_lang(
logger: Logger, request: dict, response: dict, request_type: RequestType
) -> str:
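    """Detect the language of a request/response pair.

    For chat completions the sample is the last request message plus the
    response contents; for embeddings it is the request inputs. Returns the
    detected language code or "undefined".
    """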
match request_type:
case RequestType.CHAT_COMPLETION:
request_contents = get_chat_completion_request_contents(
logger, request
)
            response_contents = get_chat_completion_response_contents(
                logger, response
            )
            text = "\n\n".join(request_contents[-1:] + response_contents)
case RequestType.EMBEDDING:
text = "\n\n".join(get_embeddings_request_contents(logger, request))
case _:
assert_never(request_type)
return to_string(await detect_lang_by_text(text))


async def detect_lang_by_text(text: str) -> str | None:
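    """Classify `text` with langid off the event loop.

    Returns the language code only when the classifier reports a probability
    above 0.998; otherwise (including on classification errors) returns None.
    """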
text = text.strip()
if not text:
return None
try:
lang, prob = await run_in_cpu_tasks_executor(identifier.classify, text)
if prob > 0.998:
return lang
except Exception:
pass
return None


def to_string(obj: str | None) -> str:
return obj or "undefined"


def build_execution_path(path: list | None):
return "undefined" if not path else "/".join(map(to_string, path))


async def make_point(
logger: Logger,
deployment: str,
model: str,
project_id: str,
chat_id: str | None,
upstream_url: str | None,
user_hash: str,
user_title: str,
timestamp: datetime,
request: dict | None,
response: dict | None,
request_type: RequestType,
usage: dict | None,
topic_model: TopicModel,
rates_calculator: RatesCalculator,
parent_deployment: str | None,
trace: dict | None,
execution_path: list | None,
):
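    """Build the InfluxDB "analytics" point for a single request.

    Tags carry the deployment/model/trace metadata, detected language and
    topic; fields carry the user hash, prices, token counts and chat id.
    """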
topic = None
response_content = ""
request_content = ""
if response is not None and request is not None:
match request_type:
case RequestType.CHAT_COMPLETION:
response_contents = get_chat_completion_response_contents(
logger, response
)
request_contents = get_chat_completion_request_contents(
logger, request
)
request_content = "\n".join(request_contents)
response_content = "\n".join(response_contents)
if chat_id:
topic = to_string(
await topic_model.get_topic_by_text(
"\n\n".join(request_contents + response_contents)
)
)
case RequestType.EMBEDDING:
request_contents = get_embeddings_request_contents(
logger, request
)
request_content = "\n".join(request_contents)
if chat_id:
topic = to_string(
await topic_model.get_topic_by_text(
"\n\n".join(request_contents)
)
)
case _:
assert_never(request_type)
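
    # Prefer the price reported in the usage payload; otherwise estimate it
    # from the request/response contents via the rates calculator.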
price = Decimal(0)
deployment_price = Decimal(0)
if usage is not None and usage.get("price") is not None:
price = usage["price"]
deployment_price = usage.get("deployment_price", Decimal(0))
else:
price = rates_calculator.calculate_price(
deployment, model, request_content, response_content, usage
)
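
    # Missing optional values are recorded as the literal tag "undefined".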
point = (
Point("analytics")
.tag("model", model)
.tag("deployment", deployment)
.tag("parent_deployment", to_string(parent_deployment))
.tag(
"execution_path",
build_execution_path(execution_path),
)
.tag("trace_id", "undefined" if not trace else trace["trace_id"])
.tag(
"core_span_id", "undefined" if not trace else trace["core_span_id"]
)
.tag(
"core_parent_span_id",
(
"undefined"
if not trace
else to_string(trace.get("core_parent_span_id"))
),
)
.tag("project_id", project_id)
.tag(
"language",
(
"undefined"
if not chat_id or request is None or response is None
else await detect_lang(logger, request, response, request_type)
),
)
.tag("upstream", to_string(upstream_url))
.tag("topic", topic)
.tag("title", to_string(user_title))
.tag(
"response_id",
(
response["id"]
if request_type == RequestType.CHAT_COMPLETION
and response is not None
else uuid4()
),
)
.field("user_hash", to_string(user_hash))
.field("price", price)
.field("deployment_price", deployment_price)
.field(
"number_request_messages",
(
0
if request is None
else (
len(request["messages"])
if request_type == RequestType.CHAT_COMPLETION
else (
1
if isinstance(request["input"], str)
else len(request["input"])
)
)
),
)
.field("chat_id", to_string(chat_id))
.time(timestamp)
)
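
    # Token counts are taken from the usage payload and default to 0.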
    if usage is not None:
        point.field("completion_tokens", usage.get("completion_tokens", 0))
        point.field("prompt_tokens", usage.get("prompt_tokens", 0))
    else:
        point.field("completion_tokens", 0)
        point.field("prompt_tokens", 0)
return point


def make_rate_point(
deployment: str,
project_id: str,
chat_id: str | None,
user_hash: str,
user_title: str,
timestamp: datetime,
request_body: dict,
):
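    """Build the InfluxDB "rate_analytics" point for a like/dislike message.

    `request_body["rate"]` is truthy for a like and falsy for a dislike.
    """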
like = request_body["rate"]
like_count = 1 if like else 0
dislike_count = 1 if not like else 0
point = (
Point("rate_analytics")
.tag("deployment", deployment)
.tag("project_id", project_id)
.tag("title", to_string(user_title))
.tag("response_id", request_body["responseId"])
.tag("user_hash", to_string(user_hash))
.tag("chat_id", to_string(chat_id))
.field("dislike_count", dislike_count)
.field("like_count", like_count)
.time(timestamp)
)
return point


async def parse_usage_per_model(response: dict | None) -> list:
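    """Return the `statistics.usage_per_model` list from the response.

    An empty list is returned when the response or its statistics are
    missing or malformed.
    """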
if response is None:
return []
statistics = response.get("statistics")
if statistics is None:
return []
if not isinstance(statistics, dict) or "usage_per_model" not in statistics:
return []
usage_per_model = statistics["usage_per_model"]
if not isinstance(usage_per_model, list):
return []
return usage_per_model


async def on_message(
logger: Logger,
influx_writer: Callable[[Point], Awaitable[None]],
deployment: str,
model: str,
project_id: str,
chat_id: str,
upstream_url: str,
user_hash: str,
user_title: str,
timestamp: datetime,
request: dict | None,
response: dict | None,
type: RequestType,
topic_model: TopicModel,
rates_calculator: RatesCalculator,
token_usage: dict | None,
parent_deployment: str | None,
trace: dict | None,
execution_path: list | None,
):
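    """Turn a single log message into one or more analytics points.

    Depending on what the message carries, points are built from the explicit
    token usage, from the response's own usage block, or from the per-model
    usage statistics embedded in the response.
    """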
logger.info(f"Chat completion response length {len(response or [])}")
usage_per_model = await parse_usage_per_model(response)
response_usage = None if response is None else response.get("usage")
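
    # Token usage supplied explicitly by the caller takes precedence.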
if token_usage is not None:
point = await make_point(
logger,
deployment,
model,
project_id,
chat_id,
upstream_url,
user_hash,
user_title,
timestamp,
request,
response,
type,
token_usage,
topic_model,
rates_calculator,
parent_deployment,
trace,
execution_path,
)
await influx_writer(point)
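    # No per-model statistics: fall back to the usage block of the
    # response itself.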
elif len(usage_per_model) == 0:
point = await make_point(
logger,
deployment,
model,
project_id,
chat_id,
upstream_url,
user_hash,
user_title,
timestamp,
request,
response,
type,
response_usage,
topic_model,
rates_calculator,
parent_deployment,
trace,
execution_path,
)
await influx_writer(point)
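    # Per-model statistics present: write an aggregate point without usage,
    # then one point per upstream model.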
else:
point = await make_point(
logger,
deployment,
model,
project_id,
chat_id,
upstream_url,
user_hash,
user_title,
timestamp,
request,
response,
type,
None,
topic_model,
rates_calculator,
parent_deployment,
trace,
execution_path,
)
await influx_writer(point)
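
        # Per-model points are not attributed to a chat or an upstream URL.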
for usage in usage_per_model:
point = await make_point(
logger,
deployment,
usage["model"],
project_id,
None,
None,
user_hash,
user_title,
timestamp,
request,
response,
type,
usage,
topic_model,
rates_calculator,
parent_deployment,
trace,
execution_path,
)
await influx_writer(point)