Source code for p4p.server.thread


import logging
_log = logging.getLogger(__name__)

from functools import partial
from threading import Event

from ..util import _defaultWorkQueue
from .raw import SharedPV as _SharedPV, Handler
from ..client.raw import RemoteError

# Public names of this module: the SharedPV class defined below and the
# Handler name imported from .raw above.
__all__ = (
    'SharedPV',
    'Handler',
)

def _on_queue(op, M, *args):
    try:
        M(*args)
        return
    except RemoteError as e:
        err = e
    except Exception as e:
        _log.exception("Unexpected")
        err = e
    if op is not None:
        op.done(error=str(err))


class SharedPV(_SharedPV):
    """Shared state Process Variable.  Callback based implementation.

    .. note:: if initial=None, the PV is initially **closed** and
              must be :py:meth:`open()`'d before any access is possible.

    :param handler: An object which will receive callbacks when eg. a Put operation is requested.
                    May be omitted if the decorator syntax is used.
    :param Value initial: An initial Value for this PV.  If omitted, :py:meth:`open` must be called
                          before client access is possible.
    :param nt: An object with methods wrap() and unwrap().  eg :py:class:`p4p.nt.NTScalar`.
    :param callable wrap: As an alternative to providing 'nt=', A callable to transform Values
                          passed to open() and post().
    :param callable unwrap: As an alternative to providing 'nt=', A callable to transform Values
                            returned Operations in Put/RPC handlers.
    :param WorkQueue queue: The threaded :py:class:`WorkQueue` on which handlers will be run.
    :param dict options: A dictionary of configuration options.

    Creating a PV in the open state, with no handler for Put or RPC (attempts will error). ::

        from p4p.nt import NTScalar
        pv = SharedPV(nt=NTScalar('d'), initial=0.0)
        # ... later
        pv.post(1.0)

    The full form of a handler object is: ::

        class MyHandler:
            def put(self, op):
                pass
            def rpc(self, op):
                pass
            def onFirstConnect(self): # may be omitted
                pass
            def onLastDisconnect(self): # may be omitted
                pass
        pv = SharedPV(MyHandler())

    Alternatively, decorators may be used. ::

        pv = SharedPV()
        @pv.put
        def onPut(pv, op):
            pass
    """

    def __init__(self, queue=None, **kws):
        _SharedPV.__init__(self, **kws)
        # Fall back to the default work queue when none (or a falsy one) is given.
        self._queue = queue or _defaultWorkQueue()
        # Set while no client is connected; starts out set.
        self._disconnected = Event()
        self._disconnected.set()

    def _exec(self, op, M, *args):
        # Defer the call to the work queue.  _on_queue() reports any
        # exception raised by M through op.
        self._queue.push(partial(_on_queue, op, M, *args))

    def _onFirstConnect(self, _junk):
        # NOTE(review): presumably invoked by the base class when the first
        # client connects -- the caller is not visible in this module.
        self._disconnected.clear()

    def _onLastDisconnect(self, _junk):
        # NOTE(review): presumably invoked by the base class when the last
        # client disconnects -- the caller is not visible in this module.
        self._disconnected.set()

    def close(self, destroy=False, sync=False, timeout=None):
        """Close PV, disconnecting any clients.

        :param bool destroy: Indicate "permanent" closure.  Current clients will not see subsequent open().
        :param bool sync: When True, block until any pending onLastDisconnect() is delivered (timeout applies).
        :param float timeout: Applies only when sync=True.  None for no timeout, otherwise a non-negative
                              floating point value.

        close() with destroy=True or sync=True will not prevent clients from re-connecting.
        New clients may prevent sync=True from succeeding.
        Prevent reconnection by __first__ stopping the Server, removing with :py:meth:`StaticProvider.remove()`,
        or preventing a :py:class:`DynamicProvider` from making new channels to this SharedPV.

        If the timeout expires before the last disconnect is seen, this method
        returns normally without raising.
        """
        _SharedPV.close(self, destroy)
        if sync:
            # TODO: still not syncing PVA workers...
            # NOTE(review): the queue sync itself is not bounded by timeout;
            # the signature of sync() is not visible here -- confirm whether
            # it can take one.
            self._queue.sync()
            # Honour the documented timeout instead of waiting indefinitely.
            self._disconnected.wait(timeout)