# -*- coding: utf-8 -*-
"""
ASAM MDF version 4 file format module
"""
from __future__ import print_function, division
import sys
import time
import warnings
import os
from tempfile import TemporaryFile
from struct import unpack, unpack_from
from functools import reduce, partial
from collections import defaultdict
from hashlib import md5
import xml.etree.ElementTree as XML
from math import ceil
from copy import deepcopy
from numpy import (
interp,
linspace,
dtype,
array_equal,
zeros,
uint8,
array,
searchsorted,
clip,
union1d,
float64,
frombuffer,
argwhere,
arange,
flip,
unpackbits,
packbits,
roll,
transpose,
)
from numpy.core.records import fromstring, fromarrays
from numpy.core.defchararray import encode
from numexpr import evaluate
from .v4blocks import (
AttachmentBlock,
Channel,
ChannelArrayBlock,
ChannelGroup,
ChannelConversion,
DataBlock,
DataZippedBlock,
DataGroup,
DataList,
FileHistory,
FileIdentificationBlock,
HeaderBlock,
HeaderList,
SignalDataBlock,
SourceInformation,
TextBlock,
)
from . import v4constants as v4c
from .utils import (
MdfException,
get_fmt,
fmt_to_datatype,
get_unique_name,
get_min_max,
fix_dtype_fields,
)
from .signal import Signal
from .version import __version__
# Pre-bind the MDF major version so the shared helpers can be called in this
# module without repeating ``version=4`` at every call site.
get_fmt = partial(get_fmt, version=4)
fmt_to_datatype = partial(fmt_to_datatype, version=4)

# Channel types that are registered in ``masters_db`` while reading a file.
MASTER_CHANNELS = (v4c.CHANNEL_TYPE_MASTER, v4c.CHANNEL_TYPE_VIRTUAL_MASTER)

# Major version of the running interpreter (2 or 3).
PYVERSION = sys.version_info[0]
if PYVERSION == 2:
    # Python 2 only: use the ``bytes`` replacement shipped in ``utils``.
    from .utils import bytes

__all__ = ['MDF4']
class MDF4(object):
"""MDF version 4 file object.
If *name* is given, the file is opened and its content is read according to
the *memory* option; otherwise an empty measurement is created that can
later be saved to disk.
Parameters
----------
name : string
mdf file name
memory : str
memory optimization option; default `full`
* if *full* the data group binary data block will be loaded in RAM
* if *low* the channel data is read from disk on request, and the
metadata is loaded into RAM
* if *minimum* only minimal data is loaded into RAM
version : string
mdf file version ('4.00', '4.10', '4.11'); default '4.10'
Attributes
----------
name : string
mdf file name
groups : list
list of data groups
header : HeaderBlock
mdf file header
file_history : list
list of (FileHistory, TextBlock) pairs
file_comment : TextBlock
mdf file comment
attachments : list
list of (AttachmentBlock, texts dict) pairs
identification : FileIdentificationBlock
mdf file start block
memory : str
memory optimization option
version : str
mdf version
channels_db : dict
used for fast channel access by name; for each name key the value is a
list of (group index, channel index) tuples
masters_db : dict
used for fast master channel access; for each group index key the value
is the master channel index
"""
# Class-level switches shared by every MDF4 instance.
# NOTE(review): none of these is read in the visible part of the file; the
# descriptions below are inferred from the names only - confirm against the
# append/save code before relying on them.
# presumably: shrink integer channels to the smallest fitting type on append
_compact_integers_on_append = False
# presumably: write the data of a group as several blocks when saving
_split_data_blocks = False
# 1 << 21 == 2097152; presumably the split size in bytes - TODO confirm
_split_threshold = 1 << 21
# presumably: allow saving over an existing file
_overwrite = False
def __init__(self, name=None, memory='full', version='4.10'):
self.groups = []
self.header = None
self.identification = None
self.file_history = []
self.name = name
self.memory = memory
self.channels_db = {}
self.masters_db = {}
self.attachments = []
self.file_comment = None
self._ch_map = {}
self._master_channel_cache = {}
# used for appending when memory=False
self._tempfile = TemporaryFile()
self._file = None
self._tempfile.write(b'\0')
if name:
self._file = open(self.name, 'rb')
self._read()
else:
self.header = HeaderBlock()
self.identification = FileIdentificationBlock(version=version)
self.version = version
def _check_finalised(self):
flags = self.identification['unfinalized_standard_flags']
if flags & 1:
message = ('Unfinalised file {}:'
'Update of cycle counters for CG/CA blocks required')
warnings.warn(message.format(self.name))
elif flags & 1 << 1:
message = ('Unfinalised file {}:'
'Update of cycle counters for SR blocks required')
warnings.warn(message.format(self.name))
elif flags & 1 << 2:
message = ('Unfinalised file {}:'
'Update of length for last DT block required')
warnings.warn(message.format(self.name))
elif flags & 1 << 3:
message = ('Unfinalised file {}:'
'Update of length for last RD block required')
warnings.warn(message.format(self.name))
elif flags & 1 << 4:
message = ('Unfinalised file {}:'
'Update of last DL block in each chained list'
'of DL blocks required')
warnings.warn(message.format(self.name))
elif flags & 1 << 5:
message = ('Unfinalised file {}:'
'Update of cg_data_bytes and cg_inval_bytes '
'in VLSD CG block required')
warnings.warn(message.format(self.name))
elif flags & 1 << 6:
message = ('Unfinalised file {}:'
'Update of offset values for VLSD channel required '
'in case a VLSD CG block is used')
warnings.warn(message.format(self.name))
def _read(self):
    """Parse ``self._file`` and populate the object's metadata.

    Reads the identification and header blocks, the file comment, the file
    history and the attachments, then walks the chain of data groups. Each
    channel group found becomes one entry of ``self.groups``. With
    ``memory == 'full'`` the raw records are loaded (and, for unsorted data
    groups, split per channel group) and the file is closed at the end;
    otherwise the groups only remember that their data lives in the
    original file.
    """
    stream = self._file
    memory = self.memory
    # index of the next entry of ``self.groups`` (one per channel group)
    dg_cntr = 0

    self.identification = FileIdentificationBlock(stream=stream)
    version = self.identification['version_str']
    self.version = version.decode('utf-8').strip(' \n\t\0')

    if self.version in ('4.10', '4.11'):
        self._check_finalised()

    self.header = HeaderBlock(address=0x40, stream=stream)

    # read file comment
    if self.header['comment_addr']:
        self.file_comment = TextBlock(
            address=self.header['comment_addr'],
            stream=stream,
        )

    # read file history
    fh_addr = self.header['file_history_addr']
    while fh_addr:
        fh = FileHistory(address=fh_addr, stream=stream)
        fh_text = TextBlock(address=fh['comment_addr'], stream=stream)
        self.file_history.append((fh, fh_text))
        fh_addr = fh['next_fh_addr']

    # read attachments
    at_addr = self.header['first_attachment_addr']
    while at_addr:
        texts = {}
        at_block = AttachmentBlock(address=at_addr, stream=stream)
        for key in ('file_name_addr', 'mime_addr', 'comment_addr'):
            addr = at_block[key]
            if addr:
                texts[key] = TextBlock(address=addr, stream=stream)
        self.attachments.append((at_block, texts))
        at_addr = at_block['next_at_addr']

    # go to first data group and read each data group sequentially
    dg_addr = self.header['first_dg_addr']
    while dg_addr:
        new_groups = []
        group = DataGroup(address=dg_addr, stream=stream)
        record_id_nr = group['record_id_len']

        # record id -> record size in bytes; the same dict object is shared
        # by all the channel groups of this data group
        cg_size = {}

        # go to first channel group of the current data group
        cg_addr = group['first_cg_addr']
        while cg_addr:
            grp = {
                'channels': [],
                'channel_conversions': [],
                'channel_sources': [],
                'signal_data': [],
                'data_block': None,
                'channel_dependencies': [],
                # channel_group is a list to allow uniform handling of all
                # texts in the save method
                'texts': {
                    'channels': [],
                    'sources': [],
                    'conversions': [],
                    'conversion_tab': [],
                    'channel_group': [],
                },
            }
            new_groups.append(grp)

            # read each channel group sequentially
            channel_group = ChannelGroup(address=cg_addr, stream=stream)
            grp['channel_group'] = channel_group
            grp['record_size'] = cg_size

            record_id = channel_group['record_id']
            if channel_group['flags'] == 0:
                samples_size = channel_group['samples_byte_nr']
                inval_size = channel_group['invalidation_bytes_nr']
                cg_size[record_id] = samples_size + inval_size
            else:
                # VLSD flags: a size of 0 marks records that carry their
                # own 4 byte length prefix
                cg_size[record_id] = 0

            # records with an id prefix mean the data group is unsorted
            grp['sorted'] = not record_id_nr

            # every group gets its own DataGroup object
            grp['data_group'] = DataGroup(address=dg_addr, stream=stream)

            # read acquisition name and comment for current channel group;
            # in 'minimum' mode only the addresses are kept, so the text
            # blocks are not read at all
            channel_group_texts = {}
            for key in ('acq_name_addr', 'comment_addr'):
                address = channel_group[key]
                if address:
                    if memory == 'minimum':
                        channel_group_texts[key] = address
                    else:
                        channel_group_texts[key] = TextBlock(
                            address=address,
                            stream=stream,
                        )
            grp['texts']['channel_group'].append(channel_group_texts or None)

            # Read channels by walking recursively in the channel group
            # starting from the first channel
            self._read_channels(
                channel_group['first_ch_addr'],
                grp,
                stream,
                dg_cntr,
                0,
            )

            cg_addr = channel_group['next_cg_addr']
            dg_cntr += 1

        if memory == 'full':
            for grp in new_groups:
                grp['data_location'] = v4c.LOCATION_MEMORY

            dat_addr = group['data_block_addr']
            data = self._read_data_block(address=dat_addr, stream=stream)

            if record_id_nr == 0:
                # sorted data group: all the records belong to the first
                # channel group. A data group without channel groups has
                # nothing to attach the data to.
                if new_groups:
                    new_groups[0]['data_block'] = DataBlock(data=data)
            else:
                # unsorted data group: split the records per record id
                # NOTE(review): the record id is read as a single byte and
                # ``record_id_nr == 2`` skips one extra byte after the
                # record; confirm this against files whose record id is
                # wider than one byte.
                cg_data = defaultdict(list)
                i = 0
                size = len(data)
                while i < size:
                    rec_id = data[i]
                    # skip record id
                    i += 1
                    rec_size = cg_size[rec_id]
                    if not rec_size:
                        # VLSD record: 4 byte little endian length prefix
                        rec_size = unpack('<I', data[i: i + 4])[0]
                        i += 4
                    cg_data[rec_id].append(data[i: i + rec_size])
                    # if 2 record id's are used skip also the second one
                    if record_id_nr == 2:
                        i += rec_size + 1
                    else:
                        i += rec_size

                for grp in new_groups:
                    record_id = grp['channel_group']['record_id']
                    data = b''.join(cg_data[record_id])
                    grp['channel_group']['record_id'] = 1
                    grp['data_block'] = DataBlock(data=data)
        else:
            for grp in new_groups:
                grp['data_location'] = v4c.LOCATION_ORIGINAL_FILE

        self.groups.extend(new_groups)
        dg_addr = group['next_dg_addr']

    # resolve the channels referenced as axes by channel array blocks,
    # now that every channel address is known in ``self._ch_map``
    for grp in self.groups:
        for dep_list in grp['channel_dependencies']:
            if not dep_list:
                continue
            for dep in dep_list:
                if isinstance(dep, Channel):
                    # structure composition: a list of channels, not arrays
                    break
                is_lookup = dep['ca_type'] == v4c.CA_TYPE_LOOKUP
                links_match = dep['links_nr'] == 4 * dep['dims'] + 1
                if not (is_lookup and links_match):
                    continue
                for i in range(dep['dims']):
                    ch_addr = dep['scale_axis_{}_ch_addr'.format(i)]
                    ref_channel = self._ch_map[ch_addr]
                    dep.referenced_channels.append(ref_channel)

    if self.memory == 'full':
        self.close()
