Source code for asammdf.blocks.mdf_v3

# -*- coding: utf-8 -*-
""" ASAM MDF version 3 file format module """

import logging
import time
import xml.etree.ElementTree as ET
import mmap
from copy import deepcopy
from functools import reduce
from itertools import product
from math import ceil
from tempfile import TemporaryFile
from pathlib import Path

from numpy import (
    arange,
    array,
    array_equal,
    column_stack,
    concatenate,
    dtype,
    flip,
    float32,
    float64,
    linspace,
    packbits,
    roll,
    uint8,
    union1d,
    unpackbits,
    zeros,
    searchsorted,
)

from numpy.core.records import fromarrays, fromstring
from pandas import DataFrame

from ..signal import Signal
from . import v2_v3_constants as v23c
from .conversion_utils import conversion_transfer
from .utils import (
    CHANNEL_COUNT,
    CONVERT,
    ChannelsDB,
    MdfException,
    SignalSource,
    as_non_byte_sized_signed_int,
    fmt_to_datatype_v3,
    get_fmt_v3,
    UniqueDB,
    validate_version_argument,
    count_channel_groups,
    is_file_like,
    Group,
)
from .v2_v3_blocks import (
    Channel,
    ChannelConversion,
    ChannelDependency,
    ChannelExtension,
    ChannelGroup,
    DataGroup,
    FileIdentificationBlock,
    HeaderBlock,
    TextBlock,
    TriggerBlock,
)
from ..version import __version__


logger = logging.getLogger("asammdf")

__all__ = ["MDF3"]


class MDF3(object):
    """The *header* attribute is a *HeaderBlock*. The *groups* attribute is a
    list of dicts, each one with the following keys

    * ``data_group`` - DataGroup object
    * ``channel_group`` - ChannelGroup object
    * ``channels`` - list of Channel objects with the same order as found in
      the mdf file
    * ``channel_dependencies`` - list of *ChannelArrayBlock* in case of
      channel arrays; list of Channel objects in case of structure channel
      composition
    * ``data_block`` - address of data block
    * ``data_location`` - integer code for data location (original file,
      temporary file or memory)
    * ``data_block_addr`` - list of raw samples starting addresses
    * ``data_block_type`` - list of codes for data block type
    * ``data_block_size`` - list of raw samples block size
    * ``sorted`` - sorted indicator flag
    * ``record_size`` - dict that maps record ID's to record sizes in bytes
    * ``size`` - total size of data block for the current group
    * ``trigger`` - *Trigger* object for current group

    Parameters
    ----------
    name : string | pathlib.Path
        mdf file name (if provided it must be a real file name) or
        file-like object
    version : string
        mdf file version ('2.00', '2.10', '2.14', '3.00', '3.10', '3.20' or
        '3.30'); default '3.30'
    callback : function
        keyword only argument: function to call to update the progress; the
        function must accept two arguments (the current progress and maximum
        progress value)

    Attributes
    ----------
    attachments : list
        list of file attachments
    channels_db : dict
        used for fast channel access by name; for each name key the value is
        a list of (group index, channel index) tuples
    groups : list
        list of data group dicts
    header : HeaderBlock
        mdf file header
    identification : FileIdentificationBlock
        mdf file start block
    masters_db : dict
        used for fast master channel access; for each group index key the
        value is the master channel index
    memory : str
        memory optimization option
    name : string
        mdf file name
    version : str
        mdf version

    """

    _terminate = False

    def __init__(self, name=None, version="3.30", **kwargs):
        self.groups = []
        self.header = None
        self.identification = None
        self.channels_db = ChannelsDB(version=3)
        self.masters_db = {}
        self.version = version

        self._master_channel_cache = {}
        self._master_channel_metadata = {}

        self._tempfile = TemporaryFile()
        self._tempfile.write(b"\0")
        self._file = None

        self._read_fragment_size = 0
        self._write_fragment_size = 4 * 2 ** 20
        self._single_bit_uint_as_bool = False
        self._integer_interpolation = 0

        self._si_map = {}
        self._cc_map = {}

        self._callback = kwargs.get("callback", None)

        if name:
            if is_file_like(name):
                self._file = name
                self.name = Path("From_FileLike.mf4")
                self._from_filelike = True
                self._read(mapped=False)
            else:
                self.name = Path(name)
                x = open(self.name, "r+b")
                self._file = mmap.mmap(x.fileno(), 0)
                self._from_filelike = False
                self._read(mapped=True)
                self._file.close()
                x.close()

                self._file = open(self.name, "r+b")
        else:
            self._from_filelike = False
            version = validate_version_argument(version, hint=3)
            self.identification = FileIdentificationBlock(version=version)
            self.version = version
            self.header = HeaderBlock(version=self.version)
            self.name = Path("new.mdf")

    def _load_data(self, group, record_offset=0, record_count=None):
        """ get group's data block bytes """
        has_yielded = False
        offset = 0
        _count = record_count
        channel_group = group.channel_group

        if group.data_location == v23c.LOCATION_ORIGINAL_FILE:
            # go to the first data block of the current data group
            stream = self._file
        else:
            stream = self._tempfile

        record_offset *= channel_group.samples_byte_nr

        # go to the first data block of the current data group
        if group.sorted:
            samples_size = channel_group.samples_byte_nr
            if not samples_size:
                yield b"", 0, _count
                has_yielded = True
            else:
                if self._read_fragment_size:
                    split_size = self._read_fragment_size // samples_size
                    split_size *= samples_size
                else:
                    channels_nr = len(group.channels)

                    y_axis = CONVERT

                    idx = searchsorted(CHANNEL_COUNT, channels_nr, side="right") - 1
                    if idx < 0:
                        idx = 0
                    split_size = y_axis[idx]

                    split_size = split_size // samples_size
                    split_size *= samples_size

                if split_size == 0:
                    split_size = samples_size

                blocks = zip(group.data_block_addr, group.data_block_size)

                cur_size = 0
                data = []

                while True:
                    try:
                        address, size = next(blocks)
                        current_address = address
                    except StopIteration:
                        break

                    if offset + size < record_offset + 1:
                        offset += size
                        continue

                    stream.seek(address)

                    if offset < record_offset:
                        delta = record_offset - offset
                        stream.read(delta)
                        current_address += delta
                        size -= delta
                        offset = record_offset

                    while size >= split_size - cur_size:
                        stream.seek(current_address)

                        if data:
                            data.append(stream.read(split_size - cur_size))
                            yield b"".join(data), offset, _count
                            has_yielded = True
                            current_address += split_size - cur_size
                        else:
                            yield stream.read(split_size), offset, _count
                            has_yielded = True
                            current_address += split_size
                        offset += split_size

                        size -= split_size - cur_size
                        data = []
                        cur_size = 0

                    if size:
                        stream.seek(current_address)
                        data.append(stream.read(size))
                        cur_size += size
                        offset += size

                if data:
                    yield b"".join(data), offset, _count
                    has_yielded = True
                elif not offset:
                    yield b"", 0, _count
                    has_yielded = True
                if not has_yielded:
                    yield b"", 0, _count

        else:
            record_id = group.channel_group.record_id
            cg_size = group.record_size
            if group.data_group.record_id_len <= 2:
                record_id_nr = group.data_group.record_id_len
            else:
                record_id_nr = 0

            blocks = zip(group.data_block_addr, group.data_block_size)

            for address, size in blocks:
                stream.seek(address)
                data = stream.read(size)

                # reset the accumulated records for each data block
                cg_data = []
                i = 0
                while i < size:
                    rec_id = data[i]
                    # skip record id
                    i += 1
                    rec_size = cg_size[rec_id]
                    if rec_id == record_id:
                        rec_data = data[i : i + rec_size]
                        cg_data.append(rec_data)
                    # consider the second record ID if it exists
                    if record_id_nr == 2:
                        i += rec_size + 1
                    else:
                        i += rec_size
                cg_data = b"".join(cg_data)
                size = len(cg_data)

                if size:
                    if offset + size < record_offset + 1:
                        offset += size
                        continue

                    if offset < record_offset:
                        delta = record_offset - offset
                        size -= delta
                        offset = record_offset

                    yield cg_data, offset, _count
                    has_yielded = True
                    offset += size

            if not has_yielded:
                yield b"", 0, _count

    def _prepare_record(self, group):
        """ compute record dtype and parents dict for this group

        Parameters
        ----------
        group : dict
            MDF group dict

        Returns
        -------
        parents, dtypes : dict, numpy.dtype
            mapping of channels to record fields, record fields dtype

        """
        parents, dtypes = group.parents, group.types
        if parents is None:
            if group.data_location == v23c.LOCATION_ORIGINAL_FILE:
                stream = self._file
            else:
                stream = self._tempfile

            grp = group
            record_size = grp.channel_group.samples_byte_nr << 3
            next_byte_aligned_position = 0
            types = []
            current_parent = ""
            parent_start_offset = 0
            parents = {}
            group_channels = UniqueDB()

            # the channels are first sorted ascending (see __lt__ method of
            # Channel class): a channel with lower start offset is smaller;
            # when two channels have the same start offset, the one with the
            # higher bit size is considered smaller. The reason is that when
            # the numpy record is built and there are overlapping channels,
            # the parent fields must be bigger (bit size) than the embedded
            # channels. For each channel the parent dict will have a
            # (parent name, bit offset) pair: the channel value is computed
            # using the values from the parent field and the bit offset,
            # which is the channel's bit offset within the parent bytes.
            # This means all parents will have themselves as parent, and a
            # bit offset of 0. Gaps in the records are also considered. Non
            # standard integer sizes are adjusted to the first higher
            # standard integer size (e.g. a uint of 28 bits will be adjusted
            # to 32 bits)

            sortedchannels = sorted(enumerate(grp.channels), key=lambda i: i[1])
            for original_index, new_ch in sortedchannels:
                # skip channels with channel dependencies from the numpy record
                if new_ch.component_addr:
                    continue

                start_offset = new_ch.start_offset
                try:
                    additional_byte_offset = new_ch.additional_byte_offset
                    start_offset += 8 * additional_byte_offset
                except AttributeError:
                    pass

                bit_offset = start_offset % 8
                data_type = new_ch.data_type
                bit_count = new_ch.bit_count
                name = new_ch.name

                # handle multiple occurrences of the same channel name
                name = group_channels.get_unique_name(name)

                if start_offset >= next_byte_aligned_position:
                    parent_start_offset = (start_offset // 8) * 8

                    # check if there are byte gaps in the record
                    gap = (parent_start_offset - next_byte_aligned_position) // 8
                    if gap:
                        types.append(("", f"V{gap}"))

                    # adjust size to 1, 2, 4 or 8 bytes for nonstandard integers
                    size = bit_offset + bit_count
                    if data_type == v23c.DATA_TYPE_STRING:
                        next_byte_aligned_position = parent_start_offset + size
                        if next_byte_aligned_position <= record_size:
                            dtype_pair = (name, get_fmt_v3(data_type, size))
                            types.append(dtype_pair)
                            parents[original_index] = name, bit_offset
                        else:
                            next_byte_aligned_position = parent_start_offset

                    elif data_type == v23c.DATA_TYPE_BYTEARRAY:
                        next_byte_aligned_position = parent_start_offset + size
                        if next_byte_aligned_position <= record_size:
                            dtype_pair = (name, get_fmt_v3(data_type, size))
                            types.append(dtype_pair)
                            parents[original_index] = name, bit_offset
                        else:
                            next_byte_aligned_position = parent_start_offset

                    else:
                        if size > 32:
                            next_byte_aligned_position = parent_start_offset + 64
                        elif size > 16:
                            next_byte_aligned_position = parent_start_offset + 32
                        elif size > 8:
                            next_byte_aligned_position = parent_start_offset + 16
                        else:
                            next_byte_aligned_position = parent_start_offset + 8

                        if next_byte_aligned_position <= record_size:
                            dtype_pair = (name, get_fmt_v3(data_type, size))
                            types.append(dtype_pair)
                            parents[original_index] = name, bit_offset
                        else:
                            next_byte_aligned_position = parent_start_offset

                    current_parent = name
                else:
                    max_overlapping = next_byte_aligned_position - start_offset
                    if max_overlapping >= bit_count:
                        parents[original_index] = (
                            current_parent,
                            start_offset - parent_start_offset,
                        )
                if next_byte_aligned_position > record_size:
                    break

            gap = (record_size - next_byte_aligned_position) >> 3
            if gap:
                dtype_pair = ("", f"V{gap}")
                types.append(dtype_pair)

            dtypes = dtype(types)

            group.parents, group.types = parents, dtypes

        return parents, dtypes

    def _get_not_byte_aligned_data(self, data, group, ch_nr):
        big_endian_types = (
            v23c.DATA_TYPE_UNSIGNED_MOTOROLA,
            v23c.DATA_TYPE_FLOAT_MOTOROLA,
            v23c.DATA_TYPE_DOUBLE_MOTOROLA,
            v23c.DATA_TYPE_SIGNED_MOTOROLA,
        )

        record_size = group.channel_group.samples_byte_nr

        channel = group.channels[ch_nr]

        bit_offset = channel.start_offset % 8
        byte_offset = channel.start_offset // 8
        bit_count = channel.bit_count

        byte_count = bit_offset + bit_count
        if byte_count % 8:
            byte_count = (byte_count >> 3) + 1
        else:
            byte_count >>= 3

        types = [
            ("", f"V{byte_offset}"),
            ("vals", f"({byte_count},)u1"),
            ("", f"V{record_size - byte_count - byte_offset}"),
        ]

        vals = fromstring(data, dtype=dtype(types))

        vals = vals["vals"]

        if channel.data_type not in big_endian_types:
            vals = flip(vals, 1)

        vals = unpackbits(vals)
        vals = roll(vals, bit_offset)
        vals = vals.reshape((len(vals) // 8, 8))
        vals = packbits(vals)
        vals = vals.reshape((len(vals) // byte_count, byte_count))

        if bit_count < 64:
            mask = 2 ** bit_count - 1
            masks = []
            while mask:
                masks.append(mask & 0xFF)
                mask >>= 8
            for i in range(byte_count - len(masks)):
                masks.append(0)

            masks = masks[::-1]
            for i, mask in enumerate(masks):
                vals[:, i] &= mask

        if channel.data_type not in big_endian_types:
            vals = flip(vals, 1)

        if bit_count <= 8:
            size = 1
        elif bit_count <= 16:
            size = 2
        elif bit_count <= 32:
            size = 4
        elif bit_count <= 64:
            size = 8
        else:
            size = bit_count // 8

        if size > byte_count:
            extra_bytes = size - byte_count
            extra = zeros((len(vals), extra_bytes), dtype=uint8)

            types = [
                ("vals", vals.dtype, vals.shape[1:]),
                ("", extra.dtype, extra.shape[1:]),
            ]
            vals = fromarrays([vals, extra], dtype=dtype(types))

        vals = vals.tostring()

        fmt = get_fmt_v3(channel.data_type, bit_count)
        if size <= byte_count:
            if channel.data_type in big_endian_types:
                types = [("", f"a{byte_count - size}"), ("vals", fmt)]
            else:
                types = [("vals", fmt), ("", f"a{byte_count - size}")]
        else:
            types = [("vals", fmt)]

        vals = fromstring(vals, dtype=dtype(types))["vals"]

        if channel.data_type in v23c.SIGNED_INT:
            return as_non_byte_sized_signed_int(vals, bit_count)
        else:
            return vals

    def _validate_channel_selection(
        self, name=None, group=None, index=None, source=None
    ):
        """Validates the channel selection and returns the group and channel
        indexes. The channel can be specified in two ways:

        * using the first positional argument *name*

            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either
              the *group* or *index* arguments is None then a warning is
              issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use the *info* method for group
          and channel numbers

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index

        Returns
        -------
        group_index, channel_index : (int, int)
            selected channel's group and channel index

        """
        suppress = True
        if name is None:
            if group is None or index is None:
                message = (
                    "Invalid arguments for channel selection: "
                    'must give "name" or "group" and "index"'
                )
                raise MdfException(message)
            else:
                gp_nr, ch_nr = group, index
                if ch_nr >= 0:
                    if gp_nr > len(self.groups) - 1:
                        raise MdfException("Group index out of range")
                    if index > len(self.groups[gp_nr].channels) - 1:
                        raise MdfException("Channel index out of range")
        else:
            if name not in self.channels_db:
                raise MdfException(f'Channel "{name}" not found')
            else:
                if source is not None:
                    for gp_nr, ch_nr in self.channels_db[name]:
                        source_name = self._get_source_name(gp_nr, ch_nr)
                        if source_name == source:
                            break
                    else:
                        raise MdfException(
                            f"{name} with source {source} not found"
                        )
                elif group is None:
                    gp_nr, ch_nr = self.channels_db[name][0]
                    if len(self.channels_db[name]) > 1 and not suppress:
                        message = (
                            f'Multiple occurrences for channel "{name}". '
                            f"Using first occurrence from data group {gp_nr}. "
                            'Provide both "group" and "index" arguments'
                            " to select another data group"
                        )
                        logger.warning(message)
                else:
                    if index is not None and index < 0:
                        gp_nr = group
                        ch_nr = index
                    else:
                        for gp_nr, ch_nr in self.channels_db[name]:
                            if gp_nr == group:
                                if index is None:
                                    break
                                elif index == ch_nr:
                                    break
                        else:
                            if index is None:
                                message = (
                                    f'Channel "{name}" not found in group {group}'
                                )
                            else:
                                message = (
                                    f'Channel "{name}" not found in group '
                                    f"{group} at index {index}"
                                )
                            raise MdfException(message)

        return gp_nr, ch_nr

    def _get_source_name(self, group, index):
        grp = self.groups[group]
        if grp.channels[index].source:
            name = grp.channels[index].source.name
        else:
            name = ""
        return name

    def _read(self, mapped=False):
        stream = self._file

        cg_count, _ = count_channel_groups(stream)
        if self._callback:
            self._callback(0, cg_count)
        current_cg_index = 0

        # performance optimization
        stream.seek(0)

        dg_cntr = 0

        self.identification = FileIdentificationBlock(stream=stream)
        self.header = HeaderBlock(stream=stream)

        self.version = self.identification.version_str.decode("latin-1").strip(
            " \n\t\0"
        )

        # this will hold the mapping from channel address to Channel object;
        # needed for linking dependency blocks to referenced channels after
        # the file is loaded
        ch_map = {}

        # go to first data group
        dg_addr = self.header.first_dg_addr
        # read each data group sequentially
        while dg_addr:
            data_group = DataGroup(address=dg_addr, stream=stream, mapped=mapped)
            record_id_nr = data_group.record_id_len
            cg_nr = data_group.cg_nr
            cg_addr = data_group.first_cg_addr
            data_addr = data_group.data_block_addr

            # read trigger information if available
            trigger_addr = data_group.trigger_addr
            if trigger_addr:
                trigger = TriggerBlock(address=trigger_addr, stream=stream)
            else:
                trigger = None

            new_groups = []
            for i in range(cg_nr):

                new_groups.append(Group(None))
                grp = new_groups[-1]
                grp.channels = []
                grp.data_block = None
                grp.trigger = trigger
                grp.channel_dependencies = []

                if record_id_nr:
                    grp.sorted = False
                else:
                    grp.sorted = True

                kargs = {"first_cg_addr": cg_addr, "data_block_addr": data_addr}
                if self.version >= "3.20":
                    kargs["block_len"] = v23c.DG_POST_320_BLOCK_SIZE
                else:
                    kargs["block_len"] = v23c.DG_PRE_320_BLOCK_SIZE
                kargs["record_id_len"] = record_id_nr

                grp.data_group = DataGroup(**kargs)

                # read each channel group sequentially
                grp.channel_group = ChannelGroup(address=cg_addr, stream=stream)

                # go to first channel of the current channel group
                ch_addr = grp.channel_group.first_ch_addr
                ch_cntr = 0
                grp_chs = grp.channels

                while ch_addr:
                    # read channel block and create channel object
                    new_ch = Channel(
                        address=ch_addr,
                        stream=stream,
                        mapped=mapped,
                        si_map=self._si_map,
                        cc_map=self._cc_map,
                    )

                    # check if it has channel dependencies
                    if new_ch.component_addr:
                        dep = ChannelDependency(
                            address=new_ch.component_addr, stream=stream
                        )
                        grp.channel_dependencies.append(dep)
                    else:
                        grp.channel_dependencies.append(None)

                    # update channel map
                    entry = dg_cntr, ch_cntr
                    ch_map[ch_addr] = entry

                    for name in (new_ch.name, new_ch.display_name):
                        if name:
                            self.channels_db.add(name, entry)

                    if new_ch.channel_type == v23c.CHANNEL_TYPE_MASTER:
                        self.masters_db[dg_cntr] = ch_cntr

                    # go to next channel of the current channel group
                    ch_cntr += 1
                    grp_chs.append(new_ch)
                    ch_addr = new_ch.next_ch_addr

                cg_addr = grp.channel_group.next_cg_addr
                dg_cntr += 1

                current_cg_index += 1
                if self._callback:
                    self._callback(current_cg_index, cg_count)

                if self._terminate:
                    self.close()
                    return

            # store the channel groups' record sizes dict and data block size
            # in each new group; the data belongs to the initial unsorted
            # group, and the 'sorted' key is used as a flag (this is used
            # later if memory=False)

            cg_size = {}
            total_size = 0

            for grp in new_groups:
                record_id = grp.channel_group.record_id
                cycles_nr = grp.channel_group.cycles_nr
                record_size = grp.channel_group.samples_byte_nr

                cg_size[record_id] = record_size

                record_size += record_id_nr
                total_size += record_size * cycles_nr

                grp.record_size = cg_size
                grp.size = total_size

            for grp in new_groups:
                grp.data_location = v23c.LOCATION_ORIGINAL_FILE
                grp.data_group.data_block_addr = data_group.data_block_addr
                grp.data_block_addr = [data_group.data_block_addr]
                grp.data_block_size = [total_size]

            self.groups.extend(new_groups)

            # go to next data group
            dg_addr = data_group.next_dg_addr

        # finally update the channel dependency references
        for grp in self.groups:
            for dep in grp.channel_dependencies:
                if dep:
                    for i in range(dep.sd_nr):
                        ref_channel_addr = dep[f"ch_{i}"]
                        channel = ch_map[ref_channel_addr]
                        dep.referenced_channels.append(channel)
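
    # Illustration (editor's note, not part of the original source): once
    # _read() has finished, the parsed structure is reachable through the
    # public attributes; "recording.mdf" and "Speed" are hypothetical names.
    #
    #     mdf = MDF3("recording.mdf")
    #     len(mdf.groups)             # number of channel groups found
    #     mdf.channels_db["Speed"]    # [(group index, channel index), ...]
    #     mdf.masters_db[0]           # master channel index of group 0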

    def configure(
        self,
        *,
        read_fragment_size=None,
        write_fragment_size=None,
        use_display_names=None,
        single_bit_uint_as_bool=None,
        integer_interpolation=None,
    ):
        """ configure MDF parameters

        Parameters
        ----------
        read_fragment_size : int
            size hint of split data blocks, default 8MB; if the initial size
            is smaller, then no data list is used. The actual split size
            depends on the data groups' record size
        write_fragment_size : int
            size hint of split data blocks, default 4MB; if the initial size
            is smaller, then no data list is used. The actual split size
            depends on the data groups' record size. Maximum size is 4MB to
            ensure compatibility with CANape
        use_display_names : bool
            search for display name in the Channel XML comment
        single_bit_uint_as_bool : bool
            return single bit channels as np.bool arrays
        integer_interpolation : int
            interpolation mode for integer channels:

                * 0 - repeat previous sample
                * 1 - use linear interpolation

        """

        if read_fragment_size is not None:
            self._read_fragment_size = int(read_fragment_size)

        if write_fragment_size:
            self._write_fragment_size = min(int(write_fragment_size), 4 * 2 ** 20)

        if use_display_names is not None:
            self._use_display_names = bool(use_display_names)

        if single_bit_uint_as_bool is not None:
            self._single_bit_uint_as_bool = bool(single_bit_uint_as_bool)

        if integer_interpolation in (0, 1):
            self._integer_interpolation = int(integer_interpolation)
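
    # Usage sketch (editor's note, not part of the original source): tuning
    # the fragment sizes before processing a large file; the file name and
    # values are arbitrary examples.
    #
    #     mdf = MDF3("big.mdf")
    #     mdf.configure(read_fragment_size=16 * 2 ** 20)  # 16 MB read chunks
    #     mdf.configure(integer_interpolation=1)          # linear interpolation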

    def add_trigger(self, group, timestamp, pre_time=0, post_time=0, comment=""):
        """ add trigger to data group

        Parameters
        ----------
        group : int
            group index
        timestamp : float
            trigger time
        pre_time : float
            trigger pre time; default 0
        post_time : float
            trigger post time; default 0
        comment : str
            trigger comment

        """
        comment_template = """<EVcomment>
    <TX>{}</TX>
</EVcomment>"""
        try:
            group = self.groups[group]
        except IndexError:
            return

        trigger = group.trigger

        if comment:
            try:
                comment = ET.fromstring(comment)
                if comment.find(".//TX"):
                    comment = comment.find(".//TX").text
                else:
                    comment = ""
            except ET.ParseError:
                pass

        if trigger:
            count = trigger["trigger_events_nr"]
            trigger["trigger_events_nr"] += 1
            trigger.block_len += 24
            trigger[f"trigger_{count}_time"] = timestamp
            trigger[f"trigger_{count}_pretime"] = pre_time
            trigger[f"trigger_{count}_posttime"] = post_time
            if comment:
                if trigger.comment is None:
                    comment = f"{count + 1}. {comment}"
                    comment = comment_template.format(comment)
                    trigger.comment = comment
                else:
                    current_comment = trigger.comment
                    try:
                        current_comment = ET.fromstring(current_comment)
                        if current_comment.find(".//TX"):
                            current_comment = current_comment.find(".//TX").text
                        else:
                            current_comment = ""
                    except ET.ParseError:
                        pass

                    comment = f"{current_comment}\n{count + 1}. {comment}"
                    comment = comment_template.format(comment)
                    trigger.comment = comment
        else:
            trigger = TriggerBlock(
                trigger_event_nr=1,
                trigger_0_time=timestamp,
                trigger_0_pretime=pre_time,
                trigger_0_posttime=post_time,
            )
            if comment:
                comment = f"1. {comment}"
                comment = comment_template.format(comment)
                trigger.comment = comment

            group.trigger = trigger
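
    # Usage sketch (editor's note, not part of the original source): adding a
    # trigger with a one second window around t = 2.5 s; the file name,
    # timings and comment are arbitrary examples.
    #
    #     mdf = MDF3("recording.mdf")
    #     mdf.add_trigger(
    #         0,                # group index
    #         2.5,              # trigger time [s]
    #         pre_time=0.5,
    #         post_time=0.5,
    #         comment="overcurrent detected",
    #     )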

    def append(
        self, signals, acquisition_info="Python", common_timebase=False, units=None
    ):
        """Appends a new data group.

        For channel dependencies type Signals, the *samples* attribute must
        be a numpy.recarray

        Parameters
        ----------
        signals : list | Signal | pandas.DataFrame
            list of *Signal* objects, or a single *Signal* object, or a
            pandas *DataFrame* object. All bytes columns in the pandas
            *DataFrame* must be *latin-1* encoded
        acquisition_info : str
            acquisition information; default 'Python'
        common_timebase : bool
            flag to hint that the signals have the same timebase. Only set
            this if you know for sure that all appended channels share the
            same time base
        units : dict
            will contain the signal units mapped to the signal names when
            appending a pandas DataFrame

        Examples
        --------
        >>> # case 1 conversion type None
        >>> s1 = np.array([1, 2, 3, 4, 5])
        >>> s2 = np.array([-1, -2, -3, -4, -5])
        >>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
        >>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
        >>> names = ['Positive', 'Negative', 'Float']
        >>> units = ['+', '-', '.f']
        >>> info = {}
        >>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
        >>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
        >>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
        >>> mdf = MDF3('new.mdf')
        >>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
        >>> # case 2: VTAB conversions from channels inside another file
        >>> mdf1 = MDF3('in.mdf')
        >>> ch1 = mdf1.get("Channel1_VTAB")
        >>> ch2 = mdf1.get("Channel2_VTABR")
        >>> sigs = [ch1, ch2]
        >>> mdf2 = MDF3('out.mdf')
        >>> mdf2.append(sigs, 'created by asammdf v1.1.0')
        >>> df = pd.DataFrame.from_dict({'s1': np.array([1, 2, 3, 4, 5]),
        ...                              's2': np.array([-1, -2, -3, -4, -5])})
        >>> units = {'s1': 'V', 's2': 'A'}
        >>> mdf2.append(df, units=units)

        """
        if isinstance(signals, Signal):
            signals = [signals]
        elif isinstance(signals, DataFrame):
            self._append_dataframe(signals, acquisition_info, units=units)
            return

        version = self.version
        interp_mode = self._integer_interpolation

        # check if the signals have a common timebase;
        # if not, interpolate the signals using the union of all timebases
        if signals:
            timestamps = signals[0].timestamps
            if not common_timebase:
                for signal in signals[1:]:
                    if not array_equal(signal.timestamps, timestamps):
                        different = True
                        break
                else:
                    different = False

                if different:
                    times = [s.timestamps for s in signals]
                    timestamps = reduce(union1d, times).flatten().astype(float64)
                    signals = [
                        s.interp(timestamps, interpolation_mode=interp_mode)
                        for s in signals
                    ]
                    times = None
        else:
            timestamps = array([])

        if self.version < "3.10":
            if timestamps.dtype.byteorder == ">":
                timestamps = timestamps.byteswap().newbyteorder()
            for signal in signals:
                if signal.samples.dtype.byteorder == ">":
                    signal.samples = signal.samples.byteswap().newbyteorder()

        if self.version >= "3.00":
            channel_size = v23c.CN_DISPLAYNAME_BLOCK_SIZE
        elif self.version >= "2.10":
            channel_size = v23c.CN_LONGNAME_BLOCK_SIZE
        else:
            channel_size = v23c.CN_SHORT_BLOCK_SIZE

        file = self._tempfile
        tell = file.tell

        kargs = {
            "module_nr": 0,
            "module_address": 0,
            "type": v23c.SOURCE_ECU,
            "description": b"Channel inserted by Python Script",
        }
        ce_block = ChannelExtension(**kargs)

        canopen_time_fields = ("ms", "days")
        canopen_date_fields = (
            "ms",
            "min",
            "hour",
            "day",
            "month",
            "year",
            "summer_time",
            "day_of_week",
        )

        dg_cntr = len(self.groups)

        gp = Group(None)
        gp.channels = gp_channels = []
        gp.channel_dependencies = gp_dep = []
        gp.signal_types = gp_sig_types = []
        gp.string_dtypes = []

        self.groups.append(gp)

        cycles_nr = len(timestamps)
        fields = []
        types = []
        parents = {}
        ch_cntr = 0
        offset = 0
        field_names = UniqueDB()

        if signals:
            master_metadata = signals[0].master_metadata
        else:
            master_metadata = None
        if master_metadata:
            time_name = master_metadata[0]
        else:
            time_name = "time"

        if signals:
            # conversion for time channel
            kargs = {
                "conversion_type": v23c.CONVERSION_TYPE_NONE,
                "unit": b"s",
                "min_phy_value": timestamps[0] if cycles_nr else 0,
                "max_phy_value": timestamps[-1] if cycles_nr else 0,
            }
            conversion = ChannelConversion(**kargs)
            conversion.unit = "s"
            source = ce_block

            # time channel
            t_type, t_size = fmt_to_datatype_v3(timestamps.dtype, timestamps.shape)
            kargs = {
                "short_name": time_name.encode("latin-1"),
                "channel_type": v23c.CHANNEL_TYPE_MASTER,
                "data_type": t_type,
                "start_offset": 0,
                "min_raw_value": timestamps[0] if cycles_nr else 0,
                "max_raw_value": timestamps[-1] if cycles_nr else 0,
                "bit_count": t_size,
                "block_len": channel_size,
                "version": version,
            }
            channel = Channel(**kargs)
            channel.name = name = time_name
            channel.conversion = conversion
            channel.source = source

            gp_channels.append(channel)

            self.channels_db.add(name, (dg_cntr, ch_cntr))
            self.masters_db[dg_cntr] = 0
            # data group record parents
            parents[ch_cntr] = name, 0

            # time channel doesn't have channel dependencies
            gp_dep.append(None)

            fields.append(timestamps)
            types.append((name, timestamps.dtype))

            offset += t_size
            ch_cntr += 1

            gp_sig_types.append(0)

        for signal in signals:
            sig = signal
            names = sig.samples.dtype.names
            name = signal.name

            if names is None:
                sig_type = v23c.SIGNAL_TYPE_SCALAR
            else:
                if names in (canopen_time_fields, canopen_date_fields):
                    sig_type = v23c.SIGNAL_TYPE_CANOPEN
                elif names[0] != sig.name:
                    sig_type = v23c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
                else:
                    sig_type = v23c.SIGNAL_TYPE_ARRAY

            gp_sig_types.append(sig_type)

            # conversions for channel
            conversion = conversion_transfer(signal.conversion)
            conversion.unit = unit = signal.unit
            israw = signal.raw

            if not israw and not unit:
                conversion = None

            if sig_type == v23c.SIGNAL_TYPE_SCALAR:

                # source for channel
                if signal.source:
                    source = signal.source
                    if source.source_type != 2:
                        kargs = {
                            "type": v23c.SOURCE_ECU,
                            "description": source.name.encode("latin-1"),
                            "ECU_identification": source.path.encode("latin-1"),
                        }
                    else:
                        kargs = {
                            "type": v23c.SOURCE_VECTOR,
                            "message_name": source.name.encode("latin-1"),
                            "sender_name": source.path.encode("latin-1"),
                        }

                    new_source = ChannelExtension(**kargs)

                else:
                    new_source = ce_block

                # compute additional byte offset for large record sizes
                if offset > v23c.MAX_UINT16:
                    additional_byte_offset = ceil((offset - v23c.MAX_UINT16) / 8)
                    start_bit_offset = offset - additional_byte_offset * 8
                else:
                    start_bit_offset = offset
                    additional_byte_offset = 0

                s_type, s_size = fmt_to_datatype_v3(
                    signal.samples.dtype, signal.samples.shape
                )

                name = signal.name
                display_name = signal.display_name

                if signal.samples.dtype.kind == "u" and signal.bit_count <= 4:
                    s_size_ = signal.bit_count
                else:
                    s_size_ = s_size

                kargs = {
                    "channel_type": v23c.CHANNEL_TYPE_VALUE,
                    "data_type": s_type,
                    "start_offset": start_bit_offset,
                    "bit_count": s_size_,
                    "additional_byte_offset": additional_byte_offset,
                    "block_len": channel_size,
                    "version": version,
                }

                if s_size < 8:
                    s_size = 8

                channel = Channel(**kargs)
                channel.name = signal.name
                channel.comment = signal.comment
                channel.source = new_source
                channel.conversion = conversion

                gp_channels.append(channel)

                offset += s_size

                entry = (dg_cntr, ch_cntr)
                self.channels_db.add(name, entry)
                self.channels_db.add(display_name, entry)

                # update the parents as well
                field_name = field_names.get_unique_name(name)
                parents[ch_cntr] = field_name, 0

                if signal.samples.dtype.kind == "S":
                    gp.string_dtypes.append(signal.samples.dtype)

                fields.append(signal.samples)
                if s_type != v23c.DATA_TYPE_BYTEARRAY:
                    types.append((field_name, signal.samples.dtype))
                else:
                    types.append(
                        (field_name, signal.samples.dtype, signal.samples.shape[1:])
                    )

                ch_cntr += 1

                # simple channels don't have channel dependencies
                gp_dep.append(None)

            # second, add the composed signals
            elif sig_type in (
                v23c.SIGNAL_TYPE_CANOPEN,
                v23c.SIGNAL_TYPE_STRUCTURE_COMPOSITION,
            ):
                new_dg_cntr = len(self.groups)
                new_gp = Group(None)
                new_gp.channels = new_gp_channels = []
                new_gp.channel_dependencies = new_gp_dep = []
                new_gp.signal_types = new_gp_sig_types = []
                self.groups.append(new_gp)

                new_fields = []
                new_types = []
                new_parents = {}
                new_ch_cntr = 0
                new_offset = 0
                new_field_names = UniqueDB()

                # conversion for time channel
                kargs = {
                    "conversion_type": v23c.CONVERSION_TYPE_NONE,
                    "unit": b"s",
                    "min_phy_value": timestamps[0] if cycles_nr else 0,
                    "max_phy_value": timestamps[-1] if cycles_nr else 0,
                }
                conversion = ChannelConversion(**kargs)
                conversion.unit = "s"

                source = ce_block

                # time channel
                t_type, t_size = fmt_to_datatype_v3(
                    timestamps.dtype, timestamps.shape
                )
                kargs = {
                    "short_name": time_name.encode("latin-1"),
                    "channel_type": v23c.CHANNEL_TYPE_MASTER,
                    "data_type": t_type,
                    "start_offset": 0,
                    "min_raw_value": timestamps[0] if cycles_nr else 0,
                    "max_raw_value": timestamps[-1] if cycles_nr else 0,
                    "bit_count": t_size,
                    "block_len": channel_size,
                    "version": version,
                }
                channel = Channel(**kargs)
                channel.name = name = time_name
                channel.source = source
                channel.conversion = conversion

                new_gp_channels.append(channel)

                self.channels_db.add(name, (new_dg_cntr, new_ch_cntr))

                self.masters_db[new_dg_cntr] = 0
                # data group record parents
                new_parents[new_ch_cntr] = name, 0

                # time channel doesn't have channel dependencies
                new_gp_dep.append(None)

                new_fields.append(timestamps)
                new_types.append((name, timestamps.dtype))
                new_field_names.get_unique_name(name)
                new_gp_sig_types.append(0)

                new_offset += t_size
                new_ch_cntr += 1

                names = signal.samples.dtype.names
                if names == ("ms", "days"):
                    channel_group_comment = "From mdf v4 CANopen Time channel"
                elif names == (
                    "ms",
                    "min",
                    "hour",
                    "day",
                    "month",
                    "year",
                    "summer_time",
                    "day_of_week",
                ):
                    channel_group_comment = "From mdf v4 CANopen Date channel"
                else:
                    channel_group_comment = (
                        "From mdf v4 structure channel composition"
                    )

                for name in names:

                    samples = signal.samples[name]

                    # conversions for channel
                    kargs = {
                        "conversion_type": v23c.CONVERSION_TYPE_NONE,
                        "unit": signal.unit.encode("latin-1"),
                        "min_phy_value": 0,
                        "max_phy_value": 0,
                    }
                    conversion = ChannelConversion(**kargs)

                    # source for channel
                    if signal.source:
                        source = signal.source
                        if source.source_type != 2:
                            kargs = {
                                "type": v23c.SOURCE_ECU,
                                "description": source.name.encode("latin-1"),
                                "ECU_identification": source.path.encode(
                                    "latin-1"
                                ),
                            }
                        else:
                            kargs = {
                                "type": v23c.SOURCE_VECTOR,
                                "message_name": source.name.encode("latin-1"),
                                "sender_name": source.path.encode("latin-1"),
                            }

                        source = ChannelExtension(**kargs)
                    else:
                        source = ce_block

                    # compute additional byte offset for large record sizes
                    if new_offset > v23c.MAX_UINT16:
                        additional_byte_offset = (
                            new_offset - v23c.MAX_UINT16
                        ) >> 3
                        start_bit_offset = new_offset - (
                            additional_byte_offset << 3
                        )
                    else:
                        start_bit_offset = new_offset
                        additional_byte_offset = 0

                    s_type, s_size = fmt_to_datatype_v3(
                        samples.dtype, samples.shape
                    )

                    kargs = {
                        "channel_type": v23c.CHANNEL_TYPE_VALUE,
                        "data_type": s_type,
                        "start_offset": start_bit_offset,
                        "bit_count": s_size,
                        "additional_byte_offset": additional_byte_offset,
                        "block_len": channel_size,
                        "version": version,
                    }

                    if s_size < 8:
                        s_size = 8

                    channel = Channel(**kargs)
                    channel.name = name
                    channel.source = source
                    channel.conversion = conversion

                    new_gp_channels.append(channel)
                    new_offset += s_size

                    self.channels_db.add(name, (new_dg_cntr, new_ch_cntr))

                    # update the parents as well
                    field_name = new_field_names.get_unique_name(name)
                    new_parents[new_ch_cntr] = field_name, 0

                    new_fields.append(samples)
                    new_types.append((field_name, samples.dtype))

                    new_ch_cntr += 1

                    # simple channels don't have channel dependencies
                    new_gp_dep.append(None)

                # channel group
                kargs = {
                    "cycles_nr": cycles_nr,
                    "samples_byte_nr": new_offset >> 3,
                    "ch_nr": new_ch_cntr,
                }
                new_gp.channel_group = ChannelGroup(**kargs)
                new_gp.channel_group.comment = channel_group_comment

                new_gp.size = cycles_nr * (new_offset >> 3)

                # data group
                if self.version >= "3.20":
                    block_len = v23c.DG_POST_320_BLOCK_SIZE
                else:
                    block_len = v23c.DG_PRE_320_BLOCK_SIZE
                new_gp.data_group = DataGroup(block_len=block_len)

                # data block
                new_types = dtype(new_types)

                new_gp.types = new_types
                new_gp.parents = new_parents
                new_gp.sorted = True

                samples = fromarrays(new_fields, dtype=new_types)

                block = samples.tostring()

                new_gp.data_location = v23c.LOCATION_TEMPORARY_FILE
                if cycles_nr:
                    data_address = tell()
                    new_gp.data_group.data_block_addr = data_address
                    new_gp.data_block_addr = [data_address]
                    new_gp.data_block_size = [new_gp.size]
                    self._tempfile.write(block)
                else:
                    new_gp.data_group.data_block_addr = 0
                    new_gp.data_block_addr = [0]
                    new_gp.data_block_size = [0]

                # data group trigger
                new_gp.trigger = None

            else:
                names = signal.samples.dtype.names
                name = signal.name

                component_names = []
                component_samples = []
                if names:
                    samples = signal.samples[names[0]]
                else:
                    samples = signal.samples

                shape = samples.shape[1:]
                dims = [list(range(size)) for size in shape]

                for indexes in product(*dims):
                    subarray = samples
                    for idx in indexes:
                        subarray = subarray[:, idx]
                    component_samples.append(subarray)

                    indexes = "".join(f"[{idx}]" for idx in indexes)
                    component_name = f"{name}{indexes}"
                    component_names.append(component_name)

                # add channel dependency block for composed parent channel
                sd_nr = len(component_samples)
                kargs = {"sd_nr": sd_nr}
                for i, dim in enumerate(shape[::-1]):
                    kargs[f"dim_{i}"] = dim
                parent_dep = ChannelDependency(**kargs)
                gp_dep.append(parent_dep)

                # source for channel
                if signal.source:
                    source = signal.source
                    if source.source_type != 2:
                        kargs = {
                            "type": v23c.SOURCE_ECU,
                            "description": source.name.encode("latin-1"),
                            "ECU_identification": source.path.encode("latin-1"),
                        }
                    else:
                        kargs = {
                            "type": v23c.SOURCE_VECTOR,
                            "message_name": source.name.encode("latin-1"),
                            "sender_name": source.path.encode("latin-1"),
                        }
                    source = ChannelExtension(**kargs)
                else:
                    source = ce_block

                s_type, s_size = fmt_to_datatype_v3(samples.dtype, (), True)

                # compute additional byte offset for large record sizes
                if offset > v23c.MAX_UINT16:
                    additional_byte_offset = (offset - v23c.MAX_UINT16) >> 3
                    start_bit_offset = offset - (additional_byte_offset << 3)
                else:
                    start_bit_offset = offset
                    additional_byte_offset = 0

                kargs = {
                    "channel_type": v23c.CHANNEL_TYPE_VALUE,
                    "data_type": s_type,
                    "start_offset": start_bit_offset,
                    "bit_count": s_size,
                    "additional_byte_offset": additional_byte_offset,
                    "block_len": channel_size,
                    "version": version,
                }

                if s_size < 8:
                    s_size = 8

                channel = Channel(**kargs)
                channel.name = name
                channel.comment = signal.comment
                channel.display_name = signal.display_name

                gp_channels.append(channel)

                self.channels_db.add(name, (dg_cntr, ch_cntr))

                ch_cntr += 1

                for i, (name, samples) in enumerate(
                    zip(component_names, component_samples)
                ):
                    if i < sd_nr:
                        dep_pair = dg_cntr, ch_cntr
                        parent_dep.referenced_channels.append(dep_pair)
                        description = b"\0"
                    else:
                        description = f"{signal.name} - axis {name}"
                        description = description.encode("latin-1")

                    s_type, s_size = fmt_to_datatype_v3(samples.dtype, ())
                    shape = samples.shape[1:]

                    # source for channel
                    if signal.source:
                        source = signal.source
                        if source.source_type != 2:
                            kargs = {
                                "type": v23c.SOURCE_ECU,
                                "description": source.name.encode("latin-1"),
                                "ECU_identification": source.path.encode(
                                    "latin-1"
                                ),
                            }
                        else:
                            kargs = {
                                "type": v23c.SOURCE_VECTOR,
                                "message_name": source.name.encode("latin-1"),
                                "sender_name": source.path.encode("latin-1"),
                            }
                        source = ChannelExtension(**kargs)
                    else:
                        source = ce_block

                    # compute additional byte offset for large record sizes
                    if offset > v23c.MAX_UINT16:
                        additional_byte_offset = (offset - v23c.MAX_UINT16) >> 3
                        start_bit_offset = offset - (additional_byte_offset << 3)
                    else:
                        start_bit_offset = offset
                        additional_byte_offset = 0

                    kargs = {
                        "channel_type": v23c.CHANNEL_TYPE_VALUE,
                        "data_type": s_type,
                        "start_offset": start_bit_offset,
                        "bit_count": s_size,
                        "additional_byte_offset": additional_byte_offset,
                        "block_len": channel_size,
                        "description": description,
                        "version": version,
                    }

                    if s_size < 8:
                        s_size = 8

                    channel = Channel(**kargs)
                    channel.name = name
                    channel.source = source
                    gp_channels.append(channel)

                    size = s_size
                    for dim in shape:
                        size *= dim
                    offset += size

                    self.channels_db.add(name, (dg_cntr, ch_cntr))

                    # update the parents as well
                    field_name = field_names.get_unique_name(name)
                    parents[ch_cntr] = field_name, 0

                    fields.append(samples)
                    types.append((field_name, samples.dtype, shape))

                    gp_dep.append(None)

                    ch_cntr += 1

                for name in names[1:]:
                    samples = signal.samples[name]

                    component_names = []
                    component_samples = []

                    shape = samples.shape[1:]
                    dims = [list(range(size)) for size in shape]

                    for indexes in product(*dims):
                        subarray = samples
                        for idx in indexes:
                            subarray = subarray[:, idx]
                        component_samples.append(subarray)

                        indexes = "".join(f"[{idx}]" for idx in indexes)
                        component_name = f"{name}{indexes}"
                        component_names.append(component_name)

                    # add channel dependency block for composed parent channel
                    sd_nr = len(component_samples)
                    kargs = {"sd_nr": sd_nr}
                    for i, dim in enumerate(shape[::-1]):
                        kargs["dim_{}".format(i)] = dim
                    parent_dep = ChannelDependency(**kargs)
                    gp_dep.append(parent_dep)

                    # source for channel
                    if signal.source:
                        source = signal.source
                        if source.source_type != 2:
                            kargs = {
                                "type": v23c.SOURCE_ECU,
                                "description": source.name.encode("latin-1"),
                                "ECU_identification": source.path.encode(
                                    "latin-1"
                                ),
                            }
                        else:
                            kargs = {
                                "type": v23c.SOURCE_VECTOR,
                                "message_name": source.name.encode("latin-1"),
                                "sender_name": source.path.encode("latin-1"),
                            }
                        source = ChannelExtension(**kargs)
                    else:
                        source = ce_block

                    s_type, s_size = fmt_to_datatype_v3(samples.dtype, ())

                    # compute additional byte offset for large record sizes
                    if offset > v23c.MAX_UINT16:
                        additional_byte_offset = (offset - v23c.MAX_UINT16) >> 3
                        start_bit_offset = offset - (additional_byte_offset << 3)
                    else:
                        start_bit_offset = offset
                        additional_byte_offset = 0

                    kargs = {
                        "channel_type": v23c.CHANNEL_TYPE_VALUE,
                        "data_type": s_type,
                        "start_offset": start_bit_offset,
                        "bit_count": s_size,
                        "additional_byte_offset": additional_byte_offset,
                        "block_len": channel_size,
                        "version": version,
                    }

                    if s_size < 8:
                        s_size = 8

                    channel = Channel(**kargs)
                    channel.name = name
                    channel.comment = signal.comment
                    channel.source = source
                    gp_channels.append(channel)

                    self.channels_db.add(name, (dg_cntr, ch_cntr))

                    ch_cntr += 1

                    for i, (name, samples) in enumerate(
                        zip(component_names, component_samples)
                    ):
                        if i < sd_nr:
                            dep_pair = dg_cntr, ch_cntr
                            parent_dep.referenced_channels.append(dep_pair)
                            description = b"\0"
                        else:
                            description = f"{signal.name} - axis {name}"
                            description = description.encode("latin-1")

                        s_type, s_size = fmt_to_datatype_v3(samples.dtype, ())
                        shape = samples.shape[1:]

                        # source for channel
                        if signal.source:
                            source = signal.source
                            if source.source_type != 2:
                                kargs = {
                                    "type": v23c.SOURCE_ECU,
                                    "description": source.name.encode("latin-1"),
                                    "ECU_identification": source.path.encode(
                                        "latin-1"
                                    ),
                                }
                            else:
                                kargs = {
                                    "type": v23c.SOURCE_VECTOR,
                                    "message_name": source.name.encode("latin-1"),
                                    "sender_name": source.path.encode("latin-1"),
                                }
                            source = ChannelExtension(**kargs)
                        else:
                            source = ce_block

                        # compute additional byte offset for large record sizes
                        if offset > v23c.MAX_UINT16:
                            additional_byte_offset = (
                                offset - v23c.MAX_UINT16
                            ) >> 3
                            start_bit_offset = offset - (
                                additional_byte_offset << 3
                            )
                        else:
                            start_bit_offset = offset
                            additional_byte_offset = 0

                        kargs = {
                            "channel_type": v23c.CHANNEL_TYPE_VALUE,
                            "data_type": s_type,
                            "start_offset": start_bit_offset,
                            "bit_count": s_size,
                            "additional_byte_offset": additional_byte_offset,
                            "block_len": channel_size,
                            "description": description,
                            "version": version,
                        }

                        if s_size < 8:
                            s_size = 8

                        channel = Channel(**kargs)
                        channel.name = name
                        channel.source = source
                        gp_channels.append(channel)

                        size = s_size
                        for dim in shape:
                            size *= dim
                        offset += size

                        self.channels_db.add(name, (dg_cntr, ch_cntr))

                        # update the parents as well
                        field_name = field_names.get_unique_name(name)
                        parents[ch_cntr] = field_name, 0

                        fields.append(samples)
                        types.append((field_name, samples.dtype, shape))

                        gp_dep.append(None)

                        ch_cntr += 1

        # channel group
        kargs = {
            "cycles_nr": cycles_nr,
            "samples_byte_nr": offset >> 3,
            "ch_nr": ch_cntr,
        }
        if self.version >= "3.30":
            kargs["block_len"] = v23c.CG_POST_330_BLOCK_SIZE
        else:
            kargs["block_len"] = v23c.CG_PRE_330_BLOCK_SIZE
        gp.channel_group = ChannelGroup(**kargs)
        gp.channel_group.comment = acquisition_info

        gp.size = cycles_nr * (offset >> 3)

        # data group
        if self.version >= "3.20":
            block_len = v23c.DG_POST_320_BLOCK_SIZE
        else:
            block_len = v23c.DG_PRE_320_BLOCK_SIZE
        gp.data_group = DataGroup(block_len=block_len)

        # data block
        types = dtype(types)

        gp.types = types
        gp.parents = parents
        gp.sorted = True

        if signals:
            samples = fromarrays(fields, dtype=types)
        else:
            samples = array([])

        block = samples.tostring()

        gp.data_location = v23c.LOCATION_TEMPORARY_FILE
        if cycles_nr:
            data_address = tell()
            gp.data_group.data_block_addr = data_address
            gp.data_block_addr = [data_address]
            gp.data_block_size = [len(block)]
            self._tempfile.write(block)
        else:
            gp.data_group.data_block_addr = 0
            gp.data_block_addr = [0]
            gp.data_block_size = [0]

        # data group trigger
        gp.trigger = None

    def _append_dataframe(self, df, source_info="", units=None):
        """ Appends a new data group from a Pandas data frame. """
        units = units or {}

        t = df.index
        index_name = df.index.name
        time_name = index_name or "time"

        version = self.version

        timestamps = t

        # if self.version < '3.10':
        #     if timestamps.dtype.byteorder == '>':
        #         timestamps = timestamps.byteswap().newbyteorder()
        #     for signal in signals:
        #         if signal.samples.dtype.byteorder == '>':
        #             signal.samples = signal.samples.byteswap().newbyteorder()

        if self.version >= "3.00":
            channel_size = v23c.CN_DISPLAYNAME_BLOCK_SIZE
        elif self.version >= "2.10":
            channel_size = v23c.CN_LONGNAME_BLOCK_SIZE
        else:
            channel_size = v23c.CN_SHORT_BLOCK_SIZE

        file = self._tempfile
        tell = file.tell

        kargs = {
            "module_nr": 0,
            "module_address": 0,
            "type": v23c.SOURCE_ECU,
            "description": b"Channel inserted by Python Script",
        }
        ce_block = ChannelExtension(**kargs)

        dg_cntr = len(self.groups)

        gp = Group(None)
        gp.channels = gp_channels = []
        gp.channel_dependencies = gp_dep = []
        gp.signal_types = gp_sig_types = []
        gp.string_dtypes = []

        self.groups.append(gp)

        cycles_nr = len(timestamps)
        fields = []
        types = []
        parents = {}
        ch_cntr = 0
        offset = 0
        field_names = UniqueDB()

        if df.shape[0]:
            # conversion for time channel
            kargs = {
                "conversion_type": v23c.CONVERSION_TYPE_NONE,
                "unit": b"s",
                "min_phy_value": timestamps[0] if cycles_nr else 0,
                "max_phy_value": timestamps[-1] if cycles_nr else 0,
            }
            conversion = ChannelConversion(**kargs)
            conversion.unit = "s"
            source = ce_block

            # time channel
            t_type, t_size = fmt_to_datatype_v3(timestamps.dtype, timestamps.shape)
            kargs = {
                "short_name": time_name.encode("latin-1"),
                "channel_type": v23c.CHANNEL_TYPE_MASTER,
                "data_type": t_type,
                "start_offset": 0,
                "min_raw_value": timestamps[0] if cycles_nr else 0,
                "max_raw_value": timestamps[-1] if cycles_nr else 0,
                "bit_count": t_size,
                "block_len": channel_size,
                "version": version,
            }
            channel = Channel(**kargs)
            channel.name = name = time_name
            channel.conversion = conversion
            channel.source = source

            gp_channels.append(channel)

            self.channels_db.add(name, (dg_cntr, ch_cntr))
            self.masters_db[dg_cntr] = 0
            # data group record parents
            parents[ch_cntr] = name, 0

            # time channel doesn't have channel dependencies
            gp_dep.append(None)

            fields.append(timestamps)
            types.append((name, timestamps.dtype))
            field_names.get_unique_name(name)

            offset += t_size
            ch_cntr += 1

            gp_sig_types.append(0)

        for signal in df:
            sig = df[signal]
            name = signal

            sig_type = v23c.SIGNAL_TYPE_SCALAR

            gp_sig_types.append(sig_type)

            new_source = ce_block

            # compute additional byte offset for large record sizes
            if offset > v23c.MAX_UINT16:
                additional_byte_offset = ceil((offset - v23c.MAX_UINT16) / 8)
                start_bit_offset = offset - additional_byte_offset * 8
            else:
                start_bit_offset = offset
                additional_byte_offset = 0

            s_type, s_size = fmt_to_datatype_v3(sig.dtype, sig.shape)

            kargs = {
                "channel_type": v23c.CHANNEL_TYPE_VALUE,
                "data_type": s_type,
                "min_raw_value": 0,
                "max_raw_value": 0,
                "start_offset": start_bit_offset,
                "bit_count": s_size,
                "additional_byte_offset": additional_byte_offset,
                "block_len": channel_size,
                "version": version,
            }

            if s_size < 8:
                s_size = 8

            channel = Channel(**kargs)
            channel.name = name
            channel.source = new_source

            unit = units.get(name, b"")
            if unit:
                if hasattr(unit, "encode"):
                    unit = unit.encode("latin-1")
                # conversion for the channel
                kargs = {
                    "conversion_type": v23c.CONVERSION_TYPE_NONE,
                    "unit": unit,
                    "min_phy_value": 0,
                    "max_phy_value": 0,
                }
                conversion = ChannelConversion(**kargs)
                conversion.unit = unit
                channel.conversion = conversion

            gp_channels.append(channel)

            offset += s_size

            self.channels_db.add(name, (dg_cntr, ch_cntr))

            # update the parents as well
            field_name = field_names.get_unique_name(name)
            parents[ch_cntr] = field_name, 0

            if sig.dtype.kind == "S":
                gp.string_dtypes.append(sig.dtype)

            fields.append(sig)
            types.append((field_name, sig.dtype))

            ch_cntr += 1

            # simple channels don't have channel dependencies
            gp_dep.append(None)

        # channel group
        kargs = {
            "cycles_nr": cycles_nr,
            "samples_byte_nr": offset >> 3,
            "ch_nr": ch_cntr,
        }
        if self.version >= "3.30":
            kargs["block_len"] = v23c.CG_POST_330_BLOCK_SIZE
        else:
            kargs["block_len"] = v23c.CG_PRE_330_BLOCK_SIZE
        gp.channel_group = ChannelGroup(**kargs)
        gp.channel_group.comment = source_info

        gp.size = cycles_nr * (offset >> 3)

        # data group
        if self.version >= "3.20":
            block_len = v23c.DG_POST_320_BLOCK_SIZE
        else:
            block_len = v23c.DG_PRE_320_BLOCK_SIZE
        gp.data_group = DataGroup(block_len=block_len)

        # data block
        types = dtype(types)

        gp.types = types
        gp.parents = parents
        gp.sorted = True

        if df.shape[0]:
            samples = fromarrays(fields, dtype=types)
        else:
            samples = array([])

        block = samples.tostring()

        gp.data_location = v23c.LOCATION_TEMPORARY_FILE
        if cycles_nr:
            data_address = tell()
            gp.data_group.data_block_addr = data_address
            gp.data_block_addr = [data_address]
            gp.data_block_size = [len(block)]
            self._tempfile.write(block)
        else:
            gp.data_group.data_block_addr = 0
            gp.data_block_addr = [0]
            gp.data_block_size = [0]

        # data group trigger
        gp.trigger = None
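
    # Usage sketch (editor's note, not part of the original source):
    # _append_dataframe is reached through append() when a pandas DataFrame
    # is passed; the frame index becomes the master channel. Column name and
    # unit below are arbitrary examples.
    #
    #     import numpy as np
    #     import pandas as pd
    #
    #     df = pd.DataFrame(
    #         {"s1": np.array([1, 2, 3], dtype=np.uint8)},
    #         index=np.array([0.1, 0.2, 0.3]),
    #     )
    #     mdf = MDF3(version="3.30")
    #     mdf.append(df, units={"s1": "V"})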

    def close(self):
        """ if the MDF was created with memory='minimum' and new channels
        have been appended, then this must be called just before the object
        is no longer used, in order to clean up the temporary file

        """
        if self._tempfile is not None:
            self._tempfile.close()
        if self._file is not None and not self._from_filelike:
            self._file.close()

    def extend(self, index, signals):
        """ Extend a group with new samples. *signals* contains (values,
        invalidation_bits) pairs for each extended signal. Since MDF3 does
        not support invalidation bits, the second item of each pair must be
        None. The first pair is the master channel's pair, and the next pairs
        must respect the same order in which the signals were appended. The
        samples must have raw or physical values according to the *Signals*
        used for the initial append.

        Parameters
        ----------
        index : int
            group index
        signals : list
            list of (numpy.ndarray, None) objects

        Examples
        --------
        >>> # case 1 conversion type None
        >>> s1 = np.array([1, 2, 3, 4, 5])
        >>> s2 = np.array([-1, -2, -3, -4, -5])
        >>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
        >>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
        >>> names = ['Positive', 'Negative', 'Float']
        >>> units = ['+', '-', '.f']
        >>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
        >>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
        >>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
        >>> mdf = MDF3('new.mdf')
        >>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
        >>> t = np.array([0.006, 0.007, 0.008, 0.009, 0.010])
        >>> mdf.extend(0, [(t, None), (s1.samples, None), (s2.samples, None), (s3.samples, None)])

        """
        new_group_offset = 0
        gp = self.groups[index]
        if not signals:
            message = '"extend" requires a non-empty list of Signal objects'
            raise MdfException(message)

        if gp.data_location == v23c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        canopen_time_fields = ("ms", "days")
        canopen_date_fields = (
            "ms",
            "min",
            "hour",
            "day",
            "month",
            "year",
            "summer_time",
            "day_of_week",
        )

        fields = []
        types = []

        cycles_nr = len(signals[0][0])
        string_counter = 0

        for signal, _ in signals:
            sig = signal
            names = sig.dtype.names

            if len(sig.shape) <= 1:
                if names is None:
                    sig_type = v23c.SIGNAL_TYPE_SCALAR
                else:
                    if names in (canopen_time_fields, canopen_date_fields):
                        sig_type = v23c.SIGNAL_TYPE_CANOPEN
                    elif names[0] != sig.name:
                        sig_type = v23c.SIGNAL_TYPE_STRUCTURE_COMPOSITION
                    else:
                        sig_type = v23c.SIGNAL_TYPE_ARRAY
            else:
                sig_type = v23c.SIGNAL_TYPE_ARRAY

            if sig_type == v23c.SIGNAL_TYPE_SCALAR:
                if signal.dtype.kind == "S":
                    str_dtype = gp.string_dtypes[string_counter]
                    signal = signal.astype(str_dtype)
                    string_counter += 1

                fields.append(signal)

                if signal.shape[1:]:
                    types.append(("", signal.dtype, signal.shape[1:]))
                else:
                    types.append(("", signal.dtype))

            # second, add the composed signals
            elif sig_type in (
                v23c.SIGNAL_TYPE_CANOPEN,
                v23c.SIGNAL_TYPE_STRUCTURE_COMPOSITION,
            ):
                new_group_offset += 1
                new_gp = self.groups[index + new_group_offset]

                new_fields = []
                new_types = []

                names = signal.dtype.names
                for name in names:
                    new_fields.append(signal[name])
                    new_types.append(("", signal[name].dtype))

                # data block
                new_types = dtype(new_types)

                samples = fromarrays(new_fields, dtype=new_types)
                samples = samples.tostring()

                record_size = new_gp.channel_group.samples_byte_nr
                extended_size = cycles_nr * record_size
                new_gp.size += extended_size

                if samples:
                    stream.seek(0, 2)
                    data_address = stream.tell()
                    new_gp.data_block_addr.append(data_address)
                    new_gp.data_block_size.append(extended_size)
                    stream.write(samples)

            else:
                names = signal.dtype.names

                component_samples = []
                if names:
                    samples = signal[names[0]]
                else:
                    samples = signal

                shape = samples.shape[1:]
                dims = [list(range(size)) for size in shape]

                for indexes in product(*dims):
                    subarray = samples
                    for idx in indexes:
                        subarray = subarray[:, idx]
                    component_samples.append(subarray)

                if names:
                    new_samples = [signal[fld] for fld in names[1:]]
                    component_samples.extend(new_samples)

                for samples in component_samples:
                    shape = samples.shape[1:]

                    fields.append(samples)
                    types.append(("", samples.dtype, shape))

        record_size = gp.channel_group.samples_byte_nr
        extended_size = cycles_nr * record_size
        gp.size += extended_size

        # data block
        types = dtype(types)

        samples = fromarrays(fields, dtype=types)
        samples = samples.tostring()

        if cycles_nr:
            stream.seek(0, 2)
            data_address = stream.tell()
            gp.data_block_addr.append(data_address)
            gp.data_block_size.append(extended_size)
            stream.write(samples)

        gp.channel_group.cycles_nr += cycles_nr

    def get_channel_name(self, group, index):
        """Gets channel name.

        Parameters
        ----------
        group : int
            0-based group index
        index : int
            0-based channel index

        Returns
        -------
        name : str
            found channel name

        """
        gp_nr, ch_nr = self._validate_channel_selection(None, group, index)

        grp = self.groups[gp_nr]
        if grp.data_location == v23c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        channel = grp.channels[ch_nr]

        return channel.name

    def get_channel_metadata(self, name=None, group=None, index=None):
        gp_nr, ch_nr = self._validate_channel_selection(name, group, index)

        grp = self.groups[gp_nr]

        if grp.data_location == v23c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        channel = grp.channels[ch_nr]
        channel = deepcopy(channel)

        return channel

    def get_channel_unit(self, name=None, group=None, index=None):
        """Gets channel unit. The channel can be specified in two ways:

        * using the first positional argument *name*

            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either
              the *group* or *index* arguments is None then a warning is
              issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use the *info* method for group
          and channel numbers

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index

        Returns
        -------
        unit : str
            found channel unit

        """
        gp_nr, ch_nr = self._validate_channel_selection(name, group, index)

        grp = self.groups[gp_nr]
        if grp.data_location == v23c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        channel = grp.channels[ch_nr]

        if channel.conversion:
            unit = channel.conversion.unit
        else:
            unit = ""

        return unit

    def get_channel_comment(self, name=None, group=None, index=None):
        """Gets channel comment. The channel can be specified in two ways:

        * using the first positional argument *name*

            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either
              the *group* or *index* arguments is None then a warning is
              issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use the *info* method for group
          and channel numbers

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index

        Returns
        -------
        comment : str
            found channel comment

        """
        gp_nr, ch_nr = self._validate_channel_selection(name, group, index)

        grp = self.groups[gp_nr]
        if grp.data_location == v23c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        channel = grp.channels[ch_nr]

        return channel.comment
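
    # Usage sketch (editor's note, not part of the original source): the
    # metadata getters share the channel selection rules documented above;
    # file and channel names are hypothetical.
    #
    #     mdf = MDF3("recording.mdf")
    #     mdf.get_channel_name(group=0, index=1)
    #     mdf.get_channel_unit("EngineSpeed")
    #     mdf.get_channel_comment("EngineSpeed", group=0)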
[docs]    def get(
        self,
        name=None,
        group=None,
        index=None,
        raster=None,
        samples_only=False,
        data=None,
        raw=False,
        ignore_invalidation_bits=False,
        source=None,
        record_offset=0,
        record_count=None,
        copy_master=True,
    ):
        """Gets channel samples.

        Channel can be specified in two ways:

        * using the first positional argument *name*

            * if *source* is given this will be first used to validate the
              channel selection
            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either the
              *group* or *index* arguments is None then a warning is issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use *info* method for group and
          channel numbers

        If the *raster* keyword argument is not *None* the output is
        interpolated accordingly.

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index
        raster : float
            time raster in seconds
        samples_only : bool
            if *True* return only the channel samples as numpy array; if
            *False* return a *Signal* object
        data : bytes
            prevent redundant data read by providing the raw data group samples
        raw : bool
            return channel samples without applying the conversion rule;
            default *False*
        ignore_invalidation_bits : bool
            only defined to have the same API as the MDF v4 class
        source : str
            source name used to select the channel
        record_offset : int
            if *data=None* use this to select the record offset from which the
            group data should be loaded
        record_count : int
            number of records to read; default *None* (all records are read)
        copy_master : bool
            make a copy of the timebase for this channel

        Returns
        -------
        res : (numpy.array, None) | Signal
            returns *Signal* if *samples_only*=*False* (default option),
            otherwise returns a (numpy.array, None) tuple (for compatibility
            with the MDF v4 class). The *Signal* samples are

            * numpy recarray for channels that have CDBLOCK or BYTEARRAY type
            * numpy array for all the rest

        Raises
        ------
        MdfException :

            * if the channel name is not found
            * if the group index is out of range
            * if the channel index is out of range

        Examples
        --------
        >>> from asammdf import MDF, Signal
        >>> import numpy as np
        >>> t = np.arange(5)
        >>> s = np.ones(5)
        >>> mdf = MDF(version='3.30')
        >>> for i in range(4):
        ...     sigs = [Signal(s*(i*10+j), t, name='Sig') for j in range(1, 4)]
        ...     mdf.append(sigs)
        ...
        >>> # first group and channel index of the specified channel name
        ...
        >>> mdf.get('Sig')
        UserWarning: Multiple occurrences for channel "Sig". Using first
        occurrence from data group 4. Provide both "group" and "index"
        arguments to select another data group
        <Signal Sig:
                samples=[ 1.  1.  1.  1.  1.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        >>> # first channel index in the specified group
        ...
        >>> mdf.get('Sig', 1)
        <Signal Sig:
                samples=[ 11.  11.  11.  11.  11.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        >>> # channel named Sig from group 1 channel index 2
        ...
        >>> mdf.get('Sig', 1, 2)
        <Signal Sig:
                samples=[ 12.  12.  12.  12.  12.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        >>> # channel index 1 of group 2
        ...
        >>> mdf.get(None, 2, 1)
        <Signal Sig:
                samples=[ 21.  21.  21.  21.  21.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        >>> mdf.get(group=2, index=1)
        <Signal Sig:
                samples=[ 21.  21.  21.  21.  21.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        >>> mdf.get('Sig', source='VN7060')
        <Signal Sig:
                samples=[ 12.  12.  12.  12.  12.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">

        """
        gp_nr, ch_nr = self._validate_channel_selection(
            name, group, index, source=source
        )

        original_data = data

        grp = self.groups[gp_nr]

        if grp.data_location == v23c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        channel = grp.channels[ch_nr]

        conversion = channel.conversion
        name = channel.name
        display_name = channel.display_name

        bit_count = channel.bit_count or 64

        dep = grp.channel_dependencies[ch_nr]
        cycles_nr = grp.channel_group.cycles_nr

        encoding = None

        # get data group record
        if data is None:
            data = self._load_data(
                grp, record_offset=record_offset, record_count=record_count
            )
        else:
            data = (data,)

        # check if this is a channel array
        if dep:
            if dep.dependency_type == v23c.DEPENDENCY_TYPE_VECTOR:
                shape = [dep.sd_nr]
            elif dep.dependency_type >= v23c.DEPENDENCY_TYPE_NDIM:
                shape = []
                i = 0
                while True:
                    try:
                        dim = dep[f"dim_{i}"]
                        shape.append(dim)
                        i += 1
                    except KeyError:
                        break
                shape = shape[::-1]

            record_shape = tuple(shape)

            arrays = [
                self.get(
                    group=dg_nr,
                    index=ch_nr,
                    samples_only=True,
                    raw=raw,
                    data=original_data,
                    record_offset=record_offset,
                    record_count=record_count,
                )[0]
                for dg_nr, ch_nr in dep.referenced_channels
            ]
            shape.insert(0, cycles_nr)

            vals = column_stack(arrays).flatten().reshape(tuple(shape))

            arrays = [vals]
            types = [(channel.name, vals.dtype, record_shape)]

            types = dtype(types)
            vals = fromarrays(arrays, dtype=types)

            if not samples_only or raster:
                timestamps = self.get_master(
                    gp_nr,
                    original_data,
                    record_offset=record_offset,
                    record_count=record_count,
                    copy_master=copy_master,
                )
                if raster and len(timestamps) > 1:
                    num = float(float32((timestamps[-1] - timestamps[0]) / raster))
                    if num.is_integer():
                        t = linspace(timestamps[0], timestamps[-1], int(num))
                    else:
                        t = arange(timestamps[0], timestamps[-1], raster)

                    vals = (
                        Signal(vals, timestamps, name="_")
                        .interp(t, interpolation_mode=self._integer_interpolation)
                        .samples
                    )

                    timestamps = t

        else:
            # get channel values
            channel_values = []
            timestamps = []
            count = 0
            for fragment in data:
                data_bytes, _offset, _count = fragment
                parents, dtypes = self._prepare_record(grp)

                try:
                    parent, bit_offset = parents[ch_nr]
                except KeyError:
                    parent, bit_offset = None, None

                bits = channel.bit_count

                if parent is not None:
                    if grp.record is None:
                        if dtypes.itemsize:
                            record = fromstring(data_bytes, dtype=dtypes)
                        else:
                            record = None
                    else:
                        record = grp.record

                    record.setflags(write=False)

                    vals = record[parent]

                    data_type = channel.data_type
                    size = vals.dtype.itemsize
                    if data_type == v23c.DATA_TYPE_BYTEARRAY:
                        size *= vals.shape[1]

                    vals_dtype = vals.dtype.kind
                    if vals_dtype not in "ui" and (
                        bit_offset or bits != size * 8
                    ):
                        vals = self._get_not_byte_aligned_data(
                            data_bytes, grp, ch_nr
                        )
                    else:
                        dtype_ = vals.dtype
                        kind_ = dtype_.kind

                        if data_type in v23c.INT_TYPES:
                            if kind_ == "f":
                                if bits != size * 8:
                                    vals = self._get_not_byte_aligned_data(
                                        data_bytes, grp, ch_nr
                                    )
                                else:
                                    dtype_fmt = get_fmt_v3(data_type, bits)
                                    channel_dtype = dtype(dtype_fmt.split(")")[-1])
                                    vals = vals.view(channel_dtype)
                            else:
                                if dtype_.byteorder == ">":
                                    if bit_offset or bits != size << 3:
                                        vals = self._get_not_byte_aligned_data(
                                            data_bytes, grp, ch_nr
                                        )
                                else:
                                    if bit_offset:
                                        if dtype_.kind == "i":
                                            vals = vals.astype(
                                                dtype(f"{dtype_.byteorder}u{size}")
                                            )
                                            vals >>= bit_offset
                                        else:
                                            vals = vals >> bit_offset

                                    if bits != size << 3:
                                        if data_type in v23c.SIGNED_INT:
                                            vals = as_non_byte_sized_signed_int(
                                                vals, bits
                                            )
                                        else:
                                            mask = (1 << bits) - 1
                                            if vals.flags.writeable:
                                                vals &= mask
                                            else:
                                                vals = vals & mask
                        else:
                            if bits != size * 8:
                                vals = self._get_not_byte_aligned_data(
                                    data_bytes, grp, ch_nr
                                )
                            else:
                                if kind_ in "ui":
                                    dtype_fmt = get_fmt_v3(data_type, bits)
                                    channel_dtype = dtype(dtype_fmt.split(")")[-1])
                                    vals = vals.view(channel_dtype)

                else:
                    vals = self._get_not_byte_aligned_data(data_bytes, grp, ch_nr)

                if not samples_only or raster:
                    timestamps.append(
                        self.get_master(gp_nr, fragment, copy_master=copy_master)
                    )

                if bits == 1 and self._single_bit_uint_as_bool:
                    vals = array(vals, dtype=bool)
                else:
                    data_type = channel.data_type
                    channel_dtype = array([], dtype=get_fmt_v3(data_type, bits))
                    if vals.dtype != channel_dtype.dtype:
                        vals = vals.astype(channel_dtype.dtype)

                channel_values.append(vals.copy())
                count += 1

            if count > 1:
                vals = concatenate(channel_values)
            elif count == 1:
                vals = channel_values[0]
            else:
                vals = []

            if not samples_only or raster:
                if count > 1:
                    timestamps = concatenate(timestamps)
                else:
                    timestamps = timestamps[0]

                if raster and len(timestamps) > 1:
                    num = float(float32((timestamps[-1] - timestamps[0]) / raster))
                    if num.is_integer():
                        t = linspace(timestamps[0], timestamps[-1], int(num))
                    else:
                        t = arange(timestamps[0], timestamps[-1], raster)

                    vals = (
                        Signal(vals, timestamps, name="_")
                        .interp(t, interpolation_mode=self._integer_interpolation)
                        .samples
                    )

                    timestamps = t

        if conversion is None:
            conversion_type = v23c.CONVERSION_TYPE_NONE
        else:
            conversion_type = conversion.conversion_type

        if conversion_type == v23c.CONVERSION_TYPE_NONE:
            if vals.dtype.kind == "S":
                encoding = "latin-1"

        elif conversion_type in (
            v23c.CONVERSION_TYPE_LINEAR,
            v23c.CONVERSION_TYPE_TABI,
            v23c.CONVERSION_TYPE_TAB,
            v23c.CONVERSION_TYPE_EXPO,
            v23c.CONVERSION_TYPE_LOGH,
            v23c.CONVERSION_TYPE_RAT,
            v23c.CONVERSION_TYPE_POLY,
            v23c.CONVERSION_TYPE_FORMULA,
        ):
            if not raw:
                try:
                    vals = conversion.convert(vals)
                    conversion = None
                except Exception:
                    print(channel, conversion)
                    raise

        elif conversion_type in (
            v23c.CONVERSION_TYPE_TABX,
            v23c.CONVERSION_TYPE_RTABX,
        ):
            raw = True

        if samples_only:
            res = vals, None
        else:
            if conversion:
                unit = conversion.unit
            else:
                unit = ""

            comment = channel.comment

            description = channel.description.decode("latin-1").strip(" \t\n\0")
            if comment:
                comment = f"{comment}\n{description}"
            else:
                comment = description

            source = channel.source

            if source:
                if source["type"] == v23c.SOURCE_ECU:
                    source = SignalSource(
                        source.name,
                        source.path,
                        source.comment,
                        0,  # source type other
                        0,  # bus type none
                    )
                else:
                    source = SignalSource(
                        source.name,
                        source.path,
                        source.comment,
                        2,  # source type bus
                        2,  # bus type CAN
                    )

            master_metadata = self._master_channel_metadata.get(gp_nr, None)

            res = Signal(
                samples=vals,
                timestamps=timestamps,
                unit=unit,
                name=channel.name,
                comment=comment,
                conversion=conversion,
                raw=raw,
                master_metadata=master_metadata,
                display_name=display_name,
                source=source,
                bit_count=bit_count,
                encoding=encoding,
            )

        return res
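    # A short sketch of the samples_only / raw switches documented above,
    # reusing the 'Sig' channels from the docstring example; Signal.physical()
    # is assumed from the asammdf Signal API for applying a kept conversion:
    #
    #     >>> samples, invalidation = mdf.get('Sig', 1, 2, samples_only=True)
    #     >>> raw_sig = mdf.get('Sig', 1, 2, raw=True)  # conversion attached, not applied
    #     >>> physical = raw_sig.physical()             # apply it on demand
    #     >>> resampled = mdf.get('Sig', 1, 2, raster=0.5)  # interpolated timebase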
[docs]    def get_master(
        self,
        index,
        data=None,
        raster=None,
        record_offset=0,
        record_count=None,
        copy_master=True,
    ):
        """returns master channel samples for given group

        Parameters
        ----------
        index : int
            group index
        data : (bytes, int, int)
            (data block raw bytes, fragment offset, record count); default None
        raster : float
            raster to be used for interpolation; default None
        record_offset : int
            if *data=None* use this to select the record offset from which the
            group data should be loaded
        record_count : int
            number of records to read; default *None* (all records are read)
        copy_master : bool
            return a copy of the cached master

        Returns
        -------
        t : numpy.array
            master channel samples

        """
        original_data = data
        fragment = data

        if fragment:
            data_bytes, offset, _count = fragment
            try:
                timestamps = self._master_channel_cache[(index, offset, _count)]
                if raster and len(timestamps):
                    timestamps = arange(timestamps[0], timestamps[-1], raster)
                    return timestamps
                else:
                    if copy_master:
                        return timestamps.copy()
                    else:
                        return timestamps
            except KeyError:
                pass
        else:
            try:
                timestamps = self._master_channel_cache[index]
                if raster and len(timestamps):
                    timestamps = arange(timestamps[0], timestamps[-1], raster)
                    return timestamps
                else:
                    if copy_master:
                        return timestamps.copy()
                    else:
                        return timestamps
            except KeyError:
                pass

        group = self.groups[index]

        if group.data_location == v23c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        time_ch_nr = self.masters_db.get(index, None)
        cycles_nr = group.channel_group.cycles_nr

        if time_ch_nr is None:
            t = arange(cycles_nr, dtype=float64)
            metadata = ("time", 1)
        else:
            time_ch = group.channels[time_ch_nr]
            metadata = (time_ch.name, 1)

            if time_ch.bit_count == 0:
                if time_ch.sampling_rate:
                    sampling_rate = time_ch.sampling_rate
                else:
                    sampling_rate = 1
                t = arange(cycles_nr, dtype=float64) * sampling_rate
            else:
                # get data group parents and dtypes
                parents, dtypes = self._prepare_record(group)

                # get data group record
                if data is None:
                    data = self._load_data(
                        group, record_offset=record_offset, record_count=record_count
                    )
                    _count = record_count
                else:
                    data = (data,)

                time_values = []
                count = 0
                for fragment in data:
                    data_bytes, offset, _count = fragment
                    parent, _ = parents.get(time_ch_nr, (None, None))

                    if parent is not None:
                        if group.record is None:
                            if dtypes.itemsize:
                                record = fromstring(data_bytes, dtype=dtypes)
                            else:
                                record = None
                        else:
                            record = group.record
                        record.setflags(write=False)
                        t = record[parent]
                    else:
                        t = self._get_not_byte_aligned_data(
                            data_bytes, group, time_ch_nr
                        )
                    time_values.append(t.copy())
                    count += 1

                if count > 1:
                    t = concatenate(time_values)
                elif count == 1:
                    t = time_values[0]
                else:
                    t = array([], dtype=float64)

                # get timestamps
                conversion = time_ch.conversion
                if conversion is None:
                    time_conv_type = v23c.CONVERSION_TYPE_NONE
                else:
                    time_conv_type = conversion.conversion_type
                if time_conv_type == v23c.CONVERSION_TYPE_LINEAR:
                    time_a = conversion.a
                    time_b = conversion.b
                    t = t * time_a
                    if time_b:
                        t += time_b

        if t.dtype != float64:
            t = t.astype(float64)

        self._master_channel_metadata[index] = metadata

        if original_data is None:
            self._master_channel_cache[index] = t
        else:
            data_bytes, offset, _count = original_data
            self._master_channel_cache[(index, offset, _count)] = t

        if raster:
            timestamps = t
            if len(t) > 1:
                num = float(float32((timestamps[-1] - timestamps[0]) / raster))
                if int(num) == num:
                    timestamps = linspace(t[0], t[-1], int(num))
                else:
                    timestamps = arange(t[0], t[-1], raster)
        else:
            timestamps = t

        if copy_master:
            return timestamps.copy()
        else:
            return timestamps
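    # Worked numbers for the raster branch above: with a timebase spanning
    # 0..4 s, raster=0.5 gives num = (4 - 0) / 0.5 = 8.0, an integer, so
    # linspace(t[0], t[-1], 8) is used; raster=0.3 gives num of about 13.33,
    # so arange(t[0], t[-1], 0.3) is used instead. A usage sketch:
    #
    #     >>> t = mdf.get_master(0)                # computed once, then cached
    #     >>> t_r = mdf.get_master(0, raster=0.5)  # resampled copy of the timebase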
[docs]    def iter_get_triggers(self):
        """generator that yields triggers

        Returns
        -------
        trigger_info : dict
            trigger information with the following keys:

            * comment : trigger comment
            * time : trigger time
            * pre_time : trigger pre time
            * post_time : trigger post time
            * index : trigger index
            * group : data group index of trigger

        """
        for i, gp in enumerate(self.groups):
            trigger = gp.trigger
            if trigger:
                for j in range(trigger["trigger_events_nr"]):
                    trigger_info = {
                        "comment": trigger.comment,
                        "index": j,
                        "group": i,
                        "time": trigger[f"trigger_{j}_time"],
                        "pre_time": trigger[f"trigger_{j}_pretime"],
                        "post_time": trigger[f"trigger_{j}_posttime"],
                    }
                    yield trigger_info
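    # Usage sketch; whether any triggers exist depends on the file:
    #
    #     >>> for info in mdf.iter_get_triggers():
    #     ...     print(info["group"], info["index"], info["time"], info["comment"])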
[docs]    def info(self):
        """get MDF information as a dict

        Examples
        --------
        >>> mdf = MDF3('test.mdf')
        >>> mdf.info()

        """
        info = {}
        for key in ("author", "department", "project", "subject"):
            value = self.header[key].decode("latin-1").strip(" \n\t\0")
            info[key] = value
        info["version"] = self.version
        info["groups"] = len(self.groups)
        for i, gp in enumerate(self.groups):
            if gp.data_location == v23c.LOCATION_ORIGINAL_FILE:
                stream = self._file
            elif gp.data_location == v23c.LOCATION_TEMPORARY_FILE:
                stream = self._tempfile
            inf = {}
            info[f"group {i}"] = inf
            inf["cycles"] = gp.channel_group.cycles_nr
            inf["comment"] = gp.channel_group.comment
            inf["channels count"] = len(gp.channels)
            for j, channel in enumerate(gp.channels):
                name = channel.name
                if channel.channel_type == v23c.CHANNEL_TYPE_MASTER:
                    ch_type = "master"
                else:
                    ch_type = "value"
                inf[f"channel {j}"] = f'name="{name}" type={ch_type}'

        return info
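    # The returned dict nests one sub-dict per data group, keyed "group <i>"
    # as built above; the numbers below are illustrative only:
    #
    #     >>> meta = mdf.info()
    #     >>> meta["groups"]
    #     4
    #     >>> meta["group 0"]["channels count"]
    #     3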
[docs]    def save(self, dst, overwrite=False, compression=0):
        """Save MDF to *dst*. If overwrite is *True* then the destination file
        is overwritten, otherwise '.<cntr>' is appended to the file name, where
        '<cntr>' is the first counter that produces a new file name that does
        not already exist in the filesystem.

        Parameters
        ----------
        dst : str | pathlib.Path
            destination file name
        overwrite : bool
            overwrite flag, default *False*
        compression : int
            does nothing for mdf version 3; introduced here to share the same
            API as mdf version 4 files

        Returns
        -------
        output_file : pathlib.Path
            output file name

        """
        dst = Path(dst).with_suffix(".mdf")

        destination_dir = dst.parent
        destination_dir.mkdir(parents=True, exist_ok=True)

        if overwrite is False:
            if dst.is_file():
                cntr = 0
                while True:
                    name = dst.with_suffix(f".{cntr}.mdf")
                    if not name.exists():
                        break
                    else:
                        cntr += 1
                message = (
                    f'Destination file "{dst}" already exists '
                    f'and "overwrite" is False. Saving MDF file as "{name}"'
                )
                logger.warning(message)
                dst = name

        if not self.header.comment:
            self.header.comment = f"""<FHcomment>
<TX>created</TX>
<tool_id>asammdf</tool_id>
<tool_vendor> </tool_vendor>
<tool_version>{__version__}</tool_version>
</FHcomment>"""
        else:
            old_history = self.header.comment
            timestamp = time.asctime()

            text = f"{old_history}\n{timestamp}: updated by asammdf {__version__}"
            self.header.comment = text

        defined_texts, cc_map, si_map = {}, {}, {}

        if dst == self.name:
            destination = dst.with_suffix(".savetemp")
        else:
            destination = dst

        with open(destination, "wb+") as dst_:
            groups_nr = len(self.groups)

            write = dst_.write
            seek = dst_.seek

            # list of all blocks
            blocks = []

            address = 0

            write(bytes(self.identification))
            address += v23c.ID_BLOCK_SIZE

            write(bytes(self.header))
            address += self.header.block_len

            if self.header.program:
                write(bytes(self.header.program))
                self.header.program_addr = address
                address += self.header.program.block_len
            else:
                self.header.program_addr = 0

            comment = TextBlock(text=self.header.comment)
            write(bytes(comment))
            self.header.comment_addr = address
            address += comment.block_len

            # DataGroup
            # put them first in the block list so they will be written first
            # to disk; this way, in case of memory=False, we can safely
            # restore the original data block address
            gp_rec_ids = []

            original_data_block_addrs = [
                group.data_group.data_block_addr for group in self.groups
            ]

            for idx, gp in enumerate(self.groups):
                dg = gp.data_group
                gp_rec_ids.append(dg.record_id_len)
                dg.record_id_len = 0

                # DataBlock
                for (data_bytes, _, __) in self._load_data(gp):
                    write(data_bytes)

                if gp.size:
                    gp.data_group.data_block_addr = address
                else:
                    gp.data_group.data_block_addr = 0
                address += gp.size - gp_rec_ids[idx] * gp.channel_group.cycles_nr

                if self._callback:
                    self._callback(int(33 * (idx + 1) / groups_nr), 100)

                if self._terminate:
                    dst_.close()
                    self.close()
                    return

            for gp in self.groups:
                dg = gp.data_group
                blocks.append(dg)
                dg.address = address
                address += dg.block_len

            if self.groups:
                for i, dg in enumerate(self.groups[:-1]):
                    addr = self.groups[i + 1].data_group.address
                    dg.data_group.next_dg_addr = addr
                self.groups[-1].data_group.next_dg_addr = 0

            for idx, gp in enumerate(self.groups):
                # ChannelDependency
                cd = gp.channel_dependencies
                for dep in cd:
                    if dep:
                        dep.address = address
                        blocks.append(dep)
                        address += dep.block_len

                # Channel
                for channel, dep in zip(gp.channels, gp.channel_dependencies):
                    if dep:
                        channel.component_addr = dep.address
                    else:
                        channel.component_addr = 0

                    address = channel.to_blocks(
                        address, blocks, defined_texts, cc_map, si_map
                    )

                count = len(gp.channels)
                if count:
                    for i in range(count - 1):
                        gp.channels[i].next_ch_addr = gp.channels[i + 1].address
                    gp.channels[-1].next_ch_addr = 0

                # ChannelGroup
                cg = gp.channel_group
                if gp.channels:
                    cg.first_ch_addr = gp.channels[0].address
                else:
                    cg.first_ch_addr = 0
                cg.next_cg_addr = 0
                address = cg.to_blocks(address, blocks, defined_texts, si_map)

                # TriggerBlock
                trigger = gp.trigger
                if trigger:
                    address = trigger.to_blocks(address, blocks)

                if self._callback:
                    self._callback(int(33 * (idx + 1) / groups_nr) + 33, 100)

                if self._terminate:
                    dst_.close()
                    self.close()
                    return

            # update referenced channels addresses in the channel dependencies
            for gp in self.groups:
                for dep in gp.channel_dependencies:
                    if not dep:
                        continue

                    for i, pair_ in enumerate(dep.referenced_channels):
                        dg_nr, ch_nr = pair_
                        grp = self.groups[dg_nr]
                        ch = grp.channels[ch_nr]
                        dep[f"ch_{i}"] = ch.address
                        dep[f"cg_{i}"] = grp.channel_group.address
                        dep[f"dg_{i}"] = grp.data_group.address

            # DataGroup
            for gp in self.groups:
                gp.data_group.first_cg_addr = gp.channel_group.address
                if gp.trigger:
                    gp.data_group.trigger_addr = gp.trigger.address
                else:
                    gp.data_group.trigger_addr = 0

            if self.groups:
                address = self.groups[0].data_group.address
                self.header.first_dg_addr = address
                self.header.dg_nr = len(self.groups)

            if self._terminate:
                dst_.close()
                self.close()
                return

            if self._callback:
                blocks_nr = len(blocks)
                threshold = blocks_nr / 33
                count = 1
                for i, block in enumerate(blocks):
                    write(bytes(block))
                    if i >= threshold:
                        self._callback(66 + count, 100)
                        count += 1
                        threshold += blocks_nr / 33
            else:
                for block in blocks:
                    write(bytes(block))

            for gp, rec_id, original_address in zip(
                self.groups, gp_rec_ids, original_data_block_addrs
            ):
                gp.data_group.record_id_len = rec_id
                gp.data_group.data_block_addr = original_address

            seek(0)
            write(bytes(self.identification))
            write(bytes(self.header))

        if dst == self.name:
            self.close()
            self.name.unlink()
            destination.rename(self.name)

            self.groups.clear()
            self.header = None
            self.identification = None
            self.channels_db.clear()
            self.masters_db.clear()
            self._master_channel_cache.clear()

            self._tempfile = TemporaryFile()
            self._file = open(self.name, "rb")
            self._read()

        if self._callback:
            self._callback(100, 100)

        return dst
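    # Usage sketch for save(); "out" is a hypothetical destination name. With
    # overwrite=False an existing "out.mdf" is kept and the first free
    # "out.<cntr>.mdf" name is used, as implemented above:
    #
    #     >>> dst = mdf.save("out", overwrite=False)
    #     >>> dst                    # a pathlib.Path; the exact name depends
    #     PosixPath('out.0.mdf')     # on what already exists on disk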
if __name__ == "__main__":
    pass