# -*- coding: utf-8 -*-
"""
ASAM MDF version 4 file format module
"""

from __future__ import print_function, division
import sys
import time
import warnings
import os

from tempfile import TemporaryFile
from struct import unpack, unpack_from
from functools import reduce, partial
from collections import defaultdict
from hashlib import md5
import xml.etree.ElementTree as XML
from math import ceil
from copy import deepcopy

from numpy import (
    interp,
    linspace,
    dtype,
    array_equal,
    zeros,
    uint8,
    array,
    searchsorted,
    clip,
    union1d,
    float64,
    frombuffer,
    argwhere,
    arange,
    flip,
    unpackbits,
    packbits,
    roll,
    transpose,
)
from numpy.core.records import fromstring, fromarrays
from numpy.core.defchararray import encode
from numexpr import evaluate

from .v4blocks import (
    AttachmentBlock,
    Channel,
    ChannelArrayBlock,
    ChannelGroup,
    ChannelConversion,
    DataBlock,
    DataZippedBlock,
    DataGroup,
    DataList,
    FileHistory,
    FileIdentificationBlock,
    HeaderBlock,
    HeaderList,
    SignalDataBlock,
    SourceInformation,
    TextBlock,
)

from . import v4constants as v4c
from .utils import (
    MdfException,
    get_fmt,
    fmt_to_datatype,
    get_unique_name,
    get_min_max,
    fix_dtype_fields,
)

from .signal import Signal
from .version import __version__


get_fmt = partial(get_fmt, version=4)
fmt_to_datatype = partial(fmt_to_datatype, version=4)

MASTER_CHANNELS = (
    v4c.CHANNEL_TYPE_MASTER,
    v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
)

PYVERSION = sys.version_info[0]
if PYVERSION == 2:
    from .utils import bytes

__all__ = ['MDF4', ]
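# A hedged usage sketch (not part of the module): typical ways to open or
# create an MDF4 object with the different *memory* options described in the
# class docstring below; the file names are hypothetical.
#
# >>> from asammdf import MDF4
# >>> mdf = MDF4('measurement.mf4')                    # everything in RAM
# >>> mdf_low = MDF4('measurement.mf4', memory='low')  # data read on demand
# >>> new = MDF4(version='4.10')                       # empty in-memory file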


class MDF4(object):
    """If the *name* exists it will be memorised, otherwise an empty file
    will be created that can later be saved to disk.

    Parameters
    ----------
    name : string
        mdf file name
    memory : str
        memory optimization option; default `full`

        * if *full* the data group binary data block will be memorised in RAM
        * if *low* the channel data is read from disk on request, and the
          metadata is memorised in RAM
        * if *minimum* only minimal data is memorised in RAM

    version : string
        mdf file version ('4.00', '4.10', '4.11'); default '4.10'

    Attributes
    ----------
    name : string
        mdf file name
    groups : list
        list of data groups
    header : HeaderBlock
        mdf file header
    file_history : list
        list of (FileHistory, TextBlock) pairs
    comment : TextBlock
        mdf file comment
    identification : FileIdentificationBlock
        mdf file start block
    memory : str
        memory optimization option
    version : str
        mdf version
    channels_db : dict
        used for fast channel access by name; for each name key the value is
        a list of (group index, channel index) tuples
    masters_db : dict
        used for fast master channel access; for each group index key the
        value is the master channel index

    """

    _compact_integers_on_append = False
    _split_data_blocks = False
    _split_threshold = 1 << 21
    _overwrite = False

    def __init__(self, name=None, memory='full', version='4.10'):
        self.groups = []
        self.header = None
        self.identification = None
        self.file_history = []
        self.name = name
        self.memory = memory
        self.channels_db = {}
        self.masters_db = {}
        self.attachments = []
        self.file_comment = None

        self._ch_map = {}
        self._master_channel_cache = {}

        # used for appending when memory is not 'full'
        self._tempfile = TemporaryFile()
        self._file = None

        self._tempfile.write(b'\0')

        if name:
            self._file = open(self.name, 'rb')
            self._read()
        else:
            self.header = HeaderBlock()
            self.identification = FileIdentificationBlock(version=version)
            self.version = version

    def _check_finalised(self):
        flags = self.identification['unfinalized_standard_flags']
        if flags & 1:
            message = ('Unfinalised file {}: '
                       'Update of cycle counters for CG/CA blocks required')
            warnings.warn(message.format(self.name))
        elif flags & 1 << 1:
            message = ('Unfinalised file {}: '
                       'Update of cycle counters for SR blocks required')
            warnings.warn(message.format(self.name))
        elif flags & 1 << 2:
            message = ('Unfinalised file {}: '
                       'Update of length for last DT block required')
            warnings.warn(message.format(self.name))
        elif flags & 1 << 3:
            message = ('Unfinalised file {}: '
                       'Update of length for last RD block required')
            warnings.warn(message.format(self.name))
        elif flags & 1 << 4:
            message = ('Unfinalised file {}: '
                       'Update of last DL block in each chained list '
                       'of DL blocks required')
            warnings.warn(message.format(self.name))
        elif flags & 1 << 5:
            message = ('Unfinalised file {}: '
                       'Update of cg_data_bytes and cg_inval_bytes '
                       'in VLSD CG block required')
            warnings.warn(message.format(self.name))
        elif flags & 1 << 6:
            message = ('Unfinalised file {}: '
                       'Update of offset values for VLSD channel required '
                       'in case a VLSD CG block is used')
            warnings.warn(message.format(self.name))
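    # Note on the checks above: ``flags & 1 << n`` relies on Python operator
    # precedence, where ``<<`` binds tighter than ``&``, so each test reads
    # as ``flags & (1 << n)``. A quick sketch:
    #
    # >>> flags = 0b0100
    # >>> bool(flags & 1 << 2)
    # True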
    def _read(self):
        stream = self._file
        memory = self.memory
        dg_cntr = 0

        self.identification = FileIdentificationBlock(stream=stream)
        version = self.identification['version_str']
        self.version = version.decode('utf-8').strip(' \n\t\0')

        if self.version in ('4.10', '4.11'):
            self._check_finalised()

        self.header = HeaderBlock(address=0x40, stream=stream)

        # read file comment
        if self.header['comment_addr']:
            self.file_comment = TextBlock(
                address=self.header['comment_addr'],
                stream=stream,
            )

        # read file history
        fh_addr = self.header['file_history_addr']
        while fh_addr:
            fh = FileHistory(address=fh_addr, stream=stream)
            fh_text = TextBlock(address=fh['comment_addr'], stream=stream)
            self.file_history.append((fh, fh_text))
            fh_addr = fh['next_fh_addr']

        # read attachments
        at_addr = self.header['first_attachment_addr']
        while at_addr:
            texts = {}
            at_block = AttachmentBlock(address=at_addr, stream=stream)
            for key in ('file_name_addr', 'mime_addr', 'comment_addr'):
                addr = at_block[key]
                if addr:
                    texts[key] = TextBlock(address=addr, stream=stream)
            self.attachments.append((at_block, texts))
            at_addr = at_block['next_at_addr']

        # go to first data group and read each data group sequentially
        dg_addr = self.header['first_dg_addr']

        while dg_addr:
            new_groups = []
            group = DataGroup(address=dg_addr, stream=stream)
            record_id_nr = group['record_id_len']

            # go to first channel group of the current data group
            cg_addr = group['first_cg_addr']

            cg_nr = 0

            cg_size = {}

            while cg_addr:
                cg_nr += 1

                grp = {}
                new_groups.append(grp)
                grp['channels'] = []
                grp['channel_conversions'] = []
                grp['channel_sources'] = []
                grp['signal_data'] = []
                grp['data_block'] = None
                grp['channel_dependencies'] = []
                # the 'channel_group' texts are kept in a list to allow
                # uniform handling of all texts in the save method
                grp['texts'] = {
                    'channels': [],
                    'sources': [],
                    'conversions': [],
                    'conversion_tab': [],
                    'channel_group': [],
                }

                # read each channel group sequentially
                block = ChannelGroup(address=cg_addr, stream=stream)
                channel_group = grp['channel_group'] = block

                grp['record_size'] = cg_size

                if channel_group['flags'] == 0:
                    samples_size = channel_group['samples_byte_nr']
                    inval_size = channel_group['invalidation_bytes_nr']
                    record_id = channel_group['record_id']
                    cg_size[record_id] = samples_size + inval_size
                else:
                    # VLSD flags
                    record_id = channel_group['record_id']
                    cg_size[record_id] = 0

                if record_id_nr:
                    grp['sorted'] = False
                else:
                    grp['sorted'] = True

                data_group = DataGroup(address=dg_addr, stream=stream)
                grp['data_group'] = data_group

                # read acquisition name and comment for current channel group
                channel_group_texts = {}

                for key in ('acq_name_addr', 'comment_addr'):
                    address = channel_group[key]
                    if address:
                        block = TextBlock(address=address, stream=stream)
                        if memory == 'minimum':
                            channel_group_texts[key] = address
                        else:
                            channel_group_texts[key] = block

                if channel_group_texts:
                    grp['texts']['channel_group'].append(channel_group_texts)
                else:
                    grp['texts']['channel_group'].append(None)

                # go to first channel of the current channel group
                ch_addr = channel_group['first_ch_addr']
                ch_cntr = 0

                # Read channels by walking recursively in the channel group
                # starting from the first channel
                self._read_channels(
                    ch_addr,
                    grp,
                    stream,
                    dg_cntr,
                    ch_cntr,
                )

                cg_addr = channel_group['next_cg_addr']

                dg_cntr += 1

            # each new group holds the channel groups' record sizes dict;
            # the data belongs to the initial unsorted group, and the
            # 'sorted' flag set above is used later when memory != 'full'
            if memory == 'full':
                grp['data_location'] = v4c.LOCATION_MEMORY
                dat_addr = group['data_block_addr']
                data = self._read_data_block(address=dat_addr, stream=stream)

                if record_id_nr == 0:
                    grp = new_groups[0]
                    grp['data_location'] = v4c.LOCATION_MEMORY
                    grp['data_block'] = DataBlock(data=data)
                else:
                    cg_data = defaultdict(list)
                    i = 0
                    size = len(data)
                    while i < size:
                        rec_id = data[i]
                        # skip record id
                        i += 1
                        rec_size = cg_size[rec_id]
                        if rec_size:
                            rec_data = data[i: i+rec_size]
                            cg_data[rec_id].append(rec_data)
                        else:
                            rec_size = unpack('<I', data[i: i+4])[0]
                            i += 4
                            rec_data = data[i: i + rec_size]
                            cg_data[rec_id].append(rec_data)
                        # if two record IDs are used, also skip the second one
                        if record_id_nr == 2:
                            i += rec_size + 1
                        else:
                            i += rec_size
                    for grp in new_groups:
                        grp['data_location'] = v4c.LOCATION_MEMORY
                        record_id = grp['channel_group']['record_id']
                        data = b''.join(cg_data[record_id])
                        grp['channel_group']['record_id'] = 1
                        grp['data_block'] = DataBlock(data=data)
            else:
                for grp in new_groups:
                    grp['data_location'] = v4c.LOCATION_ORIGINAL_FILE

            self.groups.extend(new_groups)

            dg_addr = group['next_dg_addr']

        for grp in self.groups:
            for dep_list in grp['channel_dependencies']:
                if not dep_list:
                    continue
                for dep in dep_list:
                    if isinstance(dep, Channel):
                        break
                    else:
                        conditions = (
                            dep['ca_type'] == v4c.CA_TYPE_LOOKUP,
                            dep['links_nr'] == 4 * dep['dims'] + 1,
                        )
                        if not all(conditions):
                            continue

                        for i in range(dep['dims']):
                            ch_addr = dep['scale_axis_{}_ch_addr'.format(i)]
                            ref_channel = self._ch_map[ch_addr]
                            dep.referenced_channels.append(ref_channel)

        if self.memory == 'full':
            self.close()

    def _read_channels(
            self,
            ch_addr,
            grp,
            stream,
            dg_cntr,
            ch_cntr,
            channel_composition=False):
        memory = self.memory
        channels = grp['channels']
        composition = []
        while ch_addr:
            # read channel block and create channel object
            channel = Channel(address=ch_addr, stream=stream)

            self._ch_map[ch_addr] = (ch_cntr, dg_cntr)

            if memory == 'minimum':
                value = ch_addr
            else:
                value = channel
            channels.append(value)
            if channel_composition:
                composition.append(value)

            # read conversion block and create channel conversion object
            address = channel['conversion_addr']
            if address:
                conv = ChannelConversion(address=address, stream=stream)
            else:
                conv = None
            if memory == 'minimum':
                grp['channel_conversions'].append(address)
            else:
                grp['channel_conversions'].append(conv)

            conv_tabx_texts = {}
            # read text fields for channel conversions
            conv_texts = {}
            if conv:
                for key in ('name_addr', 'unit_addr', 'comment_addr'):
                    address = conv[key]
                    if address:
                        if memory == 'minimum':
                            conv_texts[key] = address
                        else:
                            conv_texts[key] = TextBlock(
                                address=address,
                                stream=stream,
                            )

                if 'formula_addr' in conv:
                    address = conv['formula_addr']
                    if memory == 'minimum':
                        conv_texts['formula_addr'] = address
                    else:
                        if address:
                            block = TextBlock(address=address, stream=stream)
                            conv_texts['formula_addr'] = block
                        else:
                            conv_texts['formula_addr'] = None

                if conv['conversion_type'] in v4c.TABULAR_CONVERSIONS:
                    if conv['conversion_type'] == v4c.CONVERSION_TYPE_TTAB:
                        tabs = conv['links_nr'] - 4
                    else:
                        tabs = conv['links_nr'] - 4 - 1
                    for i in range(tabs):
                        address = conv['text_{}'.format(i)]
                        if memory == 'minimum':
                            conv_tabx_texts['text_{}'.format(i)] = address
                        else:
                            if address:
                                block = TextBlock(
                                    address=address,
                                    stream=stream,
                                )
                                conv_tabx_texts['text_{}'.format(i)] = block
                            else:
                                conv_tabx_texts['text_{}'.format(i)] = None
                    if not conv['conversion_type'] == v4c.CONVERSION_TYPE_TTAB:
                        address = conv.get('default_addr', 0)
                        if address:
                            if memory == 'minimum':
                                conv_tabx_texts['default_addr'] = address
                            else:
                                stream.seek(address, v4c.SEEK_START)
                                blk_id = stream.read(4)
                                if blk_id == b'##TX':
                                    block = TextBlock(
                                        address=address,
                                        stream=stream,
                                    )
                                    conv_tabx_texts['default_addr'] = block
                                elif blk_id == b'##CC':
                                    block = ChannelConversion(
                                        address=address,
                                        stream=stream,
                                    )
                                    text = str(time.clock()).encode('utf-8')
                                    default_text = block
                                    default_text['text'] = text

                                    conv['unit_addr'] = default_text['unit_addr']
                                    default_text['unit_addr'] = 0
                elif conv['conversion_type'] == v4c.CONVERSION_TYPE_TRANS:
                    # link_nr - common links (4) - default text link (1)
                    for i in range((conv['links_nr'] - 4 - 1) // 2):
                        for key in ('input_{}_addr'.format(i),
                                    'output_{}_addr'.format(i)):
                            address = conv[key]
                            if address:
                                if memory == 'minimum':
                                    conv_tabx_texts[key] = address
                                else:
                                    block = TextBlock(
                                        address=address,
                                        stream=stream,
                                    )
                                    conv_tabx_texts[key] = block
                    address = conv['default_addr']
                    if address:
                        if memory == 'minimum':
                            conv_tabx_texts['default_addr'] = address
                        else:
                            block = TextBlock(
                                address=address,
                                stream=stream,
                            )
                            conv_tabx_texts['default_addr'] = block

            if conv_tabx_texts:
                grp['texts']['conversion_tab'].append(conv_tabx_texts)
            else:
                grp['texts']['conversion_tab'].append(None)
            if conv_texts:
                grp['texts']['conversions'].append(conv_texts)
            else:
                grp['texts']['conversions'].append(None)

            # read source block and create source information object
            source_texts = {}
            address = channel['source_addr']
            if address:
                source = SourceInformation(
                    address=address,
                    stream=stream,
                )
                if memory == 'minimum':
                    grp['channel_sources'].append(address)
                else:
                    grp['channel_sources'].append(source)

                # read text fields for channel sources
                for key in ('name_addr', 'path_addr', 'comment_addr'):
                    address = source[key]
                    if address:
                        if memory == 'minimum':
                            source_texts[key] = address
                        else:
                            block = TextBlock(address=address, stream=stream)
                            source_texts[key] = block

                if source_texts:
                    grp['texts']['sources'].append(source_texts)
                else:
                    grp['texts']['sources'].append(None)
            else:
                if memory == 'minimum':
                    grp['channel_sources'].append(0)
                else:
                    grp['channel_sources'].append(None)
                grp['texts']['sources'].append(None)

            # read text fields for channel
            channel_texts = {}
            grp['texts']['channels'].append(channel_texts)
            for key in ('name_addr', 'comment_addr', 'unit_addr'):
                address = channel[key]
                if address:
                    if memory == 'minimum':
                        channel_texts[key] = address
                    else:
                        channel_texts[key] = TextBlock(
                            address=address,
                            stream=stream,
                        )

            # update the channel object's name attribute
            if memory == 'minimum':
                block = TextBlock(
                    address=channel_texts['name_addr'],
                    stream=stream,
                )
                name = block['text'].decode('utf-8')
            else:
                name = channel_texts['name_addr']['text'].decode('utf-8')
            name = name.split('\\')[0]
            name = name.strip(' \t\n\r\0')
            channel.name = name

            if name not in self.channels_db:
                self.channels_db[name] = []
            self.channels_db[name].append((dg_cntr, ch_cntr))

            if channel['channel_type'] in MASTER_CHANNELS:
                self.masters_db[dg_cntr] = ch_cntr

            ch_cntr += 1

            if channel['component_addr']:
                # check if it is a CABLOCK or CNBLOCK
                stream.seek(channel['component_addr'], v4c.SEEK_START)
                blk_id = stream.read(4)
                if blk_id == b'##CN':
                    index = ch_cntr - 1
                    grp['channel_dependencies'].append(None)
                    ch_cntr, composition = self._read_channels(
                        channel['component_addr'],
                        grp,
                        stream,
                        dg_cntr,
                        ch_cntr,
                        True,
                    )
                    grp['channel_dependencies'][index] = composition
                else:
                    # only channel arrays with storage=CN_TEMPLATE are
                    # supported so far
                    ca_block = ChannelArrayBlock(
                        address=channel['component_addr'],
                        stream=stream,
                    )
                    if ca_block['storage'] != v4c.CA_STORAGE_TYPE_CN_TEMPLATE:
                        warnings.warn('Only CN template arrays are supported')
                    ca_list = [ca_block, ]
                    while ca_block['composition_addr']:
                        ca_block = ChannelArrayBlock(
                            address=ca_block['composition_addr'],
                            stream=stream,
                        )
                        ca_list.append(ca_block)
                    grp['channel_dependencies'].append(ca_list)
            else:
                grp['channel_dependencies'].append(None)

            # go to next channel of the current channel group
            ch_addr = channel['next_ch_addr']

        return ch_cntr, composition
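    # Sketch of the lookup dicts built above (the names and indexes are
    # illustrative only):
    #
    # >>> mdf.channels_db
    # {'EngineSpeed': [(0, 3)], 't': [(0, 0), (1, 0)]}  # name -> [(group, ch)]
    # >>> mdf.masters_db
    # {0: 0, 1: 0}                                      # group -> master index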
    def _read_data_block(self, address, stream):
        """read and aggregate data blocks for a given data group

        Returns
        -------
        data : bytes
            aggregated raw data
        """
        if address:
            stream.seek(address, v4c.SEEK_START)
            id_string = stream.read(4)
            # can be a DataBlock
            if id_string == b'##DT':
                data = DataBlock(address=address, stream=stream)
                data = data['data']
            # or a DataZippedBlock
            elif id_string == b'##DZ':
                data = DataZippedBlock(address=address, stream=stream)
                data = data['data']
            # or a DataList
            elif id_string == b'##DL':
                data = []
                while address:
                    dl = DataList(address=address, stream=stream)
                    for i in range(dl['links_nr'] - 1):
                        addr = dl['data_block_addr{}'.format(i)]
                        stream.seek(addr, v4c.SEEK_START)
                        id_string = stream.read(4)
                        if id_string == b'##DT':
                            block = DataBlock(stream=stream, address=addr)
                            data.append(block['data'])
                        elif id_string == b'##DZ':
                            block = DataZippedBlock(
                                stream=stream,
                                address=addr,
                            )
                            data.append(block['data'])
                        elif id_string == b'##DL':
                            data.append(
                                self._read_data_block(
                                    address=addr,
                                    stream=stream,
                                )
                            )
                    address = dl['next_dl_addr']
                data = b''.join(data)
            # or a header list
            elif id_string == b'##HL':
                hl = HeaderList(address=address, stream=stream)
                data = self._read_data_block(
                    address=hl['first_dl_addr'],
                    stream=stream,
                )
        else:
            data = b''

        return data

    def _load_group_data(self, group):
        """ get group's data block bytes """
        if self.memory == 'full':
            data = group['data_block']['data']
        else:
            # could be an appended group
            # for now appended groups keep the measured data in the memory.
            # the plan is to use a temp file for appended groups, to keep the
            # memory usage low.
            data_group = group['data_group']
            channel_group = group['channel_group']

            if group['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
                # go to the first data block of the current data group
                stream = self._file
                dat_addr = data_group['data_block_addr']
                data = self._read_data_block(
                    address=dat_addr,
                    stream=stream,
                )

                if not group['sorted']:
                    cg_data = []
                    cg_size = group['record_size']
                    record_id = channel_group['record_id']
                    if data_group['record_id_len'] <= 2:
                        record_id_nr = data_group['record_id_len']
                    else:
                        record_id_nr = 0
                    i = 0
                    size = len(data)
                    while i < size:
                        rec_id = data[i]
                        # skip record id
                        i += 1
                        rec_size = cg_size[rec_id]
                        if rec_size:
                            if rec_id == record_id:
                                rec_data = data[i: i+rec_size]
                                cg_data.append(rec_data)
                        else:
                            rec_size = unpack('<I', data[i: i+4])[0]
                            i += 4
                            if rec_id == record_id:
                                rec_data = data[i: i + rec_size]
                                cg_data.append(rec_data)
                        # consider the second record ID byte
                        if record_id_nr == 2:
                            i += rec_size + 1
                        else:
                            i += rec_size
                    data = b''.join(cg_data)
            elif group['data_location'] == v4c.LOCATION_TEMPORARY_FILE:
                dat_addr = data_group['data_block_addr']
                if dat_addr:
                    cycles_nr = channel_group['cycles_nr']
                    samples_byte_nr = channel_group['samples_byte_nr']
                    size = cycles_nr * samples_byte_nr
                    self._tempfile.seek(dat_addr, v4c.SEEK_START)
                    data = self._tempfile.read(size)
                else:
                    data = b''

        return data
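    # Minimal standalone sketch of the unsorted-record walk used above,
    # assuming a single record id byte, fixed record sizes and Python 3
    # bytes indexing; the data is hypothetical:
    #
    # >>> data = b'\x01AB\x02XYZ\x01CD'       # records for ids 1 and 2
    # >>> cg_size = {1: 2, 2: 3}              # record id -> record size
    # >>> out, i = [], 0
    # >>> while i < len(data):
    # ...     rec_id = data[i]; i += 1
    # ...     rec_size = cg_size[rec_id]
    # ...     if rec_id == 1:
    # ...         out.append(data[i: i + rec_size])
    # ...     i += rec_size
    # >>> b''.join(out)
    # b'ABCD'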
    def _load_signal_data(self, address):
        """ this method is used to get the channel signal data, usually for
        VLSD channels """
        if address:
            if self._file.closed:
                self._file = open(self.name, 'rb')
            stream = self._file
            stream.seek(address, v4c.SEEK_START)
            blk_id = stream.read(4)
            if blk_id == b'##SD':
                data = SignalDataBlock(address=address, stream=stream)
                data = data['data']
            elif blk_id == b'##DZ':
                data = DataZippedBlock(address=address, stream=stream)
                data = data['data']
            elif blk_id == b'##DL':
                data = []
                while address:
                    # the data list will contain only links to SDBLOCK's
                    data_list = DataList(address=address, stream=stream)
                    nr = data_list['links_nr']
                    # aggregate data from all SDBLOCK
                    for i in range(nr - 1):
                        addr = data_list['data_block_addr{}'.format(i)]
                        stream.seek(addr, v4c.SEEK_START)
                        blk_id = stream.read(4)
                        if blk_id == b'##SD':
                            block = SignalDataBlock(
                                address=addr,
                                stream=stream,
                            )
                            data.append(block['data'])
                        elif blk_id == b'##DZ':
                            block = DataZippedBlock(
                                address=addr,
                                stream=stream,
                            )
                            data.append(block['data'])
                        else:
                            message = ('Expected SD, DZ or DL block at {} '
                                       'but found id="{}"')
                            message = message.format(hex(address), blk_id)
                            warnings.warn(message)
                            return
                    address = data_list['next_dl_addr']
                data = b''.join(data)
            elif blk_id == b'##CN':
                data = b''
            else:
                message = ('Expected SD, DL, DZ or CN block at {} '
                           'but found id="{}"')
                message = message.format(hex(address), blk_id)
                warnings.warn(message)
                data = b''
            if self.memory == 'full':
                self.close()
        else:
            data = b''

        return data

    def _prepare_record(self, group):
        """ compute record dtype and parents dict for this group

        Parameters
        ----------
        group : dict
            MDF group dict

        Returns
        -------
        parents, dtypes : dict, numpy.dtype
            mapping of channels to records fields, records fields dtype
        """
        grp = group
        stream = self._file
        memory = self.memory
        channel_group = grp['channel_group']
        if memory == 'minimum':
            channels = [
                Channel(address=ch_addr, stream=stream)
                for ch_addr in grp['channels']
            ]
        else:
            channels = grp['channels']

        record_size = channel_group['samples_byte_nr']
        invalidation_bytes_nr = channel_group['invalidation_bytes_nr']
        next_byte_aligned_position = 0
        types = []
        current_parent = ""
        parent_start_offset = 0
        parents = {}
        group_channels = set()

        sortedchannels = sorted(enumerate(channels), key=lambda i: i[1])
        for original_index, new_ch in sortedchannels:
            start_offset = new_ch['byte_offset']
            bit_offset = new_ch['bit_offset']
            data_type = new_ch['data_type']
            bit_count = new_ch['bit_count']
            ch_type = new_ch['channel_type']
            dependency_list = grp['channel_dependencies'][original_index]
            if memory == 'minimum':
                block = TextBlock(
                    address=grp['texts']['channels']
                    [original_index]['name_addr'],
                    stream=stream,
                )
                name = block['text'].decode('utf-8').strip(' \r\n\t\0')
                name = name.split('\\')[0]
            else:
                name = new_ch.name

            # handle multiple occurrences of the same channel name
            name = get_unique_name(group_channels, name)
            group_channels.add(name)

            if start_offset >= next_byte_aligned_position:
                if ch_type not in (v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
                                   v4c.CHANNEL_TYPE_VIRTUAL):
                    if not dependency_list:
                        parent_start_offset = start_offset

                        # check if there are byte gaps in the record
                        gap = parent_start_offset - next_byte_aligned_position
                        if gap:
                            types.append(('', 'a{}'.format(gap)))

                        # adjust size to 1, 2, 4 or 8 bytes
                        size = bit_offset + bit_count
                        if data_type not in (v4c.DATA_TYPE_BYTEARRAY,
                                             v4c.DATA_TYPE_STRING_UTF_8,
                                             v4c.DATA_TYPE_STRING_LATIN_1,
                                             v4c.DATA_TYPE_STRING_UTF_16_BE,
                                             v4c.DATA_TYPE_STRING_UTF_16_LE,
                                             v4c.DATA_TYPE_CANOPEN_TIME,
                                             v4c.DATA_TYPE_CANOPEN_DATE):
                            if size > 32:
                                size = 8
                            elif size > 16:
                                size = 4
                            elif size > 8:
                                size = 2
                            else:
                                size = 1
                        else:
                            size = size >> 3

                        next_byte_aligned_position = parent_start_offset + size
                        if next_byte_aligned_position <= record_size:
                            dtype_pair = name, get_fmt(data_type, size)
                            types.append(dtype_pair)
                            parents[original_index] = name, bit_offset

                        current_parent = name
                    else:
                        if isinstance(dependency_list[0], ChannelArrayBlock):
                            ca_block = dependency_list[0]

                            # check if there are byte gaps in the record
                            gap = start_offset - next_byte_aligned_position
                            if gap:
                                dtype_pair = '', 'a{}'.format(gap)
                                types.append(dtype_pair)

                            size = bit_count >> 3
                            shape = tuple(
                                ca_block['dim_size_{}'.format(i)]
                                for i in range(ca_block['dims'])
                            )

                            if ca_block['byte_offset_base'] // size > 1 and \
                                    len(shape) == 1:
                                shape += ca_block['byte_offset_base'] // size,
                            dim = 1
                            for d in shape:
                                dim *= d

                            dtype_pair = name, get_fmt(data_type, size), shape
                            types.append(dtype_pair)

                            current_parent = name
                            next_byte_aligned_position = start_offset + size * dim
                            parents[original_index] = name, 0
                        else:
                            parents[original_index] = None, None
                # virtual channels do not have bytes in the record
                else:
                    parents[original_index] = None, None
            else:
                max_overlapping_size = (next_byte_aligned_position
                                        - start_offset) * 8
                needed_size = bit_offset + bit_count
                if max_overlapping_size >= needed_size:
                    parents[original_index] = (
                        current_parent,
                        ((start_offset - parent_start_offset) << 3)
                        + bit_offset,
                    )
            if next_byte_aligned_position > record_size:
                break

        gap = record_size - next_byte_aligned_position
        if gap > 0:
            dtype_pair = '', 'a{}'.format(gap)
            types.append(dtype_pair)

        dtype_pair = 'invalidation_bytes', 'a{}'.format(invalidation_bytes_nr)
        types.append(dtype_pair)
        if PYVERSION == 2:
            types = fix_dtype_fields(types)

        return parents, dtype(types)
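    # Illustrative result (not actual output) for a record with an 8-byte
    # time master, a 2-byte unsigned channel named 'Counter' and one
    # invalidation byte:
    #
    # >>> parents, dtypes = mdf._prepare_record(mdf.groups[0])
    # >>> dtypes
    # dtype([('t', '<f8'), ('Counter', '<u2'), ('invalidation_bytes', 'S1')])
    # >>> parents[1]
    # ('Counter', 0)        # channel 1 is read from field 'Counter', bit 0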
    def _get_not_byte_aligned_data(self, data, group, ch_nr):
        big_endian_types = (
            v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
            v4c.DATA_TYPE_REAL_MOTOROLA,
            v4c.DATA_TYPE_SIGNED_MOTOROLA,
        )

        record_size = group['channel_group']['samples_byte_nr']

        if self.memory == 'minimum':
            channel = Channel(
                address=group['channels'][ch_nr],
                stream=self._file,
            )
        else:
            channel = group['channels'][ch_nr]

        bit_offset = channel['bit_offset']
        byte_offset = channel['byte_offset']
        bit_count = channel['bit_count']

        dependency_list = group['channel_dependencies'][ch_nr]
        if dependency_list and isinstance(dependency_list[0],
                                          ChannelArrayBlock):
            ca_block = dependency_list[0]

            size = bit_count >> 3
            shape = tuple(
                ca_block['dim_size_{}'.format(i)]
                for i in range(ca_block['dims'])
            )
            if ca_block['byte_offset_base'] // size > 1 and len(shape) == 1:
                shape += (ca_block['byte_offset_base'] // size, )
            dim = 1
            for d in shape:
                dim *= d
            size *= dim
            bit_count = size << 3

        byte_count = bit_offset + bit_count
        if byte_count % 8:
            byte_count = (byte_count >> 3) + 1
        else:
            byte_count >>= 3

        types = [
            ('', 'a{}'.format(byte_offset)),
            ('vals', '({},)u1'.format(byte_count)),
            ('', 'a{}'.format(record_size - byte_count - byte_offset)),
        ]

        vals = fromstring(data, dtype=dtype(types))

        vals = vals['vals']

        if channel['data_type'] not in big_endian_types:
            vals = flip(vals, 1)

        vals = unpackbits(vals)
        vals = roll(vals, bit_offset)
        vals = vals.reshape((len(vals) // 8, 8))
        vals = packbits(vals)
        vals = vals.reshape((len(vals) // byte_count, byte_count))

        if bit_count < 64:
            mask = 2 ** bit_count - 1
            masks = []
            while mask:
                masks.append(mask & 0xFF)
                mask >>= 8
            for i in range(byte_count - len(masks)):
                masks.append(0)
            masks = masks[::-1]
            for i, mask in enumerate(masks):
                vals[:, i] &= mask

        if channel['data_type'] not in big_endian_types:
            vals = flip(vals, 1)

        if bit_count <= 8:
            size = 1
        elif bit_count <= 16:
            size = 2
        elif bit_count <= 32:
            size = 4
        elif bit_count <= 64:
            size = 8
        else:
            size = bit_count // 8

        if size > byte_count:
            extra_bytes = size - byte_count
            extra = zeros((len(vals), extra_bytes), dtype=uint8)

            types = [
                ('vals', vals.dtype, vals.shape[1:]),
                ('', extra.dtype, extra.shape[1:]),
            ]
            vals = fromarrays([vals, extra], dtype=dtype(types))
        vals = vals.tostring()

        fmt = get_fmt(channel['data_type'], size)
        if size <= byte_count:
            types = [
                ('vals', fmt),
                ('', 'a{}'.format(byte_count - size)),
            ]
        else:
            types = [('vals', fmt), ]

        vals = fromstring(vals, dtype=dtype(types))

        return vals['vals']
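    # Standalone sketch of the bit-twiddling idea used above: extracting a
    # 4-bit value that starts at bit offset 2 of a single record byte (the
    # method does the same with numpy's unpackbits/roll/packbits in bulk):
    #
    # >>> raw = 0b01101100
    # >>> (raw >> 2) & (2 ** 4 - 1)
    # 11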
    def _validate_channel_selection(self, name=None, group=None, index=None):
        """Validate the channel selection and return the group and channel
        indexes.

        The channel can be specified in two ways:

        * using the first positional argument *name*

            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either
              the *group* or *index* arguments is None then a warning is
              issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use the *info* method for group
          and channel numbers

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index

        Returns
        -------
        group_index, channel_index : (int, int)
            selected channel's group and channel index

        """
        if name is None:
            if group is None or index is None:
                message = ('Invalid arguments for channel selection: '
                           'must give "name" or, "group" and "index"')
                raise MdfException(message)
            else:
                gp_nr, ch_nr = group, index
                if gp_nr > len(self.groups) - 1:
                    raise MdfException('Group index out of range')
                if index > len(self.groups[gp_nr]['channels']) - 1:
                    raise MdfException('Channel index out of range')
        else:
            name = name.split('\\')[0]
            if name not in self.channels_db:
                raise MdfException('Channel "{}" not found'.format(name))
            else:
                if group is None:
                    gp_nr, ch_nr = self.channels_db[name][0]
                    if len(self.channels_db[name]) > 1:
                        message = ('Multiple occurrences for channel "{}". '
                                   'Using first occurrence from data group {}. '
                                   'Provide both "group" and "index" arguments'
                                   ' to select another data group')
                        message = message.format(name, gp_nr)
                        warnings.warn(message)
                else:
                    group_valid = False
                    for gp_nr, ch_nr in self.channels_db[name]:
                        if gp_nr == group:
                            group_valid = True
                            if index is None:
                                break
                            elif index == ch_nr:
                                break
                    else:
                        if group_valid:
                            gp_nr, ch_nr = self.channels_db[name][group]
                            message = ('You have selected channel index "{}" '
                                       'of group "{}" for channel "{}", but '
                                       'this channel index is invalid. Using '
                                       'first occurrence of "{}" in this group'
                                       ' at index "{}"')
                            message = message.format(
                                index,
                                group,
                                name,
                                name,
                                ch_nr,
                            )
                        else:
                            gp_nr, ch_nr = self.channels_db[name][0]
                            message = ('You have selected group "{}" for '
                                       'channel "{}", but this channel was not'
                                       ' found in this group, or this group '
                                       'index does not exist. Using first '
                                       'occurrence of "{}" from group "{}"')
                            message = message.format(group, name, name, gp_nr)
                        warnings.warn(message)
        return gp_nr, ch_nr
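    # Selection semantics sketch, using the public methods that delegate to
    # the validation above (channel names are hypothetical):
    #
    # >>> mdf.get_channel_unit('VehicleSpeed')          # by name
    # >>> mdf.get_channel_unit('t', group=1)            # name + group
    # >>> mdf.get_channel_unit(group=0, index=2)        # by position only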
    def append(self, signals, source_info='Python', common_timebase=False):
        """Appends a new data group.

        For channel dependency (composition) Signals, the *samples*
        attribute must be a numpy.recarray

        Parameters
        ----------
        signals : list
            list of *Signal* objects
        source_info : str
            source information; default 'Python'
        common_timebase : bool
            flag to hint that the signals have the same timebase

        Examples
        --------
        >>> # case 1 conversion type None
        >>> s1 = np.array([1, 2, 3, 4, 5])
        >>> s2 = np.array([-1, -2, -3, -4, -5])
        >>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
        >>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
        >>> names = ['Positive', 'Negative', 'Float']
        >>> units = ['+', '-', '.f']
        >>> info = {}
        >>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
        >>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
        >>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
        >>> mdf = MDF4('new.mdf')
        >>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
        >>> # case 2: VTAB conversions from channels inside another file
        >>> mdf1 = MDF4('in.mdf')
        >>> ch1 = mdf1.get("Channel1_VTAB")
        >>> ch2 = mdf1.get("Channel2_VTABR")
        >>> sigs = [ch1, ch2]
        >>> mdf2 = MDF4('out.mdf')
        >>> mdf2.append(sigs, 'created by asammdf v1.1.0')

        """
        if not signals:
            message = '"append" requires a non-empty list of Signal objects'
            raise MdfException(message)

        # check if the signals have a common timebase;
        # if not, interpolate the signals using the union of all timebases
        t_ = signals[0].timestamps
        if not common_timebase:
            for s in signals[1:]:
                if not array_equal(s.timestamps, t_):
                    different = True
                    break
            else:
                different = False

            if different:
                times = [s.timestamps for s in signals]
                t = reduce(union1d, times).flatten().astype(float64)
                signals = [s.interp(t) for s in signals]
                times = None
            else:
                t = t_
        else:
            t = t_
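        # Sketch of the common-timebase handling above: the union of all
        # timestamp arrays becomes the master and each signal is
        # interpolated on it (assuming numpy imported as np):
        #
        # >>> t1 = np.array([0.0, 0.1, 0.2])
        # >>> t2 = np.array([0.0, 0.15])
        # >>> reduce(union1d, [t1, t2])
        # array([0.  , 0.1 , 0.15, 0.2 ])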
        # split regular from composed signals. Composed signals have
        # recarray samples or multidimensional ndarray samples.
        # The regular signals will be added to the group first.
        # The composed signals will be saved alongside the fields, which
        # will be saved as new signals.
        simple_signals = [
            sig for sig in signals
            if len(sig.samples.shape) <= 1
            and sig.samples.dtype.names is None
        ]
        composed_signals = [
            sig for sig in signals
            if len(sig.samples.shape) > 1
            or sig.samples.dtype.names
        ]

        dg_cntr = len(self.groups)

        gp = {}
        gp['channels'] = gp_channels = []
        gp['channel_conversions'] = gp_conv = []
        gp['channel_sources'] = gp_source = []
        gp['channel_dependencies'] = gp_dep = []
        gp['texts'] = gp_texts = {
            'channels': [],
            'sources': [],
            'conversions': [],
            'conversion_tab': [],
            'channel_group': [],
        }
        self.groups.append(gp)

        cycles_nr = len(t)
        fields = []
        types = []
        parents = {}
        ch_cntr = 0
        offset = 0
        field_names = set()

        # setup all blocks related to the time master channel

        # time channel texts
        for key in ('channels', 'sources', 'channel_group', 'conversions'):
            gp['texts'][key].append({})
        for key in ('conversion_tab', ):
            gp['texts'][key].append(None)

        memory = self.memory
        file = self._tempfile
        write = file.write
        tell = file.tell

        block = TextBlock(text='t', meta=False)
        if memory != 'minimum':
            gp_texts['channels'][-1]['name_addr'] = block
        else:
            address = tell()
            gp_texts['channels'][-1]['name_addr'] = address
            write(bytes(block))
        block = TextBlock(text='s', meta=False)
        if memory != 'minimum':
            gp_texts['conversions'][-1]['unit_addr'] = block
        else:
            address = tell()
            gp_texts['conversions'][-1]['unit_addr'] = address
            write(bytes(block))

        si_text = TextBlock(text=source_info, meta=False)
        if memory == 'minimum':
            address = tell()
            si_text.address = address
            write(bytes(si_text))
            gp_texts['sources'][-1]['name_addr'] = address
            gp_texts['sources'][-1]['path_addr'] = address
            gp_texts['channel_group'][-1]['acq_name_addr'] = address
            gp_texts['channel_group'][-1]['comment_addr'] = address
        else:
            gp_texts['sources'][-1]['name_addr'] = si_text
            gp_texts['sources'][-1]['path_addr'] = si_text
            gp_texts['channel_group'][-1]['acq_name_addr'] = si_text
            gp_texts['channel_group'][-1]['comment_addr'] = si_text

        # conversion and source for time channel
        if memory == 'minimum':
            gp_conv.append(0)
            address = tell()
            block = SourceInformation()
            write(bytes(block))
            gp_source.append(address)
        else:
            gp_conv.append(None)
            gp_source.append(SourceInformation())

        # time channel
        t_type, t_size = fmt_to_datatype(t.dtype)
        kargs = {
            'channel_type': v4c.CHANNEL_TYPE_MASTER,
            'data_type': t_type,
            'sync_type': 1,
            'byte_offset': 0,
            'bit_offset': 0,
            'bit_count': t_size,
            'min_raw_value': t[0] if cycles_nr else 0,
            'max_raw_value': t[-1] if cycles_nr else 0,
            'lower_limit': t[0] if cycles_nr else 0,
            'upper_limit': t[-1] if cycles_nr else 0,
            'flags': v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK,
        }
        ch = Channel(**kargs)
        ch.name = name = 't'
        if memory == 'minimum':
            address = tell()
            write(bytes(ch))
            gp_channels.append(address)
        else:
            gp_channels.append(ch)

        if name not in self.channels_db:
            self.channels_db[name] = []
        self.channels_db[name].append((dg_cntr, ch_cntr))
        self.masters_db[dg_cntr] = 0
        # data group record parents
        parents[ch_cntr] = name, 0

        # time channel doesn't have channel dependencies
        gp_dep.append(None)

        fields.append(t)
        types.append((name, t.dtype))
        field_names.add(name)

        offset += t_size // 8
        ch_cntr += 1
        if self._compact_integers_on_append:
            compacted_signals = [
                {'signal': sig}
                for sig in simple_signals
                if sig.samples.dtype.kind in 'ui'
            ]

            max_itemsize = 1
            dtype_ = dtype(uint8)

            for signal in compacted_signals:
                itemsize = signal['signal'].samples.dtype.itemsize

                min_val, max_val = get_min_max(signal['signal'].samples)
                signal['min'], signal['max'] = min_val, max_val
                minimum_bitlength = (itemsize // 2) * 8 + 1
                bit_length = max(
                    int(min_val).bit_length(),
                    int(max_val).bit_length(),
                )

                signal['bit_count'] = max(minimum_bitlength, bit_length)

                if itemsize > max_itemsize:
                    dtype_ = dtype('<u{}'.format(itemsize))
                    max_itemsize = itemsize

            compacted_signals.sort(key=lambda x: x['bit_count'])
            simple_signals = [
                sig
                for sig in simple_signals
                if sig.samples.dtype.kind not in 'ui'
            ]
            dtype_size = dtype_.itemsize * 8

        else:
            compacted_signals = []

        # first try to compact unsigned integers
        while compacted_signals:
            # channels texts
            cluster = []

            tail = compacted_signals.pop()
            size = tail['bit_count']
            cluster.append(tail)

            while size < dtype_size and compacted_signals:
                head = compacted_signals[0]
                head_size = head['bit_count']
                if head_size + size > dtype_size:
                    break
                else:
                    cluster.append(compacted_signals.pop(0))
                    size += head_size

            bit_offset = 0
            field_name = get_unique_name(field_names, 'COMPACT')
            types.append((field_name, dtype_))
            field_names.add(field_name)

            values = zeros(cycles_nr, dtype=dtype_)

            for signal_d in cluster:

                signal = signal_d['signal']
                bit_count = signal_d['bit_count']
                min_val = signal_d['min']
                max_val = signal_d['max']

                name = signal.name
                for key in ('channels', 'sources'):
                    gp['texts'][key].append({})
                for key in ('conversion_tab', 'conversions'):
                    gp['texts'][key].append(None)

                block = TextBlock(text=name, meta=False)
                if memory == 'minimum':
                    address = tell()
                    gp_texts['channels'][-1]['name_addr'] = address
                    write(bytes(block))
                    gp_texts['sources'][-1]['name_addr'] = si_text.address
                    gp_texts['sources'][-1]['path_addr'] = si_text.address
                else:
                    gp_texts['channels'][-1]['name_addr'] = block
                    gp_texts['sources'][-1]['name_addr'] = si_text
                    gp_texts['sources'][-1]['path_addr'] = si_text

                if signal.unit:
                    block = TextBlock(text=signal.unit, meta=False)
                    if memory == 'minimum':
                        address = tell()
                        gp_texts['channels'][-1]['unit_addr'] = address
                        write(bytes(block))
                    else:
                        gp_texts['channels'][-1]['unit_addr'] = block

                # conversions for channel
                info = signal.info
                conv_texts_tab = {}
                if info and 'raw' in info:
                    kargs = {}
                    raw = info['raw']
                    phys = info['phys']
                    if raw.dtype.kind == 'S':
                        kargs['conversion_type'] = v4c.CONVERSION_TYPE_TTAB
                        for i, (r_, p_) in enumerate(zip(raw, phys)):
                            kargs['text_{}'.format(i)] = 0
                            kargs['val_{}'.format(i)] = p_

                            block = TextBlock(
                                text=r_,
                                meta=False,
                            )
                            if memory != 'minimum':
                                conv_texts_tab['text_{}'.format(i)] = block
                            else:
                                address = tell()
                                conv_texts_tab['text_{}'.format(i)] = address
                                write(bytes(block))
                        kargs['val_default'] = info['default']
                        kargs['links_nr'] = len(raw) + 4
                    else:
                        kargs['conversion_type'] = v4c.CONVERSION_TYPE_TABX
                        for i, (r_, p_) in enumerate(zip(raw, phys)):
                            kargs['text_{}'.format(i)] = 0
                            kargs['val_{}'.format(i)] = r_

                            block = TextBlock(
                                text=p_,
                                meta=False,
                            )
                            if memory != 'minimum':
                                conv_texts_tab['text_{}'.format(i)] = block
                            else:
                                address = tell()
                                conv_texts_tab['text_{}'.format(i)] = address
                                write(bytes(block))
                        if info.get('default', b''):
                            block = TextBlock(
                                text=info['default'],
                                meta=False,
                            )
                            if memory != 'minimum':
                                conv_texts_tab['default_addr'] = block
                            else:
                                address = tell()
                                conv_texts_tab['default_addr'] = address
                                write(bytes(block))
                        kargs['default_addr'] = 0
                        kargs['links_nr'] = len(raw) + 5

                    block = ChannelConversion(**kargs)
                    if memory != 'minimum':
                        gp_conv.append(block)
                    else:
                        address = tell()
                        gp_conv.append(address)
                        write(bytes(block))

                elif info and 'lower' in info:
                    kargs = {}
                    kargs['conversion_type'] = v4c.CONVERSION_TYPE_RTABX
                    lower = info['lower']
                    upper = info['upper']
                    texts = info['phys']
                    kargs['ref_param_nr'] = len(upper)
                    kargs['default_addr'] = info.get('default', 0)
                    kargs['links_nr'] = len(lower) + 5
                    for i, (u_, l_, t_) in enumerate(zip(upper, lower, texts)):
                        kargs['lower_{}'.format(i)] = l_
                        kargs['upper_{}'.format(i)] = u_
                        kargs['text_{}'.format(i)] = 0

                        block = TextBlock(
                            text=t_,
                            meta=False,
                        )
                        if memory != 'minimum':
                            conv_texts_tab['text_{}'.format(i)] = block
                        else:
                            address = tell()
                            conv_texts_tab['text_{}'.format(i)] = address
                            write(bytes(block))
                    if info.get('default', b''):
                        block = TextBlock(
                            text=info['default'],
                            meta=False,
                        )
                        if memory != 'minimum':
                            conv_texts_tab['default_addr'] = block
                        else:
                            address = tell()
                            conv_texts_tab['default_addr'] = address
                            write(bytes(block))
                    kargs['default_addr'] = 0

                    block = ChannelConversion(**kargs)
                    if memory != 'minimum':
                        gp_conv.append(block)
                    else:
                        address = tell()
                        gp_conv.append(address)
                        write(bytes(block))

                else:
                    if memory != 'minimum':
                        gp_conv.append(None)
                    else:
                        gp_conv.append(0)

                if conv_texts_tab:
                    gp['texts']['conversion_tab'][-1] = conv_texts_tab

                # source for channel
                if memory != 'minimum':
                    gp_source.append(SourceInformation())
                else:
                    address = tell()
                    block = SourceInformation()
                    write(bytes(block))
                    gp_source.append(address)

                # compute additional byte offset for large records size
                if signal.samples.dtype.kind == 'u':
                    data_type = v4c.DATA_TYPE_UNSIGNED_INTEL
                else:
                    data_type = v4c.DATA_TYPE_SIGNED_INTEL
                kargs = {
                    'channel_type': v4c.CHANNEL_TYPE_VALUE,
                    'bit_count': bit_count,
                    'byte_offset': offset + bit_offset // 8,
                    'bit_offset': bit_offset % 8,
                    'data_type': data_type,
                    'min_raw_value': min_val if min_val <= max_val else 0,
                    'max_raw_value': max_val if min_val <= max_val else 0,
                    'lower_limit': min_val if min_val <= max_val else 0,
                    'upper_limit': max_val if min_val <= max_val else 0,
                }
                if min_val > max_val:
                    kargs['flags'] = 0
                else:
                    kargs['flags'] = (v4c.FLAG_PHY_RANGE_OK
                                      | v4c.FLAG_VAL_RANGE_OK)
                ch = Channel(**kargs)
                ch.name = name
                if memory != 'minimum':
                    gp_channels.append(ch)
                else:
                    address = tell()
                    write(bytes(ch))
                    gp_channels.append(address)

                if name not in self.channels_db:
                    self.channels_db[name] = []
                self.channels_db[name].append((dg_cntr, ch_cntr))

                # update the parents as well
                parents[ch_cntr] = field_name, bit_offset

                values += signal.samples.astype(dtype_) << bit_offset
                bit_offset += bit_count

                ch_cntr += 1

                # simple channels don't have channel dependencies
                gp_dep.append(None)

            offset += dtype_.itemsize
            fields.append(values)

        # first add the signals in the simple signal list
        for signal in simple_signals:
            name = signal.name
            for key in ('channels', 'sources'):
                gp['texts'][key].append({})
            for key in ('conversion_tab', 'conversions'):
                gp['texts'][key].append(None)

            block = TextBlock(text=name, meta=False)
            if memory != 'minimum':
                gp_texts['channels'][-1]['name_addr'] = block
            else:
                address = tell()
                write(bytes(block))
                gp_texts['channels'][-1]['name_addr'] = address
            if signal.unit:
                block = TextBlock(
                    text=signal.unit,
                    meta=False,
                )
                if memory != 'minimum':
                    gp_texts['channels'][-1]['unit_addr'] = block
                else:
                    address = tell()
                    write(bytes(block))
                    gp_texts['channels'][-1]['unit_addr'] = address
            if memory != 'minimum':
                gp_texts['sources'][-1]['name_addr'] = si_text
                gp_texts['sources'][-1]['path_addr'] = si_text
            else:
                gp_texts['sources'][-1]['name_addr'] = si_text.address
                gp_texts['sources'][-1]['path_addr'] = si_text.address

            # conversions for channel
            info = signal.info
            conv_texts_tab = {}
            if info and 'raw' in info:
                kargs = {}
                raw = info['raw']
                phys = info['phys']
                if raw.dtype.kind == 'S':
                    kargs['conversion_type'] = v4c.CONVERSION_TYPE_TTAB
                    for i, (r_, p_) in enumerate(zip(raw, phys)):
                        kargs['text_{}'.format(i)] = 0
                        kargs['val_{}'.format(i)] = p_
                        block = TextBlock(
                            text=r_,
                            meta=False,
                        )
                        if memory != 'minimum':
                            conv_texts_tab['text_{}'.format(i)] = block
                        else:
                            address = tell()
                            conv_texts_tab['text_{}'.format(i)] = address
                            write(bytes(block))
                    kargs['val_default'] = info['default']
                    kargs['links_nr'] = len(raw) + 4
                else:
                    kargs['conversion_type'] = v4c.CONVERSION_TYPE_TABX
                    for i, (r_, p_) in enumerate(zip(raw, phys)):
                        kargs['text_{}'.format(i)] = 0
                        kargs['val_{}'.format(i)] = r_

                        block = TextBlock(
                            text=p_,
                            meta=False,
                        )
                        if memory != 'minimum':
                            conv_texts_tab['text_{}'.format(i)] = block
                        else:
                            address = tell()
                            conv_texts_tab['text_{}'.format(i)] = address
                            write(bytes(block))
                    if info.get('default', b''):
                        block = TextBlock(
                            text=info['default'],
                            meta=False,
                        )
                        if memory != 'minimum':
                            conv_texts_tab['default_addr'] = block
                        else:
                            address = tell()
                            conv_texts_tab['default_addr'] = address
                            write(bytes(block))
                    kargs['default_addr'] = 0
                    kargs['links_nr'] = len(raw) + 5

                block = ChannelConversion(**kargs)
                if memory != 'minimum':
                    gp_conv.append(block)
                else:
                    address = tell()
                    gp_conv.append(address)
                    write(bytes(block))

            elif info and 'lower' in info:
                kargs = {}
                kargs['conversion_type'] = v4c.CONVERSION_TYPE_RTABX
                lower = info['lower']
                upper = info['upper']
                texts = info['phys']
                kargs['ref_param_nr'] = len(upper)
                kargs['default_addr'] = info.get('default', 0)
                kargs['links_nr'] = len(lower) + 5

                for i, (u_, l_, t_) in enumerate(zip(upper, lower, texts)):
                    kargs['lower_{}'.format(i)] = l_
                    kargs['upper_{}'.format(i)] = u_
                    kargs['text_{}'.format(i)] = 0

                    block = TextBlock(
                        text=t_,
                        meta=False,
                    )
                    if memory != 'minimum':
                        conv_texts_tab['text_{}'.format(i)] = block
                    else:
                        address = tell()
                        conv_texts_tab['text_{}'.format(i)] = address
                        write(bytes(block))
                if info.get('default', b''):
                    block = TextBlock(
                        text=info['default'],
                        meta=False,
                    )
                    if memory != 'minimum':
                        conv_texts_tab['default_addr'] = block
                    else:
                        address = tell()
                        conv_texts_tab['default_addr'] = address
                        write(bytes(block))
                kargs['default_addr'] = 0

                block = ChannelConversion(**kargs)
                if memory != 'minimum':
                    gp_conv.append(block)
                else:
                    address = tell()
                    gp_conv.append(address)
                    write(bytes(block))

            else:
                if memory != 'minimum':
                    gp_conv.append(None)
                else:
                    gp_conv.append(0)

            if conv_texts_tab:
                gp['texts']['conversion_tab'][-1] = conv_texts_tab

            # source for channel
            if memory != 'minimum':
                gp_source.append(SourceInformation())
            else:
                block = SourceInformation()
                address = tell()
                write(bytes(block))
                gp_source.append(address)

            # compute additional byte offset for large records size
            s_type, s_size = fmt_to_datatype(signal.samples.dtype)
            byte_size = max(s_size // 8, 1)
            min_val, max_val = get_min_max(signal.samples)

            kargs = {
                'channel_type': v4c.CHANNEL_TYPE_VALUE,
                'bit_count': s_size,
                'byte_offset': offset,
                'bit_offset': 0,
                'data_type': s_type,
                'min_raw_value': min_val if min_val <= max_val else 0,
                'max_raw_value': max_val if min_val <= max_val else 0,
                'lower_limit': min_val if min_val <= max_val else 0,
                'upper_limit': max_val if min_val <= max_val else 0,
            }
            if min_val > max_val:
                kargs['flags'] = 0
            else:
                kargs['flags'] = v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK
            ch = Channel(**kargs)
            ch.name = name
            if memory != 'minimum':
                gp_channels.append(ch)
            else:
                address = tell()
                write(bytes(ch))
                gp_channels.append(address)

            offset += byte_size

            if name not in self.channels_db:
                self.channels_db[name] = []
            self.channels_db[name].append((dg_cntr, ch_cntr))

            # update the parents as well
            field_name = get_unique_name(field_names, name)
            parents[ch_cntr] = field_name, 0

            fields.append(signal.samples)
            types.append((field_name, signal.samples.dtype))
            field_names.add(field_name)

            ch_cntr += 1

            # simple channels don't have channel dependencies
            gp_dep.append(None)
        canopen_time_fields = (
            'ms',
            'days',
        )
        canopen_date_fields = (
            'ms',
            'min',
            'hour',
            'day',
            'month',
            'year',
            'summer_time',
            'day_of_week',
        )
        for signal in composed_signals:
            names = signal.samples.dtype.names
            if names is None:
                names = []
            name = signal.name

            if names in (canopen_time_fields, canopen_date_fields):
                field_name = get_unique_name(field_names, name)
                field_names.add(field_name)

                if names == canopen_time_fields:
                    vals = signal.samples.tostring()

                    fields.append(frombuffer(vals, dtype='V6'))
                    types.append((field_name, 'V6'))
                    byte_size = 6
                    s_type = v4c.DATA_TYPE_CANOPEN_TIME

                else:
                    vals = []
                    for field in ('ms', 'min', 'hour', 'day', 'month', 'year'):
                        vals.append(signal.samples[field])
                    vals = fromarrays(vals).tostring()

                    fields.append(frombuffer(vals, dtype='V7'))
                    types.append((field_name, 'V7'))
                    byte_size = 7
                    s_type = v4c.DATA_TYPE_CANOPEN_DATE

                s_size = byte_size << 3

                # add channel texts
                for key in ('channels', 'sources'):
                    gp['texts'][key].append({})
                for key in ('conversion_tab', 'conversions'):
                    gp['texts'][key].append(None)

                block = TextBlock(text=name, meta=False)
                if memory != 'minimum':
                    gp_texts['channels'][-1]['name_addr'] = block
                else:
                    address = tell()
                    write(bytes(block))
                    gp_texts['channels'][-1]['name_addr'] = address
                if signal.unit:
                    block = TextBlock(
                        text=signal.unit,
                        meta=False,
                    )
                    if memory != 'minimum':
                        gp_texts['channels'][-1]['unit_addr'] = block
                    else:
                        address = tell()
                        write(bytes(block))
                        gp_texts['channels'][-1]['unit_addr'] = address
                if memory != 'minimum':
                    gp_texts['sources'][-1]['name_addr'] = si_text
                    gp_texts['sources'][-1]['path_addr'] = si_text
                else:
                    gp_texts['sources'][-1]['name_addr'] = si_text.address
                    gp_texts['sources'][-1]['path_addr'] = si_text.address

                # add channel conversion
                if memory != 'minimum':
                    gp_conv.append(None)
                else:
                    gp_conv.append(0)

                # source for channel
                if memory != 'minimum':
                    gp_source.append(SourceInformation())
                else:
                    address = tell()
                    block = SourceInformation()
                    write(bytes(block))
                    gp_source.append(address)

                # there is no channel dependency
                gp_dep.append(None)

                # add channel block
                kargs = {
                    'channel_type': v4c.CHANNEL_TYPE_VALUE,
                    'bit_count': s_size,
                    'byte_offset': offset,
                    'bit_offset': 0,
                    'data_type': s_type,
                    'min_raw_value': 0,
                    'max_raw_value': 0,
                    'lower_limit': 0,
                    'upper_limit': 0,
                    'flags': 0,
                }
                ch = Channel(**kargs)
                ch.name = name
                if memory != 'minimum':
                    gp_channels.append(ch)
                else:
                    address = tell()
                    write(bytes(ch))
                    gp_channels.append(address)
                offset += byte_size

                if name in self.channels_db:
                    self.channels_db[name].append((dg_cntr, ch_cntr))
                else:
                    self.channels_db[name] = []
                    self.channels_db[name].append((dg_cntr, ch_cntr))

                # update the parents as well
                parents[ch_cntr] = field_name, 0

                ch_cntr += 1

            elif names and names[0] != signal.name:
                # here we have a structure channel composition
                field_name = get_unique_name(field_names, name)
                field_names.add(field_name)

                # first we add the structure channel

                # add channel texts
                for key in ('channels', 'sources'):
                    gp['texts'][key].append({})
                for key in ('conversion_tab', 'conversions'):
                    gp['texts'][key].append(None)

                block = TextBlock(text=name, meta=False)
                if memory != 'minimum':
                    gp_texts['channels'][-1]['name_addr'] = block
                else:
                    address = tell()
                    write(bytes(block))
                    gp_texts['channels'][-1]['name_addr'] = address
                if signal.unit:
                    block = TextBlock(
                        text=signal.unit,
                        meta=False,
                    )
                    if memory != 'minimum':
                        gp_texts['channels'][-1]['unit_addr'] = block
                    else:
                        address = tell()
                        write(bytes(block))
                        gp_texts['channels'][-1]['unit_addr'] = address
                if memory != 'minimum':
                    gp_texts['sources'][-1]['name_addr'] = si_text
                    gp_texts['sources'][-1]['path_addr'] = si_text
                else:
                    gp_texts['sources'][-1]['name_addr'] = si_text.address
                    gp_texts['sources'][-1]['path_addr'] = si_text.address

                # add channel conversion
                if memory != 'minimum':
                    gp_conv.append(None)
                else:
                    gp_conv.append(0)

                # source for channel
                if memory != 'minimum':
                    gp_source.append(SourceInformation())
                else:
                    address = tell()
                    block = SourceInformation()
                    write(bytes(block))
                    gp_source.append(address)

                # add channel block
                kargs = {
                    'channel_type': v4c.CHANNEL_TYPE_VALUE,
                    'bit_count': 8,
                    'byte_offset': offset,
                    'bit_offset': 0,
                    'data_type': v4c.DATA_TYPE_BYTEARRAY,
                    'min_raw_value': 0,
                    'max_raw_value': 0,
                    'lower_limit': 0,
                    'upper_limit': 0,
                    'flags': 0,
                }
                ch = Channel(**kargs)
                ch.name = name
                if memory != 'minimum':
                    gp_channels.append(ch)
                else:
                    address = tell()
                    write(bytes(ch))
                    gp_channels.append(address)

                if name not in self.channels_db:
                    self.channels_db[name] = []
                self.channels_db[name].append((dg_cntr, ch_cntr))

                # update the parents as well
                parents[ch_cntr] = name, 0

                ch_cntr += 1

                dep_list = []
                gp_dep.append(dep_list)

                # then we add the fields
                for name in names:
                    field_name = get_unique_name(field_names, name)
                    field_names.add(field_name)

                    samples = signal.samples[name]

                    s_type, s_size = fmt_to_datatype(samples.dtype)
                    byte_size = s_size >> 3

                    fields.append(samples)
                    types.append((field_name, samples.dtype))

                    # add channel texts
                    for key in ('channels', 'sources'):
                        gp['texts'][key].append({})
                    for key in ('conversion_tab', 'conversions'):
                        gp['texts'][key].append(None)

                    block = TextBlock(text=name, meta=False)
                    if memory != 'minimum':
                        gp_texts['channels'][-1]['name_addr'] = block
                    else:
                        address = tell()
                        write(bytes(block))
                        gp_texts['channels'][-1]['name_addr'] = address
                    if memory != 'minimum':
                        gp_texts['sources'][-1]['name_addr'] = si_text
                        gp_texts['sources'][-1]['path_addr'] = si_text
                    else:
                        gp_texts['sources'][-1]['name_addr'] = si_text.address
                        gp_texts['sources'][-1]['path_addr'] = si_text.address

                    # add channel conversion
                    if memory != 'minimum':
                        gp_conv.append(None)
                    else:
                        gp_conv.append(0)

                    # source for channel
                    if memory != 'minimum':
                        gp_source.append(SourceInformation())
                    else:
                        address = tell()
                        block = SourceInformation()
                        write(bytes(block))
                        gp_source.append(address)

                    # add channel block
                    min_val, max_val = get_min_max(signal.samples)
                    kargs = {
                        'channel_type': v4c.CHANNEL_TYPE_VALUE,
                        'bit_count': s_size,
                        'byte_offset': offset,
                        'bit_offset': 0,
                        'data_type': s_type,
                        'min_raw_value': min_val if min_val <= max_val else 0,
                        'max_raw_value': max_val if min_val <= max_val else 0,
                        'lower_limit': min_val if min_val <= max_val else 0,
                        'upper_limit': max_val if min_val <= max_val else 0,
                        'flags': (v4c.FLAG_PHY_RANGE_OK
                                  | v4c.FLAG_VAL_RANGE_OK),
                    }
                    ch = Channel(**kargs)
                    ch.name = name
                    if memory != 'minimum':
                        gp_channels.append(ch)
                        dep_list.append(ch)
                    else:
                        address = tell()
                        write(bytes(ch))
                        gp_channels.append(address)
                        dep_list.append(address)

                    offset += byte_size

                    if name not in self.channels_db:
                        self.channels_db[name] = []
                    self.channels_db[name].append((dg_cntr, ch_cntr))

                    # update the parents as well
                    parents[ch_cntr] = field_name, 0

                    ch_cntr += 1

                gp_dep.append(None)

            else:
                # here we have channel arrays or mdf v3 channel dependencies
                if names:
                    samples = signal.samples[names[0]]
                else:
                    samples = signal.samples
                shape = samples.shape[1:]

                if len(shape) > 1:
                    # add channel dependency block for composed parent channel
                    dims_nr = len(shape)
                    names_nr = len(names)

                    if names_nr == 0:
                        kargs = {
                            'dims': dims_nr,
                            'ca_type': v4c.CA_TYPE_LOOKUP,
                            'flags': v4c.FLAG_CA_FIXED_AXIS,
                            'byte_offset_base': samples.dtype.itemsize,
                        }
                        for i in range(dims_nr):
                            kargs['dim_size_{}'.format(i)] = shape[i]

                    elif len(names) == 1:
                        kargs = {
                            'dims': dims_nr,
                            'ca_type': v4c.CA_TYPE_ARRAY,
                            'flags': 0,
                            'byte_offset_base': samples.dtype.itemsize,
                        }
                        for i in range(dims_nr):
                            kargs['dim_size_{}'.format(i)] = shape[i]

                    else:
                        kargs = {
                            'dims': dims_nr,
                            'ca_type': v4c.CA_TYPE_LOOKUP,
                            'flags': v4c.FLAG_CA_AXIS,
                            'byte_offset_base': samples.dtype.itemsize,
                        }
                        for i in range(dims_nr):
                            kargs['dim_size_{}'.format(i)] = shape[i]

                    parent_dep = ChannelArrayBlock(**kargs)
                    gp_dep.append([parent_dep, ])

                else:
                    # add channel dependency block for composed parent channel
                    kargs = {
                        'dims': 1,
                        'ca_type': v4c.CA_TYPE_SCALE_AXIS,
                        'flags': 0,
                        'byte_offset_base': samples.dtype.itemsize,
                        'dim_size_0': shape[0],
                    }
                    parent_dep = ChannelArrayBlock(**kargs)
                    gp_dep.append([parent_dep, ])

                field_name = get_unique_name(field_names, name)
                field_names.add(field_name)

                fields.append(samples)
                dtype_pair = field_name, samples.dtype, shape
                types.append(dtype_pair)

                # first we add the structure channel

                # add channel texts
                for key in ('channels', 'sources'):
                    gp['texts'][key].append({})
                for key in ('conversion_tab', 'conversions'):
                    gp['texts'][key].append(None)

                block = TextBlock(text=name, meta=False)
                if memory != 'minimum':
                    gp_texts['channels'][-1]['name_addr'] = block
                else:
                    address = tell()
                    write(bytes(block))
                    gp_texts['channels'][-1]['name_addr'] = address
                if signal.unit:
                    block = TextBlock(
                        text=signal.unit,
                        meta=False,
                    )
                    if memory != 'minimum':
                        gp_texts['channels'][-1]['unit_addr'] = block
                    else:
                        address = tell()
                        write(bytes(block))
                        gp_texts['channels'][-1]['unit_addr'] = address
                if memory != 'minimum':
                    gp_texts['sources'][-1]['name_addr'] = si_text
                    gp_texts['sources'][-1]['path_addr'] = si_text
                else:
                    gp_texts['sources'][-1]['name_addr'] = si_text.address
                    gp_texts['sources'][-1]['path_addr'] = si_text.address

                # add channel conversion
                if memory != 'minimum':
                    gp_conv.append(None)
                else:
                    gp_conv.append(0)

                # source for channel
                if memory != 'minimum':
                    gp_source.append(SourceInformation())
                else:
                    address = tell()
                    block = SourceInformation()
                    write(bytes(block))
                    gp_source.append(address)

                s_type, s_size = fmt_to_datatype(samples.dtype)

                # add channel block
                kargs = {
                    'channel_type': v4c.CHANNEL_TYPE_VALUE,
                    'bit_count': s_size,
                    'byte_offset': offset,
                    'bit_offset': 0,
                    'data_type': s_type,
                    'min_raw_value': 0,
                    'max_raw_value': 0,
                    'lower_limit': 0,
                    'upper_limit': 0,
                    'flags': 0,
                }
                ch = Channel(**kargs)
                ch.name = name
                if memory != 'minimum':
                    gp_channels.append(ch)
                else:
                    address = tell()
                    write(bytes(ch))
                    gp_channels.append(address)

                size = s_size >> 3
                for dim in shape:
                    size *= dim
                offset += size

                if name not in self.channels_db:
                    self.channels_db[name] = []
                self.channels_db[name].append((dg_cntr, ch_cntr))

                # update the parents as well
                parents[ch_cntr] = name, 0

                ch_cntr += 1

                for name in names[1:]:
                    field_name = get_unique_name(field_names, name)
                    field_names.add(field_name)

                    samples = signal.samples[name]
                    shape = samples.shape[1:]
                    fields.append(samples)
                    types.append((field_name, samples.dtype, shape))

                    # add composed parent signal texts
                    for key in ('channels', 'sources'):
                        gp['texts'][key].append({})
                    for key in ('conversion_tab', 'conversions'):
                        gp['texts'][key].append(None)

                    block = TextBlock(text=name, meta=False)
                    if memory != 'minimum':
                        gp_texts['channels'][-1]['name_addr'] = block
                    else:
                        address = tell()
                        write(bytes(block))
                        gp_texts['channels'][-1]['name_addr'] = address
                    if memory != 'minimum':
                        gp_texts['sources'][-1]['name_addr'] = si_text
                        gp_texts['sources'][-1]['path_addr'] = si_text
                    else:
                        gp_texts['sources'][-1]['name_addr'] = si_text.address
                        gp_texts['sources'][-1]['path_addr'] = si_text.address

                    # add channel conversion
                    if memory != 'minimum':
                        gp_conv.append(None)
                    else:
                        gp_conv.append(0)

                    # source for channel
                    if memory != 'minimum':
                        gp_source.append(SourceInformation())
                    else:
                        address = tell()
                        block = SourceInformation()
                        write(bytes(block))
                        gp_source.append(address)

                    # add channel dependency block
                    kargs = {
                        'dims': 1,
                        'ca_type': v4c.CA_TYPE_SCALE_AXIS,
                        'flags': 0,
                        'byte_offset_base': samples.dtype.itemsize,
                        'dim_size_0': shape[0],
                    }
                    dep = ChannelArrayBlock(**kargs)
                    gp_dep.append([dep, ])

                    # add components channel
                    min_val, max_val = get_min_max(samples)
                    s_type, s_size = fmt_to_datatype(samples.dtype)
                    byte_size = max(s_size // 8, 1)
                    kargs = {
                        'channel_type': v4c.CHANNEL_TYPE_VALUE,
                        'bit_count': s_size,
                        'byte_offset': offset,
                        'bit_offset': 0,
                        'data_type': s_type,
                        'min_raw_value': min_val if min_val <= max_val else 0,
                        'max_raw_value': max_val if min_val <= max_val else 0,
                        'lower_limit': min_val if min_val <= max_val else 0,
                        'upper_limit': max_val if min_val <= max_val else 0,
                        'flags': (v4c.FLAG_PHY_RANGE_OK
                                  | v4c.FLAG_VAL_RANGE_OK),
                    }
                    channel = Channel(**kargs)
                    channel.name = name
                    if memory != 'minimum':
                        gp_channels.append(channel)
                    else:
                        address = tell()
                        write(bytes(channel))
                        gp_channels.append(address)

                    parent_dep.referenced_channels.append((ch_cntr, dg_cntr))
                    for dim in shape:
                        byte_size *= dim
                    offset += byte_size

                    if name not in self.channels_db:
                        self.channels_db[name] = []
                    self.channels_db[name].append((dg_cntr, ch_cntr))

                    # update the parents as well
                    parents[ch_cntr] = field_name, 0

                    ch_cntr += 1

        # channel group
        kargs = {
            'cycles_nr': cycles_nr,
            'samples_byte_nr': offset,
        }
        gp['channel_group'] = ChannelGroup(**kargs)
        gp['size'] = cycles_nr * offset

        # data group
        gp['data_group'] = DataGroup()

        # data block
        if PYVERSION == 2:
            types = fix_dtype_fields(types)
        types = dtype(types)

        gp['sorted'] = True
        gp['types'] = types
        gp['parents'] = parents

        samples = fromarrays(fields, dtype=types)

        signals = None
        del signals

        try:
            block = samples.tostring()

            if memory == 'full':
                gp['data_location'] = v4c.LOCATION_MEMORY
                gp['data_block'] = DataBlock(data=block)
            else:
                gp['data_location'] = v4c.LOCATION_TEMPORARY_FILE
                data_address = self._tempfile.tell()
                gp['data_group']['data_block_addr'] = data_address
                self._tempfile.write(bytes(block))
        except MemoryError:
            if memory == 'full':
                raise
            else:
                gp['data_location'] = v4c.LOCATION_TEMPORARY_FILE
                data_address = self._tempfile.tell()
                gp['data_group']['data_block_addr'] = data_address
                for sample in samples:
                    self._tempfile.write(sample.tostring())
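    # A hedged sketch of appending a structure composition, which exercises
    # the recarray branch of ``append`` above; the names and values are
    # hypothetical, numpy is assumed imported as np:
    #
    # >>> arr = np.core.records.fromarrays(
    # ...     [np.arange(5, dtype='<u4'), np.ones(5, dtype='<f4')],
    # ...     names=['Counter', 'Value'],
    # ... )
    # >>> sig = Signal(samples=arr, timestamps=t, name='Composed')
    # >>> mdf.append([sig], 'structure demo')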
[docs]    def attach(self, data, file_name=None, comment=None,
               compression=True, mime=r'application/octet-stream'):
        """ attach embedded attachment as application/octet-stream

        Parameters
        ----------
        data : bytes
            data to be attached
        file_name : str
            string file name
        comment : str
            attachment comment
        compression : bool
            use compression for embedded attachment data
        mime : str
            mime type string

        """
        creator_index = len(self.file_history)
        fh = FileHistory()
        text = """<FHcomment>
<TX>Added new embedded attachment from {}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>"""
        text = text.format(
            file_name if file_name else 'bin.bin',
            __version__,
        )
        fh_text = TextBlock(text=text, meta=True)

        self.file_history.append((fh, fh_text))

        texts = {}
        texts['mime_addr'] = TextBlock(text=mime, meta=False)
        if comment:
            texts['comment_addr'] = TextBlock(text=comment, meta=False)
        text = file_name if file_name else 'bin.bin'
        texts['file_name_addr'] = TextBlock(text=text)
        at_block = AttachmentBlock(data=data, compression=compression)
        at_block['creator_index'] = creator_index

        self.attachments.append((at_block, texts))
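A minimal usage sketch for *attach* (the file name, comment and content are made up):

    mdf = MDF4(version='4.10')
    with open('notes.txt', 'rb') as f:
        mdf.attach(
            f.read(),
            file_name='notes.txt',
            comment='operator notes',
            compression=True,
            mime='text/plain',
        )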
[docs]    def close(self):
        """ if the MDF was created with memory='low' or 'minimum' and new
        channels have been appended, this must be called before the object
        is discarded, to clean up the temporary file

        """
        if self._tempfile is not None:
            self._tempfile.close()
        if self._file is not None:
            self._file.close()
[docs]    def extract_attachment(self, index):
        """ extract attachment *index* data. If it is an embedded attachment,
        then this method creates the new file according to the attachment file
        name information

        Parameters
        ----------
        index : int
            attachment index

        Returns
        -------
        data : bytes | str
            attachment data

        """
        current_path = os.getcwd()
        try:
            os.chdir(os.path.dirname(self.name))

            attachment, texts = self.attachments[index]
            flags = attachment['flags']

            # for embedded attachments extract data and create new files
            if flags & v4c.FLAG_AT_EMBEDDED:
                data = attachment.extract()

                file_path = texts['file_name_addr']['text']\
                    .decode('utf-8')\
                    .strip(' \n\t\0')
                out_path = os.path.dirname(file_path)
                if out_path:
                    if not os.path.exists(out_path):
                        os.makedirs(out_path)

                with open(file_path, 'wb') as f:
                    f.write(data)

                return data
            else:
                # for external attachments read the file and return the content
                if flags & v4c.FLAG_AT_MD5_VALID:
                    file_path = texts['file_name_addr']['text']\
                        .decode('utf-8')\
                        .strip(' \n\t\0')
                    with open(file_path, 'rb') as f:
                        data = f.read()
                    md5_worker = md5()
                    md5_worker.update(data)
                    md5_sum = md5_worker.digest()
                    if attachment['md5_sum'] == md5_sum:
                        if texts['mime_addr']['text']\
                                .decode('utf-8')\
                                .startswith('text'):
                            with open(file_path, 'r') as f:
                                data = f.read()
                        return data
                    else:
                        message = ('ATBLOCK md5sum="{}" '
                                   'and external attachment data ({}) '
                                   'md5sum="{}"')
                        message = message.format(
                            attachment['md5_sum'],
                            file_path,
                            md5_sum,
                        )
                        warnings.warn(message)
                else:
                    file_path = texts['file_name_addr']['text']\
                        .decode('utf-8')\
                        .strip(' \n\t\0')
                    if texts['mime_addr']['text']\
                            .decode('utf-8')\
                            .startswith('text'):
                        mode = 'r'
                    else:
                        mode = 'rb'
                    with open(file_path, mode) as f:
                        data = f.read()
                    return data
        except Exception as err:
            message = 'Exception during attachment extraction: ' + repr(err)
            warnings.warn(message)
        finally:
            os.chdir(current_path)
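Usage sketch, assuming *mdf* already holds at least one attachment: for an embedded attachment the data is also written to disk under the stored file name, while external attachments are read back from disk (with the MD5 check above when the ATBLOCK carries a valid checksum):

    data = mdf.extract_attachment(0)

    # the same digest the MD5 check above compares against
    from hashlib import md5
    if isinstance(data, bytes):
        print(md5(data).digest())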
[docs]    def get_channel_unit(self, name=None, group=None, index=None):
        """Gets channel unit.

        Channel can be specified in two ways:

        * using the first positional argument *name*

            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either
              the *group* or *index* arguments is None then a warning is
              issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use *info* method for group and
          channel numbers

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index

        Returns
        -------
        unit : str
            found channel unit

        """
        gp_nr, ch_nr = self._validate_channel_selection(
            name,
            group,
            index,
        )

        grp = self.groups[gp_nr]
        if grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        conv_texts = grp['texts']['conversions'][ch_nr]
        channel_texts = grp['texts']['channels'][ch_nr]

        if conv_texts and 'unit_addr' in conv_texts:
            if self.memory != 'minimum':
                unit = conv_texts['unit_addr']
            else:
                unit = TextBlock(
                    address=conv_texts['unit_addr'],
                    stream=stream,
                )
            if PYVERSION == 3:
                try:
                    unit = unit['text'].decode('utf-8').strip(' \n\t\0')
                except UnicodeDecodeError:
                    unit = ''
            else:
                unit = unit['text'].strip(' \n\t\0')
        else:
            # search for physical unit in channel texts
            if 'unit_addr' in channel_texts:
                if self.memory != 'minimum':
                    unit = channel_texts['unit_addr']
                else:
                    unit = TextBlock(
                        address=channel_texts['unit_addr'],
                        stream=stream,
                    )
                if PYVERSION == 3:
                    unit = unit['text'].decode('utf-8').strip(' \n\t\0')
                else:
                    unit = unit['text'].strip(' \n\t\0')
            else:
                unit = ''

        return unit
[docs]    def get_channel_comment(self, name=None, group=None, index=None):
        """Gets channel comment.

        Channel can be specified in two ways:

        * using the first positional argument *name*

            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either
              the *group* or *index* arguments is None then a warning is
              issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use *info* method for group and
          channel numbers

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index

        Returns
        -------
        comment : str
            found channel comment

        """
        gp_nr, ch_nr = self._validate_channel_selection(
            name,
            group,
            index,
        )

        grp = self.groups[gp_nr]
        channel_texts = grp['texts']['channels'][ch_nr]

        if grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        if 'comment_addr' in channel_texts:
            if self.memory == 'minimum':
                comment = TextBlock(
                    address=channel_texts['comment_addr'],
                    stream=stream,
                )
            else:
                comment = channel_texts['comment_addr']

            if comment['id'] == b'##MD':
                comment = comment['text'].decode('utf-8').strip(' \n\t\0')
                try:
                    comment = XML.fromstring(comment).find('TX').text
                except Exception:
                    comment = ''
            else:
                comment = comment['text'].decode('utf-8')
        else:
            comment = ''

        return comment
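Usage sketch for the two getters above (the channel name is hypothetical):

    unit = mdf.get_channel_unit('EngineSpeed')
    comment = mdf.get_channel_comment('EngineSpeed')

    # for ambiguous names select the exact occurrence
    unit = mdf.get_channel_unit('EngineSpeed', group=0, index=1)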
[docs]    def get(self, name=None, group=None, index=None, raster=None,
            samples_only=False, data=None):
        """Gets channel samples.

        Channel can be specified in two ways:

        * using the first positional argument *name*

            * if there are multiple occurrences for this channel then the
              *group* and *index* arguments can be used to select a specific
              group.
            * if there are multiple occurrences for this channel and either
              the *group* or *index* arguments is None then a warning is
              issued

        * using the group number (keyword argument *group*) and the channel
          number (keyword argument *index*). Use *info* method for group and
          channel numbers

        If the *raster* keyword argument is not *None* the output is
        interpolated accordingly

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index
        raster : float
            time raster in seconds
        samples_only : bool
            if *True* return only the channel samples as numpy array; if
            *False* return a *Signal* object

        Returns
        -------
        res : (numpy.array | Signal)
            returns *Signal* if *samples_only*=*False* (default option),
            otherwise returns numpy.array
            The *Signal* samples are:

            * numpy recarray for channels that have composition/channel
              array address or for channel of type BYTEARRAY, CANOPENDATE,
              CANOPENTIME
            * numpy array for all the rest

        Raises
        ------
        MdfException :

        * if the channel name is not found
        * if the group index is out of range
        * if the channel index is out of range

        """
        gp_nr, ch_nr = self._validate_channel_selection(
            name,
            group,
            index,
        )

        memory = self.memory
        grp = self.groups[gp_nr]
        if grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile

        if memory == 'minimum':
            channel = Channel(
                address=grp['channels'][ch_nr],
                stream=stream,
            )
            addr = grp['channel_conversions'][ch_nr]
            if addr:
                conversion = ChannelConversion(
                    address=addr,
                    stream=stream,
                )
            else:
                conversion = None
            if name is None:
                name = TextBlock(
                    address=grp['texts']['channels'][ch_nr]['name_addr'],
                    stream=stream,
                )
                name = name['text'].decode('utf-8').strip(' \r\t\n\0')
                name = name.split('\\')[0]
            channel.name = name
        else:
            channel = grp['channels'][ch_nr]
            conversion = grp['channel_conversions'][ch_nr]
            name = channel.name

        dependency_list = grp['channel_dependencies'][ch_nr]
        cycles_nr = grp['channel_group']['cycles_nr']

        # get data group record
        try:
            parents, dtypes = grp['parents'], grp['types']
        except KeyError:
            grp['parents'], grp['types'] = self._prepare_record(grp)
            parents, dtypes = grp['parents'], grp['types']

        # get group data
        if data is None:
            data = self._load_group_data(grp)

        info = None

        # get the channel signal data if available
        signal_data = self._load_signal_data(
            channel['data_block_addr']
        )

        # check if this is a channel array
        if dependency_list:
            arrays = []
            name = channel.name

            if all(isinstance(dep, Channel) for dep in dependency_list):
                # structure channel composition
                if memory == 'minimum':
                    names = []
                    # TODO : get exactly the group and channel
                    for address in dependency_list:
                        channel = Channel(
                            address=address,
                            stream=stream,
                        )
                        block = TextBlock(
                            address=channel['name_addr'],
                            stream=stream,
                        )
                        name_ = block['text'].decode('utf-8')
                        name_ = name_.split('\\')[0].strip(' \n\r\t\0')
                        names.append(name_)
                else:
                    names = [ch.name for ch in dependency_list]
                arrays = [
                    self.get(name_, samples_only=True)
                    for name_ in names
                ]

                types = [(name_, arr.dtype)
                         for name_, arr in zip(names, arrays)]
                if PYVERSION == 2:
                    types = fix_dtype_fields(types)
                types = dtype(types)

                vals = fromarrays(arrays, dtype=types)

                cycles_nr = len(vals)
            else:
                # channel arrays
                arrays =
[] types = [] try: parent, bit_offset = parents[ch_nr] except KeyError: parent, bit_offset = None, None if parent is not None: if 'record' not in grp: if dtypes.itemsize: record = fromstring(data, dtype=dtypes) else: record = None if self.memory == 'full': grp['record'] = record else: record = grp['record'] vals = record[parent] else: vals = self._get_not_byte_aligned_data(data, grp, ch_nr) dep = dependency_list[0] if dep['flags'] & v4c.FLAG_CA_INVERSE_LAYOUT: shape = vals.shape shape = (shape[0],) + shape[1:][::-1] vals = vals.reshape(shape) axes = (0,) + tuple(range(len(shape) - 1, 0, -1)) vals = transpose(vals, axes=axes) cycles_nr = len(vals) for ca_block in dependency_list[:1]: dims_nr = ca_block['dims'] if ca_block['ca_type'] == v4c.CA_TYPE_SCALE_AXIS: shape = (ca_block['dim_size_0'], ) arrays.append(vals) dtype_pair = channel.name, vals.dtype, shape types.append(dtype_pair) elif ca_block['ca_type'] == v4c.CA_TYPE_LOOKUP: shape = vals.shape[1:] arrays.append(vals) dtype_pair = channel.name, vals.dtype, shape types.append(dtype_pair) if ca_block['flags'] & v4c.FLAG_CA_FIXED_AXIS: for i in range(dims_nr): shape = (ca_block['dim_size_{}'.format(i)], ) axis = [] for j in range(shape[0]): key = 'axis_{}_value_{}'.format(i, j) axis.append(ca_block[key]) axis = array([axis for _ in range(cycles_nr)]) arrays.append(axis) dtype_pair = ( 'axis_{}'.format(i), axis.dtype, shape, ) types.append(dtype_pair) else: for i in range(dims_nr): ch_nr, dg_nr = ca_block.referenced_channels[i] if memory == 'minimum': axisname = self.groups[dg_nr]['texts']['channels'][ch_nr]['name_addr'] block = TextBlock(address=axisname, stream=self._file) axisname = block['text'].decode('utf-8').strip(' \t\n\r\0') axisname = axisname.split('\\')[0] else: axisname = self.groups[dg_nr]['channels'][ch_nr].name shape = (ca_block['dim_size_{}'.format(i)], ) axis_values = self.get( group=dg_nr, index=ch_nr, samples_only=True) axis_values = axis_values[axisname] arrays.append(axis_values) dtype_pair = ( axisname, axis_values.dtype, shape, ) types.append(dtype_pair) elif ca_block['ca_type'] == v4c.CA_TYPE_ARRAY: shape = vals.shape[1:] arrays.append(vals) dtype_pair = channel.name, vals.dtype, shape types.append(dtype_pair) for ca_block in dependency_list[1:]: dims_nr = ca_block['dims'] if ca_block['flags'] & v4c.FLAG_CA_FIXED_AXIS: for i in range(dims_nr): shape = (ca_block['dim_size_{}'.format(i)], ) axis = [] for j in range(shape[0]): key = 'axis_{}_value_{}'.format(i, j) axis.append(ca_block[key]) axis = array([axis for _ in range(cycles_nr)]) arrays.append(axis) dtype_pair = 'axis_{}'.format(i), axis.dtype, shape types.append(dtype_pair) else: for i in range(dims_nr): ch_nr, dg_nr = ca_block.referenced_channels[i] if memory == 'minimum': axisname = self.groups[dg_nr]['texts']['channels'][ch_nr]['name_addr'] block = TextBlock(address=axisname, stream=self._file) axisname = block['text'].decode('utf-8').strip(' \t\n\r\0') axisname = axisname.split('\\')[0] else: axisname = self.groups[dg_nr]['channels'][ch_nr].name shape = (ca_block['dim_size_{}'.format(i)], ) axis_values = self.get( group=dg_nr, index=ch_nr, samples_only=True) axis_values = axis_values[axisname] arrays.append(axis_values) dtype_pair = axisname, axis_values.dtype, shape types.append(dtype_pair) if PYVERSION == 2: types = fix_dtype_fields(types) vals = fromarrays(arrays, dtype(types)) else: # get channel values if channel['channel_type'] in (v4c.CHANNEL_TYPE_VIRTUAL, v4c.CHANNEL_TYPE_VIRTUAL_MASTER): data_type = channel['data_type'] ch_dtype = 
dtype(get_fmt(data_type, 8)) vals = arange(cycles_nr, dtype=ch_dtype) else: try: parent, bit_offset = parents[ch_nr] except KeyError: parent, bit_offset = None, None if parent is not None: if 'record' not in grp: if dtypes.itemsize: record = fromstring(data, dtype=dtypes) else: record = None if memory == 'full': grp['record'] = record else: record = grp['record'] vals = record[parent] bits = channel['bit_count'] size = vals.dtype.itemsize data_type = channel['data_type'] if vals.dtype.kind not in 'ui' and (bit_offset or not bits == size * 8): vals = self._get_not_byte_aligned_data(data, grp, ch_nr) else: if bit_offset: dtype_ = vals.dtype if dtype_.kind == 'i': vals = vals.astype(dtype('<u{}'.format(size))) vals >>= bit_offset else: vals = vals >> bit_offset if not bits == size * 8: mask = (1 << bits) - 1 if vals.flags.writeable: vals &= mask else: vals = vals & mask if data_type in v4c.SIGNED_INT: size = vals.dtype.itemsize mask = (1 << (size * 8)) - 1 mask = (mask << bits) & mask vals |= mask vals = vals.astype('<i{}'.format(size), copy=False) else: vals = self._get_not_byte_aligned_data(data, grp, ch_nr) if cycles_nr: if conversion is None: conversion_type = v4c.CONVERSION_TYPE_NON else: conversion_type = conversion['conversion_type'] if conversion_type == v4c.CONVERSION_TYPE_NON: # check if it is VLDS channel type with SDBLOCK data_type = channel['data_type'] channel_type = channel['channel_type'] if channel_type in (v4c.CHANNEL_TYPE_VALUE, v4c.CHANNEL_TYPE_MLSD): if v4c.DATA_TYPE_STRING_LATIN_1 \ <= data_type \ <= v4c.DATA_TYPE_STRING_UTF_16_BE: vals = [val.tobytes() for val in vals] if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE: encoding = 'utf-16-be' elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE: encoding = 'utf-16-le' elif data_type == v4c.DATA_TYPE_STRING_UTF_8: encoding = 'utf-8' elif data_type == v4c.DATA_TYPE_STRING_LATIN_1: encoding = 'latin-1' vals = array( [x.decode(encoding).strip('\0') for x in vals] ) vals = encode(vals, 'latin-1') # CANopen date elif data_type == v4c.DATA_TYPE_CANOPEN_DATE: vals = vals.tostring() types = dtype( [('ms', '<u2'), ('min', '<u1'), ('hour', '<u1'), ('day', '<u1'), ('month', '<u1'), ('year', '<u1')] ) dates = fromstring(vals, types) arrays = [] arrays.append(dates['ms']) # bit 6 and 7 of minutes are reserved arrays.append(dates['min'] & 0x3F) # only firt 4 bits of hour are used arrays.append(dates['hour'] & 0xF) # the first 4 bits are the day number arrays.append(dates['day'] & 0xF) # bit 6 and 7 of month are reserved arrays.append(dates['month'] & 0x3F) # bit 7 of year is reserved arrays.append(dates['year'] & 0x7F) # add summer or standard time information for hour arrays.append((dates['hour'] & 0x80) >> 7) # add day of week information arrays.append((dates['day'] & 0xF0) >> 4) names = [ 'ms', 'min', 'hour', 'day', 'month', 'year', 'summer_time', 'day_of_week', ] vals = fromarrays(arrays, names=names) # CANopen time elif data_type == v4c.DATA_TYPE_CANOPEN_TIME: vals = vals.tostring() types = dtype( [('ms', '<u4'), ('days', '<u2')] ) dates = fromstring(vals, types) arrays = [] # bits 28 to 31 are reserverd for ms arrays.append(dates['ms'] & 0xFFFFFFF) arrays.append(dates['days'] & 0x3F) names = ['ms', 'days'] vals = fromarrays(arrays, names=names) # byte array elif data_type == v4c.DATA_TYPE_BYTEARRAY: vals = vals.tostring() size = max(bits >> 3, 1) vals = frombuffer( vals, dtype=dtype('({},)u1'.format(size)), ) types = [(channel.name, vals.dtype, vals.shape[1:])] if PYVERSION == 2: types = fix_dtype_fields(types) types = dtype(types) 
arrays = [vals, ] vals = fromarrays(arrays, dtype=types) elif channel_type == v4c.CHANNEL_TYPE_VLSD: if signal_data: values = [] for offset in vals: offset = int(offset) str_size = unpack_from('<I', signal_data, offset)[0] values.append( signal_data[offset+4: offset+4+str_size] ) if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE: vals = [v.decode('utf-16-be') for v in values] elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE: vals = [v.decode('utf-16-le') for v in values] elif data_type == v4c.DATA_TYPE_STRING_UTF_8: vals = [v.decode('utf-8') for v in values] elif data_type == v4c.DATA_TYPE_STRING_LATIN_1: vals = [v.decode('latin-1') for v in values] if PYVERSION == 2: vals = array([str(val) for val in vals]) else: vals = array(vals) vals = encode(vals, 'latin-1') else: # no VLSD signal data samples vals = array([]) elif conversion_type == v4c.CONVERSION_TYPE_LIN: a = conversion['a'] b = conversion['b'] if (a, b) != (1, 0): vals = vals * a if b: vals += b elif conversion_type == v4c.CONVERSION_TYPE_RAT: P1 = conversion['P1'] P2 = conversion['P2'] P3 = conversion['P3'] P4 = conversion['P4'] P5 = conversion['P5'] P6 = conversion['P6'] if (P1, P2, P3, P4, P5, P6) != (0, 1, 0, 0, 0, 1): X = vals vals = evaluate(v4c.CONV_RAT_TEXT) elif conversion_type == v4c.CONVERSION_TYPE_ALG: if not memory == 'minimum': block = grp['texts']['conversions'][ch_nr]['formula_addr'] formula = block['text'].decode('utf-8').strip(' \n\t\0') else: block = TextBlock( address=grp['texts']['conversions'][ch_nr]['formula_addr'], stream=self._file, ) formula = block['text'].decode('utf-8').strip(' \n\t\0') X = vals vals = evaluate(formula) elif conversion_type in (v4c.CONVERSION_TYPE_TABI, v4c.CONVERSION_TYPE_TAB): nr = conversion['val_param_nr'] // 2 raw = array( [conversion['raw_{}'.format(i)] for i in range(nr)] ) phys = array( [conversion['phys_{}'.format(i)] for i in range(nr)] ) if conversion_type == v4c.CONVERSION_TYPE_TABI: vals = interp(vals, raw, phys) else: idx = searchsorted(raw, vals) idx = clip(idx, 0, len(raw) - 1) vals = phys[idx] elif conversion_type == v4c.CONVERSION_TYPE_RTAB: nr = (conversion['val_param_nr'] - 1) // 3 lower = array( [conversion['lower_{}'.format(i)] for i in range(nr)] ) upper = array( [conversion['upper_{}'.format(i)] for i in range(nr)] ) phys = array( [conversion['phys_{}'.format(i)] for i in range(nr)] ) default = conversion['default'] # INT channel if channel['data_type'] <= 3: res = [] for v in vals: for l, u, p in zip(lower, upper, phys): if l <= v <= u: res.append(p) break else: res.append(default) size = max(bits >> 3, 1) ch_fmt = get_fmt(channel['data_type'], size) vals = array(res).astype(ch_fmt) # else FLOAT channel else: res = [] for v in vals: for l, u, p in zip(lower, upper, phys): if l <= v < u: res.append(p) break else: res.append(default) size = max(bits >> 3, 1) ch_fmt = get_fmt(channel['data_type'], size) vals = array(res).astype(ch_fmt) elif conversion_type == v4c.CONVERSION_TYPE_TABX: nr = conversion['val_param_nr'] raw = array( [conversion['val_{}'.format(i)] for i in range(nr)] ) if not memory == 'minimum': phys = array( [grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text'] for i in range(nr)] ) default = grp['texts']['conversion_tab'][ch_nr]\ .get('default_addr', {})\ .get('text', b'') else: phys = [] for i in range(nr): address=grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)] if address: block = TextBlock( address=address, stream=self._file, ) phys.append(block['text']) else: phys.append(b'') phys = array(phys) if 
grp['texts']['conversion_tab'][ch_nr].get('default_addr', 0): block = TextBlock( address=grp['texts']['conversion_tab'][ch_nr]['default_addr'], stream=self._file, ) default = block['text'] else: default = b'' info = { 'raw': raw, 'phys': phys, 'default': default, } elif conversion_type == v4c.CONVERSION_TYPE_RTABX: nr = conversion['val_param_nr'] // 2 if not memory == 'minimum': phys = array( [grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text'] for i in range(nr)] ) default = grp['texts']['conversion_tab'][ch_nr]\ .get('default_addr', {})\ .get('text', b'') else: phys = [] for i in range(nr): address=grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)] if address: block = TextBlock( address=address, stream=self._file, ) phys.append(block['text']) else: phys.append(b'') phys = array(phys) if grp['texts']['conversion_tab'][ch_nr].get('default_addr', 0): block = TextBlock( address=grp['texts']['conversion_tab'][ch_nr]['default_addr'], stream=self._file, ) default = block['text'] else: default = b'' lower = array( [conversion['lower_{}'.format(i)] for i in range(nr)] ) upper = array( [conversion['upper_{}'.format(i)] for i in range(nr)] ) info = { 'lower': lower, 'upper': upper, 'phys': phys, 'default': default, } elif conversion_type == v4c.CONVERSION_TYPE_TTAB: nr = conversion['val_param_nr'] - 1 if memory == 'minimum': raw = [] for i in range(nr): block = TextBlock( address=grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)], stream=stream, ) raw.append(block['text']) raw = array(raw) else: raw = array( [grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text'] for i in range(nr)] ) phys = array( [conversion['val_{}'.format(i)] for i in range(nr)] ) default = conversion['val_default'] info = { 'raw': raw, 'phys': phys, 'default': default, } elif conversion_type == v4c.CONVERSION_TYPE_TRANS: nr = (conversion['ref_param_nr'] - 1) // 2 if memory == 'minimum': in_ = [] for i in range(nr): block = TextBlock( address=grp['texts']['conversion_tab'][ch_nr]['input_{}_addr'.format(i)], stream=stream, ) in_.append(block['text']) in_ = array(in_) out_ = [] for i in range(nr): block = TextBlock( address=grp['texts']['conversion_tab'][ch_nr]['output_{}_addr'.format(i)], stream=stream, ) out_.append(block['text']) out_ = array(out_) block = TextBlock( address=grp['texts']['conversion_tab'][ch_nr]['default_addr'], stream=stream, ) default = block['text'] else: in_ = array( [grp['texts']['conversion_tab'][ch_nr]['input_{}_addr'.format(i)]['text'] for i in range(nr)] ) out_ = array( [grp['texts']['conversion_tab'][ch_nr]['output_{}_addr'.format(i)]['text'] for i in range(nr)] ) default = grp['texts']['conversion_tab'][ch_nr]['default_addr']['text'] res = [] for v in vals: for i, o in zip(in_, out_): if v == i: res.append(o) break else: res.append(default) vals = array(res) info = { 'input': in_, 'output': out_, 'default': default, } # in case of invalidation bits, valid_index will hold the valid indexes valid_index = None if grp['channel_group']['invalidation_bytes_nr']: if channel['flags'] & (v4c.FLAG_INVALIDATION_BIT_VALID | v4c.FLAG_ALL_SAMPLES_VALID) == v4c.FLAG_INVALIDATION_BIT_VALID: ch_invalidation_pos = channel['pos_invalidation_bit'] pos_byte, pos_offset = divmod(ch_invalidation_pos) mask = 1 << pos_offset inval_bytes = record['invalidation_bytes'] inval_index = array( [bytes_[pos_byte] & mask for bytes_ in inval_bytes] ) valid_index = argwhere(inval_index == 0).flatten() vals = vals[valid_index] if samples_only: res = vals else: # search for unit in conversion 
texts
            conv_texts = grp['texts']['conversions'][ch_nr]
            channel_texts = grp['texts']['channels'][ch_nr]

            if conv_texts and 'unit_addr' in conv_texts:
                if memory != 'minimum':
                    unit = conv_texts['unit_addr']
                else:
                    unit = TextBlock(
                        address=conv_texts['unit_addr'],
                        stream=stream,
                    )
                if PYVERSION == 3:
                    try:
                        unit = unit['text'].decode('utf-8').strip(' \n\t\0')
                    except UnicodeDecodeError:
                        unit = ''
                else:
                    unit = unit['text'].strip(' \n\t\0')
            else:
                # search for physical unit in channel texts
                if 'unit_addr' in channel_texts:
                    if memory != 'minimum':
                        unit = channel_texts['unit_addr']
                    else:
                        unit = TextBlock(
                            address=channel_texts['unit_addr'],
                            stream=stream,
                        )
                    if PYVERSION == 3:
                        unit = unit['text'].decode('utf-8').strip(' \n\t\0')
                    else:
                        unit = unit['text'].strip(' \n\t\0')
                else:
                    unit = ''

            # get the channel comment if available
            if 'comment_addr' in channel_texts:
                if memory == 'minimum':
                    comment = TextBlock(
                        address=channel_texts['comment_addr'],
                        stream=stream,
                    )
                else:
                    comment = channel_texts['comment_addr']

                if comment['id'] == b'##MD':
                    comment = comment['text'].decode('utf-8').strip(' \n\t\0')
                    try:
                        comment = XML.fromstring(comment).find('TX').text
                    except Exception:
                        comment = ''
                else:
                    comment = comment['text'].decode('utf-8')
            else:
                comment = ''

            t = self.get_master(gp_nr, data)

            # consider invalidation bits
            if valid_index is not None:
                t = t[valid_index]

            res = Signal(
                samples=vals,
                timestamps=t,
                unit=unit,
                name=name,
                comment=comment,
                info=info,
            )

            if raster and len(t):
                tx = linspace(0, t[-1], int(t[-1] / raster))
                res = res.interp(tx)

        return res
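Usage sketch for *get* (the channel name, group and raster values are made up):

    sig = mdf.get('EngineSpeed')                      # Signal object
    vals = mdf.get('EngineSpeed', samples_only=True)  # plain numpy array

    # resample the Signal to a 10 ms raster
    sig_10ms = mdf.get('EngineSpeed', raster=0.01)

    # disambiguate duplicated names by group/index
    sig = mdf.get(group=2, index=5)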
[docs]    def get_master(self, index, data=None):
        """ returns master channel samples for given group

        Parameters
        ----------
        index : int
            group index
        data : bytes
            data block raw bytes; default None

        Returns
        -------
        t : numpy.array
            master channel samples

        """
        try:
            return self._master_channel_cache[index]
        except KeyError:
            pass

        group = self.groups[index]

        if group['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        else:
            stream = self._tempfile
        memory = self.memory

        time_ch_nr = self.masters_db.get(index, None)
        cycles_nr = group['channel_group']['cycles_nr']

        if time_ch_nr is None:
            t = arange(cycles_nr, dtype=float64)
        else:
            time_conv = group['channel_conversions'][time_ch_nr]
            if memory == 'minimum':
                if time_conv:
                    time_conv = ChannelConversion(
                        address=group['channel_conversions'][time_ch_nr],
                        stream=stream,
                    )
                else:
                    time_conv = None
                time_ch = Channel(
                    address=group['channels'][time_ch_nr],
                    stream=stream,
                )
            else:
                time_ch = group['channels'][time_ch_nr]
            if time_ch['channel_type'] == v4c.CHANNEL_TYPE_VIRTUAL_MASTER:
                time_a = time_conv['a']
                time_b = time_conv['b']
                t = arange(cycles_nr, dtype=float64) * time_a + time_b
            else:
                # get data group parents and dtypes
                try:
                    parents, dtypes = group['parents'], group['types']
                except KeyError:
                    parents, dtypes = self._prepare_record(group)
                    group['parents'], group['types'] = parents, dtypes

                # get data
                if data is None:
                    data = self._load_group_data(group)

                try:
                    parent, _ = parents[time_ch_nr]
                except KeyError:
                    parent = None
                if parent is not None:
                    not_found = object()
                    record = group.get('record', not_found)
                    if record is not_found:
                        if dtypes.itemsize:
                            record = fromstring(data, dtype=dtypes)
                        else:
                            record = None

                        if memory == 'full':
                            group['record'] = record
                    t = record[parent]
                else:
                    t = self._get_not_byte_aligned_data(
                        data,
                        group,
                        time_ch_nr,
                    )

                # get timestamps
                if time_conv:
                    if time_conv['conversion_type'] == v4c.CONVERSION_TYPE_LIN:
                        time_a = time_conv['a']
                        time_b = time_conv['b']
                        t = t * time_a
                        if time_b:
                            t += time_b

        self._master_channel_cache[index] = t

        return t
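Usage sketch: the master samples are cached per group, so repeated calls are cheap; for virtual master channels the timestamps are synthesized as arange(cycles_nr) * a + b from the linear conversion:

    t = mdf.get_master(0)    # master (time) samples of group 0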
[docs]    def info(self):
        """get MDF information as a dict

        Examples
        --------
        >>> mdf = MDF4('test.mdf')
        >>> mdf.info()

        """
        info = {}
        info['version'] = self.identification['version_str']\
            .decode('utf-8')\
            .strip(' \n\t\0')
        info['groups'] = len(self.groups)
        for i, gp in enumerate(self.groups):
            if gp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
                stream = self._file
            elif gp['data_location'] == v4c.LOCATION_TEMPORARY_FILE:
                stream = self._tempfile
            inf = {}
            info['group {}'.format(i)] = inf
            inf['cycles'] = gp['channel_group']['cycles_nr']
            inf['channels count'] = len(gp['channels'])
            for j, channel in enumerate(gp['channels']):
                if self.memory == 'minimum':
                    channel = Channel(
                        address=channel,
                        stream=stream,
                    )
                    name = TextBlock(
                        address=gp['texts']['channels'][j]['name_addr'],
                        stream=stream,
                    )
                    name = name['text'].decode('utf-8').strip(' \r\t\n\0')
                    name = name.split('\\')[0]
                    channel.name = name
                else:
                    name = channel.name

                ch_type = v4c.CHANNEL_TYPE_TO_DESCRIPTION[channel['channel_type']]
                inf['channel {}'.format(j)] = 'name="{}" type={}'.format(
                    name,
                    ch_type,
                )

        return info
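Building on the doctest above, the returned dict can be walked directly (the keys are the ones built in the loop):

    info = mdf.info()
    print(info['version'], info['groups'])
    print(info['group 0']['cycles'], info['group 0']['channels count'])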
[docs]    def save(self, dst='', overwrite=None, compression=0):
        """Save MDF to *dst*. If *dst* is not provided the destination file
        name is the MDF name. If overwrite is *True* then the destination
        file is overwritten, otherwise the file name is appended with
        '_<cntr>', where '<cntr>' is the first counter that produces a new
        file name (that does not already exist in the filesystem)

        Parameters
        ----------
        dst : str
            destination file name, Default ''
        overwrite : bool
            overwrite flag, default *False*
        compression : int
            use compressed data blocks, default 0; valid since version 4.10

            * 0 - no compression
            * 1 - deflate (slower, but produces smaller files)
            * 2 - transposition + deflate (slowest, but produces the smallest
              files)

        """
        if overwrite is None:
            overwrite = self._overwrite

        if self.name is None and dst == '':
            message = ('Must specify a destination file name '
                       'for MDF created from scratch')
            raise MdfException(message)
        else:
            if self.memory == 'minimum':
                self._save_without_metadata(dst, overwrite, compression)
            else:
                self._save_with_metadata(dst, overwrite, compression)
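Usage sketch for *save* (the destination name is made up):

    # transposition + deflate data blocks, overwriting any existing file
    mdf.save('out.mf4', overwrite=True, compression=2)

    # without overwrite, an existing destination is kept and the file is
    # saved under the first free '_<cntr>' name instead
    mdf.save('out.mf4')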
def _save_with_metadata(self, dst, overwrite, compression): """Save MDF to *dst*. If *dst* is not provided the the destination file name is the MDF name. If overwrite is *True* then the destination file is overwritten, otherwise the file name is appened with '_<cntr>', were '<cntr>' is the first conter that produces a new file name (that does not already exist in the filesystem) Parameters ---------- dst : str destination file name, Default '' overwrite : bool overwrite flag, default *False* compression : int use compressed data blocks, default 0; valid since version 4.10 * 0 - no compression * 1 - deflate (slower, but produces smaller files) * 2 - transposition + deflate (slowest, but produces the smallest files) """ if self.name is None and dst == '': message = ('Must specify a destination file name ' 'for MDF created from scratch') raise MdfException(message) dst = dst if dst else self.name if overwrite is False: if os.path.isfile(dst): cntr = 0 while True: name = os.path.splitext(dst)[0] + '_{}.mf4'.format(cntr) if not os.path.isfile(name): break else: cntr += 1 message = ('Destination file "{}" already exists ' 'and "overwrite" is False. Saving MDF file as "{}"') message = message.format(dst, name) warnings.warn(message) dst = name if not self.file_history: comment = 'created' else: comment = 'updated' text = """<FHcomment> <TX>{}</TX> <tool_id>asammdf</tool_id> <tool_vendor>asammdf</tool_vendor> <tool_version>{}</tool_version> </FHcomment>""" text = text.format(comment, __version__) self.file_history.append( [FileHistory(), TextBlock(text=text, meta=True)] ) if self.memory == 'low' and dst == self.name: destination = dst + '.temp' else: destination = dst with open(destination, 'wb+') as dst_: defined_texts = {} write = dst_.write tell = dst_.tell seek = dst_.seek write(bytes(self.identification)) write(bytes(self.header)) original_data_addresses = [] if compression == 1: zip_type = v4c.FLAG_DZ_DEFLATE else: zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE # write DataBlocks first for gp in self.groups: original_data_addresses.append( gp['data_group']['data_block_addr'] ) address = tell() data = self._load_group_data(gp) if MDF4._split_data_blocks: samples_size = gp['channel_group']['samples_byte_nr'] split_size = MDF4._split_threshold // samples_size split_size *= samples_size chunks = len(data) / split_size chunks = ceil(chunks) else: chunks = 1 if chunks == 1: if compression and self.version != '4.00': if compression == 1: param = 0 else: param = gp['channel_group']['samples_byte_nr'] kargs = { 'data': data, 'zip_type': zip_type, 'param': param, } data_block = DataZippedBlock(**kargs) else: data_block = DataBlock(data=data) write(bytes(data_block)) align = data_block['block_len'] % 8 if align: write(b'\0' * (8-align)) if gp['channel_group']['cycles_nr']: gp['data_group']['data_block_addr'] = address else: gp['data_group']['data_block_addr'] = 0 else: kargs = { 'flags': v4c.FLAG_DL_EQUAL_LENGHT, 'zip_type': zip_type, } hl_block = HeaderList(**kargs) kargs = { 'flags': v4c.FLAG_DL_EQUAL_LENGHT, 'links_nr': chunks + 1, 'data_block_nr': chunks, 'data_block_len': split_size, } dl_block = DataList(**kargs) data_blocks = [] for i in range(chunks): data_ = data[i*split_size: (i+1)*split_size] if compression and self.version != '4.00': if compression == 1: zip_type = v4c.FLAG_DZ_DEFLATE else: zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE if compression == 1: param = 0 else: param = gp['channel_group']['samples_byte_nr'] kargs = { 'data': data_, 'zip_type': zip_type, 'param': param, } block = 
DataZippedBlock(**kargs) else: block = DataBlock(data=data_) address = tell() block.address = address data_blocks.append(block) write(bytes(block)) align = block['block_len'] % 8 if align: write(b'\0' * (8-align)) dl_block['data_block_addr{}'.format(i)] = address address = tell() dl_block.address = address write(bytes(dl_block)) if compression and self.version != '4.00': hl_block['first_dl_addr'] = address address = tell() hl_block.address = address write(bytes(hl_block)) gp['data_group']['data_block_addr'] = address address = tell() blocks = [] if self.file_comment: self.file_comment.address = address address += self.file_comment['block_len'] blocks.append(self.file_comment) # attachemnts if self.attachments: for at_block, texts in self.attachments: for key, text in texts.items(): at_block[key] = text.address = address address += text['block_len'] blocks.append(text) for at_block, texts in self.attachments: at_block.address = address blocks.append(at_block) align = at_block['block_len'] % 8 # append 8vyte alignemnt bytes for attachments if align % 8: blocks.append(b'\0' * (8 - align)) address += at_block['block_len'] + 8 - align else: address += at_block['block_len'] for i, (at_block, text) in enumerate(self.attachments[:-1]): at_block['next_at_addr'] = self.attachments[i+1][0].address self.attachments[-1][0]['next_at_addr'] = 0 # file history blocks for i, (fh, fh_text) in enumerate(self.file_history): fh_text.address = address blocks.append(fh_text) address += fh_text['block_len'] fh['comment_addr'] = fh_text.address for i, (fh, fh_text) in enumerate(self.file_history): fh.address = address address += fh['block_len'] blocks.append(fh) for i, (fh, fh_text) in enumerate(self.file_history[:-1]): fh['next_fh_addr'] = self.file_history[i+1][0].address self.file_history[-1][0]['next_fh_addr'] = 0 # data groups for gp in self.groups: gp['data_group'].address = address address += gp['data_group']['block_len'] blocks.append(gp['data_group']) gp['data_group']['comment_addr'] = 0 for i, dg in enumerate(self.groups[:-1]): addr_ = self.groups[i+1]['data_group'].address dg['data_group']['next_dg_addr'] = addr_ self.groups[-1]['data_group']['next_dg_addr'] = 0 tab_conversion = ( v4c.CONVERSION_TYPE_TABX, v4c.CONVERSION_TYPE_RTABX, v4c.CONVERSION_TYPE_TTAB, v4c.CONVERSION_TYPE_TRANS, ) # go through each data group and append the rest of the blocks for i, gp in enumerate(self.groups): # write TXBLOCK's for item_list in gp['texts'].values(): for dict_ in item_list: if dict_ is None: continue for key, tx_block in dict_.items(): # text blocks can be shared text = tx_block['text'] if text in defined_texts: tx_block.address = defined_texts[text] else: defined_texts[text] = address tx_block.address = address address += tx_block['block_len'] blocks.append(tx_block) # channel conversions for j, conv in enumerate(gp['channel_conversions']): if conv: conv.address = address conv_texts = gp['texts']['conversions'][j] if conv_texts: for key, text_block in conv_texts.items(): conv[key] = text_block.address conv['inv_conv_addr'] = 0 if conv['conversion_type'] in tab_conversion: for key in gp['texts']['conversion_tab'][j]: conv[key] = gp['texts']['conversion_tab'][j][key].address address += conv['block_len'] blocks.append(conv) # channel sources for j, source in enumerate(gp['channel_sources']): if source: source.address = address source_texts = gp['texts']['sources'][j] for key in ('name_addr', 'path_addr', 'comment_addr'): if source_texts and key in source_texts: source[key] = source_texts[key].address else: 
source[key] = 0 address += source['block_len'] blocks.append(source) # channel data gp_sd = gp['signal_data'] = [] for j, ch in enumerate(gp['channels']): signal_data = self._load_signal_data(ch['data_block_addr']) if signal_data: signal_data = SignalDataBlock(data=signal_data) signal_data.address = address address += signal_data['block_len'] blocks.append(signal_data) align = signal_data['block_len'] % 8 if align % 8: blocks.append(b'\0' * (8 - align)) address += 8 - align gp_sd.append(signal_data) else: gp_sd.append(None) # channel dependecies for j, dep_list in enumerate(gp['channel_dependencies']): if dep_list: if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list): for dep in dep_list: dep.address = address address += dep['block_len'] blocks.append(dep) for k, dep in enumerate(dep_list[:-1]): dep['composition_addr'] = dep_list[k+1].address dep_list[-1]['composition_addr'] = 0 # channels for j, (channel, signal_data) in enumerate(zip(gp['channels'], gp['signal_data'])): channel.address = address channel_texts = gp['texts']['channels'][j] address += channel['block_len'] blocks.append(channel) for key in ('comment_addr', 'unit_addr'): if key in channel_texts: channel[key] = channel_texts[key].address else: channel[key] = 0 channel['name_addr'] = channel_texts['name_addr'].address if not gp['channel_conversions'][j]: channel['conversion_addr'] = 0 else: addr_ = gp['channel_conversions'][j].address channel['conversion_addr'] = addr_ if gp['channel_sources'][j]: addr_ = gp['channel_sources'][j].address channel['source_addr'] = addr_ else: channel['source_addr'] = 0 if signal_data: channel['data_block_addr'] = signal_data.address else: channel['data_block_addr'] = 0 if gp['channel_dependencies'][j]: addr_ = gp['channel_dependencies'][j][0].address channel['component_addr'] = addr_ group_channels = gp['channels'] if group_channels: for j, channel in enumerate(group_channels[:-1]): channel['next_ch_addr'] = group_channels[j+1].address group_channels[-1]['next_ch_addr'] = 0 # channel group gp['channel_group'].address = address gp['channel_group']['first_ch_addr'] = gp['channels'][0].address gp['channel_group']['next_cg_addr'] = 0 cg_texts = gp['texts']['channel_group'][0] for key in ('acq_name_addr', 'comment_addr'): if cg_texts and key in cg_texts: addr_ = gp['texts']['channel_group'][0][key].address gp['channel_group'][key] = addr_ gp['channel_group']['acq_source_addr'] = 0 gp['data_group']['first_cg_addr'] = address address += gp['channel_group']['block_len'] blocks.append(gp['channel_group']) for gp in self.groups: for dep_list in gp['channel_dependencies']: if dep_list: if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list): for dep in dep_list: for i, (ch_nr, gp_nr) in enumerate(dep.referenced_channels): grp = self.groups[gp_nr] ch = grp['channels'][ch_nr] dep['scale_axis_{}_dg_addr'.format(i)] = grp['data_group'].address dep['scale_axis_{}_cg_addr'.format(i)] = grp['channel_group'].address dep['scale_axis_{}_ch_addr'.format(i)] = ch.address for block in blocks: write(bytes(block)) if self.groups: addr_ = self.groups[0]['data_group'].address self.header['first_dg_addr'] = addr_ else: self.header['first_dg_addr'] = 0 self.header['file_history_addr'] = self.file_history[0][0].address if self.attachments: addr_ = self.attachments[0][0].address self.header['first_attachment_addr'] = addr_ else: self.header['first_attachment_addr'] = 0 if self.file_comment: self.header['comment_addr'] = self.file_comment.address else: self.header['comment_addr'] = 0 
seek(v4c.IDENTIFICATION_BLOCK_SIZE, v4c.SEEK_START) write(bytes(self.header)) for orig_addr, gp in zip(original_data_addresses, self.groups): gp['data_group']['data_block_addr'] = orig_addr if self.memory == 'low' and dst == self.name: self.close() os.remove(self.name) os.rename(destination, self.name) self.groups = [] self.header = None self.identification = None self.file_history = [] self.channels_db = {} self.masters_db = {} self.attachments = [] self.file_comment = None self._ch_map = {} self._master_channel_cache = {} self._tempfile = TemporaryFile() self._file = open(self.name, 'rb') self._read() def _save_without_metadata(self, dst, overwrite, compression): """Save MDF to *dst*. If *dst* is not provided the the destination file name is the MDF name. If overwrite is *True* then the destination file is overwritten, otherwise the file name is appened with '_<cntr>', were '<cntr>' is the first conter that produces a new file name (that does not already exist in the filesystem) Parameters ---------- dst : str destination file name, Default '' overwrite : bool overwrite flag, default *False* compression : int use compressed data blocks, default 0; valid since version 4.10 * 0 - no compression * 1 - deflate (slower, but produces smaller files) * 2 - transposition + deflate (slowest, but produces the smallest files) """ if self.name is None and dst == '': message = ('Must specify a destination file name ' 'for MDF created from scratch') raise MdfException(message) dst = dst if dst else self.name if overwrite is False: if os.path.isfile(dst): cntr = 0 while True: name = os.path.splitext(dst)[0] + '_{}.mf4'.format(cntr) if not os.path.isfile(name): break else: cntr += 1 message = ('Destination file "{}" already exists ' 'and "overwrite" is False. Saving MDF file as "{}"') message = message.format(dst, name) warnings.warn(message) dst = name if not self.file_history: comment = 'created' else: comment = 'updated' text = """<FHcomment> <TX>{}</TX> <tool_id>asammdf</tool_id> <tool_vendor>asammdf</tool_vendor> <tool_version>{}</tool_version> </FHcomment>""" text = text.format(comment, __version__) self.file_history.append( [FileHistory(), TextBlock(text=text, meta=True)] ) if dst == self.name: destination = dst + '.temp' else: destination = dst with open(destination, 'wb+') as dst_: defined_texts = {} write = dst_.write tell = dst_.tell seek = dst_.seek write(bytes(self.identification)) write(bytes(self.header)) original_data_addresses = [] if compression == 1: zip_type = v4c.FLAG_DZ_DEFLATE else: zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE # write DataBlocks first for gp in self.groups: original_data_addresses.append( gp['data_group']['data_block_addr'] ) address = tell() data = self._load_group_data(gp) if MDF4._split_data_blocks: samples_size = gp['channel_group']['samples_byte_nr'] split_size = MDF4._split_threshold // samples_size split_size *= samples_size chunks = len(data) / split_size chunks = ceil(chunks) else: chunks = 1 if chunks == 1: if compression and self.version != '4.00': if compression == 1: param = 0 else: param = gp['channel_group']['samples_byte_nr'] kargs = { 'data': data, 'zip_type': zip_type, 'param': param, } data_block = DataZippedBlock(**kargs) else: data_block = DataBlock(data=data) write(bytes(data_block)) align = data_block['block_len'] % 8 if align: write(b'\0' * (8-align)) if gp['channel_group']['cycles_nr']: gp['data_group']['data_block_addr'] = address else: gp['data_group']['data_block_addr'] = 0 else: kargs = { 'flags': v4c.FLAG_DL_EQUAL_LENGHT, 'zip_type': 
zip_type, } hl_block = HeaderList(**kargs) kargs = { 'flags': v4c.FLAG_DL_EQUAL_LENGHT, 'links_nr': chunks + 1, 'data_block_nr': chunks, 'data_block_len': split_size, } dl_block = DataList(**kargs) data_blocks = [] for i in range(chunks): data_ = data[i*split_size: (i+1)*split_size] if compression and self.version != '4.00': if compression == 1: zip_type = v4c.FLAG_DZ_DEFLATE else: zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE if compression == 1: param = 0 else: param = gp['channel_group']['samples_byte_nr'] kargs = { 'data': data_, 'zip_type': zip_type, 'param': param, } block = DataZippedBlock(**kargs) else: block = DataBlock(data=data_) address = tell() block.address = address data_blocks.append(block) write(bytes(block)) align = block['block_len'] % 8 if align: write(b'\0' * (8-align)) dl_block['data_block_addr{}'.format(i)] = address address = tell() dl_block.address = address write(bytes(dl_block)) if compression and self.version != '4.00': hl_block['first_dl_addr'] = address address = tell() hl_block.address = address write(bytes(hl_block)) gp['data_group']['data_block_addr'] = address address = tell() if self.file_comment: address = tell() self.file_comment.address = address write(bytes(self.file_comment)) # attachemnts address = tell() blocks = [] if self.attachments: for at_block, texts in self.attachments: for key, text in texts.items(): at_block[key] = text.address = address address += text['block_len'] blocks.append(text) for at_block, texts in self.attachments: at_block.address = address blocks.append(at_block) align = at_block['block_len'] % 8 # append 8vyte alignemnt bytes for attachments if align % 8: blocks.append(b'\0' * (8 - align)) address += at_block['block_len'] + 8 - align else: address += at_block['block_len'] for i, (at_block, text) in enumerate(self.attachments[:-1]): at_block['next_at_addr'] = self.attachments[i+1][0].address self.attachments[-1][0]['next_at_addr'] = 0 # file history blocks for i, (fh, fh_text) in enumerate(self.file_history): fh_text.address = address blocks.append(fh_text) address += fh_text['block_len'] fh['comment_addr'] = fh_text.address for i, (fh, fh_text) in enumerate(self.file_history): fh.address = address address += fh['block_len'] blocks.append(fh) for i, (fh, fh_text) in enumerate(self.file_history[:-1]): fh['next_fh_addr'] = self.file_history[i+1][0].address self.file_history[-1][0]['next_fh_addr'] = 0 for blk in blocks: write(bytes(blk)) blocks = [] tab_conversion = ( v4c.CONVERSION_TYPE_TABX, v4c.CONVERSION_TYPE_RTABX, v4c.CONVERSION_TYPE_TTAB, v4c.CONVERSION_TYPE_TRANS, ) # go through each data group and append the rest of the blocks for i, gp in enumerate(self.groups): if gp['data_location'] == v4c.LOCATION_ORIGINAL_FILE: stream = self._file else: stream = self._tempfile temp_texts = deepcopy(gp['texts']) # write TXBLOCK's for item_list in temp_texts.values(): for dict_ in item_list: if dict_ is None: continue for key, tx_block in dict_.items(): # text blocks can be shared block = TextBlock( address=tx_block, stream=stream, ) text = block['text'] if text in defined_texts: dict_[key] = defined_texts[text] else: address = tell() defined_texts[text] = address dict_[key] = address write(bytes(block)) # channel conversions gp['temp_channel_conversions'] = [] for j, conv in enumerate(gp['channel_conversions']): if conv: address = tell() gp['temp_channel_conversions'].append(address) conv = ChannelConversion( address=conv, stream=stream, ) conv.address = address conv_texts = temp_texts['conversions'][j] if conv_texts: for key, text_block 
in conv_texts.items(): conv[key] = text_block conv['inv_conv_addr'] = 0 if conv['conversion_type'] in tab_conversion: for key in temp_texts['conversion_tab'][j]: conv[key] = temp_texts['conversion_tab'][j][key] write(bytes(conv)) else: gp['temp_channel_conversions'].append(0) # channel sources gp['temp_channel_sources'] = [] for j, source in enumerate(gp['channel_sources']): if source: address = tell() gp['temp_channel_sources'].append(address) source_texts = temp_texts['sources'][j] source = SourceInformation( address=source, stream=stream, ) for key in ('name_addr', 'path_addr', 'comment_addr'): if source_texts and key in source_texts: source[key] = source_texts[key] else: source[key] = 0 write(bytes(source)) else: gp['temp_channel_sources'].append(0) # channel dependecies for j, dep_list in enumerate(gp['channel_dependencies']): if dep_list: if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list): for dep in dep_list: address = tell() dep.address = address write(bytes(dep)) for k, dep in enumerate(dep_list[:-1]): dep['composition_addr'] = dep_list[k+1].address dep_list[-1]['composition_addr'] = 0 # channels blocks = [] chans = [] address = tell() gp['temp_channels'] = ch_addrs = [] gp['channel_group']['first_ch_addr'] = address for j, channel in enumerate(gp['channels']): channel = Channel( address=channel, stream=stream, ) channel.address = address channel_texts = temp_texts['channels'][j] ch_addrs.append(address) address += channel['block_len'] blocks.append(channel) chans.append(channel) for key in ('comment_addr', 'unit_addr'): if key in channel_texts: channel[key] = channel_texts[key] else: channel[key] = 0 channel['name_addr'] = channel_texts['name_addr'] channel['conversion_addr'] = gp['temp_channel_conversions'][j] channel['source_addr'] = gp['temp_channel_sources'][j] signal_data = self._load_signal_data(channel['data_block_addr']) if signal_data: signal_data = SignalDataBlock(data=signal_data) blocks.append(signal_data) channel['data_block_addr'] = address address += signal_data['block_len'] align = signal_data['block_len'] % 8 if align % 8: blocks.append(b'\0' * (8 - align)) address += 8 - align else: channel['data_block_addr'] = 0 if gp['channel_dependencies'][j]: block = gp['channel_dependencies'][j][0] if isinstance(block, int): channel['component_addr'] = block else: channel['component_addr'] = block.address group_channels = gp['channels'] if group_channels: for j, channel in enumerate(chans[:-1]): channel['next_ch_addr'] = chans[j+1].address chans[-1]['next_ch_addr'] = 0 for block in blocks: write(bytes(block)) blocks = [] chans = [] address = tell() # channel group gp['channel_group'].address = address # gp['channel_group']['first_ch_addr'] = gp['channels'][0] gp['channel_group']['next_cg_addr'] = 0 cg_texts = temp_texts['channel_group'][0] for key in ('acq_name_addr', 'comment_addr'): if cg_texts and key in cg_texts: addr_ = temp_texts['channel_group'][0][key] gp['channel_group'][key] = addr_ gp['channel_group']['acq_source_addr'] = 0 gp['data_group']['first_cg_addr'] = address write(bytes(gp['channel_group'])) address = tell() del gp['temp_channel_sources'] del gp['temp_channel_conversions'] temp_texts = None blocks = [] address = tell() # data groups for gp in self.groups: gp['data_group'].address = address address += gp['data_group']['block_len'] blocks.append(gp['data_group']) gp['data_group']['comment_addr'] = 0 for i, dg in enumerate(self.groups[:-1]): addr_ = self.groups[i+1]['data_group'].address dg['data_group']['next_dg_addr'] = addr_ 
            self.groups[-1]['data_group']['next_dg_addr'] = 0

            for block in blocks:
                write(bytes(block))

            if self.groups:
                addr_ = self.groups[0]['data_group'].address
                self.header['first_dg_addr'] = addr_
            else:
                self.header['first_dg_addr'] = 0
            self.header['file_history_addr'] = self.file_history[0][0].address
            if self.attachments:
                addr_ = self.attachments[0][0].address
                self.header['first_attachment_addr'] = addr_
            else:
                self.header['first_attachment_addr'] = 0
            if self.file_comment:
                self.header['comment_addr'] = self.file_comment.address
            else:
                self.header['comment_addr'] = 0

            seek(v4c.IDENTIFICATION_BLOCK_SIZE, v4c.SEEK_START)
            write(bytes(self.header))

            for orig_addr, gp in zip(original_data_addresses, self.groups):
                gp['data_group']['data_block_addr'] = orig_addr

            for gp in self.groups:
                for dep_list in gp['channel_dependencies']:
                    if dep_list:
                        if all(isinstance(dep, ChannelArrayBlock)
                               for dep in dep_list):
                            for dep in dep_list:
                                for i, (ch_nr, gp_nr) in enumerate(dep.referenced_channels):
                                    grp = self.groups[gp_nr]
                                    stream.seek(0, v4c.SEEK_END)
                                    dep['scale_axis_{}_dg_addr'.format(i)] = grp['data_group'].address
                                    dep['scale_axis_{}_cg_addr'.format(i)] = grp['channel_group'].address
                                    dep['scale_axis_{}_ch_addr'.format(i)] = grp['temp_channels'][ch_nr]
                                seek(dep.address, v4c.SEEK_START)
                                write(bytes(dep))

            for gp in self.groups:
                del gp['temp_channels']

        if dst == self.name:
            self.close()
            os.remove(self.name)
            os.rename(destination, self.name)

            self.groups = []
            self.header = None
            self.identification = None
            self.file_history = []
            self.channels_db = {}
            self.masters_db = {}
            self.attachments = []
            self.file_comment = None

            self._ch_map = {}
            self._master_channel_cache = {}

            self._tempfile = TemporaryFile()
            self._file = open(self.name, 'rb')
            self._read()
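End-to-end sketch of the low-memory save path above, which rewrites the source file through a '<name>.temp' file and then re-reads it (the file name is hypothetical):

    mdf = MDF4('big.mf4', memory='minimum')
    mdf.save(overwrite=True, compression=1)   # goes through _save_without_metadata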