"""Helper for handling NTNDArray a la areaDetector.

Known attributes

"ColorMode" (dimensions are listed inner-most first, as given in NDArray.cpp; numpy.ndarray.shape is the reverse of this)
0 - Mono [Nx, Ny]
1 - Bayer [Nx, Ny]
2 - RGB1 [3, Nx, Ny]
3 - RGB2 [Nx, 3, Ny]
4 - RGB3 [Nx, Ny, 3]
5 - YUV444 ?
6 - YUV422 ??
7 - YUV411 ???
"""
import logging
_log = logging.getLogger(__name__)
import time
import numpy
from ..wrapper import Type, Value
from .common import alarm, timeStamp, NTBase
from .scalar import ntwrappercommon
class ntndarray(ntwrappercommon, numpy.ndarray):
    """
    numpy.ndarray subclass carrying NTNDArray meta-data.

    In addition to the usual array behaviour, instances expose:

    * .attrib - dictionary of attribute name -> attribute value
    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.

    (All but ``.attrib`` are expected to be filled in by
    ``ntwrappercommon._store``, which is defined outside this module.)

    Values in the ``attrib`` dictionary may be any python value which may be
    stored in a PVA field, including an arbitrary ``Value``.
    When wrapping, a ``Value`` which appears to be an NTScalar or similar is
    handled specially: its .value, .alarm and .timeStamp are unpacked into the
    NTAttribute and any other fields are discarded.
    """

    def __init__(self, *args, **kws):
        super(ntndarray, self).__init__(*args, **kws)
        # Start with no attributes; _store() replaces this on unwrap.
        self.attrib = {}

    def _store(self, value):
        """Populate meta-data from ``value`` and reshape this array in place.

        :param value: The ``Value`` this array was unwrapped from.  Its
            ``attribute`` entries (if any) fill ``.attrib`` and its
            ``dimension`` entries determine the shape.
        :returns: ``self``, to allow chaining.
        """
        ntwrappercommon._store(self, value)

        attributes = self.attrib = {}
        for entry in value.get('attribute', []):
            attributes[entry.name] = entry.value

        # Dimensions arrive inner-most first, which is the reverse of numpy's
        # shape ordering.  Building this list fails if dimension[] contains
        # None entries.
        np_shape = [dim.size for dim in value.dimension][::-1]

        # numpy can't reshape to 0-d here, so fall back to an (empty) 1-d shape
        # when no dimensions are provided.
        if not np_shape:
            np_shape = [0]

        # In-place reshape.  Fails if the advertised shape isn't consistent
        # with the length of the pixel array.
        self.shape = np_shape
        return self
