aidial_assistant/json_stream/chunked_char_stream.py (37 lines of code) (raw):
from abc import ABC
from collections.abc import AsyncIterator
from typing_extensions import override
class ChunkedCharStream(ABC, AsyncIterator[str]):
def __init__(self, source: AsyncIterator[str]):
self._source = source
self._chunk: str = ""
self._next_char_offset: int = 0
self._chunk_position: int = 0
@override
def __aiter__(self) -> AsyncIterator[str]:
return self
@override
async def __anext__(self) -> str:
result = await self.apeek()
self._next_char_offset += 1
return result
async def apeek(self) -> str:
while self._next_char_offset == len(self._chunk):
self._chunk_position += len(self._chunk)
self._chunk = await anext(self._source) # type: ignore
self._next_char_offset = 0
return self._chunk[self._next_char_offset]
async def askip(self):
await anext(self)
@property
def chunk_position(self) -> int:
return self._chunk_position
@property
def char_position(self) -> int:
return self._chunk_position + self._next_char_offset
async def skip_whitespaces(stream: ChunkedCharStream):
    """Consume leading whitespace from *stream*.

    Characters are peeked one at a time and skipped for as long as
    ``str.isspace`` reports them as whitespace; the first non-whitespace
    character is left in the stream for the caller. Whatever ``apeek``
    raises (for instance when the stream runs out) propagates unchanged.
    """
    while (await stream.apeek()).isspace():
        await stream.askip()