Source code for p4p.client.Qt

import logging
from math import ceil

from qtpy.QtCore import QObject, QCoreApplication, Signal, QEvent

from . import raw
from .raw import Disconnected, RemoteError, Cancelled, Finished
from ..wrapper import Value, Type
from .._p4p import ClientProvider
from .._p4p import (logLevelAll, logLevelTrace, logLevelDebug,
                    logLevelInfo, logLevelWarn, logLevelError,
                    logLevelFatal, logLevelOff)

from .thread import TimeoutError

__all__ = (
    'Context',
    'Value',
    'Type',
    'RemoteError',
    'TimeoutError',
)

_log = logging.getLogger(__name__)

# some pyqt callbacks are delicate, and will SIGSEGV is a python exception is allowed to propagate
def exceptionGuard(fn):
    def wrapper(*args, **kws):
        try:
            fn(*args, **kws)
        except:
            _log.exception('oops')
    return wrapper

class CBEvent(QEvent):
    # allocate an event ID#
    EVENT_TYPE = QEvent.Type(QEvent.registerEventType())

    def __init__(self, result):
        QEvent.__init__(self, self.EVENT_TYPE)
        self.result = result

class Operation(QObject):
    _op = None # _p4p.ClientOperation sub-class
    _result = None
    # receives a Value or an Exception
    result = Signal(object)

    def __init__(self, parent, timeout):
        QObject.__init__(self, parent)
        self._active = self.startTimer(ceil(timeout*1000))

    def close(self):
        self._op.close()

    @exceptionGuard
    def timerEvent(self, evt):
        if self._result is None:
            self._result = TimeoutError(self._op.name if self._op else 'Timeout')
            self.result.emit(self._result)

    def _resultcb(self, value):
        # called on PVA worker thread
        QCoreApplication.postEvent(self, CBEvent(value))

    @exceptionGuard
    def customEvent(self, evt):
        if self._active is not None:
            self.killTimer(self._active)
            self._active = None

        if self._result is None:
            self._result = evt.result
            self.result.emit(self._result)

class MCache(QObject):
    _op = None # _p4p.ClientMonitor sub-class
    # receives a Value or an Exception
    update = Signal(object)

    def __init__(self, name, parent):
        QObject.__init__(self, parent)

        self.name = name
        self._active = None
        self._holdoff = 10*1000 # acts as low limit on high limit
        self._last = Disconnected()

    def _add(self, slot, limitHz=10.0):
        holdoff = int(max(0.1, 1.0/limitHz)*1000)

        # Rate limiting for multiple consumers is hard.
        # We throttle to the highest consumer rate (shortest holdoff).
        if self._holdoff is None or self._holdoff > holdoff:
            self._holdoff = holdoff
            if self._active is not None:
                # restart timer
                self.killTimer(self._active)
                self._active = self.startTimer(ceil(self._holdoff))

        # TODO: re-adjust on slot disconnect?

        # schedule to receive initial update later (avoids recursion)
        QCoreApplication.postEvent(self, CBEvent(slot))

    def _event(self):
        _log.debug('event1 %s', self.name)
        # called on PVA worker thread

        QCoreApplication.postEvent(self, CBEvent(None))

    @exceptionGuard
    def customEvent(self, evt):
        E = evt.result
        _log.debug('event2 %s %s', self.name, E)
        # E will be one of:
        #   None - FIFO not empty (call pop())
        #   RemoteError
        #   Disconnected
        #   some method, adding new subscriber

        if E is None:
            if self._active is None:
                self._active = self.startTimer(ceil(self._holdoff))
                _log.debug('Start timer with %s', self._holdoff)
            return

        else:
            E(self._last)
            self.update.connect(E)

    @exceptionGuard
    def timerEvent(self, evt):
        V = self._op.pop()
        _log.debug('tick %s %s', self.name, V)

        if V is not None:
            self._last = V
            self.update.emit(V)

        elif self._active is not None:
            self.killTimer(self._active)
            self._active = None

