Source code for asammdf.signal

"""asammdf `Signal` class module for time-correct signal processing"""

from collections.abc import Iterator
import logging
from pathlib import Path
from textwrap import fill
from typing import TYPE_CHECKING, Union
import warnings

import numpy as np
from numpy.typing import ArrayLike, DTypeLike, NDArray
from typing_extensions import Any, overload

from .blocks import v2_v3_blocks as v3b
from .blocks import v4_blocks as v4b
from .blocks.conversion_utils import from_dict
from .blocks.options import FloatInterpolation, IntegerInterpolation
from .blocks.source_utils import Source
from .blocks.types import (
    ChannelConversionType,
    FloatInterpolationModeType,
    IntInterpolationModeType,
    SourceType,
)
from .blocks.utils import extract_xml_comment, MdfException, SignalFlags
from .version import __version__

if TYPE_CHECKING:
    from mpl_toolkits.mplot3d.art3d import Line3DCollection

try:
    # NumPy >= 2.0 exposes the string functions in the ``np.strings`` namespace
    encode = np.strings.encode
except AttributeError:
    # older NumPy releases only provide the legacy ``np.char`` namespace
    encode = np.char.encode

logger = logging.getLogger("asammdf")

# default ``origin`` of an ``InvalidationArray`` when none is supplied
ORIGIN_UNKNOWN = (-1, -1)


def convert(arr, ignore_value2text_conversions=False):
    fields = arr.dtype.fields

    if fields:
        arrays = []
        types = []

        for name, (dt, offset) in arr.dtype.fields.items():
            converted = convert(arr[name], ignore_value2text_conversions=ignore_value2text_conversions)
            arrays.append(converted)
            types.append((name, converted.dtype, converted.shape[1:]))

        res = np.rec.fromarrays(arrays, dtype=types)
        res = res.view(np.lib.format.drop_metadata(res.dtype))

    else:

        dtype = arr.dtype
        if (metadata := dtype.base.metadata) and "conversion" in metadata:
            conversion = metadata["conversion"]
            if conversion:
                res = conversion.convert(arr, ignore_value2text_conversions=ignore_value2text_conversions)
            else:
                res = arr
        else:
            res = arr

        res = res.view(np.lib.format.drop_metadata(res.dtype))

    return res


def replace_metadata(dtype, name, attribute, value, current_name=""):
    if dtype.fields:
        new_dt = {
            "names": [],
            "formats": [],
            "offsets": [],
        }
        for fname, (dt, offset, *_) in dtype.fields.items():
            new_dt["names"].append(fname)
            new_dt["offsets"].append(offset)
            new_dt["formats"].append(replace_metadata(dt, name, attribute, value, fname))

        if old_metadata := (dtype.metadata or dtype.base.metadata):
            return np.dtype(new_dt, metadata=dict(old_metadata))
        else:
            return np.dtype(new_dt)

    else:
        if current_name and current_name != name:
            return dtype
        else:
            metadata = dict(dtype.metadata or dtype.base.metadata or {})
            metadata[attribute] = value
            return np.dtype(np.lib.format.drop_metadata(dtype), metadata=metadata)


class InvalidationArray(np.ndarray[tuple[int], np.dtype[np.bool]]):
    """`ndarray` subclass that carries an extra `origin` attribute.

    The attribute is propagated to the arrays derived from an instance
    (views, slices) through `__array_finalize__`.
    """

    ORIGIN_UNKNOWN = ORIGIN_UNKNOWN

    def __new__(cls, input_array: ArrayLike, origin: tuple[int, int] = ORIGIN_UNKNOWN) -> "InvalidationArray":
        instance = np.asarray(input_array).view(cls)
        # an `origin` already attached to the input wins over the argument
        instance.origin = getattr(input_array, "origin", origin)
        return instance

    def __array_finalize__(self, obj: NDArray[np.bool] | None) -> None:
        # `obj` is None for explicit construction; nothing to inherit then
        if obj is not None:
            self.origin: tuple[int, int] = getattr(obj, "origin", ORIGIN_UNKNOWN)


