Server API

Running a PVA Server

A Server instance starts/stops a PVAccess server. In order to be useful, a Server must be associated with one or more Providers.

Two Provider containers are available: StaticProvider and DynamicProvider. Both are used with one of the SharedPV classes: thread.SharedPV, asyncio.SharedPV, and/or cothread.SharedPV. Instances of these different concurrency models may be mixed in a single Provider.

Example

A server with a single “mailbox” PV.

import time
from p4p.nt import NTScalar
from p4p.server import Server
from p4p.server.thread import SharedPV

pv = SharedPV(nt=NTScalar('d'), # scalar double
              initial=0.0)      # setting initial value also open()'s
@pv.put
def handle(pv, op):
    pv.post(op.value()) # just store and update subscribers
    op.done()

Server.forever(providers=[{
    'demo:pv:name':pv, # PV name only appears here
}]) # runs until KeyboardInterrupt

This server can be tested using the included command line tools. eg.

$ python -m p4p.client.cli get demo:pv:name
$ python -m p4p.client.cli put demo:pv:name=5
$ python -m p4p.client.cli get demo:pv:name

And in another shell.

$ python -m p4p.client.cli monitor demo:pv:name

Note that this example allows clients to change any field, not just ‘.value’. This may be restricted by inspecting the Value returned by ‘op.value()’ to see which fields are marked as changed with eg. Value.changed().

A client put operation can be failed with eg. ‘op.done(error="oops")’.

In the put handler function ‘pv’ is the SharedPV and ‘op’ is a ServerOperation.
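
As a hedged sketch of the restriction described above, the put handler below rejects any Put which touches a field other than ‘value’. The factory name make_value_only_put is hypothetical. The sketch assumes that Value.changedSet() returns the names of the changed fields, and that when nt= is given the object returned by op.value() exposes the underlying Value as .raw. Neither assumption is confirmed on this page.

```python
def make_value_only_put(allowed=("value",)):
    """Build a put handler which rejects changes outside the `allowed` fields."""
    allowed = set(allowed)

    def handle(pv, op):
        val = op.value()
        # Assumption: an unwrapped NT object carries its Value as `.raw`;
        # otherwise op.value() is taken to be the Value itself.
        raw = getattr(val, "raw", val)
        # Assumption: changedSet() yields the changed field names.
        extra = set(raw.changedSet()) - allowed
        if extra:
            # Fail the client's Put instead of storing the change.
            op.done(error="Refusing to change: " + ", ".join(sorted(extra)))
            return
        pv.post(val)  # store and update subscribers
        op.done()

    return handle
```

Such a handler could be registered with pv.put(make_value_only_put()), which is the call the @pv.put decorator form makes.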

Server API

class p4p.server.Server(conf=None, useenv=True, providers=[''])[source]
Parameters:
  • providers – A list of provider names or instances. See below.

  • conf (dict) – Configuration keys for the server. Uses same names as environment variables (aka. EPICS_PVAS_*)

  • useenv (bool) – Whether to use process environment in addition to provided config.

  • isolate (bool) – If True, override conf= and useenv= to select a configuration suitable for isolated testing. eg. listening only on localhost with a randomly chosen port number. Use conf() to determine which port is being used.

Run a PVAccess server serving Channels from the listed providers. The server is running after construction, until stop().

S = Server(providers=["example"])
# do something else
S.stop()

As a convenience, a Server may be used as a context manager to automatically stop().

with Server(providers=["example"]) as S:
    pass # do something else

When configuring a Server, conf keys provided to the constructor have the same name as the environment variables. If both are given, then the provided conf dict is used.

Call Server.conf() to see a list of valid server (EPICS_PVAS_*) key names and the actual values.
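
The precedence rule can be pictured with a toy model. This is only an illustration of "conf keys override the environment", not how p4p builds its configuration. The function name merged_conf and its environ= argument are hypothetical, and the model assumes that only EPICS_PVAS_* variables are relevant.

```python
import os

def merged_conf(conf=None, useenv=True, environ=None):
    """Toy model of the precedence rule; not p4p's implementation."""
    environ = os.environ if environ is None else environ
    merged = {}
    if useenv:
        # Assumption: only EPICS_PVAS_* variables are considered.
        merged.update((k, v) for k, v in environ.items()
                      if k.startswith("EPICS_PVAS_"))
    # Keys passed explicitly win over values from the environment.
    merged.update(conf or {})
    return merged
```

For the values actually in effect, Server.conf() remains the authoritative source.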

The providers list may contain: name strings (cf. installProvider()), StaticProvider or DynamicProvider instances, or a dict “{‘pv:name’:SharedPV}” to implicitly create a StaticProvider. Each entry may also be a tuple “(provider, order)” where “provider” is any of the allowed types, and “order” is an integer used to resolve ambiguity if more than one provider may claim a PV name. Lower numbers are queried first; the default order is 0.
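
The ordering rule can be modelled in a few lines. The helper query_order below is hypothetical and is not part of p4p. It only mirrors the description above: bare entries get order 0, and lower numbers come first. How p4p breaks ties between equal orders is not stated here, so keeping list order for ties is an assumption of this sketch.

```python
def query_order(providers):
    """Return provider entries in the order they would be consulted."""
    normalized = []
    for entry in providers:
        if isinstance(entry, tuple):
            provider, order = entry      # explicit "(provider, order)"
        else:
            provider, order = entry, 0   # default order is 0
        normalized.append((order, provider))
    # sorted() is stable, so entries with equal order keep their list position
    # (an assumption of this model, not documented behaviour).
    return [p for _, p in sorted(normalized, key=lambda pair: pair[0])]
```

For example, query_order([("b", 5), "a", ("c", -1)]) gives ["c", "a", "b"].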

conf()[source]

Return a dict() with the effective configuration this server is using.

Suitable to pass to another Server to duplicate this configuration, or to a client Context to allow it to connect to this server.

from p4p.client.thread import Context
from p4p.server import Server

with Server(providers=["..."], isolate=True) as S:
    with Context('pva', conf=S.conf(), useenv=False) as C:
        print(C.get("pv:name"))

stop()[source]

Force server to stop serving, and close connections to existing clients.

class p4p.server.StaticProvider(name=None)[source]

A channel provider which serves from a clearly defined list of names. This list may change at any time.

Parameters:

name (str) – Provider name. Must be unique within the local context in which it is used. None, the default, will choose an appropriate value.

close()
add()
remove()
keys()

For situations where PV names are not known ahead of time, or when PVs are “created” only as requested, DynamicProvider should be used.

class p4p.server.DynamicProvider(name, handler)[source]

A channel provider which does not maintain a list of provided channel names.

The following example shows a simple case, in fact so simple that StaticProvider is a better fit.

from p4p.server import Server, DynamicProvider
from p4p.server.thread import SharedPV

class DynHandler(object):
    def __init__(self):
        self.pv = SharedPV()
    def testChannel(self, name): # return True, False, or DynamicProvider.NotYet
        return name=="blah"
    def makeChannel(self, name, peer):
        assert name=="blah"
        return self.pv
provider = DynamicProvider("arbitrary", DynHandler())
server = Server(providers=[provider])

NotYet = b'nocache'

DynamicProvider Handler Interface

A DynamicProvider Handler class will define the following:

class p4p.server.ProviderHandler
testChannel(pvname)

Called with a PV name which some client is searching for.

Returns:

True to claim this PV. False to “permanently” disclaim this PV. Or DynamicProvider.NotYet to temporarily disclaim.

Each DynamicProvider maintains a cache of negative search results (when `testChannel()==False`) to avoid extra work on a subnet with many clients searching for nonexistent PVs. If it is desirable to defeat this behaviour, for example as part of lazy PV creation, then testChannel() can return DynamicProvider.NotYet instead of False.

makeChannel(pvname, src)

Called when a client attempts to create a Channel for some PV. The object which is returned will not be collected until the client closes the Channel or becomes disconnected.

Returns:

A SharedPV instance.
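
A handler for lazy PV creation might look like the sketch below. The class LazyHandler and its factory callable (PV name in, PV or None out) are hypothetical. NOT_YET repeats the b'nocache' value listed for DynamicProvider.NotYet so that the sketch runs without importing p4p; real code would pass DynamicProvider.NotYet instead.

```python
import threading

NOT_YET = b'nocache'  # stand-in for DynamicProvider.NotYet

class LazyHandler:
    """Claim a PV name only once `factory` is able to produce a PV for it."""

    def __init__(self, factory, not_yet=NOT_YET):
        self._factory = factory    # hypothetical callable: name -> PV or None
        self._not_yet = not_yet
        self._pvs = {}
        self._lock = threading.Lock()

    def testChannel(self, name):
        with self._lock:
            if name in self._pvs:
                return True
        pv = self._factory(name)
        if pv is None:
            # Temporary disclaim, so the negative result is not cached.
            return self._not_yet
        with self._lock:
            self._pvs.setdefault(name, pv)
        return True

    def makeChannel(self, name, peer):
        # Only names already claimed by testChannel() are served;
        # anything else raises KeyError.
        with self._lock:
            return self._pvs[name]
```

Returning False for names which can never exist still makes sense, since that lets the provider cache the negative result.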

ServerOperation

This class is passed to the SharedPV handler methods Handler.put() and Handler.rpc().

class p4p.server.ServerOperation

An in-progress Put or RPC operation from a client.

done(value=None, error=None)

Signal completion of the operation.

# successful completion without result (Put or RPC)
done()
# successful completion with result (RPC only)
done(Value)
# unsuccessful completion (Put or RPC)
done(error="msg")

pvRequest() → Value

Access the request Value provided by the client, which may be ignored, or used to modify handling.

value() → Value

The Value provided by the client: the argument of an RPC operation, or the proposed new value of a Put operation.

name() → str

The PV name used by the client

peer() → str

Client address

account() → str

Client identity

roles() → {str}

Client group memberships

onCancel(callable|None)

Set a callable which will be invoked if the remote operation is cancelled by the client, or if the client connection is lost.
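
One possible use of onCancel() is to abandon slow work when the client gives up. The sketch below is an illustration only: SlowHandler and run_steps are hypothetical names, and it assumes that an operation may be completed from another thread after the handler method has returned. The arguments passed to the cancel callable are not specified here, so the lambda accepts any.

```python
import threading

def run_steps(op, cancelled, steps=3, delay=0.01):
    """Do `steps` units of pretend work, stopping early if cancelled."""
    for _ in range(steps):
        # Event.wait() doubles as an interruptible sleep.
        if cancelled.wait(delay):
            return False  # client went away, nothing to report
    op.done()  # successful completion without result
    return True

class SlowHandler:
    def rpc(self, pv, op):
        cancelled = threading.Event()
        # Accept any arguments, since the callback signature is an unknown here.
        op.onCancel(lambda *args: cancelled.set())
        worker = threading.Thread(target=run_steps, args=(op, cancelled),
                                  daemon=True)
        worker.start()
```

Moving the work to its own thread keeps the handler itself short, so a slow request does not tie up the thread which invoked it.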

SharedPV concurrency options

There is a SharedPV class for each of the three concurrency models supported: OS threading, asyncio coroutines, and cothreads. All have the same methods as thread.SharedPV.

The difference between thread.SharedPV, asyncio.SharedPV, and cothread.SharedPV is the context in which the handler methods are called (an OS thread, an asyncio coroutine, or a cothread). This distinction determines how blocking operations may be carried out.

Note that thread.SharedPV uses a fixed size thread pool. This limits the number of concurrent callbacks.
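
To make the distinction concrete, here is a sketch of a put handler written for the asyncio model. It assumes that asyncio.SharedPV accepts a coroutine function as a handler (for example through @pv.put), which this page implies but does not state outright. The name slow_put and the delay parameter are hypothetical.

```python
import asyncio

async def slow_put(pv, op, delay=0.05):
    # Await rather than time.sleep(), so the event loop keeps running
    # other handlers while this one is waiting.
    await asyncio.sleep(delay)
    pv.post(op.value())  # store and update subscribers
    op.done()
```

The equivalent handler for thread.SharedPV would be a plain function which may block, at the cost of occupying a worker from the fixed-size pool while it does.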

SharedPV API

class p4p.server.thread.SharedPV(queue=None, **kws)[source]

Shared state Process Variable. Callback based implementation.

Note

If initial=None, the PV is initially closed and must be open()’d before any access is possible.

Parameters:
  • handler – An object which will receive callbacks when eg. a Put operation is requested. May be omitted if the decorator syntax is used.

  • initial (Value) – An initial Value for this PV. If omitted, open() must be called before client access is possible.

  • nt – An object with methods wrap() and unwrap(). eg p4p.nt.NTScalar.

  • wrap (callable) – As an alternative to providing ‘nt=’, a callable to transform Values passed to open() and post().

  • unwrap (callable) – As an alternative to providing ‘nt=’, a callable to transform Values returned by Operations in Put/RPC handlers.

  • queue (WorkQueue) – The threaded WorkQueue on which handlers will be run.

  • options (dict) – A dictionary of configuration options.

Creating a PV in the open state, with no handler for Put or RPC (attempts will error).

from p4p.nt import NTScalar
from p4p.server.thread import SharedPV
pv = SharedPV(nt=NTScalar('d'), initial=0.0)
# ... later
pv.post(1.0)

The full form of a handler object is:

class MyHandler:
    def put(self, pv, op):
        pass
    def rpc(self, pv, op):
        pass
    def onFirstConnect(self, pv): # may be omitted
        pass
    def onLastDisconnect(self, pv): # may be omitted
        pass
pv = SharedPV(MyHandler())

Alternatively, decorators may be used.

pv = SharedPV()
@pv.put
def onPut(pv, op):
    pass

open(value, nt=None, wrap=None, unwrap=None, **kws)[source]

Mark the PV as opened and provide its initial value. This initial value is later updated with post().

Parameters:

value – A Value, or appropriate object (see nt= and wrap= of the constructor).

Any clients which began connecting while this PV was in the close’d state will complete connecting.

Only those fields of the value which are marked as changed will be stored.

close(destroy=False, sync=False, timeout=None)[source]

Close PV, disconnecting any clients.

Parameters:
  • destroy (bool) – Indicate “permanent” closure. Current clients will not see subsequent open().

  • sync (bool) – When True, block until any pending onLastDisconnect() is delivered (timeout applies).

  • timeout (float) – Applies only when sync=True. None for no timeout, otherwise a non-negative floating point value.

close() with destroy=True or sync=True will not prevent clients from re-connecting. New clients may prevent sync=True from succeeding. Prevent reconnection by first stopping the Server, removing the PV with StaticProvider.remove(), or preventing a DynamicProvider from making new channels to this SharedPV.
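
The ordering advice can be captured in a small helper. This is a sketch under assumptions: retire_pv is a hypothetical name, and StaticProvider.remove() is assumed to take the PV name as its only argument, which this page does not show.

```python
def retire_pv(provider, name, pv, timeout=5.0):
    """Stop serving `name`, then close the PV and wait for the disconnect."""
    # First make the name unreachable, so no new client can connect
    # while the close is in progress.
    provider.remove(name)  # assumed signature: remove(name)
    # Then close permanently, waiting for any pending onLastDisconnect().
    pv.close(destroy=True, sync=True, timeout=timeout)
```

Calling close() before remove() would leave a window in which a searching client could reconnect and keep sync=True from completing.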

post(value, **kws)[source]

Provide an update to the Value of this PV.

Parameters:

value – A Value, or appropriate object (see nt= and wrap= of the constructor).

Only those fields of the value which are marked as changed will be stored.

Any keyword arguments are forwarded to the NT wrap() method (if applicable). Common arguments include: timestamp= , severity= , and message= .
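
As an illustration of the forwarded keyword arguments, the helper below posts a reading together with alarm information. The name post_reading and the high_limit parameter are hypothetical. The sketch assumes a SharedPV created with nt=NTScalar(...), a timestamp= given as float seconds since the epoch, and the usual EPICS severity codes (0 for no alarm, 2 for major).

```python
import time

NO_ALARM, MAJOR = 0, 2  # EPICS alarm severity codes

def post_reading(pv, reading, high_limit):
    """Post `reading`, flagging it as a major alarm when above `high_limit`."""
    over = reading > high_limit
    pv.post(reading,
            timestamp=time.time(),  # assumed: float seconds is accepted
            severity=MAJOR if over else NO_ALARM,
            message="above limit" if over else "")
```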

current()[source]

SharedPV Handler Interface

class p4p.server.thread.Handler[source]

Skeleton of SharedPV Handler

Use of this as a base class is optional.

put(pv, op)[source]

Called each time a client issues a Put operation on this Channel.

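Parameters:
  • pv (SharedPV) – The SharedPV which this Handler is associated with.

  • op (ServerOperation) – The operation being initiated by the client.

rpc(pv, op)[source]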

Called each time a client issues a Remote Procedure Call operation on this Channel.

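Parameters:
  • pv (SharedPV) – The SharedPV which this Handler is associated with.

  • op (ServerOperation) – The operation being initiated by the client.

onFirstConnect(pv)[source]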

Called when the first Client channel is created.

Parameters:

pv (SharedPV) – The SharedPV which this Handler is associated with.

onLastDisconnect(pv)[source]

Called when the last Client channel is closed.

Parameters:

pv (SharedPV) – The SharedPV which this Handler is associated with.
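
The connect and disconnect callbacks lend themselves to switching expensive work on and off. The sketch below uses a hypothetical ActivityHandler which records whether any client is connected, and which refuses Put requests. It does not subclass Handler, since use of the base class is optional.

```python
import threading

class ActivityHandler:
    """Track whether at least one client channel exists."""

    def __init__(self):
        # Set while one or more clients are connected.
        self.active = threading.Event()

    def onFirstConnect(self, pv):
        self.active.set()    # e.g. a polling loop could start here

    def onLastDisconnect(self, pv):
        self.active.clear()  # and pause again here

    def put(self, pv, op):
        op.done(error="This PV is read-only")
```

A background thread could then call handler.active.wait() before each poll, so that no work is done while nobody is listening.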

asyncio or cothread

class p4p.server.asyncio.SharedPV[source]

Same arguments as thread.SharedPV except that queue= is replaced with loop= .

Parameters:

loop – An asyncio event loop, or None to use the default.

class p4p.server.cothread.SharedPV

Global Provider registry

If it becomes necessary for a Provider to be included in a server which is started outside of Python code, then it must be placed in the global provider registry with installProvider().

p4p.server.installProvider(name, provider)[source]
p4p.server.removeProvider(name)[source]