aidial_analytics_realtime/influx_writer.py (17 lines of code) (raw):
import os
from typing import Awaitable, Callable, Tuple
from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
# Signature of an asynchronous writer: a callable that takes a single
# InfluxDB ``Point`` and returns an awaitable that resolves to ``None``.
InfluxWriterAsync = Callable[[Point], Awaitable[None]]
def create_influx_writer() -> Tuple[InfluxDBClientAsync, InfluxWriterAsync]:
    """Create an async InfluxDB client together with a point-writing coroutine.

    Configuration is taken from environment variables:

    * ``INFLUX_URL``, ``INFLUX_ORG``, ``INFLUX_BUCKET`` -- required; a missing
      variable raises ``KeyError``.
    * ``INFLUX_API_TOKEN`` -- optional; ``None`` is passed to the client when
      the variable is not set.

    Returns:
        A ``(client, writer)`` tuple. ``writer(record)`` awaits a write of
        ``record`` into the configured bucket through the client's write API.
        The client itself is handed back to the caller as well (presumably so
        the caller can manage its lifetime -- confirm against call sites).
    """
    env = os.environ

    # Resolve every setting up front so that a missing required variable
    # fails before any client object is constructed.
    url = env["INFLUX_URL"]
    token = env.get("INFLUX_API_TOKEN")
    org = env["INFLUX_ORG"]
    bucket = env["INFLUX_BUCKET"]

    client = InfluxDBClientAsync(url=url, token=token, org=org)
    write_api = client.write_api()

    async def influx_writer_impl(record: Point):
        # The bucket is fixed at creation time; callers only supply the point.
        await write_api.write(bucket=bucket, record=record)

    return client, influx_writer_impl