class Signal:  # noqa: PLW1641
    """The `Signal` represents a channel described by its samples and
    timestamps.

    It can perform arithmetic operations against other `Signal` objects or
    numeric types. The operations are computed in respect to the timestamps
    (time-correct). The non-float signals are not interpolated, instead the
    last value relative to the current timestamp is used.

    `samples`, `timestamps` and `name` are mandatory arguments.

    Parameters
    ----------
    samples : array-like
        Signal samples.
    timestamps : array-like
        Signal timestamps.
    unit : str, optional
        Signal unit.
    name : str
        Signal name.
    conversion : dict | channel conversion block, optional
        Dict that contains extra conversion information about the signal.
        It is only used when the dtype of `samples` does not already carry
        metadata.
    comment : str, optional
        Signal comment.
    master_metadata : tuple, optional
        Master name and sync type.
    display_names : dict, optional
        Display names used by MDF version 3.
    attachment : tuple, optional
        Channel attachment and name from MDF version 4.
    source : Source, optional
        Source information named tuple.
    bit_count : int, optional
        Bit count; useful for integer channels. Defaults to the item size of
        the samples dtype, in bits.
    invalidation_bits : array-like, optional
        Channel invalidation bits.
    encoding : str, optional
        Encoding for string signals.
    group_index : int, default -1
        Stored as the `group_index` attribute.
    channel_index : int, default -1
        Stored as the `channel_index` attribute.
    flags : int, optional
        Flags for user-defined attributes and stream sync.
    virtual_conversion : dict | channel conversion block, optional
        Only used when `flags` contains `Flags.virtual`.
    virtual_master_conversion : dict | channel conversion block, optional
        Only used when `flags` contains `Flags.virtual_master`.
    **kwargs
        Passing the removed `raw` argument only triggers a warning; other
        extra keyword arguments are ignored.

    Raises
    ------
    MdfException
        If `name` is empty, or if `samples` and `timestamps` do not have the
        same length.
    """

    Flags = SignalFlags

    def __init__(
        self,
        samples: ArrayLike,
        timestamps: ArrayLike,
        unit: str = "",
        name: str = "",
        conversion: dict[str, object] | ChannelConversionType | None = None,
        comment: str = "",
        master_metadata: tuple[str, int] | None = None,
        display_names: dict[str, str] | None = None,
        attachment: tuple[bytes | str, Path, bytes | str] | tuple[bytes | str, Path, bytes | str, str] | None = None,
        source: SourceType | None = None,
        bit_count: int | None = None,
        invalidation_bits: ArrayLike | None = None,
        encoding: str | None = None,
        group_index: int = -1,
        channel_index: int = -1,
        flags: int = Flags.no_flags,
        virtual_conversion: dict[str, object] | ChannelConversionType | None = None,
        virtual_master_conversion: dict[str, object] | ChannelConversionType | None = None,
        **kwargs,
    ) -> None:
        if "raw" in kwargs:
            # stacklevel=2 attributes the warning to the caller's line
            warnings.warn("the 'raw' argument for the Signal class has been removed", stacklevel=2)

        if not name:
            message = (
                '"samples", "timestamps" and "name" are mandatory '
                f"for Signal class __init__: samples={samples!r}\n"
                f"timestamps={timestamps!r}\nname={name}"
            )
            raise MdfException(message)

        self._samples: NDArray[Any]
        if not isinstance(samples, np.ndarray):
            samples = np.array(samples)
            kind = samples.dtype.kind
            if kind == "U":
                # unicode samples are stored as bytes; try the candidate
                # encodings in order and keep the first one that works
                if encoding is None:
                    encodings = ["utf-8", "latin-1"]
                else:
                    encodings = [encoding, "utf-8", "latin-1"]
                for _encoding in encodings:
                    try:
                        self._samples = encode(samples, _encoding)
                        break
                    except Exception:
                        continue
                else:
                    # no encoding fits all the samples: drop the offending characters
                    self._samples = encode(samples, encodings[0], errors="ignore")
            elif kind == "O":
                self._samples = samples.astype(np.bytes_)
            else:
                self._samples = samples
        else:
            self._samples = samples

        self.timestamps: NDArray[Any]
        if not isinstance(timestamps, np.ndarray):
            self.timestamps = np.array(timestamps, dtype=np.float64)
        else:
            self.timestamps = timestamps

        if self.samples.shape[0] != self.timestamps.shape[0]:
            message = "{} samples and timestamps length mismatch ({} vs {})"
            message = message.format(name, self.samples.shape[0], self.timestamps.shape[0])
            # no exception is being handled here, so `logger.exception` would
            # only append an empty traceback
            logger.error(message)
            raise MdfException(message)

        self.unit = unit
        self.name = name
        self.comment = comment
        self.flags = flags
        self._plot_axis: Line3DCollection | None = None
        self.master_metadata = master_metadata
        self.display_names = display_names or {}
        self.attachment = attachment
        self.encoding = encoding
        self.group_index = group_index
        self.channel_index = channel_index
        self._invalidation_bits = InvalidationArray(invalidation_bits) if invalidation_bits is not None else None

        self.source: Source | None
        if source:
            if not isinstance(source, Source):
                self.source = Source.from_source(source)
            else:
                self.source = source
        else:
            self.source = None

        if bit_count is None:
            self.bit_count = self.samples.dtype.itemsize * 8
        else:
            self.bit_count = bit_count

        # samples whose dtype already carries metadata keep it untouched
        if not self.samples.dtype.base.metadata:
            if conversion:
                if not isinstance(conversion, (v4b.ChannelConversion, v3b.ChannelConversion)):
                    self.conversion = from_dict(conversion)
                else:
                    self.conversion = conversion
            else:
                self.conversion = None

        self.virtual_conversion: ChannelConversionType | None
        if self.flags & self.Flags.virtual:
            if not isinstance(virtual_conversion, (v4b.ChannelConversion, v3b.ChannelConversion)):
                self.virtual_conversion = from_dict(virtual_conversion)
            else:
                self.virtual_conversion = virtual_conversion
        else:
            self.virtual_conversion = None

        self.virtual_master_conversion: ChannelConversionType | None
        if self.flags & self.Flags.virtual_master:
            if not isinstance(virtual_master_conversion, (v4b.ChannelConversion, v3b.ChannelConversion)):
                self.virtual_master_conversion = from_dict(virtual_master_conversion)
            else:
                self.virtual_master_conversion = virtual_master_conversion
        else:
            self.virtual_master_conversion = None

    @property
    def conversion(self):
        """Conversion stored in the metadata of the samples dtype, or None."""
        metadata = self._samples.dtype.metadata or self._samples.dtype.base.metadata or {}
        return metadata.get("conversion", None)

    @conversion.setter
    def conversion(self, conv):
        # the conversion lives in the dtype metadata, so the samples are
        # re-viewed with an updated dtype instead of storing an attribute
        self._samples = self._samples.view(replace_metadata(self._samples.dtype, self.name, "conversion", conv))

    @property
    def invalidation_bits(self) -> InvalidationArray | None:
        """Invalidation bits of the signal, or None."""
        return self._invalidation_bits

    @invalidation_bits.setter
    def invalidation_bits(self, value: ArrayLike | None) -> None:
        if value is None:
            self._invalidation_bits = None
        else:
            if not isinstance(value, InvalidationArray):
                value = InvalidationArray(value)
            if value.shape[0] != self.samples.shape[0]:
                message = "{} samples and invalidation bits length mismatch ({} vs {})"
                message = message.format(self.name, self.samples.shape[0], value.shape[0])
                # not inside an exception handler: log a plain error
                logger.error(message)
                raise MdfException(message)
            self._invalidation_bits = value

    @property
    def samples(self):
        """Signal samples; the dtype metadata holds the conversion."""
        return self._samples

    @samples.setter
    def samples(self, vals):
        # carry the current conversion over to the dtype of the new samples
        conversion = self.conversion
        self._samples = vals.view(replace_metadata(vals.dtype, self.name, "conversion", conversion))

    @samples.deleter
    def samples(self):
        del self._samples

    def __repr__(self) -> str:
        return f"""<Signal {self.name}:
\tsamples={self.samples}
\ttimestamps={self.timestamps}
\tinvalidation_bits={self.invalidation_bits}
\tunit="{self.unit}"
\tconversion={self.conversion}
\tsource={self.source}
\tcomment="{self.comment}"
\tflags="{self.flags}"
\tmastermeta="{self.master_metadata}"
\tdisplay_names={self.display_names}
\tattachment={self.attachment}>
"""
[docs] def plot(self, validate: bool = True) -> None: """Plot Signal samples. Pyqtgraph is used if it is available; in this case see the GUI plot documentation to see the available commands. Parameters ---------- validate : bool, default True Apply the invalidation bits. """ try: from .gui.plot import plot plot(self, validate=True) return except: try: import matplotlib.pyplot as plt from matplotlib.widgets import Slider from mpl_toolkits.mplot3d import Axes3D except ImportError: logging.warning("Signal plotting requires pyqtgraph or matplotlib") return if len(self.samples.shape) <= 1 and self.samples.dtype.names is None: fig = plt.figure() if fig.canvas.manager: fig.canvas.manager.set_window_title(self.name) fig.text( 0.95, 0.05, f"asammdf {__version__}", fontsize=8, color="red", ha="right", va="top", alpha=0.5, ) name = self.name if self.comment: comment = self.comment.replace("$", "") comment = extract_xml_comment(comment) comment = fill(comment, 120).replace("\\n", " ") title = f"{name}\n({comment})" plt.title(title) else: plt.title(name) try: if not self.master_metadata: plt.xlabel("Time [s]") plt.ylabel(f"[{self.unit}]") plt.plot(self.timestamps, self.samples, "b") plt.plot(self.timestamps, self.samples, "b.") plt.grid(True) plt.show() else: master_name, sync_type = self.master_metadata match sync_type: case 0 | 1: plt.xlabel(f"{master_name} [s]") case 2: plt.xlabel(f"{master_name} [deg]") case 3: plt.xlabel(f"{master_name} [m]") case 4: plt.xlabel(f"{master_name} [index]") plt.ylabel(f"[{self.unit}]") plt.plot(self.timestamps, self.samples, "b") plt.plot(self.timestamps, self.samples, "b.") plt.grid(True) plt.show() except ValueError: plt.close(fig) else: try: names = self.samples.dtype.names if names is None or len(names) == 1: if names: samples = self.samples[names[0]] else: samples = self.samples shape = samples.shape[1:] fig = plt.figure() if fig.canvas.manager: fig.canvas.manager.set_window_title(self.name) fig.text( 0.95, 0.05, f"asammdf {__version__}", 
fontsize=8, color="red", ha="right", va="top", alpha=0.5, ) if self.comment: comment = self.comment.replace("$", "") plt.title(f"{self.name}\n({comment})") else: plt.title(self.name) ax: Axes3D = fig.add_subplot(111, projection="3d") # Grab some test data. X = np.array(range(shape[1])) Y = np.array(range(shape[0])) X, Y = np.meshgrid(X, Y) Z = samples[0] # Plot a basic wireframe. self._plot_axis = plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1) # Place Sliders on Graph ax_a = plt.axes((0.25, 0.1, 0.65, 0.03)) # Create Sliders & Determine Range sa = Slider( ax_a, "Time [s]", self.timestamps[0], self.timestamps[-1], valinit=self.timestamps[0], ) def update(val: float) -> None: plot_axis.remove() idx = np.searchsorted(self.timestamps, sa.val, side="right") Z = samples[idx - 1] self._plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1) fig.canvas.draw_idle() sa.on_changed(update) plt.show() else: fig = plt.figure() if fig.canvas.manager: fig.canvas.manager.set_window_title(self.name) fig.text( 0.95, 0.05, f"asammdf {__version__}", fontsize=8, color="red", ha="right", va="top", alpha=0.5, ) if self.comment: comment = self.comment.replace("$", "") plt.title(f"{self.name}\n({comment})") else: plt.title(self.name) ax = fig.add_subplot(111, projection="3d") samples = self.samples[names[0]] axis1 = self.samples[names[1]] axis2 = self.samples[names[2]] # Grab some test data. X, Y = np.meshgrid(axis2[0], axis1[0]) Z = samples[0] # Plot a basic wireframe. 
self._plot_axis = plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1) # Place Sliders on Graph ax_a = plt.axes((0.25, 0.1, 0.65, 0.03)) # Create Sliders & Determine Range sa = Slider( ax_a, "Time [s]", self.timestamps[0], self.timestamps[-1], valinit=self.timestamps[0], ) def update(val: float) -> None: plot_axis.remove() idx = np.searchsorted(self.timestamps, sa.val, side="right") Z = samples[idx - 1] X, Y = np.meshgrid(axis2[idx - 1], axis1[idx - 1]) self._plot_axis = ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1) fig.canvas.draw_idle() sa.on_changed(update) plt.show() except Exception as err: print(err)
def cut(
    self,
    start: float | None = None,
    stop: float | None = None,
    include_ends: bool = True,
    integer_interpolation_mode: (
        IntInterpolationModeType | IntegerInterpolation
    ) = IntegerInterpolation.REPEAT_PREVIOUS_SAMPLE,
    float_interpolation_mode: (
        FloatInterpolationModeType | FloatInterpolation
    ) = FloatInterpolation.LINEAR_INTERPOLATION,
) -> "Signal":
    """Cut the signal according to the `start` and `stop` values, by using
    the insertion indexes in the signal's time axis.

    Parameters
    ----------
    start : float, optional
        Start timestamp for cutting.
    stop : float, optional
        Stop timestamp for cutting.
    include_ends : bool, default True
        Include the `start` and `stop` timestamps after cutting the signal.
        If `start` and `stop` are not found in the original timestamps, then
        the new samples will be computed using interpolation.
    integer_interpolation_mode : int, default 0
        Interpolation mode for integer signals.

        * 0 - repeat previous sample
        * 1 - linear interpolation
        * 2 - hybrid interpolation: channels with integer data type (raw
          values) that have a conversion that outputs float values will use
          linear interpolation, otherwise the previous sample is used

        .. versionadded:: 6.2.0

    float_interpolation_mode : int, default 1
        Interpolation mode for float channels.

        * 0 - repeat previous sample
        * 1 - linear interpolation

        .. versionadded:: 6.2.0

    Returns
    -------
    result : Signal
        New `Signal` cut from the original. An empty `Signal` is returned
        when the signal has no samples or when the requested interval lies
        completely outside the signal's timestamps.

    Examples
    --------
    >>> from asammdf import Signal
    >>> import numpy as np
    >>> old_sig = Signal(np.arange(0.03, 100, 0.05), np.arange(0.03, 100, 0.05), name='SIG')
    >>> new_sig = old_sig.cut(1.0, 10.5)
    >>> new_sig.timestamps[0], new_sig.timestamps[-1]
    (1.0, 10.5)
    >>> new_sig = old_sig.cut(1.0, 10.5, include_ends=False)
    >>> new_sig.timestamps[0], new_sig.timestamps[-1]
    (1.03, 10.48)
    >>> new_sig = old_sig.cut(1.0, 10.5, float_interpolation_mode=0)
    >>> new_sig.samples[0], new_sig.samples[-1]
    (0.98, 10.48)
    """

    # accept both the enum members and their plain values
    integer_interpolation_mode = IntegerInterpolation(integer_interpolation_mode)
    float_interpolation_mode = FloatInterpolation(float_interpolation_mode)

    if self.samples.size == 0:
        return Signal(
            np.array([], dtype=self.samples.dtype),
            np.array([], dtype=self.timestamps.dtype),
            **self.invariable_attributes(),
        )

    invalidation_bits: NDArray[np.bool] | None

    if start is None:
        if stop is None:
            # return the channel uncut
            result = Signal(
                self.samples.copy(),
                self.timestamps.copy(),
                invalidation_bits=self.invalidation_bits.copy() if self.invalidation_bits is not None else None,
                **self.invariable_attributes(),
            )

        else:
            # cut from beginning to stop
            if stop < self.timestamps[0]:
                result = Signal(
                    np.array([], dtype=self.samples.dtype),
                    np.array([], dtype=self.timestamps.dtype),
                    **self.invariable_attributes(),
                )

            else:
                # side="right": a sample located exactly at `stop` is kept
                stop_idx = np.searchsorted(self.timestamps, stop, side="right")
                if include_ends and stop not in self.timestamps and stop < self.timestamps[-1]:
                    # `stop` falls between two samples: add an interpolated
                    # sample at the end of the cut
                    interpolated = self.interp(
                        [stop],
                        integer_interpolation_mode=integer_interpolation_mode,
                        float_interpolation_mode=float_interpolation_mode,
                    )

                    if len(interpolated):
                        samples = np.append(self.samples[:stop_idx], interpolated.samples, axis=0)
                        timestamps = np.append(self.timestamps[:stop_idx], interpolated.timestamps)
                        if self.invalidation_bits is not None and interpolated.invalidation_bits is not None:
                            invalidation_bits = InvalidationArray(
                                np.append(
                                    self.invalidation_bits[:stop_idx],
                                    interpolated.invalidation_bits,
                                ),
                                self.invalidation_bits.origin,
                            )
                        else:
                            invalidation_bits = None

                else:
                    samples = self.samples[:stop_idx].copy()
                    timestamps = self.timestamps[:stop_idx].copy()
                    if self.invalidation_bits is not None:
                        invalidation_bits = self.invalidation_bits[:stop_idx].copy()
                    else:
                        invalidation_bits = None

                # appending the interpolated sample may have changed the dtype
                if samples.dtype != self.samples.dtype:
                    samples = samples.astype(self.samples.dtype)

                result = Signal(
                    samples, timestamps, invalidation_bits=invalidation_bits, **self.invariable_attributes()
                )

    else:
        if stop is None:
            # cut from start to end
            if start > self.timestamps[-1]:
                result = Signal(
                    np.array([], dtype=self.samples.dtype),
                    np.array([], dtype=self.timestamps.dtype),
                    **self.invariable_attributes(),
                )

            else:
                # side="left": a sample located exactly at `start` is kept
                start_idx = np.searchsorted(self.timestamps, start, side="left")
                if include_ends and start not in self.timestamps and start > self.timestamps[0]:
                    # `start` falls between two samples: add an interpolated
                    # sample at the beginning of the cut
                    interpolated = self.interp(
                        [start],
                        integer_interpolation_mode=integer_interpolation_mode,
                        float_interpolation_mode=float_interpolation_mode,
                    )

                    if len(interpolated):
                        samples = np.append(interpolated.samples, self.samples[start_idx:], axis=0)
                        timestamps = np.append(interpolated.timestamps, self.timestamps[start_idx:])
                        if self.invalidation_bits is not None and interpolated.invalidation_bits is not None:
                            invalidation_bits = InvalidationArray(
                                np.append(
                                    interpolated.invalidation_bits,
                                    self.invalidation_bits[start_idx:],
                                ),
                                self.invalidation_bits.origin,
                            )
                        else:
                            invalidation_bits = None

                else:
                    samples = self.samples[start_idx:].copy()
                    timestamps = self.timestamps[start_idx:].copy()
                    if self.invalidation_bits is not None:
                        invalidation_bits = self.invalidation_bits[start_idx:].copy()
                    else:
                        invalidation_bits = None

                if samples.dtype != self.samples.dtype:
                    samples = samples.astype(self.samples.dtype)

                result = Signal(
                    samples, timestamps, invalidation_bits=invalidation_bits, **self.invariable_attributes()
                )

        else:
            # cut between start and stop
            if start > self.timestamps[-1] or stop < self.timestamps[0]:
                result = Signal(
                    np.array([], dtype=self.samples.dtype),
                    np.array([], dtype=self.timestamps.dtype),
                    **self.invariable_attributes(),
                )

            else:
                start_idx = np.searchsorted(self.timestamps, start, side="left")
                stop_idx = np.searchsorted(self.timestamps, stop, side="right")

                if start_idx == stop_idx:
                    # no original sample lies inside [start, stop]
                    if include_ends:
                        if start == stop:
                            ends = np.array([start], dtype=self.timestamps.dtype)
                        else:
                            ends = np.array(
                                [start, stop],
                                dtype=self.timestamps.dtype,
                            )

                        # the result is made of the interpolated ends only
                        interpolated = self.interp(
                            ends,
                            integer_interpolation_mode=integer_interpolation_mode,
                            float_interpolation_mode=float_interpolation_mode,
                        )
                        samples = interpolated.samples
                        timestamps = ends
                        invalidation_bits = interpolated.invalidation_bits
                    else:
                        samples = np.array([], dtype=self.samples.dtype)
                        timestamps = np.array([], dtype=self.timestamps.dtype)
                        if self.invalidation_bits is not None:
                            invalidation_bits = self.invalidation_bits[0:0]
                        else:
                            invalidation_bits = None
                else:
                    samples = self.samples[start_idx:stop_idx].copy()
                    timestamps = self.timestamps[start_idx:stop_idx].copy()
                    if self.invalidation_bits is not None:
                        invalidation_bits = self.invalidation_bits[start_idx:stop_idx].copy()
                    else:
                        invalidation_bits = None

                    # append an interpolated sample at `stop` when needed
                    if include_ends and stop not in self.timestamps and stop < self.timestamps[-1]:
                        interpolated = self.interp(
                            [stop],
                            integer_interpolation_mode=integer_interpolation_mode,
                            float_interpolation_mode=float_interpolation_mode,
                        )

                        if len(interpolated):
                            samples = np.append(samples, interpolated.samples, axis=0)
                            timestamps = np.append(timestamps, interpolated.timestamps)
                            if invalidation_bits is not None and interpolated.invalidation_bits is not None:
                                invalidation_bits = InvalidationArray(
                                    np.append(
                                        invalidation_bits,
                                        interpolated.invalidation_bits,
                                    ),
                                    interpolated.invalidation_bits.origin,
                                )

                    # prepend an interpolated sample at `start` when needed
                    if include_ends and start not in self.timestamps and start > self.timestamps[0]:
                        interpolated = self.interp(
                            [start],
                            integer_interpolation_mode=integer_interpolation_mode,
                            float_interpolation_mode=float_interpolation_mode,
                        )

                        if len(interpolated):
                            samples = np.append(interpolated.samples, samples, axis=0)
                            timestamps = np.append(interpolated.timestamps, timestamps)
                            if invalidation_bits is not None and interpolated.invalidation_bits is not None:
                                invalidation_bits = InvalidationArray(
                                    np.append(
                                        interpolated.invalidation_bits,
                                        invalidation_bits,
                                    ),
                                    interpolated.invalidation_bits.origin,
                                )

                if samples.dtype != self.samples.dtype:
                    samples = samples.astype(self.samples.dtype)

                result = Signal(
                    samples, timestamps, invalidation_bits=invalidation_bits, **self.invariable_attributes()
                )

    return result
