aidial_adapter_vertexai/utils/concurrency.py (20 lines of code) (raw):
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar
T = TypeVar("T")
A = TypeVar("A")
_single_thread_async_lock = asyncio.Lock()
async def make_single_thread_async(func: Callable[[A], T], arg: A) -> T:
"""
Function to run a synchronous function in separate thread,
but only one at a time.
"""
async with _single_thread_async_lock:
with ThreadPoolExecutor() as executor:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(executor, func, arg)
async def make_async(func: Callable[[A], T], arg: A) -> T:
with ThreadPoolExecutor() as executor:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(executor, func, arg)
async def gather_sync(sync_tasks: List[Callable[[], T]], **kwargs) -> List[T]:
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(**kwargs) as executor:
tasks = [loop.run_in_executor(executor, task) for task in sync_tasks]
return await asyncio.gather(*tasks)