Source code for p4p.rpc


import sys
import logging
import inspect
from functools import wraps, partial
_log = logging.getLogger(__name__)

from .wrapper import Value, Type
from .nt import NTURI
from .client.raw import RemoteError
from .server import DynamicProvider
from .server.raw import SharedPV
from .util import ThreadedWorkQueue, WorkQueue, Full, Empty

__all__ = [
    'rpc',
    'rpccall',
    'rpcproxy',
    'RemoteError',
    'WorkQueue',
    'NTURIDispatcher',
]


[docs]def rpc(rtype=None): """Decorator marks a method for export. :param type: Specifies which :py:class:`Type` this method will return. The return type (rtype) must be one of: - An instance of :py:class:`p4p.Type` - None, in which case the method must return a :py:class:`p4p.Value` - One of the NT helper classes (eg :py:class:`p4p.nt.NTScalar`). - A list or tuple used to construct a :py:class:`p4p.Type`. Exported methods raise an :py:class:`Exception` to indicate an error to the remote caller. :py:class:`RemoteError` may be raised to send a specific message describing the error condition. >>> class Example(object): @rpc(NTScalar.buildType('d')) def add(self, lhs, rhs): return {'value':float(lhs)+flost(rhs)} """ wrap = None if rtype is None or isinstance(rtype, Type): pass elif isinstance(type, (list, tuple)): rtype = Type(rtype) elif hasattr(rtype, 'type'): # eg. one of the NT* helper classes wrap = rtype.wrap rtype = rtype.type else: raise TypeError("Not supported") def wrapper(fn): if wrap is not None: orig = fn @wraps(orig) def wrapper2(*args, **kws): return wrap(orig(*args, **kws)) fn = wrapper2 fn._reply_Type = rtype return fn return wrapper
[docs]def rpccall(pvname, request=None, rtype=None): """Decorator marks a client proxy method. :param str pvname: The PV name, which will be formated using the 'format' argument of the proxy class constructor. :param request: A pvRequest string or :py:class:`p4p.Value` passed to eg. :py:meth:`p4p.client.thread.Context.rpc`. The method to be decorated must have all keyword arguments, where the keywords are type code strings or :class:`~p4p.Type`. """ def wrapper(fn): fn._call_PV = pvname fn._call_Request = request fn._reply_Type = rtype return fn return wrapper
class RPCDispatcherBase(DynamicProvider): def __init__(self, queue, target=None, channels=set(), name=None): DynamicProvider.__init__(self, name, self) # we are our own Handler self.queue = queue self.target = target self.channels = set(channels) self.name = name self.__pv = SharedPV( handler=self, # no per-channel state, and only RPC used, so only need on PV initial=Value(Type([])), # we don't support get/put/monitor, so use empty struct ) M = self.methods = {} for name, mem in inspect.getmembers(target): if not hasattr(mem, '_reply_Type'): continue M[name] = mem def getMethodNameArgs(self, request): raise NotImplementedError("Sub-class must implement getMethodName") # sub-class needs to extract method name from request # return 'name', {'var':'val'} def testChannel(self, name): _log.debug("Test RPC channel %s = %s", name, name in self.channels) return name in self.channels def makeChannel(self, name, src): if self.testChannel(name): _log.debug("Open RPC channel %s", name) return self.__pv # no per-channel tracking needed else: _log.warn("Ignore RPC channel %s", name) def rpc(self, pv, op): _log.debug("RPC call %s", op) try: self.queue.push(partial(self._handle, op)) except Full: _log.warn("RPC call queue overflow") op.done(error="Too many concurrent RPC calls") def _handle(self, op): try: request = op.value() name, args = self.getMethodNameArgs(request) fn = self.methods[name] rtype = fn._reply_Type R = fn(**args) if not isinstance(R, Value): try: R = Value(rtype, R) except: _log.exception("Error encoding %s as %s", R, rtype) op.done(error="Error encoding reply") return _log.debug("RPC reply %s -> %r", request, R) op.done(R) except RemoteError as e: _log.debug("RPC reply %s -> error: %s", request, e) op.done(error=str(e)) except: _log.exception("Error handling RPC %s", request) op.done(error="Error handling RPC")
[docs]class NTURIDispatcher(RPCDispatcherBase): """RPC dispatcher using NTURI (a al. eget) Method names are prefixed with a fixed string. >>> queue = WorkQueue() >>> class Summer(object): @rpc([('result', 'i')]) def add(self, a=None, b=None): return {'result': int(a)+int(b)} >>> installProvider("arbitrary", NTURIDispatcher(queue, target=Summer(), prefix="pv:prefix:")) Making a call with the CLI 'eget' utility:: $ eget -s pv:prefix:add -a a=1 -a b=2 .... int result 3 :param queue WorkQueue: A WorkQueue to which RPC calls will be added :param prefix str: PV name prefix used by RPC methods :param target: The object which has the RPC calls """ def __init__(self, queue, prefix=None, **kws): RPCDispatcherBase.__init__(self, queue, **kws) self.prefix = prefix self.methods = dict([(prefix + meth, fn) for meth, fn in self.methods.items()]) self.channels = set(self.methods.keys()) _log.debug('NTURI methods: %s', ', '.join(self.channels)) def getMethodNameArgs(self, request): # {'schema':'pva', 'path':'pvname', 'query':{'var':'val', ...}} return request.path, dict(request.query.items())
# legecy for MASAR only # do not use in new code class MASARDispatcher(RPCDispatcherBase): def __init__(self, queue, **kws): RPCDispatcherBase.__init__(self, queue, **kws) _log.debug("MASAR pv %s methods %s", self.channels, self.methods) def getMethodNameArgs(self, request): # all through a single PV, method name in request # {'function':'rpcname', 'name':['name', ...], 'value':['val', ...]} return request.function, dict(zip(request.get('name', []), request.get('value', [])))
[docs]def quickRPCServer(provider, prefix, target, maxsize=20, workers=1, useenv=True, conf=None, isolate=False): """Run an RPC server in the current thread Calls are handled sequentially, and always in the current thread, if workers=1 (the default). If workers>1 then calls are handled concurrently by a pool of worker threads. Requires NTURI style argument encoding. :param str provider: A provider name. Must be unique in this process. :param str prefix: PV name prefix. Along with method names, must be globally unique. :param target: The object which is exporting methods. (use the :func:`rpc` decorator) :param int maxsize: Number of pending RPC calls to be queued. :param int workers: Number of worker threads (default 1) :param useenv: Passed to :class:`~p4p.server.Server` :param conf: Passed to :class:`~p4p.server.Server` :param isolate: Passed to :class:`~p4p.server.Server` """ from p4p.server import Server import time queue = ThreadedWorkQueue(maxsize=maxsize, workers=workers) provider = NTURIDispatcher(queue, target=target, prefix=prefix, name=provider) threads = [] server = Server(providers=[provider], useenv=useenv, conf=conf, isolate=isolate) with server, queue: while True: time.sleep(10.0)
[docs]class RPCProxyBase(object): """Base class for automatically generated proxy classes """ context = None "The Context provided on construction" format = None "The tuple/dict used to format ('%' operator) PV name strings." timeout = 3.0 "Timeout of RPC calls in seconds" authority = '' "Authority string sent with NTURI requests" throw = True "Whether call errors raise an exception, or return it" scheme = None # set to override automatic
def _wrapMethod(K, V): pv, req = V._call_PV, V._call_Request if sys.version_info >= (3, 0): S = inspect.getfullargspec(V) keywords = S.varkw else: S = inspect.getargspec(V) keywords = S.keywords if S.varargs is not None or keywords is not None: raise TypeError("vararg not supported for proxy method %s" % K) if len(S.args) != len(S.defaults): raise TypeError("proxy method %s must specify types for all arguments" % K) try: NT = NTURI(zip(S.args, S.defaults)) except Exception as e: raise TypeError("%s : failed to build method from %s, %s" % (e, S.args, S.defaults)) @wraps(V) def mcall(self, *args, **kws): pvname = pv % self.format try: uri = NT.wrap(pvname, args, kws, scheme=self.scheme or self.context.name, authority=self.authority) except Exception as e: raise ValueError("Unable to wrap %s %s as %s (%s)" % (args, kws, NT, e)) return self.context.rpc(pvname, uri, request=req, timeout=self.timeout, throw=self.throw) return mcall
[docs]def rpcproxy(spec): """Decorator to enable this class to proxy RPC client calls The decorated class constructor takes two additional arguments, `context=` is required to be a :class:`~p4p.client.thread.Context`. `format`= can be a string, tuple, or dictionary and is applied to PV name strings given to :py:func:`rpcall`. Other arguments are passed to the user class constructor. :: @rpcproxy class MyProxy(object): @rpccall("%s:add") def add(lhs='d', rhs='d'): pass ctxt = Context('pva') proxy = MyProxy(context=ctxt, format="tst:") # evaluates "%s:add"%"tst:" The decorated class will be a sub-class of the provided class and :class:`RPCProxyBase`. """ # inject our ctor first so we don't have to worry about super() non-sense. def _proxyinit(self, context=None, format={}, **kws): assert context is not None, context self.context = context self.format = format spec.__init__(self, **kws) obj = {'__init__': _proxyinit} for K, V in inspect.getmembers(spec, lambda M: hasattr(M, '_call_PV')): obj[K] = _wrapMethod(K, V) return type(spec.__name__, (RPCProxyBase, spec), obj)