def extend(self, other: "Signal") -> "Signal":
    """Extend Signal with samples from another Signal.

    When the first timestamp of `other` is not greater than the last
    timestamp of this signal, the timestamps of `other` are shifted by this
    signal's last timestamp before being appended.

    Parameters
    ----------
    other : Signal
        Signal whose samples are appended.

    Returns
    -------
    signal : Signal
        New extended `Signal`; the signal itself is returned when `other`
        has no samples.
    """
    last_stamp = self.timestamps[-1] if len(self.timestamps) else 0

    if not len(other):
        return self

    # shift the incoming timestamps when they would overlap the existing ones
    if last_stamp >= other.timestamps[0]:
        new_stamps = other.timestamps + last_stamp
    else:
        new_stamps = other.timestamps

    own_bits = self.invalidation_bits
    other_bits = other.invalidation_bits

    # a side without invalidation bits contributes all-valid (False) entries
    if own_bits is None and other_bits is None:
        merged_bits = None
    elif own_bits is None:
        merged_bits = InvalidationArray(
            np.concatenate((np.zeros(len(self), dtype=bool), other_bits)),
            other_bits.origin,
        )
    elif other_bits is None:
        merged_bits = InvalidationArray(
            np.concatenate((own_bits, np.zeros(len(other), dtype=bool))),
            own_bits.origin,
        )
    else:
        merged_bits = InvalidationArray(np.append(own_bits, other_bits), own_bits.origin)

    return Signal(
        np.append(self.samples, other.samples, axis=0),
        np.append(self.timestamps, new_stamps),
        invalidation_bits=merged_bits,
        **self.invariable_attributes(),
    )
