Source code for p4p.client.raw


from __future__ import print_function

import logging
_log = logging.getLogger(__name__)

import warnings
import sys

try:
    from Queue import Queue, Full, Empty
except ImportError:
    from queue import Queue, Full, Empty

from .. import _p4p
from .._p4p import Cancelled, Disconnected, Finished, RemoteError

from ..wrapper import Value, Type
from ..nt import buildNT

if sys.version_info >= (3, 0):
    unicode = str

__all__ = (
    'Subscription',
    'Context',
    'RemoteError',
)


def unwrapHandler(handler, nt):
    """Wrap get/rpc handler to unwrap Value
    """
    def dounwrap(code, msg, val, handler=handler):
        _log.debug("Handler (%s, %s, %r) -> %s", code, msg, val, handler)
        try:
            if code == 0:
                handler(RemoteError(msg))
            elif code == 1:
                handler(Cancelled())
            elif code == 2: # exception during builder callback
                A, B, C = val
                if unicode is str:
                    E = A(B).with_traceback(C) # py 3
                else:
                    E = A(B) # py 2 (bye bye traceback...)
                handler(E)
            else:
                if val is not None:
                    val = nt.unwrap(val)
                handler(val)
        except:
            _log.exception("Exception in Operation handler")
    return dounwrap


def monHandler(handler):
    def cb(handler=handler):
        _log.debug("Update %s", handler)
        try:
            handler()
        except:
            _log.exception("Exception in Monitor handler")
    return cb


def defaultBuilder(value, nt):
    """Reasonably sensible default handling of put builder
    """
    if callable(value):
        return value

    def builder(V):
        if isinstance(value, Value):
            V[None] = value
        elif isinstance(value, dict):
            for k, v in value.items():
                V[k] = v
        else:
            nt.assign(V, value)
    return builder


def wrapRequest(request):
    if request is None or isinstance(request, Value):
        return request
    return Context.makeRequest(request)


[docs]class Subscription(_p4p.ClientMonitor): """Interface to monitor subscription FIFO Use method poll() to try to pop an item from the FIFO. None indicates the FIFO is empty, must wait for another Data event before calling poll() again. complete()==True after poll()==False indicates that no more updates will ever be forthcoming. This is normal (not error) completion. cancel() aborts the subscription. """ def __init__(self, context, name, nt, **kws): _log.debug("Subscription(%s)", kws) super(Subscription, self).__init__(context, name, **kws) self.context = context self._nt = nt self.done = False def pop(self): val = super(Subscription, self).pop() assert val is None or isinstance(val, (Value, Exception)), val if isinstance(val, Value): val = self._nt.unwrap(val) elif isinstance(val, Finished): self.done = True _log.debug("poll() -> %r", val) return val def complete(self): return self.done def __enter__(self): return self def __exit__(self, A, B, C): self.close() if unicode is str: def __del__(self): self.close()
[docs]class Context(object): """ :param str provider: A Provider name. Try "pva" or run :py:meth:`Context.providers` for a complete list. :param conf dict: Configuration to pass to provider. Depends on provider selected. :param bool useenv: Allow the provider to use configuration from the process environment. :param dict nt: Controls :ref:`unwrap`. None uses defaults. Set False to disable :param dict unwrap: Legacy :ref:`unwrap`. """ def __init__(self, provider='pva', conf=None, useenv=None, unwrap=None, nt=None, **kws): self.name = provider super(Context, self).__init__(**kws) self._nt = buildNT(nt, unwrap) self._ctxt = None self._ctxt = _ClientProvider(provider, conf=conf, useenv=useenv) self.conf = self._ctxt.conf self.hurryUp = self._ctxt.hurryUp makeRequest = _p4p.ClientProvider.makeRequest def close(self): if self._ctxt is None: return self._ctxt.close() self._ctxt = None def __enter__(self): return self def __exit__(self, A, B, C): self.close()
[docs] def disconnect(self, name=None): """Clear internal Channel cache, allowing currently unused channels to be implictly closed. :param str name: None, to clear the entire cache, or a name string to clear only a certain entry. """ self._ctxt.disconnect(name)
def _request(self, process=None, wait=None): """helper for building pvRequests :param str process: Control remote processing. May be 'true', 'false', 'passive', or None. :param bool wait: Wait for all server processing to complete. """ opts = [] if process is not None: opts.append('process=%s' % process) if wait is not None: if wait: opts.append('wait=true') else: opts.append('wait=false') return 'field()record[%s]' % (','.join(opts))
[docs] def get(self, name, handler, request=None): """Begin Fetch of current value of a PV :param name: A single name string or list of name strings :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param callable handler: Completion notification. Called with a Value, RemoteError, or Cancelled :returns: A object with a method cancel() which may be used to abort the operation. """ return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt), pvRequest=wrapRequest(request), get=True, put=False)
[docs] def put(self, name, handler, builder=None, request=None, get=True): """Write a new value to a PV. :param name: A single name string or list of name strings :param callable handler: Completion notification. Called with None (success), RemoteError, or Cancelled :param callable builder: Called when the PV Put type is known. A builder is responsible for filling in the Value to be sent. builder(value) :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param bool get: Whether to do a Get before the Put. If True then the value passed to the builder callable will be initialized with recent PV values. eg. use this with NTEnum to find the enumeration list. :returns: A object with a method cancel() which may be used to abort the operation. """ return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt), builder=defaultBuilder(builder, self._nt), pvRequest=wrapRequest(request), get=get, put=True)
[docs] def rpc(self, name, handler, value, request=None): """Perform RPC operation on PV :param name: A single name string or list of name strings :param callable handler: Completion notification. Called with a Value, RemoteError, or Cancelled :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :returns: A object with a method cancel() which may be used to abort the operation. """ if value is None: value = Value(Type([])) return _ClientOperation(self._ctxt, name, handler=unwrapHandler(handler, self._nt), value=value, pvRequest=wrapRequest(request), rpc=True)
[docs] def monitor(self, name, handler, request=None, **kws): """Begin subscription to named PV :param str name: PV name string :param callable handler: Completion notification. Called with None (FIFO not empty), RemoteError, Cancelled, or Disconnected :param request: A :py:class:`p4p.Value` or string to qualify this request, or None to use a default. :param bool notify_disconnect: Whether disconnect (and done) notifications are delivered to the callback (as None). :returns: A Subscription """ return Subscription(self._ctxt, name, nt=self._nt, handler=monHandler(handler), pvRequest=wrapRequest(request), **kws)
[docs] @staticmethod def providers(): return ["pva"]
[docs] @staticmethod def set_debug(lvl): _p4p.set_debug(lvl)
set_debug = _p4p.logger_level_set def _cleanup_contexts(): contexts = list(_p4p.all_providers) _log.debug("Closing %d Client contexts", len(contexts)) for ctxt in contexts: ctxt.close() class _ClientOperation(_p4p.ClientOperation): if unicode is str: def __del__(self): self.close() class _ClientProvider(_p4p.ClientProvider): if unicode is str: def __del__(self): self.close()