aidial_adapter_dial/utils/streaming.py (17 lines of code) (raw):
from typing import AsyncIterator, Awaitable, Callable, Optional, TypeVar
_T = TypeVar("_T")
_V = TypeVar("_V")
async def map_stream(
func: Callable[[_T], Optional[_V]], iterator: AsyncIterator[_T]
) -> AsyncIterator[_V]:
async for item in iterator:
new_item = func(item)
if new_item is not None:
yield new_item
async def amap_stream(
func: Callable[[_T], Awaitable[Optional[_V]]], iterator: AsyncIterator[_T]
) -> AsyncIterator[_V]:
async for item in iterator:
new_item = await func(item)
if new_item is not None:
yield new_item