[docs] class Context(raw.Context): """Context(provider, conf=None, useenv=True) PyQt aware Context. Methods in the class give notification of completion/update through Qt signals. :param str provider: A Provider name. Try "pva" or run :py:meth:`Context.providers` for a complete list. :param dict conf: Configuration to pass to provider. Depends on provider selected. :param bool useenv: Allow the provider to use configuration from the process environment. :param dict nt: Controls :ref:`unwrap`. None uses defaults. Set False to disable :param dict unwrap: Legacy :ref:`unwrap`. :param parent QObject: Parent for QObjects created through this Context. """ def __init__(self, provider, parent=None, **kws): super(Context, self).__init__(provider, **kws) self._parent = QObject(parent) self._mcache = {} self._puts = {}
[docs] def disconnect(self, name=None): if name is None: self._mcache = {} self._puts = {} else: self._mcache.pop(name) self._puts.pop(name) super(Context, self).disconnect(name)
# get() omitted (why would a gui want to do this?)
[docs] def put(self, name, value, slot=None, request=None, timeout=5.0, process=None, wait=None, get=True): """Write a new value to a single PV. Returns an Operation instance which will emit either a success and error signal. If the slot argument is provided, this will be connected in a race free way. The slot function will receive a python object which is either None (Success) or an Exception. Note that the returned Operation will also be stored internally by the Context. So the caller is not required to store it as well. This internal storage will only keep the most recent put() Operation for each PV name. A previous incomplete put() will be cancelled if/when put() is called again. :param name: A single name string or list of name strings :param values: A single value, a list of values, a dict, a `Value`. May be modified by the constructor nt= argument. :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param slot: A callable object, such as a bound method, which can be passed to QObject.connect() :param float timeout: Operation timeout in seconds :param str process: Control remote processing. May be 'true', 'false', 'passive', or None. :param bool wait: Wait for all server processing to complete. :param bool get: Whether to do a Get before the Put. If True then the value passed to the builder callable will be initialized with recent PV values. eg. use this with NTEnum to find the enumeration list. """ if request and (process or wait is not None): raise ValueError("request= is mutually exclusive to process= or wait=") elif process or wait is not None: request = 'field()record[block=%s,process=%s]' % ('true' if wait else 'false', process or 'passive') prev = self._puts.get(name) if prev is not None: # issuing new Put implicitly cancels any pending/queued Put prev.close() self._puts[name] = op = Operation(self._parent, timeout) if slot is not None: op.result.connect(slot) op._op = super(Context, self).put(name, op._resultcb, builder=value, request=request, get=get) return op
[docs] def rpc(self, name, value, slot, request=None, timeout=5.0, throw=True): """Perform a Remote Procedure Call (RPC) operation Returns an Operation instance which will emit either a success and error signal to the provided slot. This Operation instance must be stored by the caller or it will be implicitly cancelled. The slot function will receive a python object which is either a Value or an Exception. :param str name: PV name string :param Value value: Arguments. Must be Value instance :param slot: A callable object, such as a bound method, which can be passed to QObject.connect() :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param float timeout: Operation timeout in seconds :param bool throw: When true, operation error throws an exception. If False then the Exception is returned instead of the Value :returns: An Operation """ op = Operation(self._parent, timeout) op.result.connect(slot) op._op = super(Context, self).rpc(name, op._resultcb, value, request=request) return op
[docs] def monitor(self, name, slot, request=None, limitHz=10.0): """Request subscription to named PV Request notification to the provided slot when a PV is updated. Subscriptions are managed by an internal cache, so than multiple calls to monitor() with the same PV name may be satisfied through a single subscription. limitHz, which must be provided, puts an upper limit on the rate at which the update signal will be emitted. Some update will be dropped in the PV updates more frequently. Reduction is done by discarding the second to last update. eg. It is guaranteed that the last update (present value) in the burst will be delivered. :param str name: PV name string :param callable cb: Processing callback :param slot: A callable object, such as a bound method, which can be passed to QObject.connect() :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param limitHz float: Maximum rate at which signals will be emitted. In signals per second. :returns: a :py:class:`MCache` instance """ _log.debug('Subscribe to %s with %s', name, request) if request is not None: raise NotImplementedError("monitor() with request= not yet implemented") #if isinstance(request, (str, bytes)): # request = ClientProvider.makeRequest(request) #if isinstance(request, Value): # request = serialize(request) key = (name, request) # (str, bytes|None) try: op = self._mcache[key] except KeyError: self._mcache[key] = op = MCache(name, self._parent) op._op = super(Context, self).monitor(name, op._event, request) op._add(slot, limitHz=limitHz) return op