def interp(
    self,
    new_timestamps: NDArray[Any] | list[float],
    integer_interpolation_mode: (
        IntInterpolationModeType | IntegerInterpolation
    ) = IntegerInterpolation.REPEAT_PREVIOUS_SAMPLE,
    float_interpolation_mode: (
        FloatInterpolationModeType | FloatInterpolation
    ) = FloatInterpolation.LINEAR_INTERPOLATION,
) -> "Signal":
    """Return a new `Signal` interpolated using the `new_timestamps`.

    Only one-dimensional float and integer samples can be linearly
    interpolated; for all the other samples the previous sample is repeated.
    The invalidation bits always follow the previous sample.

    Parameters
    ----------
    new_timestamps : np.ndarray | list
        Timestamps used for interpolation.
    integer_interpolation_mode : int, default 0
        Interpolation mode for integer signals.

        * 0 - repeat previous sample
        * 1 - linear interpolation
        * 2 - hybrid interpolation: channels with integer data type (raw
          values) that have a conversion that outputs float values will use
          linear interpolation, otherwise the previous sample is used

        .. versionadded:: 6.2.0

    float_interpolation_mode : int, default 1
        Interpolation mode for float channels.

        * 0 - repeat previous sample
        * 1 - linear interpolation

        .. versionadded:: 6.2.0

    Returns
    -------
    signal : Signal
        New interpolated `Signal`. It is empty when the signal has no
        samples or when `new_timestamps` is empty.
    """
    integer_interpolation_mode = IntegerInterpolation(integer_interpolation_mode)
    float_interpolation_mode = FloatInterpolation(float_interpolation_mode)

    if not len(self.samples) or not len(new_timestamps):
        return Signal(self.samples[:0].copy(), self.timestamps[:0].copy(), **self.invariable_attributes())

    samples = self.samples
    timestamps = self.timestamps
    invalidation_bits: NDArray[np.bool] | None = self.invalidation_bits

    def previous_sample_index() -> NDArray[Any]:
        """Index of the last sample at or before each new timestamp."""
        idx = np.searchsorted(timestamps, new_timestamps, side="right")
        idx -= 1
        # timestamps located before the first sample use the first sample
        idx[idx < 0] = 0
        return idx

    # decide between linear interpolation and repeating the previous sample
    linear = False
    integer_samples = False

    if len(samples.shape) <= 1:
        kind = samples.dtype.kind

        if kind == "f":
            linear = float_interpolation_mode != FloatInterpolation.REPEAT_PREVIOUS_SAMPLE

        elif kind in "ui":
            integer_samples = True

            if integer_interpolation_mode == IntegerInterpolation.HYBRID_INTERPOLATION:
                # hybrid mode: linear only if the conversion outputs floats
                if self.conversion and self.conversion.convert(samples[:1]).dtype.kind == "f":
                    integer_interpolation_mode = IntegerInterpolation.LINEAR_INTERPOLATION
                else:
                    integer_interpolation_mode = IntegerInterpolation.REPEAT_PREVIOUS_SAMPLE

            linear = integer_interpolation_mode == IntegerInterpolation.LINEAR_INTERPOLATION

    if linear:
        s = np.interp(new_timestamps, timestamps, samples)
        if integer_samples:
            s = s.astype(samples.dtype)
        if invalidation_bits is not None:
            invalidation_bits = invalidation_bits[previous_sample_index()]
    else:
        idx = previous_sample_index()
        s = samples[idx]
        if invalidation_bits is not None:
            invalidation_bits = invalidation_bits[idx]

    # np.interp outputs float64: restore the dtype of the original samples
    if s.dtype != samples.dtype:
        s = s.astype(samples.dtype)

    return Signal(s, new_timestamps, invalidation_bits=invalidation_bits, **self.invariable_attributes())