class NTNDArray(NTBase):
    """Representation of an N-dimensional array with meta-data

    Translates into `ntndarray`
    """
    Value = Value
    ntndarray = ntndarray

    # map numpy.dtype.char to .value union member name
    # NOTE(review): dtype.char is platform dependent (some platforms report
    # 64-bit integers as 'q'/'Q'); confirm coverage where that matters.
    _code2u = {
        '?': 'booleanValue',
        'b': 'byteValue',
        'h': 'shortValue',
        'i': 'intValue',
        'l': 'longValue',
        'B': 'ubyteValue',
        'H': 'ushortValue',
        'I': 'uintValue',
        'L': 'ulongValue',
        'f': 'floatValue',
        'd': 'doubleValue',
    }

    @classmethod
    def buildType(klass, extra=None):
        """Build the ``Type`` used for wrapped values.

        :param extra: Optional list of additional field definitions appended
            to the default NTNDArray fields.  ``None`` or an empty list
            selects the unmodified default type.
        :returns: ``klass._default_type`` when no extras are given, otherwise
            a new ``Type`` with the same ID and the extra fields appended.
        """
        ret = klass._default_type
        if extra:
            # NOTE(review): assumes Type.aspy() returns a fresh, mutable list
            # of field definitions - confirm against the wrapper module.
            L = ret.aspy()
            L.extend(extra)
            ret = Type(L, ret.getID())
        return ret

    _default_type = Type([
        ('value', ('U', None, [
            ('booleanValue', 'a?'),
            ('byteValue', 'ab'),
            ('shortValue', 'ah'),
            ('intValue', 'ai'),
            ('longValue', 'al'),
            ('ubyteValue', 'aB'),
            ('ushortValue', 'aH'),
            ('uintValue', 'aI'),
            ('ulongValue', 'aL'),
            ('floatValue', 'af'),
            ('doubleValue', 'ad'),
        ])),
        ('codec', ('S', 'codec_t', [
            ('name', 's'),
            ('parameters', 'v'),
        ])),
        ('compressedSize', 'l'),
        ('uncompressedSize', 'l'),
        ('uniqueId', 'i'),
        ('dataTimeStamp', timeStamp),
        ('alarm', alarm),
        ('timeStamp', timeStamp),
        ('dimension', ('aS', 'dimension_t', [
            ('size', 'i'),
            ('offset', 'i'),
            ('fullSize', 'i'),
            ('binning', 'i'),
            ('reverse', '?'),
        ])),
        ('attribute', ('aS', 'epics:nt/NTAttribute:1.0', [
            ('name', 's'),
            ('value', 'v'),
            ('tags', 'as'),
            ('descriptor', 's'),
            ('alarm', alarm),
            ('timeStamp', timeStamp),
            ('sourceType', 'i'),
            ('source', 's'),
        ])),
    ], id='epics:nt/NTNDArray:1.0')

    def __init__(self, **kws):
        """Create a wrapper; keyword arguments are passed to :py:meth:`buildType`."""
        self.type = self.buildType(**kws)

    def wrap(self, value, **kws):
        """Wrap numpy.ndarray as Value

        :param value: Array (or anything accepted by ``numpy.asarray``).  If it
            has a non-empty ``attrib`` attribute, that is used as the attribute
            dictionary.
        :param attrib: Optional keyword; attribute dictionary used when
            ``value`` carries none.  Always consumed here.
        :param kws: Remaining keywords are forwarded to ``self._annotate``.
        :returns: The result of ``self._annotate`` applied to the new ``Value``.
        :raises ValueError: For a 3-d array without ``ColorMode`` where no
            dimension has length 3.
        :raises KeyError: If the array's ``dtype.char`` has no entry in ``_code2u``.
        """
        # Pop unconditionally so that 'attrib' is never forwarded to
        # _annotate() when the array already carries its own attributes.
        kw_attrib = kws.pop('attrib', None)
        # Copy, so that inferring ColorMode below never modifies the caller's
        # dictionary (eg. the .attrib of the array being wrapped).
        attrib = dict(getattr(value, 'attrib', None) or kw_attrib or {})

        value = numpy.asarray(value)  # loses any special/augmented attributes
        dims = value.shape

        if 'ColorMode' not in attrib:
            # attempt to infer color mode from shape
            if value.ndim == 2:
                attrib['ColorMode'] = 0  # gray
            elif value.ndim == 3:
                for idx, dim in enumerate(reversed(dims)):  # inner-most sent as left
                    if dim == 3:  # assume it's a color
                        attrib['ColorMode'] = 2 + idx  # 2 - RGB1, 3 - RGB2, 4 - RGB3
                        # assume that the first is color, and any subsequent dim=3 is a thin ROI
                        break
                else:
                    raise ValueError("Unable to deduce color dimension from shape %r" % dims)

        dataSize = value.nbytes

        return self._annotate(Value(self.type, {
            # select the union member matching the array's element type
            'value': (self._code2u[value.dtype.char], value.flatten()),
            'compressedSize': dataSize,
            'uncompressedSize': dataSize,
            'uniqueId': 0,
            'attribute': [translateNDAttribute(K, V) for K, V in attrib.items()],
            'dimension': [{'size': N,
                           'offset': 0,
                           'fullSize': N,
                           'binning': 1,
                           'reverse': False} for N in reversed(dims)],
        }), **kws)

    @classmethod
    def unwrap(klass, value):
        """Unwrap Value as NTNDArray

        :param value: A ``Value`` with NTNDArray structure.
        :returns: A ``klass.ntndarray`` view of the pixel data with meta-data
            stored by its ``_store`` method.
        """
        V = value.value
        if V is None:
            # Union empty.  treat as zero-length char array
            V = numpy.zeros((0,), dtype=numpy.uint8)
        return V.view(klass.ntndarray)._store(value)

    def assign(self, V, py):
        """Store python value in Value

        :param V: The ``Value`` to be updated.
        :param py: Array-like, wrapped with :py:meth:`wrap` before being assigned.
        """
        V[None] = self.wrap(py)
def translateNDAttribute(name, value):
    """Build the dictionary describing one NTAttribute entry.

    :param name: Attribute name.
    :param value: Attribute value.  A ``Value`` containing a ``value`` field is
        assumed to be NT-like: its ``value`` is used as the attribute value and
        its ``alarm`` and ``timeStamp`` (when present) are carried over.  Any
        other fields are dropped.  Anything else is stored as given.
    :returns: dict with ``name`` and ``value`` keys, plus ``alarm`` and/or
        ``timeStamp`` for NT-like values which have them.
    """
    nt_like = isinstance(value, Value) and 'value' in value
    if not nt_like:
        return {'name': name, 'value': value}

    entry = {'name': name, 'value': value['value']}
    # Copy only the optional meta-data fields the source actually has.
    for field in ('alarm', 'timeStamp'):
        if field in value:
            entry[field] = value[field]
    return entry