Source code for p4p.server

import sys
import logging
import warnings
import re
import time
import uuid
from weakref import ref

from weakref import WeakSet

from .. import _p4p
from .._p4p import (Server as _Server,
                    StaticProvider as _StaticProvider,
                    DynamicProvider as _DynamicProvider,
                    ServerOperation,
                    )

if sys.version_info >= (3, 0):
    unicode = str

_log = logging.getLogger(__name__)

__all__ = (
    'Server',
        'installProvider',
        'removeProvider',
        'StaticProvider',
        'DynamicProvider',
        'ServerOperation',
)

[docs]def installProvider(name, provider): _p4p._providers[name] = ref(provider)
[docs]def removeProvider(name): _p4p._providers.pop(name, None)
def clearProviders(): _p4p._providers.clear()
[docs]class Server(object): """Server(conf=None, useenv=True, providers=[""]) :param providers: A list of provider names or instances. See below. :param dict conf: Configuration keys for the server. Uses same names as environment variables (aka. EPICS_PVAS_*) :param bool useenv: Whether to use process environment in addition to provided config. :param bool isolate: If True, override conf= and useenv= to select a configuration suitable for isolated testing. eg. listening only on localhost with a randomly chosen port number. Use `conf()` to determine which port is being used. Run a PVAccess server serving Channels from the listed providers. The server is running after construction, until stop(). :: S = Server(providers=["example"]) # do something else S.stop() As a convenience, a Server may be used as a context manager to automatically `stop()`. :: with Server(providers=["example"]) as S: # do something else When configuring a Server, conf keys provided to the constructor have the same name as the environment variables. If both are given, then the provided conf dict is used. Call Server.conf() to see a list of valid server (EPICS_PVAS_*) key names and the actual values. The providers list may contain: name strings (cf. installProvider()), `StaticProvider` or `DynamicProvider` instances, or a dict "{'pv:name':`SharedPV`}" to implicitly creat a `StaticProvider`. Each entry may also be a tuple "(provider, order)" where "provider" is any of the allowed types, and "order" is an integer used to resolve ambiguity if more than one provider may claim a PV name. (lower numbers are queried first, the default order is 0) """ def __init__(self, providers, isolate=False, **kws): self.__keep_alive = [] # ick... if isinstance(providers, (bytes, unicode)): providers = providers.split() # split on space warnings.warn("Server providers list should be a list", DeprecationWarning) Ps = [] for provider in providers: if isinstance(provider, tuple): provider, order = provider elif hasattr(provider, 'order'): order = provider.order else: order = 0 if isinstance(provider, (bytes, unicode)): if not re.match(r'^[^ \t\n\r]+$', provider): raise ValueError("Invalid provider name: '%s'"%provider) Ps.append((provider, order)) elif isinstance(provider, (_StaticProvider, _DynamicProvider, _p4p.Source)): Ps.append((provider, order)) elif hasattr(provider, 'items'): P = StaticProvider() for name, pv in provider.items(): P.add(name, pv) Ps.append((P, order)) # Normally user code is responsible for keeping the StaticProvider alive. # Not possible in this case though. self.__keep_alive.append(P) else: raise ValueError("providers=[] must be a list of string, SharedPV, or dict. Not %s"%provider) if isolate: assert 'useenv' not in kws and 'conf' not in kws, kws kws['useenv'] = False kws['conf'] = { 'EPICS_PVAS_INTF_ADDR_LIST': '127.0.0.1', 'EPICS_PVA_ADDR_LIST': '127.0.0.1', 'EPICS_PVA_AUTO_ADDR_LIST': '0', 'EPICS_PVA_SERVER_PORT': '0', 'EPICS_PVA_BROADCAST_PORT': '0', } _log.debug("Starting Server isolated=%s, %s", isolate, kws) self._S = _Server(providers=Ps, **kws) self.tostr = self._S.tostr self._S.start() try: if _log.isEnabledFor(logging.DEBUG): _log.debug("New Server: %s", self.tostr(5)) except: self._S.stop() raise def __enter__(self): return self def __exit__(self, A, B, C): self.stop()
[docs] def conf(self): """Return a dict() with the effective configuration this server is using. Suitable to pass to another Server to duplicate this configuration, or to a client Context to allow it to connect to this server. :: with Server(providers=["..."], isolate=True) as S: with p4p.client.thread.Context('pva', conf=S.conf(), useenv=False) as C: print(C.get("pv:name")) """ return self._S.conf()
[docs] def stop(self): """Force server to stop serving, and close connections to existing clients. """ _log.debug("Stopping Server") self._S.stop() self.__keep_alive = []
@classmethod def forever(klass, *args, **kws): """Create a server and block the calling thread until KeyboardInterrupt. Shorthand for: :: with Server(*args, **kws): try; time.sleep(99999999) except KeyboardInterrupt: pass """ with klass(*args, **kws): _log.info("Running server") try: while True: time.sleep(100) except KeyboardInterrupt: pass finally: _log.info("Stopping server")
[docs]class StaticProvider(_StaticProvider): """A channel provider which servers from a clearly defined list of names. This list may change at any time. :param str name: Provider name. Must be unique within the local context in which it is used. None, the default, will choose an appropriate value. """ def __init__(self, name=None): if name is None: # Caller doesn't care. Pick something unique w/o spaces name = str(uuid.uuid4()) super(StaticProvider, self).__init__(name)
[docs]class DynamicProvider(_DynamicProvider): """A channel provider which does not maintain a list of provided channel names. The following example shows a simple case, in fact so simple that StaticProvider is a better fit. :: class DynHandler(object): def __init__(self): self.pv = SharedPV() def testChannel(self, name): # return True, False, or DynamicProvider.NotYet return name=="blah" def makeChannel(self, name, peer): assert name=="blah" return self.pv provider = DynamicProvider("arbitrary", DynHandler()) server = Server(providers=[provider]) """ # Return from Handler.testChannel() to prevent caching of negative result. # Use when testChannel('name') might shortly return True NotYet = b'nocache' def __init__(self, name, handler): _DynamicProvider.__init__(self, name, self._WrapHandler(handler)) class _WrapHandler(object): "Wrapper around user Handler which logs exception" def __init__(self, real): self._real = real def testChannel(self, name): try: return self._real.testChannel(name) except: _log.exception("Unexpected") def makeChannel(self, name, peer): try: return self._real.makeChannel(name, peer) except: _log.exception("Unexpected")
def _cleanup_servers(): _log.debug("Stopping all Server instances") servers = list(_p4p.all_servers) for srv in servers: srv.stop()