def __apply_func(self, other: Union["Signal", NDArray[Any], float] | None, func_name: str) -> "Signal": """Delegate operations to the `samples` attribute, but in a time-correct manner by considering the `timestamps`. """ if isinstance(other, Signal): if len(self) and len(other): start = max(self.timestamps[0], other.timestamps[0]) stop = min(self.timestamps[-1], other.timestamps[-1]) s1 = self.physical().cut(start, stop) s2 = other.physical().cut(start, stop) else: s1 = self s2 = other time = np.union1d(s1.timestamps, s2.timestamps) s1 = s1.interp(time) s2 = s2.interp(time) invalidation_bits: NDArray[np.bool] | None if s1.invalidation_bits is not None or s2.invalidation_bits is not None: if s1.invalidation_bits is None: invalidation_bits = s2.invalidation_bits elif s2.invalidation_bits is None: invalidation_bits = s1.invalidation_bits else: invalidation_bits = s1.invalidation_bits | s2.invalidation_bits else: invalidation_bits = None func = getattr(s1.samples, func_name) conversion = None s = func(s2.samples) elif other is None: s = self.samples conversion = self.conversion time = self.timestamps invalidation_bits = self.invalidation_bits else: samples = self.physical().samples func = getattr(samples, func_name) s = func(other) conversion = None time = self.timestamps invalidation_bits = self.invalidation_bits return Signal( samples=s, timestamps=time, invalidation_bits=invalidation_bits, **self.invariable_attributes(conversion=conversion), ) def __pos__(self) -> "Signal": return self def __neg__(self) -> "Signal": return Signal( np.negative(self.samples), self.timestamps, invalidation_bits=self.invalidation_bits, **self.invariable_attributes(), ) def __round__(self, n: int) -> "Signal": return Signal( np.around(self.samples, n), self.timestamps, invalidation_bits=self.invalidation_bits, **self.invariable_attributes(), ) def __sub__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__sub__") def __isub__(self, 
other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__sub__(other) def __rsub__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return -self.__sub__(other) def __add__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__add__") def __iadd__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__add__(other) def __radd__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__add__(other) def __truediv__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__truediv__") def __itruediv__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__truediv__(other) def __rtruediv__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__rtruediv__") def __mul__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__mul__") def __imul__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__mul__(other) def __rmul__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__mul__(other) def __floordiv__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__floordiv__") def __ifloordiv__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__truediv__(other) def __rfloordiv__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return 1 / self.__apply_func(other, "__rfloordiv__") def __mod__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__mod__") def __pow__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__pow__") def __and__(self, other: Union["Signal", NDArray[Any], 
float] | None) -> "Signal": return self.__apply_func(other, "__and__") def __or__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__or__") def __xor__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__xor__") def __invert__(self) -> "Signal": s = ~self.samples time = self.timestamps return Signal(s, time, invalidation_bits=self.invalidation_bits, **self.invariable_attributes()) def __lshift__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__lshift__") def __rshift__(self, other: Union["Signal", NDArray[Any], float] | None) -> "Signal": return self.__apply_func(other, "__rshift__") def __lt__(self, other: Union["Signal", NDArray[Any]] | None) -> "Signal": return self.__apply_func(other, "__lt__") def __le__(self, other: Union["Signal", NDArray[Any]] | None) -> "Signal": return self.__apply_func(other, "__le__") def __gt__(self, other: Union["Signal", NDArray[Any]] | None) -> "Signal": return self.__apply_func(other, "__gt__") def __ge__(self, other: Union["Signal", NDArray[Any]] | None) -> "Signal": return self.__apply_func(other, "__ge__") def __eq__(self, other: Union["Signal", NDArray[Any]] | None) -> "Signal": # type: ignore[override] return self.__apply_func(other, "__eq__") def __ne__(self, other: Union["Signal", NDArray[Any]] | None) -> "Signal": # type: ignore[override] return self.__apply_func(other, "__ne__") def __iter__(self) -> Iterator[NDArray[Any] | str]: yield from (self.samples, self.timestamps, self.unit, self.name) def __reversed__(self) -> Iterator[tuple[int, tuple[Any, Any]]]: return enumerate(zip(reversed(self.samples), reversed(self.timestamps), strict=False)) def __len__(self) -> int: return len(self.samples) def __abs__(self) -> "Signal": return Signal( np.fabs(self.samples), self.timestamps, invalidation_bits=self.invalidation_bits, **self.invariable_attributes(), ) 
@overload
def __getitem__(self, val: str) -> NDArray[Any]: ...

