"""Classes that implement the blocks for MDF versions 2 and 3"""
from collections.abc import Iterator, Sequence
from datetime import datetime, timedelta, timezone
from getpass import getuser
import logging
from struct import pack, unpack, unpack_from
import sys
from textwrap import wrap
from traceback import format_exc
import typing
from typing import Final
import xml.etree.ElementTree as ET
import dateutil.tz
from numexpr import evaluate
import numpy as np
from numpy.typing import ArrayLike, NDArray
from typing_extensions import Any, overload, SupportsBytes, TypedDict, Unpack
from .. import tool
from . import v2_v3_constants as v23c
from .utils import (
BlockKwargs,
get_fields,
get_text_v3,
MdfException,
UINT16_u,
UINT16_uf,
)
try:
from sympy import lambdify, symbols
except:
lambdify, symbols = None, None
SEEK_START: Final = v23c.SEEK_START
SEEK_END: Final = v23c.SEEK_END
CHANNEL_DISPLAYNAME_u = v23c.CHANNEL_DISPLAYNAME_u
CHANNEL_DISPLAYNAME_uf = v23c.CHANNEL_DISPLAYNAME_uf
CHANNEL_LONGNAME_u = v23c.CHANNEL_LONGNAME_u
CHANNEL_LONGNAME_uf = v23c.CHANNEL_LONGNAME_uf
CHANNEL_SHORT_u = v23c.CHANNEL_SHORT_u
CHANNEL_SHORT_uf = v23c.CHANNEL_SHORT_uf
COMMON_uf = v23c.COMMON_uf
COMMON_u = v23c.COMMON_u
CONVERSION_COMMON_SHORT_uf = v23c.CONVERSION_COMMON_SHORT_uf
SOURCE_COMMON_uf = v23c.SOURCE_COMMON_uf
SOURCE_EXTRA_ECU_uf = v23c.SOURCE_EXTRA_ECU_uf
SOURCE_EXTRA_VECTOR_uf = v23c.SOURCE_EXTRA_VECTOR_uf
SOURCE_COMMON_u = v23c.SOURCE_COMMON_u
SOURCE_EXTRA_ECU_u = v23c.SOURCE_EXTRA_ECU_u
SOURCE_EXTRA_VECTOR_u = v23c.SOURCE_EXTRA_VECTOR_u
logger = logging.getLogger("asammdf")
__all__ = [
"Channel",
"ChannelConversion",
"ChannelDependency",
"ChannelExtension",
"ChannelGroup",
"DataBlock",
"DataGroup",
"FileIdentificationBlock",
"HeaderBlock",
"ProgramBlock",
"TextBlock",
"TriggerBlock",
]
class ChannelKwargs(BlockKwargs, total=False):
parsed_strings: tuple[str, dict[str, str]] | None
cc_map: dict[bytes | int, "ChannelConversion"]
si_map: dict[bytes | int, "ChannelExtension"]
block_len: int
next_ch_addr: int
conversion_addr: int
source_addr: int
component_addr: int
comment_addr: int
channel_type: int
short_name: bytes
description: bytes
start_offset: int
bit_count: int
data_type: int
range_flag: int
min_raw_value: float
max_raw_value: float
sampling_rate: float
long_name_addr: int
display_name_addr: int
additional_byte_offset: int
[docs]
class Channel:
"""CNBLOCK class.
If the `load_metadata` keyword argument is not provided or is False,
then the conversion, source and display name information is not processed.
CNBLOCK fields:
* ``id`` - bytes : block ID; always b'CN'
* ``block_len`` - int : block bytes size
* ``next_ch_addr`` - int : next CNBLOCK address
* ``conversion_addr`` - int : address of channel conversion block
* ``source_addr`` - int : address of channel source block
* ``component_addr`` - int : address of dependency block (CDBLOCK) of this
channel
* ``comment_addr`` - int : address of TXBLOCK that contains the
channel comment
* ``channel_type`` - int : integer code for channel type
* ``short_name`` - bytes : short signal name
* ``description`` - bytes : signal description
* ``start_offset`` - int : start offset in bits to determine the first bit
of the signal in the data record
* ``bit_count`` - int : channel bit count
* ``data_type`` - int : integer code for channel data type
* ``range_flag`` - int : value range valid flag
* ``min_raw_value`` - float : min raw value of all samples
* ``max_raw_value`` - float : max raw value of all samples
* ``sampling_rate`` - float : sampling rate in *'s'* for a virtual time
channel
* ``long_name_addr`` - int : address of TXBLOCK that contains the channel's
name
* ``display_name_addr`` - int : address of TXBLOCK that contains the
channel's display name
* ``additional_byte_offset`` - int : additional Byte offset of the channel
in the data record
Other attributes:
* ``address`` - int : block address inside MDF file
* ``comment`` - str : channel comment
* ``conversion`` - ChannelConversion : channel conversion; None if the
channel has no conversion
* ``display_names`` - dict : channel display names
.. versionchanged:: 7.0.0
Changed from str to dict.
* ``name`` - str : full channel name
* ``source`` - SourceInformation : channel source information; None if the
channel has no source information
Parameters
----------
address : int
Block address; to be used for objects created from file.
stream : handle
File handle; to be used for objects created from file.
load_metadata : bool
Option to load conversion, source and display_names; default is True.
for dynamically created objects :
See the key-value pairs.
Examples
--------
>>> with open('test.mdf', 'rb') as mdf:
... ch1 = Channel(stream=mdf, address=0xBA52)
>>> ch2 = Channel()
>>> ch1.name
'VehicleSpeed'
>>> ch1['id']
b'CN'
"""
__slots__ = (
"additional_byte_offset",
"address",
"bit_count",
"block_len",
"channel_type",
"comment",
"comment_addr",
"component_addr",
"conversion",
"conversion_addr",
"data_type",
"description",
"display_name_addr",
"display_names",
"dtype_fmt",
"id",
"long_name_addr",
"max_raw_value",
"min_raw_value",
"name",
"next_ch_addr",
"range_flag",
"sampling_rate",
"short_name",
"source",
"source_addr",
"start_offset",
"unit",
)
def __init__(self, **kwargs: Unpack[ChannelKwargs]) -> None:
super().__init__()
self.name = self.comment = self.unit = ""
self.display_names: dict[str, str] = {}
self.conversion = self.source = None
self.dtype_fmt: np.dtype[Any] | None = None
try:
stream = kwargs["stream"]
self.address = address = kwargs["address"]
mapped = kwargs.get("mapped", False)
if mapped:
(size,) = UINT16_uf(stream, address + 2)
if size == v23c.CN_DISPLAYNAME_BLOCK_SIZE:
(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
self.long_name_addr,
self.display_name_addr,
self.additional_byte_offset,
) = CHANNEL_DISPLAYNAME_uf(stream, address)
parsed_strings = kwargs["parsed_strings"]
if parsed_strings is None:
addr = self.long_name_addr
if addr:
self.name = get_text_v3(address=addr, stream=stream, mapped=mapped)
else:
self.name = self.short_name.decode("latin-1").strip(" \t\n\r\0")
addr = self.display_name_addr
if addr:
self.display_names = {
get_text_v3(address=addr, stream=stream, mapped=mapped): "display_name",
}
else:
self.name, self.display_names = parsed_strings
elif size == v23c.CN_LONGNAME_BLOCK_SIZE:
(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
self.long_name_addr,
) = CHANNEL_LONGNAME_uf(stream, address)
parsed_strings = kwargs["parsed_strings"]
if parsed_strings is None:
addr = self.long_name_addr
if addr:
self.name = get_text_v3(address=addr, stream=stream, mapped=mapped)
else:
self.name = self.short_name.decode("latin-1").strip(" \t\n\r\0")
else:
self.name, self.display_names = parsed_strings
else:
(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
) = CHANNEL_SHORT_uf(stream, address)
self.name = self.short_name.decode("latin-1").strip(" \t\n\r\0")
cc_map = kwargs.get("cc_map", {})
si_map = kwargs.get("si_map", {})
address = self.conversion_addr
if address:
try:
if address in cc_map:
conv = cc_map[address]
else:
(size,) = UINT16_uf(stream, address + 2)
raw_bytes = stream[address : address + size]
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
)
cc_map[raw_bytes] = cc_map[address] = conv
except:
logger.warning(
f"Channel conversion parsing error: {format_exc()}. The error is ignored and the channel conversion is None"
)
conv = None
self.conversion = conv
address = self.source_addr
if address:
try:
if address in si_map:
source = si_map[address]
else:
raw_bytes = stream[address : address + v23c.CE_BLOCK_SIZE]
if raw_bytes in si_map:
source = si_map[raw_bytes]
else:
source = ChannelExtension(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
)
si_map[raw_bytes] = si_map[address] = source
except:
logger.warning(
f"Channel source parsing error: {format_exc()}. The error is ignored and the channel source is None"
)
source = None
self.source = source
self.comment = get_text_v3(address=self.comment_addr, stream=stream, mapped=mapped)
else:
stream.seek(address + 2)
(size,) = UINT16_u(stream.read(2))
stream.seek(address)
block = stream.read(size)
if size == v23c.CN_DISPLAYNAME_BLOCK_SIZE:
(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
self.long_name_addr,
self.display_name_addr,
self.additional_byte_offset,
) = CHANNEL_DISPLAYNAME_u(block)
parsed_strings = kwargs["parsed_strings"]
if parsed_strings is None:
addr = self.long_name_addr
if addr:
self.name = get_text_v3(address=addr, stream=stream, mapped=mapped)
else:
self.name = self.short_name.decode("latin-1").strip(" \t\n\r\0")
addr = self.display_name_addr
if addr:
self.display_names = {
get_text_v3(address=addr, stream=stream, mapped=mapped): "display_name",
}
else:
self.name, self.display_names = parsed_strings
elif size == v23c.CN_LONGNAME_BLOCK_SIZE:
(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
self.long_name_addr,
) = CHANNEL_LONGNAME_u(block)
parsed_strings = kwargs["parsed_strings"]
if parsed_strings is None:
addr = self.long_name_addr
if addr:
self.name = get_text_v3(address=addr, stream=stream, mapped=mapped)
else:
self.name = self.short_name.decode("latin-1").strip(" \t\n\r\0")
else:
self.name, self.display_names = parsed_strings
else:
(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
) = CHANNEL_SHORT_u(block)
self.name = self.short_name.decode("latin-1").strip(" \t\n\r\0")
cc_map = kwargs.get("cc_map", {})
si_map = kwargs.get("si_map", {})
address = self.conversion_addr
if address:
try:
if address in cc_map:
conv = cc_map[address]
else:
stream.seek(address + 2)
(size,) = UINT16_u(stream.read(2))
stream.seek(address)
raw_bytes = stream.read(size)
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
)
cc_map[raw_bytes] = cc_map[address] = conv
except:
logger.warning(
f"Channel conversion parsing error: {format_exc()}. The error is ignored and the channel conversion is None"
)
conv = None
self.conversion = conv
address = self.source_addr
if address:
try:
if address in si_map:
source = si_map[address]
else:
stream.seek(address)
raw_bytes = stream.read(v23c.CE_BLOCK_SIZE)
if raw_bytes in si_map:
source = si_map[raw_bytes]
else:
source = ChannelExtension(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
)
si_map[raw_bytes] = si_map[address] = source
except:
logger.warning(
f"Channel source parsing error: {format_exc()}. The error is ignored and the channel source is None"
)
source = None
self.source = source
self.comment = get_text_v3(address=self.comment_addr, stream=stream, mapped=mapped)
if self.id != b"CN":
message = f'Expected "CN" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = 0
self.id = b"CN"
self.block_len = kwargs.get("block_len", v23c.CN_DISPLAYNAME_BLOCK_SIZE)
self.next_ch_addr = kwargs.get("next_ch_addr", 0)
self.conversion_addr = kwargs.get("conversion_addr", 0)
self.source_addr = kwargs.get("source_addr", 0)
self.component_addr = kwargs.get("component_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.channel_type = kwargs.get("channel_type", 0)
self.short_name = kwargs.get("short_name", (b"\0" * 32))
self.description = kwargs.get("description", (b"\0" * 128))
self.start_offset = kwargs.get("start_offset", 0)
self.bit_count = kwargs.get("bit_count", 8)
self.data_type = kwargs.get("data_type", 0)
self.range_flag = kwargs.get("range_flag", 1)
self.min_raw_value = kwargs.get("min_raw_value", 0)
self.max_raw_value = kwargs.get("max_raw_value", 0)
self.sampling_rate = kwargs.get("sampling_rate", 0)
if self.block_len >= v23c.CN_LONGNAME_BLOCK_SIZE:
self.long_name_addr = kwargs.get("long_name_addr", 0)
if self.block_len >= v23c.CN_DISPLAYNAME_BLOCK_SIZE:
self.display_name_addr = kwargs.get("display_name_addr", 0)
self.additional_byte_offset = kwargs.get("additional_byte_offset", 0)
if self.name in self.display_names:
del self.display_names[self.name]
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
cc_map: dict[bytes, int],
si_map: dict[bytes, int],
) -> int:
text = self.name
if self.block_len >= v23c.CN_LONGNAME_BLOCK_SIZE:
if len(text) > 31:
if text in defined_texts:
self.long_name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.long_name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.long_name_addr = 0
self.short_name = text.encode("latin-1", "backslashreplace")[:31]
text = list(self.display_names)[0] if self.display_names else ""
if self.block_len >= v23c.CN_DISPLAYNAME_BLOCK_SIZE:
if text:
if text in defined_texts:
self.display_name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.display_name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.display_name_addr = 0
text = self.comment
if text:
if len(text) < 128:
self.description = text.encode("latin-1", "backslashreplace")[:127]
self.comment_addr = 0
else:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
self.description = b"\0"
else:
self.comment_addr = 0
conversion = self.conversion
if conversion:
address = conversion.to_blocks(address, blocks, defined_texts, cc_map)
self.conversion_addr = conversion.address
else:
self.conversion_addr = 0
source = self.source
if source:
address = source.to_blocks(address, blocks, defined_texts, si_map)
self.source_addr = source.address
else:
self.source_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def metadata(self) -> str:
max_len = max(len(key) for key in self)
template = f"{{: <{max_len}}}: {{}}"
metadata: list[str] = []
lines = f"""
name: {self.name}
display names: {self.display_names}
address: {hex(self.address)}
comment: {self.comment}
""".split("\n")
keys = (
"id",
"block_len",
"next_ch_addr",
"conversion_addr",
"source_addr",
"component_addr",
"comment_addr",
"channel_type",
"short_name",
"description",
"start_offset",
"bit_count",
"data_type",
"range_flag",
"min_raw_value",
"max_raw_value",
"sampling_rate",
"long_name_addr",
"display_name_addr",
"additional_byte_offset",
)
for key in keys:
if not hasattr(self, key):
continue
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, val))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
if key == "data_type":
lines[-1] += f" = {v23c.DATA_TYPE_TO_STRING[self.data_type]}"
elif key == "channel_type":
lines[-1] += f" = {v23c.CHANNEL_TYPE_TO_STRING[self.channel_type]}"
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
def __bytes__(self) -> bytes:
block_len = self.block_len
if block_len == v23c.CN_DISPLAYNAME_BLOCK_SIZE:
return v23c.CHANNEL_DISPLAYNAME_p(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
self.long_name_addr,
self.display_name_addr,
self.additional_byte_offset,
)
elif block_len == v23c.CN_LONGNAME_BLOCK_SIZE:
return v23c.CHANNEL_LONGNAME_p(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
self.long_name_addr,
)
else:
return v23c.CHANNEL_SHORT_p(
self.id,
self.block_len,
self.next_ch_addr,
self.conversion_addr,
self.source_addr,
self.component_addr,
self.comment_addr,
self.channel_type,
self.short_name,
self.description,
self.start_offset,
self.bit_count,
self.data_type,
self.range_flag,
self.min_raw_value,
self.max_raw_value,
self.sampling_rate,
)
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __contains__(self, item: str) -> bool:
return hasattr(self, item)
def __iter__(self) -> Iterator[str]:
for attr in dir(self):
if attr[:2] + attr[-2:] == "____":
continue
try:
if callable(getattr(self, attr)):
continue
yield attr
except AttributeError:
continue
def __lt__(self, other: "Channel") -> int:
self_start = self.start_offset
other_start = other.start_offset
try:
self_additional_offset = self.additional_byte_offset
if self_additional_offset:
self_start += 8 * self_additional_offset
other_additional_offset = other.additional_byte_offset
if other_additional_offset:
other_start += 8 * other_additional_offset
except AttributeError:
pass
if self_start < other_start:
result = 1
elif self_start == other_start:
if self.bit_count >= other.bit_count:
result = 1
else:
result = 0
else:
result = 0
return result
def __repr__(self) -> str:
fields = []
for attr in dir(self):
if attr[:2] + attr[-2:] == "____":
continue
try:
if callable(getattr(self, attr)):
continue
fields.append(f"{attr}:{getattr(self, attr)}")
except AttributeError:
continue
return f"Channel (name: {self.name}, display names: {self.display_names}, comment: {self.comment}, address: {hex(self.address)}, fields: {fields})"
class _ChannelConversionBase:
__slots__ = (
"P1",
"P2",
"P3",
"P4",
"P5",
"P6",
"P7",
"a",
"address",
"b",
"block_len",
"comment_addr",
"conversion_type",
"flags",
"formula",
"formula_field",
"id",
"inv_conv_addr",
"max_phy_value",
"min_phy_value",
"precision",
"ref_param_nr",
"referenced_blocks",
"reserved0",
"unit",
"unit_field",
"val_param_nr",
)
class ChannelConversionKwargs(BlockKwargs, total=False):
raw_bytes: bytes
unit: bytes
conversion_type: int
range_flag: int
min_phy_value: float
max_phy_value: float
b: float
a: float
P1: float
P2: float
P3: float
P4: float
P5: float
P6: float
P7: float
formula: bytes | str
ref_param_nr: int
CANapeHiddenExtra: bytes
default_addr: bytes
[docs]
class ChannelConversion(_ChannelConversionBase):
"""CCBLOCK class.
`ChannelConversion` has the following common fields:
* ``id`` - bytes : block ID; always b'CC'
* ``block_len`` - int : block bytes size
* ``range_flag`` - int : value range valid flag
* ``min_phy_value`` - float : min raw value of all samples
* ``max_phy_value`` - float : max raw value of all samples
* ``unit`` - bytes : physical unit
* ``conversion_type`` - int : integer code for conversion type
* ``ref_param_nr`` - int : number of referenced parameters
`ChannelConversion` has the following specific fields:
* linear conversion
* ``a`` - float : factor
* ``b`` - float : offset
* ``CANapeHiddenExtra`` - bytes : sometimes CANape appends extra
information; not compliant with MDF specs
* algebraic conversion
* ``formula`` - bytes : equation as string
* polynomial or rational conversion
* ``P1`` to ``P6`` - float : parameters
* exponential or logarithmic conversion
* ``P1`` to ``P7`` - float : parameters
* tabular with or without interpolation (grouped by index)
* ``raw_<N>`` - int : N-th raw value (X axis)
* ``phys_<N>`` - float : N-th physical value (Y axis)
* text table conversion
* ``param_val_<N>`` - int : N-th raw value (X axis)
* ``text_<N>`` - N-th text physical value (Y axis)
* text range table conversion
* ``default_lower`` - float : default lower raw value
* ``default_upper`` - float : default upper raw value
* ``default_addr`` - int : address of default text physical value
* ``lower_<N>`` - float : N-th lower raw value
* ``upper_<N>`` - float : N-th upper raw value
* ``text_<N>`` - int : address of N-th text physical value
Other attributes:
* ``address`` - int : block address inside MDF file
* ``formula`` - str : formula string in case of algebraic conversion
* ``referenced_blocks`` - list : list of CCBLOCK/TXBLOCK referenced by the
conversion
* ``unit`` - str : physical unit
Parameters
----------
address : int
Block address inside MDF file.
raw_bytes : bytes
Complete block read from disk.
stream : file handle
MDF file handle.
for dynamically created objects :
See the key-value pairs.
Examples
--------
>>> with open('test.mdf', 'rb') as mdf:
... cc1 = ChannelConversion(stream=mdf, address=0xBA52)
>>> cc2 = ChannelConversion(conversion_type=0)
>>> cc1['b'], cc1['a']
0, 100.0
"""
def __init__(self, **kwargs: Unpack[ChannelConversionKwargs]) -> None:
super().__init__()
self.is_user_defined = False
self.unit = self.formula = ""
self.referenced_blocks: dict[str, bytes | ChannelConversion] = {}
if "raw_bytes" in kwargs or "stream" in kwargs:
mapped = kwargs.get("mapped", False)
self.address = address = kwargs.get("address", 0)
try:
block = kwargs["raw_bytes"]
self.id, self.block_len = COMMON_uf(block)
size = self.block_len
block_size = len(block)
block = block[4:]
stream = kwargs["stream"]
except KeyError:
stream = kwargs["stream"]
if mapped:
self.id, self.block_len = COMMON_uf(stream, address)
block_size = size = self.block_len
block = stream[address + 4 : address + size]
else:
stream.seek(address)
block = stream.read(4)
self.id, self.block_len = COMMON_u(block)
block_size = size = self.block_len
block = stream.read(size - 4)
(
self.range_flag,
self.min_phy_value,
self.max_phy_value,
self.unit_field,
self.conversion_type,
self.ref_param_nr,
) = CONVERSION_COMMON_SHORT_uf(block)
self.unit = self.unit_field.decode("latin-1").strip(" \t\r\n\0")
conv_type = self.conversion_type
if conv_type == v23c.CONVERSION_TYPE_LINEAR:
self.b, self.a = typing.cast(tuple[float, float], unpack_from("<2d", block, v23c.CC_COMMON_SHORT_SIZE))
if size != v23c.CC_LIN_BLOCK_SIZE:
self.CANapeHiddenExtra = block[v23c.CC_LIN_BLOCK_SIZE - 4 :]
size = len(self.CANapeHiddenExtra)
nr = size // 40
values = unpack_from("<" + "d32s" * nr, block, v23c.CC_COMMON_SHORT_SIZE)
for i in range(nr):
self[f"param_val_{i}"], self[f"text_{i}"] = (
values[i * 2],
values[2 * i + 1],
)
elif conv_type == v23c.CONVERSION_TYPE_NONE:
pass
elif conv_type == v23c.CONVERSION_TYPE_FORMULA:
self.formula_field = block[v23c.CC_COMMON_SHORT_SIZE :]
self.formula = self.formula_field.decode("latin-1").strip(" \t\r\n\0").replace("x", "X")
if "X1" not in self.formula:
self.formula = self.formula.replace("X", "X1")
elif conv_type in (v23c.CONVERSION_TYPE_TABI, v23c.CONVERSION_TYPE_TAB):
nr = self.ref_param_nr
size = v23c.CC_COMMON_BLOCK_SIZE + nr * 16
if block_size == v23c.MAX_UINT16:
stream.seek(address)
raw_bytes = stream.read(size)
conversion = ChannelConversion(raw_bytes=raw_bytes, stream=stream, address=address)
conversion.block_len = size
# self.update(conversion)
self.referenced_blocks = conversion.referenced_blocks
else:
float_values: tuple[float, ...] = unpack_from(f"<{2 * nr}d", block, v23c.CC_COMMON_SHORT_SIZE)
for i in range(nr):
self[f"raw_{i}"], self[f"phys_{i}"] = (
float_values[i * 2],
float_values[2 * i + 1],
)
elif conv_type in (v23c.CONVERSION_TYPE_POLY, v23c.CONVERSION_TYPE_RAT):
self.P1, self.P2, self.P3, self.P4, self.P5, self.P6 = typing.cast(
tuple[float, float, float, float, float, float],
unpack_from("<6d", block, v23c.CC_COMMON_SHORT_SIZE),
)
elif conv_type in (v23c.CONVERSION_TYPE_EXPO, v23c.CONVERSION_TYPE_LOGH):
(
self.P1,
self.P2,
self.P3,
self.P4,
self.P5,
self.P6,
self.P7,
) = typing.cast(
tuple[float, float, float, float, float, float, float],
unpack_from("<7d", block, v23c.CC_COMMON_SHORT_SIZE),
)
elif conv_type == v23c.CONVERSION_TYPE_TABX:
nr = self.ref_param_nr
size = v23c.CC_COMMON_BLOCK_SIZE + nr * 40
if block_size == v23c.MAX_UINT16:
stream.seek(address)
raw_bytes = stream.read(size)
conversion = ChannelConversion(raw_bytes=raw_bytes, stream=stream, address=address)
conversion.block_len = size
for attr in get_fields(conversion):
setattr(self, attr, getattr(conversion, attr))
self.referenced_blocks = conversion.referenced_blocks
else:
values = unpack_from("<" + "d32s" * nr, block, v23c.CC_COMMON_SHORT_SIZE)
for i in range(nr):
self[f"param_val_{i}"], self[f"text_{i}"] = (
values[i * 2],
values[2 * i + 1],
)
elif conv_type == v23c.CONVERSION_TYPE_RTABX:
nr = self.ref_param_nr - 1
size = v23c.CC_COMMON_BLOCK_SIZE + (nr + 1) * 20
if block_size == v23c.MAX_UINT16:
stream.seek(address)
raw_bytes = stream.read(size)
conversion = ChannelConversion(raw_bytes=raw_bytes, stream=stream, address=address)
conversion.block_len = size
for attr in get_fields(conversion):
setattr(self, attr, getattr(conversion, attr))
self.referenced_blocks = conversion.referenced_blocks
else:
(
self.default_lower,
self.default_upper,
self.default_addr,
) = typing.cast(tuple[float, float, int], unpack_from("<2dI", block, v23c.CC_COMMON_SHORT_SIZE))
if self.default_addr:
self.referenced_blocks["default_addr"] = get_text_v3(
address=self.default_addr,
stream=stream,
mapped=mapped,
decode=False,
)
else:
self.referenced_blocks["default_addr"] = b""
values = unpack_from("<" + "2dI" * nr, block, v23c.CC_COMMON_SHORT_SIZE + 20)
for i in range(nr):
self[f"lower_{i}"], self[f"upper_{i}"], self[f"text_{i}"] = (
values[i * 3],
values[3 * i + 1],
values[3 * i + 2],
)
if values[3 * i + 2]:
block = get_text_v3(
address=values[3 * i + 2],
stream=stream,
mapped=mapped,
decode=False,
)
self.referenced_blocks[f"text_{i}"] = block
else:
self.referenced_blocks[f"text_{i}"] = b""
if self.id != b"CC":
message = f'Expected "CC" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
else:
self.address = 0
self.id = b"CC"
self.unit_field = kwargs.get("unit", b"")
if kwargs["conversion_type"] == v23c.CONVERSION_TYPE_NONE:
self.block_len = v23c.CC_COMMON_BLOCK_SIZE
self.range_flag = kwargs.get("range_flag", 1)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = v23c.CONVERSION_TYPE_NONE
self.ref_param_nr = 0
elif kwargs["conversion_type"] == v23c.CONVERSION_TYPE_LINEAR:
self.block_len = v23c.CC_LIN_BLOCK_SIZE
self.range_flag = kwargs.get("range_flag", 1)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = v23c.CONVERSION_TYPE_LINEAR
self.ref_param_nr = 2
self.b = kwargs.get("b", 0)
self.a = kwargs.get("a", 1)
if self.block_len != v23c.CC_LIN_BLOCK_SIZE:
self.CANapeHiddenExtra = kwargs["CANapeHiddenExtra"]
elif kwargs["conversion_type"] in (
v23c.CONVERSION_TYPE_POLY,
v23c.CONVERSION_TYPE_RAT,
):
self.block_len = v23c.CC_POLY_BLOCK_SIZE
self.range_flag = kwargs.get("range_flag", 1)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = kwargs["conversion_type"]
self.ref_param_nr = 6
self.P1 = kwargs.get("P1", 0)
self.P2 = kwargs.get("P2", 0)
self.P3 = kwargs.get("P3", 0)
self.P4 = kwargs.get("P4", 0)
self.P5 = kwargs.get("P5", 0)
self.P6 = kwargs.get("P6", 0)
elif kwargs["conversion_type"] in (
v23c.CONVERSION_TYPE_EXPO,
v23c.CONVERSION_TYPE_LOGH,
):
self.block_len = v23c.CC_EXPO_BLOCK_SIZE
self.range_flag = kwargs.get("range_flag", 1)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = kwargs["conversion_type"]
self.ref_param_nr = 7
self.P1 = kwargs.get("P1", 0)
self.P2 = kwargs.get("P2", 0)
self.P3 = kwargs.get("P3", 0)
self.P4 = kwargs.get("P4", 0)
self.P5 = kwargs.get("P5", 0)
self.P6 = kwargs.get("P6", 0)
self.P7 = kwargs.get("P7", 0)
elif kwargs["conversion_type"] == v23c.CONVERSION_TYPE_FORMULA:
formula = kwargs["formula"]
formula_len = len(formula)
if isinstance(formula, bytes):
self.formula = formula.decode("latin-1")
formula += b"\0"
else:
self.formula = formula
formula = formula.encode("latin-1") + b"\0"
self.block_len = 46 + formula_len + 1
self.range_flag = kwargs.get("range_flag", 1)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = v23c.CONVERSION_TYPE_FORMULA
self.ref_param_nr = formula_len
self.formula_field = formula
elif kwargs["conversion_type"] in (
v23c.CONVERSION_TYPE_TABI,
v23c.CONVERSION_TYPE_TAB,
):
nr = kwargs["ref_param_nr"]
self.block_len = v23c.CC_COMMON_BLOCK_SIZE + nr * 2 * 8
self.range_flag = kwargs.get("range_flag", 1)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = kwargs["conversion_type"]
self.ref_param_nr = nr
for i in range(nr):
self[f"raw_{i}"] = kwargs[f"raw_{i}"] # type: ignore[literal-required]
self[f"phys_{i}"] = kwargs[f"phys_{i}"] # type: ignore[literal-required]
elif kwargs["conversion_type"] == v23c.CONVERSION_TYPE_TABX:
nr = kwargs["ref_param_nr"]
self.block_len = v23c.CC_COMMON_BLOCK_SIZE + 40 * nr
self.range_flag = kwargs.get("range_flag", 0)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = v23c.CONVERSION_TYPE_TABX
self.ref_param_nr = nr
for i in range(nr):
self[f"param_val_{i}"] = kwargs[f"param_val_{i}"] # type: ignore[literal-required]
self[f"text_{i}"] = kwargs[f"text_{i}"] # type: ignore[literal-required]
elif kwargs["conversion_type"] == v23c.CONVERSION_TYPE_RTABX:
nr = kwargs["ref_param_nr"]
self.block_len = v23c.CC_COMMON_BLOCK_SIZE + 20 * nr
self.range_flag = kwargs.get("range_flag", 0)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.unit_field = kwargs.get("unit", ("\0" * 20).encode("latin-1"))
self.conversion_type = v23c.CONVERSION_TYPE_RTABX
self.ref_param_nr = nr
self.default_lower = 0
self.default_upper = 0
self.default_addr = 0
key = "default_addr"
if key in kwargs:
self.referenced_blocks[key] = kwargs[key] # type: ignore[literal-required]
else:
self.referenced_blocks[key] = b""
for i in range(nr - 1):
self[f"lower_{i}"] = kwargs[f"lower_{i}"] # type: ignore[literal-required]
self[f"upper_{i}"] = kwargs[f"upper_{i}"] # type: ignore[literal-required]
key = f"text_{i}"
self.default_addr = 0
self.referenced_blocks[key] = kwargs[key] # type: ignore[literal-required]
else:
message = f'Conversion type "{kwargs["conversion_type"]}" not implemented'
logger.exception(message)
raise MdfException(message)
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
cc_map: dict[bytes, int],
) -> int:
self.unit_field = self.unit.encode("latin-1", "ignore")[:19]
if self.conversion_type == v23c.CONVERSION_TYPE_FORMULA:
formula = self.formula
if not formula.endswith("\0"):
formula += "\0"
self.formula_field = formula.encode("latin-1")
self.block_len = v23c.CC_COMMON_BLOCK_SIZE + len(self.formula_field)
for key, block in self.referenced_blocks.items():
if block:
if isinstance(block, ChannelConversion):
address = block.to_blocks(address, blocks, defined_texts, cc_map)
self[key] = block.address
else:
text = block
if text in defined_texts:
self[key] = defined_texts[text]
else:
tx_block = TextBlock(text=text)
defined_texts[text] = address
blocks.append(tx_block)
self[key] = address
address += tx_block.block_len
else:
self[key] = 0
bts = bytes(self)
if bts in cc_map:
self.address = cc_map[bts]
else:
blocks.append(bts)
self.address = address
cc_map[bts] = address
size = self.block_len
address += len(bts)
return address
def metadata(self, indent: str = "") -> str:
conv = self.conversion_type
keys: Sequence[str]
if conv == v23c.CONVERSION_TYPE_NONE:
keys = v23c.KEYS_CONVERSION_NONE
elif conv == v23c.CONVERSION_TYPE_FORMULA:
keys = v23c.KEYS_CONVERSION_FORMULA
elif conv == v23c.CONVERSION_TYPE_LINEAR:
keys = v23c.KEYS_CONVERSION_LINEAR
if self.block_len != v23c.CC_LIN_BLOCK_SIZE:
keys += ("CANapeHiddenExtra",)
nr = self.ref_param_nr
new_keys: list[str] = []
for i in range(nr):
new_keys.extend((f"param_val_{i}", f"text_{i}"))
keys += tuple(new_keys)
elif conv in (v23c.CONVERSION_TYPE_POLY, v23c.CONVERSION_TYPE_RAT):
keys = v23c.KEYS_CONVERSION_POLY_RAT
elif conv in (v23c.CONVERSION_TYPE_EXPO, v23c.CONVERSION_TYPE_LOGH):
keys = v23c.KEYS_CONVERSION_EXPO_LOGH
elif conv in (v23c.CONVERSION_TYPE_TABI, v23c.CONVERSION_TYPE_TAB):
nr = self.ref_param_nr
keys = list(v23c.KEYS_CONVERSION_NONE)
for i in range(nr):
keys.append(f"raw_{i}")
keys.append(f"phys_{i}")
elif conv == v23c.CONVERSION_TYPE_RTABX:
nr = self.ref_param_nr
keys = list(v23c.KEYS_CONVERSION_NONE)
keys += ["default_lower", "default_upper", "default_addr"]
for i in range(nr - 1):
keys.append(f"lower_{i}")
keys.append(f"upper_{i}")
keys.append(f"text_{i}")
elif conv == v23c.CONVERSION_TYPE_TABX:
nr = self.ref_param_nr
keys = list(v23c.KEYS_CONVERSION_NONE)
for i in range(nr):
keys.append(f"param_val_{i}")
keys.append(f"text_{i}")
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata: list[str] = []
lines = f"""
address: {hex(self.address)}
""".split("\n")
for key in keys:
val = getattr(self, key)
if key.endswith("addr") or (key.startswith("text_") and isinstance(val, int)):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, val))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
if key == "conversion_type":
lines[-1] += f" [{v23c.CONVERSION_TYPE_TO_STRING[self.conversion_type]}]"
elif self.referenced_blocks and key in self.referenced_blocks:
val = self.referenced_blocks[key]
if isinstance(val, bytes):
lines[-1] += f" (= {str(val)[1:]})"
else:
lines[-1] += f" (= CCBLOCK @ {hex(val.address)})"
if self.referenced_blocks:
max_len = max(len(key) for key in self.referenced_blocks)
template = f"{{: <{max_len}}}: {{}}"
lines.append("")
lines.append("Referenced blocks:")
for key, block in self.referenced_blocks.items():
if isinstance(block, ChannelConversion):
lines.append(template.format(key, ""))
lines.extend(block.metadata(indent + " ").split("\n"))
else:
lines.append(template.format(key, str(block)[1:]))
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, initial_indent=indent, subsequent_indent=indent, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
@overload
def convert(
self,
values: NDArray[Any],
as_object: bool = ...,
as_bytes: bool = ...,
ignore_value2text_conversions: bool = ...,
) -> NDArray[Any]: ...
@overload
def convert(
self,
values: ArrayLike,
as_object: bool = ...,
as_bytes: bool = ...,
ignore_value2text_conversions: bool = ...,
) -> NDArray[Any] | np.number[Any]: ...
def convert(
self,
values: ArrayLike,
as_object: bool = False,
as_bytes: bool = False,
ignore_value2text_conversions: bool = False,
) -> NDArray[Any] | np.number[Any]:
conversion_type = self.conversion_type
scalar = False
if not isinstance(values, np.ndarray):
if isinstance(values, (int, float)):
new_values = np.array([values])
scalar = True
else:
new_values = np.array(values)
else:
new_values = values
if conversion_type == v23c.CONVERSION_TYPE_NONE:
pass
elif conversion_type == v23c.CONVERSION_TYPE_LINEAR:
a = self.a
b = self.b
if (a, b) != (1, 0):
new_values = new_values * a
if b:
new_values += b
elif conversion_type in (v23c.CONVERSION_TYPE_TABI, v23c.CONVERSION_TYPE_TAB):
nr = self.ref_param_nr
raw_vals = np.array([self[f"raw_{i}"] for i in range(nr)])
phys = np.array([self[f"phys_{i}"] for i in range(nr)])
if conversion_type == v23c.CONVERSION_TYPE_TABI:
new_values = np.interp(new_values, raw_vals, phys)
else:
dim = raw_vals.shape[0]
inds = np.searchsorted(raw_vals, new_values)
inds[inds >= dim] = dim - 1 # type: ignore[index, unused-ignore]
inds2 = inds - 1
inds2[inds2 < 0] = 0 # type: ignore[index, unused-ignore]
cond = np.abs(new_values - raw_vals[inds]) >= np.abs(new_values - raw_vals[inds2])
new_values = np.where(cond, phys[inds2], phys[inds])
elif conversion_type == v23c.CONVERSION_TYPE_TABX:
if not ignore_value2text_conversions:
nr = self.ref_param_nr
raw_vals = np.array([self[f"param_val_{i}"] for i in range(nr)])
phys = np.array([self[f"text_{i}"] for i in range(nr)])
x = sorted(zip(raw_vals, phys, strict=False))
raw_vals = np.array([e[0] for e in x], dtype="<i8")
phys = np.array([e[1] for e in x])
default = b""
idx1 = np.searchsorted(raw_vals, new_values, side="right") - 1
idx2 = np.searchsorted(raw_vals, new_values, side="left")
idx = np.argwhere(idx1 != idx2).flatten()
new_values = np.zeros(len(new_values), dtype=max(phys.dtype, np.array([default]).dtype))
new_values[idx] = default
idx = np.argwhere(idx1 == idx2).flatten()
if len(idx):
new_values[idx] = phys[idx1[idx]] # type: ignore[index, unused-ignore]
elif conversion_type == v23c.CONVERSION_TYPE_RTABX:
if not ignore_value2text_conversions:
nr = self.ref_param_nr - 1
phys_list: list[bytes | ChannelConversion | None] = []
for i in range(nr):
value = self.referenced_blocks[f"text_{i}"]
phys_list.append(value)
phys = np.array(phys_list)
default = typing.cast(bytes, self.referenced_blocks["default_addr"])
if b"{X}" in default:
default_text = default.decode("latin-1").replace("{X}", "X").split('"')[1]
else:
default_text = None
lower = np.array([self[f"lower_{i}"] for i in range(nr)])
upper = np.array([self[f"upper_{i}"] for i in range(nr)])
idx1 = np.searchsorted(lower, new_values, side="right") - 1
idx2 = np.searchsorted(upper, new_values, side="left")
idx = np.argwhere(idx1 != idx2).flatten()
if default_text and len(idx):
X = new_values[idx]
new_values = np.zeros(len(new_values), dtype=np.float64)
a = float(default_text.split("*")[0])
b = float(default_text.split("X")[-1])
new_values[idx] = a * X + b
idx = np.argwhere(idx1 == idx2).flatten()
if len(idx):
new_values[idx] = np.nan
else:
if len(idx):
new_values = np.zeros(len(new_values), dtype=max(phys.dtype, np.array([default]).dtype))
new_values[idx] = default
idx = np.argwhere(idx1 == idx2).flatten()
if len(idx):
new_values[idx] = phys[idx1[idx]] # type: ignore[index, unused-ignore]
else:
new_values = phys[idx1]
elif conversion_type in (v23c.CONVERSION_TYPE_EXPO, v23c.CONVERSION_TYPE_LOGH):
# pylint: disable=C0103
func: np.ufunc
if conversion_type == v23c.CONVERSION_TYPE_EXPO:
func = np.exp
else:
func = np.log
P1 = self.P1
P2 = self.P2
P3 = self.P3
P4 = self.P4
P5 = self.P5
P6 = self.P6
P7 = self.P7
if P4 == 0:
new_values = func(((new_values - P7) * P6 - P3) / P1) / P2
elif P1 == 0:
new_values = func((P3 / (new_values - P7) - P6) / P4) / P5
else:
message = f"wrong conversion {conversion_type}"
raise ValueError(message)
elif conversion_type == v23c.CONVERSION_TYPE_RAT:
# pylint: disable=unused-variable,C0103
P1 = self.P1
P2 = self.P2
P3 = self.P3
P4 = self.P4
P5 = self.P5
P6 = self.P6
X = new_values
if (P1, P3, P4, P5) == (0, 0, 0, 0):
if P2 != P6:
new_values = new_values * (P2 / P6)
elif (P2, P3, P4, P6) == (0, 0, 0, 0):
if P1 != P5:
new_values = new_values * (P1 / P5)
else:
try:
new_values = evaluate(v23c.RAT_CONV_TEXT)
except TypeError:
new_values = (P1 * X**2 + P2 * X + P3) / (P4 * X**2 + P5 * X + P6)
elif conversion_type == v23c.CONVERSION_TYPE_POLY:
# pylint: disable=unused-variable,C0103
P1 = self.P1
P2 = self.P2
P3 = self.P3
P4 = self.P4
P5 = self.P5
P6 = self.P6
X = new_values
coefs = (P2, P3, P5, P6)
if coefs == (0, 0, 0, 0):
if P1 != P4:
try:
new_values = evaluate(v23c.POLY_CONV_SHORT_TEXT)
except TypeError:
new_values = P4 * X / P1
else:
try:
new_values = evaluate(v23c.POLY_CONV_LONG_TEXT)
except TypeError:
new_values = (P2 - (P4 * (X - P5 - P6))) / (P3 * (X - P5 - P6) - P1)
elif conversion_type == v23c.CONVERSION_TYPE_FORMULA:
# pylint: disable=unused-variable,C0103
try:
X1 = new_values
INF = np.inf
NaN = np.nan
new_values = evaluate(self.formula)
except:
if lambdify is not None and symbols is not None:
X_symbol = symbols("X1")
expr = lambdify(
X_symbol,
self.formula,
modules=[{"INF": np.inf, "NaN": np.nan}, "numpy"],
dummify=False,
cse=True,
)
new_values = expr(new_values)
if scalar:
return typing.cast(np.number[Any], new_values[0])
else:
return new_values
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
conv = self.conversion_type
# compute the fmt
if conv == v23c.CONVERSION_TYPE_NONE:
fmt = v23c.FMT_CONVERSION_COMMON
elif conv == v23c.CONVERSION_TYPE_FORMULA:
fmt = v23c.FMT_CONVERSION_FORMULA.format(self.block_len - v23c.CC_COMMON_BLOCK_SIZE)
elif conv == v23c.CONVERSION_TYPE_LINEAR:
fmt = v23c.FMT_CONVERSION_LINEAR
if self.block_len != v23c.CC_LIN_BLOCK_SIZE:
fmt += f"{self.block_len - v23c.CC_LIN_BLOCK_SIZE}s"
elif conv in (v23c.CONVERSION_TYPE_POLY, v23c.CONVERSION_TYPE_RAT):
fmt = v23c.FMT_CONVERSION_POLY_RAT
elif conv in (v23c.CONVERSION_TYPE_EXPO, v23c.CONVERSION_TYPE_LOGH):
fmt = v23c.FMT_CONVERSION_EXPO_LOGH
elif conv in (v23c.CONVERSION_TYPE_TABI, v23c.CONVERSION_TYPE_TAB):
nr = self.ref_param_nr
fmt = v23c.FMT_CONVERSION_COMMON + f"{2 * nr}d"
elif conv == v23c.CONVERSION_TYPE_RTABX:
nr = self.ref_param_nr
fmt = v23c.FMT_CONVERSION_COMMON + "2dI" * nr
elif conv == v23c.CONVERSION_TYPE_TABX:
nr = self.ref_param_nr
fmt = v23c.FMT_CONVERSION_COMMON + "d32s" * nr
keys: Sequence[str]
if conv == v23c.CONVERSION_TYPE_NONE:
keys = v23c.KEYS_CONVERSION_NONE
elif conv == v23c.CONVERSION_TYPE_FORMULA:
keys = v23c.KEYS_CONVERSION_FORMULA
elif conv == v23c.CONVERSION_TYPE_LINEAR:
keys = v23c.KEYS_CONVERSION_LINEAR
if self.block_len != v23c.CC_LIN_BLOCK_SIZE:
keys += ("CANapeHiddenExtra",)
elif conv in (v23c.CONVERSION_TYPE_POLY, v23c.CONVERSION_TYPE_RAT):
keys = v23c.KEYS_CONVERSION_POLY_RAT
elif conv in (v23c.CONVERSION_TYPE_EXPO, v23c.CONVERSION_TYPE_LOGH):
keys = v23c.KEYS_CONVERSION_EXPO_LOGH
elif conv in (v23c.CONVERSION_TYPE_TABI, v23c.CONVERSION_TYPE_TAB):
nr = self.ref_param_nr
keys = list(v23c.KEYS_CONVERSION_NONE)
for i in range(nr):
keys.append(f"raw_{i}")
keys.append(f"phys_{i}")
elif conv == v23c.CONVERSION_TYPE_RTABX:
nr = self.ref_param_nr
keys = list(v23c.KEYS_CONVERSION_NONE)
keys += ["default_lower", "default_upper", "default_addr"]
for i in range(nr - 1):
keys.append(f"lower_{i}")
keys.append(f"upper_{i}")
keys.append(f"text_{i}")
elif conv == v23c.CONVERSION_TYPE_TABX:
nr = self.ref_param_nr
keys = list(v23c.KEYS_CONVERSION_NONE)
for i in range(nr):
keys.append(f"param_val_{i}")
keys.append(f"text_{i}")
self.block_len = min(self.block_len, v23c.MAX_UINT16)
result = pack(fmt, *[self[key] for key in keys])
return result
def __str__(self) -> str:
fields = []
for attr in dir(self):
if attr[:2] + attr[-2:] == "____":
continue
try:
if callable(getattr(self, attr)):
continue
fields.append(f"{attr}:{getattr(self, attr)}")
except AttributeError:
continue
return f"ChannelConversion (referenced blocks: {self.referenced_blocks}, address: {hex(self.address)}, fields: {fields})"
class ChannelDependencyKwargs(BlockKwargs, total=False):
sd_nr: int
[docs]
class ChannelDependency:
"""CDBLOCK class.
CDBLOCK fields:
* ``id`` - bytes : block ID; always b'CD'
* ``block_len`` - int : block bytes size
* ``dependency_type`` - int : integer code for dependency type
* ``sd_nr`` - int : total number of signals dependencies
* ``dg_<N>`` - address of data group block (DGBLOCK) of N-th
signal dependency
* ``dg_<N>`` - address of channel group block (CGBLOCK) of N-th
signal dependency
* ``dg_<N>`` - address of channel block (CNBLOCK) of N-th
signal dependency
* ``dim_<K>`` - int : Optional size of dimension *K* for N-dimensional
dependency
Other attributes:
* ``address`` - int : block address inside MDF file
* ``referenced_channels`` - list : list of (group index, channel index)
pairs
Parameters
----------
stream : file handle
MDF file handle.
address : int
Block address inside MDF file.
for dynamically created objects :
See the key-value pairs.
"""
def __init__(self, **kwargs: Unpack[ChannelDependencyKwargs]) -> None:
super().__init__()
self.referenced_channels: list[tuple[int, int]] = []
try:
stream = kwargs["stream"]
self.address = address = kwargs["address"]
stream.seek(address)
self.id, self.block_len, self.dependency_type, self.sd_nr = typing.cast(
tuple[bytes, int, int, int], unpack("<2s3H", stream.read(8))
)
links_size = 3 * 4 * self.sd_nr
links: tuple[int, ...] = unpack(f"<{3 * self.sd_nr}I", stream.read(links_size))
for i in range(self.sd_nr):
self[f"dg_{i}"] = links[3 * i]
self[f"cg_{i}"] = links[3 * i + 1]
self[f"ch_{i}"] = links[3 * i + 2]
optional_dims_nr = (self.block_len - 8 - links_size) // 2
if optional_dims_nr:
dims: tuple[int, ...] = unpack(f"<{optional_dims_nr}H", stream.read(optional_dims_nr * 2))
for i, dim in enumerate(dims):
self[f"dim_{i}"] = dim
if self.id != b"CD":
message = f'Expected "CD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
sd_nr = kwargs["sd_nr"]
self.id = b"CD"
self.block_len = 8 + 3 * 4 * sd_nr
self.dependency_type = 1
self.sd_nr = sd_nr
for i in range(sd_nr):
self[f"dg_{i}"] = 0
self[f"cg_{i}"] = 0
self[f"ch_{i}"] = 0
i = 0
while True:
try:
self[f"dim_{i}"] = kwargs[f"dim_{i}"] # type: ignore[literal-required]
i += 1
except KeyError:
break
if i:
self.dependency_type = 256 + i
self.block_len += 2 * i
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
fmt = f"<2s3H{self.sd_nr * 3}I"
keys: tuple[str, ...] = ("id", "block_len", "dependency_type", "sd_nr")
for i in range(self.sd_nr):
keys += (f"dg_{i}", f"cg_{i}", f"ch_{i}")
links_size = 3 * 4 * self.sd_nr
option_dims_nr = (self.block_len - 8 - links_size) // 2
if option_dims_nr:
fmt += f"{option_dims_nr}H"
keys += tuple(f"dim_{i}" for i in range(option_dims_nr))
result = pack(fmt, *[self[key] for key in keys])
return result
class ChannelExtensionKwargs(BlockKwargs, total=False):
raw_bytes: bytes
block_len: int
type: int
module_nr: int
module_address: int
description: bytes
ECU_identification: bytes
reserved0: bytes
CAN_id: int
CAN_ch_index: int
message_name: bytes
sender_name: bytes
[docs]
class ChannelExtension:
"""CEBLOCK class.
CEBLOCK has the following common fields:
* ``id`` - bytes : block ID; always b'CE'
* ``block_len`` - int : block bytes size
* ``type`` - int : extension type identifier
CEBLOCK has the following specific fields:
* for DIM block
* ``module_nr`` - int: module number
* ``module_address`` - int : module address
* ``description`` - bytes : module description
* ``ECU_identification`` - bytes : identification of ECU
* ``reserved0`` - bytes : reserved bytes
* for Vector CAN block
* ``CAN_id`` - int : CAN message ID
* ``CAN_ch_index`` - int : index of CAN channel
* ``message_name`` - bytes : message name
* ``sender_name`` - bytes : sender name
* ``reserved0`` - bytes : reserved bytes
Other attributes:
* ``address`` - int : block address inside MDF file
* ``comment`` - str : extension comment
* ``name`` - str : extension name
* ``path`` - str : extension path
Parameters
----------
stream : file handle
MDF file handle.
address : int
Block address inside MDF file.
for dynamically created objects :
See the key-value pairs.
"""
__slots__ = (
"CAN_ch_index",
"CAN_id",
"ECU_identification",
"address",
"block_len",
"comment",
"description",
"id",
"message_name",
"module_address",
"module_nr",
"name",
"path",
"reserved0",
"sender_name",
"type",
)
def __init__(self, **kwargs: Unpack[ChannelExtensionKwargs]) -> None:
super().__init__()
self.name = self.path = self.comment = ""
if "stream" in kwargs:
stream = kwargs["stream"]
try:
self.id, self.block_len, self.type = SOURCE_COMMON_uf(kwargs["raw_bytes"])
if self.type == v23c.SOURCE_ECU:
(
self.module_nr,
self.module_address,
self.description,
self.ECU_identification,
self.reserved0,
) = SOURCE_EXTRA_ECU_uf(kwargs["raw_bytes"], 6)
elif self.type == v23c.SOURCE_VECTOR:
(
self.CAN_id,
self.CAN_ch_index,
self.message_name,
self.sender_name,
self.reserved0,
) = SOURCE_EXTRA_VECTOR_uf(kwargs["raw_bytes"], 6)
self.address = kwargs.get("address", 0)
except KeyError:
if kwargs.get("mapped", False):
self.address = address = kwargs["address"]
self.id, self.block_len, self.type = SOURCE_COMMON_uf(stream, address)
if self.type == v23c.SOURCE_ECU:
(
self.module_nr,
self.module_address,
self.description,
self.ECU_identification,
self.reserved0,
) = SOURCE_EXTRA_ECU_uf(stream, address + 6)
elif self.type == v23c.SOURCE_VECTOR:
(
self.CAN_id,
self.CAN_ch_index,
self.message_name,
self.sender_name,
self.reserved0,
) = SOURCE_EXTRA_VECTOR_uf(stream, address + 6)
else:
self.address = address = kwargs["address"]
stream.seek(address)
block = stream.read(v23c.CE_BLOCK_SIZE)
self.id, self.block_len, self.type = SOURCE_COMMON_uf(block)
if self.type == v23c.SOURCE_ECU:
(
self.module_nr,
self.module_address,
self.description,
self.ECU_identification,
self.reserved0,
) = SOURCE_EXTRA_ECU_uf(block, 6)
elif self.type == v23c.SOURCE_VECTOR:
(
self.CAN_id,
self.CAN_ch_index,
self.message_name,
self.sender_name,
self.reserved0,
) = SOURCE_EXTRA_VECTOR_uf(block, 6)
if self.id != b"CE":
message = f'Expected "CE" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
else:
self.address = 0
self.id = b"CE"
self.block_len = kwargs.get("block_len", v23c.CE_BLOCK_SIZE)
self.type = kwargs.get("type", 2)
if self.type == v23c.SOURCE_ECU:
self.module_nr = kwargs.get("module_nr", 0)
self.module_address = kwargs.get("module_address", 0)
self.description = kwargs.get("description", b"\0")
self.ECU_identification = kwargs.get("ECU_identification", b"\0")
self.reserved0 = kwargs.get("reserved0", b"\0")
elif self.type == v23c.SOURCE_VECTOR:
self.CAN_id = kwargs.get("CAN_id", 0)
self.CAN_ch_index = kwargs.get("CAN_ch_index", 0)
self.message_name = kwargs.get("message_name", b"\0")
self.sender_name = kwargs.get("sender_name", b"\0")
self.reserved0 = kwargs.get("reserved0", b"\0")
if self.type == v23c.SOURCE_ECU:
self.path = self.ECU_identification.decode("latin-1").strip(" \t\n\r\0")
self.name = self.description.decode("latin-1").strip(" \t\n\r\0")
self.comment = f"Module number={self.module_nr} @ address={self.module_address}"
else:
self.path = self.sender_name.decode("latin-1").strip(" \t\n\r\0")
self.name = self.message_name.decode("latin-1").strip(" \t\n\r\0")
self.comment = f"Message ID={hex(self.CAN_id)} on CAN bus {self.CAN_ch_index}"
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
cc_map: dict[bytes, int],
) -> int:
if self.type == v23c.SOURCE_ECU:
self.ECU_identification = self.path.encode("latin-1")[:31]
self.description = self.name.encode("latin-1")[:79]
else:
self.sender_name = self.path.encode("latin-1")[:35]
self.message_name = self.name.encode("latin-1")[:35]
bts = bytes(self)
if bts in cc_map:
self.address = cc_map[bts]
else:
blocks.append(bts)
self.address = address
cc_map[bts] = address
address += self.block_len
return address
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def metadata(self) -> str:
if self.type == v23c.SOURCE_ECU:
keys = (
"id",
"block_len",
"type",
"module_nr",
"module_address",
"description",
"ECU_identification",
"reserved0",
)
else:
keys = (
"id",
"block_len",
"type",
"CAN_id",
"CAN_ch_index",
"message_name",
"sender_name",
"reserved0",
)
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata: list[str] = []
lines = f"""
address: {hex(self.address)}
""".split("\n")
for key in keys:
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, val))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
if key == "type":
lines[-1] += f" = {v23c.SOURCE_TYPE_TO_STRING[self.type]}"
# elif key == "bus_type":
# lines[-1] += f" = {v23c.BUS_TYPE_TO_STRING[self.bus_type]}"
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
def __bytes__(self) -> bytes:
typ = self.type
if typ == v23c.SOURCE_ECU:
return v23c.SOURCE_ECU_p(
self.id,
self.block_len,
self.type,
self.module_nr,
self.module_address,
self.description,
self.ECU_identification,
self.reserved0,
)
else:
return v23c.SOURCE_VECTOR_p(
self.id,
self.block_len,
self.type,
self.CAN_id,
self.CAN_ch_index,
self.message_name,
self.sender_name,
self.reserved0,
)
def __str__(self) -> str:
fields = []
for attr in dir(self):
if attr[:2] + attr[-2:] == "____":
continue
try:
if callable(getattr(self, attr)):
continue
fields.append(f"{attr}:{getattr(self, attr)}")
except AttributeError:
continue
return f"ChannelExtension (name: {self.name}, path: {self.path}, comment: {self.comment}, address: {hex(self.address)}, fields: {fields})"
class ChannelGroupKwargs(BlockKwargs, total=False):
block_len: int
next_cg_addr: int
first_ch_addr: int
comment_addr: int
record_id: int
ch_nr: int
samples_byte_nr: int
cycles_nr: int
[docs]
class ChannelGroup:
"""CGBLOCK class.
CGBLOCK fields:
* ``id`` - bytes : block ID; always b'CG'
* ``block_len`` - int : block bytes size
* ``next_cg_addr`` - int : next CGBLOCK address
* ``first_ch_addr`` - int : address of first channel block (CNBLOCK)
* ``comment_addr`` - int : address of TXBLOCK that contains the channel
group comment
* ``record_id`` - int : record ID used as identifier for a record if
the DGBLOCK defines a number of record IDs > 0 (unsorted group)
* ``ch_nr`` - int : number of channels
* ``samples_byte_nr`` - int : size of data record in bytes without
record ID
* ``cycles_nr`` - int : number of cycles (records) of this type in the data
block
* ``sample_reduction_addr`` - int : address to first sample reduction
block
Other attributes:
* ``address`` - int : block address inside MDF file
* ``comment`` - str : channel group comment
Parameters
----------
stream : file handle
MDF file handle.
address : int
Block address inside MDF file.
for dynamically created objects :
See the key-value pairs.
Examples
--------
>>> with open('test.mdf', 'rb') as mdf:
... cg1 = ChannelGroup(stream=mdf, address=0xBA52)
>>> cg2 = ChannelGroup(sample_bytes_nr=32)
>>> hex(cg1.address)
0xBA52
>>> cg1['id']
b'CG'
"""
__slots__ = (
"address",
"block_len",
"ch_nr",
"comment",
"comment_addr",
"cycles_nr",
"first_ch_addr",
"id",
"next_cg_addr",
"record_id",
"sample_reduction_addr",
"samples_byte_nr",
)
def __init__(self, **kwargs: Unpack[ChannelGroupKwargs]) -> None:
super().__init__()
self.comment = ""
try:
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
self.address = address = kwargs["address"]
if mapped:
(
self.id,
self.block_len,
self.next_cg_addr,
self.first_ch_addr,
self.comment_addr,
self.record_id,
self.ch_nr,
self.samples_byte_nr,
self.cycles_nr,
) = v23c.CHANNEL_GROUP_uf(stream, address)
if self.block_len == v23c.CG_POST_330_BLOCK_SIZE:
# sample reduction blocks are not yet used
self.sample_reduction_addr = 0
else:
stream.seek(address)
block = stream.read(v23c.CG_PRE_330_BLOCK_SIZE)
(
self.id,
self.block_len,
self.next_cg_addr,
self.first_ch_addr,
self.comment_addr,
self.record_id,
self.ch_nr,
self.samples_byte_nr,
self.cycles_nr,
) = v23c.CHANNEL_GROUP_u(block)
if self.block_len == v23c.CG_POST_330_BLOCK_SIZE:
# sample reduction blocks are not yet used
self.sample_reduction_addr = 0
if self.id != b"CG":
message = f'Expected "CG" block @{hex(address)} but found "{self.id!r}"'
raise MdfException(message.format(self.id))
if self.comment_addr:
self.comment = get_text_v3(address=self.comment_addr, stream=stream, mapped=mapped)
except KeyError:
self.address = 0
self.id = b"CG"
self.block_len = kwargs.get("block_len", v23c.CG_PRE_330_BLOCK_SIZE)
self.next_cg_addr = kwargs.get("next_cg_addr", 0)
self.first_ch_addr = kwargs.get("first_ch_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.record_id = kwargs.get("record_id", 1)
self.ch_nr = kwargs.get("ch_nr", 0)
self.samples_byte_nr = kwargs.get("samples_byte_nr", 0)
self.cycles_nr = kwargs.get("cycles_nr", 0)
if self.block_len == v23c.CG_POST_330_BLOCK_SIZE:
self.sample_reduction_addr = 0
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
si_map: dict[bytes, int],
) -> int:
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
if self.block_len == v23c.CG_POST_330_BLOCK_SIZE:
return (
v23c.CHANNEL_GROUP_p(
self.id,
self.block_len,
self.next_cg_addr,
self.first_ch_addr,
self.comment_addr,
self.record_id,
self.ch_nr,
self.samples_byte_nr,
self.cycles_nr,
)
+ b"\0" * 4
)
else:
return v23c.CHANNEL_GROUP_p(
self.id,
self.block_len,
self.next_cg_addr,
self.first_ch_addr,
self.comment_addr,
self.record_id,
self.ch_nr,
self.samples_byte_nr,
self.cycles_nr,
)
def metadata(self) -> str:
keys = (
"id",
"block_len",
"next_cg_addr",
"first_ch_addr",
"comment_addr",
"record_id",
"ch_nr",
"samples_byte_nr",
"cycles_nr",
"sample_reduction_addr",
)
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata: list[str] = []
lines = f"""
address: {hex(self.address)}
comment: {self.comment}
""".split("\n")
for key in keys:
if not hasattr(self, key):
continue
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, val))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
class DataBlockKwargs(BlockKwargs, total=False):
size: int
data: bytes
class DataBlock:
"""Data Block class (pseudo block not defined by the MDF 3 standard).
`DataBlock` attributes:
* ``data`` - bytes : raw samples bytes
* ``address`` - int : block address
Parameters
----------
address : int
Block address inside the measurement file.
stream : file.io.handle
Binary file stream.
data : bytes
When created dynamically.
"""
__slots__ = "address", "data"
def __init__(self, **kwargs: Unpack[DataBlockKwargs]) -> None:
super().__init__()
try:
stream = kwargs["stream"]
size = kwargs["size"]
self.address = address = kwargs["address"]
stream.seek(address)
self.data = stream.read(size)
except KeyError:
self.address = 0
self.data = kwargs.get("data", b"")
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
return self.data
class DataGroupKwargs(BlockKwargs, total=False):
block_len: int
next_dg_addr: int
first_cg_addr: int
comment_addr: int
data_block_addr: int
cg_nr: int
record_id_len: int
[docs]
class DataGroup:
"""DGBLOCK class.
DGBLOCK fields:
* ``id`` - bytes : block ID; always b'DG'
* ``block_len`` - int : block bytes size
* ``next_dg_addr`` - int : next DGBLOCK address
* ``first_cg_addr`` - int : address of first channel group block (CGBLOCK)
* ``trigger_addr`` - int : address of trigger block (TRBLOCK)
* ``data_block_addr`` - address of data block
* ``cg_nr`` - int : number of channel groups
* ``record_id_len`` - int : number of record IDs in the data block
* ``reserved0`` - bytes : reserved bytes
Other attributes:
* ``address`` - int : block address inside MDF file
Parameters
----------
stream : file handle
MDF file handle.
address : int
Block address inside MDF file.
for dynamically created objects :
See the key-value pairs.
"""
__slots__ = (
"address",
"block_len",
"cg_nr",
"data_block_addr",
"first_cg_addr",
"id",
"next_dg_addr",
"record_id_len",
"reserved0",
"trigger_addr",
)
def __init__(self, **kwargs: Unpack[DataGroupKwargs]) -> None:
super().__init__()
try:
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
self.address = address = kwargs["address"]
if mapped:
block = stream.read(v23c.DG_PRE_320_BLOCK_SIZE)
(
self.id,
self.block_len,
self.next_dg_addr,
self.first_cg_addr,
self.trigger_addr,
self.data_block_addr,
self.cg_nr,
self.record_id_len,
) = v23c.DATA_GROUP_PRE_320_uf(stream, address)
if self.block_len == v23c.DG_POST_320_BLOCK_SIZE:
self.reserved0 = stream[
address + v23c.DG_PRE_320_BLOCK_SIZE : address + v23c.DG_POST_320_BLOCK_SIZE
]
else:
stream.seek(address)
block = stream.read(v23c.DG_PRE_320_BLOCK_SIZE)
(
self.id,
self.block_len,
self.next_dg_addr,
self.first_cg_addr,
self.trigger_addr,
self.data_block_addr,
self.cg_nr,
self.record_id_len,
) = v23c.DATA_GROUP_PRE_320_u(block)
if self.block_len == v23c.DG_POST_320_BLOCK_SIZE:
self.reserved0 = stream.read(4)
if self.id != b"DG":
message = f'Expected "DG" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = kwargs.get("address", 0)
self.id = b"DG"
self.block_len = kwargs.get("block_len", v23c.DG_PRE_320_BLOCK_SIZE)
self.next_dg_addr = kwargs.get("next_dg_addr", 0)
self.first_cg_addr = kwargs.get("first_cg_addr", 0)
self.trigger_addr = kwargs.get("comment_addr", 0)
self.data_block_addr = kwargs.get("data_block_addr", 0)
self.cg_nr = kwargs.get("cg_nr", 1)
self.record_id_len = kwargs.get("record_id_len", 0)
if self.block_len == v23c.DG_POST_320_BLOCK_SIZE:
self.reserved0 = b"\0\0\0\0"
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
keys: tuple[str, ...]
if self.block_len == v23c.DG_POST_320_BLOCK_SIZE:
fmt = v23c.FMT_DATA_GROUP_POST_320
keys = v23c.KEYS_DATA_GROUP_POST_320
else:
fmt = v23c.FMT_DATA_GROUP_PRE_320
keys = v23c.KEYS_DATA_GROUP_PRE_320
result = pack(fmt, *[self[key] for key in keys])
return result
class FileIdentificationBlockKwargs(BlockKwargs, total=False):
version: str
[docs]
class FileIdentificationBlock:
"""IDBLOCK class.
IDBLOCK fields:
* ``file_identification`` - bytes : file identifier
* ``version_str`` - bytes : format identifier
* ``program_identification`` - bytes : creator program identifier
* ``byte_order`` - int : integer code for byte order (endianness)
* ``float_format`` - int : integer code for floating-point format
* ``mdf_version`` - int : version number of MDF format
* ``code_page`` - int : unicode code page number
* ``reserved0`` - bytes : reserved bytes
* ``reserved1`` - bytes : reserved bytes
* ``unfinalized_standard_flags`` - int : standard flags for unfinalized MDF
* ``unfinalized_custom_flags`` - int : custom flags for unfinalized MDF
Other attributes:
* ``address`` - int : block address inside MDF file; should be 0 always
Parameters
----------
stream : file handle
MDF file handle.
version : int
MDF version in case of new file (dynamically created).
"""
__slots__ = (
"address",
"byte_order",
"code_page",
"file_identification",
"float_format",
"mdf_version",
"program_identification",
"reserved0",
"reserved1",
"unfinalized_custom_flags",
"unfinalized_standard_flags",
"version_str",
)
def __init__(self, **kwargs: Unpack[FileIdentificationBlockKwargs]) -> None:
super().__init__()
self.address = 0
try:
stream = kwargs["stream"]
stream.seek(0)
(
self.file_identification,
self.version_str,
self.program_identification,
self.byte_order,
self.float_format,
self.mdf_version,
self.code_page,
self.reserved0,
self.reserved1,
self.unfinalized_standard_flags,
self.unfinalized_custom_flags,
) = typing.cast(v23c.Id, unpack(v23c.ID_FMT, stream.read(v23c.ID_BLOCK_SIZE)))
except KeyError:
version = kwargs["version"]
self.file_identification = "MDF ".encode("latin-1")
self.version_str = version.encode("latin-1") + b"\0" * 4
self.program_identification = f"{tool.__tool_short__}{tool.__version__}".encode("latin-1")
self.byte_order = v23c.BYTE_ORDER_INTEL if sys.byteorder == "little" else v23c.BYTE_ORDER_MOTOROLA
self.float_format = 0
self.mdf_version = int(version.replace(".", ""))
self.code_page = 0
self.reserved0 = b"\0" * 2
self.reserved1 = b"\0" * 26
self.unfinalized_standard_flags = 0
self.unfinalized_custom_flags = 0
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
result = pack(v23c.ID_FMT, *[self[key] for key in v23c.ID_KEYS])
return result
class HeaderBlockKwargs(BlockKwargs, total=False):
version: str
class _CommonProperties(TypedDict, total=False):
author: str
project: str
department: str
subject: str
class ProgramBlockKwargs(BlockKwargs, total=False):
data: bytes
[docs]
class ProgramBlock:
"""PRBLOCK class.
PRBLOCK fields:
* ``id`` - bytes : block ID; always b'PR'
* ``block_len`` - int : block bytes size
* ``data`` - bytes : creator program free format data
Other attributes:
* ``address`` - int : block address inside MDF file
Parameters
----------
stream : file handle
MDF file handle.
address : int
Block address inside MDF file.
"""
__slots__ = ("address", "block_len", "data", "id")
def __init__(self, **kwargs: Unpack[ProgramBlockKwargs]) -> None:
super().__init__()
try:
stream = kwargs["stream"]
self.address = address = kwargs["address"]
stream.seek(address)
self.id, self.block_len = COMMON_u(stream.read(4))
self.data = stream.read(self.block_len - 4)
if self.id != b"PR":
message = f'Expected "PR" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.id = b"PR"
self.block_len = len(kwargs["data"]) + 6
self.data = kwargs["data"]
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
fmt = v23c.FMT_PROGRAM_BLOCK.format(self.block_len - 4)
result = pack(fmt, *[self[key] for key in v23c.KEYS_PROGRAM_BLOCK])
return result
class TextBlockKwargs(BlockKwargs, total=False):
text: bytes | str
[docs]
class TextBlock:
r"""TXBLOCK class.
TXBLOCK fields:
* ``id`` - bytes : block ID; always b'TX'
* ``block_len`` - int : block bytes size
* ``text`` - bytes : text content
Other attributes:
* ``address`` - int : block address inside MDF file
Parameters
----------
stream : file handle
MDF file handle.
address : int
Block address inside MDF file.
text : bytes | str
Bytes or str for creating a new TextBlock.
Examples
--------
>>> tx1 = TextBlock(text='VehicleSpeed')
>>> tx1['text']
b'VehicleSpeed\x00'
"""
__slots__ = ("address", "block_len", "id", "text")
def __init__(self, **kwargs: Unpack[TextBlockKwargs]) -> None:
super().__init__()
try:
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
self.address = address = kwargs["address"]
if mapped:
self.id, self.block_len = COMMON_uf(stream, address)
if self.id != b"TX":
message = f'Expected "TX" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.text = stream[address + 4 : address + self.block_len]
else:
stream.seek(address)
self.id, self.block_len = COMMON_u(stream.read(4))
if self.id != b"TX":
message = f'Expected "TX" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
size = self.block_len - 4
self.text = stream.read(size)
except KeyError:
self.address = 0
text = kwargs["text"]
if isinstance(text, str):
text_bytes = text.encode("latin-1", "backslashreplace")
else:
text_bytes = text
self.id = b"TX"
self.block_len = len(text_bytes) + 5
self.text = text_bytes + b"\0"
if self.block_len > 65000:
self.block_len = 65000 + 5
self.text = self.text[:65000] + b"\0"
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
return v23c.COMMON_p(self.id, self.block_len) + self.text
def __repr__(self) -> str:
return f"TextBlock(id={self.id!r}, block_len={self.block_len}, text={self.text!r})"
[docs]
class TriggerBlock:
"""TRBLOCK class.
TRBLOCK fields:
* ``id`` - bytes : block ID; always b'TR'
* ``block_len`` - int : block bytes size
* ``text_addr`` - int : address of TXBLOCK that contains the trigger
comment text
* ``trigger_events_nr`` - int : number of trigger events
* ``trigger_<N>_time`` - float : trigger time [s] of trigger's N-th event
* ``trigger_<N>_pretime`` - float : pre trigger time [s] of trigger's N-th
event
* ``trigger_<N>_posttime`` - float : post trigger time [s] of trigger's
N-th event
Other attributes:
* ``address`` - int : block address inside MDF file
* ``comment`` - str : trigger comment
Parameters
----------
stream : file handle
MDF file handle.
address : int
Block address inside MDF file.
"""
def __init__(self, **kwargs: Unpack[BlockKwargs]) -> None:
super().__init__()
self.comment = ""
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
stream.seek(address + 2)
(size,) = UINT16_u(stream.read(2))
stream.seek(address)
block = stream.read(size)
self.id, self.block_len, self.text_addr, self.trigger_events_nr = typing.cast(
tuple[bytes, int, int, int], unpack("<2sHIH", block[:10])
)
nr = self.trigger_events_nr
if nr:
values: tuple[float, ...] = unpack(f"<{3 * nr}d", block[10:])
for i in range(nr):
(
self[f"trigger_{i}_time"],
self[f"trigger_{i}_pretime"],
self[f"trigger_{i}_posttime"],
) = (values[i * 3], values[3 * i + 1], values[3 * i + 2])
if self.text_addr:
self.comment = get_text_v3(address=self.text_addr, stream=stream)
if self.id != b"TR":
message = f'Expected "TR" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = 0
nr = 0
while f"trigger_{nr}_time" in kwargs:
nr += 1
self.id = b"TR"
self.block_len = 10 + nr * 8 * 3
self.text_addr = 0
self.trigger_events_nr = nr
for i in range(nr):
key = f"trigger_{i}_time"
self[key] = kwargs[key] # type: ignore[literal-required]
key = f"trigger_{i}_pretime"
self[key] = kwargs[key] # type: ignore[literal-required]
key = f"trigger_{i}_posttime"
self[key] = kwargs[key] # type: ignore[literal-required]
def to_blocks(self, address: int, blocks: list[bytes | SupportsBytes]) -> int:
text = self.comment
if text:
tx_block = TextBlock(text=text)
self.text_addr = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.text_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __getitem__(self, item: str) -> object:
return self.__getattribute__(item)
def __setitem__(self, item: str, value: object) -> None:
self.__setattr__(item, value)
def __bytes__(self) -> bytes:
triggers_nr = self.trigger_events_nr
fmt = f"<2sHIH{triggers_nr * 3}d"
keys: tuple[str, ...] = ("id", "block_len", "text_addr", "trigger_events_nr")
for i in range(triggers_nr):
keys += (
f"trigger_{i}_time",
f"trigger_{i}_pretime",
f"trigger_{i}_posttime",
)
result = pack(fmt, *[self[key] for key in keys])
return result