Source code for p4p.nt


import logging
_log = logging.getLogger(__name__)

try:
    from itertools import izip
except ImportError:
    izip = zip

from collections import OrderedDict
from ..wrapper import Type, Value
from .common import timeStamp, alarm, NTBase
from .scalar import NTScalar
from .ndarray import NTNDArray
from .enum import NTEnum

__all__ = [
    'NTScalar',
    'NTEnum',
    'NTMultiChannel',
    'NTTable',
    'NTNDArray',
    'defaultNT',
]

_default_nt = {
    "epics:nt/NTScalar:1.0": NTScalar,
    "epics:nt/NTScalarArray:1.0": NTScalar,
    "epics:nt/NTEnum:1.0": NTEnum,
    "epics:nt/NTNDArray:1.0": NTNDArray,
}

def defaultNT():
    """Return a fresh copy of the default NT helper mappings.

    The result maps a structure type ID string to the helper class used for
    that ID.  It is a shallow copy, so adding or removing entries does not
    affect the module-level defaults.

    :since: 3.1.0
    """
    return dict(_default_nt)
class UnwrapOnly(object):
    """Adapt a legacy unwrap-only callable to the NT helper interface.

    :param unwrap: Callable stored as the ``unwrap`` attribute and invoked
                   with a Value to be unwrapped.
    """

    def __init__(self, unwrap):
        self.unwrap = unwrap

    def __call__(self):
        # ClientUnwrapper "instantiates" helpers by calling the registered
        # object.  We are state-less, so the same object can be reused.
        return self

    def wrap(self, V):
        """Return ``V`` unchanged; legacy mappings only know how to unwrap."""
        return V


def buildNT(nt=None, unwrap=None):
    """Build the :py:class:`ClientUnwrapper` described by the arguments.

    :param nt: ``False`` to disable helpers, or a dict of type ID to helper
               class which is merged over the default mappings.
    :param unwrap: Legacy argument.  ``False`` disables helpers.  A dict of
                   type ID to unwrap callable is used *instead of* both the
                   defaults and ``nt``.
    :returns: A :py:class:`ClientUnwrapper`
    """
    if unwrap is False or nt is False:
        return ClientUnwrapper({})  # disable use of wrappers

    if unwrap is not None:
        # legacy: ignore new style mappings entirely
        ret = {}
        for ID, fn in (unwrap or {}).items():
            ret[ID] = UnwrapOnly(fn)
    else:
        ret = dict(_default_nt)
        ret.update(nt or {})
    return ClientUnwrapper(ret)


class ClientUnwrapper(object):
    """Dispatch wrap/unwrap/assign to the helper registered for a type ID.

    The helper selected for the most recently seen type ID is cached and
    only looked up again when a Value with a different ID is encountered.

    :param dict nt: Mapping of type ID to a callable returning a helper
                    object with ``wrap`` and ``unwrap`` (and optionally
                    ``assign``) methods.
    """

    def __init__(self, nt=None):
        self.nt = nt
        self.id = None
        self._wrap = self._unwrap = lambda x: x
        self._assign = self._default_assign

    def wrap(self, val, **kws):
        """Pack an arbitrary python object into a Value
        """
        return self._wrap(val, **kws)

    def unwrap(self, val):
        """Unpack a Value as some other python type
        """
        if val.getID() != self.id:
            self._update(val)
        return self._unwrap(val)

    def assign(self, V, value):
        """Store ``value`` into the Value ``V`` using the helper for its type."""
        if V.getID() != self.id:
            self._update(V)
        self._assign(V, value)

    def _update(self, val):
        """Select the helper matching the type ID of ``val``."""
        # type change
        type_id = val.getID()
        # Tolerate the default nt=None: no mappings means no helper.
        factory = self.nt.get(type_id) if self.nt is not None else None
        if factory is not None:
            helper = factory()  # instantiate
            self._wrap, self._unwrap = helper.wrap, helper.unwrap
            # Helpers such as UnwrapOnly do not provide assign().
            self._assign = getattr(helper, 'assign', self._default_assign)
        else:
            # reset
            self._wrap = self._unwrap = lambda x: x
            self._assign = self._default_assign
        # Record the ID in both branches.  Leaving a stale ID behind after a
        # reset would let a later Value of the previous type skip this lookup
        # and be passed through the identity functions.
        self.id = type_id

    def _default_assign(self, V, value):
        V.value = value  # assume NTScalar-like

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, repr(self.nt))
    __str__ = __repr__
class NTMultiChannel(NTBase):
    """Helper for a structure holding the equivalent of a number of NTScalar
    """
    Value = Value

    @staticmethod
    def buildType(valtype, extra=[]):
        """Build the Type of an NTMultiChannel structure

        :param str valtype: A type code to be used with the 'value' field.  Must be an array
        :param list extra: A list of tuples describing additional non-standard fields
        :returns: A :py:class:`Type`
        """
        assert valtype[:1] == 'a', 'valtype must be an array'
        # standard fields first, caller supplied fields are appended after
        standard_fields = [
            ('value', valtype),
            ('channelName', 'as'),
            ('descriptor', 's'),
            ('alarm', alarm),
            ('timeStamp', timeStamp),
            ('severity', 'ai'),
            ('status', 'ai'),
            ('message', 'as'),
            ('secondsPastEpoch', 'al'),
            ('nanoseconds', 'ai'),
            ('userTag', 'ai'),
            ('isConnected', 'a?'),
        ]
        return Type(id="epics:nt/NTMultiChannel:1.0",
                    spec=standard_fields + extra)
