Source code for p4p.server.asyncio


import logging

from functools import partial

import asyncio

from .raw import SharedPV as _SharedPV, Handler
from ..client.thread import RemoteError
from ..client.asyncio import get_running_loop, create_task, all_tasks

# Public API of this module: the SharedPV class defined in this file, plus
# Handler which is imported from .raw and re-exported as-is.
__all__ = (
    'SharedPV',
        'Handler',
)

# Module-level logger, named after this module.
_log = logging.getLogger(__name__)

def _log_err(V):
    """Log an unhandled error raised by a SharedPV handler.

    This function is registered with ``add_done_callback()``, which invokes
    it with the finished task/future itself rather than with the outcome.
    The stored exception (if any) is therefore fetched from the future and
    logged together with its traceback.  A bare exception instance is
    accepted as well and logged directly.

    :param V: A completed :py:class:`asyncio.Future` (or Task), or an
              exception instance.  Any other value is ignored.
    :returns: ``V``, unchanged.
    """
    if isinstance(V, asyncio.Future):
        # Future.exception() raises for a pending or cancelled future,
        # and neither case is an error worth reporting.
        if not V.done() or V.cancelled():
            return V
        # Fetching the exception also marks it as retrieved, so asyncio
        # will not emit its own "exception was never retrieved" message.
        err = V.exception()
    else:
        err = V

    if isinstance(err, BaseException):
        _log.error("Unhandled from SharedPV handler: %s", err,
                   exc_info=(type(err), err, err.__traceback__))
    return V


def _handle(pv, op, M, args): # callback in asyncio loop
    """Run the handler callable ``M(*args)`` on behalf of ``pv``.

    When the call yields a coroutine, it is scheduled as a task, given
    ``_log_err`` as a done callback, and tagged with ``pv`` so that
    ``SharedPV._wait_closed()`` can later find it.

    Nothing is reported through ``op`` when the call succeeds; op.done()
    is left to the caller.  When the call raises, the error text is passed
    to ``op.done(error=...)`` provided ``op`` is not None.

    :param pv: The SharedPV on whose behalf the handler is run.
    :param op: Operation object exposing ``done(error=...)``, or None.
    :param M: The callable to invoke.
    :param args: Positional arguments for ``M``.
    """
    try:
        _log.debug('SERVER HANDLE %s %s %r', op, M, args)
        outcome = M(*args)
        if asyncio.iscoroutine(outcome):
            _log.debug('SERVER SCHEDULE %s', outcome)
            pending = create_task(outcome)

            # There is nowhere convenient to join the results of async
            # put()/rpc() handlers.  SharedPV.close(sync=True) is optional
            # and may happen arbitrarily late, so the done callback is the
            # only consumer and the result is otherwise dropped.
            pending.add_done_callback(_log_err)
            # tag the task so that _wait_closed() can pick it out
            pending._SharedPV = pv
    except RemoteError as exc:
        # RemoteError is passed on to the peer without being logged here
        failure = exc
    except Exception as exc:
        _log.exception("Unexpected")
        failure = exc
    else:
        # success: op.done() is not called from here
        return

    if op is not None:
        op.done(error=str(failure))


class SharedPV(_SharedPV):
    """SharedPV variant which runs handler callbacks in an asyncio event loop.

    The loop is captured when the instance is constructed.  Callbacks are
    dispatched to it through ``_handle()``, which also schedules any
    coroutine returned by a callback as a task.
    """

    def __init__(self, handler=None, **kws):
        """Capture the running loop and initialise the base SharedPV.

        :param handler: Passed through to the base class.
        :param kws: Additional keyword arguments for the base class.
        """
        # NOTE(review): presumably this requires that an event loop is
        # already running when the PV is constructed -- confirm against
        # the get_running_loop() provided by ..client.asyncio.
        self.loop = get_running_loop()
        _SharedPV.__init__(self, handler=handler, **kws)
        # Set while no client is connected; starts out set.
        self._disconnected = asyncio.Event()
        self._disconnected.set()

    def _exec(self, op, M, *args):
        """Schedule ``_handle(self, op, M, args)`` on the captured loop."""
        # note that M may be _onFirstConnect or _onLastDisconnect
        self.loop.call_soon_threadsafe(partial(_handle, self, op, M, args))

    def _onFirstConnect(self, _junk):
        """Record that at least one client is connected."""
        self._disconnected.clear()

    def _onLastDisconnect(self, _junk):
        """Record that the last client has disconnected."""
        self._disconnected.set()

    async def _wait_closed(self):
        """Wait until any in-progress asynchronous put()/rpc() handler tasks have completed.

        Afterwards, also wait until the disconnected event is set.
        """
        # NOTE(review): if this is awaited from inside one of this PV's own
        # handler coroutines, the calling task is presumably among those
        # being waited on -- confirm whether all_tasks() includes it.
        _log.debug("Synchronizing %r", self)

        def _peak_done(F, V):
            # F has already been cancelled if this wait was itself cancelled,
            # in which case set_result() would raise InvalidStateError.
            if not F.done():
                F.set_result(V)
            return V

        Ts = []
        for t in all_tasks():
            # only tasks tagged by _handle() as belonging to this PV
            if getattr(t, '_SharedPV', None) is not self:
                continue
            # Wait on a proxy future instead of the task itself; presumably
            # so that cancelling this wait does not cancel the handler task.
            F = asyncio.Future()
            t.add_done_callback(partial(_peak_done, F))
            Ts.append(F)

        await asyncio.gather(*Ts, return_exceptions=True)
        # ignore any returned exceptions as they have already been logged

        _log.debug("Synchronized %r", self)

        # wait for Disconnect notification as well
        await self._disconnected.wait()

    def close(self, destroy=False, sync=False):
        """Close PV, disconnecting any clients.

        :param bool destroy: Indicate "permanent" closure.  Current clients will not see subsequent open().
        :param bool sync: When True, return an awaitable which completes once any in-progress
                          asynchronous handler tasks have finished and any pending
                          onLastDisconnect() has been delivered.
        :returns: ``None`` unless sync=True, in which case a coroutine which the caller must await.
                  No timeout is applied here; wrap the coroutine with :py:func:`asyncio.wait_for`
                  if one is needed.

        close() with destroy=True or sync=True will not prevent clients from re-connecting.
        New clients may prevent sync=True from succeeding.
        Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`,
        or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV.
        """
        _SharedPV.close(self, destroy)
        if sync:
            return self._wait_closed()