RPC Server Helpers

Server Example

Remote Procedure Calls are received by the methods of a “target” object. This is any class which has method decorated with rpc().

For example:

from p4p.rpc import rpc, quickRPCServer
from p4p.nt import NTScalar
class Summer(object):
    @rpc(NTScalar("d"))
    def add(self, lhs, rhs): # 'lhs' and 'rhs' are arbitrary names.  The method name 'add' will be part of the PV name
        return float(lhs) + float(rhs)
adder = Summer()

Turn on logging to see RPC related errors.

import logging
logging.basicConfig(level=logging.DEBUG)

Now run a server with quickRPCServer().

quickRPCServer(provider="Example",
               prefix="pv:call:",  # A prefix for method PV names.
               target=adder)

At this point the server is active.

Beginning with EPICS 7.0.2 this can be tested using the “pvcall” utility

$ pvcall pv:call:add lhs=1 rhs=1
2018-10-30 19:49:34.834  2

Previous to EPICS 7.0.2 this can be tested using the “eget” utility from the pvAccessCPP module.

$ eget -s pv:call:add -a lhs=1 -a rhs=1
2
$ eget -s pv:call:add -a lhs=1 -a rhs=1 -N  # for more detail
epics:nt/NTScalar:1.0
    double value 2
    alarm_t alarm NO_ALARM NO_STATUS <no message>
    time_t timeStamp 2017-05-20T08:14:31.917 0

Client Example

Remote procedure calls are made through the rpc() method of a Context. To assist in encoding arguments, a proxy object can be created with the rpcproxy() decorator. A proxy for the preceding example would be:

from p4p.rpc import rpcproxy, rpccall
@rpcproxy
class MyProxy(object):
    @rpccall('%sadd')
    def add(lhs='d', rhs='d'):
        pass

This proxy must be associated with a Context.

from p4p.client.thread import Context
ctxt = Context('pva')
proxy = MyProxy(context=ctxt, format='pv:call:')
print proxy.add(1, 1)

A decorated proxy class has two additional contructor arguments.

Using Low Level Client API

It may be helpful to illustrate what a proxy method call is actually doing.

from p4p import Type, Value
V = Value(Type([
    ('schema', 's'),
    ('path', 's'),
    ('query', ('s', None, [
        ('lhs', 'd'),
        ('rhs', 'd'),
    ])),
]), {
    'schema': 'pva',
    'path': 'pv:call:add',
    'query': {
        'lhs': 1,
        'rhs': 1,
    },
})
print ctxt.rpc('pv:call:add', V)

API Reference

p4p.rpc.rpc(rtype=None)[source]

Decorator marks a method for export.

Parameters

type – Specifies which Type this method will return.

The return type (rtype) must be one of:

Exported methods raise an Exception to indicate an error to the remote caller. RemoteError may be raised to send a specific message describing the error condition.

>>> class Example(object):
    @rpc(NTScalar.buildType('d'))
    def add(self, lhs, rhs):
        return {'value':float(lhs)+flost(rhs)}
p4p.rpc.rpcproxy(spec)[source]

Decorator to enable this class to proxy RPC client calls

The decorated class constructor takes two additional arguments, context= is required to be a Context. format`= can be a string, tuple, or dictionary and is applied to PV name strings given to :py:func:`rpcall. Other arguments are passed to the user class constructor.

@rpcproxy
class MyProxy(object):
    @rpccall("%s:add")
    def add(lhs='d', rhs='d'):
        pass

ctxt = Context('pva')
proxy = MyProxy(context=ctxt, format="tst:")  # evaluates "%s:add"%"tst:"

The decorated class will be a sub-class of the provided class and RPCProxyBase.

p4p.rpc.rpccall(pvname, request=None, rtype=None)[source]

Decorator marks a client proxy method.

Parameters
  • pvname (str) – The PV name, which will be formated using the ‘format’ argument of the proxy class constructor.

  • request – A pvRequest string or p4p.Value passed to eg. p4p.client.thread.Context.rpc().

The method to be decorated must have all keyword arguments, where the keywords are type code strings or Type.

p4p.rpc.quickRPCServer(provider, prefix, target, maxsize=20, workers=1, useenv=True, conf=None, isolate=False)[source]

Run an RPC server in the current thread

Calls are handled sequentially, and always in the current thread, if workers=1 (the default). If workers>1 then calls are handled concurrently by a pool of worker threads. Requires NTURI style argument encoding.

Parameters
  • provider (str) – A provider name. Must be unique in this process.

  • prefix (str) – PV name prefix. Along with method names, must be globally unique.

  • target – The object which is exporting methods. (use the rpc() decorator)

  • maxsize (int) – Number of pending RPC calls to be queued.

  • workers (int) – Number of worker threads (default 1)

  • useenv – Passed to Server

  • conf – Passed to Server

  • isolate – Passed to Server

class p4p.rpc.RPCProxyBase[source]

Base class for automatically generated proxy classes

Parameters
  • context (Context) – The client Context through which calls are made

  • format – A tuple or dict which is applied with the format ‘%’ operator to the name strings given to rpccall().

context = None

The Context provided on construction

timeout = 3.0

Timeout of RPC calls in seconds

authority = ''

Authority string sent with NTURI requests

throw = True

Whether call errors raise an exception, or return it

class p4p.rpc.NTURIDispatcher(queue, prefix=None, **kws)[source]

RPC dispatcher using NTURI (a al. eget)

Method names are prefixed with a fixed string.

>>> queue = WorkQueue()
>>> class Summer(object):
    @rpc([('result', 'i')])
    def add(self, a=None, b=None):
        return {'result': int(a)+int(b)}
>>> installProvider("arbitrary", NTURIDispatcher(queue, target=Summer(), prefix="pv:prefix:"))

Making a call with the CLI ‘eget’ utility:

$ eget -s pv:prefix:add -a a=1 -a b=2
....
int result 3
Parameters
  • WorkQueue (queue) – A WorkQueue to which RPC calls will be added

  • str (prefix) – PV name prefix used by RPC methods

  • target – The object which has the RPC calls

class p4p.rpc.RemoteError

Thrown with an error message which has been sent by a server to its remote client

class p4p.rpc.WorkQueue(maxsize=5)[source]

A threaded work queue.

handle()[source]

Process queued work until interrupt() is called

interrupt()[source]

Break one call to handle()

eg. Call N times to break N threads.

This call blocks if the queue is full.