@overload
def __getitem__(self, val: int | slice) -> "Signal": ...

def __getitem__(self, val: int | slice | str) -> Union[NDArray[Any], "Signal"]:
    """Index the signal.

    A string key is forwarded to ``self.samples`` and the resulting array is
    returned directly. Any other key is applied to the samples, the timestamps
    and (when present) the invalidation bits, and the pieces are wrapped in a
    new `Signal` that carries the same invariable attributes.
    """
    if isinstance(val, str):
        return self.samples[val]

    picked_samples = self.samples[val]
    picked_timestamps = self.timestamps[val]
    if self.invalidation_bits is not None:
        picked_bits = self.invalidation_bits[val]
    else:
        picked_bits = None

    return Signal(
        picked_samples,
        picked_timestamps,
        invalidation_bits=picked_bits,
        **self.invariable_attributes(),
    )

def __setitem__(self, idx: Any, val: Any) -> None:
    """Assign ``val`` to ``self.samples[idx]``; only the samples are modified."""
    self.samples[idx] = val
def astype(self, np_type: DTypeLike) -> "Signal":
    """Build a `Signal` whose samples are cast to `np_type`.

    Parameters
    ----------
    np_type : np.dtype
        Target numpy dtype for the samples.

    Returns
    -------
    signal : Signal
        New `Signal` holding the cast samples; the timestamps, the
        invalidation bits and the invariable attributes are handed over
        as they are.

    """
    cast_samples = self.samples.astype(np_type)
    shared = self.invariable_attributes()
    return Signal(
        cast_samples,
        self.timestamps,
        invalidation_bits=self.invalidation_bits,
        **shared,
    )
