Source code for p4p.nt.scalar


import time
import sys
import numpy

from ..wrapper import Type, Value
from .common import alarm, timeStamp, NTBase

# Python 3 has no separate ``unicode`` builtin, so alias the name to ``str``
# to let the rest of this module refer to ``unicode`` on either major version.
if sys.version_info[0] >= 3:
    unicode = str


class ntwrappercommon(object):
    """Mix-in which attaches alarm and time stamp meta-data to a python value.

    ``raw`` and ``timestamp`` are ``None`` at class level, so they read as
    ``None`` on an instance until :py:meth:`_store` has been called.
    ``severity``, ``status`` and ``raw_stamp`` only exist after ``_store()``.
    """
    raw = None
    timestamp = None

    def _store(self, value):
        """Remember *value* and unpack its alarm and time stamp fields.

        Each field is read with ``value.get(name, 0)``, i.e. 0 is supplied
        as the fallback.

        :param Value value: The Value this object was created from.
        :returns: ``self``, so that the call can be chained onto a constructor.
        """
        assert isinstance(value, Value), value
        self.raw = value
        self.severity = value.get('alarm.severity', 0)
        self.status = value.get('alarm.status', 0)
        seconds = value.get('timeStamp.secondsPastEpoch', 0)
        nanoseconds = value.get('timeStamp.nanoseconds', 0)
        self.raw_stamp = (seconds, nanoseconds)
        # Single float: whole seconds plus the nanosecond part scaled to seconds.
        self.timestamp = seconds + nanoseconds * 1e-9
        # TODO: unpack display/control
        return self

    def __str__(self):
        """Return ``time.ctime()`` of the time stamp followed by the repr."""
        when = time.ctime(self.timestamp)
        what = self.__repr__()
        return '%s %s' % (when, what)

    # Same function, available under an explicit method name.
    tostr = __str__

class ntfloat(ntwrappercommon, float):
    """A ``float`` which also carries the :py:class:`ntwrappercommon` meta-data.

    Additional attributes:

    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.
    """
class ntint(ntwrappercommon, int):
    """An ``int`` which also carries the :py:class:`ntwrappercommon` meta-data.

    Additional attributes:

    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.
    """
class ntbool(ntwrappercommon, int):
    """A boolean stored as an ``int`` which also carries the
    :py:class:`ntwrappercommon` meta-data.

    Additional attributes:

    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.
    """

    def __new__(cls, value):
        # Reduce any input to its truth value before storing it as an int.
        truth = bool(value)
        return int.__new__(cls, truth)

    def __repr__(self):
        # Lower-case spelling of the truth value.
        return 'true' if self else 'false'
class ntstr(ntwrappercommon, unicode):
    """A text string which also carries the :py:class:`ntwrappercommon` meta-data.

    Additional attributes:

    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.
    """
class ntnumericarray(ntwrappercommon, numpy.ndarray):
    """A ``numpy.ndarray`` which also carries the :py:class:`ntwrappercommon`
    meta-data.

    Additional attributes:

    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.
    """

    @classmethod
    def build(klass, val):
        """Create an instance of *klass* from the one dimensional array *val*.

        The new object takes its shape, dtype and strides from *val* and is
        constructed over ``val.data``.
        """
        shape = val.shape
        assert len(shape) == 1, shape
        # NOTE(review): the original comment here read "clone", but the
        # buffer of ``val`` is handed to the constructor, so presumably the
        # memory is shared rather than copied -- confirm.
        return klass(
            shape=shape,
            dtype=val.dtype,
            buffer=val.data,
            strides=val.strides,
        )
class ntstringarray(ntwrappercommon, list):
    """A ``list`` of strings which also carries the :py:class:`ntwrappercommon`
    meta-data.

    Additional attributes:

    * .severity
    * .status
    * .timestamp - Seconds since 1 Jan 1970 UTC as a float
    * .raw_stamp - A tuple (seconds, nanoseconds)
    * .raw - The underlying :py:class:`p4p.Value`.
    """
