import logging
_log = logging.getLogger(__name__)
try:
from itertools import izip
except ImportError:
izip = zip
from collections import OrderedDict
from ..wrapper import Type, Value
from .common import timeStamp, alarm, NTBase
from .scalar import NTScalar
from .ndarray import NTNDArray
from .enum import NTEnum
__all__ = [
'NTScalar',
'NTEnum',
'NTMultiChannel',
'NTTable',
'NTNDArray',
'defaultNT',
]
_default_nt = {
"epics:nt/NTScalar:1.0": NTScalar,
"epics:nt/NTScalarArray:1.0": NTScalar,
"epics:nt/NTEnum:1.0": NTEnum,
"epics:nt/NTNDArray:1.0": NTNDArray,
}
[docs]def defaultNT():
"""Returns a copy of the default NT helper mappings.
:since: 3.1.0
"""
return _default_nt.copy()
class UnwrapOnly(object):
def __init__(self, unwrap):
self.unwrap = unwrap
def __call__(self):
return self # we are state-less
def wrap(self, V):
return V
def buildNT(nt=None, unwrap=None):
if unwrap is False or nt is False:
return ClientUnwrapper({}) # disable use of wrappers
if unwrap is not None:
# legacy
ret = {} # ignore new style
for ID,fn in (unwrap or {}).items():
ret[ID] = UnwrapOnly(fn)
else:
ret = dict(_default_nt)
ret.update(nt or {})
return ClientUnwrapper(ret)
class ClientUnwrapper(object):
def __init__(self, nt=None):
self.nt = nt
self.id = None
self._wrap = self._unwrap = lambda x:x
self._assign = self._default_assign
def wrap(self, val, **kws):
"""Pack a arbitrary python object into a Value
"""
return self._wrap(val, **kws)
def unwrap(self, val):
"""Unpack a Value as some other python type
"""
if val.getID()!=self.id:
self._update(val)
return self._unwrap(val)
def assign(self, V, value):
if V.getID()!=self.id:
self._update(V)
self._assign(V, value)
def _update(self, val):
# type change
nt = self.nt.get(val.getID())
if nt is not None:
nt = nt() # instancate
self._wrap, self._unwrap = nt.wrap, nt.unwrap
self._assign = nt.assign
self.id = val.getID()
else:
# reset
self._wrap = self._unwrap = lambda x:x
self._assign = self._default_assign
def _default_assign(self, V, value):
V.value = value # assume NTScalar-like
def __repr__(self):
return '%s(%s)'%(self.__class__.__name__, repr(self.nt))
__str__ = __repr__
[docs]class NTMultiChannel(NTBase):
"""Describes a structure holding the equivalent of a number of NTScalar
"""
Value = Value
[docs] @staticmethod
def buildType(valtype, extra=[]):
"""Build a Type
:param str valtype: A type code to be used with the 'value' field. Must be an array
:param list extra: A list of tuples describing additional non-standard fields
:returns: A :py:class:`Type`
"""
assert valtype[:1] == 'a', 'valtype must be an array'
return Type(id="epics:nt/NTMultiChannel:1.0",
spec=[
('value', valtype),
('channelName', 'as'),
('descriptor', 's'),
('alarm', alarm),
('timeStamp', timeStamp),
('severity', 'ai'),
('status', 'ai'),
('message', 'as'),
('secondsPastEpoch', 'al'),
('nanoseconds', 'ai'),
('userTag', 'ai'),
('isConnected', 'a?'),
] + extra)
[docs]class NTTable(NTBase):
"""A generic table
>>> table = NTTable.buildType(columns=[
('columnA', 'ai'),
('columnB', 'as'),
])
"""
Value = Value
[docs] @staticmethod
def buildType(columns=[], extra=[]):
"""Build a table
:param list columns: List of column names and types. eg [('colA', 'd')]
:param list extra: A list of tuples describing additional non-standard fields
:returns: A :py:class:`Type`
"""
return Type(id="epics:nt/NTTable:1.0",
spec=[
('labels', 'as'),
('value', ('S', None, columns)),
('descriptor', 's'),
('alarm', alarm),
('timeStamp', timeStamp),
] + extra)
def __init__(self, columns=[], extra=[]):
self.labels = []
C = []
for col, type in columns:
if type[0] == 'a':
raise ValueError("NTTable column types may not be array")
C.append((col, 'a' + type))
self.labels.append(col)
self.type = self.buildType(C, extra=extra)
[docs] def wrap(self, values, **kws):
"""Pack an iterable of dict into a Value
>>> T=NTTable([('A', 'ai'), ('B', 'as')])
>>> V = T.wrap([
{'A':42, 'B':'one'},
{'A':43, 'B':'two'},
])
"""
if isinstance(values, Value):
return values
else:
cols = dict([(L, []) for L in self.labels])
try:
# unzip list of dict
for V in values:
for L in self.labels:
try:
cols[L].append(V[L])
except (IndexError, KeyError):
pass
# allow omit empty columns
for L in self.labels:
V = cols[L]
if len(V) == 0:
del cols[L]
try:
values = self.Value(self.type, {
'labels': self.labels,
'value': cols,
})
except:
_log.error("Failed to encode '%s' with %s", cols, self.labels)
raise
except:
_log.exception("Failed to wrap: %s", values)
raise
return self._annotate(values, **kws)
[docs] @staticmethod
def unwrap(value):
"""Iterate an NTTable
:returns: An iterator yielding an OrderedDict for each column
"""
ret = []
# build lists of column names, and value
lbl, cols = [], []
for cname, cval in value.value.items():
lbl.append(cname)
cols.append(cval)
# zip together column arrays to iterate over rows
for rval in izip(*cols):
# zip together column names and row values
ret.append(OrderedDict(zip(lbl, rval)))
return ret
[docs]class NTURI(object):
[docs] @staticmethod
def buildType(args):
"""Build NTURI
:param list args: A list of tuples of query argument name and PVD type code.
>>> I = NTURI([
('arg_a', 'I'),
('arg_two', 's'),
])
"""
try:
return Type(id="epics:nt/NTURI:1.0", spec=[
('scheme', 's'),
('authority', 's'),
('path', 's'),
('query', ('S', None, args)),
])
except Exception as e:
raise ValueError('Unable to build NTURI compatible type from %s' % args)
def __init__(self, args):
self._args = list(args)
self.type = self.buildType(args)
[docs] def wrap(self, path, args=(), kws={}, scheme='', authority=''):
"""Wrap argument values (tuple/list with optional dict) into Value
:param str path: The PV name to which this call is made
:param tuple args: Ordered arguments
:param dict kws: Keyword arguments
:rtype: Value
"""
# build dict of argument name+value
AV = {}
AV.update([A for A in kws.items() if A[1] is not None])
AV.update([(N, V) for (N, _T), V in zip(self._args, args)])
# list of argument name+type tuples for which a value was provided
AT = [A for A in self._args if A[0] in AV]
T = self.buildType(AT)
try:
return Value(T, {
'scheme': scheme,
'authority': authority,
'path': path,
'query': AV,
})
except Exception as e:
raise ValueError('Unable to initialize NTURI %s from %s using %s' % (AT, AV, self._args))