Source code for p4p.client.thread


from __future__ import print_function

import logging
import sys
_log = logging.getLogger(__name__)

try:
    from itertools import izip
except ImportError:
    izip = zip
from functools import partial
import json
import threading

try:
    from Queue import Queue, Full, Empty
except ImportError:
    from queue import Queue, Full, Empty

from . import raw
from .raw import Disconnected, RemoteError, Cancelled, Finished
from ..util import _defaultWorkQueue
from ..wrapper import Value, Type
from ..rpc import WorkQueue
from .._p4p import (logLevelAll, logLevelTrace, logLevelDebug,
                    logLevelInfo, logLevelWarn, logLevelError,
                    logLevelFatal, logLevelOff)

__all__ = [
    'Context',
    'Value',
    'Type',
    'RemoteError',
    'TimeoutError',
]

if sys.version_info >= (3, 0):
    unicode = str
    TimeoutError = TimeoutError

else:
    class TimeoutError(RuntimeError):
        "Local timeout has expired"
        def __init__(self):
            RuntimeError.__init__(self, 'Timeout')


[docs]class Subscription(object): """An active subscription. Returned by `Context.monitor`. """ def __init__(self, ctxt, name, cb, notify_disconnect=False, queue=None): self.name, self._S, self._cb = name, None, cb self._notify_disconnect = notify_disconnect self._Q = queue or ctxt._Q or _defaultWorkQueue() if notify_disconnect: # all subscriptions are inittially disconnected self._Q.push_wait(partial(cb, Disconnected()))
[docs] def close(self): """Close subscription. """ if self._S is not None: # after .close() self._event should never be called self._S.close() self._S = None
def __enter__(self): return self def __exit__(self, A, B, C): self.close() @property def done(self): 'Has all data for this subscription been received?' return self._S is None or self._S.done() @property def empty(self): 'Is data pending in event queue?' return self._S is None or self._S.empty() def _event(self): try: assert self._S is not None, self._S _log.debug('Subscription wakeup for %s', self.name) self._Q.push(self._handle) except: _log.exception("Lost Subscription update for %s", self.name) def _handle(self): try: S = self._S if S is None: # already close()'d return for n in range(4): E = S.pop() if E is None: break # monitor queue empty elif isinstance(E, Exception): _log.debug('Subscription notify for %s with %s', self.name, E) if self._notify_disconnect: self._cb(E) elif isinstance(E, RemoteError): _log.error("Subscription Error %s", E) if isinstance(E, Finished): _log.debug('Subscription complete %s', self.name) self._S = None S.close() else: self._cb(E) if E is not None: # removed 4 elements without emptying queue # re-schedule to mux with others self._Q.push(self._handle) except: _log.exception("Error processing Subscription event for %s", self.name) if self._S is not None: self._S.close() self._S = None
[docs]class Context(raw.Context): """Context(provider, conf=None, useenv=True) :param str provider: A Provider name. Try "pva" or run :py:meth:`Context.providers` for a complete list. :param dict conf: Configuration to pass to provider. Depends on provider selected. :param bool useenv: Allow the provider to use configuration from the process environment. :param int workers: Size of thread pool in which monitor callbacks are run. Default is 4 :param int maxsize: Size of internal work queue used for monitor callbacks. Default is unlimited :param dict nt: Controls :ref:`unwrap`. None uses defaults. Set False to disable :param dict unwrap: Legacy :ref:`unwrap`. :param WorkQueue queue: A work queue through which monitor callbacks are dispatched. The methods of this Context will block the calling thread until completion or timeout The meaning, and allowed keys, of the configuration dictionary depend on the provider. conf= will override values taken from the process environment. Pass useenv=False to ensure that environment variables are completely ignored. The "pva" provider understands the following keys: * EPICS_PVA_ADDR_LIST * EPICS_PVA_AUTO_ADDR_LIST * EPICS_PVA_SERVER_PORT * EPICS_PVA_BROADCAST_PORT """ Value = Value name = '' "Provider name string" def __init__(self, provider='pva', conf=None, useenv=True, nt=None, unwrap=None, maxsize=0, queue=None): self._channel_lock = threading.Lock() super(Context, self).__init__(provider, conf=conf, useenv=useenv, nt=nt, unwrap=unwrap) # lazy start threaded WorkQueue self._Q = self._T = None self._Q = queue def _channel(self, name): with self._channel_lock: return super(Context, self)._channel(name)
[docs] def disconnect(self, *args, **kws): with self._channel_lock: super(Context, self).disconnect(*args, **kws)
def _queue(self): if self._Q is None: Q = WorkQueue(maxsize=self._Qmax) Ts = [] for n in range(self._Wcnt): T = threading.Thread(name='p4p Context worker', target=Q.handle) T.daemon = True Ts.append(T) for T in Ts: T.start() _log.debug('Started %d Context worker', self._Wcnt) self._Q, self._T = Q, Ts return self._Q
[docs] def close(self): """Force close all Channels and cancel all Operations """ if self._Q is not None: for T in self._T: self._Q.interrupt() for n, T in enumerate(self._T): _log.debug('Join Context worker %d', n) T.join() _log.debug('Joined Context workers') self._Q, self._T = None, None if not Context: # Python 2.7 GC removes Context from scope during destruction of objects. return super(Context, self).close()
[docs] def get(self, name, request=None, timeout=5.0, throw=True): """Fetch current value of some number of PVs. :param name: A single name string or list of name strings :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param float timeout: Operation timeout in seconds :param bool throw: When true, operation error throws an exception. If False then the Exception is returned instead of the Value :returns: A p4p.Value or Exception, or list of same. Subject to :py:ref:`unwrap`. When invoked with a single name then returns is a single value. When invoked with a list of name, then returns a list of values >>> ctxt = Context('pva') >>> V = ctxt.get('pv:name') >>> A, B = ctxt.get(['pv:1', 'pv:2']) >>> """ singlepv = isinstance(name, (bytes, unicode)) if singlepv: name = [name] request = [request] elif request is None: request = [None] * len(name) assert len(name) == len(request), (name, request) # use Queue instead of Event to allow KeyboardInterrupt done = Queue() result = [TimeoutError()] * len(name) ops = [None] * len(name) raw_get = super(Context, self).get try: for i, (N, req) in enumerate(izip(name, request)): def cb(value, i=i): try: if not isinstance(value, Cancelled): done.put_nowait((value, i)) _log.debug('get %s Q %r', N, value) except: _log.exception("Error queuing get result %s", value) _log.debug('get %s w/ %s', N, req) ops[i] = raw_get(N, cb, request=req) for _n in range(len(name)): try: value, i = done.get(timeout=timeout) except Empty: if throw: _log.debug('timeout %s after %s', name[i], timeout) raise TimeoutError() break _log.debug('got %s %r', name[i], value) if throw and isinstance(value, Exception): raise value result[i] = value finally: [op and op.close() for op in ops] if singlepv: return result[0] else: return result
[docs] def put(self, name, values, request=None, timeout=5.0, throw=True, process=None, wait=None, get=True): """Write a new value of some number of PVs. :param name: A single name string or list of name strings :param values: A single value, a list of values, a dict, a `Value`. May be modified by the constructor nt= argument. :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param float timeout: Operation timeout in seconds :param bool throw: When true, operation error throws an exception. If False then the Exception is returned instead of the Value :param str process: Control remote processing. May be 'true', 'false', 'passive', or None. :param bool wait: Wait for all server processing to complete. :param bool get: Whether to do a Get before the Put. If True then the value passed to the builder callable will be initialized with recent PV values. eg. use this with NTEnum to find the enumeration list. :returns: A None or Exception, or list of same When invoked with a single name then returns is a single value. When invoked with a list of name, then returns a list of values If 'wait' or 'process' is specified, then 'request' must be omitted or None. >>> ctxt = Context('pva') >>> ctxt.put('pv:name', 5.0) >>> ctxt.put(['pv:1', 'pv:2'], [1.0, 2.0]) >>> ctxt.put('pv:name', {'value':5}) >>> The provided value(s) will be automatically coerced to the target type. If this is not possible then an Exception is raised/returned. Unless the provided value is a dict, it is assumed to be a plain value and an attempt is made to store it in '.value' field. """ singlepv = isinstance(name, (bytes, unicode)) if request and (process or wait is not None): raise ValueError("request= is mutually exclusive to process= or wait=") elif process or wait is not None: request = 'field()record[block=%s,process=%s]' % ('true' if wait else 'false', process or 'passive') if not singlepv: request = [request]*len(name) if singlepv: name = [name] values = [values] request = [request] elif request is None: request = [None] * len(name) assert len(name) == len(request), (name, request) assert len(name) == len(values), (name, values) # use Queue instead of Event to allow KeyboardInterrupt done = Queue() result = [TimeoutError()] * len(name) ops = [None] * len(name) raw_put = super(Context, self).put try: for i, (n, value, req) in enumerate(izip(name, values, request)): if isinstance(value, (bytes, unicode)) and value[:1] == '{': try: value = json.loads(value) except ValueError: raise ValueError("Unable to interpret '%s' as json" % value) # completion callback def cb(value, i=i): try: done.put_nowait((value, i)) except: _log.exception("Error queuing put result %r", value) ops[i] = raw_put(n, cb, builder=value, request=req, get=get) for _n in range(len(name)): try: value, i = done.get(timeout=timeout) except Empty: if throw: raise TimeoutError() break if throw and isinstance(value, Exception): raise value result[i] = value if singlepv: return result[0] else: return result finally: [op and op.close() for op in ops]
[docs] def rpc(self, name, value, request=None, timeout=5.0, throw=True): """Perform a Remote Procedure Call (RPC) operation :param str name: PV name string :param Value value: Arguments. Must be Value instance :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param float timeout: Operation timeout in seconds :param bool throw: When true, operation error throws an exception. If False then the Exception is returned instead of the Value :returns: A Value or Exception. Subject to :py:ref:`unwrap`. >>> ctxt = Context('pva') >>> ctxt.rpc('pv:name:add', {'A':5, 'B'; 6}) >>> The provided value(s) will be automatically coerced to the target type. If this is not possible then an Exception is raised/returned. Unless the provided value is a dict, it is assumed to be a plain value and an attempt is made to store it in '.value' field. """ done = Queue() op = super(Context, self).rpc(name, done.put_nowait, value, request=request) try: try: result = done.get(timeout=timeout) except Empty: result = TimeoutError() if throw and isinstance(result, Exception): raise result return result except: op.close() raise
[docs] def monitor(self, name, cb, request=None, notify_disconnect=False, queue=None): """Create a subscription. :param str name: PV name string :param callable cb: Processing callback :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param bool notify_disconnect: In additional to Values, the callback may also be call with instances of Exception. Specifically: Disconnected , RemoteError, or Cancelled :param WorkQueue queue: A work queue through which monitor callbacks are dispatched. :returns: a :py:class:`Subscription` instance The callable will be invoked with one argument which is either. * A p4p.Value (Subject to :py:ref:`unwrap`) * A sub-class of Exception (Disconnected , RemoteError, or Cancelled) """ R = Subscription(self, name, cb, notify_disconnect=notify_disconnect, queue=queue) R._S = super(Context, self).monitor(name, R._event, request) return R