def raw(self, copy: bool = False) -> "Signal":
    """Get the raw samples values.

    Parameters
    ----------
    copy : bool, default False
        Copy the samples and timestamps in the returned Signal. The
        invalidation bits are handed over without being copied.

    Returns
    -------
    raw : Signal
        New `Signal` with raw values; its ``conversion`` and ``encoding``
        attributes are set to ``None``.

    """
    # View the samples through a metadata-free dtype; `convert` at the top of
    # this module looks for the conversion inside that dtype metadata.
    samples = self.samples.view(np.lib.format.drop_metadata(self.samples.dtype))
    if copy:
        samples = samples.copy()

    return Signal(
        samples,
        self.timestamps.copy() if copy else self.timestamps,
        invalidation_bits=self.invalidation_bits,
        **self.invariable_attributes(encoding=None, conversion=None),
    )
def physical(self, copy: bool = True, ignore_value2text_conversions: bool = False) -> "Signal":
    """Get the scaled (physical) samples values.

    Parameters
    ----------
    copy : bool, default True
        Copy the samples and timestamps in the returned Signal. The
        invalidation bits are handed over without being copied.

        .. versionadded:: 7.4.0

    ignore_value2text_conversions : bool, default False
        Make sure that the output signal has numeric samples by ignoring
        the value to text conversions.

        .. versionadded:: 8.3.0

    Returns
    -------
    phys : Signal
        New `Signal` with scaled (physical) values. The ``conversion``
        attribute is not forwarded to the new `Signal`.

    """
    samples = convert(self.samples, ignore_value2text_conversions=ignore_value2text_conversions)

    # Byte-string results need an encoding: "utf-8" when the conversion block id
    # is b"##CC", "latin-1" otherwise; non byte-string results get none.
    if samples.dtype.kind == "S":
        encoding = "utf-8" if self.conversion and self.conversion.id == b"##CC" else "latin-1"
    else:
        encoding = None

    samples = samples.view(np.lib.format.drop_metadata(samples.dtype))

    # `convert` hands back a view of the input when no conversion is attached,
    # so without this step `copy=True` would still alias `self.samples`.
    if copy and np.may_share_memory(samples, self.samples):
        samples = samples.copy()

    return Signal(
        samples,
        self.timestamps.copy() if copy else self.timestamps,
        invalidation_bits=self.invalidation_bits,
        **self.invariable_attributes(dropped_attributes=("conversion",), encoding=encoding),
    )

