"""Classes that implement the blocks for MDF version 4"""
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
from hashlib import md5
import inspect
import logging
import pathlib
from pathlib import Path
import re
from struct import pack, unpack, unpack_from
from textwrap import wrap
import time
from traceback import format_exc
import typing
from typing import Final, Literal, TYPE_CHECKING, Union
from xml.dom import minidom
import xml.etree.ElementTree as ET
import dateutil.tz
from lz4.frame import compress as lz_compress
from lz4.frame import decompress as lz_decompress
from numexpr import evaluate
import numpy as np
from numpy.typing import ArrayLike, NDArray
from typing_extensions import Any, Buffer, overload, SupportsBytes, TypedDict, Unpack
from zstd import compress as zstd_compress
from zstd import decompress as zstd_decompress
from .. import tool
from . import v4_constants as v4c
from .cutils import bytes_dtype_size
from .types import StrPath
from .utils import (
block_fields,
BlockKwargs,
escape_xml_string,
extract_display_names,
extract_ev_tool,
FLOAT64_u,
get_text_v4,
handle_incomplete_block,
is_file_like,
MdfException,
UINT8_uf,
UINT64_u,
UINT64_uf,
)
COMPRESSION_LEVEL = 1
AT_COMPRESSION_LEVEL = 9
try:
from deflate import zlib_compress as compress
from deflate import zlib_decompress
def decompress(data, bufsize):
return zlib_decompress(data, originalsize=bufsize)
except ImportError:
try:
from isal.isal_zlib import compress, decompress
except ImportError:
from zlib import ( # type: ignore[assignment, no-redef, unused-ignore]
compress,
decompress,
)
try:
from sympy import lambdify, symbols
except:
lambdify, symbols = None, None
if TYPE_CHECKING:
from .source_utils import Source
SEEK_START: Final = v4c.SEEK_START
SEEK_END: Final = v4c.SEEK_END
COMMON_SIZE: Final = v4c.COMMON_SIZE
COMMON_u = v4c.COMMON_u
COMMON_uf = v4c.COMMON_uf
CN_BLOCK_SIZE: Final = v4c.CN_BLOCK_SIZE
CN_SINGLE_ATTACHMENT_BLOCK_SIZE: Final = v4c.CN_SINGLE_ATTACHMENT_BLOCK_SIZE
SIMPLE_CHANNEL_PARAMS_uf = v4c.SIMPLE_CHANNEL_PARAMS_uf
SINGLE_ATTACHMENT_CHANNEL_PARAMS_uf = v4c.SINGLE_ATTACHMENT_CHANNEL_PARAMS_uf
EIGHT_BYTES: Final = bytes(8)
logger = logging.getLogger("asammdf")
__all__ = [
"AttachmentBlock",
"Channel",
"ChannelArrayBlock",
"ChannelConversion",
"ChannelGroup",
"DataBlock",
"DataGroup",
"DataList",
"DataZippedBlock",
"EventBlock",
"FileHistory",
"FileIdentificationBlock",
"HeaderBlock",
"HeaderList",
"SourceInformation",
"TextBlock",
]
class AttachmentBlockKwargs(BlockKwargs, total=False):
data: bytes
comment: str
mime: str
file_name: StrPath
compression: bool
embedded: bool
creator_index: int
file_limit: int | float
compression_type: str
[docs]
class AttachmentBlock:
"""When adding new attachments only embedded attachments are allowed, with
keyword argument `data` of type bytes.
`AttachmentBlock` has the following attributes, which are also available as
dict-like key-value pairs.
ATBLOCK fields:
* ``id`` - bytes : block ID; always b'##AT'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_at_addr`` - int : next ATBLOCK address
* ``file_name_addr`` - int : address of TXBLOCK that contains the
attachment file name
* ``mime_addr`` - int : address of TXBLOCK that contains the attachment
mime type description
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
attachment comment
* ``flags`` - int : ATBLOCK flags
* ``creator_index`` - int : index of file history block
* ``reserved1`` - int : reserved bytes
* ``md5_sum`` - bytes : attachment file MD5 sum
* ``original_size`` - int : original uncompressed file size in bytes
* ``embedded_size`` - int : embedded compressed file size in bytes
* ``embedded_data`` - bytes : embedded attachment bytes
Other attributes:
* ``address`` - int : attachment address
* ``file_name`` - str : attachment file name
* ``mime`` - str : mime type
* ``comment`` - str : attachment comment
Parameters
----------
address : int
Block address; to be used for objects created from file.
stream : handle
File handle; to be used for objects created from file.
for dynamically created objects :
See the key-value pairs.
"""
__slots__ = (
"address",
"block_len",
"comment",
"comment_addr",
"creator_index",
"embedded_data",
"embedded_size",
"file_name",
"file_name_addr",
"file_name_zip",
"file_name_zip_addr",
"flags",
"id",
"links_nr",
"md5_sum",
"mime",
"mime_addr",
"mime_zip",
"mime_zip_addr",
"next_at_addr",
"original_size",
"path_syntax",
"reserved0",
"reserved1",
"zip_type",
)
def __init__(self, **kwargs: Unpack[AttachmentBlockKwargs]) -> None:
self.file_name = self.mime = self.comment = ""
self.file_name_zip_addr = self.mime_zip_addr = 0
self.file_name_zip = self.mime_zip = ""
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
file_limit = kwargs["file_limit"]
if address + v4c.AT_COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
) = v4c.COMMON_u(stream.read(v4c.COMMON_SIZE))
if self.id != b"##AT":
raise MdfException(f"expected ATBLOCK (b'##AT') at {address=:X} but found {self.id}")
links = unpack(f"<{self.links_nr}Q", stream.read(self.links_nr * 8))
self.next_at_addr, self.file_name_addr, self.mime_addr, self.comment_addr, *links = links
(
self.flags,
self.creator_index,
self.zip_type,
self.path_syntax,
self.reserved1,
self.md5_sum,
self.original_size,
self.embedded_size,
) = v4c.AT_TAIL_u(stream.read(v4c.AT_TAIL_SIZE))
if self.flags & v4c.FLAG_AT_ZIP_FILE_NAME_VALID:
self.file_name_zip_addr = links.pop(0)
self.file_name_zip = get_text_v4(self.file_name_zip_addr, stream, file_limit=file_limit)
if self.flags & v4c.FLAG_AT_ZIP_MIME_TYPE_VALID:
self.mime_zip_addr = links.pop(0)
self.mime_zip = get_text_v4(self.mime_zip_addr, stream, file_limit=file_limit)
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
self.embedded_data = stream.read(self.embedded_size)
if self.id != b"##AT":
message = f'Expected "##AT" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.file_name = get_text_v4(self.file_name_addr, stream, mapped=mapped, file_limit=file_limit)
self.mime = get_text_v4(self.mime_addr, stream, mapped=mapped, file_limit=file_limit)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped, file_limit=file_limit)
except KeyError:
self.address = 0
self.comment = kwargs.get("comment", "")
self.mime = kwargs.get("mime", "")
file_name = Path(kwargs.get("file_name", None) or "bin.bin")
self.zip_type = self.path_syntax = 0
data = kwargs.get("data", b"")
original_size = embedded_size = len(data)
compression = kwargs.get("compression", False)
compression_type = kwargs.get("compression_type", "deflate")
embedded = kwargs.get("embedded", False)
md5_sum = md5(data).digest()
flags = v4c.FLAG_AT_MD5_VALID
if embedded:
flags |= v4c.FLAG_AT_EMBEDDED
if compression:
match compression_type:
case "deflate":
flags |= v4c.FLAG_AT_COMPRESSED_EMBEDDED
data = compress(data, AT_COMPRESSION_LEVEL)
embedded_size = len(data)
self.zip_type = v4c.AT_ZIP_TYPE_DEFLATE
case "zstd":
flags |= v4c.FLAG_AT_GENERAL_COMPRESSED_EMBEDDED
data = zstd_compress(data, AT_COMPRESSION_LEVEL)
embedded_size = len(data)
self.zip_type = v4c.AT_ZIP_TYPE_ZSTD
self.path_syntax = ord(pathlib.os.sep)
case "lz4":
flags |= v4c.FLAG_AT_GENERAL_COMPRESSED_EMBEDDED
data = lz_compress(data, AT_COMPRESSION_LEVEL)
embedded_size = len(data)
self.zip_type = v4c.AT_ZIP_TYPE_LZ4
self.path_syntax = ord(pathlib.os.sep)
self.file_name = file_name.name
else:
self.file_name = str(file_name)
embedded_size = 0
data = b""
self.id = b"##AT"
self.reserved0 = 0
self.block_len = v4c.AT_COMMON_SIZE + embedded_size
self.links_nr = 4
self.next_at_addr = 0
self.file_name_addr = 0
self.mime_addr = 0
self.comment_addr = 0
self.flags = flags
self.creator_index = kwargs.get("creator_index", 0)
self.reserved1 = 0
self.md5_sum = md5_sum
self.original_size = original_size
self.embedded_size = embedded_size
self.embedded_data = data
def to_blocks(
self, address: int, blocks: list[bytes | SupportsBytes], defined_texts: dict[bytes | str, int]
) -> int:
text = self.file_name
if text:
if text in defined_texts:
self.file_name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=str(text))
self.file_name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.file_name_addr = 0
text = self.mime
if text:
if text in defined_texts:
self.mime_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.mime_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.mime_addr = 0
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<ATcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
text = self.file_name_zip
if text:
if text in defined_texts:
self.file_name_zip_addr = defined_texts[text]
else:
tx_block = TextBlock(text=str(text))
self.file_name_zip_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.file_name_zip_addr = 0
text = self.mime_zip
if text:
if text in defined_texts:
self.mime_zip_addr = defined_texts[text]
else:
tx_block = TextBlock(text=str(text))
self.mime_zip_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.mime_zip_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
align = address % 8
if align:
blocks.append(b"\0" * (8 - align))
address += 8 - align
return address
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
fmt = f"<4sI{self.links_nr + 2}Q2H2BH16s2Q{self.embedded_size}s"
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_at_addr",
"file_name_addr",
"mime_addr",
"comment_addr",
)
if self.flags & v4c.FLAG_AT_ZIP_FILE_NAME_VALID:
keys = (*keys, "file_name_zip_addr")
if self.flags & v4c.FLAG_AT_ZIP_MIME_TYPE_VALID:
keys = (*keys, "mime_zip_addr")
keys = keys + (
"flags",
"creator_index",
"zip_type",
"path_syntax",
"reserved1",
"md5_sum",
"original_size",
"embedded_size",
"embedded_data",
)
result = pack(fmt, *[self[key] for key in keys])
return result
def __repr__(self) -> str:
return f"ATBLOCK(address={self.address:x}, file_name={self.file_name}, comment={self.comment})"
class ChannelKwargs(BlockKwargs, total=False):
at_map: dict[int, int]
parsed_strings: tuple[str, dict[str, str], str] | None
use_display_names: bool
cc_map: dict[bytes | int, "ChannelConversion"]
si_map: dict[Union[bytes, int, "Source"], "SourceInformation"]
channel_type: int
sync_type: int
data_type: int
bit_offset: int
byte_offset: int
bit_count: int
flags: int
pos_invalidation_bit: int
precision: int
min_raw_value: float
max_raw_value: float
lower_limit: float
upper_limit: float
lower_ext_limit: float
upper_ext_limit: float
attachment_addr: int
file_limit: int | float
CN = b"##CN"
[docs]
class Channel:
"""If the `load_metadata` keyword argument is not provided or is False,
then the conversion, source and display name information is not processed.
Furthermore if the `parse_xml_comment` is not provided or is False, then
the display name information from the channel comment is not processed
(this is done to avoid expensive XML operations).
`Channel` has the following attributes, which are also available as
dict-like key-value pairs.
CNBLOCK fields:
* ``id`` - bytes : block ID; always b'##CN'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_ch_addr`` - int : next ATBLOCK address
* ``component_addr`` - int : address of first channel in case of structure
channel composition, or ChannelArrayBlock in case of arrays file name
* ``name_addr`` - int : address of TXBLOCK that contains the channel name
* ``source_addr`` - int : address of channel source block
* ``conversion_addr`` - int : address of channel conversion block
* ``data_block_addr`` - int : address of signal data block for VLSD channels
* ``unit_addr`` - int : address of TXBLOCK that contains the channel unit
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
channel comment
* ``attachment_<N>_addr`` - int : address of N-th ATBLOCK referenced by the
current channel; if no ATBLOCK is referenced there will be no such
key-value pair
* ``default_X_dg_addr`` - int : address of DGBLOCK where the default X axis
channel for the current channel is found; this key-value pair will not
exist for channels that don't have a default X axis
* ``default_X_cg_addr`` - int : address of CGBLOCK where the default X axis
channel for the current channel is found; this key-value pair will not
exist for channels that don't have a default X axis
* ``default_X_ch_addr`` - int : address of default X axis
channel for the current channel; this key-value pair will not
exist for channels that don't have a default X axis
* ``channel_type`` - int : integer code for the channel type
* ``sync_type`` - int : integer code for the channel's sync type
* ``data_type`` - int : integer code for the channel's data type
* ``bit_offset`` - int : bit offset
* ``byte_offset`` - int : byte offset within the data record
* ``bit_count`` - int : channel bit count
* ``flags`` - int : CNBLOCK flags
* ``pos_invalidation_bit`` - int : invalidation bit position for the
current channel if there are invalidation bytes in the data record
* ``precision`` - int : integer code for the precision
* ``reserved1`` - int : reserved bytes
* ``min_raw_value`` - int : min raw value of all samples
* ``max_raw_value`` - int : max raw value of all samples
* ``lower_limit`` - int : min physical value of all samples
* ``upper_limit`` - int : max physical value of all samples
* ``lower_ext_limit`` - int : min physical value of all samples
* ``upper_ext_limit`` - int : max physical value of all samples
Other attributes:
* ``address`` - int : channel address
* ``attachments`` - list : list of referenced attachment blocks indexes;
the index reference to the attachment block index
* ``comment`` - str : channel comment
* ``conversion`` - ChannelConversion : channel conversion; None if the
channel has no conversion
* ``display_name`` - str : channel display name; this is extracted from the
XML channel comment
* ``name`` - str : channel name
* ``source`` - SourceInformation : channel source information; None if the
channel has no source information
* ``unit`` - str : channel unit
Parameters
----------
address : int
Block address; to be used for objects created from file.
stream : handle
File handle; to be used for objects created from file.
load_metadata : bool
Option to load conversion, source and display_name; default is True.
parse_xml_comment : bool
Option to parse XML channel comment to search for display name; default
is True.
for dynamically created objects :
See the key-value pairs.
"""
__slots__ = (
"address",
"attachment",
"attachment_addr",
"attachment_nr",
"axes",
"bit_count",
"bit_offset",
"block_len",
"byte_offset",
"channel_type",
"comment",
"comment_addr",
"component_addr",
"conversion",
"conversion_addr",
"conversions",
"data_block_addr",
"data_type",
"default_X_cg_addr",
"default_X_ch_addr",
"default_X_dg_addr",
"display_names",
"dtype_fmt",
"fast_path",
"flags",
"id",
"links_nr",
"lower_ext_limit",
"lower_limit",
"max_raw_value",
"min_raw_value",
"name",
"name_addr",
"next_ch_addr",
"pos_invalidation_bit",
"precision",
"reserved0",
"reserved1",
"source",
"source_addr",
"standard_C_size",
"sync_type",
"unit",
"unit_addr",
"units",
"upper_ext_limit",
"upper_limit",
)
def __init__(self, **kwargs: Unpack[ChannelKwargs]) -> None:
self.dtype_fmt: np.dtype[Any] = np.dtype(np.void)
self.axes = self.conversions = self.units = None
if "stream" in kwargs:
self.address = address = kwargs["address"]
self.attachment = None
stream = kwargs["stream"]
mapped = kwargs["mapped"]
file_limit = kwargs["file_limit"]
if address + COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if mapped:
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(stream, address)
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if self.id != b"##CN":
message = f'Expected "##CN" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
if self.block_len == CN_BLOCK_SIZE:
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = SIMPLE_CHANNEL_PARAMS_uf(stream, address + COMMON_SIZE)
elif self.block_len == CN_SINGLE_ATTACHMENT_BLOCK_SIZE:
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.attachment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = SINGLE_ATTACHMENT_CHANNEL_PARAMS_uf(stream, address + COMMON_SIZE)
self.attachment = kwargs["at_map"].get(self.attachment_addr, 0)
else:
stream.seek(address + COMMON_SIZE)
block = stream.read(self.block_len - COMMON_SIZE)
links_nr = self.links_nr
links: tuple[int, ...] = unpack_from(f"<{links_nr}Q", block)
params: v4c.Channel = unpack_from(v4c.FMT_CHANNEL_PARAMS, block, links_nr * 8)
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
) = links[:8]
at_map = kwargs.get("at_map", {})
if params[10]:
self.attachment_addr = links[8]
self.attachment = at_map.get(links[8], 0)
self.links_nr -= params[10] - 1
self.block_len -= (params[10] - 1) * 8
params = (
params[0],
params[1],
params[2],
params[3],
params[4],
params[5],
params[6],
params[7],
params[8],
params[9],
1,
params[11],
params[12],
params[13],
params[14],
params[15],
params[16],
)
if params[6] & v4c.FLAG_CN_DEFAULT_X:
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = links[-3:]
# default X not supported yet
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = (0, 0, 0)
(
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = params
parsed_strings = kwargs["parsed_strings"]
if parsed_strings is None:
self.name = get_text_v4(self.name_addr, stream, mapped=mapped, file_limit=file_limit)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped, file_limit=file_limit)
if kwargs["use_display_names"]:
self.display_names = extract_display_names(self.comment)
else:
self.display_names = {}
else:
self.name, self.display_names, self.comment = parsed_strings
cache = kwargs.get("units_map", {})
if self.unit_addr in cache:
self.unit = cache[self.unit_addr]
else:
self.unit = get_text_v4(self.unit_addr, stream, mapped=mapped, file_limit=file_limit)
cache[self.unit_addr] = self.unit
address = self.conversion_addr
if address:
cc_map = kwargs["cc_map"]
try:
if address in cc_map:
conv = cc_map[address]
else:
if address + 16 > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
(size,) = UINT64_uf(stream, address + 8)
if address + size > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
raw_bytes = stream[address : address + size]
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
file_limit=file_limit,
)
cc_map[raw_bytes] = cc_map[address] = conv
except:
logger.warning(
f"Channel conversion parsing error: {format_exc()}. The error is ignored and the channel conversion is None"
)
conv = None
self.conversion = conv
else:
self.conversion = None
address = self.source_addr
if address:
si_map = kwargs["si_map"]
try:
if address in si_map:
source = si_map[address]
else:
if address + v4c.SI_BLOCK_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
raw_bytes = stream[address : address + v4c.SI_BLOCK_SIZE]
if raw_bytes in si_map:
source = si_map[raw_bytes]
else:
source = SourceInformation(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
file_limit=file_limit,
)
si_map[raw_bytes] = si_map[address] = source
except:
logger.warning(
f"Channel source parsing error: {format_exc()}. The error is ignored and the channel source is None"
)
source = None
self.source = source
else:
self.source = None
else:
stream.seek(address)
if address + CN_SINGLE_ATTACHMENT_BLOCK_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
block = stream.read(CN_SINGLE_ATTACHMENT_BLOCK_SIZE)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(block)
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if self.id != b"##CN":
message = f'Expected "##CN" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
if self.block_len == CN_BLOCK_SIZE:
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = SIMPLE_CHANNEL_PARAMS_uf(block, COMMON_SIZE)
elif self.block_len == CN_SINGLE_ATTACHMENT_BLOCK_SIZE:
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.attachment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = SINGLE_ATTACHMENT_CHANNEL_PARAMS_uf(block, COMMON_SIZE)
at_map = kwargs.get("at_map", {})
self.attachment = at_map.get(self.attachment_addr, 0)
else:
block = block[24:] + stream.read(self.block_len - CN_BLOCK_SIZE)
links_nr = self.links_nr
links = unpack_from(f"<{links_nr}Q", block)
params = unpack_from(v4c.FMT_CHANNEL_PARAMS, block, links_nr * 8)
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
) = links[:8]
at_map = kwargs.get("at_map", {})
if params[10]:
self.attachment_addr = links[8]
self.attachment = at_map.get(links[8], 0)
self.links_nr -= params[10] - 1
self.block_len -= (params[10] - 1) * 8
params = (
params[0],
params[1],
params[2],
params[3],
params[4],
params[5],
params[6],
params[7],
params[8],
params[9],
1,
params[11],
params[12],
params[13],
params[14],
params[15],
params[16],
)
if params[6] & v4c.FLAG_CN_DEFAULT_X:
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = links[-3:]
# default X not supported yet
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = (0, 0, 0)
(
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = params
parsed_strings = kwargs["parsed_strings"]
if parsed_strings is None:
self.name = get_text_v4(self.name_addr, stream, file_limit=file_limit)
self.comment = get_text_v4(self.comment_addr, stream, file_limit=file_limit)
if kwargs["use_display_names"]:
self.display_names = extract_display_names(self.comment)
else:
self.display_names = {}
else:
self.name, self.display_names, self.comment = parsed_strings
self.unit = get_text_v4(self.unit_addr, stream, mapped=mapped, file_limit=file_limit)
si_map = kwargs["si_map"]
cc_map = kwargs["cc_map"]
address = self.conversion_addr
if address:
try:
if address in cc_map:
conv = cc_map[address]
else:
if address + 16 > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
stream.seek(address + 8)
(size,) = UINT64_u(stream.read(8))
if address + size > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
stream.seek(address)
raw_bytes = stream.read(size)
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
file_limit=file_limit,
)
cc_map[raw_bytes] = cc_map[address] = conv
except:
logger.warning(
f"Channel conversion parsing error: {format_exc()}. The error is ignored and the channel conversion is None"
)
conv = None
self.conversion = conv
else:
self.conversion = None
address = self.source_addr
if address:
try:
if address in si_map:
source = si_map[address]
else:
if address + v4c.SI_BLOCK_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
stream.seek(address)
raw_bytes = stream.read(v4c.SI_BLOCK_SIZE)
if raw_bytes in si_map:
source = si_map[raw_bytes]
else:
source = SourceInformation(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
file_limit=file_limit,
)
si_map[raw_bytes] = si_map[address] = source
except:
logger.warning(
f"Channel source parsing error: {format_exc()}. The error is ignored and the channel source is None"
)
source = None
self.source = source
else:
self.source = None
else:
self.address = 0
self.name = self.comment = self.unit = ""
self.display_names = {}
self.conversion = self.source = self.attachment = None
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = (
b"##CN",
0,
v4c.CN_BLOCK_SIZE,
8,
0,
0,
0,
0,
0,
0,
0,
0,
kwargs.get("channel_type", v4c.CHANNEL_TYPE_VALUE),
kwargs.get("sync_type", 0),
kwargs.get("data_type", v4c.DATA_TYPE_UNSIGNED_INTEL),
kwargs.get("bit_offset", 0),
kwargs.get("byte_offset", 0),
kwargs.get("bit_count", 32),
kwargs.get("flags", 0),
kwargs.get("pos_invalidation_bit", 0),
kwargs.get("precision", 3),
0,
0,
kwargs.get("min_raw_value", 0),
kwargs.get("max_raw_value", 0),
kwargs.get("lower_limit", 0),
kwargs.get("upper_limit", 0),
kwargs.get("lower_ext_limit", 0),
kwargs.get("upper_ext_limit", 0),
)
if "attachment_addr" in kwargs:
self.attachment_addr = kwargs["attachment_addr"]
self.block_len += 8
self.links_nr += 1
self.attachment_nr = 1
# ignore MLSD signal data
if self.channel_type == v4c.CHANNEL_TYPE_MLSD:
self.data_block_addr = 0
self.channel_type = v4c.CHANNEL_TYPE_VALUE
if self.name in self.display_names:
del self.display_names[self.name]
self.standard_C_size = True
self.fast_path: tuple[int, int, int, int, int, np.dtype[Any]] | None = None
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
cc_map: dict[bytes | int, int],
si_map: dict[bytes | int, int],
) -> int:
text = self.name
if text in defined_texts:
self.name_addr = defined_texts[text]
else:
tx_block = TextBlock(
text=text.encode("utf-8", "replace"),
meta=False,
safe=True,
)
self.name_addr = defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
text = self.unit
if text in defined_texts:
self.unit_addr = defined_texts[text]
else:
tx_block = TextBlock(
text=text.encode("utf-8", "replace"),
meta=False,
safe=True,
)
self.unit_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
comment = self.comment
display_names = self.display_names
if display_names:
items = []
for _name, description in display_names.items():
description = "display"
items.append(f"<{description}>{_name}</{description}>")
display_names_tags = "\n".join(items)
else:
display_names_tags = ""
if display_names_tags and not comment:
text = v4c.CN_COMMENT_TEMPLATE.format("", display_names_tags)
elif display_names_tags and comment:
if not comment.startswith("<CN"):
text = v4c.CN_COMMENT_TEMPLATE.format(escape_xml_string(comment), display_names_tags)
else:
if any(_name not in comment for _name in display_names):
try:
CNcomment = ET.fromstring(comment.replace(' xmlns="http://www.asam.net/mdf/v4"', ""))
elem = CNcomment.find("TX")
if elem is None:
raise RuntimeError("cannot find 'TX' element")
tx_tag = elem.text or ""
text = v4c.CN_COMMENT_TEMPLATE.format(escape_xml_string(tx_tag), display_names_tags)
except UnicodeEncodeError:
text = comment
else:
text = comment
else:
text = comment
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<CN")
tx_block = TextBlock(
text=text.encode("utf-8", "replace"),
meta=meta,
safe=True,
)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
conversion = self.conversion
if conversion:
address = conversion.to_blocks(address, blocks, defined_texts, cc_map)
self.conversion_addr = conversion.address
else:
self.conversion_addr = 0
source = self.source
if source:
address = source.to_blocks(address, blocks, defined_texts, si_map)
self.source_addr = source.address
else:
self.source_addr = 0
blocks.append(self)
# keep extra space for an eventual attachment address
blocks.append(EIGHT_BYTES)
self.address = address
address += self.block_len + 8
return address
def __bytes__(self) -> bytes:
if self.block_len == v4c.CN_BLOCK_SIZE:
return v4c.SIMPLE_CHANNEL_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
)
elif self.attachment_nr == 1:
return v4c.SINGLE_ATTACHMENT_CHANNEL_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.attachment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
)
else:
fmt = v4c.FMT_CHANNEL.format(self.links_nr)
keys: tuple[str, ...] = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_ch_addr",
"component_addr",
"name_addr",
"source_addr",
"conversion_addr",
"data_block_addr",
"unit_addr",
"comment_addr",
)
if self.attachment_nr:
keys += ("attachment_addr",)
if self.flags & v4c.FLAG_CN_DEFAULT_X:
keys += ("default_X_dg_addr", "default_X_cg_addr", "default_X_ch_addr")
keys += (
"channel_type",
"sync_type",
"data_type",
"bit_offset",
"byte_offset",
"bit_count",
"flags",
"pos_invalidation_bit",
"precision",
"reserved1",
"attachment_nr",
"min_raw_value",
"max_raw_value",
"lower_limit",
"upper_limit",
"lower_ext_limit",
"upper_ext_limit",
)
return pack(fmt, *[getattr(self, key) for key in keys])
def __str__(self) -> str:
return f"""<Channel (name: {self.name}, unit: {self.unit}, comment: {self.comment}, address: {hex(self.address)},
conversion: {self.conversion},
source: {self.source},
fields: {", ".join(block_fields(self))})>"""
def metadata(self) -> str:
keys: tuple[str, ...]
if self.block_len == v4c.CN_BLOCK_SIZE:
keys = v4c.KEYS_SIMPLE_CHANNEL
else:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_ch_addr",
"component_addr",
"name_addr",
"source_addr",
"conversion_addr",
"data_block_addr",
"unit_addr",
"comment_addr",
)
if self.attachment_nr:
keys += ("attachment_addr",)
if self.flags & v4c.FLAG_CN_DEFAULT_X:
keys += ("default_X_dg_addr", "default_X_cg_addr", "default_X_ch_addr")
keys += (
"channel_type",
"sync_type",
"data_type",
"bit_offset",
"byte_offset",
"bit_count",
"flags",
"pos_invalidation_bit",
"precision",
"reserved1",
"attachment_nr",
"min_raw_value",
"max_raw_value",
"lower_limit",
"upper_limit",
"lower_ext_limit",
"upper_ext_limit",
)
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata: list[str] = []
lines = f"""
name: {self.name}
display names: {self.display_names}
address: {hex(self.address)}
comment: {self.comment}
unit: {self.unit}
""".split("\n")
for key in keys:
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, val))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
if key == "data_type":
lines[-1] += f" = {v4c.DATA_TYPE_TO_STRING[self.data_type]}"
elif key == "channel_type":
lines[-1] += f" = {v4c.CHANNEL_TYPE_TO_STRING[self.channel_type]}"
elif key == "sync_type":
lines[-1] += f" = {v4c.SYNC_TYPE_TO_STRING[self.sync_type]}"
elif key == "flags":
flags = []
for fl, string in v4c.FLAG_CN_TO_STRING.items():
if self.flags & fl:
flags.append(string)
if flags:
lines[-1] += f" [0x{self.flags:X}= {', '.join(flags)}]"
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
def __contains__(self, item: str) -> bool:
return hasattr(self, item)
def __lt__(self, other: "Channel") -> bool:
self_byte_offset = self.byte_offset
other_byte_offset = other.byte_offset
if self_byte_offset < other_byte_offset:
result = True
elif self_byte_offset == other_byte_offset:
self_range = self.bit_offset + self.bit_count
other_range = other.bit_offset + other.bit_count
if self_range > other_range:
result = True
else:
result = False
else:
result = False
return result
class _ChannelArrayBlockBase:
__slots__ = (
"address",
"axis_channels",
"axis_conversions",
"block_len",
"byte_offset_base",
"ca_type",
"comparison_quantity_channel",
"dims",
"dynamic_size_channels",
"flags",
"id",
"input_quantity_channels",
"invalidation_bit_base",
"links_nr",
"output_quantity_channel",
"reserved0",
"storage",
)
class ChannelArrayBlockKwargs(BlockKwargs, total=False):
ca_type: int
dims: int
dim_size_0: int
flags: int
byte_offset_base: int
invalidation_bit_base: int
file_limit: int | float
cc_map: dict[bytes | int, "ChannelConversion"]
class ChannelArrayBlock(_ChannelArrayBlockBase):
"""
Other attributes:
* ``address`` - int : array block address
* ``axis_channels`` - list : list of (group index, channel index)
pairs referencing the axis of this array block
* ``dynamic_size_channels`` - list : list of (group index, channel index)
pairs referencing the axis dynamic size of this array block
* ``input_quantity_channels`` - list : list of (group index, channel index)
pairs referencing the input quantity channels of this array block
* ``output_quantity_channels`` - tuple | None : (group index, channel index)
pair referencing the output quantity channel of this array block
* ``comparison_quantity_channel`` - tuple | None : (group index, channel index)
pair referencing the comparison quantity channel of this array block
"""
def __init__(self, **kwargs: Unpack[ChannelArrayBlockKwargs]) -> None:
self.axis_channels: list[tuple[int, int] | None] = []
self.dynamic_size_channels: list[tuple[int, int] | None] = []
self.input_quantity_channels: list[tuple[int, int] | None] = []
self.output_quantity_channel: tuple[int, int] | None = None
self.comparison_quantity_channel: tuple[int, int] | None = None
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
cc_map = kwargs["cc_map"]
file_limit = kwargs["file_limit"]
if address + COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if mapped:
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(stream, address)
if self.id != b"##CA":
message = f'Expected "##CA" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
nr = self.links_nr
address += COMMON_SIZE
links: tuple[int, ...] = unpack_from(f"<{nr}Q", stream, address)
self.composition_addr = links[0]
links = links[1:]
address += nr * 8
values: tuple[int, int, int, int, int, int] = unpack_from("<2BHIiI", stream, address)
dims_nr = values[2]
(
self.ca_type,
self.storage,
self.dims,
self.flags,
self.byte_offset_base,
self.invalidation_bit_base,
) = values
address += 16
dim_sizes: tuple[int, ...] = unpack_from(f"<{dims_nr}Q", stream, address)
for i, size in enumerate(dim_sizes):
self[f"dim_size_{i}"] = size
stream.seek(address + dims_nr * 8)
if self.storage == v4c.CA_STORAGE_TYPE_DG_TEMPLATE:
data_links_nr = 1
for size in dim_sizes:
data_links_nr *= size
for i in range(data_links_nr):
self[f"data_link_{i}"] = links[i]
links = links[data_links_nr:]
if self.flags & v4c.FLAG_CA_DYNAMIC_AXIS:
for i in range(dims_nr):
self[f"dynamic_size_{i}_dg_addr"] = links[3 * i]
self[f"dynamic_size_{i}_cg_addr"] = links[3 * i + 1]
self[f"dynamic_size_{i}_ch_addr"] = links[3 * i + 2]
links = links[dims_nr * 3 :]
if self.flags & v4c.FLAG_CA_INPUT_QUANTITY:
for i in range(dims_nr):
self[f"input_quantity_{i}_dg_addr"] = links[3 * i]
self[f"input_quantity_{i}_cg_addr"] = links[3 * i + 1]
self[f"input_quantity_{i}_ch_addr"] = links[3 * i + 2]
links = links[dims_nr * 3 :]
if self.flags & v4c.FLAG_CA_OUTPUT_QUANTITY:
self.output_quantity_dg_addr = links[0]
self.output_quantity_cg_addr = links[1]
self.output_quantity_ch_addr = links[2]
links = links[3:]
if self.flags & v4c.FLAG_CA_COMPARISON_QUANTITY:
self.comparison_quantity_dg_addr = links[0]
self.comparison_quantity_cg_addr = links[1]
self.comparison_quantity_ch_addr = links[2]
links = links[3:]
if self.flags & v4c.FLAG_CA_AXIS:
for i in range(dims_nr):
address = links[i]
self[f"axis_conversion_{i}_addr"] = address
if address:
if address in cc_map:
conv = cc_map[address]
else:
if address + 16 > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
(size,) = UINT64_uf(stream, address + 8)
if address + size > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
raw_bytes = stream[address : address + size]
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
file_limit=file_limit,
)
cc_map[raw_bytes] = cc_map[address] = conv
else:
conv = None
self[f"axis_conversion_{i}"] = conv
links = links[dims_nr:]
if (self.flags & v4c.FLAG_CA_AXIS) and not (self.flags & v4c.FLAG_CA_FIXED_AXIS):
for i in range(dims_nr):
self[f"scale_axis_{i}_dg_addr"] = links[3 * i]
self[f"scale_axis_{i}_cg_addr"] = links[3 * i + 1]
self[f"scale_axis_{i}_ch_addr"] = links[3 * i + 2]
links = links[dims_nr * 3 :]
if self.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
for j in range(typing.cast(int, self[f"dim_size_{i}"])):
(value,) = FLOAT64_u(stream.read(8))
self[f"axis_{i}_value_{j}"] = value
else:
stream.seek(address)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_u(stream.read(24))
if self.id != b"##CA":
message = f'Expected "##CA" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
nr = self.links_nr
links = unpack(f"<{nr}Q", stream.read(8 * nr))
self.composition_addr = links[0]
links = links[1:]
values = unpack("<2BHIiI", stream.read(16))
dims_nr = values[2]
(
self.ca_type,
self.storage,
self.dims,
self.flags,
self.byte_offset_base,
self.invalidation_bit_base,
) = values
dim_sizes = unpack(f"<{dims_nr}Q", stream.read(8 * dims_nr))
for i, size in enumerate(dim_sizes):
self[f"dim_size_{i}"] = size
if self.storage == v4c.CA_STORAGE_TYPE_DG_TEMPLATE:
data_links_nr = 1
for size in dim_sizes:
data_links_nr *= size
for i in range(data_links_nr):
self[f"data_link_{i}"] = links[i]
links = links[data_links_nr:]
if self.flags & v4c.FLAG_CA_DYNAMIC_AXIS:
for i in range(dims_nr):
self[f"dynamic_size_{i}_dg_addr"] = links[3 * i]
self[f"dynamic_size_{i}_cg_addr"] = links[3 * i + 1]
self[f"dynamic_size_{i}_ch_addr"] = links[3 * i + 2]
links = links[dims_nr * 3 :]
if self.flags & v4c.FLAG_CA_INPUT_QUANTITY:
for i in range(dims_nr):
self[f"input_quantity_{i}_dg_addr"] = links[3 * i]
self[f"input_quantity_{i}_cg_addr"] = links[3 * i + 1]
self[f"input_quantity_{i}_ch_addr"] = links[3 * i + 2]
links = links[dims_nr * 3 :]
if self.flags & v4c.FLAG_CA_OUTPUT_QUANTITY:
self.output_quantity_dg_addr = links[0]
self.output_quantity_cg_addr = links[1]
self.output_quantity_ch_addr = links[2]
links = links[3:]
if self.flags & v4c.FLAG_CA_COMPARISON_QUANTITY:
self.comparison_quantity_dg_addr = links[0]
self.comparison_quantity_cg_addr = links[1]
self.comparison_quantity_ch_addr = links[2]
links = links[3:]
if self.flags & v4c.FLAG_CA_AXIS:
current_address = stream.tell()
for i in range(dims_nr):
address = links[i]
self[f"axis_conversion_{i}_addr"] = address
if address:
if address in cc_map:
conv = cc_map[address]
else:
if address + 16 > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
stream.seek(address + 8)
(size,) = UINT64_uf(stream.read(8))
if address + size > file_limit:
handle_incomplete_block(address, file_limit)
raise MdfException(f"Incomplete block at {address:x}")
stream.seek(address)
raw_bytes = stream.read(size)
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
file_limit=file_limit,
)
cc_map[raw_bytes] = cc_map[address] = conv
else:
conv = None
self[f"axis_conversion_{i}"] = conv
links = links[dims_nr:]
stream.seek(current_address)
if (self.flags & v4c.FLAG_CA_AXIS) and not (self.flags & v4c.FLAG_CA_FIXED_AXIS):
for i in range(dims_nr):
self[f"scale_axis_{i}_dg_addr"] = links[3 * i]
self[f"scale_axis_{i}_cg_addr"] = links[3 * i + 1]
self[f"scale_axis_{i}_ch_addr"] = links[3 * i + 2]
links = links[dims_nr * 3 :]
if self.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
for j in range(typing.cast(int, self[f"dim_size_{i}"])):
(value,) = FLOAT64_u(stream.read(8))
self[f"axis_{i}_value_{j}"] = value
except KeyError:
self.id = b"##CA"
self.reserved0 = 0
self.address = 0
ca_type = kwargs["ca_type"]
if ca_type == v4c.CA_TYPE_ARRAY:
dims_nr = kwargs["dims"]
self.block_len = 48 + dims_nr * 8
self.links_nr = 1
self.composition_addr = 0
self.ca_type = v4c.CA_TYPE_ARRAY
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = dims_nr
self.flags = 0
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
for i in range(dims_nr):
self[f"dim_size_{i}"] = kwargs[f"dim_size_{i}"] # type: ignore[literal-required]
elif ca_type == v4c.CA_TYPE_SCALE_AXIS:
self.block_len = 56
self.links_nr = 1
self.composition_addr = 0
self.ca_type = v4c.CA_TYPE_SCALE_AXIS
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = 1
self.flags = 0
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
self.dim_size_0 = kwargs["dim_size_0"]
elif ca_type == v4c.CA_TYPE_LOOKUP:
flags = kwargs["flags"]
dims_nr = kwargs["dims"]
dim_size_sum = sum(kwargs[f"dim_size_{i}"] for i in range(dims_nr)) # type: ignore[literal-required]
if flags & v4c.FLAG_CA_FIXED_AXIS:
self.block_len = 48 + dims_nr * 16 + dim_size_sum * 8
self.links_nr = 1 + dims_nr
self.composition_addr = 0
for i in range(dims_nr):
self[f"axis_conversion_{i}_addr"] = 0
self[f"axis_conversion_{i}"] = kwargs.get(f"axis_conversion_{i}", None)
self.ca_type = v4c.CA_TYPE_LOOKUP
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = dims_nr
self.flags = v4c.FLAG_CA_FIXED_AXIS | v4c.FLAG_CA_AXIS
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
for i in range(dims_nr):
self[f"dim_size_{i}"] = kwargs[f"dim_size_{i}"] # type: ignore[literal-required]
for i in range(dims_nr):
for j in range(typing.cast(int, self[f"dim_size_{i}"])):
self[f"axis_{i}_value_{j}"] = kwargs.get(f"axis_{i}_value_{j}", j)
else:
self.block_len = 48 + dims_nr * 5 * 8
self.links_nr = 1 + dims_nr * 4
self.composition_addr = 0
for i in range(dims_nr):
self[f"axis_conversion_{i}_addr"] = 0
self[f"axis_conversion_{i}"] = kwargs.get(f"axis_conversion_{i}", None)
for i in range(dims_nr):
self[f"scale_axis_{i}_dg_addr"] = 0
self[f"scale_axis_{i}_cg_addr"] = 0
self[f"scale_axis_{i}_ch_addr"] = 0
self.ca_type = v4c.CA_TYPE_LOOKUP
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = dims_nr
self.flags = v4c.FLAG_CA_AXIS
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
for i in range(dims_nr):
self[f"dim_size_{i}"] = kwargs[f"dim_size_{i}"] # type: ignore[literal-required]
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __str__(self) -> str:
return f"<ChannelArrayBlock (referenced channels: {self.axis_channels}, address: {hex(self.address)}, fields: {inspect.getmembers(self)})>"
def __bytes__(self) -> bytes:
flags = self.flags
dims_nr = self.dims
keys: tuple[str, ...] = (
"id",
"reserved0",
"block_len",
"links_nr",
"composition_addr",
)
if self.storage:
dim_sizes = [typing.cast(int, self[f"dim_size_{i}"]) for i in range(dims_nr)]
data_links_nr = 1
for size in dim_sizes:
data_links_nr *= size
else:
dim_sizes = []
data_links_nr = 0
if self.storage == v4c.CA_STORAGE_TYPE_DG_TEMPLATE:
keys += tuple(f"data_link_{i}" for i in range(data_links_nr))
if flags & v4c.FLAG_CA_DYNAMIC_AXIS:
for i in range(dims_nr):
keys += (
f"dynamic_size_{i}_dg_addr",
f"dynamic_size_{i}_cg_addr",
f"dynamic_size_{i}_ch_addr",
)
if flags & v4c.FLAG_CA_INPUT_QUANTITY:
for i in range(dims_nr):
keys += (
f"input_quantity_{i}_dg_addr",
f"input_quantity_{i}_cg_addr",
f"input_quantity_{i}_ch_addr",
)
if flags & v4c.FLAG_CA_OUTPUT_QUANTITY:
keys += (
"output_quantity_dg_addr",
"output_quantity_cg_addr",
"output_quantity_ch_addr",
)
if flags & v4c.FLAG_CA_COMPARISON_QUANTITY:
keys += (
"comparison_quantity_dg_addr",
"comparison_quantity_cg_addr",
"comparison_quantity_ch_addr",
)
if flags & v4c.FLAG_CA_AXIS:
keys += tuple(f"axis_conversion_{i}_addr" for i in range(dims_nr))
if (flags & v4c.FLAG_CA_AXIS) and not (flags & v4c.FLAG_CA_FIXED_AXIS):
for i in range(dims_nr):
keys += (
f"scale_axis_{i}_dg_addr",
f"scale_axis_{i}_cg_addr",
f"scale_axis_{i}_ch_addr",
)
keys += (
"ca_type",
"storage",
"dims",
"flags",
"byte_offset_base",
"invalidation_bit_base",
)
keys += tuple(f"dim_size_{i}" for i in range(dims_nr))
if flags & v4c.FLAG_CA_FIXED_AXIS:
keys += tuple(
f"axis_{i}_value_{j}" for i in range(dims_nr) for j in range(typing.cast(int, self[f"dim_size_{i}"]))
)
dim_sizes = [1 for i in range(dims_nr) for j in range(typing.cast(int, self[f"dim_size_{i}"]))]
if self.storage:
keys += tuple(f"cycle_count_{i}" for i in range(data_links_nr))
fmt = f"<4sI{self.links_nr + 2}Q2BHIiI{dims_nr}Q{sum(dim_sizes)}d{data_links_nr}Q"
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
def get_axes_information(self) -> list:
axes = []
for i in range(self.dims):
info = {}
match self.ca_type:
case v4c.CA_TYPE_ARRAY:
info["type"] = "NO_AXIS"
info["size"] = typing.cast(int, self[f"dim_size_{i}"])
case v4c.CA_TYPE_SCALE_AXIS:
info["type"] = "SCALE_AXIS"
info["size"] = typing.cast(int, self[f"dim_size_{i}"])
case _:
if self.flags & v4c.FLAG_CA_FIXED_AXIS:
info["type"] = "FIXED_AXIS"
info["size"] = typing.cast(int, self[f"dim_size_{i}"])
info["values"] = [
self[f"axis_{i}_value_{j}"] for j in range(typing.cast(int, self[f"dim_size_{i}"]))
]
else:
info["type"] = "REF_AXIS"
info["size"] = typing.cast(int, self[f"dim_size_{i}"])
info["ref"] = self.axis_channels[i]
info["ref_name"] = ""
if self.flags & v4c.FLAG_CA_AXIS:
info["conversion"] = self[f"axis_conversion_{i}"]
else:
info["conversion"] = None
info["inverse_layout"] = self.flags & v4c.FLAG_CA_INVERSE_LAYOUT
info["byte_offset_base"] = self.byte_offset_base
info["dtype_field_name"] = ""
axes.append(info)
return axes
def get_byte_offset_factors(self) -> list[int]:
"""Return a list of factors f(d), used to calculate byte offset."""
return self._factors(self.byte_offset_base)
def get_bit_pos_inval_factors(self) -> list[int]:
"""Return a list of factors f(d), used to calculate invalidation bit
position.
"""
return self._factors(self.invalidation_bit_base)
def _factors(self, base: int) -> list[int]:
factor = base
factors = [factor]
for i in range(1, self.dims):
factor *= typing.cast(int, self[f"dim_size_{i - 1}"])
factors.append(factor)
# row oriented layout
if not (self.flags & v4c.FLAG_CA_INVERSE_LAYOUT):
factors = factors[::-1]
return factors
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
cc_map: dict[bytes | int, int],
) -> int:
if self.flags & v4c.FLAG_CA_AXIS:
for i in range(self.dims):
conversion = self[f"axis_conversion_{i}"]
if conversion:
address = conversion.to_blocks(address, blocks, defined_texts, cc_map)
self[f"axis_conversion_{i}_addr"] = conversion.address
else:
self[f"axis_conversion_{i}_addr"] = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
class ChannelGroupKwargs(BlockKwargs, total=False):
reserved0: int
next_cg_addr: int
first_ch_addr: int
acq_name_addr: int
acq_source_addr: int
first_sample_reduction_addr: int
comment_addr: int
record_id: int
cycles_nr: int
flags: int
path_separator: int
reserved1: int
samples_byte_nr: int
invalidation_bytes_nr: int
cg_master_addr: int
si_map: dict[Union[bytes, int, "Source"], "SourceInformation"]
file_limit: int | float
[docs]
class ChannelGroup:
"""`ChannelGroup` has the following attributes, which are also available as
dict-like key-value pairs.
CGBLOCK fields:
* ``id`` - bytes : block ID; always b'##CG'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_cg_addr`` - int : next channel group address
* ``first_ch_addr`` - int : address of first channel of this channel group
* ``acq_name_addr`` - int : address of TextBLock that contains the channel
group acquisition name
* ``acq_source_addr`` - int : address of SourceInformation that contains
the channel group source
* ``first_sample_reduction_addr`` - int : address of first SRBLOCK; this is
considered 0 since sample reduction is not yet supported
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
channel group comment
* ``record_id`` - int : record ID for the channel group
* ``cycles_nr`` - int : number of cycles for this channel group
* ``flags`` - int : channel group flags
* ``path_separator`` - int : ordinal for character used as path separator
* ``reserved1`` - int : reserved bytes
* ``samples_byte_nr`` - int : number of bytes used for channels samples in
the record for this channel group; this does not contain the invalidation
bytes
* ``invalidation_bytes_nr`` - int : number of bytes used for invalidation
bits by this channel group
Other attributes:
* ``acq_name`` - str : acquisition name
* ``acq_source`` - SourceInformation : acquisition source information
* ``address`` - int : channel group address
* ``comment`` - str : channel group comment
"""
__slots__ = (
"acq_name",
"acq_name_addr",
"acq_source",
"acq_source_addr",
"address",
"block_len",
"cg_master_addr",
"cg_master_index",
"comment",
"comment_addr",
"cycles_nr",
"first_ch_addr",
"first_sample_reduction_addr",
"flags",
"id",
"invalidation_bytes_nr",
"links_nr",
"next_cg_addr",
"path_separator",
"record_id",
"reserved0",
"reserved1",
"samples_byte_nr",
)
def __init__(self, **kwargs: Unpack[ChannelGroupKwargs]) -> None:
self.comment = ""
self.acq_name: str | None = ""
self.acq_source = None
self.cg_master_index: int | None = None
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
file_limit = kwargs["file_limit"]
if address + COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if mapped:
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(stream, address)
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if self.block_len == v4c.CG_BLOCK_SIZE:
(
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
) = v4c.CHANNEL_GROUP_SHORT_uf(stream, address + COMMON_SIZE)
else:
(
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.cg_master_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
) = v4c.CHANNEL_GROUP_RM_SHORT_uf(stream, address + COMMON_SIZE)
else:
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
) = v4c.COMMON_u(stream.read(COMMON_SIZE))
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if self.block_len == v4c.CG_BLOCK_SIZE:
(
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
) = v4c.CHANNEL_GROUP_SHORT_u(stream.read(v4c.CG_BLOCK_SIZE - COMMON_SIZE))
else:
(
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.cg_master_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
) = v4c.CHANNEL_GROUP_RM_SHORT_u(stream.read(v4c.CG_RM_BLOCK_SIZE - COMMON_SIZE))
if self.id != b"##CG":
message = f'Expected "##CG" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.acq_name = get_text_v4(self.acq_name_addr, stream, mapped=mapped, file_limit=file_limit)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped, file_limit=file_limit)
si_map = kwargs["si_map"]
address = self.acq_source_addr
if address:
if address + v4c.SI_BLOCK_SIZE > file_limit:
source = None
else:
if mapped:
raw_bytes = stream[address : address + v4c.SI_BLOCK_SIZE]
else:
stream.seek(address)
raw_bytes = stream.read(v4c.SI_BLOCK_SIZE)
if raw_bytes in si_map:
source = si_map[raw_bytes]
else:
source = SourceInformation(
raw_bytes=raw_bytes,
stream=stream,
address=address,
mapped=mapped,
file_limit=file_limit,
)
si_map[raw_bytes] = source
self.acq_source = source
else:
self.acq_source = None
except KeyError:
self.address = 0
self.id = b"##CG"
self.reserved0 = kwargs.get("reserved0", 0)
self.next_cg_addr = kwargs.get("next_cg_addr", 0)
self.first_ch_addr = kwargs.get("first_ch_addr", 0)
self.acq_name_addr = kwargs.get("acq_name_addr", 0)
self.acq_source_addr = kwargs.get("acq_source_addr", 0)
self.first_sample_reduction_addr = kwargs.get("first_sample_reduction_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.record_id = kwargs.get("record_id", 1)
self.cycles_nr = kwargs.get("cycles_nr", 0)
self.flags = kwargs.get("flags", 0)
self.path_separator = kwargs.get("path_separator", 0)
self.reserved1 = kwargs.get("reserved1", 0)
self.samples_byte_nr = kwargs.get("samples_byte_nr", 0)
self.invalidation_bytes_nr = kwargs.get("invalidation_bytes_nr", 0)
if self.flags & v4c.FLAG_CG_REMOTE_MASTER:
self.cg_master_addr = kwargs.get("cg_master_addr", 0)
self.block_len = v4c.CG_RM_BLOCK_SIZE
self.links_nr = 7
else:
self.block_len = v4c.CG_BLOCK_SIZE
self.links_nr = 6
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
si_map: dict[bytes | int, int],
) -> int:
text = self.acq_name
if text:
if text in defined_texts:
self.acq_name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.acq_name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.acq_name_addr = 0
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<CGcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
source = self.acq_source
if source:
address = source.to_blocks(address, blocks, defined_texts, si_map)
self.acq_source_addr = source.address
else:
self.acq_source_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __bytes__(self) -> bytes:
if self.flags & v4c.FLAG_CG_REMOTE_MASTER:
result = v4c.CHANNEL_GROUP_RM_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.cg_master_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
)
else:
result = v4c.CHANNEL_GROUP_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
)
return result
def metadata(self) -> str:
keys: tuple[str, ...] = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_cg_addr",
"first_ch_addr",
"acq_name_addr",
"acq_source_addr",
"first_sample_reduction_addr",
"comment_addr",
)
if self.block_len == v4c.CG_RM_BLOCK_SIZE:
keys += ("cg_master_addr",)
keys += (
"record_id",
"cycles_nr",
"flags",
"path_separator",
"reserved1",
"samples_byte_nr",
"invalidation_bytes_nr",
)
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata: list[str] = []
lines = f"""
name: {self.acq_name}
address: {hex(self.address)}
comment: {self.comment}
""".split("\n")
for key in keys:
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, val))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
if key == "flags":
flags = []
for fl, string in v4c.FLAG_CG_TO_STRING.items():
if self.flags & fl:
flags.append(string)
if flags:
lines[-1] += f" [0x{self.flags:X}= {', '.join(flags)}]"
elif key == "path_separator":
if self.path_separator:
sep = pack("<H", self.path_separator).decode("utf-16")
lines[-1] += f" (= '{sep}')"
else:
lines[-1] += " (= <undefined>)"
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
class _ChannelConversionBase:
__slots__ = (
"P1",
"P2",
"P3",
"P4",
"P5",
"P6",
"a",
"address",
"b",
"block_len",
"comment",
"comment_addr",
"conversion_type",
"flags",
"formula",
"id",
"inv_conv_addr",
"links_nr",
"max_phy_value",
"min_phy_value",
"name",
"name_addr",
"precision",
"ref_param_nr",
"referenced_blocks",
"reserved0",
"unit",
"unit_addr",
"val_param_nr",
)
class ChannelConversionKwargs(BlockKwargs, total=False):
raw_bytes: bytes
name: str
unit: str
comment: str
formula: str
links_nr: int
name_addr: int
unit_addr: int
comment_addr: int
inv_conv_addr: int
formula_addr: int
conversion_type: int
precision: int
flags: int
ref_param_nr: int
val_default: float
val_param_nr: int
min_phy_value: float
max_phy_value: float
a: float
b: float
default_addr: Union[bytes, "ChannelConversion"]
default: float
P1: float
P2: float
P3: float
P4: float
P5: float
P6: float
file_limit: int | float
class _BoundCache(TypedDict):
phys: list[Union[bytes, "ChannelConversion"]]
lower: NDArray[Any] | list[Any]
upper: NDArray[Any] | list[Any]
type: Literal["big", "small"]
class _ValsCache(TypedDict):
phys: list[Union[bytes, "ChannelConversion"]]
raw_vals: NDArray[Any] | list[Any]
type: Literal["big", "small"]
_Cache = _BoundCache | _ValsCache
[docs]
class ChannelConversion(_ChannelConversionBase):
"""`ChannelConversion` has the following attributes, which are also
available as dict-like key-value pairs.
CCBLOCK common fields:
* ``id`` - bytes : block ID; always b'##CC'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``name_addr`` - int : address of TXBLOCK that contains the
conversion name
* ``unit_addr`` - int : address of TXBLOCK that contains the
conversion unit
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
conversion comment
* ``inv_conv_addr`` - int : address of inverse conversion
* ``conversion_type`` - int : integer code for conversion type
* ``precision`` - int : integer code for precision
* ``flags`` - int : conversion block flags
* ``ref_param_nr`` - int : number fo referenced parameters (linked
parameters)
* ``val_param_nr`` - int : number of value parameters
* ``min_phy_value`` - float : minimum physical channel value
* ``max_phy_value`` - float : maximum physical channel value
CCBLOCK specific fields:
* linear conversion
* ``a`` - float : factor
* ``b`` - float : offset
* rational conversion
* ``P1`` to ``P6`` - float : parameters
* algebraic conversion
* ``formula_addr`` - address of TXBLOCK that contains
the algebraic conversion formula
* tabular conversion with or without interpolation
* ``raw_<N>`` - float : N-th raw value
* ``phys_<N>`` - float : N-th physical value
* tabular range conversion
* ``lower_<N>`` - float : N-th lower value
* ``upper_<N>`` - float : N-th upper value
* ``phys_<N>`` - float : N-th physical value
* tabular value to text conversion
* ``val_<N>`` - float : N-th raw value
* ``text_<N>`` - int : address of N-th TXBLOCK that
contains the physical value
* ``default`` - int : address of TXBLOCK that contains
the default physical value
* tabular range to text conversion
* ``lower_<N>`` - float : N-th lower value
* ``upper_<N>`` - float : N-th upper value
* ``text_<N>`` - int : address of N-th TXBLOCK that
contains the physical value
* ``default`` - int : address of TXBLOCK that contains
the default physical value
* text to value conversion
* ``val_<N>`` - float : N-th physical value
* ``text_<N>`` - int : address of N-th TXBLOCK that
contains the raw value
* ``val_default`` - float : default physical value
* text transformation (translation) conversion
* ``input_<N>_addr`` - int : address of N-th TXBLOCK that
contains the raw value
* ``output_<N>_addr`` - int : address of N-th TXBLOCK that
contains the physical value
* ``default_addr`` - int : address of TXBLOCK that contains
the default physical value
Other attributes:
* ``address`` - int : channel conversion address
* ``comment`` - str : channel conversion comment
* ``formula`` - str : algebraic conversion formula; default ''
* ``referenced_blocks`` - dict : dict of referenced blocks; can be TextBlock
objects for value to text, and text to text conversions; for partial
conversions the referenced blocks can be ChannelConversion object as well
* ``name`` - str : channel conversion name
* ``unit`` - str : channel conversion unit
"""
def __init__(self, **kwargs: Unpack[ChannelConversionKwargs]) -> None:
self._cache: _Cache | None = None
self.is_user_defined = False
if "stream" in kwargs:
stream = kwargs["stream"]
mapped = kwargs["mapped"]
self.address = address = kwargs["address"]
file_limit = kwargs["file_limit"]
if address + COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
try:
tx_block = kwargs["raw_bytes"]
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(tx_block)
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if self.id != b"##CC":
message = f'Expected "##CC" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
tx_block = tx_block[COMMON_SIZE:]
except KeyError:
stream.seek(address)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_u(stream.read(COMMON_SIZE))
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError from None
if self.id != b"##CC":
message = f'Expected "##CC" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message) from None
tx_block = stream.read(self.block_len - COMMON_SIZE)
(conv,) = UINT8_uf(tx_block, self.links_nr * 8)
if conv == v4c.CONVERSION_TYPE_NON:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = v4c.CONVERSION_NONE_INIT_u(tx_block)
elif conv == v4c.CONVERSION_TYPE_LIN:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.b,
self.a,
) = v4c.CONVERSION_LINEAR_INIT_u(tx_block)
elif conv == v4c.CONVERSION_TYPE_RAT:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.P1,
self.P2,
self.P3,
self.P4,
self.P5,
self.P6,
) = typing.cast(v4c.ConversionRatInit, unpack(v4c.FMT_CONVERSION_RAT_INIT, tx_block))
elif conv == v4c.CONVERSION_TYPE_ALG:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.formula_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = typing.cast(v4c.ConversionAlgebraicInit, unpack(v4c.FMT_CONVERSION_ALGEBRAIC_INIT, tx_block))
elif conv in (v4c.CONVERSION_TYPE_TABI, v4c.CONVERSION_TYPE_TAB):
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = v4c.CONVERSION_NONE_INIT_uf(tx_block)
nr = self.val_param_nr
values: tuple[float, ...] = unpack_from(f"<{nr}d", tx_block, 56)
for i in range(nr // 2):
self[f"raw_{i}"], self[f"phys_{i}"] = (
values[i * 2],
values[2 * i + 1],
)
elif conv == v4c.CONVERSION_TYPE_RTAB:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = v4c.CONVERSION_NONE_INIT_uf(tx_block)
nr = self.val_param_nr
values = unpack_from(f"<{nr}d", tx_block, 56)
for i in range((nr - 1) // 3):
self[f"lower_{i}"], self[f"upper_{i}"], self[f"phys_{i}"] = (
values[i * 3],
values[3 * i + 1],
values[3 * i + 2],
)
(self.default,) = FLOAT64_u(tx_block[-8:])
elif conv == v4c.CONVERSION_TYPE_TABX:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = typing.cast(tuple[int, int, int, int], unpack_from("<4Q", tx_block))
links_nr = self.links_nr - 4
links: tuple[int, ...] = unpack_from(f"<{links_nr}Q", tx_block, 32)
for i, link in enumerate(links[:-1]):
self[f"text_{i}"] = link
self.default_addr = links[-1]
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = typing.cast(
tuple[int, int, int, int, int, float, float], unpack_from("<2B3H2d", tx_block, 32 + links_nr * 8)
)
values = unpack_from(f"<{links_nr - 1}d", tx_block, 32 + links_nr * 8 + 24)
for i, val in enumerate(values):
self[f"val_{i}"] = val
elif conv == v4c.CONVERSION_TYPE_RTABX:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = typing.cast(tuple[int, int, int, int], unpack_from("<4Q", tx_block))
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", tx_block, 32)
for i, link in enumerate(links[:-1]):
self[f"text_{i}"] = link
self.default_addr = links[-1]
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = typing.cast(
tuple[int, int, int, int, int, float, float], unpack_from("<2B3H2d", tx_block, 32 + links_nr * 8)
)
values = unpack_from(f"<{self.val_param_nr}d", tx_block, 32 + links_nr * 8 + 24)
self.default_lower = self.default_upper = 0
for i in range(self.val_param_nr // 2):
j = 2 * i
self[f"lower_{i}"] = values[j]
self[f"upper_{i}"] = values[j + 1]
elif conv == v4c.CONVERSION_TYPE_TTAB:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = typing.cast(tuple[int, int, int, int], unpack_from("<4Q", tx_block))
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", tx_block, 32)
for i, link in enumerate(links):
self[f"text_{i}"] = link
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = typing.cast(
tuple[int, int, int, int, int, float, float], unpack_from("<2B3H2d", tx_block, 32 + links_nr * 8)
)
values = unpack_from(f"<{self.val_param_nr}d", tx_block, 32 + links_nr * 8 + 24)
for i, val in enumerate(values[:-1]):
self[f"val_{i}"] = val
self.val_default = values[-1]
elif conv == v4c.CONVERSION_TYPE_TRANS:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = typing.cast(tuple[int, int, int, int], unpack_from("<4Q", tx_block))
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", tx_block, 32)
for i in range((links_nr - 1) // 2):
j = 2 * i
self[f"input_{i}_addr"] = links[j]
self[f"output_{i}_addr"] = links[j + 1]
self.default_addr = links[-1]
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = typing.cast(
tuple[int, int, int, int, int, float, float], unpack_from("<2B3H2d", tx_block, 32 + links_nr * 8)
)
elif conv == v4c.CONVERSION_TYPE_BITFIELD:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = typing.cast(tuple[int, int, int, int], unpack_from("<4Q", tx_block))
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", tx_block, 32)
for i, link in enumerate(links):
self[f"text_{i}"] = link
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = typing.cast(
tuple[int, int, int, int, int, float, float], unpack_from("<2B3H2d", tx_block, 32 + links_nr * 8)
)
values = unpack_from(f"<{self.val_param_nr}Q", tx_block, 32 + links_nr * 8 + 24)
for i, val in enumerate(values):
self[f"mask_{i}"] = val
self.referenced_blocks: dict[str, bytes | ChannelConversion] = {}
self.name = get_text_v4(self.name_addr, stream, mapped=mapped, file_limit=file_limit)
self.unit = get_text_v4(self.unit_addr, stream, mapped=mapped, file_limit=file_limit)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped, file_limit=file_limit)
conv_type = conv
if conv_type == v4c.CONVERSION_TYPE_ALG:
self.formula = get_text_v4(self.formula_addr, stream, mapped=mapped, file_limit=file_limit).replace(
"x", "X"
)
else:
self.formula = ""
if conv_type in v4c.TABULAR_CONVERSIONS:
refs = self.referenced_blocks = {}
if conv_type in (
v4c.CONVERSION_TYPE_TTAB,
v4c.CONVERSION_TYPE_BITFIELD,
):
tabs = self.links_nr - 4
else:
tabs = self.links_nr - 4 - 1
for i in range(tabs):
address = typing.cast(int, self[f"text_{i}"])
if address:
if address + 4 > file_limit:
handle_incomplete_block(address, file_limit)
refs[f"text_{i}"] = b""
continue
stream.seek(address)
_id = stream.read(4)
if _id == b"##TX":
refs[f"text_{i}"] = get_text_v4(
address=address,
stream=stream,
mapped=mapped,
decode=False,
file_limit=file_limit,
)
elif _id == b"##CC":
cc_block = ChannelConversion(
address=address,
stream=stream,
mapped=mapped,
file_limit=file_limit,
)
refs[f"text_{i}"] = cc_block
else:
message = f'Expected "##TX" or "##CC" block @{hex(address)} but found "{_id!r}"'
logger.exception(message)
raise MdfException(message)
else:
refs[f"text_{i}"] = b""
if conv_type not in (
v4c.CONVERSION_TYPE_TTAB,
v4c.CONVERSION_TYPE_BITFIELD,
):
address = self.default_addr
if address:
if address + 4 > file_limit:
handle_incomplete_block(address, file_limit)
refs["default_addr"] = b""
else:
stream.seek(address)
_id = stream.read(4)
if _id == b"##TX":
refs["default_addr"] = get_text_v4(
address=address,
stream=stream,
mapped=mapped,
decode=False,
file_limit=file_limit,
)
elif _id == b"##CC":
cc_block = ChannelConversion(
address=address,
stream=stream,
mapped=mapped,
file_limit=file_limit,
)
refs["default_addr"] = cc_block
else:
message = f'Expected "##TX" or "##CC" block @{hex(address)} but found "{_id!r}"'
logger.exception(message)
raise MdfException(message)
else:
refs["default_addr"] = b""
elif conv_type == v4c.CONVERSION_TYPE_TRANS:
refs = self.referenced_blocks = {}
# link_nr - common links (4) - default text link (1)
for i in range((self.links_nr - 4 - 1) // 2):
for key in (f"input_{i}_addr", f"output_{i}_addr"):
address = typing.cast(int, self[key])
refs[key] = get_text_v4(
address=address,
stream=stream,
mapped=mapped,
decode=False,
file_limit=file_limit,
)
address = self.default_addr
refs["default_addr"] = get_text_v4(
address=address,
stream=stream,
mapped=mapped,
decode=False,
file_limit=file_limit,
)
else:
self.name = kwargs.get("name", "")
self.unit = kwargs.get("unit", "")
self.comment = kwargs.get("comment", "")
self.formula = kwargs.get("formula", "")
self.referenced_blocks = {}
self.address = 0
self.id = b"##CC"
self.reserved0 = 0
if not "conversion_type" in kwargs:
kwargs["conversion_type"] = v4c.CONVERSION_TYPE_NON
if kwargs["conversion_type"] == v4c.CONVERSION_TYPE_NON:
self.block_len = v4c.CC_NONE_BLOCK_SIZE
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = 0
self.conversion_type = v4c.CONVERSION_TYPE_NON
self.precision = 1
self.flags = 0
self.ref_param_nr = 0
self.val_param_nr = 0
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_LIN:
self.block_len = v4c.CC_LIN_BLOCK_SIZE
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = v4c.CONVERSION_TYPE_LIN
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = 2
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.b = float(kwargs["b"])
self.a = float(kwargs["a"])
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_ALG:
self.block_len = v4c.CC_ALG_BLOCK_SIZE
self.links_nr = 5
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.formula_addr = kwargs.get("formula_addr", 0)
self.conversion_type = v4c.CONVERSION_TYPE_ALG
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 1
self.val_param_nr = 0
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
elif kwargs["conversion_type"] in (
v4c.CONVERSION_TYPE_TAB,
v4c.CONVERSION_TYPE_TABI,
):
nr = kwargs["val_param_nr"]
self.block_len = 80 + 8 * nr
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = kwargs["conversion_type"]
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = nr
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(nr // 2):
self[f"raw_{i}"] = kwargs[f"raw_{i}"] # type: ignore[literal-required]
self[f"phys_{i}"] = kwargs[f"phys_{i}"] # type: ignore[literal-required]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_RTAB:
self.block_len = kwargs["val_param_nr"] * 8 + 80
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = v4c.CONVERSION_TYPE_RTAB
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = kwargs["val_param_nr"]
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range((kwargs["val_param_nr"] - 1) // 3):
self[f"lower_{i}"] = kwargs[f"lower_{i}"] # type: ignore[literal-required]
self[f"upper_{i}"] = kwargs[f"upper_{i}"] # type: ignore[literal-required]
self[f"phys_{i}"] = kwargs[f"phys_{i}"] # type: ignore[literal-required]
self.default = kwargs["default"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_RAT:
self.block_len = 80 + 6 * 8
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = kwargs["conversion_type"]
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = kwargs.get("val_param_nr", 6)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(1, 7):
self[f"P{i}"] = kwargs[f"P{i}"] # type: ignore[literal-required]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_TABX:
self.referenced_blocks = {}
nr = kwargs["ref_param_nr"] - 1
self.block_len = (nr * 8 * 2) + 88
self.links_nr = nr + 5
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
for i in range(nr):
key = f"text_{i}"
self[key] = 0
self.referenced_blocks[key] = kwargs[key] # type: ignore[literal-required]
self.default_addr = 0
key = "default_addr"
if "default_addr" in kwargs:
default = kwargs["default_addr"]
else:
default = kwargs.get("default_addr", b"")
self.referenced_blocks[key] = default
self.conversion_type = v4c.CONVERSION_TYPE_TABX
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = nr + 1
self.val_param_nr = nr
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(nr):
self[f"val_{i}"] = kwargs[f"val_{i}"] # type: ignore[literal-required]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_RTABX:
self.referenced_blocks = {}
nr = kwargs["ref_param_nr"] - 1
self.block_len = (nr * 8 * 3) + 88
self.links_nr = nr + 5
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
for i in range(nr):
key = f"text_{i}"
self[key] = 0
self.referenced_blocks[key] = kwargs[key] # type: ignore[literal-required]
self.default_addr = 0
self.default_lower = self.default_upper = 0
if "default_addr" in kwargs:
default = kwargs["default_addr"]
else:
default = kwargs.get("default_addr", b"")
if isinstance(default, bytes) and b"{X}" in default:
formula = default.decode("latin-1").replace("{X}", "X").split('"')[1]
cc_block = ChannelConversion(conversion_type=v4c.CONVERSION_TYPE_ALG, formula=formula)
self.referenced_blocks["default_addr"] = cc_block
else:
self.referenced_blocks["default_addr"] = default
self.conversion_type = v4c.CONVERSION_TYPE_RTABX
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = nr + 1
self.val_param_nr = nr * 2
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(nr):
self[f"lower_{i}"] = kwargs[f"lower_{i}"] # type: ignore[literal-required]
self[f"upper_{i}"] = kwargs[f"upper_{i}"] # type: ignore[literal-required]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_TTAB:
self.block_len = ((kwargs["links_nr"] - 4) * 8 * 2) + 88
self.links_nr = kwargs["links_nr"]
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
for i in range(kwargs["links_nr"] - 4):
self[f"text_{i}"] = kwargs.get(f"text_{i}", 0)
self.conversion_type = v4c.CONVERSION_TYPE_TTAB
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = kwargs["links_nr"] - 4
self.val_param_nr = kwargs["links_nr"] - 4 + 1
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(kwargs["links_nr"] - 4):
self[f"val_{i}"] = kwargs[f"val_{i}"] # type: ignore[literal-required]
self.val_default = kwargs["val_default"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_BITFIELD:
self.referenced_blocks = {}
nr = kwargs["val_param_nr"]
self.block_len = (nr * 8 * 2) + 80
self.links_nr = nr + 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
for i in range(nr):
key = f"text_{i}"
self[key] = 0
self.referenced_blocks[key] = kwargs[key] # type: ignore[literal-required]
self.conversion_type = v4c.CONVERSION_TYPE_BITFIELD
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = nr
self.val_param_nr = nr
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(nr):
self[f"mask_{i}"] = kwargs[f"mask_{i}"] # type: ignore[literal-required]
else:
message = "Conversion {} dynamic creation not implemented"
message = message.format(kwargs["conversion_type"])
logger.exception(message)
raise MdfException(message)
# the inverse conversion is not used (see issue #1017)
self.inv_conv_addr = 0
# use the first unit found in a refereced block (see issue #1205)
if not self.unit:
for block in self.referenced_blocks.values():
if isinstance(block, ChannelConversion) and block.unit:
self.unit = block.unit
break
def to_blocks(
self,
address: int,
blocks: list[bytes | SupportsBytes],
defined_texts: dict[bytes | str, int],
cc_map: dict[bytes | int, int],
) -> int:
if id(self) in cc_map:
return address
text = self.name
if text:
if text in defined_texts:
self.name_addr = defined_texts[text]
else:
tx_block = TextBlock(
text=text.encode("utf-8", "replace"),
meta=False,
safe=True,
)
self.name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.name_addr = 0
text = self.unit
if text:
if text in defined_texts:
self.unit_addr = defined_texts[text]
else:
tx_block = TextBlock(
text=text.encode("utf-8", "replace"),
meta=False,
safe=True,
)
self.unit_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.unit_addr = 0
if self.conversion_type == v4c.CONVERSION_TYPE_ALG:
text = self.formula
if text:
if text in defined_texts:
self.formula_addr = defined_texts[text]
else:
tx_block = TextBlock(
text=text.encode("utf-8", "replace"),
meta=False,
safe=True,
)
self.formula_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.formula_addr = 0
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<CCcomment")
tx_block = TextBlock(
text=text.encode("utf-8", "replace"),
meta=meta,
safe=True,
)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
if self.referenced_blocks:
for key, block in self.referenced_blocks.items():
if block:
if isinstance(block, ChannelConversion):
address = block.to_blocks(address, blocks, defined_texts, cc_map)
self[key] = block.address
else:
if block in defined_texts:
self[key] = defined_texts[block]
else:
tx_block = TextBlock(text=block)
defined_texts[block] = address
blocks.append(tx_block)
self[key] = address
address += tx_block.block_len
else:
self[key] = 0
bts = bytes(self)
if bts in cc_map:
self.address = cc_map[bts]
else:
blocks.append(bts)
self.address = address
cc_map[bts] = cc_map[id(self)] = address
address += self.block_len
return address
@overload
def convert(
self,
values: list[Any] | NDArray[Any],
as_object: bool = ...,
as_bytes: bool = ...,
ignore_value2text_conversions: bool = ...,
) -> NDArray[Any]: ...
@overload
def convert(
self,
values: ArrayLike,
as_object: bool = ...,
as_bytes: bool = ...,
ignore_value2text_conversions: bool = ...,
) -> NDArray[Any] | np.number[Any]: ...
def convert(
self,
values: ArrayLike,
as_object: bool = False,
as_bytes: bool = False,
ignore_value2text_conversions: bool = False,
) -> NDArray[Any] | np.number[Any]:
identical = ChannelConversion(conversion_type=v4c.CONVERSION_TYPE_NON)
scalar = False
if not isinstance(values, np.ndarray):
if isinstance(values, (int, float)):
new_values = np.array([values])
scalar = True
else:
new_values = np.array(values)
else:
new_values = values
values_count = len(new_values)
index: np.intp | int
conversion_type = self.conversion_type
if conversion_type == v4c.CONVERSION_TYPE_NON:
pass
elif conversion_type == v4c.CONVERSION_TYPE_LIN:
a = self.a
b = self.b
if (a, b) != (1, 0):
names = new_values.dtype.names
if names:
name = names[0]
vals = new_values[name]
vals = vals * a
if b:
vals += b
new_values = np.rec.fromarrays(
[vals] + [new_values[name] for name in names[1:]],
dtype=[(name, vals.dtype, vals.shape[1:])]
+ [(name, new_values[name].dtype, new_values[name].shape[1:]) for name in names[1:]],
)
else:
new_values = new_values * a
if b:
new_values += b
elif conversion_type == v4c.CONVERSION_TYPE_RAT:
P1 = self.P1
P2 = self.P2
P3 = self.P3
P4 = self.P4
P5 = self.P5
P6 = self.P6
names = new_values.dtype.names
if names:
name = names[0]
vals = new_values[name]
if (P1, P3, P4, P5) == (0, 0, 0, 0):
if P2 != P6:
vals = vals * (P2 / P6)
elif (P2, P3, P4, P6) == (0, 0, 0, 0):
if P1 != P5:
vals = vals * (P1 / P5)
else:
X = vals
try:
vals = evaluate(v4c.CONV_RAT_TEXT)
except TypeError:
vals = (P1 * X**2 + P2 * X + P3) / (P4 * X**2 + P5 * X + P6)
new_values = np.rec.fromarrays(
[vals] + [new_values[name] for name in names[1:]],
dtype=[(name, vals.dtype, vals.shape[1:])]
+ [(name, new_values[name].dtype, new_values[name].shape[1:]) for name in names[1:]],
)
else:
X = new_values
if (P1, P3, P4, P5) == (0, 0, 0, 0):
if P2 != P6:
new_values = new_values * (P2 / P6)
elif (P2, P3, P4, P6) == (0, 0, 0, 0):
if P1 != P5:
new_values = new_values * (P1 / P5)
else:
try:
new_values = evaluate(v4c.CONV_RAT_TEXT)
except TypeError:
new_values = (P1 * X**2 + P2 * X + P3) / (P4 * X**2 + P5 * X + P6)
elif conversion_type == v4c.CONVERSION_TYPE_ALG:
try:
X = new_values
INF = np.inf
NaN = np.nan
new_values = evaluate(self.formula.replace("X1", "X"))
except:
if lambdify is not None:
X_symbol = symbols("X")
expr = lambdify(
X_symbol,
self.formula.replace("X1", "X"),
modules=[{"INF": np.inf, "NaN": np.nan}, "numpy"],
dummify=False,
cse=True,
)
new_values = expr(new_values)
elif conversion_type in (v4c.CONVERSION_TYPE_TABI, v4c.CONVERSION_TYPE_TAB):
nr = self.val_param_nr // 2
raw_vals = np.array([self[f"raw_{i}"] for i in range(nr)])
phys_vals = np.array([self[f"phys_{i}"] for i in range(nr)])
if conversion_type == v4c.CONVERSION_TYPE_TABI:
new_values = np.interp(new_values, raw_vals, phys_vals)
else:
dim = raw_vals.shape[0]
inds = np.searchsorted(raw_vals, new_values)
inds[inds >= dim] = dim - 1 # type: ignore[index, unused-ignore]
inds2 = inds - 1
inds2[inds2 < 0] = 0 # type: ignore[index, unused-ignore]
cond = np.abs(new_values - raw_vals[inds]) >= np.abs(new_values - raw_vals[inds2])
new_values = np.where(cond, phys_vals[inds2], phys_vals[inds])
elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
nr = (self.val_param_nr - 1) // 3
lower_vals = np.array([self[f"lower_{i}"] for i in range(nr)])
upper_vals = np.array([self[f"upper_{i}"] for i in range(nr)])
phys_vals = np.array([self[f"phys_{i}"] for i in range(nr)])
if new_values.dtype.kind == "f":
idx1 = np.searchsorted(lower_vals, new_values, side="right") - 1
idx2 = np.searchsorted(upper_vals, new_values, side="right")
else:
idx1 = np.searchsorted(lower_vals, new_values, side="right") - 1
idx2 = np.searchsorted(upper_vals, new_values, side="right") - 1
idx_ne = np.nonzero(idx1 != idx2)[0]
idx_eq = np.nonzero(idx1 == idx2)[0]
new_values = np.zeros(len(new_values), dtype=phys_vals.dtype)
if len(idx_ne):
new_values[idx_ne] = self.default
if len(idx_eq):
new_values[idx_eq] = phys_vals[idx1[idx_eq]] # type: ignore[index, unused-ignore]
elif conversion_type == v4c.CONVERSION_TYPE_TABX and values_count >= 150:
if ignore_value2text_conversions:
nr = self.val_param_nr
raw = [self[f"val_{i}"] for i in range(nr)]
phys: list[bytes | ChannelConversion] = []
for i in range(nr):
tx_ref = self.referenced_blocks[f"text_{i}"]
if isinstance(tx_ref, bytes):
phys.append(identical)
else:
phys.append(tx_ref)
pairs = sorted(zip(raw, phys, strict=False))
raw_vals = np.array([e[0] for e in pairs], dtype="<i8")
phys = [e[1] for e in pairs]
ref = self.referenced_blocks["default_addr"]
if ref is None:
ref = identical
if isinstance(ref, bytes):
default: bytes | ChannelConversion = identical
else:
default = ref
else:
self._cache = typing.cast(_ValsCache | None, self._cache)
if self._cache is None or self._cache["type"] != "big":
nr = self.val_param_nr
raw = [self[f"val_{i}"] for i in range(nr)]
phys = [self.referenced_blocks[f"text_{i}"] for i in range(nr)]
pairs = sorted(zip(raw, phys, strict=False))
raw_vals = np.array([e[0] for e in pairs], dtype="<i8")
phys = [e[1] for e in pairs]
self._cache = {"phys": phys, "raw_vals": raw_vals, "type": "big"}
else:
phys = self._cache["phys"]
raw_vals = typing.cast(NDArray[Any], self._cache["raw_vals"])
ref_block = self.referenced_blocks["default_addr"]
if ref_block is None:
default = identical
else:
default = ref_block
names = new_values.dtype.names
if names:
name = names[0]
vals = new_values[name]
shape = vals.shape
vals = vals.ravel()
ret_vals: NDArray[np.object_] = np.full(len(vals), None, np.dtype("O"))
idx1 = np.searchsorted(raw_vals, vals, side="right") - 1
idx2 = np.searchsorted(raw_vals, vals, side="left")
idx = np.argwhere(idx1 != idx2).ravel()
if isinstance(default, bytes):
ret_vals[idx] = default
else:
ret_vals[idx] = default.convert(
vals[idx], ignore_value2text_conversions=ignore_value2text_conversions
)
idx = np.argwhere(idx1 == idx2).ravel()
if idx.size:
indexes = idx1[idx] # type: ignore[index, unused-ignore]
unique = np.unique(indexes)
for index in typing.cast(list[int], unique.tolist()):
item = phys[index]
idx_ = np.argwhere(indexes == index).ravel()
if isinstance(item, bytes):
ret_vals[idx[idx_]] = item
else:
ret_vals[idx[idx_]] = item.convert(
vals[idx[idx_]], ignore_value2text_conversions=ignore_value2text_conversions
)
size = bytes_dtype_size(ret_vals)
if size >= 0:
ret_vals = ret_vals.astype(f"S{size}")
else:
try:
ret_vals = ret_vals.astype("f8")
except:
if as_bytes:
ret_vals = ret_vals.astype(bytes)
elif not as_object:
ret_vals = np.array(
[
np.nan if isinstance(v, bytes) else v
for v in typing.cast(list[object], ret_vals.tolist())
]
)
ret_vals = ret_vals.reshape(shape)
new_values = np.rec.fromarrays(
[ret_vals] + [new_values[name] for name in names[1:]],
dtype=[(name, ret_vals.dtype, ret_vals.shape[1:])]
+ [(name, new_values[name].dtype, new_values[name].shape[1:]) for name in names[1:]],
)
else:
ret_vals = np.full(new_values.size, None, "O")
idx1 = np.searchsorted(raw_vals, new_values, side="right") - 1
idx2 = np.searchsorted(raw_vals, new_values, side="left")
idx = np.argwhere(idx1 != idx2).ravel()
if idx.size:
# some raw values were not found in the conversion table
# so the default physical value must be returned
if isinstance(default, bytes):
ret_vals[idx] = default
else:
ret_vals[idx] = default.convert(
new_values[idx], ignore_value2text_conversions=ignore_value2text_conversions
)
idx = np.argwhere(idx1 == idx2).ravel()
if idx.size:
indexes = idx1[idx] # type: ignore[index, unused-ignore]
if indexes.size <= 300:
index_list = sorted(set(typing.cast(list[int], indexes.tolist())))
else:
index_list = typing.cast(list[int], np.unique(indexes).tolist())
for index in index_list:
item = phys[index]
idx_ = np.argwhere(indexes == index).ravel()
if isinstance(item, bytes):
ret_vals[idx[idx_]] = item
else:
ret_vals[idx[idx_]] = item.convert(
new_values[idx[idx_]], ignore_value2text_conversions=ignore_value2text_conversions
)
else:
# all the raw values are found in the conversion table
if idx1.size:
if idx1.size <= 300:
index_list = sorted(set(typing.cast(list[int], idx1.tolist())))
else:
index_list = typing.cast(list[int], np.unique(idx1).tolist())
for index in index_list:
item = phys[index]
idx_ = np.argwhere(idx1 == index).ravel()
if isinstance(item, bytes):
ret_vals[idx_] = item
else:
ret_vals[idx_] = item.convert(
new_values[idx_], ignore_value2text_conversions=ignore_value2text_conversions
)
size = bytes_dtype_size(ret_vals)
if size >= 0:
ret_vals = ret_vals.astype(f"S{size}")
else:
try:
ret_vals = ret_vals.astype("f8")
except:
if as_bytes:
ret_vals = ret_vals.astype(bytes)
elif not as_object:
ret_vals = np.array([np.nan if isinstance(v, bytes) else v for v in ret_vals.tolist()])
new_values = ret_vals
elif conversion_type == v4c.CONVERSION_TYPE_TABX:
if ignore_value2text_conversions:
nr = self.val_param_nr
raw = [self[f"val_{i}"] for i in range(nr)]
phys = []
for i in range(nr):
tx_ref = self.referenced_blocks[f"text_{i}"]
if isinstance(tx_ref, bytes):
phys.append(identical)
else:
phys.append(tx_ref)
pairs = sorted(zip(raw, phys, strict=False))
raw = [e[0] for e in pairs]
phys = [e[1] for e in pairs]
ref = self.referenced_blocks["default_addr"]
if ref is None:
ref = identical
if isinstance(ref, bytes):
default = identical
else:
default = ref
default_is_bytes = False
else:
self._cache = typing.cast(_ValsCache | None, self._cache)
if self._cache is None or self._cache["type"] != "small":
nr = self.val_param_nr
raw = [self[f"val_{i}"] for i in range(nr)]
phys = [self.referenced_blocks[f"text_{i}"] for i in range(nr)]
pairs = sorted(zip(raw, phys, strict=False))
raw = [e[0] for e in pairs]
phys = [e[1] for e in pairs]
self._cache = {"phys": phys, "raw_vals": raw, "type": "small"}
else:
phys = self._cache["phys"]
raw = typing.cast(list[object], self._cache["raw_vals"])
ref = self.referenced_blocks["default_addr"]
if ref is None:
default = identical
else:
default = ref
default_is_bytes = isinstance(default, bytes)
names = new_values.dtype.names
if names:
name = names[0]
vals = new_values[name]
shape = vals.shape
objects = vals.ravel().tolist()
ret: list[object] = []
all_bytes = True
for v in objects:
try:
v_ = phys[raw.index(v)]
if isinstance(v_, bytes):
ret.append(v_)
else:
v_ = v_.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(v_)
if all_bytes and not isinstance(v_, bytes):
all_bytes = False
except:
if default_is_bytes:
default = typing.cast(bytes, default)
ret.append(default)
else:
default = typing.cast(ChannelConversion, default)
v_ = default.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(v_)
if all_bytes and not isinstance(v_, bytes):
all_bytes = False
if not all_bytes:
try:
ret_vals = np.array(ret, dtype="<f8")
except:
ret_vals = np.array(ret, dtype="O")
if as_bytes:
ret_vals = ret_vals.astype(bytes)
elif not as_object:
ret_vals = np.array([np.nan if isinstance(v, bytes) else v for v in ret_vals])
else:
ret_vals = np.array(ret, dtype=bytes)
ret_vals = ret_vals.reshape(shape)
new_values = np.rec.fromarrays(
[ret_vals] + [new_values[name] for name in names[1:]],
dtype=[(name, ret_vals.dtype, ret_vals.shape[1:])]
+ [(name, new_values[name].dtype, new_values[name].shape[1:]) for name in names[1:]],
)
else:
ret = []
all_bytes = True
objects = typing.cast(list[Any], new_values.tolist())
for v in objects:
try:
v_ = phys[raw.index(v)]
if isinstance(v_, bytes):
ret.append(v_)
else:
v_ = v_.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(v_)
if all_bytes and not isinstance(v_, bytes):
all_bytes = False
except:
if default_is_bytes:
default = typing.cast(bytes, default)
ret.append(default)
else:
default = typing.cast(ChannelConversion, default)
v_ = default.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(v_)
if all_bytes and not isinstance(v_, bytes):
all_bytes = False
if not all_bytes:
try:
ret_vals = np.array(ret, dtype="<f8")
except:
ret_vals = np.array(ret, dtype="O")
if as_bytes:
ret_vals = ret_vals.astype(bytes)
elif not as_object:
ret_vals = np.array([np.nan if isinstance(v, bytes) else v for v in ret_vals.tolist()])
else:
ret_vals = np.array(ret, dtype=bytes)
new_values = ret_vals
elif conversion_type == v4c.CONVERSION_TYPE_RTABX and values_count >= 100:
if ignore_value2text_conversions:
nr = self.val_param_nr // 2
phys = []
for i in range(nr):
tx_ref = self.referenced_blocks[f"text_{i}"]
if isinstance(tx_ref, bytes):
phys.append(identical)
else:
phys.append(tx_ref)
lower = [self[f"lower_{i}"] for i in range(nr)]
upper = [self[f"upper_{i}"] for i in range(nr)]
triplets = sorted(zip(lower, upper, phys, strict=False))
lower_vals = np.array([e[0] for e in triplets], dtype="<i8")
upper_vals = np.array([e[1] for e in triplets], dtype="<i8")
phys = [e[2] for e in triplets]
ref = self.referenced_blocks["default_addr"]
if ref is None:
ref = identical
if isinstance(ref, bytes):
default = identical
else:
default = ref
default_is_bytes = False
else:
self._cache = typing.cast(_BoundCache | None, self._cache)
if self._cache is None or self._cache["type"] != "big":
nr = self.val_param_nr // 2
phys = [self.referenced_blocks[f"text_{i}"] for i in range(nr)]
lower = [self[f"lower_{i}"] for i in range(nr)]
upper = [self[f"upper_{i}"] for i in range(nr)]
triplets = sorted(zip(lower, upper, phys, strict=False))
lower_vals = np.array([e[0] for e in triplets], dtype="<i8")
upper_vals = np.array([e[1] for e in triplets], dtype="<i8")
phys = [e[2] for e in triplets]
self._cache = {
"phys": phys,
"lower": lower_vals,
"upper": upper_vals,
"type": "big",
}
else:
phys = self._cache["phys"]
lower_vals = typing.cast(NDArray[Any], self._cache["lower"])
upper_vals = typing.cast(NDArray[Any], self._cache["upper"])
ref = self.referenced_blocks["default_addr"]
if ref is None:
default = identical
else:
default = ref
ret_vals = np.full(new_values.size, None, "O")
idx1 = np.searchsorted(lower_vals, new_values, side="right") - 1
idx2 = np.searchsorted(upper_vals, new_values, side="left")
idx_ne = np.argwhere(idx1 != idx2).ravel()
idx_eq = np.argwhere(idx1 == idx2).ravel()
if isinstance(default, bytes):
ret_vals[idx_ne] = default
else:
ret_vals[idx_ne] = default.convert(
new_values[idx_ne], ignore_value2text_conversions=ignore_value2text_conversions
)
if idx_eq.size:
indexes = idx1[idx_eq] # type: ignore[index, unused-ignore]
unique = np.unique(indexes)
for index in unique:
item = phys[index]
idx_ = np.argwhere(indexes == index).ravel()
if isinstance(item, bytes):
ret_vals[idx_eq[idx_]] = item
else:
try:
ret_vals[idx_eq[idx_]] = item.convert(
new_values[idx_eq[idx_]], ignore_value2text_conversions=ignore_value2text_conversions
)
except:
raise
size = bytes_dtype_size(ret_vals)
if size >= 0:
ret_vals = ret_vals.astype(f"S{size}")
else:
try:
ret_vals = ret_vals.astype("f8")
except:
if as_bytes:
ret_vals = ret_vals.astype(bytes)
elif not as_object:
ret_vals = np.array([np.nan if isinstance(v, bytes) else v for v in ret_vals.tolist()])
new_values = ret_vals
elif conversion_type == v4c.CONVERSION_TYPE_RTABX:
if ignore_value2text_conversions:
nr = self.val_param_nr // 2
phys = []
for i in range(nr):
tx_ref = self.referenced_blocks[f"text_{i}"]
if isinstance(tx_ref, bytes):
phys.append(identical)
else:
phys.append(tx_ref)
lower = [self[f"lower_{i}"] for i in range(nr)]
upper = [self[f"upper_{i}"] for i in range(nr)]
triplets = sorted(zip(lower, upper, phys, strict=False))
lower = [e[0] for e in triplets]
upper = [e[1] for e in triplets]
phys = [e[2] for e in triplets]
ref = self.referenced_blocks["default_addr"]
if ref is None:
ref = identical
if isinstance(ref, bytes):
default = identical
else:
default = ref
default_is_bytes = False
else:
self._cache = typing.cast(_BoundCache | None, self._cache)
if self._cache is None or self._cache["type"] != "small":
nr = self.val_param_nr // 2
phys = [self.referenced_blocks[f"text_{i}"] for i in range(nr)]
lower = [self[f"lower_{i}"] for i in range(nr)]
upper = [self[f"upper_{i}"] for i in range(nr)]
triplets = sorted(zip(lower, upper, phys, strict=False))
lower = [e[0] for e in triplets]
upper = [e[1] for e in triplets]
phys = [e[2] for e in triplets]
self._cache = {
"phys": phys,
"lower": lower,
"upper": upper,
"type": "small",
}
else:
phys = self._cache["phys"]
lower = typing.cast(list[object], self._cache["lower"])
upper = typing.cast(list[object], self._cache["upper"])
ref = self.referenced_blocks["default_addr"]
if ref is None:
default = identical
else:
default = ref
default_is_bytes = isinstance(default, bytes)
ret = []
objects = typing.cast(list[Any], new_values.tolist())
all_bytes = True
if new_values.dtype.kind in "ui":
for v in objects:
for l, u, p in zip(lower, upper, phys, strict=False):
if l <= v <= u:
if isinstance(p, bytes):
ret.append(p)
else:
p = p.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(p)
if all_bytes and not isinstance(p, bytes):
all_bytes = False
break
else:
if default_is_bytes:
default = typing.cast(bytes, default)
ret.append(default)
else:
default = typing.cast(ChannelConversion, default)
v_ = default.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(v_)
if all_bytes and not isinstance(v_, bytes):
all_bytes = False
else:
for v in objects:
for l, u, p in zip(lower, upper, phys, strict=False):
if l <= v < u:
if isinstance(p, bytes):
ret.append(p)
else:
p = p.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(p)
if all_bytes and not isinstance(p, bytes):
all_bytes = False
break
else:
if default_is_bytes:
default = typing.cast(bytes, default)
ret.append(default)
else:
default = typing.cast(ChannelConversion, default)
v_ = default.convert([v], ignore_value2text_conversions=ignore_value2text_conversions)[0]
ret.append(v_)
if all_bytes and not isinstance(v_, bytes):
all_bytes = False
if not all_bytes:
try:
ret_vals = np.array(ret, dtype="f8")
except:
ret_vals = np.array(ret, dtype="O")
if as_bytes:
ret_vals = ret_vals.astype(bytes)
elif not as_object:
ret_vals = np.array([np.nan if isinstance(v, bytes) else v for v in ret_vals.tolist()])
else:
ret_vals = np.array(ret, dtype=bytes)
new_values = ret_vals
elif conversion_type == v4c.CONVERSION_TYPE_TTAB:
nr = self.val_param_nr - 1
phys_2 = [self.referenced_blocks[f"text_{i}"] for i in range(nr)]
raw = [self[f"val_{i}"] for i in range(nr)]
val_default = self.val_default
ret = []
for val in new_values:
try:
obj = raw[phys_2.index(val)]
except ValueError:
obj = val_default
ret.append(obj)
new_values = np.array(ret)
elif conversion_type == v4c.CONVERSION_TYPE_TRANS:
if not ignore_value2text_conversions:
nr = (self.ref_param_nr - 1) // 2
in_ = [self.referenced_blocks[f"input_{i}_addr"] for i in range(nr)]
out_ = [self.referenced_blocks[f"output_{i}_addr"] for i in range(nr)]
default_addr = self.referenced_blocks["default_addr"]
ret = []
for val in new_values:
try:
obj = out_[in_.index(val.strip(b"\0"))]
except ValueError:
obj = default_addr
ret.append(obj)
new_values = np.array(ret)
elif conversion_type == v4c.CONVERSION_TYPE_BITFIELD:
if not ignore_value2text_conversions:
nr = self.val_param_nr
ref_blocks = [self.referenced_blocks[f"text_{i}"] for i in range(nr)]
masks = np.array([self[f"mask_{i}"] for i in range(nr)], dtype="u8")
block_or_cc_list = [
(
block
if isinstance(block, bytes)
else ((f"{block.name}=".encode(), block) if block.name else (b"", block))
)
for block in ref_blocks
]
bytes_values: list[bytes] = []
values_as_int = new_values.astype("u8")
non_int = new_values != values_as_int
for val in typing.cast(list[int], values_as_int.tolist()):
new_val: list[bytes] = []
masked_values = typing.cast(list[int], (masks & val).tolist())
for on, conv in zip(masked_values, block_or_cc_list, strict=False):
if isinstance(conv, bytes):
if conv:
new_val.append(conv)
else:
prefix, conversion = conv
converted_val = conversion.convert(
[on], ignore_value2text_conversions=ignore_value2text_conversions
)[0]
if converted_val:
if prefix:
new_val.append(prefix + converted_val)
else:
new_val.append(converted_val)
elif prefix:
new_val.append(prefix)
bytes_values.append(b"|".join(new_val))
new_values = np.array(bytes_values, dtype=bytes)
new_values[non_int] = b""
if scalar:
return typing.cast(np.number[Any], new_values[0])
else:
return new_values
def metadata(self, indent: str = "") -> str:
keys: tuple[str, ...]
if self.conversion_type == v4c.CONVERSION_TYPE_NON:
keys = v4c.KEYS_CONVERSION_NONE
elif self.conversion_type == v4c.CONVERSION_TYPE_LIN:
keys = v4c.KEYS_CONVERSION_LINEAR
elif self.conversion_type == v4c.CONVERSION_TYPE_RAT:
keys = v4c.KEYS_CONVERSION_RAT
elif self.conversion_type == v4c.CONVERSION_TYPE_ALG:
keys = v4c.KEYS_CONVERSION_ALGEBRAIC
elif self.conversion_type in (
v4c.CONVERSION_TYPE_TABI,
v4c.CONVERSION_TYPE_TAB,
):
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 2):
keys += (f"raw_{i}", f"phys_{i}")
elif self.conversion_type == v4c.CONVERSION_TYPE_RTAB:
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 3):
keys += (f"lower_{i}", f"upper_{i}", f"phys_{i}")
keys += ("default",)
elif self.conversion_type == v4c.CONVERSION_TYPE_TABX:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr))
elif self.conversion_type == v4c.CONVERSION_TYPE_RTABX:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
for i in range(self.val_param_nr // 2):
keys += (f"lower_{i}", f"upper_{i}")
elif self.conversion_type == v4c.CONVERSION_TYPE_TTAB:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4))
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
keys += ("val_default",)
elif self.conversion_type == v4c.CONVERSION_TYPE_TRANS:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
for i in range((self.links_nr - 4 - 1) // 2):
keys += (f"input_{i}_addr", f"output_{i}_addr")
keys += (
"default_addr",
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
elif self.conversion_type == v4c.CONVERSION_TYPE_BITFIELD:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4))
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"mask_{i}" for i in range(self.val_param_nr))
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata: list[str] = []
lines = f"""
name: {self.name}
unit: {self.unit}
address: {hex(self.address)}
comment: {self.comment}
formula: {self.formula}
""".split("\n")
for key in keys:
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, val))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
if key == "conversion_type":
lines[-1] += f" = {v4c.CONVERSION_TYPE_TO_STRING[self.conversion_type]}"
elif self.referenced_blocks and key in self.referenced_blocks:
val = self.referenced_blocks[key]
if isinstance(val, bytes):
lines[-1] += f" (= {str(val)[1:]})"
else:
lines[-1] += f" (= CCBLOCK @ {hex(val.address)})"
if self.referenced_blocks:
max_len = max(len(key) for key in self.referenced_blocks)
template = f"{{: <{max_len}}}: {{}}"
lines.append("")
lines.append("Referenced blocks:")
for key, block in self.referenced_blocks.items():
if isinstance(block, ChannelConversion):
lines.append(template.format(key, ""))
lines.extend(block.metadata(indent + " ").split("\n"))
else:
lines.append(template.format(key, str(block)[1:]))
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, initial_indent=indent, subsequent_indent=indent, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __contains__(self, item: str) -> bool:
return hasattr(self, item)
def __bytes__(self) -> bytes:
keys: tuple[str, ...]
if self.conversion_type == v4c.CONVERSION_TYPE_NON:
result = v4c.CONVERSION_NONE_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
)
elif self.conversion_type == v4c.CONVERSION_TYPE_LIN:
result = v4c.CONVERSION_LINEAR_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.b,
self.a,
)
elif self.conversion_type == v4c.CONVERSION_TYPE_RAT:
result = v4c.CONVERSION_RAT_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.P1,
self.P2,
self.P3,
self.P4,
self.P5,
self.P6,
)
elif self.conversion_type == v4c.CONVERSION_TYPE_ALG:
result = v4c.CONVERSION_ALGEBRAIC_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.formula_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
)
elif self.conversion_type in (
v4c.CONVERSION_TYPE_TABI,
v4c.CONVERSION_TYPE_TAB,
):
fmt = f"<4sI{self.links_nr + 2}Q2B3H{self.val_param_nr + 2}d"
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 2):
keys += (f"raw_{i}", f"phys_{i}")
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_RTAB:
fmt = f"<4sI{self.links_nr + 2}Q2B3H{self.val_param_nr + 2}d"
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 3):
keys += (f"lower_{i}", f"upper_{i}", f"phys_{i}")
keys += ("default",)
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_TABX:
fmt = f"<4sI{self.links_nr + 2}Q2B3H{self.val_param_nr + 2}d"
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr))
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_RTABX:
fmt = f"<4sI{self.links_nr + 2}Q2B3H{self.val_param_nr + 2}d"
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
for i in range(self.val_param_nr // 2):
keys += (f"lower_{i}", f"upper_{i}")
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_TTAB:
fmt = f"<4sI{self.links_nr + 2}Q2B3H{self.val_param_nr + 2}d"
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4))
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
keys += ("val_default",)
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_TRANS:
fmt = f"<4sI{self.links_nr + 2}Q2B3H{self.val_param_nr + 2}d"
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
for i in range((self.links_nr - 4 - 1) // 2):
keys += (f"input_{i}_addr", f"output_{i}_addr")
keys += (
"default_addr",
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_BITFIELD:
fmt = f"<4sI{self.links_nr + 2}Q2B3H2d{self.val_param_nr}Q"
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.val_param_nr))
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"mask_{i}" for i in range(self.val_param_nr))
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
def __str__(self) -> str:
return f"<ChannelConversion (name: {self.name}, unit: {self.unit}, comment: {self.comment}, formula: {self.formula}, referenced blocks: {self.referenced_blocks}, address: {self.address}, fields: {block_fields(self)})>"
class DataBlockKwargs(BlockKwargs, total=False):
data: bytes | bytearray
type: Literal["DT", "SD", "RD", "DV", "DI"]
file_limit: int | float
[docs]
class DataBlock:
"""Common implementation for DTBLOCK/RDBLOCK/SDBLOCK/DVBLOCK/DIBLOCK.
`DataBlock` has the following attributes, which are also available as
dict-like key-value pairs.
DTBLOCK fields:
* ``id`` - bytes : block ID; b'##DT' for DTBLOCK, b'##RD' for RDBLOCK,
b'##SD' for SDBLOCK, b'##DV' for DVBLOCK or b'##DI' for DIBLOCK
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``data`` - bytes : raw samples
Other attributes:
* ``address`` - int : data block address
Parameters
----------
address : int
DTBLOCK/RDBLOCK/SDBLOCK/DVBLOCK/DIBLOCK address inside the file.
stream : int
File handle.
reduction : bool
Sample reduction data block.
"""
__slots__ = ("address", "block_len", "data", "id", "links_nr", "reserved0")
def __init__(self, **kwargs: Unpack[DataBlockKwargs]) -> None:
self.data: bytes | bytearray
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
file_limit = kwargs["file_limit"]
if address + COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if mapped:
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(stream, address)
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if self.id not in (b"##DT", b"##RD", b"##SD", b"##DV", b"##DI"):
message = f'Expected "##DT", "##DV", "##DI", "##RD" or "##SD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.data = stream[address + COMMON_SIZE : address + self.block_len]
else:
stream.seek(address)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_u(stream.read(COMMON_SIZE))
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if self.id not in (b"##DT", b"##RD", b"##SD", b"##DV", b"##DI"):
message = f'Expected "##DT", "##DV", "##DI", "##RD" or "##SD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.data = stream.read(self.block_len - COMMON_SIZE)
except KeyError:
self.address = 0
type = kwargs.get("type", "DT")
if type not in ("DT", "SD", "RD", "DV", "DI"):
type = "DT"
self.id = f"##{type}".encode("ascii")
self.reserved0 = 0
self.block_len = len(kwargs.get("data", b"")) + COMMON_SIZE
self.links_nr = 0
self.data = kwargs.get("data", b"")
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
return v4c.COMMON_p(self.id, self.reserved0, self.block_len, self.links_nr) + self.data
class DataZippedBlockKwargs(BlockKwargs, total=False):
compression_level: int
data: bytes | bytearray
original_type: bytes
zip_type: int
param: int
transposed: bool
file_limit: int | float
class DataZippedBlock:
"""`DataZippedBlock` has the following attributes, which are also available
as dict-like key-value pairs.
DZBLOCK fields:
* ``id`` - bytes : block ID; always b'##DZ'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``original_type`` - bytes : b'DT', b'SD', b'DI' or b'DV'
* ``zip_type`` - int : zip algorithm used
* ``reserved1`` - int : reserved bytes
* ``param`` - int : for transpose deflate the record size used for
transposition
* ``original_size`` - int : size of the original uncompressed raw bytes
* ``zip_size`` - int : size of compressed bytes
* ``data`` - bytes : compressed bytes
Other attributes:
* ``address`` - int : data zipped block address
* ``return_unzipped`` - bool : decompress data when accessing the 'data'
key
Parameters
----------
address : int
DTBLOCK address inside the file.
stream : int
File handle.
"""
__slots__ = (
"_prevent_data_setitem",
"_transposed",
"address",
"block_len",
"compression_level",
"data",
"id",
"links_nr",
"original_size",
"original_type",
"param",
"reserved0",
"reserved1",
"return_unzipped",
"zip_size",
"zip_type",
)
def __init__(self, **kwargs: Unpack[DataZippedBlockKwargs]) -> None:
self.data: bytes | bytearray
self._prevent_data_setitem = True
self._transposed = False
self.compression_level = kwargs.get("compression_level", COMPRESSION_LEVEL)
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
file_limit = kwargs["file_limit"]
if address + v4c.DZ_COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.original_type,
self.zip_type,
self.reserved1,
self.param,
self.original_size,
self.zip_size,
) = typing.cast(v4c.DzCommon, unpack(v4c.FMT_DZ_COMMON, stream.read(v4c.DZ_COMMON_SIZE)))
if self.id != b"##DZ":
message = f'Expected "##DZ" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
self.data = stream.read(self.zip_size)
except KeyError:
self._prevent_data_setitem = False
self.address = 0
data = kwargs.get("data", b"")
self.id = b"##DZ"
self.reserved0 = 0
self.block_len = 0
self.links_nr = 0
self.original_type = kwargs.get("original_type", b"DT")
self.zip_type = kwargs.get("zip_type", v4c.FLAG_DZ_DEFLATE)
self.reserved1 = 0
if self.zip_type in (v4c.FLAG_DZ_DEFLATE, v4c.FLAG_DZ_ZSTD, v4c.FLAG_DZ_LZ4):
self.param = 0
else:
self.param = kwargs["param"]
self._transposed = kwargs.get("transposed", False)
self.data = data
self._prevent_data_setitem = False
self.return_unzipped = True
def __setattr__(self, item: str, value: object) -> None:
if item == "data" and not self._prevent_data_setitem:
data = typing.cast(bytes | bytearray, value)
original_size = len(data)
self.original_size = original_size
compress_func: Callable[[Buffer, int], bytes]
if self.zip_type in (v4c.FLAG_DZ_DEFLATE, v4c.FLAG_DZ_TRANSPOSED_DEFLATE):
compress_func = compress
elif self.zip_type in (v4c.FLAG_DZ_LZ4, v4c.FLAG_DZ_TRANSPOSED_LZ4):
compress_func = lz_compress
elif self.zip_type in (v4c.FLAG_DZ_ZSTD, v4c.FLAG_DZ_TRANSPOSED_ZSTD):
compress_func = zstd_compress
data = bytes(data) # must be a read only buffer
if self.zip_type in (v4c.FLAG_DZ_DEFLATE, v4c.FLAG_DZ_LZ4, v4c.FLAG_DZ_ZSTD):
data = compress_func(data, self.compression_level)
else:
if not self._transposed:
cols = self.param
lines = original_size // cols
if lines * cols < original_size:
data_view = memoryview(data)
data = (
np.frombuffer(data_view[: lines * cols], dtype="B")
.reshape((lines, cols))
.T.ravel()
.tobytes()
) + data_view[lines * cols :]
else:
data = np.frombuffer(data, dtype=np.uint8).reshape((lines, cols)).T.ravel().tobytes()
data = compress_func(data, self.compression_level)
zipped_size = len(data)
self.zip_size = zipped_size
self.block_len = zipped_size + v4c.DZ_COMMON_SIZE
DataZippedBlock.__dict__[item].__set__(self, data)
DataZippedBlock.__dict__["_transposed"].__set__(self, False)
else:
DataZippedBlock.__dict__[item].__set__(self, value)
DataZippedBlock.__dict__["_transposed"].__set__(self, False)
def __getattribute__(self, item: str) -> object:
if item == "data":
if self.return_unzipped:
data = DataZippedBlock.__dict__[item].__get__(self)
original_size = self.original_size
if self.zip_type in (v4c.FLAG_DZ_DEFLATE, v4c.FLAG_DZ_TRANSPOSED_DEFLATE):
data = decompress(data, bufsize=original_size)
elif self.zip_type in (v4c.FLAG_DZ_LZ4, v4c.FLAG_DZ_TRANSPOSED_LZ4):
data = lz_decompress(data)
elif self.zip_type in (v4c.FLAG_DZ_ZSTD, v4c.FLAG_DZ_TRANSPOSED_ZSTD):
data = zstd_decompress(data)
if self.zip_type == v4c.FLAG_DZ_TRANSPOSED_DEFLATE:
cols = self.param
lines = original_size // cols
if lines * cols < original_size:
data = memoryview(data)
data = (
np.frombuffer(data[: lines * cols], dtype=np.uint8)
.reshape((cols, lines))
.T.ravel()
.tobytes()
) + data[lines * cols :]
else:
data = np.frombuffer(data, dtype=np.uint8).reshape((cols, lines)).T.ravel().tobytes()
else:
data = DataZippedBlock.__dict__[item].__get__(self)
value = data
else:
value = DataZippedBlock.__dict__[item].__get__(self)
return value
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __str__(self) -> str:
return f"""<DZBLOCK (address: {hex(self.address)}, original_size: {self.original_size}, zipped_size: {self.zip_size}, algo: {self.zip_type})>"""
def __bytes__(self) -> bytes:
self.return_unzipped = False
data = (
v4c.DZ_COMMON_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.original_type,
self.zip_type,
self.reserved1,
self.param,
self.original_size,
self.zip_size,
)
+ self.data
)
self.return_unzipped = True
return data
class DataGroupKwargs(BlockKwargs, total=False):
id: bytes
reserved0: int
block_len: int
links_nr: int
next_dg_addr: int
first_cg_addr: int
data_block_addr: int
comment_addr: int
record_id_len: int
reserved1: bytes
file_limit: int | float
[docs]
class DataGroup:
"""`DataGroup` has the following attributes, which are also available as
dict-like key-value pairs.
DGBLOCK fields:
* ``id`` - bytes : block ID; always b'##DG'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_dg_addr`` - int : address of next data group block
* ``first_cg_addr`` - int : address of first channel group for this data
group
* ``data_block_addr`` - int : address of DTBLOCK, DZBLOCK, DLBLOCK or
HLBLOCK that contains the raw samples for this data group
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
data group comment
* ``record_id_len`` - int : size of record ID used in case of unsorted
data groups; can be 1, 2, 4 or 8
* ``reserved1`` - int : reserved bytes
Other attributes:
* ``address`` - int : data group address
* ``comment`` - str : data group comment
"""
__slots__ = (
"address",
"block_len",
"comment",
"comment_addr",
"data_block_addr",
"first_cg_addr",
"id",
"links_nr",
"next_dg_addr",
"record_id_len",
"reserved0",
"reserved1",
)
def __init__(self, **kwargs: Unpack[DataGroupKwargs]) -> None:
self.comment = ""
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
file_limit = kwargs["file_limit"]
if address + v4c.DG_BLOCK_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
if mapped:
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_dg_addr,
self.first_cg_addr,
self.data_block_addr,
self.comment_addr,
self.record_id_len,
self.reserved1,
) = v4c.DATA_GROUP_uf(stream, address)
else:
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_dg_addr,
self.first_cg_addr,
self.data_block_addr,
self.comment_addr,
self.record_id_len,
self.reserved1,
) = v4c.DATA_GROUP_u(stream.read(v4c.DG_BLOCK_SIZE))
if self.id != b"##DG":
message = f'Expected "##DG" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped, file_limit=file_limit)
except KeyError:
self.address = 0
self.id = b"##DG"
self.reserved0 = kwargs.get("reserved0", 0)
self.block_len = kwargs.get("block_len", v4c.DG_BLOCK_SIZE)
self.links_nr = kwargs.get("links_nr", 4)
self.next_dg_addr = kwargs.get("next_dg_addr", 0)
self.first_cg_addr = kwargs.get("first_cg_addr", 0)
self.data_block_addr = kwargs.get("data_block_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.record_id_len = kwargs.get("record_id_len", 0)
self.reserved1 = kwargs.get("reserved1", b"\00" * 7)
def copy(self) -> "DataGroup":
dg = DataGroup(
id=self.id,
reserved0=self.reserved0,
block_len=self.block_len,
links_nr=self.links_nr,
next_dg_addr=self.next_dg_addr,
first_cg_addr=self.first_cg_addr,
data_block_addr=self.data_block_addr,
comment_addr=self.comment_addr,
record_id_len=self.record_id_len,
reserved1=self.reserved1,
)
dg.comment = self.comment
dg.address = self.address
return dg
def to_blocks(
self, address: int, blocks: list[bytes | SupportsBytes], defined_texts: dict[bytes | str, int]
) -> int:
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<DGcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
result = v4c.DATA_GROUP_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_dg_addr,
self.first_cg_addr,
self.data_block_addr,
self.comment_addr,
self.record_id_len,
self.reserved1,
)
return result
class _DataListBase:
__slots__ = (
"address",
"block_len",
"data_block_len",
"data_block_nr",
"flags",
"id",
"links_nr",
"next_dl_addr",
"reserved0",
"reserved1",
)
class DataListKwargs(BlockKwargs, total=False):
links_nr: int
file_limit: int
flags: int
reserved1: bytes
data_block_nr: int
data_block_len: int
[docs]
class DataList(_DataListBase):
"""`DataList` has the following attributes, which are also available as
dict-like key-value pairs.
DLBLOCK common fields:
* ``id`` - bytes : block ID; always b'##DL'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_dl_addr`` - int : address of next DLBLOCK
* ``data_block_addr<N>`` - int : address of N-th data block
* ``flags`` - int : data list flags
* ``reserved1`` - int : reserved bytes
* ``data_block_nr`` - int : number of data blocks referenced by this list
DLBLOCK specific fields:
* for equal-length blocks
* ``data_block_len`` - int : equal uncompressed size in bytes for all
referenced data blocks; last block can be smaller
* for variable-length blocks
* ``offset_<N>`` - int : byte offset of N-th data block
Other attributes:
* ``address`` - int : data list address
"""
def __init__(self, **kwargs: Unpack[DataListKwargs]) -> None:
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
file_limit = kwargs.get("file_limit", float("inf"))
if mapped:
if self.address + COMMON_SIZE > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_dl_addr = 0
self.data_block_nr = 0
return
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(stream, address)
if self.id != b"##DL":
message = f'Expected "##DL" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
if self.address + self.block_len > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_dl_addr = 0
self.data_block_nr = 0
return
address += COMMON_SIZE
links: tuple[int, ...] = unpack_from(f"<{self.links_nr}Q", stream, address)
self.next_dl_addr = links[0]
for i, addr in enumerate(links[1:]):
setattr(self, f"data_block_addr{i}", addr)
stream.seek(address + self.links_nr * 8)
self.flags = stream.read_byte()
if self.flags & v4c.FLAG_DL_EQUAL_LENGHT:
self.reserved1, self.data_block_nr, self.data_block_len = typing.cast(
tuple[bytes, int, int], unpack("<3sIQ", stream.read(15))
)
else:
self.reserved1, self.data_block_nr = typing.cast(tuple[bytes, int], unpack("<3sI", stream.read(7)))
offsets: tuple[int, ...] = unpack(
f"<{self.links_nr - 1}Q",
stream.read((self.links_nr - 1) * 8),
)
for i, offset in enumerate(offsets):
setattr(self, f"offset_{i}", offset)
else:
if self.address + COMMON_SIZE > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_dl_addr = 0
self.data_block_nr = 0
return
stream.seek(address)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_u(stream.read(COMMON_SIZE))
if self.address + self.block_len > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_dl_addr = 0
self.data_block_nr = 0
return
if self.id != b"##DL":
message = f'Expected "##DL" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
links = unpack(f"<{self.links_nr}Q", stream.read(self.links_nr * 8))
self.next_dl_addr = links[0]
for i, addr in enumerate(links[1:]):
self[f"data_block_addr{i}"] = addr
self.flags = stream.read(1)[0]
if self.flags & v4c.FLAG_DL_EQUAL_LENGHT:
self.reserved1, self.data_block_nr, self.data_block_len = typing.cast(
tuple[bytes, int, int], unpack("<3sIQ", stream.read(15))
)
else:
self.reserved1, self.data_block_nr = typing.cast(tuple[bytes, int], unpack("<3sI", stream.read(7)))
offsets = unpack(
f"<{self.links_nr - 1}Q",
stream.read((self.links_nr - 1) * 8),
)
for i, offset in enumerate(offsets):
self[f"offset_{i}"] = offset
except KeyError:
self.address = 0
self.id = b"##DL"
self.reserved0 = 0
self.block_len = 40 + 8 * kwargs.get("links_nr", 2)
self.links_nr = kwargs.get("links_nr", 2)
self.next_dl_addr = 0
for i in range(self.links_nr - 1):
self[f"data_block_addr{i}"] = kwargs.get(f"data_block_addr{i}", 0)
self.flags = kwargs.get("flags", 1)
self.reserved1 = kwargs.get("reserved1", b"\0\0\0")
self.data_block_nr = kwargs.get("data_block_nr", self.links_nr - 1)
if self.flags & v4c.FLAG_DL_EQUAL_LENGHT:
self.data_block_len = kwargs["data_block_len"]
else:
self.block_len = 40 + 8 * kwargs.get("links_nr", 2) - 8 + 8 * self.data_block_nr
for i in range(self.data_block_nr):
self[f"offset_{i}"] = kwargs[f"offset_{i}"] # type: ignore[literal-required]
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
keys: tuple[str, ...] = ("id", "reserved0", "block_len", "links_nr", "next_dl_addr")
keys += tuple(f"data_block_addr{i}" for i in range(self.links_nr - 1))
if self.flags & v4c.FLAG_DL_EQUAL_LENGHT:
keys += ("flags", "reserved1", "data_block_nr", "data_block_len")
fmt = v4c.FMT_DATA_EQUAL_LIST.format(self.links_nr)
else:
keys += ("flags", "reserved1", "data_block_nr")
keys += tuple(f"offset_{i}" for i in range(self.data_block_nr))
fmt = v4c.FMT_DATA_LIST.format(self.links_nr, self.data_block_nr)
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
class _EventBlockBase:
__slots__ = (
"address",
"attachment_nr",
"block_len",
"cause",
"comment",
"comment_addr",
"creator_index",
"event_type",
"flags",
"group_name",
"group_name_addr",
"id",
"links_nr",
"name",
"name_addr",
"next_ev_addr",
"parent",
"parent_ev_addr",
"range_start",
"range_start_ev_addr",
"range_type",
"reserved0",
"reserved1",
"scope_nr",
"scopes",
"sync_base",
"sync_factor",
"sync_type",
"tool",
)
class EventBlockKwargs(BlockKwargs, total=False):
next_ev_addr: int
parent_ev_addr: int
range_start_ev_addr: int
name_addr: int
comment_addr: int
event_type: int
sync_type: int
range_type: int
cause: int
flags: int
sync_base: int
sync_factor: float
group_name_addr: int
file_limit: int | float
[docs]
class EventBlock(_EventBlockBase):
"""`EventBlock` has the following attributes, which are also available as
dict-like key-value pairs.
EVBLOCK fields:
* ``id`` - bytes : block ID; always b'##EV'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_ev_addr`` - int : address of next EVBLOCK
* ``parent_ev_addr`` - int : address of parent EVLBOCK
* ``range_start_ev_addr`` - int : address of EVBLOCK that is the start of
the range for which this event is the end
* ``name_addr`` - int : address of TXBLOCK that contains the event name
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
event comment
* ``scope_<N>_addr`` - int : address of N-th block that represents a scope
for this event (can be CGBLOCK, CHBLOCK, DGBLOCK)
* ``attachemnt_<N>_addr`` - int : address of N-th attachment referenced by
this event
* ``event_type`` - int : integer code for event type
* ``sync_type`` - int : integer code for event sync type
* ``range_type`` - int : integer code for event range type
* ``cause`` - int : integer code for event cause
* ``flags`` - int : event flags
* ``reserved1`` - int : reserved bytes
* ``scope_nr`` - int : number of scopes referenced by this event
* ``attachment_nr`` - int : number of attachments referenced by this event
* ``creator_index`` - int : index of FHBLOCK
* ``sync_base`` - int : timestamp base value
* ``sync_factor`` - float : timestamp factor
Other attributes:
* ``address`` - int : event block address
* ``comment`` - str : event comment
* ``name`` - str : event name
* ``parent`` - int : index of event block that is the parent for the
current event
* ``range_start`` - int : index of event block that is the start of the
range for which the current event is the end
* ``scopes`` - list : list of (group index, channel index) or channel group
index that define the scope of the current event
"""
def __init__(self, **kwargs: Unpack[EventBlockKwargs]) -> None:
self.name = self.comment = self.group_name = ""
self.scopes: list[tuple[int, int] | int] = []
self.parent: int | None = None
self.range_start = None
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
file_limit = kwargs["file_limit"]
if address + COMMON_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
stream.seek(address)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_u(stream.read(COMMON_SIZE))
if address + self.block_len > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
block = stream.read(self.block_len - COMMON_SIZE)
links_nr = self.links_nr
links: tuple[int, ...] = unpack_from(f"<{links_nr}Q", block)
params: v4c.EventParams = unpack_from(v4c.FMT_EVENT_PARAMS, block, links_nr * 8)
(
self.next_ev_addr,
self.parent_ev_addr,
self.range_start_ev_addr,
self.name_addr,
self.comment_addr,
) = links[:5]
scope_nr = params[6]
for i in range(scope_nr):
self[f"scope_{i}_addr"] = links[5 + i]
attachment_nr = params[7]
for i in range(attachment_nr):
self[f"attachment_{i}_addr"] = links[5 + scope_nr + i]
(
self.event_type,
self.sync_type,
self.range_type,
self.cause,
self.flags,
self.reserved1,
self.scope_nr,
self.attachment_nr,
self.creator_index,
self.sync_base,
self.sync_factor,
) = params
if self.flags & v4c.FLAG_EV_GROUP_NAME:
self.group_name_addr = links[-1]
if self.id != b"##EV":
message = f'Expected "##EV" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.name = get_text_v4(self.name_addr, stream, file_limit=file_limit)
self.comment = get_text_v4(self.comment_addr, stream, file_limit=file_limit)
except KeyError:
self.address = 0
scopes = 0
while f"scope_{scopes}_addr" in kwargs:
scopes += 1
self.id = b"##EV"
self.reserved0 = 0
self.block_len = 56 + (scopes + 5) * 8
self.links_nr = scopes + 5
self.next_ev_addr = kwargs.get("next_ev_addr", 0)
self.parent_ev_addr = kwargs.get("parent_ev_addr", 0)
self.range_start_ev_addr = kwargs.get("range_start_ev_addr", 0)
self.name_addr = kwargs.get("name_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
for i in range(scopes):
self[f"scope_{i}_addr"] = kwargs[f"scope_{i}_addr"] # type: ignore[literal-required]
self.event_type = kwargs.get("event_type", v4c.EVENT_TYPE_TRIGGER)
self.sync_type = kwargs.get("sync_type", v4c.EVENT_SYNC_TYPE_S)
self.range_type = kwargs.get("range_type", v4c.EVENT_RANGE_TYPE_POINT)
self.cause = kwargs.get("cause", v4c.EVENT_CAUSE_TOOL)
self.flags = kwargs.get("flags", v4c.FLAG_EV_POST_PROCESSING)
self.reserved1 = b"\x00\x00\x00"
self.scope_nr = scopes
self.attachment_nr = 0
self.creator_index = 0
self.sync_base = kwargs.get("sync_base", 1)
self.sync_factor = kwargs.get("sync_factor", 1.0)
if self.flags & v4c.FLAG_EV_GROUP_NAME:
self.group_name_addr = kwargs.get("group_name_addr", 0)
self.tool = extract_ev_tool(self.comment)
def update_references(self, ch_map: dict[int, tuple[int, int]], cg_map: dict[int, int]) -> None:
self.scopes.clear()
for i in range(self.scope_nr):
addr = typing.cast(int, self[f"scope_{i}_addr"])
if addr in ch_map:
self.scopes.append(ch_map[addr])
elif addr in cg_map:
self.scopes.append(cg_map[addr])
else:
message = "{} is not a valid CNBLOCK or CGBLOCK address for the event scope"
message = message.format(hex(addr))
logger.exception(message)
raise MdfException(message)
def __bytes__(self) -> bytes:
fmt = v4c.FMT_EVENT.format(self.links_nr)
keys: tuple[str, ...] = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_ev_addr",
"parent_ev_addr",
"range_start_ev_addr",
"name_addr",
"comment_addr",
)
keys += tuple(f"scope_{i}_addr" for i in range(self.scope_nr))
keys += tuple(f"attachment_{i}_addr" for i in range(self.attachment_nr))
if self.flags & v4c.FLAG_EV_GROUP_NAME:
keys += ("group_name_addr",)
keys += (
"event_type",
"sync_type",
"range_type",
"cause",
"flags",
"reserved1",
"scope_nr",
"attachment_nr",
"creator_index",
"sync_base",
"sync_factor",
)
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __str__(self) -> str:
return f"EventBlock (name: {self.name}, comment: {self.comment}, address: {hex(self.address)}, scopes: {self.scopes}, fields: {super().__str__()})"
@property
def value(self) -> float:
return self.sync_base * self.sync_factor
@value.setter
def value(self, val: float) -> None:
self.sync_factor = val / self.sync_base
def to_blocks(self, address: int, blocks: list[SupportsBytes]) -> int:
text = self.name
if text:
tx_block = TextBlock(text=text)
self.name_addr = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.name_addr = 0
text = self.comment
if text:
tx_block = TextBlock(text=text)
self.comment_addr = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
if self.flags & v4c.FLAG_EV_GROUP_NAME:
text = self.group_name
if text:
tx_block = TextBlock(text=text)
self.group_name_addr = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.group_name_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
class FileIdentificationBlockKwargs(BlockKwargs, total=False):
version: str
[docs]
class FileIdentificationBlock:
"""`FileIdentificationBlock` has the following attributes, which are also
available as dict-like key-value pairs.
IDBLOCK fields:
* ``file_identification`` - bytes : file identifier
* ``version_str`` - bytes : format identifier
* ``program_identification`` - bytes : creator program identifier
* ``reserved0`` - bytes : reserved bytes
* ``mdf_version`` - int : version number of MDF format
* ``reserved1`` - bytes : reserved bytes
* ``unfinalized_standard_flags`` - int : standard flags for unfinalized MDF
* ``unfinalized_custom_flags`` - int : custom flags for unfinalized MDF
Other attributes:
* ``address`` - int : should always be 0
"""
__slots__ = (
"address",
"file_identification",
"mdf_version",
"program_identification",
"reserved0",
"reserved1",
"unfinalized_custom_flags",
"unfinalized_standard_flags",
"version_str",
)
def __init__(self, **kwargs: Unpack[FileIdentificationBlockKwargs]) -> None:
super().__init__()
self.address = 0
try:
stream = kwargs["stream"]
stream.seek(self.address)
(
self.file_identification,
self.version_str,
self.program_identification,
self.reserved0,
self.mdf_version,
self.reserved1,
self.unfinalized_standard_flags,
self.unfinalized_custom_flags,
) = typing.cast(
v4c.IdentificationBlock,
unpack(v4c.FMT_IDENTIFICATION_BLOCK, stream.read(v4c.IDENTIFICATION_BLOCK_SIZE)),
)
except KeyError:
version = kwargs.get("version", "4.00")
self.file_identification = b"MDF "
self.version_str = f"{version} ".encode()
self.program_identification = f"{tool.__tool_short__}{tool.__version__}".encode()
self.reserved0 = b"\0" * 4
self.mdf_version = int(version.replace(".", ""))
self.reserved1 = b"\0" * 30
self.unfinalized_standard_flags = 0
self.unfinalized_custom_flags = 0
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
result = pack(
v4c.FMT_IDENTIFICATION_BLOCK,
*[self[key] for key in v4c.KEYS_IDENTIFICATION_BLOCK],
)
return result
class FileHistoryKwargs(BlockKwargs, total=False):
reserved0: int
block_len: int
links_nr: int
next_fh_addr: int
comment_addr: int
abs_time: int
tz_offset: int
daylight_save_time: int
time_flags: int
reserved1: bytes
file_limit: int | float
[docs]
class FileHistory:
"""`FileHistory` has the following attributes, which are also available as
dict-like key-value pairs.
FHBLOCK fields:
* ``id`` - bytes : block ID; always b'##FH'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_fh_addr`` - int : address of next FHBLOCK
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
file history comment
* ``abs_time`` - int : timestamp at which the file modification happened
* ``tz_offset`` - int : UTC time offset in hours (= GMT time zone)
* ``daylight_save_time`` - int : daylight saving time
* ``time_flags`` - int : time flags
* ``reserved1`` - bytes : reserved bytes
Other attributes:
* ``address`` - int : file history address
* ``comment`` - str : history comment
"""
__slots__ = (
"abs_time",
"address",
"block_len",
"comment",
"comment_addr",
"daylight_save_time",
"id",
"links_nr",
"next_fh_addr",
"reserved0",
"reserved1",
"time_flags",
"tz_offset",
)
def __init__(self, **kwargs: Unpack[FileHistoryKwargs]) -> None:
super().__init__()
self.comment = ""
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
stream.seek(address)
file_limit = kwargs["file_limit"]
if address + v4c.FH_BLOCK_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_fh_addr,
self.comment_addr,
self.abs_time,
self.tz_offset,
self.daylight_save_time,
self.time_flags,
self.reserved1,
) = typing.cast(v4c.FileHistory, unpack(v4c.FMT_FILE_HISTORY, stream.read(v4c.FH_BLOCK_SIZE)))
if self.id != b"##FH":
message = f'Expected "##FH" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.comment = get_text_v4(address=self.comment_addr, stream=stream, file_limit=file_limit)
except KeyError:
self.id = b"##FH"
self.reserved0 = kwargs.get("reserved0", 0)
self.block_len = kwargs.get("block_len", v4c.FH_BLOCK_SIZE)
self.links_nr = kwargs.get("links_nr", 2)
self.next_fh_addr = kwargs.get("next_fh_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.abs_time = kwargs.get("abs_time", int(time.time()) * 10**9)
self.tz_offset = kwargs.get("tz_offset", 120)
self.daylight_save_time = kwargs.get("daylight_save_time", 60)
self.time_flags = kwargs.get("time_flags", 2)
self.reserved1 = kwargs.get("reserved1", b"\x00" * 3)
localtz = dateutil.tz.tzlocal()
self.time_stamp = datetime.fromtimestamp(time.time(), tz=localtz)
def to_blocks(
self, address: int, blocks: list[bytes | SupportsBytes], defined_texts: dict[bytes | str, int]
) -> int:
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<FHcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
result = pack(v4c.FMT_FILE_HISTORY, *[self[key] for key in v4c.KEYS_FILE_HISTORY])
return result
@property
def time_stamp(self) -> datetime:
"""Getter and setter of the file history timestamp.
Returns
-------
timestamp : datetime.datetime
Start timestamp.
"""
timestamp = self.abs_time / 10**9
tz_local = False
if self.time_flags & v4c.FLAG_HD_LOCAL_TIME:
tz_local = True
tz = timezone.utc
else:
tz = timezone(timedelta(minutes=self.tz_offset + self.daylight_save_time))
try:
timestamp_dt = datetime.fromtimestamp(timestamp, tz)
if tz_local:
timestamp_dt = timestamp_dt.replace(tzinfo=None)
except OverflowError:
timestamp_dt = datetime.fromtimestamp(0, tz) + timedelta(seconds=timestamp)
return timestamp_dt
@time_stamp.setter
def time_stamp(self, timestamp: datetime) -> None:
if timestamp.tzinfo is None:
self.time_flags = v4c.FLAG_HD_LOCAL_TIME
self.abs_time = int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 10**9)
self.tz_offset = 0
self.daylight_save_time = 0
else:
self.time_flags = v4c.FLAG_HD_TIME_OFFSET_VALID
tzinfo = timestamp.tzinfo
dst = tzinfo.dst(timestamp)
if dst is not None:
dst_offset = int(dst.total_seconds() / 60)
else:
dst_offset = 0
utc_offset = tzinfo.utcoffset(timestamp)
if utc_offset is None:
raise RuntimeError("utc_offset is None")
tz_offset = int(utc_offset.total_seconds() / 60) - dst_offset
self.tz_offset = tz_offset
self.daylight_save_time = dst_offset
self.abs_time = int(timestamp.timestamp() * 10**9)
def __repr__(self) -> str:
return f"FHBLOCK(time={self.time_stamp}, comment={self.comment})"
class GuardBlockKwargs(BlockKwargs, total=False):
reserved0: int
block_len: int
links_nr: int
gd_addr: int
gd_version: int
reserved1: bytes
class GuardBlock:
"""`GuardBlock` has the following attributes, which are also available as
dict-like key-value pairs.
GDBLOCK fields:
* ``id`` - bytes : block ID; always b'##GD'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``gd_addr`` - int : address of guarded block
* ``gd_version`` - int : minimum MDF format version
* ``reserved1`` - int : reserved bytes
Other attributes:
* ``address`` - int : guard block address
"""
__slots__ = (
"address",
"block_len",
"gd_addr",
"gd_version",
"guarded_block",
"id",
"links_nr",
"reserved0",
"reserved1",
)
def __init__(self, **kwargs: Unpack[GuardBlockKwargs]) -> None:
self.guarded_block = None
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
if mapped:
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.gd_addr,
self.gd_version,
self.reserved1,
) = v4c.GD_uf(stream, address)
else:
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.gd_addr,
self.gd_version,
self.reserved1,
) = v4c.GD_u(stream.read(v4c.GD_BLOCK_SIZE))
if self.id != b"##GD":
message = f'Expected "##GD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = 0
self.id = b"##GD"
self.reserved0 = kwargs.get("reserved0", 0)
self.block_len = kwargs.get("block_len", v4c.GD_BLOCK_SIZE)
self.links_nr = kwargs.get("links_nr", 1)
self.gd_addr = kwargs.get("gd_addr", 0)
self.gd_version = kwargs.get("gd_version", 430)
self.reserved1 = kwargs.get("reserved1", b"\00" * 6)
def copy(self) -> "GuardBlock":
gd = GuardBlock(
reserved0=self.reserved0,
block_len=self.block_len,
links_nr=self.links_nr,
gd_addr=self.gd_addr,
gd_version=self.gd_version,
reserved1=self.reserved1,
)
gd.address = self.address
return gd
def to_blocks(self, address: int, blocks: list[SupportsBytes], defined_texts: dict[str, int]) -> int:
blocks.append(self)
self.address = address
self.gd_addr = self.guarded_block.address if self.guarded_block else 0
address += self.block_len
return address
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
result = v4c.GD_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.gd_addr,
self.gd_version,
self.reserved1,
)
return result
class HeaderBlockKwargs(BlockKwargs, total=False):
reserved3: int
block_len: int
links_nr: int
first_dg_addr: int
file_history_addr: int
channel_tree_addr: int
first_attachment_addr: int
first_event_addr: int
comment_addr: int
abs_time: int
tz_offset: int
daylight_save_time: int
time_flags: int
time_quality: int
flags: int
reserved4: int
start_angle: int
start_distance: int
file_limit: int
class HeaderListKwargs(BlockKwargs, total=False):
first_dl_addr: int
file_limit: int
flags: int
zip_type: int
class HeaderList:
"""`HeaderList` has the following attributes, which are also available as
dict-like key-value pairs.
HLBLOCK fields:
* ``id`` - bytes : block ID; always b'##HL'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``first_dl_addr`` - int : address of first data list block for this
header list
* ``flags`` - int : source flags
* ``zip_type`` - int : integer code for zip type
* ``reserved1`` - bytes : reserved bytes
Other attributes:
* ``address`` - int : header list address
"""
__slots__ = (
"address",
"block_len",
"first_dl_addr",
"flags",
"id",
"links_nr",
"reserved0",
"reserved1",
"zip_type",
)
def __init__(self, **kwargs: Unpack[HeaderListKwargs]) -> None:
super().__init__()
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
file_limit = kwargs["file_limit"]
if address + v4c.HL_BLOCK_SIZE > file_limit:
handle_incomplete_block(address, file_limit)
raise KeyError
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.first_dl_addr,
self.flags,
self.zip_type,
self.reserved1,
) = typing.cast(v4c.HlBlock, unpack(v4c.FMT_HL_BLOCK, stream.read(v4c.HL_BLOCK_SIZE)))
if self.id != b"##HL":
message = f'Expected "##HL" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = 0
self.id = b"##HL"
self.reserved0 = 0
self.block_len = v4c.HL_BLOCK_SIZE
self.links_nr = 1
self.first_dl_addr = kwargs.get("first_dl_addr", 0)
self.flags = kwargs.get("flags", 1)
self.zip_type = kwargs.get("zip_type", 0)
self.reserved1 = b"\x00" * 5
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
result = pack(v4c.FMT_HL_BLOCK, *[self[key] for key in v4c.KEYS_HL_BLOCK])
return result
class _ListDataBase:
__slots__ = (
"address",
"block_len",
"data_block_len",
"data_block_nr",
"flags",
"flags_ext",
"id",
"links_nr",
"next_ld_addr",
"reserved0",
"zip_info",
"zip_info_inval",
)
class ListDataKwargs(BlockKwargs, total=False):
data_block_nr: int
flags: int
data_block_len: int
file_limit: float
class ListData(_ListDataBase):
"""`ListData` has the following attributes, which are also available as
dict-like key-value pairs.
LDBLOCK common fields:
* ``id`` - bytes : block ID; always b'##LD'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_ld_addr`` - int : address of next LDBLOCK
* ``data_block_addr_<N>`` - int : address of N-th data block
bits data block
* ``flags`` - int : data list flags
* ``data_block_nr`` - int : number of data blocks referenced by this list
LDBLOCK specific fields:
* if invalidation data present flag is set
* ``invalidation_bits_addr_<N>`` - int : address of N-th invalidation
* for equal-length blocks
* ``data_block_len`` - int : equal uncompressed size in bytes for all
referenced data blocks; last block can be smaller
* for variable-length blocks
* ``offset_<N>`` - int : byte offset of N-th data block
* if time values flag is set
* ``time_value_<N>`` - int | float : first raw timestamp value of
N-th data block
* if angle values flag is set
* ``angle_value_<N>`` - int | float : first raw angle value of
N-th data block
* if distance values flag is set
* ``distance_value_<N>`` - int | float : first raw distance value of
N-th data block
Other attributes:
* ``address`` - int : data list address
"""
def __init__(self, **kwargs: Unpack[ListDataKwargs]) -> None:
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
file_limit = kwargs.get("file_limit", float("inf"))
if mapped:
if self.address + COMMON_SIZE > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_ld_addr = 0
self.data_block_nr = 0
return
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(stream, address)
if self.id != b"##LD":
message = f'Expected "##LD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
if self.address + self.block_len > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_ld_addr = 0
self.data_block_nr = 0
return
address += COMMON_SIZE
links: tuple[int, ...] = unpack_from(f"<{self.links_nr}Q", stream, address)
address += self.links_nr * 8
self.flags, self.zip_info, self.zip_info_inval, self.flags_ext, self.self.data_block_nr = unpack_from(
"<4BI", stream, address
)
address += 8
if self.flags & v4c.FLAG_LD_EQUAL_LENGHT:
(self.data_block_len,) = UINT64_uf(stream, address)
address += 8
else:
offsets: tuple[int, ...] = unpack_from(f"<{self.data_block_nr}Q", stream, address)
address += self.data_block_nr * 8
for i, offset in enumerate(offsets):
self[f"offset_{i}"] = offset
if self.flags & v4c.FLAG_LD_TIME_VALUES:
values: tuple[bytes, ...] = unpack_from(f"<{8 * self.data_block_nr}s", stream, address)
address += self.data_block_nr * 8
for i, value in enumerate(values):
self[f"time_value_{i}"] = value
if self.flags & v4c.FLAG_LD_ANGLE_VALUES:
values = unpack_from(f"<{8 * self.data_block_nr}s", stream, address)
address += self.data_block_nr * 8
for i, value in enumerate(values):
self[f"angle_value_{i}"] = value
if self.flags & v4c.FLAG_LD_DISTANCE_VALUES:
values = unpack_from(f"<{8 * self.data_block_nr}s", stream, address)
address += self.data_block_nr * 8
for i, value in enumerate(values):
self[f"distance_value_{i}"] = value
self.next_ld_addr = links[0]
for i in range(self.data_block_nr):
self[f"data_block_addr_{i}"] = links[i + 1]
if self.flags_ext & v4c.FLAG_LD_EXT_INVALIDATION_PRESENT:
for i in range(self.data_block_nr):
self[f"invalidation_bits_addr_{i}"] = links[self.data_block_nr + 1 + i]
else:
if self.address + COMMON_SIZE > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_ld_addr = 0
self.data_block_nr = 0
return
stream.seek(address)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_u(stream.read(COMMON_SIZE))
if self.id != b"##LD":
message = f'Expected "##LD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
if self.address + self.block_len > file_limit:
logger.warning(f"incomplete block at 0x{self.address:x} exceeds the file size")
self.next_ld_addr = 0
self.data_block_nr = 0
return
links = unpack(f"<{self.links_nr}Q", stream.read(self.links_nr * 8))
self.flags, self.zip_info, self.zip_info_inval, self.flags_ext, self.self.data_block_nr = typing.cast(
tuple[int, int], unpack("<4BI", stream.read(8))
)
if self.flags & v4c.FLAG_LD_EQUAL_LENGHT:
(self.data_block_len,) = UINT64_u(stream.read(8))
else:
offsets = unpack(f"<{self.data_block_nr}Q", stream.read(self.data_block_nr * 8))
for i, offset in enumerate(offsets):
self[f"offset_{i}"] = offset
if self.flags & v4c.FLAG_LD_TIME_VALUES:
values = unpack(
f"<{8 * self.data_block_nr}s",
stream.read(self.data_block_nr * 8),
)
for i, value in enumerate(values):
self[f"time_value_{i}"] = value
if self.flags & v4c.FLAG_LD_ANGLE_VALUES:
values = unpack(
f"<{8 * self.data_block_nr}s",
stream.read(self.data_block_nr * 8),
)
for i, value in enumerate(values):
self[f"angle_value_{i}"] = value
if self.flags & v4c.FLAG_LD_DISTANCE_VALUES:
values = unpack(
f"<{8 * self.data_block_nr}s",
stream.read(self.data_block_nr * 8),
)
for i, value in enumerate(values):
self[f"distance_value_{i}"] = value
self.next_ld_addr = links[0]
for i in range(self.data_block_nr, 1):
self[f"data_block_addr_{i}"] = links[i]
if self.flags_ext & v4c.FLAG_LD_EXT_INVALIDATION_PRESENT:
for i in range(self.data_block_nr, self.data_block_nr + 1):
self[f"invalidation_bits_addr_{i}"] = links[i]
except KeyError:
self.address = 0
self.id = b"##LD"
self.reserved0 = 0
self.data_block_nr = kwargs["data_block_nr"]
self.flags = kwargs["flags"]
self.flags_ext = kwargs.get("flags_ext", 0)
self.zip_info = kwargs.get("zip_info", 0)
self.zip_info_inval = kwargs.get("zip_info_inval", 0)
self.flags_ext = kwargs["flags_ext"]
self.data_block_len = kwargs["data_block_len"]
self.next_ld_addr = 0
for i in range(self.data_block_nr):
self[f"data_block_addr_{i}"] = kwargs[f"data_block_addr_{i}"] # type: ignore[literal-required]
if self.flags_ext & v4c.FLAG_LD_EXT_INVALIDATION_PRESENT:
self.links_nr = 2 * self.data_block_nr + 1
for i in range(self.data_block_nr):
self[f"invalidation_bits_addr_{i}"] = kwargs[f"invalidation_bits_addr_{i}"] # type: ignore[literal-required]
else:
self.links_nr = self.data_block_nr + 1
self.block_len = 24 + self.links_nr * 8 + 16
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
fmt = "<4sI3Q"
keys: tuple[str, ...] = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_ld_addr",
)
fmt += f"{self.data_block_nr}Q"
keys += tuple(f"data_block_addr_{i}" for i in range(self.data_block_nr))
if self.flags & v4c.FLAG_LD_EXT_INVALIDATION_PRESENT:
fmt += f"{self.data_block_nr}Q"
keys += tuple(f"invalidation_bits_addr_{i}" for i in range(self.data_block_nr))
fmt += "4BI"
keys += ("flags", "zip_info", "zip_info_inval", "flags_ext", "data_block_nr")
if self.flags & v4c.FLAG_LD_EQUAL_LENGHT:
fmt += "Q"
keys += ("data_block_len",)
else:
fmt += f"<{self.data_block_nr}Q"
keys += tuple(f"offset_{i}" for i in range(self.data_block_nr))
if self.flags & v4c.FLAG_LD_TIME_VALUES:
fmt += "8s" * self.data_block_nr
keys += tuple(f"time_value_{i}" for i in range(self.data_block_nr))
if self.flags & v4c.FLAG_LD_ANGLE_VALUES:
fmt += "8s" * self.data_block_nr
keys += tuple(f"angle_value_{i}" for i in range(self.data_block_nr))
if self.flags & v4c.FLAG_LD_DISTANCE_VALUES:
fmt += "8s" * self.data_block_nr
keys += tuple(f"distance_value_{i}" for i in range(self.data_block_nr))
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
class SourceInformationKwargs(BlockKwargs, total=False):
raw_bytes: bytes
source_type: int
bus_type: int
file_limit: int | float
class TextBlockKwargs(BlockKwargs, total=False):
safe: bool
text: bytes | str
meta: bool
[docs]
class TextBlock:
"""Common TXBLOCK and MDBLOCK class.
`TextBlock` has the following attributes, which are also available as
dict-like key-value pairs.
TXBLOCK fields:
* ``id`` - bytes : block ID; b'##TX' for TXBLOCK and b'##MD' for MDBLOCK
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``text`` - bytes : actual text content
Other attributes:
* ``address`` - int : text block address
Parameters
----------
address : int
Block address.
stream : handle
File handle.
meta : bool
Flag to set the block type to MDBLOCK for dynamically created objects;
default is False.
text : bytes/str
Text content for dynamically created objects.
"""
__slots__ = ("address", "block_len", "id", "links_nr", "reserved0", "text")
def __init__(self, **kwargs: Unpack[TextBlockKwargs]) -> None:
if "safe" in kwargs:
self.address = 0
text = kwargs["text"]
size = len(text)
self.id = b"##MD" if kwargs["meta"] else b"##TX"
self.reserved0 = 0
self.links_nr = 0
self.text = text
self.block_len = size + 32 - size % 8
elif "stream" in kwargs:
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False) or not is_file_like(stream)
self.address = address = kwargs["address"]
if mapped:
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_uf(stream, address)
size = self.block_len - COMMON_SIZE
if self.id not in (b"##TX", b"##MD"):
message = f'Expected "##TX" or "##MD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.text = text = stream[address + COMMON_SIZE : address + self.block_len]
else:
stream.seek(address)
self.id, self.reserved0, self.block_len, self.links_nr = COMMON_u(stream.read(COMMON_SIZE))
size = self.block_len - COMMON_SIZE
if self.id not in (b"##TX", b"##MD"):
message = f'Expected "##TX" or "##MD" block @{hex(address)} but found "{self.id!r}"'
logger.exception(message)
raise MdfException(message)
self.text = text = stream.read(size)
align = size % 8
if align:
self.block_len = size + COMMON_SIZE + 8 - align
else:
if text:
if text[-1]:
self.block_len += 8
else:
self.block_len += 8
else:
text = kwargs["text"]
if isinstance(text, str):
text = text.encode("utf-8", "replace")
size = len(text)
self.id = b"##MD" if kwargs.get("meta", False) else b"##TX"
self.reserved0 = 0
self.links_nr = 0
self.text = text
self.block_len = size + 32 - size % 8
def __getitem__(self, item: str) -> object:
return getattr(self, item)
def __setitem__(self, item: str, value: object) -> None:
setattr(self, item, value)
def __bytes__(self) -> bytes:
return pack(
f"<4sI2Q{self.block_len - COMMON_SIZE}s",
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.text,
)
def __repr__(self) -> str:
return (
f"TextBlock(id={self.id!r},"
f"reserved0={self.reserved0}, "
f"block_len={self.block_len}, "
f"links_nr={self.links_nr} "
f"text={self.text!r})"
)