aidial_adapter_bedrock/utils/concurrency.py (28 lines of code) (raw):
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import (
AsyncIterator,
Callable,
Iterator,
Optional,
Tuple,
TypeVar,
cast,
)
T = TypeVar("T")
async def make_async(func: Callable[[], T]) -> T:
    """Run a blocking zero-argument callable on a worker thread and await it.

    A dedicated single-worker thread pool is created for the call, so the
    callable does not compete for slots in the event loop's default executor.

    Args:
        func: Blocking callable taking no arguments.

    Returns:
        Whatever ``func`` returns.

    Raises:
        Any exception raised by ``func`` is re-raised to the awaiting caller.
    """
    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        return await loop.run_in_executor(executor, func)
    finally:
        # wait=False: never join the worker from the event-loop thread.
        # Using the executor as a context manager would call
        # shutdown(wait=True), which stalls the whole loop until ``func``
        # returns when this coroutine is cancelled or times out mid-call.
        # The worker still finishes its current job and then exits.
        executor.shutdown(wait=False)
async def to_async_iterator(iter: Iterator[T]) -> AsyncIterator[T]:
    """Expose a blocking iterator as an asynchronous iterator.

    Every ``next()`` call on the wrapped iterator is executed on a worker
    thread, so a slow or blocking iterator does not stall the event loop.
    A single one-worker thread pool is created for the whole iteration, so
    all ``next()`` calls run on the same thread and no pool is built per item.

    Args:
        iter: The synchronous iterator to drain. (The name shadows the
            builtin but is kept for keyword-argument compatibility.)

    Yields:
        The items of ``iter``, in order.

    Raises:
        Any exception raised by the wrapped iterator other than
        ``StopIteration`` is re-raised to the consumer.
    """
    # Unique end-of-iteration marker; ``None`` may be a legitimate item.
    exhausted = object()

    def _next() -> object:
        # Exhaustion is reported as a return value instead of letting
        # StopIteration travel through the future back into the coroutine.
        try:
            return next(iter)
        except StopIteration:
            return exhausted

    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        while True:
            item = await loop.run_in_executor(executor, _next)
            if item is exhausted:
                break
            yield cast(T, item)
    finally:
        # wait=False so that cancellation or early close never blocks the
        # event loop on a ``next()`` call that is still in progress.
        executor.shutdown(wait=False)