"""
ASAM MDF version 4 file format module
"""
import logging
import xml.etree.ElementTree as ET
import os
from copy import deepcopy
from functools import reduce
from hashlib import md5
from itertools import chain
from math import ceil
from struct import unpack, unpack_from
from tempfile import TemporaryFile
from zlib import decompress
from pathlib import Path
import mmap
from numpy import (
arange,
array,
array_equal,
concatenate,
dtype,
flip,
float32,
float64,
frombuffer,
linspace,
nonzero,
packbits,
roll,
transpose,
uint8,
uint64,
union1d,
unpackbits,
zeros,
uint32,
fliplr,
searchsorted,
full,
)
from numpy.core.records import fromarrays, fromstring
from canmatrix.formats import loads
from pandas import DataFrame
from . import v4_constants as v4c
from ..signal import Signal
from .conversion_utils import conversion_transfer
from .utils import (
UINT8_u,
UINT16_u,
UINT32_u,
UINT32_uf,
UINT64_u,
UINT64_uf,
CHANNEL_COUNT,
CONVERT,
ChannelsDB,
MdfException,
SignalSource,
as_non_byte_sized_signed_int,
fmt_to_datatype_v4,
get_fmt_v4,
UniqueDB,
debug_channel,
extract_cncomment_xml,
validate_version_argument,
count_channel_groups,
info_to_datatype_v4,
is_file_like,
sanitize_xml,
Group,
)
from .v4_blocks import (
AttachmentBlock,
Channel,
ChannelArrayBlock,
ChannelConversion,
ChannelGroup,
DataBlock,
DataGroup,
DataList,
DataZippedBlock,
EventBlock,
FileHistory,
FileIdentificationBlock,
HeaderBlock,
HeaderList,
SourceInformation,
TextBlock,
)
from ..version import __version__
MASTER_CHANNELS = (v4c.CHANNEL_TYPE_MASTER, v4c.CHANNEL_TYPE_VIRTUAL_MASTER)
COMMON_SIZE = v4c.COMMON_SIZE
COMMON_u = v4c.COMMON_u
COMMON_uf = v4c.COMMON_uf
logger = logging.getLogger("asammdf")
__all__ = ["MDF4"]
class MDF4(object):
"""The *header* attibute is a *HeaderBlock*.
The *groups* attribute is a list of dicts, each one with the following keys:
* ``data_group`` - DataGroup object
* ``channel_group`` - ChannelGroup object
* ``channels`` - list of Channel objects with the same order as found in the mdf file
* ``channel_dependencies`` - list of *ChannelArrayBlock* in case of channel arrays;
list of Channel objects in case of structure channel composition
* ``data_block`` - address of data block
* ``data_location`` - integer code for data location (original file, temporary file or
memory)
* ``data_block_addr`` - list of raw samples starting addresses
* ``data_block_type`` - list of codes for data block type
* ``data_block_size`` - list of raw samples block size
* ``sorted`` - sorted indicator flag
* ``record_size`` - dict that maps record IDs to record sizes in bytes (including invalidation bytes)
* ``param`` - row size used for transposition, in case of transposed zipped blocks
Parameters
----------
name : string
mdf file name (if provided it must be a real file name) or
file-like object
version : string
mdf file version ('4.00', '4.10', '4.11'); default '4.10'
callback : function
keyword only argument: function to call to update the progress; the
function must accept two arguments (the current progress and maximum
progress value)
use_display_names : bool
keyword only argument: for MDF4 files parse the XML channel comment to
search for the display name; XML parsing is quite expensive so setting
this to *False* can significantly decrease loading times; default
*False*
Attributes
----------
attachments : list
list of file attachments
channels_db : dict
used for fast channel access by name; for each name key the value is a
list of (group index, channel index) tuples
events : list
list of event blocks
file_comment : TextBlock
file comment TextBlock
file_history : list
list of (FileHistory, TextBlock) pairs
groups : list
list of data group dicts
header : HeaderBlock
mdf file header
identification : FileIdentificationBlock
mdf file start block
masters_db : dict
used for fast master channel access; for each group index key the value
is the master channel index
name : string
mdf file name
version : str
mdf version
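Examples
--------
>>> # "measurement.mf4" and "EngineSpeed" are hypothetical names used
>>> # only for illustration
>>> mdf = MDF4("measurement.mf4")
>>> group_index, channel_index = mdf.channels_db["EngineSpeed"][0]
>>> channel = mdf.groups[group_index].channels[channel_index]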
"""
_terminate = False
def __init__(self, name=None, version="4.10", **kwargs):
self.groups = []
self.header = None
self.identification = None
self.file_history = []
self.channels_db = ChannelsDB()
self.can_logging_db = {}
self.masters_db = {}
self.attachments = []
self._attachments_cache = {}
self.file_comment = None
self.events = []
self._attachments_map = {}
self._ch_map = {}
self._master_channel_cache = {}
self._master_channel_metadata = {}
self._invalidation_cache = {}
self._si_map = {}
self._file_si_map = {}
self._cc_map = {}
self._file_cc_map = {}
self._cg_map = {}
self._dbc_cache = {}
self._tempfile = TemporaryFile()
self._file = None
self._read_fragment_size = 0 * 2 ** 20
self._write_fragment_size = 4 * 2 ** 20
self._use_display_names = kwargs.get("use_display_names", False)
self._single_bit_uint_as_bool = False
self._integer_interpolation = 0
# make sure no appended block has the address 0
self._tempfile.write(b"\0")
self._callback = kwargs.get("callback", None)
if name:
if is_file_like(name):
self._file = name
self.name = Path("From_FileLike.mf4")
self._from_filelike = True
self._read(mapped=False)
else:
self.name = Path(name)
with open(self.name, "r+b") as x:
self._file = mmap.mmap(x.fileno(), 0)
self._from_filelike = False
self._read(mapped=True)
self._file.close()
self._file = open(self.name, "rb")
else:
self._from_filelike = False
version = validate_version_argument(version)
self.header = HeaderBlock()
self.identification = FileIdentificationBlock(version=version)
self.version = version
self.name = Path("new.mf4")
def _check_finalised(self):
flags = self.identification["unfinalized_standard_flags"]
if flags & 1:
message = (
f"Unfinalised file {self.name}: "
"Update of cycle counters for CG/CA blocks required"
)
logger.warning(message)
elif flags & 1 << 1:
message = f"Unfinalised file {self.name}: Update of cycle counters for SR blocks required"
logger.warning(message)
elif flags & 1 << 2:
message = f"Unfinalised file {self.name}: Update of length for last DT block required"
logger.warning(message)
elif flags & 1 << 3:
message = f"Unfinalised file {self.name}: Update of length for last RD block required"
logger.warning(message)
elif flags & 1 << 4:
message = (
f"Unfinalised file {self.name}: "
"Update of last DL block in each chained list "
"of DL blocks required"
)
logger.warning(message)
elif flags & 1 << 5:
message = (
f"Unfinalised file {self.name}: "
"Update of cg_data_bytes and cg_inval_bytes "
"in VLSD CG block required"
)
logger.warning(message)
elif flags & 1 << 6:
message = (
f"Unfinalised file {self.name}: "
"Update of offset values for VLSD channel required "
"in case a VLSD CG block is used"
)
logger.warning(message)
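# A commented sketch (illustration only, not used by the class) of how the
# unfinalised-standard-flag bits tested above could be decoded in a single
# pass instead of the elif chain:
#
#     UNFINALISED_FLAGS = {
#         1 << 0: "cycle counters for CG/CA blocks",
#         1 << 1: "cycle counters for SR blocks",
#         1 << 2: "length for last DT block",
#         1 << 3: "length for last RD block",
#         1 << 4: "last DL block in each chained list of DL blocks",
#         1 << 5: "cg_data_bytes and cg_inval_bytes in VLSD CG block",
#         1 << 6: "offset values for VLSD channel",
#     }
#     pending = [text for bit, text in UNFINALISED_FLAGS.items() if flags & bit]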
def _read(self, mapped=False):
stream = self._file
dg_cntr = 0
cg_count, _ = count_channel_groups(stream)
if self._callback:
self._callback(0, cg_count)
current_cg_index = 0
self.identification = FileIdentificationBlock(stream=stream, mapped=mapped)
version = self.identification["version_str"]
self.version = version.decode("utf-8").strip(" \n\t\0")
if self.version >= "4.10":
self._check_finalised()
self.header = HeaderBlock(address=0x40, stream=stream, mapped=mapped)
# read file history
fh_addr = self.header["file_history_addr"]
while fh_addr:
history_block = FileHistory(address=fh_addr, stream=stream, mapped=mapped)
self.file_history.append(history_block)
fh_addr = history_block.next_fh_addr
# read attachments
at_addr = self.header["first_attachment_addr"]
index = 0
while at_addr:
at_block = AttachmentBlock(address=at_addr, stream=stream, mapped=mapped)
self._attachments_map[at_addr] = index
self.attachments.append(at_block)
at_addr = at_block.next_at_addr
index += 1
# go to first data group and read each data group sequentially
dg_addr = self.header.first_dg_addr
while dg_addr:
new_groups = []
group = DataGroup(address=dg_addr, stream=stream, mapped=mapped)
record_id_nr = group.record_id_len
# go to first channel group of the current data group
cg_addr = group.first_cg_addr
cg_nr = 0
cg_size = {}
while cg_addr:
cg_nr += 1
grp = Group(group.copy())
# read each channel group sequentially
block = ChannelGroup(address=cg_addr, stream=stream, mapped=mapped)
self._cg_map[cg_addr] = dg_cntr
channel_group = grp.channel_group = block
grp.record_size = cg_size
if channel_group.flags & v4c.FLAG_CG_VLSD:
# VLSD flag
record_id = channel_group.record_id
cg_size[record_id] = 0
elif channel_group.flags & v4c.FLAG_CG_BUS_EVENT:
bus_type = channel_group.acq_source.bus_type
if bus_type == v4c.BUS_TYPE_CAN:
grp.CAN_logging = True
message_name = channel_group.acq_name
comment = channel_group.acq_source.comment
comment_xml = ET.fromstring(comment)
common_properties = comment_xml.find(".//common_properties")
for e in common_properties:
name = e.get("name")
if name == "ChannelNo":
grp.CAN_id = f"CAN{e.text}"
break
if message_name == "CAN_DataFrame":
# this is a raw CAN bus logging channel group
# it will be later processed to extract all
# signals to new groups (one group per CAN message)
grp.raw_can = True
elif message_name in ("CAN_ErrorFrame", "CAN_RemoteFrame"):
# for now ignore bus logging flag
pass
else:
comment = channel_group.comment
if comment:
comment_xml = ET.fromstring(sanitize_xml(comment))
tx_element = comment_xml.find(".//TX")
if tx_element is not None and tx_element.text is not None:
can_msg_type = tx_element.text.strip(" \t\r\n")
else:
can_msg_type = "CAN_DataFrame"
if can_msg_type == "CAN_DataFrame":
common_properties = comment_xml.find(
".//common_properties"
)
message_id = -1
for e in common_properties:
name = e.get("name")
if name == "MessageID":
message_id = int(e.text)
break
if message_id > 0:
if message_id > 0x80000000:
message_id -= 0x80000000
grp.extended_id = True
grp.message_name = message_name
grp.message_id = message_id
else:
message = f"Invalid bus logging channel group metadata: {comment}"
logger.warning(message)
else:
message = (
f"Unable to get CAN message information "
f"since channel group @{hex(channel_group.address)} has no metadata"
)
logger.warning(message)
else:
# only CAN bus logging is supported
pass
samples_size = channel_group.samples_byte_nr
inval_size = channel_group.invalidation_bytes_nr
record_id = channel_group.record_id
cg_size[record_id] = samples_size + inval_size
else:
samples_size = channel_group.samples_byte_nr
inval_size = channel_group.invalidation_bytes_nr
record_id = channel_group.record_id
cg_size[record_id] = samples_size + inval_size
if record_id_nr:
grp.sorted = False
else:
grp.sorted = True
# go to first channel of the current channel group
ch_addr = channel_group.first_ch_addr
ch_cntr = 0
neg_ch_cntr = -1
# Read channels by walking recursively in the channel group
# starting from the first channel
self._read_channels(ch_addr, grp, stream, dg_cntr, ch_cntr, neg_ch_cntr, mapped=mapped)
cg_addr = channel_group.next_cg_addr
dg_cntr += 1
current_cg_index += 1
if self._callback:
self._callback(current_cg_index, cg_count)
if self._terminate:
self.close()
return
new_groups.append(grp)
# the new groups share the record sizes dict and the data blocks info
# of the initial data group; for unsorted groups the 'sorted' flag
# was already set to False above
address = group.data_block_addr
info = self._get_data_blocks_info(
address=address, stream=stream, block_type=b"##DT", mapped=mapped
)
for grp in new_groups:
grp.data_location = v4c.LOCATION_ORIGINAL_FILE
grp.set_blocks_info(info)
self.groups.extend(new_groups)
dg_addr = group.next_dg_addr
# all channels have been loaded so now we can link the
# channel dependencies and load the signal data for VLSD channels
for grp in self.groups:
for dep_list in grp.channel_dependencies:
if not dep_list:
continue
for dep in dep_list:
if isinstance(dep, ChannelArrayBlock):
conditions = (
dep.ca_type == v4c.CA_TYPE_LOOKUP,
dep.links_nr == 4 * dep.dims + 1,
)
if not all(conditions):
continue
for i in range(dep.dims):
ch_addr = dep[f"scale_axis_{i}_ch_addr"]
ref_channel = self._ch_map[ch_addr]
dep.referenced_channels.append(ref_channel)
else:
break
# append indexes of groups that contain raw CAN bus logging and
# store signals and metadata that will be used to create the new
# groups.
raw_can = []
processed_can = []
for i, group in enumerate(self.groups):
if group.CAN_logging:
if group.CAN_id not in self.can_logging_db:
self.can_logging_db[group.CAN_id] = {}
message_id = group.message_id
if message_id is not None:
self.can_logging_db[group.CAN_id][message_id] = i
else:
continue
if group.raw_can:
can_ids = self.get(
"CAN_DataFrame.ID", group=i, ignore_invalidation_bits=True
)
all_can_ids = sorted(set(can_ids.samples))
payload = self.get(
"CAN_DataFrame.DataBytes",
group=i,
samples_only=True,
ignore_invalidation_bits=True,
)[0]
_sig = self.get("CAN_DataFrame", group=i, ignore_invalidation_bits=True)
attachment = _sig.attachment
if attachment and attachment[1].lower().endswith(("dbc", "arxml")):
attachment, at_name = attachment
raw_can.append(i)
import_type = "dbc" if at_name.lower().endswith("dbc") else "arxml"
db = loads(
attachment.decode("utf-8"), importType=import_type, key="db"
)["db"]
board_units = set(bu.name for bu in db.boardUnits)
cg_source = group.channel_group.acq_source
for message_id in all_can_ids:
self.can_logging_db[group.CAN_id][message_id] = i
sigs = []
can_msg = db.frameById(message_id)
for transmitter in can_msg.transmitters:
if transmitter in board_units:
break
else:
transmitter = ""
message_name = can_msg.name
source = SignalSource(
transmitter,
can_msg.name,
"",
v4c.SOURCE_BUS,
v4c.BUS_TYPE_CAN,
)
idx = nonzero(can_ids.samples == message_id)[0]
data = payload[idx]
t = can_ids.timestamps[idx].copy()
if can_ids.invalidation_bits is not None:
invalidation_bits = can_ids.invalidation_bits[idx]
else:
invalidation_bits = None
for signal in sorted(can_msg.signals, key=lambda x: x.name):
sig_vals = self.get_can_signal(
f"CAN{group.CAN_id}.{can_msg.name}.{signal.name}",
db=db,
ignore_invalidation_bits=True,
).samples
conversion = ChannelConversion(
a=float(signal.factor),
b=float(signal.offset),
conversion_type=v4c.CONVERSION_TYPE_LIN,
)
conversion.unit = signal.unit or ""
sigs.append(
Signal(
sig_vals,
t,
name=signal.name,
conversion=conversion,
source=source,
unit=signal.unit,
raw=True,
invalidation_bits=invalidation_bits,
)
)
processed_can.append(
[sigs, message_id, message_name, cg_source, group.CAN_id]
)
else:
at_name = attachment[1] if attachment else ""
message = f'Expected .dbc or .arxml file as CAN channel attachment but got "{at_name}"'
logger.warning(message)
group.CAN_database = False
raw_can.append(i)
sigs = []
cg_source = group.channel_group.acq_source
for message_id in all_can_ids:
source = SignalSource(
"", "", "", v4c.SOURCE_BUS, v4c.BUS_TYPE_CAN
)
idx = nonzero(can_ids.samples == message_id)[0]
data = payload[idx]
t = can_ids.timestamps[idx]
if can_ids.invalidation_bits is not None:
invalidation_bits = can_ids.invalidation_bits[idx]
else:
invalidation_bits = None
sigs.append(
Signal(
data,
t,
name="CAN_DataFrame.DataBytes",
source=source,
raw=True,
invalidation_bits=invalidation_bits,
)
)
processed_can.append(
[sigs, message_id, "", cg_source, group.CAN_id]
)
# delete the groups that contain raw CAN bus logging and also
# delete the channel entries from the channels_db. Update data group
# index for the remaining channel entries. Append new data groups
if raw_can:
for index in reversed(raw_can):
self.groups.pop(index)
excluded_channels = []
for name, db_entry in self.channels_db.items():
new_entry = []
for i, entry in enumerate(db_entry):
new_group_index = entry[0]
if new_group_index in raw_can:
continue
# groups with a smaller index were removed, so shift the index down
new_group_index -= sum(1 for index in raw_can if index < entry[0])
new_entry.append((new_group_index, entry[1]))
if new_entry:
self.channels_db[name] = new_entry
else:
excluded_channels.append(name)
for name in excluded_channels:
del self.channels_db[name]
for sigs, message_id, message_name, cg_source, can_id in processed_can:
self.append(
sigs, "Extracted from raw CAN bus logging", common_timebase=True
)
group = self.groups[-1]
group.CAN_database = message_name != ""
group.CAN_logging = True
group.CAN_id = can_id
if message_id > 0:
if message_id > 0x80000000:
message_id -= 0x80000000
group.extended_id = True
else:
group.extended_id = False
group.message_name = message_name
group.message_id = message_id
group.channel_group.acq_source = cg_source
group.data_group.comment = (
f'From message {hex(message_id)}="{message_name}"'
)
self.can_logging_db = {}
for i, group in enumerate(self.groups):
if not group.CAN_logging:
continue
if group.CAN_id not in self.can_logging_db:
self.can_logging_db[group.CAN_id] = {}
if group.message_id is not None:
self.can_logging_db[group.CAN_id][group.message_id] = i
# read events
addr = self.header.first_event_addr
ev_map = {}
event_index = 0
while addr:
event = EventBlock(address=addr, stream=stream, mapped=mapped)
event.update_references(self._ch_map, self._cg_map)
self.events.append(event)
ev_map[addr] = event_index
event_index += 1
addr = event.next_ev_addr
for event in self.events:
addr = event.parent_ev_addr
if addr:
event.parent = ev_map[addr]
addr = event.range_start_ev_addr
if addr:
event.range_start = ev_map[addr]
self._si_map.clear()
self._ch_map.clear()
self._cc_map.clear()
self._master_channel_cache.clear()
self.progress = cg_count, cg_count
def _read_channels(
self,
ch_addr,
grp,
stream,
dg_cntr,
ch_cntr,
neg_ch_cntr,
channel_composition=False,
mapped=False,
):
channels = grp.channels
dependencies = grp.channel_dependencies
composition = []
composition_dtype = []
while ch_addr:
# read channel block and create channel object
channel = Channel(
address=ch_addr,
stream=stream,
cc_map=self._cc_map,
si_map=self._si_map,
at_map=self._attachments_map,
use_display_names=self._use_display_names,
mapped=mapped,
)
value = channel
display_name = channel.display_name
name = channel.name
entry = (dg_cntr, ch_cntr)
self._ch_map[ch_addr] = entry
channels.append(value)
if channel_composition:
composition.append(entry)
self.channels_db.add(display_name, entry)
self.channels_db.add(name, entry)
# signal data
address = channel.data_block_addr
grp.signal_data.append(address)
if channel.channel_type in MASTER_CHANNELS:
self.masters_db[dg_cntr] = ch_cntr
ch_cntr += 1
component_addr = channel.component_addr
if component_addr:
# check if it is a CABLOCK or CNBLOCK
stream.seek(component_addr)
blk_id = stream.read(4)
if blk_id == b"##CN":
index = ch_cntr - 1
dependencies.append(None)
ch_cntr, neg_ch_cntr, ret_composition, ret_composition_dtype = self._read_channels(
component_addr, grp, stream, dg_cntr, ch_cntr, neg_ch_cntr, True, mapped=mapped
)
dependencies[index] = ret_composition
channel.dtype_fmt = ret_composition_dtype
composition_dtype.append((channel.name, channel.dtype_fmt))
if grp.CAN_id is not None and grp.message_id is not None:
try:
addr = channel.attachment_addr
except AttributeError:
addr = 0
if addr:
attachment_addr = self._attachments_map[addr]
if attachment_addr not in self._dbc_cache:
attachment, at_name = self.extract_attachment(
index=attachment_addr
)
if (
not at_name.name.lower().endswith(("dbc", "arxml"))
or not attachment
):
message = f'Expected .dbc or .arxml file as CAN channel attachment but got "{at_name}"'
logger.warning(message)
grp.CAN_database = False
else:
import_type = (
"dbc"
if at_name.name.lower().endswith("dbc")
else "arxml"
)
try:
attachment_string = attachment.decode("utf-8")
self._dbc_cache[attachment_addr] = loads(
attachment_string,
importType=import_type,
key="db",
)["db"]
grp.CAN_database = True
except UnicodeDecodeError:
try:
from cchardet import detect
encoding = detect(attachment)["encoding"]
attachment_string = attachment.decode(
encoding
)
self._dbc_cache[attachment_addr] = loads(
attachment_string,
importType=import_type,
key="db",
encoding=encoding,
)["db"]
grp.CAN_database = True
except ImportError:
message = (
"Unicode exception occurred while processing the database "
f'attachment "{at_name}" and the "cChardet" package is '
'not installed. MDF version 4 expects "utf-8" '
"strings; this package could have detected whether a "
"different encoding was used"
)
logger.warning(message)
grp.CAN_database = False
else:
grp.CAN_database = True
else:
grp.CAN_database = False
if grp.CAN_database:
# here we make available multiple ways to refer to
# CAN signals by using fake negative indexes for
# the channel entries in the channels_db
grp.dbc_addr = attachment_addr
message_id = grp.message_id
message_name = grp.message_name
can_id = grp.CAN_id
can_msg = self._dbc_cache[attachment_addr].frameById(
message_id
)
can_msg_name = can_msg.name
for entry in self.channels_db["CAN_DataFrame.DataBytes"]:
if entry[0] == dg_cntr:
index = entry[1]
break
payload = channels[index]
logging_channels = grp.logging_channels
for signal in can_msg.signals:
signal_name = signal.name
# 0 - name
# 1 - message_name.name
# 2 - can_id.message_name.name
# 3 - can_msg_name.name
# 4 - can_id.can_msg_name.name
name_ = signal_name
little_endian = (
True if signal.is_little_endian else False
)
signed = signal.is_signed
s_type = info_to_datatype_v4(signed, little_endian)
bit_offset = signal.startBit % 8
byte_offset = signal.startBit // 8
bit_count = signal.size
comment = signal.comment or ""
if (signal.factor, signal.offset) != (1, 0):
conversion = ChannelConversion(
a=float(signal.factor),
b=float(signal.offset),
conversion_type=v4c.CONVERSION_TYPE_LIN,
)
conversion.unit = signal.unit or ""
else:
conversion = None
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"data_type": s_type,
"sync_type": payload.sync_type,
"byte_offset": byte_offset + payload.byte_offset,
"bit_offset": bit_offset,
"bit_count": bit_count,
"min_raw_value": 0,
"max_raw_value": 0,
"lower_limit": 0,
"upper_limit": 0,
"flags": 0,
"pos_invalidation_bit": payload.pos_invalidation_bit,
}
log_channel = Channel(**kwargs)
log_channel.name = name_
log_channel.comment = comment
log_channel.source = deepcopy(channel.source)
log_channel.conversion = conversion
log_channel.unit = signal.unit or ""
logging_channels.append(log_channel)
entry = dg_cntr, neg_ch_cntr
self.channels_db.add(name_, entry)
name_ = f"{message_name}.{signal_name}"
self.channels_db.add(name_, entry)
name_ = f"CAN{can_id}.{message_name}.{signal_name}"
self.channels_db.add(name_, entry)
name_ = f"{can_msg_name}.{signal_name}"
self.channels_db.add(name_, entry)
name_ = f"CAN{can_id}.{can_msg_name}.{signal_name}"
self.channels_db.add(name_, entry)
neg_ch_cntr -= 1
grp.channel_group["flags"] &= ~v4c.FLAG_CG_PLAIN_BUS_EVENT
else:
# only channel arrays with storage=CN_TEMPLATE are
# supported so far
ca_block = ChannelArrayBlock(
address=component_addr, stream=stream, mapped=mapped
)
if ca_block.storage != v4c.CA_STORAGE_TYPE_CN_TEMPLATE:
logger.warning("Only CN template arrays are supported")
ca_list = [ca_block]
while ca_block.composition_addr:
ca_block = ChannelArrayBlock(
address=ca_block.composition_addr,
stream=stream,
mapped=mapped,
)
ca_list.append(ca_block)
dependencies.append(ca_list)
else:
dependencies.append(None)
if channel_composition:
channel.dtype_fmt = get_fmt_v4(channel.data_type, channel.bit_count, channel.channel_type)
composition_dtype.append((channel.name, channel.dtype_fmt))
# go to next channel of the current channel group
ch_addr = channel.next_ch_addr
return ch_cntr, neg_ch_cntr, composition, composition_dtype
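# Note: for structure compositions the recursion above accumulates
# (name, dtype_fmt) pairs that are later assembled into a numpy structured
# dtype. A minimal sketch of that assembly (the field names and formats
# here are hypothetical):
#
#     from numpy import dtype
#     composition_dtype = [("field_a", "<u4"), ("field_b", "<f8")]
#     struct_fmt = dtype(composition_dtype)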
def _load_signal_data(self, address=None, stream=None, group=None, index=None):
""" this method is used to get the channel signal data, usually for
VLSD channels
Parameters
----------
address : int
address of referenced block
stream : handle
file IO stream handle
Returns
-------
data : bytes
signal data bytes
"""
if address == 0:
data = b""
elif address is not None and stream is not None:
stream.seek(address)
blk_id = stream.read(4)
if blk_id == b"##SD":
data = DataBlock(address=address, stream=stream)
data = data.data
elif blk_id == b"##DZ":
data = DataZippedBlock(address=address, stream=stream)
data = data.data
elif blk_id == b"##CG":
group = self.groups[self._cg_map[address]]
data = b"".join(fragment[0] for fragment in self._load_data(group))
elif blk_id == b"##DL":
data = []
while address:
# the data list will contain only links to SDBLOCK's
data_list = DataList(address=address, stream=stream)
nr = data_list.links_nr
# aggregate data from all SDBLOCK
for i in range(nr - 1):
addr = data_list[f"data_block_addr{i}"]
stream.seek(addr)
blk_id = stream.read(4)
if blk_id == b"##SD":
block = DataBlock(address=addr, stream=stream)
data.append(block.data)
elif blk_id == b"##DZ":
block = DataZippedBlock(address=addr, stream=stream)
data.append(block.data)
else:
message = f'Expected SD, DZ or DL block at {hex(address)} but found id="{blk_id}"'
logger.warning(message)
return b""
address = data_list.next_dl_addr
data = b"".join(data)
elif blk_id == b"##CN":
data = b""
elif blk_id == b"##HL":
hl = HeaderList(address=address, stream=stream)
data = self._load_signal_data(
address=hl.first_dl_addr, stream=stream, group=group, index=index
)
elif blk_id == b"##AT":
data = b""
else:
message = f'Expected AT, CG, SD, DL, DZ or CN block at {hex(address)} but found id="{blk_id}"'
logger.warning(message)
data = b""
elif group is not None and index is not None:
if group.data_location == v4c.LOCATION_ORIGINAL_FILE:
data = self._load_signal_data(
address=group.signal_data[index], stream=self._file
)
elif group.data_location == v4c.LOCATION_MEMORY:
data = group.signal_data[index]
else:
data = []
stream = self._tempfile
if group.signal_data[index]:
for addr, size in zip(
group.signal_data[index], group.signal_data_size[index]
):
if not size:
continue
stream.seek(addr)
data.append(stream.read(size))
data = b"".join(data)
else:
data = b""
return data
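# Hedged sketch of the VLSD signal data layout returned by
# _load_signal_data: a byte stream of records, each prefixed with a
# little-endian uint32 length (the same 4-byte length is consumed in
# _load_data for VLSD records). The helper below is illustrative only and
# is not part of the API:
#
#     from struct import unpack_from
#
#     def iter_vlsd_records(data):
#         pos = 0
#         while pos < len(data):
#             (size,) = unpack_from("<I", data, pos)
#             pos += 4
#             yield data[pos : pos + size]
#             pos += size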
def _load_data(self, group, record_offset=0, record_count=None):
""" get group's data block bytes """
offset = 0
has_yielded = False
_count = record_count
data_group = group.data_group
channel_group = group.channel_group
if group.data_location == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
else:
stream = self._tempfile
read = stream.read
seek = stream.seek
block_type = group.data_block_type
param = group.param
samples_size = (
channel_group.samples_byte_nr + channel_group.invalidation_bytes_nr
)
record_offset *= samples_size
finished = False
if record_count is not None:
record_count *= samples_size
if not samples_size:
yield b"", offset, _count
else:
if group.sorted:
if self._read_fragment_size:
split_size = self._read_fragment_size // samples_size
split_size *= samples_size
else:
channels_nr = len(group.channels)
y_axis = CONVERT
idx = searchsorted(CHANNEL_COUNT, channels_nr, side="right") - 1
if idx < 0:
idx = 0
split_size = y_axis[idx]
split_size = split_size // samples_size
split_size *= samples_size
if split_size == 0:
split_size = samples_size
addr = group.data_block_addr
blocks = zip(group.data_block_addr, group.data_size, group.data_block_size)
if addr:
if group.sorted:
if block_type == v4c.DT_BLOCK:
cur_size = 0
data = []
while True:
try:
address, size, block_size = next(blocks)
current_address = address
except StopIteration:
break
if offset + size < record_offset + 1:
offset += size
continue
if offset < record_offset:
delta = record_offset - offset
current_address += delta
size -= delta
offset = record_offset
while size >= split_size - cur_size:
seek(current_address)
if data:
data.append(read(split_size - cur_size))
data_ = b"".join(data)
if record_count is not None:
yield data_[:record_count], offset, _count
has_yielded = True
record_count -= len(data_)
if record_count <= 0:
finished = True
break
else:
yield data_, offset, _count
has_yielded = True
current_address += split_size - cur_size
else:
data_ = read(split_size)
if record_count is not None:
yield data_[:record_count], offset, _count
has_yielded = True
record_count -= len(data_)
if record_count <= 0:
finished = True
break
else:
yield data_, offset, _count
has_yielded = True
current_address += split_size
offset += split_size
size -= split_size - cur_size
data = []
cur_size = 0
if finished:
data = []
break
if size:
seek(current_address)
data.append(read(size))
cur_size += size
if data:
data_ = b"".join(data)
if record_count is not None:
yield data_[:record_count], offset, _count
has_yielded = True
record_count -= len(data_)
else:
yield data_, offset, _count
has_yielded = True
else:
extra_bytes = b""
for (address, size, block_size) in blocks:
if offset + size < record_offset + 1:
offset += size
continue
seek(address)
data = read(block_size)
if block_type == v4c.DZ_BLOCK_DEFLATE:
data = decompress(data, 0, size)
else:
data = decompress(data, 0, size)
cols = param
lines = size // cols
nd = fromstring(data[: lines * cols], dtype=uint8)
nd = nd.reshape((cols, lines))
data = nd.T.tostring() + data[lines * cols :]
if offset < record_offset:
delta = record_offset - offset
offset = record_offset
data = data[delta:]
if extra_bytes:
data = extra_bytes + data
dim = len(data)
new_extra_bytes = dim % samples_size
if new_extra_bytes:
extra_bytes = data[-new_extra_bytes:]
data = data[:-new_extra_bytes]
offset_increase = dim - new_extra_bytes
else:
extra_bytes = b""
offset_increase = dim
if record_count is not None:
yield data[:record_count], offset, _count
has_yielded = True
record_count -= len(data)
if record_count <= 0:
finished = True
break
else:
yield data, offset, _count
has_yielded = True
offset += offset_increase
if extra_bytes:
if record_count is not None:
if not finished:
yield extra_bytes[:record_count], offset, _count
has_yielded = True
else:
yield extra_bytes, offset, _count
has_yielded = True
else:
for (address, size, block_size) in blocks:
seek(address)
data = read(block_size)
if block_type == v4c.DZ_BLOCK_DEFLATE:
data = decompress(data, 0, size)
elif block_type == v4c.DZ_BLOCK_TRANSPOSED:
data = decompress(data, 0, size)
cols = param
lines = size // cols
nd = fromstring(data[: lines * cols], dtype=uint8)
nd = nd.reshape((cols, lines))
data = nd.T.tostring() + data[lines * cols :]
rec_data = []
cg_size = group.record_size
record_id = channel_group.record_id
record_id_nr = data_group.record_id_len
if record_id_nr == 1:
_unpack_struct = UINT8_u
elif record_id_nr == 2:
_unpack_struct = UINT16_u
elif record_id_nr == 4:
_unpack_struct = UINT32_u
elif record_id_nr == 8:
_unpack_struct = UINT64_u
else:
message = f"invalid record id size {record_id_nr}"
raise MdfException(message)
i = 0
size = len(data)
while i < size:
(rec_id,) = _unpack_struct(data[i : i + record_id_nr])
# skip record id
i += record_id_nr
rec_size = cg_size[rec_id]
if rec_size:
if rec_id == record_id:
rec_data.append(data[i : i + rec_size])
else:
(rec_size,) = UINT32_u(data[i : i + 4])
if rec_id == record_id:
rec_data.append(data[i : i + 4 + rec_size])
i += 4
i += rec_size
rec_data = b"".join(rec_data)
size = len(rec_data)
if size:
if offset + size < record_offset + 1:
offset += size
continue
if offset < record_offset:
delta = record_offset - offset
size -= delta
offset = record_offset
if record_count is not None:
yield rec_data[:record_count], offset, _count
has_yielded = True
record_count -= len(rec_data)
if record_count <= 0:
break
else:
yield rec_data, offset, _count
has_yielded = True
offset += size
if not has_yielded:
yield b"", 0, _count
else:
yield b"", offset, _count
def _prepare_record(self, group):
""" compute record dtype and parents dict fro this group
Parameters
----------
group : dict
MDF group dict
Returns
-------
parents, dtypes : dict, numpy.dtype
mapping of channels to records fields, records fields dtype
"""
parents, dtypes = group.parents, group.types
no_parent = None, None
if parents is None:
channel_group = group.channel_group
channels = group.channels
bus_event = channel_group.flags & v4c.FLAG_CG_BUS_EVENT
record_size = channel_group.samples_byte_nr
invalidation_bytes_nr = channel_group.invalidation_bytes_nr
next_byte_aligned_position = 0
types = []
current_parent = ""
parent_start_offset = 0
parents = {}
group_channels = UniqueDB()
neg_index = -1
sortedchannels = sorted(enumerate(channels), key=lambda i: i[1])
for original_index, new_ch in sortedchannels:
start_offset = new_ch.byte_offset
bit_offset = new_ch.bit_offset
data_type = new_ch.data_type
bit_count = new_ch.bit_count
ch_type = new_ch.channel_type
dependency_list = group.channel_dependencies[original_index]
name = new_ch.name
# handle multiple occurrences of the same channel name
name = group_channels.get_unique_name(name)
if start_offset >= next_byte_aligned_position:
if ch_type not in v4c.VIRTUAL_TYPES:
if not dependency_list:
parent_start_offset = start_offset
# check if there are byte gaps in the record
gap = parent_start_offset - next_byte_aligned_position
if gap:
types.append(("", f"V{gap}"))
# adjust size to 1, 2, 4 or 8 bytes
size = bit_offset + bit_count
if data_type not in v4c.NON_SCALAR_TYPES:
if size > 32:
size = 8
elif size > 16:
size = 4
elif size > 8:
size = 2
else:
size = 1
else:
size = size // 8
next_byte_aligned_position = parent_start_offset + size
bit_count = size * 8
if next_byte_aligned_position <= record_size:
if not new_ch.dtype_fmt:
new_ch.dtype_fmt = get_fmt_v4(
data_type, bit_count, ch_type
)
dtype_pair = (name, new_ch.dtype_fmt)
types.append(dtype_pair)
parents[original_index] = name, bit_offset
else:
next_byte_aligned_position = parent_start_offset
current_parent = name
else:
if isinstance(dependency_list[0], ChannelArrayBlock):
ca_block = dependency_list[0]
# check if there are byte gaps in the record
gap = start_offset - next_byte_aligned_position
if gap:
dtype_pair = "", f"V{gap}"
types.append(dtype_pair)
size = max(bit_count // 8, 1)
shape = tuple(
ca_block[f"dim_size_{i}"]
for i in range(ca_block.dims)
)
if (
ca_block.byte_offset_base // size > 1
and len(shape) == 1
):
shape += (ca_block.byte_offset_base // size,)
dim = 1
for d in shape:
dim *= d
if not new_ch.dtype_fmt:
new_ch.dtype_fmt = get_fmt_v4(data_type, bit_count)
dtype_pair = (name, new_ch.dtype_fmt, shape)
types.append(dtype_pair)
current_parent = name
next_byte_aligned_position = start_offset + size * dim
parents[original_index] = name, 0
else:
parents[original_index] = no_parent
if bus_event:
for logging_channel in group.logging_channels:
parents[neg_index] = (
"CAN_DataFrame.DataBytes",
logging_channel.bit_offset,
)
neg_index -= 1
# virtual channels do not have bytes in the record
else:
parents[original_index] = no_parent
else:
max_overlapping_size = (next_byte_aligned_position - start_offset) * 8
needed_size = bit_offset + bit_count
if max_overlapping_size >= needed_size:
parents[original_index] = (
current_parent,
((start_offset - parent_start_offset) * 8) + bit_offset,
)
if next_byte_aligned_position > record_size:
break
gap = record_size - next_byte_aligned_position
if gap > 0:
dtype_pair = "", f"V{gap}"
types.append(dtype_pair)
dtype_pair = "invalidation_bytes", "<u1", (invalidation_bytes_nr,)
types.append(dtype_pair)
dtypes = dtype(types)
group.parents, group.types = parents, dtypes
return parents, dtypes
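# Usage sketch for the mapping computed above (assuming a sorted group and
# a complete data fragment): `dtypes` turns the raw record buffer into a
# numpy record array, and `parents` names the record field (plus the bit
# offset inside it) that carries each channel:
#
#     parents, dtypes = self._prepare_record(group)
#     record = fromstring(data_bytes, dtype=dtypes)
#     parent_name, bit_offset = parents[ch_nr]
#     vals = record[parent_name]  # bit masking still needed if bit_offset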
def _append_structure_composition(
self,
grp,
signal,
field_names,
offset,
dg_cntr,
ch_cntr,
parents,
defined_texts,
invalidation_bytes_nr,
inval_bits,
inval_cntr,
):
si_map = self._si_map
fields = []
types = []
file = self._tempfile
seek = file.seek
seek(0, 2)
gp = grp
gp_sdata = gp.signal_data
gp_sdata_size = gp.signal_data_size
gp_channels = gp.channels
gp_dep = gp.channel_dependencies
name = signal.name
names = signal.samples.dtype.names
field_name = field_names.get_unique_name(name)
# first we add the structure channel
if signal.attachment:
at_data, at_name = signal.attachment
attachment_addr = self.attach(at_data, at_name, mime="application/x-dbc")
else:
attachment_addr = 0
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": signal.samples.dtype.itemsize * 8,
"byte_offset": offset,
"bit_offset": 0,
"data_type": v4c.DATA_TYPE_BYTEARRAY,
"precision": 255,
"flags": 0,
}
if attachment_addr:
kwargs["attachment_addr"] = attachment_addr
kwargs["flags"] |= v4c.FLAG_CN_BUS_EVENT
if invalidation_bytes_nr and signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_name = signal.display_name
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
entry = dg_cntr, ch_cntr
gp_channels.append(ch)
struct_self = entry
gp_sdata.append(None)
gp_sdata_size.append(0)
self.channels_db.add(name, entry)
if ch.display_name:
self.channels_db.add(ch.display_name, entry)
# update the parents as well
parents[ch_cntr] = name, 0
ch_cntr += 1
dep_list = []
gp_dep.append(dep_list)
# then we add the fields
for name in names:
field_name = field_names.get_unique_name(name)
samples = signal.samples[name]
fld_names = samples.dtype.names
if fld_names is None:
sig_type = v4c.SIGNAL_TYPE_SCALAR
if samples.dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
else:
if fld_names in (v4c.CANOPEN_TIME_FIELDS, v4c.CANOPEN_DATE_FIELDS):
sig_type = v4c.SIGNAL_TYPE_CANOPEN
elif fld_names[0] != name:
sig_type = v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
else:
sig_type = v4c.SIGNAL_TYPE_ARRAY
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape)
byte_size = s_size >> 3
fields.append(samples)
types.append((field_name, samples.dtype, samples.shape[1:]))
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if attachment_addr:
kwargs["flags"] |= v4c.FLAG_CN_BUS_EVENT
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
entry = (dg_cntr, ch_cntr)
gp_channels.append(ch)
dep_list.append(entry)
offset += byte_size
gp_sdata.append(None)
gp_sdata_size.append(0)
self.channels_db.add(name, entry)
# update the parents as well
parents[ch_cntr] = field_name, 0
ch_cntr += 1
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
struct = Signal(
samples,
samples,
name=name,
invalidation_bits=signal.invalidation_bits,
)
offset, dg_cntr, ch_cntr, sub_structure, new_fields, new_types, inval_cntr = self._append_structure_composition(
grp,
struct,
field_names,
offset,
dg_cntr,
ch_cntr,
parents,
defined_texts,
invalidation_bytes_nr,
inval_bits,
inval_cntr,
)
dep_list.append(sub_structure)
fields.extend(new_fields)
types.extend(new_types)
return offset, dg_cntr, ch_cntr, struct_self, fields, types, inval_cntr
def _get_not_byte_aligned_data(self, data, group, ch_nr):
big_endian_types = (
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
v4c.DATA_TYPE_REAL_MOTOROLA,
v4c.DATA_TYPE_SIGNED_MOTOROLA,
)
record_size = (
group.channel_group.samples_byte_nr
+ group.channel_group.invalidation_bytes_nr
)
if ch_nr >= 0:
channel = group.channels[ch_nr]
else:
channel = group.logging_channels[-ch_nr - 1]
bit_offset = channel.bit_offset
byte_offset = channel.byte_offset
bit_count = channel.bit_count
dependencies = group.channel_dependencies[ch_nr]
if dependencies and isinstance(dependencies[0], ChannelArrayBlock):
ca_block = dependencies[0]
size = bit_count >> 3
shape = tuple(ca_block[f"dim_size_{i}"] for i in range(ca_block.dims))
if ca_block.byte_offset_base // size > 1 and len(shape) == 1:
shape += (ca_block.byte_offset_base // size,)
dim = 1
for d in shape:
dim *= d
size *= dim
bit_count = size << 3
byte_count = bit_offset + bit_count
if byte_count % 8:
byte_count = (byte_count >> 3) + 1
else:
byte_count >>= 3
types = [
("", f"a{byte_offset}"),
("vals", f"({byte_count},)u1"),
("", f"a{record_size - byte_count - byte_offset}"),
]
vals = fromstring(data, dtype=dtype(types))
vals = vals["vals"]
if channel.data_type not in big_endian_types:
vals = flip(vals, 1)
vals = unpackbits(vals)
vals = roll(vals, bit_offset)
vals = vals.reshape((len(vals) // 8, 8))
vals = packbits(vals)
vals = vals.reshape((len(vals) // byte_count, byte_count))
if bit_count < 64:
mask = 2 ** bit_count - 1
masks = []
while mask:
masks.append(mask & 0xFF)
mask >>= 8
for i in range(byte_count - len(masks)):
masks.append(0)
masks = masks[::-1]
for i, mask in enumerate(masks):
vals[:, i] &= mask
if channel.data_type not in big_endian_types:
vals = flip(vals, 1)
if bit_count <= 8:
size = 1
elif bit_count <= 16:
size = 2
elif bit_count <= 32:
size = 4
elif bit_count <= 64:
size = 8
else:
size = bit_count // 8
if size > byte_count:
extra_bytes = size - byte_count
extra = zeros((len(vals), extra_bytes), dtype=uint8)
types = [
("vals", vals.dtype, vals.shape[1:]),
("", extra.dtype, extra.shape[1:]),
]
vals = fromarrays([vals, extra], dtype=dtype(types))
channel.dtype_fmt = get_fmt_v4(channel.data_type, bit_count, channel.channel_type)
fmt = channel.dtype_fmt
if size <= byte_count:
if channel.data_type in big_endian_types:
types = [("", f"a{byte_count - size}"), ("vals", fmt)]
else:
types = [("vals", fmt), ("", f"a{byte_count - size}")]
else:
types = [("vals", fmt)]
vals = fromstring(vals.tobytes(), dtype=dtype(types))["vals"]
if channel.data_type in v4c.SIGNED_INT:
return as_non_byte_sized_signed_int(vals, bit_count)
else:
return vals
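# Worked example for the masking above: a 10-bit channel at bit_offset 3
# spans bit_offset + bit_count == 13 bits, hence (13 >> 3) + 1 == 2 record
# bytes; the mask 2**10 - 1 == 0x3FF splits into per-byte masks
# [0xFF, 0x03], which are reversed to [0x03, 0xFF] and applied to the
# byte columns (most significant byte first after the endianness flip).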
def _validate_channel_selection(
self, name=None, group=None, index=None, source=None
):
"""Gets channel comment.
Channel can be specified in two ways:
* using the first positional argument *name*
* if there are multiple occurrences for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurrences for this channel and either the
*group* or *index* argument is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
Returns
-------
group_index, channel_index : (int, int)
selected channel's group and channel index
"""
suppress = True
if name is None:
if group is None or index is None:
message = (
"Invalid arguments for channel selection: "
'must give "name" or both "group" and "index"'
)
raise MdfException(message)
else:
gp_nr, ch_nr = group, index
if ch_nr >= 0:
try:
grp = self.groups[gp_nr]
except IndexError:
raise MdfException("Group index out of range")
try:
grp.channels[ch_nr]
except IndexError:
raise MdfException("Channel index out of range")
else:
if name not in self.channels_db:
raise MdfException(f'Channel "{name}" not found')
else:
if source is not None:
for gp_nr, ch_nr in self.channels_db[name]:
source_name = self._get_source_name(gp_nr, ch_nr)
if source_name == source:
break
else:
raise MdfException(f"{name} with source {source} not found")
elif group is None:
gp_nr, ch_nr = self.channels_db[name][0]
if len(self.channels_db[name]) > 1 and not suppress:
message = (
f'Multiple occurrences for channel "{name}". '
f"Using first occurrence from data group {gp_nr}. "
'Provide both "group" and "index" arguments'
" to select another data group"
)
logger.warning(message)
else:
if index is not None and index < 0:
gp_nr = group
ch_nr = index
else:
for gp_nr, ch_nr in self.channels_db[name]:
if gp_nr == group:
if index is None:
break
elif index == ch_nr:
break
else:
if index is None:
message = f'Channel "{name}" not found in group {group}'
else:
message = f'Channel "{name}" not found in group {group} at index {index}'
raise MdfException(message)
return gp_nr, ch_nr
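# Selection sketch mirroring the docstring above (the channel name, group
# and index values are hypothetical):
#
#     gp_nr, ch_nr = self._validate_channel_selection("EngineSpeed")
#     gp_nr, ch_nr = self._validate_channel_selection(group=2, index=7)
#     gp_nr, ch_nr = self._validate_channel_selection("EngineSpeed", group=2)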
def _get_source_name(self, group, index):
return self.groups[group].channels[index].source.name or ""
def _get_data_blocks_info(self, address, stream, block_type=b"##DT", mapped=False):
info = {
"data_block_addr": [],
"data_block_type": 0,
"data_size": [],
"data_block_size": [],
"param": 0,
}
# for low and minimum options save each block's type,
# address and size
if mapped:
if address:
id_string, _, block_len, __ = COMMON_uf(stream, address)
# can be a DataBlock
if id_string == block_type:
size = block_len - 24
if size:
info["data_size"].append(size)
info["data_block_size"].append(size)
info["data_block_addr"].append(address + COMMON_SIZE)
info["data_block_type"] = v4c.DT_BLOCK
# or a DataZippedBlock
elif id_string == b"##DZ":
temp = {}
(
temp["id"],
temp["reserved0"],
temp["block_len"],
temp["links_nr"],
temp["original_type"],
temp["zip_type"],
temp["reserved1"],
temp["param"],
temp["original_size"],
temp["zip_size"],
) = v4c.DZ_COMMON_uf(stream, address)
if temp["original_size"]:
info["data_size"].append(temp["original_size"])
info["data_block_size"].append(temp["zip_size"])
info["data_block_addr"].append(address + v4c.DZ_COMMON_SIZE)
if temp["zip_type"] == v4c.FLAG_DZ_DEFLATE:
info["data_block_type"] = v4c.DZ_BLOCK_DEFLATE
else:
info["data_block_type"] = v4c.DZ_BLOCK_TRANSPOSED
info["param"] = temp["param"]
# or a DataList
elif id_string == b"##DL":
info["data_block_type"] = v4c.DT_BLOCK
while address:
dl = DataList(address=address, stream=stream, mapped=mapped)
for i in range(dl.data_block_nr):
addr = dl[f"data_block_addr{i}"]
(size,) = UINT64_uf(stream, addr + 8)
size -= COMMON_SIZE
if size:
info["data_block_addr"].append(addr + COMMON_SIZE)
info["data_size"].append(size)
info["data_block_size"].append(size)
address = dl.next_dl_addr
# or a header list
elif id_string == b"##HL":
hl = HeaderList(address=address, stream=stream, mapped=mapped)
if hl.zip_type == v4c.FLAG_DZ_DEFLATE:
info["data_block_type"] = v4c.DZ_BLOCK_DEFLATE
else:
info["data_block_type"] = v4c.DZ_BLOCK_TRANSPOSED
address = hl.first_dl_addr
while address:
dl = DataList(address=address, stream=stream, mapped=mapped)
for i in range(dl.data_block_nr):
addr = dl[f"data_block_addr{i}"]
param, size, zip_size = unpack_from("<I2Q", stream, addr + 28)
if size:
info["data_block_addr"].append(addr + v4c.DZ_COMMON_SIZE)
info["data_size"].append(size)
info["data_block_size"].append(zip_size)
info["param"] = param
address = dl.next_dl_addr
else:
if address:
stream.seek(address)
id_string, _, block_len, __ = COMMON_u(stream.read(COMMON_SIZE))
# can be a DataBlock
if id_string == block_type:
size = block_len - 24
if size:
info["data_size"].append(size)
info["data_block_size"].append(size)
info["data_block_addr"].append(address + COMMON_SIZE)
info["data_block_type"] = v4c.DT_BLOCK
# or a DataZippedBlock
elif id_string == b"##DZ":
stream.seek(address)
temp = {}
(
temp["id"],
temp["reserved0"],
temp["block_len"],
temp["links_nr"],
temp["original_type"],
temp["zip_type"],
temp["reserved1"],
temp["param"],
temp["original_size"],
temp["zip_size"],
) = v4c.DZ_COMMON_u(stream.read(v4c.DZ_COMMON_SIZE))
if temp["original_size"]:
info["data_size"].append(temp["original_size"])
info["data_block_size"].append(temp["zip_size"])
info["data_block_addr"].append(address + v4c.DZ_COMMON_SIZE)
if temp["zip_type"] == v4c.FLAG_DZ_DEFLATE:
info["data_block_type"] = v4c.DZ_BLOCK_DEFLATE
else:
info["data_block_type"] = v4c.DZ_BLOCK_TRANSPOSED
info["param"] = temp["param"]
# or a DataList
elif id_string == b"##DL":
info["data_block_type"] = v4c.DT_BLOCK
while address:
dl = DataList(address=address, stream=stream)
for i in range(dl.data_block_nr):
addr = dl[f"data_block_addr{i}"]
stream.seek(addr + 8)
(size,) = UINT64_u(stream.read(8))
size -= COMMON_SIZE
if size:
info["data_block_addr"].append(addr + COMMON_SIZE)
info["data_size"].append(size)
info["data_block_size"].append(size)
address = dl.next_dl_addr
# or a header list
elif id_string == b"##HL":
hl = HeaderList(address=address, stream=stream)
if hl.zip_type == v4c.FLAG_DZ_DEFLATE:
info["data_block_type"] = v4c.DZ_BLOCK_DEFLATE
else:
info["data_block_type"] = v4c.DZ_BLOCK_TRANSPOSED
address = hl.first_dl_addr
while address:
dl = DataList(address=address, stream=stream)
for i in range(dl.data_block_nr):
addr = dl[f"data_block_addr{i}"]
stream.seek(addr + 28)
param, size, zip_size = unpack("<I2Q", stream.read(20))
if size:
info["data_block_addr"].append(addr + v4c.DZ_COMMON_SIZE)
info["data_size"].append(size)
info["data_block_size"].append(zip_size)
info["param"] = param
address = dl.next_dl_addr
return info
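# The dict built above feeds Group.set_blocks_info (see _read); the three
# parallel lists line up per fragment, exactly as they are consumed in
# _load_data:
#
#     info = self._get_data_blocks_info(address, stream)
#     fragments = zip(
#         info["data_block_addr"],   # absolute offset of the payload
#         info["data_size"],         # uncompressed byte count
#         info["data_block_size"],   # stored byte count (== data_size for DT)
#     )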
[docs] def get_invalidation_bits(self, group_index, channel, fragment):
""" get invalidation indexes for the channel
Parameters
----------
group_index : int
group index
channel : Channel
channel object
fragment : (bytes, int)
(fragment bytes, fragment offset)
Returns
-------
invalidation_bits : numpy.ndarray
boolean array of invalidation bits for the channel's samples; *True*
marks the corresponding sample as invalid
"""
group = self.groups[group_index]
dtypes = group.types
invalidation_size = group.channel_group.invalidation_bytes_nr
data_bytes, offset, _count = fragment
try:
invalidation = self._invalidation_cache[(group_index, offset, _count)]
except KeyError:
record = group.record
if record is None:
dtypes = group.types
if dtypes.itemsize:
record = fromstring(data_bytes, dtype=dtypes)
else:
record = None
invalidation = record["invalidation_bytes"].tostring()
self._invalidation_cache[(group_index, offset, _count)] = invalidation
ch_invalidation_pos = channel["pos_invalidation_bit"]
pos_byte, pos_offset = divmod(ch_invalidation_pos, 8)
rec = fromstring(
invalidation,
dtype=[
("", f"S{pos_byte}"),
("vals", "<u1"),
("", f"S{invalidation_size - pos_byte - 1}"),
],
)
mask = 1 << pos_offset
invalidation_bits = rec["vals"] & mask
invalidation_bits = invalidation_bits.astype(bool)
return invalidation_bits
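# Worked example for the byte/bit split above: pos_invalidation_bit == 11
# gives divmod(11, 8) == (1, 3), i.e. invalidation byte index 1 tested
# with mask 1 << 3 == 0x08.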
def append(self, signals, source_info="Python", common_timebase=False, units=None):
"""
Appends a new data group.
For *Signal* objects that have channel dependencies, the *samples*
attribute must be a numpy.recarray
Parameters
----------
signals : list | Signal | pandas.DataFrame
list of *Signal* objects, or a single *Signal* object, or a pandas
*DataFrame* object. All bytes columns in the pandas *DataFrame*
must be *utf-8* encoded
source_info : str
source information; default 'Python'
common_timebase : bool
flag to hint that the signals have the same timebase. Only set this
if you know for sure that all appended channels share the same
time base
units : dict
will contain the signal units mapped to the signal names when
appending a pandas DataFrame
Examples
--------
>>> # case 1: conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> info = {}
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF4('new.mdf')
>>> mdf.append([s1, s2, s3], 'created by asammdf v4.0.0')
>>> # case 2: VTAB conversions from channels inside another file
>>> mdf1 = MDF4('in.mf4')
>>> ch1 = mdf1.get("Channel1_VTAB")
>>> ch2 = mdf1.get("Channel2_VTABR")
>>> sigs = [ch1, ch2]
>>> mdf2 = MDF4('out.mf4')
>>> mdf2.append(sigs, 'created by asammdf v4.0.0')
>>> mdf2.append(ch1, 'just a single channel')
>>> df = pd.DataFrame.from_dict({'s1': np.array([1, 2, 3, 4, 5]), 's2': np.array([-1, -2, -3, -4, -5])})
>>> units = {'s1': 'V', 's2': 'A'}
>>> mdf2.append(df, units=units)
"""
if isinstance(signals, Signal):
signals = [signals]
elif isinstance(signals, DataFrame):
self._append_dataframe(signals, source_info, units=units)
return
interp_mode = self._integer_interpolation
# check if the signals have a common timebase
# if not, interpolate the signals using the union of all timebases
if signals:
t_ = signals[0].timestamps
if not common_timebase:
for s in signals[1:]:
if not array_equal(s.timestamps, t_):
different = True
break
else:
different = False
if different:
times = [s.timestamps for s in signals]
t = reduce(union1d, times).flatten().astype(float64)
signals = [s.interp(t, interpolation_mode=interp_mode) for s in signals]
times = None
else:
t = t_
else:
t = t_
else:
t = []
dg_cntr = len(self.groups)
gp = Group(None)
gp.signal_data = gp_sdata = []
gp.signal_data_size = gp_sdata_size = []
gp.channels = gp_channels = []
gp.channel_dependencies = gp_dep = []
gp.signal_types = gp_sig_types = []
gp.logging_channels = []
# channel group
kwargs = {"cycles_nr": 0, "samples_byte_nr": 0}
gp.channel_group = ChannelGroup(**kwargs)
gp.channel_group.name = source_info
if any(sig.invalidation_bits is not None for sig in signals):
invalidation_bytes_nr = 1
gp.channel_group.invalidation_bytes_nr = invalidation_bytes_nr
inval_bits = []
else:
invalidation_bytes_nr = 0
inval_bits = []
inval_cntr = 0
self.groups.append(gp)
cycles_nr = len(t)
fields = []
types = []
parents = {}
ch_cntr = 0
offset = 0
field_names = UniqueDB()
defined_texts = {}
si_map = self._si_map
cc_map = self._cc_map
# setup all blocks related to the time master channel
file = self._tempfile
tell = file.tell
seek = file.seek
seek(0, 2)
if signals:
master_metadata = signals[0].master_metadata
else:
master_metadata = None
if master_metadata:
time_name, sync_type = master_metadata
if sync_type in (0, 1):
time_unit = "s"
elif sync_type == 2:
time_unit = "deg"
elif sync_type == 3:
time_unit = "m"
elif sync_type == 4:
time_unit = "index"
else:
time_name, sync_type = "time", v4c.SYNC_TYPE_TIME
time_unit = "s"
source_block = SourceInformation()
source_block.name = source_block.path = source_info
if signals:
# time channel
t_type, t_size = fmt_to_datatype_v4(t.dtype, t.shape)
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_MASTER,
"data_type": t_type,
"sync_type": sync_type,
"byte_offset": 0,
"bit_offset": 0,
"bit_count": t_size,
}
ch = Channel(**kwargs)
ch.unit = time_unit
ch.name = time_name
ch.source = source_block
name = time_name
gp_channels.append(ch)
gp_sdata.append(None)
gp_sdata_size.append(0)
self.channels_db.add(name, (dg_cntr, ch_cntr))
self.masters_db[dg_cntr] = 0
# data group record parents
parents[ch_cntr] = name, 0
# time channel doesn't have channel dependencies
gp_dep.append(None)
fields.append(t)
types.append((name, t.dtype))
field_names.get_unique_name(name)
offset += t_size // 8
ch_cntr += 1
gp_sig_types.append(0)
for signal in signals:
sig = signal
samples = sig.samples
sig_dtype = samples.dtype
sig_shape = samples.shape
names = sig_dtype.names
name = signal.name
if names is None:
sig_type = v4c.SIGNAL_TYPE_SCALAR
if sig_dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
else:
if names in (v4c.CANOPEN_TIME_FIELDS, v4c.CANOPEN_DATE_FIELDS):
sig_type = v4c.SIGNAL_TYPE_CANOPEN
elif names[0] != sig.name:
sig_type = v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
else:
sig_type = v4c.SIGNAL_TYPE_ARRAY
gp_sig_types.append(sig_type)
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
# compute additional byte offset for large records size
s_type, s_size = fmt_to_datatype_v4(sig_dtype, sig_shape)
byte_size = max(s_size // 8, 1)
if sig_dtype.kind == "u" and signal.bit_count <= 4:
s_size = signal.bit_count
if signal.stream_sync:
channel_type = v4c.CHANNEL_TYPE_SYNC
at_data, at_name = signal.attachment
attachment_addr = self.attach(
at_data, at_name, mime="video/avi", embedded=False
)
data_block_addr = attachment_addr
sync_type = v4c.SYNC_TYPE_TIME
else:
channel_type = v4c.CHANNEL_TYPE_VALUE
data_block_addr = 0
sync_type = v4c.SYNC_TYPE_NONE
kwargs = {
"channel_type": channel_type,
"sync_type": sync_type,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"data_block_addr": data_block_addr,
"flags": 0,
}
if invalidation_bytes_nr and signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] = v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_name = signal.display_name
# conversions for channel
if signal.raw:
ch.conversion = conversion_transfer(signal.conversion, version=4)
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
offset += byte_size
gp_sdata.append(None)
gp_sdata_size.append(0)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
if ch.display_name:
self.channels_db.add(ch.display_name, entry)
# update the parents as well
field_name = field_names.get_unique_name(name)
parents[ch_cntr] = field_name, 0
fields.append(samples)
types.append((field_name, sig_dtype, sig_shape[1:]))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_CANOPEN:
field_name = field_names.get_unique_name(name)
if names == v4c.CANOPEN_TIME_FIELDS:
vals = signal.samples.tostring()
fields.append(frombuffer(vals, dtype="V6"))
types.append((field_name, "V6"))
byte_size = 6
s_type = v4c.DATA_TYPE_CANOPEN_TIME
else:
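# a CANopen DATE is stored on 7 bytes: ms (2 bytes) followed by min,
# hour, day, month and year (1 byte each); the summer time flag is
# packed into bit 7 of the hour byte and the day of the week into the
# high nibble of the day byte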
vals = []
for field in ("ms", "min", "hour", "day", "month", "year"):
if field == "hour":
vals.append(
signal.samples[field]
+ (signal.samples["summer_time"] << 7)
)
elif field == "day":
vals.append(
signal.samples[field]
+ (signal.samples["day_of_week"] << 4)
)
else:
vals.append(signal.samples[field])
vals = fromarrays(vals).tostring()
fields.append(frombuffer(vals, dtype="V7"))
types.append((field_name, "V7"))
byte_size = 7
s_type = v4c.DATA_TYPE_CANOPEN_DATE
s_size = byte_size << 3
# there is no channel dependency
gp_dep.append(None)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr and signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_name = signal.display_name
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
offset += byte_size
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
if ch.display_name:
self.channels_db.add(ch.display_name, entry)
# update the parents as well
parents[ch_cntr] = field_name, 0
gp_sdata.append(0)
gp_sdata_size.append(0)
ch_cntr += 1
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
offset, dg_cntr, ch_cntr, struct_self, new_fields, new_types, inval_cntr = self._append_structure_composition(
gp,
signal,
field_names,
offset,
dg_cntr,
ch_cntr,
parents,
defined_texts,
invalidation_bytes_nr,
inval_bits,
inval_cntr,
)
fields.extend(new_fields)
types.extend(new_types)
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
# here we have channel arrays or mdf v3 channel dependencies
samples = signal.samples[names[0]]
shape = samples.shape[1:]
if len(shape) > 1:
# add channel dependency block for composed parent channel
dims_nr = len(shape)
names_nr = len(names)
if names_nr == 0:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_FIXED_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
elif len(names) == 1:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_ARRAY,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
else:
kwargs = {
"dims": dims_nr,
"ca_type": v4c.CA_TYPE_LOOKUP,
"flags": v4c.FLAG_CA_AXIS,
"byte_offset_base": samples.dtype.itemsize,
}
for i in range(dims_nr):
kwargs[f"dim_size_{i}"] = shape[i]
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
else:
# add channel dependency block for composed parent channel
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
parent_dep = ChannelArrayBlock(**kwargs)
gp_dep.append([parent_dep])
field_name = field_names.get_unique_name(name)
fields.append(samples)
dtype_pair = field_name, samples.dtype, shape
types.append(dtype_pair)
# first we add the structure channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, samples.shape, True)
# add channel block
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_name = signal.display_name
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
size = s_size >> 3
for dim in shape:
size *= dim
offset += size
gp_sdata.append(None)
gp_sdata_size.append(0)
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
if ch.display_name:
self.channels_db.add(ch.display_name, entry)
# update the parents as well
parents[ch_cntr] = name, 0
ch_cntr += 1
for name in names[1:]:
field_name = field_names.get_unique_name(name)
samples = signal.samples[name]
shape = samples.shape[1:]
fields.append(samples)
types.append((field_name, samples.dtype, shape))
# add channel dependency block
kwargs = {
"dims": 1,
"ca_type": v4c.CA_TYPE_SCALE_AXIS,
"flags": 0,
"byte_offset_base": samples.dtype.itemsize,
"dim_size_0": shape[0],
}
dep = ChannelArrayBlock(**kwargs)
gp_dep.append([dep])
# add components channel
s_type, s_size = fmt_to_datatype_v4(samples.dtype, ())
byte_size = max(s_size // 8, 1)
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VALUE,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_name = signal.display_name
gp_channels.append(ch)
entry = dg_cntr, ch_cntr
parent_dep.referenced_channels.append(entry)
for dim in shape:
byte_size *= dim
offset += byte_size
gp_sdata.append(None)
gp_sdata_size.append(0)
self.channels_db.add(name, entry)
# update the parents as well
parents[ch_cntr] = field_name, 0
ch_cntr += 1
else:
encoding = signal.encoding
samples = signal.samples
sig_dtype = samples.dtype
if encoding == "utf-8":
data_type = v4c.DATA_TYPE_STRING_UTF_8
elif encoding == "latin-1":
data_type = v4c.DATA_TYPE_STRING_LATIN_1
elif encoding == "utf-16-be":
data_type = v4c.DATA_TYPE_STRING_UTF_16_BE
elif encoding == "utf-16-le":
data_type = v4c.DATA_TYPE_STRING_UTF_16_LE
else:
raise MdfException(f'wrong encoding "{encoding}" for string signal')
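# VLSD channels keep their values in a signal data block as
# length-prefixed records (little-endian uint32 byte count followed by
# the raw bytes); the data group record only stores the uint64 offset
# of each value inside that block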
offsets = arange(len(samples), dtype=uint64) * (
signal.samples.itemsize + 4
)
values = [full(len(samples), samples.itemsize, dtype=uint32), samples]
types_ = [("o", uint32), ("s", sig_dtype)]
data = fromarrays(values, dtype=types_)
data_size = len(data) * data.itemsize
if data_size:
data_addr = tell()
gp_sdata.append([data_addr])
gp_sdata_size.append([data_size])
data.tofile(file)
else:
data_addr = 0
gp_sdata.append([])
gp_sdata_size.append([])
# compute additional byte offset for large records size
byte_size = 8
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VLSD,
"bit_count": 64,
"byte_offset": offset,
"bit_offset": 0,
"data_type": data_type,
"data_block_addr": data_addr,
"flags": 0,
}
if invalidation_bytes_nr:
if signal.invalidation_bits is not None:
inval_bits.append(signal.invalidation_bits)
kwargs["flags"] |= v4c.FLAG_CN_INVALIDATION_PRESENT
kwargs["pos_invalidation_bit"] = inval_cntr
inval_cntr += 1
ch = Channel(**kwargs)
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
ch.display_name = signal.display_name
# conversions for channel
conversion = conversion_transfer(signal.conversion, version=4)
if signal.raw:
ch.conversion = conversion
# source for channel
source = signal.source
if source:
if source in si_map:
ch.source = si_map[source]
else:
new_source = SourceInformation(
source_type=source.source_type, bus_type=source.bus_type
)
new_source.name = source.name
new_source.path = source.path
new_source.comment = source.comment
si_map[source] = new_source
ch.source = new_source
gp_channels.append(ch)
offset += byte_size
entry = (dg_cntr, ch_cntr)
self.channels_db.add(name, entry)
if ch.display_name:
self.channels_db.add(ch.display_name, entry)
# update the parents as well
field_name = field_names.get_unique_name(name)
parents[ch_cntr] = field_name, 0
fields.append(offsets)
types.append((field_name, uint64))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
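# pack the collected invalidation bit arrays into invalidation bytes:
# pad the list to a multiple of 8, then use packbits; the reverse +
# fliplr combination arranges the bytes so that each channel's
# pos_invalidation_bit addresses the correct bit in every record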
if invalidation_bytes_nr:
invalidation_bytes_nr = len(inval_bits)
for _ in range(8 - invalidation_bytes_nr % 8):
inval_bits.append(zeros(cycles_nr, dtype=bool))
inval_bits.reverse()
invalidation_bytes_nr = len(inval_bits) // 8
gp.channel_group.invalidation_bytes_nr = invalidation_bytes_nr
inval_bits = fliplr(
packbits(array(inval_bits).T).reshape(
(cycles_nr, invalidation_bytes_nr)
)
)
fields.append(inval_bits)
types.append(("invalidation_bytes", inval_bits.dtype, inval_bits.shape[1:]))
gp.channel_group.cycles_nr = cycles_nr
gp.channel_group.samples_byte_nr = offset
gp.size = cycles_nr * (offset + invalidation_bytes_nr)
# data group
gp.data_group = DataGroup()
# data block
types = dtype(types)
gp.sorted = True
gp.types = types
gp.parents = parents
if signals:
samples = fromarrays(fields, dtype=types)
else:
samples = array([])
signals = None
del signals
size = len(samples) * samples.itemsize
if size:
data_address = self._tempfile.tell()
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
samples.tofile(self._tempfile)
dim = size
block_type = v4c.DT_BLOCK
gp.data_block_type = block_type
gp.param = 0
gp.data_size = [size]
gp.data_block_size = [dim]
gp.data_block_addr = [data_address]
gp.data_block = [data_address]
else:
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
gp.data_block = []
gp.data_block_type = v4c.DT_BLOCK
gp.param = 0
gp.data_size = []
gp.data_block_size = []
gp.data_block_addr = []
def _append_dataframe(self, df, source_info="", units=None):
"""
Appends a new data group from a Pandas data frame.
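Parameters
----------
df : pandas.DataFrame
data frame to append; the frame index is used as the master channel
source_info : str
acquisition source name; default ''
units : dict
optional mapping from column names to unit strings; default *None*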
"""
units = units or {}
t = df.index
index_name = df.index.name
time_name = index_name or "time"
sync_type = v4c.SYNC_TYPE_TIME
time_unit = "s"
dg_cntr = len(self.groups)
gp = Group(None)
gp.signal_data = gp_sdata = []
gp.signal_data_size = gp_sdata_size = []
gp.channels = gp_channels = []
gp.channel_dependencies = gp_dep = []
gp.signal_types = gp_sig_types = []
gp.logging_channels = []
# channel group
kwargs = {"cycles_nr": 0, "samples_byte_nr": 0}
gp.channel_group = ChannelGroup(**kwargs)
gp.channel_group.acq_name = source_info
invalidation_bytes_nr = 0
self.groups.append(gp)
cycles_nr = len(t)
fields = []
types = []
parents = {}
ch_cntr = 0
offset = 0
field_names = UniqueDB()
# setup all blocks related to the time master channel
file = self._tempfile
tell = file.tell
seek = file.seek
seek(0, 2)
source_block = SourceInformation()
source_block.name = source_block.path = source_info
if df.shape[0]:
# time channel
t_type, t_size = fmt_to_datatype_v4(t.dtype, t.shape)
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_MASTER,
"data_type": t_type,
"sync_type": sync_type,
"byte_offset": 0,
"bit_offset": 0,
"bit_count": t_size,
"min_raw_value": t[0] if cycles_nr else 0,
"max_raw_value": t[-1] if cycles_nr else 0,
"lower_limit": t[0] if cycles_nr else 0,
"upper_limit": t[-1] if cycles_nr else 0,
"flags": v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK,
}
ch = Channel(**kwargs)
ch.unit = time_unit
ch.name = time_name
ch.source = source_block
name = time_name
gp_channels.append(ch)
gp_sdata.append(None)
gp_sdata_size.append(0)
self.channels_db.add(name, (dg_cntr, ch_cntr))
self.masters_db[dg_cntr] = 0
# data group record parents
parents[ch_cntr] = name, 0
# time channel doesn't have channel dependencies
gp_dep.append(None)
fields.append(t)
types.append((name, t.dtype))
field_names.get_unique_name(name)
offset += t_size // 8
ch_cntr += 1
gp_sig_types.append(0)
for signal in df:
if index_name == signal:
continue
sig = df[signal]
name = signal
sig_type = v4c.SIGNAL_TYPE_SCALAR
if sig.dtype.kind in "SV":
sig_type = v4c.SIGNAL_TYPE_STRING
gp_sig_types.append(sig_type)
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
# compute additional byte offset for large records size
s_type, s_size = fmt_to_datatype_v4(sig.dtype, sig.shape)
byte_size = max(s_size // 8, 1)
channel_type = v4c.CHANNEL_TYPE_VALUE
data_block_addr = 0
sync_type = v4c.SYNC_TYPE_NONE
kwargs = {
"channel_type": channel_type,
"sync_type": sync_type,
"bit_count": s_size,
"byte_offset": offset,
"bit_offset": 0,
"data_type": s_type,
"data_block_addr": data_block_addr,
}
ch = Channel(**kwargs)
ch.name = name
ch.unit = units.get(name, "")
gp_channels.append(ch)
offset += byte_size
gp_sdata.append(None)
gp_sdata_size.append(0)
self.channels_db.add(name, (dg_cntr, ch_cntr))
# update the parents as well
field_name = field_names.get_unique_name(name)
parents[ch_cntr] = field_name, 0
fields.append(sig)
types.append((field_name, sig.dtype, sig.shape[1:]))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
elif sig_type == v4c.SIGNAL_TYPE_STRING:
offsets = arange(len(sig), dtype=uint64) * (sig.itemsize + 4)
values = [full(len(sig), sig.itemsize, dtype=uint32), sig]
types_ = [("", uint32), ("", sig.dtype)]
data = fromarrays(values, dtype=types_)
data_size = len(data) * data.itemsize
if data_size:
data_addr = tell()
gp_sdata.append([data_addr])
gp_sdata_size.append([data_size])
data.tofile(file)
else:
data_addr = 0
gp_sdata.append([])
gp_sdata_size.append([])
# compute additional byte offset for large records size
byte_size = 8
kwargs = {
"channel_type": v4c.CHANNEL_TYPE_VLSD,
"bit_count": 64,
"byte_offset": offset,
"bit_offset": 0,
"data_type": v4c.DATA_TYPE_STRING_UTF_8,
"min_raw_value": 0,
"max_raw_value": 0,
"lower_limit": 0,
"upper_limit": 0,
"flags": 0,
"data_block_addr": data_addr,
}
ch = Channel(**kwargs)
ch.name = name
ch.unit = units.get(name, "")
gp_channels.append(ch)
offset += byte_size
self.channels_db.add(name, (dg_cntr, ch_cntr))
# update the parents as well
field_name = field_names.get_unique_name(name)
parents[ch_cntr] = field_name, 0
fields.append(offsets)
types.append((field_name, uint64))
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
gp.channel_group.cycles_nr = cycles_nr
gp.channel_group.samples_byte_nr = offset
gp.size = cycles_nr * (offset + invalidation_bytes_nr)
# data group
gp.data_group = DataGroup()
# data block
types = dtype(types)
gp.sorted = True
gp.types = types
gp.parents = parents
if df.shape[0]:
samples = fromarrays(fields, dtype=types)
else:
samples = array([])
size = len(samples) * samples.itemsize
if size:
data_address = self._tempfile.tell()
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
samples.tofile(self._tempfile)
dim = size
block_type = v4c.DT_BLOCK
gp.data_block_type = block_type
gp.param = 0
gp.data_size = [size]
gp.data_block_size = [dim]
gp.data_block_addr = [data_address]
gp.data_block = [data_address]
else:
gp.data_location = v4c.LOCATION_TEMPORARY_FILE
gp.data_block = []
gp.data_block_type = v4c.DT_BLOCK
gp.param = 0
gp.data_size = []
gp.data_block_size = []
gp.data_block_addr = []
def extend(self, index, signals):
"""
Extend a group with new samples. *signals* contains (values, invalidation_bits)
pairs for each extended signal. The first pair is the master channel's pair, and the
next pairs must respect the same order in which the signals were appended. The samples must have raw
or physical values according to the *Signals* used for the initial append.
Parameters
----------
index : int
group index
signals : list
list of (numpy.ndarray, numpy.ndarray) objects
Examples
--------
>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF4('new.mf4')
>>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
>>> t = np.array([0.006, 0.007, 0.008, 0.009, 0.010])
>>> # extend without invalidation bits
>>> mdf.extend(0, [(t, None), (s1.samples, None), (s2.samples, None), (s3.samples, None)])
>>> # extend with some invalidation bits
>>> s1_inv = np.array([0, 0, 0, 1, 1], dtype=np.bool)
>>> mdf.extend(0, [(t, None), (s1.samples, s1_inv), (s2.samples, None), (s3.samples, None)])
"""
gp = self.groups[index]
if not signals:
message = '"append" requires a non-empty list of Signal objects'
raise MdfException(message)
stream = self._tempfile
fields = []
types = []
inval_bits = []
invalidation_bytes_nr = gp.channel_group.invalidation_bytes_nr
for i, ((signal, invalidation_bits), sig_type) in enumerate(
zip(signals, gp.signal_types)
):
# first add the signals in the simple signal list
if sig_type == v4c.SIGNAL_TYPE_SCALAR:
fields.append(signal)
types.append(("", signal.dtype, signal.shape[1:]))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
elif sig_type == v4c.SIGNAL_TYPE_CANOPEN:
names = signal.dtype.names
if names == v4c.CANOPEN_TIME_FIELDS:
vals = signal.tostring()
fields.append(frombuffer(vals, dtype="V6"))
types.append(("", "V6"))
else:
vals = []
for field in ("ms", "min", "hour", "day", "month", "year"):
vals.append(signal[field])
vals = fromarrays(vals).tostring()
fields.append(frombuffer(vals, dtype="V7"))
types.append(("", "V7"))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
elif sig_type == v4c.SIGNAL_TYPE_STRUCTURE_COMPOSITION:
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
fields.append(signal)
types.append(("", signal.dtype))
elif sig_type == v4c.SIGNAL_TYPE_ARRAY:
names = signal.dtype.names
samples = signal[names[0]]
shape = samples.shape[1:]
fields.append(samples)
types.append(("", samples.dtype, shape))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
for name in names[1:]:
samples = signal[name]
shape = samples.shape[1:]
fields.append(samples)
types.append(("", samples.dtype, shape))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
else:
cur_offset = sum(gp.signal_data_size[i])
offsets = (
arange(len(signal), dtype=uint64) * (signal.itemsize + 4)
+ cur_offset
)
values = [full(len(signal), signal.itemsize, dtype=uint32), signal]
types_ = [("", uint32), ("", signal.dtype)]
values = fromarrays(values, dtype=types_)
stream.seek(0, 2)
addr = stream.tell()
block_size = len(values) * values.itemsize
if block_size:
values.tofile(stream)
gp.signal_data[i].append(addr)
gp.signal_data_size[i].append(block_size)
fields.append(offsets)
types.append(("", uint64))
if invalidation_bytes_nr and invalidation_bits is not None:
inval_bits.append(invalidation_bits)
if invalidation_bytes_nr:
invalidation_bytes_nr = len(inval_bits)
cycles_nr = len(inval_bits[0])
for _ in range(8 - invalidation_bytes_nr % 8):
inval_bits.append(zeros(cycles_nr, dtype=bool))
inval_bits.reverse()
invalidation_bytes_nr = len(inval_bits) // 8
gp.channel_group.invalidation_bytes_nr = invalidation_bytes_nr
inval_bits = fliplr(
packbits(array(inval_bits).T).reshape(
(cycles_nr, invalidation_bytes_nr)
)
)
fields.append(inval_bits)
types.append(("invalidation_bytes", inval_bits.dtype, inval_bits.shape[1:]))
samples = fromarrays(fields, dtype=types)
del fields
del types
stream.seek(0, 2)
addr = stream.tell()
size = len(samples) * samples.itemsize
if size:
gp.data_block.append(addr)
samples.tofile(stream)
dim = size
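# infer how many cycles were appended from the written byte count and
# the full record size (samples + record id + invalidation bytes)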
record_size = gp.channel_group.samples_byte_nr
record_size += gp.data_group.record_id_len
record_size += gp.channel_group.invalidation_bytes_nr
added_cycles = size // record_size
gp.channel_group.cycles_nr += added_cycles
gp.data_block_addr.append(addr)
gp.data_size.append(size)
gp.data_block_size.append(dim)
del samples
def attach(
self,
data,
file_name=None,
comment=None,
compression=True,
mime=r"application/octet-stream",
embedded=True,
):
""" attach embedded attachment as application/octet-stream
Parameters
----------
data : bytes
data to be attached
file_name : str
string file name
comment : str
attachment comment
compression : bool
use compression for embedded attachment data
mime : str
mime type string
embedded : bool
attachment is embedded in the file
Returns
-------
index : int
new attachment index
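Examples
--------
>>> # minimal sketch; the payload and file name are illustrative
>>> mdf = MDF4(version='4.10')
>>> at_index = mdf.attach(
...     b'measurement notes',
...     file_name='notes.txt',
...     comment='session notes',
...     mime='text/plain',
... )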
"""
if data in self._attachments_cache:
return self._attachments_cache[data]
else:
creator_index = len(self.file_history)
fh = FileHistory()
fh.comment = """<FHcomment>
<TX>Added new embedded attachment from {}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>""".format(
file_name if file_name else "bin.bin", __version__
)
self.file_history.append(fh)
file_name = file_name or "bin.bin"
at_block = AttachmentBlock(
data=data,
compression=compression,
embedded=embedded,
file_name=file_name,
)
at_block["creator_index"] = creator_index
index = v4c.MAX_UINT64 - 1
while index in self._attachments_map:
index -= 1
self.attachments.append(at_block)
at_block.mime = mime
at_block.comment = comment
self._attachments_cache[data] = index
self._attachments_map[index] = len(self.attachments) - 1
return index
def close(self):
""" if the MDF was created with memory=False and new
channels have been appended, then this must be called just before the
object is not used anymore to clean-up the temporary file"""
if self._tempfile is not None:
self._tempfile.close()
if self._file is not None:
self._file.close()
def get(
self,
name=None,
group=None,
index=None,
raster=None,
samples_only=False,
data=None,
raw=False,
ignore_invalidation_bits=False,
source=None,
record_offset=0,
record_count=None,
copy_master=True,
):
"""Gets channel samples. The raw data group samples are not loaded to
memory so it is advised to use ``filter`` or ``select`` instead of
performing several ``get`` calls.
Channel can be specified in two ways:
* using the first positional argument *name*
* if *source* is given this will be first used to validate the
channel selection
* if there are multiple occurrences for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurrences for this channel and either the
*group* or *index* argument is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
If the *raster* keyword argument is not *None* the output is
interpolated accordingly
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
raster : float
time raster in seconds
samples_only : bool
if *True* return only the channel samples as numpy array; if
*False* return a *Signal* object
data : bytes
prevent redundant data read by providing the raw data group samples
raw : bool
return channel samples without applying the conversion rule; default
`False`
ignore_invalidation_bits : bool
option to ignore invalidation bits
source : str
source name used to select the channel
record_offset : int
if *data=None* use this to select the record offset from which the
group data should be loaded
record_count : int
number of records to read; default *None* and in this case all
available records are used
copy_master : bool
make a copy of the timebase for this channel
Returns
-------
res : (numpy.array, numpy.array) | Signal
returns *Signal* if *samples_only*=*False* (default option),
otherwise returns a (numpy.array, numpy.array) tuple of samples and
invalidation bits. If invalidation bits are not used or if
*ignore_invalidation_bits* is False, then the second item will be
None.
The *Signal* samples are:
* numpy recarray for channels that have composition/channel
array address or for channel of type
CANOPENDATE, CANOPENTIME
* numpy array for all the rest
Raises
------
MdfException :
* if the channel name is not found
* if the group index is out of range
* if the channel index is out of range
Examples
--------
>>> from asammdf import MDF, Signal
>>> import numpy as np
>>> t = np.arange(5)
>>> s = np.ones(5)
>>> mdf = MDF(version='4.10')
>>> for i in range(4):
... sigs = [Signal(s*(i*10+j), t, name='Sig') for j in range(1, 4)]
... mdf.append(sigs)
...
>>> # first group and channel index of the specified channel name
...
>>> mdf.get('Sig')
UserWarning: Multiple occurrences for channel "Sig". Using first occurrence from data group 4. Provide both "group" and "index" arguments to select another data group
<Signal Sig:
samples=[ 1. 1. 1. 1. 1.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # first channel index in the specified group
...
>>> mdf.get('Sig', 1)
<Signal Sig:
samples=[ 11. 11. 11. 11. 11.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # channel named Sig from group 1 channel index 2
...
>>> mdf.get('Sig', 1, 2)
<Signal Sig:
samples=[ 12. 12. 12. 12. 12.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # channel index 1 of group 2
...
>>> mdf.get(None, 2, 1)
<Signal Sig:
samples=[ 21. 21. 21. 21. 21.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> mdf.get(group=2, index=1)
<Signal Sig:
samples=[ 21. 21. 21. 21. 21.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # validation using source name
...
>>> mdf.get('Sig', source='VN7060')
<Signal Sig:
samples=[ 12. 12. 12. 12. 12.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
"""
gp_nr, ch_nr = self._validate_channel_selection(
name, group, index, source=source
)
grp = self.groups[gp_nr]
interp_mode = self._integer_interpolation
if ch_nr >= 0:
# get the channel object
channel = grp.channels[ch_nr]
dependency_list = grp.channel_dependencies[ch_nr]
# get data group record
parents, dtypes = grp.parents, grp.types
if parents is None:
parents, dtypes = self._prepare_record(grp)
# get group data
if data is None:
data = self._load_data(
grp, record_offset=record_offset, record_count=record_count
)
else:
data = (data,)
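# invalidation bits are usable only if the channel has a valid
# invalidation bit position and does not declare all samples as valid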
channel_invalidation_present = (
channel.flags
& (v4c.FLAG_INVALIDATION_BIT_VALID | v4c.FLAG_ALL_SAMPLES_VALID)
== v4c.FLAG_INVALIDATION_BIT_VALID
)
bit_count = channel.bit_count
else:
# get data group record
parents, dtypes = self._prepare_record(grp)
parent, bit_offset = parents[ch_nr]
channel_invalidation_present = False
dependency_list = None
channel = grp.logging_channels[-ch_nr - 1]
# get group data
if data is None:
data = self._load_data(
grp, record_offset=record_offset, record_count=record_count
)
else:
data = (data,)
bit_count = channel.bit_count
data_type = channel.data_type
channel_type = channel.channel_type
stream_sync = channel_type == v4c.CHANNEL_TYPE_SYNC
encoding = None
master_is_required = not samples_only or raster
# check if this is a channel array
if dependency_list:
if not isinstance(dependency_list[0], ChannelArrayBlock):
# structure channel composition
_dtype = dtype(channel.dtype_fmt)
if _dtype.itemsize == bit_count // 8:
fast_path = True
channel_values = []
timestamps = []
invalidation_bits = []
byte_offset = channel.byte_offset
record_size = (
grp.channel_group.samples_byte_nr
+ grp.channel_group.invalidation_bytes_nr
)
count = 0
for fragment in data:
bts = fragment[0]
types = [
("", f"V{byte_offset}"),
("vals", _dtype),
("", f"V{record_size - _dtype.itemsize - byte_offset}"),
]
channel_values.append(fromstring(bts, types)["vals"].copy())
if master_is_required:
timestamps.append(
self.get_master(
gp_nr, fragment, copy_master=copy_master
)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
count += 1
else:
fast_path = False
names = [grp.channels[ch_nr].name for _, ch_nr in dependency_list]
channel_values = [[] for _ in dependency_list]
timestamps = []
invalidation_bits = []
count = 0
for fragment in data:
for i, (dg_nr, ch_nr) in enumerate(dependency_list):
vals = self.get(
group=dg_nr,
index=ch_nr,
samples_only=True,
raw=raw,
data=fragment,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=record_count,
)[0]
channel_values[i].append(vals)
if master_is_required:
timestamps.append(
self.get_master(
gp_nr, fragment, copy_master=copy_master
)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
count += 1
if fast_path:
if count > 1:
vals = concatenate(channel_values)
else:
vals = channel_values[0]
else:
if count > 1:
arrays = [concatenate(lst) for lst in channel_values]
else:
arrays = [lst[0] for lst in channel_values]
types = [
(name_, arr.dtype, arr.shape[1:])
for name_, arr in zip(names, arrays)
]
types = dtype(types)
vals = fromarrays(arrays, dtype=types)
if master_is_required:
if count > 1:
timestamps = concatenate(timestamps)
else:
timestamps = timestamps[0]
if channel_invalidation_present:
if count > 1:
invalidation_bits = concatenate(invalidation_bits)
else:
invalidation_bits = invalidation_bits[0]
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
if raster and len(timestamps) > 1:
t = arange(timestamps[0], timestamps[-1], raster)
vals = (
Signal(vals, timestamps, name="_")
.interp(t, interpolation_mode=interp_mode)
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
else:
# channel arrays
channel_group = grp.channel_group
samples_size = (
channel_group.samples_byte_nr + channel_group.invalidation_bytes_nr
)
channel_values = []
timestamps = []
invalidation_bits = []
count = 0
for fragment in data:
data_bytes, offset, _count = fragment
cycles = len(data_bytes) // samples_size
arrays = []
types = []
try:
parent, bit_offset = parents[ch_nr]
except KeyError:
parent, bit_offset = None, None
if parent is not None:
if grp.record is None:
dtypes = grp.types
if dtypes.itemsize:
record = fromstring(data_bytes, dtype=dtypes)
else:
record = None
else:
record = grp.record
vals = record[parent]
else:
vals = self._get_not_byte_aligned_data(data_bytes, grp, ch_nr)
vals = vals.copy()
dep = dependency_list[0]
if dep.flags & v4c.FLAG_CA_INVERSE_LAYOUT:
shape = vals.shape
shape = (shape[0],) + shape[1:][::-1]
vals = vals.reshape(shape)
axes = (0,) + tuple(range(len(shape) - 1, 0, -1))
vals = transpose(vals, axes=axes)
cycles_nr = len(vals)
for ca_block in dependency_list[:1]:
dims_nr = ca_block.dims
if ca_block.ca_type == v4c.CA_TYPE_SCALE_AXIS:
shape = (ca_block.dim_size_0,)
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
elif ca_block.ca_type == v4c.CA_TYPE_LOOKUP:
shape = vals.shape[1:]
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
if ca_block.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
shape = (ca_block[f"dim_size_{i}"],)
axis = []
for j in range(shape[0]):
key = f"axis_{i}_value_{j}"
axis.append(ca_block[key])
axis = array([axis for _ in range(cycles_nr)])
arrays.append(axis)
dtype_pair = (f"axis_{i}", axis.dtype, shape)
types.append(dtype_pair)
else:
for i in range(dims_nr):
try:
ref_dg_nr, ref_ch_nr = ca_block.referenced_channels[
i
]
except:
debug_channel(
self, grp, channel, dependency_list
)
raise
axisname = (
self.groups[ref_dg_nr].channels[ref_ch_nr].name
)
shape = (ca_block[f"dim_size_{i}"],)
if ref_dg_nr == gp_nr:
axis_values = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
data=fragment,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
)[0]
else:
channel_group = grp.channel_group
record_size = channel_group.samples_byte_nr
record_size += (
channel_group.invalidation_bytes_nr
)
start = offset // record_size
end = start + len(data_bytes) // record_size + 1
ref = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
)[0]
axis_values = ref[start:end].copy()
axis_values = axis_values[axisname]
arrays.append(axis_values)
dtype_pair = (axisname, axis_values.dtype, shape)
types.append(dtype_pair)
elif ca_block.ca_type == v4c.CA_TYPE_ARRAY:
shape = vals.shape[1:]
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
for ca_block in dependency_list[1:]:
dims_nr = ca_block.dims
if ca_block.flags & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
shape = (ca_block[f"dim_size_{i}"],)
axis = []
for j in range(shape[0]):
key = f"axis_{i}_value_{j}"
axis.append(ca_block[key])
axis = array([axis for _ in range(cycles_nr)])
arrays.append(axis)
types.append((f"axis_{i}", axis.dtype, shape))
else:
for i in range(dims_nr):
ref_dg_nr, ref_ch_nr = ca_block.referenced_channels[i]
axisname = (
self.groups[ref_dg_nr].channels[ref_ch_nr].name
)
shape = (ca_block[f"dim_size_{i}"],)
if ref_dg_nr == gp_nr:
axis_values = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
data=fragment,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
)[0]
else:
channel_group = grp.channel_group
record_size = channel_group.samples_byte_nr
record_size += channel_group.invalidation_bytes_nr
start = offset // record_size
end = start + len(data_bytes) // record_size + 1
ref = self.get(
group=ref_dg_nr,
index=ref_ch_nr,
samples_only=True,
ignore_invalidation_bits=ignore_invalidation_bits,
record_offset=record_offset,
record_count=cycles,
)[0]
axis_values = ref[start:end].copy()
axis_values = axis_values[axisname]
arrays.append(axis_values)
dtype_pair = axisname, axis_values.dtype, shape
types.append(dtype_pair)
vals = fromarrays(arrays, dtype(types))
if master_is_required:
timestamps.append(
self.get_master(gp_nr, fragment, copy_master=copy_master)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
channel_values.append(vals)
count += 1
if count > 1:
vals = concatenate(channel_values)
elif count == 1:
vals = channel_values[0]
else:
vals = []
if master_is_required:
if count > 1:
timestamps = concatenate(timestamps)
else:
timestamps = timestamps[0]
if channel_invalidation_present:
if count > 1:
invalidation_bits = concatenate(invalidation_bits)
else:
invalidation_bits = invalidation_bits[0]
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
if raster and len(timestamps) > 1:
t = arange(timestamps[0], timestamps[-1], raster)
vals = (
Signal(vals, timestamps, name="_")
.interp(t, interpolation_mode=interp_mode)
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
conversion = channel.conversion
else:
# get channel values
if channel_type in {
v4c.CHANNEL_TYPE_VIRTUAL,
v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
}:
if not channel.dtype_fmt:
channel.dtype_fmt = get_fmt_v4(data_type, 64)
ch_dtype = dtype(channel.dtype_fmt)
channel_values = []
timestamps = []
invalidation_bits = []
channel_group = grp.channel_group
record_size = channel_group.samples_byte_nr
record_size += channel_group.invalidation_bytes_nr
count = 0
for fragment in data:
data_bytes, offset, _count = fragment
offset = offset // record_size
vals = arange(len(data_bytes) // record_size, dtype=ch_dtype)
vals += offset
if master_is_required:
timestamps.append(
self.get_master(gp_nr, fragment, copy_master=copy_master)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
channel_values.append(vals)
count += 1
if count > 1:
vals = concatenate(channel_values)
elif count == 1:
vals = channel_values[0]
else:
vals = []
if master_is_required:
if count > 1:
timestamps = concatenate(timestamps)
else:
timestamps = timestamps[0]
if channel_invalidation_present:
if count > 1:
invalidation_bits = concatenate(invalidation_bits)
else:
invalidation_bits = invalidation_bits[0]
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
if raster and len(timestamps) > 1:
num = float(float32((timestamps[-1] - timestamps[0]) / raster))
if num.is_integer():
t = linspace(timestamps[0], timestamps[-1], int(num))
else:
t = arange(timestamps[0], timestamps[-1], raster)
vals = (
Signal(vals, timestamps, name="_")
.interp(t, interpolation_mode=interp_mode)
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
else:
channel_values = []
timestamps = []
invalidation_bits = []
count = 0
for fragment in data:
data_bytes, offset, _count = fragment
try:
parent, bit_offset = parents[ch_nr]
except KeyError:
parent, bit_offset = None, None
if parent is not None:
if grp.record is None:
record = fromstring(data_bytes, dtype=dtypes)
else:
record = grp.record
vals = record[parent]
dtype_ = vals.dtype
shape_ = vals.shape
size = vals.dtype.itemsize
for dim in shape_[1:]:
size *= dim
kind_ = dtype_.kind
if kind_ == "b":
pass
elif len(shape_) > 1 and data_type != v4c.DATA_TYPE_BYTEARRAY:
vals = self._get_not_byte_aligned_data(
data_bytes, grp, ch_nr
)
elif kind_ not in "ui":
if bit_offset:
vals = self._get_not_byte_aligned_data(
data_bytes, grp, ch_nr
)
else:
if bit_count != size * 8:
if (
bit_count % 8 == 0
and size in (2, 4, 8)
and data_type <= 3
): # integer types
vals = vals.view(f"<u{size}")
if data_type in v4c.SIGNED_INT:
vals = as_non_byte_sized_signed_int(
vals, bit_count
)
else:
mask = (1 << bit_count) - 1
if vals.flags.writeable:
vals &= mask
else:
vals = vals & mask
else:
vals = self._get_not_byte_aligned_data(
data_bytes, grp, ch_nr
)
else:
if data_type <= 3:
if not channel.dtype_fmt:
channel.dtype_fmt = get_fmt_v4(data_type, bit_count, channel_type)
channel_dtype = dtype(channel.dtype_fmt.split(')')[-1])
vals = vals.view(channel_dtype)
else:
if data_type <= 3:
if dtype_.byteorder == ">":
if bit_offset or bit_count != size << 3:
vals = self._get_not_byte_aligned_data(
data_bytes, grp, ch_nr
)
else:
if bit_offset:
if kind_ == "i":
vals = vals.astype(
dtype(f"{dtype_.byteorder}u{size}")
)
vals >>= bit_offset
else:
vals = vals >> bit_offset
if bit_count != size << 3:
if data_type in v4c.SIGNED_INT:
vals = as_non_byte_sized_signed_int(
vals, bit_count
)
else:
mask = (1 << bit_count) - 1
if vals.flags.writeable:
vals &= mask
else:
vals = vals & mask
else:
if bit_count != size * 8:
vals = self._get_not_byte_aligned_data(
data_bytes, grp, ch_nr
)
else:
if not channel.dtype_fmt:
channel.dtype_fmt = get_fmt_v4(data_type, bit_count, channel_type)
channel_dtype = dtype(channel.dtype_fmt.split(')')[-1])
vals = vals.view(channel_dtype)
else:
vals = self._get_not_byte_aligned_data(data_bytes, grp, ch_nr)
if bit_count == 1 and self._single_bit_uint_as_bool:
vals = array(vals, dtype=bool)
else:
if not channel.dtype_fmt:
channel.dtype_fmt = get_fmt_v4(
data_type, bit_count, channel_type,
)
channel_dtype = dtype(channel.dtype_fmt.split(")")[-1])
if vals.dtype != channel_dtype:
vals = vals.astype(channel_dtype)
if master_is_required:
timestamps.append(
self.get_master(gp_nr, fragment, copy_master=copy_master)
)
if channel_invalidation_present:
invalidation_bits.append(
self.get_invalidation_bits(gp_nr, channel, fragment)
)
if vals.flags.writeable:
channel_values.append(vals)
else:
channel_values.append(vals.copy())
count += 1
if count > 1:
vals = concatenate(channel_values)
elif count == 1:
vals = channel_values[0]
else:
vals = []
if master_is_required:
if count > 1:
timestamps = concatenate(timestamps)
elif count == 1:
timestamps = timestamps[0]
else:
timestamps = []
if channel_invalidation_present:
if count > 1:
invalidation_bits = concatenate(invalidation_bits)
elif count == 1:
invalidation_bits = invalidation_bits[0]
else:
invalidation_bits = []
if not ignore_invalidation_bits:
vals = vals[nonzero(~invalidation_bits)[0]]
if master_is_required:
timestamps = timestamps[nonzero(~invalidation_bits)[0]]
if raster and len(timestamps) > 1:
num = float(float32((timestamps[-1] - timestamps[0]) / raster))
if num.is_integer():
t = linspace(timestamps[0], timestamps[-1], int(num))
else:
t = arange(timestamps[0], timestamps[-1], raster)
vals = (
Signal(vals, timestamps, name="_")
.interp(t, interpolation_mode=interp_mode)
)
vals, timestamps, invalidation_bits = (
vals.samples,
vals.timestamps,
vals.invalidation_bits,
)
# get the channel conversion
conversion = channel.conversion
if conversion is None:
conversion_type = v4c.CONVERSION_TYPE_NON
else:
conversion_type = conversion.conversion_type
if conversion_type in v4c.CONVERSION_GROUP_1:
if channel_type == v4c.CHANNEL_TYPE_VLSD:
signal_data = self._load_signal_data(group=grp, index=ch_nr)
if signal_data:
values = []
vals = vals.tolist()
for offset in vals:
(str_size,) = UINT32_uf(signal_data, offset)
offset += 4
values.append(signal_data[offset : offset + str_size])
if data_type == v4c.DATA_TYPE_BYTEARRAY:
vals = array(values)
vals = vals.view(dtype=f"({vals.itemsize},)u1")
else:
vals = array(values)
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
encoding = "utf-16-be"
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = "utf-16-le"
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = "utf-8"
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = "latin-1"
else:
raise MdfException(
f'wrong data type "{data_type}" for vlsd channel'
)
else:
# no VLSD signal data samples
vals = array([], dtype="S")
if data_type != v4c.DATA_TYPE_BYTEARRAY:
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
encoding = "utf-16-be"
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = "utf-16-le"
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = "utf-8"
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = "latin-1"
else:
raise MdfException(
f'wrong data type "{data_type}" for vlsd channel'
)
elif channel_type in {
v4c.CHANNEL_TYPE_VALUE,
v4c.CHANNEL_TYPE_MLSD,
} and (
v4c.DATA_TYPE_STRING_LATIN_1
<= data_type
<= v4c.DATA_TYPE_STRING_UTF_16_BE
):
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
encoding = "utf-16-be"
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = "utf-16-le"
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = "utf-8"
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = "latin-1"
else:
raise MdfException(
f'wrong data type "{data_type}" for string channel'
)
# CANopen date
if data_type == v4c.DATA_TYPE_CANOPEN_DATE:
vals = vals.tostring()
types = dtype(
[
("ms", "<u2"),
("min", "<u1"),
("hour", "<u1"),
("day", "<u1"),
("month", "<u1"),
("year", "<u1"),
]
)
vals = vals.view(types)
arrays = []
arrays.append(vals["ms"])
# bit 6 and 7 of minutes are reserved
arrays.append(vals["min"] & 0x3F)
# only the first 4 bits of hour are used
arrays.append(vals["hour"] & 0xF)
# the first 4 bits are the day number
arrays.append(vals["day"] & 0xF)
# bit 6 and 7 of month are reserved
arrays.append(vals["month"] & 0x3F)
# bit 7 of year is reserved
arrays.append(vals["year"] & 0x7F)
# add summer or standard time information for hour
arrays.append((vals["hour"] & 0x80) >> 7)
# add day of week information
arrays.append((vals["day"] & 0xF0) >> 4)
names = [
"ms",
"min",
"hour",
"day",
"month",
"year",
"summer_time",
"day_of_week",
]
vals = fromarrays(arrays, names=names)
del arrays
conversion = None
# CANopen time
elif data_type == v4c.DATA_TYPE_CANOPEN_TIME:
types = dtype([("ms", "<u4"), ("days", "<u2")])
vals = vals.view(types)
arrays = []
# bits 28 to 31 are reserved for ms
arrays.append(vals["ms"] & 0xFFFFFFF)
arrays.append(vals["days"] & 0x3F)
names = ["ms", "days"]
vals = fromarrays(arrays, names=names)
del arrays
conversion = None
if conversion_type == v4c.CONVERSION_TYPE_TRANS:
if not raw:
vals = conversion.convert(vals)
conversion = None
if conversion_type == v4c.CONVERSION_TYPE_TTAB:
raw = True
elif conversion_type in v4c.CONVERSION_GROUP_2:
if not raw:
vals = conversion.convert(vals)
conversion = None
else:
raw = True
if samples_only:
if not channel_invalidation_present or not ignore_invalidation_bits:
invalidation_bits = None
res = vals, invalidation_bits
else:
# search for unit in conversion texts
if name is None:
name = channel.name
unit = conversion and conversion.unit or channel.unit
comment = channel.comment
source = channel.source
cg_source = grp.channel_group.acq_source
if source:
source = SignalSource(
source.name,
source.path,
source.comment,
source.source_type,
source.bus_type,
)
elif cg_source:
source = SignalSource(
cg_source.name,
cg_source.path,
cg_source.comment,
cg_source.source_type,
cg_source.bus_type,
)
else:
source = None
if channel.attachment:
attachment = self.extract_attachment(index=channel.attachments[0])
elif channel_type == v4c.CHANNEL_TYPE_SYNC:
index = self._attachments_map[channel.data_block_addr]
attachment = self.extract_attachment(index=index)
else:
attachment = ()
master_metadata = self._master_channel_metadata.get(gp_nr, None)
if not channel_invalidation_present or not ignore_invalidation_bits:
invalidation_bits = None
try:
res = Signal(
samples=vals,
timestamps=timestamps,
unit=unit,
name=name,
comment=comment,
conversion=conversion,
raw=raw,
master_metadata=master_metadata,
attachment=attachment,
source=source,
display_name=channel.display_name,
bit_count=bit_count,
stream_sync=stream_sync,
invalidation_bits=invalidation_bits,
encoding=encoding,
)
except:
debug_channel(self, grp, channel, dependency_list)
raise
return res
def get_master(
self,
index,
data=None,
raster=None,
record_offset=0,
record_count=None,
copy_master=True,
):
""" returns master channel samples for given group
Parameters
----------
index : int
group index
data : (bytes, int)
(data block raw bytes, fragment offset); default None
raster : float
raster to be used for interpolation; default None
record_offset : int
if *data=None* use this to select the record offset from which the
group data should be loaded
record_count : int
number of records to read; default *None* and in this case all
available records are used
copy_master : bool
return a copy of the cached master
Returns
-------
t : numpy.array
master channel samples
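Examples
--------
>>> # sketch; the file name is illustrative
>>> mdf = MDF4('test.mf4')
>>> t = mdf.get_master(0)
>>> # get the master resampled to a 10 ms raster
>>> t_10ms = mdf.get_master(0, raster=0.01)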
"""
fragment = data
if fragment:
data_bytes, offset, _count = fragment
try:
timestamps = self._master_channel_cache[(index, offset, _count)]
if raster and len(timestamps):
timestamps = arange(timestamps[0], timestamps[-1], raster)
return timestamps
else:
if copy_master:
return timestamps.copy()
else:
return timestamps
except KeyError:
pass
else:
try:
timestamps = self._master_channel_cache[index]
if raster and len(timestamps):
timestamps = arange(timestamps[0], timestamps[-1], raster)
return timestamps
else:
if copy_master:
return timestamps.copy()
else:
return timestamps
except KeyError:
offset = 0
group = self.groups[index]
original_data = fragment
if group.data_location == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
else:
stream = self._tempfile
time_ch_nr = self.masters_db.get(index, None)
channel_group = group.channel_group
record_size = channel_group.samples_byte_nr
record_size += channel_group.invalidation_bytes_nr
cycles_nr = group.channel_group.cycles_nr
if original_data:
cycles_nr = len(data_bytes) // record_size
else:
_count = record_count
if time_ch_nr is None:
if record_size:
offset = offset // record_size
t = arange(cycles_nr, dtype=float64)
t += offset
else:
t = array([], dtype=float64)
metadata = ("time", v4c.SYNC_TYPE_TIME)
else:
time_ch = group.channels[time_ch_nr]
time_conv = time_ch.conversion
time_name = time_ch.name
metadata = (time_name, time_ch.sync_type)
if time_ch.channel_type == v4c.CHANNEL_TYPE_VIRTUAL_MASTER:
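# a virtual master has no stored samples: the timestamps are
# synthesized from the record index via the linear conversion
# t = time_a * record_index + time_b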
offset = offset // record_size
time_a = time_conv["a"]
time_b = time_conv["b"]
t = arange(cycles_nr, dtype=float64)
t += offset
t *= time_a
t += time_b
else:
# get data group parents and dtypes
parents, dtypes = group.parents, group.types
if parents is None:
parents, dtypes = self._prepare_record(group)
# get data
if fragment is None:
data = self._load_data(
group, record_offset=record_offset, record_count=record_count
)
else:
data = (fragment,)
time_values = []
for fragment in data:
data_bytes, offset, _count = fragment
try:
parent, _ = parents[time_ch_nr]
except KeyError:
parent = None
if parent is not None:
if group.record is None:
dtypes = group.types
if dtypes.itemsize:
record = fromstring(data_bytes, dtype=dtypes)
else:
record = None
else:
record = group.record
t = record[parent]
else:
t = self._get_not_byte_aligned_data(
data_bytes, group, time_ch_nr
)
time_values.append(t.copy())
if len(time_values) > 1:
t = concatenate(time_values)
else:
t = time_values[0]
# get timestamps
if time_conv:
t = time_conv.convert(t)
self._master_channel_metadata[index] = metadata
if not t.dtype == float64:
t = t.astype(float64)
if original_data is None:
self._master_channel_cache[index] = t
else:
data_bytes, offset, _ = original_data
self._master_channel_cache[(index, offset, _count)] = t
if raster and t.size:
timestamps = t
if len(t) > 1:
num = float(float32((timestamps[-1] - timestamps[0]) / raster))
if int(num) == num:
timestamps = linspace(t[0], t[-1], int(num))
else:
timestamps = arange(t[0], t[-1], raster)
return timestamps
else:
timestamps = t
if copy_master:
return timestamps.copy()
else:
return timestamps
def get_can_signal(
self, name, database=None, db=None, ignore_invalidation_bits=False
):
""" get CAN message signal. You can specify an external CAN database (
*database* argument) or a canmatrix database object that has already been
loaded from a file (*db* argument).
The signal name can be specified in the following ways
* ``CAN<ID>.<MESSAGE_NAME>.<SIGNAL_NAME>`` - the `ID` value starts from 1
and must match the ID found in the measurement (the source CAN bus ID)
Example: CAN1.Wheels.FL_WheelSpeed
* ``CAN<ID>.CAN_DataFrame_<MESSAGE_ID>.<SIGNAL_NAME>`` - the `ID` value
starts from 1 and the `MESSAGE_ID` is the decimal message ID as found
in the database. Example: CAN1.CAN_DataFrame_218.FL_WheelSpeed
* ``<MESSAGE_NAME>.SIGNAL_NAME`` - in this case the first occurrence of
the message name and signal are returned (the same message could be
found on multiple CAN buses; for example on CAN1 and CAN3)
Example: Wheels.FL_WheelSpeed
* ``CAN_DataFrame_<MESSAGE_ID>.<SIGNAL_NAME>`` - in this case the first
occurrence of the message name and signal are returned (the same
message could be found on multiple CAN buses; for example on CAN1 and
CAN3). Example: CAN_DataFrame_218.FL_WheelSpeed
* ``<SIGNAL_NAME>`` - in this case the first occurrence of the signal
name is returned (the same signal name could be found in multiple
messages and on multiple CAN buses). Example: FL_WheelSpeed
Parameters
----------
name : str
signal name
database : str
path of external CAN database file (.dbc or .arxml); default *None*
db : canmatrix.database
canmatrix CAN database object; default *None*
ignore_invalidation_bits : bool
option to ignore invalidation bits
Returns
-------
sig : Signal
Signal object with the physical values
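Examples
--------
>>> # sketch; file, database and signal names are illustrative
>>> mdf = MDF4('can_log.mf4')
>>> speed = mdf.get_can_signal(
...     'CAN1.Wheels.FL_WheelSpeed', database='vehicle.dbc'
... )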
"""
if database is None and db is None:
return self.get(name)
if db is None:
if not database.lower().endswith(("dbc", "arxml")):
message = f'Expected .dbc or .arxml file as CAN channel attachment but got "{database}"'
logger.exception(message)
raise MdfException(message)
else:
import_type = "dbc" if database.lower().endswith("dbc") else "arxml"
with open(database, "rb") as db:
db_string = db.read()
md5_sum = md5(db_string).digest()
if md5_sum in self._external_dbc_cache:
db = self._external_dbc_cache[md5_sum]
else:
try:
db_string = db_string.decode("utf-8")
db = self._external_dbc_cache[md5_sum] = loads(
db_string, importType=import_type, key="db"
)["db"]
except UnicodeDecodeError:
try:
from cchardet import detect
encoding = detect(db_string)["encoding"]
db_string = db_string.decode(encoding)
db = self._external_dbc_cache[md5_sum] = loads(
db_string,
importType=import_type,
key="db",
encoding=encoding,
)["db"]
except ImportError:
message = (
"Unicode exception occured while processing the database "
f'attachment "{database}" and "cChardet" package is '
'not installed. Mdf version 4 expects "utf-8" '
"strings and this package may detect if a different"
" encoding was used"
)
logger.warning(message)
name_ = name.split(".")
if len(name_) == 3:
can_id_str, message_id_str, signal = name_
can_id = v4c.CAN_ID_PATTERN.search(can_id_str)
if can_id is None:
raise MdfException(
f'CAN id "{can_id_str}" of signal name "{name}" is not recognised by this library'
)
else:
can_id = f'CAN{can_id.group("id")}'
message_id = v4c.CAN_DATA_FRAME_PATTERN.search(message_id_str)
if message_id is None:
message_id = message_id_str
else:
message_id = int(message_id.group("id"))
elif len(name_) == 2:
message_id_str, signal = name_
can_id = None
message_id = v4c.CAN_DATA_FRAME_PATTERN.search(message_id_str)
if message_id is None:
message_id = message_id_str
else:
message_id = int(message_id.group("id"))
else:
can_id = message_id = None
signal = name
if isinstance(message_id, str):
message = db.frameByName(message_id)
else:
message = db.frameById(message_id)
for sig in message.signals:
if sig.name == signal:
signal = sig
break
else:
raise MdfException(
f'Signal "{signal}" not found in message "{message.name}" of "{database}"'
)
if can_id is None:
for _can_id, messages in self.can_logging_db.items():
if message.id in messages:
index = messages[message.id]
break
else:
raise MdfException(
f'Message "{message.name}" (ID={hex(message.id)}) not found in the measurement'
)
else:
if can_id in self.can_logging_db:
if message.id in self.can_logging_db[can_id]:
index = self.can_logging_db[can_id][message.id]
else:
raise MdfException(
f'Message "{message.name}" (ID={hex(message.id)}) not found in the measurement'
)
else:
raise MdfException(
f'No logging from "{can_id}" was found in the measurement'
)
can_ids = self.get(
"CAN_DataFrame.ID",
group=index,
ignore_invalidation_bits=ignore_invalidation_bits,
)
payload = self.get(
"CAN_DataFrame.DataBytes",
group=index,
samples_only=True,
ignore_invalidation_bits=ignore_invalidation_bits,
)[0]
idx = nonzero(can_ids.samples == message.id)[0]
vals = payload[idx]
t = can_ids.timestamps[idx].copy()
if can_ids.invalidation_bits is not None:
invalidation_bits = can_ids.invalidation_bits
else:
invalidation_bits = None
record_size = vals.shape[1]
big_endian = not signal.is_little_endian
signed = signal.is_signed
bit_offset = signal.startBit % 8
byte_offset = signal.startBit // 8
bit_count = signal.size
byte_count = bit_offset + bit_count
if byte_count % 8:
byte_count = (byte_count >> 3) + 1
else:
byte_count //= 8
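# carve the signal bytes out of the raw payload, normalise the bit
# order with unpackbits/packbits, mask away neighbouring signals and
# finally view the result as the smallest sufficient unsigned integer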
types = [
("", f"a{byte_offset}"),
("vals", f"({byte_count},)u1"),
("", f"a{record_size - byte_count - byte_offset}"),
]
vals = vals.view(dtype=dtype(types))
vals = vals["vals"]
if not big_endian:
vals = flip(vals, 1)
vals = unpackbits(vals)
vals = roll(vals, bit_offset)
vals = vals.reshape((len(vals) // 8, 8))
vals = packbits(vals)
vals = vals.reshape((len(vals) // byte_count, byte_count))
if bit_count < 64:
mask = 2 ** bit_count - 1
masks = []
while mask:
masks.append(mask & 0xFF)
mask >>= 8
for i in range(byte_count - len(masks)):
masks.append(0)
masks = masks[::-1]
for i, mask in enumerate(masks):
vals[:, i] &= mask
if not big_endian:
vals = flip(vals, 1)
if bit_count <= 8:
size = 1
elif bit_count <= 16:
size = 2
elif bit_count <= 32:
size = 4
elif bit_count <= 64:
size = 8
else:
size = bit_count // 8
if size > byte_count:
extra_bytes = size - byte_count
extra = zeros((len(vals), extra_bytes), dtype=uint8)
types = [
("vals", vals.dtype, vals.shape[1:]),
("", extra.dtype, extra.shape[1:]),
]
vals = fromarrays([vals, extra], dtype=dtype(types))
fmt = "{}u{}".format(">" if big_endian else "<", size)
if size <= byte_count:
if big_endian:
types = [("", f"a{byte_count - size}"), ("vals", fmt)]
else:
types = [("vals", fmt), ("", f"a{byte_count - size}")]
else:
types = [("vals", fmt)]
vals = vals.view(dtype=dtype(types))
if signed:
vals = as_non_byte_sized_signed_int(vals["vals"], bit_count)
else:
vals = vals["vals"]
comment = signal.comment or ""
if (signal.factor, signal.offset) != (1, 0):
vals = vals * float(signal.factor) + float(signal.offset)
if ignore_invalidation_bits:
return Signal(
samples=vals,
timestamps=t,
name=name,
unit=signal.unit or "",
comment=comment,
invalidation_bits=invalidation_bits,
)
else:
if invalidation_bits is not None:
vals = vals[nonzero(~invalidation_bits)[0]]
t = t[nonzero(~invalidation_bits)[0]]
return Signal(
samples=vals,
timestamps=t,
name=name,
unit=signal.unit or "",
comment=comment,
)
def info(self):
"""get MDF information as a dict
Examples
--------
>>> mdf = MDF4('test.mdf')
>>> mdf.info()
"""
info = {}
info["version"] = (
self.identification["version_str"].decode("utf-8").strip(" \n\t\0")
)
info["groups"] = len(self.groups)
for i, gp in enumerate(self.groups):
inf = {}
info[f"group {i}"] = inf
inf["cycles"] = gp.channel_group.cycles_nr
inf["comment"] = gp.channel_group.comment
inf["channels count"] = len(gp.channels)
for j, channel in enumerate(gp.channels):
name = channel.name
ch_type = v4c.CHANNEL_TYPE_TO_DESCRIPTION[channel.channel_type]
inf[f"channel {j}"] = f'name="{name}" type={ch_type}'
return info
def save(self, dst, overwrite=False, compression=0):
"""Save MDF to *dst*. If overwrite is *True* then the destination file
is overwritten, otherwise the file name is appended with '.<cntr>', where
'<cntr>' is the first counter that produces a new file name
(that does not already exist in the filesystem)
Parameters
----------
dst : str
destination file name
overwrite : bool
overwrite flag, default *False*
compression : int
use compressed data blocks, default 0; valid since version 4.10
* 0 - no compression
* 1 - deflate (slower, but produces smaller files)
* 2 - transposition + deflate (slowest, but produces
the smallest files)
Returns
-------
output_file : pathlib.Path
path to saved file
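Examples
--------
Minimal usage sketch (file names are illustrative):
>>> mdf = MDF4('input.mf4')
>>> mdf.save('output.mf4', overwrite=True, compression=2)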
"""
dst = Path(dst).with_suffix(".mf4")
destination_dir = dst.parent
destination_dir.mkdir(parents=True, exist_ok=True)
if overwrite is False:
if dst.is_file():
cntr = 0
while True:
name = dst.with_suffix(f".{cntr}.mf4")
if not name.exists():
break
else:
cntr += 1
message = (
f'Destination file "{dst}" already exists '
f'and "overwrite" is False. Saving MDF file as "{name}"'
)
logger.warning(message)
dst = name
if not self.file_history:
comment = "created"
else:
comment = "updated"
fh = FileHistory()
fh.comment = f"""<FHcomment>
<TX>{comment}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{__version__}</tool_version>
</FHcomment>"""
self.file_history.append(fh)
if dst == self.name:
destination = dst.with_suffix(".savetemp")
else:
destination = dst
with open(destination, "wb+") as dst_:
defined_texts = {}
cc_map = {}
si_map = {}
groups_nr = len(self.groups)
write = dst_.write
tell = dst_.tell
seek = dst_.seek
blocks = []
write(bytes(self.identification))
self.header.to_blocks(dst_.tell(), blocks)
for block in blocks:
write(bytes(block))
original_data_addresses = []
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
# write DataBlocks first
for gp_nr, gp in enumerate(self.groups):
original_data_addresses.append(gp.data_group.data_block_addr)
if gp.channel_group.flags & v4c.FLAG_CG_VLSD:
continue
address = tell()
total_size = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
) * gp.channel_group.cycles_nr
split_size = 0  # fallback when no write fragment size is configured
if self._write_fragment_size:
samples_size = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
)
if samples_size:
split_size = self._write_fragment_size // samples_size
split_size *= samples_size
if split_size == 0:
split_size = samples_size
chunks = float(total_size) / split_size
chunks = int(ceil(chunks))
else:
chunks = 1
else:
chunks = 1
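# read the samples back using the same fragment size that will
# be used for writing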
self.configure(read_fragment_size=split_size)
data = self._load_data(gp)
if chunks == 1:
data_ = next(data)[0]
if compression and self.version > "4.00":
if compression == 1:
param = 0
else:
param = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
)
kwargs = {"data": data_, "zip_type": zip_type, "param": param}
data_block = DataZippedBlock(**kwargs)
else:
data_block = DataBlock(data=data_)
write(bytes(data_block))
align = data_block.block_len % 8
if align:
write(b"\0" * (8 - align))
if gp.channel_group.cycles_nr:
gp.data_group.data_block_addr = address
else:
gp.data_group.data_block_addr = 0
else:
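# multiple fragments: each chunk becomes a separate data block
# linked through a DataList (plus a HeaderList for zipped blocks)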
kwargs = {"flags": v4c.FLAG_DL_EQUAL_LENGHT, "zip_type": zip_type}
hl_block = HeaderList(**kwargs)
kwargs = {
"flags": v4c.FLAG_DL_EQUAL_LENGHT,
"links_nr": chunks + 1,
"data_block_nr": chunks,
"data_block_len": split_size,
}
dl_block = DataList(**kwargs)
for i in range(chunks):
data_ = next(data)[0]
if compression and self.version > "4.00":
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
param = 0
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
param = (
gp.channel_group.samples_byte_nr
+ gp.channel_group.invalidation_bytes_nr
)
kwargs = {
"data": data_,
"zip_type": zip_type,
"param": param,
}
block = DataZippedBlock(**kwargs)
else:
block = DataBlock(data=data_)
address = tell()
block.address = address
write(bytes(block))
align = block.block_len % 8
if align:
write(b"\0" * (8 - align))
dl_block[f"data_block_addr{i}"] = address
address = tell()
dl_block.address = address
write(bytes(dl_block))
if compression and self.version != "4.00":
hl_block.first_dl_addr = address
address = tell()
hl_block.address = address
write(bytes(hl_block))
gp.data_group.data_block_addr = address
if self._callback:
self._callback(int(50 * (gp_nr + 1) / groups_nr), 100)
if self._terminate:
dst_.close()
self.close()
return
address = tell()
blocks = []
# attachments
at_map = {}
if self.attachments:
for at_block in self.attachments:
address = at_block.to_blocks(address, blocks, defined_texts)
for i in range(len(self.attachments) - 1):
at_block = self.attachments[i]
at_block.next_at_addr = self.attachments[i + 1].address
self.attachments[-1].next_at_addr = 0
# file history blocks
for fh in self.file_history:
address = fh.to_blocks(address, blocks, defined_texts)
for i, fh in enumerate(self.file_history[:-1]):
fh.next_fh_addr = self.file_history[i + 1].address
self.file_history[-1].next_fh_addr = 0
# data groups
gp_rec_ids = []
valid_data_groups = []
for gp in self.groups:
if gp.channel_group.flags & v4c.FLAG_CG_VLSD:
continue
valid_data_groups.append(gp.data_group)
gp_rec_ids.append(gp.data_group.record_id_len)
address = gp.data_group.to_blocks(address, blocks, defined_texts)
if valid_data_groups:
for i, dg in enumerate(valid_data_groups[:-1]):
addr_ = valid_data_groups[i + 1].address
dg.next_dg_addr = addr_
valid_data_groups[-1].next_dg_addr = 0
# go through each data group and append the rest of the blocks
for i, gp in enumerate(self.groups):
channels = gp.channels
for j, channel in enumerate(channels):
if channel.attachment:
channel.attachment_addr = self.attachments[channel.attachment].address
address = channel.to_blocks(
address, blocks, defined_texts, cc_map, si_map
)
if channel.channel_type == v4c.CHANNEL_TYPE_SYNC:
idx = self._attachments_map[channel.data_block_addr]
channel.data_block_addr = self.attachments[idx].address
else:
sdata = self._load_signal_data(group=gp, index=j)
if sdata:
split_size = self._write_fragment_size
if self._write_fragment_size:
chunks = float(len(sdata)) / split_size
chunks = int(ceil(chunks))
else:
chunks = 1
if chunks == 1:
if compression and self.version > "4.00":
signal_data = DataZippedBlock(
data=sdata,
zip_type=v4c.FLAG_DZ_DEFLATE,
original_type=b"SD",
)
signal_data.address = address
address += signal_data.block_len
blocks.append(signal_data)
align = signal_data.block_len % 8
if align:
blocks.append(b"\0" * (8 - align))
address += 8 - align
else:
signal_data = DataBlock(data=sdata, type="SD")
signal_data.address = address
address += signal_data.block_len
blocks.append(signal_data)
align = signal_data.block_len % 8
if align:
blocks.append(b"\0" * (8 - align))
address += 8 - align
channel.data_block_addr = signal_data.address
else:
kwargs = {
"flags": v4c.FLAG_DL_EQUAL_LENGHT,
"links_nr": chunks + 1,
"data_block_nr": chunks,
"data_block_len": self._write_fragment_size,
}
dl_block = DataList(**kwargs)
for k in range(chunks):
data_ = sdata[k * split_size : (k + 1) * split_size]
if compression and self.version > "4.00":
zip_type = v4c.FLAG_DZ_DEFLATE
param = 0
kwargs = {
"data": data_,
"zip_type": zip_type,
"param": param,
"original_type": b"SD",
}
block = DataZippedBlock(**kwargs)
else:
block = DataBlock(data=data_, type="SD")
blocks.append(block)
block.address = address
address += block.block_len
align = block.block_len % 8
if align:
blocks.append(b"\0" * (8 - align))
address += 8 - align
dl_block[f"data_block_addr{k}"] = block.address
dl_block.address = address
blocks.append(dl_block)
address += dl_block.block_len
if compression and self.version > "4.00":
kwargs = {
"flags": v4c.FLAG_DL_EQUAL_LENGHT,
"zip_type": v4c.FLAG_DZ_DEFLATE,
"first_dl_addr": dl_block.address,
}
hl_block = HeaderList(**kwargs)
hl_block.address = address
address += hl_block.block_len
blocks.append(hl_block)
channel.data_block_addr = hl_block.address
else:
channel.data_block_addr = dl_block.address
else:
channel.data_block_addr = 0
dep_list = gp.channel_dependencies[j]
if dep_list:
if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list):
for dep in dep_list:
dep.address = address
address += dep.block_len
blocks.append(dep)
for k, dep in enumerate(dep_list[:-1]):
dep.composition_addr = dep_list[k + 1].address
dep_list[-1].composition_addr = 0
channel.component_addr = dep_list[0].address
else:
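# structure composition links are re-created in the channel
# dependencies fixup loop below, once all addresses are known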
index = dep_list[0][1]
addr_ = gp.channels[index].address
for channel in gp.logging_channels:
address = channel.to_blocks(
address, blocks, defined_texts, cc_map, si_map
)
group_channels = list(chain(gp.channels, gp.logging_channels))
if group_channels:
for j, channel in enumerate(group_channels[:-1]):
channel.next_ch_addr = group_channels[j + 1].address
group_channels[-1].next_ch_addr = 0
# channel dependencies
j = len(channels) - 1
while j >= 0:
dep_list = gp.channel_dependencies[j]
if dep_list and all(isinstance(dep, tuple) for dep in dep_list):
index = dep_list[0][1]
channels[j].component_addr = channels[index].address
index = dep_list[-1][1]
channels[j].next_ch_addr = channels[index].next_ch_addr
channels[index].next_ch_addr = 0
for _, ch_nr in dep_list:
channels[ch_nr].source_addr = 0
j -= 1
# channel group
if gp.channel_group.flags & v4c.FLAG_CG_VLSD:
continue
gp.channel_group.first_sample_reduction_addr = 0
if channels:
gp.channel_group.first_ch_addr = gp.channels[0].address
else:
gp.channel_group.first_ch_addr = 0
gp.channel_group.next_cg_addr = 0
address = gp.channel_group.to_blocks(
address, blocks, defined_texts, si_map
)
gp.data_group.first_cg_addr = gp.channel_group.address
if self._callback:
self._callback(int(50 * (i + 1) / groups_nr) + 25, 100)
if self._terminate:
dst_.close()
self.close()
return
for gp in self.groups:
for dep_list in gp.channel_dependencies:
if dep_list:
if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list):
for dep in dep_list:
if dep.ca_type != v4c.CA_TYPE_LOOKUP:
dep.referenced_channels = []
continue
for i, (gp_nr, ch_nr) in enumerate(
dep.referenced_channels
):
grp = self.groups[gp_nr]
ch = grp.channels[ch_nr]
dep[
f"scale_axis_{i}_dg_addr"
] = grp.data_group.address
dep[
f"scale_axis_{i}_cg_addr"
] = grp.channel_group.address
dep[f"scale_axis_{i}_ch_addr"] = ch.address
for gp in self.groups:
gp.data_group.record_id_len = 0
ev_map = []
if self.events:
for event in self.events:
for i, ref in enumerate(event.scopes):
try:
dg_cntr, ch_cntr = ref
event[f"scope_{i}_addr"] = (
self.groups[dg_cntr].channels[ch_cntr].address
)
except TypeError:
dg_cntr = ref
event[f"scope_{i}_addr"] = self.groups[
dg_cntr
].channel_group.address
for i in range(event.attachment_nr):
key = f"attachment_{i}_addr"
addr = event[key]
event[key] = at_map[addr]
blocks.append(event)
ev_map.append(address)
event.address = address
address += event.block_len
if event.name:
tx_block = TextBlock(text=event.name)
tx_block.address = address
blocks.append(tx_block)
address += tx_block.block_len
event.name_addr = tx_block.address
else:
event.name_addr = 0
if event.comment:
meta = event.comment.startswith("<EVcomment")
tx_block = TextBlock(text=event.comment, meta=meta)
tx_block.address = address
blocks.append(tx_block)
address += tx_block.block_len
event.comment_addr = tx_block.address
else:
event.comment_addr = 0
if event.parent is not None:
event.parent_ev_addr = ev_map[event.parent]
if event.range_start is not None:
event.range_start_ev_addr = ev_map[event.range_start]
for i in range(len(self.events) - 1):
self.events[i].next_ev_addr = self.events[i + 1].address
self.events[-1].next_ev_addr = 0
self.header.first_event_addr = self.events[0].address
if self._terminate:
dst_.close()
self.close()
return
if self._callback:
blocks_nr = len(blocks)
threshold = blocks_nr / 25
count = 1
for i, block in enumerate(blocks):
write(bytes(block))
if i >= threshold:
self._callback(75 + count, 100)
count += 1
threshold += blocks_nr / 25
else:
for block in blocks:
write(bytes(block))
for gp, rec_id in zip(self.groups, gp_rec_ids):
gp.data_group.record_id_len = rec_id
if valid_data_groups:
addr_ = valid_data_groups[0].address
self.header.first_dg_addr = addr_
else:
self.header.first_dg_addr = 0
self.header.file_history_addr = self.file_history[0].address
if self.attachments:
first_attachment = self.attachments[0]
addr_ = first_attachment.address
self.header.first_attachment_addr = addr_
else:
self.header.first_attachment_addr = 0
seek(v4c.IDENTIFICATION_BLOCK_SIZE)
write(bytes(self.header))
for orig_addr, gp in zip(original_data_addresses, self.groups):
gp.data_group.data_block_addr = orig_addr
at_map = {value: key for key, value in at_map.items()}
for event in self.events:
for i in range(event.attachment_nr):
key = f"attachment_{i}_addr"
addr = event[key]
event[key] = at_map[addr]
if dst == self.name:
self.close()
self.name.unlink()
destination.rename(self.name)
self.groups.clear()
self.header = None
self.identification = None
self.file_history.clear()
self.channels_db.clear()
self.masters_db.clear()
self.attachments.clear()
self.file_comment = None
self._ch_map.clear()
self._master_channel_cache.clear()
self._tempfile = TemporaryFile()
self._file = open(self.name, "rb")
self._read()
if self._callback:
self._callback(100, 100)
return dst
def get_channel_name(self, group, index):
"""Gets channel name.
Parameters
----------
group : int
0-based group index
index : int
0-based channel index
Returns
-------
name : str
found channel name
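Examples
--------
Minimal usage sketch (the group/index pair is illustrative):
>>> mdf = MDF4('test.mdf')
>>> mdf.get_channel_name(group=0, index=1)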
"""
gp_nr, ch_nr = self._validate_channel_selection(None, group, index)
return self.groups[gp_nr].channels[ch_nr].name
def get_channel_metadata(self, name=None, group=None, index=None):
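"""get the *Channel* object that holds the metadata; the channel can
be specified by *name*, or by *group* and *index* (a negative *index*
selects from the group's logging channels)
"""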
gp_nr, ch_nr = self._validate_channel_selection(name, group, index)
grp = self.groups[gp_nr]
if ch_nr >= 0:
channel = grp.channels[ch_nr]
else:
channel = grp.logging_channels[-ch_nr - 1]
return channel
def get_channel_unit(self, name=None, group=None, index=None):
"""Gets channel unit.
Channel can be specified in two ways:
* using the first positional argument *name*
* if there are multiple occurrences for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurrences for this channel and either the
*group* or *index* arguments is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
Returns
-------
unit : str
found channel unit
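Examples
--------
Minimal usage sketch (the channel name is illustrative):
>>> mdf = MDF4('test.mdf')
>>> mdf.get_channel_unit('EngineSpeed')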
"""
gp_nr, ch_nr = self._validate_channel_selection(name, group, index)
grp = self.groups[gp_nr]
channel = grp.channels[ch_nr]
conversion = channel.conversion
unit = (conversion and conversion.unit) or channel.unit or ""
return unit