# -*- coding: utf-8 -*-
""" asammdf *Signal* class module for time correct signal processing """
import numpy as np
import warnings
from .utils import MdfException
from .version import __version__
[docs]class Signal(object):
"""
The Signal represents a signal described by it's samples and timestamps.
It can do aritmethic operations agains other Signal or numeric type.
The operations are computed in respect to the timestamps (time correct).
The integer signals are not interpolated, instead the last value relative
to the current timestamp is used.
*samples*, *timstamps* and *name* are mandatory arguments.
Parameters
----------
samples : numpy.array | list | tuple
signal samples
timestamps : numpy.array | list | tuple
signal timestamps
unit : str
signal unit
name : str
signal name
info : dict
dict that contains extra information about the signal , default *None*
comment : str
signal comment, default ''
"""
__slots__ = [
'samples',
'timestamps',
'unit',
'name',
'info',
'comment',
'_plot_axis',
]
def __init__(self,
samples=None,
timestamps=None,
unit='',
name='',
info=None,
comment=''):
if samples is None or timestamps is None or name == '':
message = ('"samples", "timestamps" and "name" are mandatory '
'for Signal class __init__')
raise MdfException(message)
else:
if isinstance(samples, (list, tuple)):
samples = np.array(samples)
if isinstance(timestamps, (list, tuple)):
timestamps = np.array(timestamps, dtype=np.float64)
if not samples.shape[0] == timestamps.shape[0]:
message = 'samples and timestamps length missmatch ({} vs {})'
message = message.format(samples.shape[0], timestamps.shape[0])
raise MdfException(message)
self.samples = samples
self.timestamps = timestamps
self.unit = unit
self.name = name
self.info = info
self.comment = comment
self._plot_axis = None
def __str__(self):
string = """<Signal {}:
\tsamples={}
\ttimestamps={}
\tunit="{}"
\tinfo={}
\tcomment="{}">
"""
return string.format(self.name,
self.samples,
self.timestamps,
self.unit,
self.info,
self.comment)
def __repr__(self):
return str(self)
[docs] def plot(self):
""" plot Signal samples """
try:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import axes3d
from matplotlib.widgets import Slider
except ImportError:
warnings.warn("Signal plotting requires matplotlib")
return
if len(self.samples.shape) <= 1 and self.samples.dtype.names is None:
fig = plt.figure()
fig.canvas.set_window_title(self.name)
fig.text(0.95, 0.05, 'asammdf {}'.format(__version__),
fontsize=8, color='red',
ha='right', va='top', alpha=0.5)
if self.comment:
comment = self.comment.replace('$', '')
plt.title('{}\n({})'.format(self.name, comment))
else:
plt.title(self.name)
plt.xlabel('Time [s]')
plt.ylabel('[{}]'.format(self.unit))
plt.plot(self.timestamps, self.samples, 'b')
plt.plot(self.timestamps, self.samples, 'b.')
plt.grid(True)
plt.show()
else:
try:
names = self.samples.dtype.names
if self.samples.dtype.names is None or len(names) == 1:
if names:
samples = self.samples[names[0]]
else:
samples = self.samples
shape = samples.shape[1:]
fig = plt.figure()
fig.canvas.set_window_title(self.name)
fig.text(
0.95,
0.05,
'asammdf {}'.format(__version__),
fontsize=8,
color='red',
ha='right',
va='top',
alpha=0.5,
)
if self.comment:
comment = self.comment.replace('$', '')
plt.title('{}\n({})'.format(self.name, comment))
else:
plt.title(self.name)
ax = fig.add_subplot(111, projection='3d')
# Grab some test data.
X = np.array(range(shape[1]))
Y = np.array(range(shape[0]))
X, Y = np.meshgrid(X, Y)
Z = samples[0]
# Plot a basic wireframe.
self._plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1)
# Place Sliders on Graph
ax_a = plt.axes([0.25, 0.1, 0.65, 0.03])
# Create Sliders & Determine Range
sa = Slider(ax_a,
'Time [s]',
self.timestamps[0],
self.timestamps[-1],
valinit=self.timestamps[0])
def update(val):
self._plot_axis.remove()
idx = np.searchsorted(self.timestamps,
sa.val,
side='right')
Z = samples[idx-1]
self._plot_axis = ax.plot_wireframe(X,
Y,
Z,
rstride=1,
cstride=1)
fig.canvas.draw_idle()
sa.on_changed(update)
plt.show()
else:
fig = plt.figure()
fig.canvas.set_window_title(self.name)
fig.text(0.95, 0.05, 'asammdf {}'.format(__version__),
fontsize=8, color='red',
ha='right', va='top', alpha=0.5)
if self.comment:
comment = self.comment.replace('$', '')
plt.title('{}\n({})'.format(self.name, comment))
else:
plt.title(self.name)
ax = fig.add_subplot(111, projection='3d')
samples = self.samples[names[0]]
axis1 = self.samples[names[1]]
axis2 = self.samples[names[2]]
# Grab some test data.
X, Y = np.meshgrid(axis2[0], axis1[0])
Z = samples[0]
# Plot a basic wireframe.
self._plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1)
# Place Sliders on Graph
ax_a = plt.axes([0.25, 0.1, 0.65, 0.03])
# Create Sliders & Determine Range
sa = Slider(ax_a,
'Time [s]',
self.timestamps[0],
self.timestamps[-1],
valinit=self.timestamps[0])
def update(val):
self._plot_axis.remove()
idx = np.searchsorted(self.timestamps,
sa.val,
side='right')
Z = samples[idx-1]
X, Y = np.meshgrid(axis2[idx-1], axis1[idx-1])
self._plot_axis = ax.plot_wireframe(X,
Y,
Z,
rstride=1,
cstride=1)
fig.canvas.draw_idle()
sa.on_changed(update)
plt.show()
except Exception as err:
print(err)
[docs] def cut(self, start=None, stop=None):
"""
Cuts the signal according to the *start* and *stop* values, by using
the insertion indexes in the signal's *time* axis.
Parameters
----------
start : float
start timestamp for cutting
stop : float
stop timestamp for cutting
Returns
-------
result : Signal
new *Signal* cut from the original
Examples
--------
>>> new_sig = old_sig.cut(1.0, 10.5)
>>> new_sig.timestamps[0], new_sig.timestamps[-1]
0.98, 10.48
"""
if start is None and stop is None:
# return the channel uncut
result = self
else:
if start is None:
# cut from beggining to stop
stop = np.searchsorted(self.timestamps, stop, side='right')
if stop:
result = Signal(self.samples[: stop],
self.timestamps[:stop],
self.unit,
self.name,
self.info,
self.comment)
else:
result = Signal(np.array([]),
np.array([]),
self.unit,
self.name,
self.info,
self.comment)
elif stop is None:
# cut from start to end
start = np.searchsorted(self.timestamps, start, side='left')
result = Signal(self.samples[start: ],
self.timestamps[start: ],
self.unit,
self.name,
self.info,
self.comment)
else:
# cut between start and stop
start_ = np.searchsorted(self.timestamps, start, side='left')
stop_ = np.searchsorted(self.timestamps, stop, side='right')
if stop_ == start_:
if len(self.timestamps) and \
stop >= self.timestamps[0] and \
start <= self.timestamps[-1]:
# start and stop are found between 2 signal samples
# so return the previous sample
result = Signal(self.samples[start_: start_ + 1],
self.timestamps[start_: start_ + 1],
self.unit,
self.name,
self.info,
self.comment)
else:
# signal is empty or start and stop are outside the
# signal time base
result = Signal(np.array([]),
np.array([]),
self.unit,
self.name,
self.info,
self.comment)
else:
result = Signal(self.samples[start_: stop_],
self.timestamps[start_: stop_],
self.unit,
self.name,
self.info,
self.comment)
return result
[docs] def extend(self, other):
""" extend signal with samples from another signal
Parameters
----------
other : Signal
"""
if len(self.timestamps):
last_stamp = self.timestamps[-1]
delta = last_stamp / len(self) + last_stamp
else:
last_stamp = 0
delta = 0
if len(other):
other_first_sample = other.timestamps[0]
if last_stamp >= other_first_sample:
timestamps = other.timestamps + delta - other_first_sample
else:
timestamps = other.timestamps
result = Signal(np.append(self.samples, other.samples),
np.append(self.timestamps, timestamps),
self.unit,
self.name,
self.info,
self.comment)
else:
result = self
return result
[docs] def interp(self, new_timestamps):
""" returns a new *Signal* interpolated using the *new_timestamps* """
if self.samples.dtype.kind == 'f':
s = np.interp(new_timestamps, self.timestamps, self.samples)
else:
idx = np.searchsorted(self.timestamps,
new_timestamps,
side='right')
idx -= 1
idx = np.clip(idx, 0, idx[-1])
s = self.samples[idx]
return Signal(s, new_timestamps, self.unit, self.name, self.info)
def __apply_func(self, other, func_name):
if isinstance(other, Signal):
time = np.union1d(self.timestamps, other.timestamps)
s = self.interp(time).samples
o = other.interp(time).samples
func = getattr(s, func_name)
s = func(o)
elif other is None:
s = self.samples
time = self.timestamps
else:
func = getattr(self.samples, func_name)
s = func(other)
time = self.timestamps
return Signal(s,
time,
self.unit,
self.name,
self.info)
def __pos__(self):
return self
def __neg__(self):
return Signal(np.negative(self.samples),
self.timestamps,
self.unit,
self.name,
self.info)
def __round__(self, n):
return Signal(np.around(self.samples, n),
self.timestamps,
self.unit,
self.name,
self.info)
def __sub__(self, other):
return self.__apply_func(other, '__sub__')
def __isub__(self, other):
return self.__sub__(other)
def __rsub__(self, other):
return -self.__sub__(other)
def __add__(self, other):
return self.__apply_func(other, '__add__')
def __iadd__(self, other):
return self.__add__(other)
def __radd__(self, other):
return self.__add__(other)
def __mul__(self, other):
return self.__apply_func(other, '__mul__')
def __imul__(self, other):
return self.__mul__(other)
def __rmul__(self, other):
return self.__mul__(other)
def __truediv__(self, other):
return self.__apply_func(other, '__truediv__')
def __itruediv__(self, other):
return self.__truediv__(other)
def __rtruediv__(self, other):
return self.__apply_func(other, '__rtruediv__')
def __mod__(self, other):
return self.__apply_func(other, '__mod__')
def __pow__(self, other):
return self.__apply_func(other, '__pow__')
def __and__(self, other):
return self.__apply_func(other, '__and__')
def __or__(self, other):
return self.__apply_func(other, '__or__')
def __xor__(self, other):
return self.__apply_func(other, '__xor__')
def __invert__(self):
s = ~self.samples
time = self.timestamps
return Signal(s,
time,
self.unit,
self.name,
self.info)
def __lshift__(self, other):
return self.__apply_func(other, '__lshift__')
def __rshift__(self, other):
return self.__apply_func(other, '__rshift__')
def __lt__(self, other):
return self.__apply_func(other, '__lt__')
def __le__(self, other):
return self.__apply_func(other, '__le__')
def __gt__(self, other):
return self.__apply_func(other, '__gt__')
def __ge__(self, other):
return self.__apply_func(other, '__ge__')
def __eq__(self, other):
return self.__apply_func(other, '__eq__')
def __ne__(self, other):
return self.__apply_func(other, '__ne__')
def __iter__(self):
return zip(self.samples, self.timestamps)
def __reversed__(self):
return enumerate(zip(reversed(self.samples), reversed(self.timestamps)))
def __len__(self):
return len(self.samples)
def __abs__(self):
return Signal(np.fabs(self.samples),
self.timestamps,
self.unit,
self.name,
self.info)
def __getitem__(self, val):
return self.samples[val]
def __setitem__(self, idx, val):
self.samples[idx] = val
[docs] def astype(self, np_type):
""" returns new *Signal* with samples of dtype *np_type*"""
return Signal(self.samples.astype(np_type),
self.timestamps,
self.unit,
self.name,
self.info)
if __name__ == '__main__':
pass