aidial_sdk/utils/_content_stream.py (9 lines of code) (raw):

from typing import Protocol class ContentReceiver(Protocol): def append_content(self, content: str) -> None: ... class ContentStream: """ The ContentStream class allows using the receiver in contexts where typing.SupportsWrite[str] is expected. For example: 1. Redirecting print statements: print("Hello, world", file=content_stream) 2. Using with tqdm for progress bars: import tqdm for item in tqdm(items, file=content_stream): process(item) 3. Redirecting logs to the content stream: import logging logging_handler = logging.StreamHandler(stream=content_stream) 4. Writing CSV data: import csv csv.writer(content_stream).writerows(data) """ _receiver: ContentReceiver def __init__(self, receiver: ContentReceiver) -> None: self._receiver = receiver def write(self, s: str) -> None: self._receiver.append_content(s)