# -*- coding: utf-8 -*-
"""
classes that implement the blocks for MDF version 4
"""
import logging
import xml.etree.ElementTree as ET
import time
from datetime import datetime
from hashlib import md5
from struct import pack, unpack, unpack_from
from textwrap import wrap
from zlib import compress, decompress
from pathlib import Path
import numpy as np
from numexpr import evaluate
from . import v4_constants as v4c
from .utils import (
MdfException,
get_text_v4,
SignalSource,
UINT8_uf,
UINT64_u,
UINT64_uf,
FLOAT64_u,
sanitize_xml,
block_fields,
)
from ..version import __version__
SEEK_START = v4c.SEEK_START
SEEK_END = v4c.SEEK_END
COMMON_SIZE = v4c.COMMON_SIZE
COMMON_u = v4c.COMMON_u
COMMON_uf = v4c.COMMON_uf
CN_BLOCK_SIZE = v4c.CN_BLOCK_SIZE
SIMPLE_CHANNEL_PARAMS_uf = v4c.SIMPLE_CHANNEL_PARAMS_uf
logger = logging.getLogger("asammdf")
__all__ = [
"AttachmentBlock",
"Channel",
"ChannelArrayBlock",
"ChannelGroup",
"ChannelConversion",
"DataBlock",
"DataZippedBlock",
"EventBlock",
"FileIdentificationBlock",
"HeaderBlock",
"HeaderList",
"DataList",
"DataGroup",
"FileHistory",
"SourceInformation",
"TextBlock",
]
[docs]class AttachmentBlock:
"""When adding new attachments only embedded attachments are allowed, with
keyword argument *data* of type bytes
*AttachmentBlock* has the following attributes, that are also available as
dict like key-value pairs
ATBLOCK fields
* ``id`` - bytes : block ID; always b'##AT'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_at_addr`` - int : next ATBLOCK address
* ``file_name_addr`` - int : address of TXBLOCK that contains the attachment
file name
* ``mime_addr`` - int : address of TXBLOCK that contains the attachment
mime type description
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
attachment comment
* ``flags`` - int : ATBLOCK flags
* ``creator_index`` - int : index of file history block
* ``reserved1`` - int : reserved bytes
* ``md5_sum`` - bytes : attachment file md5 sum
* ``original_size`` - int : original uncompress file size in bytes
* ``embedded_size`` - int : embedded compressed file size in bytes
* ``embedded_data`` - bytes : embedded atatchment bytes
Other attributes
* ``address`` - int : attachment address
* ``file_name`` - str : attachment file name
* ``mime`` - str : mime type
* ``comment`` - str : attachment comment
Parameters
----------
address : int
block address; to be used for objects created from file
stream : handle
file handle; to be used for objects created from file
for dynamically created objects :
see the key-value pairs
"""
__slots__ = (
"address",
"file_name",
"mime",
"comment",
"id",
"reserved0",
"block_len",
"links_nr",
"next_at_addr",
"file_name_addr",
"mime_addr",
"comment_addr",
"flags",
"creator_index",
"reserved1",
"md5_sum",
"original_size",
"embedded_size",
"embedded_data",
)
def __init__(self, **kwargs):
self.file_name = self.mime = self.comment = ""
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
if mapped:
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_at_addr,
self.file_name_addr,
self.mime_addr,
self.comment_addr,
self.flags,
self.creator_index,
self.reserved1,
self.md5_sum,
self.original_size,
self.embedded_size,
) = v4c.AT_COMMON_uf(stream, address)
address += v4c.AT_COMMON_SIZE
self.embedded_data = stream[address: address + self.embedded_size]
else:
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_at_addr,
self.file_name_addr,
self.mime_addr,
self.comment_addr,
self.flags,
self.creator_index,
self.reserved1,
self.md5_sum,
self.original_size,
self.embedded_size,
) = v4c.AT_COMMON_u(stream.read(v4c.AT_COMMON_SIZE))
self.embedded_data = stream.read(self.embedded_size)
if self.id != b"##AT":
message = f'Expected "##AT" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.file_name = get_text_v4(self.file_name_addr, stream, mapped=mapped)
self.mime = get_text_v4(self.mime_addr, stream, mapped=mapped)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped)
except KeyError:
self.file_name = Path(kwargs.get("file_name", None) or "bin.bin")
data = kwargs["data"]
original_size = embedded_size = len(data)
compression = kwargs.get("compression", False)
embedded = kwargs.get("embedded", False)
md5_sum = md5(data).digest()
flags = v4c.FLAG_AT_MD5_VALID
if embedded:
flags |= v4c.FLAG_AT_EMBEDDED
if compression:
flags |= v4c.FLAG_AT_COMPRESSED_EMBEDDED
data = compress(data)
embedded_size = len(data)
else:
self.file_name = Path(self.file_name.name)
self.file_name.write_bytes(data)
embedded_size = 0
data = b""
self.id = b"##AT"
self.reserved0 = 0
self.block_len = v4c.AT_COMMON_SIZE + embedded_size
self.links_nr = 4
self.next_at_addr = 0
self.file_name_addr = 0
self.mime_addr = 0
self.comment_addr = 0
self.flags = flags
self.creator_index = 0
self.reserved1 = 0
self.md5_sum = md5_sum
self.original_size = original_size
self.embedded_size = embedded_size
self.embedded_data = data
def to_blocks(self, address, blocks, defined_texts):
text = self.file_name
if text:
if text in defined_texts:
self.file_name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=str(text))
self.file_name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.file_name_addr = 0
text = self.mime
if text:
if text in defined_texts:
self.mime_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.mime_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.mime_addr = 0
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<ATcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
align = address % 8
if align % 8:
blocks.append(b"\0" * (8 - align))
address += 8 - align
return address
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
fmt = f"{v4c.FMT_AT_COMMON}{self.embedded_size}s"
result = pack(fmt, *[self[key] for key in v4c.KEYS_AT_BLOCK])
return result
[docs]class Channel:
""" If the `load_metadata` keyword argument is not provided or is False,
then the conversion, source and display name information is not processed.
Further more if the `parse_xml_comment` is not provided or is False, then
the display name information from the channel comment is not processed (this
is done to avoid expensive XML operations)
*Channel* has the following attributes, that are also available as
dict like key-value pairs
CNBLOCK fields
* ``id`` - bytes : block ID; always b'##CN'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_ch_addr`` - int : next ATBLOCK address
* ``component_addr`` - int : address of first channel in case of structure channel
composition, or ChannelArrayBlock in case of arrays
file name
* ``name_addr`` - int : address of TXBLOCK that contains the channel name
* ``source_addr`` - int : address of channel source block
* ``conversion_addr`` - int : address of channel conversion block
* ``data_block_addr`` - int : address of signal data block for VLSD channels
* ``unit_addr`` - int : address of TXBLOCK that contains the channel unit
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
channel comment
* ``attachment_<N>_addr`` - int : address of N-th ATBLOCK referenced by the
current channel; if no ATBLOCK is referenced there will be no such key-value
pair
* ``default_X_dg_addr`` - int : address of DGBLOCK where the default X axis
channel for the current channel is found; this key-value pair will not
exist for channels that don't have a default X axis
* ``default_X_cg_addr`` - int : address of CGBLOCK where the default X axis
channel for the current channel is found; this key-value pair will not
exist for channels that don't have a default X axis
* ``default_X_ch_addr`` - int : address of default X axis
channel for the current channel; this key-value pair will not
exist for channels that don't have a default X axis
* ``channel_type`` - int : integer code for the channel type
* ``sync_type`` - int : integer code for the channel's sync type
* ``data_type`` - int : integer code for the channel's data type
* ``bit_offset`` - int : bit offset
* ``byte_offset`` - int : byte offset within the data record
* ``bit_count`` - int : channel bit count
* ``flags`` - int : CNBLOCK flags
* ``pos_invalidation_bit`` - int : invalidation bit position for the current
channel if there are invalidation bytes in the data record
* ``precision`` - int : integer code for teh precision
* ``reserved1`` - int : reserved bytes
* ``min_raw_value`` - int : min raw value of all samples
* ``max_raw_value`` - int : max raw value of all samples
* ``lower_limit`` - int : min physical value of all samples
* ``upper_limit`` - int : max physical value of all samples
* ``lower_ext_limit`` - int : min physical value of all samples
* ``upper_ext_limit`` - int : max physical value of all samples
Other attributes
* ``address`` - int : channel address
* ``attachments`` - list : list of referenced attachment blocks indexes;
the index referece to the attachment block index
* ``comment`` - str : channel comment
* ``conversion`` - ChannelConversion : channel conversion; *None* if the
channel has no conversion
* ``display_name`` - str : channel display name; this is extracted from the
XML channel comment
* ``name`` - str : channel name
* ``source`` - SourceInformation : channel source information; *None* if
the channel has no source information
* ``unit`` - str : channel unit
Parameters
----------
address : int
block address; to be used for objects created from file
stream : handle
file handle; to be used for objects created from file
load_metadata : bool
option to load conversion, source and display_name; default *True*
parse_xml_comment : bool
option to parse XML channel comment to search for display name; default
*True*
for dynamically created objects :
see the key-value pairs
"""
__slots__ = (
"name",
"unit",
"comment",
"display_name",
"conversion",
"source",
"attachment",
"address",
"dtype_fmt",
"id",
"reserved0",
"block_len",
"links_nr",
"next_ch_addr",
"component_addr",
"name_addr",
"source_addr",
"conversion_addr",
"data_block_addr",
"unit_addr",
"comment_addr",
"channel_type",
"sync_type",
"data_type",
"bit_offset",
"byte_offset",
"bit_count",
"flags",
"pos_invalidation_bit",
"precision",
"reserved1",
"attachment_nr",
"min_raw_value",
"max_raw_value",
"lower_limit",
"upper_limit",
"lower_ext_limit",
"upper_ext_limit",
"default_X_dg_addr",
"default_X_cg_addr",
"default_X_ch_addr",
"attachment_addr",
)
def __init__(self, **kwargs):
if "stream" in kwargs:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get('mapped', False)
if mapped:
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_uf(stream, address)
if self.block_len == CN_BLOCK_SIZE:
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = SIMPLE_CHANNEL_PARAMS_uf(stream, address + COMMON_SIZE)
else:
stream.seek(address + COMMON_SIZE)
block = stream.read(self.block_len - COMMON_SIZE)
links_nr = self.links_nr
links = unpack_from(f"<{links_nr}Q", block)
params = unpack_from(v4c.FMT_CHANNEL_PARAMS, block, links_nr * 8)
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
) = links[:8]
at_map = kwargs.get("at_map", {})
if params[10]:
self.attachment = []
self.attachment_addr = links[8]
self.attachment = at_map.get(links[8], 0)
self.links_nr -= params[10] - 1
self.block_len -= (params[10] - 1) * 8
params = list(params)
params[10] = 1
if params[6] & v4c.FLAG_CN_DEFAULT_X:
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = links[-3:]
# default X not supported yet
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = (0, 0, 0)
(
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = params
if self.id != b"##CN":
message = f'Expected "##CN" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.name = get_text_v4(self.name_addr, stream, mapped=mapped)
self.unit = get_text_v4(self.unit_addr, stream, mapped=mapped)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped)
if kwargs.get("use_display_names", True):
try:
display_name = ET.fromstring(sanitize_xml(self.comment)).find(
".//names/display"
)
if display_name is not None:
self.display_name = display_name.text
except:
self.display_name = ""
else:
self.display_name = ""
si_map = kwargs.get("si_map", {})
cc_map = kwargs.get("cc_map", {})
address = self.conversion_addr
if address:
if mapped:
(size,) = UINT64_uf(stream, address + 8)
raw_bytes = stream[address: address + size]
else:
stream.seek(address + 8)
(size,) = UINT64_u(stream.read(8))
stream.seek(address)
raw_bytes = stream.read(size)
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes, stream=stream, address=address,
mapped=mapped,
)
cc_map[raw_bytes] = conv
self.conversion = conv
else:
self.conversion = None
address = self.source_addr
if address:
if mapped:
raw_bytes = stream[address: address + v4c.SI_BLOCK_SIZE]
else:
stream.seek(address)
raw_bytes = stream.read(v4c.SI_BLOCK_SIZE)
if raw_bytes in si_map:
source = si_map[raw_bytes]
else:
source = SourceInformation(
raw_bytes=raw_bytes, stream=stream, address=address,
mapped=mapped,
)
si_map[raw_bytes] = source
self.source = source
else:
self.source = None
self.dtype_fmt = self.attachment = None
else:
stream.seek(address)
block = stream.read(CN_BLOCK_SIZE)
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_uf(block)
if self.block_len == CN_BLOCK_SIZE:
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = SIMPLE_CHANNEL_PARAMS_uf(block, COMMON_SIZE)
else:
block = block[24:] + stream.read(self.block_len - CN_BLOCK_SIZE)
links_nr = self.links_nr
links = unpack_from(f"<{links_nr}Q", block)
params = unpack_from(v4c.FMT_CHANNEL_PARAMS, block, links_nr * 8)
(
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
) = links[:8]
at_map = kwargs.get("at_map", {})
if params[10]:
self.attachment = []
self.attachment_addr = links[8]
self.attachment = at_map.get(links[8], 0)
self.links_nr -= params[10] - 1
self.block_len -= (params[10] - 1) * 8
params[10] = 1
if params[6] & v4c.FLAG_CN_DEFAULT_X:
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = links[-3:]
# default X not supported yet
(
self.default_X_dg_addr,
self.default_X_cg_addr,
self.default_X_ch_addr,
) = (0, 0, 0)
(
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
) = params
if self.id != b"##CN":
message = f'Expected "##CN" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.name = get_text_v4(self.name_addr, stream)
self.unit = get_text_v4(self.unit_addr, stream)
self.comment = get_text_v4(self.comment_addr, stream)
if kwargs.get("use_display_names", True):
try:
display_name = ET.fromstring(sanitize_xml(self.comment)).find(
".//names/display"
)
if display_name is not None:
self.display_name = display_name.text
except ET.ParseError:
self.display_name = ""
else:
self.display_name = ""
si_map = kwargs.get("si_map", {})
cc_map = kwargs.get("cc_map", {})
address = self.conversion_addr
if address:
stream.seek(address + 8)
(size,) = UINT64_u(stream.read(8))
stream.seek(address)
raw_bytes = stream.read(size)
if raw_bytes in cc_map:
conv = cc_map[raw_bytes]
else:
conv = ChannelConversion(
raw_bytes=raw_bytes, stream=stream, address=address
)
cc_map[raw_bytes] = conv
self.conversion = conv
else:
self.conversion = None
address = self.source_addr
if address:
stream.seek(address)
raw_bytes = stream.read(v4c.SI_BLOCK_SIZE)
if raw_bytes in si_map:
source = si_map[raw_bytes]
else:
source = SourceInformation(
raw_bytes=raw_bytes, stream=stream, address=address
)
si_map[raw_bytes] = source
self.source = source
else:
self.source = None
self.dtype_fmt = self.attachment = None
else:
self.address = 0
self.name = self.comment = self.display_name = self.unit = ""
self.conversion = self.source = self.attachment = self.dtype_fmt = None
self.id = b"##CN"
self.reserved0 = 0
self.block_len = v4c.CN_BLOCK_SIZE
self.links_nr = 8
self.next_ch_addr = 0
self.component_addr = 0
self.name_addr = 0
self.source_addr = 0
self.conversion_addr = 0
self.data_block_addr = 0
self.unit_addr = 0
self.comment_addr = 0
try:
self.attachment_addr = kwargs["attachment_addr"]
self.block_len += 8
self.links_nr += 1
attachments = 1
except KeyError:
attachments = 0
self.channel_type = kwargs["channel_type"]
self.sync_type = kwargs.get("sync_type", 0)
self.data_type = kwargs["data_type"]
self.bit_offset = kwargs["bit_offset"]
self.byte_offset = kwargs["byte_offset"]
self.bit_count = kwargs["bit_count"]
self.flags = kwargs.get("flags", 0)
self.pos_invalidation_bit = kwargs.get("pos_invalidation_bit", 0)
self.precision = kwargs.get("precision", 3)
self.reserved1 = 0
self.attachment_nr = attachments
self.min_raw_value = kwargs.get("min_raw_value", 0)
self.max_raw_value = kwargs.get("max_raw_value", 0)
self.lower_limit = kwargs.get("lower_limit", 0)
self.upper_limit = kwargs.get("upper_limit", 0)
self.lower_ext_limit = kwargs.get("lower_ext_limit", 0)
self.upper_ext_limit = kwargs.get("upper_ext_limit", 0)
# ignore MLSD signal data
if self.channel_type == v4c.CHANNEL_TYPE_MLSD:
self.data_block_addr = 0
self.channel_type = v4c.CHANNEL_TYPE_VALUE
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def to_blocks(self, address, blocks, defined_texts, cc_map, si_map):
text = self.name
if text:
if text in defined_texts:
self.name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.name_addr = 0
text = self.unit
if text:
if text in defined_texts:
self.unit_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.unit_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.unit_addr = 0
comment = self.comment
display_name = self.display_name
if display_name and not text:
text = v4c.CN_COMMENT_TEMPLATE.format(comment, display_name)
elif display_name and comment:
if not comment.startswith("<CNcomment"):
text = v4c.CN_COMMENT_TEMPLATE.format(comment, display_name)
else:
if display_name not in comment:
try:
CNcomment = ET.fromstring(comment)
display_name_element = CNcomment.find(".//names/display")
if display_name is not None:
display_name_element.text = display_name
else:
display = ET.Element("display")
display.text = display_name
names = ET.Element("names")
names.append(display)
CNcomment.append(names)
text = ET.tostring(CNcomment).decode("utf-8")
except UnicodeEncodeError:
text = comment
else:
text = comment
else:
text = comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<CNcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
conversion = self.conversion
if conversion:
address = conversion.to_blocks(address, blocks, defined_texts, cc_map)
self.conversion_addr = conversion.address
else:
self.conversion_addr = 0
source = self.source
if source:
address = source.to_blocks(address, blocks, defined_texts, si_map)
self.source_addr = source.address
else:
self.source_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __bytes__(self):
if self.block_len == v4c.CN_BLOCK_SIZE:
return v4c.SIMPLE_CHANNEL_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_ch_addr,
self.component_addr,
self.name_addr,
self.source_addr,
self.conversion_addr,
self.data_block_addr,
self.unit_addr,
self.comment_addr,
self.channel_type,
self.sync_type,
self.data_type,
self.bit_offset,
self.byte_offset,
self.bit_count,
self.flags,
self.pos_invalidation_bit,
self.precision,
self.reserved1,
self.attachment_nr,
self.min_raw_value,
self.max_raw_value,
self.lower_limit,
self.upper_limit,
self.lower_ext_limit,
self.upper_ext_limit,
)
else:
fmt = v4c.FMT_CHANNEL.format(self.links_nr)
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_ch_addr",
"component_addr",
"name_addr",
"source_addr",
"conversion_addr",
"data_block_addr",
"unit_addr",
"comment_addr",
)
if self.attachment_nr:
keys += ("attachment_addr",)
if self.flags & v4c.FLAG_CN_DEFAULT_X:
keys += ("default_X_dg_addr", "default_X_cg_addr", "default_X_ch_addr")
keys += (
"channel_type",
"sync_type",
"data_type",
"bit_offset",
"byte_offset",
"bit_count",
"flags",
"pos_invalidation_bit",
"precision",
"reserved1",
"attachment_nr",
"min_raw_value",
"max_raw_value",
"lower_limit",
"upper_limit",
"lower_ext_limit",
"upper_ext_limit",
)
return pack(fmt, *[getattr(self, key) for key in keys])
def __str__(self):
return f"""<Channel (name: {self.name}, unit: {self.unit}, comment: {self.comment}, address: {hex(self.address)},
conversion: {self.conversion},
source: {self.source},
fields: {', '.join(block_fields(self))})>"""
def metadata(self):
if self.block_len == v4c.CN_BLOCK_SIZE:
keys = v4c.KEYS_SIMPLE_CHANNEL
else:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_ch_addr",
"component_addr",
"name_addr",
"source_addr",
"conversion_addr",
"data_block_addr",
"unit_addr",
"comment_addr",
)
if self.attachment_nr:
keys += ("attachment_addr",)
if self.flags & v4c.FLAG_CN_DEFAULT_X:
keys += ("default_X_dg_addr", "default_X_cg_addr", "default_X_ch_addr")
keys += (
"channel_type",
"sync_type",
"data_type",
"bit_offset",
"byte_offset",
"bit_count",
"flags",
"pos_invalidation_bit",
"precision",
"reserved1",
"attachment_nr",
"min_raw_value",
"max_raw_value",
"lower_limit",
"upper_limit",
"lower_ext_limit",
"upper_ext_limit",
)
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata = []
lines = f"""
name: {self.name}
display name: {self.display_name}
address: {hex(self.address)}
comment: {self.comment}
""".split("\n")
for key in keys:
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, round(val, 6)))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(line, width=120):
metadata.append(wrapped_line)
return "\n".join(metadata)
def __contains__(self, item):
return hasattr(self, item)
def __lt__(self, other):
self_byte_offset = self.byte_offset
other_byte_offset = other.byte_offset
if self_byte_offset < other_byte_offset:
result = 1
elif self_byte_offset == other_byte_offset:
self_range = self.bit_offset + self.bit_count
other_range = other.bit_offset + other.bit_count
if self_range > other_range:
result = 1
else:
result = 0
else:
result = 0
return result
class _ChannelArrayBlockBase:
__slots__ = (
"referenced_channels",
"address",
"id",
"reserved0",
"block_len",
"links_nr",
"ca_type",
"storage",
"dims",
"flags",
"byte_offset_base",
"invalidation_bit_base",
)
class ChannelArrayBlock(_ChannelArrayBlockBase):
"""
Other attributes
* ``address`` - int : array block address
* ``referenced_channels`` - list : list of (group index, channel index)
pairs referenced by this array block
"""
def __init__(self, **kwargs):
self.referenced_channels = []
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
if mapped:
(self.id, self.reserved0, self.block_len, self.links_nr) = v4c.COMMON_uf(
stream, address
)
nr = self.links_nr
address += COMMON_SIZE
links = unpack_from(f"<{nr}Q", stream, address)
self.composition_addr = links[0]
address += nr * 8
values = unpack_from("<2BHIiI", stream, address)
dims_nr = values[2]
if nr == 1:
pass
# lookup table with fixed axis
elif nr == dims_nr + 1:
for i in range(dims_nr):
self[f"axis_conversion_{i}"] = links[i + 1]
# lookup table with CN template
elif nr == 4 * dims_nr + 1:
for i in range(dims_nr):
self[f"axis_conversion_{i}"] = links[i + 1]
links = links[dims_nr + 1 :]
for i in range(dims_nr):
self[f"scale_axis_{i}_dg_addr"] = links[3 * i]
self[f"scale_axis_{i}_cg_addr"] = links[3 * i + 1]
self[f"scale_axis_{i}_ch_addr"] = links[3 * i + 2]
(
self.ca_type,
self.storage,
self.dims,
self.flags,
self.byte_offset_base,
self.invalidation_bit_base,
) = values
address += 16
dim_sizes = unpack_from(f"<{dims_nr}Q", stream, address)
for i, size in enumerate(dim_sizes):
self[f"dim_size_{i}"] = size
if self.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
for j in range(self[f"dim_size_{i}"]):
(value,) = FLOAT64_u(stream.read(8))
self[f"axis_{i}_value_{j}"] = value
else:
stream.seek(address)
(self.id, self.reserved0, self.block_len, self.links_nr) = unpack(
"<4sI2Q", stream.read(24)
)
nr = self.links_nr
links = unpack(f"<{nr}Q", stream.read(8 * nr))
self.composition_addr = links[0]
values = unpack("<2BHIiI", stream.read(16))
dims_nr = values[2]
if nr == 1:
pass
# lookup table with fixed axis
elif nr == dims_nr + 1:
for i in range(dims_nr):
self[f"axis_conversion_{i}"] = links[i + 1]
# lookup table with CN template
elif nr == 4 * dims_nr + 1:
for i in range(dims_nr):
self[f"axis_conversion_{i}"] = links[i + 1]
links = links[dims_nr + 1 :]
for i in range(dims_nr):
self[f"scale_axis_{i}_dg_addr"] = links[3 * i]
self[f"scale_axis_{i}_cg_addr"] = links[3 * i + 1]
self[f"scale_axis_{i}_ch_addr"] = links[3 * i + 2]
(
self.ca_type,
self.storage,
self.dims,
self.flags,
self.byte_offset_base,
self.invalidation_bit_base,
) = values
dim_sizes = unpack(f"<{dims_nr}Q", stream.read(8 * dims_nr))
for i, size in enumerate(dim_sizes):
self[f"dim_size_{i}"] = size
if self.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
for j in range(self[f"dim_size_{i}"]):
(value,) = FLOAT64_u(stream.read(8))
self[f"axis_{i}_value_{j}"] = value
if self.id != b"##CA":
message = f'Expected "##CA" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.id = b"##CA"
self.reserved0 = 0
ca_type = kwargs["ca_type"]
if ca_type == v4c.CA_TYPE_ARRAY:
dims_nr = kwargs["dims"]
self.block_len = 48 + dims_nr * 8
self.links_nr = 1
self.composition_addr = 0
self.ca_type = v4c.CA_TYPE_ARRAY
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = dims_nr
self.flags = 0
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
for i in range(dims_nr):
self[f"dim_size_{i}"] = kwargs[f"dim_size_{i}"]
elif ca_type == v4c.CA_TYPE_SCALE_AXIS:
self.block_len = 56
self.links_nr = 1
self.composition_addr = 0
self.ca_type = v4c.CA_TYPE_SCALE_AXIS
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = 1
self.flags = 0
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
self.dim_size_0 = kwargs["dim_size_0"]
elif ca_type == v4c.CA_TYPE_LOOKUP:
flags = kwargs["flags"]
dims_nr = kwargs["dims"]
values = sum(kwargs[f"dim_size_{i}"] for i in range(dims_nr))
if flags & v4c.FLAG_CA_FIXED_AXIS:
self.block_len = 48 + dims_nr * 16 + values * 8
self.links_nr = 1 + dims_nr
self.composition_addr = 0
for i in range(dims_nr):
self[f"axis_conversion_{i}"] = 0
self.ca_type = v4c.CA_TYPE_LOOKUP
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = dims_nr
self.flags = v4c.FLAG_CA_FIXED_AXIS | v4c.FLAG_CA_AXIS
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
for i in range(dims_nr):
self[f"dim_size_{i}"] = kwargs[f"dim_size_{i}"]
for i in range(dims_nr):
for j in range(self[f"dim_size_{i}"]):
self[f"axis_{i}_value_{j}"] = kwargs.get(
f"axis_{i}_value_{j}", j
)
else:
self.block_len = 48 + dims_nr * 5 * 8
self.links_nr = 1 + dims_nr * 4
self.composition_addr = 0
for i in range(dims_nr):
self[f"axis_conversion_{i}"] = 0
for i in range(dims_nr):
self[f"scale_axis_{i}_dg_addr"] = 0
self[f"scale_axis_{i}_cg_addr"] = 0
self[f"scale_axis_{i}_ch_addr"] = 0
self.ca_type = v4c.CA_TYPE_LOOKUP
self.storage = v4c.CA_STORAGE_TYPE_CN_TEMPLATE
self.dims = dims_nr
self.flags = v4c.FLAG_CA_AXIS
self.byte_offset_base = kwargs.get("byte_offset_base", 1)
self.invalidation_bit_base = kwargs.get("invalidation_bit_base", 0)
for i in range(dims_nr):
self[f"dim_size_{i}"] = kwargs[f"dim_size_{i}"]
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __str__(self):
return "<ChannelArrayBlock (referenced channels: {}, address: {}, fields: {})>".format(
self.referenced_channels, hex(self.address), dict(self)
)
def __bytes__(self):
flags = self.flags
ca_type = self.ca_type
dims_nr = self.dims
if ca_type == v4c.CA_TYPE_ARRAY:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"composition_addr",
"ca_type",
"storage",
"dims",
"flags",
"byte_offset_base",
"invalidation_bit_base",
)
keys += tuple(f"dim_size_{i}" for i in range(dims_nr))
fmt = f"<4sI3Q2BHIiI{dims_nr}Q"
elif ca_type == v4c.CA_TYPE_SCALE_AXIS:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"composition_addr",
"ca_type",
"storage",
"dims",
"flags",
"byte_offset_base",
"invalidation_bit_base",
"dim_size_0",
)
fmt = "<4sI3Q2BHIiIQ"
elif ca_type == v4c.CA_TYPE_LOOKUP:
if flags & v4c.FLAG_CA_FIXED_AXIS:
nr = sum(self[f"dim_size_{i}"] for i in range(dims_nr))
keys = ("id", "reserved0", "block_len", "links_nr", "composition_addr")
keys += tuple(f"axis_conversion_{i}" for i in range(dims_nr))
keys += (
"ca_type",
"storage",
"dims",
"flags",
"byte_offset_base",
"invalidation_bit_base",
)
keys += tuple(f"dim_size_{i}" for i in range(dims_nr))
keys += tuple(
f"axis_{i}_value_{j}"
for i in range(dims_nr)
for j in range(self[f"dim_size_{i}"])
)
fmt = "<4sI{}Q2BHIiI{}Q{}d"
fmt = fmt.format(self.links_nr + 2, dims_nr, nr)
else:
keys = ("id", "reserved0", "block_len", "links_nr", "composition_addr")
keys += tuple(f"axis_conversion_{i}" for i in range(dims_nr))
for i in range(dims_nr):
keys += (
f"scale_axis_{i}_dg_addr",
f"scale_axis_{i}_cg_addr",
f"scale_axis_{i}_ch_addr",
)
keys += (
"ca_type",
"storage",
"dims",
"flags",
"byte_offset_base",
"invalidation_bit_base",
)
keys += tuple(f"dim_size_{i}" for i in range(dims_nr))
fmt = "<4sI{}Q2BHIiI{}Q".format(self.links_nr + 2, dims_nr)
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
[docs]class ChannelGroup:
"""*ChannelGroup* has the following attributes, that are also available as
dict like key-value pairs
CGBLOCK fields
* ``id`` - bytes : block ID; always b'##CG'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_cg_addr`` - int : next channel group address
* ``first_ch_addr`` - int : address of first channel of this channel group
* ``acq_name_addr`` - int : address of TextBLock that contains the channel
group acquisition name
* ``acq_source_addr`` - int : addres of SourceInformation that contains the
channel group source
* ``first_sample_reduction_addr`` - int : address of first SRBLOCK; this is
considered 0 since sample reduction is not yet supported
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
channel group comment
* ``record_id`` - int : record ID for thei channel group
* ``cycles_nr`` - int : number of cycles for this channel group
* ``flags`` - int : channel group flags
* ``path_separator`` - int : ordinal for character used as path separator
* ``reserved1`` - int : reserved bytes
* ``samples_byte_nr`` - int : number of bytes used for channels samples in
the record for this channel group; this does not contain the invalidation
bytes
* ``invalidation_bytes_nr`` - int : number of bytes used for invalidation
bits by this channl group
Other attributes
* ``acq_name`` - str : acquisition name
* ``acq_source`` - SourceInformation : acquisition source information
* ``address`` - int : channel group address
* ``comment`` - str : channel group comment
"""
__slots__ = (
"address",
"acq_name",
"acq_source",
"comment",
"name",
"id",
"reserved0",
"block_len",
"links_nr",
"next_cg_addr",
"first_ch_addr",
"acq_name_addr",
"acq_source_addr",
"first_sample_reduction_addr",
"comment_addr",
"record_id",
"cycles_nr",
"flags",
"path_separator",
"reserved1",
"samples_byte_nr",
"invalidation_bytes_nr",
)
def __init__(self, **kwargs):
self.acq_name = self.comment = ""
self.acq_source = None
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
if mapped:
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
) = v4c.CHANNEL_GROUP_uf(stream, address)
else:
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
) = v4c.CHANNEL_GROUP_u(stream.read(v4c.CG_BLOCK_SIZE))
if self.id != b"##CG":
message = f'Expected "##CG" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.acq_name = get_text_v4(self.acq_name_addr, stream, mapped=mapped)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped)
if self.acq_source_addr:
self.acq_source = SourceInformation(
address=self.acq_source_addr, stream=stream, mapped=mapped
)
except KeyError:
self.address = 0
self.id = b"##CG"
self.reserved0 = kwargs.get("reserved0", 0)
self.block_len = kwargs.get("block_len", v4c.CG_BLOCK_SIZE)
self.links_nr = kwargs.get("links_nr", 6)
self.next_cg_addr = kwargs.get("next_cg_addr", 0)
self.first_ch_addr = kwargs.get("first_ch_addr", 0)
self.acq_name_addr = kwargs.get("acq_name_addr", 0)
self.acq_source_addr = kwargs.get("acq_source_addr", 0)
self.first_sample_reduction_addr = kwargs.get(
"first_sample_reduction_addr", 0
)
self.comment_addr = kwargs.get("comment_addr", 0)
self.record_id = kwargs.get("record_id", 1)
self.cycles_nr = kwargs.get("cycles_nr", 0)
self.flags = kwargs.get("flags", 0)
self.path_separator = kwargs.get("path_separator", 0)
self.reserved1 = kwargs.get("reserved1", 0)
self.samples_byte_nr = kwargs.get("samples_byte_nr", 0)
self.invalidation_bytes_nr = kwargs.get("invalidation_bytes_nr", 0)
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def to_blocks(self, address, blocks, defined_texts, si_map):
text = self.acq_name
if text:
if text in defined_texts:
self.acq_name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.acq_name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.acq_name_addr = 0
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<CGcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
source = self.acq_source
if source:
address = source.to_blocks(address, blocks, defined_texts, si_map)
self.acq_source_addr = source.address
else:
self.acq_source_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __bytes__(self):
result = v4c.CHANNEL_GROUP_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_cg_addr,
self.first_ch_addr,
self.acq_name_addr,
self.acq_source_addr,
self.first_sample_reduction_addr,
self.comment_addr,
self.record_id,
self.cycles_nr,
self.flags,
self.path_separator,
self.reserved1,
self.samples_byte_nr,
self.invalidation_bytes_nr,
)
return result
class _ChannelConversionBase:
__slots__ = (
"name",
"unit",
"comment",
"formula",
"referenced_blocks",
"address",
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
"a",
"b",
"P1",
"P2",
"P3",
"P4",
"P5",
"P6",
)
[docs]class ChannelConversion(_ChannelConversionBase):
"""*ChannelConversion* has the following attributes, that are also available as
dict like key-value pairs
CCBLOCK common fields
* ``id`` - bytes : block ID; always b'##CG'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``name_addr`` - int : address of TXBLOCK that contains the
conversion name
* ``unit_addr`` - int : address of TXBLOCK that contains the
conversion unit
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
conversion comment
* ``inv_conv_addr`` int : address of invers conversion
* ``conversion_type`` int : integer code for conversion type
* ``precision`` - int : integer code for precision
* ``flags`` - int : conversion block flags
* ``ref_param_nr`` - int : number fo referenced parameters (linked
parameters)
* ``val_param_nr`` - int : number of value parameters
* ``min_phy_value`` - float : minimum physical channel value
* ``max_phy_value`` - float : maximum physical channel value
CCBLOCK specific fields
* linear conversion
* ``a`` - float : factor
* ``b`` - float : offset
* rational conversion
* ``P1`` to ``P6`` - float : parameters
* algebraic conversion
* ``formula_addr`` - address of TXBLOCK that contains the
the algebraic conversion formula
* tabluar conversion with or without interpolation
* ``raw_<N>`` - float : N-th raw value
* ``phys_<N>`` - float : N-th physical value
* tabular range conversion
* ``lower_<N>`` - float : N-th lower value
* ``upper_<N>`` - float : N-th upper value
* ``phys_<N>`` - float : N-th physical value
* tabular value to text conversion
* ``val_<N>`` - float : N-th raw value
* ``text_<N>`` - int : address of N-th TXBLOCK that
contains the physical value
* ``default`` - int : address of TXBLOCK that contains
the default physical value
* tabular range to text conversion
* ``lower_<N>`` - float : N-th lower value
* ``upper_<N>`` - float : N-th upper value
* ``text_<N>`` - int : address of N-th TXBLOCK that
contains the physical value
* ``default`` - int : address of TXBLOCK that contains
the default physical value
* text to value conversion
* ``val_<N>`` - float : N-th physical value
* ``text_<N>`` - int : address of N-th TXBLOCK that
contains the raw value
* ``val_default`` - float : default physical value
* text tranfosrmation (translation) conversion
* ``input_<N>_addr`` - int : address of N-th TXBLOCK that
contains the raw value
* ``output_<N>_addr`` - int : address of N-th TXBLOCK that
contains the physical value
* ``default_addr`` - int : address of TXBLOCK that contains
the default physical value
Other attributes
* ``address`` - int : channel conversion address
* ``comment`` - str : channel conversion comment
* ``formula`` - str : algebraic conversion formula; default ''
* ``referenced_blocks`` - list : list of refenced blocks; can be TextBlock
objects for value to text, and text to text conversions; for partial
conversions the referenced blocks can be ChannelConversion obejct as well
* ``name`` - str : channel conversion name
* ``unit`` - str : channel conversion unit
"""
def __init__(self, **kwargs):
self.name = self.unit = self.comment = self.formula = ""
self.referenced_blocks = None
if "stream" in kwargs:
mapped = kwargs.get("mapped", False)
stream = kwargs["stream"]
try:
block = kwargs["raw_bytes"]
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_uf(
block
)
block = block[COMMON_SIZE:]
self.address = kwargs.get("address", 0)
except KeyError:
self.address = address = kwargs["address"]
stream.seek(address)
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_u(
stream.read(COMMON_SIZE)
)
block = stream.read(self.block_len - COMMON_SIZE)
(conv,) = UINT8_uf(block, self.links_nr * 8)
if conv == v4c.CONVERSION_TYPE_NON:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = v4c.CONVERSION_NONE_INIT_u(block)
elif conv == v4c.CONVERSION_TYPE_LIN:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.b,
self.a,
) = v4c.CONVERSION_LINEAR_INIT_u(block)
elif conv == v4c.CONVERSION_TYPE_RAT:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.P1,
self.P2,
self.P3,
self.P4,
self.P5,
self.P6,
) = unpack(v4c.FMT_CONVERSION_RAT_INIT, block)
elif conv == v4c.CONVERSION_TYPE_ALG:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.formula_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = unpack(v4c.FMT_CONVERSION_ALGEBRAIC_INIT, block)
elif conv in (v4c.CONVERSION_TYPE_TABI, v4c.CONVERSION_TYPE_TAB):
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = unpack_from(v4c.FMT_CONVERSION_NONE_INIT, block)
nr = self.val_param_nr
values = unpack(f"<{nr}d", block[56:])
for i in range(nr // 2):
self[f"raw_{i}"], self[f"phys_{i}"] = (
values[i * 2],
values[2 * i + 1],
)
elif conv == v4c.CONVERSION_TYPE_RTAB:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = unpack_from(v4c.FMT_CONVERSION_NONE_INIT, block)
nr = self.val_param_nr
values = unpack(f"<{nr}d", block[56:])
for i in range((nr - 1) // 3):
(self[f"lower_{i}"], self[f"upper_{i}"], self[f"phys_{i}"]) = (
values[i * 3],
values[3 * i + 1],
values[3 * i + 2],
)
(self.default,) = FLOAT64_u(block[-8:])
elif conv == v4c.CONVERSION_TYPE_TABX:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = unpack_from("<4Q", block)
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", block, 32)
for i, link in enumerate(links[:-1]):
self[f"text_{i}"] = link
self.default_addr = links[-1]
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = unpack_from("<2B3H2d", block, 32 + links_nr * 8)
values = unpack_from(f"<{links_nr - 1}d", block, 32 + links_nr * 8 + 24)
for i, val in enumerate(values):
self[f"val_{i}"] = val
elif conv == v4c.CONVERSION_TYPE_RTABX:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = unpack_from("<4Q", block)
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", block, 32)
for i, link in enumerate(links[:-1]):
self[f"text_{i}"] = link
self.default_addr = links[-1]
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = unpack_from("<2B3H2d", block, 32 + links_nr * 8)
values = unpack_from(
"<{}d".format((links_nr - 1) * 2), block, 32 + links_nr * 8 + 24
)
for i in range(self.val_param_nr // 2):
j = 2 * i
self[f"lower_{i}"] = values[j]
self[f"upper_{i}"] = values[j + 1]
elif conv == v4c.CONVERSION_TYPE_TTAB:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = unpack_from("<4Q", block)
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", block, 32)
for i, link in enumerate(links):
self[f"text_{i}"] = link
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = unpack_from("<2B3H2d", block, 32 + links_nr * 8)
values = unpack_from(
f"<{self.val_param_nr}d", block, 32 + links_nr * 8 + 24
)
for i, val in enumerate(values[:-1]):
self[f"val_{i}"] = val
self.val_default = values[-1]
elif conv == v4c.CONVERSION_TYPE_TRANS:
(
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
) = unpack_from("<4Q", block)
links_nr = self.links_nr - 4
links = unpack_from(f"<{links_nr}Q", block, 32)
for i in range((links_nr - 1) // 2):
j = 2 * i
self[f"input_{i}_addr"] = links[j]
self[f"output_{i}_addr"] = links[j + 1]
self.default_addr = links[-1]
(
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
) = unpack_from("<2B3H2d", block, 32 + links_nr * 8)
if self.id != b"##CC":
message = f'Expected "##CC" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.name = get_text_v4(self.name_addr, stream, mapped=mapped)
self.unit = get_text_v4(self.unit_addr, stream, mapped=mapped)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped)
self.formula = ""
self.referenced_blocks = None
conv_type = conv
if conv_type == v4c.CONVERSION_TYPE_ALG:
self.formula = get_text_v4(self.formula_addr, stream, mapped=mapped)
elif conv_type in v4c.TABULAR_CONVERSIONS:
refs = self.referenced_blocks = {}
if conv_type == v4c.CONVERSION_TYPE_TTAB:
tabs = self.links_nr - 4
else:
tabs = self.links_nr - 4 - 1
for i in range(tabs):
address = self[f"text_{i}"]
if address:
try:
block = TextBlock(address=address, stream=stream, mapped=mapped)
refs[f"text_{i}"] = block
except MdfException:
block = ChannelConversion(address=address, stream=stream, mapped=mapped)
refs[f"text_{i}"] = block
else:
refs[f"text_{i}"] = None
if conv_type != v4c.CONVERSION_TYPE_TTAB:
address = self.default_addr
if address:
try:
block = TextBlock(address=address, stream=stream, mapped=mapped)
refs["default_addr"] = block
except MdfException:
block = ChannelConversion(address=address, stream=stream, mapped=mapped)
refs["default_addr"] = block
else:
refs["default_addr"] = None
elif conv_type == v4c.CONVERSION_TYPE_TRANS:
refs = self.referenced_blocks = {}
# link_nr - common links (4) - default text link (1)
for i in range((self.links_nr - 4 - 1) // 2):
for key in (f"input_{i}_addr", f"output_{i}_addr"):
address = self[key]
if address:
block = TextBlock(address=address, stream=stream, mapped=mapped)
refs[key] = block
address = self.default_addr
if address:
block = TextBlock(address=address, stream=stream, mapped=mapped)
refs["default_addr"] = block
else:
refs["default_addr"] = None
else:
self.name = self.unit = self.comment = self.formula = ""
self.referenced_blocks = None
self.address = 0
self.id = b"##CC"
self.reserved0 = 0
if kwargs["conversion_type"] == v4c.CONVERSION_TYPE_NON:
self.block_len = v4c.CC_NONE_BLOCK_SIZE
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = 0
self.conversion_type = v4c.CONVERSION_TYPE_NON
self.precision = 1
self.flags = 0
self.ref_param_nr = 0
self.val_param_nr = 0
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_LIN:
self.block_len = v4c.CC_LIN_BLOCK_SIZE
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = v4c.CONVERSION_TYPE_LIN
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = 2
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.b = kwargs["b"]
self.a = kwargs["a"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_ALG:
self.block_len = v4c.CC_ALG_BLOCK_SIZE
self.links_nr = 5
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.formula_addr = kwargs.get("formula_addr", 0)
self.conversion_type = v4c.CONVERSION_TYPE_ALG
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 1
self.val_param_nr = 0
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
self.formula = kwargs["formula"]
elif kwargs["conversion_type"] in (
v4c.CONVERSION_TYPE_TAB,
v4c.CONVERSION_TYPE_TABI,
):
nr = kwargs["val_param_nr"]
self.block_len = 80 + 8 * nr
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = kwargs["conversion_type"]
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = nr
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(nr // 2):
self[f"raw_{i}"] = kwargs[f"raw_{i}"]
self[f"phys_{i}"] = kwargs[f"phys_{i}"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_RTAB:
self.block_len = kwargs["val_param_nr"] * 8 + 80
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = v4c.CONVERSION_TYPE_RTAB
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = kwargs["val_param_nr"]
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range((kwargs["val_param_nr"] - 1) // 3):
self[f"lower_{i}"] = kwargs[f"lower_{i}"]
self[f"upper_{i}"] = kwargs[f"upper_{i}"]
self[f"phys_{i}"] = kwargs[f"phys_{i}"]
self.default = kwargs["default"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_RAT:
self.block_len = 80 + 6 * 8
self.links_nr = 4
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
self.conversion_type = kwargs["conversion_type"]
self.precision = kwargs.get("precision", 1)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = 0
self.val_param_nr = kwargs.get("val_param_nr", 6)
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(1, 7):
self[f"P{i}"] = kwargs[f"P{i}"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_TABX:
self.referenced_blocks = {}
nr = kwargs["ref_param_nr"] - 1
self.block_len = (nr * 8 * 2) + 88
self.links_nr = nr + 5
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
for i in range(nr):
key = f"text_{i}"
self[key] = 0
self.referenced_blocks[key] = TextBlock(text=kwargs[key])
self.default_addr = 0
key = "default_addr"
if "default_addr" in kwargs:
default = kwargs["default_addr"]
else:
default = kwargs.get("default", b"")
if default:
self.referenced_blocks[key] = TextBlock(text=default)
else:
self.referenced_blocks[key] = None
self.conversion_type = v4c.CONVERSION_TYPE_TABX
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = nr + 1
self.val_param_nr = nr
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(nr):
self[f"val_{i}"] = kwargs[f"val_{i}"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_RTABX:
self.referenced_blocks = {}
nr = kwargs["ref_param_nr"] - 1
self.block_len = (nr * 8 * 3) + 88
self.links_nr = nr + 5
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
for i in range(nr):
key = f"text_{i}"
self[key] = 0
self.referenced_blocks[key] = TextBlock(text=kwargs[key])
self.default_addr = 0
if "default_addr" in kwargs:
default = kwargs["default_addr"]
else:
default = kwargs.get("default", b"")
if default:
if b"{X}" in default:
default = (
default.decode("latin-1").replace("{X}", "X").split('"')[1]
)
default = ChannelConversion(
conversion_type=v4c.CONVERSION_TYPE_ALG, formula=default
)
self.referenced_blocks["default_addr"] = default
else:
self.referenced_blocks["default_addr"] = TextBlock(text=default)
else:
self.referenced_blocks["default_addr"] = None
self.conversion_type = v4c.CONVERSION_TYPE_RTABX
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = nr + 1
self.val_param_nr = nr * 2
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(nr):
self[f"lower_{i}"] = kwargs[f"lower_{i}"]
self[f"upper_{i}"] = kwargs[f"upper_{i}"]
elif kwargs["conversion_type"] == v4c.CONVERSION_TYPE_TTAB:
self.block_len = ((kwargs["links_nr"] - 4) * 8 * 2) + 88
self.links_nr = kwargs["links_nr"]
self.name_addr = kwargs.get("name_addr", 0)
self.unit_addr = kwargs.get("unit_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.inv_conv_addr = kwargs.get("inv_conv_addr", 0)
for i in range(kwargs["links_nr"] - 4):
self[f"text_{i}"] = kwargs.get(f"text_{i}", 0)
self.conversion_type = v4c.CONVERSION_TYPE_TTAB
self.precision = kwargs.get("precision", 0)
self.flags = kwargs.get("flags", 0)
self.ref_param_nr = kwargs["links_nr"] - 4
self.val_param_nr = kwargs["links_nr"] - 4 + 1
self.min_phy_value = kwargs.get("min_phy_value", 0)
self.max_phy_value = kwargs.get("max_phy_value", 0)
for i in range(kwargs["links_nr"] - 4):
self[f"val_{i}"] = kwargs[f"val_{i}"]
self.val_default = kwargs["val_default"]
else:
message = "Conversion {} dynamic creation not implementated"
message = message.format(kwargs["conversion_type"])
logger.exception(message)
raise MdfException(message)
def to_blocks(self, address, blocks, defined_texts, cc_map):
text = self.name
if text:
if text in defined_texts:
self.name_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.name_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.name_addr = 0
text = self.unit
if text:
if text in defined_texts:
self.unit_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.unit_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.unit_addr = 0
if self.conversion_type == v4c.CONVERSION_TYPE_ALG:
text = self.formula
if text:
if text in defined_texts:
self.formula_addr = defined_texts[text]
else:
tx_block = TextBlock(text=text)
self.formula_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.formula_addr = 0
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<CCcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
if self.referenced_blocks:
for key, block in self.referenced_blocks.items():
if block:
if block["id"] == b"##TX":
text = block["text"]
if text in defined_texts:
self[key] = defined_texts[text]
else:
defined_texts[text] = address
blocks.append(block)
self[key] = address
address += block["block_len"]
else:
address = block.to_blocks(
address, blocks, defined_texts, cc_map
)
self[key] = block.address
else:
self[key] = 0
bts = bytes(self)
if bts in cc_map:
self.address = cc_map[bts]
else:
blocks.append(bts)
self.address = address
cc_map[bts] = address
address += self.block_len
return address
def convert(self, values):
conversion_type = self.conversion_type
if conversion_type == v4c.CONVERSION_TYPE_NON:
pass
elif conversion_type == v4c.CONVERSION_TYPE_LIN:
a = self.a
b = self.b
if (a, b) != (1, 0):
values = values * a
if b:
values += b
elif conversion_type == v4c.CONVERSION_TYPE_RAT:
P1 = self.P1
P2 = self.P2
P3 = self.P3
P4 = self.P4
P5 = self.P5
P6 = self.P6
X = values
if (P1, P4, P5, P6) == (0, 0, 0, 1):
if (P2, P3) != (1, 0):
values = values * P2
if P3:
values += P3
elif (P3, P4, P5, P6) == (0, 0, 1, 0):
if (P1, P2) != (1, 0):
values = values * P1
if P2:
values += P2
else:
try:
values = evaluate(v4c.CONV_RAT_TEXT)
except TypeError:
values = (P1 * X ** 2 + P2 * X + P3) / (P4 * X ** 2 + P5 * X + P6)
elif conversion_type == v4c.CONVERSION_TYPE_ALG:
X = values
values = evaluate(self.formula)
elif conversion_type in (v4c.CONVERSION_TYPE_TABI, v4c.CONVERSION_TYPE_TAB):
nr = self.val_param_nr // 2
raw_vals = np.array([self[f"raw_{i}"] for i in range(nr)])
phys = np.array([self[f"phys_{i}"] for i in range(nr)])
if conversion_type == v4c.CONVERSION_TYPE_TABI:
values = np.interp(values, raw_vals, phys)
else:
idx = np.searchsorted(raw_vals, values)
idx = np.clip(idx, 0, len(raw_vals) - 1)
values = phys[idx]
elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
nr = (self.val_param_nr - 1) // 3
lower = np.array([self[f"lower_{i}"] for i in range(nr)])
upper = np.array([self[f"upper_{i}"] for i in range(nr)])
phys = np.array([self[f"phys_{i}"] for i in range(nr)])
default = self.default
if values.dtype.kind == "f":
idx1 = np.searchsorted(lower, values, side="right") - 1
idx2 = np.searchsorted(upper, values, side="right")
else:
idx1 = np.searchsorted(lower, values, side="right") - 1
idx2 = np.searchsorted(upper, values, side="right") - 1
idx_ne = np.nonzero(idx1 != idx2)[0]
idx_eq = np.nonzero(idx1 == idx2)[0]
new_values = np.zeros(len(values), dtype=phys.dtype)
if len(idx_ne):
new_values[idx_ne] = default
if len(idx_eq):
new_values[idx_eq] = phys[idx1[idx_eq]]
values = new_values
elif conversion_type == v4c.CONVERSION_TYPE_TABX:
nr = self.val_param_nr
raw_vals = np.array([self[f"val_{i}"] for i in range(nr)])
phys = []
for i in range(nr):
try:
value = self.referenced_blocks[f"text_{i}"].text
except AttributeError:
value = self.referenced_blocks[f"text_{i}"]
except TypeError:
value = b""
phys.append(value)
default = self.referenced_blocks.get("default_addr", {})
try:
default = default.text
except AttributeError:
pass
except TypeError:
default = b""
phys.insert(0, default)
raw_vals = np.insert(raw_vals, 0, raw_vals[0] - 1)
indexes = np.searchsorted(raw_vals, values)
np.place(indexes, indexes >= len(raw_vals), 0)
all_values = list(phys) + [default]
if all(isinstance(val, bytes) for val in all_values):
phys = np.array(phys)
values = phys[indexes]
else:
new_values = []
for i, idx in enumerate(indexes):
item = phys[idx]
if isinstance(item, bytes):
new_values.append(item)
else:
new_values.append(item.convert(values[i : i + 1])[0])
if all(isinstance(v, bytes) for v in new_values):
values = np.array(new_values)
else:
values = np.array(
[np.nan if isinstance(v, bytes) else v for v in new_values]
)
elif conversion_type == v4c.CONVERSION_TYPE_RTABX:
nr = self.val_param_nr // 2
phys = []
for i in range(nr):
try:
value = self.referenced_blocks[f"text_{i}"].text
except AttributeError:
value = self.referenced_blocks[f"text_{i}"]
except TypeError:
value = b""
phys.append(value)
default = self.referenced_blocks.get("default_addr", {})
try:
default = default.text
except AttributeError:
pass
except TypeError:
default = b""
lower = np.array([self[f"lower_{i}"] for i in range(nr)])
upper = np.array([self[f"upper_{i}"] for i in range(nr)])
all_values = phys + [default]
idx1 = np.searchsorted(lower, values, side="right") - 1
idx2 = np.searchsorted(upper, values, side="right")
idx_ne = np.nonzero(idx1 != idx2)[0]
idx_eq = np.nonzero(idx1 == idx2)[0]
if all(isinstance(val, bytes) for val in all_values):
phys = np.array(phys)
all_values = np.array(all_values)
new_values = np.zeros(len(values), dtype=all_values.dtype)
if len(idx_ne):
new_values[idx_ne] = default
if len(idx_eq):
new_values[idx_eq] = phys[idx1[idx_eq]]
values = new_values
else:
new_values = []
for i, val in enumerate(values):
if i in idx_ne:
item = default
else:
item = phys[idx1[i]]
if isinstance(item, bytes):
new_values.append(item)
else:
new_values.append(item.convert(values[i : i + 1])[0])
if all(isinstance(v, bytes) for v in new_values):
values = np.array(new_values)
else:
values = np.array(
[np.nan if isinstance(v, bytes) else v for v in new_values]
)
elif conversion_type == v4c.CONVERSION_TYPE_TTAB:
nr = self.val_param_nr - 1
raw_values = [
self.referenced_blocks[f"text_{i}"].text.strip(b"\0") for i in range(nr)
]
phys = [self[f"val_{i}"] for i in range(nr)]
default = self.val_default
new_values = []
for val in values:
try:
val = phys[raw_values.index(val)]
except ValueError:
val = default
new_values.append(val)
values = np.array(new_values)
elif conversion_type == v4c.CONVERSION_TYPE_TRANS:
nr = (self.ref_param_nr - 1) // 2
in_ = [
self.referenced_blocks[f"input_{i}_addr"].text.strip(b"\0")
for i in range(nr)
]
out_ = [
self.referenced_blocks[f"output_{i}_addr"].text.strip(b"\0")
for i in range(nr)
]
default = self.referenced_blocks["default_addr"].text.strip(b"\0")
new_values = []
for val in values:
try:
val = out_[in_.index(val.strip(b"\0"))]
except ValueError:
val = default
new_values.append(val)
values = np.array(new_values)
return values
def metadata(self, indent=""):
if self.conversion_type == v4c.CONVERSION_TYPE_NON:
keys = v4c.KEYS_CONVERSION_NONE
elif self.conversion_type == v4c.CONVERSION_TYPE_LIN:
keys = v4c.KEYS_CONVERSION_LINEAR
elif self.conversion_type == v4c.CONVERSION_TYPE_RAT:
keys = v4c.KEYS_CONVERSION_RAT
elif self.conversion_type == v4c.CONVERSION_TYPE_ALG:
keys = v4c.KEYS_CONVERSION_ALGEBRAIC
elif self.conversion_type in (
v4c.CONVERSION_TYPE_TABI,
v4c.CONVERSION_TYPE_TAB,
):
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 2):
keys += (f"raw_{i}", f"phys_{i}")
elif self.conversion_type == v4c.CONVERSION_TYPE_RTAB:
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 3):
keys += (f"lower_{i}", f"upper_{i}", f"phys_{i}")
keys += ("default",)
elif self.conversion_type == v4c.CONVERSION_TYPE_TABX:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr))
elif self.conversion_type == v4c.CONVERSION_TYPE_RTABX:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
for i in range(self.val_param_nr // 2):
keys += (f"lower_{i}", f"upper_{i}")
elif self.conversion_type == v4c.CONVERSION_TYPE_TTAB:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4))
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
keys += ("val_default",)
elif self.conversion_type == v4c.CONVERSION_TYPE_TRANS:
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
for i in range((self.links_nr - 4 - 1) // 2):
keys += (f"input_{i}_addr", f"output_{i}_addr")
keys += (
"default_addr",
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
max_len = max(len(key) for key in keys)
template = f"{{: <{max_len}}}: {{}}"
metadata = []
lines = f"""
name: {self.name}
unit: {self.unit}
address: {hex(self.address)}
comment: {self.comment}
formula: {self.formula}
""".split("\n")
for key in keys:
val = getattr(self, key)
if key.endswith("addr") or key.startswith("text_"):
lines.append(template.format(key, hex(val)))
elif isinstance(val, float):
lines.append(template.format(key, round(val, 6)))
else:
if isinstance(val, bytes):
lines.append(template.format(key, val.strip(b"\0")))
else:
lines.append(template.format(key, val))
if self.referenced_blocks:
max_len = max(len(key) for key in self.referenced_blocks)
template = f"{{: <{max_len}}}: {{}}"
lines.append("")
lines.append("Referenced blocks:")
for key, block in self.referenced_blocks.items():
if isinstance(block, TextBlock):
lines.append(template.format(key, block["text"].strip(b"\0")))
else:
lines.append(template.format(key, ""))
lines.extend(block.metadata(indent + " ").split("\n"))
for line in lines:
if not line:
metadata.append(line)
else:
for wrapped_line in wrap(
line, initial_indent=indent, subsequent_indent=indent, width=120
):
metadata.append(wrapped_line)
return "\n".join(metadata)
def __getitem__(self, item):
try:
return self.__getattribute__(item)
except:
print(self)
raise
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __contains__(self, item):
return hasattr(self, item)
def __bytes__(self):
if self.conversion_type == v4c.CONVERSION_TYPE_NON:
result = v4c.CONVERSION_NONE_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
)
elif self.conversion_type == v4c.CONVERSION_TYPE_LIN:
result = v4c.CONVERSION_LINEAR_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.b,
self.a,
)
elif self.conversion_type == v4c.CONVERSION_TYPE_RAT:
result = v4c.CONVERSION_RAT_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
self.P1,
self.P2,
self.P3,
self.P4,
self.P5,
self.P6,
)
elif self.conversion_type == v4c.CONVERSION_TYPE_ALG:
result = v4c.CONVERSION_ALGEBRAIC_PACK(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.name_addr,
self.unit_addr,
self.comment_addr,
self.inv_conv_addr,
self.formula_addr,
self.conversion_type,
self.precision,
self.flags,
self.ref_param_nr,
self.val_param_nr,
self.min_phy_value,
self.max_phy_value,
)
elif self.conversion_type in (
v4c.CONVERSION_TYPE_TABI,
v4c.CONVERSION_TYPE_TAB,
):
fmt = "<4sI{}Q2B3H{}d".format(self.links_nr + 2, self.val_param_nr + 2)
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 2):
keys += (f"raw_{i}", f"phys_{i}")
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_RTAB:
fmt = "<4sI{}Q2B3H{}d".format(self.links_nr + 2, self.val_param_nr + 2)
keys = v4c.KEYS_CONVERSION_NONE
for i in range(self.val_param_nr // 3):
keys += (f"lower_{i}", f"upper_{i}", f"phys_{i}")
keys += ("default",)
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_TABX:
fmt = "<4sI{}Q2B3H{}d".format(self.links_nr + 2, self.val_param_nr + 2)
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr))
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_RTABX:
fmt = "<4sI{}Q2B3H{}d".format(self.links_nr + 2, self.val_param_nr + 2)
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4 - 1))
keys += ("default_addr",)
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
for i in range(self.val_param_nr // 2):
keys += (f"lower_{i}", f"upper_{i}")
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_TTAB:
fmt = "<4sI{}Q2B3H{}d".format(self.links_nr + 2, self.val_param_nr + 2)
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
keys += tuple(f"text_{i}" for i in range(self.links_nr - 4))
keys += (
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
keys += ("val_default",)
result = pack(fmt, *[getattr(self, key) for key in keys])
elif self.conversion_type == v4c.CONVERSION_TYPE_TRANS:
fmt = "<4sI{}Q2B3H{}d".format(self.links_nr + 2, self.val_param_nr + 2)
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"name_addr",
"unit_addr",
"comment_addr",
"inv_conv_addr",
)
for i in range((self.links_nr - 4 - 1) // 2):
keys += (f"input_{i}_addr", f"output_{i}_addr")
keys += (
"default_addr",
"conversion_type",
"precision",
"flags",
"ref_param_nr",
"val_param_nr",
"min_phy_value",
"max_phy_value",
)
keys += tuple(f"val_{i}" for i in range(self.val_param_nr - 1))
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
def __str__(self):
return "<ChannelConversion (name: {}, unit: {}, comment: {}, formula: {}, referenced blocks: {}, address: {}, fields: {})>".format(
self.name,
self.unit,
self.comment,
self.formula,
self.referenced_blocks,
self.address,
block_fields(self),
)
[docs]class DataBlock:
"""Common implementation for DTBLOCK/RDBLOCK/SDBLOCK
*DataBlock* has the following attributes, that are also available as
dict like key-value pairs
DTBLOCK fields
* ``id`` - bytes : block ID; b'##DT' for DTBLOCK, b'##RD' for RDBLOCK or
b'##SD' for SDBLOCK
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``data`` - bytes : raw samples
Other attributes
* ``address`` - int : data block address
Parameters
----------
address : int
DTBLOCK/RDBLOCK/SDBLOCK address inside the file
stream : int
file handle
reduction : bool
sample reduction data block
"""
__slots__ = ("address", "id", "reserved0", "block_len", "links_nr", "data")
def __init__(self, **kwargs):
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
if mapped:
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_uf(
stream, address
)
address += COMMON_SIZE
self.data = stream[address: address + self.block_len]
else:
stream.seek(address)
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_u(
stream.read(COMMON_SIZE)
)
self.data = stream.read(self.block_len - COMMON_SIZE)
if self.id not in (b"##DT", b"##RD", b"##SD"):
message = f'Expected "##DT", "##RD" or "##SD" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = 0
type = kwargs.get("type", "DT")
if type not in ("DT", "SD", "RD"):
type = "DT"
self.id = "##{}".format(type).encode("ascii")
self.reserved0 = 0
self.block_len = len(kwargs["data"]) + COMMON_SIZE
self.links_nr = 0
self.data = kwargs["data"]
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
return v4c.COMMON_p(
self.id, self.reserved0, self.block_len, self.links_nr
) + self.data
class DataZippedBlock(object):
"""*DataZippedBlock* has the following attributes, that are also available
as dict like key-value pairs
DZBLOCK fields
* ``id`` - bytes : block ID; always b'##DZ'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``original_type`` - bytes : b'DT' or b'SD'
* ``zip_type`` - int : zip algorithm used
* ``reserved1`` - int : reserved bytes
* ``param`` - int : for transpose deflate the record size used for
transposition
* ``original_size`` - int : size of the original uncompressed raw bytes
* ``zip_size`` - int : size of compressed bytes
* ``data`` - bytes : compressed bytes
Other attributes
* ``address`` - int : data zipped block address
* ``return_unzipped`` - bool : decompress data when accessing the 'data'
key
Parameters
----------
address : int
DTBLOCK address inside the file
stream : int
file handle
"""
__slots__ = (
"address",
"_prevent_data_setitem",
"return_unzipped",
"id",
"reserved0",
"block_len",
"links_nr",
"original_type",
"zip_type",
"reserved1",
"param",
"original_size",
"zip_size",
"data",
)
def __init__(self, **kwargs):
self._prevent_data_setitem = True
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.original_type,
self.zip_type,
self.reserved1,
self.param,
self.original_size,
self.zip_size,
) = unpack(v4c.FMT_DZ_COMMON, stream.read(v4c.DZ_COMMON_SIZE))
self.data = stream.read(self.zip_size)
if self.id != b"##DZ":
message = f'Expected "##DZ" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self._prevent_data_setitem = False
self.address = 0
data = kwargs["data"]
self.id = b"##DZ"
self.reserved0 = 0
self.block_len = 0
self.links_nr = 0
self.original_type = kwargs.get("original_type", b"DT")
self.zip_type = kwargs.get("zip_type", v4c.FLAG_DZ_DEFLATE)
self.reserved1 = 0
if self.zip_type == v4c.FLAG_DZ_DEFLATE:
self.param = 0
else:
self.param = kwargs["param"]
# since prevent_data_setitem is False the rest of the keys will be
# handled by __setitem__
self.data = data
self._prevent_data_setitem = False
self.return_unzipped = True
def __setattr__(self, item, value):
if item == "data" and not self._prevent_data_setitem:
data = value
original_size = len(data)
self.original_size = original_size
if self.zip_type == v4c.FLAG_DZ_DEFLATE:
data = compress(data)
else:
cols = self.param
lines = original_size // cols
if lines * cols < original_size:
data = (
np.fromstring(data[: lines * cols], dtype=np.uint8)
.reshape((lines, cols))
.T.tostring()
) + data[lines * cols :]
else:
data = (
np.fromstring(data, dtype=np.uint8)
.reshape((lines, cols))
.T.tostring()
)
data = compress(data, 1)
zipped_size = len(data)
self.zip_size = zipped_size
self.block_len = zipped_size + v4c.DZ_COMMON_SIZE
DataZippedBlock.__dict__[item].__set__(self, data)
else:
DataZippedBlock.__dict__[item].__set__(self, value)
def __getattribute__(self, item):
if item == "data":
if self.return_unzipped:
data = DataZippedBlock.__dict__[item].__get__(self)
original_size = self.original_size
data = decompress(data, 0, original_size)
if self.zip_type == v4c.FLAG_DZ_TRANPOSED_DEFLATE:
cols = self.param
lines = original_size // cols
if lines * cols < original_size:
data = (
np.fromstring(data[: lines * cols], dtype=np.uint8)
.reshape((cols, lines))
.T.tostring()
) + data[lines * cols :]
else:
data = (
np.fromstring(data, dtype=np.uint8)
.reshape((cols, lines))
.T.tostring()
)
else:
data = DataZippedBlock.__dict__[item].__get__(self)
value = data
else:
value = DataZippedBlock.__dict__[item].__get__(self)
return value
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __getitem__(self, item):
return self.__getattribute__(item)
def __bytes__(self):
self.return_unzipped = False
data = v4c.DZ_COMMON_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.original_type,
self.zip_type,
self.reserved1,
self.param,
self.original_size,
self.zip_size,
) + self.data
self.return_unzipped = True
return data
[docs]class DataGroup:
"""
*DataGroup* has the following attributes, that are also available as
dict like key-value pairs
DGBLOCK fields
* ``id`` - bytes : block ID; always b'##DG'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_dg_addr`` - int : address of next data group block
* ``first_cg_addr`` - int : address of first channel group for this data
group
* ``data_block_addr`` - int : address of DTBLOCK, DZBLOCK, DLBLOCK or
HLBLOCK that contains the raw samples for this data group
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK tha contains the
data group comment
* ``record_id_len`` - int : size of record ID used in case of unsorted
data groups; can be 1, 2, 4 or 8
* ``reserved1`` - int : reserved bytes
Other attributes
* ``address`` - int : dat group address
* ``comment`` - str : data group comment
"""
__slots__ = (
"address",
"comment",
"id",
"reserved0",
"block_len",
"links_nr",
"next_dg_addr",
"first_cg_addr",
"data_block_addr",
"comment_addr",
"record_id_len",
"reserved1",
)
def __init__(self, **kwargs):
self.comment = ""
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
if mapped:
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_dg_addr,
self.first_cg_addr,
self.data_block_addr,
self.comment_addr,
self.record_id_len,
self.reserved1,
) = v4c.DATA_GROUP_uf(stream, address)
else:
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_dg_addr,
self.first_cg_addr,
self.data_block_addr,
self.comment_addr,
self.record_id_len,
self.reserved1,
) = v4c.DATA_GROUP_u(stream.read(v4c.DG_BLOCK_SIZE))
if self.id != b"##DG":
message = f'Expected "##DG" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.comment = get_text_v4(self.comment_addr, stream, mapped=mapped)
except KeyError:
self.address = 0
self.id = b"##DG"
self.reserved0 = kwargs.get("reserved0", 0)
self.block_len = kwargs.get("block_len", v4c.DG_BLOCK_SIZE)
self.links_nr = kwargs.get("links_nr", 4)
self.next_dg_addr = kwargs.get("next_dg_addr", 0)
self.first_cg_addr = kwargs.get("first_cg_addr", 0)
self.data_block_addr = kwargs.get("data_block_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.record_id_len = kwargs.get("record_id_len", 0)
self.reserved1 = kwargs.get("reserved1", b"\00" * 7)
def copy(self):
dg = DataGroup(
id=self.id,
reserved0=self.reserved0,
block_len=self.block_len,
links_nr=self.links_nr,
next_dg_addr=self.next_dg_addr,
first_cg_addr=self.first_cg_addr,
data_block_addr=self.data_block_addr,
comment_addr=self.comment_addr,
record_id_len=self.record_id_len,
reserved1=self.reserved1,
)
dg.comment = self.comment
dg.address = self.address
return dg
def to_blocks(self, address, blocks, defined_texts):
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<DGcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
result = v4c.DATA_GROUP_p(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_dg_addr,
self.first_cg_addr,
self.data_block_addr,
self.comment_addr,
self.record_id_len,
self.reserved1,
)
return result
class _DataListBase:
__slots__ = (
"address",
"id",
"reserved0",
"block_len",
"links_nr",
"next_dl_addr",
"flags",
"reserved1",
"data_block_nr",
"data_block_len",
)
[docs]class DataList(_DataListBase):
"""
*DataList* has the following attributes, that are also available as
dict like key-value pairs
DLBLOCK common fields
* ``id`` - bytes : block ID; always b'##DL'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_dl_addr`` - int : address of next DLBLOCK
* ``data_block_addr<N>`` - int : address of N-th data block
* ``flags`` - int : data list flags
* ``reserved1`` - int : reserved bytes
* ``data_block_nr`` - int : number of data blocks referenced by thsi list
DLBLOCK specific fields
* for equall lenght blocks
* ``data_block_len`` - int : equall uncompressed size in bytes for all
referenced data blocks; last block can be smaller
* for variable lenght blocks
* ``offset_<N>`` - int : byte offset of N-th data block
Other attributes
* ``address`` - int : data list address
"""
def __init__(self, **kwargs):
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
if mapped:
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_uf(
stream, address
)
address += COMMON_SIZE
links = unpack_from(
f"<{self.links_nr}Q", stream, address
)
self.next_dl_addr = links[0]
for i, addr in enumerate(links[1:]):
self[f"data_block_addr{i}"] = addr
stream.seek(address + self.links_nr * 8)
self.flags = stream.read(1)[0]
if self.flags & v4c.FLAG_DL_EQUAL_LENGHT:
(self.reserved1, self.data_block_nr, self.data_block_len) = unpack(
"<3sIQ", stream.read(15)
)
else:
(self.reserved1, self.data_block_nr) = unpack("<3sI", stream.read(7))
offsets = unpack(
"<{}Q".format(self.links_nr - 1),
stream.read((self.links_nr - 1) * 8),
)
for i, offset in enumerate(offsets):
self[f"offset_{i}"] = offset
else:
stream.seek(address)
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_u(
stream.read(COMMON_SIZE)
)
links = unpack(
f"<{self.links_nr}Q", stream.read(self.links_nr * 8)
)
self.next_dl_addr = links[0]
for i, addr in enumerate(links[1:]):
self[f"data_block_addr{i}"] = addr
self.flags = stream.read(1)[0]
if self.flags & v4c.FLAG_DL_EQUAL_LENGHT:
(self.reserved1, self.data_block_nr, self.data_block_len) = unpack(
"<3sIQ", stream.read(15)
)
else:
(self.reserved1, self.data_block_nr) = unpack("<3sI", stream.read(7))
offsets = unpack(
"<{}Q".format(self.links_nr - 1),
stream.read((self.links_nr - 1) * 8),
)
for i, offset in enumerate(offsets):
self[f"offset_{i}"] = offset
if self.id != b"##DL":
message = f'Expected "##DL" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = 0
self.id = b"##DL"
self.reserved0 = 0
self.block_len = 40 + 8 * kwargs.get("links_nr", 2)
self.links_nr = kwargs.get("links_nr", 2)
self.next_dl_addr = 0
for i in range(self.links_nr - 1):
self[f"data_block_addr{i}"] = kwargs.get(f"data_block_addr{i}", 0)
self.flags = kwargs.get("flags", 1)
self.reserved1 = kwargs.get("reserved1", b"\0\0\0")
self.data_block_nr = kwargs.get("data_block_nr", 1)
if self.flags & v4c.FLAG_DL_EQUAL_LENGHT:
self.data_block_len = kwargs["data_block_len"]
else:
for i, offset in enumerate(self.links_nr - 1):
self[f"offset_{i}"] = kwargs[f"offset_{i}"]
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
fmt = v4c.FMT_DATA_LIST.format(self.links_nr)
keys = ("id", "reserved0", "block_len", "links_nr", "next_dl_addr")
keys += tuple(f"data_block_addr{i}" for i in range(self.links_nr - 1))
keys += ("flags", "reserved1", "data_block_nr", "data_block_len")
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
class _EventBlockBase:
__slots__ = (
"address",
"comment",
"name",
"parent",
"range_start",
"scopes",
"id",
"reserved0",
"block_len",
"links_nr",
"next_ev_addr",
"parent_ev_addr",
"range_start_ev_addr",
"name_addr",
"comment_addr",
"event_type",
"sync_type",
"range_type",
"cause",
"flags",
"reserved1",
"scope_nr",
"attachment_nr",
"creator_index",
"sync_base",
"sync_factor",
)
[docs]class EventBlock(_EventBlockBase):
"""
*EventBlock* has the following attributes, that are also available as
dict like key-value pairs
EVBLOCK fields
* ``id`` - bytes : block ID; always b'##EV'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_ev_addr`` - int : address of next EVBLOCK
* ``parent_ev_addr`` - int : address of parent EVLBOCK
* ``range_start_ev_addr`` - int : address of EVBLOCK that is the start of
the range for which this event is the end
* ``name_addr`` - int : address of TXBLOCK that contains the event name
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
event comment
* ``scope_<N>_addr`` - int : address of N-th block that represents a scope
for this event (can be CGBLOCK, CHBLOCK, DGBLOCK)
* ``attachemnt_<N>_addr`` - int : address of N-th attachment referenced by
this event
* ``event_type`` - int : integer code for event type
* ``sync_type`` - int : integer code for event sync type
* ``range_type`` - int : integer code for event range type
* ``cause`` - int : integer code for event cause
* ``flags`` - int : event flags
* ``reserved1`` - int : reserved bytes
* ``scope_nr`` - int : number of scopes referenced by this event
* ``attachment_nr`` - int : number of attachments referenced by this event
* ``creator_index`` - int : index of FHBLOCK
* ``sync_base`` - int : timestamp base value
* ``sync_factor`` - float : timestamp factor
Other attributes
* ``address`` - int : event block address
* ``comment`` - str : event comment
* ``name`` - str : event name
* ``parent`` - int : index of event block that is the parent for the
current event
* ``range_start`` - int : index of event block that is the start of the
range for which the current event is the end
* ``scopes`` - list : list of (group index, channel index) or channel group
index that define the scope of the current event
"""
def __init__(self, **kwargs):
self.name = self.comment = ""
self.scopes = []
self.parent = None
self.range_start = None
if "stream" in kwargs:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
stream.seek(address)
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_u(
stream.read(COMMON_SIZE)
)
block = stream.read(self.block_len - COMMON_SIZE)
links_nr = self.links_nr
links = unpack_from(f"<{links_nr}Q", block)
params = unpack_from(v4c.FMT_EVENT_PARAMS, block, links_nr * 8)
(
self.next_ev_addr,
self.parent_ev_addr,
self.range_start_ev_addr,
self.name_addr,
self.comment_addr,
) = links[:5]
scope_nr = params[6]
for i in range(scope_nr):
self[f"scope_{i}_addr"] = links[5 + i]
attachment_nr = params[7]
for i in range(attachment_nr):
self[f"attachment_{i}_addr"] = links[5 + scope_nr + i]
(
self.event_type,
self.sync_type,
self.range_type,
self.cause,
self.flags,
self.reserved1,
self.scope_nr,
self.attachment_nr,
self.creator_index,
self.sync_base,
self.sync_factor,
) = params
if self.id != b"##EV":
message = f'Expected "##EV" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.name = get_text_v4(self.name_addr, stream)
self.comment = get_text_v4(self.comment_addr, stream)
else:
self.address = 0
scopes = 0
while f"scope_{scopes}_addr" in kwargs:
scopes += 1
self.id = b"##EV"
self.reserved0 = 0
self.block_len = 56 + (scopes + 5) * 8
self.links_nr = scopes + 5
self.next_ev_addr = kwargs.get("next_ev_addr", 0)
self.parent_ev_addr = kwargs.get("parent_ev_addr", 0)
self.range_start_ev_addr = kwargs.get("range_start_ev_addr", 0)
self.name_addr = kwargs.get("name_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
for i in range(scopes):
self[f"scope_{i}_addr"] = kwargs[f"scope_{i}_addr"]
self.event_type = kwargs.get("event_type", v4c.EVENT_TYPE_TRIGGER)
self.sync_type = kwargs.get("sync_type", v4c.EVENT_SYNC_TYPE_S)
self.range_type = kwargs.get("range_type", v4c.EVENT_RANGE_TYPE_POINT)
self.cause = kwargs.get("cause", v4c.EVENT_CAUSE_TOOL)
self.flags = kwargs.get("flags", v4c.FLAG_EV_POST_PROCESSING)
self.reserved1 = b"\x00\x00\x00"
self.scope_nr = scopes
self.attachment_nr = 0
self.creator_index = 0
self.sync_base = kwargs.get("sync_base", 0)
self.sync_factor = kwargs.get("sync_factor", 1.0)
def update_references(self, ch_map, cg_map):
self.scopes.clear()
for i in range(self.scope_nr):
addr = self[f"scope_{i}_addr"]
if addr in ch_map:
self.scopes.append(ch_map[addr])
elif addr in cg_map:
self.scopes.append(cg_map[addr])
else:
message = (
"{} is not a valid CNBLOCK or CGBLOCK "
"address for the event scope"
)
message = message.format(hex(addr))
logger.exception(message)
raise MdfException(message)
def __bytes__(self):
fmt = v4c.FMT_EVENT.format(self.links_nr)
keys = (
"id",
"reserved0",
"block_len",
"links_nr",
"next_ev_addr",
"parent_ev_addr",
"range_start_ev_addr",
"name_addr",
"comment_addr",
)
keys += tuple(f"scope_{i}_addr" for i in range(self.scope_nr))
keys += tuple(f"attachment_{i}_addr" for i in range(self.attachment_nr))
keys += (
"event_type",
"sync_type",
"range_type",
"cause",
"flags",
"reserved1",
"scope_nr",
"attachment_nr",
"creator_index",
"sync_base",
"sync_factor",
)
result = pack(fmt, *[getattr(self, key) for key in keys])
return result
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __str__(self):
return "EventBlock (name: {}, comment: {}, address: {}, scopes: {}, fields: {})".format(
self.name, self.comment, hex(self.address), self.scopes, super().__str__()
)
[docs]class FileIdentificationBlock:
"""
*FileIdentificationBlock* has the following attributes, that are also available as
dict like key-value pairs
IDBLOCK fields
* ``file_identification`` - bytes : file identifier
* ``version_str`` - bytes : format identifier
* ``program_identification`` - bytes : creator program identifier
* ``reserved0`` - bytes : reserved bytes
* ``mdf_version`` - int : version number of MDF format
* ``reserved1`` - bytes : reserved bytes
* ``unfinalized_standard_flags`` - int : standard flags for unfinalized MDF
* ``unfinalized_custom_flags`` - int : custom flags for unfinalized MDF
Other attributes
* ``address`` - int : should always be 0
"""
__slots__ = (
"address",
"file_identification",
"version_str",
"program_identification",
"reserved0",
"mdf_version",
"reserved1",
"unfinalized_standard_flags",
"unfinalized_custom_flags",
)
def __init__(self, **kwargs):
super().__init__()
self.address = 0
try:
stream = kwargs["stream"]
stream.seek(self.address)
(
self.file_identification,
self.version_str,
self.program_identification,
self.reserved0,
self.mdf_version,
self.reserved1,
self.unfinalized_standard_flags,
self.unfinalized_custom_flags,
) = unpack(
v4c.FMT_IDENTIFICATION_BLOCK, stream.read(v4c.IDENTIFICATION_BLOCK_SIZE)
)
except KeyError:
version = kwargs.get("version", "4.00")
self.file_identification = "MDF ".encode("utf-8")
self.version_str = "{} ".format(version).encode("utf-8")
self.program_identification = "amdf{}".format(
__version__.replace(".", "")
).encode("utf-8")
self.reserved0 = b"\0" * 4
self.mdf_version = int(version.replace(".", ""))
self.reserved1 = b"\0" * 30
self.unfinalized_standard_flags = 0
self.unfinalized_custom_flags = 0
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
result = pack(
v4c.FMT_IDENTIFICATION_BLOCK,
*[self[key] for key in v4c.KEYS_IDENTIFICATION_BLOCK],
)
return result
[docs]class FileHistory:
"""
*FileHistory* has the following attributes, that are also available as
dict like key-value pairs
FHBLOCK fields
* ``id`` - bytes : block ID; always b'##FH'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``next_fh_addr`` - int : address of next FHBLOCK
* ``comment_addr`` - int : address of TXBLOCK/MDBLOCK that contains the
file history comment
* ``abs_time`` - int : time stamp at which the file modification happened
* ``tz_offset`` - int : UTC time offset in hours (= GMT time zone)
* ``daylight_save_time`` - int : daylight saving time
* ``time_flags`` - int : time flags
* ``reserved1`` - bytes : reserved bytes
Other attributes
* ``address`` - int : file history address
* ``comment`` - str : history comment
"""
__slots__ = (
"address",
"comment",
"id",
"reserved0",
"block_len",
"links_nr",
"next_fh_addr",
"comment_addr",
"abs_time",
"tz_offset",
"daylight_save_time",
"time_flags",
"reserved1",
)
def __init__(self, **kwargs):
super().__init__()
self.comment = ""
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.next_fh_addr,
self.comment_addr,
self.abs_time,
self.tz_offset,
self.daylight_save_time,
self.time_flags,
self.reserved1,
) = unpack(v4c.FMT_FILE_HISTORY, stream.read(v4c.FH_BLOCK_SIZE))
if self.id != b"##FH":
message = f'Expected "##FH" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
self.comment = get_text_v4(address=self.comment_addr, stream=stream)
except KeyError:
self.id = b"##FH"
self.reserved0 = kwargs.get("reserved0", 0)
self.block_len = kwargs.get("block_len", v4c.FH_BLOCK_SIZE)
self.links_nr = kwargs.get("links_nr", 2)
self.next_fh_addr = kwargs.get("next_fh_addr", 0)
self.comment_addr = kwargs.get("comment_addr", 0)
self.abs_time = kwargs.get("abs_time", int(time.time()) * 10 ** 9)
self.tz_offset = kwargs.get("tz_offset", 120)
self.daylight_save_time = kwargs.get("daylight_save_time", 60)
self.time_flags = kwargs.get("time_flags", 2)
self.reserved1 = kwargs.get("reserved1", b"\x00" * 3)
def to_blocks(self, address, blocks, defined_texts):
text = self.comment
if text:
if text in defined_texts:
self.comment_addr = defined_texts[text]
else:
meta = text.startswith("<FHcomment")
tx_block = TextBlock(text=text, meta=meta)
self.comment_addr = address
defined_texts[text] = address
tx_block.address = address
address += tx_block.block_len
blocks.append(tx_block)
else:
self.comment_addr = 0
blocks.append(self)
self.address = address
address += self.block_len
return address
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
result = pack(
v4c.FMT_FILE_HISTORY, *[self[key] for key in v4c.KEYS_FILE_HISTORY]
)
return result
class HeaderList:
"""
*HeaderList* has the following attributes, that are also available as
dict like key-value pairs
HLBLOCK fields
* ``id`` - bytes : block ID; always b'##HL'
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``first_dl_addr`` - int : address of first data list block for this header
list
* ``flags`` - int : source flags
* ``zip_type`` - int : integer code for zip type
* ``reserved1`` - bytes : reserved bytes
Other attributes
* ``address`` - int : header list address
"""
__slots__ = (
"address",
"id",
"reserved0",
"block_len",
"links_nr",
"first_dl_addr",
"flags",
"zip_type",
"reserved1",
)
def __init__(self, **kwargs):
super().__init__()
try:
self.address = address = kwargs["address"]
stream = kwargs["stream"]
stream.seek(address)
(
self.id,
self.reserved0,
self.block_len,
self.links_nr,
self.first_dl_addr,
self.flags,
self.zip_type,
self.reserved1,
) = unpack(v4c.FMT_HL_BLOCK, stream.read(v4c.HL_BLOCK_SIZE))
if self.id != b"##HL":
message = f'Expected "##HL" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
except KeyError:
self.address = 0
self.id = b"##HL"
self.reserved0 = 0
self.block_len = v4c.HL_BLOCK_SIZE
self.links_nr = 1
self.first_dl_addr = kwargs.get("first_dl_addr", 0)
self.flags = 1
self.zip_type = kwargs.get("zip_type", 0)
self.reserved1 = b"\x00" * 5
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
result = pack(v4c.FMT_HL_BLOCK, *[self[key] for key in v4c.KEYS_HL_BLOCK])
return result
[docs]class TextBlock:
"""common TXBLOCK and MDBLOCK class
*TextBlock* has the following attributes, that are also available as
dict like key-value pairs
TXBLOCK fields
* ``id`` - bytes : block ID; b'##TX' for TXBLOCK and b'##MD' for MDBLOCK
* ``reserved0`` - int : reserved bytes
* ``block_len`` - int : block bytes size
* ``links_nr`` - int : number of links
* ``text`` - bytes : actual text content
Other attributes
* ``address`` - int : text block address
Parameters
----------
address : int
block address
stream : handle
file handle
meta : bool
flag to set the block type to MDBLOCK for dynamically created objects; default *False*
text : bytes/str
text content for dynamically created objects
"""
__slots__ = ("address", "id", "reserved0", "block_len", "links_nr", "text")
def __init__(self, **kwargs):
super().__init__()
if "stream" in kwargs:
stream = kwargs["stream"]
mapped = kwargs.get("mapped", False)
self.address = address = kwargs["address"]
if mapped:
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_uf(
stream, address
)
size = self.block_len - COMMON_SIZE
self.text = text = stream[address + COMMON_SIZE: address + self.block_len]
else:
stream.seek(address)
(self.id, self.reserved0, self.block_len, self.links_nr) = COMMON_u(
stream.read(COMMON_SIZE)
)
size = self.block_len - COMMON_SIZE
self.text = text = stream.read(size)
if self.id not in (b"##TX", b"##MD"):
message = f'Expected "##TX" or "##MD" block @{hex(address)} but found "{self.id}"'
logger.exception(message)
raise MdfException(message)
align = size % 8
if align:
self.block_len = size + COMMON_SIZE + 8 - align
else:
if text:
if text[-1]:
self.block_len += 8
else:
self.block_len += 8
else:
self.address = 0
text = kwargs["text"]
try:
text = text.encode("utf-8")
except (AttributeError, UnicodeDecodeError):
pass
size = len(text)
self.id = b"##MD" if kwargs.get("meta", False) else b"##TX"
self.reserved0 = 0
self.links_nr = 0
self.text = text
self.block_len = size + 32 - size % 8
def __getitem__(self, item):
return self.__getattribute__(item)
def __setitem__(self, item, value):
self.__setattr__(item, value)
def __bytes__(self):
return v4c.COMMON_p(
self.id, self.reserved0, self.block_len, self.links_nr
) + pack(f"{self.block_len - COMMON_SIZE}s", self.text)