def _metaHelper(F, valtype, display=False, control=False, valueAlarm=False, form=False): isnumeric = valtype[-1:] not in '?su' if display and isnumeric: F.extend([ ('display', ('S', None, [ ('limitLow', valtype[-1:]), ('limitHigh', valtype[-1:]), ('description', 's'), ('precision', 'i'), ('form', ('S', 'enum_t', [ ('index', 'i'), ('choices', 'as'), ])), ('units', 's'), ] if form else [ ('limitLow', valtype[-1:]), ('limitHigh', valtype[-1:]), ('description', 's'), ('format', 's'), ('units', 's'), ])), ]) elif display and not isnumeric: F.extend([ ('display', ('S', None, [ ('description', 's'), ('units', 's'), ])), ]) if control and isnumeric: F.extend([ ('control', ('S', None, [ ('limitLow', valtype[-1:]), ('limitHigh', valtype[-1:]), ('minStep', valtype[-1:]), ])), ]) if valueAlarm and isnumeric: F.extend([ ('valueAlarm', ('S', None, [ ('active', '?'), ('lowAlarmLimit', valtype[-1:]), ('lowWarningLimit', valtype[-1:]), ('highWarningLimit', valtype[-1:]), ('highAlarmLimit', valtype[-1:]), ('lowAlarmSeverity', 'i'), ('lowWarningSeverity', 'i'), ('highWarningSeverity', 'i'), ('highAlarmSeverity', 'i'), ('hysteresis', 'd'), ])), ])
class NTScalar(NTBase):
    """Describes a single scalar or array of scalar values and associated meta-data

    >>> stype = NTScalar('d') # scalar double
    >>> V = stype.wrap(4.2)
    >>> assert isinstance(V, Value)

    >>> stype = NTScalar.buildType('ad') # vector double
    >>> V = Value(stype, {'value': [4.2, 4.3]})

    The result of `unwrap()` is an augmented value object combining
    `ntwrappercommon` and a python value type (`str`, `int`, `float`, `numpy.ndarray`).

    Augmented values have some additional attributes including:

    * .timestamp - The update timestamp is a float representing seconds since 1 jan 1970 UTC.
    * .raw_stamp - A tuple of (seconds, nanoseconds)
    * .severity - An integer in the range [0, 3]
    * .raw - The complete underlying :class:`~p4p.Value`

    :param str valtype: A type code to be used with the 'value' field.  See :ref:`valuecodes`
    :param list extra: A list of tuples describing additional non-standard fields
    :param bool display: Include optional fields for display meta-data
    :param bool control: Include optional fields for control meta-data
    :param bool valueAlarm: Include optional fields for alarm level meta-data
    :param bool form: Include ``display.form`` instead of the deprecated ``display.format``.
    """
    # Class used by wrap() to construct new values (looked up as self.Value).
    Value = Value

    @staticmethod
    def buildType(valtype, extra=None, *args, **kws):
        """Build a Type

        :param str valtype: A type code to be used with the 'value' field.  See :ref:`valuecodes`
        :param list extra: A list of tuples describing additional non-standard fields
        :param bool display: Include optional fields for display meta-data
        :param bool control: Include optional fields for control meta-data
        :param bool valueAlarm: Include optional fields for alarm level meta-data
        :param bool form: Include ``display.form`` instead of the deprecated ``display.format``.
        :returns: A :py:class:`Type`
        """
        # A leading 'a' in the type code selects the array flavour of the type ID.
        isarray = valtype[:1] == 'a'
        F = [
            ('value', valtype),
            ('alarm', alarm),
            ('timeStamp', timeStamp),
        ]
        # Remaining positional/keyword arguments select the optional structures.
        _metaHelper(F, valtype, *args, **kws)
        # ``None`` (the default) means no extra fields.  A sentinel is used
        # rather than a shared mutable ``[]`` default.
        if extra is not None:
            F.extend(extra)
        return Type(id="epics:nt/NTScalarArray:1.0" if isarray else "epics:nt/NTScalar:1.0",
                    spec=F)

    def __init__(self, valtype='d', **kws):
        """Build and store the Type for *valtype*.

        Keyword arguments are forwarded to :py:meth:`buildType`.
        """
        self.type = self.buildType(valtype, **kws)

    def wrap(self, value, **kws):
        """Pack python value into Value

        Accepts dict to explicitly initialize fields by name.
        Any other type is assigned to the 'value' field.

        A Value is passed through unchanged.  For an augmented value
        (`ntwrappercommon`) its ``.raw`` Value is used, and its ``.timestamp``
        becomes the default for the 'timestamp' keyword.

        The result, together with the remaining keywords, is handed to
        ``self._annotate()``, which is not defined in this class
        (presumably inherited from NTBase).
        """
        if isinstance(value, Value):
            pass
        elif isinstance(value, ntwrappercommon):
            # An explicitly passed timestamp= still takes precedence.
            kws.setdefault('timestamp', value.timestamp)
            value = value.raw
        elif isinstance(value, dict):
            value = self.Value(self.type, value)
        else:
            value = self.Value(self.type, {'value': value})
        return self._annotate(value, **kws)

    # Maps the exact python type of the 'value' field to the callable which
    # builds the matching augmented object in unwrap().
    typeMap = {
        bool: ntbool,
        int: ntint,
        float: ntfloat,
        unicode: ntstr,
        numpy.ndarray: ntnumericarray.build,
        list: ntstringarray,
    }

    @classmethod
    def unwrap(klass, value):
        """Unpack a Value into an augmented python type (selected from the 'value' field)

        :param Value value: The Value to unpack.
        :returns: An augmented object on which ``_store(value)`` has been called.
        :raises ValueError: If the type of the 'value' field is not in
            ``typeMap``, or if building the augmented object fails.
        """
        assert isinstance(value, Value), value
        V = value.value
        try:
            T = klass.typeMap[type(V)]
        except KeyError:
            raise ValueError("Can't unwrap value of type %s" % type(V))
        try:
            # Reuse the field value fetched above instead of reading it again.
            return T(V)._store(value)
        except Exception as e:
            raise ValueError("Can't construct %s around %s (%s): %s" % (T, value, type(value), e))

    def assign(self, V, py):
        """Store python value in Value
        """
        V.value = py
# Python 2 only: values of the separate ``long`` integer type get their own
# augmented class, registered with NTScalar.typeMap.
if sys.version_info[0] < 3:
    class ntlong(ntwrappercommon, long):
        """A ``long`` which also carries the ntwrappercommon meta-data."""

    NTScalar.typeMap[long] = ntlong