# Alias: `scaled` is the same callable as `physical`.
scaled = physical
def validate(self, copy: bool = True) -> "Signal":
    """Apply invalidation bits if they are available for this signal.

    Parameters
    ----------
    copy : bool, default True
        Return a copy of the result.

        .. versionadded:: 5.12.0

    Returns
    -------
    signal : Signal
        ``self`` (or a copy of it when `copy` is true) if there are no
        invalidation bits or no sample is flagged; otherwise a new `Signal`
        that keeps only the samples whose invalidation bit is not set and
        carries no invalidation bits.

    """
    if self.invalidation_bits is None:
        return self.copy() if copy else self

    # Positions of the samples whose invalidation bit is not set.
    idx = np.nonzero(~self.invalidation_bits)[0]
    if len(idx) == len(self.samples):
        return self.copy() if copy else self

    # Indexing with an integer array already allocates new sample and
    # timestamp arrays, so a further `.copy()` would only duplicate them again.
    return Signal(self.samples[idx], self.timestamps[idx], **self.invariable_attributes())
def copy(self) -> "Signal":
    """Create a new `Signal` from duplicated data arrays.

    The samples, the timestamps and (when present) the invalidation bits are
    duplicated with their own ``copy()``; the invariable attributes are handed
    over as returned by `invariable_attributes`.
    """
    bits = self.invalidation_bits
    return Signal(
        self.samples.copy(),
        self.timestamps.copy(),
        invalidation_bits=None if bits is None else bits.copy(),
        **self.invariable_attributes(),
    )
def invariable_attributes(self, dropped_attributes=(), **kwargs):
    """Gather the non-array attributes of this signal as a keyword dictionary.

    Parameters
    ----------
    dropped_attributes : iterable of str, optional
        Keys deleted from the result after the overrides were merged in;
        a key that is not in the dictionary raises `KeyError`.
    **kwargs
        Entries that override, or are added to, the values read from
        the instance.

    Returns
    -------
    attrs : dict
        Mapping from attribute name to value.

    """
    names = (
        "unit",
        "name",
        "conversion",
        "comment",
        "master_metadata",
        "display_names",
        "attachment",
        "source",
        "bit_count",
        "encoding",
        "group_index",
        "channel_index",
        "flags",
        "virtual_conversion",
        "virtual_master_conversion",
    )
    collected = {key: getattr(self, key) for key in names}
    collected |= kwargs

    for key in dropped_attributes:
        del collected[key]

    return collected
# Script entry guard: running this module directly does nothing.
if __name__ == "__main__": pass