aidial_adapter_bedrock/utils/stream.py (61 lines of code) (raw):

"""Async generators that post-process a stream of text chunks."""

from typing import AsyncIterator, List

# NOTE(review): this module lives under aidial_adapter_bedrock/ but pulls a
# helper from the ``tests`` package -- confirm this import path is intended.
import tests.utils.string as string


async def lstrip(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    """Yield the chunks of *stream* with leading whitespace removed.

    Each chunk is left-stripped until the first chunk that is still
    non-empty after stripping; chunks that strip down to ``""`` are dropped.
    After that point every chunk is passed through unchanged.
    """
    start = True
    async for chunk in stream:
        if start:
            chunk = chunk.lstrip()
            if chunk != "":
                start = False
                yield chunk
        else:
            yield chunk


async def remove_prefix(
    stream: AsyncIterator[str], prefix: str
) -> AsyncIterator[str]:
    """Yield *stream* after passing its beginning through the prefix helper.

    Chunks are accumulated until at least ``len(prefix)`` characters have
    been collected; the accumulated text is then handed to
    ``string.remove_prefix(prefix, acc)`` and the result is yielded. All
    later chunks are passed through unchanged. If the stream ends before
    enough characters arrive, the accumulated text is yielded as is (this
    may be an empty string).

    NOTE(review): ``string.remove_prefix`` presumably drops *prefix* from
    the start of the text when present -- confirm against the helper.
    """
    acc = ""
    start = True

    async for chunk in stream:
        if start:
            acc += chunk
            if len(acc) >= len(prefix):
                yield string.remove_prefix(prefix, acc)
                start = False
        else:
            yield chunk

    if start:
        # The stream was shorter than the prefix: emit what was buffered.
        yield acc


async def stop_at(
    stream: AsyncIterator[str], stop_sequences: List[str]
) -> AsyncIterator[str]:
    """Yield text from *stream* up to the first occurrence of a stop sequence.

    The text before the earliest match of any of *stop_sequences* is
    yielded and the generator then finishes; the match itself and anything
    after it are discarded. With no stop sequences the stream is passed
    through unchanged. Chunk boundaries of the output may differ from those
    of the input, and empty chunks are not emitted once stop sequences are
    in effect.
    """
    if not stop_sequences:
        async for item in stream:
            yield item
        return

    # A stop sequence may straddle two chunks, so the longest possible
    # incomplete match (longest sequence minus one character) is held back.
    buffer_len = max(map(len, stop_sequences)) - 1

    hold = ""
    async for chunk in stream:
        hold += chunk

        # Earliest position at which any stop sequence starts, if any.
        min_index = len(hold)
        for stop_sequence in stop_sequences:
            index = hold.find(stop_sequence)
            if index != -1:
                min_index = min(min_index, index)

        if min_index < len(hold):
            commit = hold[:min_index]
            if commit:
                yield commit
            return

        if buffer_len > 0:
            commit, hold = hold[:-buffer_len], hold[-buffer_len:]
        else:
            # Nothing has to be held back. Slicing with ``-0`` would keep
            # the whole text buffered (``hold[:-0]`` is ``hold[:0]``) and
            # delay all output until the end of the stream.
            commit, hold = hold, ""
        if commit:
            yield commit

    if hold:
        yield hold


async def ensure_not_empty(
    gen: AsyncIterator[str], default: str
) -> AsyncIterator[str]:
    """Yield every chunk of *gen*, then *default* if nothing non-empty came.

    *default* is appended when *gen* produced no chunks at all or when
    every chunk it produced was equal to ``""``.
    """
    all_chunks_are_empty = True
    async for chunk in gen:
        if chunk != "":
            all_chunks_are_empty = False
        yield chunk

    if all_chunks_are_empty:
        yield default