Source code for inginious.common.asyncio_utils

# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.

"""
    Utilities for asyncio
"""

import asyncio
import threading
from functools import wraps, partial


[docs]class AsyncIteratorWrapper(object): """ A wrapper that converts old-style-generators to async generators using run_in_executor """ def __init__(self, obj): self._it = obj self._loop = asyncio.get_event_loop() self._queue = asyncio.Queue() self._last_item = object() self._thread = threading.Thread(target=self._unroll) self._thread.daemon = True self._thread.start() def __aiter__(self): return self async def __anext__(self): value = await self._queue.get() if value == self._last_item: raise StopAsyncIteration return value async def _add_to_queue(self, o): await self._queue.put(o) def _unroll(self): try: for i in self._it: self._loop.call_soon_threadsafe(asyncio.ensure_future, self._add_to_queue(i)) except Exception: pass self._loop.call_soon_threadsafe(asyncio.ensure_future, self._add_to_queue(self._last_item))
[docs]class AsyncProxy(object): """ An asyncio proxy for modules and classes """ def __init__(self, module, loop=None, executor=None): self._module = module self._loop = loop or asyncio.get_event_loop() self._executor = executor @property def sync(self): """ Return the original sync module/class """ return self._module def __getattr__(self, name): function = getattr(self._module, name) if not callable(function): return AsyncProxy(function) @wraps(function) async def _inner(*args, **kwargs): f = partial(function, *args, **kwargs) return await self._loop.run_in_executor(self._executor, f) return _inner