# -*- coding: utf-8 -*-
"""
ASAM MDF version 4 file format module
"""
from __future__ import division, print_function
import os
import re
import sys
import time
import warnings
from collections import defaultdict
from copy import deepcopy
from functools import partial, reduce
from hashlib import md5
from math import ceil
from struct import unpack, unpack_from
from tempfile import TemporaryFile
from numexpr import evaluate
from numpy import (
arange,
argwhere,
array,
array_equal,
clip,
dtype,
flip,
float64,
frombuffer,
interp,
linspace,
ones,
packbits,
roll,
searchsorted,
transpose,
uint8,
union1d,
unpackbits,
zeros,
)
from numpy.core.defchararray import encode
from numpy.core.records import fromarrays, fromstring
from . import v4_constants as v4c
from .signal import Signal
from .utils import (
MdfException,
as_non_byte_sized_signed_int,
fix_dtype_fields,
fmt_to_datatype_v4,
get_fmt_v4,
get_min_max,
get_unique_name,
get_text_v4,
)
from .v4_blocks import (
AttachmentBlock,
Channel,
ChannelArrayBlock,
ChannelConversion,
ChannelGroup,
DataBlock,
DataGroup,
DataList,
DataZippedBlock,
FileHistory,
FileIdentificationBlock,
HeaderBlock,
HeaderList,
SignalDataBlock,
SourceInformation,
TextBlock,
)
from .version import __version__
MASTER_CHANNELS = (
v4c.CHANNEL_TYPE_MASTER,
v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
)
TX = re.compile('<TX>(?P<text>(.|\n)+?)</TX>')
PYVERSION = sys.version_info[0]
if PYVERSION == 2:
# pylint: disable=W0622
from .utils import bytes
# pylint: enable=W0622
__all__ = ['MDF4', ]
[docs]class MDF4(object):
"""If the *name* exist it will be memorised otherwise an empty file will be
created that can be later saved to disk
Parameters
----------
name : string
mdf file name
memory : str
memory optimization option; default `full`
* if *full* the data group binary data block will be memorised in RAM
* if *low* the channel data is read from disk on request, and the
metadata is memorized into RAM
* if *minimum* only minimal data is memorized into RAM
version : string
mdf file version ('4.00', '4.10', '4.11'); default '4.10'
Attributes
----------
name : string
mdf file name
groups : list
list of data groups
header : HeaderBlock
mdf file header
file_history : list
list of (FileHistory, TextBlock) pairs
comment : TextBlock
mdf file comment
identification : FileIdentificationBlock
mdf file start block
memory : str
memory optimization option
version : str
mdf version
channels_db : dict
used for fast channel access by name; for each name key the value is a
list of (group index, channel index) tuples
masters_db : dict
used for fast master channel access; for each group index key the value
is the master channel index
"""
_compact_integers_on_append = False
_split_data_blocks = False
_split_threshold = 1 << 21
_overwrite = False
def __init__(self, name=None, memory='full', version='4.10'):
self.groups = []
self.header = None
self.identification = None
self.file_history = []
self.name = name
self.memory = memory
self.channels_db = {}
self.masters_db = {}
self.attachments = []
self.file_comment = None
self._ch_map = {}
self._master_channel_cache = {}
self._si_map = {}
self._cc_map = {}
# used for appending when memory=False
self._tempfile = TemporaryFile()
self._file = None
self._tempfile.write(b'\0')
if name:
self._file = open(self.name, 'rb')
self._read()
else:
self.header = HeaderBlock()
self.identification = FileIdentificationBlock(version=version)
self.version = version
def _check_finalised(self):
flags = self.identification['unfinalized_standard_flags']
if flags & 1:
message = ('Unfinalised file {}:'
'Update of cycle counters for CG/CA blocks required')
warnings.warn(message.format(self.name))
elif flags & 1 << 1:
message = ('Unfinalised file {}:'
'Update of cycle counters for SR blocks required')
warnings.warn(message.format(self.name))
elif flags & 1 << 2:
message = ('Unfinalised file {}:'
'Update of length for last DT block required')
warnings.warn(message.format(self.name))
elif flags & 1 << 3:
message = ('Unfinalised file {}:'
'Update of length for last RD block required')
warnings.warn(message.format(self.name))
elif flags & 1 << 4:
message = ('Unfinalised file {}:'
'Update of last DL block in each chained list'
'of DL blocks required')
warnings.warn(message.format(self.name))
elif flags & 1 << 5:
message = ('Unfinalised file {}:'
'Update of cg_data_bytes and cg_inval_bytes '
'in VLSD CG block required')
warnings.warn(message.format(self.name))
elif flags & 1 << 6:
message = ('Unfinalised file {}:'
'Update of offset values for VLSD channel required '
'in case a VLSD CG block is used')
warnings.warn(message.format(self.name))
def _read(self):
stream = self._file
memory = self.memory
dg_cntr = 0
self.identification = FileIdentificationBlock(stream=stream)
version = self.identification['version_str']
self.version = version.decode('utf-8').strip(' \n\t\0')
if self.version in ('4.10', '4.11'):
self._check_finalised()
self.header = HeaderBlock(address=0x40, stream=stream)
# read file comment
if self.header['comment_addr']:
self.file_comment = TextBlock(
address=self.header['comment_addr'],
stream=stream,
)
# read file history
fh_addr = self.header['file_history_addr']
while fh_addr:
history_block = FileHistory(address=fh_addr, stream=stream)
history_text = TextBlock(
address=history_block['comment_addr'],
stream=stream,
)
self.file_history.append((history_block, history_text))
fh_addr = history_block['next_fh_addr']
# read attachments
at_addr = self.header['first_attachment_addr']
while at_addr:
texts = {}
at_block = AttachmentBlock(address=at_addr, stream=stream)
for key in ('file_name_addr', 'mime_addr', 'comment_addr'):
addr = at_block[key]
if addr:
texts[key] = TextBlock(address=addr, stream=stream)
self.attachments.append((at_block, texts))
at_addr = at_block['next_at_addr']
# go to first date group and read each data group sequentially
dg_addr = self.header['first_dg_addr']
while dg_addr:
new_groups = []
group = DataGroup(address=dg_addr, stream=stream)
record_id_nr = group['record_id_len']
# go to first channel group of the current data group
cg_addr = group['first_cg_addr']
cg_nr = 0
cg_size = {}
while cg_addr:
cg_nr += 1
grp = {}
new_groups.append(grp)
grp['channels'] = []
grp['channel_conversions'] = []
grp['channel_sources'] = []
grp['signal_data'] = []
grp['data_block'] = None
grp['channel_dependencies'] = []
# channel_group is lsit to allow uniform handling of all texts
# in save method
grp['texts'] = {
'conversion_tab': [],
'channel_group': [],
}
# read each channel group sequentially
block = ChannelGroup(address=cg_addr, stream=stream)
channel_group = grp['channel_group'] = block
grp['record_size'] = cg_size
if channel_group['flags'] == 0:
samples_size = channel_group['samples_byte_nr']
inval_size = channel_group['invalidation_bytes_nr']
record_id = channel_group['record_id']
if PYVERSION == 2:
record_id = chr(record_id)
cg_size[record_id] = samples_size + inval_size
else:
# VLDS flags
record_id = channel_group['record_id']
if PYVERSION == 2:
record_id = chr(record_id)
cg_size[record_id] = 0
if record_id_nr:
grp['sorted'] = False
else:
grp['sorted'] = True
data_group = DataGroup(address=dg_addr, stream=stream)
grp['data_group'] = data_group
# read acquisition name and comment for current channel group
channel_group_texts = {}
for key in ('acq_name_addr', 'comment_addr'):
address = channel_group[key]
if address:
block = TextBlock(address=address, stream=stream)
if memory == 'minimum':
channel_group_texts[key] = address
else:
channel_group_texts[key] = block
if channel_group_texts:
grp['texts']['channel_group'].append(channel_group_texts)
else:
grp['texts']['channel_group'].append(None)
# go to first channel of the current channel group
ch_addr = channel_group['first_ch_addr']
ch_cntr = 0
# Read channels by walking recursively in the channel group
# starting from the first channel
self._read_channels(
ch_addr,
grp,
stream,
dg_cntr,
ch_cntr,
)
cg_addr = channel_group['next_cg_addr']
dg_cntr += 1
# store channel groups record sizes dict in each
# new group data belong to the initial unsorted group, and add
# the key 'sorted' with the value False to use a flag;
# this is used later if memory=False
if memory == 'full':
grp['data_location'] = v4c.LOCATION_MEMORY
dat_addr = group['data_block_addr']
if record_id_nr == 0:
size = channel_group['samples_byte_nr']
size *= channel_group['cycles_nr']
else:
size = sum(
(gp['channel_group']['samples_byte_nr'] + record_id_nr)
* gp['channel_group']['cycles_nr']
for gp in new_groups
)
data = self._read_data_block(
address=dat_addr,
stream=stream,
size=size,
)
if record_id_nr == 0:
grp = new_groups[0]
grp['data_location'] = v4c.LOCATION_MEMORY
grp['data_block'] = DataBlock(data=data)
else:
cg_data = defaultdict(list)
i = 0
size = len(data)
while i < size:
rec_id = data[i]
# skip record id
i += 1
rec_size = cg_size[rec_id]
if rec_size:
rec_data = data[i: i + rec_size]
cg_data[rec_id].append(rec_data)
else:
rec_size = unpack('<I', data[i: i + 4])[0]
i += 4
rec_data = data[i: i + rec_size]
cg_data[rec_id].append(rec_data)
# if 2 record id's are used skip also the second one
if record_id_nr == 2:
i += rec_size + 1
else:
i += rec_size
for grp in new_groups:
grp['data_location'] = v4c.LOCATION_MEMORY
record_id = grp['channel_group']['record_id']
if PYVERSION == 2:
record_id = chr(record_id)
data = b''.join(cg_data[record_id])
grp['channel_group']['record_id'] = 1
grp['data_block'] = DataBlock(data=data)
else:
for grp in new_groups:
grp['data_location'] = v4c.LOCATION_ORIGINAL_FILE
self.groups.extend(new_groups)
dg_addr = group['next_dg_addr']
for grp in self.groups:
for dep_list in grp['channel_dependencies']:
if not dep_list:
continue
for dep in dep_list:
if isinstance(dep, ChannelArrayBlock):
conditions = (
dep['ca_type'] == v4c.CA_TYPE_LOOKUP,
dep['links_nr'] == 4 * dep['dims'] + 1,
)
if not all(conditions):
continue
for i in range(dep['dims']):
ch_addr = dep['scale_axis_{}_ch_addr'.format(i)]
ref_channel = self._ch_map[ch_addr]
dep.referenced_channels.append(ref_channel)
else:
break
if self.memory == 'full':
self.close()
self._si_map = None
self._ch_map = None
self._cc_map = None
def _read_channels(
self,
ch_addr,
grp,
stream,
dg_cntr,
ch_cntr,
channel_composition=False):
memory = self.memory
channels = grp['channels']
composition = []
while ch_addr:
# read channel block and create channel object
channel = Channel(address=ch_addr, stream=stream)
self._ch_map[ch_addr] = (ch_cntr, dg_cntr)
if memory == 'minimum':
value = ch_addr
else:
value = channel
channels.append(value)
if channel_composition:
composition.append(value)
# read conversion block and create channel conversion object
address = channel['conversion_addr']
if address:
if memory == 'minimum':
conv = ChannelConversion(
address=address,
stream=stream,
)
conv_type = conv['conversion_type']
else:
stream.seek(address+8)
size = unpack('<Q', stream.read(8))[0]
stream.seek(address)
raw_bytes = stream.read(size)
if raw_bytes in self._cc_map:
conv = self._cc_map[raw_bytes]
conv_type = conv['conversion_type']
else:
conv = ChannelConversion(raw_bytes=raw_bytes)
conv_type = conv['conversion_type']
if conv_type not in v4c.CONVERSIONS_WITH_TEXTS:
self._cc_map[raw_bytes] = conv
else:
conv_type = -1
conv = None
if memory == 'minimum':
grp['channel_conversions'].append(address)
else:
grp['channel_conversions'].append(conv)
conv_tabx_texts = {}
# read text fields for channel conversions
if conv:
if memory != 'minimum':
address = conv['name_addr']
if address:
conv.name = get_text_v4(address, stream)
address = conv['unit_addr']
if address:
conv.unit = get_text_v4(address, stream)
address = conv['comment_addr']
if address:
conv.comment = get_text_v4(address, stream)
address = conv.get('formula_addr', 0)
if address:
conv.formula = get_text_v4(address, stream)
if conv_type in v4c.TABULAR_CONVERSIONS:
if conv_type == v4c.CONVERSION_TYPE_TTAB:
tabs = conv['links_nr'] - 4
else:
tabs = conv['links_nr'] - 4 - 1
for i in range(tabs):
address = conv['text_{}'.format(i)]
if memory == 'minimum':
conv_tabx_texts['text_{}'.format(i)] = address
else:
if address:
block = TextBlock(
address=address,
stream=stream,
)
conv_tabx_texts['text_{}'.format(i)] = block
else:
conv_tabx_texts['text_{}'.format(i)] = None
if conv_type != v4c.CONVERSION_TYPE_TTAB:
address = conv.get('default_addr', 0)
if address:
if memory == 'minimum':
conv_tabx_texts['default_addr'] = address
else:
stream.seek(address, v4c.SEEK_START)
blk_id = stream.read(4)
if blk_id == b'##TX':
block = TextBlock(
address=address,
stream=stream,
)
conv_tabx_texts['default_addr'] = block
elif blk_id == b'##CC':
block = ChannelConversion(
address=address,
stream=stream,
)
text = str(time.clock()).encode('utf-8')
default_text = block
default_text['text'] = text
conv['unit_addr'] = default_text['unit_addr']
default_text['unit_addr'] = 0
elif conv_type == v4c.CONVERSION_TYPE_TRANS:
# link_nr - common links (4) - default text link (1)
for i in range((conv['links_nr'] - 4 - 1) // 2):
for key in ('input_{}_addr'.format(i),
'output_{}_addr'.format(i)):
address = conv[key]
if address:
if memory == 'minimum':
conv_tabx_texts[key] = address
else:
block = TextBlock(
address=address,
stream=stream,
)
conv_tabx_texts[key] = block
address = conv['default_addr']
if address:
if memory == 'minimum':
conv_tabx_texts['default_addr'] = address
else:
block = TextBlock(
address=address,
stream=stream,
)
conv_tabx_texts['default_addr'] = block
if conv_tabx_texts:
grp['texts']['conversion_tab'].append(conv_tabx_texts)
else:
grp['texts']['conversion_tab'].append(None)
# read source block and create source information object
address = channel['source_addr']
if address:
if memory == 'minimum':
grp['channel_sources'].append(address)
else:
stream.seek(address, v4c.SEEK_START)
raw_bytes = stream.read(v4c.SI_BLOCK_SIZE)
if raw_bytes in self._si_map:
grp['channel_sources'].append(self._si_map[raw_bytes])
else:
source = SourceInformation(
raw_bytes=raw_bytes,
)
grp['channel_sources'].append(source)
address = source['name_addr']
if address:
source.name = get_text_v4(address, stream)
address = source['path_addr']
if address:
source.path = get_text_v4(address, stream)
address = source['comment_addr']
if address:
source.comment = get_text_v4(address, stream)
self._si_map[raw_bytes] = source
else:
if memory == 'minimum':
grp['channel_sources'].append(0)
else:
grp['channel_sources'].append(None)
if memory != 'minimum':
address = channel['unit_addr']
if address:
channel.unit = get_text_v4(address, stream)
address = channel['comment_addr']
if address:
block = TextBlock(
address=address,
stream=stream,
)
name = (
block['text']
.decode('utf-8')
.split('\\')[0]
.strip(' \t\n\r\0')
)
channel.comment = name
channel.comment_type = block['id']
channel.name = name = get_text_v4(channel['name_addr'], stream)
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
if channel['channel_type'] in MASTER_CHANNELS:
self.masters_db[dg_cntr] = ch_cntr
ch_cntr += 1
if channel['component_addr']:
# check if it is a CABLOCK or CNBLOCK
stream.seek(channel['component_addr'], v4c.SEEK_START)
blk_id = stream.read(4)
if blk_id == b'##CN':
index = ch_cntr - 1
grp['channel_dependencies'].append(None)
ch_cntr, composition = self._read_channels(
channel['component_addr'],
grp,
stream,
dg_cntr,
ch_cntr,
True,
)
grp['channel_dependencies'][index] = composition
else:
# only channel arrays with storage=CN_TEMPLATE are
# supported so far
ca_block = ChannelArrayBlock(
address=channel['component_addr'],
stream=stream,
)
if ca_block['storage'] != v4c.CA_STORAGE_TYPE_CN_TEMPLATE:
warnings.warn('Only CN template arrays are supported')
ca_list = [ca_block, ]
while ca_block['composition_addr']:
ca_block = ChannelArrayBlock(
address=ca_block['composition_addr'],
stream=stream,
)
ca_list.append(ca_block)
grp['channel_dependencies'].append(ca_list)
else:
grp['channel_dependencies'].append(None)
# go to next channel of the current channel group
ch_addr = channel['next_ch_addr']
return ch_cntr, composition
def _read_data_block(self, address, stream, size=-1):
"""read and aggregate data blocks for a given data group
Returns
-------
data : bytes
aggregated raw data
"""
orig = address
if address:
stream.seek(address, v4c.SEEK_START)
id_string = stream.read(4)
# can be a DataBlock
if id_string == b'##DT':
data = DataBlock(address=address, stream=stream)
data = data['data']
# or a DataZippedBlock
elif id_string == b'##DZ':
data = DataZippedBlock(address=address, stream=stream)
data = data['data']
# or a DataList
elif id_string == b'##DL':
if size >= 0:
data = bytearray(size)
view = memoryview(data)
position = 0
while address:
dl = DataList(address=address, stream=stream)
for i in range(dl['links_nr'] - 1):
addr = dl['data_block_addr{}'.format(i)]
stream.seek(addr, v4c.SEEK_START)
id_string = stream.read(4)
if id_string == b'##DT':
_, dim, __ = unpack('<4s2Q', stream.read(20))
dim -= 24
position += stream.readinto(view[position: position+dim])
elif id_string == b'##DZ':
block = DataZippedBlock(
stream=stream,
address=addr,
)
uncompressed_size = block['original_size']
view[position: position+uncompressed_size] = block['data']
position += uncompressed_size
address = dl['next_dl_addr']
else:
data = []
while address:
dl = DataList(address=address, stream=stream)
for i in range(dl['links_nr'] - 1):
addr = dl['data_block_addr{}'.format(i)]
stream.seek(addr, v4c.SEEK_START)
id_string = stream.read(4)
if id_string == b'##DT':
block = DataBlock(stream=stream, address=addr)
data.append(block['data'])
elif id_string == b'##DZ':
block = DataZippedBlock(
stream=stream,
address=addr,
)
data.append(block['data'])
address = dl['next_dl_addr']
data = b''.join(data)
# or a header list
elif id_string == b'##HL':
hl = HeaderList(address=address, stream=stream)
data = self._read_data_block(
address=hl['first_dl_addr'],
stream=stream,
)
else:
data = b''
return data
def _load_group_data(self, group):
""" get group's data block bytes """
if self.memory == 'full':
data = group['data_block']['data']
else:
# could be an appended group
# for now appended groups keep the measured data in the memory.
# the plan is to use a temp file for appended groups, to keep the
# memory usage low.
data_group = group['data_group']
channel_group = group['channel_group']
if group['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
# go to the first data block of the current data group
stream = self._file
dat_addr = data_group['data_block_addr']
data = self._read_data_block(
address=dat_addr,
stream=stream,
)
if not group['sorted']:
cg_data = []
cg_size = group['record_size']
record_id = channel_group['record_id']
if PYVERSION == 2:
record_id = chr(record_id)
if data_group['record_id_len'] <= 2:
record_id_nr = data_group['record_id_len']
else:
record_id_nr = 0
i = 0
size = len(data)
while i < size:
rec_id = data[i]
# skip record id
i += 1
rec_size = cg_size[rec_id]
if rec_size:
if rec_id == record_id:
rec_data = data[i: i + rec_size]
cg_data.append(rec_data)
else:
rec_size = unpack('<I', data[i: i + 4])[0]
i += 4
if rec_id == record_id:
rec_data = data[i: i + rec_size]
cg_data.append(rec_data)
# consider the second record ID byte
if record_id_nr == 2:
i += rec_size + 1
else:
i += rec_size
data = b''.join(cg_data)
elif group['data_location'] == v4c.LOCATION_TEMPORARY_FILE:
dat_addr = data_group['data_block_addr']
if dat_addr:
cycles_nr = channel_group['cycles_nr']
samples_byte_nr = channel_group['samples_byte_nr']
size = cycles_nr * samples_byte_nr
self._tempfile.seek(dat_addr, v4c.SEEK_START)
data = self._tempfile.read(size)
else:
data = b''
return data
def _load_signal_data(self, address):
""" this method is used to get the channel signal data, usually for
VLSD channels
"""
if address:
if self._file.closed:
self._file = open(self.name, 'rb')
stream = self._file
stream.seek(address, v4c.SEEK_START)
blk_id = stream.read(4)
if blk_id == b'##SD':
data = SignalDataBlock(address=address, stream=stream)
data = data['data']
elif blk_id == b'##DZ':
data = DataZippedBlock(address=address, stream=stream)
data = data['data']
elif blk_id == b'##DL':
data = []
while address:
# the data list will contain only links to SDBLOCK's
data_list = DataList(address=address, stream=stream)
nr = data_list['links_nr']
# aggregate data from all SDBLOCK
for i in range(nr - 1):
addr = data_list['data_block_addr{}'.format(i)]
stream.seek(addr, v4c.SEEK_START)
blk_id = stream.read(4)
if blk_id == b'##SD':
block = SignalDataBlock(
address=addr,
stream=stream,
)
data.append(block['data'])
elif blk_id == b'##DZ':
block = DataZippedBlock(
address=addr,
stream=stream,
)
data.append(block['data'])
else:
message = ('Expected SD, DZ or DL block at {} '
'but found id="{}"')
message = message.format(hex(address), blk_id)
warnings.warn(message)
return b''
address = data_list['next_dl_addr']
data = b''.join(data)
elif blk_id == b'##CN':
data = b''
else:
message = ('Expected SD, DL, DZ or CN block at {} '
'but found id="{}"')
message = message.format(hex(address), blk_id)
warnings.warn(message)
data = b''
if self.memory == 'full':
self.close()
else:
data = b''
return data
def _prepare_record(self, group):
""" compute record dtype and parents dict fro this group
Parameters
----------
group : dict
MDF group dict
Returns
-------
parents, dtypes : dict, numpy.dtype
mapping of channels to records fields, records fields dtype
"""
grp = group
stream = self._file
memory = self.memory
channel_group = grp['channel_group']
if memory == 'minimum':
channels = [
Channel(address=ch_addr, stream=stream)
for ch_addr in grp['channels']
]
else:
channels = grp['channels']
record_size = channel_group['samples_byte_nr']
invalidation_bytes_nr = channel_group['invalidation_bytes_nr']
next_byte_aligned_position = 0
types = []
current_parent = ""
parent_start_offset = 0
parents = {}
group_channels = set()
sortedchannels = sorted(enumerate(channels), key=lambda i: i[1])
for original_index, new_ch in sortedchannels:
start_offset = new_ch['byte_offset']
bit_offset = new_ch['bit_offset']
data_type = new_ch['data_type']
bit_count = new_ch['bit_count']
ch_type = new_ch['channel_type']
dependency_list = grp['channel_dependencies'][original_index]
if memory == 'minimum':
block = TextBlock(
address=new_ch['name_addr'],
stream=stream,
)
name = (
block['text']
.decode('utf-8')
.strip(' \r\n\t\0')
.split('\\')[0]
)
else:
name = new_ch.name
# handle multiple occurance of same channel name
name = get_unique_name(group_channels, name)
group_channels.add(name)
if start_offset >= next_byte_aligned_position:
if ch_type not in (v4c.CHANNEL_TYPE_VIRTUAL_MASTER,
v4c.CHANNEL_TYPE_VIRTUAL):
if not dependency_list:
parent_start_offset = start_offset
# check if there are byte gaps in the record
gap = parent_start_offset - next_byte_aligned_position
if gap:
types.append(('', 'a{}'.format(gap)))
# adjust size to 1, 2, 4 or 8 bytes
size = bit_offset + bit_count
if data_type not in (v4c.DATA_TYPE_BYTEARRAY,
v4c.DATA_TYPE_STRING_UTF_8,
v4c.DATA_TYPE_STRING_LATIN_1,
v4c.DATA_TYPE_STRING_UTF_16_BE,
v4c.DATA_TYPE_STRING_UTF_16_LE,
v4c.DATA_TYPE_CANOPEN_TIME,
v4c.DATA_TYPE_CANOPEN_DATE):
if size > 32:
size = 8
elif size > 16:
size = 4
elif size > 8:
size = 2
else:
size = 1
else:
size = size >> 3
next_byte_aligned_position = parent_start_offset + size
if next_byte_aligned_position <= record_size:
dtype_pair = name, get_fmt_v4(data_type, size)
types.append(dtype_pair)
parents[original_index] = name, bit_offset
current_parent = name
else:
if isinstance(dependency_list[0], ChannelArrayBlock):
ca_block = dependency_list[0]
# check if there are byte gaps in the record
gap = start_offset - next_byte_aligned_position
if gap:
dtype_pair = '', 'a{}'.format(gap)
types.append(dtype_pair)
size = bit_count >> 3
shape = tuple(
ca_block['dim_size_{}'.format(i)]
for i in range(ca_block['dims'])
)
if ca_block['byte_offset_base'] // size > 1 and \
len(shape) == 1:
shape += ca_block['byte_offset_base'] // size,
dim = 1
for d in shape:
dim *= d
dtype_pair = name, get_fmt_v4(data_type, size), shape
types.append(dtype_pair)
current_parent = name
next_byte_aligned_position = start_offset + size * dim
parents[original_index] = name, 0
else:
parents[original_index] = None, None
# virtual channels do not have bytes in the record
else:
parents[original_index] = None, None
else:
max_overlapping_size = (next_byte_aligned_position - start_offset) * 8
needed_size = bit_offset + bit_count
if max_overlapping_size >= needed_size:
parents[original_index] = current_parent, ((start_offset - parent_start_offset) << 3) + bit_offset
if next_byte_aligned_position > record_size:
break
gap = record_size - next_byte_aligned_position
if gap > 0:
dtype_pair = '', 'a{}'.format(gap)
types.append(dtype_pair)
dtype_pair = 'invalidation_bytes', 'u1', invalidation_bytes_nr
types.append(dtype_pair)
if PYVERSION == 2:
types = fix_dtype_fields(types)
return parents, dtype(types)
def _get_not_byte_aligned_data(self, data, group, ch_nr):
big_endian_types = (
v4c.DATA_TYPE_UNSIGNED_MOTOROLA,
v4c.DATA_TYPE_REAL_MOTOROLA,
v4c.DATA_TYPE_SIGNED_MOTOROLA,
)
record_size = group['channel_group']['samples_byte_nr']
if self.memory == 'minimum':
channel = Channel(
address=group['channels'][ch_nr],
stream=self._file,
)
else:
channel = group['channels'][ch_nr]
bit_offset = channel['bit_offset']
byte_offset = channel['byte_offset']
bit_count = channel['bit_count']
dependencies = group['channel_dependencies'][ch_nr]
if dependencies and isinstance(dependencies[0], ChannelArrayBlock):
ca_block = dependencies[0]
size = bit_count >> 3
shape = tuple(
ca_block['dim_size_{}'.format(i)]
for i in range(ca_block['dims'])
)
if ca_block['byte_offset_base'] // size > 1 and len(shape) == 1:
shape += (ca_block['byte_offset_base'] // size, )
dim = 1
for d in shape:
dim *= d
size *= dim
bit_count = size << 3
byte_count = bit_offset + bit_count
if byte_count % 8:
byte_count = (byte_count >> 3) + 1
else:
byte_count >>= 3
types = [
('', 'a{}'.format(byte_offset)),
('vals', '({},)u1'.format(byte_count)),
('', 'a{}'.format(record_size - byte_count - byte_offset)),
]
vals = fromstring(data, dtype=dtype(types))
vals.setflags(write=False)
vals = vals['vals']
if channel['data_type'] not in big_endian_types:
vals = flip(vals, 1)
vals = unpackbits(vals)
vals = roll(vals, bit_offset)
vals = vals.reshape((len(vals) // 8, 8))
vals = packbits(vals)
vals = vals.reshape((len(vals) // byte_count, byte_count))
if bit_count < 64:
mask = 2 ** bit_count - 1
masks = []
while mask:
masks.append(mask & 0xFF)
mask >>= 8
for i in range(byte_count - len(masks)):
masks.append(0)
masks = masks[::-1]
for i, mask in enumerate(masks):
vals[:, i] &= mask
if channel['data_type'] not in big_endian_types:
vals = flip(vals, 1)
if bit_count <= 8:
size = 1
elif bit_count <= 16:
size = 2
elif bit_count <= 32:
size = 4
elif bit_count <= 64:
size = 8
else:
size = bit_count // 8
if size > byte_count:
extra_bytes = size - byte_count
extra = zeros((len(vals), extra_bytes), dtype=uint8)
types = [
('vals', vals.dtype, vals.shape[1:]),
('', extra.dtype, extra.shape[1:]),
]
vals = fromarrays([vals, extra], dtype=dtype(types))
vals = vals.tostring()
fmt = get_fmt_v4(channel['data_type'], size)
if size <= byte_count:
types = [
('vals', fmt),
('', 'a{}'.format(byte_count - size)),
]
else:
types = [('vals', fmt), ]
vals = fromstring(vals, dtype=dtype(types))
if channel['data_type'] in v4c.SIGNED_INT:
return as_non_byte_sized_signed_int(vals['vals'], bit_count)
else:
return vals['vals']
def _validate_channel_selection(self, name=None, group=None, index=None):
"""Gets channel comment.
Channel can be specified in two ways:
* using the first positional argument *name*
* if there are multiple occurrences for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurrences for this channel and either the
*group* or *index* arguments is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
If the *raster* keyword argument is not *None* the output is
interpolated accordingly.
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
Returns
-------
group_index, channel_index : (int, int)
selected channel's group and channel index
"""
if name is None:
if group is None or index is None:
message = (
'Invalid arguments for channel selection: '
'must give "name" or, "group" and "index"'
)
raise MdfException(message)
else:
gp_nr, ch_nr = group, index
if gp_nr > len(self.groups) - 1:
raise MdfException('Group index out of range')
if index > len(self.groups[gp_nr]['channels']) - 1:
raise MdfException('Channel index out of range')
else:
name = name.split('\\')[0]
if name not in self.channels_db:
raise MdfException('Channel "{}" not found'.format(name))
else:
if group is None:
gp_nr, ch_nr = self.channels_db[name][0]
if len(self.channels_db[name]) > 1:
message = (
'Multiple occurances for channel "{}". '
'Using first occurance from data group {}. '
'Provide both "group" and "index" arguments'
' to select another data group'
)
message = message.format(name, gp_nr)
warnings.warn(message)
else:
for gp_nr, ch_nr in self.channels_db[name]:
if gp_nr == group:
if index is None:
break
elif index == ch_nr:
break
else:
if index is None:
message = 'Channel "{}" not found in group {}'
message = message.format(name, group)
else:
message = (
'Channel "{}" not found in group {} '
'at index {}'
)
message = message.format(name, group, index)
raise MdfException(message)
return gp_nr, ch_nr
[docs] def append(self, signals, source_info='Python', common_timebase=False):
"""
Appends a new data group.
For channel dependencies type Signals, the *samples* attribute must be
a numpy.recarray
Parameters
----------
signals : list
list on *Signal* objects
source_info : str
source information; default 'Python'
common_timebase : bool
flag to hint that the signals have the same timebase
Examples
--------
>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> info = {}
>>> s1 = Signal(samples=s1, timstamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timstamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timstamps=t, unit='flts', name='Floats')
>>> mdf = MDF3('new.mdf')
>>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
>>> # case 2: VTAB conversions from channels inside another file
>>> mdf1 = MDF3('in.mdf')
>>> ch1 = mdf1.get("Channel1_VTAB")
>>> ch2 = mdf1.get("Channel2_VTABR")
>>> sigs = [ch1, ch2]
>>> mdf2 = MDF3('out.mdf')
>>> mdf2.append(sigs, 'created by asammdf v1.1.0')
"""
if not signals:
message = '"append" requires a non-empty list of Signal objects'
raise MdfException(message)
# check if the signals have a common timebase
# if not interpolate the signals using the union of all timbases
t_ = signals[0].timestamps
if not common_timebase:
for s in signals[1:]:
if not array_equal(s.timestamps, t_):
different = True
break
else:
different = False
if different:
times = [s.timestamps for s in signals]
t = reduce(union1d, times).flatten().astype(float64)
signals = [s.interp(t) for s in signals]
times = None
else:
t = t_
else:
t = t_
# split regular from composed signals. Composed signals have recarray
# samples or multimendional ndarray.
# The regular signals will be first added to the group.
# The composed signals will be saved along side the fields, which will
# be saved as new signals.
simple_signals = [
sig for sig in signals
if len(sig.samples.shape) <= 1
and sig.samples.dtype.names is None
]
composed_signals = [
sig for sig in signals
if len(sig.samples.shape) > 1
or sig.samples.dtype.names
]
dg_cntr = len(self.groups)
gp = {}
gp['channels'] = gp_channels = []
gp['channel_conversions'] = gp_conv = []
gp['channel_sources'] = gp_source = []
gp['channel_dependencies'] = gp_dep = []
gp['texts'] = gp_texts = {
'conversion_tab': [],
'channel_group': [],
}
self.groups.append(gp)
cycles_nr = len(t)
fields = []
types = []
parents = {}
ch_cntr = 0
offset = 0
field_names = set()
# setup all blocks related to the time master channel
# time channel texts
for key in ('conversion_tab',):
gp_texts[key].append(None)
memory = self.memory
file = self._tempfile
write = file.write
tell = file.tell
if memory == 'minimum':
block = TextBlock(text='t', meta=False)
channel_name_addr = tell()
write(bytes(block))
if memory == 'minimum':
block = TextBlock(text='s', meta=False)
channel_unit_addr = tell()
write(bytes(block))
if memory == 'minimum':
block = TextBlock(text=source_info, meta=False)
source_text_address = tell()
write(bytes(block))
else:
source_text_address = 0
source_block = SourceInformation(
name_addr=source_text_address,
path_addr=source_text_address,
)
source_block.name = source_block.path = source_info
source_info_address = tell()
write(bytes(source_block))
# conversion and source for time channel
if memory == 'minimum':
gp_conv.append(0)
gp_source.append(source_info_address)
else:
gp_conv.append(None)
gp_source.append(source_block)
if memory == 'minimum':
name_addr = channel_name_addr
unit_addr = channel_unit_addr
else:
name_addr = 0
unit_addr = 0
# time channel
t_type, t_size = fmt_to_datatype_v4(t.dtype)
kargs = {
'channel_type': v4c.CHANNEL_TYPE_MASTER,
'data_type': t_type,
'sync_type': 1,
'byte_offset': 0,
'bit_offset': 0,
'bit_count': t_size,
'min_raw_value': t[0] if cycles_nr else 0,
'max_raw_value': t[-1] if cycles_nr else 0,
'lower_limit': t[0] if cycles_nr else 0,
'upper_limit': t[-1] if cycles_nr else 0,
'flags': v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK,
'name_addr': name_addr,
'unit_addr': unit_addr,
}
ch = Channel(**kargs)
name = 't'
if memory == 'minimum':
address = tell()
write(bytes(ch))
gp_channels.append(address)
else:
ch.name = name
ch.unit = 's'
gp_channels.append(ch)
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
self.masters_db[dg_cntr] = 0
# data group record parents
parents[ch_cntr] = name, 0
# time channel doesn't have channel dependencies
gp_dep.append(None)
fields.append(t)
types.append((name, t.dtype))
field_names.add(name)
offset += t_size // 8
ch_cntr += 1
if self._compact_integers_on_append:
compacted_signals = [
{'signal': sig}
for sig in simple_signals
if sig.samples.dtype.kind in 'ui'
]
max_itemsize = 1
dtype_ = dtype(uint8)
for signal in compacted_signals:
itemsize = signal['signal'].samples.dtype.itemsize
min_val, max_val = get_min_max(signal['signal'].samples)
signal['min'], signal['max'] = min_val, max_val
minimum_bitlength = (itemsize // 2) * 8 + 1
bit_length = max(
int(min_val).bit_length(),
int(max_val).bit_length(),
)
bit_length += 1
signal['bit_count'] = max(minimum_bitlength, bit_length)
if itemsize > max_itemsize:
dtype_ = dtype('<u{}'.format(itemsize))
max_itemsize = itemsize
compacted_signals.sort(key=lambda x: x['bit_count'])
simple_signals = [
sig
for sig in simple_signals
if sig.samples.dtype.kind not in 'ui'
]
dtype_size = dtype_.itemsize * 8
else:
compacted_signals = []
# first try to compact unsigned integers
while compacted_signals:
# channels texts
cluster = []
tail = compacted_signals.pop()
size = tail['bit_count']
cluster.append(tail)
while size < dtype_size and compacted_signals:
head = compacted_signals[0]
head_size = head['bit_count']
if head_size + size > dtype_size:
break
else:
cluster.append(compacted_signals.pop(0))
size += head_size
bit_offset = 0
field_name = get_unique_name(field_names, 'COMPACT')
types.append((field_name, dtype_))
field_names.add(field_name)
values = zeros(cycles_nr, dtype=dtype_)
for signal_d in cluster:
signal = signal_d['signal']
bit_count = signal_d['bit_count']
min_val = signal_d['min']
max_val = signal_d['max']
name = signal.name
gp_texts['conversion_tab'].append(None)
if memory == 'minimum':
block = TextBlock(text=name, meta=False)
channel_name_address = tell()
write(bytes(block))
if signal.unit:
block = TextBlock(text=signal.unit, meta=False)
channel_unit_address = tell()
write(bytes(block))
else:
channel_unit_address = 0
if signal.comment:
block = TextBlock(text=signal.comment, meta=False)
channel_comment_address = tell()
write(bytes(block))
else:
channel_comment_address = 0
# conversions for channel
info = signal.info
conv_texts_tab = {}
if info and 'raw' in info:
kargs = {}
raw = info['raw']
phys = info['phys']
if raw.dtype.kind == 'S':
kargs['conversion_type'] = v4c.CONVERSION_TYPE_TTAB
for i, (r_, p_) in enumerate(zip(raw, phys)):
kargs['text_{}'.format(i)] = 0
kargs['val_{}'.format(i)] = p_
block = TextBlock(
text=r_,
meta=False,
)
if memory != 'minimum':
conv_texts_tab['text_{}'.format(i)] = block
else:
address = tell()
conv_texts_tab['text_{}'.format(i)] = address
write(bytes(block))
kargs['val_default'] = info['default']
kargs['links_nr'] = len(raw) + 4
else:
kargs['conversion_type'] = v4c.CONVERSION_TYPE_TABX
for i, (r_, p_) in enumerate(zip(raw, phys)):
kargs['text_{}'.format(i)] = 0
kargs['val_{}'.format(i)] = r_
block = TextBlock(
text=p_,
meta=False,
)
if memory != 'minimum':
conv_texts_tab['text_{}'.format(i)] = block
else:
address = tell()
conv_texts_tab['text_{}'.format(i)] = address
write(bytes(block))
if 'default' in info:
block = TextBlock(
text=info['default'],
meta=False,
)
if memory != 'minimum':
conv_texts_tab['default_addr'] = block
else:
address = tell()
conv_texts_tab['default_addr'] = address
write(bytes(block))
kargs['links_nr'] = len(raw) + 5
block = ChannelConversion(**kargs)
if memory != 'minimum':
gp_conv.append(block)
else:
address = tell()
gp_conv.append(address)
write(bytes(block))
elif info and 'lower' in info:
kargs = {}
kargs['conversion_type'] = v4c.CONVERSION_TYPE_RTABX
lower = info['lower']
upper = info['upper']
texts = info['phys']
kargs['ref_param_nr'] = len(upper)
kargs['links_nr'] = len(lower) + 5
for i, (u_, l_, t_) in enumerate(zip(upper, lower, texts)):
kargs['lower_{}'.format(i)] = l_
kargs['upper_{}'.format(i)] = u_
kargs['text_{}'.format(i)] = 0
block = TextBlock(
text=t_,
meta=False,
)
if memory != 'minimum':
conv_texts_tab['text_{}'.format(i)] = block
else:
address = tell()
conv_texts_tab['text_{}'.format(i)] = address
write(bytes(block))
if 'default' in info:
block = TextBlock(
text=info['default'],
meta=False,
)
if memory != 'minimum':
conv_texts_tab['default_addr'] = block
else:
address = tell()
conv_texts_tab['default_addr'] = address
write(bytes(block))
block = ChannelConversion(**kargs)
if memory != 'minimum':
gp_conv.append(block)
else:
address = tell()
gp_conv.append(address)
write(bytes(block))
else:
if memory != 'minimum':
gp_conv.append(None)
else:
gp_conv.append(0)
if conv_texts_tab:
gp_texts['conversion_tab'][-1] = conv_texts_tab
# source for channel
if memory != 'minimum':
gp_source.append(source_block)
else:
gp_source.append(source_info_address)
if memory == 'minimum':
name_addr = channel_name_address
unit_addr = channel_unit_address
comment_addr = channel_comment_address
else:
name_addr = 0
unit_addr = 0
comment_addr = 0
# compute additional byte offset for large records size
if signal.samples.dtype.kind == 'u':
data_type = v4c.DATA_TYPE_UNSIGNED_INTEL
else:
data_type = v4c.DATA_TYPE_SIGNED_INTEL
kargs = {
'channel_type': v4c.CHANNEL_TYPE_VALUE,
'bit_count': bit_count,
'byte_offset': offset + bit_offset // 8,
'bit_offset': bit_offset % 8,
'data_type': data_type,
'min_raw_value': min_val if min_val <= max_val else 0,
'max_raw_value': max_val if min_val <= max_val else 0,
'lower_limit': min_val if min_val <= max_val else 0,
'upper_limit': max_val if min_val <= max_val else 0,
'name_addr': name_addr,
'unit_addr': unit_addr,
'comment_addr': comment_addr,
}
if min_val > max_val:
kargs['flags'] = 0
else:
kargs['flags'] = v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK
ch = Channel(**kargs)
if memory != 'minimum':
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
gp_channels.append(ch)
else:
address = tell()
write(bytes(ch))
gp_channels.append(address)
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
parents[ch_cntr] = field_name, bit_offset
values += signal.samples.astype(dtype_) << bit_offset
bit_offset += bit_count
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
offset += dtype_.itemsize
fields.append(values)
# first add the signals in the simple signal list
for signal in simple_signals:
name = signal.name
gp_texts['conversion_tab'].append(None)
if memory == 'minimum':
block = TextBlock(text=name, meta=False)
channel_name_address = tell()
write(bytes(block))
if signal.unit:
block = TextBlock(
text=signal.unit,
meta=False,
)
channel_unit_address = tell()
write(bytes(block))
else:
channel_unit_address = 0
if signal.comment:
block = TextBlock(text=signal.comment, meta=False)
channel_comment_address = tell()
write(bytes(block))
else:
channel_comment_address = 0
# conversions for channel
info = signal.info
conv_texts_tab = {}
if info and 'raw' in info:
kargs = {}
raw = info['raw']
phys = info['phys']
if raw.dtype.kind == 'S':
kargs['conversion_type'] = v4c.CONVERSION_TYPE_TTAB
for i, (r_, p_) in enumerate(zip(raw, phys)):
kargs['text_{}'.format(i)] = 0
kargs['val_{}'.format(i)] = p_
block = TextBlock(
text=r_,
meta=False,
)
if memory != 'minimum':
conv_texts_tab['text_{}'.format(i)] = block
else:
address = tell()
conv_texts_tab['text_{}'.format(i)] = address
write(bytes(block))
kargs['val_default'] = info['default']
kargs['links_nr'] = len(raw) + 4
else:
kargs['conversion_type'] = v4c.CONVERSION_TYPE_TABX
for i, (r_, p_) in enumerate(zip(raw, phys)):
kargs['text_{}'.format(i)] = 0
kargs['val_{}'.format(i)] = r_
block = TextBlock(
text=p_,
meta=False,
)
if memory != 'minimum':
conv_texts_tab['text_{}'.format(i)] = block
else:
address = tell()
conv_texts_tab['text_{}'.format(i)] = address
write(bytes(block))
if 'default' in info:
block = TextBlock(
text=info['default'],
meta=False,
)
if memory != 'minimum':
conv_texts_tab['default_addr'] = block
else:
address = tell()
conv_texts_tab['default_addr'] = address
write(bytes(block))
kargs['links_nr'] = len(raw) + 5
block = ChannelConversion(**kargs)
if memory != 'minimum':
gp_conv.append(block)
else:
address = tell()
gp_conv.append(address)
write(bytes(block))
elif info and 'lower' in info:
kargs = {}
kargs['conversion_type'] = v4c.CONVERSION_TYPE_RTABX
lower = info['lower']
upper = info['upper']
texts = info['phys']
kargs['ref_param_nr'] = len(upper)
kargs['links_nr'] = len(lower) + 5
for i, (u_, l_, t_) in enumerate(zip(upper, lower, texts)):
kargs['lower_{}'.format(i)] = l_
kargs['upper_{}'.format(i)] = u_
kargs['text_{}'.format(i)] = 0
block = TextBlock(
text=t_,
meta=False,
)
if memory != 'minimum':
conv_texts_tab['text_{}'.format(i)] = block
else:
address = tell()
conv_texts_tab['text_{}'.format(i)] = address
write(bytes(block))
if 'default' in info:
block = TextBlock(
text=info['default'],
meta=False,
)
if memory != 'minimum':
conv_texts_tab['default_addr'] = block
else:
address = tell()
conv_texts_tab['default_addr'] = address
write(bytes(block))
block = ChannelConversion(**kargs)
if memory != 'minimum':
gp_conv.append(block)
else:
address = tell()
gp_conv.append(address)
write(bytes(block))
else:
if memory != 'minimum':
gp_conv.append(None)
else:
gp_conv.append(0)
if conv_texts_tab:
gp_texts['conversion_tab'][-1] = conv_texts_tab
# source for channel
if memory != 'minimum':
gp_source.append(source_block)
else:
gp_source.append(source_info_address)
if memory == 'minimum':
name_addr = channel_name_address
unit_addr = channel_unit_address
comment_addr = channel_comment_address
else:
name_addr = 0
unit_addr = 0
comment_addr = 0
# compute additional byte offset for large records size
s_type, s_size = fmt_to_datatype_v4(signal.samples.dtype)
byte_size = max(s_size // 8, 1)
min_val, max_val = get_min_max(signal.samples)
kargs = {
'channel_type': v4c.CHANNEL_TYPE_VALUE,
'bit_count': s_size,
'byte_offset': offset,
'bit_offset': 0,
'data_type': s_type,
'min_raw_value': min_val if min_val <= max_val else 0,
'max_raw_value': max_val if min_val <= max_val else 0,
'lower_limit': min_val if min_val <= max_val else 0,
'upper_limit': max_val if min_val <= max_val else 0,
'name_addr': name_addr,
'unit_addr': unit_addr,
'comment_addr': comment_addr,
}
if min_val > max_val:
kargs['flags'] = 0
else:
kargs['flags'] = v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK
ch = Channel(**kargs)
if memory != 'minimum':
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
gp_channels.append(ch)
else:
address = tell()
write(bytes(ch))
gp_channels.append(address)
offset += byte_size
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
field_name = get_unique_name(field_names, name)
parents[ch_cntr] = field_name, 0
fields.append(signal.samples)
types.append((field_name, signal.samples.dtype))
field_names.add(field_name)
ch_cntr += 1
# simple channels don't have channel dependencies
gp_dep.append(None)
canopen_time_fields = (
'ms',
'days',
)
canopen_date_fields = (
'ms',
'min',
'hour',
'day',
'month',
'year',
'summer_time',
'day_of_week',
)
for signal in composed_signals:
names = signal.samples.dtype.names
if names is None:
names = []
name = signal.name
if names in (canopen_time_fields, canopen_date_fields):
field_name = get_unique_name(field_names, name)
field_names.add(field_name)
if names == canopen_time_fields:
vals = signal.samples.tostring()
fields.append(frombuffer(vals, dtype='V6'))
types.append((field_name, 'V6'))
byte_size = 6
s_type = v4c.DATA_TYPE_CANOPEN_TIME
else:
vals = []
for field in ('ms', 'min', 'hour', 'day', 'month', 'year'):
vals.append(signal.samples[field])
vals = fromarrays(vals).tostring()
fields.append(frombuffer(vals, dtype='V7'))
types.append((field_name, 'V7'))
byte_size = 7
s_type = v4c.DATA_TYPE_CANOPEN_DATE
s_size = byte_size << 3
# add channel texts
gp_texts['conversion_tab'].append(None)
if memory == 'minimum':
block = TextBlock(text=name, meta=False)
channel_name_address = tell()
write(bytes(block))
if signal.unit:
block = TextBlock(
text=signal.unit,
meta=False,
)
channel_unit_address = tell()
write(bytes(block))
if signal.comment:
block = TextBlock(text=signal.comment, meta=False)
channel_comment_address = tell()
write(bytes(block))
else:
channel_comment_address = 0
# add channel conversion
if memory != 'minimum':
gp_conv.append(None)
else:
gp_conv.append(0)
if memory != 'minimum':
gp_source.append(source_block)
else:
gp_source.append(source_info_address)
# there is no channel dependency
gp_dep.append(None)
if memory == 'minimum':
name_addr = channel_name_address
unit_addr = channel_unit_address
comment_addr = channel_comment_address
else:
name_addr = 0
unit_addr = 0
comment_addr = 0
# add channel block
kargs = {
'channel_type': v4c.CHANNEL_TYPE_VALUE,
'bit_count': s_size,
'byte_offset': offset,
'bit_offset': 0,
'data_type': s_type,
'min_raw_value': 0,
'max_raw_value': 0,
'lower_limit': 0,
'upper_limit': 0,
'flags': 0,
'name_addr': name_addr,
'unit_addr': unit_addr,
'comment_addr': comment_addr,
}
ch = Channel(**kargs)
if memory != 'minimum':
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
gp_channels.append(ch)
else:
address = tell()
write(bytes(ch))
gp_channels.append(address)
offset += byte_size
if name in self.channels_db:
self.channels_db[name].append((dg_cntr, ch_cntr))
else:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
parents[ch_cntr] = field_name, 0
ch_cntr += 1
elif names and names[0] != name:
# here we have a structure channel composition
field_name = get_unique_name(field_names, name)
field_names.add(field_name)
# first we add the structure channel
# add channel texts
gp_texts['conversion_tab'].append(None)
if memory == 'minimum':
block = TextBlock(text=name, meta=False)
channel_name_address = tell()
write(bytes(block))
if signal.unit:
block = TextBlock(text=signal.unit, meta=False)
channel_unit_address = tell()
write(bytes(block))
else:
channel_unit_address = 0
if signal.comment:
block = TextBlock(text=signal.comment, meta=False)
channel_comment_address = tell()
write(bytes(block))
else:
channel_comment_address = 0
# add channel conversion
if memory != 'minimum':
gp_conv.append(None)
else:
gp_conv.append(0)
if memory != 'minimum':
gp_source.append(source_block)
else:
gp_source.append(source_info_address)
if memory == 'minimum':
name_addr = channel_name_address
unit_addr = channel_unit_address
comment_addr = channel_comment_address
else:
name_addr = 0
unit_addr = 0
comment_addr = 0
# add channel block
kargs = {
'channel_type': v4c.CHANNEL_TYPE_VALUE,
'bit_count': 8,
'byte_offset': offset,
'bit_offset': 0,
'data_type': v4c.DATA_TYPE_BYTEARRAY,
'min_raw_value': 0,
'max_raw_value': 0,
'lower_limit': 0,
'upper_limit': 0,
'flags': 0,
'name_addr': name_addr,
'unit_addr': unit_addr,
'comment_addr': comment_addr,
}
ch = Channel(**kargs)
if memory != 'minimum':
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
gp_channels.append(ch)
else:
address = tell()
write(bytes(ch))
gp_channels.append(address)
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
parents[ch_cntr] = name, 0
ch_cntr += 1
dep_list = []
gp_dep.append(dep_list)
# then we add the fields
for name in names:
field_name = get_unique_name(field_names, name)
field_names.add(field_name)
samples = signal.samples[name]
s_type, s_size = fmt_to_datatype_v4(samples.dtype)
byte_size = s_size >> 3
fields.append(samples)
types.append((field_name, samples.dtype))
# add channel texts
gp_texts['conversion_tab'].append(None)
if memory == 'minimum':
block = TextBlock(text=name, meta=False)
channel_name_address = tell()
write(bytes(block))
if signal.unit:
block = TextBlock(text=signal.unit, meta=False)
channel_unit_address = tell()
write(bytes(block))
else:
channel_unit_address = 0
if signal.comment:
block = TextBlock(text=signal.comment, meta=False)
channel_comment_address = tell()
write(bytes(block))
else:
channel_comment_address = 0
# add channel conversion
if memory != 'minimum':
gp_conv.append(None)
else:
gp_conv.append(0)
# source
if memory != 'minimum':
gp_source.append(source_block)
else:
gp_source.append(source_info_address)
if memory == 'minimum':
name_addr = channel_name_address
unit_addr = channel_unit_address
comment_addr = channel_comment_address
else:
name_addr = 0
unit_addr = 0
comment_addr = 0
# add channel block
min_val, max_val = get_min_max(signal.samples)
kargs = {
'channel_type': v4c.CHANNEL_TYPE_VALUE,
'bit_count': s_size,
'byte_offset': offset,
'bit_offset': 0,
'data_type': s_type,
'min_raw_value': min_val if min_val <= max_val else 0,
'max_raw_value': max_val if min_val <= max_val else 0,
'lower_limit': min_val if min_val <= max_val else 0,
'upper_limit': max_val if min_val <= max_val else 0,
'flags': v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK,
'name_addr': name_addr,
'unit_addr': unit_addr,
'comment_addr': comment_addr,
}
ch = Channel(**kargs)
if memory != 'minimum':
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
gp_channels.append(ch)
dep_list.append(ch)
else:
address = tell()
write(bytes(ch))
gp_channels.append(address)
dep_list.append(address)
offset += byte_size
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
parents[ch_cntr] = field_name, 0
ch_cntr += 1
gp_dep.append(None)
else:
# here we have channel arrays or mdf v3 channel dependencies
if names:
samples = signal.samples[names[0]]
else:
samples = signal.samples
shape = samples.shape[1:]
if len(shape) > 1:
# add channel dependency block for composed parent channel
dims_nr = len(shape)
names_nr = len(names)
if names_nr == 0:
kargs = {
'dims': dims_nr,
'ca_type': v4c.CA_TYPE_LOOKUP,
'flags': v4c.FLAG_CA_FIXED_AXIS,
'byte_offset_base': samples.dtype.itemsize,
}
for i in range(dims_nr):
kargs['dim_size_{}'.format(i)] = shape[i]
elif len(names) == 1:
kargs = {
'dims': dims_nr,
'ca_type': v4c.CA_TYPE_ARRAY,
'flags': 0,
'byte_offset_base': samples.dtype.itemsize,
}
for i in range(dims_nr):
kargs['dim_size_{}'.format(i)] = shape[i]
else:
kargs = {
'dims': dims_nr,
'ca_type': v4c.CA_TYPE_LOOKUP,
'flags': v4c.FLAG_CA_AXIS,
'byte_offset_base': samples.dtype.itemsize,
}
for i in range(dims_nr):
kargs['dim_size_{}'.format(i)] = shape[i]
parent_dep = ChannelArrayBlock(**kargs)
gp_dep.append([parent_dep, ])
else:
# add channel dependency block for composed parent channel
kargs = {
'dims': 1,
'ca_type': v4c.CA_TYPE_SCALE_AXIS,
'flags': 0,
'byte_offset_base': samples.dtype.itemsize,
'dim_size_0': shape[0],
}
parent_dep = ChannelArrayBlock(**kargs)
gp_dep.append([parent_dep, ])
field_name = get_unique_name(field_names, name)
field_names.add(field_name)
fields.append(samples)
dtype_pair = field_name, samples.dtype, shape
types.append(dtype_pair)
# first we add the structure channel
# add channel texts
gp_texts['conversion_tab'].append(None)
if memory == 'minimum':
block = TextBlock(text=name, meta=False)
channel_name_address = tell()
write(bytes(block))
if signal.unit:
block = TextBlock(text=signal.unit, meta=False)
channel_unit_address = tell()
write(bytes(block))
else:
channel_unit_address = 0
if signal.comment:
block = TextBlock(text=signal.comment, meta=False)
channel_comment_address = tell()
write(bytes(block))
else:
channel_comment_address = 0
# add channel conversion
if memory != 'minimum':
gp_conv.append(None)
else:
gp_conv.append(0)
# source for channel
if memory != 'minimum':
gp_source.append(source_block)
else:
gp_source.append(source_info_address)
if memory == 'minimum':
name_addr = channel_name_address
unit_addr = channel_unit_address
comment_addr = channel_comment_address
else:
name_addr = 0
unit_addr = 0
comment_addr = 0
s_type, s_size = fmt_to_datatype_v4(samples.dtype)
# add channel block
kargs = {
'channel_type': v4c.CHANNEL_TYPE_VALUE,
'bit_count': s_size,
'byte_offset': offset,
'bit_offset': 0,
'data_type': s_type,
'min_raw_value': 0,
'max_raw_value': 0,
'lower_limit': 0,
'upper_limit': 0,
'flags': 0,
'name_addr': name_addr,
'unit_addr': unit_addr,
'comment_addr': comment_addr,
}
ch = Channel(**kargs)
if memory != 'minimum':
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
gp_channels.append(ch)
else:
address = tell()
write(bytes(ch))
gp_channels.append(address)
size = s_size >> 3
for dim in shape:
size *= dim
offset += size
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
parents[ch_cntr] = name, 0
ch_cntr += 1
for name in names[1:]:
field_name = get_unique_name(field_names, name)
field_names.add(field_name)
samples = signal.samples[name]
shape = samples.shape[1:]
fields.append(samples)
types.append((field_name, samples.dtype, shape))
gp_texts['conversion_tab'].append(None)
if memory == 'minimum':
block = TextBlock(text=name, meta=False)
channel_name_address = tell()
write(bytes(block))
if signal.unit:
block = TextBlock(text=signal.unit, meta=False)
channel_unit_address = tell()
write(bytes(block))
else:
channel_unit_address = 0
if signal.comment:
block = TextBlock(text=signal.comment, meta=False)
channel_comment_address = tell()
write(bytes(block))
else:
channel_comment_address = 0
# add channel conversion
if memory != 'minimum':
gp_conv.append(None)
else:
gp_conv.append(0)
# source for channel
if memory != 'minimum':
gp_source.append(source_block)
else:
gp_source.append(source_info_address)
if memory == 'minimum':
name_addr = channel_name_address
unit_addr = channel_unit_address
comment_addr = channel_comment_address
else:
name_addr = 0
unit_addr = 0
comment_addr = 0
# add channel dependency block
kargs = {
'dims': 1,
'ca_type': v4c.CA_TYPE_SCALE_AXIS,
'flags': 0,
'byte_offset_base': samples.dtype.itemsize,
'dim_size_0': shape[0],
}
dep = ChannelArrayBlock(**kargs)
gp_dep.append([dep, ])
# add components channel
min_val, max_val = get_min_max(samples)
s_type, s_size = fmt_to_datatype_v4(samples.dtype)
byte_size = max(s_size // 8, 1)
kargs = {
'channel_type': v4c.CHANNEL_TYPE_VALUE,
'bit_count': s_size,
'byte_offset': offset,
'bit_offset': 0,
'data_type': s_type,
'min_raw_value': min_val if min_val <= max_val else 0,
'max_raw_value': max_val if min_val <= max_val else 0,
'lower_limit': min_val if min_val <= max_val else 0,
'upper_limit': max_val if min_val <= max_val else 0,
'flags': v4c.FLAG_PHY_RANGE_OK | v4c.FLAG_VAL_RANGE_OK,
'name_addr': name_addr,
'unit_addr': unit_addr,
'comment_addr': comment_addr,
}
ch = Channel(**kargs)
if memory != 'minimum':
ch.name = name
ch.unit = signal.unit
ch.comment = signal.comment
gp_channels.append(ch)
else:
address = tell()
write(bytes(ch))
gp_channels.append(address)
parent_dep.referenced_channels.append((ch_cntr, dg_cntr))
for dim in shape:
byte_size *= dim
offset += byte_size
if name not in self.channels_db:
self.channels_db[name] = []
self.channels_db[name].append((dg_cntr, ch_cntr))
# update the parents as well
parents[ch_cntr] = field_name, 0
ch_cntr += 1
# channel group
kargs = {
'cycles_nr': cycles_nr,
'samples_byte_nr': offset,
}
gp['channel_group'] = ChannelGroup(**kargs)
gp['size'] = cycles_nr * offset
gp_texts['channel_group'].append(None)
# data group
gp['data_group'] = DataGroup()
# data block
if PYVERSION == 2:
types = fix_dtype_fields(types)
types = dtype(types)
gp['sorted'] = True
gp['types'] = types
gp['parents'] = parents
samples = fromarrays(fields, dtype=types)
signals = None
del signals
try:
block = samples.tostring()
if memory == 'full':
gp['data_location'] = v4c.LOCATION_MEMORY
gp['data_block'] = DataBlock(data=block)
else:
gp['data_location'] = v4c.LOCATION_TEMPORARY_FILE
data_address = self._tempfile.tell()
gp['data_group']['data_block_addr'] = data_address
self._tempfile.write(bytes(block))
except MemoryError:
if memory == 'full':
raise
else:
gp['data_location'] = v4c.LOCATION_TEMPORARY_FILE
data_address = self._tempfile.tell()
gp['data_group']['data_block_addr'] = data_address
for sample in samples:
self._tempfile.write(sample.tostring())
[docs] def attach(self,
data,
file_name=None,
comment=None,
compression=True,
mime=r'application/octet-stream'):
""" attach embedded attachment as application/octet-stream
Parameters
----------
data : bytes
data to be attached
file_name : str
string file name
comment : str
attachment comment
compression : bool
use compression for embedded attachment data
mime : str
mime type string
"""
creator_index = len(self.file_history)
fh = FileHistory()
text = """<FHcomment>
<TX>Added new embedded attachment from {}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>"""
text = text.format(
file_name if file_name else 'bin.bin',
__version__,
)
fh_text = TextBlock(text=text, meta=True)
self.file_history.append((fh, fh_text))
texts = {}
texts['mime_addr'] = TextBlock(text=mime, meta=False)
if comment:
texts['comment_addr'] = TextBlock(text=comment, meta=False)
text = file_name if file_name else 'bin.bin'
texts['file_name_addr'] = TextBlock(text=text)
at_block = AttachmentBlock(data=data, compression=compression)
at_block['creator_index'] = creator_index
self.attachments.append((at_block, texts))
[docs] def close(self):
""" if the MDF was created with memory=False and new
channels have been appended, then this must be called just before the
object is not used anymore to clean-up the temporary file"""
if self._tempfile is not None:
self._tempfile.close()
if self._file is not None:
self._file.close()
[docs] def get_channel_unit(self, name=None, group=None, index=None):
"""Gets channel unit.
Channel can be specified in two ways:
* using the first positional argument *name*
* if there are multiple occurrences for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurrences for this channel and either the
*group* or *index* arguments is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
If the *raster* keyword argument is not *None* the output is
interpolated accordingly.
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
Returns
-------
unit : str
found channel unit
"""
gp_nr, ch_nr = self._validate_channel_selection(
name,
group,
index,
)
grp = self.groups[gp_nr]
if grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
else:
stream = self._tempfile
channel = grp['channels'][ch_nr]
conversion = grp['channel_conversions'][ch_nr]
if self.memory == 'minimum':
channel = Channel(
address=channel,
stream=stream,
)
if conversion:
conversion = ChannelConversion(
address=conversion,
stream=stream,
)
address = (
conversion and conversion['unit_addr']
or channel['unit_addr']
or 0
)
if address:
unit = TextBlock(
address=address,
stream=stream,
)
if PYVERSION == 3:
unit = unit['text'].decode('utf-8').strip(' \n\t\0')
else:
unit = unit['text'].strip(' \n\t\0')
else:
unit = ''
else:
unit = (
conversion and conversion.unit
or channel.unit
or ''
)
return unit
[docs] def get(self,
name=None,
group=None,
index=None,
raster=None,
samples_only=False,
data=None,
raw=False):
"""Gets channel samples.
Channel can be specified in two ways:
* using the first positional argument *name*
* if there are multiple occurances for this channel then the
*group* and *index* arguments can be used to select a specific
group.
* if there are multiple occurances for this channel and either the
*group* or *index* arguments is None then a warning is issued
* using the group number (keyword argument *group*) and the channel
number (keyword argument *index*). Use *info* method for group and
channel numbers
If the *raster* keyword argument is not *None* the output is
interpolated accordingly
Parameters
----------
name : string
name of channel
group : int
0-based group index
index : int
0-based channel index
raster : float
time raster in seconds
samples_only : bool
if *True* return only the channel samples as numpy array; if
*False* return a *Signal* object
data : bytes
prevent redundant data read by providing the raw data group samples
raw : bool
return channel samples without appling the conversion rule; default
`False`
Returns
-------
res : (numpy.array | Signal)
returns *Signal* if *samples_only*=*False* (default option),
otherwise returns numpy.array
The *Signal* samples are:
* numpy recarray for channels that have composition/channel
array address or for channel of type BYTEARRAY,
CANOPENDATE, CANOPENTIME
* numpy array for all the rest
Raises
------
MdfException :
* if the channel name is not found
* if the group index is out of range
* if the channel index is out of range
Examples
--------
>>> from asammdf import MDF, Signal
>>> import numpy as np
>>> t = np.arange(5)
>>> s = np.ones(5)
>>> mdf = MDF(version='4.10')
>>> for i in range(4):
... sigs = [Signal(s*(i*10+j), t, name='Sig') for j in range(1, 4)]
... mdf.append(sigs)
...
>>> # first group and channel index of the specified channel name
...
>>> mdf.get('Sig')
UserWarning: Multiple occurances for channel "Sig". Using first occurance from data group 4. Provide both "group" and "index" arguments to select another data group
<Signal Sig:
samples=[ 1. 1. 1. 1. 1.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # first channel index in the specified group
...
>>> mdf.get('Sig', 1)
<Signal Sig:
samples=[ 11. 11. 11. 11. 11.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # channel named Sig from group 1 channel index 2
...
>>> mdf.get('Sig', 1, 2)
<Signal Sig:
samples=[ 12. 12. 12. 12. 12.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> # channel index 1 or group 2
...
>>> mdf.get(None, 2, 1)
<Signal Sig:
samples=[ 21. 21. 21. 21. 21.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
>>> mdf.get(group=2, index=1)
<Signal Sig:
samples=[ 21. 21. 21. 21. 21.]
timestamps=[0 1 2 3 4]
unit=""
info=None
comment="">
"""
gp_nr, ch_nr = self._validate_channel_selection(
name,
group,
index,
)
memory = self.memory
grp = self.groups[gp_nr]
if grp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
else:
stream = self._tempfile
if memory == 'minimum':
channel = Channel(
address=grp['channels'][ch_nr],
stream=stream,
)
addr = grp['channel_conversions'][ch_nr]
if addr:
conversion = ChannelConversion(
address=addr,
stream=stream,
)
else:
conversion = None
if name is None:
name = TextBlock(
address=channel['name_addr'],
stream=stream,
)
name = (
name['text']
.decode('utf-8')
.strip(' \r\t\n\0')
.split('\\')[0]
)
channel.name = name
else:
channel = grp['channels'][ch_nr]
conversion = grp['channel_conversions'][ch_nr]
name = channel.name
dependency_list = grp['channel_dependencies'][ch_nr]
cycles_nr = grp['channel_group']['cycles_nr']
# get data group record
try:
parents, dtypes = grp['parents'], grp['types']
except KeyError:
grp['parents'], grp['types'] = self._prepare_record(grp)
parents, dtypes = grp['parents'], grp['types']
# get group data
if data is None:
data = self._load_group_data(grp)
info = None
# get the channel signal data if available
signal_data = self._load_signal_data(
channel['data_block_addr']
)
# check if this is a channel array
if dependency_list:
arrays = []
name = channel.name
if all(
not isinstance(dep, ChannelArrayBlock)
for dep in dependency_list):
# structure channel composition
if memory == 'minimum':
names = []
# TODO : get exactly he group and chanenl
for address in dependency_list:
channel = Channel(
address=address,
stream=stream,
)
name_ = get_text_v4(channel['name_addr'], stream)
names.append(name_)
else:
names = [ch.name for ch in dependency_list]
arrays = [
self.get(name_, samples_only=True, raw=raw)
for name_ in names
]
types = [
(name_, arr.dtype)
for name_, arr in zip(names, arrays)
]
if PYVERSION == 2:
types = fix_dtype_fields(types)
types = dtype(types)
vals = fromarrays(arrays, dtype=types)
cycles_nr = len(vals)
else:
# channel arrays
arrays = []
types = []
try:
parent, bit_offset = parents[ch_nr]
except KeyError:
parent, bit_offset = None, None
if parent is not None:
if 'record' not in grp:
if dtypes.itemsize:
record = fromstring(data, dtype=dtypes)
else:
record = None
if self.memory == 'full':
grp['record'] = record
else:
record = grp['record']
record.setflags(write=False)
vals = record[parent]
else:
vals = self._get_not_byte_aligned_data(data, grp, ch_nr)
dep = dependency_list[0]
if dep['flags'] & v4c.FLAG_CA_INVERSE_LAYOUT:
shape = vals.shape
shape = (shape[0],) + shape[1:][::-1]
vals = vals.reshape(shape)
axes = (0,) + tuple(range(len(shape) - 1, 0, -1))
vals = transpose(vals, axes=axes)
cycles_nr = len(vals)
for ca_block in dependency_list[:1]:
dims_nr = ca_block['dims']
if ca_block['ca_type'] == v4c.CA_TYPE_SCALE_AXIS:
shape = (ca_block['dim_size_0'],)
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
elif ca_block['ca_type'] == v4c.CA_TYPE_LOOKUP:
shape = vals.shape[1:]
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
if ca_block['flags'] & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
shape = (ca_block['dim_size_{}'.format(i)],)
axis = []
for j in range(shape[0]):
key = 'axis_{}_value_{}'.format(i, j)
axis.append(ca_block[key])
axis = array([axis for _ in range(cycles_nr)])
arrays.append(axis)
dtype_pair = (
'axis_{}'.format(i),
axis.dtype,
shape,
)
types.append(dtype_pair)
else:
for i in range(dims_nr):
ch_nr, dg_nr = ca_block.referenced_channels[i]
if memory == 'minimum':
channel = Channel(
address=self.groups[dg_nr]['channels'][ch_nr],
stream=stream,
)
axisname = get_text_v4(
channel['name_addr'],
stream,
)
else:
axisname = (
self.groups[dg_nr]
['channels']
[ch_nr]
.name
)
shape = (ca_block['dim_size_{}'.format(i)],)
axis_values = self.get(
group=dg_nr,
index=ch_nr,
samples_only=True,
)
axis_values = axis_values[axisname]
arrays.append(axis_values)
dtype_pair = (
axisname,
axis_values.dtype,
shape,
)
types.append(dtype_pair)
elif ca_block['ca_type'] == v4c.CA_TYPE_ARRAY:
shape = vals.shape[1:]
arrays.append(vals)
dtype_pair = channel.name, vals.dtype, shape
types.append(dtype_pair)
for ca_block in dependency_list[1:]:
dims_nr = ca_block['dims']
if ca_block['flags'] & v4c.FLAG_CA_FIXED_AXIS:
for i in range(dims_nr):
shape = (ca_block['dim_size_{}'.format(i)],)
axis = []
for j in range(shape[0]):
key = 'axis_{}_value_{}'.format(i, j)
axis.append(ca_block[key])
axis = array([axis for _ in range(cycles_nr)])
arrays.append(axis)
dtype_pair = 'axis_{}'.format(i), axis.dtype, shape
types.append(dtype_pair)
else:
for i in range(dims_nr):
ch_nr, dg_nr = ca_block.referenced_channels[i]
if memory == 'minimum':
channel = Channel(
address=self.groups[dg_nr]['channels'][ch_nr],
stream=stream,
)
axisname = get_text_v4(
channel['name_addr'],
stream,
)
else:
axisname = (
self.groups[dg_nr]
['channels']
[ch_nr]
.name
)
shape = (ca_block['dim_size_{}'.format(i)],)
axis_values = self.get(
group=dg_nr,
index=ch_nr,
samples_only=True,
)
axis_values = axis_values[axisname]
arrays.append(axis_values)
dtype_pair = axisname, axis_values.dtype, shape
types.append(dtype_pair)
if PYVERSION == 2:
types = fix_dtype_fields(types)
vals = fromarrays(arrays, dtype(types))
else:
# get channel values
if channel['channel_type'] in (v4c.CHANNEL_TYPE_VIRTUAL,
v4c.CHANNEL_TYPE_VIRTUAL_MASTER):
data_type = channel['data_type']
ch_dtype = dtype(get_fmt_v4(data_type, 8))
vals = arange(cycles_nr, dtype=ch_dtype)
record = None
else:
try:
parent, bit_offset = parents[ch_nr]
except KeyError:
parent, bit_offset = None, None
if parent is not None:
if 'record' not in grp:
if dtypes.itemsize:
record = fromstring(data, dtype=dtypes)
else:
record = None
if memory == 'full':
grp['record'] = record
else:
record = grp['record']
record.setflags(write=False)
vals = record[parent]
bits = channel['bit_count']
size = vals.dtype.itemsize
data_type = channel['data_type']
if vals.dtype.kind not in 'ui' and (bit_offset or not bits == size * 8):
vals = self._get_not_byte_aligned_data(
data,
grp,
ch_nr,
)
else:
if bit_offset:
dtype_ = vals.dtype
if dtype_.kind == 'i':
vals = vals.astype(dtype('<u{}'.format(size)))
vals >>= bit_offset
else:
vals = vals >> bit_offset
if not bits == size * 8:
mask = (1 << bits) - 1
if vals.flags.writeable:
vals &= mask
else:
vals = vals & mask
if data_type in v4c.SIGNED_INT:
vals = as_non_byte_sized_signed_int(vals, bits)
else:
vals = self._get_not_byte_aligned_data(data, grp, ch_nr)
if cycles_nr:
if conversion is None:
conversion_type = v4c.CONVERSION_TYPE_NON
else:
conversion_type = conversion['conversion_type']
if raw:
pass
elif conversion_type == v4c.CONVERSION_TYPE_NON:
# check if it is VLDS channel type with SDBLOCK
data_type = channel['data_type']
channel_type = channel['channel_type']
if channel_type in (v4c.CHANNEL_TYPE_VALUE,
v4c.CHANNEL_TYPE_MLSD):
if v4c.DATA_TYPE_STRING_LATIN_1 \
<= data_type \
<= v4c.DATA_TYPE_STRING_UTF_16_BE:
vals = [val.tobytes() for val in vals]
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
encoding = 'utf-16-be'
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
encoding = 'utf-16-le'
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
encoding = 'utf-8'
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
encoding = 'latin-1'
vals = array(
[x.decode(encoding).strip('\0') for x in vals]
)
vals = encode(vals, 'latin-1')
# CANopen date
elif data_type == v4c.DATA_TYPE_CANOPEN_DATE:
vals = vals.tostring()
types = dtype(
[('ms', '<u2'),
('min', '<u1'),
('hour', '<u1'),
('day', '<u1'),
('month', '<u1'),
('year', '<u1')]
)
dates = fromstring(vals, types)
arrays = []
arrays.append(dates['ms'])
# bit 6 and 7 of minutes are reserved
arrays.append(dates['min'] & 0x3F)
# only firt 4 bits of hour are used
arrays.append(dates['hour'] & 0xF)
# the first 4 bits are the day number
arrays.append(dates['day'] & 0xF)
# bit 6 and 7 of month are reserved
arrays.append(dates['month'] & 0x3F)
# bit 7 of year is reserved
arrays.append(dates['year'] & 0x7F)
# add summer or standard time information for hour
arrays.append((dates['hour'] & 0x80) >> 7)
# add day of week information
arrays.append((dates['day'] & 0xF0) >> 4)
names = [
'ms',
'min',
'hour',
'day',
'month',
'year',
'summer_time',
'day_of_week',
]
vals = fromarrays(arrays, names=names)
# CANopen time
elif data_type == v4c.DATA_TYPE_CANOPEN_TIME:
vals = vals.tostring()
types = dtype(
[('ms', '<u4'),
('days', '<u2')]
)
dates = fromstring(vals, types)
arrays = []
# bits 28 to 31 are reserverd for ms
arrays.append(dates['ms'] & 0xFFFFFFF)
arrays.append(dates['days'] & 0x3F)
names = ['ms', 'days']
vals = fromarrays(arrays, names=names)
# byte array
elif data_type == v4c.DATA_TYPE_BYTEARRAY:
vals = vals.tostring()
size = max(bits >> 3, 1)
vals = frombuffer(
vals,
dtype=dtype('({},)u1'.format(size)),
)
types = [(channel.name, vals.dtype, vals.shape[1:])]
if PYVERSION == 2:
types = fix_dtype_fields(types)
types = dtype(types)
arrays = [vals, ]
vals = fromarrays(arrays, dtype=types)
elif channel_type == v4c.CHANNEL_TYPE_VLSD:
if signal_data:
values = []
for offset in vals:
offset = int(offset)
str_size = unpack_from('<I', signal_data, offset)[0]
values.append(
signal_data[offset + 4: offset + 4 + str_size]
)
if data_type == v4c.DATA_TYPE_STRING_UTF_16_BE:
vals = [v.decode('utf-16-be') for v in values]
elif data_type == v4c.DATA_TYPE_STRING_UTF_16_LE:
vals = [v.decode('utf-16-le') for v in values]
elif data_type == v4c.DATA_TYPE_STRING_UTF_8:
vals = [v.decode('utf-8') for v in values]
elif data_type == v4c.DATA_TYPE_STRING_LATIN_1:
vals = [v.decode('latin-1') for v in values]
if PYVERSION == 2:
vals = array([str(val) for val in vals])
else:
vals = array(vals)
vals = encode(vals, 'latin-1')
else:
# no VLSD signal data samples
vals = array([])
elif conversion_type == v4c.CONVERSION_TYPE_LIN:
a = conversion['a']
b = conversion['b']
if (a, b) != (1, 0):
vals = vals * a
if b:
vals += b
elif conversion_type == v4c.CONVERSION_TYPE_RAT:
P1 = conversion['P1']
P2 = conversion['P2']
P3 = conversion['P3']
P4 = conversion['P4']
P5 = conversion['P5']
P6 = conversion['P6']
if (P1, P2, P3, P4, P5, P6) != (0, 1, 0, 0, 0, 1):
X = vals
vals = evaluate(v4c.CONV_RAT_TEXT)
elif conversion_type == v4c.CONVERSION_TYPE_ALG:
if not memory == 'minimum':
formula = conversion.formula
else:
block = TextBlock(
address=conversion['formula_addr'],
stream=stream,
)
formula = (
block['text']
.decode('utf-8')
.strip(' \n\t\0')
)
X = vals
vals = evaluate(formula)
elif conversion_type in (v4c.CONVERSION_TYPE_TABI,
v4c.CONVERSION_TYPE_TAB):
nr = conversion['val_param_nr'] // 2
raw_vals = array(
[conversion['raw_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
if conversion_type == v4c.CONVERSION_TYPE_TABI:
vals = interp(vals, raw_vals, phys)
else:
idx = searchsorted(raw_vals, vals)
idx = clip(idx, 0, len(raw_vals) - 1)
vals = phys[idx]
elif conversion_type == v4c.CONVERSION_TYPE_RTAB:
nr = (conversion['val_param_nr'] - 1) // 3
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
phys = array(
[conversion['phys_{}'.format(i)] for i in range(nr)]
)
default = conversion['default']
# INT channel
if channel['data_type'] <= 3:
res = []
for v in vals:
for l, u, p in zip(lower, upper, phys):
if l <= v <= u:
res.append(p)
break
else:
res.append(default)
size = max(bits >> 3, 1)
ch_fmt = get_fmt_v4(channel['data_type'], size)
vals = array(res).astype(ch_fmt)
# else FLOAT channel
else:
res = []
for v in vals:
for l, u, p in zip(lower, upper, phys):
if l <= v < u:
res.append(p)
break
else:
res.append(default)
size = max(bits >> 3, 1)
ch_fmt = get_fmt_v4(channel['data_type'], size)
vals = array(res).astype(ch_fmt)
elif conversion_type == v4c.CONVERSION_TYPE_TABX:
nr = conversion['val_param_nr']
raw_vals = array(
[conversion['val_{}'.format(i)] for i in range(nr)]
)
if not memory == 'minimum':
phys = array(
[grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text']
for i in range(nr)]
)
default = grp['texts']['conversion_tab'][ch_nr] \
.get('default_addr', {}) \
.get('text', b'')
else:
phys = []
for i in range(nr):
address = (
grp['texts']
['conversion_tab']
[ch_nr]
['text_{}'.format(i)]
)
if address:
block = TextBlock(
address=address,
stream=stream,
)
phys.append(block['text'])
else:
phys.append(b'')
phys = array(phys)
if grp['texts']['conversion_tab'][ch_nr].get(
'default_addr',
0):
block = TextBlock(
address=grp['texts']['conversion_tab'][ch_nr]['default_addr'],
stream=stream,
)
default = block['text']
else:
default = b''
info = {
'raw': raw_vals,
'phys': phys,
'default': default,
}
elif conversion_type == v4c.CONVERSION_TYPE_RTABX:
nr = conversion['val_param_nr'] // 2
if not memory == 'minimum':
phys = array(
[grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text']
for i in range(nr)]
)
default = grp['texts']['conversion_tab'][ch_nr] \
.get('default_addr', {}) \
.get('text', b'')
else:
phys = []
for i in range(nr):
address = grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]
if address:
block = TextBlock(
address=address,
stream=stream,
)
phys.append(block['text'])
else:
phys.append(b'')
phys = array(phys)
if grp['texts']['conversion_tab'][ch_nr].get(
'default_addr',
0):
block = TextBlock(
address=grp['texts']['conversion_tab'][ch_nr]['default_addr'],
stream=stream,
)
default = block['text']
else:
default = b''
lower = array(
[conversion['lower_{}'.format(i)] for i in range(nr)]
)
upper = array(
[conversion['upper_{}'.format(i)] for i in range(nr)]
)
info = {
'lower': lower,
'upper': upper,
'phys': phys,
'default': default,
}
elif conversion_type == v4c.CONVERSION_TYPE_TTAB:
nr = conversion['val_param_nr'] - 1
if memory == 'minimum':
raw_vals = []
for i in range(nr):
block = TextBlock(
address=grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)],
stream=stream,
)
raw_vals.append(block['text'])
raw_vals = array(raw_vals)
else:
raw_vals = array(
[grp['texts']['conversion_tab'][ch_nr]['text_{}'.format(i)]['text']
for i in range(nr)]
)
phys = array(
[conversion['val_{}'.format(i)] for i in range(nr)]
)
default = conversion['val_default']
info = {
'raw': raw_vals,
'phys': phys,
'default': default,
}
elif conversion_type == v4c.CONVERSION_TYPE_TRANS:
nr = (conversion['ref_param_nr'] - 1) // 2
if memory == 'minimum':
in_ = []
for i in range(nr):
block = TextBlock(
address=grp['texts']['conversion_tab'][ch_nr]['input_{}_addr'.format(i)],
stream=stream,
)
in_.append(block['text'])
in_ = array(in_)
out_ = []
for i in range(nr):
block = TextBlock(
address=grp['texts']['conversion_tab'][ch_nr]['output_{}_addr'.format(i)],
stream=stream,
)
out_.append(block['text'])
out_ = array(out_)
block = TextBlock(
address=grp['texts']['conversion_tab'][ch_nr]['default_addr'],
stream=stream,
)
default = block['text']
else:
in_ = array(
[grp['texts']['conversion_tab'][ch_nr]['input_{}_addr'.format(i)]['text']
for i in range(nr)]
)
out_ = array(
[grp['texts']['conversion_tab'][ch_nr]['output_{}_addr'.format(i)]['text']
for i in range(nr)]
)
default = grp['texts']['conversion_tab'][ch_nr]['default_addr']['text']
res = []
for v in vals:
for i, o in zip(in_, out_):
if v == i:
res.append(o)
break
else:
res.append(default)
vals = array(res)
info = {
'input': in_,
'output': out_,
'default': default,
}
# in case of invalidation bits, valid_index will hold the valid indexes
valid_index = None
if grp['channel_group']['invalidation_bytes_nr']:
if channel['flags'] & (
v4c.FLAG_INVALIDATION_BIT_VALID | v4c.FLAG_ALL_SAMPLES_VALID) == v4c.FLAG_INVALIDATION_BIT_VALID:
ch_invalidation_pos = channel['pos_invalidation_bit']
pos_byte, pos_offset = divmod(ch_invalidation_pos, 8)
mask = 1 << pos_offset
if record is None:
record = fromstring(data, dtype=dtypes)
record.setflags(write=False)
inval_bytes = record['invalidation_bytes']
inval_index = array(
[bytes_[pos_byte] & mask for bytes_ in inval_bytes]
)
valid_index = argwhere(inval_index == 0).flatten()
vals = vals[valid_index]
if samples_only:
res = vals
else:
# search for unit in conversion texts
if memory == 'minimum':
address = (
conversion and conversion['unit_addr']
or channel['unit_addr']
or 0
)
if address:
unit = get_text_v4(
address=address,
stream=stream,
)
else:
unit = ''
address = channel['comment_addr']
if address:
comment = get_text_v4(
address=address,
stream=stream,
)
if channel.comment_type == b'##MD':
match = TX.search(comment)
if match:
comment = match.group('text')
else:
comment = ''
else:
comment = ''
else:
unit = (
conversion and conversion.unit
or channel.unit
or ''
)
comment = channel.comment
if channel.comment_type == b'##MD':
match = TX.search(comment)
if match:
comment = match.group('text')
t = self.get_master(gp_nr, data)
# consider invalidation bits
if valid_index is not None:
t = t[valid_index]
res = Signal(
samples=vals,
timestamps=t,
unit=unit,
name=name,
comment=comment,
info=info,
)
if raster and t:
tx = linspace(0, t[-1], int(t[-1] / raster))
res = res.interp(tx)
return res
[docs] def get_master(self, index, data=None):
""" returns master channel samples for given group
Parameters
----------
index : int
group index
data : bytes
data block raw bytes; default None
Returns
-------
t : numpy.array
master channel samples
"""
try:
return self._master_channel_cache[index]
except KeyError:
pass
group = self.groups[index]
if group['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
else:
stream = self._tempfile
memory = self.memory
time_ch_nr = self.masters_db.get(index, None)
cycles_nr = group['channel_group']['cycles_nr']
if time_ch_nr is None:
t = arange(cycles_nr, dtype=float64)
else:
time_conv = group['channel_conversions'][time_ch_nr]
if memory == 'minimum':
if time_conv:
time_conv = ChannelConversion(
address=group['channel_conversions'][time_ch_nr],
stream=stream,
)
else:
time_conv = None
time_ch = Channel(
address=group['channels'][time_ch_nr],
stream=stream,
)
else:
time_ch = group['channels'][time_ch_nr]
if time_ch['channel_type'] == v4c.CHANNEL_TYPE_VIRTUAL_MASTER:
time_a = time_conv['a']
time_b = time_conv['b']
t = arange(cycles_nr, dtype=float64) * time_a + time_b
else:
# get data group parents and dtypes
try:
parents, dtypes = group['parents'], group['types']
except KeyError:
parents, dtypes = self._prepare_record(group)
group['parents'], group['types'] = parents, dtypes
# get data
if data is None:
data = self._load_group_data(group)
try:
parent, _ = parents[time_ch_nr]
except KeyError:
parent = None
if parent is not None:
not_found = object()
record = group.get('record', not_found)
if record is not_found:
if dtypes.itemsize:
record = fromstring(data, dtype=dtypes)
else:
record = None
if memory == 'full':
group['record'] = record
record.setflags(write=False)
t = record[parent]
else:
t = self._get_not_byte_aligned_data(
data, group,
time_ch_nr,
)
# get timestamps
if time_conv:
if time_conv['conversion_type'] == v4c.CONVERSION_TYPE_LIN:
time_a = time_conv['a']
time_b = time_conv['b']
t = t * time_a
if time_b:
t += time_b
self._master_channel_cache[index] = t
return t
[docs] def info(self):
"""get MDF information as a dict
Examples
--------
>>> mdf = MDF4('test.mdf')
>>> mdf.info()
"""
info = {}
info['version'] = self.identification['version_str'] \
.decode('utf-8') \
.strip(' \n\t\0')
info['groups'] = len(self.groups)
for i, gp in enumerate(self.groups):
if gp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
elif gp['data_location'] == v4c.LOCATION_TEMPORARY_FILE:
stream = self._tempfile
inf = {}
info['group {}'.format(i)] = inf
inf['cycles'] = gp['channel_group']['cycles_nr']
inf['channels count'] = len(gp['channels'])
for j, channel in enumerate(gp['channels']):
if self.memory == 'minimum':
channel = Channel(
address=channel,
stream=stream,
)
name = TextBlock(
address=channel['name_addr'],
stream=stream,
)
name = (
name['text']
.decode('utf-8')
.strip(' \r\t\n\0')
.split('\\')[0]
)
else:
name = channel.name
ch_type = v4c.CHANNEL_TYPE_TO_DESCRIPTION[channel['channel_type']]
inf['channel {}'.format(j)] = 'name="{}" type={}'.format(
name,
ch_type,
)
return info
[docs] def save(self, dst='', overwrite=None, compression=0):
"""Save MDF to *dst*. If *dst* is not provided the the destination file
name is the MDF name. If overwrite is *True* then the destination file
is overwritten, otherwise the file name is appened with '_<cntr>', were
'<cntr>' is the first conter that produces a new file name
(that does not already exist in the filesystem)
Parameters
----------
dst : str
destination file name, Default ''
overwrite : bool
overwrite flag, default *False*
compression : int
use compressed data blocks, default 0; valid since version 4.10
* 0 - no compression
* 1 - deflate (slower, but produces smaller files)
* 2 - transposition + deflate (slowest, but produces
the smallest files)
Returns
-------
output_file : str
output file name
"""
if overwrite is None:
overwrite = self._overwrite
output_file = ''
if self.name is None and dst == '':
message = (
'Must specify a destination file name '
'for MDF created from scratch'
)
raise MdfException(message)
else:
if self.memory == 'minimum':
output_file = self._save_without_metadata(
dst,
overwrite,
compression,
)
else:
output_file = self._save_with_metadata(
dst,
overwrite,
compression,
)
return output_file
def _save_with_metadata(self, dst, overwrite, compression):
"""Save MDF to *dst*. If *dst* is not provided the the destination file
name is the MDF name. If overwrite is *True* then the destination file
is overwritten, otherwise the file name is appened with '_<cntr>', were
'<cntr>' is the first conter that produces a new file name
(that does not already exist in the filesystem)
Parameters
----------
dst : str
destination file name, Default ''
overwrite : bool
overwrite flag, default *False*
compression : int
use compressed data blocks, default 0; valid since version 4.10
* 0 - no compression
* 1 - deflate (slower, but produces smaller files)
* 2 - transposition + deflate (slowest, but produces
the smallest files)
"""
if self.name is None and dst == '':
message = ('Must specify a destination file name '
'for MDF created from scratch')
raise MdfException(message)
dst = dst if dst else self.name
if not dst.endswith(('mf4', 'MF4')):
dst = dst + '.mf4'
if overwrite is False:
if os.path.isfile(dst):
cntr = 0
while True:
name = os.path.splitext(dst)[0] + '_{}.mf4'.format(cntr)
if not os.path.isfile(name):
break
else:
cntr += 1
message = (
'Destination file "{}" already exists '
'and "overwrite" is False. Saving MDF file as "{}"'
)
message = message.format(dst, name)
warnings.warn(message)
dst = name
if not self.file_history:
comment = 'created'
else:
comment = 'updated'
text = """<FHcomment>
<TX>{}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>"""
text = text.format(comment, __version__)
self.file_history.append(
[FileHistory(),
TextBlock(text=text, meta=True)]
)
if self.memory == 'low' and dst == self.name:
destination = dst + '.temp'
else:
destination = dst
with open(destination, 'wb+') as dst_:
defined_texts = {}
write = dst_.write
tell = dst_.tell
seek = dst_.seek
write(bytes(self.identification))
write(bytes(self.header))
original_data_addresses = []
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
# write DataBlocks first
for gp in self.groups:
original_data_addresses.append(
gp['data_group']['data_block_addr']
)
address = tell()
data = self._load_group_data(gp)
if MDF4._split_data_blocks:
samples_size = gp['channel_group']['samples_byte_nr']
split_size = MDF4._split_threshold // samples_size
split_size *= samples_size
if split_size == 0:
chunks = 1
else:
chunks = len(data) / split_size
chunks = int(ceil(chunks))
else:
chunks = 1
if chunks == 1:
if compression and self.version != '4.00':
if compression == 1:
param = 0
else:
param = gp['channel_group']['samples_byte_nr']
kargs = {
'data': data,
'zip_type': zip_type,
'param': param,
}
data_block = DataZippedBlock(**kargs)
else:
data_block = DataBlock(data=data)
write(bytes(data_block))
align = data_block['block_len'] % 8
if align:
write(b'\0' * (8 - align))
if gp['channel_group']['cycles_nr']:
gp['data_group']['data_block_addr'] = address
else:
gp['data_group']['data_block_addr'] = 0
else:
kargs = {
'flags': v4c.FLAG_DL_EQUAL_LENGHT,
'zip_type': zip_type,
}
hl_block = HeaderList(**kargs)
kargs = {
'flags': v4c.FLAG_DL_EQUAL_LENGHT,
'links_nr': chunks + 1,
'data_block_nr': chunks,
'data_block_len': split_size,
}
dl_block = DataList(**kargs)
data_blocks = []
for i in range(chunks):
data_ = data[i * split_size: (i + 1) * split_size]
if compression and self.version != '4.00':
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
if compression == 1:
param = 0
else:
param = gp['channel_group']['samples_byte_nr']
kargs = {
'data': data_,
'zip_type': zip_type,
'param': param,
}
block = DataZippedBlock(**kargs)
else:
block = DataBlock(data=data_)
address = tell()
block.address = address
data_blocks.append(block)
write(bytes(block))
align = block['block_len'] % 8
if align:
write(b'\0' * (8 - align))
dl_block['data_block_addr{}'.format(i)] = address
address = tell()
dl_block.address = address
write(bytes(dl_block))
if compression and self.version != '4.00':
hl_block['first_dl_addr'] = address
address = tell()
hl_block.address = address
write(bytes(hl_block))
gp['data_group']['data_block_addr'] = address
address = tell()
blocks = []
if self.file_comment:
self.file_comment.address = address
address += self.file_comment['block_len']
blocks.append(self.file_comment)
# attachemnts
if self.attachments:
for at_block, texts in self.attachments:
for key, text in texts.items():
at_block[key] = text.address = address
address += text['block_len']
blocks.append(text)
for at_block, texts in self.attachments:
at_block.address = address
blocks.append(at_block)
align = at_block['block_len'] % 8
# append 8vyte alignemnt bytes for attachments
if align % 8:
blocks.append(b'\0' * (8 - align))
address += at_block['block_len'] + 8 - align
else:
address += at_block['block_len']
for i, (at_block, text) in enumerate(self.attachments[:-1]):
at_block['next_at_addr'] = self.attachments[i + 1][0].address
self.attachments[-1][0]['next_at_addr'] = 0
# file history blocks
for i, (fh, fh_text) in enumerate(self.file_history):
fh_text.address = address
blocks.append(fh_text)
address += fh_text['block_len']
fh['comment_addr'] = fh_text.address
for i, (fh, fh_text) in enumerate(self.file_history):
fh.address = address
address += fh['block_len']
blocks.append(fh)
for i, (fh, fh_text) in enumerate(self.file_history[:-1]):
fh['next_fh_addr'] = self.file_history[i + 1][0].address
self.file_history[-1][0]['next_fh_addr'] = 0
# data groups
gp_rec_ids = []
for gp in self.groups:
gp_rec_ids.append(gp['data_group']['record_id_len'])
gp['data_group']['record_id_len'] = 0
gp['data_group'].address = address
address += gp['data_group']['block_len']
blocks.append(gp['data_group'])
gp['data_group']['comment_addr'] = 0
for i, dg in enumerate(self.groups[:-1]):
addr_ = self.groups[i + 1]['data_group'].address
dg['data_group']['next_dg_addr'] = addr_
self.groups[-1]['data_group']['next_dg_addr'] = 0
tab_conversion = (
v4c.CONVERSION_TYPE_TABX,
v4c.CONVERSION_TYPE_RTABX,
v4c.CONVERSION_TYPE_TTAB,
v4c.CONVERSION_TYPE_TRANS,
)
si_map = {}
# go through each data group and append the rest of the blocks
for i, gp in enumerate(self.groups):
# write TXBLOCK's
for item_list in gp['texts'].values():
for dict_ in item_list:
if dict_ is None:
continue
for key, tx_block in dict_.items():
# text blocks can be shared
text = tx_block['text']
if text in defined_texts:
tx_block.address = defined_texts[text]
else:
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
for channel in gp['channels']:
if channel.name:
tx_block = TextBlock(text=channel.name)
text = tx_block['text']
if text in defined_texts:
channel['name_addr'] = defined_texts[text]
else:
channel['name_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
channel['name_addr'] = 0
if channel.unit:
tx_block = TextBlock(text=channel.unit)
text = tx_block['text']
if text in defined_texts:
channel['unit_addr'] = defined_texts[text]
else:
channel['unit_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
channel['unit_addr'] = 0
if channel.comment:
meta = channel.comment_type == b'##MD'
tx_block = TextBlock(text=channel.comment, meta=meta)
text = tx_block['text']
if text in defined_texts:
channel['comment_addr'] = defined_texts[text]
else:
channel['comment_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
channel['comment_addr'] = 0
for source in gp['channel_sources']:
if source:
if source.name:
tx_block = TextBlock(text=source.name)
text = tx_block['text']
if text in defined_texts:
source['name_addr'] = defined_texts[text]
else:
source['name_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
source['name_addr'] = 0
if source.path:
tx_block = TextBlock(text=source.path)
text = tx_block['text']
if text in defined_texts:
source['path_addr'] = defined_texts[text]
else:
source['path_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
source['path_addr'] = 0
if source.comment:
tx_block = TextBlock(text=source.comment)
text = tx_block['text']
if text in defined_texts:
source['comment_addr'] = defined_texts[text]
else:
source['comment_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
source['comment_addr'] = 0
for conversion in gp['channel_conversions']:
if conversion:
if conversion.name:
tx_block = TextBlock(text=conversion.name)
text = tx_block['text']
if text in defined_texts:
conversion['name_addr'] = defined_texts[text]
else:
conversion['name_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
conversion['name_addr'] = 0
if conversion.unit:
tx_block = TextBlock(text=conversion.unit)
text = tx_block['text']
if text in defined_texts:
conversion['unit_addr'] = defined_texts[text]
else:
conversion['unit_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
conversion['unit_addr'] = 0
if conversion.comment:
tx_block = TextBlock(text=conversion.comment)
text = tx_block['text']
if text in defined_texts:
conversion['comment_addr'] = defined_texts[text]
else:
conversion['comment_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
conversion['comment_addr'] = 0
if conversion['conversion_type'] == v4c.CONVERSION_TYPE_ALG and conversion.formula:
tx_block = TextBlock(text=conversion.formula)
text = tx_block['text']
if text in defined_texts:
conversion['formula_addr'] = defined_texts[text]
else:
conversion['formula_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
# channel conversions
for j, conv in enumerate(gp['channel_conversions']):
if conv:
conv.address = address
conv['inv_conv_addr'] = 0
if conv['conversion_type'] in tab_conversion:
for key in gp['texts']['conversion_tab'][j]:
conv[key] = (
gp['texts']
['conversion_tab']
[j]
[key]
.address
)
address += conv['block_len']
blocks.append(conv)
# channel sources
for j, source in enumerate(gp['channel_sources']):
if source:
source_id = id(source)
if source_id not in si_map:
source.address = address
address += source['block_len']
blocks.append(source)
si_map[source_id] = 0
# channel data
gp_sd = gp['signal_data'] = []
for j, ch in enumerate(gp['channels']):
signal_data = self._load_signal_data(ch['data_block_addr'])
if signal_data:
signal_data = SignalDataBlock(data=signal_data)
signal_data.address = address
address += signal_data['block_len']
blocks.append(signal_data)
align = signal_data['block_len'] % 8
if align % 8:
blocks.append(b'\0' * (8 - align))
address += 8 - align
gp_sd.append(signal_data)
else:
gp_sd.append(None)
# channel dependecies
for j, dep_list in enumerate(gp['channel_dependencies']):
if dep_list:
if all(isinstance(dep, ChannelArrayBlock)
for dep in dep_list):
for dep in dep_list:
dep.address = address
address += dep['block_len']
blocks.append(dep)
for k, dep in enumerate(dep_list[:-1]):
dep['composition_addr'] = dep_list[k + 1].address
dep_list[-1]['composition_addr'] = 0
# channels
for j, (channel, signal_data) in enumerate(
zip(gp['channels'], gp['signal_data'])):
channel.address = address
address += channel['block_len']
blocks.append(channel)
if not gp['channel_conversions'][j]:
channel['conversion_addr'] = 0
else:
addr_ = gp['channel_conversions'][j].address
channel['conversion_addr'] = addr_
if gp['channel_sources'][j]:
addr_ = gp['channel_sources'][j].address
channel['source_addr'] = addr_
else:
channel['source_addr'] = 0
if signal_data:
channel['data_block_addr'] = signal_data.address
else:
channel['data_block_addr'] = 0
if gp['channel_dependencies'][j]:
addr_ = gp['channel_dependencies'][j][0].address
channel['component_addr'] = addr_
group_channels = gp['channels']
if group_channels:
for j, channel in enumerate(group_channels[:-1]):
channel['next_ch_addr'] = group_channels[j + 1].address
group_channels[-1]['next_ch_addr'] = 0
# channel dependecies
j = 0
while j < len(gp['channels']):
dep_list = gp['channel_dependencies'][j]
if dep_list and all(
isinstance(dep, Channel) for dep in dep_list):
gp['channels'][j]['component_addr'] = dep_list[0].address
gp['channels'][j]['next_ch_addr'] = dep_list[-1]['next_ch_addr']
dep_list[-1]['next_ch_addr'] = 0
j += len(dep_list)
for dep in dep_list:
dep['source_addr'] = 0
else:
j += 1
# channel group
gp['channel_group'].address = address
gp['channel_group']['first_ch_addr'] = gp['channels'][0].address
gp['channel_group']['next_cg_addr'] = 0
cg_texts = gp['texts']['channel_group'][0]
for key in ('acq_name_addr', 'comment_addr'):
if cg_texts and key in cg_texts:
addr_ = gp['texts']['channel_group'][0][key].address
gp['channel_group'][key] = addr_
gp['channel_group']['acq_source_addr'] = 0
gp['data_group']['first_cg_addr'] = address
address += gp['channel_group']['block_len']
blocks.append(gp['channel_group'])
for gp in self.groups:
for dep_list in gp['channel_dependencies']:
if dep_list:
if all(isinstance(dep, ChannelArrayBlock) for dep in dep_list):
for dep in dep_list:
for i, (ch_nr, gp_nr) in enumerate(dep.referenced_channels):
grp = self.groups[gp_nr]
ch = grp['channels'][ch_nr]
dep['scale_axis_{}_dg_addr'.format(i)] = grp['data_group'].address
dep['scale_axis_{}_cg_addr'.format(i)] = grp['channel_group'].address
dep['scale_axis_{}_ch_addr'.format(i)] = ch.address
for block in blocks:
write(bytes(block))
for gp, rec_id in zip(self.groups, gp_rec_ids):
gp['data_group']['record_id_len'] = rec_id
if self.groups:
addr_ = self.groups[0]['data_group'].address
self.header['first_dg_addr'] = addr_
else:
self.header['first_dg_addr'] = 0
self.header['file_history_addr'] = self.file_history[0][0].address
if self.attachments:
addr_ = self.attachments[0][0].address
self.header['first_attachment_addr'] = addr_
else:
self.header['first_attachment_addr'] = 0
if self.file_comment:
self.header['comment_addr'] = self.file_comment.address
else:
self.header['comment_addr'] = 0
seek(v4c.IDENTIFICATION_BLOCK_SIZE, v4c.SEEK_START)
write(bytes(self.header))
for orig_addr, gp in zip(original_data_addresses, self.groups):
gp['data_group']['data_block_addr'] = orig_addr
if self.memory == 'low' and dst == self.name:
self.close()
os.remove(self.name)
os.rename(destination, self.name)
self.groups = []
self.header = None
self.identification = None
self.file_history = []
self.channels_db = {}
self.masters_db = {}
self.attachments = []
self.file_comment = None
self._ch_map = {}
self._master_channel_cache = {}
self._tempfile = TemporaryFile()
self._file = open(self.name, 'rb')
self._read()
return dst
def _save_without_metadata(self, dst, overwrite, compression):
"""Save MDF to *dst*. If *dst* is not provided the the destination file
name is the MDF name. If overwrite is *True* then the destination file
is overwritten, otherwise the file name is appened with '_<cntr>', were
'<cntr>' is the first conter that produces a new file name
(that does not already exist in the filesystem)
Parameters
----------
dst : str
destination file name, Default ''
overwrite : bool
overwrite flag, default *False*
compression : int
use compressed data blocks, default 0; valid since version 4.10
* 0 - no compression
* 1 - deflate (slower, but produces smaller files)
* 2 - transposition + deflate (slowest, but produces
the smallest files)
"""
if self.name is None and dst == '':
message = (
'Must specify a destination file name '
'for MDF created from scratch'
)
raise MdfException(message)
dst = dst if dst else self.name
if not dst.endswith(('mf4', 'MF4')):
dst = dst + '.mf4'
if overwrite is False:
if os.path.isfile(dst):
cntr = 0
while True:
name = os.path.splitext(dst)[0] + '_{}.mf4'.format(cntr)
if not os.path.isfile(name):
break
else:
cntr += 1
message = (
'Destination file "{}" already exists '
'and "overwrite" is False. Saving MDF file as "{}"'
)
message = message.format(dst, name)
warnings.warn(message)
dst = name
if not self.file_history:
comment = 'created'
else:
comment = 'updated'
text = """<FHcomment>
<TX>{}</TX>
<tool_id>asammdf</tool_id>
<tool_vendor>asammdf</tool_vendor>
<tool_version>{}</tool_version>
</FHcomment>"""
text = text.format(comment, __version__)
self.file_history.append(
[FileHistory(),
TextBlock(text=text, meta=True)]
)
if dst == self.name:
destination = dst + '.temp'
else:
destination = dst
with open(destination, 'wb+') as dst_:
defined_texts = {}
write = dst_.write
tell = dst_.tell
seek = dst_.seek
write(bytes(self.identification))
write(bytes(self.header))
original_data_addresses = []
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
# write DataBlocks first
for gp in self.groups:
original_data_addresses.append(
gp['data_group']['data_block_addr']
)
address = tell()
data = self._load_group_data(gp)
if MDF4._split_data_blocks:
samples_size = gp['channel_group']['samples_byte_nr']
split_size = MDF4._split_threshold // samples_size
split_size *= samples_size
if split_size == 0:
chunks = 1
else:
chunks = len(data) / split_size
chunks = int(ceil(chunks))
else:
chunks = 1
if chunks == 1:
if compression and self.version != '4.00':
if compression == 1:
param = 0
else:
param = gp['channel_group']['samples_byte_nr']
kargs = {
'data': data,
'zip_type': zip_type,
'param': param,
}
data_block = DataZippedBlock(**kargs)
else:
data_block = DataBlock(data=data)
write(bytes(data_block))
align = data_block['block_len'] % 8
if align:
write(b'\0' * (8 - align))
if gp['channel_group']['cycles_nr']:
gp['data_group']['data_block_addr'] = address
else:
gp['data_group']['data_block_addr'] = 0
else:
kargs = {
'flags': v4c.FLAG_DL_EQUAL_LENGHT,
'zip_type': zip_type,
}
hl_block = HeaderList(**kargs)
kargs = {
'flags': v4c.FLAG_DL_EQUAL_LENGHT,
'links_nr': chunks + 1,
'data_block_nr': chunks,
'data_block_len': split_size,
}
dl_block = DataList(**kargs)
data_blocks = []
for i in range(chunks):
data_ = data[i * split_size: (i + 1) * split_size]
if compression and self.version != '4.00':
if compression == 1:
zip_type = v4c.FLAG_DZ_DEFLATE
else:
zip_type = v4c.FLAG_DZ_TRANPOSED_DEFLATE
if compression == 1:
param = 0
else:
param = gp['channel_group']['samples_byte_nr']
kargs = {
'data': data_,
'zip_type': zip_type,
'param': param,
}
block = DataZippedBlock(**kargs)
else:
block = DataBlock(data=data_)
address = tell()
block.address = address
data_blocks.append(block)
write(bytes(block))
align = block['block_len'] % 8
if align:
write(b'\0' * (8 - align))
dl_block['data_block_addr{}'.format(i)] = address
address = tell()
dl_block.address = address
write(bytes(dl_block))
if compression and self.version != '4.00':
hl_block['first_dl_addr'] = address
address = tell()
hl_block.address = address
write(bytes(hl_block))
gp['data_group']['data_block_addr'] = address
address = tell()
if self.file_comment:
address = tell()
self.file_comment.address = address
write(bytes(self.file_comment))
# attachemnts
address = tell()
blocks = []
if self.attachments:
for at_block, texts in self.attachments:
for key, text in texts.items():
at_block[key] = text.address = address
address += text['block_len']
blocks.append(text)
for at_block, texts in self.attachments:
at_block.address = address
blocks.append(at_block)
align = at_block['block_len'] % 8
# append 8vyte alignemnt bytes for attachments
if align % 8:
blocks.append(b'\0' * (8 - align))
address += at_block['block_len'] + 8 - align
else:
address += at_block['block_len']
for i, (at_block, text) in enumerate(self.attachments[:-1]):
at_block['next_at_addr'] = self.attachments[i + 1][0].address
self.attachments[-1][0]['next_at_addr'] = 0
# file history blocks
for i, (fh, fh_text) in enumerate(self.file_history):
fh_text.address = address
blocks.append(fh_text)
address += fh_text['block_len']
fh['comment_addr'] = fh_text.address
for i, (fh, fh_text) in enumerate(self.file_history):
fh.address = address
address += fh['block_len']
blocks.append(fh)
for i, (fh, fh_text) in enumerate(self.file_history[:-1]):
fh['next_fh_addr'] = self.file_history[i + 1][0].address
self.file_history[-1][0]['next_fh_addr'] = 0
for blk in blocks:
write(bytes(blk))
blocks = []
tab_conversion = (
v4c.CONVERSION_TYPE_TABX,
v4c.CONVERSION_TYPE_RTABX,
v4c.CONVERSION_TYPE_TTAB,
v4c.CONVERSION_TYPE_TRANS,
)
si_map = {}
# go through each data group and append the rest of the blocks
for i, gp in enumerate(self.groups):
gp['temp_channels'] = ch_addrs = []
gp['temp_channel_conversions'] = cc_addrs = []
gp['temp_channel_sources'] = si_addrs = []
if gp['data_location'] == v4c.LOCATION_ORIGINAL_FILE:
stream = self._file
else:
stream = self._tempfile
temp_texts = deepcopy(gp['texts'])
# write TXBLOCK's
for item_list in temp_texts.values():
for dict_ in item_list:
if not dict_:
continue
for key, tx_block in dict_.items():
# text blocks can be shared
block = TextBlock(
address=tx_block,
stream=stream,
)
text = block['text']
if text in defined_texts:
dict_[key] = defined_texts[text]
else:
address = tell()
defined_texts[text] = address
dict_[key] = address
write(bytes(block))
for source in gp['channel_sources']:
if source:
stream.seek(source, v4c.SEEK_START)
raw_bytes = stream.read(v4c.SI_BLOCK_SIZE)
if raw_bytes in si_map:
si_addrs.append(si_map[raw_bytes])
else:
source = SourceInformation(
raw_bytes=raw_bytes,
)
if source['name_addr']:
tx_block = TextBlock(
address=source['name_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
source['name_addr'] = defined_texts[text]
else:
address = tell()
source['name_addr'] = address
defined_texts[text] = address
tx_block.address = address
write(bytes(tx_block))
else:
source['name_addr'] = 0
if source.path:
tx_block = TextBlock(
address=source['path_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
source['path_addr'] = defined_texts[text]
else:
address = tell()
source['path_addr'] = address
defined_texts[text] = address
tx_block.address = address
write(bytes(tx_block))
else:
source['path_addr'] = 0
if source['comment_addr']:
tx_block = TextBlock(
address=source['comment_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
source['comment_addr'] = defined_texts[text]
else:
address = tell()
source['comment_addr'] = address
defined_texts[text] = address
tx_block.address = address
write(bytes(tx_block))
else:
source['comment_addr'] = 0
address = tell()
si_addrs.append(address)
si_map[raw_bytes] = address
write(bytes(source))
else:
si_addrs.append(0)
for j, conversion in enumerate(gp['channel_conversions']):
if conversion:
conversion = ChannelConversion(
address=conversion,
stream=stream,
)
if conversion['name_addr']:
tx_block = TextBlock(
address=conversion['name_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
conversion['name_addr'] = defined_texts[text]
else:
address = tell()
conversion['name_addr'] = address
defined_texts[text] = address
tx_block.address = address
write(bytes(tx_block))
else:
conversion['name_addr'] = 0
if conversion['unit_addr']:
tx_block = TextBlock(
address=conversion['unit_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
conversion['unit_addr'] = defined_texts[text]
else:
address = tell()
conversion['unit_addr'] = address
defined_texts[text] = address
tx_block.address = address
write(bytes(tx_block))
else:
conversion['unit_addr'] = 0
if conversion['comment_addr']:
tx_block = TextBlock(
address=conversion['comment_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
conversion['comment_addr'] = defined_texts[text]
else:
address = tell()
conversion['comment_addr'] = address
defined_texts[text] = address
tx_block.address = address
write(bytes(tx_block))
else:
conversion['comment_addr'] = 0
if conversion['conversion_type'] == v4c.CONVERSION_TYPE_ALG and conversion['formula_addr']:
tx_block = TextBlock(
address=conversion['formula_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
conversion['formula_addr'] = defined_texts[text]
else:
address = tell()
conversion['formula_addr'] = address
defined_texts[text] = address
tx_block.address = address
write(bytes(tx_block))
elif conversion['conversion_type'] in tab_conversion:
for key in temp_texts['conversion_tab'][j]:
conversion[key] = temp_texts['conversion_tab'][j][key]
conversion['inv_conv_addr'] = 0
address = tell()
cc_addrs.append(address)
write(bytes(conversion))
else:
cc_addrs.append(0)
# channel dependecies
temp_deps = []
for j, dep_list in enumerate(gp['channel_dependencies']):
if dep_list:
if all(isinstance(dep, ChannelArrayBlock)
for dep in dep_list):
temp_deps.append([])
for dep in dep_list:
address = tell()
dep.address = address
temp_deps[-1].append(address)
write(bytes(dep))
for k, dep in enumerate(dep_list[:-1]):
dep['composition_addr'] = dep_list[k + 1].address
dep_list[-1]['composition_addr'] = 0
else:
temp_deps.append([])
for _ in dep_list:
temp_deps[-1].append(0)
else:
temp_deps.append(0)
# channels
blocks = []
chans = []
address = blocks_start_addr = tell()
gp['channel_group']['first_ch_addr'] = address
for j, channel in enumerate(gp['channels']):
channel = Channel(
address=channel,
stream=stream,
)
channel.address = address
ch_addrs.append(address)
chans.append(channel)
blocks.append(channel)
address += channel['block_len']
if channel['name_addr']:
tx_block = TextBlock(
address=channel['name_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
channel['name_addr'] = defined_texts[text]
else:
channel['name_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
channel['name_addr'] = 0
if channel['unit_addr']:
tx_block = TextBlock(
address=channel['unit_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
channel['unit_addr'] = defined_texts[text]
else:
channel['unit_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
channel['unit_addr'] = 0
if channel['comment_addr']:
tx_block = TextBlock(
address=channel['comment_addr'],
stream=stream,
)
text = tx_block['text']
if text in defined_texts:
channel['comment_addr'] = defined_texts[text]
else:
channel['comment_addr'] = address
defined_texts[text] = address
tx_block.address = address
address += tx_block['block_len']
blocks.append(tx_block)
else:
channel['comment_addr'] = 0
channel['conversion_addr'] = gp['temp_channel_conversions'][j]
channel['source_addr'] = gp['temp_channel_sources'][j]
signal_data = self._load_signal_data(channel['data_block_addr'])
if signal_data:
signal_data = SignalDataBlock(data=signal_data)
blocks.append(signal_data)
channel['data_block_addr'] = address
address += signal_data['block_len']
align = signal_data['block_len'] % 8
if align % 8:
blocks.append(b'\0' * (8 - align))
address += 8 - align
else:
channel['data_block_addr'] = 0
if gp['channel_dependencies'][j]:
block = gp['channel_dependencies'][j][0]
if isinstance(block, (ChannelArrayBlock, Channel)):
channel['component_addr'] = block.address
else:
channel['component_addr'] = block
group_channels = gp['channels']
if group_channels:
for j, channel in enumerate(chans[:-1]):
channel['next_ch_addr'] = chans[j + 1].address
chans[-1]['next_ch_addr'] = 0
# channel dependecies
j = 0
while j < len(gp['channels']):
dep_list = gp['channel_dependencies'][j]
if dep_list and all(
not isinstance(dep, ChannelArrayBlock) for dep in dep_list):
dep = chans[j+1]
channel = chans[j]
channel['component_addr'] = dep.address
dep = chans[j+len(dep_list)]
channel['next_ch_addr'] = dep['next_ch_addr']
dep['next_ch_addr'] = 0
for k, _ in enumerate(dep_list):
dep = chans[j+1+k]
dep['source_addr'] = 0
j += len(dep_list)
else:
j += 1
seek(blocks_start_addr, v4c.SEEK_START)
for block in blocks:
write(bytes(block))
blocks = []
chans = []
address = tell()
# channel group
gp['channel_group'].address = address
gp['channel_group']['next_cg_addr'] = 0
cg_texts = temp_texts['channel_group'][0]
for key in ('acq_name_addr', 'comment_addr'):
if cg_texts and key in cg_texts:
addr_ = temp_texts['channel_group'][0][key]
gp['channel_group'][key] = addr_
gp['channel_group']['acq_source_addr'] = 0
gp['data_group']['first_cg_addr'] = address
write(bytes(gp['channel_group']))
address = tell()
del gp['temp_channel_sources']
del gp['temp_channel_conversions']
temp_texts = None
blocks = []
address = tell()
gp_rec_ids = []
# data groups
for gp in self.groups:
gp['data_group'].address = address
gp_rec_ids.append(gp['data_group']['record_id_len'])
gp['data_group']['record_id_len'] = 0
address += gp['data_group']['block_len']
blocks.append(gp['data_group'])
gp['data_group']['comment_addr'] = 0
for i, dg in enumerate(self.groups[:-1]):
addr_ = self.groups[i + 1]['data_group'].address
dg['data_group']['next_dg_addr'] = addr_
self.groups[-1]['data_group']['next_dg_addr'] = 0
for block in blocks:
write(bytes(block))
for gp, rec_id in zip(self.groups, gp_rec_ids):
gp['data_group']['record_id_len'] = rec_id
if self.groups:
addr_ = self.groups[0]['data_group'].address
self.header['first_dg_addr'] = addr_
else:
self.header['first_dg_addr'] = 0
self.header['file_history_addr'] = self.file_history[0][0].address
if self.attachments:
addr_ = self.attachments[0][0].address
self.header['first_attachment_addr'] = addr_
else:
self.header['first_attachment_addr'] = 0
if self.file_comment:
self.header['comment_addr'] = self.file_comment.address
else:
self.header['comment_addr'] = 0
seek(v4c.IDENTIFICATION_BLOCK_SIZE, v4c.SEEK_START)
write(bytes(self.header))
for orig_addr, gp in zip(original_data_addresses, self.groups):
gp['data_group']['data_block_addr'] = orig_addr
for gp in self.groups:
for dep_list in gp['channel_dependencies']:
if dep_list:
if all(
isinstance(dep, ChannelArrayBlock)
for dep in dep_list):
for dep in dep_list:
for i, (ch_nr, gp_nr) in enumerate(dep.referenced_channels):
grp = self.groups[gp_nr]
stream.seek(0, v4c.SEEK_END)
dep['scale_axis_{}_dg_addr'.format(i)] = grp['data_group'].address
dep['scale_axis_{}_cg_addr'.format(i)] = grp['channel_group'].address
dep['scale_axis_{}_ch_addr'.format(i)] = grp['temp_channels'][ch_nr]
seek(dep.address, v4c.SEEK_START)
write(bytes(dep))
for gp in self.groups:
del gp['temp_channels']
if dst == self.name:
self.close()
os.remove(self.name)
os.rename(destination, self.name)
self.groups = []
self.header = None
self.identification = None
self.file_history = []
self.channels_db = {}
self.masters_db = {}
self.attachments = []
self.file_comment = None
self._ch_map = {}
self._master_channel_cache = {}
self._tempfile = TemporaryFile()
self._file = open(self.name, 'rb')
self._read()
return dst