class NTTable(NTBase):
    """A generic table

    >>> table = NTTable.buildType(columns=[
        ('columnA', 'ai'),
        ('columnB', 'as'),
    ])
    """
    Value = Value

    @staticmethod
    def buildType(columns=[], extra=[]):
        """Build a table Type

        :param list columns: List of column names and types, used as given
                             for the members of the 'value' sub-structure.
                             eg [('colA', 'ad')]
        :param list extra: A list of tuples describing additional non-standard fields
        :returns: A :py:class:`Type`
        """
        table_fields = [
            ('labels', 'as'),
            ('value', ('S', None, columns)),
            ('descriptor', 's'),
            ('alarm', alarm),
            ('timeStamp', timeStamp),
        ]
        return Type(id="epics:nt/NTTable:1.0", spec=table_fields + extra)

    def __init__(self, columns=[], extra=[]):
        """Prepare a table helper

        :param list columns: List of column names and *scalar* type codes.
                             Each code is prefixed with 'a' to form the
                             column array type.  eg [('colA', 'd')]
        :param list extra: A list of tuples describing additional non-standard fields
        :raises ValueError: If a column type code is already an array code
        """
        self.labels = names = []
        array_columns = []
        for name, code in columns:
            if code[0] == 'a':
                raise ValueError("NTTable column types may not be array")
            array_columns.append((name, 'a' + code))
            names.append(name)
        self.type = self.buildType(array_columns, extra=extra)

    def wrap(self, values, **kws):
        """Pack an iterable of dict into a Value

        Each element of ``values`` is one row, indexed by column label.
        Labels missing from a row are skipped, and columns for which no row
        provided an entry are left out of the result.  A ``values`` which is
        already a Value is returned as-is.

        >>> T=NTTable([('A', 'i'), ('B', 's')])
        >>> V = T.wrap([
            {'A':42, 'B':'one'},
            {'A':43, 'B':'two'},
        ])
        """
        if isinstance(values, Value):
            return values

        by_label = {label: [] for label in self.labels}
        try:
            # unzip list of dict
            for row in values:
                for label in self.labels:
                    try:
                        by_label[label].append(row[label])
                    except (IndexError, KeyError):
                        pass

            # allow omit empty columns
            for label in self.labels:
                if not by_label[label]:
                    del by_label[label]

            try:
                values = self.Value(self.type, {
                    'labels': self.labels,
                    'value': by_label,
                })
            except:
                _log.error("Failed to encode '%s' with %s", by_label, self.labels)
                raise
        except:
            _log.exception("Failed to wrap: %s", values)
            raise
        # extra keywords are handed to _annotate(), which is not defined in this class
        return self._annotate(values, **kws)

    @staticmethod
    def unwrap(value):
        """Convert an NTTable Value into rows

        :returns: A list with one OrderedDict per row, mapping each column
                  name found in ``value.value`` to that row's entry
        """
        # split into parallel lists of column names and column arrays
        names, arrays = [], []
        for name, array in value.value.items():
            names.append(name)
            arrays.append(array)

        # walk the column arrays in lock-step to form rows,
        # pairing each row entry with its column name
        return [OrderedDict(zip(names, row)) for row in izip(*arrays)]
class NTURI(object):
    """Helper to build NTURI structures describing a call to a PV with arguments
    """

    @staticmethod
    def buildType(args):
        """Build NTURI

        :param list args: A list of tuples of query argument name and PVD type code.
        :returns: A :py:class:`Type`
        :raises ValueError: If a Type can not be built from ``args``

        >>> I = NTURI([
            ('arg_a', 'I'),
            ('arg_two', 's'),
        ])
        """
        try:
            return Type(id="epics:nt/NTURI:1.0", spec=[
                ('scheme', 's'),
                ('authority', 's'),
                ('path', 's'),
                ('query', ('S', None, args)),
            ])
        except Exception:
            # Format with a 1-tuple.  A bare ``% args`` would unpack a tuple
            # of arguments and fail with TypeError instead of reporting the
            # real problem.
            raise ValueError('Unable to build NTURI compatible type from %s' % (args,))

    def __init__(self, args):
        """Prepare a helper for the given query arguments

        :param args: Iterable of tuples of query argument name and PVD type code.
        """
        self._args = list(args)
        # Build from the stored list: ``args`` may be a one-shot iterator
        # which list() has just exhausted.
        self.type = self.buildType(self._args)

    def wrap(self, path, args=(), kws={}, scheme='', authority=''):
        """Wrap argument values (tuple/list with optional dict) into Value

        Positional values are matched in order against the argument names
        given to the constructor and take precedence over keyword values of
        the same name.  Keyword values which are None are treated as not
        provided.

        :param str path: The PV name to which this call is made
        :param tuple args: Ordered arguments
        :param dict kws: Keyword arguments
        :param str scheme: Stored in the 'scheme' field
        :param str authority: Stored in the 'authority' field
        :rtype: Value
        :raises ValueError: If the Type or Value can not be built
        """
        # build dict of argument name+value
        AV = {}
        AV.update([A for A in kws.items() if A[1] is not None])
        AV.update([(N, V) for (N, _T), V in zip(self._args, args)])

        # list of argument name+type tuples for which a value was provided
        AT = [A for A in self._args if A[0] in AV]

        T = self.buildType(AT)
        try:
            return Value(T, {
                'scheme': scheme,
                'authority': authority,
                'path': path,
                'query': AV,
            })
        except Exception:
            raise ValueError('Unable to initialize NTURI %s from %s using %s' % (AT, AV, self._args))