def _read_channels(
        self,
        ch_addr,
        grp,
        stream,
        dg_cntr,
        ch_cntr,
        channel_composition=False):
    """Read a linked list of channel blocks into the group dict *grp*.

    For every channel the conversion, source, texts and dependencies are
    appended to the matching lists of *grp*, and the channel is registered
    in ``channels_db``, ``masters_db`` and ``_ch_map``. Component channels
    are read recursively.

    Parameters
    ----------
    ch_addr : int
        address of the first channel block of the list
    grp : dict
        group dict that is filled in place
    stream : file
        opened file handle
    dg_cntr : int
        index of the group in ``self.groups``
    ch_cntr : int
        index that the first channel read gets inside the group
    channel_composition : bool
        True when the list being read holds the components of a parent
        channel

    Returns
    -------
    ch_cntr, composition : int, list
        the next free channel index and, when *channel_composition* is
        True, the channels (or channel addresses in 'minimum' memory mode)
        read at this level
    """
    memory = self.memory
    channels = grp['channels']
    composition = []

    while ch_addr:
        # read channel block and create channel object
        channel = Channel(address=ch_addr, stream=stream)
        self._ch_map[ch_addr] = (ch_cntr, dg_cntr)

        value = ch_addr if memory == 'minimum' else channel
        channels.append(value)
        if channel_composition:
            composition.append(value)

        # read conversion block and create channel conversion object
        address = channel['conversion_addr']
        if address:
            conv = ChannelConversion(address=address, stream=stream)
        else:
            conv = None
        if memory == 'minimum':
            grp['channel_conversions'].append(address)
        else:
            grp['channel_conversions'].append(conv)

        conv_tabx_texts = {}

        # read text fields for channel conversions
        conv_texts = {}
        if conv:
            for key in ('name_addr', 'unit_addr', 'comment_addr'):
                address = conv[key]
                if address:
                    if memory == 'minimum':
                        conv_texts[key] = address
                    else:
                        conv_texts[key] = TextBlock(
                            address=address,
                            stream=stream,
                        )

            if 'formula_addr' in conv:
                address = conv['formula_addr']
                if memory == 'minimum':
                    conv_texts['formula_addr'] = address
                elif address:
                    block = TextBlock(address=address, stream=stream)
                    conv_texts['formula_addr'] = block
                else:
                    conv_texts['formula_addr'] = None

            conversion_type = conv['conversion_type']
            if conversion_type in v4c.TABULAR_CONVERSIONS:
                # links_nr - common links (4) and, except for TTAB, the
                # default link (1)
                if conversion_type == v4c.CONVERSION_TYPE_TTAB:
                    tabs = conv['links_nr'] - 4
                else:
                    tabs = conv['links_nr'] - 4 - 1
                for i in range(tabs):
                    key = 'text_{}'.format(i)
                    address = conv[key]
                    if memory == 'minimum':
                        conv_tabx_texts[key] = address
                    elif address:
                        conv_tabx_texts[key] = TextBlock(
                            address=address,
                            stream=stream,
                        )
                    else:
                        conv_tabx_texts[key] = None

                if not conversion_type == v4c.CONVERSION_TYPE_TTAB:
                    address = conv.get('default_addr', 0)
                    if address:
                        if memory == 'minimum':
                            conv_tabx_texts['default_addr'] = address
                        else:
                            # the default link may be a text block or
                            # another conversion block
                            stream.seek(address, v4c.SEEK_START)
                            blk_id = stream.read(4)
                            if blk_id == b'##TX':
                                block = TextBlock(
                                    address=address,
                                    stream=stream,
                                )
                                conv_tabx_texts['default_addr'] = block
                            elif blk_id == b'##CC':
                                block = ChannelConversion(
                                    address=address,
                                    stream=stream,
                                )
                                # time.clock was removed in Python 3.8;
                                # the value is only used as a text
                                timer = (
                                    getattr(time, 'perf_counter', None)
                                    or time.clock
                                )
                                text = str(timer()).encode('utf-8')
                                # NOTE(review): the conversion block is
                                # not stored in conv_tabx_texts here -
                                # confirm that this is intended
                                default_text = block
                                default_text['text'] = text
                                conv['unit_addr'] = default_text['unit_addr']
                                default_text['unit_addr'] = 0

            elif conversion_type == v4c.CONVERSION_TYPE_TRANS:
                # link_nr - common links (4) - default text link (1)
                for i in range((conv['links_nr'] - 4 - 1) // 2):
                    for key in ('input_{}_addr'.format(i),
                                'output_{}_addr'.format(i)):
                        address = conv[key]
                        if address:
                            if memory == 'minimum':
                                conv_tabx_texts[key] = address
                            else:
                                conv_tabx_texts[key] = TextBlock(
                                    address=address,
                                    stream=stream,
                                )
                address = conv['default_addr']
                if address:
                    if memory == 'minimum':
                        conv_tabx_texts['default_addr'] = address
                    else:
                        conv_tabx_texts['default_addr'] = TextBlock(
                            address=address,
                            stream=stream,
                        )

        grp['texts']['conversion_tab'].append(conv_tabx_texts or None)
        grp['texts']['conversions'].append(conv_texts or None)

        # read source block and create source information object
        source_texts = {}
        address = channel['source_addr']
        if address:
            source = SourceInformation(
                address=address,
                stream=stream,
            )
            if memory == 'minimum':
                grp['channel_sources'].append(address)
            else:
                grp['channel_sources'].append(source)

            # read text fields for channel sources
            for key in ('name_addr', 'path_addr', 'comment_addr'):
                address = source[key]
                if address:
                    if memory == 'minimum':
                        source_texts[key] = address
                    else:
                        source_texts[key] = TextBlock(
                            address=address,
                            stream=stream,
                        )
            grp['texts']['sources'].append(source_texts or None)
        else:
            if memory == 'minimum':
                grp['channel_sources'].append(0)
            else:
                grp['channel_sources'].append(None)
            grp['texts']['sources'].append(None)

        # read text fields for channel
        channel_texts = {}
        grp['texts']['channels'].append(channel_texts)
        for key in ('name_addr', 'comment_addr', 'unit_addr'):
            address = channel[key]
            if address:
                if memory == 'minimum':
                    channel_texts[key] = address
                else:
                    channel_texts[key] = TextBlock(
                        address=address,
                        stream=stream,
                    )

        # update channel object name; only the part of the name before the
        # first backslash is kept
        if memory == 'minimum':
            block = TextBlock(
                address=channel_texts['name_addr'],
                stream=stream,
            )
            name = block['text'].decode('utf-8')
        else:
            name = channel_texts['name_addr']['text'].decode('utf-8')
        name = name.split('\\')[0]
        name = name.strip(' \t\n\r\0')
        channel.name = name

        if name not in self.channels_db:
            self.channels_db[name] = []
        self.channels_db[name].append((dg_cntr, ch_cntr))

        if channel['channel_type'] in MASTER_CHANNELS:
            self.masters_db[dg_cntr] = ch_cntr

        ch_cntr += 1

        if channel['component_addr']:
            # check if it is a CABLOCK or CNBLOCK
            stream.seek(channel['component_addr'], v4c.SEEK_START)
            blk_id = stream.read(4)
            if blk_id == b'##CN':
                index = ch_cntr - 1
                # reserve this channel's slot before the components append
                # their own entries
                grp['channel_dependencies'].append(None)
                # keep the nested result in its own name: rebinding
                # ``composition`` here would make the following siblings
                # end up in the nested list
                ch_cntr, nested_composition = self._read_channels(
                    channel['component_addr'],
                    grp,
                    stream,
                    dg_cntr,
                    ch_cntr,
                    True,
                )
                grp['channel_dependencies'][index] = nested_composition
            else:
                # only channel arrays with storage=CN_TEMPLATE are
                # supported so far
                ca_block = ChannelArrayBlock(
                    address=channel['component_addr'],
                    stream=stream,
                )
                if ca_block['storage'] != v4c.CA_STORAGE_TYPE_CN_TEMPLATE:
                    warnings.warn('Only CN template arrays are supported')
                ca_list = [ca_block, ]
                while ca_block['composition_addr']:
                    ca_block = ChannelArrayBlock(
                        address=ca_block['composition_addr'],
                        stream=stream,
                    )
                    ca_list.append(ca_block)
                grp['channel_dependencies'].append(ca_list)
        else:
            grp['channel_dependencies'].append(None)

        # go to next channel of the current channel group
        ch_addr = channel['next_ch_addr']

    return ch_cntr, composition
def _read_data_block(self, address, stream):
"""read and aggregate data blocks for a given data group
Returns
-------
data : bytes
aggregated raw data
"""
if address:
stream.seek(address, v4c.SEEK_START)
id_string = stream.read(4)
# can be a DataBlock
if id_string == b'##DT':
data = DataBlock(address=address, stream=stream)
data = data['data']
# or a DataZippedBlock
elif id_string == b'##DZ':
data = DataZippedBlock(address=address, stream=stream)
data = data['data']
# or a DataList
elif id_string == b'##DL':
data = []
while address:
dl = DataList(address=address, stream=stream)
for i in range(dl['links_nr'] - 1):
addr = dl['data_block_addr{}'.format(i)]
stream.seek(addr, v4c.SEEK_START)
id_string = stream.read(4)
if id_string == b'##DT':
block = DataBlock(stream=stream, address=addr)
data.append(block['data'])
elif id_string == b'##DZ':
block = DataZippedBlock(
stream=stream,
address=addr,
)
data.append(block['data'])
elif id_string == b'##DL':
data.append(
self._read_data_block(
address=addr,
stream=stream,
)
)
address = dl['next_dl_addr']
data = b''.join(data)
# or a header list
elif id_string == b'##HL':
hl = HeaderList(address=address, stream=stream)
data = self._read_data_block(
address=hl['first_dl_addr'],
stream=stream,
)
else:
data = b''
return data
def _load_group_data(self, group):
""" get group's data block bytes """
if self.memory == 'full':
data = group['data_block']['data']
else:
# could be an appended group
# for now appended groups keep the measured data in the memory.
# the plan is to use a temp file for appended groups, to keep the
# memory usage low.
data_group = group['data_group']
channel_group = group['channel_group']
if group['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
# go to the first data block of the current data group
stream = self._file
dat_addr = data_group['data_block_addr']
data = self._read_data_block(
address=dat_addr,
stream=stream,
)
if not group['sorted']:
cg_data = []
cg_size = group['record_size']
record_id = channel_group['record_id']
if data_group['record_id_len'] <= 2:
record_id_nr = data_group['record_id_len']
else:
record_id_nr = 0
i = 0
size = len(data)
while i < size:
rec_id = data[i]
# skip record id
i += 1
rec_size = cg_size[rec_id]
if rec_size:
if rec_id == record_id:
rec_data = data[i: i+rec_size]
cg_data.append(rec_data)
else:
rec_size = unpack('<I', data[i: i+4])[0]
i += 4
if rec_id == record_id:
rec_data = data[i: i + rec_size]
cg_data.append(rec_data)
# consider the second record ID byte
if record_id_nr == 2:
i += rec_size + 1
else:
i += rec_size
data = b''.join(cg_data)
elif group['data_location'] == v4c.LOCATION_TEMPORARY_FILE:
dat_addr = data_group['data_block_addr']
if dat_addr:
cycles_nr = channel_group['cycles_nr']
samples_byte_nr = channel_group['samples_byte_nr']
size = cycles_nr * samples_byte_nr
self._tempfile.seek(dat_addr, v4c.SEEK_START)
data = self._tempfile.read(size)
else:
data = b''
return data
def _load_signal_data(self, address):
""" this method is used to get the channel signal data, usually for
VLSD channels
"""
if address:
if self._file.closed:
self._file = open(self.name, 'rb')
stream = self._file
stream.seek(address, v4c.SEEK_START)
blk_id = stream.read(4)
if blk_id == b'##SD':
data = SignalDataBlock(address=address, stream=stream)
data = data['data']
elif blk_id == b'##DZ':
data = DataZippedBlock(address=address, stream=stream)
data = data['data']
elif blk_id == b'##DL':
data = []
while address:
# the data list will contain only links to SDBLOCK's
data_list = DataList(address=address, stream=stream)
nr = data_list['links_nr']
# aggregate data from all SDBLOCK
for i in range(nr-1):
addr = data_list['data_block_addr{}'.format(i)]
stream.seek(addr, v4c.SEEK_START)
blk_id = stream.read(4)
if blk_id == b'##SD':
block = SignalDataBlock(
address=addr,
stream=stream,
)
data.append(block['data'])
elif blk_id == b'##DZ':
block = DataZippedBlock(
address=addr,
stream=stream,
)
data.append(block['data'])
else:
message = ('Expected SD, DZ or DL block at {} '
'but found id="{}"')
message = message.format(hex(address), blk_id)
warnings.warn(message)
return
address = data_list['next_dl_addr']
data = b''.join(data)
elif blk_id == b'##CN':
data = b''
else:
message = ('Expected SD, DL, DZ or CN block at {} '
'but found id="{}"')
message = message.format(hex(address), blk_id)
warnings.warn(message)
data = b''
if self.memory == 'full':
self.close()
else:
data = b''
return data
def _prepare_record(self, group):
    """Compute the record dtype and the parents dict for this group.

    The channels are processed in sorted order. A channel that starts at
    or after the end of the bytes already covered gets its own field in
    the record dtype; a channel that lies inside the previous field is
    mapped to that field together with its bit offset inside it.

    Parameters
    ----------
    group : dict
        MDF group dict

    Returns
    -------
    parents, dtypes : dict, numpy.dtype
        mapping of channel index to a (field name, bit offset) pair, and
        the dtype of the records. The pair is (None, None) for channels
        that have no bytes of their own in the record; channels that do
        not fit are missing from the mapping.
    """
    grp = group
    stream = self._file
    memory = self.memory
    channel_group = grp['channel_group']

    if memory == 'minimum':
        # only the addresses are stored: load the channel blocks now
        channels = [
            Channel(address=ch_addr, stream=stream)
            for ch_addr in grp['channels']
        ]
    else:
        channels = grp['channels']

    record_size = channel_group['samples_byte_nr']
    invalidation_bytes_nr = channel_group['invalidation_bytes_nr']

    # first byte of the record not covered yet by a dtype field
    next_byte_aligned_position = 0
    types = []
    current_parent = ""
    parent_start_offset = 0
    parents = {}
    group_channels = set()

    # data types whose size is taken as is instead of being rounded up to
    # 1, 2, 4 or 8 bytes
    non_numeric_types = (
        v4c.DATA_TYPE_BYTEARRAY,
        v4c.DATA_TYPE_STRING_UTF_8,
        v4c.DATA_TYPE_STRING_LATIN_1,
        v4c.DATA_TYPE_STRING_UTF_16_BE,
        v4c.DATA_TYPE_STRING_UTF_16_LE,
        v4c.DATA_TYPE_CANOPEN_TIME,
        v4c.DATA_TYPE_CANOPEN_DATE,
    )
    virtual_types = (
        v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
        v4c.CHANNEL_TYPE_VIRTUAL,
    )

    # the ordering is the one defined by the Channel objects themselves
    sortedchannels = sorted(enumerate(channels), key=lambda i: i[1])
    for original_index, new_ch in sortedchannels:
        start_offset = new_ch['byte_offset']
        bit_offset = new_ch['bit_offset']
        data_type = new_ch['data_type']
        bit_count = new_ch['bit_count']
        ch_type = new_ch['channel_type']
        dependency_list = grp['channel_dependencies'][original_index]

        if memory == 'minimum':
            block = TextBlock(
                address=grp['texts']['channels']
                [original_index]['name_addr'],
                stream=stream,
            )
            name = block['text'].decode('utf-8').strip(' \r\n\t\0')
            name = name.split('\\')[0]
        else:
            name = new_ch.name

        # handle multiple occurrences of the same channel name
        name = get_unique_name(group_channels, name)
        group_channels.add(name)

        if start_offset >= next_byte_aligned_position:
            if ch_type in virtual_types:
                # virtual channels do not have bytes in the record
                parents[original_index] = None, None
            elif not dependency_list:
                parent_start_offset = start_offset

                # check if there are byte gaps in the record
                gap = parent_start_offset - next_byte_aligned_position
                if gap:
                    types.append(('', 'S{}'.format(gap)))

                # adjust size to 1, 2, 4 or 8 bytes
                size = bit_offset + bit_count
                if data_type not in non_numeric_types:
                    if size > 32:
                        size = 8
                    elif size > 16:
                        size = 4
                    elif size > 8:
                        size = 2
                    else:
                        size = 1
                else:
                    size = size >> 3

                next_byte_aligned_position = parent_start_offset + size
                if next_byte_aligned_position <= record_size:
                    types.append((name, get_fmt(data_type, size)))
                    parents[original_index] = name, bit_offset
                current_parent = name
            elif isinstance(dependency_list[0], ChannelArrayBlock):
                ca_block = dependency_list[0]

                # check if there are byte gaps in the record
                gap = start_offset - next_byte_aligned_position
                if gap:
                    types.append(('', 'S{}'.format(gap)))

                size = bit_count >> 3
                shape = tuple(
                    ca_block['dim_size_{}'.format(i)]
                    for i in range(ca_block['dims'])
                )
                if ca_block['byte_offset_base'] // size > 1 and \
                        len(shape) == 1:
                    shape += (ca_block['byte_offset_base'] // size, )
                dim = 1
                for d in shape:
                    dim *= d

                types.append((name, get_fmt(data_type, size), shape))
                current_parent = name
                next_byte_aligned_position = start_offset + size * dim
                parents[original_index] = name, 0
            else:
                # structure composition: the components carry the bytes
                parents[original_index] = None, None
        else:
            # the channel starts inside the field of the current parent
            max_overlapping_size = (
                next_byte_aligned_position - start_offset
            ) * 8
            needed_size = bit_offset + bit_count
            if max_overlapping_size >= needed_size:
                offset_in_parent = (
                    ((start_offset - parent_start_offset) << 3)
                    + bit_offset
                )
                parents[original_index] = current_parent, offset_in_parent

        if next_byte_aligned_position > record_size:
            break

    # pad up to the record size, then add the invalidation bytes
    gap = record_size - next_byte_aligned_position
    if gap > 0:
        types.append(('', 'S{}'.format(gap)))

    # 'S' is the same byte string dtype as the 'a' alias used before,
    # which NumPy 2 no longer accepts
    types.append(('invalidation_bytes', 'S{}'.format(invalidation_bytes_nr)))

    if PYVERSION == 2:
        types = fix_dtype_fields(types)

    return parents, dtype(types)
def _get_not_byte_aligned_data(self, data, group, ch_nr):
    """Extract the values of a channel that is not byte aligned.

    The bytes that hold the channel are cut out of every record, shifted
    by the channel's bit offset, masked to the channel's bit count and
    reinterpreted with the channel's data type.

    Parameters
    ----------
    data : bytes
        raw records of the group
    group : dict
        MDF group dict
    ch_nr : int
        index of the channel inside the group

    Returns
    -------
    vals : numpy.array
        channel values
    """
    big_endian_types = (
        v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
        v4c.DATA_TYPE_REAL_MOTOROLA,
        v4c.DATA_TYPE_SIGNED_MOTOROLA,
    )

    # a stored record holds the sample bytes followed by the invalidation
    # bytes (see the dtype built by ``_prepare_record``), so both must be
    # part of the stride used to walk through ``data``
    # NOTE(review): assumes ``data`` comes from ``_load_group_data`` -
    # confirm against the callers
    channel_group = group['channel_group']
    record_size = (
        channel_group['samples_byte_nr']
        + channel_group['invalidation_bytes_nr']
    )

    if self.memory == 'minimum':
        channel = Channel(
            address=group['channels'][ch_nr],
            stream=self._file,
        )
    else:
        channel = group['channels'][ch_nr]

    bit_offset = channel['bit_offset']
    byte_offset = channel['byte_offset']
    bit_count = channel['bit_count']
    is_little_endian = channel['data_type'] not in big_endian_types

    dependency_list = group['channel_dependencies'][ch_nr]
    if dependency_list and isinstance(dependency_list[0], ChannelArrayBlock):
        # channel array: the bit count covers all the array elements
        ca_block = dependency_list[0]
        size = bit_count >> 3
        shape = tuple(
            ca_block['dim_size_{}'.format(i)]
            for i in range(ca_block['dims'])
        )
        if ca_block['byte_offset_base'] // size > 1 and len(shape) == 1:
            shape += (ca_block['byte_offset_base'] // size, )
        dim = 1
        for d in shape:
            dim *= d
        size *= dim
        bit_count = size << 3

    # number of bytes needed to hold bit_offset + bit_count bits
    byte_count = bit_offset + bit_count
    if byte_count % 8:
        byte_count = (byte_count >> 3) + 1
    else:
        byte_count >>= 3

    types = [
        ('', 'S{}'.format(byte_offset)),
        ('vals', '({},)u1'.format(byte_count)),
        ('', 'S{}'.format(record_size - byte_count - byte_offset)),
    ]
    vals = fromstring(data, dtype=dtype(types))
    vals = vals['vals']

    # work on most significant byte first data
    if is_little_endian:
        vals = flip(vals, 1)

    # shift out the bit offset
    vals = unpackbits(vals)
    vals = roll(vals, bit_offset)
    vals = vals.reshape((len(vals) // 8, 8))
    vals = packbits(vals)
    vals = vals.reshape((len(vals) // byte_count, byte_count))

    if bit_count < 64:
        # clear the bits that do not belong to the channel
        mask = 2 ** bit_count - 1
        masks = []
        while mask:
            masks.append(mask & 0xFF)
            mask >>= 8
        masks.extend([0] * (byte_count - len(masks)))
        masks = masks[::-1]
        for i, mask in enumerate(masks):
            vals[:, i] &= mask

    if is_little_endian:
        vals = flip(vals, 1)

    # size in bytes of the numpy type used for the result
    if bit_count <= 8:
        size = 1
    elif bit_count <= 16:
        size = 2
    elif bit_count <= 32:
        size = 4
    elif bit_count <= 64:
        size = 8
    else:
        size = bit_count // 8

    if size > byte_count:
        # pad every value with zero bytes up to the size of the type
        extra_bytes = size - byte_count
        extra = zeros((len(vals), extra_bytes), dtype=uint8)
        types = [
            ('vals', vals.dtype, vals.shape[1:]),
            ('', extra.dtype, extra.shape[1:]),
        ]
        vals = fromarrays([vals, extra], dtype=dtype(types))

    # tobytes is the supported spelling of the deprecated tostring
    vals = vals.tobytes()

    fmt = get_fmt(channel['data_type'], size)
    if size <= byte_count:
        types = [
            ('vals', fmt),
            ('', 'S{}'.format(byte_count - size)),
        ]
    else:
        types = [('vals', fmt), ]

    vals = fromstring(vals, dtype=dtype(types))

    return vals['vals']
def _validate_channel_selection(self, name=None, group=None, index=None):
    """Resolve a channel selection to a (group index, channel index) pair.

    Channel can be specified in two ways:

    * using the first positional argument *name*

        * if there are multiple occurrences for this channel then the
          *group* and *index* arguments can be used to select a specific
          group.
        * if there are multiple occurrences for this channel and *group*
          is None then a warning is issued and the first occurrence is
          used
        * if the requested *group* / *index* combination does not hold
          the channel, a warning is issued and a fallback occurrence is
          used (first occurrence in the group if the group is valid,
          otherwise the first occurrence overall)

    * using the group number (keyword argument *group*) and the channel
      number (keyword argument *index*). Use *info* method for group and
      channel numbers

    Parameters
    ----------
    name : string
        name of channel
    group : int
        0-based group index
    index : int
        0-based channel index

    Returns
    -------
    group_index, channel_index : (int, int)
        selected channel's group and channel index

    Raises
    ------
    MdfException
        if neither *name* nor both *group* and *index* are given, if the
        explicit *group* / *index* pair is out of range, or if *name* is
        not a known channel

    """
    if name is None:
        if group is None or index is None:
            message = ('Invalid arguments for channel selection: '
                       'must give "name" or, "group" and "index"')
            raise MdfException(message)

        gp_nr, ch_nr = group, index
        if gp_nr > len(self.groups) - 1:
            raise MdfException('Group index out of range')
        if index > len(self.groups[gp_nr]['channels']) - 1:
            raise MdfException('Channel index out of range')
        return gp_nr, ch_nr

    # only the part before the first backslash is used as lookup key
    name = name.split('\\')[0]
    if name not in self.channels_db:
        raise MdfException('Channel "{}" not found'.format(name))

    occurrences = self.channels_db[name]

    if group is None:
        gp_nr, ch_nr = occurrences[0]
        if len(occurrences) > 1:
            message = ('Multiple occurances for channel "{}". '
                       'Using first occurance from data group {}. '
                       'Provide both "group" and "index" arguments'
                       ' to select another data group')
            message = message.format(name, gp_nr)
            warnings.warn(message)
        return gp_nr, ch_nr

    # occurrences of the channel that live in the requested group
    in_group = [entry for entry in occurrences if entry[0] == group]

    if in_group:
        if index is None:
            gp_nr, ch_nr = in_group[0]
            return gp_nr, ch_nr
        for gp_nr, ch_nr in in_group:
            if ch_nr == index:
                return gp_nr, ch_nr

        # the group holds the channel, but not at the requested index:
        # fall back to the first occurrence *inside this group* (the
        # occurrence list must not be indexed with the group number)
        gp_nr, ch_nr = in_group[0]
        message = ('You have selected channel index "{}" '
                   'of group "{}" for channel "{}", but '
                   'this channel index is invalid. Using '
                   'first occurance of "{}" in this group'
                   ' at index "{}"')
        message = message.format(
            index,
            group,
            name,
            name,
            ch_nr,
        )
    else:
        gp_nr, ch_nr = occurrences[0]
        message = ('You have selected group "{}" for '
                   'channel "{}", but this channel was not'
                   ' found in this group, or this group '
                   'index does not exist. Using first '
                   'occurance of "{}" from group "{}"')
        message = message.format(group, name, name, gp_nr)

    warnings.warn(message)
    return gp_nr, ch_nr
def append(self, signals, source_info='Python', common_timebase=False):
    """
    Appends a new data group.

    For channel dependencies type Signals, the *samples* attribute must be a
    numpy.recarray

    Parameters
    ----------
    signals : list
        list of *Signal* objects
    source_info : str
        source information; default 'Python'
    common_timebase : bool
        flag to hint that the signals have the same timebase; when set,
        the timestamps of the first signal are used for the whole group
        without any check

    Raises
    ------
    MdfException
        if *signals* is empty

    Examples
    --------
    >>> # case 1 conversion type None
    >>> s1 = np.array([1, 2, 3, 4, 5])
    >>> s2 = np.array([-1, -2, -3, -4, -5])
    >>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
    >>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
    >>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
    >>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
    >>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
    >>> mdf = MDF4('new.mf4')
    >>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
    >>> # case 2: VTAB conversions from channels inside another file
    >>> mdf1 = MDF4('in.mf4')
    >>> ch1 = mdf1.get("Channel1_VTAB")
    >>> ch2 = mdf1.get("Channel2_VTABR")
    >>> sigs = [ch1, ch2]
    >>> mdf2 = MDF4('out.mf4')
    >>> mdf2.append(sigs, 'created by asammdf v1.1.0')

    """
    if not signals:
        message = '"append" requires a non-empty list of Signal objects'
        raise MdfException(message)

    # check if the signals have a common timebase
    # if not interpolate the signals using the union of all timebases
    t = signals[0].timestamps
    if not common_timebase:
        different = False
        for sig in signals[1:]:
            if not array_equal(sig.timestamps, t):
                different = True
                break
        if different:
            times = [sig.timestamps for sig in signals]
            t = reduce(union1d, times).flatten().astype(float64)
            signals = [sig.interp(t) for sig in signals]
            times = None

    # split regular from composed signals. Composed signals have recarray
    # samples or multidimensional ndarray.
    # The regular signals will be first added to the group.
    # The composed signals will be saved along side the fields, which will
    # be saved as new signals.
    simple_signals = [
        sig for sig in signals
        if len(sig.samples.shape) <= 1 and sig.samples.dtype.names is None
    ]
    composed_signals = [
        sig for sig in signals
        if len(sig.samples.shape) > 1 or
        sig.samples.dtype.names
    ]

    dg_cntr = len(self.groups)

    gp = {}
    gp['channels'] = gp_channels = []
    gp['channel_conversions'] = gp_conv = []
    gp['channel_sources'] = gp_source = []
    gp['channel_dependencies'] = gp_dep = []
    gp['texts'] = gp_texts = {
        'channels': [],
        'sources': [],
        'conversions': [],
        'conversion_tab': [],
        'channel_group': [],
    }
    self.groups.append(gp)

    cycles_nr = len(t)
    fields = []
    types = []
    parents = {}
    ch_cntr = 0
    offset = 0
    field_names = set()

    memory = self.memory
    minimum = memory == 'minimum'
    temp = self._tempfile
    write = temp.write
    tell = temp.tell

    # placeholder stored for channels that have no conversion block
    no_conversion = 0 if minimum else None
    range_ok = v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK

    # ------------------------------------------------------------------
    # local helpers; they only read the variables defined above
    # ------------------------------------------------------------------

    def store(block):
        """Return *block*, or its temporary file address after writing it
        out when the memory option is 'minimum'."""
        if not minimum:
            return block
        address = tell()
        write(bytes(block))
        return address

    def add_texts(name, unit=None):
        """Append the text entries (name, optional unit, shared source
        texts) of one new channel."""
        for key in ('channels', 'sources'):
            gp_texts[key].append({})
        for key in ('conversion_tab', 'conversions'):
            gp_texts[key].append(None)

        block = TextBlock(text=name, meta=False)
        gp_texts['channels'][-1]['name_addr'] = store(block)
        if unit:
            block = TextBlock(text=unit, meta=False)
            gp_texts['channels'][-1]['unit_addr'] = store(block)

        gp_texts['sources'][-1]['name_addr'] = si_ref
        gp_texts['sources'][-1]['path_addr'] = si_ref

    def add_source():
        """Append a default source information block for one channel."""
        gp_source.append(store(SourceInformation()))

    def add_conversion(info):
        """Append the conversion described by the Signal *info* dict, or
        the "no conversion" placeholder, together with its texts."""
        conv_texts_tab = {}
        kargs = {}

        if info and 'raw' in info:
            raw = info['raw']
            phys = info['phys']
            if raw.dtype.kind == 'S':
                # text to value
                kargs['conversion_type'] = v4c.CONVERSION_TYPE_TTAB
                for i, (r_, p_) in enumerate(zip(raw, phys)):
                    kargs['text_{}'.format(i)] = 0
                    kargs['val_{}'.format(i)] = p_
                    block = TextBlock(text=r_, meta=False)
                    conv_texts_tab['text_{}'.format(i)] = store(block)
                kargs['val_default'] = info['default']
                kargs['links_nr'] = len(raw) + 4
            else:
                # value to text
                kargs['conversion_type'] = v4c.CONVERSION_TYPE_TABX
                for i, (r_, p_) in enumerate(zip(raw, phys)):
                    kargs['text_{}'.format(i)] = 0
                    kargs['val_{}'.format(i)] = r_
                    block = TextBlock(text=p_, meta=False)
                    conv_texts_tab['text_{}'.format(i)] = store(block)
                if info.get('default', b''):
                    block = TextBlock(text=info['default'], meta=False)
                    conv_texts_tab['default_addr'] = store(block)
                # links are stored as 0 in the block itself; the
                # referenced texts are kept in the conversion_tab entry
                kargs['default_addr'] = 0
                kargs['links_nr'] = len(raw) + 5
            gp_conv.append(store(ChannelConversion(**kargs)))

        elif info and 'lower' in info:
            # value range to text
            kargs['conversion_type'] = v4c.CONVERSION_TYPE_RTABX
            lower = info['lower']
            upper = info['upper']
            texts = info['phys']
            kargs['ref_param_nr'] = len(upper)
            kargs['links_nr'] = len(lower) + 5
            for i, (u_, l_, txt_) in enumerate(zip(upper, lower, texts)):
                kargs['lower_{}'.format(i)] = l_
                kargs['upper_{}'.format(i)] = u_
                kargs['text_{}'.format(i)] = 0
                block = TextBlock(text=txt_, meta=False)
                conv_texts_tab['text_{}'.format(i)] = store(block)
            if info.get('default', b''):
                block = TextBlock(text=info['default'], meta=False)
                conv_texts_tab['default_addr'] = store(block)
            kargs['default_addr'] = 0
            gp_conv.append(store(ChannelConversion(**kargs)))

        else:
            gp_conv.append(no_conversion)

        if conv_texts_tab:
            gp_texts['conversion_tab'][-1] = conv_texts_tab

    def value_channel(data_type, bit_count, byte_offset, bit_offset=0,
                      limits=(0, 0), flags=0):
        """Build the keyword arguments of a value Channel block; the range
        entries are zeroed when *limits* is not an ordered pair."""
        min_val, max_val = limits
        if not min_val <= max_val:
            min_val = max_val = 0
        return {
            'channel_type': v4c.CHANNEL_TYPE_VALUE,
            'bit_count': bit_count,
            'byte_offset': byte_offset,
            'bit_offset': bit_offset,
            'data_type': data_type,
            'min_raw_value': min_val,
            'max_raw_value': max_val,
            'lower_limit': min_val,
            'upper_limit': max_val,
            'flags': flags,
        }

    def add_channel(name, index, kargs):
        """Create the Channel block, register it in the group and in the
        channels database; return the stored block or address."""
        ch = Channel(**kargs)
        ch.name = name
        ref = store(ch)
        gp_channels.append(ref)
        if name not in self.channels_db:
            self.channels_db[name] = []
        self.channels_db[name].append((dg_cntr, index))
        return ref

    # ------------------------------------------------------------------
    # setup all blocks related to the time master channel
    # ------------------------------------------------------------------

    # time channel texts
    for key in ('channels', 'sources', 'channel_group', 'conversions'):
        gp_texts[key].append({})
    gp_texts['conversion_tab'].append(None)

    block = TextBlock(text='t', meta=False)
    gp_texts['channels'][-1]['name_addr'] = store(block)

    block = TextBlock(text='s', meta=False)
    gp_texts['conversions'][-1]['unit_addr'] = store(block)

    # the source text is shared by every channel of this group
    si_text = TextBlock(text=source_info, meta=False)
    if minimum:
        address = tell()
        si_text.address = address
        write(bytes(si_text))
        si_ref = address
    else:
        si_ref = si_text
    gp_texts['sources'][-1]['name_addr'] = si_ref
    gp_texts['sources'][-1]['path_addr'] = si_ref
    gp_texts['channel_group'][-1]['acq_name_addr'] = si_ref
    gp_texts['channel_group'][-1]['comment_addr'] = si_ref

    # conversion and source for time channel
    gp_conv.append(no_conversion)
    add_source()

    # time channel
    t_type, t_size = fmt_to_datatype(t.dtype)
    kargs = {
        'channel_type': v4c.CHANNEL_TYPE_MASTER,
        'data_type': t_type,
        'sync_type': 1,
        'byte_offset': 0,
        'bit_offset': 0,
        'bit_count': t_size,
        'min_raw_value': t[0] if cycles_nr else 0,
        'max_raw_value': t[-1] if cycles_nr else 0,
        'lower_limit': t[0] if cycles_nr else 0,
        'upper_limit': t[-1] if cycles_nr else 0,
        'flags': range_ok,
    }
    name = 't'
    add_channel(name, ch_cntr, kargs)
    self.masters_db[dg_cntr] = 0
    # data group record parents
    parents[ch_cntr] = name, 0

    # time channel doesn't have channel dependencies
    gp_dep.append(None)

    fields.append(t)
    types.append((name, t.dtype))
    field_names.add(name)

    offset += t_size // 8
    ch_cntr += 1

    # ------------------------------------------------------------------
    # integer signals packed together in shared record fields
    # ------------------------------------------------------------------

    if self._compact_integers_on_append:
        compacted_signals = [
            {'signal': sig}
            for sig in simple_signals
            if sig.samples.dtype.kind in 'ui'
        ]

        max_itemsize = 1
        dtype_ = dtype(uint8)

        for signal in compacted_signals:
            itemsize = signal['signal'].samples.dtype.itemsize

            min_val, max_val = get_min_max(signal['signal'].samples)
            signal['min'], signal['max'] = min_val, max_val
            minimum_bitlength = (itemsize // 2) * 8 + 1
            bit_length = max(
                int(min_val).bit_length(),
                int(max_val).bit_length(),
            )

            signal['bit_count'] = max(minimum_bitlength, bit_length)

            # the widest integer type decides the size of the shared field
            if itemsize > max_itemsize:
                dtype_ = dtype('<u{}'.format(itemsize))
                max_itemsize = itemsize

        compacted_signals.sort(key=lambda x: x['bit_count'])
        simple_signals = [
            sig
            for sig in simple_signals
            if sig.samples.dtype.kind not in 'ui'
        ]
        dtype_size = dtype_.itemsize * 8

    else:
        compacted_signals = []

    # first try to compact unsigned integers
    while compacted_signals:
        # start each cluster with the widest remaining signal and fill
        # the rest of the field with the narrowest ones
        cluster = []

        tail = compacted_signals.pop()
        size = tail['bit_count']
        cluster.append(tail)

        while size < dtype_size and compacted_signals:
            head = compacted_signals[0]
            head_size = head['bit_count']
            if head_size + size > dtype_size:
                break
            else:
                cluster.append(compacted_signals.pop(0))
                size += head_size

        bit_offset = 0
        field_name = get_unique_name(field_names, 'COMPACT')
        types.append((field_name, dtype_))
        field_names.add(field_name)

        values = zeros(cycles_nr, dtype=dtype_)

        for signal_d in cluster:
            signal = signal_d['signal']
            bit_count = signal_d['bit_count']
            min_val = signal_d['min']
            max_val = signal_d['max']
            name = signal.name

            add_texts(name, signal.unit)

            # conversions for channel
            add_conversion(signal.info)

            # source for channel
            add_source()

            if signal.samples.dtype.kind == 'u':
                data_type = v4c.DATA_TYPE_UNSIGNED_INTEL
            else:
                data_type = v4c.DATA_TYPE_SIGNED_INTEL

            kargs = value_channel(
                data_type,
                bit_count,
                offset + bit_offset // 8,
                bit_offset % 8,
                limits=(min_val, max_val),
                flags=0 if min_val > max_val else range_ok,
            )
            add_channel(name, ch_cntr, kargs)

            # update the parents as well
            parents[ch_cntr] = field_name, bit_offset

            values += signal.samples.astype(dtype_) << bit_offset
            bit_offset += bit_count

            ch_cntr += 1

            # simple channels don't have channel dependencies
            gp_dep.append(None)

        offset += dtype_.itemsize

        fields.append(values)

    # ------------------------------------------------------------------
    # remaining simple signals, one record field each
    # ------------------------------------------------------------------

    for signal in simple_signals:
        name = signal.name

        add_texts(name, signal.unit)

        # conversions for channel
        add_conversion(signal.info)

        # source for channel
        add_source()

        s_type, s_size = fmt_to_datatype(signal.samples.dtype)
        byte_size = max(s_size // 8, 1)
        min_val, max_val = get_min_max(signal.samples)
        kargs = value_channel(
            s_type,
            s_size,
            offset,
            limits=(min_val, max_val),
            flags=0 if min_val > max_val else range_ok,
        )
        add_channel(name, ch_cntr, kargs)
        offset += byte_size

        # update the parents as well
        field_name = get_unique_name(field_names, name)
        parents[ch_cntr] = field_name, 0

        fields.append(signal.samples)
        types.append((field_name, signal.samples.dtype))
        field_names.add(field_name)

        ch_cntr += 1

        # simple channels don't have channel dependencies
        gp_dep.append(None)

    # ------------------------------------------------------------------
    # composed signals
    # ------------------------------------------------------------------

    canopen_time_fields = (
        'ms',
        'days',
    )
    canopen_date_fields = (
        'ms',
        'min',
        'hour',
        'day',
        'month',
        'year',
        'summer_time',
        'day_of_week',
    )

    for signal in composed_signals:
        names = signal.samples.dtype.names
        if names is None:
            names = []
        name = signal.name

        if names in (canopen_time_fields, canopen_date_fields):
            field_name = get_unique_name(field_names, name)
            field_names.add(field_name)

            if names == canopen_time_fields:
                vals = signal.samples.tostring()

                fields.append(frombuffer(vals, dtype='V6'))
                types.append((field_name, 'V6'))
                byte_size = 6
                s_type = v4c.DATA_TYPE_CANOPEN_TIME

            else:
                vals = [
                    signal.samples[field]
                    for field in ('ms', 'min', 'hour', 'day', 'month', 'year')
                ]
                vals = fromarrays(vals).tostring()

                fields.append(frombuffer(vals, dtype='V7'))
                types.append((field_name, 'V7'))
                byte_size = 7
                s_type = v4c.DATA_TYPE_CANOPEN_DATE

            s_size = byte_size << 3

            # add channel texts, conversion placeholder and source
            add_texts(name, signal.unit)
            gp_conv.append(no_conversion)
            add_source()

            # there is no channel dependency
            gp_dep.append(None)

            # add channel block
            add_channel(name, ch_cntr, value_channel(s_type, s_size, offset))
            offset += byte_size

            # update the parents as well
            parents[ch_cntr] = field_name, 0

            ch_cntr += 1

        elif names and names[0] != signal.name:
            # here we have a structure channel composition

            field_name = get_unique_name(field_names, name)
            field_names.add(field_name)

            # first we add the structure channel
            add_texts(name, signal.unit)
            gp_conv.append(no_conversion)
            add_source()

            kargs = value_channel(v4c.DATA_TYPE_BYTEARRAY, 8, offset)
            add_channel(name, ch_cntr, kargs)

            # update the parents as well
            parents[ch_cntr] = name, 0

            ch_cntr += 1

            dep_list = []
            gp_dep.append(dep_list)

            # then we add the fields
            for name in names:
                field_name = get_unique_name(field_names, name)
                field_names.add(field_name)

                samples = signal.samples[name]

                s_type, s_size = fmt_to_datatype(samples.dtype)
                byte_size = s_size >> 3

                fields.append(samples)
                types.append((field_name, samples.dtype))

                add_texts(name)
                gp_conv.append(no_conversion)
                add_source()

                # the range is computed on this field's samples, not on
                # the whole record array of the parent signal
                min_val, max_val = get_min_max(samples)
                kargs = value_channel(
                    s_type,
                    s_size,
                    offset,
                    limits=(min_val, max_val),
                    flags=range_ok,
                )
                dep_list.append(add_channel(name, ch_cntr, kargs))
                offset += byte_size

                # update the parents as well
                parents[ch_cntr] = field_name, 0

                ch_cntr += 1
                gp_dep.append(None)

        else:
            # here we have channel arrays or mdf v3 channel dependencies
            if names:
                samples = signal.samples[names[0]]
            else:
                samples = signal.samples
            shape = samples.shape[1:]

            # add channel dependency block for composed parent channel
            if len(shape) > 1:
                if len(names) == 0:
                    ca_type = v4c.CA_TYPE_LOOKUP
                    ca_flags = v4c.FLAG_CA_FIXED_AXIS
                elif len(names) == 1:
                    ca_type = v4c.CA_TYPE_ARRAY
                    ca_flags = 0
                else:
                    ca_type = v4c.CA_TYPE_LOOKUP
                    ca_flags = v4c.FLAG_CA_AXIS
                kargs = {
                    'dims': len(shape),
                    'ca_type': ca_type,
                    'flags': ca_flags,
                    'byte_offset_base': samples.dtype.itemsize,
                }
                for i, dim in enumerate(shape):
                    kargs['dim_size_{}'.format(i)] = dim
            else:
                kargs = {
                    'dims': 1,
                    'ca_type': v4c.CA_TYPE_SCALE_AXIS,
                    'flags': 0,
                    'byte_offset_base': samples.dtype.itemsize,
                    'dim_size_0': shape[0],
                }
            parent_dep = ChannelArrayBlock(**kargs)
            gp_dep.append([parent_dep, ])

            field_name = get_unique_name(field_names, name)
            field_names.add(field_name)

            fields.append(samples)
            types.append((field_name, samples.dtype, shape))

            # first we add the parent array channel
            add_texts(name, signal.unit)
            gp_conv.append(no_conversion)
            add_source()

            s_type, s_size = fmt_to_datatype(samples.dtype)

            add_channel(name, ch_cntr, value_channel(s_type, s_size, offset))

            size = s_size >> 3
            for dim in shape:
                size *= dim
            offset += size

            # update the parents as well
            parents[ch_cntr] = name, 0

            ch_cntr += 1

            # the remaining record fields are added as separate channels
            # and referenced from the parent array block
            for name in names[1:]:
                field_name = get_unique_name(field_names, name)
                field_names.add(field_name)

                samples = signal.samples[name]
                shape = samples.shape[1:]
                fields.append(samples)
                types.append((field_name, samples.dtype, shape))

                add_texts(name)
                gp_conv.append(no_conversion)
                add_source()

                # add channel dependency block
                kargs = {
                    'dims': 1,
                    'ca_type': v4c.CA_TYPE_SCALE_AXIS,
                    'flags': 0,
                    'byte_offset_base': samples.dtype.itemsize,
                    'dim_size_0': shape[0],
                }
                dep = ChannelArrayBlock(**kargs)
                gp_dep.append([dep, ])

                # add components channel
                min_val, max_val = get_min_max(samples)
                s_type, s_size = fmt_to_datatype(samples.dtype)
                byte_size = max(s_size // 8, 1)
                kargs = value_channel(
                    s_type,
                    s_size,
                    offset,
                    limits=(min_val, max_val),
                    flags=range_ok,
                )
                add_channel(name, ch_cntr, kargs)

                parent_dep.referenced_channels.append((ch_cntr, dg_cntr))
                for dim in shape:
                    byte_size *= dim
                offset += byte_size

                # update the parents as well
                parents[ch_cntr] = field_name, 0

                ch_cntr += 1

    # ------------------------------------------------------------------
    # channel group, data group and data block
    # ------------------------------------------------------------------

    # channel group
    kargs = {
        'cycles_nr': cycles_nr,
        'samples_byte_nr': offset,
    }
    gp['channel_group'] = ChannelGroup(**kargs)
    gp['size'] = cycles_nr * offset

    # data group
    gp['data_group'] = DataGroup()

    # data block
    if PYVERSION == 2:
        types = fix_dtype_fields(types)
    types = dtype(types)

    gp['sorted'] = True
    gp['types'] = types
    gp['parents'] = parents

    samples = fromarrays(fields, dtype=types)

    # release the reference to the input signals before the record bytes
    # are built
    signals = None

    try:
        block = samples.tostring()

        if memory == 'full':
            gp['data_location'] = v4c.LOCATION_MEMORY
            gp['data_block'] = DataBlock(data=block)
        else:
            gp['data_location'] = v4c.LOCATION_TEMPORARY_FILE
            data_address = self._tempfile.tell()
            gp['data_group']['data_block_addr'] = data_address
            self._tempfile.write(bytes(block))
    except MemoryError:
        if memory == 'full':
            raise
        else:
            # the whole record block does not fit in memory: write the
            # records to the temporary file one at a time
            gp['data_location'] = v4c.LOCATION_TEMPORARY_FILE

            data_address = self._tempfile.tell()
            gp['data_group']['data_block_addr'] = data_address
            for sample in samples:
                self._tempfile.write(sample.tostring())
def attach(self,
           data,
           file_name=None,
           comment=None,
           compression=True,
           mime=r'application/octet-stream'):
    """ Embed *data* as a new attachment and record it in the file history.

    A new file history entry describing the operation is appended first;
    the attachment block references it through its creator index.

    Parameters
    ----------
    data : bytes
        data to be attached
    file_name : str
        string file name; 'bin.bin' is used when not given
    comment : str
        attachment comment
    compression : bool
        use compression for embedded attachment data
    mime : str
        mime type string; default 'application/octet-stream'

    """
    attachment_name = file_name if file_name else 'bin.bin'

    # the new history entry is appended at the end of the list, so its
    # index is the current length
    creator_index = len(self.file_history)

    history_block = FileHistory()
    history_template = """<FHcomment>
<TX>Added new embedded attachment from {}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>"""
    history_comment = history_template.format(
        attachment_name,
        __version__,
    )
    history_text = TextBlock(text=history_comment, meta=True)
    self.file_history.append((history_block, history_text))

    attachment_texts = {
        'mime_addr': TextBlock(text=mime, meta=False),
    }
    if comment:
        attachment_texts['comment_addr'] = TextBlock(
            text=comment,
            meta=False,
        )
    attachment_texts['file_name_addr'] = TextBlock(text=attachment_name)

    attachment = AttachmentBlock(data=data, compression=compression)
    attachment['creator_index'] = creator_index

    self.attachments.append((attachment, attachment_texts))
def close(self):
    """Release the file handles held by this object.

    Closes the temporary file (used when appended data is not kept in
    memory) and the handle of the measurement file, when they exist.
    Call this just before the object is no longer used.

    The measurement file handle is closed even if closing the temporary
    file raises; the exception is still propagated to the caller.

    """
    try:
        if self._tempfile is not None:
            self._tempfile.close()
    finally:
        # do not leak the source file handle when the first close fails
        if self._file is not None:
            self._file.close()
def get_channel_unit(self, name=None, group=None, index=None):
    """Gets channel unit.

    Channel can be specified in two ways:

    * using the first positional argument *name*

        * if there are multiple occurrences for this channel then the
          *group* and *index* arguments can be used to select a specific
          group.
        * if there are multiple occurrences for this channel and either
          the *group* or *index* arguments is None then a warning is
          issued

    * using the group number (keyword argument *group*) and the channel
      number (keyword argument *index*). Use *info* method for group and
      channel numbers

    The unit attached to the channel conversion takes precedence over the
    unit attached to the channel itself.

    Parameters
    ----------
    name : string
        name of channel
    group : int
        0-based group index
    index : int
        0-based channel index

    Returns
    -------
    unit : str
        found channel unit; empty string if the channel has no unit or
        (on Python 3) the stored text is not valid UTF-8

    """
    gp_nr, ch_nr = self._validate_channel_selection(
        name,
        group,
        index,
    )

    grp = self.groups[gp_nr]
    if grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
        stream = self._file
    else:
        stream = self._tempfile

    conv_texts = grp['texts']['conversions'][ch_nr]
    channel_texts = grp['texts']['channels'][ch_nr]

    # the conversion unit wins over the physical unit of the channel
    if conv_texts and 'unit_addr' in conv_texts:
        unit = conv_texts['unit_addr']
    elif channel_texts and 'unit_addr' in channel_texts:
        unit = channel_texts['unit_addr']
    else:
        return ''

    # with minimum memory only the block address is kept in RAM
    if self.memory == 'minimum':
        unit = TextBlock(address=unit, stream=stream)

    text = unit['text']
    if PYVERSION == 3:
        try:
            text = text.decode('utf-8')
        except UnicodeDecodeError:
            # same fallback for both unit sources
            return ''
    return text.strip(' \n\t\0')
def get(self,
        name=None,
        group=None,
        index=None,
        raster=None,
        samples_only=False,
        data=None):
    """Gets channel samples.

    Channel can be specified in two ways:

    * using the first positional argument *name*

        * if there are multiple occurrences for this channel then the
          *group* and *index* arguments can be used to select a specific
          group.
        * if there are multiple occurrences for this channel and either
          the *group* or *index* arguments is None then a warning is
          issued

    * using the group number (keyword argument *group*) and the channel
      number (keyword argument *index*). Use *info* method for group and
      channel numbers

    If the *raster* keyword argument is not *None* the output is
    interpolated accordingly

    Parameters
    ----------
    name : string
        name of channel
    group : int
        0-based group index
    index : int
        0-based channel index
    raster : float
        time raster in seconds
    samples_only : bool
        if *True* return only the channel samples as numpy array; if
        *False* return a *Signal* object
    data : bytes
        raw bytes of the data group; loaded on demand when *None*

    Returns
    -------
    res : (numpy.array | Signal)
        returns *Signal* if *samples_only*=*False* (default option),
        otherwise returns numpy.array.
        The *Signal* samples are:

            * numpy recarray for channels that have composition/channel
              array address or for channel of type BYTEARRAY,
              CANOPENDATE, CANOPENTIME
            * numpy array for all the rest

    Raises
    ------
    MdfException :

        * if the channel name is not found
        * if the group index is out of range
        * if the channel index is out of range

    """
    gp_nr, ch_nr = self._validate_channel_selection(
        name,
        group,
        index,
    )

    memory = self.memory
    grp = self.groups[gp_nr]
    if grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
        stream = self._file
    else:
        stream = self._tempfile

    if memory == 'minimum':
        # only block addresses are kept in RAM: load the blocks now
        channel = Channel(
            address=grp['channels'][ch_nr],
            stream=stream,
        )
        addr = grp['channel_conversions'][ch_nr]
        if addr:
            conversion = ChannelConversion(
                address=addr,
                stream=stream,
            )
        else:
            conversion = None
        if name is None:
            name = TextBlock(
                address=grp['texts']['channels'][ch_nr]['name_addr'],
                stream=stream,
            )
            name = name['text'].decode('utf-8').strip(' \r\t\n\0')
            name = name.split('\\')[0]
        channel.name = name
    else:
        channel = grp['channels'][ch_nr]
        conversion = grp['channel_conversions'][ch_nr]
        name = channel.name

    dependency_list = grp['channel_dependencies'][ch_nr]
    cycles_nr = grp['channel_group']['cycles_nr']

    # bound up-front: both are needed on code paths that do not go
    # through the byte aligned record branch
    bits = channel['bit_count']
    record = None

    # get data group record
    try:
        parents, dtypes = grp['parents'], grp['types']
    except KeyError:
        grp['parents'], grp['types'] = self._prepare_record(grp)
        parents, dtypes = grp['parents'], grp['types']

    # get group data
    if data is None:
        data = self._load_group_data(grp)

    info = None

    # get the channel signal data if available
    signal_data = self._load_signal_data(
        channel['data_block_addr']
    )

    def load_record():
        """return the group record array; cached for memory='full'"""
        if 'record' in grp:
            return grp['record']
        if dtypes.itemsize:
            rec = fromstring(data, dtype=dtypes)
        else:
            rec = None
        if memory == 'full':
            grp['record'] = rec
        return rec

    def append_axes(ca_block, arrays, types):
        """append the axes of *ca_block* to *arrays* and *types*"""
        dims_nr = ca_block['dims']
        if ca_block['flags'] & v4c.FLAG_CA_FIXED_AXIS:
            for i in range(dims_nr):
                shape = (ca_block['dim_size_{}'.format(i)], )
                axis = [
                    ca_block['axis_{}_value_{}'.format(i, j)]
                    for j in range(shape[0])
                ]
                # the fixed axis is repeated for each cycle
                axis = array([axis for _ in range(cycles_nr)])
                arrays.append(axis)
                types.append(('axis_{}'.format(i), axis.dtype, shape))
        else:
            for i in range(dims_nr):
                # distinct names: the requested channel index is still
                # needed later for the unit and comment lookup
                ref_ch_nr, ref_dg_nr = ca_block.referenced_channels[i]
                ref_grp = self.groups[ref_dg_nr]
                if memory == 'minimum':
                    if ref_grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
                        ref_stream = self._file
                    else:
                        ref_stream = self._tempfile
                    block = TextBlock(
                        address=ref_grp['texts']['channels'][ref_ch_nr]['name_addr'],
                        stream=ref_stream,
                    )
                    axisname = block['text'].decode('utf-8').strip(' \t\n\r\0')
                    axisname = axisname.split('\\')[0]
                else:
                    axisname = ref_grp['channels'][ref_ch_nr].name
                shape = (ca_block['dim_size_{}'.format(i)], )
                axis_values = self.get(
                    group=ref_dg_nr,
                    index=ref_ch_nr,
                    samples_only=True)
                axis_values = axis_values[axisname]
                arrays.append(axis_values)
                types.append((axisname, axis_values.dtype, shape))

    def load_tab_texts(count):
        """return the text values and default text of a text table"""
        tab_texts = grp['texts']['conversion_tab'][ch_nr]
        if memory != 'minimum':
            phys = array(
                [tab_texts['text_{}'.format(i)]['text']
                 for i in range(count)]
            )
            default = tab_texts\
                .get('default_addr', {})\
                .get('text', b'')
        else:
            phys = []
            for i in range(count):
                address = tab_texts['text_{}'.format(i)]
                if address:
                    block = TextBlock(
                        address=address,
                        stream=stream,
                    )
                    phys.append(block['text'])
                else:
                    phys.append(b'')
            phys = array(phys)
            if tab_texts.get('default_addr', 0):
                block = TextBlock(
                    address=tab_texts['default_addr'],
                    stream=stream,
                )
                default = block['text']
            else:
                default = b''
        return phys, default

    # check if this is a channel array
    if dependency_list:
        arrays = []

        if all(isinstance(dep, Channel) for dep in dependency_list):
            # structure channel composition
            if memory == 'minimum':
                names = []
                # TODO : get exactly the group and channel
                for address in dependency_list:
                    # separate name so that *channel* is not clobbered
                    member = Channel(
                        address=address,
                        stream=stream,
                    )
                    block = TextBlock(
                        address=member['name_addr'],
                        stream=stream,
                    )
                    name_ = block['text'].decode('utf-8')
                    name_ = name_.split('\\')[0].strip(' \n\r\t\0')
                    names.append(name_)
            else:
                names = [ch.name for ch in dependency_list]
            arrays = [
                self.get(name_, samples_only=True)
                for name_ in names
            ]

            types = [(name_, arr.dtype) for name_, arr in zip(names, arrays)]
            if PYVERSION == 2:
                types = fix_dtype_fields(types)
            types = dtype(types)

            vals = fromarrays(arrays, dtype=types)

            cycles_nr = len(vals)

        else:
            # channel arrays
            arrays = []
            types = []

            try:
                parent, bit_offset = parents[ch_nr]
            except KeyError:
                parent, bit_offset = None, None

            if parent is not None:
                record = load_record()
                vals = record[parent]
            else:
                vals = self._get_not_byte_aligned_data(data, grp, ch_nr)

            dep = dependency_list[0]
            if dep['flags'] & v4c.FLAG_CA_INVERSE_LAYOUT:
                shape = vals.shape
                shape = (shape[0],) + shape[1:][::-1]
                vals = vals.reshape(shape)

                axes = (0,) + tuple(range(len(shape) - 1, 0, -1))
                vals = transpose(vals, axes=axes)

            cycles_nr = len(vals)

            for ca_block in dependency_list[:1]:
                ca_type = ca_block['ca_type']
                if ca_type == v4c.CA_TYPE_SCALE_AXIS:
                    shape = (ca_block['dim_size_0'], )
                    arrays.append(vals)
                    types.append((channel.name, vals.dtype, shape))

                elif ca_type == v4c.CA_TYPE_LOOKUP:
                    shape = vals.shape[1:]
                    arrays.append(vals)
                    types.append((channel.name, vals.dtype, shape))
                    append_axes(ca_block, arrays, types)

                elif ca_type == v4c.CA_TYPE_ARRAY:
                    shape = vals.shape[1:]
                    arrays.append(vals)
                    types.append((channel.name, vals.dtype, shape))

            for ca_block in dependency_list[1:]:
                append_axes(ca_block, arrays, types)

            if PYVERSION == 2:
                types = fix_dtype_fields(types)

            vals = fromarrays(arrays, dtype(types))

    else:
        # get channel values
        if channel['channel_type'] in (v4c.CHANNEL_TYPE_VIRTUAL,
                                       v4c.CHANNEL_TYPE_VIRTUAL_MASTER):
            data_type = channel['data_type']
            ch_dtype = dtype(get_fmt(data_type, 8))
            vals = arange(cycles_nr, dtype=ch_dtype)
        else:
            try:
                parent, bit_offset = parents[ch_nr]
            except KeyError:
                parent, bit_offset = None, None

            if parent is not None:
                record = load_record()
                vals = record[parent]
                size = vals.dtype.itemsize
                data_type = channel['data_type']

                if vals.dtype.kind not in 'ui' and (bit_offset or not bits == size * 8):
                    vals = self._get_not_byte_aligned_data(data, grp, ch_nr)
                else:
                    if bit_offset:
                        dtype_ = vals.dtype
                        if dtype_.kind == 'i':
                            vals = vals.astype(dtype('<u{}'.format(size)))
                            vals >>= bit_offset
                        else:
                            vals = vals >> bit_offset

                    if not bits == size * 8:
                        mask = (1 << bits) - 1
                        # never mask in place: *vals* may still be a
                        # view of the record shared by other channels
                        vals = vals & mask
                        if data_type in v4c.SIGNED_INT and bits:
                            # sign extend only the values that have the
                            # sign bit set (xor/subtract idiom)
                            size = vals.dtype.itemsize
                            sign_bit = 1 << (bits - 1)
                            vals = vals.astype('<i{}'.format(size))
                            vals = (vals ^ sign_bit) - sign_bit
            else:
                vals = self._get_not_byte_aligned_data(data, grp, ch_nr)

        if cycles_nr:

            if conversion is None:
                conversion_type = v4c.CONVERSION_TYPE_NON
            else:
                conversion_type = conversion['conversion_type']

            if conversion_type == v4c.CONVERSION_TYPE_NON:
                # check if it is VLDS channel type with SDBLOCK
                data_type = channel['data_type']
                channel_type = channel['channel_type']

                if channel_type in (v4c.CHANNEL_TYPE_VALUE,
                                    v4c.CHANNEL_TYPE_MLSD):
                    if v4c.DATA_TYPE_STRING_LATIN_1 \
                            <= data_type \
                            <= v4c.DATA_TYPE_STRING_UTF_16_BE:
                        vals = [val.tobytes() for val in vals]

                        if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
                            encoding = 'utf-16-be'
                        elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
                            encoding = 'utf-16-le'
                        elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
                            encoding = 'utf-8'
                        elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
                            encoding = 'latin-1'

                        vals = array(
                            [x.decode(encoding).strip('\0') for x in vals]
                        )
                        vals = encode(vals, 'latin-1')

                    # CANopen date
                    elif data_type == v4c.DATA_TYPE_CANOPEN_DATE:
                        vals = vals.tostring()

                        types = dtype(
                            [('ms', '<u2'),
                             ('min', '<u1'),
                             ('hour', '<u1'),
                             ('day', '<u1'),
                             ('month', '<u1'),
                             ('year', '<u1')]
                        )
                        dates = fromstring(vals, types)

                        arrays = []
                        arrays.append(dates['ms'])
                        # bit 6 and 7 of minutes are reserved
                        arrays.append(dates['min'] & 0x3F)
                        # only first 4 bits of hour are used
                        arrays.append(dates['hour'] & 0xF)
                        # the first 4 bits are the day number
                        arrays.append(dates['day'] & 0xF)
                        # bit 6 and 7 of month are reserved
                        arrays.append(dates['month'] & 0x3F)
                        # bit 7 of year is reserved
                        arrays.append(dates['year'] & 0x7F)
                        # add summer or standard time information for hour
                        arrays.append((dates['hour'] & 0x80) >> 7)
                        # add day of week information
                        arrays.append((dates['day'] & 0xF0) >> 4)

                        names = [
                            'ms',
                            'min',
                            'hour',
                            'day',
                            'month',
                            'year',
                            'summer_time',
                            'day_of_week',
                        ]
                        vals = fromarrays(arrays, names=names)

                    # CANopen time
                    elif data_type == v4c.DATA_TYPE_CANOPEN_TIME:
                        vals = vals.tostring()

                        types = dtype(
                            [('ms', '<u4'),
                             ('days', '<u2')]
                        )
                        dates = fromstring(vals, types)

                        arrays = []
                        # bits 28 to 31 are reserved for ms
                        arrays.append(dates['ms'] & 0xFFFFFFF)
                        arrays.append(dates['days'] & 0x3F)

                        names = ['ms', 'days']
                        vals = fromarrays(arrays, names=names)

                    # byte array
                    elif data_type == v4c.DATA_TYPE_BYTEARRAY:
                        vals = vals.tostring()
                        size = max(bits >> 3, 1)

                        vals = frombuffer(
                            vals,
                            dtype=dtype('({},)u1'.format(size)),
                        )

                        types = [(channel.name, vals.dtype, vals.shape[1:])]
                        if PYVERSION == 2:
                            types = fix_dtype_fields(types)

                        types = dtype(types)
                        arrays = [vals, ]

                        vals = fromarrays(arrays, dtype=types)

                elif channel_type == v4c.CHANNEL_TYPE_VLSD:
                    if signal_data:
                        # the record holds the offsets into the signal
                        # data; each entry is a 4 byte length + payload
                        values = []
                        for offset in vals:
                            offset = int(offset)
                            str_size = unpack_from('<I', signal_data, offset)[0]
                            values.append(
                                signal_data[offset+4: offset+4+str_size]
                            )

                        if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
                            vals = [v.decode('utf-16-be') for v in values]

                        elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
                            vals = [v.decode('utf-16-le') for v in values]

                        elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
                            vals = [v.decode('utf-8') for v in values]

                        elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
                            vals = [v.decode('latin-1') for v in values]

                        if PYVERSION == 2:
                            vals = array([str(val) for val in vals])
                        else:
                            vals = array(vals)

                        vals = encode(vals, 'latin-1')
                    else:
                        # no VLSD signal data samples
                        vals = array([])

            elif conversion_type == v4c.CONVERSION_TYPE_LIN:
                a = conversion['a']
                b = conversion['b']
                if (a, b) != (1, 0):
                    vals = vals * a
                    if b:
                        vals += b

            elif conversion_type == v4c.CONVERSION_TYPE_RAT:
                # P1..P6 and X must stay plain local names: numexpr
                # looks them up in this frame when evaluating the text
                P1 = conversion['P1']
                P2 = conversion['P2']
                P3 = conversion['P3']
                P4 = conversion['P4']
                P5 = conversion['P5']
                P6 = conversion['P6']
                if (P1, P2, P3, P4, P5, P6) != (0, 1, 0, 0, 0, 1):
                    X = vals
                    vals = evaluate(v4c.CONV_RAT_TEXT)

            elif conversion_type == v4c.CONVERSION_TYPE_ALG:
                if not memory == 'minimum':
                    block = grp['texts']['conversions'][ch_nr]['formula_addr']
                    formula = block['text'].decode('utf-8').strip(' \n\t\0')
                else:
                    block = TextBlock(
                        address=grp['texts']['conversions'][ch_nr]['formula_addr'],
                        stream=stream,
                    )
                    formula = block['text'].decode('utf-8').strip(' \n\t\0')
                # NOTE(review): the formula comes from the measurement
                # file and is handed to numexpr as is; confirm this is
                # acceptable for files from untrusted sources
                X = vals
                vals = evaluate(formula)

            elif conversion_type in (v4c.CONVERSION_TYPE_TABI,
                                     v4c.CONVERSION_TYPE_TAB):
                nr = conversion['val_param_nr'] // 2
                raw = array(
                    [conversion['raw_{}'.format(i)] for i in range(nr)]
                )
                phys = array(
                    [conversion['phys_{}'.format(i)] for i in range(nr)]
                )
                if conversion_type == v4c.CONVERSION_TYPE_TABI:
                    vals = interp(vals, raw, phys)
                else:
                    idx = searchsorted(raw, vals)
                    idx = clip(idx, 0, len(raw) - 1)
                    vals = phys[idx]

            elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
                nr = (conversion['val_param_nr'] - 1) // 3
                lower = array(
                    [conversion['lower_{}'.format(i)] for i in range(nr)]
                )
                upper = array(
                    [conversion['upper_{}'.format(i)] for i in range(nr)]
                )
                phys = array(
                    [conversion['phys_{}'.format(i)] for i in range(nr)]
                )
                default = conversion['default']

                # INT channels include the upper limit of each range,
                # FLOAT channels exclude it
                upper_inclusive = channel['data_type'] <= 3

                res = []
                for v in vals:
                    for low, high, p in zip(lower, upper, phys):
                        if upper_inclusive:
                            found = low <= v <= high
                        else:
                            found = low <= v < high
                        if found:
                            res.append(p)
                            break
                    else:
                        res.append(default)
                size = max(bits >> 3, 1)
                ch_fmt = get_fmt(channel['data_type'], size)
                vals = array(res).astype(ch_fmt)

            elif conversion_type == v4c.CONVERSION_TYPE_TABX:
                nr = conversion['val_param_nr']
                raw = array(
                    [conversion['val_{}'.format(i)] for i in range(nr)]
                )
                phys, default = load_tab_texts(nr)
                info = {
                    'raw': raw,
                    'phys': phys,
                    'default': default,
                }

            elif conversion_type == v4c.CONVERSION_TYPE_RTABX:
                nr = conversion['val_param_nr'] // 2
                phys, default = load_tab_texts(nr)
                lower = array(
                    [conversion['lower_{}'.format(i)] for i in range(nr)]
                )
                upper = array(
                    [conversion['upper_{}'.format(i)] for i in range(nr)]
                )
                info = {
                    'lower': lower,
                    'upper': upper,
                    'phys': phys,
                    'default': default,
                }

            elif conversion_type == v4c.CONVERSION_TYPE_TTAB:
                nr = conversion['val_param_nr'] - 1
                tab_texts = grp['texts']['conversion_tab'][ch_nr]

                if memory == 'minimum':
                    raw = []
                    for i in range(nr):
                        block = TextBlock(
                            address=tab_texts['text_{}'.format(i)],
                            stream=stream,
                        )
                        raw.append(block['text'])
                    raw = array(raw)
                else:
                    raw = array(
                        [tab_texts['text_{}'.format(i)]['text']
                         for i in range(nr)]
                    )
                phys = array(
                    [conversion['val_{}'.format(i)] for i in range(nr)]
                )
                default = conversion['val_default']
                info = {
                    'raw': raw,
                    'phys': phys,
                    'default': default,
                }

            elif conversion_type == v4c.CONVERSION_TYPE_TRANS:
                nr = (conversion['ref_param_nr'] - 1) // 2
                tab_texts = grp['texts']['conversion_tab'][ch_nr]

                if memory == 'minimum':
                    in_ = []
                    for i in range(nr):
                        block = TextBlock(
                            address=tab_texts['input_{}_addr'.format(i)],
                            stream=stream,
                        )
                        in_.append(block['text'])
                    in_ = array(in_)

                    out_ = []
                    for i in range(nr):
                        block = TextBlock(
                            address=tab_texts['output_{}_addr'.format(i)],
                            stream=stream,
                        )
                        out_.append(block['text'])
                    out_ = array(out_)

                    block = TextBlock(
                        address=tab_texts['default_addr'],
                        stream=stream,
                    )
                    default = block['text']
                else:
                    in_ = array(
                        [tab_texts['input_{}_addr'.format(i)]['text']
                         for i in range(nr)]
                    )
                    out_ = array(
                        [tab_texts['output_{}_addr'.format(i)]['text']
                         for i in range(nr)]
                    )
                    default = tab_texts['default_addr']['text']

                res = []
                for v in vals:
                    for i, o in zip(in_, out_):
                        if v == i:
                            res.append(o)
                            break
                    else:
                        res.append(default)
                vals = array(res)
                info = {
                    'input': in_,
                    'output': out_,
                    'default': default,
                }

    # in case of invalidation bits, valid_index will hold the valid indexes
    valid_index = None
    if grp['channel_group']['invalidation_bytes_nr']:
        inval_flags = (v4c.FLAG_INVALIDATION_BIT_VALID
                       | v4c.FLAG_ALL_SAMPLES_VALID)
        if channel['flags'] & inval_flags == v4c.FLAG_INVALIDATION_BIT_VALID:
            # the record was not loaded on the virtual channel and the
            # not byte aligned code paths
            if record is None:
                record = load_record()

            if record is not None:
                ch_invalidation_pos = channel['pos_invalidation_bit']
                # split the bit position in byte index + bit inside byte
                pos_byte, pos_offset = divmod(ch_invalidation_pos, 8)
                mask = 1 << pos_offset

                inval_bytes = record['invalidation_bytes']
                inval_index = array(
                    [bytes_[pos_byte] & mask for bytes_ in inval_bytes]
                )
                valid_index = argwhere(inval_index == 0).flatten()
                vals = vals[valid_index]

    if samples_only:
        res = vals
    else:
        # search for unit in conversion texts
        conv_texts = grp['texts']['conversions'][ch_nr]
        channel_texts = grp['texts']['channels'][ch_nr]

        if conv_texts and 'unit_addr' in conv_texts:
            if not memory == 'minimum':
                unit = conv_texts['unit_addr']
            else:
                unit = TextBlock(
                    address=conv_texts['unit_addr'],
                    stream=stream,
                )
            if PYVERSION == 3:
                try:
                    unit = unit['text'].decode('utf-8').strip(' \n\t\0')
                except UnicodeDecodeError:
                    unit = ''
            else:
                unit = unit['text'].strip(' \n\t\0')
        else:
            # search for physical unit in channel texts
            if 'unit_addr' in channel_texts:
                if not memory == 'minimum':
                    unit = channel_texts['unit_addr']
                else:
                    unit = TextBlock(
                        address=channel_texts['unit_addr'],
                        stream=stream,
                    )
                if PYVERSION == 3:
                    unit = unit['text'].decode('utf-8').strip(' \n\t\0')
                else:
                    unit = unit['text'].strip(' \n\t\0')
            else:
                unit = ''

        # get the channel comment if available
        if 'comment_addr' in channel_texts:
            if memory == 'minimum':
                comment = TextBlock(
                    address=channel_texts['comment_addr'],
                    stream=stream,
                )
            else:
                comment = channel_texts['comment_addr']

            if comment['id'] == b'##MD':
                comment = comment['text'].decode('utf-8').strip(' \n\t\0')
                try:
                    comment = XML.fromstring(comment).find('TX').text
                except Exception:
                    # malformed XML or missing TX element
                    comment = ''
            else:
                comment = comment['text'].decode('utf-8')
        else:
            comment = ''

        t = self.get_master(gp_nr, data)

        # consider invalidation bits
        if valid_index is not None:
            t = t[valid_index]

        res = Signal(
            samples=vals,
            timestamps=t,
            unit=unit,
            name=name,
            comment=comment,
            info=info,
        )

        # the truth value of a numpy array is ambiguous: test the length
        if raster and len(t):
            tx = linspace(0, t[-1], int(t[-1] / raster))
            res = res.interp(tx)

    return res
def get_master(self, index, data=None):
    """ returns master channel samples for given group

    The result is cached per group index, so later calls for the same
    group return the cached samples.

    Parameters
    ----------
    index : int
        group index
    data : bytes
        data block raw bytes; default None

    Returns
    -------
    t : numpy.array
        master channel samples; the record index (as float64) is used for
        groups without a master channel

    """
    try:
        return self._master_channel_cache[index]
    except KeyError:
        pass

    group = self.groups[index]
    if group['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
        stream = self._file
    else:
        stream = self._tempfile
    memory = self.memory

    time_ch_nr = self.masters_db.get(index, None)
    cycles_nr = group['channel_group']['cycles_nr']

    if time_ch_nr is None:
        t = arange(cycles_nr, dtype=float64)
    else:
        time_conv = group['channel_conversions'][time_ch_nr]
        if memory == 'minimum':
            # only the block addresses are kept in RAM
            if time_conv:
                time_conv = ChannelConversion(
                    address=time_conv,
                    stream=stream,
                )
            else:
                time_conv = None
            time_ch = Channel(
                address=group['channels'][time_ch_nr],
                stream=stream,
            )
        else:
            time_ch = group['channels'][time_ch_nr]

        # a master without a conversion keeps its raw values; only the
        # linear conversion is applied to master channels
        is_linear = (
            bool(time_conv)
            and time_conv['conversion_type'] == v4c.CONVERSION_TYPE_LIN
        )

        if time_ch['channel_type'] == v4c.CHANNEL_TYPE_VIRTUAL_MASTER:
            # the raw value of a virtual master is the record index
            t = arange(cycles_nr, dtype=float64)
            if is_linear:
                t = t * time_conv['a'] + time_conv['b']
        else:
            # get data group parents and dtypes
            try:
                parents, dtypes = group['parents'], group['types']
            except KeyError:
                parents, dtypes = self._prepare_record(group)
                group['parents'], group['types'] = parents, dtypes

            # get data
            if data is None:
                data = self._load_group_data(group)

            try:
                parent, _ = parents[time_ch_nr]
            except KeyError:
                parent = None

            if parent is not None:
                if 'record' in group:
                    record = group['record']
                else:
                    if dtypes.itemsize:
                        record = fromstring(data, dtype=dtypes)
                    else:
                        record = None
                    if memory == 'full':
                        group['record'] = record
                t = record[parent]
            else:
                t = self._get_not_byte_aligned_data(
                    data,
                    group,
                    time_ch_nr,
                )

            # get timestamps
            if is_linear:
                time_a = time_conv['a']
                time_b = time_conv['b']
                t = t * time_a
                if time_b:
                    t += time_b

    self._master_channel_cache[index] = t
    return t
def info(self):
    """get MDF information as a dict

    The dict holds the file version string, the number of groups and,
    for each group, the cycles count, the channels count and a short
    description (name and type) of every channel.

    Examples
    --------
    >>> mdf = MDF4('test.mdf')
    >>> mdf.info()

    """
    version = self.identification['version_str']
    summary = {
        'version': version.decode('utf-8').strip(' \n\t\0'),
        'groups': len(self.groups),
    }
    minimum_memory = self.memory == 'minimum'

    for gp_index, grp in enumerate(self.groups):
        location = grp['data_location']
        if location == v4c.LOCATION_ORIGINAL_FILE:
            stream = self._file
        elif location == v4c.LOCATION_TEMPORARY_FILE:
            stream = self._tempfile

        channels = grp['channels']
        group_summary = {
            'cycles': grp['channel_group']['cycles_nr'],
            'channels count': len(channels),
        }
        summary['group {}'.format(gp_index)] = group_summary

        for ch_index, channel in enumerate(channels):
            if minimum_memory:
                # *channel* is a block address here: load the block and
                # its name text from the stream
                channel = Channel(
                    address=channel,
                    stream=stream,
                )
                name_block = TextBlock(
                    address=grp['texts']['channels'][ch_index]['name_addr'],
                    stream=stream,
                )
                ch_name = name_block['text'].decode('utf-8')
                ch_name = ch_name.strip(' \r\t\n\0').split('\\')[0]
                channel.name = ch_name
            else:
                ch_name = channel.name

            description = v4c.CHANNEL_TYPE_TO_DESCRIPTION[channel['channel_type']]
            key = 'channel {}'.format(ch_index)
            group_summary[key] = 'name="{}" type={}'.format(
                ch_name,
                description,
            )

    return summary
def save(self, dst='', overwrite=None, compression=0):
    """Save MDF to *dst*.

    If *dst* is not provided the destination file name is the MDF name.
    If *overwrite* is *True* then the destination file is overwritten,
    otherwise the file name is appended with '_<cntr>', where '<cntr>' is
    the first counter that produces a new file name (that does not
    already exist in the filesystem).

    Parameters
    ----------
    dst : str
        destination file name, Default ''
    overwrite : bool
        overwrite flag; when *None* (default) the value stored on the
        object is used
    compression : int
        use compressed data blocks, default 0; valid since version 4.10

        * 0 - no compression
        * 1 - deflate (slower, but produces smaller files)
        * 2 - transposition + deflate (slowest, but produces
          the smallest files)

    Raises
    ------
    MdfException :
        if the object has no name and *dst* is empty

    """
    if overwrite is None:
        overwrite = self._overwrite

    if self.name is None and dst == '':
        raise MdfException(
            'Must specify a destination file name for MDF created '
            'from scratch'
        )

    # with minimum memory the metadata is not kept in RAM
    if self.memory == 'minimum':
        writer = self._save_without_metadata
    else:
        writer = self._save_with_metadata
    writer(dst, overwrite, compression)
def _save_with_metadata(self, dst, overwrite, compression):
"""Save MDF to *dst*. If *dst* is not provided the the destination file
name is the MDF name. If overwrite is *True* then the destination file
is overwritten, otherwise the file name is appened with '_<cntr>', were
'<cntr>' is the first conter that produces a new file name
(that does not already exist in the filesystem)
Parameters
----------
dst : str
destination file name, Default ''
overwrite : bool
overwrite flag, default *False*
compression : int
use compressed data blocks, default 0; valid since version 4.10
* 0 - no compression
* 1 - deflate (slower, but produces smaller files)
* 2 - transposition + deflate (slowest, but produces
the smallest files)
"""
if self.name is None and dst == '':
message = ('Must specify a destination file name '
'for MDF created from scratch')
raise MdfException(message)
dst = dst if dst else self.name
if overwrite is False:
if os.path.isfile(dst):
cntr = 0
while True:
name = os.path.splitext(dst)[0] + '_{}.mf4'.format(cntr)
if not os.path.isfile(name):
break
else:
cntr += 1
message = ('Destination file "{}" already exists '
'and "overwrite" is False. Saving MDF file as "{}"')
message = message.format(dst, name)
warnings.warn(message)
dst = name
if not self.file_history:
comment = 'created'
else:
comment = 'updated'
text = """<FHcomment>
<TX>{}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>"""
text = text.format(comment, __version__)
self.file_history.append(
[FileHistory(),
TextBlock(text=text, meta=True)]
)
if self.memory == 'low' and dst == self.name:
destination = dst + '.temp'
else:
destination = dst
with open(destination, 'wb+') as dst_:
defined_texts = {}
write = dst_.write
tell = dst_.tell
seek = dst_.seek
write(bytes(self.identification))
write(bytes(self.header))
original_data_addresses = []
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
# write DataBlocks first
for gp in self.groups:
original_data_addresses.append(
gp['data_group']['data_block_addr']
)
address = tell()
data = self._load_group_data(gp)
if MDF4._split_data_blocks:
samples_size = gp['channel_group']['samples_byte_nr']
split_size = MDF4._split_threshold // samples_size
split_size *= samples_size
chunks = len(data) / split_size
chunks = ceil(chunks)
else:
chunks = 1
if chunks == 1:
if compression and self.version != '4.00':
if compression == 1:
param = 0
else:
param = gp['channel_group']['samples_byte_nr']
kargs = {
'data': data,
'zip_type': zip_type,
'param': param,
}
data_block = DataZippedBlock(**kargs)
else:
data_block = DataBlock(data=data)
write(bytes(data_block))
align = data_block['block_len'] % 8
if align:
write(b'\0' * (8-align))
if gp['channel_group']['cycles_nr']:
gp['data_group']['data_block_addr'] = address
else:
gp['data_group']['data_block_addr'] = 0
else:
kargs = {
'flags': v4c.FLAG_DL_EQUAL_LENGHT,
'zip_type': zip_type,
}
hl_block = HeaderList(**kargs)
kargs = {
'flags': v4c.FLAG_DL_EQUAL_LENGHT,
'links_nr': chunks + 1,
'data_block_nr': chunks,
'data_block_len': split_size,
}
dl_block = DataList(**kargs)
data_blocks = []
for i in range(chunks):
data_ = data[i*split_size: (i+1)*split_size]
if compression and self.version != '4.00':
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
if compression == 1:
param = 0
else:
param = gp['channel_group']['samples_byte_nr']
kargs = {
'data': data_,
'zip_type': zip_type,
'param': param,
}
block = DataZippedBlock(**kargs)
else:
block = DataBlock(data=data_)
address = tell()
block.address = address
data_blocks.append(block)
write(bytes(block))
align = block['block_len'] % 8
if align:
write(b'\0' * (8-align))
dl_block['data_block_addr{}'.format(i)] = address
address = tell()
dl_block.address = address
write(bytes(dl_block))
if compression and self.version != '4.00':
hl_block['first_dl_addr'] = address
address = tell()
hl_block.address = address
write(bytes(hl_block))
gp['data_group']['data_block_addr'] = address
address = tell()
blocks = []
if self.file_comment:
self.file_comment.address = address
address += self.file_comment['block_len']
blocks.append(self.file_comment)
# attachemnts
if self.attachments:
for at_block, texts in self.attachments:
for key, text in texts.items():
at_block[key] = text.address = address
address += text['block_len']
blocks.append(text)
for at_block, texts in self.attachments:
at_block.address = address
blocks.append(at_block)
align = at_block['block_len'] % 8
# append 8vyte alignemnt bytes for attachments
if align % 8:
blocks.append(b'\0' * (8 - align))
address += at_block['block_len'] + 8 - align
else:
address += at_block['block_len']
for i, (at_block, text) in enumerate(self.attachments[:-1]):
at_block['next_at_addr'] = self.attachments[i+1][0].address
self.attachments[-1][0]['next_at_addr'] = 0
# file history blocks
for i, (fh, fh_text) in enumerate(self.file_history):
fh_text.address = address
blocks.append(fh_text)
address += fh_text['block_len']
fh['comment_addr'] = fh_text.address
for i, (fh, fh_text) in enumerate(self.file_history):
fh.address = address
address += fh['block_len']
blocks.append(fh)
for i, (fh, fh_text) in enumerate(self.file_history[:-1]):
fh['next_fh_addr'] = self.file_history[i+1][0].address
self.file_history[-1][0]['next_fh_addr'] = 0
# data groups
for gp in self.groups:
gp['data_group'].address = address
address += gp['data_group']['block_len']
blocks.append(gp['data_group'])
gp['data_group']['comment_addr'] = 0
for i, dg in enumerate(self.groups[:-1]):
addr_ = self.groups[i+1]['data_group'].address
dg['data_group']['next_dg_addr'] = addr_
self.groups[-1]['data_group']['next_dg_addr'] = 0
tab_conversion = (
v4c.CONVERSION_TYPE_TABX,
v4c.CONVERSION_TYPE_RTABX,
v4c.CONVERSION_TYPE_TTAB,
v4c.CONVERSION_TYPE_TRANS,
)
# go through each data group and append the rest of the blocks
for i, gp in enumerate(self.groups):
# write TXBLOCK's
for item_list in gp['texts'].values():
for dict_ in item_list:
if dict_ is None:
continue
for key, tx_block in dict_.items():
# text blocks can be shared
text = tx_block['text']
if text in defined_texts:
tx_block.address = defined_texts[text]
else:
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
# channel conversions
for j, conv in enumerate(gp['channel_conversions']):
if conv:
conv.address = address
conv_texts = gp['texts']['conversions'][j]
if conv_texts:
for key, text_block in conv_texts.items():
conv[key] = text_block.address
conv['inv_conv_addr'] = 0
if conv['conversion_type'] in tab_conversion:
for key in gp['texts']['conversion_tab'][j]:
conv[key] = gp['texts']['conversion_tab'][j][key].address
address += conv['block_len']
blocks.append(conv)
# channel sources
for j, source in enumerate(gp['channel_sources']):
if source:
source.address = address
source_texts = gp['texts']['sources'][j]
for key in ('name_addr', 'path_addr', 'comment_addr'):
if source_texts and key in source_texts:
source[key] = source_texts[key].address
else:
source[key] = 0
address += source['block_len']
blocks.append(source)
# channel data
gp_sd = gp['signal_data'] = []
for j, ch in enumerate(gp['channels']):
signal_data = self._load_signal_data(ch['data_block_addr'])
if signal_data:
signal_data = SignalDataBlock(data=signal_data)
signal_data.address = address
address += signal_data['block_len']
blocks.append(signal_data)
align = signal_data['block_len'] % 8
if align % 8:
blocks.append(b'\0' * (8 - align))
address += 8 - align
gp_sd.append(signal_data)
else:
gp_sd.append(None)
# channel dependencies
for j, dep_list in enumerate(gp['channel_dependencies']):
if dep_list:
if all(isinstance(dep, ChannelArrayBlock)
for dep in dep_list):
for dep in dep_list:
dep.address = address
address += dep['block_len']
blocks.append(dep)
for k, dep in enumerate(dep_list[:-1]):
dep['composition_addr'] = dep_list[k+1].address
dep_list[-1]['composition_addr'] = 0
# channels
for j, (channel, signal_data) in enumerate(zip(gp['channels'], gp['signal_data'])):
channel.address = address
channel_texts = gp['texts']['channels'][j]
address += channel['block_len']
blocks.append(channel)
for key in ('comment_addr', 'unit_addr'):
if key in channel_texts:
channel[key] = channel_texts[key].address
else:
channel[key] = 0
channel['name_addr'] = channel_texts['name_addr'].address
if not gp['channel_conversions'][j]:
channel['conversion_addr'] = 0
else:
addr_ = gp['channel_conversions'][j].address
channel['conversion_addr'] = addr_
if gp['channel_sources'][j]:
addr_ = gp['channel_sources'][j].address
channel['source_addr'] = addr_
else:
channel['source_addr'] = 0
if signal_data:
channel['data_block_addr'] = signal_data.address
else:
channel['data_block_addr'] = 0
if gp['channel_dependencies'][j]:
addr_ = gp['channel_dependencies'][j][0].address
channel['component_addr'] = addr_
group_channels = gp['channels']
if group_channels:
for j, channel in enumerate(group_channels[:-1]):
channel['next_ch_addr'] = group_channels[j+1].address
group_channels[-1]['next_ch_addr'] = 0
# channel group
gp['channel_group'].address = address
gp['channel_group']['first_ch_addr'] = gp['channels'][0].address
gp['channel_group']['next_cg_addr'] = 0
cg_texts = gp['texts']['channel_group'][0]
for key in ('acq_name_addr', 'comment_addr'):
if cg_texts and key in cg_texts:
addr_ = gp['texts']['channel_group'][0][key].address
gp['channel_group'][key] = addr_
gp['channel_group']['acq_source_addr'] = 0
gp['data_group']['first_cg_addr'] = address
address += gp['channel_group']['block_len']
blocks.append(gp['channel_group'])
for gp in self.groups:
for dep_list in gp['channel_dependencies']:
if dep_list:
if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list):
for dep in dep_list:
for i, (ch_nr, gp_nr) in enumerate(dep.referenced_channels):
grp = self.groups[gp_nr]
ch = grp['channels'][ch_nr]
dep['scale_axis_{}_dg_addr'.format(i)] = grp['data_group'].address
dep['scale_axis_{}_cg_addr'.format(i)] = grp['channel_group'].address
dep['scale_axis_{}_ch_addr'.format(i)] = ch.address
for block in blocks:
write(bytes(block))
if self.groups:
addr_ = self.groups[0]['data_group'].address
self.header['first_dg_addr'] = addr_
else:
self.header['first_dg_addr'] = 0
self.header['file_history_addr'] = self.file_history[0][0].address
if self.attachments:
addr_ = self.attachments[0][0].address
self.header['first_attachment_addr'] = addr_
else:
self.header['first_attachment_addr'] = 0
if self.file_comment:
self.header['comment_addr'] = self.file_comment.address
else:
self.header['comment_addr'] = 0
seek(v4c.IDENTIFICATION_BLOCK_SIZE, v4c.SEEK_START)
write(bytes(self.header))
for orig_addr, gp in zip(original_data_addresses, self.groups):
gp['data_group']['data_block_addr'] = orig_addr
if self.memory == 'low' and dst == self.name:
self.close()
os.remove(self.name)
os.rename(destination, self.name)
self.groups = []
self.header = None
self.identification = None
self.file_history = []
self.channels_db = {}
self.masters_db = {}
self.attachments = []
self.file_comment = None
self._ch_map = {}
self._master_channel_cache = {}
self._tempfile = TemporaryFile()
self._file = open(self.name, 'rb')
self._read()
def _save_without_metadata(self, dst, overwrite, compression):
    """Save the MDF to *dst* while re-reading the metadata blocks from the
    source streams (``self._file`` / ``self._tempfile``) instead of from
    in-memory block objects.

    If *dst* is not provided the destination file name is the MDF name.
    If *overwrite* is *False* and the destination already exists, the file
    name is appended with '_<cntr>', where '<cntr>' is the first counter
    that produces a file name that does not already exist in the
    filesystem, and a warning is issued.

    When the destination equals ``self.name`` the output is first written
    to ``<dst>.temp``; afterwards the original file is replaced and the
    object is re-initialised by reading the new file.

    Parameters
    ----------
    dst : str
        destination file name, Default ''
    overwrite : bool
        overwrite flag, default *False*
    compression : int
        use compressed data blocks, default 0; valid since version 4.10

        * 0 - no compression
        * 1 - deflate (slower, but produces smaller files)
        * 2 - transposition + deflate (slowest, but produces
          the smallest files)

    Raises
    ------
    MdfException
        if the MDF has no name and no destination was given

    """
    if self.name is None and dst == '':
        message = ('Must specify a destination file name '
                   'for MDF created from scratch')
        raise MdfException(message)

    dst = dst if dst else self.name
    if overwrite is False and os.path.isfile(dst):
        # look for the first '<base>_<cntr>.mf4' that is still free
        base = os.path.splitext(dst)[0]
        cntr = 0
        while True:
            name = base + '_{}.mf4'.format(cntr)
            if not os.path.isfile(name):
                break
            cntr += 1
        message = ('Destination file "{}" already exists '
                   'and "overwrite" is False. Saving MDF file as "{}"')
        warnings.warn(message.format(dst, name))
        dst = name

    # every save appends a new file history entry
    comment = 'updated' if self.file_history else 'created'
    fh_comment = (
        '<FHcomment>\n'
        '<TX>{}</TX>\n'
        '<tool_id>asammdf</tool_id>\n'
        '<tool_vendor>asammdf</tool_vendor>\n'
        '<tool_version>{}</tool_version>\n'
        '</FHcomment>'
    ).format(comment, __version__)
    self.file_history.append(
        [FileHistory(),
         TextBlock(text=fh_comment, meta=True)]
    )

    # never write directly over the file that is still being read from
    if dst == self.name:
        destination = dst + '.temp'
    else:
        destination = dst

    with open(destination, 'wb+') as dst_:
        # text -> address of the already written TXBLOCK (blocks are shared)
        defined_texts = {}

        write = dst_.write
        tell = dst_.tell
        seek = dst_.seek

        # the header is written now as a placeholder and rewritten at the
        # end, once all the link addresses are known
        write(bytes(self.identification))
        write(bytes(self.header))

        original_data_addresses = []

        if compression == 1:
            zip_type = v4c.FLAG_DZ_DEFLATE
        else:
            zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
        use_compression = bool(compression) and self.version != '4.00'

        def write_padding(block_len):
            # pad the block that was just written up to a multiple of 8
            padding = -block_len % 8
            if padding:
                write(b'\0' * padding)

        def make_data_block(raw, record_size):
            # plain or zipped data block, depending on the save options
            if not use_compression:
                return DataBlock(data=raw)
            return DataZippedBlock(
                data=raw,
                zip_type=zip_type,
                param=0 if compression == 1 else record_size,
            )

        # write DataBlocks first
        for gp in self.groups:
            original_data_addresses.append(
                gp['data_group']['data_block_addr']
            )
            address = tell()

            data = self._load_group_data(gp)
            samples_size = gp['channel_group']['samples_byte_nr']

            # Fall back to a single block when the record size is 0 or
            # larger than the split threshold (the split size would be 0
            # and the division below would fail).
            chunks = 1
            split_size = 0
            if MDF4._split_data_blocks:
                if samples_size:
                    split_size = MDF4._split_threshold // samples_size
                    split_size *= samples_size
                if split_size:
                    # int() because math.ceil returns a float on Python 2
                    chunks = int(ceil(len(data) / split_size))

            if chunks == 1:
                data_block = make_data_block(data, samples_size)
                write(bytes(data_block))
                write_padding(data_block['block_len'])

                if gp['channel_group']['cycles_nr']:
                    gp['data_group']['data_block_addr'] = address
                else:
                    gp['data_group']['data_block_addr'] = 0
            else:
                dl_block = DataList(
                    flags=v4c.FLAG_DL_EQUAL_LENGHT,
                    links_nr=chunks + 1,
                    data_block_nr=chunks,
                    data_block_len=split_size,
                )

                for i in range(chunks):
                    block = make_data_block(
                        data[i*split_size: (i+1)*split_size],
                        samples_size,
                    )
                    address = tell()
                    block.address = address

                    write(bytes(block))
                    write_padding(block['block_len'])
                    dl_block['data_block_addr{}'.format(i)] = address

                address = tell()
                dl_block.address = address
                write(bytes(dl_block))

                if use_compression:
                    # zipped chunks are referenced through a header list
                    hl_block = HeaderList(
                        flags=v4c.FLAG_DL_EQUAL_LENGHT,
                        zip_type=zip_type,
                    )
                    hl_block['first_dl_addr'] = address
                    address = tell()
                    hl_block.address = address
                    write(bytes(hl_block))

                gp['data_group']['data_block_addr'] = address

        if self.file_comment:
            self.file_comment.address = tell()
            write(bytes(self.file_comment))

        # attachments
        address = tell()
        blocks = []

        if self.attachments:
            for at_block, texts in self.attachments:
                for key, at_text in texts.items():
                    at_block[key] = at_text.address = address
                    address += at_text['block_len']
                    blocks.append(at_text)

            for at_block, texts in self.attachments:
                at_block.address = address
                blocks.append(at_block)
                address += at_block['block_len']
                # append 8-byte alignment bytes for attachments
                padding = -at_block['block_len'] % 8
                if padding:
                    blocks.append(b'\0' * padding)
                    address += padding

            for i, (at_block, texts) in enumerate(self.attachments[:-1]):
                at_block['next_at_addr'] = self.attachments[i+1][0].address
            self.attachments[-1][0]['next_at_addr'] = 0

        # file history blocks: first all the comments, then the FH blocks
        for fh, fh_text in self.file_history:
            fh_text.address = address
            blocks.append(fh_text)
            address += fh_text['block_len']

            fh['comment_addr'] = fh_text.address

        for fh, fh_text in self.file_history:
            fh.address = address
            address += fh['block_len']
            blocks.append(fh)

        for i, (fh, fh_text) in enumerate(self.file_history[:-1]):
            fh['next_fh_addr'] = self.file_history[i+1][0].address
        # never empty here: an entry was appended above
        self.file_history[-1][0]['next_fh_addr'] = 0

        for blk in blocks:
            write(bytes(blk))

        tab_conversion = (
            v4c.CONVERSION_TYPE_TABX,
            v4c.CONVERSION_TYPE_RTABX,
            v4c.CONVERSION_TYPE_TTAB,
            v4c.CONVERSION_TYPE_TRANS,
        )

        # new channel addresses, one list per group, indexed like
        # self.groups; needed at the end to patch the channel array blocks
        channel_addresses = []

        # go through each data group and append the rest of the blocks
        for gp in self.groups:

            if gp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
                stream = self._file
            else:
                stream = self._tempfile

            # work on a copy: the stored source addresses are replaced
            # with the addresses inside the destination file
            temp_texts = deepcopy(gp['texts'])

            # write TXBLOCK's
            for item_list in temp_texts.values():
                for dict_ in item_list:
                    if dict_ is None:
                        continue
                    for key, tx_block in dict_.items():
                        # text blocks can be shared
                        block = TextBlock(
                            address=tx_block,
                            stream=stream,
                        )
                        text = block['text']
                        if text in defined_texts:
                            dict_[key] = defined_texts[text]
                        else:
                            address = tell()
                            defined_texts[text] = address
                            dict_[key] = address
                            write(bytes(block))

            # channel conversions
            conversion_addresses = []
            for j, conv in enumerate(gp['channel_conversions']):
                if conv:
                    address = tell()
                    conversion_addresses.append(address)
                    conv = ChannelConversion(
                        address=conv,
                        stream=stream,
                    )

                    conv.address = address
                    conv_texts = temp_texts['conversions'][j]

                    if conv_texts:
                        for key, text_block in conv_texts.items():
                            conv[key] = text_block
                    conv['inv_conv_addr'] = 0

                    if conv['conversion_type'] in tab_conversion:
                        tab_texts = temp_texts['conversion_tab'][j]
                        for key in tab_texts:
                            conv[key] = tab_texts[key]

                    write(bytes(conv))
                else:
                    conversion_addresses.append(0)

            # channel sources
            source_addresses = []
            for j, source in enumerate(gp['channel_sources']):
                if source:
                    address = tell()
                    source_addresses.append(address)
                    source_texts = temp_texts['sources'][j]
                    source = SourceInformation(
                        address=source,
                        stream=stream,
                    )
                    for key in ('name_addr', 'path_addr', 'comment_addr'):
                        if source_texts and key in source_texts:
                            source[key] = source_texts[key]
                        else:
                            source[key] = 0
                    write(bytes(source))
                else:
                    source_addresses.append(0)

            # channel dependencies; the blocks are rewritten at the very
            # end, after the referenced addresses are known
            for dep_list in gp['channel_dependencies']:
                if dep_list:
                    if all(isinstance(dep, ChannelArrayBlock)
                           for dep in dep_list):
                        for dep in dep_list:
                            address = tell()
                            dep.address = address
                            write(bytes(dep))
                        for k, dep in enumerate(dep_list[:-1]):
                            dep['composition_addr'] = dep_list[k+1].address
                        dep_list[-1]['composition_addr'] = 0

            # channels
            blocks = []
            chans = []
            address = tell()

            ch_addrs = []
            channel_addresses.append(ch_addrs)
            gp['channel_group']['first_ch_addr'] = address

            for j, channel in enumerate(gp['channels']):
                channel = Channel(
                    address=channel,
                    stream=stream,
                )
                channel.address = address
                channel_texts = temp_texts['channels'][j]

                ch_addrs.append(address)

                address += channel['block_len']
                blocks.append(channel)
                chans.append(channel)

                for key in ('comment_addr', 'unit_addr'):
                    if key in channel_texts:
                        channel[key] = channel_texts[key]
                    else:
                        channel[key] = 0
                channel['name_addr'] = channel_texts['name_addr']

                channel['conversion_addr'] = conversion_addresses[j]
                channel['source_addr'] = source_addresses[j]

                signal_data = self._load_signal_data(
                    channel['data_block_addr']
                )
                if signal_data:
                    # the signal data block is placed right after its
                    # channel block, padded to a multiple of 8 bytes
                    signal_data = SignalDataBlock(data=signal_data)
                    blocks.append(signal_data)
                    channel['data_block_addr'] = address
                    address += signal_data['block_len']
                    padding = -signal_data['block_len'] % 8
                    if padding:
                        blocks.append(b'\0' * padding)
                        address += padding
                else:
                    channel['data_block_addr'] = 0

                if gp['channel_dependencies'][j]:
                    block = gp['channel_dependencies'][j][0]
                    if isinstance(block, int):
                        channel['component_addr'] = block
                    else:
                        channel['component_addr'] = block.address

            if chans:
                for j, channel in enumerate(chans[:-1]):
                    channel['next_ch_addr'] = chans[j+1].address
                chans[-1]['next_ch_addr'] = 0

            for block in blocks:
                write(bytes(block))

            address = tell()

            # channel group
            channel_group = gp['channel_group']
            channel_group.address = address
            channel_group['next_cg_addr'] = 0
            cg_texts = temp_texts['channel_group'][0]
            for key in ('acq_name_addr', 'comment_addr'):
                if cg_texts and key in cg_texts:
                    channel_group[key] = cg_texts[key]
            channel_group['acq_source_addr'] = 0

            gp['data_group']['first_cg_addr'] = address

            write(bytes(channel_group))

        blocks = []
        address = tell()
        # data groups
        for gp in self.groups:
            gp['data_group'].address = address
            address += gp['data_group']['block_len']
            blocks.append(gp['data_group'])

            gp['data_group']['comment_addr'] = 0

        for i, dg in enumerate(self.groups[:-1]):
            addr_ = self.groups[i+1]['data_group'].address
            dg['data_group']['next_dg_addr'] = addr_
        # an MDF without any group is valid (first_dg_addr is 0 below)
        if self.groups:
            self.groups[-1]['data_group']['next_dg_addr'] = 0

        for block in blocks:
            write(bytes(block))

        if self.groups:
            addr_ = self.groups[0]['data_group'].address
            self.header['first_dg_addr'] = addr_
        else:
            self.header['first_dg_addr'] = 0
        self.header['file_history_addr'] = self.file_history[0][0].address
        if self.attachments:
            addr_ = self.attachments[0][0].address
            self.header['first_attachment_addr'] = addr_
        else:
            self.header['first_attachment_addr'] = 0
        if self.file_comment:
            self.header['comment_addr'] = self.file_comment.address
        else:
            self.header['comment_addr'] = 0

        # rewrite the header, now with the final link addresses
        seek(v4c.IDENTIFICATION_BLOCK_SIZE, v4c.SEEK_START)
        write(bytes(self.header))

        # the in-memory groups must keep pointing to the source data
        for orig_addr, gp in zip(original_data_addresses, self.groups):
            gp['data_group']['data_block_addr'] = orig_addr

        # patch the channel array blocks with the final addresses of the
        # referenced data groups, channel groups and channels
        for gp in self.groups:
            for dep_list in gp['channel_dependencies']:
                if dep_list:
                    if all(isinstance(dep, ChannelArrayBlock)
                           for dep in dep_list):
                        for dep in dep_list:
                            refs = dep.referenced_channels
                            for i, (ch_nr, gp_nr) in enumerate(refs):
                                grp = self.groups[gp_nr]
                                # NOTE(review): `stream` is the input
                                # stream of the last processed group;
                                # this seek looks like a leftover, kept
                                # to preserve the stream position
                                stream.seek(0, v4c.SEEK_END)

                                key = 'scale_axis_{}_dg_addr'.format(i)
                                dep[key] = grp['data_group'].address
                                key = 'scale_axis_{}_cg_addr'.format(i)
                                dep[key] = grp['channel_group'].address
                                key = 'scale_axis_{}_ch_addr'.format(i)
                                dep[key] = channel_addresses[gp_nr][ch_nr]
                            seek(dep.address, v4c.SEEK_START)
                            write(bytes(dep))

    if dst == self.name:
        # replace the original file with the temporary one and reload
        self.close()
        os.remove(self.name)
        os.rename(destination, self.name)

        self.groups = []
        self.header = None
        self.identification = None
        self.file_history = []
        self.channels_db = {}
        self.masters_db = {}
        self.attachments = []
        self.file_comment = None

        self._ch_map = {}
        self._master_channel_cache = {}

        self._tempfile = TemporaryFile()
        self._file = open(self.name, 'rb')
        self._read()