Source code for p4p.server.raw


import logging
_log = logging.getLogger(__name__)

from .._p4p import SharedPV as _SharedPV

__all__ = (
    'SharedPV',
    'Handler',
)


class ServOpWrap(object):

    def __init__(self, op, wrap, unwrap):
        self._op, self._wrap, self._unwrap = op, wrap, unwrap

    def value(self):
        V = self._op.value()
        try:
            return self._unwrap(V)
        except: # py3 will chain automatically, py2 won't
            raise ValueError("Unable to unwrap %r with %r"%(V, self._unwrap))

    def done(self, value=None, error=None):
        if value is not None:
            try:
                value = self._wrap(value)
            except:
                raise ValueError("Unable to wrap %r with %r"%(value, self._wrap))
        self._op.done(value, error)

    def __getattr__(self, key):
        return getattr(self._op, key) # dispatch to _p4p.ServerOperation


[docs]class Handler(object): """Skeleton of SharedPV Handler Use of this as a base class is optional. """
[docs] def put(self, pv, op): """ Called each time a client issues a Put operation on this Channel. :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. :param ServerOperation op: The operation being initiated. """ op.done(error='Not supported')
[docs] def rpc(self, pv, op): """ Called each time a client issues a Remote Procedure Call operation on this Channel. :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. :param ServerOperation op: The operation being initiated. """ op.done(error='Not supported')
[docs] def onFirstConnect(self, pv): """ Called when the first Client channel is created. :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. """ pass
[docs] def onLastDisconnect(self, pv): """ Called when the last Client channel is closed. :param SharedPV pv: The :py:class:`SharedPV` which this Handler is associated with. """ pass
[docs]class SharedPV(_SharedPV): """Shared state Process Variable. Callback based implementation. .. note:: if initial=None, the PV is initially **closed** and must be :py:meth:`open()`'d before any access is possible. :param handler: A object which will receive callbacks when eg. a Put operation is requested. May be omitted if the decorator syntax is used. :param Value initial: An initial Value for this PV. If omitted, :py:meth:`open()`s must be called before client access is possible. :param nt: An object with methods wrap() and unwrap(). eg :py:class:`p4p.nt.NTScalar`. :param callable wrap: As an alternative to providing 'nt=', A callable to transform Values passed to open() and post(). :param callable unwrap: As an alternative to providing 'nt=', A callable to transform Values returned Operations in Put/RPC handlers. :param dict options: A dictionary of configuration options. Creating a PV in the open state, with no handler for Put or RPC (attempts will error). :: from p4p.nt import NTScalar pv = SharedPV(nt=NTScalar('d'), value=0.0) # ... later pv.post(1.0) The full form of a handler object is: :: class MyHandler: def put(self, pv, op): pass def rpc(self, pv, op): pass def onFirstConnect(self): # may be omitted pass def onLastDisconnect(self): # may be omitted pass pv = SharedPV(MyHandler()) Alternatively, decorators may be used. :: pv = SharedPV() @pv.put def onPut(pv, op): pass The nt= or wrap= and unwrap= arguments can be used as a convience to allow the open(), post(), and associated Operation.value() to be automatically transform to/from :py:class:`Value` and more convienent Python types. See :ref:`unwrap` """ def __init__(self, handler=None, initial=None, nt=None, wrap=None, unwrap=None, options=None, **kws): self.nt = nt self._handler = handler or self._DummyHandler() self._whandler = self._WrapHandler(self, self._handler) self._wrap = wrap or (nt and nt.wrap) or (lambda x: x) self._unwrap = unwrap or (nt and nt.unwrap) or (lambda x: x) _SharedPV.__init__(self, self._whandler, options) if initial is not None: self.open(initial, nt=nt, wrap=wrap, unwrap=unwrap, **kws)
[docs] def open(self, value, nt=None, wrap=None, unwrap=None, **kws): """Mark the PV as opened an provide its initial value. This initial value is later updated with post(). :param value: A Value, or appropriate object (see nt= and wrap= of the constructor). Any clients which have begun connecting which began connecting while this PV was in the close'd state will complete connecting. Only those fields of the value which are marked as changed will be stored. """ self._wrap = wrap or (nt and nt.wrap) or self._wrap self._unwrap = unwrap or (nt and nt.unwrap) or self._unwrap try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) _SharedPV.open(self, V)
[docs] def post(self, value, **kws): """Provide an update to the Value of this PV. :param value: A Value, or appropriate object (see nt= and wrap= of the constructor). Only those fields of the value which are marked as changed will be stored. Any keyword arguments are forwarded to the NT wrap() method (if applicable). Common arguments include: timestamp= , severity= , and message= . """ try: V = self._wrap(value, **kws) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to wrap %r with %r and %r"%(value, self._wrap, kws)) _SharedPV.post(self, V)
[docs] def current(self): V = _SharedPV.current(self) try: return self._unwrap(V) except: # py3 will chain automatically, py2 won't raise ValueError("Unable to unwrap %r with %r"%(V, self._unwrap))
def _exec(self, op, M, *args): # sub-classes will replace this try: M(*args) except Exception as e: if op is not None: op.done(error=str(e)) _log.exception("Unexpected") def _onFirstConnect(self, _junk): pass # see sub-classes. run before user onFirstConnect() def _onLastDisconnect(self, _junk): pass # see sub-classes. run after user onLastDisconnect() class _DummyHandler(object): pass class _WrapHandler(object): "Wrapper around user Handler which logs exceptions" def __init__(self, pv, real): self._pv = pv # this creates a reference cycle, which should be collectable since SharedPV supports GC self._real = real def onFirstConnect(self): self._pv._exec(None, self._pv._onFirstConnect, None) try: # user handler may omit onFirstConnect() M = self._real.onFirstConnect except AttributeError: return self._pv._exec(None, M, self._pv) def onLastDisconnect(self): try: M = self._real.onLastDisconnect except AttributeError: pass else: self._pv._exec(None, M, self._pv) self._pv._exec(None, self._pv._onLastDisconnect, None) def put(self, op): _log.debug('PUT %s %s', self._pv, op) try: self._pv._exec(op, self._real.put, self._pv, ServOpWrap(op, self._pv._wrap, self._pv._unwrap)) except AttributeError: op.done(error="Put not supported") def rpc(self, op): _log.debug('RPC %s %s', self._pv, op) try: self._pv._exec(op, self._real.rpc, self._pv, op) except AttributeError: op.done(error="RPC not supported") @property def onFirstConnect(self): def decorate(fn): self._handler.onFirstConnect = fn return fn return decorate @property def onLastDisconnect(self): def decorate(fn): self._handler.onLastDisconnect = fn return fn return decorate @property def put(self): def decorate(fn): self._handler.put = fn return fn return decorate @property def rpc(self): def decorate(fn): self._handler.rpc = fn return fn return decorate def __repr__(self): if self.isOpen(): return '%s(value=%s)' % (self.__class__.__name__, repr(self.current())) else: return "%s(<closed>)" % (self.__class__.__name__,) __str__ = __repr__