"""
ASAM MDF version 4 file format module
"""
from __future__ import annotations
import bisect
from collections import defaultdict
from collections.abc import Callable, Iterable, Iterator, Sequence, Sized
from copy import deepcopy
from datetime import datetime
from functools import lru_cache
from hashlib import md5
from io import BufferedReader, BytesIO
import logging
from math import ceil, floor
import mmap
import os
from pathlib import Path
import shutil
import sys
from tempfile import gettempdir, TemporaryFile
from time import sleep
from traceback import format_exc
from typing import Any, overload
from zipfile import ZIP_DEFLATED, ZipFile
from zlib import decompress
from typing_extensions import Literal
try:
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
CRYPTOGRAPHY_AVAILABLE = False
import canmatrix
from canmatrix.canmatrix import CanMatrix
from lz4.frame import compress as lz_compress
from lz4.frame import decompress as lz_decompress
import numpy as np
from numpy import (
arange,
argwhere,
array,
array_equal,
bool_,
bytes_,
column_stack,
concatenate,
cumsum,
dtype,
empty,
fliplr,
float32,
float64,
frombuffer,
full,
linspace,
nonzero,
packbits,
searchsorted,
transpose,
uint8,
uint16,
uint32,
uint64,
unique,
where,
zeros,
)
from numpy.core.defchararray import decode, encode
from numpy.core.records import fromarrays, fromstring
from numpy.typing import NDArray
from pandas import DataFrame
from . import v4_constants as v4c
from ..signal import Signal
from ..types import (
BusType,
ChannelsType,
CompressionType,
RasterType,
ReadableBufferType,
StrPathType,
WritableBufferType,
)
from ..version import __version__
from .bus_logging_utils import extract_mux
from .conversion_utils import conversion_transfer
from .mdf_common import MDF_Common
from .options import get_global_option
from .source_utils import Source
from .utils import (
all_blocks_addresses,
as_non_byte_sized_signed_int,
CHANNEL_COUNT,
ChannelsDB,
CONVERT,
count_channel_groups,
DataBlockInfo,
debug_channel,
extract_display_names,
extract_encryption_information,
extract_xml_comment,
fmt_to_datatype_v4,
get_fmt_v4,
get_text_v4,
Group,
InvalidationBlockInfo,
is_file_like,
load_can_database,
MdfException,
sanitize_xml,
SignalDataBlockInfo,
TERMINATED,
UINT8_uf,
UINT16_uf,
UINT32_p,
UINT32_uf,
UINT64_uf,
UniqueDB,
validate_version_argument,
VirtualChannelGroup,
)
from .v4_blocks import (
AttachmentBlock,
Channel,
ChannelArrayBlock,
ChannelConversion,
ChannelGroup,
DataBlock,
DataGroup,
DataList,
DataZippedBlock,
EventBlock,
FileHistory,
FileIdentificationBlock,
HeaderBlock,
HeaderList,
ListData,
SourceInformation,
TextBlock,
)
try:
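    # prefer the ISA-L accelerated zlib implementation when it is installed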
from isal.isal_zlib import decompress
except ImportError:
pass
MASTER_CHANNELS = (v4c.CHANNEL_TYPE_MASTER, v4c.CHANNEL_TYPE_VIRTUAL_MASTER)
COMMON_SIZE = v4c.COMMON_SIZE
COMMON_u = v4c.COMMON_u
COMMON_uf = v4c.COMMON_uf
COMMON_SHORT_SIZE = v4c.COMMON_SHORT_SIZE
COMMON_SHORT_uf = v4c.COMMON_SHORT_uf
COMMON_SHORT_u = v4c.COMMON_SHORT_u
VALID_DATA_TYPES = v4c.VALID_DATA_TYPES
EMPTY_TUPLE = tuple()
# 100 extra steps for the sorting, 1 step after sorting and 1 step at finish
SORT_STEPS = 102
logger = logging.getLogger("asammdf")
__all__ = ["MDF4"]
from .cutils import (
data_block_from_arrays,
extract,
get_channel_raw_bytes,
get_vlsd_max_sample_size,
get_vlsd_offsets,
lengths,
sort_data_block,
)
class MDF4(MDF_Common):
    """The *header* attribute is a *HeaderBlock*.
The *groups* attribute is a list of dicts, each one with the following keys:
* ``data_group`` - DataGroup object
* ``channel_group`` - ChannelGroup object
* ``channels`` - list of Channel objects with the same order as found in the mdf file
* ``channel_dependencies`` - list of *ChannelArrayBlock* in case of channel arrays;
list of Channel objects in case of structure channel composition
* ``data_block`` - address of data block
    * ``data_location`` - integer code for data location (original file, temporary file or
memory)
* ``data_block_addr`` - list of raw samples starting addresses
* ``data_block_type`` - list of codes for data block type
* ``data_block_size`` - list of raw samples block size
* ``sorted`` - sorted indicator flag
    * ``record_size`` - dict that maps record IDs to record sizes in bytes (including invalidation bytes)
* ``param`` - row size used for transposition, in case of transposed zipped blocks
Parameters
----------
name : string
mdf file name (if provided it must be a real file name) or
file-like object
version : string
mdf file version ('4.00', '4.10', '4.11', '4.20'); default '4.10'
kwargs :
use_display_names (True) : bool
keyword only argument: for MDF4 files parse the XML channel comment to
search for the display name; XML parsing is quite expensive so setting
this to *False* can decrease the loading times very much; default
*True*
        remove_source_from_channel_names (False) : bool
            keep only the channel name part before the path separator,
            discarding the appended source information; default *False*
copy_on_get (True) : bool
copy channel values (np.array) to avoid high memory usage
compact_vlsd (False) : bool
use slower method to save the exact sample size for VLSD channels
column_storage (True) : bool
use column storage for MDF version >= 4.20
password : bytes | str
use this password to decode encrypted attachments
Attributes
----------
attachments : list
list of file attachments
channels_db : dict
used for fast channel access by name; for each name key the value is a
list of (group index, channel index) tuples
events : list
        list of event blocks
file_comment : TextBlock
file comment TextBlock
file_history : list
list of (FileHistory, TextBlock) pairs
groups : list
list of data group dicts
header : HeaderBlock
mdf file header
identification : FileIdentificationBlock
mdf file start block
last_call_info : dict | None
a dict to hold information about the last called method.
.. versionadded:: 5.12.0
masters_db : dict
used for fast master channel access; for each group index key the value
is the master channel index
name : string
mdf file name
version : str
mdf version
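    Examples
    --------
    A minimal usage sketch ('measurement.mf4' is a placeholder path); MDF4
    objects are created through the MDF wrapper class:
    >>> from asammdf import MDF
    >>> with MDF('measurement.mf4') as mdf:
    ...     print(mdf.version)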
"""
def __init__(
self,
name: BufferedReader | BytesIO | StrPathType | None = None,
version: str = "4.10",
channels: list[str] | None = None,
**kwargs,
) -> None:
if not kwargs.get("__internal__", False):
raise MdfException(
"Always use the MDF class; do not use the class MDF4 directly"
)
self._kwargs = kwargs
self.original_name = kwargs["original_name"]
self.groups = []
self.header = None
self.identification = None
self.file_history = []
self.channels_db = ChannelsDB()
self.masters_db = {}
self.attachments = []
self._attachments_cache = {}
self.file_comment = None
self.events = []
self.bus_logging_map = {"CAN": {}, "ETHERNET": {}, "FLEXRAY": {}, "LIN": {}}
self._attachments_map = {}
self._ch_map = {}
self._master_channel_metadata = {}
self._invalidation_cache = {}
self._external_dbc_cache = {}
self._si_map = {}
self._file_si_map = {}
self._cc_map = {}
self._file_cc_map = {}
self._cg_map = {}
self._cn_data_map = {}
self._dbc_cache = {}
self._interned_strings = {}
self._closed = False
self.temporary_folder = kwargs.get("temporary_folder", None)
if channels is None:
self.load_filter = set()
self.use_load_filter = False
else:
self.load_filter = set(channels)
self.use_load_filter = True
self._tempfile = TemporaryFile(dir=self.temporary_folder)
self._file = None
self._read_fragment_size = get_global_option("read_fragment_size")
self._write_fragment_size = get_global_option("write_fragment_size")
self._single_bit_uint_as_bool = get_global_option("single_bit_uint_as_bool")
self._integer_interpolation = get_global_option("integer_interpolation")
self._float_interpolation = get_global_option("float_interpolation")
self._raise_on_multiple_occurrences = kwargs.get(
"raise_on_multiple_occurrences",
get_global_option("raise_on_multiple_occurrences"),
)
self._use_display_names = kwargs.get(
"use_display_names", get_global_option("use_display_names")
)
self._fill_0_for_missing_computation_channels = kwargs.get(
"fill_0_for_missing_computation_channels",
get_global_option("fill_0_for_missing_computation_channels"),
)
self._remove_source_from_channel_names = kwargs.get(
"remove_source_from_channel_names", False
)
self._password = kwargs.get("password", None)
self._force_attachment_encryption = kwargs.get(
"force_attachment_encryption", False
)
self.copy_on_get = kwargs.get("copy_on_get", True)
self.compact_vlsd = kwargs.get("compact_vlsd", False)
self.virtual_groups = {} # master group 2 referencing groups
self.virtual_groups_map = {} # group index 2 master group
self.vlsd_max_length = (
{}
) # hint about the maximum vlsd length for group_index, name pairs
self._master = None
self.last_call_info = None
# make sure no appended block has the address 0
self._tempfile.write(b"\0")
self._delete_on_close = False
self._mapped_file = None
progress = kwargs.get("progress", None)
if name:
if is_file_like(name):
self._file = name
self.name = self.original_name = Path("From_FileLike.mf4")
self._from_filelike = True
self._read(mapped=False, progress=progress)
else:
with open(name, "rb") as stream:
identification = FileIdentificationBlock(stream=stream)
version = identification["version_str"]
                version = version.decode("utf-8").strip(" \n\t\r\0")
flags = identification["unfinalized_standard_flags"]
if version >= "4.10" and flags:
tmpdir = Path(gettempdir())
self.name = tmpdir / Path(name).name
shutil.copy(name, self.name)
self._file = open(self.name, "rb+")
self._from_filelike = False
self._delete_on_close = True
self._read(mapped=False, progress=progress)
else:
if sys.maxsize < 2**32:
self.name = Path(name)
self._file = open(self.name, "rb")
self._from_filelike = False
self._read(mapped=False, progress=progress)
else:
self.name = Path(name)
self._mapped_file = open(self.name, "rb")
self._file = mmap.mmap(
self._mapped_file.fileno(), 0, access=mmap.ACCESS_READ
)
self._from_filelike = False
self._read(mapped=True, progress=progress)
else:
self._from_filelike = False
version = validate_version_argument(version)
self.header = HeaderBlock()
self.identification = FileIdentificationBlock(version=version)
self.version = version
self.name = Path("__new__.mf4")
if self.version >= "4.20":
self._column_storage = kwargs.get("column_storage", True)
else:
self._column_storage = False
self._parent = None
def __del__(self) -> None:
self.close()
def _check_finalised(self) -> int:
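        """log a message for each unfinalized-standard flag set in the
        identification block and return the raw flags value"""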
flags = self.identification["unfinalized_standard_flags"]
if flags & 1:
message = (
f"Unfinalised file {self.name}:"
" Update of cycle counters for CG/CA blocks required"
)
logger.info(message)
if flags & 1 << 1:
message = f"Unfinalised file {self.name}: Update of cycle counters for SR blocks required"
logger.info(message)
if flags & 1 << 2:
message = f"Unfinalised file {self.name}: Update of length for last DT block required"
logger.info(message)
        if flags & 1 << 3:
message = f"Unfinalised file {self.name}: Update of length for last RD block required"
logger.info(message)
if flags & 1 << 4:
message = (
f"Unfinalised file {self.name}:"
" Update of last DL block in each chained list"
" of DL blocks required"
)
logger.info(message)
if flags & 1 << 5:
message = (
f"Unfinalised file {self.name}:"
" Update of cg_data_bytes and cg_inval_bytes"
" in VLSD CG block required"
)
logger.info(message)
if flags & 1 << 6:
message = (
f"Unfinalised file {self.name}:"
" Update of offset values for VLSD channel required"
" in case a VLSD CG block is used"
)
logger.info(message)
return flags
def _read(self, mapped: bool = False, progress=None) -> None:
stream = self._file
self._mapped = mapped
dg_cntr = 0
stream.seek(0, 2)
self.file_limit = stream.tell()
stream.seek(0)
cg_count, _ = count_channel_groups(stream)
progress_steps = cg_count + SORT_STEPS
if progress is not None:
if callable(progress):
progress(0, progress_steps)
current_cg_index = 0
self.identification = FileIdentificationBlock(stream=stream, mapped=mapped)
version = self.identification["version_str"]
self.version = version.decode("utf-8").strip(" \n\t\r\0")
if self.version >= "4.10":
# Check for finalization past version 4.10
finalisation_flags = self._check_finalised()
if finalisation_flags:
message = f"Attempting finalization of {self.name}"
logger.info(message)
self._finalize()
self._mapped = mapped = False
stream = self._file
self.header = HeaderBlock(address=0x40, stream=stream, mapped=mapped)
# read file history
fh_addr = self.header["file_history_addr"]
while fh_addr:
if (fh_addr + v4c.FH_BLOCK_SIZE) > self.file_limit:
logger.warning(
f"File history address {fh_addr:X} is outside the file size {self.file_limit}"
)
break
history_block = FileHistory(address=fh_addr, stream=stream, mapped=mapped)
self.file_history.append(history_block)
fh_addr = history_block.next_fh_addr
# read attachments
at_addr = self.header["first_attachment_addr"]
index = 0
while at_addr:
if (at_addr + v4c.AT_COMMON_SIZE) > self.file_limit:
logger.warning(
f"Attachment address {at_addr:X} is outside the file size {self.file_limit}"
)
break
at_block = AttachmentBlock(address=at_addr, stream=stream, mapped=mapped)
self._attachments_map[at_addr] = index
self.attachments.append(at_block)
at_addr = at_block.next_at_addr
index += 1
        # go to first data group and read each data group sequentially
dg_addr = self.header.first_dg_addr
while dg_addr:
if (dg_addr + v4c.DG_BLOCK_SIZE) > self.file_limit:
logger.warning(
f"Data group address {dg_addr:X} is outside the file size {self.file_limit}"
)
break
new_groups = []
group = DataGroup(address=dg_addr, stream=stream, mapped=mapped)
record_id_nr = group.record_id_len
# go to first channel group of the current data group
cg_addr = first_cg_addr = group.first_cg_addr
cg_nr = 0
cg_size = {}
while cg_addr:
if (cg_addr + v4c.CG_BLOCK_SIZE) > self.file_limit:
logger.warning(
f"Channel group address {cg_addr:X} is outside the file size {self.file_limit}"
)
break
cg_nr += 1
if cg_addr == first_cg_addr:
grp = Group(group)
else:
grp = Group(group.copy())
# read each channel group sequentially
block = ChannelGroup(
address=cg_addr,
stream=stream,
mapped=mapped,
si_map=self._si_map,
version=self.version,
tx_map=self._interned_strings,
)
self._cg_map[cg_addr] = dg_cntr
channel_group = grp.channel_group = block
grp.record_size = cg_size
if channel_group.flags & v4c.FLAG_CG_VLSD:
                    # VLSD flag
record_id = channel_group.record_id
cg_size[record_id] = 0
elif channel_group.flags & v4c.FLAG_CG_BUS_EVENT:
samples_size = channel_group.samples_byte_nr
inval_size = channel_group.invalidation_bytes_nr
record_id = channel_group.record_id
cg_size[record_id] = samples_size + inval_size
else:
# in case no `cg_flags` are set
samples_size = channel_group.samples_byte_nr
inval_size = channel_group.invalidation_bytes_nr
record_id = channel_group.record_id
cg_size[record_id] = samples_size + inval_size
if record_id_nr:
grp.sorted = False
else:
grp.sorted = True
# go to first channel of the current channel group
ch_addr = channel_group.first_ch_addr
ch_cntr = 0
# Read channels by walking recursively in the channel group
# starting from the first channel
self._read_channels(
ch_addr, grp, stream, dg_cntr, ch_cntr, mapped=mapped
)
cg_addr = channel_group.next_cg_addr
dg_cntr += 1
current_cg_index += 1
if progress is not None:
if callable(progress):
progress(current_cg_index, progress_steps)
new_groups.append(grp)
# store channel groups record sizes dict in each
# new group data belong to the initial unsorted group, and add
# the key 'sorted' with the value False to use a flag;
address = group.data_block_addr
total_size = 0
inval_total_size = 0
block_type = b"##DT"
for new_group in new_groups:
channel_group = new_group.channel_group
if channel_group.flags & v4c.FLAG_CG_REMOTE_MASTER:
block_type = b"##DV"
total_size += (
channel_group.samples_byte_nr * channel_group.cycles_nr
)
inval_total_size += (
channel_group.invalidation_bytes_nr * channel_group.cycles_nr
)
record_size = channel_group.samples_byte_nr
else:
block_type = b"##DT"
total_size += (
channel_group.samples_byte_nr
+ channel_group.invalidation_bytes_nr
) * channel_group.cycles_nr
record_size = (
channel_group.samples_byte_nr
+ channel_group.invalidation_bytes_nr
)
if (
self.identification["unfinalized_standard_flags"]
& v4c.FLAG_UNFIN_UPDATE_CG_COUNTER
):
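                # cycle counters are unreliable in unfinalized files, so use
                # a practically unlimited size instead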
total_size = int(10**12)
inval_total_size = int(10**12)
data_blocks_info = self._get_data_blocks_info(
address=address,
stream=stream,
block_type=block_type,
mapped=mapped,
total_size=total_size,
inval_total_size=inval_total_size,
record_size=record_size,
)
data_blocks = []
uses_ld = self._uses_ld(
address=address,
stream=stream,
block_type=block_type,
mapped=mapped,
)
for grp in new_groups:
grp.data_location = v4c.LOCATION_ORIGINAL_FILE
grp.data_blocks_info_generator = data_blocks_info
grp.data_blocks = data_blocks
grp.uses_ld = uses_ld
self._prepare_record(grp)
self.groups.extend(new_groups)
dg_addr = group.next_dg_addr
# all channels have been loaded so now we can link the
# channel dependencies and load the signal data for VLSD channels
for gp_index, grp in enumerate(self.groups):
if (
self.version >= "4.20"
and grp.channel_group.flags & v4c.FLAG_CG_REMOTE_MASTER
):
grp.channel_group.cg_master_index = self._cg_map[
grp.channel_group.cg_master_addr
]
index = grp.channel_group.cg_master_index
else:
index = gp_index
self.virtual_groups_map[gp_index] = index
if index not in self.virtual_groups:
self.virtual_groups[index] = VirtualChannelGroup()
virtual_channel_group = self.virtual_groups[index]
virtual_channel_group.groups.append(gp_index)
virtual_channel_group.record_size += (
grp.channel_group.samples_byte_nr
+ grp.channel_group.invalidation_bytes_nr
)
virtual_channel_group.cycles_nr = grp.channel_group.cycles_nr
for ch_index, dep_list in enumerate(grp.channel_dependencies):
if not dep_list:
continue
for dep in dep_list:
if isinstance(dep, ChannelArrayBlock):
if dep.flags & v4c.FLAG_CA_DYNAMIC_AXIS:
for i in range(dep.dims):
ch_addr = dep[f"dynamic_size_{i}_ch_addr"]
if ch_addr:
ref_channel = self._ch_map[ch_addr]
dep.dynamic_size_channels.append(ref_channel)
else:
dep.dynamic_size_channels.append(None)
if dep.flags & v4c.FLAG_CA_INPUT_QUANTITY:
for i in range(dep.dims):
ch_addr = dep[f"input_quantity_{i}_ch_addr"]
if ch_addr:
ref_channel = self._ch_map[ch_addr]
dep.input_quantity_channels.append(ref_channel)
else:
dep.input_quantity_channels.append(None)
if dep.flags & v4c.FLAG_CA_OUTPUT_QUANTITY:
ch_addr = dep["output_quantity_ch_addr"]
if ch_addr:
ref_channel = self._ch_map[ch_addr]
dep.output_quantity_channel = ref_channel
else:
dep.output_quantity_channel = None
if dep.flags & v4c.FLAG_CA_COMPARISON_QUANTITY:
ch_addr = dep["comparison_quantity_ch_addr"]
if ch_addr:
ref_channel = self._ch_map[ch_addr]
dep.comparison_quantity_channel = ref_channel
else:
dep.comparison_quantity_channel = None
if dep.flags & v4c.FLAG_CA_AXIS:
for i in range(dep.dims):
cc_addr = dep[f"axis_conversion_{i}"]
if cc_addr:
conv = ChannelConversion(
stream=stream,
address=cc_addr,
mapped=mapped,
tx_map={},
)
dep.axis_conversions.append(conv)
else:
dep.axis_conversions.append(None)
if (dep.flags & v4c.FLAG_CA_AXIS) and not (
dep.flags & v4c.FLAG_CA_FIXED_AXIS
):
for i in range(dep.dims):
ch_addr = dep[f"scale_axis_{i}_ch_addr"]
if ch_addr:
ref_channel = self._ch_map[ch_addr]
dep.axis_channels.append(ref_channel)
else:
dep.axis_channels.append(None)
else:
break
self._sort(
current_progress_index=current_cg_index,
max_progress_count=progress_steps,
progress=progress,
)
if progress is not None:
if callable(progress):
progress(progress_steps - 1, progress_steps) # second to last step now
for grp in self.groups:
channels = grp.channels
if (
len(channels) == 1
and channels[0].dtype_fmt.itemsize == grp.channel_group.samples_byte_nr
):
grp.single_channel_dtype = channels[0].dtype_fmt
self._process_bus_logging()
# read events
addr = self.header.first_event_addr
ev_map = {}
event_index = 0
while addr:
if (addr + v4c.COMMON_SIZE) > self.file_limit:
logger.warning(
f"Event address {addr:X} is outside the file size {self.file_limit}"
)
break
event = EventBlock(address=addr, stream=stream, mapped=mapped)
event.update_references(self._ch_map, self._cg_map)
self.events.append(event)
ev_map[addr] = event_index
event_index += 1
addr = event.next_ev_addr
for event in self.events:
addr = event.parent_ev_addr
if addr:
parent = ev_map.get(addr, None)
if parent is not None:
event.parent = parent
else:
event.parent = None
addr = event.range_start_ev_addr
if addr:
                range_start = ev_map.get(addr, None)
                if range_start is not None:
                    event.range_start = range_start
                else:
                    event.range_start = None
self._si_map.clear()
self._ch_map.clear()
self._cc_map.clear()
self._interned_strings.clear()
self._attachments_map.clear()
if progress is not None:
if callable(progress):
progress(
progress_steps, progress_steps
) # last step, we've completely loaded the file for sure
self.progress = cg_count, cg_count
def _read_channels(
self,
ch_addr: int,
grp: Group,
stream: ReadableBufferType,
dg_cntr: int,
ch_cntr: int,
channel_composition: bool = False,
mapped: bool = False,
) -> tuple[int, list[tuple[int, int]] | None, dtype | None]:
filter_channels = self.use_load_filter
use_display_names = self._use_display_names
channels = grp.channels
dependencies = grp.channel_dependencies
unique_names = UniqueDB()
if channel_composition:
composition = []
composition_channels = []
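        # the path separator splits the channel name from the appended
        # source/device path; default to "\" when the group defines none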
if grp.channel_group.path_separator:
path_separator = chr(grp.channel_group.path_separator)
else:
path_separator = "\\"
while ch_addr:
# read channel block and create channel object
if (ch_addr + v4c.COMMON_SIZE) > self.file_limit:
logger.warning(
f"Channel address {ch_addr:X} is outside the file size {self.file_limit}"
)
break
if filter_channels:
if mapped:
(
id_,
links_nr,
next_ch_addr,
component_addr,
name_addr,
comment_addr,
) = v4c.CHANNEL_FILTER_uf(stream, ch_addr)
channel_type = stream[ch_addr + v4c.COMMON_SIZE + links_nr * 8]
name = get_text_v4(name_addr, stream, mapped=mapped)
if use_display_names:
comment = get_text_v4(comment_addr, stream, mapped=mapped)
display_names = extract_display_names(comment)
else:
display_names = {}
comment = None
else:
stream.seek(ch_addr)
(
id_,
links_nr,
next_ch_addr,
component_addr,
name_addr,
comment_addr,
) = v4c.CHANNEL_FILTER_u(stream.read(v4c.CHANNEL_FILTER_SIZE))
stream.seek(ch_addr + v4c.COMMON_SIZE + links_nr * 8)
channel_type = stream.read(1)[0]
name = get_text_v4(name_addr, stream, mapped=mapped)
if use_display_names:
comment = get_text_v4(comment_addr, stream, mapped=mapped)
display_names = extract_display_names(comment)
else:
display_names = {}
comment = None
if id_ != b"##CN":
message = f'Expected "##CN" block @{hex(ch_addr)} but found "{id_}"'
raise MdfException(message)
if self._remove_source_from_channel_names:
name = name.split(path_separator, 1)[0]
display_names = {
_name.split(path_separator, 1)[0]: val
for _name, val in display_names.items()
}
if (
channel_composition
or channel_type in v4c.MASTER_TYPES
or name in self.load_filter
or (
use_display_names
and any(
dsp_name in self.load_filter for dsp_name in display_names
)
)
):
if comment is None:
comment = get_text_v4(comment_addr, stream, mapped=mapped)
channel = Channel(
address=ch_addr,
stream=stream,
cc_map=self._cc_map,
si_map=self._si_map,
at_map=self._attachments_map,
use_display_names=use_display_names,
mapped=mapped,
tx_map=self._interned_strings,
file_limit=self.file_limit,
parsed_strings=(name, display_names, comment),
)
elif not component_addr:
ch_addr = next_ch_addr
continue
else:
if (component_addr + v4c.CC_ALG_BLOCK_SIZE) > self.file_limit:
logger.warning(
f"Channel component address {component_addr:X} is outside the file size {self.file_limit}"
)
break
# check if it is a CABLOCK or CNBLOCK
stream.seek(component_addr)
blk_id = stream.read(4)
if blk_id == b"##CN":
(
ch_cntr,
_1,
_2,
) = self._read_channels(
component_addr,
grp,
stream,
dg_cntr,
ch_cntr,
False,
mapped=mapped,
)
ch_addr = next_ch_addr
continue
else:
channel = Channel(
address=ch_addr,
stream=stream,
cc_map=self._cc_map,
si_map=self._si_map,
at_map=self._attachments_map,
use_display_names=use_display_names,
mapped=mapped,
tx_map=self._interned_strings,
file_limit=self.file_limit,
parsed_strings=None,
)
if channel.data_type not in VALID_DATA_TYPES:
ch_addr = channel.next_ch_addr
continue
if channel.channel_type == v4c.CHANNEL_TYPE_SYNC:
channel.attachment = self._attachments_map.get(
channel.data_block_addr,
None,
)
if self._remove_source_from_channel_names:
channel.name = channel.name.split(path_separator, 1)[0]
channel.display_names = {
_name.split(path_separator, 1)[0]: val
for _name, val in channel.display_names.items()
}
entry = (dg_cntr, ch_cntr)
self._ch_map[ch_addr] = entry
channels.append(channel)
if channel_composition:
composition.append(entry)
composition_channels.append(channel)
for _name in channel.display_names:
self.channels_db.add(_name, entry)
self.channels_db.add(channel.name, entry)
# signal data
cn_data_addr = channel.data_block_addr
if cn_data_addr:
grp.signal_data.append(
([], self._get_signal_data_blocks_info(cn_data_addr, stream))
)
else:
grp.signal_data.append(None)
if cn_data_addr:
self._cn_data_map[cn_data_addr] = entry
if channel.channel_type in MASTER_CHANNELS:
self.masters_db[dg_cntr] = ch_cntr
ch_cntr += 1
component_addr = channel.component_addr
if component_addr:
if (component_addr + 4) > self.file_limit:
logger.warning(
f"Channel component address {component_addr:X} is outside the file size {self.file_limit}"
)
break
# check if it is a CABLOCK or CNBLOCK
stream.seek(component_addr)
blk_id = stream.read(4)
if blk_id == b"##CN":
index = ch_cntr - 1
dependencies.append(None)
(
ch_cntr,
ret_composition,
ret_composition_dtype,
) = self._read_channels(
component_addr,
grp,
stream,
dg_cntr,
ch_cntr,
True,
mapped=mapped,
)
dependencies[index] = ret_composition
channel.dtype_fmt = ret_composition_dtype
else:
# only channel arrays with storage=CN_TEMPLATE are
# supported so far
first_dep = ca_block = ChannelArrayBlock(
address=component_addr, stream=stream, mapped=mapped
)
if ca_block.storage != v4c.CA_STORAGE_TYPE_CN_TEMPLATE:
logger.warning("Only CN template arrays are supported")
ca_list = [ca_block]
while ca_block.composition_addr:
stream.seek(ca_block.composition_addr)
blk_id = stream.read(4)
if blk_id == b"##CA":
ca_block = ChannelArrayBlock(
address=ca_block.composition_addr,
stream=stream,
mapped=mapped,
)
ca_list.append(ca_block)
else:
logger.warning(
"skipping CN block; CN block within CA block"
" is not implemented yet"
)
break
dependencies.append(ca_list)
channel.dtype_fmt = dtype(
get_fmt_v4(
channel.data_type,
channel.bit_offset + channel.bit_count,
channel.channel_type,
)
)
else:
dependencies.append(None)
channel.dtype_fmt = dtype(
get_fmt_v4(
channel.data_type,
channel.bit_offset + channel.bit_count,
channel.channel_type,
)
)
# go to next channel of the current channel group
ch_addr = channel.next_ch_addr
if channel_composition:
composition_channels.sort()
composition_dtype = dtype(
[
(unique_names.get_unique_name(channel.name), channel.dtype_fmt)
for channel in composition_channels
]
)
else:
composition = None
composition_dtype = None
return ch_cntr, composition, composition_dtype
def _load_signal_data(
self,
group: Group | None = None,
index: int | None = None,
start_offset: int | None = None,
end_offset: int | None = None,
) -> bytes:
"""this method is used to get the channel signal data, usually for
VLSD channels
Parameters
----------
address : int
address of referenced block
stream : handle
file IO stream handle
Returns
-------
data : bytes
signal data bytes
"""
data = []
if group is not None and index is not None:
info_blocks = group.signal_data[index]
if info_blocks is not None:
if start_offset is None and end_offset is None:
for info in group.get_signal_data_blocks(index):
address, original_size, compressed_size, block_type, param = (
info.address,
info.original_size,
info.compressed_size,
info.block_type,
info.param,
)
if not info.original_size:
continue
if info.location == v4c.LOCATION_TEMPORARY_FILE:
stream = self._tempfile
else:
stream = self._file
stream.seek(address)
new_data = stream.read(compressed_size)
if block_type == v4c.DZ_BLOCK_DEFLATE:
new_data = decompress(new_data, bufsize=original_size)
elif block_type == v4c.DZ_BLOCK_TRANSPOSED:
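                            # transposed DZ blocks were reordered as a
                            # (lines x param) byte matrix before deflate to
                            # improve compression; undo it by reshaping to
                            # (cols, lines), transposing back and appending
                            # the tail bytes that did not fill a full row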
new_data = decompress(new_data, bufsize=original_size)
cols = param
lines = original_size // cols
nd = frombuffer(new_data[: lines * cols], dtype=uint8)
nd = nd.reshape((cols, lines))
new_data = nd.T.ravel().tobytes() + new_data[lines * cols :]
elif block_type == v4c.DZ_BLOCK_LZ:
new_data = lz_decompress(new_data)
data.append(new_data)
else:
start_offset = int(start_offset)
end_offset = int(end_offset)
current_offset = 0
for info in group.get_signal_data_blocks(index):
address, original_size, compressed_size, block_type, param = (
info.address,
info.original_size,
info.compressed_size,
info.block_type,
info.param,
)
if not info.original_size:
continue
if info.location == v4c.LOCATION_TEMPORARY_FILE:
stream = self._tempfile
else:
stream = self._file
if current_offset + original_size < start_offset:
current_offset += original_size
continue
stream.seek(address)
new_data = stream.read(compressed_size)
if block_type == v4c.DZ_BLOCK_DEFLATE:
new_data = decompress(new_data, bufsize=original_size)
elif block_type == v4c.DZ_BLOCK_TRANSPOSED:
new_data = decompress(new_data, bufsize=original_size)
cols = param
lines = original_size // cols
nd = frombuffer(new_data[: lines * cols], dtype=uint8)
nd = nd.reshape((cols, lines))
new_data = nd.T.ravel().tobytes() + new_data[lines * cols :]
elif block_type == v4c.DZ_BLOCK_LZ:
new_data = lz_decompress(new_data)
if current_offset + original_size > end_offset:
start_index = max(0, start_offset - current_offset)
(last_sample_size,) = UINT32_uf(
new_data, end_offset - current_offset
)
data.append(
new_data[
start_index : end_offset
- current_offset
+ last_sample_size
+ 4
]
)
break
else:
if start_offset > current_offset:
data.append(new_data[start_offset - current_offset :])
else:
data.append(new_data)
current_offset += original_size
data = b"".join(data)
else:
data = b""
else:
data = b""
return data
def _load_data(
self,
group: Group,
record_offset: int = 0,
record_count: int | None = None,
optimize_read: bool = False,
) -> Iterator[tuple[bytes, int, int, bytes | None]]:
"""get group's data block bytes"""
offset = 0
invalidation_offset = 0
has_yielded = False
_count = 0
data_group = group.data_group
data_blocks_info_generator = group.data_blocks_info_generator
channel_group = group.channel_group
if group.data_location == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
else:
stream = self._tempfile
read = stream.read
seek = stream.seek
if group.uses_ld:
samples_size = channel_group.samples_byte_nr
invalidation_size = channel_group.invalidation_bytes_nr
invalidation_record_offset = record_offset * invalidation_size
rm = True
else:
rm = False
samples_size = (
channel_group.samples_byte_nr + channel_group.invalidation_bytes_nr
)
invalidation_size = channel_group.invalidation_bytes_nr
record_offset *= samples_size
finished = False
if record_count is not None:
invalidation_record_count = record_count * invalidation_size
record_count *= samples_size
if not samples_size:
if rm:
yield b"", offset, _count, b""
else:
yield b"", offset, _count, None
else:
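            # choose how many bytes of raw records each yielded fragment
            # holds: an explicit split count, the configured fragment size,
            # or a heuristic based on the channel count; always align the
            # size to whole records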
if group.read_split_count:
split_size = group.read_split_count * samples_size
invalidation_split_size = group.read_split_count * invalidation_size
else:
if self._read_fragment_size:
split_size = self._read_fragment_size // samples_size
invalidation_split_size = split_size * invalidation_size
split_size *= samples_size
else:
channels_nr = len(group.channels)
y_axis = CONVERT
idx = searchsorted(CHANNEL_COUNT, channels_nr, side="right") - 1
if idx < 0:
idx = 0
split_size = y_axis[idx]
split_size = split_size // samples_size
invalidation_split_size = split_size * invalidation_size
split_size *= samples_size
if split_size == 0:
split_size = samples_size
invalidation_split_size = invalidation_size
split_size = int(split_size)
invalidation_split_size = int(invalidation_split_size)
blocks = iter(group.data_blocks)
cur_size = 0
data = []
cur_invalidation_size = 0
invalidation_data = []
while True:
try:
info = next(blocks)
(
address,
original_size,
compressed_size,
block_type,
param,
block_limit,
) = (
info.address,
info.original_size,
info.compressed_size,
info.block_type,
info.param,
info.block_limit,
)
if rm and invalidation_size:
invalidation_info = info.invalidation_block
else:
invalidation_info = None
except StopIteration:
try:
info = next(data_blocks_info_generator)
(
address,
original_size,
compressed_size,
block_type,
param,
block_limit,
) = (
info.address,
info.original_size,
info.compressed_size,
info.block_type,
info.param,
info.block_limit,
)
if rm and invalidation_size:
invalidation_info = info.invalidation_block
else:
invalidation_info = None
group.data_blocks.append(info)
except StopIteration:
break
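                # skip data blocks that end before the requested record offset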
if offset + original_size < record_offset + 1:
offset += original_size
if rm and invalidation_size:
if invalidation_info.all_valid:
count = original_size // samples_size
invalidation_offset += count * invalidation_size
else:
invalidation_offset += invalidation_info.original_size
continue
seek(address)
new_data = read(compressed_size)
if block_type == v4c.DZ_BLOCK_DEFLATE:
new_data = decompress(new_data, bufsize=original_size)
elif block_type == v4c.DZ_BLOCK_TRANSPOSED:
new_data = decompress(new_data, bufsize=original_size)
cols = param
lines = original_size // cols
nd = frombuffer(new_data[: lines * cols], dtype=uint8)
nd = nd.reshape((cols, lines))
new_data = nd.T.ravel().tobytes() + new_data[lines * cols :]
elif block_type == v4c.DZ_BLOCK_LZ:
new_data = lz_decompress(new_data)
if block_limit is not None:
new_data = new_data[:block_limit]
                if len(new_data) > split_size - cur_size:
new_data = memoryview(new_data)
if rm and invalidation_size:
if invalidation_info.all_valid:
count = original_size // samples_size
new_invalidation_data = b"\0" * (count * invalidation_size)
else:
seek(invalidation_info.address)
new_invalidation_data = read(invalidation_info.size)
if invalidation_info.block_type == v4c.DZ_BLOCK_DEFLATE:
new_invalidation_data = decompress(
new_invalidation_data,
bufsize=invalidation_info.original_size,
)
elif invalidation_info.block_type == v4c.DZ_BLOCK_TRANSPOSED:
new_invalidation_data = decompress(
new_invalidation_data,
bufsize=invalidation_info.original_size,
)
cols = invalidation_info.param
lines = invalidation_info.original_size // cols
nd = frombuffer(
new_invalidation_data[: lines * cols], dtype=uint8
)
nd = nd.reshape((cols, lines))
new_invalidation_data = (
nd.T.ravel().tobytes()
+ new_invalidation_data[lines * cols :]
)
if invalidation_info.block_limit is not None:
new_invalidation_data = new_invalidation_data[
: invalidation_info.block_limit
]
inv_size = len(new_invalidation_data)
if offset < record_offset:
delta = record_offset - offset
new_data = new_data[delta:]
original_size -= delta
offset = record_offset
if rm and invalidation_size:
delta = invalidation_record_offset - invalidation_offset
new_invalidation_data = new_invalidation_data[delta:]
inv_size -= delta
invalidation_offset = invalidation_record_offset
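                # emit complete fragments while the buffered bytes cover at
                # least one full split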
while original_size >= split_size - cur_size:
if data:
data.append(new_data[: split_size - cur_size])
new_data = new_data[split_size - cur_size :]
data_ = b"".join(data)
if rm and invalidation_size:
invalidation_data.append(
new_invalidation_data[
: invalidation_split_size - cur_invalidation_size
]
)
new_invalidation_data = new_invalidation_data[
invalidation_split_size - cur_invalidation_size :
]
invalidation_data_ = b"".join(invalidation_data)
if record_count is not None:
if rm and invalidation_size:
__data = data_[:record_count]
_count = len(__data) // samples_size
yield __data, offset // samples_size, _count, invalidation_data_[
:invalidation_record_count
]
invalidation_record_count -= len(invalidation_data_)
else:
__data = data_[:record_count]
_count = len(__data) // samples_size
yield __data, offset // samples_size, _count, None
has_yielded = True
record_count -= len(data_)
if record_count <= 0:
finished = True
break
else:
if rm and invalidation_size:
_count = len(data_) // samples_size
yield data_, offset // samples_size, _count, invalidation_data_
else:
_count = len(data_) // samples_size
yield data_, offset // samples_size, _count, None
has_yielded = True
data = []
else:
data_, new_data = (
new_data[:split_size],
new_data[split_size:],
)
if rm and invalidation_size:
invalidation_data_ = new_invalidation_data[
:invalidation_split_size
]
new_invalidation_data = new_invalidation_data[
invalidation_split_size:
]
if record_count is not None:
if rm and invalidation_size:
yield data_[
:record_count
], offset // samples_size, _count, invalidation_data_[
:invalidation_record_count
]
invalidation_record_count -= len(invalidation_data_)
else:
__data = data_[:record_count]
_count = len(__data) // samples_size
yield __data, offset // samples_size, _count, None
has_yielded = True
record_count -= len(data_)
if record_count <= 0:
finished = True
break
else:
if rm and invalidation_size:
_count = len(data_) // samples_size
yield data_, offset // samples_size, _count, invalidation_data_
else:
_count = len(data_) // samples_size
yield data_, offset // samples_size, _count, None
has_yielded = True
offset += split_size
original_size -= split_size - cur_size
data = []
cur_size = 0
if rm and invalidation_size:
invalidation_offset += invalidation_split_size
invalidation_data = []
cur_invalidation_size = 0
inv_size -= invalidation_split_size - cur_invalidation_size
if finished:
data = []
if rm and invalidation_size:
invalidation_data = []
break
if original_size:
data.append(new_data)
cur_size += original_size
original_size = 0
if rm and invalidation_size:
invalidation_data.append(new_invalidation_data)
cur_invalidation_size += inv_size
if data:
data_ = b"".join(data)
if rm and invalidation_size:
invalidation_data_ = b"".join(invalidation_data)
if record_count is not None:
if rm and invalidation_size:
__data = data_[:record_count]
_count = len(__data) // samples_size
yield __data, offset // samples_size, _count, invalidation_data_[
:invalidation_record_count
]
invalidation_record_count -= len(invalidation_data_)
else:
__data = data_[:record_count]
_count = len(__data) // samples_size
yield __data, offset // samples_size, _count, None
has_yielded = True
record_count -= len(data_)
else:
if rm and invalidation_size:
_count = len(data_) // samples_size
yield data_, offset // samples_size, _count, invalidation_data_
else:
_count = len(data_) // samples_size
yield data_, offset // samples_size, _count, None
has_yielded = True
data = []
if not has_yielded:
if rm and invalidation_size:
yield b"", 0, 0, b""
else:
yield b"", 0, 0, None
def _prepare_record(self, group: Group) -> list:
"""compute record
Parameters
----------
group : dict
MDF group dict
Returns
-------
record : list
mapping of channels to records fields, records fields dtype
"""
record = group.record
if record is None:
channels = group.channels
record = []
for idx, new_ch in enumerate(channels):
start_offset = new_ch.byte_offset
bit_offset = new_ch.bit_offset
data_type = new_ch.data_type
bit_count = new_ch.bit_count
ch_type = new_ch.channel_type
dependency_list = group.channel_dependencies[idx]
if ch_type not in v4c.VIRTUAL_TYPES and not dependency_list:
# adjust size to 1, 2, 4 or 8 bytes
size = bit_offset + bit_count
byte_size, rem = divmod(size, 8)
if rem:
byte_size += 1
bit_size = byte_size * 8
if data_type in (
v4c.DATA_TYPE_SIGNED_MOTOROLA,
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
):
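                        # Motorola (big endian) values are read into the next
                        # wider standard-size integer, so compensate the bit
                        # offset for the padding bits added in front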
if size > 32:
bit_offset += 64 - bit_size
elif size > 16:
bit_offset += 32 - bit_size
elif size > 8:
bit_offset += 16 - bit_size
if not new_ch.dtype_fmt:
new_ch.dtype_fmt = dtype(get_fmt_v4(data_type, size, ch_type))
if (
bit_offset
or new_ch.dtype_fmt.kind in "ui"
and size < 64
and size not in (8, 16, 32)
):
new_ch.standard_C_size = False
record.append(
(
new_ch.dtype_fmt,
new_ch.dtype_fmt.itemsize,
start_offset,
bit_offset,
)
)
else:
record.append(None)
group.record = record
return record
def _uses_ld(
self,
address: int,
stream: ReadableBufferType,
block_type: bytes = b"##DT",
mapped: bool = False,
) -> bool:
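        """check whether the data chain starting at `address` is made of
        LD (list data) blocks, i.e. column oriented storage"""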
mapped = mapped or not is_file_like(stream)
uses_ld = False
if mapped:
if address:
id_string, block_len = COMMON_SHORT_uf(stream, address)
if id_string == b"##LD":
uses_ld = True
# or a header list
elif id_string == b"##HL":
hl = HeaderList(address=address, stream=stream, mapped=mapped)
address = hl.first_dl_addr
uses_ld = self._uses_ld(
address,
stream,
block_type,
mapped,
)
else:
if address:
stream.seek(address)
id_string, block_len = COMMON_SHORT_u(stream.read(COMMON_SHORT_SIZE))
                # can be an LD (list data) block
if id_string == b"##LD":
uses_ld = True
# or a header list
elif id_string == b"##HL":
hl = HeaderList(address=address, stream=stream)
address = hl.first_dl_addr
uses_ld = self._uses_ld(
address,
stream,
block_type,
mapped,
)
return uses_ld
def _get_data_blocks_info(
self,
address: int,
stream: ReadableBufferType,
block_type: bytes = b"##DT",
mapped: bool = False,
total_size: int = 0,
inval_total_size: int = 0,
record_size: int = 0,
) -> Iterator[DataBlockInfo]:
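        """walk the DT/DV/DZ/DL/LD/HL block chain starting at `address` and
        yield a DataBlockInfo for every data fragment"""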
mapped = mapped or not is_file_like(stream)
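        # plain DT blocks are split into ~32MB fragments; round the fragment
        # size down to a multiple of the record size so that every fragment
        # contains only whole records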
if record_size:
_32_MB = 32 * 1024 * 1024 // record_size * record_size
else:
_32_MB = 32 * 1024 * 1024
if mapped:
if address:
id_string, block_len = COMMON_SHORT_uf(stream, address)
# can be a DataBlock
if id_string == block_type:
size = block_len - 24
if size:
address = address + COMMON_SIZE
                        # split the DTBLOCK into chunks of up to 32MB
while True:
if size > _32_MB:
total_size -= _32_MB
size -= _32_MB
yield DataBlockInfo(
address=address,
block_type=v4c.DT_BLOCK,
original_size=_32_MB,
compressed_size=_32_MB,
param=0,
block_limit=None,
)
address += _32_MB
else:
if total_size < size:
block_limit = total_size
else:
block_limit = None
yield DataBlockInfo(
address=address,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
break
# or a DataZippedBlock
elif id_string == b"##DZ":
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_uf(
stream, address + v4c.DZ_INFO_COMMON_OFFSET
)
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if total_size < original_size:
block_limit = total_size
else:
block_limit = None
total_size -= original_size
yield DataBlockInfo(
address=address + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
# or a DataList
elif id_string == b"##DL":
while address:
dl = DataList(address=address, stream=stream, mapped=mapped)
for i in range(dl.data_block_nr):
addr = dl[f"data_block_addr{i}"]
id_string, block_len = COMMON_SHORT_uf(stream, addr)
# can be a DataBlock
if id_string == block_type:
size = block_len - 24
if size:
addr = addr + COMMON_SIZE
                                    # split the DTBLOCK into chunks of up to 32MB
while True:
if size > _32_MB:
total_size -= _32_MB
size -= _32_MB
yield DataBlockInfo(
address=addr,
block_type=v4c.DT_BLOCK,
original_size=_32_MB,
compressed_size=_32_MB,
param=0,
block_limit=None,
)
addr += _32_MB
else:
if total_size < size:
block_limit = total_size
else:
block_limit = None
total_size -= size
yield DataBlockInfo(
address=addr,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
break
# or a DataZippedBlock
elif id_string == b"##DZ":
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_uf(
stream, addr + v4c.DZ_INFO_COMMON_OFFSET
)
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if total_size < original_size:
block_limit = total_size
else:
block_limit = None
total_size -= original_size
yield DataBlockInfo(
address=addr + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
address = dl.next_dl_addr
# or a ListData
elif id_string == b"##LD":
while address:
ld = ListData(address=address, stream=stream, mapped=mapped)
has_invalidation = ld.flags & v4c.FLAG_LD_INVALIDATION_PRESENT
for i in range(ld.data_block_nr):
addr = ld[f"data_block_addr_{i}"]
id_string, block_len = COMMON_SHORT_uf(stream, addr)
# can be a DataBlock
if id_string == b"##DV":
size = block_len - 24
if size:
if total_size < size:
block_limit = total_size
else:
block_limit = None
total_size -= size
data_info = DataBlockInfo(
address=addr + COMMON_SIZE,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
# or a DataZippedBlock
elif id_string == b"##DZ":
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_uf(
stream, addr + v4c.DZ_INFO_COMMON_OFFSET
)
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if total_size < original_size:
block_limit = total_size
else:
block_limit = None
total_size -= original_size
data_info = DataBlockInfo(
address=addr + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
if has_invalidation:
inval_addr = ld[f"invalidation_bits_addr_{i}"]
if inval_addr:
id_string, block_len = COMMON_SHORT_uf(
stream, inval_addr
)
if id_string == b"##DI":
size = block_len - 24
if size:
if inval_total_size < size:
block_limit = inval_total_size
else:
block_limit = None
inval_total_size -= size
data_info.invalidation_block = (
InvalidationBlockInfo(
address=inval_addr + COMMON_SIZE,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
)
else:
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_uf(
stream,
inval_addr + v4c.DZ_INFO_COMMON_OFFSET,
)
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if inval_total_size < original_size:
block_limit = inval_total_size
else:
block_limit = None
inval_total_size -= original_size
data_info.invalidation_block = (
InvalidationBlockInfo(
address=inval_addr
+ v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
)
else:
data_info.invalidation_block = (
InvalidationBlockInfo(
address=0,
block_type=v4c.DT_BLOCK,
original_size=None,
compressed_size=None,
param=None,
all_valid=True,
)
)
yield data_info
address = ld.next_ld_addr
# or a header list
elif id_string == b"##HL":
hl = HeaderList(address=address, stream=stream, mapped=mapped)
address = hl.first_dl_addr
yield from self._get_data_blocks_info(
address,
stream,
block_type,
mapped,
total_size,
inval_total_size,
record_size,
)
else:
if address:
stream.seek(address)
id_string, block_len = COMMON_SHORT_u(stream.read(COMMON_SHORT_SIZE))
# can be a DataBlock
if id_string == block_type:
size = block_len - 24
if size:
address = address + COMMON_SIZE
                        # split the DTBLOCK into chunks of up to 32MB
while True:
if size > _32_MB:
total_size -= _32_MB
size -= _32_MB
yield DataBlockInfo(
address=address,
block_type=v4c.DT_BLOCK,
original_size=_32_MB,
compressed_size=_32_MB,
param=0,
block_limit=None,
)
address += _32_MB
else:
if total_size < size:
block_limit = total_size
else:
block_limit = None
yield DataBlockInfo(
address=address,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
break
# or a DataZippedBlock
elif id_string == b"##DZ":
stream.seek(address + v4c.DZ_INFO_COMMON_OFFSET)
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_u(stream.read(v4c.DZ_COMMON_INFO_SIZE))
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if total_size < original_size:
block_limit = total_size
else:
block_limit = None
total_size -= original_size
yield DataBlockInfo(
address=address + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
# or a DataList
elif id_string == b"##DL":
while address:
dl = DataList(address=address, stream=stream)
for i in range(dl.data_block_nr):
addr = dl[f"data_block_addr{i}"]
stream.seek(addr)
id_string, block_len = COMMON_SHORT_u(
stream.read(COMMON_SHORT_SIZE)
)
# can be a DataBlock
if id_string == block_type:
size = block_len - 24
if size:
addr = addr + COMMON_SIZE
                                    # split the DTBLOCK into chunks of up to 32MB
while True:
if size > _32_MB:
total_size -= _32_MB
size -= _32_MB
yield DataBlockInfo(
address=addr,
block_type=v4c.DT_BLOCK,
original_size=_32_MB,
compressed_size=_32_MB,
param=0,
block_limit=None,
)
addr += _32_MB
else:
if total_size < size:
block_limit = total_size
else:
block_limit = None
total_size -= size
yield DataBlockInfo(
address=addr,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
break
# or a DataZippedBlock
elif id_string == b"##DZ":
stream.seek(addr + v4c.DZ_INFO_COMMON_OFFSET)
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_u(
stream.read(v4c.DZ_COMMON_INFO_SIZE)
)
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if total_size < original_size:
block_limit = total_size
else:
block_limit = None
total_size -= original_size
yield DataBlockInfo(
address=addr + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
address = dl.next_dl_addr
                # or a ListData
elif id_string == b"##LD":
while address:
ld = ListData(address=address, stream=stream)
has_invalidation = ld.flags & v4c.FLAG_LD_INVALIDATION_PRESENT
for i in range(ld.data_block_nr):
addr = ld[f"data_block_addr{i}"]
stream.seek(addr)
id_string, block_len = COMMON_SHORT_u(
stream.read(COMMON_SHORT_SIZE)
)
# can be a DataBlock
if id_string == b"##DV":
size = block_len - 24
if size:
if total_size < size:
block_limit = total_size
else:
block_limit = None
total_size -= size
data_info = DataBlockInfo(
address=addr + COMMON_SIZE,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
# or a DataZippedBlock
elif id_string == b"##DZ":
stream.seek(addr + v4c.DZ_INFO_COMMON_OFFSET)
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_u(
stream.read(v4c.DZ_COMMON_INFO_SIZE)
)
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if total_size < original_size:
block_limit = total_size
else:
block_limit = None
total_size -= original_size
data_info = DataBlockInfo(
address=addr + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
if has_invalidation:
inval_addr = ld[f"invalidation_bits_addr_{i}"]
if inval_addr:
stream.seek(inval_addr)
id_string, block_len = COMMON_SHORT_u(
stream.read(COMMON_SHORT_SIZE)
)
if id_string == b"##DI":
size = block_len - 24
if size:
if inval_total_size < size:
block_limit = inval_total_size
else:
block_limit = None
inval_total_size -= size
data_info.invalidation_block = (
InvalidationBlockInfo(
address=inval_addr + COMMON_SIZE,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
block_limit=block_limit,
)
)
else:
stream.seek(
inval_addr + v4c.DZ_INFO_COMMON_OFFSET
)
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_u(
stream.read(v4c.DZ_COMMON_INFO_SIZE)
)
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
if inval_total_size < original_size:
block_limit = inval_total_size
else:
block_limit = None
inval_total_size -= original_size
data_info.invalidation_block = (
InvalidationBlockInfo(
address=inval_addr
+ v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
block_limit=block_limit,
)
)
else:
data_info.invalidation_block = (
InvalidationBlockInfo(
address=0,
block_type=v4c.DT_BLOCK,
original_size=0,
compressed_size=0,
param=0,
all_valid=True,
)
)
yield data_info
address = ld.next_ld_addr
# or a header list
elif id_string == b"##HL":
hl = HeaderList(address=address, stream=stream)
address = hl.first_dl_addr
yield from self._get_data_blocks_info(
address,
stream,
block_type,
mapped,
total_size,
inval_total_size,
record_size,
)
def _get_signal_data_blocks_info(
self,
address: int,
stream: ReadableBufferType,
) -> Iterator[SignalDataBlockInfo]:
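        """yield a SignalDataBlockInfo for every SD/DZ block in the chain
        starting at `address` (DL and HL lists are followed)"""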
if not address:
raise MdfException(
f"Expected non-zero SDBLOCK address but got 0x{address:X}"
)
stream.seek(address)
id_string, block_len = COMMON_SHORT_u(stream.read(COMMON_SHORT_SIZE))
# can be a DataBlock
if id_string == b"##SD":
size = block_len - 24
if size:
yield SignalDataBlockInfo(
address=address + COMMON_SIZE,
compressed_size=size,
original_size=size,
block_type=v4c.DT_BLOCK,
)
# or a DataZippedBlock
elif id_string == b"##DZ":
stream.seek(address + v4c.DZ_INFO_COMMON_OFFSET)
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_u(stream.read(v4c.DZ_COMMON_INFO_SIZE))
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
yield SignalDataBlockInfo(
address=address + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
)
# or a DataList
elif id_string == b"##DL":
while address:
dl = DataList(address=address, stream=stream)
for i in range(dl.data_block_nr):
addr = dl[f"data_block_addr{i}"]
stream.seek(addr)
id_string, block_len = COMMON_SHORT_u(
stream.read(COMMON_SHORT_SIZE)
)
# can be a DataBlock
if id_string == b"##SD":
size = block_len - 24
if size:
yield SignalDataBlockInfo(
address=addr + COMMON_SIZE,
compressed_size=size,
original_size=size,
block_type=v4c.DT_BLOCK,
)
# or a DataZippedBlock
elif id_string == b"##DZ":
stream.seek(addr + v4c.DZ_INFO_COMMON_OFFSET)
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_u(stream.read(v4c.DZ_COMMON_INFO_SIZE))
if original_size:
if zip_type == v4c.FLAG_DZ_DEFLATE:
block_type_ = v4c.DZ_BLOCK_DEFLATE
param = 0
else:
block_type_ = v4c.DZ_BLOCK_TRANSPOSED
yield SignalDataBlockInfo(
address=addr + v4c.DZ_COMMON_SIZE,
block_type=block_type_,
original_size=original_size,
compressed_size=zip_size,
param=param,
)
address = dl.next_dl_addr
# or a header list
elif id_string == b"##HL":
hl = HeaderList(address=address, stream=stream)
address = hl.first_dl_addr
yield from self._get_signal_data_blocks_info(
address,
stream,
)
def _filter_occurrences(
self,
occurrences: Sequence[tuple[int, int]],
source_name: str | None = None,
source_path: str | None = None,
acq_name: str | None = None,
) -> Iterator[tuple[int, int]]:
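        """keep only the (group index, channel index) occurrences that match
        the given source name/path or acquisition name"""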
if source_name is not None:
occurrences = (
(gp_idx, cn_idx)
for gp_idx, cn_idx in occurrences
if (
self.groups[gp_idx].channels[cn_idx].source is not None
and self.groups[gp_idx].channels[cn_idx].source.name == source_name
)
or (
self.groups[gp_idx].channel_group.acq_source is not None
and self.groups[gp_idx].channel_group.acq_source.name == source_name
)
)
if source_path is not None:
occurrences = (
(gp_idx, cn_idx)
for gp_idx, cn_idx in occurrences
if (
self.groups[gp_idx].channels[cn_idx].source is not None
and self.groups[gp_idx].channels[cn_idx].source.path == source_path
)
or (
self.groups[gp_idx].channel_group.acq_source is not None
and self.groups[gp_idx].channel_group.acq_source.path == source_path
)
)
if acq_name is not None:
occurrences = (
(gp_idx, cn_idx)
for gp_idx, cn_idx in occurrences
if self.groups[gp_idx].channel_group.acq_name == acq_name
)
return occurrences
    def get_invalidation_bits(
self,
group_index: int,
channel: Channel,
fragment: tuple[bytes, int, int, ReadableBufferType | None],
) -> NDArray[bool_]:
"""get invalidation indexes for the channel
Parameters
----------
group_index : int
group index
channel : Channel
channel object
        fragment : (bytes, int, int, bytes | None)
            (fragment bytes, offset, record count, invalidation bytes)
        Returns
        -------
        invalidation_bits : numpy.ndarray
            bool array where True marks an invalid sample for this channel
"""
group = self.groups[group_index]
data_bytes, offset, _count, invalidation_bytes = fragment
try:
invalidation = self._invalidation_cache[(group_index, offset, _count)]
except KeyError:
size = group.channel_group.invalidation_bytes_nr
if invalidation_bytes is None:
record = group.record
if record is None:
self._prepare_record(group)
invalidation_bytes = get_channel_raw_bytes(
data_bytes,
group.channel_group.samples_byte_nr
+ group.channel_group.invalidation_bytes_nr,
group.channel_group.samples_byte_nr,
size,
)
invalidation = frombuffer(invalidation_bytes, dtype=f"({size},)u1")
self._invalidation_cache[(group_index, offset, _count)] = invalidation
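# split the channel's invalidation bit position into a byte index and
# a bit offset inside that byte, then mask out the bit for every cycle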
ch_invalidation_pos = channel.pos_invalidation_bit
pos_byte, pos_offset = ch_invalidation_pos // 8, ch_invalidation_pos % 8
mask = 1 << pos_offset
invalidation_bits = invalidation[:, pos_byte] & mask
invalidation_bits = invalidation_bits.astype(bool)
return invalidation_bits
def append(
self,
signals: list[Signal] | Signal | DataFrame,
acq_name: str | None = None,
acq_source: Source | None = None,
comment: str = "Python",
common_timebase: bool = False,
units: dict[str, str | bytes] | None = None,
) -> int | None:
"""
Appends a new data group.
For *Signal* objects that contain channel dependencies (structure or
array compositions), the *samples* attribute must be a numpy.recarray
Parameters
----------
signals : list | Signal | pandas.DataFrame
list of *Signal* objects, or a single *Signal* object, or a pandas
*DataFrame* object. All bytes columns in the pandas *DataFrame*
must be *utf-8* encoded
acq_name : str
channel group acquisition name
acq_source : Source
channel group acquisition source
comment : str
channel group comment; default 'Python'
common_timebase : bool
flag to hint that the signals have the same timebase. Only set this
if you know for sure that all appended channels share the same
time base
units : dict
mapping from signal name to unit; only used when appending a pandas
DataFrame
Returns
-------
index : int | None
index of the appended data group; `None` when a DataFrame was
appended
Examples
--------
>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF4('new.mdf')
>>> mdf.append([s1, s2, s3], comment='created by asammdf v4.0.0')
>>> # case 2: VTAB conversions from channels inside another file
>>> mdf1 = MDF4('in.mf4')
>>> ch1 = mdf1.get("Channel1_VTAB")
>>> ch2 = mdf1.get("Channel2_VTABR")
>>> sigs = [ch1, ch2]
>>> mdf2 = MDF4('out.mf4')
>>> mdf2.append(sigs, comment='created by asammdf v4.0.0')
>>> mdf2.append(ch1, comment='just a single channel')
>>> df = pd.DataFrame.from_dict({'s1': np.array([1, 2, 3, 4, 5]), 's2': np.array([-1, -2, -3, -4, -5])})
>>> units = {'s1': 'V', 's2': 'A'}
>>> mdf2.append(df, units=units)
"""
source_block = (
SourceInformation.from_common_source(acq_source)
if acq_source
else acq_source
)
if isinstance(signals, Signal):
signals = [signals]
elif isinstance(signals, DataFrame):
self._append_dataframe(
signals,
acq_name=acq_name,
acq_source=source_block,
comment=comment,
units=units,
)
return
if not signals:
return
# check if the signals have a common timebase
# if not, interpolate the signals using the union of all timebases
t_ = signals[0].timestamps
if not common_timebase:
for s in signals[1:]:
if not array_equal(s.timestamps, t_):
different = True
break
else:
different = False
if different:
times = [s.timestamps for s in signals]
t = unique(concatenate(times)).astype(float64)
signals = [
s.interp(
t,
integer_interpolation_mode=self._integer_interpolation,
float_interpolation_mode=self._float_interpolation,
)
for s in signals
]
else:
t = t_
else:
t = t_
if self.version >= "4.20" and (self._column_storage or 1):
return self._append_column_oriented(
signals, acq_name=acq_name, acq_source=source_block, comment=comment
)
dg_cntr = len(self.groups)
gp = Group(None)
gp.signal_data = gp_sdata = []
gp.channels = gp_channels = []
gp.channel_dependencies = gp_dep = []
gp.signal_types = gp_sig_types = []
cycles_nr = len(t)
# channel group
kwargs = {"cycles_nr": cycles_nr, "samples_byte_nr": 0}
gp.channel_group = ChannelGroup(**kwargs)
gp.channel_group.acq_name = acq_name
gp.channel_group.acq_source = source_block
gp.channel_group.comment = comment
gp.record = record = []
if any(sig.invalidation_bits is not None for sig in signals):
invalidation_bytes_nr = 1
gp.channel_group.invalidation_bytes_nr = invalidation_bytes_nr
inval_bits = []
else:
invalidation_bytes_nr = 0
inval_bits = []
inval_cntr = 0
self.groups.append(gp)
fields = []
ch_cntr = 0
offset = 0
defined_texts = {}
si_map = self._si_map
# setup all blocks related to the time master channel
file = self._tempfile
tell = file.tell
seek = file.seek
seek(0, 2)
if signals:
master_metadata = signals[0].master_metadata
else:
master_metadata = None
if master_metadata:
time_name, sync_type = master_metadata
if sync_type in (0, 1):
time_unit = "s"
elif sync_type == 2:
time_unit = "deg"
elif sync_type == 3:
time_unit = "m"
elif sync_type == 4:
time_unit = "index"
else:
time_name, sync_type = "time", v4c.SYNC_TYPE_TIME
time_unit = "s"
if signals:
# time channel
t_type, t_size = fmt_to_datatype_v4(t.dtype, t.shape)
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_MASTER,
"data_type": t_type,
"sync_type": sync_type,
"byte_offset": 0,
"bit_offset": 0,
"bit_count": t_size,
}
ch = Channel(**kwargs)
ch.unit = time_unit
ch.name = time_name
ch.source = source_block
ch.dtype_fmt = t.dtype
name = time_name
gp_channels.append(ch)
gp_sdata.append(None)
self.channels_db.add(name, (dg_cntr, ch_cntr))
self.masters_db[dg_cntr] = 0
record.append(
(
t.dtype,
t.dtype.itemsize,
0,
0,
)
)
# time channel doesn't have channel dependencies
gp_dep.append(None)
fields.append((t.tobytes(), t.itemsize))
offset += t_size // 8
ch_cntr += 1
gp_sig_types.append(0)
for signal in signals:
sig = signal
samples = sig.samples
sig_dtype = samples.dtype
sig_shape = samples.shape
names = sig_dtype.names
name = signal.name
if names is None:
sig_type = v4c.SIGNAL_TYPE_SCALAR
if sig_dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
else:
if names in (v4c.CANOPEN_TIME_FIELDS, v4c.CANOPEN_DATE_FIELDS):
sig_type = v4c.SIGNAL_TYPE_CANOPEN
elif names[0] != sig.name:
sig_type = v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
else:
sig_type = v4c.SIGNAL_TYPE_ARRAY
gp_sig_types.append(sig_type)
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
# compute the data type and bit size used to store this channel in the record
s_type, s_size = fmt_to_datatype_v4(sig_dtype, sig_shape)
if (s_type, s_size) == (v4c.DATA_TYPE_BYTEARRAY, 0):
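# variable length bytearray -> VLSD channel: the record stores 64-bit
# offsets into a signal data block where every entry is a 32-bit
# length followed by the raw bytes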
offsets = arange(len(samples), dtype=uint64) * (sig_shape[1] + 4)
values = [
full(len(samples), sig_shape[1], dtype=uint32),
samples,
]
types_ = [("o", uint32), ("s", sig_dtype, sig_shape[1:])]
data = fromarrays(values, dtype=types_)
data_size = len(data) * data.itemsize
if data_size:
data_addr = tell()
info = SignalDataBlockInfo(
address=data_addr,
compressed_size=data_size,
original_size=data_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp_sdata.append(
(
[info],
iter(EMPTY_TUPLE),
)
)
data.tofile(file)
else:
data_addr = 0
gp_sdata.append(
(
[],
iter(EMPTY_TUPLE),
)
)
byte_size = 8
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VLSD,
"bit_count": 64,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"data_block_addr": data_addr,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = dtype("<u8")
# conversions for channel
conversion = conversion_transfer(signal.conversion, version=4)
if signal.raw:
ch.conversion = conversion
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
record.append(
(
uint64,
8,
offset,
0,
)
)
offset += byte_size
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
fields.append((offsets.tobytes(), 8))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
else:
byte_size = s_size // 8 or 1
data_block_addr = 0
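# unsigned integers that fit in at most 4 bits keep their original
# bit count instead of being padded to a full byte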
if sig_dtype.kind == "u" and signal.bit_count <= 4:
s_size = signal.bit_count
if signal.flags & signal.Flags.stream_sync:
channel_type = v4c.CHANNEL_TYPE_SYNC
if signal.attachment:
at_data, at_name, hash_sum = signal.attachment
attachment_index = self.attach(
at_data,
at_name,
hash_sum,
mime="video/avi",
embedded=False,
)
attachment = attachment_index
else:
attachment = None
sync_type = v4c.SYNC_TYPE_TIME
else:
channel_type = v4c.CHANNEL_TYPE_VALUE
sync_type = v4c.SYNC_TYPE_NONE
if signal.attachment:
at_data, at_name, hash_sum = signal.attachment
attachment_index = self.attach(at_data, at_name, hash_sum)
attachment = attachment_index
else:
attachment = None
kwargs = {
"channel_type": channel_type,
"sync_type": sync_type,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"data_block_addr": data_block_addr,
"flags": 0,
}
if attachment is not None:
kwargs["attachment_addr"] = 0
if invalidation_bytes_nr and signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] = v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
if len(sig_shape) > 1:
ch.dtype_fmt = dtype((sig_dtype, sig_shape[1:]))
else:
ch.dtype_fmt = sig_dtype
ch.attachment = attachment
# conversions for channel
if signal.raw:
ch.conversion = conversion_transfer(
signal.conversion, version=4
)
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
record.append(
(
ch.dtype_fmt,
ch.dtype_fmt.itemsize,
offset,
0,
)
)
offset += byte_size
fields.append((samples.tobytes(), byte_size))
gp_sdata.append(None)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_CANOPEN:
if names == v4c.CANOPEN_TIME_FIELDS:
record.append(
(
dtype("V6"),
6,
offset,
0,
)
)
vals = signal.samples.tobytes()
fields.append((vals, 6))
byte_size = 6
s_type = v4c.DATA_TYPE_CANOPEN_TIME
s_dtype = dtype("V6")
else:
record.append(
(
dtype("V7"),
7,
offset,
0,
)
)
vals = []
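# CANopen date: the summer time flag is packed into bit 7 of the hour
# byte and the day of week into the upper bits of the day byte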
for field in ("ms", "min", "hour", "day", "month", "year"):
if field == "hour":
vals.append(
signal.samples[field]
+ (signal.samples["summer_time"] << 7)
)
elif field == "day":
vals.append(
signal.samples[field]
+ (signal.samples["day_of_week"] << 4)
)
else:
vals.append(signal.samples[field])
vals = fromarrays(vals).tobytes()
fields.append((vals, 7))
byte_size = 7
s_type = v4c.DATA_TYPE_CANOPEN_DATE
s_dtype = dtype("V7")
s_size = byte_size * 8
# there is no channel dependency
gp_dep.append(None)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr and signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = s_dtype
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
offset += byte_size
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
gp_sdata.append(None)
ch_cntr += 1
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
(
offset,
dg_cntr,
ch_cntr,
struct_self,
new_fields,
inval_cntr,
) = self._append_structure_composition(
gp,
signal,
offset,
dg_cntr,
ch_cntr,
defined_texts,
invalidation_bytes_nr,
inval_bits,
inval_cntr,
)
fields.extend(new_fields)
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
# here we have channel arrays or mdf v3 channel dependencies
samples = signal.samples[names[0]]
shape = samples.shape[1:]
if len(names) > 1 or len(shape) > 1:
# add channel dependency block for composed parent channel
dims_nr = len(shape)
names_nr = len(names)
if names_nr == 0:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_FIXED_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
elif len(names) == 1:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_ARRAY,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
else:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
else:
# add channel dependency block for composed parent channel
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_ARRAY,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
# first we add the structure channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape, True)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
size = s_size // 8
for dim in shape:
size *= dim
offset += size
fields.append((samples.tobytes(), size))
gp_sdata.append(None)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
ch_cntr += 1
for name in names[1:]:
samples = signal.samples[name]
shape = samples.shape[1:]
# add channel dependency block
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
dep = ChannelArrayBlock(**kwargs)
gp_dep.append([dep])
# add components channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, ())
byte_size = s_size // 8 or 1
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
gp_channels.append(ch)
entry = dg_cntr, ch_cntr
parent_dep.axis_channels.append(entry)
for dim in shape:
byte_size *= dim
offset += byte_size
fields.append((samples.tobytes(), byte_size))
gp_sdata.append(None)
self.channels_db.add(name, entry)
ch_cntr += 1
else:
encoding = signal.encoding
samples = signal.samples
sig_dtype = samples.dtype
if encoding == "utf-8":
data_type = v4c.DATA_TYPE_STRING_UTF_8
elif encoding == "latin-1":
data_type = v4c.DATA_TYPE_STRING_LATIN_1
elif encoding == "utf-16-be":
data_type = v4c.DATA_TYPE_STRING_UTF_16_BE
elif encoding == "utf-16-le":
data_type = v4c.DATA_TYPE_STRING_UTF_16_LE
else:
raise MdfException(f'wrong encoding "{encoding}" for string signal')
if self.compact_vlsd:
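# build the signal data block manually: every entry is a 32-bit
# length followed by the encoded bytes; utf-16 entries are padded to
# an even byte count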
data = []
offsets = []
off = 0
if encoding == "utf-16-le":
for elem in samples:
offsets.append(off)
size = len(elem)
if size % 2:
size += 1
elem = elem + b"\0"
data.append(UINT32_p(size))
data.append(elem)
off += size + 4
else:
for elem in samples:
offsets.append(off)
size = len(elem)
data.append(UINT32_p(size))
data.append(elem)
off += size + 4
data_size = off
offsets = array(offsets, dtype=uint64)
if data_size:
data_addr = tell()
info = SignalDataBlockInfo(
address=data_addr,
compressed_size=data_size,
original_size=data_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp_sdata.append(
(
[info],
iter(EMPTY_TUPLE),
)
)
file.seek(0, 2)
file.write(b"".join(data))
else:
data_addr = 0
gp_sdata.append(
(
[],
iter(EMPTY_TUPLE),
)
)
else:
offsets = arange(len(samples), dtype=uint64) * (
signal.samples.itemsize + 4
)
values = [
full(len(samples), samples.itemsize, dtype=uint32),
samples,
]
types_ = [("o", uint32), ("s", sig_dtype)]
data = fromarrays(values, dtype=types_)
data_size = len(data) * data.itemsize
if data_size:
data_addr = tell()
info = SignalDataBlockInfo(
address=data_addr,
compressed_size=data_size,
original_size=data_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp_sdata.append(
(
[info],
iter(EMPTY_TUPLE),
)
)
data.tofile(file)
else:
data_addr = 0
gp_sdata.append(
(
[],
iter(EMPTY_TUPLE),
)
)
# the record only stores a 64-bit offset into the signal data block for VLSD channels
byte_size = 8
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VLSD,
"bit_count": 64,
"byte_offset": offset,
"bit_offset": 0,
"data_type": data_type,
"data_block_addr": data_addr,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = dtype("<u8")
# conversions for channel
conversion = conversion_transfer(signal.conversion, version=4)
if signal.raw:
ch.conversion = conversion
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
record.append(
(
uint64,
8,
offset,
0,
)
)
offset += byte_size
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
fields.append((offsets.tobytes(), 8))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
if invalidation_bytes_nr:
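# pad the invalidation bit arrays to fill whole bytes; the reverse +
# packbits + fliplr sequence places the bit of channel i at byte
# i // 8, bit i % 8 of the invalidation area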
invalidation_bytes_nr = len(inval_bits)
for _ in range(8 - invalidation_bytes_nr % 8):
inval_bits.append(zeros(cycles_nr, dtype=bool))
inval_bits.reverse()
invalidation_bytes_nr = len(inval_bits) // 8
gp.channel_group.invalidation_bytes_nr = invalidation_bytes_nr
inval_bits = fliplr(
packbits(array(inval_bits).T).reshape(
(cycles_nr, invalidation_bytes_nr)
)
)
if self.version < "4.20":
fields.append((inval_bits.tobytes(), invalidation_bytes_nr))
gp.channel_group.cycles_nr = cycles_nr
gp.channel_group.samples_byte_nr = offset
virtual_group = VirtualChannelGroup()
self.virtual_groups[dg_cntr] = virtual_group
self.virtual_groups_map[dg_cntr] = dg_cntr
virtual_group.groups.append(dg_cntr)
virtual_group.record_size = offset + invalidation_bytes_nr
virtual_group.cycles_nr = cycles_nr
# data group
gp.data_group = DataGroup()
gp.sorted = True
samples = data_block_from_arrays(fields, cycles_nr)
size = len(samples)
samples = memoryview(samples)
del fields
if size:
if self.version < "4.20":
block_size = self._write_fragment_size or 20 * 1024 * 1024
count = ceil(size / block_size)
for i in range(count):
data_ = samples[i * block_size : (i + 1) * block_size]
raw_size = len(data_)
data_ = lz_compress(data_)
size = len(data_)
data_address = self._tempfile.tell()
self._tempfile.write(data_)
gp.data_blocks.append(
DataBlockInfo(
address=data_address,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=0,
)
)
else:
gp.uses_ld = True
data_address = tell()
data = samples
raw_size = len(data)
data = lz_compress(data)
size = len(data)
self._tempfile.write(data)
gp.data_blocks.append(
DataBlockInfo(
address=data_address,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=0,
)
)
if invalidation_bytes_nr:
addr = tell()
data = inval_bits.tobytes()
raw_size = len(data)
data = lz_compress(data)
size = len(data)
self._tempfile.write(data)
gp.data_blocks[-1].invalidation_block(
InvalidationBlockInfo(
address=addr,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=None,
)
)
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
return dg_cntr
def _append_column_oriented(
self,
signals: list[Signal],
acq_name: str | None = None,
acq_source: Source | None = None,
comment: str | None = None,
) -> int:
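"""append the signals using column oriented storage (MDF >= 4.20): the
master channel is written in its own channel group, then every signal
gets a separate channel group flagged with FLAG_CG_REMOTE_MASTER and
linked to the master group through cg_master_index; signal samples and
invalidation bits are LZ4 compressed and tracked as LD-style blocks
Returns
-------
index : int
index of the master data group
"""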
defined_texts = {}
si_map = self._si_map
# setup all blocks related to the time master channel
file = self._tempfile
tell = file.tell
seek = file.seek
write = file.write
seek(0, 2)
dg_cntr = initial_dg_cntr = len(self.groups)
# add the master group
gp = Group(None)
gp.signal_data = gp_sdata = []
gp.channels = gp_channels = []
gp.channel_dependencies = gp_dep = []
gp.signal_types = gp_sig_types = []
gp.uses_ld = True
gp.data_group = DataGroup()
gp.sorted = True
gp.record = record = []
samples = signals[0].timestamps
cycles_nr = len(samples)
# channel group
kwargs = {"cycles_nr": cycles_nr, "samples_byte_nr": 0}
gp.channel_group = remote_master_channel_group = ChannelGroup(**kwargs)
gp.channel_group.acq_name = acq_name
gp.channel_group.acq_source = acq_source
gp.channel_group.comment = comment
self.groups.append(gp)
ch_cntr = 0
types = []
offset = 0
source_block = acq_source
master_metadata = signals[0].master_metadata
if master_metadata:
time_name, sync_type = master_metadata
if sync_type in (0, 1):
time_unit = "s"
elif sync_type == 2:
time_unit = "deg"
elif sync_type == 3:
time_unit = "m"
elif sync_type == 4:
time_unit = "index"
else:
time_name, sync_type = "time", v4c.SYNC_TYPE_TIME
time_unit = "s"
# time channel
t_type, t_size = fmt_to_datatype_v4(samples.dtype, samples.shape)
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_MASTER,
"data_type": t_type,
"sync_type": sync_type,
"byte_offset": 0,
"bit_offset": 0,
"bit_count": t_size,
}
ch = Channel(**kwargs)
ch.unit = time_unit
ch.name = time_name
ch.source = source_block
ch.dtype_fmt = samples.dtype
name = time_name
gp_channels.append(ch)
gp_sdata.append(None)
self.channels_db.add(name, (dg_cntr, ch_cntr))
self.masters_db[dg_cntr] = 0
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
# time channel doesn't have channel dependencies
gp_dep.append(None)
types.append((name, samples.dtype))
offset += t_size // 8
ch_cntr += 1
gp_sig_types.append(0)
gp.channel_group.samples_byte_nr = offset
# data block
types = dtype(types)
gp.sorted = True
size = cycles_nr * samples.itemsize
cg_master_index = dg_cntr
virtual_group = VirtualChannelGroup()
self.virtual_groups[cg_master_index] = virtual_group
self.virtual_groups_map[dg_cntr] = dg_cntr
virtual_group.groups.append(dg_cntr)
virtual_group.record_size = offset
virtual_group.cycles_nr = cycles_nr
dg_cntr += 1
if size:
data_address = tell()
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
write(samples.tobytes())
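# the master samples are stored as uncompressed DTBLOCK fragments;
# the chunk size is aligned to a whole number of samples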
chunk = (self._write_fragment_size or 20 * 1024 * 1024) // samples.itemsize
chunk *= samples.itemsize
while size:
if size > chunk:
gp.data_blocks.append(
DataBlockInfo(
address=data_address,
block_type=v4c.DT_BLOCK,
original_size=chunk,
compressed_size=chunk,
param=0,
)
)
data_address += chunk
size -= chunk
else:
gp.data_blocks.append(
DataBlockInfo(
address=data_address,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
)
)
size = 0
else:
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
for signal in signals:
gp = Group(None)
gp.signal_data = gp_sdata = []
gp.channels = gp_channels = []
gp.channel_dependencies = gp_dep = []
gp.signal_types = gp_sig_types = []
gp.data_group = DataGroup()
gp.sorted = True
gp.uses_ld = True
gp.record = record = []
# channel group
kwargs = {
"cycles_nr": cycles_nr,
"samples_byte_nr": 0,
"flags": v4c.FLAG_CG_REMOTE_MASTER,
}
gp.channel_group = ChannelGroup(**kwargs)
gp.channel_group.acq_name = acq_name
gp.channel_group.acq_source = acq_source
gp.channel_group.comment = remote_master_channel_group.comment
gp.channel_group.cg_master_index = cg_master_index
self.groups.append(gp)
types = []
ch_cntr = 0
offset = 0
field_names = UniqueDB()
sig = signal
samples = sig.samples
sig_dtype = samples.dtype
sig_shape = samples.shape
names = sig_dtype.names
name = signal.name
if names is None:
sig_type = v4c.SIGNAL_TYPE_SCALAR
if sig_dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
else:
if names in (v4c.CANOPEN_TIME_FIELDS, v4c.CANOPEN_DATE_FIELDS):
sig_type = v4c.SIGNAL_TYPE_CANOPEN
elif names[0] != sig.name:
sig_type = v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
else:
sig_type = v4c.SIGNAL_TYPE_ARRAY
gp_sig_types.append(sig_type)
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
# compute the data type and bit size used to store this channel in the record
s_type, s_size = fmt_to_datatype_v4(sig_dtype, sig_shape)
byte_size = s_size // 8 or 1
if sig_dtype.kind == "u" and signal.bit_count <= 4:
s_size = signal.bit_count
if signal.flags & signal.Flags.stream_sync:
channel_type = v4c.CHANNEL_TYPE_SYNC
if signal.attachment:
at_data, at_name, hash_sum = signal.attachment
attachment_addr = self.attach(
at_data, at_name, hash_sum, mime="video/avi", embedded=False
)
data_block_addr = attachment_addr
else:
data_block_addr = 0
sync_type = v4c.SYNC_TYPE_TIME
else:
channel_type = v4c.CHANNEL_TYPE_VALUE
data_block_addr = 0
sync_type = v4c.SYNC_TYPE_NONE
kwargs = {
"channel_type": channel_type,
"sync_type": sync_type,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"data_block_addr": data_block_addr,
"flags": 0,
}
if signal.invalidation_bits is not None:
invalidation_bits = signal.invalidation_bits
kwargs["flags"] = v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
else:
invalidation_bits = None
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
# conversions for channel
if signal.raw:
ch.conversion = conversion_transfer(signal.conversion, version=4)
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
gp_sdata.append(None)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
_shape = sig_shape[1:]
types.append((name, sig_dtype, _shape))
gp.single_channel_dtype = ch.dtype_fmt = dtype((sig_dtype, _shape))
record.append(
(
ch.dtype_fmt,
ch.dtype_fmt.itemsize,
0,
0,
)
)
offset = byte_size
# simple channels don't have channel dependencies
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_CANOPEN:
if names == v4c.CANOPEN_TIME_FIELDS:
record.append(
(
dtype("V6"),
6,
0,
0,
)
)
types.append((name, "V6"))
gp.single_channel_dtype = dtype("V6")
byte_size = 6
s_type = v4c.DATA_TYPE_CANOPEN_TIME
else:
record.append(
(
dtype("V7"),
7,
0,
0,
)
)
vals = []
for field in ("ms", "min", "hour", "day", "month", "year"):
if field == "hour":
vals.append(
signal.samples[field]
+ (signal.samples["summer_time"] << 7)
)
elif field == "day":
vals.append(
signal.samples[field]
+ (signal.samples["day_of_week"] << 4)
)
else:
vals.append(signal.samples[field])
samples = fromarrays(vals)
types.append((name, "V7"))
gp.single_channel_dtype = dtype("V7")
byte_size = 7
s_type = v4c.DATA_TYPE_CANOPEN_DATE
s_size = byte_size * 8
# there is no channel dependency
gp_dep.append(None)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if signal.invalidation_bits is not None:
invalidation_bits = signal.invalidation_bits
kwargs["flags"] = v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
else:
invalidation_bits = None
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = gp.single_channel_dtype
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
offset = byte_size
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
gp_sdata.append(None)
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
(
offset,
dg_cntr,
ch_cntr,
struct_self,
new_fields,
new_types,
) = self._append_structure_composition_column_oriented(
gp,
signal,
field_names,
offset,
dg_cntr,
ch_cntr,
defined_texts,
)
if signal.invalidation_bits is not None:
invalidation_bits = signal.invalidation_bits
else:
invalidation_bits = None
gp["types"] = dtype(new_types)
offset = gp["types"].itemsize
samples = signal.samples
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
fields = []
# here we have channel arrays or mdf v3 channel dependencies
samples = signal.samples[names[0]]
shape = samples.shape[1:]
if len(names) > 1 or len(shape) > 1:
# add channel dependency block for composed parent channel
dims_nr = len(shape)
names_nr = len(names)
if names_nr == 0:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_FIXED_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
elif len(names) == 1:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_ARRAY,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
else:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
else:
# add channel dependency block for composed parent channel
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
field_name = field_names.get_unique_name(name)
fields.append(samples)
dtype_pair = field_name, samples.dtype, shape
types.append(dtype_pair)
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
# first we add the structure channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape, True)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if signal.invalidation_bits is not None:
invalidation_bits = signal.invalidation_bits
kwargs["flags"] = v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
else:
invalidation_bits = None
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
size = s_size // 8
for dim in shape:
size *= dim
offset += size
gp_sdata.append(None)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
ch_cntr += 1
for name in names[1:]:
field_name = field_names.get_unique_name(name)
samples = signal.samples[name]
shape = samples.shape[1:]
fields.append(samples)
types.append((field_name, samples.dtype, shape))
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
# add channel dependency block
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
dep = ChannelArrayBlock(**kwargs)
gp_dep.append([dep])
# add components channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, ())
byte_size = s_size // 8 or 1
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if signal.invalidation_bits is not None:
invalidation_bits = signal.invalidation_bits
kwargs["flags"] = v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
else:
invalidation_bits = None
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
gp_channels.append(ch)
entry = dg_cntr, ch_cntr
parent_dep.axis_channels.append(entry)
for dim in shape:
byte_size *= dim
offset += byte_size
gp_sdata.append(None)
self.channels_db.add(name, entry)
ch_cntr += 1
gp["types"] = dtype(types)
samples = signal.samples
else:
encoding = signal.encoding
samples = signal.samples
sig_dtype = samples.dtype
if encoding == "utf-8":
data_type = v4c.DATA_TYPE_STRING_UTF_8
elif encoding == "latin-1":
data_type = v4c.DATA_TYPE_STRING_LATIN_1
elif encoding == "utf-16-be":
data_type = v4c.DATA_TYPE_STRING_UTF_16_BE
elif encoding == "utf-16-le":
data_type = v4c.DATA_TYPE_STRING_UTF_16_LE
else:
raise MdfException(f'wrong encoding "{encoding}" for string signal')
offsets = arange(len(samples), dtype=uint64) * (
signal.samples.itemsize + 4
)
values = [full(len(samples), samples.itemsize, dtype=uint32), samples]
types_ = [("o", uint32), ("s", sig_dtype)]
data = fromarrays(values, dtype=types_)
data_size = len(data) * data.itemsize
if data_size:
data_addr = tell()
info = SignalDataBlockInfo(
address=data_addr,
compressed_size=data_size,
original_size=data_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp_sdata.append(
(
[info],
iter(EMPTY_TUPLE),
)
)
data.tofile(file)
else:
data_addr = 0
gp_sdata.append(
(
[],
iter(EMPTY_TUPLE),
)
)
# the record only stores a 64-bit offset into the signal data block for VLSD channels
byte_size = 8
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VLSD,
"bit_count": 64,
"byte_offset": offset,
"bit_offset": 0,
"data_type": data_type,
"data_block_addr": data_addr,
"flags": 0,
}
if signal.invalidation_bits is not None:
invalidation_bits = signal.invalidation_bits
kwargs["flags"] = v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
else:
invalidation_bits = None
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
# conversions for channel
conversion = conversion_transfer(signal.conversion, version=4)
if signal.raw:
ch.conversion = conversion
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
record.append(
(
uint64,
8,
offset,
0,
)
)
offset = byte_size
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
types.append((name, uint64))
gp.single_channel_dtype = ch.dtype_fmt = uint64
samples = offsets
# simple channels don't have channel dependencies
gp_dep.append(None)
gp.channel_group.samples_byte_nr = offset
if invalidation_bits is not None:
gp.channel_group.invalidation_bytes_nr = 1
virtual_group.groups.append(dg_cntr)
self.virtual_groups_map[dg_cntr] = cg_master_index
virtual_group.record_size += offset
if invalidation_bits is not None:
virtual_group.record_size += 1
dg_cntr += 1
size = cycles_nr * samples.itemsize
if size:
data_address = tell()
data = samples.tobytes()
raw_size = len(data)
data = lz_compress(data)
size = len(data)
write(data)
gp.data_blocks.append(
DataBlockInfo(
address=data_address,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=0,
)
)
if invalidation_bits is not None:
addr = tell()
data = invalidation_bits.tobytes()
raw_size = len(data)
data = lz_compress(data)
size = len(data)
write(data)
gp.data_blocks[-1].invalidation_block(
InvalidationBlockInfo(
address=addr,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=None,
)
)
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
return initial_dg_cntr
def _append_dataframe(
self,
df: DataFrame,
acq_name: str | None = None,
acq_source: Source | None = None,
comment: str | None = None,
units: dict[str, str | bytes] | None = None,
) -> None:
"""
Appends a new data group from a pandas DataFrame. The DataFrame index
is used as the time channel and object columns are encoded as utf-8
strings.
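Example
-------
a minimal sketch (the column name and unit below are only
illustrative):
>>> import numpy as np
>>> import pandas as pd
>>> df = pd.DataFrame({"s1": np.array([1.0, 2.0, 3.0])}, index=np.array([0.1, 0.2, 0.3]))
>>> mdf = MDF4()
>>> mdf._append_dataframe(df, units={"s1": "V"})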
"""
units = units or {}
if df.shape == (0, 0):
return
t = df.index
index_name = df.index.name
time_name = index_name or "time"
sync_type = v4c.SYNC_TYPE_TIME
time_unit = "s"
dg_cntr = len(self.groups)
gp = Group(None)
gp.signal_data = gp_sdata = []
gp.channels = gp_channels = []
gp.channel_dependencies = gp_dep = []
gp.signal_types = gp_sig_types = []
gp.record = record = []
cycles_nr = len(t)
# channel group
kwargs = {"cycles_nr": cycles_nr, "samples_byte_nr": 0}
gp.channel_group = ChannelGroup(**kwargs)
gp.channel_group.acq_name = acq_name
gp.channel_group.acq_source = acq_source
gp.channel_group.comment = comment
self.groups.append(gp)
fields = []
types = []
ch_cntr = 0
offset = 0
field_names = UniqueDB()
# setup all blocks related to the time master channel
file = self._tempfile
tell = file.tell
seek = file.seek
seek(0, 2)
virtual_group = VirtualChannelGroup()
self.virtual_groups[dg_cntr] = virtual_group
self.virtual_groups_map[dg_cntr] = dg_cntr
virtual_group.groups.append(dg_cntr)
virtual_group.cycles_nr = cycles_nr
# time channel
t_type, t_size = fmt_to_datatype_v4(t.dtype, t.shape)
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_MASTER,
"data_type": t_type,
"sync_type": sync_type,
"byte_offset": 0,
"bit_offset": 0,
"bit_count": t_size,
"min_raw_value": t[0] if cycles_nr else 0,
"max_raw_value": t[-1] if cycles_nr else 0,
"lower_limit": t[0] if cycles_nr else 0,
"upper_limit": t[-1] if cycles_nr else 0,
"flags": v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK,
}
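# the limits were filled from the first and last timestamps, so the
# value and physical range flags can be set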
ch = Channel(**kwargs)
ch.unit = time_unit
ch.name = time_name
ch.dtype_fmt = t.dtype
name = time_name
gp_channels.append(ch)
gp_sdata.append(None)
self.channels_db.add(name, (dg_cntr, ch_cntr))
self.masters_db[dg_cntr] = 0
record.append(
(
t.dtype,
t.dtype.itemsize,
offset,
0,
)
)
# time channel doesn't have channel dependencies
gp_dep.append(None)
fields.append(t)
types.append((name, t.dtype))
field_names.get_unique_name(name)
offset += t_size // 8
ch_cntr += 1
gp_sig_types.append(0)
for signal in df:
if index_name == signal:
continue
sig = df[signal]
name = signal
sig_type = v4c.SIGNAL_TYPE_SCALAR
if sig.dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
gp_sig_types.append(sig_type)
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
# object columns are encoded as utf-8 byte strings before computing the data type
if sig.dtype.kind == "O":
sig = encode(sig.values.astype(str), "utf-8")
s_type, s_size = fmt_to_datatype_v4(sig.dtype, sig.shape)
byte_size = s_size // 8 or 1
channel_type = v4c.CHANNEL_TYPE_VALUE
data_block_addr = 0
sync_type = v4c.SYNC_TYPE_NONE
kwargs = {
"channel_type": channel_type,
"sync_type": sync_type,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"data_block_addr": data_block_addr,
}
ch = Channel(**kwargs)
ch.name = name
ch.unit = units.get(name, "")
ch.dtype_fmt = dtype((sig.dtype, sig.shape[1:]))
record.append(
(
ch.dtype_fmt,
ch.dtype_fmt.itemsize,
offset,
0,
)
)
gp_channels.append(ch)
offset += byte_size
gp_sdata.append(None)
self.channels_db.add(name, (dg_cntr, ch_cntr))
field_name = field_names.get_unique_name(name)
fields.append(sig)
types.append((field_name, sig.dtype, sig.shape[1:]))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_STRING:
offsets = arange(len(sig), dtype=uint64) * (sig.dtype.itemsize + 4)
values = [full(len(sig), sig.dtype.itemsize, dtype=uint32), sig.values]
types_ = [("", uint32), ("", sig.dtype)]
data = fromarrays(values, dtype=types_)
data_size = len(data) * data.itemsize
if data_size:
data_addr = tell()
info = SignalDataBlockInfo(
address=data_addr,
compressed_size=data_size,
original_size=data_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp_sdata.append(
(
[info],
iter(EMPTY_TUPLE),
)
)
data.tofile(file)
else:
data_addr = 0
gp_sdata.append(
(
[],
iter(EMPTY_TUPLE),
)
)
# the record only stores a 64-bit offset into the signal data block for VLSD channels
byte_size = 8
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VLSD,
"bit_count": 64,
"byte_offset": offset,
"bit_offset": 0,
"data_type": v4c.DATA_TYPE_STRING_UTF_8,
"min_raw_value": 0,
"max_raw_value": 0,
"lower_limit": 0,
"upper_limit": 0,
"flags": 0,
"data_block_addr": data_addr,
}
ch = Channel(**kwargs)
ch.name = name
ch.unit = units.get(name, "")
ch.dtype_fmt = dtype("<u8")
gp_channels.append(ch)
record.append(
(
uint64,
8,
offset,
0,
)
)
offset += byte_size
self.channels_db.add(name, (dg_cntr, ch_cntr))
field_name = field_names.get_unique_name(name)
fields.append(offsets)
types.append((field_name, uint64))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
virtual_group.record_size = offset
virtual_group.cycles_nr = cycles_nr
gp.channel_group.cycles_nr = cycles_nr
gp.channel_group.samples_byte_nr = offset
# data group
gp.data_group = DataGroup()
# data block
types = dtype(types)
gp.sorted = True
if df.shape[0]:
samples = fromarrays(fields, dtype=types)
else:
samples = array([])
size = len(samples) * samples.itemsize
if size:
data_address = self._tempfile.tell()
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
samples.tofile(self._tempfile)
gp.data_blocks.append(
DataBlockInfo(
address=data_address,
block_type=v4c.DT_BLOCK,
original_size=size,
compressed_size=size,
param=0,
)
)
else:
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
def _append_structure_composition(
self,
grp: Group,
signal: Signal,
offset: int,
dg_cntr: int,
ch_cntr: int,
defined_texts: dict[str, int],
invalidation_bytes_nr: int,
inval_bits: list[NDArray[Any]],
inval_cntr: int,
) -> tuple[
int,
int,
int,
tuple[int, int],
list[tuple[bytes, int]],
int,
]:
si_map = self._si_map
fields = []
file = self._tempfile
seek = file.seek
seek(0, 2)
gp = grp
gp_sdata = gp.signal_data
gp_channels = gp.channels
gp_dep = gp.channel_dependencies
record = gp.record
name = signal.name
names = signal.samples.dtype.names
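# a structure is stored as a parent BYTEARRAY channel spanning the
# complete structure, with the member channels collected in its
# channel dependency list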
# first we add the structure channel
if signal.attachment and signal.attachment[0]:
at_data, at_name, hash_sum = signal.attachment
if at_name is not None:
suffix = Path(at_name).suffix.lower().strip(".")
else:
suffix = "dbc"
if suffix == "a2l":
mime = "applciation/A2L"
else:
mime = f"application/x-{suffix}"
attachment_index = self.attach(
at_data, at_name, hash_sum=hash_sum, mime=mime
)
attachment = attachment_index
else:
attachment = None
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": signal.samples.dtype.itemsize * 8,
"byte_offset": offset,
"bit_offset": 0,
"data_type": v4c.DATA_TYPE_BYTEARRAY,
"precision": 0,
}
if attachment is not None:
kwargs["attachment_addr"] = 0
source_bus = signal.source and signal.source.source_type == v4c.SOURCE_BUS
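# bus event channels get the CN/CG bus event flags so that readers
# can recognize bus logging groups; the path separator 46 set below
# is the ASCII code for "."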
if source_bus:
kwargs["flags"] = v4c.FLAG_CN_BUS_EVENT
flags_ = v4c.FLAG_CN_BUS_EVENT
grp.channel_group.flags |= (
v4c.FLAG_CG_BUS_EVENT | v4c.FLAG_CG_PLAIN_BUS_EVENT
)
else:
kwargs["flags"] = 0
flags_ = 0
if invalidation_bytes_nr and signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.attachment = attachment
ch.dtype_fmt = signal.samples.dtype
record.append((ch.dtype_fmt, ch.dtype_fmt.itemsize, offset, 0))
if source_bus and grp.channel_group.acq_source is None:
grp.channel_group.acq_source = SourceInformation.from_common_source(
signal.source
)
if signal.source.bus_type == v4c.BUS_TYPE_CAN:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "CAN"
elif signal.source.bus_type == v4c.BUS_TYPE_FLEXRAY:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "FLEXRAY"
elif signal.source.bus_type == v4c.BUS_TYPE_ETHERNET:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "ETHERNET"
elif signal.source.bus_type == v4c.BUS_TYPE_K_LINE:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "K_LINE"
elif signal.source.bus_type == v4c.BUS_TYPE_MOST:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "MOST"
elif signal.source.bus_type == v4c.BUS_TYPE_LIN:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "LIN"
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
entry = dg_cntr, ch_cntr
gp_channels.append(ch)
struct_self = entry
gp_sdata.append(None)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
ch_cntr += 1
dep_list = []
gp_dep.append(dep_list)
# then we add the fields
for name in names:
samples = signal.samples[name]
fld_names = samples.dtype.names
if fld_names is None:
sig_type = v4c.SIGNAL_TYPE_SCALAR
if samples.dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
else:
if fld_names in (v4c.CANOPEN_TIME_FIELDS, v4c.CANOPEN_DATE_FIELDS):
sig_type = v4c.SIGNAL_TYPE_CANOPEN
elif fld_names[0] != name:
sig_type = v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
else:
sig_type = v4c.SIGNAL_TYPE_ARRAY
if sig_type in (v4c.SIGNAL_TYPE_SCALAR, v4c.SIGNAL_TYPE_STRING):
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape)
byte_size = s_size // 8 or 1
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": flags_,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.dtype_fmt = dtype((samples.dtype, samples.shape[1:]))
record.append(
(
ch.dtype_fmt,
ch.dtype_fmt.itemsize,
offset,
0,
)
)
entry = (dg_cntr, ch_cntr)
gp_channels.append(ch)
dep_list.append(entry)
offset += byte_size
fields.append((samples.tobytes(), byte_size))
gp_sdata.append(None)
self.channels_db.add(name, entry)
ch_cntr += 1
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
# here we have channel arrays or mdf v3 channel dependencies
array_samples = samples
names = samples.dtype.names
samples = array_samples[names[0]]
shape = samples.shape[1:]
if len(names) > 1:
# add channel dependency block for composed parent channel
dims_nr = len(shape)
names_nr = len(names)
if names_nr == 0:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_FIXED_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
elif len(names) == 1:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_ARRAY,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
else:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
else:
# add channel dependency block for composed parent channel
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
# first we add the structure channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape, True)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
size = s_size // 8
for dim in shape:
size *= dim
offset += size
fields.append((samples.tobytes(), size))
gp_sdata.append(None)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
ch_cntr += 1
for name in names[1:]:
samples = array_samples[name]
shape = samples.shape[1:]
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
# add channel dependency block
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
dep = ChannelArrayBlock(**kwargs)
gp_dep.append([dep])
# add components channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, ())
byte_size = s_size // 8 or 1
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
gp_channels.append(ch)
entry = dg_cntr, ch_cntr
parent_dep.axis_channels.append(entry)
for dim in shape:
byte_size *= dim
offset += byte_size
fields.append((samples.tobytes(), byte_size))
gp_sdata.append(None)
self.channels_db.add(name, entry)
ch_cntr += 1
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
struct = Signal(
samples,
samples,
name=name,
invalidation_bits=signal.invalidation_bits,
)
(
offset,
dg_cntr,
ch_cntr,
sub_structure,
new_fields,
inval_cntr,
) = self._append_structure_composition(
grp,
struct,
offset,
dg_cntr,
ch_cntr,
defined_texts,
invalidation_bytes_nr,
inval_bits,
inval_cntr,
)
dep_list.append(sub_structure)
fields.extend(new_fields)
return offset, dg_cntr, ch_cntr, struct_self, fields, inval_cntr
def _append_structure_composition_column_oriented(
self,
grp: Group,
signal: Signal,
field_names: UniqueDB,
offset: int,
dg_cntr: int,
ch_cntr: int,
defined_texts: dict[str, int],
) -> tuple[
int,
int,
int,
tuple[int, int],
list[NDArray[Any]],
list[tuple[str, dtype[Any], tuple[int, ...]]],
]:
si_map = self._si_map
fields = []
types = []
file = self._tempfile
seek = file.seek
seek(0, 2)
gp = grp
gp_sdata = gp.signal_data
gp_channels = gp.channels
gp_dep = gp.channel_dependencies
record = gp.record
name = signal.name
names = signal.samples.dtype.names
field_name = field_names.get_unique_name(name)
# first we add the structure channel
if signal.attachment and signal.attachment[0]:
at_data, at_name, hash_sum = signal.attachment
if at_name is not None:
suffix = Path(at_name).suffix.strip(".")
else:
suffix = "dbc"
attachment_index = self.attach(
at_data, at_name, hash_sum=hash_sum, mime=f"application/x-{suffix}"
)
attachment = attachment_index
else:
attachment = None
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": signal.samples.dtype.itemsize * 8,
"byte_offset": offset,
"bit_offset": 0,
"data_type": v4c.DATA_TYPE_BYTEARRAY,
"precision": 0,
}
if attachment is not None:
kwargs["attachment_addr"] = 0
source_bus = signal.source and signal.source.source_type == v4c.SOURCE_BUS
if source_bus:
kwargs["flags"] = v4c.FLAG_CN_BUS_EVENT
flags_ = v4c.FLAG_CN_BUS_EVENT
grp.channel_group.flags |= v4c.FLAG_CG_BUS_EVENT
else:
kwargs["flags"] = 0
flags_ = 0
if signal.invalidation_bits is not None:
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.attachment = attachment
ch.dtype_fmt = signal.samples.dtype
if source_bus:
grp.channel_group.acq_source = SourceInformation.from_common_source(
signal.source
)
if signal.source.bus_type == v4c.BUS_TYPE_CAN:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "CAN"
elif signal.source.bus_type == v4c.BUS_TYPE_FLEXRAY:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "FLEXRAY"
elif signal.source.bus_type == v4c.BUS_TYPE_ETHERNET:
grp.channel_group.path_separator = 46
grp.channel_group.acq_name = "ETHERNET"
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
entry = dg_cntr, ch_cntr
gp_channels.append(ch)
struct_self = entry
gp_sdata.append(None)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
ch_cntr += 1
dep_list = []
gp_dep.append(dep_list)
record.append((ch.dtype_fmt, ch.dtype_fmt.itemsize, offset, 0))
# then we add the fields
for name in names:
field_name = field_names.get_unique_name(name)
samples = signal.samples[name]
fld_names = samples.dtype.names
if fld_names is None:
sig_type = v4c.SIGNAL_TYPE_SCALAR
if samples.dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
else:
if fld_names in (v4c.CANOPEN_TIME_FIELDS, v4c.CANOPEN_DATE_FIELDS):
sig_type = v4c.SIGNAL_TYPE_CANOPEN
elif fld_names[0] != name:
sig_type = v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
else:
sig_type = v4c.SIGNAL_TYPE_ARRAY
if sig_type in (v4c.SIGNAL_TYPE_SCALAR, v4c.SIGNAL_TYPE_STRING):
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape)
byte_size = s_size // 8 or 1
fields.append(samples)
types.append((field_name, samples.dtype, samples.shape[1:]))
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": flags_,
}
if signal.invalidation_bits is not None:
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
ch = Channel(**kwargs)
ch.name = name
ch.dtype_fmt = dtype((samples.dtype, samples.shape[1:]))
record.append(
(
ch.dtype_fmt,
ch.dtype_fmt.itemsize,
offset,
0,
)
)
entry = (dg_cntr, ch_cntr)
gp_channels.append(ch)
dep_list.append(entry)
offset += byte_size
gp_sdata.append(None)
self.channels_db.add(name, entry)
ch_cntr += 1
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
# here we have channel arrays or mdf v3 channel dependencies
array_samples = samples
names = samples.dtype.names
samples = array_samples[names[0]]
shape = samples.shape[1:]
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
if len(names) > 1:
# add channel dependency block for composed parent channel
dims_nr = len(shape)
names_nr = len(names)
if names_nr == 0:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_FIXED_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
elif len(names) == 1:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_ARRAY,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
else:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
else:
# add channel dependency block for composed parent channel
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
field_name = field_names.get_unique_name(name)
fields.append(samples)
dtype_pair = field_name, samples.dtype, shape
types.append(dtype_pair)
# first we add the structure channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape, True)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if signal.invalidation_bits is not None:
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
size = s_size // 8
for dim in shape:
size *= dim
offset += size
gp_sdata.append(None)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
for _name in ch.display_names:
self.channels_db.add(_name, entry)
ch_cntr += 1
for name in names[1:]:
field_name = field_names.get_unique_name(name)
samples = array_samples[name]
shape = samples.shape[1:]
fields.append(samples)
types.append((field_name, samples.dtype, shape))
record.append(
(
samples.dtype,
samples.dtype.itemsize,
offset,
0,
)
)
# add channel dependency block
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
dep = ChannelArrayBlock(**kwargs)
gp_dep.append([dep])
# add components channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, ())
byte_size = s_size // 8 or 1
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if signal.invalidation_bits is not None:
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = 0
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_names = signal.display_names
ch.dtype_fmt = samples.dtype
gp_channels.append(ch)
entry = dg_cntr, ch_cntr
parent_dep.axis_channels.append(entry)
for dim in shape:
byte_size *= dim
offset += byte_size
gp_sdata.append(None)
self.channels_db.add(name, entry)
ch_cntr += 1
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
struct = Signal(
samples,
samples,
name=name,
invalidation_bits=signal.invalidation_bits,
)
(
offset,
dg_cntr,
ch_cntr,
sub_structure,
new_fields,
new_types,
) = self._append_structure_composition_column_oriented(
grp,
struct,
field_names,
offset,
dg_cntr,
ch_cntr,
defined_texts,
)
dep_list.append(sub_structure)
fields.extend(new_fields)
types.extend(new_types)
return offset, dg_cntr, ch_cntr, struct_self, fields, types
[docs] def extend(
self, index: int, signals: list[tuple[NDArray[Any], NDArray[Any] | None]]
) -> None:
"""
Extend a group with new samples. *signals* contains (values, invalidation_bits)
pairs for each extended signal. The first pair is the master channel's pair, and the
next pairs must respect the same order in which the signals were appended. The samples must have raw
or physical values according to the *Signals* used for the initial append.
Parameters
----------
index : int
group index
signals : list
list of (numpy.ndarray, numpy.ndarray) objects
Examples
--------
>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF4('new.mdf')
>>> mdf.append([s1, s2, s3], comment='created by asammdf v1.1.0')
>>> t = np.array([0.006, 0.007, 0.008, 0.009, 0.010])
>>> # extend without invalidation bits
>>> mdf.extend(0, [(t, None), (s1.samples, None), (s2.samples, None), (s3.samples, None)])
>>> # extend with some invalidation bits
>>> s1_inv = np.array([0, 0, 0, 1, 1], dtype=bool)
>>> mdf.extend(0, [(t, None), (s1.samples, s1_inv), (s2.samples, None), (s3.samples, None)])
"""
# for MDF v4.20 and newer the samples are stored column oriented
if self.version >= "4.20":
return self._extend_column_oriented(index, signals)
gp = self.groups[index]
if not signals:
message = '"extend" requires a non-empty list of (samples, invalidation_bits) pairs'
raise MdfException(message)
stream = self._tempfile
fields = []
inval_bits = []
added_cycles = len(signals[0][0])
invalidation_bytes_nr = gp.channel_group.invalidation_bytes_nr
for i, ((signal, invalidation_bits), sig_type) in enumerate(
zip(signals, gp.signal_types)
):
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
s_type, s_size = fmt_to_datatype_v4(signal.dtype, signal.shape)
byte_size = s_size // 8 or 1
fields.append((signal.tobytes(), byte_size))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
elif sig_type == v4c.SIGNAL_TYPE_CANOPEN:
names = signal.dtype.names
if names == v4c.CANOPEN_TIME_FIELDS:
vals = signal.tobytes()
fields.append((vals, 6))
else:
vals = []
for field in ("ms", "min", "hour", "day", "month", "year"):
vals.append(signal[field])
vals = fromarrays(vals).tobytes()
fields.append((vals, 7))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
fields.append((signal.tobytes(), signal.dtype.itemsize))
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
names = signal.dtype.names
samples = signal[names[0]]
shape = samples.shape[1:]
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape, True)
size = s_size // 8
for dim in shape:
size *= dim
fields.append((samples.tobytes(), size))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
for name in names[1:]:
samples = signal[name]
shape = samples.shape[1:]
s_type, s_size = fmt_to_datatype_v4(samples.dtype, ())
size = s_size // 8
for dim in shape:
size *= dim
fields.append((samples.tobytes(), size))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
else:
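# variable length signal data (VLSD): the samples are written to the
# signal data block and the record stores the uint64 offsets instead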
if self.compact_vlsd:
cur_offset = sum(
blk.original_size for blk in gp.get_signal_data_blocks(i)
)
data = []
offsets = []
off = 0
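# each VLSD value is stored as a 4-byte little-endian length followed
# by the raw bytes; UTF-16 values are padded to an even byte count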
if gp.channels[i].data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
for elem in signal:
offsets.append(off)
size = len(elem)
if size % 2:
size += 1
elem = elem + b"\0"
data.append(UINT32_p(size))
data.append(elem)
off += size + 4
else:
for elem in signal:
offsets.append(off)
size = len(elem)
data.append(UINT32_p(size))
data.append(elem)
off += size + 4
offsets = array(offsets, dtype=uint64)
stream.seek(0, 2)
addr = stream.tell()
data_size = off
if data_size:
info = SignalDataBlockInfo(
address=addr,
compressed_size=data_size,
original_size=data_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp.signal_data[i][0].append(info)
stream.write(b"".join(data))
offsets += cur_offset
fields.append((offsets.tobytes(), 8))
else:
cur_offset = sum(
blk.original_size for blk in gp.get_signal_data_blocks(i)
)
offsets = arange(len(signal), dtype=uint64) * (signal.itemsize + 4)
values = [full(len(signal), signal.itemsize, dtype=uint32), signal]
types_ = [("", uint32), ("", signal.dtype)]
values = fromarrays(values, dtype=types_)
stream.seek(0, 2)
addr = stream.tell()
block_size = len(values) * values.itemsize
if block_size:
info = SignalDataBlockInfo(
address=addr,
compressed_size=block_size,
original_size=block_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp.signal_data[i][0].append(info)
values.tofile(stream)
offsets += cur_offset
fields.append((offsets.tobytes(), 8))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
if invalidation_bytes_nr:
invalidation_bytes_nr = len(inval_bits)
cycles_nr = len(inval_bits[0])
for _ in range(8 - invalidation_bytes_nr % 8):
inval_bits.append(zeros(cycles_nr, dtype=bool))
inval_bits.reverse()
invalidation_bytes_nr = len(inval_bits) // 8
gp.channel_group.invalidation_bytes_nr = invalidation_bytes_nr
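# pack the boolean columns MSB first, then flip the byte order of each
# row so that invalidation bit *i* lands at byte i // 8, bit i % 8
# (LSB first), as required by the MDF v4 record layout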
inval_bits = fliplr(
packbits(array(inval_bits).T).reshape(
(cycles_nr, invalidation_bytes_nr)
)
)
if self.version < "4.20":
fields.append((inval_bits.tobytes(), invalidation_bytes_nr))
samples = data_block_from_arrays(fields, added_cycles)
size = len(samples)
del fields
stream.seek(0, 2)
addr = stream.tell()
if size:
if self.version < "4.20":
data = samples
raw_size = size
data = lz_compress(data)
size = len(data)
stream.write(data)
gp.data_blocks.append(
DataBlockInfo(
address=addr,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=0,
)
)
gp.channel_group.cycles_nr += added_cycles
self.virtual_groups[index].cycles_nr += added_cycles
else:
data = samples
raw_size = size
data = lz_compress(data)
size = len(data)
stream.write(data)
gp.data_blocks.append(
DataBlockInfo(
address=addr,
block_type=v4c.DT_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=0,
)
)
gp.channel_group.cycles_nr += added_cycles
self.virtual_groups[index].cycles_nr += added_cycles
if invalidation_bytes_nr:
addr = stream.tell()
data = inval_bits.tobytes()
raw_size = len(data)
data = lz_compress(data)
size = len(data)
stream.write(data)
gp.data_blocks[-1].invalidation_block(
InvalidationBlockInfo(
address=addr,
block_type=v4c.DT_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=None,
)
)
def _extend_column_oriented(
self, index: int, signals: list[tuple[NDArray[Any], NDArray[Any] | None]]
) -> None:
"""
Extend a group with new samples. *signals* contains (values, invalidation_bits)
pairs for each extended signal. The first pair is the master channel's pair, and the
next pairs must respect the same order in which the signals were appended. The samples must have raw
or physical values according to the *Signals* used for the initial append.
Parameters
----------
index : int
group index
signals : list
list of (numpy.ndarray, numpy.ndarray) objects
Examples
--------
>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF4('new.mdf')
>>> mdf.append([s1, s2, s3], comment='created by asammdf v1.1.0')
>>> t = np.array([0.006, 0.007, 0.008, 0.009, 0.010])
>>> # extend without invalidation bits
>>> mdf.extend(0, [(t, None), (s1.samples, None), (s2.samples, None), (s3.samples, None)])
>>> # extend with some invalidation bits
>>> s1_inv = np.array([0, 0, 0, 1, 1], dtype=bool)
>>> mdf.extend(0, [(t, None), (s1.samples, s1_inv), (s2.samples, None), (s3.samples, None)])
"""
gp = self.groups[index]
if not signals:
message = '"extend" requires a non-empty list of (samples, invalidation_bits) pairs'
raise MdfException(message)
stream = self._tempfile
stream.seek(0, 2)
write = stream.write
tell = stream.tell
added_cycles = len(signals[0][0])
self.virtual_groups[index].cycles_nr += added_cycles
for i, (signal, invalidation_bits) in enumerate(signals):
gp = self.groups[index + i]
sig_type = gp.signal_types[0]
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
samples = signal
elif sig_type == v4c.SIGNAL_TYPE_CANOPEN:
names = signal.dtype.names
if names == v4c.CANOPEN_TIME_FIELDS:
samples = signal
else:
vals = []
for field in ("ms", "min", "hour", "day", "month", "year"):
vals.append(signal[field])
samples = fromarrays(vals)
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
samples = signal
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
samples = signal
else:
cur_offset = sum(
blk.original_size for blk in gp.get_signal_data_blocks(0)
)
offsets = arange(len(signal), dtype=uint64) * (signal.itemsize + 4)
values = [full(len(signal), signal.itemsize, dtype=uint32), signal]
types_ = [("", uint32), ("", signal.dtype)]
values = fromarrays(values, dtype=types_)
addr = tell()
block_size = len(values) * values.itemsize
if block_size:
info = SignalDataBlockInfo(
address=addr,
compressed_size=block_size,
original_size=block_size,
location=v4c.LOCATION_TEMPORARY_FILE,
)
gp.signal_data[i][0].append(info)
write(values.tobytes())
offsets += cur_offset
samples = offsets
addr = tell()
if added_cycles:
data = samples.tobytes()
raw_size = len(data)
data = lz_compress(data)
size = len(data)
write(data)
gp.data_blocks.append(
DataBlockInfo(
address=addr,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=0,
)
)
gp.channel_group.cycles_nr += added_cycles
if invalidation_bits is not None:
addr = tell()
data = invalidation_bits.tobytes()
raw_size = len(data)
data = lz_compress(data)
size = len(data)
write(data)
gp.data_blocks[-1].invalidation_block(
InvalidationBlockInfo(
address=addr,
block_type=v4c.DZ_BLOCK_LZ,
original_size=raw_size,
compressed_size=size,
param=None,
)
)
[docs] def attach(
self,
data: bytes,
file_name: str | None = None,
hash_sum: bytes | None = None,
comment: str = "",
compression: bool = True,
mime: str = r"application/octet-stream",
embedded: bool = True,
password: str | bytes | None = None,
) -> int:
"""attach embedded attachment as application/octet-stream.
Parameters
----------
data : bytes
data to be attached
file_name : str
string file name
hash_sum : bytes
md5 of the data
comment : str
attachment comment
compression : bool
use compression for embedded attachment data
mime : str
mime type string (currently overridden based on the file name suffix)
embedded : bool
attachment is embedded in the file
password : str | bytes | None, default None
password used to encrypt the data using AES256 encryption
.. versionadded:: 7.0.0
Returns
-------
index : int
new attachment index
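Examples
--------
a minimal sketch, assuming an open ``mdf`` object and a placeholder
*messages.dbc* file:
>>> with open('messages.dbc', 'rb') as dbc:
...     data = dbc.read()
>>> index = mdf.attach(data, 'messages.dbc', comment='bus database')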
"""
if self._force_attachment_encryption:
password = password or self._password
if password and not CRYPTOGRAPHY_AVAILABLE:
raise MdfException(
"cryptography must be installed for attachment encryption"
)
if hash_sum is None:
worker = md5()
worker.update(data)
hash_sum = worker.hexdigest()
if hash_sum in self._attachments_cache:
return self._attachments_cache[hash_sum]
if password:
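# derive a fixed 32-byte AES-256 key by zero padding or truncating the
# password; the random 16-byte IV is prepended to the CBC ciphertext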
if isinstance(password, str):
password = password.encode("utf-8")
size = len(password)
if size < 32:
password = password + bytes(32 - size)
else:
password = password[:32]
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(password), modes.CBC(iv))
encryptor = cipher.encryptor()
original_size = len(data)
rem = original_size % 16
if rem:
data += os.urandom(16 - rem)
data = iv + encryptor.update(data) + encryptor.finalize()
worker = md5()
worker.update(data)
hash_sum_encrypted = worker.hexdigest()
comment = f"""<ATcomment>
<TX>{comment}</TX>
<extensions>
<extension>
<encrypted>true</encrypted>
<algorithm>AES256</algorithm>
<original_md5_sum>{hash_sum_encrypted}</original_md5_sum>
<original_size>{original_size}</original_size>
</extension>
</extensions>
</ATcomment>
"""
else:
hash_sum_encrypted = hash_sum
comment = comment
if hash_sum_encrypted in self._attachments_cache:
return self._attachments_cache[hash_sum_encrypted]
creator_index = len(self.file_history)
fh = FileHistory()
fh.comment = """<FHcomment>
<TX>Added new embedded attachment from {}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>""".format(
file_name if file_name else "bin.bin", __version__
)
self.file_history.append(fh)
file_name = file_name or "bin.bin"
at_block = AttachmentBlock(
data=data,
compression=compression,
embedded=embedded,
file_name=file_name,
comment=comment,
)
at_block.comment = comment
at_block["creator_index"] = creator_index
self.attachments.append(at_block)
suffix = Path(file_name).suffix.lower().strip(".")
if suffix == "a2l":
mime = "application/A2L"
else:
mime = f"application/x-{suffix}"
at_block.mime = mime
index = len(self.attachments) - 1
self._attachments_cache[hash_sum] = index
self._attachments_cache[hash_sum_encrypted] = index
return index
[docs] def close(self) -> None:
"""if the MDF was created with memory=False and new
channels have been appended, then this must be called just before the
object is not used anymore to clean-up the temporary file"""
if self._closed:
return
else:
self._closed = True
self._parent = None
if self._tempfile is not None:
self._tempfile.close()
if not self._from_filelike and self._file is not None:
self._file.close()
if self._mapped_file is not None:
self._mapped_file.close()
if self._delete_on_close:
try:
Path(self.name).unlink()
except:
pass
if self.original_name is not None:
if Path(self.original_name).suffix.lower() in (
".bz2",
".gzip",
".mf4z",
".zip",
):
try:
Path(self.name).unlink()
except:
pass
for gp in self.groups:
gp.clear()
self.groups.clear()
self.header = None
self.identification = None
self.file_history.clear()
self.channels_db.clear()
self.masters_db.clear()
self.attachments.clear()
self._attachments_cache.clear()
self.file_comment = None
self.events.clear()
self._ch_map.clear()
self._master_channel_metadata.clear()
self._invalidation_cache.clear()
self._external_dbc_cache.clear()
self._si_map.clear()
self._file_si_map.clear()
self._cc_map.clear()
self._file_cc_map.clear()
self._cg_map.clear()
self._cn_data_map.clear()
self._dbc_cache.clear()
self.virtual_groups.clear()
@overload
def get(
self,
name: str | None = ...,
group: int | None = ...,
index: int | None = ...,
raster: RasterType | None = ...,
samples_only: Literal[False] = ...,
data: bytes | None = ...,
raw: bool = ...,
ignore_invalidation_bits: bool = ...,
record_offset: int = ...,
record_count: int | None = ...,
skip_channel_validation: bool = ...,
) -> Signal:
...
@overload
def get(
self,
name: str | None = ...,
group: int | None = ...,
index: int | None = ...,
raster: RasterType | None = ...,
samples_only: Literal[True] = ...,
data: bytes | None = ...,
raw: bool = ...,
ignore_invalidation_bits: bool = ...,
record_offset: int = ...,
record_count: int | None = ...,
skip_channel_validation: bool = ...,
) -> tuple[NDArray[Any], NDArray[Any]]:
...
[docs] def get(
self,
name: str | None = None,
group: int | None = None,
index: int | None = None,
raster: RasterType | None = None,
samples_only: bool = False,
data: bytes | None = None,
raw: bool = False,
ignore_invalidation_bits: bool = False,
record_offset: int = 0,
record_count: int | None = None,
skip_channel_validation: bool = False,
) -> Signal | tuple[NDArray[Any], NDArray[Any]]:
"""Gets channel samples. The raw data group samples are not loaded to
memory so it is advised to use ``filter`` or ``select`` instead of
performing several ``get`` calls.
Channel can be specified in two ways:
* using the first positional argument *name*
* if there are multiple occurrences for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurrences for this channel and either the
*group* or *index* argument is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
If the *raster* keyword argument is not *None* the output is
interpolated accordingly
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
raster : float
time raster in seconds
samples_only : bool
if *True* return only the channel samples as numpy array; if
*False* return a *Signal* object
data : bytes
prevent redundant data read by providing the raw data group samples
raw : bool
return channel samples without applying the conversion rule; default
`False`
ignore_invalidation_bits : bool
option to ignore invalidation bits
record_offset : int
if *data=None* use this to select the record offset from which the
group data should be loaded
record_count : int
number of records to read; default *None* and in this case all
available records are used
skip_channel_validation : bool
skip validation of channel name, group index and channel index; default
*False*. If *True*, the caller has to make sure that the *group* and *index*
arguments are provided and are correct.
.. versionadded:: 7.0.0
Returns
-------
res : (numpy.array, numpy.array) | Signal
returns *Signal* if *samples_only*=*False* (default option),
otherwise returns a (numpy.array, numpy.array) tuple of samples and
invalidation bits. If invalidation bits are not used or if
*ignore_invalidation_bits* is False, then the second item will be
None.
The *Signal* samples are:
* numpy recarray for channels that have composition/channel
array address or for channels of type
CANOPENDATE, CANOPENTIME
* numpy array for all the rest
Raises
------
MdfException :
* if the channel name is not found
* if the group index is out of range
* if the channel index is out of range
Examples
--------
>>> from asammdf import MDF, Signal
>>> import numpy as np
>>> t = np.arange(5)
>>> s = np.ones(5)
>>> mdf = MDF(version='4.10')
>>> for i in range(4):
... sigs = [Signal(s*(i*10+j), t, name='Sig') for j in range(1, 4)]
... mdf.append(sigs)
...
>>> # first group and channel index of the specified channel name
...
>>> mdf.get('Sig')
UserWarning: Multiple occurrences for channel "Sig". Using first occurrence from data group 4. Provide both "group" and "index" arguments to select another data group
<Signal Sig:
samples=[ 1. 1. 1. 1. 1.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # first channel index in the specified group
...
>>> mdf.get('Sig', 1)
<Signal Sig:
samples=[ 11. 11. 11. 11. 11.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # channel named Sig from group 1 channel index 2
...
>>> mdf.get('Sig', 1, 2)
<Signal Sig:
samples=[ 12. 12. 12. 12. 12.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # channel index 1 of group 2
...
>>> mdf.get(None, 2, 1)
<Signal Sig:
samples=[ 21. 21. 21. 21. 21.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> mdf.get(group=2, index=1)
<Signal Sig:
samples=[ 21. 21. 21. 21. 21.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
"""
if skip_channel_validation:
gp_nr, ch_nr = group, index
else:
gp_nr, ch_nr = self._validate_channel_selection(name, group, index)
grp = self.groups[gp_nr]
# get the channel object
channel = grp.channels[ch_nr]
dependency_list = grp.channel_dependencies[ch_nr]
master_is_required = not samples_only or raster
vals = None
all_invalid = False
if (
channel.byte_offset + (channel.bit_offset + channel.bit_count) / 8
> grp.channel_group.samples_byte_nr
):
all_invalid = True
logger.warning(
"\n\t".join(
[
f"Channel {channel.name} byte offset too high:",
f"byte offset = {channel.byte_offset}",
f"bit offset = {channel.bit_offset}",
f"bit count = {channel.bit_count}",
f"group record size = {grp.channel_group.samples_byte_nr}",
f"group index = {gp_nr}",
f"channel index = {ch_nr}",
]
)
)
if (
channel.bit_offset + channel.bit_count
) / 8 > grp.channel_group.samples_byte_nr:
vals, timestamps, invalidation_bits, encoding = [], [], None, None
else:
channel = deepcopy(channel)
channel.byte_offset = 0
if vals is None:
if dependency_list:
if not isinstance(dependency_list[0], ChannelArrayBlock):
vals, timestamps, invalidation_bits, encoding = self._get_structure(
channel=channel,
group=grp,
group_index=gp_nr,
channel_index=ch_nr,
dependency_list=dependency_list,
raster=raster,
data=data,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=record_count,
master_is_required=master_is_required,
raw=raw,
)
else:
vals, timestamps, invalidation_bits, encoding = self._get_array(
channel=channel,
group=grp,
group_index=gp_nr,
channel_index=ch_nr,
dependency_list=dependency_list,
raster=raster,
data=data,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=record_count,
master_is_required=master_is_required,
)
else:
vals, timestamps, invalidation_bits, encoding = self._get_scalar(
channel=channel,
group=grp,
group_index=gp_nr,
channel_index=ch_nr,
dependency_list=dependency_list,
raster=raster,
data=data,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=record_count,
master_is_required=master_is_required,
)
if all_invalid:
invalidation_bits = np.ones(len(vals), dtype=bool)
if samples_only:
if not raw:
conversion = channel.conversion
if conversion:
vals = conversion.convert(vals)
res = vals, invalidation_bits
else:
conversion = channel.conversion
if not raw:
if conversion:
vals = conversion.convert(vals)
conversion = None
if vals.dtype.kind == "S":
encoding = "utf-8"
channel_type = channel.channel_type
if name is None:
name = channel.name
unit = conversion and conversion.unit or channel.unit
comment = channel.comment
source = channel.source
if source:
source = Source.from_source(source)
else:
cg_source = grp.channel_group.acq_source
if cg_source:
source = Source.from_source(cg_source)
else:
source = None
if channel.attachment is not None:
attachment = self.extract_attachment(
channel.attachment,
)
else:
attachment = None
master_metadata = self._master_channel_metadata.get(gp_nr, None)
if channel_type == v4c.CHANNEL_TYPE_SYNC:
flags = Signal.Flags.stream_sync
else:
flags = Signal.Flags.no_flags
try:
res = Signal(
samples=vals,
timestamps=timestamps,
unit=unit,
name=name,
comment=comment,
conversion=conversion,
raw=raw,
master_metadata=master_metadata,
attachment=attachment,
source=source,
display_names=channel.display_names,
bit_count=channel.bit_count,
flags=flags,
invalidation_bits=invalidation_bits,
encoding=encoding,
group_index=gp_nr,
channel_index=ch_nr,
)
except:
debug_channel(self, grp, channel, dependency_list)
raise
return res
def _get_structure(
self,
channel: Channel,
group: Group,
group_index: int,
channel_index: int,
dependency_list: list[tuple[int, int]],
raster: RasterType | None,
data: bytes | None,
ignore_invalidation_bits: bool,
record_offset: int,
record_count: int | None,
master_is_required: bool,
raw: bool,
) -> tuple[NDArray[Any], NDArray[Any] | None, NDArray[Any] | None, None]:
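"""get the samples of a structure composition channel. If the composed
dtype matches the record slice and no component is a VLSD channel, the
raw record bytes are viewed directly (fast path); otherwise every
component channel is read with ``get`` and the arrays are merged with
``fromarrays``."""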
grp = group
gp_nr = group_index
# get data group record
self._prepare_record(grp)
# get group data
if data is None:
data = self._load_data(
grp, record_offset=record_offset, record_count=record_count
)
else:
data = (data,)
groups = self.groups
channel_invalidation_present = channel.flags & (
v4c.FLAG_CN_ALL_INVALID | v4c.FLAG_CN_INVALIDATION_PRESENT
)
_dtype = dtype(channel.dtype_fmt)
conditions = [
_dtype.itemsize == channel.bit_count // 8,
all(
groups[dg_nr].channels[ch_nr].channel_type != v4c.CHANNEL_TYPE_VLSD
for (dg_nr, ch_nr) in dependency_list
),
]
if all(conditions):
fast_path = True
channel_values = []
timestamps = []
invalidation_bits = []
byte_offset = channel.byte_offset
record_size = (
grp.channel_group.samples_byte_nr
+ grp.channel_group.invalidation_bytes_nr
)
count = 0
for fragment in data:
bts = fragment[0]
buffer = get_channel_raw_bytes(
bts, record_size, byte_offset, _dtype.itemsize
)
channel_values.append(frombuffer(buffer, dtype=_dtype))
if master_is_required:
timestamps.append(self.get_master(gp_nr, fragment, one_piece=True))
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
count += 1
else:
unique_names = UniqueDB()
fast_path = False
names = [
unique_names.get_unique_name(grp.channels[ch_nr].name)
for _, ch_nr in dependency_list
]
channel_values = [[] for _ in dependency_list]
timestamps = []
invalidation_bits = []
count = 0
for fragment in data:
for i, (dg_nr, ch_nr) in enumerate(dependency_list):
vals = self.get(
group=dg_nr,
index=ch_nr,
samples_only=True,
data=fragment,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=record_count,
raw=raw,
)[0]
channel_values[i].append(vals)
if master_is_required:
timestamps.append(self.get_master(gp_nr, fragment, one_piece=True))
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
count += 1
if fast_path:
total_size = sum(len(_) for _ in channel_values)
shape = (total_size,) + channel_values[0].shape[1:]
if count > 1:
out = empty(shape, dtype=channel_values[0].dtype)
vals = concatenate(channel_values, out=out)
else:
vals = channel_values[0]
else:
total_size = sum(len(_) for _ in channel_values[0])
if count > 1:
arrays = []
for lst in channel_values:
shape_len = len(lst[0].shape)
# fix bytearray signals if the length changes between the chunks
if shape_len == 2:
shape = [total_size]
vlsd_max_size = max(l.shape[1] for l in lst)
shape.append(vlsd_max_size)
max_vlsd_arrs = []
for arr in lst:
if arr.shape[1] < vlsd_max_size:
arr = np.hstack(
(
arr,
np.zeros(
(
arr.shape[0],
vlsd_max_size - arr.shape[1],
),
dtype=arr.dtype,
),
)
)
max_vlsd_arrs.append(arr)
arr = concatenate(
max_vlsd_arrs,
out=empty(shape, dtype=max_vlsd_arrs[0].dtype),
)
arrays.append(arr)
elif shape_len == 1:
arr = concatenate(
lst,
out=empty((total_size,), dtype=lst[0].dtype),
)
arrays.append(arr)
else:
arrays.append(
concatenate(
lst,
out=empty(
(total_size,) + lst[0].shape[1:], dtype=lst[0].dtype
),
)
)
else:
arrays = [lst[0] for lst in channel_values]
types = [
(name_, arr.dtype, arr.shape[1:]) for name_, arr in zip(names, arrays)
]
types = dtype(types)
vals = fromarrays(arrays, dtype=types)
if master_is_required:
if count > 1:
out = empty(total_size, dtype=timestamps[0].dtype)
timestamps = concatenate(timestamps, out=out)
else:
timestamps = timestamps[0]
else:
timestamps = None
if channel_invalidation_present:
if count > 1:
out = empty(total_size, dtype=invalidation_bits[0].dtype)
invalidation_bits = concatenate(invalidation_bits, out=out)
else:
invalidation_bits = invalidation_bits[0]
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
invalidation_bits = None
else:
invalidation_bits = None
if raster and len(timestamps) > 1:
t = arange(timestamps[0], timestamps[-1], raster)
vals = Signal(
vals, timestamps, name="_", invalidation_bits=invalidation_bits
).interp(
t,
integer_interpolation_mode=self._integer_interpolation,
float_interpolation_mode=self._float_interpolation,
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
return vals, timestamps, invalidation_bits, None
def _get_array(
self,
channel: Channel,
group: Group,
group_index: int,
channel_index: int,
dependency_list: list[tuple[int, int]],
raster: RasterType | None,
data: bytes | None,
ignore_invalidation_bits: bool,
record_offset: int,
record_count: int | None,
master_is_required: bool,
) -> tuple[NDArray[Any], NDArray[Any] | None, NDArray[Any] | None, None]:
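"""get the samples of a channel array (CA block) channel as a record
array that bundles the channel values with their axes; fixed axes are
built from the stored axis values while referenced axis channels are
read from their own groups."""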
grp = group
gp_nr = group_index
ch_nr = channel_index
# get data group record
self._prepare_record(grp)
# get group data
if data is None:
data = self._load_data(
grp, record_offset=record_offset, record_count=record_count
)
else:
data = (data,)
dep = ca_block = dependency_list[0]
shape = tuple(ca_block[f"dim_size_{i}"] for i in range(ca_block.dims))
shape = tuple(dim for dim in shape if dim > 1)
shape = shape or (1,)
dim = 1
for d in shape:
dim *= d
item_size = channel.bit_count // 8
size = item_size * dim
if group.uses_ld:
record_size = group.channel_group.samples_byte_nr
else:
record_size = (
group.channel_group.samples_byte_nr
+ group.channel_group.invalidation_bytes_nr
)
channel_dtype = get_fmt_v4(
channel.data_type,
channel.bit_count,
channel.channel_type,
)
byte_offset = channel.byte_offset
types = [
("", f"a{byte_offset}"),
("vals", channel_dtype, shape),
("", f"a{record_size - size - byte_offset}"),
]
dtype_fmt = dtype(types)
channel_invalidation_present = channel.flags & (
v4c.FLAG_CN_ALL_INVALID | v4c.FLAG_CN_INVALIDATION_PRESENT
)
channel_group = grp.channel_group
samples_size = (
channel_group.samples_byte_nr + channel_group.invalidation_bytes_nr
)
channel_values = []
timestamps = []
invalidation_bits = []
count = 0
for fragment in data:
arrays = []
types = []
data_bytes, offset, _count, invalidation_bytes = fragment
cycles = len(data_bytes) // samples_size
vals = frombuffer(data_bytes, dtype=dtype_fmt)["vals"]
if dep.flags & v4c.FLAG_CA_INVERSE_LAYOUT:
shape = vals.shape
shape = (shape[0],) + shape[1:][::-1]
vals = vals.reshape(shape)
axes = (0,) + tuple(range(len(shape) - 1, 0, -1))
vals = transpose(vals, axes=axes)
cycles_nr = len(vals)
for ca_block in dependency_list[:1]:
dims_nr = ca_block.dims
if ca_block.ca_type == v4c.CA_TYPE_SCALE_AXIS:
shape = (ca_block.dim_size_0,)
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
elif ca_block.ca_type == v4c.CA_TYPE_LOOKUP:
shape = vals.shape[1:]
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
if ca_block.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
shape = (ca_block[f"dim_size_{i}"],)
axis = []
for j in range(shape[0]):
key = f"axis_{i}_value_{j}"
axis.append(ca_block[key])
axis = array(axis)
axis = array([axis for _ in range(cycles_nr)])
arrays.append(axis)
dtype_pair = (f"axis_{i}", axis.dtype, shape)
types.append(dtype_pair)
else:
for i in range(dims_nr):
axis = ca_block.axis_channels[i]
shape = (ca_block[f"dim_size_{i}"],)
if axis is None:
axisname = f"axis_{i}"
axis_values = array(
[arange(shape[0])] * cycles,
dtype=f"({shape[0]},)f8",
)
else:
try:
(ref_dg_nr, ref_ch_nr) = ca_block.axis_channels[i]
except:
debug_channel(self, grp, channel, dependency_list)
raise
axisname = (
self.groups[ref_dg_nr].channels[ref_ch_nr].name
)
if ref_dg_nr == gp_nr:
axis_values = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
data=fragment,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
raw=True,
)[0]
else:
channel_group = grp.channel_group
record_size = channel_group.samples_byte_nr
record_size += channel_group.invalidation_bytes_nr
start = offset // record_size
end = start + len(data_bytes) // record_size + 1
ref = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
raw=True,
)[0]
axis_values = ref[start:end].copy()
axis_values = axis_values[axisname]
if len(axis_values) == 0 and cycles:
axis_values = array([arange(shape[0])] * cycles)
arrays.append(axis_values)
dtype_pair = (axisname, axis_values.dtype, shape)
types.append(dtype_pair)
elif ca_block.ca_type == v4c.CA_TYPE_ARRAY:
shape = vals.shape[1:]
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
for ca_block in dependency_list[1:]:
dims_nr = ca_block.dims
if ca_block.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
shape = (ca_block[f"dim_size_{i}"],)
axis = []
for j in range(shape[0]):
key = f"axis_{i}_value_{j}"
axis.append(ca_block[key])
axis = array(
[axis for _ in range(cycles_nr)], dtype=f"{shape}f8"
)
arrays.append(axis)
types.append((f"axis_{i}", axis.dtype, shape))
else:
for i in range(dims_nr):
axis = ca_block.axis_channels[i]
shape = (ca_block[f"dim_size_{i}"],)
if axis is None:
axisname = f"axis_{i}"
axis_values = array(
[arange(shape[0])] * cycles, dtype=f"({shape[0]},)f8"
)
else:
try:
ref_dg_nr, ref_ch_nr = ca_block.axis_channels[i]
except:
debug_channel(self, grp, channel, dependency_list)
raise
axisname = self.groups[ref_dg_nr].channels[ref_ch_nr].name
if ref_dg_nr == gp_nr:
axis_values = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
data=fragment,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
raw=True,
)[0]
else:
channel_group = grp.channel_group
record_size = channel_group.samples_byte_nr
record_size += channel_group.invalidation_bytes_nr
start = offset // record_size
end = start + len(data_bytes) // record_size + 1
ref = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
raw=True,
)[0]
axis_values = ref[start:end].copy()
axis_values = axis_values[axisname]
if len(axis_values) == 0 and cycles:
axis_values = array([arange(shape[0])] * cycles)
arrays.append(axis_values)
dtype_pair = (axisname, axis_values.dtype, shape)
types.append(dtype_pair)
vals = fromarrays(arrays, dtype(types))
if master_is_required:
timestamps.append(self.get_master(gp_nr, fragment, one_piece=True))
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
channel_values.append(vals)
count += 1
if count > 1:
total_size = sum(len(_) for _ in channel_values)
shape = (total_size,) + channel_values[0].shape[1:]
out = empty(shape, dtype=channel_values[0].dtype)
vals = concatenate(channel_values, out=out)
elif count == 1:
vals = channel_values[0]
else:
vals = []
if master_is_required:
if count > 1:
out = empty(total_size, dtype=timestamps[0].dtype)
timestamps = concatenate(timestamps, out=out)
else:
timestamps = timestamps[0]
else:
timestamps = None
if channel_invalidation_present:
if count > 1:
out = empty(total_size, dtype=invalidation_bits[0].dtype)
invalidation_bits = concatenate(invalidation_bits, out=out)
else:
invalidation_bits = invalidation_bits[0]
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
invalidation_bits = None
else:
invalidation_bits = None
if raster and len(timestamps) > 1:
t = arange(timestamps[0], timestamps[-1], raster)
vals = Signal(
vals, timestamps, name="_", invalidation_bits=invalidation_bits
).interp(
t,
integer_interpolation_mode=self._integer_interpolation,
float_interpolation_mode=self._float_interpolation,
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
return vals, timestamps, invalidation_bits, None
def _get_scalar(
self,
channel: Channel,
group: Group,
group_index: int,
channel_index: int,
dependency_list: list[tuple[int, int]],
raster: RasterType | None,
data: bytes | None,
ignore_invalidation_bits: bool,
record_offset: int,
record_count: int | None,
master_is_required: bool,
skip_vlsd: bool = False,
) -> tuple[NDArray[Any], NDArray[Any] | None, NDArray[Any] | None, str | None]:
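"""get the samples of a scalar channel. Virtual channels are generated
from the record index, byte aligned channels are sliced straight from
the record bytes, unaligned channels go through
``_get_not_byte_aligned_data``, and VLSD channels translate their
offsets into values from the signal data block."""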
grp = group
gp_nr = group_index
ch_nr = channel_index
# get group data
if data is None:
data = self._load_data(
grp, record_offset=record_offset, record_count=record_count
)
one_piece = False
else:
one_piece = True
channel_invalidation_present = channel.flags & (
v4c.FLAG_CN_ALL_INVALID | v4c.FLAG_CN_INVALIDATION_PRESENT
)
data_type = channel.data_type
channel_type = channel.channel_type
bit_count = channel.bit_count
encoding = None
channel_dtype = channel.dtype_fmt
# get channel values
if channel_type in {
v4c.CHANNEL_TYPE_VIRTUAL,
v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
}:
if not channel.dtype_fmt:
channel.dtype_fmt = dtype(get_fmt_v4(data_type, 64))
ch_dtype = channel.dtype_fmt
channel_values = []
timestamps = []
invalidation_bits = []
channel_group = grp.channel_group
record_size = channel_group.samples_byte_nr
record_size += channel_group.invalidation_bytes_nr
count = 0
if one_piece:
data = (data,)
for fragment in data:
data_bytes, offset, _count, invalidation_bytes = fragment
offset = offset // record_size
vals = arange(len(data_bytes) // record_size, dtype=ch_dtype)
vals += offset
if master_is_required:
timestamps.append(
self.get_master(
gp_nr,
fragment,
record_offset=offset,
record_count=_count,
one_piece=True,
)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
channel_values.append(vals)
count += 1
if count > 1:
total_size = sum(len(_) for _ in channel_values)
shape = (total_size,) + channel_values[0].shape[1:]
out = empty(shape, dtype=channel_values[0].dtype)
vals = concatenate(channel_values, out=out)
elif count == 1:
vals = channel_values[0]
else:
vals = []
if master_is_required:
if count > 1:
out = empty(total_size, dtype=timestamps[0].dtype)
timestamps = concatenate(timestamps, out=out)
else:
timestamps = timestamps[0]
if channel_invalidation_present:
if count > 1:
out = empty(total_size, dtype=invalidation_bits[0].dtype)
invalidation_bits = concatenate(invalidation_bits, out=out)
else:
invalidation_bits = invalidation_bits[0]
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
invalidation_bits = None
else:
invalidation_bits = None
if raster and len(timestamps) > 1:
num = float(float32((timestamps[-1] - timestamps[0]) / raster))
if num.is_integer():
t = linspace(timestamps[0], timestamps[-1], int(num))
else:
t = arange(timestamps[0], timestamps[-1], raster)
vals = Signal(
vals, timestamps, name="_", invalidation_bits=invalidation_bits
).interp(
t,
integer_interpolation_mode=self._integer_interpolation,
float_interpolation_mode=self._float_interpolation,
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
if channel.conversion:
vals = channel.conversion.convert(vals)
else:
channel_group = grp.channel_group
record_size = channel_group.samples_byte_nr
if one_piece:
fragment = data
data_bytes = fragment[0]
info = grp.record[ch_nr]
if info is not None:
dtype_, byte_size, byte_offset, bit_offset = info
if (
ch_nr == 0
and len(grp.channels) == 1
and channel.dtype_fmt.itemsize == record_size
):
buffer = bytearray(data_bytes)
else:
buffer = get_channel_raw_bytes(
data_bytes,
record_size + channel_group.invalidation_bytes_nr,
byte_offset,
byte_size,
)
vals = frombuffer(buffer, dtype=dtype_)
if not channel.standard_C_size:
size = byte_size
if channel_dtype.byteorder == "|" and data_type in (
v4c.DATA_TYPE_SIGNED_MOTOROLA,
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
):
view = f">u{vals.itemsize}"
else:
view = f"{channel_dtype.byteorder}u{vals.itemsize}"
if dtype(view) != vals.dtype:
vals = vals.view(view)
if bit_offset:
vals >>= bit_offset
if bit_count != size * 8:
if data_type in v4c.SIGNED_INT:
vals = as_non_byte_sized_signed_int(vals, bit_count)
else:
mask = (1 << bit_count) - 1
vals &= mask
elif data_type in v4c.SIGNED_INT:
view = f"{channel_dtype.byteorder}i{vals.itemsize}"
if dtype(view) != vals.dtype:
vals = vals.view(view)
else:
vals = self._get_not_byte_aligned_data(data_bytes, grp, ch_nr)
if bit_count == 1 and self._single_bit_uint_as_bool:
vals = array(vals, dtype=bool)
if master_is_required:
timestamps = self.get_master(gp_nr, fragment, one_piece=True)
else:
timestamps = None
if channel_invalidation_present:
invalidation_bits = self.get_invalidation_bits(
gp_nr, channel, fragment
)
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
invalidation_bits = None
else:
invalidation_bits = None
else:
channel_values = []
timestamps = []
invalidation_bits = []
info = grp.record[ch_nr]
if info is None:
for count, fragment in enumerate(data, 1):
data_bytes, offset, _count, invalidation_bytes = fragment
vals = self._get_not_byte_aligned_data(data_bytes, grp, ch_nr)
if bit_count == 1 and self._single_bit_uint_as_bool:
vals = array(vals, dtype=bool)
if master_is_required:
timestamps.append(
self.get_master(gp_nr, fragment, one_piece=True)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
channel_values.append(vals)
vals = concatenate(channel_values)
else:
dtype_, byte_size, byte_offset, bit_offset = info
buffer = []
count = 0
for count, fragment in enumerate(data, 1):
data_bytes = fragment[0]
if (
ch_nr == 0
and len(grp.channels) == 1
and channel.dtype_fmt.itemsize == record_size
):
buffer.append(data_bytes)
else:
buffer.append(
get_channel_raw_bytes(
data_bytes,
record_size + channel_group.invalidation_bytes_nr,
byte_offset,
byte_size,
)
)
if master_is_required:
timestamps.append(
self.get_master(gp_nr, fragment, one_piece=True)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
if count > 1:
buffer = bytearray().join(buffer)
elif count == 1:
buffer = buffer[0]
else:
buffer = bytearray()
vals = frombuffer(buffer, dtype=dtype_)
if not channel.standard_C_size:
size = dtype_.itemsize
if channel_dtype.byteorder == "|" and data_type in (
v4c.DATA_TYPE_SIGNED_MOTOROLA,
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
):
view = f">u{vals.itemsize}"
else:
view = f"{channel_dtype.byteorder}u{vals.itemsize}"
if dtype(view) != dtype_:
vals = vals.view(view)
if bit_offset:
vals >>= bit_offset
if bit_count != size * 8:
if data_type in v4c.SIGNED_INT:
vals = as_non_byte_sized_signed_int(vals, bit_count)
else:
mask = (1 << bit_count) - 1
vals &= mask
elif data_type in v4c.SIGNED_INT:
view = f"{channel_dtype.byteorder}i{vals.itemsize}"
if dtype(view) != vals.dtype:
vals = vals.view(view)
if bit_count == 1 and self._single_bit_uint_as_bool:
vals = array(vals, dtype=bool)
total_size = len(vals)
if master_is_required:
if count > 1:
out = empty(total_size, dtype=timestamps[0].dtype)
timestamps = concatenate(timestamps, out=out)
elif count == 1:
timestamps = timestamps[0]
else:
timestamps = []
if channel_invalidation_present:
if count > 1:
out = empty(total_size, dtype=invalidation_bits[0].dtype)
invalidation_bits = concatenate(invalidation_bits, out=out)
elif count == 1:
invalidation_bits = invalidation_bits[0]
else:
invalidation_bits = []
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
invalidation_bits = None
else:
invalidation_bits = None
if raster and len(timestamps) > 1:
num = float(float32((timestamps[-1] - timestamps[0]) / raster))
if num.is_integer():
t = linspace(timestamps[0], timestamps[-1], int(num))
else:
t = arange(timestamps[0], timestamps[-1], raster)
vals = Signal(
vals, timestamps, name="_", invalidation_bits=invalidation_bits
).interp(
t,
integer_interpolation_mode=self._integer_interpolation,
float_interpolation_mode=self._float_interpolation,
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
if channel_type == v4c.CHANNEL_TYPE_VLSD and not skip_vlsd:
count_ = len(vals)
if count_:
signal_data = self._load_signal_data(
group=grp, index=ch_nr, start_offset=vals[0], end_offset=vals[-1]
)
else:
signal_data = b""
max_vlsd_size = self.determine_max_vlsd_sample_size(
group_index, channel_index
)
if signal_data:
if data_type in (
v4c.DATA_TYPE_BYTEARRAY,
v4c.DATA_TYPE_UNSIGNED_INTEL,
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
v4c.DATA_TYPE_MIME_SAMPLE,
v4c.DATA_TYPE_MIME_STREAM,
):
vals = extract(signal_data, 1, vals - vals[0])
if vals.shape[1] < max_vlsd_size:
vals = np.hstack(
(
vals,
np.zeros(
(
vals.shape[0],
max_vlsd_size - vals.shape[1],
),
dtype=vals.dtype,
),
)
)
else:
vals = extract(signal_data, 0, vals - vals[0])
if data_type not in (
v4c.DATA_TYPE_BYTEARRAY,
v4c.DATA_TYPE_UNSIGNED_INTEL,
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
v4c.DATA_TYPE_MIME_SAMPLE,
v4c.DATA_TYPE_MIME_STREAM,
):
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
encoding = "utf-16-be"
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = "utf-16-le"
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = "utf-8"
vals = np.array(
[e.rsplit(b"\0")[0] for e in vals.tolist()],
dtype=vals.dtype,
)
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = "latin-1"
vals = np.array(
[e.rsplit(b"\0")[0] for e in vals.tolist()],
dtype=vals.dtype,
)
else:
raise MdfException(
f'wrong data type "{data_type}" for vlsd channel "{channel.name}"'
)
vals = vals.astype(f"S{max_vlsd_size}")
else:
if len(vals):
raise MdfException(
f'Wrong signal data block reference (0x{channel.data_block_addr:X}) for VLSD channel "{channel.name}"'
)
# no VLSD signal data samples
if data_type != v4c.DATA_TYPE_BYTEARRAY:
vals = array([], dtype=f"S{max_vlsd_size}")
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
encoding = "utf-16-be"
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = "utf-16-le"
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = "utf-8"
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = "latin-1"
else:
raise MdfException(
f'wrong data type "{data_type}" for vlsd channel "{channel.name}"'
)
else:
vals = array([], dtype=f"({max_vlsd_size},)u1")
elif (
v4c.DATA_TYPE_STRING_LATIN_1 <= data_type <= v4c.DATA_TYPE_STRING_UTF_16_BE
):
if channel_type in (v4c.CHANNEL_TYPE_VALUE, v4c.CHANNEL_TYPE_MLSD):
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
encoding = "utf-16-be"
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = "utf-16-le"
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = "utf-8"
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = "latin-1"
else:
raise MdfException(
f'wrong data type "{data_type}" for string channel'
)
elif data_type in (v4c.DATA_TYPE_CANOPEN_TIME, v4c.DATA_TYPE_CANOPEN_DATE):
# CANopen date
if data_type == v4c.DATA_TYPE_CANOPEN_DATE:
types = dtype(
[
("ms", "<u2"),
("min", "<u1"),
("hour", "<u1"),
("day", "<u1"),
("month", "<u1"),
("year", "<u1"),
]
)
vals = vals.view(types)
arrays = []
arrays.append(vals["ms"])
# bit 6 and 7 of minutes are reserved
arrays.append(vals["min"] & 0x3F)
# only first 4 bits of hour are used
arrays.append(vals["hour"] & 0xF)
# the first 4 bits are the day number
arrays.append(vals["day"] & 0xF)
# bit 6 and 7 of month are reserved
arrays.append(vals["month"] & 0x3F)
# bit 7 of year is reserved
arrays.append(vals["year"] & 0x7F)
# add summer or standard time information for hour
arrays.append((vals["hour"] & 0x80) >> 7)
# add day of week information
arrays.append((vals["day"] & 0xF0) >> 4)
names = [
"ms",
"min",
"hour",
"day",
"month",
"year",
"summer_time",
"day_of_week",
]
vals = fromarrays(arrays, names=names)
# CANopen time
elif data_type == v4c.DATA_TYPE_CANOPEN_TIME:
types = dtype([("ms", "<u4"), ("days", "<u2")])
vals = vals.view(types)
return vals, timestamps, invalidation_bits, encoding
def _get_not_byte_aligned_data(
self, data: bytes, group: Group, ch_nr: int
) -> NDArray[Any]:
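"""extract the samples of a channel that is not byte aligned: the raw
record bytes are padded up to a standard integer size, viewed with the
required byte order, shifted right by the bit offset and masked down
to *bit_count* bits before the final dtype view is applied."""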
big_endian_types = (
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
v4c.DATA_TYPE_REAL_MOTOROLA,
v4c.DATA_TYPE_SIGNED_MOTOROLA,
)
if group.uses_ld:
record_size = group.channel_group.samples_byte_nr
else:
record_size = (
group.channel_group.samples_byte_nr
+ group.channel_group.invalidation_bytes_nr
)
channel = group.channels[ch_nr]
bit_offset = channel.bit_offset
byte_offset = channel.byte_offset
bit_count = channel.bit_count
if ch_nr >= 0:
dependencies = group.channel_dependencies[ch_nr]
if dependencies and isinstance(dependencies[0], ChannelArrayBlock):
ca_block = dependencies[0]
size = bit_count // 8
shape = tuple(ca_block[f"dim_size_{i}"] for i in range(ca_block.dims))
if ca_block.byte_offset_base // size > 1 and len(shape) == 1:
shape += (ca_block.byte_offset_base // size,)
dim = 1
for d in shape:
dim *= d
size *= dim
bit_count = size * 8
byte_size = bit_offset + bit_count
if byte_size % 8:
byte_size = (byte_size // 8) + 1
else:
byte_size //= 8
types = [
("", f"a{byte_offset}"),
("vals", f"({byte_size},)u1"),
("", f"a{record_size - byte_size - byte_offset}"),
]
vals = fromstring(data, dtype=dtype(types))
vals = vals["vals"]
if byte_size in {1, 2, 4, 8}:
extra_bytes = 0
elif byte_size < 8:
extra_bytes = 4 - (byte_size % 4)
else:
extra_bytes = 0
std_size = byte_size + extra_bytes
big_endian = channel.data_type in big_endian_types
# prepend or append extra bytes columns
# to get a standard size number of bytes
if extra_bytes:
if big_endian:
vals = column_stack(
[vals, zeros(len(vals), dtype=f"<({extra_bytes},)u1")]
)
try:
vals = vals.view(f">u{std_size}").ravel()
except:
vals = frombuffer(vals.tobytes(), dtype=f">u{std_size}")
vals = vals >> (extra_bytes * 8 + bit_offset)
vals &= (1 << bit_count) - 1
else:
vals = column_stack(
[vals, zeros(len(vals), dtype=f"<({extra_bytes},)u1")]
)
try:
vals = vals.view(f"<u{std_size}").ravel()
except:
vals = frombuffer(vals.tobytes(), dtype=f"<u{std_size}")
vals = vals >> bit_offset
vals &= (1 << bit_count) - 1
else:
if big_endian:
try:
vals = vals.view(f">u{std_size}").ravel()
except:
vals = frombuffer(vals.tobytes(), dtype=f">u{std_size}")
vals = vals >> bit_offset
vals &= (1 << bit_count) - 1
else:
try:
vals = vals.view(f"<u{std_size}").ravel()
except:
vals = frombuffer(vals.tobytes(), dtype=f"<u{std_size}")
vals = vals >> bit_offset
vals &= (1 << bit_count) - 1
data_type = channel.data_type
if data_type in v4c.SIGNED_INT:
return as_non_byte_sized_signed_int(vals, bit_count)
elif data_type in v4c.FLOATS:
return vals.view(get_fmt_v4(data_type, bit_count))
else:
return vals
@lru_cache(maxsize=1024 * 1024)
def determine_max_vlsd_sample_size(self, group, index):
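"""get the maximum sample byte size of the given VLSD channel; returns
*None* for non VLSD channels. The result is memoized through the
``vlsd_max_length`` mapping and the ``lru_cache`` decorator."""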
group_index = group
channel_index = index
group = self.groups[group]
ch = group.channels[index]
if ch.channel_type != v4c.CHANNEL_TYPE_VLSD:
return None
if (group_index, ch.name) in self.vlsd_max_length:
return self.vlsd_max_length[(group_index, ch.name)]
else:
offsets, *_ = self._get_scalar(
ch,
group,
group_index,
channel_index,
group.channel_dependencies[channel_index],
raster=None,
data=None,
ignore_invalidation_bits=True,
record_offset=0,
record_count=None,
master_is_required=False,
skip_vlsd=True,
)
offsets = offsets.astype("u8")
data = self._load_signal_data(group, channel_index)
max_size = get_vlsd_max_sample_size(data, offsets, len(offsets))
return max_size
def included_channels(
self,
index: int | None = None,
channels: ChannelsType | None = None,
skip_master: bool = True,
minimal: bool = True,
) -> dict[int, dict[int, Sequence[int]]]:
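"""map the selection to the channels that carry actual samples: master
channels and channels that are only referenced as structure components,
array axes or size/quantity references are excluded. Returns a dict
keyed by the master virtual group index, holding per group sorted
channel index lists."""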
if channels is None:
virtual_channel_group = self.virtual_groups[index]
groups = virtual_channel_group.groups
gps = {}
for gp_index in groups:
group = self.groups[gp_index]
included_channels = set(range(len(group.channels)))
master_index = self.masters_db.get(gp_index, None)
if master_index is not None:
included_channels.remove(master_index)
channels = group.channels
for dependencies in group.channel_dependencies:
if dependencies is None:
continue
if all(
not isinstance(dep, ChannelArrayBlock) for dep in dependencies
):
for _, ch_nr in dependencies:
try:
included_channels.remove(ch_nr)
except KeyError:
pass
else:
for dep in dependencies:
for referenced_channels in (
dep.axis_channels,
dep.dynamic_size_channels,
dep.input_quantity_channels,
):
for gp_nr, ch_nr in referenced_channels:
if gp_nr == gp_index:
try:
included_channels.remove(ch_nr)
except KeyError:
pass
if dep.output_quantity_channel:
gp_nr, ch_nr = dep.output_quantity_channel
if gp_nr == gp_index:
try:
included_channels.remove(ch_nr)
except KeyError:
pass
if dep.comparison_quantity_channel:
gp_nr, ch_nr = dep.comparison_quantity_channel
if gp_nr == gp_index:
try:
included_channels.remove(ch_nr)
except KeyError:
pass
gps[gp_index] = sorted(included_channels)
result = {index: gps}
else:
gps = {}
for item in channels:
if isinstance(item, (list, tuple)):
if len(item) not in (2, 3):
raise MdfException(
"The items used for filtering must be strings, "
"or they must match the first 3 arguments of the get "
"method"
)
else:
group, idx = self._validate_channel_selection(*item)
gps_idx = gps.setdefault(group, set())
gps_idx.add(idx)
else:
name = item
group, idx = self._validate_channel_selection(name)
gps_idx = gps.setdefault(group, set())
gps_idx.add(idx)
result = {}
for gp_index, channels in gps.items():
master = self.virtual_groups_map[gp_index]
group = self.groups[gp_index]
if minimal:
channel_dependencies = [
group.channel_dependencies[ch_nr] for ch_nr in channels
]
for dependencies in channel_dependencies:
if dependencies is None:
continue
if all(
not isinstance(dep, ChannelArrayBlock)
for dep in dependencies
):
for _, ch_nr in dependencies:
try:
channels.remove(ch_nr)
except KeyError:
pass
else:
for dep in dependencies:
for referenced_channels in (
dep.axis_channels,
dep.dynamic_size_channels,
dep.input_quantity_channels,
):
for gp_nr, ch_nr in referenced_channels:
if gp_nr == gp_index:
try:
channels.remove(ch_nr)
except KeyError:
pass
if dep.output_quantity_channel:
gp_nr, ch_nr = dep.output_quantity_channel
if gp_nr == gp_index:
try:
channels.remove(ch_nr)
except KeyError:
pass
if dep.comparison_quantity_channel:
gp_nr, ch_nr = dep.comparison_quantity_channel
if gp_nr == gp_index:
try:
channels.remove(ch_nr)
except KeyError:
pass
gp_master = self.masters_db.get(gp_index, None)
if gp_master is not None and gp_master in channels:
channels.remove(gp_master)
if master not in result:
result[master] = {}
result[master][master] = [self.masters_db.get(master, None)]
result[master][gp_index] = sorted(channels)
return result
def _yield_selected_signals(
self,
index: int,
groups: dict[int, Sequence[int]] | None = None,
record_offset: int = 0,
record_count: int | None = None,
skip_master: bool = True,
version: str | None = None,
) -> Iterator[Signal | tuple[NDArray[Any], NDArray[Any]]]:
version = version or self.version
virtual_channel_group = self.virtual_groups[index]
record_size = virtual_channel_group.record_size
if groups is None:
groups = self.included_channels(index, skip_master=skip_master)[index]
record_size = 0
for group_index in groups:
grp = self.groups[group_index]
record_size += (
grp.channel_group.samples_byte_nr
+ grp.channel_group.invalidation_bytes_nr
)
record_size = record_size or 1
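# pick the number of records per fragment: honor the configured read
# fragment size, otherwise fall back to roughly 16 MB fragments
# (128 MB for 4.20+ files, which can use column oriented storage)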
if self._read_fragment_size:
count = self._read_fragment_size // record_size or 1
else:
if version < "4.20":
count = 16 * 1024 * 1024 // record_size or 1
else:
count = 128 * 1024 * 1024 // record_size or 1
data_streams = []
for idx, group_index in enumerate(groups):
grp = self.groups[group_index]
grp.read_split_count = count
data_streams.append(
self._load_data(
grp, record_offset=record_offset, record_count=record_count
)
)
if group_index == index:
master_index = idx
encodings = {group_index: [None] for group_index in groups}
self._set_temporary_master(None)
idx = 0
while True:
try:
fragments = [next(stream) for stream in data_streams]
except:
break
_master = self.get_master(index, data=fragments[master_index])
self._set_temporary_master(_master)
if idx == 0:
signals = []
else:
signals = [(_master, None)]
vlsd_max_sizes = []
for fragment, (group_index, channels) in zip(fragments, groups.items()):
grp = self.groups[group_index]
if not grp.single_channel_dtype:
self._prepare_record(grp)
if idx == 0:
for channel_index in channels:
signal = self.get(
group=group_index,
index=channel_index,
data=fragment,
raw=True,
ignore_invalidation_bits=True,
samples_only=False,
)
signals.append(signal)
else:
for channel_index in channels:
signal, invalidation_bits = self.get(
group=group_index,
index=channel_index,
data=fragment,
raw=True,
ignore_invalidation_bits=True,
samples_only=True,
)
signals.append((signal, invalidation_bits))
if version < "4.00":
if idx == 0:
for sig, channel_index in zip(signals, channels):
if sig.samples.dtype.kind == "S":
strsig = self.get(
group=group_index,
index=channel_index,
samples_only=True,
ignore_invalidation_bits=True,
)[0]
_dtype = strsig.dtype
sig.samples = sig.samples.astype(_dtype)
encodings[group_index].append((sig.encoding, _dtype))
del strsig
if sig.encoding != "latin-1":
if sig.encoding == "utf-16-le":
sig.samples = (
sig.samples.view(uint16)
.byteswap()
.view(sig.samples.dtype)
)
sig.samples = encode(
decode(sig.samples, "utf-16-be"), "latin-1"
)
else:
sig.samples = encode(
decode(sig.samples, sig.encoding),
"latin-1",
)
sig.samples = sig.samples.astype(_dtype)
else:
encodings[group_index].append(None)
else:
for i, (sig, encoding_tuple) in enumerate(
zip(signals, encodings[group_index])
):
if encoding_tuple:
encoding, _dtype = encoding_tuple
samples = sig[0]
if encoding != "latin-1":
if encoding == "utf-16-le":
samples = (
samples.view(uint16)
.byteswap()
.view(samples.dtype)
)
samples = encode(
decode(samples, "utf-16-be"), "latin-1"
)
else:
samples = encode(
decode(samples, encoding), "latin-1"
)
samples = samples.astype(_dtype)
signals[i] = (samples, sig[1])
self._set_temporary_master(None)
idx += 1
yield signals
[docs] def get_master(
self,
index: int,
data: bytes | None = None,
raster: RasterType | None = None,
record_offset: int = 0,
record_count: int | None = None,
one_piece: bool = False,
) -> NDArray[Any]:
"""returns master channel samples for given group
Parameters
----------
index : int
group index
data : (bytes, int, int, bytes|None)
(data block raw bytes, fragment offset, count, invalidation bytes); default None
raster : float
raster to be used for interpolation; default None
.. deprecated:: 5.13.0
record_offset : int
if *data=None* use this to select the record offset from which the
group data should be loaded
record_count : int
number of records to read; default *None* and in this case all
available records are used
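one_piece : bool
if *True* the *data* argument holds the complete group samples as a
single fragment; default *False*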
Returns
-------
t : numpy.array
master channel samples
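Examples
--------
>>> # a minimal sketch; 'test.mf4' is a hypothetical measurement file
>>> mdf = MDF4('test.mf4')
>>> t = mdf.get_master(0)  # timestamps of the first group as float64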
"""
if raster is not None:
logger.warning(
"the argument raster is deprecated since version 5.13.0 "
"and will be removed in a future release"
)
if self._master is not None:
return self._master
group = self.groups[index]
if group.channel_group.flags & v4c.FLAG_CG_REMOTE_MASTER:
if data is not None:
record_offset = data[1]
record_count = data[2]
return self.get_master(
group.channel_group.cg_master_index,
record_offset=record_offset,
record_count=record_count,
)
time_ch_nr = self.masters_db.get(index, None)
channel_group = group.channel_group
record_size = channel_group.samples_byte_nr
record_size += channel_group.invalidation_bytes_nr
if record_count is not None:
cycles_nr = record_count
else:
cycles_nr = group.channel_group.cycles_nr
fragment = data
if fragment:
data_bytes, offset, _count, invalidation_bytes = fragment
cycles_nr = len(data_bytes) // record_size if record_size else 0
else:
offset = 0
_count = record_count
if time_ch_nr is None:
if record_size:
t = arange(cycles_nr, dtype=float64)
t += offset
else:
t = array([], dtype=float64)
metadata = ("timestamps", v4c.SYNC_TYPE_TIME)
else:
time_ch = group.channels[time_ch_nr]
time_conv = time_ch.conversion
time_name = time_ch.name
metadata = (time_name, time_ch.sync_type)
if time_ch.channel_type == v4c.CHANNEL_TYPE_VIRTUAL_MASTER:
time_a = time_conv["a"]
time_b = time_conv["b"]
t = arange(cycles_nr, dtype=float64)
t += offset
t *= time_a
t += time_b
if record_count is None:
t = t[record_offset:]
else:
t = t[record_offset : record_offset + record_count]
else:
# check if the channel group contains just the master channel
# and that there are no padding bytes
if (
len(group.channels) == 1
and time_ch.dtype_fmt.itemsize == record_size
):
if one_piece:
data_bytes, offset, _count, _ = data
t = frombuffer(data_bytes, dtype=time_ch.dtype_fmt)
else:
# get data
if fragment is None:
data = self._load_data(
group,
record_offset=record_offset,
record_count=record_count,
)
else:
data = (fragment,)
buffer = bytearray().join([fragment[0] for fragment in data])
t = frombuffer(buffer, dtype=time_ch.dtype_fmt)
else:
dtype_, byte_size, byte_offset, bit_offset = group.record[
time_ch_nr
]
if one_piece:
data_bytes = data[0]
buffer = get_channel_raw_bytes(
data_bytes,
record_size,
byte_offset,
byte_size,
)
t = frombuffer(buffer, dtype=dtype_)
else:
# get data
if fragment is None:
data = self._load_data(
group,
record_offset=record_offset,
record_count=record_count,
)
else:
data = (fragment,)
buffer = bytearray().join(
[
get_channel_raw_bytes(
fragment[0],
record_size,
byte_offset,
byte_size,
)
for fragment in data
]
)
t = frombuffer(buffer, dtype=dtype_)
if not time_ch.standard_C_size:
channel_dtype = time_ch.dtype_fmt
bit_count = time_ch.bit_count
data_type = time_ch.data_type
size = byte_size
if channel_dtype.byteorder == "|" and time_ch.data_type in (
v4c.DATA_TYPE_SIGNED_MOTOROLA,
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
):
view = f">u{t.itemsize}"
else:
view = f"{channel_dtype.byteorder}u{t.itemsize}"
if dtype(view) != t.dtype:
t = t.view(view)
if bit_offset:
t >>= bit_offset
if bit_count != size * 8:
if data_type in v4c.SIGNED_INT:
t = as_non_byte_sized_signed_int(t, bit_count)
else:
mask = (1 << bit_count) - 1
t &= mask
elif data_type in v4c.SIGNED_INT:
view = f"{channel_dtype.byteorder}i{t.itemsize}"
if dtype(view) != t.dtype:
t = t.view(view)
# get timestamps
if time_conv:
t = time_conv.convert(t)
self._master_channel_metadata[index] = metadata
if not t.dtype == float64:
t = t.astype(float64)
if raster and t.size:
timestamps = t
if len(t) > 1:
num = float(float32((timestamps[-1] - timestamps[0]) / raster))
if int(num) == num:
timestamps = linspace(t[0], t[-1], int(num))
else:
timestamps = arange(t[0], t[-1], raster)
else:
timestamps = t
return timestamps
[docs] def get_bus_signal(
self,
bus: BusType,
name: str,
database: CanMatrix | StrPathType | None = None,
ignore_invalidation_bits: bool = False,
data: bytes | None = None,
raw: bool = False,
ignore_value2text_conversion: bool = True,
) -> Signal:
"""get a signal decoded from a raw bus logging. The currently supported buses are
CAN and LIN (LDF databases are not supported, they need to be converted to DBC and
feed to this function)
.. versionadded:: 6.0.0
Parameters
----------
bus : str
"CAN" or "LIN"
name : str
signal name
database : str
path of external CAN/LIN database file (.dbc or .arxml) or canmatrix.CanMatrix; default *None*
.. versionchanged:: 6.0.0
`db` and `database` arguments were merged into this single argument
ignore_invalidation_bits : bool
option to ignore invalidation bits
raw : bool
return channel samples without applying the conversion rule; default
`False`
ignore_value2text_conversion : bool
do not apply the value to text conversions found in the .dbc or .arxml
file; default `True`
Returns
-------
sig : Signal
Signal object with the physical values
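Examples
--------
>>> # a minimal sketch, assuming an open MDF4 object *mdf* and a
>>> # hypothetical "vehicle.dbc" that describes the logged frames
>>> sig = mdf.get_bus_signal("CAN", "Wheels.FL_WheelSpeed", database="vehicle.dbc")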
"""
if bus == "CAN":
return self.get_can_signal(
name,
database=database,
ignore_invalidation_bits=ignore_invalidation_bits,
data=data,
raw=raw,
ignore_value2text_conversion=ignore_value2text_conversion,
)
elif bus == "LIN":
return self.get_lin_signal(
name,
database=database,
ignore_invalidation_bits=ignore_invalidation_bits,
data=data,
raw=raw,
ignore_value2text_conversion=ignore_value2text_conversion,
)
[docs] def get_can_signal(
self,
name: str,
database: CanMatrix | StrPathType | None = None,
ignore_invalidation_bits: bool = False,
data: bytes | None = None,
raw: bool = False,
ignore_value2text_conversion: bool = True,
) -> Signal:
"""get CAN message signal. You can specify an external CAN database (
*database* argument) or canmatrix database object that has already been
loaded from a file (*db* argument).
The signal name can be specified in the following ways
* ``CAN<ID>.<MESSAGE_NAME>.<SIGNAL_NAME>`` - the `ID` value starts from 1
and must match the ID found in the measurement (the source CAN bus ID)
Example: CAN1.Wheels.FL_WheelSpeed
* ``CAN<ID>.CAN_DataFrame_<MESSAGE_ID>.<SIGNAL_NAME>`` - the `ID` value
starts from 1 and the `MESSAGE_ID` is the decimal message ID as found
in the database. Example: CAN1.CAN_DataFrame_218.FL_WheelSpeed
* ``<MESSAGE_NAME>.<SIGNAL_NAME>`` - in this case the first occurrence of
the message name and signal are returned (the same message could be
found on multiple CAN buses; for example on CAN1 and CAN3)
Example: Wheels.FL_WheelSpeed
* ``CAN_DataFrame_<MESSAGE_ID>.<SIGNAL_NAME>`` - in this case the first
occurrence of the message name and signal are returned (the same
message could be found on multiple CAN buses; for example on CAN1 and
CAN3). Example: CAN_DataFrame_218.FL_WheelSpeed
* ``<SIGNAL_NAME>`` - in this case the first occurrence of the signal
name is returned (the same signal name could be found in multiple
messages and on multiple CAN buses). Example: FL_WheelSpeed
Parameters
----------
name : str
signal name
database : str
path of external CAN database file (.dbc or .arxml) or canmatrix.CanMatrix; default *None*
.. versionchanged:: 6.0.0
`db` and `database` arguments were merged into this single argument
ignore_invalidation_bits : bool
option to ignore invalidation bits
raw : bool
return channel samples without applying the conversion rule; default
`False`
ignore_value2text_conversion : bool
do not apply the value to text conversions found in the .dbc or .arxml
file; default `True`
Returns
-------
sig : Signal
Signal object with the physical values
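Examples
--------
>>> # hypothetical names; "vehicle.dbc" is an assumed external database
>>> sig = mdf.get_can_signal("CAN1.Wheels.FL_WheelSpeed", database="vehicle.dbc")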
"""
if database is None:
return self.get(name)
if isinstance(database, (str, Path)):
database_path = Path(database)
if database_path.suffix.lower() not in (".arxml", ".dbc"):
message = f'Expected .dbc or .arxml file as CAN channel attachment but got "{database_path}"'
logger.exception(message)
raise MdfException(message)
else:
db_string = database_path.read_bytes()
md5_sum = md5(db_string).digest()
if md5_sum in self._external_dbc_cache:
db = self._external_dbc_cache[md5_sum]
else:
db = load_can_database(database_path, contents=db_string)
if db is None:
raise MdfException("failed to load database")
else:
db = database
is_j1939 = db.contains_j1939
name_ = name.split(".")
if len(name_) == 3:
can_id_str, message_id_str, signal = name_
can_id = v4c.CAN_ID_PATTERN.search(can_id_str)
if can_id is None:
raise MdfException(
f'CAN id "{can_id_str}" of signal name "{name}" is not recognised by this library'
)
else:
can_id = int(can_id.group("id"))
message_id = v4c.CAN_DATA_FRAME_PATTERN.search(message_id_str)
if message_id is None:
message_id = message_id_str
else:
message_id = int(message_id.group("id"))
if isinstance(message_id, str):
message = db.frame_by_name(message_id)
else:
message = db.frame_by_id(message_id)
elif len(name_) == 2:
message_id_str, signal = name_
can_id = None
message_id = v4c.CAN_DATA_FRAME_PATTERN.search(message_id_str)
if message_id is None:
message_id = message_id_str
else:
message_id = int(message_id.group("id"))
if isinstance(message_id, str):
message = db.frame_by_name(message_id)
else:
message = db.frame_by_id(message_id)
else:
message = None
for msg in db:
for signal in msg:
if signal.name == name:
message = msg
can_id = None
signal = name
if message is None:
raise MdfException(f"Could not find signal {name} in {database}")
for sig in message.signals:
if sig.name == signal:
signal = sig
break
else:
raise MdfException(
f'Signal "{signal}" not found in message "{message.name}" of "{database}"'
)
if can_id is None:
index = None
for _can_id, messages in self.bus_logging_map["CAN"].items():
if is_j1939:
test_ids = [
canmatrix.ArbitrationId(id_, extended=True).pgn
for id_ in self.bus_logging_map["CAN"][_can_id]
]
id_ = message.arbitration_id.pgn
else:
id_ = message.arbitration_id.id
test_ids = self.bus_logging_map["CAN"][_can_id]
if id_ in test_ids:
if is_j1939:
for id__, idx in self.bus_logging_map["CAN"][_can_id].items():
if canmatrix.ArbitrationId(id__, extended=True).pgn == id_:
index = idx
break
else:
index = self.bus_logging_map["CAN"][_can_id][
message.arbitration_id.id
]
if index is not None:
break
else:
raise MdfException(
f'Message "{message.name}" (ID={hex(message.arbitration_id.id)}) not found in the measurement'
)
else:
if can_id in self.bus_logging_map["CAN"]:
if is_j1939:
test_ids = [
canmatrix.ArbitrationId(id_, extended=True).pgn
for id_ in self.bus_logging_map["CAN"][can_id]
]
id_ = message.arbitration_id.pgn
else:
id_ = message.arbitration_id.id
test_ids = self.bus_logging_map["CAN"][can_id]
if id_ in test_ids:
if is_j1939:
for id__, idx in self.bus_logging_map["CAN"][can_id].items():
if canmatrix.ArbitrationId(id__, extended=True).pgn == id_:
index = idx
break
else:
index = self.bus_logging_map["CAN"][can_id][
message.arbitration_id.id
]
else:
raise MdfException(
f'Message "{message.name}" (ID={hex(message.arbitration_id.id)}) not found in the measurement'
)
else:
raise MdfException(
f'No logging from "{can_id}" was found in the measurement'
)
can_ids = self.get(
"CAN_DataFrame.ID",
group=index,
ignore_invalidation_bits=ignore_invalidation_bits,
data=data,
)
can_ids.samples = can_ids.samples.astype("<u4") & 0x1FFFFFFF
payload = self.get(
"CAN_DataFrame.DataBytes",
group=index,
samples_only=True,
ignore_invalidation_bits=ignore_invalidation_bits,
data=data,
)[0]
if is_j1939:
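# J1939: recover the PGN from the 29 bit arbitration ID; for PDU2
# frames (PDU format byte >= 240) the PDU specific byte is part of
# the PGN, for PDU1 frames it addresses a destination node and is
# masked out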
tmp_pgn = can_ids.samples >> 8
ps = tmp_pgn & 0xFF
pf = (can_ids.samples >> 16) & 0xFF
_pgn = tmp_pgn & 0x3FF00
can_ids.samples = where(pf >= 240, _pgn + ps, _pgn)
idx = argwhere(can_ids.samples == message.arbitration_id.pgn).ravel()
else:
idx = argwhere(can_ids.samples == message.arbitration_id.id).ravel()
payload = payload[idx]
t = can_ids.timestamps[idx].copy()
if can_ids.invalidation_bits is not None:
invalidation_bits = can_ids.invalidation_bits[idx]
else:
invalidation_bits = None
if not ignore_invalidation_bits and invalidation_bits is not None:
payload = payload[nonzero(~invalidation_bits)[0]]
t = t[nonzero(~invalidation_bits)[0]]
extracted_signals = extract_mux(
payload,
message,
None,
None,
t,
original_message_id=None,
ignore_value2text_conversion=ignore_value2text_conversion,
raw=raw,
)
comment = signal.comment or ""
for entry, signals in extracted_signals.items():
for name_, sig in signals.items():
if name_ == signal.name:
sig = Signal(
samples=sig["samples"],
timestamps=sig["t"],
name=name,
unit=signal.unit or "",
comment=comment,
)
if len(sig):
return sig
else:
raise MdfException(
f'No logging from "{signal}" was found in the measurement'
)
raise MdfException(f'No logging from "{signal}" was found in the measurement')
[docs] def get_lin_signal(
self,
name: str,
database: CanMatrix | StrPathType | None = None,
ignore_invalidation_bits: bool = False,
data: bytes | None = None,
raw: bool = False,
ignore_value2text_conversion: bool = True,
) -> Signal:
"""get LIN message signal. You can specify an external LIN database (
*database* argument) or canmatrix database object that has already been
loaded from a file (*db* argument).
The signal name can be specified in the following ways
* ``LIN_Frame_<MESSAGE_ID>.<SIGNAL_NAME>`` - Example: LIN_Frame_218.FL_WheelSpeed
* ``<MESSAGE_NAME>.<SIGNAL_NAME>`` - Example: Wheels.FL_WheelSpeed
* ``<SIGNAL_NAME>`` - Example: FL_WheelSpeed
.. versionadded:: 6.0.0
Parameters
----------
name : str
signal name
database : str
path of external LIN database file (.dbc, .arxml or .ldf) or canmatrix.CanMatrix;
default *None*
ignore_invalidation_bits : bool
option to ignore invalidation bits
raw : bool
return channel samples without applying the conversion rule; default
`False`
ignore_value2text_conversion : bool
do not apply the value to text conversions found in the .dbc, .arxml or
.ldf file; default `True`
Returns
-------
sig : Signal
Signal object with the physical values
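Examples
--------
>>> # hypothetical names; "network.ldf" is an assumed external database
>>> sig = mdf.get_lin_signal("LIN_Frame_30.Door_Status", database="network.ldf")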
"""
if database is None:
return self.get(name)
if isinstance(database, (str, Path)):
database_path = Path(database)
if database_path.suffix.lower() not in (".arxml", ".dbc", ".ldf"):
message = f'Expected .dbc, .arxml or .ldf file as LIN channel attachment but got "{database_path}"'
logger.exception(message)
raise MdfException(message)
else:
db_string = database_path.read_bytes()
md5_sum = md5(db_string).digest()
if md5_sum in self._external_dbc_cache:
db = self._external_dbc_cache[md5_sum]
else:
contents = (
None if database_path.suffix.lower() == ".ldf" else db_string
)
db = load_can_database(database_path, contents=contents)
if db is None:
raise MdfException("failed to load database")
else:
db = database
name_ = name.split(".")
if len(name_) == 2:
message_id_str, signal = name_
message_id = v4c.LIN_DATA_FRAME_PATTERN.search(message_id_str)
if message_id is None:
message_id = message_id_str
else:
message_id = int(message_id.group("id"))
if isinstance(message_id, str):
message = db.frame_by_name(message_id)
else:
message = db.frame_by_id(message_id)
else:
message = None
for msg in db:
for signal in msg:
if signal.name == name:
message = msg
signal = name
if message is None:
raise MdfException(f"Could not find signal {name} in {database}")
for sig in message.signals:
if sig.name == signal:
signal = sig
break
else:
raise MdfException(
f'Signal "{signal}" not found in message "{message.name}" of "{database}"'
)
id_ = message.arbitration_id.id
if id_ in self.bus_logging_map["LIN"]:
index = self.bus_logging_map["LIN"][id_]
else:
raise MdfException(
f'Message "{message.name}" (ID={hex(message.arbitration_id.id)}) not found in the measurement'
)
can_ids = self.get(
"LIN_Frame.ID",
group=index,
ignore_invalidation_bits=ignore_invalidation_bits,
data=data,
)
can_ids.samples = can_ids.samples.astype("<u4") & 0x1FFFFFFF
payload = self.get(
"LIN_Frame.DataBytes",
group=index,
samples_only=True,
ignore_invalidation_bits=ignore_invalidation_bits,
data=data,
)[0]
idx = argwhere(can_ids.samples == message.arbitration_id.id).ravel()
payload = payload[idx]
t = can_ids.timestamps[idx].copy()
if can_ids.invalidation_bits is not None:
invalidation_bits = can_ids.invalidation_bits[idx]
else:
invalidation_bits = None
if not ignore_invalidation_bits and invalidation_bits is not None:
payload = payload[nonzero(~invalidation_bits)[0]]
t = t[nonzero(~invalidation_bits)[0]]
extracted_signals = extract_mux(
payload,
message,
None,
None,
t,
original_message_id=None,
ignore_value2text_conversion=ignore_value2text_conversion,
raw=raw,
)
comment = signal.comment or ""
for entry, signals in extracted_signals.items():
for name_, sig in signals.items():
if name_ == signal.name:
sig = Signal(
samples=sig["samples"],
timestamps=sig["t"],
name=name,
unit=signal.unit or "",
comment=comment,
)
if len(sig):
return sig
else:
raise MdfException(
f'No logging from "{signal}" was found in the measurement'
)
raise MdfException(f'No logging from "{signal}" was found in the measurement')
[docs] def info(self) -> dict[str, Any]:
"""get MDF information as a dict
Examples
--------
>>> mdf = MDF4('test.mdf')
>>> mdf.info()
"""
info = {
"version": self.version,
"program": self.identification.program_identification.decode("utf-8").strip(
" \0\n\r\t"
),
"comment": self.header.comment,
}
info["groups"] = len(self.groups)
for i, gp in enumerate(self.groups):
inf = {}
info[f"group {i}"] = inf
inf["cycles"] = gp.channel_group.cycles_nr
inf["comment"] = gp.channel_group.comment
inf["channels count"] = len(gp.channels)
for j, channel in enumerate(gp.channels):
name = channel.name
ch_type = v4c.CHANNEL_TYPE_TO_DESCRIPTION[channel.channel_type]
inf[f"channel {j}"] = f'name="{name}" type={ch_type}'
return info
@property
def start_time(self) -> datetime:
"""getter and setter the measurement start timestamp
Returns
-------
timestamp : datetime.datetime
start timestamp
"""
return self.header.start_time
@start_time.setter
def start_time(self, timestamp: datetime) -> None:
self.header.start_time = timestamp
[docs] def save(
self,
dst: WritableBufferType | StrPathType,
overwrite: bool = False,
compression: CompressionType = 0,
progress=None,
add_history_block: bool = True,
) -> Path:
"""Save MDF to *dst*. If overwrite is *True* then the destination file
is overwritten, otherwise the file name is appended with '.<cntr>', where
'<cntr>' is the first counter that produces a new file name
(that does not already exist in the filesystem)
Parameters
----------
dst : str
destination file name
overwrite : bool
overwrite flag, default *False*
compression : int
use compressed data blocks, default 0; valid since version 4.10
* 0 - no compression
* 1 - deflate (slower, but produces smaller files)
* 2 - transposition + deflate (slowest, but produces
the smallest files)
add_history_block : bool
option to add a file history block
Returns
-------
output_file : pathlib.Path
path to saved file
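Examples
--------
>>> # a minimal sketch; "out.mf4" is a hypothetical destination path
>>> saved_path = mdf.save("out.mf4", overwrite=True, compression=2)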
"""
if is_file_like(dst):
dst_ = dst
file_like = True
if hasattr(dst, "name"):
dst = Path(dst.name)
else:
dst = Path("__file_like.mf4")
dst_.seek(0)
suffix = ".mf4"
else:
file_like = False
suffix = Path(dst).suffix.lower()
dst = Path(dst).with_suffix(".mf4")
destination_dir = dst.parent
destination_dir.mkdir(parents=True, exist_ok=True)
if overwrite is False:
if dst.is_file():
cntr = 0
while True:
name = dst.with_suffix(f".{cntr}.mf4")
if not name.exists():
break
else:
cntr += 1
message = (
f'Destination file "{dst}" already exists '
f'and "overwrite" is False. Saving MDF file as "{name}"'
)
logger.warning(message)
dst = name
if dst == self.name:
destination = dst.with_suffix(".savetemp")
else:
destination = dst
dst_ = open(destination, "wb+")
if not self.file_history:
comment = "created"
else:
comment = "updated"
if add_history_block:
fh = FileHistory()
fh.comment = f"""<FHcomment>
<TX>{comment}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{__version__}</tool_version>
</FHcomment>"""
self.file_history.append(fh)
cg_map = {}
try:
defined_texts = {"": 0, b"": 0}
cc_map = {}
si_map = {}
groups_nr = len(self.groups)
write = dst_.write
tell = dst_.tell
seek = dst_.seek
blocks = []
write(bytes(self.identification))
self.header.to_blocks(dst_.tell(), blocks)
for block in blocks:
write(bytes(block))
original_data_addresses = []
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
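# transposing the record matrix before deflate groups bytes of the
# same column (channel) together, which usually improves the
# compression ratio at the cost of extra processing time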
# write DataBlocks first
for gp_nr, gp in enumerate(self.groups):
original_data_addresses.append(gp.data_group.data_block_addr)
if gp.channel_group.flags & v4c.FLAG_CG_VLSD:
continue
address = tell()
total_size = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
) * gp.channel_group.cycles_nr
if total_size:
if self._write_fragment_size:
samples_size = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
)
if samples_size:
split_size = self._write_fragment_size // samples_size
split_size *= samples_size
if split_size == 0:
split_size = samples_size
chunks = float(total_size) / split_size
chunks = int(ceil(chunks))
self._read_fragment_size = split_size
else:
chunks = 1
else:
chunks = 1
data = self._load_data(gp)
if chunks == 1:
data_, _1, _2, inval_ = next(data)
if self.version >= "4.20" and gp.uses_ld:
if compression:
if gp.channel_group.samples_byte_nr > 1:
current_zip_type = zip_type
if compression == 1:
param = 0
else:
param = gp.channel_group.samples_byte_nr
else:
current_zip_type = v4c.FLAG_DZ_DEFLATE
param = 0
kwargs = {
"data": data_,
"zip_type": current_zip_type,
"param": param,
"original_type": b"DV",
}
data_block = DataZippedBlock(**kwargs)
else:
data_block = DataBlock(data=data_, type="DV")
write(bytes(data_block))
data_address = address
align = data_block.block_len % 8
if align:
write(b"\0" * (8 - align))
if inval_ is not None:
inval_address = address = tell()
if compression:
if compression == 1:
param = 0
else:
param = gp.channel_group.invalidation_bytes_nr
kwargs = {
"data": inval_,
"zip_type": zip_type,
"param": param,
"original_type": b"DI",
}
inval_block = DataZippedBlock(**kwargs)
else:
inval_block = DataBlock(data=inval_, type="DI")
write(bytes(inval_block))
align = inval_block.block_len % 8
if align:
write(b"\0" * (8 - align))
address = tell()
kwargs = {
"flags": v4c.FLAG_LD_EQUAL_LENGHT,
"data_block_nr": 1,
"data_block_len": gp.channel_group.cycles_nr,
"data_block_addr_0": data_address,
}
if inval_ is not None:
kwargs["flags"] |= v4c.FLAG_LD_INVALIDATION_PRESENT
kwargs["invalidation_bits_addr_0"] = inval_address
ld_block = ListData(**kwargs)
write(bytes(ld_block))
align = ld_block.block_len % 8
if align:
write(b"\0" * (8 - align))
if gp.channel_group.cycles_nr:
gp.data_group.data_block_addr = address
else:
gp.data_group.data_block_addr = 0
else:
if compression and self.version >= "4.10":
if compression == 1:
param = 0
else:
param = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
)
kwargs = {
"data": data_,
"zip_type": zip_type,
"param": param,
}
data_block = DataZippedBlock(**kwargs)
else:
data_block = DataBlock(data=data_)
write(bytes(data_block))
align = data_block.block_len % 8
if align:
write(b"\0" * (8 - align))
if gp.channel_group.cycles_nr:
gp.data_group.data_block_addr = address
else:
gp.data_group.data_block_addr = 0
else:
if self.version >= "4.20" and gp.uses_ld:
dv_addr = []
di_addr = []
block_size = 0
for i, (data_, _1, _2, inval_) in enumerate(data):
if i == 0:
block_size = len(data_)
if compression:
if compression == 1:
param = 0
else:
param = gp.channel_group.samples_byte_nr
kwargs = {
"data": data_,
"zip_type": zip_type,
"param": param,
"original_type": b"DV",
}
data_block = DataZippedBlock(**kwargs)
else:
data_block = DataBlock(data=data_, type="DV")
dv_addr.append(tell())
write(bytes(data_block))
align = data_block.block_len % 8
if align:
write(b"\0" * (8 - align))
if inval_ is not None:
if compression:
if compression == 1:
param = 0
else:
param = (
gp.channel_group.invalidation_bytes_nr
)
kwargs = {
"data": inval_,
"zip_type": zip_type,
"param": param,
"original_type": b"DI",
}
inval_block = DataZippedBlock(**kwargs)
else:
inval_block = DataBlock(data=inval_, type="DI")
di_addr.append(tell())
write(bytes(inval_block))
align = inval_block.block_len % 8
if align:
write(b"\0" * (8 - align))
address = tell()
kwargs = {
"flags": v4c.FLAG_LD_EQUAL_LENGHT,
"data_block_nr": len(dv_addr),
"data_block_len": block_size
// gp.channel_group.samples_byte_nr,
}
for i, addr in enumerate(dv_addr):
kwargs[f"data_block_addr_{i}"] = addr
if di_addr:
kwargs["flags"] |= v4c.FLAG_LD_INVALIDATION_PRESENT
for i, addr in enumerate(di_addr):
kwargs[f"invalidation_bits_addr_{i}"] = addr
ld_block = ListData(**kwargs)
write(bytes(ld_block))
align = ld_block.block_len % 8
if align:
write(b"\0" * (8 - align))
if gp.channel_group.cycles_nr:
gp.data_group.data_block_addr = address
else:
gp.data_group.data_block_addr = 0
else:
kwargs = {
"flags": v4c.FLAG_DL_EQUAL_LENGHT,
"zip_type": zip_type,
}
hl_block = HeaderList(**kwargs)
kwargs = {
"flags": v4c.FLAG_DL_EQUAL_LENGHT,
"links_nr": chunks + 1,
"data_block_nr": chunks,
"data_block_len": split_size,
}
dl_block = DataList(**kwargs)
for i, data__ in enumerate(data):
data_ = data__[0]
if compression and self.version >= "4.10":
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
if compression == 1:
param = 0
else:
param = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
)
kwargs = {
"data": data_,
"zip_type": zip_type,
"param": param,
}
block = DataZippedBlock(**kwargs)
else:
block = DataBlock(data=data_)
address = tell()
block.address = address
write(bytes(block))
align = block.block_len % 8
if align:
write(b"\0" * (8 - align))
dl_block[f"data_block_addr{i}"] = address
address = tell()
dl_block.address = address
write(bytes(dl_block))
if compression and self.version != "4.00":
hl_block.first_dl_addr = address
address = tell()
hl_block.address = address
write(bytes(hl_block))
gp.data_group.data_block_addr = address
else:
gp.data_group.data_block_addr = 0
if progress is not None:
progress.signals.setValue.emit(int(50 * (gp_nr + 1) / groups_nr))
if progress.stop:
dst_.close()
self.close()
return TERMINATED
address = tell()
blocks = []
# file history blocks
for fh in self.file_history:
address = fh.to_blocks(address, blocks, defined_texts)
for i, fh in enumerate(self.file_history[:-1]):
fh.next_fh_addr = self.file_history[i + 1].address
self.file_history[-1].next_fh_addr = 0
# data groups
gp_rec_ids = []
valid_data_groups = []
for gp in self.groups:
if gp.channel_group.flags & v4c.FLAG_CG_VLSD:
continue
valid_data_groups.append(gp.data_group)
gp_rec_ids.append(gp.data_group.record_id_len)
address = gp.data_group.to_blocks(address, blocks, defined_texts)
if valid_data_groups:
for i, dg in enumerate(valid_data_groups[:-1]):
addr_ = valid_data_groups[i + 1].address
dg.next_dg_addr = addr_
valid_data_groups[-1].next_dg_addr = 0
# go through each data group and append the rest of the blocks
for i, gp in enumerate(self.groups):
channels = gp.channels
for j, channel in enumerate(channels):
if channel.attachment is not None:
channel.attachment_addr = self.attachments[
channel.attachment
].address
elif channel.attachment_nr:
channel.attachment_addr = 0
address = channel.to_blocks(
address, blocks, defined_texts, cc_map, si_map
)
if channel.channel_type == v4c.CHANNEL_TYPE_SYNC:
if channel.attachment is not None:
channel.data_block_addr = self.attachments[
channel.attachment
].address
else:
sdata = self._load_signal_data(group=gp, index=j)
if sdata:
split_size = self._write_fragment_size
if self._write_fragment_size:
chunks = float(len(sdata)) / split_size
chunks = int(ceil(chunks))
else:
chunks = 1
if chunks == 1:
if compression and self.version > "4.00":
signal_data = DataZippedBlock(
data=sdata,
zip_type=v4c.FLAG_DZ_DEFLATE,
original_type=b"SD",
)
signal_data.address = address
address += signal_data.block_len
blocks.append(signal_data)
align = signal_data.block_len % 8
if align:
blocks.append(b"\0" * (8 - align))
address += 8 - align
else:
signal_data = DataBlock(data=sdata, type="SD")
signal_data.address = address
address += signal_data.block_len
blocks.append(signal_data)
align = signal_data.block_len % 8
if align:
blocks.append(b"\0" * (8 - align))
address += 8 - align
channel.data_block_addr = signal_data.address
else:
kwargs = {
"flags": v4c.FLAG_DL_EQUAL_LENGHT,
"links_nr": chunks + 1,
"data_block_nr": chunks,
"data_block_len": self._write_fragment_size,
}
dl_block = DataList(**kwargs)
for k in range(chunks):
data_ = sdata[k * split_size : (k + 1) * split_size]
if compression and self.version > "4.00":
zip_type = v4c.FLAG_DZ_DEFLATE
param = 0
kwargs = {
"data": data_,
"zip_type": zip_type,
"param": param,
"original_type": b"SD",
}
block = DataZippedBlock(**kwargs)
else:
block = DataBlock(data=data_, type="SD")
blocks.append(block)
block.address = address
address += block.block_len
align = block.block_len % 8
if align:
blocks.append(b"\0" * (8 - align))
address += 8 - align
dl_block[f"data_block_addr{k}"] = block.address
dl_block.address = address
blocks.append(dl_block)
address += dl_block.block_len
if compression and self.version > "4.00":
kwargs = {
"flags": v4c.FLAG_DL_EQUAL_LENGHT,
"zip_type": v4c.FLAG_DZ_DEFLATE,
"first_dl_addr": dl_block.address,
}
hl_block = HeaderList(**kwargs)
hl_block.address = address
address += hl_block.block_len
blocks.append(hl_block)
channel.data_block_addr = hl_block.address
else:
channel.data_block_addr = dl_block.address
else:
channel.data_block_addr = 0
dep_list = gp.channel_dependencies[j]
if dep_list:
if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list):
for dep in dep_list:
dep.address = address
address += dep.block_len
blocks.append(dep)
for k, dep in enumerate(dep_list[:-1]):
dep.composition_addr = dep_list[k + 1].address
dep_list[-1].composition_addr = 0
channel.component_addr = dep_list[0].address
else:
index = dep_list[0][1]
addr_ = gp.channels[index].address
channel.component_addr = addr_
group_channels = gp.channels
if group_channels:
for j, channel in enumerate(group_channels[:-1]):
channel.next_ch_addr = group_channels[j + 1].address
group_channels[-1].next_ch_addr = 0
# channel dependencies
j = len(channels) - 1
while j >= 0:
dep_list = gp.channel_dependencies[j]
if dep_list and all(isinstance(dep, tuple) for dep in dep_list):
index = dep_list[0][1]
channels[j].component_addr = channels[index].address
index = dep_list[-1][1]
channels[j].next_ch_addr = channels[index].next_ch_addr
channels[index].next_ch_addr = 0
for _, ch_nr in dep_list:
channels[ch_nr].source_addr = 0
j -= 1
# channel group
if gp.channel_group.flags & v4c.FLAG_CG_VLSD:
continue
gp.channel_group.first_sample_reduction_addr = 0
if channels:
gp.channel_group.first_ch_addr = gp.channels[0].address
else:
gp.channel_group.first_ch_addr = 0
gp.channel_group.next_cg_addr = 0
address = gp.channel_group.to_blocks(
address, blocks, defined_texts, si_map
)
gp.data_group.first_cg_addr = gp.channel_group.address
cg_map[i] = gp.channel_group.address
if progress is not None:
progress.signals.setValue.emit(int(50 * (i + 1) / groups_nr) + 25)
if progress.stop:
dst_.close()
self.close()
return TERMINATED
for gp in self.groups:
for dep_list in gp.channel_dependencies:
if dep_list:
if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list):
for dep in dep_list:
for i, (gp_nr, ch_nr) in enumerate(
dep.dynamic_size_channels
):
grp = self.groups[gp_nr]
ch = grp.channels[ch_nr]
dep[
f"dynamic_size_{i}_dg_addr"
] = grp.data_group.address
dep[
f"dynamic_size_{i}_cg_addr"
] = grp.channel_group.address
dep[f"dynamic_size_{i}_ch_addr"] = ch.address
for i, (gp_nr, ch_nr) in enumerate(
dep.input_quantity_channels
):
grp = self.groups[gp_nr]
ch = grp.channels[ch_nr]
dep[
f"input_quantity_{i}_dg_addr"
] = grp.data_group.address
dep[
f"input_quantity_{i}_cg_addr"
] = grp.channel_group.address
dep[f"input_quantity_{i}_ch_addr"] = ch.address
for i, conversion in enumerate(dep.axis_conversions):
if conversion:
address = conversion.to_blocks(
address, blocks, defined_texts, cc_map
)
dep[f"axis_conversion_{i}"] = conversion.address
else:
dep[f"axis_conversion_{i}"] = 0
if dep.output_quantity_channel:
gp_nr, ch_nr = dep.output_quantity_channel
grp = self.groups[gp_nr]
ch = grp.channels[ch_nr]
dep["output_quantity_dg_addr"] = grp.data_group.address
dep["output_quantity_cg_addr"] = grp.channel_group.address
dep["output_quantity_ch_addr"] = ch.address
if dep.comparison_quantity_channel:
gp_nr, ch_nr = dep.comparison_quantity_channel
grp = self.groups[gp_nr]
ch = grp.channels[ch_nr]
dep["comparison_quantity_dg_addr"] = grp.data_group.address
dep["comparison_quantity_cg_addr"] = grp.channel_group.address
dep["comparison_quantity_ch_addr"] = ch.address
for i, (gp_nr, ch_nr) in enumerate(dep.axis_channels):
grp = self.groups[gp_nr]
ch = grp.channels[ch_nr]
dep[
f"scale_axis_{i}_dg_addr"
] = grp.data_group.address
dep[
f"scale_axis_{i}_cg_addr"
] = grp.channel_group.address
dep[f"scale_axis_{i}_ch_addr"] = ch.address
position = tell()
for gp in self.groups:
gp.data_group.record_id_len = 0
cg_master_index = gp.channel_group.cg_master_index
if cg_master_index is not None:
gp.channel_group.cg_master_addr = cg_map[cg_master_index]
seek(gp.channel_group.address)
write(bytes(gp.channel_group))
seek(position)
ev_map = []
if self.events:
for event in self.events:
for i, ref in enumerate(event.scopes):
try:
dg_cntr, ch_cntr = ref
event[f"scope_{i}_addr"] = (
self.groups[dg_cntr].channels[ch_cntr].address
)
except TypeError:
dg_cntr = ref
event[f"scope_{i}_addr"] = self.groups[
dg_cntr
].channel_group.address
blocks.append(event)
ev_map.append(address)
event.address = address
address += event.block_len
if event.name:
tx_block = TextBlock(text=event.name)
tx_block.address = address
blocks.append(tx_block)
address += tx_block.block_len
event.name_addr = tx_block.address
else:
event.name_addr = 0
if event.comment:
meta = event.comment.startswith("<EVcomment")
tx_block = TextBlock(text=event.comment, meta=meta)
tx_block.address = address
blocks.append(tx_block)
address += tx_block.block_len
event.comment_addr = tx_block.address
else:
event.comment_addr = 0
if event.parent is not None:
event.parent_ev_addr = ev_map[event.parent]
if event.range_start is not None:
event.range_start_ev_addr = ev_map[event.range_start]
for i in range(len(self.events) - 1):
self.events[i].next_ev_addr = self.events[i + 1].address
self.events[-1].next_ev_addr = 0
self.header.first_event_addr = self.events[0].address
if progress is not None and progress.stop:
dst_.close()
self.close()
return TERMINATED
# attachments
at_map = {}
if self.attachments:
# put the attachment texts before the attachments
for at_block in self.attachments:
for text in (at_block.file_name, at_block.mime, at_block.comment):
if text not in defined_texts:
tx_block = TextBlock(text=str(text))
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
for at_block in self.attachments:
address = at_block.to_blocks(address, blocks, defined_texts)
for i in range(len(self.attachments) - 1):
at_block = self.attachments[i]
at_block.next_at_addr = self.attachments[i + 1].address
self.attachments[-1].next_at_addr = 0
if self.events:
for event in self.events:
for i in range(event.attachment_nr):
key = f"attachment_{i}_addr"
addr = event[key]
event[key] = at_map[addr]
for i, gp in enumerate(self.groups):
for j, channel in enumerate(gp.channels):
if channel.attachment is not None:
channel.attachment_addr = self.attachments[
channel.attachment
].address
elif channel.attachment_nr:
channel.attachment_addr = 0
if (
channel.channel_type == v4c.CHANNEL_TYPE_SYNC
and channel.attachment is not None
):
channel.data_block_addr = self.attachments[
channel.attachment
].address
if progress is not None:
blocks_nr = len(blocks)
threshold = blocks_nr / 25
count = 1
for i, block in enumerate(blocks):
write(bytes(block))
if i >= threshold:
progress.signals.setValue.emit(75 + count)
count += 1
threshold += blocks_nr / 25
else:
for block in blocks:
write(bytes(block))
for gp, rec_id in zip(self.groups, gp_rec_ids):
gp.data_group.record_id_len = rec_id
if valid_data_groups:
addr_ = valid_data_groups[0].address
self.header.first_dg_addr = addr_
else:
self.header.first_dg_addr = 0
self.header.file_history_addr = self.file_history[0].address
if self.attachments:
first_attachment = self.attachments[0]
addr_ = first_attachment.address
self.header.first_attachment_addr = addr_
else:
self.header.first_attachment_addr = 0
seek(v4c.IDENTIFICATION_BLOCK_SIZE)
write(bytes(self.header))
for orig_addr, gp in zip(original_data_addresses, self.groups):
gp.data_group.data_block_addr = orig_addr
at_map = {value: key for key, value in at_map.items()}
for event in self.events:
for i in range(event.attachment_nr):
key = f"attachment_{i}_addr"
addr = event[key]
event[key] = at_map[addr]
except:
if not file_like:
dst_.close()
raise
else:
if not file_like:
dst_.close()
if suffix in (".zip", ".mf4z"):
output_fname = dst.with_suffix(suffix)
try:
zipped_mf4 = ZipFile(output_fname, "w", compression=ZIP_DEFLATED)
zipped_mf4.write(
str(dst),
dst.name,
compresslevel=1,
)
zipped_mf4.close()
os.remove(destination)
dst = output_fname
except:
pass
if dst == self.name:
self.close()
try:
self.name.unlink()
destination.rename(self.name)
except:
pass
self.groups.clear()
self.header = None
self.identification = None
self.file_history.clear()
self.channels_db.clear()
self.masters_db.clear()
self.attachments.clear()
self.file_comment = None
self._ch_map.clear()
self._tempfile = TemporaryFile(dir=self.temporary_folder)
self._file = open(self.name, "rb")
self._read()
return dst
[docs] def get_channel_name(self, group: int, index: int) -> str:
"""Gets channel name.
Parameters
----------
group : int
0-based group index
index : int
0-based channel index
Returns
-------
name : str
found channel name
"""
gp_nr, ch_nr = self._validate_channel_selection(None, group, index)
return self.groups[gp_nr].channels[ch_nr].name
def get_channel_metadata(
self,
name: str | None = None,
group: int | None = None,
index: int | None = None,
) -> Channel:
gp_nr, ch_nr = self._validate_channel_selection(name, group, index)
grp = self.groups[gp_nr]
channel = grp.channels[ch_nr]
return channel
[docs] def get_channel_unit(
self,
name: str | None = None,
group: int | None = None,
index: int | None = None,
) -> str:
"""Gets channel unit.
Channel can be specified in two ways:
* using the first positional argument *name*
* if there are multiple occurrences for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurrences for this channel and either the
*group* or *index* arguments is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
Returns
-------
unit : str
found channel unit
"""
gp_nr, ch_nr = self._validate_channel_selection(name, group, index)
grp = self.groups[gp_nr]
channel = grp.channels[ch_nr]
conversion = channel.conversion
unit = conversion and conversion.unit or channel.unit or ""
return unit
def _finalize(self) -> None:
"""
Attempt finalization of the file.
:return: None
"""
flags = self.identification.unfinalized_standard_flags
stream = self._file
blocks, block_groups, addresses = all_blocks_addresses(stream)
stream.seek(0, 2)
limit = stream.tell()
mapped = self._mapped
if flags & v4c.FLAG_UNFIN_UPDATE_LAST_DL:
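# the last DLBLOCK of an unfinalized file may not reference all the
# data blocks that were written after it; scan the addresses that
# follow each DLBLOCK and patch the missing links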
for dg_addr in block_groups[b"##DG\x00\x00"]:
group = DataGroup(address=dg_addr, stream=stream, mapped=mapped)
data_addr = group.data_block_addr
if not data_addr:
continue
stream.seek(data_addr)
blk_id = stream.read(4)
if blk_id == b"##DT":
continue
elif blk_id in (b"##DL", b"##HL"):
if blk_id == b"##HL":
hl = HeaderList(address=data_addr, stream=stream, mapped=mapped)
data_addr = hl.first_dl_addr
while True:
dl = DataList(address=data_addr, stream=stream, mapped=mapped)
if not dl.next_dl_addr:
break
kwargs = {}
count = dl.links_nr - 1
valid_count = 0
for i in range(count):
dt_addr = dl[f"data_block_addr{i}"]
if dt_addr:
valid_count += 1
kwargs[f"data_block_addr{i}"] = dt_addr
else:
break
starting_address = dl.address
next_block_position = bisect.bisect_right(
addresses, starting_address
)
# search for data blocks after the DLBLOCK
for j in range(i, count):
if next_block_position >= len(addresses):
break
next_block_address = addresses[next_block_position]
next_block_type = blocks[next_block_address]
if next_block_type not in {b"##DZ", b"##DT", b"##DV", b"##DI"}:
break
else:
stream.seek(next_block_address + v4c.DZ_INFO_COMMON_OFFSET)
if next_block_type == b"##DZ":
(
zip_type,
param,
original_size,
zip_size,
) = v4c.DZ_COMMON_INFO_uf(
stream.read(v4c.DZ_COMMON_INFO_SIZE)
)
exceeded = (
limit
- (
next_block_address
+ v4c.DZ_COMMON_SIZE
+ zip_size
)
< 0
)
else:
id_string, block_len = COMMON_SHORT_uf(
stream.read(v4c.COMMON_SIZE)
)
original_size = block_len - 24
exceeded = limit - (next_block_address + block_len) < 0
# update the data block size in case all links were NULL before
if i == 0 and (dl.flags & v4c.FLAG_DL_EQUAL_LENGHT):
kwargs["data_block_len"] = original_size
# check if the file limit is exceeded
if exceeded:
break
else:
next_block_position += 1
valid_count += 1
kwargs[f"data_block_addr{j}"] = next_block_address
kwargs["links_nr"] = valid_count + 1
kwargs["flags"] = dl.flags
if dl.flags & v4c.FLAG_DL_EQUAL_LENGHT:
kwargs["data_block_len"] = dl.data_block_len
else:
for i in range(valid_count):
kwargs[f"offset_{i}"] = dl[f"offset_{i}"]
stream.seek(data_addr)
stream.write(bytes(DataList(**kwargs)))
self.identification[
"unfinalized_standard_flags"
] -= v4c.FLAG_UNFIN_UPDATE_LAST_DL
if flags & v4c.FLAG_UNFIN_UPDATE_LAST_DT_LENGTH:
try:
for dg_addr in block_groups[b"##DG\x00\x00"]:
group = DataGroup(address=dg_addr, stream=stream, mapped=mapped)
data_addr = group.data_block_addr
if not data_addr:
continue
stream.seek(data_addr)
blk_id = stream.read(4)
if blk_id == b"##DT":
blk = DataBlock(address=data_addr, stream=stream, mapped=mapped)
elif blk_id == b"##DL":
while True:
dl = DataList(
address=data_addr, stream=stream, mapped=mapped
)
if not dl.next_dl_addr:
break
data_addr = dl[f"data_block_addr{dl.links_nr - 2}"]
blk = DataBlock(address=data_addr, stream=stream, mapped=mapped)
elif blk_id == b"##HL":
hl = HeaderList(address=data_addr, stream=stream, mapped=mapped)
data_addr = hl.first_dl_addr
while True:
dl = DataList(
address=data_addr, stream=stream, mapped=mapped
)
if not dl.next_dl_addr:
break
data_addr = dl[f"data_block_addr{dl.links_nr - 2}"]
blk = DataBlock(address=data_addr, stream=stream, mapped=mapped)
next_block = bisect.bisect_right(addresses, data_addr)
if next_block == len(addresses):
block_len = limit - data_addr
else:
block_len = addresses[next_block] - data_addr
blk.block_len = block_len
stream.seek(data_addr)
stream.write(bytes(blk))
except:
print(format_exc())
raise
self.identification.unfinalized_standard_flags -= (
v4c.FLAG_UNFIN_UPDATE_LAST_DT_LENGTH
)
self.identification.file_identification = b"MDF "
def _sort(
self,
compress: bool = True,
current_progress_index: int = 0,
max_progress_count: int = 0,
progress=None,
) -> None:
if self._file is None:
return
flags = self.identification["unfinalized_standard_flags"]
common = defaultdict(list)
for i, group in enumerate(self.groups):
if group.sorted:
continue
try:
data_block = next(group.get_data_blocks())
common[data_block.address].append((i, group.channel_group.record_id))
except:
continue
read = self._file.read
seek = self._file.seek
self._tempfile.seek(0, 2)
tell = self._tempfile.tell
write = self._tempfile.write
for address, groups in common.items():
cg_map = {
rec_id: self.groups[index_].channel_group for index_, rec_id in groups
}
final_records = {id_: [] for (_, id_) in groups}
for rec_id, channel_group in cg_map.items():
if channel_group.address in self._cn_data_map:
dg_cntr, ch_cntr = self._cn_data_map[channel_group.address]
self.groups[dg_cntr].signal_data[ch_cntr] = ([], iter(EMPTY_TUPLE))
group = self.groups[groups[0][0]]
record_id_nr = group.data_group.record_id_len
cg_size = group.record_size
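# records in an unsorted data block are prefixed with the record id
# of the owning channel group; select the fixed size unpacker used to
# read that prefix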
if record_id_nr == 1:
_unpack_struct = UINT8_uf
elif record_id_nr == 2:
_unpack_struct = UINT16_uf
elif record_id_nr == 4:
_unpack_struct = UINT32_uf
elif record_id_nr == 8:
_unpack_struct = UINT64_uf
else:
message = f"invalid record id size {record_id_nr}"
raise MdfException(message)
rem = b""
blocks = list(group.get_data_blocks()) # might be expensive ?
# most of the steps are for sorting, but the last 2 are after we've done sorting
# so remove the 2 steps that are not related to sorting from the count
step = float(SORT_STEPS - 2) / len(blocks) / len(common)
index = float(current_progress_index)
previous = index
for info in blocks:
dtblock_address, dtblock_raw_size, dtblock_size, block_type, param = (
info.address,
info.original_size,
info.compressed_size,
info.block_type,
info.param,
)
index += step
# if we've been told to notify about progress
# and we've been given a max progress count (only way we can do progress updates)
# and there's a tick update (at least 1 integer between the last update and the current index)
# then we can notify about the callback progress
if (
callable(progress)
and max_progress_count
and floor(previous) < floor(index)
):
progress(floor(index), max_progress_count)
previous = index
seek(dtblock_address)
if block_type != v4c.DT_BLOCK:
partial_records = {id_: [] for _, id_ in groups}
new_data = read(dtblock_size)
if block_type == v4c.DZ_BLOCK_DEFLATE:
new_data = decompress(new_data, bufsize=dtblock_raw_size)
elif block_type == v4c.DZ_BLOCK_TRANSPOSED:
new_data = decompress(new_data, bufsize=dtblock_raw_size)
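# transposed DZ blocks store the record matrix column-wise; reshape
# and transpose back to restore the original row (record) order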
cols = param
lines = dtblock_raw_size // cols
nd = frombuffer(new_data[: lines * cols], dtype=uint8)
nd = nd.reshape((cols, lines))
new_data = nd.T.ravel().tobytes() + new_data[lines * cols :]
new_data = rem + new_data
try:
rem = sort_data_block(
new_data,
partial_records,
cg_size,
record_id_nr,
_unpack_struct,
)
except:
print(format_exc())
raise
for rec_id, new_data in partial_records.items():
channel_group = cg_map[rec_id]
if channel_group.address in self._cn_data_map:
dg_cntr, ch_cntr = self._cn_data_map[channel_group.address]
else:
dg_cntr, ch_cntr = None, None
if new_data:
tempfile_address = tell()
new_data = b"".join(new_data)
original_size = len(new_data)
if original_size:
if compress:
new_data = lz_compress(new_data)
compressed_size = len(new_data)
write(new_data)
if dg_cntr is not None:
info = SignalDataBlockInfo(
address=tempfile_address,
compressed_size=compressed_size,
original_size=original_size,
block_type=v4c.DZ_BLOCK_LZ,
location=v4c.LOCATION_TEMPORARY_FILE,
)
self.groups[dg_cntr].signal_data[ch_cntr][
0
].append(info)
else:
block_info = DataBlockInfo(
address=tempfile_address,
block_type=v4c.DZ_BLOCK_LZ,
compressed_size=compressed_size,
original_size=original_size,
param=0,
)
final_records[rec_id].append(block_info)
else:
write(new_data)
if dg_cntr is not None:
info = SignalDataBlockInfo(
address=tempfile_address,
compressed_size=original_size,
original_size=original_size,
block_type=v4c.DT_BLOCK,
location=v4c.LOCATION_TEMPORARY_FILE,
)
self.groups[dg_cntr].signal_data[ch_cntr][
0
].append(info)
else:
block_info = DataBlockInfo(
address=tempfile_address,
block_type=v4c.DT_BLOCK,
compressed_size=original_size,
original_size=original_size,
param=0,
)
final_records[rec_id].append(block_info)
else: # DTBLOCK
seek(dtblock_address)
limit = 32 * 1024 * 1024 # 32MB
while dtblock_size:
if dtblock_size > limit:
dtblock_size -= limit
new_data = rem + read(limit)
else:
new_data = rem + read(dtblock_size)
dtblock_size = 0
partial_records = {id_: [] for _, id_ in groups}
rem = sort_data_block(
new_data,
partial_records,
cg_size,
record_id_nr,
_unpack_struct,
)
for rec_id, new_data in partial_records.items():
channel_group = cg_map[rec_id]
if channel_group.address in self._cn_data_map:
dg_cntr, ch_cntr = self._cn_data_map[
channel_group.address
]
else:
dg_cntr, ch_cntr = None, None
if new_data:
tempfile_address = tell()
new_data = b"".join(new_data)
original_size = len(new_data)
if original_size:
if compress:
new_data = lz_compress(new_data)
compressed_size = len(new_data)
write(new_data)
if dg_cntr is not None:
info = SignalDataBlockInfo(
address=tempfile_address,
compressed_size=compressed_size,
original_size=original_size,
block_type=v4c.DZ_BLOCK_LZ,
location=v4c.LOCATION_TEMPORARY_FILE,
)
self.groups[dg_cntr].signal_data[ch_cntr][
0
].append(info)
else:
block_info = DataBlockInfo(
address=tempfile_address,
block_type=v4c.DZ_BLOCK_LZ,
compressed_size=compressed_size,
original_size=original_size,
param=None,
)
final_records[rec_id].append(block_info)
else:
write(new_data)
if dg_cntr is not None:
info = SignalDataBlockInfo(
address=tempfile_address,
compressed_size=original_size,
original_size=original_size,
block_type=v4c.DT_BLOCK,
location=v4c.LOCATION_TEMPORARY_FILE,
)
self.groups[dg_cntr].signal_data[ch_cntr][
0
].append(info)
else:
block_info = DataBlockInfo(
address=tempfile_address,
block_type=v4c.DT_BLOCK,
compressed_size=original_size,
original_size=original_size,
param=None,
)
final_records[rec_id].append(block_info)
# after we read all DTBLOCKs in the original file,
# we assign freshly created blocks from temporary file to
# corresponding groups.
for idx, rec_id in groups:
group = self.groups[idx]
group.data_location = v4c.LOCATION_TEMPORARY_FILE
group.set_blocks_info(final_records[rec_id])
group.sorted = True
for i, group in enumerate(self.groups):
if flags & v4c.FLAG_UNFIN_UPDATE_CG_COUNTER:
channel_group = group.channel_group
if channel_group.flags & v4c.FLAG_CG_VLSD:
continue
if (
self.version >= "4.20"
and channel_group.flags & v4c.FLAG_CG_REMOTE_MASTER
):
index = channel_group.cg_master_index
else:
index = i
if group.uses_ld:
samples_size = channel_group.samples_byte_nr
else:
samples_size = (
channel_group.samples_byte_nr
+ channel_group.invalidation_bytes_nr
)
total_size = sum(blk.original_size for blk in group.get_data_blocks())
cycles_nr = total_size // samples_size
virtual_channel_group = self.virtual_groups[index]
virtual_channel_group.cycles_nr = cycles_nr
channel_group.cycles_nr = cycles_nr
if (
self.identification["unfinalized_standard_flags"]
& v4c.FLAG_UNFIN_UPDATE_CG_COUNTER
):
self.identification[
"unfinalized_standard_flags"
] -= v4c.FLAG_UNFIN_UPDATE_CG_COUNTER
if (
self.identification["unfinalized_standard_flags"]
& v4c.FLAG_UNFIN_UPDATE_VLSD_BYTES
):
self.identification[
"unfinalized_standard_flags"
] -= v4c.FLAG_UNFIN_UPDATE_VLSD_BYTES
def _process_bus_logging(self) -> None:
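"""Search the channel groups flagged as bus event groups and run the CAN/LIN bus logging processing on each of them."""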
groups_count = len(self.groups)
for index in range(groups_count):
group = self.groups[index]
if group.channel_group.flags & v4c.FLAG_CG_BUS_EVENT:
source = group.channel_group.acq_source
if (
source
and source.bus_type in (v4c.BUS_TYPE_CAN, v4c.BUS_TYPE_OTHER)
and "CAN_DataFrame" in [ch.name for ch in group.channels]
):
try:
self._process_can_logging(index, group)
except Exception:
message = f"Error during CAN logging processing: {format_exc()}"
logger.error(message)
if (
source
and source.bus_type in (v4c.BUS_TYPE_LIN, v4c.BUS_TYPE_OTHER)
and "LIN_Frame" in [ch.name for ch in group.channels]
):
try:
self._process_lin_logging(index, group)
except Exception:
message = f"Error during LIN logging processing: {format_exc()}"
logger.error(message)
def _process_can_logging(self, group_index: int, grp: Group) -> None:
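"""Register the CAN message IDs found in this channel group in the bus logging map and, if a CAN database is attached, extract the contained signals into new channel groups."""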
channels = grp.channels
group = grp
dbc = None
for i, channel in enumerate(channels):
if channel.name == "CAN_DataFrame":
attachment_addr = channel.attachment
if attachment_addr is not None:
if attachment_addr not in self._dbc_cache:
attachment, at_name, md5_sum = self.extract_attachment(
index=attachment_addr,
)
if at_name.suffix.lower() not in (".arxml", ".dbc"):
message = f'Expected .dbc or .arxml file as CAN channel attachment but got "{at_name}"'
logger.warning(message)
elif not attachment:
message = f'Attachment "{at_name}" not found'
logger.warning(message)
else:
dbc = load_can_database(at_name, contents=attachment)
if dbc:
self._dbc_cache[attachment_addr] = dbc
else:
dbc = self._dbc_cache[attachment_addr]
break
if group.channel_group.flags & v4c.FLAG_CG_PLAIN_BUS_EVENT:
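# plain bus event groups contain a single message ID on a single bus,
# so inspecting the first record is enough to fill the bus logging map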
self._prepare_record(group)
data = self._load_data(group, record_offset=0, record_count=1)
for fragment_index, fragment in enumerate(data):
self._set_temporary_master(None)
self._set_temporary_master(self.get_master(group_index, data=fragment))
bus_ids = self.get(
"CAN_DataFrame.BusChannel",
group=group_index,
data=fragment,
samples_only=True,
)[0].astype("<u1")
msg_ids = (
self.get(
"CAN_DataFrame.ID",
group=group_index,
data=fragment,
samples_only=True,
)[0].astype("<u4")
& 0x1FFFFFFF
)
if len(bus_ids) == 0:
continue
bus = bus_ids[0]
msg_id = msg_ids[0]
bus_map = self.bus_logging_map["CAN"].setdefault(bus, {})
bus_map[int(msg_id)] = group_index
self._set_temporary_master(None)
elif dbc is None:
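# no database is available, so only the bus -> message ID -> group map
# can be built; this requires scanning every fragment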
self._prepare_record(group)
data = self._load_data(group, optimize_read=False)
for fragment_index, fragment in enumerate(data):
self._set_temporary_master(None)
self._set_temporary_master(self.get_master(group_index, data=fragment))
bus_ids = self.get(
"CAN_DataFrame.BusChannel",
group=group_index,
data=fragment,
samples_only=True,
)[0].astype("<u1")
msg_ids = (
self.get(
"CAN_DataFrame.ID",
group=group_index,
data=fragment,
samples_only=True,
)[0].astype("<u4")
& 0x1FFFFFFF
)
if len(bus_ids) == 0:
continue
buses = unique(bus_ids)
for bus in buses:
bus_msg_ids = msg_ids[bus_ids == bus]
unique_ids = unique(bus_msg_ids)
unique_ids.sort()
unique_ids = unique_ids.tolist()
bus_map = self.bus_logging_map["CAN"].setdefault(bus, {})
for msg_id in unique_ids:
bus_map[int(msg_id)] = group_index
self._set_temporary_master(None)
else:
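# a database is available: extract the signals of each known message
# and append them as new channel groups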
is_j1939 = dbc.contains_j1939
if is_j1939:
messages = {message.arbitration_id.pgn: message for message in dbc}
else:
messages = {message.arbitration_id.id: message for message in dbc}
msg_map = {}
self._prepare_record(group)
data = self._load_data(group, optimize_read=False)
for fragment_index, fragment in enumerate(data):
self._set_temporary_master(None)
self._set_temporary_master(self.get_master(group_index, data=fragment))
data_bytes = self.get(
"CAN_DataFrame.DataBytes",
group=group_index,
data=fragment,
samples_only=True,
)[0]
bus_ids = self.get(
"CAN_DataFrame.BusChannel",
group=group_index,
data=fragment,
samples_only=True,
)[0].astype("<u1")
msg_ids = (
self.get(
"CAN_DataFrame.ID", group=group_index, data=fragment
).astype("<u4")
& 0x1FFFFFFF
)
if is_j1939:
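# compute the J1939 PGN from the 29 bit ID: PF is bits 16-23; for PDU2
# frames (PF >= 240) the PS byte (bits 8-15) is part of the PGN,
# otherwise it holds the destination address and is masked out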
tmp_pgn = msg_ids.samples >> 8
ps = tmp_pgn & 0xFF
pf = (msg_ids.samples >> 16) & 0xFF
_pgn = tmp_pgn & 0x3FF00
msg_ids.samples = where(pf >= 240, _pgn + ps, _pgn)
if len(bus_ids) == 0:
continue
buses = unique(bus_ids)
for bus in buses:
idx_ = bus_ids == bus
bus_msg_ids = msg_ids.samples[idx_]
bus_t = msg_ids.timestamps[idx_]
bus_data_bytes = data_bytes[idx_]
unique_ids = sorted(unique(bus_msg_ids).astype("<u8"))
bus_map = self.bus_logging_map["CAN"].setdefault(bus, {})
for msg_id in unique_ids:
bus_map[int(msg_id)] = group_index
for msg_id in unique_ids:
message = messages.get(msg_id, None)
if message is None:
continue
idx = bus_msg_ids == msg_id
payload = bus_data_bytes[idx]
t = bus_t[idx]
extracted_signals = extract_mux(
payload, message, msg_id, bus, t
)
for entry, signals in extracted_signals.items():
if len(next(iter(signals.values()))["samples"]) == 0:
continue
if entry not in msg_map:
sigs = []
for name_, signal in signals.items():
sig = Signal(
samples=signal["samples"],
timestamps=signal["t"],
name=signal["name"],
comment=signal["comment"],
unit=signal["unit"],
invalidation_bits=signal["invalidation_bits"],
display_names={
f"{message.name}.{signal['name']}": "message_name",
f"CAN{bus}.{message.name}.{signal['name']}": "bus_name",
},
)
sigs.append(sig)
cg_nr = self.append(
sigs,
acq_name=f"from CAN{bus} message ID=0x{msg_id:X}",
comment=f"{message} 0x{msg_id:X}",
common_timebase=True,
)
msg_map[entry] = cg_nr
for ch_index, ch in enumerate(
self.groups[cg_nr].channels
):
if ch_index == 0:
continue
entry = cg_nr, ch_index
name_ = f"{message}.{ch.name}"
self.channels_db.add(name_, entry)
name_ = f"CAN{bus}.{message}.{ch.name}"
self.channels_db.add(name_, entry)
name_ = f"CAN_DataFrame_{msg_id}.{ch.name}"
self.channels_db.add(name_, entry)
name_ = f"CAN{bus}.CAN_DataFrame_{msg_id}.{ch.name}"
self.channels_db.add(name_, entry)
else:
index = msg_map[entry]
sigs = []
for name_, signal in signals.items():
sigs.append(
(signal["samples"], signal["invalidation_bits"])
)
t = signal["t"]
sigs.insert(0, (t, None))
self.extend(index, sigs)
self._set_temporary_master(None)
def _process_lin_logging(self, group_index: int, grp: Group) -> None:
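"""Register the LIN frame IDs found in this channel group in the bus logging map and, if a database is attached, extract the contained signals into new channel groups."""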
channels = grp.channels
group = grp
dbc = None
for i, channel in enumerate(channels):
if channel.name == "LIN_Frame":
attachment_addr = channel.attachment
if attachment_addr is not None:
if attachment_addr not in self._dbc_cache:
attachment, at_name, md5_sum = self.extract_attachment(
index=attachment_addr,
)
if at_name.suffix.lower() not in (".arxml", ".dbc", ".ldf"):
message = f'Expected .dbc, .arxml or .ldf file as LIN channel attachment but got "{at_name}"'
logger.warning(message)
elif not attachment:
message = f'Attachment "{at_name}" not found'
logger.warning(message)
else:
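# LDF databases are parsed from the file extracted on disk, so the raw
# attachment contents are not passed to the loader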
contents = (
None if at_name.suffix.lower() == ".ldf" else attachment
)
dbc = load_can_database(at_name, contents=contents)
if dbc:
self._dbc_cache[attachment_addr] = dbc
else:
dbc = self._dbc_cache[attachment_addr]
break
if dbc is None:
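# no database: scan all fragments and register every LIN frame ID in
# the bus logging map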
self._prepare_record(group)
data = self._load_data(group, optimize_read=False)
for fragment_index, fragment in enumerate(data):
self._set_temporary_master(None)
self._set_temporary_master(self.get_master(group_index, data=fragment))
msg_ids = (
self.get(
"LIN_Frame.ID",
group=group_index,
data=fragment,
samples_only=True,
)[0].astype("<u4")
& 0x1FFFFFFF
)
unique_ids = sorted(unique(msg_ids).astype("<u8"))
lin_map = self.bus_logging_map["LIN"]
for msg_id in unique_ids:
lin_map[int(msg_id)] = group_index
self._set_temporary_master(None)
else:
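# database available: same strategy as the CAN path, but LIN has a
# single logical bus so no per-bus split is needed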
messages = {message.arbitration_id.id: message for message in dbc}
msg_map = {}
self._prepare_record(group)
data = self._load_data(group, optimize_read=False)
for fragment_index, fragment in enumerate(data):
self._set_temporary_master(None)
self._set_temporary_master(self.get_master(group_index, data=fragment))
msg_ids = (
self.get("LIN_Frame.ID", group=group_index, data=fragment).astype(
"<u4"
)
& 0x1FFFFFFF
)
data_bytes = self.get(
"LIN_Frame.DataBytes",
group=group_index,
data=fragment,
samples_only=True,
)[0]
bus_msg_ids = msg_ids.samples
bus_t = msg_ids.timestamps
bus_data_bytes = data_bytes
unique_ids = sorted(unique(bus_msg_ids).astype("<u8"))
lin_map = self.bus_logging_map["LIN"]
for msg_id in unique_ids:
lin_map[int(msg_id)] = group_index
for msg_id in unique_ids:
message = messages.get(msg_id, None)
if message is None:
continue
idx = bus_msg_ids == msg_id
payload = bus_data_bytes[idx]
t = bus_t[idx]
extracted_signals = extract_mux(payload, message, msg_id, 0, t)
for entry, signals in extracted_signals.items():
if len(next(iter(signals.values()))["samples"]) == 0:
continue
if entry not in msg_map:
sigs = []
for name_, signal in signals.items():
sig = Signal(
samples=signal["samples"],
timestamps=signal["t"],
name=signal["name"],
comment=signal["comment"],
unit=signal["unit"],
invalidation_bits=signal["invalidation_bits"],
display_names={
f"{message.name}.{signal['name']}": "message_name",
f"LIN.{message.name}.{signal['name']}": "bus_name",
},
)
sigs.append(sig)
cg_nr = self.append(
sigs,
acq_name=f"from LIN message ID=0x{msg_id:X}",
comment=f"{message} 0x{msg_id:X}",
common_timebase=True,
)
msg_map[entry] = cg_nr
for ch_index, ch in enumerate(self.groups[cg_nr].channels):
if ch_index == 0:
continue
entry = cg_nr, ch_index
name_ = f"{message}.{ch.name}"
self.channels_db.add(name_, entry)
name_ = f"LIN.{message}.{ch.name}"
self.channels_db.add(name_, entry)
name_ = f"LIN_Frame_{msg_id}.{ch.name}"
self.channels_db.add(name_, entry)
name_ = f"LIN.LIN_Frame_{msg_id}.{ch.name}"
self.channels_db.add(name_, entry)
else:
index = msg_map[entry]
sigs = []
for name_, signal in signals.items():
sigs.append(
(signal["samples"], signal["invalidation_bits"])
)
t = signal["t"]
sigs.insert(0, (t, None))
self.extend(index, sigs)
self._set_temporary_master(None)
def reload_header(self):
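"""Re-read the header block; in MDF4 the HDBLOCK always starts at address 0x40, right after the 64 byte identification block."""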
self.header = HeaderBlock(address=0x40, stream=self._file)