Source code for asammdf.signal

# -*- coding: utf-8 -*-
""" asammdf *Signal* class module for time correct signal processing """

from __future__ import annotations

from collections.abc import Iterator
import logging
from textwrap import fill
from traceback import format_exc
from typing import Any

import numpy as np
from numpy.core.defchararray import encode
from numpy.typing import ArrayLike, DTypeLike, NDArray

from .blocks import v2_v3_blocks as v3b
from .blocks import v4_blocks as v4b
from .blocks.conversion_utils import from_dict
from .blocks.options import FloatInterpolation, IntegerInterpolation
from .blocks.source_utils import Source
from .blocks.utils import extract_xml_comment, MdfException, SignalFlags
from .types import (
    ChannelConversionType,
    FloatInterpolationModeType,
    IntInterpolationModeType,
    SourceType,
    SyncType,
)
from .version import __version__

logger = logging.getLogger("asammdf")


[docs]class Signal(object): """ The *Signal* represents a channel described by it's samples and timestamps. It can perform arithmetic operations against other *Signal* or numeric types. The operations are computed in respect to the timestamps (time correct). The non-float signals are not interpolated, instead the last value relative to the current timestamp is used. *samples*, *timestamps* and *name* are mandatory arguments. Parameters ---------- samples : numpy.array | list | tuple signal samples timestamps : numpy.array | list | tuple signal timestamps unit : str signal unit name : str signal name conversion : dict | channel conversion block dict that contains extra conversion information about the signal , default *None* comment : str signal comment, default '' raw : bool signal samples are raw values, with no physical conversion applied master_metadata : list master name and sync type display_names : dict display names used by mdf version 3 attachment : bytes, name channel attachment and name from MDF version 4 source : Source source information named tuple bit_count : int bit count; useful for integer channels invalidation_bits : numpy.array | None channel invalidation bits, default *None* encoding : str | None encoding for string signals; default *None* flags : Signal.Flags flags for user defined attributes and stream sync """ Flags = SignalFlags def __init__( self, samples: ArrayLike | None = None, timestamps: ArrayLike | None = None, unit: str = "", name: str = "", conversion: dict[str, Any] | ChannelConversionType | None = None, comment: str = "", raw: bool = True, master_metadata: tuple[str, SyncType] | None = None, display_names: dict[str, str] | None = None, attachment: tuple[bytes, str | None, str | None] | None = None, source: SourceType | None = None, bit_count: int | None = None, invalidation_bits: ArrayLike | None = None, encoding: str | None = None, group_index: int = -1, channel_index: int = -1, flags: Flags = Flags.no_flags, ) -> None: if samples is None or timestamps is None or not name: message = ( '"samples", "timestamps" and "name" are mandatory ' f"for Signal class __init__: samples={samples}\n" f"timestamps={timestamps}\nname={name}" ) raise MdfException(message) else: if not isinstance(samples, np.ndarray): samples = np.array(samples) if samples.dtype.kind == "U": if encoding is None: encodings = ["utf-8", "latin-1"] else: encodings = [encoding, "utf-8", "latin-1"] for encoding in encodings: try: samples = encode(samples, encoding) break except: continue else: samples = encode(samples, encodings[0], errors="ignore") if not isinstance(timestamps, np.ndarray): timestamps = np.array(timestamps, dtype=np.float64) if samples.shape[0] != timestamps.shape[0]: message = "{} samples and timestamps length mismatch ({} vs {})" message = message.format(name, samples.shape[0], timestamps.shape[0]) logger.exception(message) raise MdfException(message) self.samples = samples self.timestamps = timestamps self.unit = unit self.name = name self.comment = comment self.flags = flags self._plot_axis = None self.raw = raw self.master_metadata = master_metadata self.display_names = display_names or {} self.attachment = attachment self.encoding = encoding self.group_index = group_index self.channel_index = channel_index if source: if not isinstance(source, Source): source = Source.from_source(source) self.source = source if bit_count is None: self.bit_count = samples.dtype.itemsize * 8 else: self.bit_count = bit_count if invalidation_bits is not None: if not isinstance(invalidation_bits, np.ndarray): invalidation_bits = np.array(invalidation_bits) if invalidation_bits.shape[0] != samples.shape[0]: message = ( "{} samples and invalidation bits length mismatch ({} vs {})" ) message = message.format( name, samples.shape[0], invalidation_bits.shape[0] ) logger.exception(message) raise MdfException(message) self.invalidation_bits = invalidation_bits if conversion: if not isinstance( conversion, (v4b.ChannelConversion, v3b.ChannelConversion) ): conversion = from_dict(conversion) self.conversion = conversion def __repr__(self): return f"""<Signal {self.name}: \tsamples={self.samples} \ttimestamps={self.timestamps} \tinvalidation_bits={self.invalidation_bits} \tunit="{self.unit}" \tconversion={self.conversion} \tsource={self.source} \tcomment="{self.comment}" \tflags="{self.flags}" \tmastermeta="{self.master_metadata}" \traw={self.raw} \tdisplay_names={self.display_names} \tattachment={self.attachment}> """
[docs] def plot(self, validate: bool = True, index_only: bool = False) -> None: """plot Signal samples. Pyqtgraph is used if it is available; in this case see the GUI plot documentation to see the available commands Parameters ---------- validate (True): bool apply the invalidation bits index_only (False) : bool use index based X axis. This can be useful if the master (usually time based) is corrupted with NaN, inf or if it is not strictly increasing """ try: from .gui.plot import plot plot(self, validate=True, index_only=False) return except: print(format_exc()) try: import matplotlib.pyplot as plt from matplotlib.widgets import Slider except ImportError: logging.warning("Signal plotting requires pyqtgraph or matplotlib") return if len(self.samples.shape) <= 1 and self.samples.dtype.names is None: fig = plt.figure() fig.canvas.manager.set_window_title(self.name) fig.text( 0.95, 0.05, f"asammdf {__version__}", fontsize=8, color="red", ha="right", va="top", alpha=0.5, ) name = self.name if self.comment: comment = self.comment.replace("$", "") comment = extract_xml_comment(comment) comment = fill(comment, 120).replace("\\n", " ") title = f"{name}\n({comment})" plt.title(title) else: plt.title(name) try: if not self.master_metadata: plt.xlabel("Time [s]") plt.ylabel(f"[{self.unit}]") plt.plot(self.timestamps, self.samples, "b") plt.plot(self.timestamps, self.samples, "b.") plt.grid(True) plt.show() else: master_name, sync_type = self.master_metadata if sync_type in (0, 1): plt.xlabel(f"{master_name} [s]") elif sync_type == 2: plt.xlabel(f"{master_name} [deg]") elif sync_type == 3: plt.xlabel(f"{master_name} [m]") elif sync_type == 4: plt.xlabel(f"{master_name} [index]") plt.ylabel(f"[{self.unit}]") plt.plot(self.timestamps, self.samples, "b") plt.plot(self.timestamps, self.samples, "b.") plt.grid(True) plt.show() except ValueError: plt.close(fig) else: try: names = self.samples.dtype.names if self.samples.dtype.names is None or len(names) == 1: if names: samples = self.samples[names[0]] else: samples = self.samples shape = samples.shape[1:] fig = plt.figure() fig.canvas.set_window_title(self.name) fig.text( 0.95, 0.05, f"asammdf {__version__}", fontsize=8, color="red", ha="right", va="top", alpha=0.5, ) if self.comment: comment = self.comment.replace("$", "") plt.title(f"{self.name}\n({comment})") else: plt.title(self.name) ax = fig.add_subplot(111, projection="3d") # Grab some test data. X = np.array(range(shape[1])) Y = np.array(range(shape[0])) X, Y = np.meshgrid(X, Y) Z = samples[0] # Plot a basic wireframe. self._plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1) # Place Sliders on Graph ax_a = plt.axes([0.25, 0.1, 0.65, 0.03]) # Create Sliders & Determine Range sa = Slider( ax_a, "Time [s]", self.timestamps[0], self.timestamps[-1], valinit=self.timestamps[0], ) def update(val): self._plot_axis.remove() idx = np.searchsorted(self.timestamps, sa.val, side="right") Z = samples[idx - 1] self._plot_axis = ax.plot_wireframe( X, Y, Z, rstride=1, cstride=1 ) fig.canvas.draw_idle() sa.on_changed(update) plt.show() else: fig = plt.figure() fig.canvas.set_window_title(self.name) fig.text( 0.95, 0.05, f"asammdf {__version__}", fontsize=8, color="red", ha="right", va="top", alpha=0.5, ) if self.comment: comment = self.comment.replace("$", "") plt.title(f"{self.name}\n({comment})") else: plt.title(self.name) ax = fig.add_subplot(111, projection="3d") samples = self.samples[names[0]] axis1 = self.samples[names[1]] axis2 = self.samples[names[2]] # Grab some test data. X, Y = np.meshgrid(axis2[0], axis1[0]) Z = samples[0] # Plot a basic wireframe. self._plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1) # Place Sliders on Graph ax_a = plt.axes([0.25, 0.1, 0.65, 0.03]) # Create Sliders & Determine Range sa = Slider( ax_a, "Time [s]", self.timestamps[0], self.timestamps[-1], valinit=self.timestamps[0], ) def update(val): self._plot_axis.remove() idx = np.searchsorted(self.timestamps, sa.val, side="right") Z = samples[idx - 1] X, Y = np.meshgrid(axis2[idx - 1], axis1[idx - 1]) self._plot_axis = ax.plot_wireframe( X, Y, Z, rstride=1, cstride=1 ) fig.canvas.draw_idle() sa.on_changed(update) plt.show() except Exception as err: print(err)
[docs] def cut( self, start: float | None = None, stop: float | None = None, include_ends: bool = True, integer_interpolation_mode: IntInterpolationModeType | IntegerInterpolation = IntegerInterpolation.REPEAT_PREVIOUS_SAMPLE, float_interpolation_mode: FloatInterpolationModeType | FloatInterpolation = FloatInterpolation.LINEAR_INTERPOLATION, ) -> Signal: """ Cuts the signal according to the *start* and *stop* values, by using the insertion indexes in the signal's *time* axis. Parameters ---------- start : float start timestamp for cutting stop : float stop timestamp for cutting include_ends : bool include the *start* and *stop* timestamps after cutting the signal. If *start* and *stop* are found in the original timestamps, then the new samples will be computed using interpolation. Default *True* integer_interpolation_mode : int interpolation mode for integer signals; default 0 * 0 - repeat previous samples * 1 - linear interpolation * 2 - hybrid interpolation: channels with integer data type (raw values) that have a conversion that outputs float values will use linear interpolation, otherwise the previous sample is used .. versionadded:: 6.2.0 float_interpolation_mode : int interpolation mode for float channels; default 1 * 0 - repeat previous sample * 1 - use linear interpolation .. versionadded:: 6.2.0 Returns ------- result : Signal new *Signal* cut from the original Examples -------- >>> new_sig = old_sig.cut(1.0, 10.5) >>> new_sig.timestamps[0], new_sig.timestamps[-1] 0.98, 10.48 """ integer_interpolation_mode = IntegerInterpolation(integer_interpolation_mode) float_interpolation_mode = FloatInterpolation(float_interpolation_mode) original_start, original_stop = (start, stop) if self.samples.size == 0: result = Signal( np.array([], dtype=self.samples.dtype), np.array([], dtype=self.timestamps.dtype), self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) elif start is None and stop is None: # return the channel uncut result = Signal( self.samples.copy(), self.timestamps.copy(), self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, invalidation_bits=self.invalidation_bits.copy() if self.invalidation_bits is not None else None, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) else: if start is None: # cut from begining to stop if stop < self.timestamps[0]: result = Signal( np.array([], dtype=self.samples.dtype), np.array([], dtype=self.timestamps.dtype), self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) else: stop = np.searchsorted(self.timestamps, stop, side="right") if ( include_ends and original_stop not in self.timestamps and original_stop < self.timestamps[-1] ): interpolated = self.interp( [original_stop], integer_interpolation_mode=integer_interpolation_mode, float_interpolation_mode=float_interpolation_mode, ) if len(interpolated): samples = np.append( self.samples[:stop], interpolated.samples, axis=0 ) timestamps = np.append( self.timestamps[:stop], interpolated.timestamps ) if self.invalidation_bits is not None: invalidation_bits = np.append( self.invalidation_bits[:stop], interpolated.invalidation_bits, ) else: invalidation_bits = None else: samples = self.samples[:stop].copy() timestamps = self.timestamps[:stop].copy() if self.invalidation_bits is not None: invalidation_bits = self.invalidation_bits[:stop].copy() else: invalidation_bits = None if samples.dtype != self.samples.dtype: samples = samples.astype(self.samples.dtype) result = Signal( samples, timestamps, self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, invalidation_bits=invalidation_bits, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) elif stop is None: # cut from start to end if start > self.timestamps[-1]: result = Signal( np.array([], dtype=self.samples.dtype), np.array([], dtype=self.timestamps.dtype), self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) else: start = np.searchsorted(self.timestamps, start, side="left") if ( include_ends and original_start not in self.timestamps and original_start > self.timestamps[0] ): interpolated = self.interp( [original_start], integer_interpolation_mode=integer_interpolation_mode, float_interpolation_mode=float_interpolation_mode, ) if len(interpolated): samples = np.append( interpolated.samples, self.samples[start:], axis=0 ) timestamps = np.append( interpolated.timestamps, self.timestamps[start:] ) if self.invalidation_bits is not None: invalidation_bits = np.append( interpolated.invalidation_bits, self.invalidation_bits[start:], ) else: invalidation_bits = None else: samples = self.samples[start:].copy() timestamps = self.timestamps[start:].copy() if self.invalidation_bits is not None: invalidation_bits = self.invalidation_bits[start:].copy() else: invalidation_bits = None if samples.dtype != self.samples.dtype: samples = samples.astype(self.samples.dtype) result = Signal( samples, timestamps, self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, invalidation_bits=invalidation_bits, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) else: # cut between start and stop if start > self.timestamps[-1] or stop < self.timestamps[0]: result = Signal( np.array([], dtype=self.samples.dtype), np.array([], dtype=self.timestamps.dtype), self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) else: start = np.searchsorted(self.timestamps, start, side="left") stop = np.searchsorted(self.timestamps, stop, side="right") if start == stop: if include_ends: if original_start == original_stop: ends = np.array( [original_start], dtype=self.timestamps.dtype ) else: ends = np.array( [original_start, original_stop], dtype=self.timestamps.dtype, ) interpolated = self.interp( ends, integer_interpolation_mode=integer_interpolation_mode, float_interpolation_mode=float_interpolation_mode, ) samples = interpolated.samples timestamps = ends invalidation_bits = interpolated.invalidation_bits else: samples = np.array([], dtype=self.samples.dtype) timestamps = np.array([], dtype=self.timestamps.dtype) if self.invalidation_bits is not None: invalidation_bits = np.array([], dtype=bool) else: invalidation_bits = None else: samples = self.samples[start:stop].copy() timestamps = self.timestamps[start:stop].copy() if self.invalidation_bits is not None: invalidation_bits = self.invalidation_bits[ start:stop ].copy() else: invalidation_bits = None if ( include_ends and original_stop not in self.timestamps and original_stop < self.timestamps[-1] ): interpolated = self.interp( [original_stop], integer_interpolation_mode=integer_interpolation_mode, float_interpolation_mode=float_interpolation_mode, ) if len(interpolated): samples = np.append( samples, interpolated.samples, axis=0 ) timestamps = np.append( timestamps, interpolated.timestamps ) if invalidation_bits is not None: invalidation_bits = np.append( invalidation_bits, interpolated.invalidation_bits, ) if ( include_ends and original_start not in self.timestamps and original_start > self.timestamps[0] ): interpolated = self.interp( [original_start], integer_interpolation_mode=integer_interpolation_mode, float_interpolation_mode=float_interpolation_mode, ) if len(interpolated): samples = np.append( interpolated.samples, samples, axis=0 ) timestamps = np.append( interpolated.timestamps, timestamps ) if invalidation_bits is not None: invalidation_bits = np.append( interpolated.invalidation_bits, invalidation_bits, ) if samples.dtype != self.samples.dtype: samples = samples.astype(self.samples.dtype) result = Signal( samples, timestamps, self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, invalidation_bits=invalidation_bits, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) return result
[docs] def extend(self, other: Signal) -> Signal: """extend signal with samples from another signal Parameters ---------- other : Signal Returns ------- signal : Signal new extended *Signal* """ if len(self.timestamps): last_stamp = self.timestamps[-1] else: last_stamp = 0 if len(other): other_first_sample = other.timestamps[0] if last_stamp >= other_first_sample: timestamps = other.timestamps + last_stamp else: timestamps = other.timestamps if self.invalidation_bits is None and other.invalidation_bits is None: invalidation_bits = None elif self.invalidation_bits is None and other.invalidation_bits is not None: invalidation_bits = np.concatenate( (np.zeros(len(self), dtype=bool), other.invalidation_bits) ) elif self.invalidation_bits is not None and other.invalidation_bits is None: invalidation_bits = np.concatenate( (self.invalidation_bits, np.zeros(len(other), dtype=bool)) ) else: invalidation_bits = np.append( self.invalidation_bits, other.invalidation_bits ) result = Signal( np.append(self.samples, other.samples, axis=0), np.append(self.timestamps, timestamps), self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, invalidation_bits=invalidation_bits, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) else: result = self return result
[docs] def interp( self, new_timestamps: NDArray[Any], integer_interpolation_mode: IntInterpolationModeType | IntegerInterpolation = IntegerInterpolation.REPEAT_PREVIOUS_SAMPLE, float_interpolation_mode: FloatInterpolationModeType | FloatInterpolation = FloatInterpolation.LINEAR_INTERPOLATION, ) -> Signal: """returns a new *Signal* interpolated using the *new_timestamps* Parameters ---------- new_timestamps : np.array timestamps used for interpolation integer_interpolation_mode : int interpolation mode for integer signals; default 0 * 0 - repeat previous samples * 1 - linear interpolation * 2 - hybrid interpolation: channels with integer data type (raw values) that have a conversion that outputs float values will use linear interpolation, otherwise the previous sample is used .. versionadded:: 6.2.0 float_interpolation_mode : int interpolation mode for float channels; default 1 * 0 - repeat previous sample * 1 - use linear interpolation .. versionadded:: 6.2.0 Returns ------- signal : Signal new interpolated *Signal* """ integer_interpolation_mode = IntegerInterpolation(integer_interpolation_mode) float_interpolation_mode = FloatInterpolation(float_interpolation_mode) if not len(self.samples) or not len(new_timestamps): return Signal( self.samples[:0].copy(), self.timestamps[:0].copy(), self.unit, self.name, comment=self.comment, conversion=self.conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=None, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) else: # # we need to validate first otherwise we can get false invalid data # # if the new timebase and the invalidation bits are aligned in an # # infavorable way # # if self.invalidation_bits is not None: # signal = self.validate() # has_invalidation = True # else: # signal = self # has_invalidation = False signal = self invalidation_bits = signal.invalidation_bits if not len(signal.samples): return Signal( self.samples[:0].copy(), self.timestamps[:0].copy(), self.unit, self.name, comment=self.comment, conversion=self.conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=None if invalidation_bits is None else np.array([], dtype=bool), encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) if len(signal.samples.shape) > 1: idx = np.searchsorted(signal.timestamps, new_timestamps, side="right") idx -= 1 idx[idx < 0] = 0 s = signal.samples[idx] if invalidation_bits is not None: invalidation_bits = invalidation_bits[idx] else: kind = signal.samples.dtype.kind if kind == "f": if ( float_interpolation_mode == FloatInterpolation.REPEAT_PREVIOUS_SAMPLE ): idx = np.searchsorted( signal.timestamps, new_timestamps, side="right" ) idx -= 1 idx[idx < 0] = 0 s = signal.samples[idx] if invalidation_bits is not None: invalidation_bits = invalidation_bits[idx] else: s = np.interp(new_timestamps, signal.timestamps, signal.samples) if invalidation_bits is not None: idx = np.searchsorted( signal.timestamps, new_timestamps, side="right" ) idx -= 1 idx[idx < 0] = 0 invalidation_bits = invalidation_bits[idx] elif kind in "ui": if ( integer_interpolation_mode == IntegerInterpolation.HYBRID_INTERPOLATION ): if signal.raw and signal.conversion: kind = signal.conversion.convert( signal.samples[:1] ).dtype.kind if kind == "f": integer_interpolation_mode = ( IntegerInterpolation.LINEAR_INTERPOLATION ) if ( integer_interpolation_mode == IntegerInterpolation.HYBRID_INTERPOLATION ): integer_interpolation_mode = ( IntegerInterpolation.REPEAT_PREVIOUS_SAMPLE ) if ( integer_interpolation_mode == IntegerInterpolation.LINEAR_INTERPOLATION ): s = np.interp( new_timestamps, signal.timestamps, signal.samples ).astype(signal.samples.dtype) if invalidation_bits is not None: idx = np.searchsorted( signal.timestamps, new_timestamps, side="right" ) idx -= 1 idx[idx < 0] = 0 invalidation_bits = invalidation_bits[idx] elif ( integer_interpolation_mode == IntegerInterpolation.REPEAT_PREVIOUS_SAMPLE ): idx = np.searchsorted( signal.timestamps, new_timestamps, side="right" ) idx -= 1 idx[idx < 0] = 0 s = signal.samples[idx] if invalidation_bits is not None: invalidation_bits = invalidation_bits[idx] else: idx = np.searchsorted( signal.timestamps, new_timestamps, side="right" ) idx -= 1 idx[idx < 0] = 0 s = signal.samples[idx] if invalidation_bits is not None: invalidation_bits = invalidation_bits[idx] if s.dtype != self.samples.dtype: s = s.astype(self.samples.dtype) return Signal( s, new_timestamps, self.unit, self.name, comment=self.comment, conversion=self.conversion, source=self.source, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=invalidation_bits, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, )
def __apply_func( self, other: Signal | NDArray[Any] | None, func_name: str ) -> Signal: """delegate operations to the *samples* attribute, but in a time correct manner by considering the *timestamps* """ if isinstance(other, Signal): if len(self) and len(other): start = max(self.timestamps[0], other.timestamps[0]) stop = min(self.timestamps[-1], other.timestamps[-1]) s1 = self.physical().cut(start, stop) s2 = other.physical().cut(start, stop) else: s1 = self s2 = other time = np.union1d(s1.timestamps, s2.timestamps) s = s1.interp(time).samples o = s2.interp(time).samples func = getattr(s, func_name) conversion = None s = func(o) elif other is None: s = self.samples conversion = self.conversion time = self.timestamps else: func = getattr(self.samples, func_name) s = func(other) conversion = self.conversion time = self.timestamps return Signal( samples=s, timestamps=time, unit=self.unit, name=self.name, conversion=conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=self.invalidation_bits, source=self.source, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) def __pos__(self) -> Signal: return self def __neg__(self) -> Signal: return Signal( np.negative(self.samples), self.timestamps, unit=self.unit, name=self.name, conversion=self.conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=self.invalidation_bits, source=self.source, encoding=self.encoding, flags=self.flags, ) def __round__(self, n: int) -> Signal: return Signal( np.around(self.samples, n), self.timestamps, unit=self.unit, name=self.name, conversion=self.conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=self.invalidation_bits, source=self.source, encoding=self.encoding, flags=self.flags, ) def __sub__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__sub__") def __isub__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__sub__(other) def __rsub__(self, other: Signal | NDArray[Any] | None) -> Signal: return -self.__sub__(other) def __add__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__add__") def __iadd__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__add__(other) def __radd__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__add__(other) def __truediv__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__truediv__") def __itruediv__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__truediv__(other) def __rtruediv__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__rtruediv__") def __mul__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__mul__") def __imul__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__mul__(other) def __rmul__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__mul__(other) def __floordiv__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__floordiv__") def __ifloordiv__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__truediv__(other) def __rfloordiv__(self, other: Signal | NDArray[Any] | None) -> Signal: return 1 / self.__apply_func(other, "__rfloordiv__") def __mod__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__mod__") def __pow__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__pow__") def __and__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__and__") def __or__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__or__") def __xor__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__xor__") def __invert__(self) -> Signal: s = ~self.samples time = self.timestamps return Signal( s, time, unit=self.unit, name=self.name, conversion=self.conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=self.invalidation_bits, source=self.source, encoding=self.encoding, flags=self.flags, ) def __lshift__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__lshift__") def __rshift__(self, other: Signal | NDArray[Any] | None) -> Signal: return self.__apply_func(other, "__rshift__") def __lt__(self, other: Signal | NDArray[Any] | None) -> bool: return self.__apply_func(other, "__lt__") def __le__(self, other: Signal | NDArray[Any] | None) -> bool: return self.__apply_func(other, "__le__") def __gt__(self, other: Signal | NDArray[Any] | None) -> bool: return self.__apply_func(other, "__gt__") def __ge__(self, other: Signal | NDArray[Any] | None) -> bool: return self.__apply_func(other, "__ge__") def __eq__(self, other: Signal | NDArray[Any] | None) -> bool: return self.__apply_func(other, "__eq__") def __ne__(self, other: Signal | NDArray[Any] | None) -> bool: return self.__apply_func(other, "__ne__") def __iter__(self) -> Iterator[Any]: for item in (self.samples, self.timestamps, self.unit, self.name): yield item def __reversed__(self) -> Iterator[tuple[int, tuple[Any, Any]]]: return enumerate(zip(reversed(self.samples), reversed(self.timestamps))) def __len__(self) -> int: return len(self.samples) def __abs__(self) -> Signal: return Signal( np.fabs(self.samples), self.timestamps, unit=self.unit, name=self.name, conversion=self.conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=self.invalidation_bits, source=self.source, flags=self.flags, ) def __getitem__(self, val: int) -> Any: return self.samples[val] def __setitem__(self, idx: int, val: Any) -> None: self.samples[idx] = val
[docs] def astype(self, np_type: DTypeLike) -> Signal: """returns new *Signal* with samples of dtype *np_type* Parameters ---------- np_type : np.dtype new numpy dtye Returns ------- signal : Signal new *Signal* with the samples of *np_type* dtype """ return Signal( self.samples.astype(np_type), self.timestamps, unit=self.unit, name=self.name, conversion=self.conversion, raw=self.raw, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=self.invalidation_bits, source=self.source, encoding=self.encoding, flags=self.flags, )
[docs] def physical(self) -> Signal: """ get the physical samples values Returns ------- phys : Signal new *Signal* with physical values """ if not self.raw or self.conversion is None: samples = self.samples.copy() encoding = None else: samples = self.conversion.convert(self.samples) if samples.dtype.kind == "S": encoding = "utf-8" if self.conversion.id == b"##CC" else "latin-1" else: encoding = None return Signal( samples, self.timestamps.copy(), unit=self.unit, name=self.name, conversion=None, raw=False, master_metadata=self.master_metadata, display_names=self.display_names, attachment=self.attachment, invalidation_bits=self.invalidation_bits, source=self.source, encoding=encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, )
[docs] def validate(self, copy: bool = True) -> Signal: """appply invalidation bits if they are available for this signal Parameters ---------- copy (True) : bool return a copy of the result .. versionadded:: 5.12.0 """ if self.invalidation_bits is None: signal = self else: idx = np.nonzero(~self.invalidation_bits)[0] signal = Signal( self.samples[idx], self.timestamps[idx], self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, invalidation_bits=None, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, ) if copy: signal = signal.copy() return signal
[docs] def copy(self) -> Signal: """copy all attributes to a new Signal""" return Signal( self.samples.copy(), self.timestamps.copy(), self.unit, self.name, self.conversion, self.comment, self.raw, self.master_metadata, self.display_names, self.attachment, self.source, self.bit_count, invalidation_bits=self.invalidation_bits.copy() if self.invalidation_bits is not None else None, encoding=self.encoding, group_index=self.group_index, channel_index=self.channel_index, flags=self.flags, )
if __name__ == "__main__": pass