Source code for p4p.server.asyncio
import logging
import warnings
_log = logging.getLogger(__name__)
from functools import partial
from weakref import WeakSet
import asyncio
from .raw import SharedPV as _SharedPV, Handler
from ..client.raw import LazyRepr
from ..client.thread import RemoteError
__all__ = (
'SharedPV',
'Handler',
)
@asyncio.coroutine
def _sync(loop):
# wait until any pending callbacks are run
evt = asyncio.Event(loop=loop)
loop.call_soon(evt.set)
yield from evt.wait()
evt.clear() # reuse
wait4 = set(loop._SharedPV_handlers) # snapshot of in-progress
# like asyncio.wait() but non-invasive if further callbacks are added.
# eg. like overlapping calls of _sync()
# We're abusing the callback chain to avoid creating an Event for
# each inprogress Future on the assumption that calls to _sync()
# are relatively rare.
cnt = len(wait4)
fut = asyncio.Future(loop=loop)
def _done(V):
nonlocal cnt
cnt -= 1
if cnt==0 and not fut.cancelled():
fut.set_result(None)
return V # pass result along
if cnt==0:
fut.set_result(None)
else:
[W.add_done_callback(_done) for W in wait4]
yield from fut
def _log_err(V):
if isinstance(V, Exception):
_log.error("Unhandled from SharedPV handler: %s", V)
# TODO: figure out how to show stack trace...
# until then, propagate in the hope that someone else will
return V
def _handle(loop, op, M, args):
try:
_log.debug('SERVER HANDLE %s %s %s', op, M, LazyRepr(args))
maybeco = M(*args)
if asyncio.iscoroutine(maybeco):
_log.debug('SERVER SCHEDULE %s', maybeco)
task = loop.create_task(maybeco)
task.add_done_callback(_log_err)
task._log_destroy_pending = False # hack as we don't currently provide a way to join
loop._SharedPV_handlers.add(task) # track in-progress
return
except RemoteError as e:
err = e
except Exception as e:
_log.exception("Unexpected")
err = e
if op is not None:
op.done(error=str(err))
[docs]class SharedPV(_SharedPV):
def __init__(self, handler=None, loop=None, **kws):
self.loop = loop or asyncio.get_event_loop()
_SharedPV.__init__(self, handler=handler, **kws)
self._handler.loop = self.loop
self._disconnected = asyncio.Event(loop=self.loop)
self._disconnected.set()
if not hasattr(self.loop, '_SharedPV_handlers'):
self.loop._SharedPV_handlers = WeakSet() # holds our in-progress Futures
def _exec(self, op, M, *args):
self.loop.call_soon_threadsafe(partial(_handle, self.loop, op, M, args))
# 3.5 adds asyncio.run_coroutine_threadsafe()
def _onFirstConnect(self, _junk):
self._disconnected.clear()
def _onLastDisconnect(self, _junk):
self._disconnected.set()
@asyncio.coroutine
def _wait_closed(self):
yield from _sync(self.loop)
yield from self._disconnected.wait()
def close(self, destroy=False, sync=False):
"""Close PV, disconnecting any clients.
:param bool destroy: Indicate "permanent" closure. Current clients will not see subsequent open().
:param bool sync: When block until any pending onLastDisconnect() is delivered (timeout applies).
:param float timeout: Applies only when sync=True. None for no timeout, otherwise a non-negative floating point value.
close() with destory=True or sync=True will not prevent clients from re-connecting.
New clients may prevent sync=True from succeeding.
Prevent reconnection by __first__ stopping the Server, removing with :py:method:`StaticProvider.remove()`,
or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV.
"""
_SharedPV.close(self, destroy)
if sync:
return self._wait_closed()