Source code for asammdf.mdf

# -*- coding: utf-8 -*-
""" common MDF file format module """

import csv
import logging
import xml.etree.ElementTree as ET
from collections import OrderedDict
from copy import deepcopy
from functools import reduce
from struct import unpack
from shutil import copy
from pathlib import Path

import numpy as np
from numpy.core.defchararray import encode, decode
from pandas import DataFrame, Series

from .blocks.mdf_v2 import MDF2
from .blocks.mdf_v3 import MDF3
from .blocks.mdf_v4 import MDF4
from .signal import Signal
from .blocks.utils import (
    CHANNEL_COUNT,
    MERGE,
    MdfException,
    matlab_compatible,
    validate_version_argument,
    MDF2_VERSIONS,
    MDF3_VERSIONS,
    MDF4_VERSIONS,
    SUPPORTED_VERSIONS,
    randomized_string,
    is_file_like,
    count_channel_groups,
    UINT16_u,
    UINT64_u,
    UniqueDB,
    components,
)
from .blocks.v2_v3_blocks import Channel as ChannelV3
from .blocks.v2_v3_blocks import HeaderBlock as HeaderV3
from .blocks.v2_v3_blocks import ChannelConversion as ChannelConversionV3
from .blocks.v2_v3_blocks import ChannelExtension
from .blocks.v4_blocks import SourceInformation
from .blocks.v4_blocks import ChannelConversion as ChannelConversionV4
from .blocks.v4_blocks import Channel as ChannelV4
from .blocks.v4_blocks import HeaderBlock as HeaderV4
from .blocks.v4_blocks import ChannelArrayBlock, EventBlock
from .blocks import v4_constants as v4c
from .blocks import v2_v3_constants as v23c


logger = logging.getLogger("asammdf")


__all__ = ["MDF", "SUPPORTED_VERSIONS"]


class MDF(object):
    """Unified access to MDF v3 and v4 files. Underlying _mdf's attributes and
    methods are linked to the `MDF` object via *setattr*. This is done to
    expose them to the user code and for performance considerations.

    Parameters
    ----------
    name : string | BytesIO
        mdf file name (if provided it must be a real file name) or
        file-like object
    version : string
        mdf file version from ('2.00', '2.10', '2.14', '3.00', '3.10',
        '3.20', '3.30', '4.00', '4.10', '4.11'); default '4.10'
    callback : function
        keyword only argument: function to call to update the progress; the
        function must accept two arguments (the current progress and maximum
        progress value)
    use_display_names : bool
        keyword only argument: for MDF4 files parse the XML channel comment
        to search for the display name; XML parsing is quite expensive so
        setting this to *False* can decrease the loading times very much;
        default *False*

    """

    _terminate = False

    def __init__(self, name=None, version="4.10", **kwargs):
        if name:
            if is_file_like(name):
                file_stream = name
            else:
                name = Path(name)
                if name.is_file():
                    file_stream = open(name, "rb")
                else:
                    raise MdfException(f'File "{name}" does not exist')
            file_stream.seek(0)
            magic_header = file_stream.read(3)
            if magic_header != b"MDF":
                raise MdfException(f'"{name}" is not a valid ASAM MDF file')
            file_stream.seek(8)
            version = file_stream.read(4).decode("ascii").strip(" \0")
            if not version:
                file_stream.read(16)
                version = unpack("<H", file_stream.read(2))[0]
                version = str(version)
                version = f"{version[0]}.{version[1:]}"
            if version in MDF3_VERSIONS:
                self._mdf = MDF3(name, **kwargs)
            elif version in MDF4_VERSIONS:
                self._mdf = MDF4(name, **kwargs)
            elif version in MDF2_VERSIONS:
                self._mdf = MDF2(name, **kwargs)
            else:
                message = (
                    f'"{name}" is not a supported MDF file; '
                    f'"{version}" file version was found'
                )
                raise MdfException(message)
        else:
            version = validate_version_argument(version)
            if version in MDF2_VERSIONS:
                self._mdf = MDF3(version=version, **kwargs)
            elif version in MDF3_VERSIONS:
                self._mdf = MDF3(version=version, **kwargs)
            elif version in MDF4_VERSIONS:
                self._mdf = MDF4(version=version, **kwargs)
            else:
                message = (
                    f'"{version}" is not a supported MDF file version; '
                    f"Supported versions are {SUPPORTED_VERSIONS}"
                )
                raise MdfException(message)

        # link underlying _mdf attributes and methods to the new MDF object
        for attr in set(dir(self._mdf)) - set(dir(self)):
            setattr(self, attr, getattr(self._mdf, attr))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _transfer_events(self, other):
        def get_scopes(event, events):
            if event.scopes:
                return event.scopes
            else:
                if event.parent is not None:
                    return get_scopes(events[event.parent], events)
                elif event.range_start is not None:
                    return get_scopes(events[event.range_start], events)
                else:
                    return event.scopes

        if other.version >= "4.00":
            for event in other.events:
                if self.version >= "4.00":
                    new_event = deepcopy(event)
                    event_valid = True
                    for i, ref in enumerate(new_event.scopes):
                        try:
                            dg_cntr, ch_cntr = ref
                            try:
                                (self.groups[dg_cntr].channels[ch_cntr])
                            except:
                                event_valid = False
                        except TypeError:
                            dg_cntr = ref
                            try:
                                (self.groups[dg_cntr].channel_group)
                            except:
                                event_valid = False
                    # ignore attachments for now
                    for i in range(new_event.attachment_nr):
                        key = f"attachment_{i}_addr"
                        event[key] = 0
                    if event_valid:
                        self.events.append(new_event)
                else:
                    ev_type = event.event_type
                    ev_range = event.range_type
                    ev_base = event.sync_base
                    ev_factor = event.sync_factor
                    timestamp = ev_base * ev_factor

                    try:
                        comment = ET.fromstring(
                            event.comment.replace(
                                ' xmlns="http://www.asam.net/mdf/v4"', ""
                            )
                        )
                        pre = comment.find(".//pre_trigger_interval")
                        if pre is not None:
                            pre = float(pre.text)
                        else:
                            pre = 0.0
                        post = comment.find(".//post_trigger_interval")
                        if post is not None:
                            post = float(post.text)
                        else:
                            post = 0.0
                        comment = comment.find(".//TX")
                        if comment is not None:
                            comment = comment.text
                        else:
                            comment = ""
                    except:
                        pre = 0.0
                        post = 0.0
                        comment = event.comment

                    if comment:
                        comment += ": "

                    if ev_range == v4c.EVENT_RANGE_TYPE_BEGINNING:
                        comment += "Begin of "
                    elif ev_range == v4c.EVENT_RANGE_TYPE_END:
                        comment += "End of "
                    else:
                        comment += "Single point "

                    if ev_type == v4c.EVENT_TYPE_RECORDING:
                        comment += "recording"
                    elif ev_type == v4c.EVENT_TYPE_RECORDING_INTERRUPT:
                        comment += "recording interrupt"
                    elif ev_type == v4c.EVENT_TYPE_ACQUISITION_INTERRUPT:
                        comment += "acquisition interrupt"
                    elif ev_type == v4c.EVENT_TYPE_START_RECORDING_TRIGGER:
                        comment += "measurement start trigger"
                    elif ev_type == v4c.EVENT_TYPE_STOP_RECORDING_TRIGGER:
                        comment += "measurement stop trigger"
                    elif ev_type == v4c.EVENT_TYPE_TRIGGER:
                        comment += "trigger"
                    else:
                        comment += "marker"

                    scopes = get_scopes(event, other.events)
                    if scopes:
                        for i, ref in enumerate(scopes):
                            event_valid = True
                            try:
                                dg_cntr, ch_cntr = ref
                                try:
                                    (self.groups[dg_cntr])
                                except:
                                    event_valid = False
                            except TypeError:
                                dg_cntr = ref
                                try:
                                    (self.groups[dg_cntr])
                                except:
                                    event_valid = False
                            if event_valid:
                                self.add_trigger(
                                    dg_cntr,
                                    timestamp,
                                    pre_time=pre,
                                    post_time=post,
                                    comment=comment,
                                )
                    else:
                        for i, _ in enumerate(self.groups):
                            self.add_trigger(
                                i,
                                timestamp,
                                pre_time=pre,
                                post_time=post,
                                comment=comment,
                            )
        else:
            for trigger_info in other.iter_get_triggers():
                comment = trigger_info["comment"]
                timestamp = trigger_info["time"]
                group = trigger_info["group"]
                if self.version < "4.00":
                    self.add_trigger(
                        group,
                        timestamp,
                        pre_time=trigger_info["pre_time"],
                        post_time=trigger_info["post_time"],
                        comment=comment,
                    )
                else:
                    if timestamp:
                        ev_type = v4c.EVENT_TYPE_TRIGGER
                    else:
                        ev_type = v4c.EVENT_TYPE_START_RECORDING_TRIGGER
                    event = EventBlock(
                        event_type=ev_type,
                        sync_base=int(timestamp * 10 ** 9),
                        sync_factor=10 ** -9,
                        scope_0_addr=0,
                    )
                    event.comment = comment
                    event.scopes.append(group)
                    self.events.append(event)

    def _included_channels(self, index):
        """ get the minimum channels needed to extract all information from
        the channel group (for example keep only the structure channel and
        exclude the structure fields channels)

        Parameters
        ----------
        index : int
            channel group index

        Returns
        -------
        included_channels : set
            set of included channels

        """
        group = self.groups[index]

        included_channels = set(range(len(group.channels)))
        master_index = self.masters_db.get(index, None)
        if master_index is not None:
            included_channels.remove(master_index)

        channels = group.channels

        if self.version in MDF2_VERSIONS + MDF3_VERSIONS:
            for dep in group.channel_dependencies:
                if dep is None:
                    continue
                for gp_nr, ch_nr in dep.referenced_channels:
                    if gp_nr == index:
                        included_channels.add(ch_nr)
        else:
            if group.CAN_logging:
                where = (
                    self.whereis("CAN_DataFrame")
                    + self.whereis("CAN_ErrorFrame")
                    + self.whereis("CAN_RemoteFrame")
                )
                for dg_cntr, ch_cntr in where:
                    if dg_cntr == index:
                        break
                else:
                    raise MdfException(
                        f"CAN_DataFrame or CAN_ErrorFrame not found in group {index}"
                    )

                channel = channels[ch_cntr]
                frame_bytes = range(
                    channel.byte_offset,
                    channel.byte_offset + channel.bit_count // 8,
                )
                for i, channel in enumerate(channels):
                    if channel.byte_offset in frame_bytes:
                        included_channels.remove(i)

                if group.CAN_database:
                    dbc_addr = group.dbc_addr
                    message_id = group.message_id

                    can_msg = self._dbc_cache[dbc_addr].frameById(message_id)

                    for i, _ in enumerate(can_msg.signals, 1):
                        included_channels.add(-i)

            for dependencies in group.channel_dependencies:
                if dependencies is None:
                    continue
                if all(
                    not isinstance(dep, ChannelArrayBlock) for dep in dependencies
                ):
                    for _, ch_nr in dependencies:
                        try:
                            included_channels.remove(ch_nr)
                        except KeyError:
                            pass
                else:
                    for dep in dependencies:
                        for gp_nr, ch_nr in dep.referenced_channels:
                            if gp_nr == index:
                                try:
                                    included_channels.remove(ch_nr)
                                except KeyError:
                                    pass

        return included_channels

    def __contains__(self, channel):
        """ if *'channel name'* in *'mdf file'* """
        return channel in self.channels_db

    def __iter__(self):
        """ iterate over all the channels found in the file; master channels
        are skipped from iteration

        """
        for signal in self.iter_channels():
            yield signal
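    # A minimal usage sketch (hypothetical file name): the version is
    # auto-detected from the file header, and the context manager protocol
    # defined above closes the underlying file handle on exit.
    #
    #     from asammdf import MDF
    #
    #     with MDF("measurement.mf4") as mdf:
    #         print(mdf.version)           # e.g. "4.10"
    #         print("EngineSpeed" in mdf)  # uses MDF.__contains__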
    def convert(self, version):
        """convert *MDF* to other version

        Parameters
        ----------
        version : str
            new mdf file version from ('2.00', '2.10', '2.14', '3.00',
            '3.10', '3.20', '3.30', '4.00', '4.10', '4.11'); default '4.10'

        Returns
        -------
        out : MDF
            new *MDF* object

        """
        version = validate_version_argument(version)

        out = MDF(version=version)

        out.header.start_time = self.header.start_time

        groups_nr = len(self.groups)

        if self._callback:
            self._callback(0, groups_nr)

        cg_nr = -1

        # walk through all groups and get all channels
        for i, group in enumerate(self.groups):
            encodings = []
            included_channels = self._included_channels(i)
            if included_channels:
                cg_nr += 1
            else:
                continue

            parents, dtypes = self._prepare_record(group)
            data = self._load_data(group)

            for idx, fragment in enumerate(data):
                if dtypes.itemsize:
                    group.record = np.core.records.fromstring(
                        fragment[0], dtype=dtypes
                    )
                else:
                    group.record = None
                    continue

                # the first fragment triggers an append that will add the
                # metadata for all channels
                if idx == 0:
                    sigs = []
                    for j in included_channels:
                        sig = self.get(
                            group=i,
                            index=j,
                            data=fragment,
                            raw=True,
                            ignore_invalidation_bits=True,
                            copy_master=False,
                        )
                        if version < "4.00":
                            if sig.samples.dtype.kind == "S":
                                encodings.append(sig.encoding)
                                strsig = self.get(
                                    group=i,
                                    index=j,
                                    samples_only=True,
                                    ignore_invalidation_bits=True,
                                )[0]
                                sig.samples = sig.samples.astype(strsig.dtype)
                                del strsig
                                if sig.encoding != "latin-1":
                                    if sig.encoding == "utf-16-le":
                                        sig.samples = (
                                            sig.samples.view(np.uint16)
                                            .byteswap()
                                            .view(sig.samples.dtype)
                                        )
                                        sig.samples = encode(
                                            decode(sig.samples, "utf-16-be"),
                                            "latin-1",
                                        )
                                    else:
                                        sig.samples = encode(
                                            decode(sig.samples, sig.encoding),
                                            "latin-1",
                                        )
                            else:
                                encodings.append(None)
                        if not sig.samples.flags.writeable:
                            sig.samples = sig.samples.copy()
                        sigs.append(sig)
                    source_info = f"Converted from {self.version} to {version}"

                    if sigs:
                        out.append(sigs, source_info, common_timebase=True)
                        new_group = out.groups[-1]
                        new_channel_group = new_group.channel_group
                        old_channel_group = group.channel_group
                        new_channel_group.comment = old_channel_group.comment
                        if version >= "4.00":
                            new_channel_group["path_separator"] = ord(".")
                            if self.version >= "4.00":
                                new_channel_group.acq_name = (
                                    old_channel_group.acq_name
                                )
                                new_channel_group.acq_source = (
                                    old_channel_group.acq_source
                                )
                    else:
                        break

                # the other fragments will trigger only the extension of
                # samples records to the data block
                else:
                    sigs = [
                        (self.get_master(i, data=fragment, copy_master=False), None)
                    ]
                    for k, j in enumerate(included_channels):
                        sig = self.get(
                            group=i,
                            index=j,
                            data=fragment,
                            raw=True,
                            samples_only=True,
                            ignore_invalidation_bits=True,
                        )
                        if version < "4.00":
                            encoding = encodings[k]
                            samples = sig[0]
                            if encoding:
                                if encoding != "latin-1":
                                    if encoding == "utf-16-le":
                                        samples = (
                                            samples.view(np.uint16)
                                            .byteswap()
                                            .view(samples.dtype)
                                        )
                                        samples = encode(
                                            decode(samples, "utf-16-be"), "latin-1"
                                        )
                                    else:
                                        samples = encode(
                                            decode(samples, encoding), "latin-1"
                                        )
                                sig = (samples, sig[1])
                        if not sig[0].flags.writeable:
                            sig = sig[0].copy(), sig[1]
                        sigs.append(sig)
                    out.extend(cg_nr, sigs)

            group.record = None

            if self._callback:
                self._callback(i + 1, groups_nr)

            if self._terminate:
                return

        out._transfer_events(self)
        if self._callback:
            out._callback = out._mdf._callback = self._callback
        return out
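    # Usage sketch for convert() (hypothetical file names); save() is one of
    # the methods delegated from the underlying _mdf object.
    #
    #     with MDF("measurement.mf4") as mdf:
    #         converted = mdf.convert("3.30")
    #         converted.save("measurement_v3.mdf", overwrite=True)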
    def cut(self, start=None, stop=None, whence=0, version=None, include_ends=True):
        """cut *MDF* file. *start* and *stop* limits are absolute values
        or values relative to the first timestamp depending on the *whence*
        argument.

        Parameters
        ----------
        start : float
            start time, default *None*. If *None* then the start of
            measurement is used
        stop : float
            stop time, default *None*. If *None* then the end of measurement
            is used
        whence : int
            how to search for the start and stop values

            * 0 : absolute
            * 1 : relative to first timestamp

        version : str
            new mdf file version from ('2.00', '2.10', '2.14', '3.00',
            '3.10', '3.20', '3.30', '4.00', '4.10', '4.11'); default *None*
            and in this case the original file version is used
        include_ends : bool
            include the *start* and *stop* timestamps after cutting the
            signal. If *start* and *stop* are not found in the original
            timestamps, then the new samples will be computed using
            interpolation. Default *True*

        Returns
        -------
        out : MDF
            new MDF object

        """
        if version is None:
            version = self.version
        else:
            version = validate_version_argument(version)

        out = MDF(version=version)

        out.header.start_time = self.header.start_time

        if whence == 1:
            timestamps = []
            for i, group in enumerate(self.groups):
                fragment = next(self._load_data(group))
                master = self.get_master(i, fragment)
                if master.size:
                    timestamps.append(master[0])
                del master

            if timestamps:
                first_timestamp = np.amin(timestamps)
            else:
                first_timestamp = 0

            if start is not None:
                start += first_timestamp
            if stop is not None:
                stop += first_timestamp

        out.header.start_time = self.header.start_time

        groups_nr = len(self.groups)

        if self._callback:
            self._callback(0, groups_nr)

        cg_nr = -1

        interpolation_mode = self._integer_interpolation

        # walk through all groups and get all channels
        for i, group in enumerate(self.groups):
            included_channels = self._included_channels(i)
            if included_channels:
                cg_nr += 1
            else:
                continue

            data = self._load_data(group)
            parents, dtypes = self._prepare_record(group)

            idx = 0
            for fragment in data:
                if dtypes.itemsize:
                    group.record = np.core.records.fromstring(
                        fragment[0], dtype=dtypes
                    )
                else:
                    group.record = None
                master = self.get_master(i, fragment)
                if not len(master):
                    continue

                needs_cutting = True

                # check if this fragment is within the cut interval or
                # if the cut interval has ended
                if start is None and stop is None:
                    fragment_start = None
                    fragment_stop = None
                    start_index = 0
                    stop_index = len(master)
                    needs_cutting = False
                elif start is None:
                    fragment_start = None
                    start_index = 0
                    if master[0] > stop:
                        break
                    else:
                        fragment_stop = min(stop, master[-1])
                        stop_index = np.searchsorted(
                            master, fragment_stop, side="right"
                        )
                        if stop_index == len(master):
                            needs_cutting = False
                elif stop is None:
                    fragment_stop = None
                    if master[-1] < start:
                        continue
                    else:
                        fragment_start = max(start, master[0])
                        start_index = np.searchsorted(
                            master, fragment_start, side="left"
                        )
                        stop_index = len(master)
                        if start_index == 0:
                            needs_cutting = False
                else:
                    if master[0] > stop:
                        break
                    elif master[-1] < start:
                        continue
                    else:
                        fragment_start = max(start, master[0])
                        start_index = np.searchsorted(
                            master, fragment_start, side="left"
                        )
                        fragment_stop = min(stop, master[-1])
                        stop_index = np.searchsorted(
                            master, fragment_stop, side="right"
                        )
                        if start_index == 0 and stop_index == len(master):
                            needs_cutting = False

                if needs_cutting:
                    cut_timebase = (
                        Signal(master, master, name="_")
                        .cut(
                            fragment_start,
                            fragment_stop,
                            include_ends,
                            interpolation_mode=interpolation_mode,
                        )
                        .timestamps
                    )

                # the first fragment triggers an append that will add the
                # metadata for all channels
                if idx == 0:
                    sigs = []
                    for j in included_channels:
                        sig = self.get(
                            group=i,
                            index=j,
                            data=fragment,
                            raw=True,
                            ignore_invalidation_bits=True,
                            copy_master=False,
                        )
                        if needs_cutting:
                            sig = sig.interp(
                                cut_timebase, interpolation_mode=interpolation_mode
                            )
                        # if sig.stream_sync and False:
                        #     attachment, _name = sig.attachment
                        #     duration = get_video_stream_duration(attachment)
                        #     if start is None:
                        #         start_t = 0
                        #     else:
                        #         start_t = start
                        #
                        #     if stop is None:
                        #         end_t = duration
                        #     else:
                        #         end_t = stop
                        #
                        #     attachment = cut_video_stream(
                        #         attachment,
                        #         start_t,
                        #         end_t,
                        #         Path(_name).suffix,
                        #     )
                        #     sig.attachment = attachment, _name
                        if not sig.samples.flags.writeable:
                            sig.samples = sig.samples.copy()
                        sigs.append(sig)

                    if sigs:
                        if start:
                            start_ = f"{start}s"
                        else:
                            start_ = "start of measurement"
                        if stop:
                            stop_ = f"{stop}s"
                        else:
                            stop_ = "end of measurement"
                        out.append(
                            sigs, f"Cut from {start_} to {stop_}", common_timebase=True
                        )
                    else:
                        break

                    idx += 1

                # the other fragments will trigger only the extension of
                # samples records to the data block
                else:
                    if needs_cutting:
                        sigs = [(cut_timebase, None)]
                    else:
                        sigs = [(master, None)]

                    for j in included_channels:
                        sig = self.get(
                            group=i,
                            index=j,
                            data=fragment,
                            raw=True,
                            samples_only=True,
                            ignore_invalidation_bits=True,
                        )
                        if needs_cutting:
                            _sig = Signal(
                                sig[0], master, name="_", invalidation_bits=sig[1]
                            ).interp(
                                cut_timebase, interpolation_mode=interpolation_mode
                            )
                            sig = (_sig.samples, _sig.invalidation_bits)
                            del _sig
                        sigs.append(sig)

                    if sigs:
                        out.extend(cg_nr, sigs)

                    idx += 1

                group.record = None

            # if the cut interval is not found in the measurement
            # then append an empty data group
            if idx == 0:
                self.configure(read_fragment_size=1)
                sigs = []

                fragment = next(self._load_data(group))
                fragment = (fragment[0], -1, None)

                for j in included_channels:
                    sig = self.get(
                        group=i,
                        index=j,
                        data=fragment,
                        raw=True,
                        ignore_invalidation_bits=True,
                    )
                    sig.samples = sig.samples[:0]
                    sig.timestamps = sig.timestamps[:0]
                    sigs.append(sig)

                if start:
                    start_ = f"{start}s"
                else:
                    start_ = "start of measurement"
                if stop:
                    stop_ = f"{stop}s"
                else:
                    stop_ = "end of measurement"
                out.append(sigs, f"Cut from {start_} to {stop_}", common_timebase=True)

                self.configure(read_fragment_size=0)

            if self._callback:
                self._callback(i + 1, groups_nr)

            if self._terminate:
                return

        out._transfer_events(self)
        if self._callback:
            out._callback = out._mdf._callback = self._callback
        return out
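    # Usage sketch for cut() (hypothetical file names): keep the interval
    # between 10s and 60s relative to the first timestamp.
    #
    #     with MDF("measurement.mf4") as mdf:
    #         sliced = mdf.cut(start=10.0, stop=60.0, whence=1)
    #         sliced.save("measurement_cut.mf4", overwrite=True)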
    def export(self, fmt, filename=None, **kargs):
        """ export *MDF* to other formats. The *MDF* file name is used if
        available, else the *filename* argument must be provided.

        Parameters
        ----------
        fmt : string
            can be one of the following:

            * `csv` : CSV export that uses the ";" delimiter. This option
              will generate a new csv file for each data group
              (<MDFNAME>_DataGroup_<cntr>.csv)
            * `hdf5` : HDF5 file output; each *MDF* data group is mapped to
              a *HDF5* group with the name 'DataGroup_<cntr>'
              (where <cntr> is the index)
            * `excel` : Excel file output (very slow). This option will
              generate a new excel file for each data group
              (<MDFNAME>_DataGroup_<cntr>.xlsx)
            * `mat` : Matlab .mat version 4, 5 or 7.3 export. If
              *single_time_base==False* the channels will be renamed in the
              mat file to 'D<cntr>_<channel name>'. The channel group master
              will be renamed to 'DM<cntr>_<channel name>'
              (*<cntr>* is the data group index starting from 0)
            * `pandas` : export all channels as a single pandas DataFrame
            * `parquet` : export to Apache parquet format

        filename : string | pathlib.Path
            export file name

        **kwargs

            * `single_time_base`: resample all channels to common time base,
              default *False* (pandas export is by default single time based)
            * `raster`: float time raster for resampling. Valid if
              *single_time_base* is *True* and for *pandas* export
            * `time_from_zero`: adjust time channel to start from 0
            * `use_display_names`: use display name instead of standard
              channel name, if available.
            * `empty_channels`: behaviour for channels without samples; the
              options are *skip* or *zeros*; default is *skip*
            * `format`: only valid for *mat* export; can be '4', '5' or
              '7.3', default is '5'
            * `oned_as`: only valid for *mat* export; can be 'row' or
              'column'
            * `keep_arrays` : keep arrays and structure channels as well as
              the component channels. If *True* this can be very slow. If
              *False* only the component channels are saved, and their names
              will be prefixed with the parent channel.
        Returns
        -------
        dataframe : pandas.DataFrame
            only in case of *pandas* export

        """
        from time import perf_counter as pc

        header_items = ("date", "time", "author", "department", "project", "subject")

        if fmt != "pandas" and filename is None and self.name is None:
            message = (
                "Must specify filename for export "
                "if MDF was created without a file name"
            )
            logger.warning(message)
            return

        single_time_base = kargs.get("single_time_base", False)
        raster = kargs.get("raster", 0)
        time_from_zero = kargs.get("time_from_zero", True)
        use_display_names = kargs.get("use_display_names", True)
        empty_channels = kargs.get("empty_channels", "skip")
        format = kargs.get("format", "5")
        oned_as = kargs.get("oned_as", "row")

        name = Path(filename) if filename else self.name

        if fmt == "parquet":
            try:
                from fastparquet import write as write_parquet
            except ImportError:
                logger.warning(
                    "fastparquet not found; export to parquet is unavailable"
                )
                return
        elif fmt == "hdf5":
            try:
                from h5py import File as HDF5
            except ImportError:
                logger.warning("h5py not found; export to HDF5 is unavailable")
                return
        elif fmt == "excel":
            try:
                import xlsxwriter
            except ImportError:
                logger.warning("xlsxwriter not found; export to Excel unavailable")
                return
        elif fmt == "mat":
            if format == "7.3":
                try:
                    from hdf5storage import savemat
                except ImportError:
                    logger.warning(
                        "hdf5storage not found; export to mat v7.3 is unavailable"
                    )
                    return
            else:
                try:
                    from scipy.io import savemat
                except ImportError:
                    logger.warning("scipy not found; export to mat is unavailable")
                    return

        if single_time_base or fmt in ("pandas", "parquet"):
            df = DataFrame()
            units = OrderedDict()
            comments = OrderedDict()
            masters = [self.get_master(i) for i in range(len(self.groups))]

            for i in range(len(self.groups)):
                self._master_channel_cache[(i, 0, -1)] = self._master_channel_cache[i]

            self.masters_db.clear()

            master = reduce(np.union1d, masters)

            if raster and len(master):
                if len(master) > 1:
                    num = float(np.float32((master[-1] - master[0]) / raster))
                    if num.is_integer():
                        master = np.linspace(master[0], master[-1], int(num))
                    else:
                        master = np.arange(
                            master[0], master[-1], raster, dtype=np.float64
                        )

            if time_from_zero and len(master):
                df = df.assign(time=master)
                df["time"] = Series(master - master[0], index=np.arange(len(master)))
            else:
                df["time"] = Series(master, index=np.arange(len(master)))

            units["time"] = "s"
            comments["time"] = ""

            used_names = UniqueDB()
            used_names.get_unique_name("time")

            for i, grp in enumerate(self.groups):
                if self._terminate:
                    return

                included_channels = self._included_channels(i)

                data = self._load_data(grp)
                data = b"".join(d[0] for d in data)
                data = (data, 0, -1)

                for j in included_channels:
                    sig = self.get(group=i, index=j, data=data).interp(
                        master, self._integer_interpolation
                    )

                    if len(sig.samples.shape) > 1:
                        arr = [sig.samples]
                        types = [
                            (sig.name, sig.samples.dtype, sig.samples.shape[1:])
                        ]
                        sig.samples = np.core.records.fromarrays(arr, dtype=types)

                    if use_display_names:
                        channel_name = sig.display_name or sig.name
                    else:
                        channel_name = sig.name

                    channel_name = used_names.get_unique_name(channel_name)

                    if len(sig):
                        try:
                            # df = df.assign(**{channel_name: sig.samples})
                            if sig.samples.dtype.names:
                                df[channel_name] = Series(sig.samples, dtype="O")
                            else:
                                df[channel_name] = Series(sig.samples)
                        except:
                            print(sig.samples.dtype, sig.samples.shape, sig.name)
                            print(list(df), len(df))
                            raise
                        units[channel_name] = sig.unit
                        comments[channel_name] = sig.comment
                    else:
                        if empty_channels == "zeros":
                            df[channel_name] = np.zeros(
                                len(master), dtype=sig.samples.dtype
                            )
                            units[channel_name] = sig.unit
                            comments[channel_name] = sig.comment

                del self._master_channel_cache[(i, 0, -1)]

        if fmt == "hdf5":
            name = name.with_suffix(".hdf")

            if single_time_base:
                with HDF5(str(name), "w") as hdf:
                    # header information
                    group = hdf.create_group(str(name))

                    if self.version in MDF2_VERSIONS + MDF3_VERSIONS:
                        for item in header_items:
                            group.attrs[item] = self.header[item]

                    # save each data group in a HDF5 group called
                    # "DataGroup_<cntr>" with the index starting from 1
                    # each HDF5 group will have a string attribute "master"
                    # that will hold the name of the master channel

                    for channel in df:
                        samples = df[channel]
                        unit = units[channel]
                        comment = comments[channel]

                        if samples.dtype.kind == 'O':
                            continue

                        dataset = group.create_dataset(channel, data=samples)
                        unit = unit.replace("\0", "")
                        if unit:
                            dataset.attrs["unit"] = unit
                        comment = comment.replace("\0", "")
                        if comment:
                            dataset.attrs["comment"] = comment
            else:
                with HDF5(str(name), "w") as hdf:
                    # header information
                    group = hdf.create_group(str(name))

                    if self.version in MDF2_VERSIONS + MDF3_VERSIONS:
                        for item in header_items:
                            group.attrs[item] = self.header[item]

                    # save each data group in a HDF5 group called
                    # "DataGroup_<cntr>" with the index starting from 1
                    # each HDF5 group will have a string attribute "master"
                    # that will hold the name of the master channel

                    for i, grp in enumerate(self.groups):
                        names = UniqueDB()

                        if self._terminate:
                            return

                        group_name = r"/" + f"DataGroup_{i+1}"
                        group = hdf.create_group(group_name)

                        master_index = self.masters_db.get(i, -1)

                        data = self._load_data(grp)
                        data = b"".join(d[0] for d in data)
                        data = (data, 0, -1)

                        for j, _ in enumerate(grp.channels):
                            sig = self.get(group=i, index=j, data=data)
                            name = sig.name
                            name = names.get_unique_name(name)
                            if j == master_index:
                                group.attrs["master"] = name
                            dataset = group.create_dataset(name, data=sig.samples)
                            unit = sig.unit
                            if unit:
                                dataset.attrs["unit"] = unit
                            comment = sig.comment.replace("\0", "")
                            if comment:
                                dataset.attrs["comment"] = comment

                        del self._master_channel_cache[(i, 0, -1)]

        elif fmt == "excel":
            if single_time_base:
                name = name.with_suffix(".xlsx")
                message = f'Writing excel export to file "{name}"'
                logger.info(message)

                workbook = xlsxwriter.Workbook(str(name))
                sheet = workbook.add_worksheet("Channels")

                for col, (channel_name, channel_unit) in enumerate(units.items()):
                    if self._terminate:
                        return
                    samples = df[channel_name]
                    sig_description = f"{channel_name} [{channel_unit}]"
                    sheet.write(0, col, sig_description)
                    try:
                        sheet.write_column(1, col, samples.astype(str))
                    except:
                        vals = [str(e) for e in samples]
                        sheet.write_column(1, col, vals)

                workbook.close()
            else:
                while name.suffix == ".xlsx":
                    name = name.with_suffix("")

                count = len(self.groups)

                for i, grp in enumerate(self.groups):
                    if self._terminate:
                        return
                    message = f"Exporting group {i+1} of {count}"
                    logger.info(message)

                    data = self._load_data(grp)
                    data = b"".join(d[0] for d in data)
                    data = (data, 0, -1)

                    master_index = self.masters_db.get(i, None)
                    if master_index is not None:
                        master = self.get(group=i, index=master_index, data=data)
                        if raster and len(master):
                            raster_ = np.arange(
                                master[0], master[-1], raster, dtype=np.float64
                            )
                            master = master.interp(raster_)
                        else:
                            raster_ = None
                    else:
                        master = None
                        raster_ = None

                    if time_from_zero and master is not None and len(master):
                        master.samples -= master.samples[0]

                    group_name = f"DataGroup_{i+1}"
                    wb_name = Path(f"{name.stem}_{group_name}.xlsx")
                    workbook = xlsxwriter.Workbook(str(wb_name))
                    sheet = workbook.add_worksheet(group_name)

                    if master is not None:
                        sig_description = f"{master.name} [{master.unit}]"
                        sheet.write(0, 0, sig_description)
                        sheet.write_column(1, 0, master.samples.astype(str))
                        offset = 1
                    else:
                        offset = 0

                    for col, _ in enumerate(grp.channels):
                        if self._terminate:
                            return

                        if col == master_index:
                            offset -= 1
                            continue

                        sig = self.get(group=i, index=col, data=data)
                        if raster_ is not None:
                            sig = sig.interp(raster_, self._integer_interpolation)

                        sig_description = f"{sig.name} [{sig.unit}]"
                        sheet.write(0, col + offset, sig_description)

                        try:
                            sheet.write_column(
                                1, col + offset, sig.samples.astype(str)
                            )
                        except:
                            vals = [str(e) for e in sig.samples]
                            sheet.write_column(1, col + offset, vals)

                    workbook.close()

                    del self._master_channel_cache[(i, 0, -1)]

        elif fmt == "csv":
            if single_time_base:
                name = name.with_suffix(".csv")
                message = f'Writing csv export to file "{name}"'
                logger.info(message)
                with open(name, "w", newline="") as csvfile:
                    writer = csv.writer(csvfile)

                    names_row = [
                        f"{channel_name} [{channel_unit}]"
                        for (channel_name, channel_unit) in units.items()
                    ]
                    writer.writerow(names_row)

                    vals = [df[name] for name in df]

                    if self._terminate:
                        return

                    for row in zip(*vals):
                        writer.writerow(row)
            else:
                name = name.with_suffix(".csv")

                count = len(self.groups)
                for i, grp in enumerate(self.groups):
                    if self._terminate:
                        return
                    message = f"Exporting group {i+1} of {count}"
                    logger.info(message)
                    data = self._load_data(grp)
                    data = b"".join(d[0] for d in data)
                    data = (data, 0, -1)

                    group_name = f"DataGroup_{i+1}"
                    group_csv_name = Path(f"{name.stem}_{group_name}.csv")
                    with open(group_csv_name, "w") as csvfile:
                        writer = csv.writer(csvfile)

                        master_index = self.masters_db.get(i, None)
                        if master_index is not None:
                            master = self.get(group=i, index=master_index, data=data)
                            if raster and len(master):
                                raster_ = np.arange(
                                    master[0], master[-1], raster, dtype=np.float64
                                )
                                master = master.interp(raster_)
                            else:
                                raster_ = None
                        else:
                            master = None
                            raster_ = None

                        if time_from_zero:
                            if master is None:
                                pass
                            elif len(master):
                                master.samples -= master.samples[0]

                        ch_nr = len(grp.channels)
                        if master is None:
                            channels = [
                                self.get(group=i, index=j, data=data)
                                for j in range(ch_nr)
                            ]
                        else:
                            if raster_ is not None:
                                channels = [
                                    self.get(group=i, index=j, data=data).interp(
                                        raster_, self._integer_interpolation
                                    )
                                    for j in range(ch_nr)
                                    if j != master_index
                                ]
                            else:
                                channels = [
                                    self.get(group=i, index=j, data=data)
                                    for j in range(ch_nr)
                                    if j != master_index
                                ]

                        if raster_ is not None:
                            cycles = len(raster_)
                        else:
                            cycles = grp.channel_group["cycles_nr"]

                        if empty_channels == "zeros":
                            for channel in channels:
                                if not len(channel):
                                    channel.samples = np.zeros(
                                        cycles, dtype=channel.samples.dtype
                                    )

                        if master is not None:
                            names_row = [master.name]
                            vals = [master.samples]
                        else:
                            names_row = []
                            vals = []
                        names_row += [f"{ch.name} [{ch.unit}]" for ch in channels]
                        writer.writerow(names_row)

                        vals += [ch.samples for ch in channels]

                        for idx, row in enumerate(zip(*vals)):
                            writer.writerow(row)

                    del self._master_channel_cache[(i, 0, -1)]

        elif fmt == "mat":
            name = name.with_suffix(".mat")

            if not single_time_base:
                mdict = {}

                master_name_template = "DGM{}_{}"
                channel_name_template = "DG{}_{}"

                used_names = UniqueDB()

                for i, grp in enumerate(self.groups):
                    if self._terminate:
                        return

                    included_channels = self._included_channels(i)

                    master_index = self.masters_db.get(i, -1)

                    if master_index >= 0:
                        included_channels.add(master_index)

                    data = self._load_data(grp)
                    data = b"".join(d[0] for d in data)
                    data = (data, 0, -1)

                    for j in included_channels:
                        sig = self.get(group=i, index=j, data=data)
                        if j == master_index:
                            channel_name = master_name_template.format(i, sig.name)
                        else:
                            if use_display_names:
                                channel_name = sig.display_name or sig.name
                            else:
                                channel_name = sig.name
                            channel_name = channel_name_template.format(
                                i, channel_name
                            )

                        channel_name = matlab_compatible(channel_name)
                        channel_name = used_names.get_unique_name(channel_name)

                        if sig.samples.dtype.names:
                            sig.samples.dtype.names = [
                                matlab_compatible(name)
                                for name in sig.samples.dtype.names
                            ]

                        mdict[channel_name] = sig.samples

                    del self._master_channel_cache[(i, 0, -1)]
            else:
                used_names = UniqueDB()

                mdict = {
                    channel_name: df[channel_name].values for channel_name in df
                }

                new_mdict = {}
                for channel_name, samples in mdict.items():
                    channel_name = matlab_compatible(channel_name)
                    channel_name = used_names.get_unique_name(channel_name)
                    new_mdict[channel_name] = samples
                mdict = new_mdict

            if format == "7.3":
                savemat(
                    str(name),
                    mdict,
                    long_field_names=True,
                    format="7.3",
                    delete_unused_variables=False,
                    oned_as=oned_as,
                )
            else:
                savemat(str(name), mdict, long_field_names=True, oned_as=oned_as)

        elif fmt in ("pandas", "parquet"):
            if fmt == "pandas":
                return df
            else:
                name = name.with_suffix(".parquet")
                write_parquet(name, df)
        else:
            message = (
                'Unsupported export type "{}". '
                'Please select "csv", "excel", "hdf5", "mat" or "pandas"'
            )
            message = message.format(fmt)
            logger.warning(message)
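    # Usage sketch for export() (hypothetical file names); the optional
    # dependencies (h5py, xlsxwriter, scipy/hdf5storage, fastparquet) must be
    # installed for the corresponding formats.
    #
    #     with MDF("measurement.mf4") as mdf:
    #         mdf.export("hdf5", filename="measurement.hdf")
    #         df = mdf.export("pandas", raster=0.01, time_from_zero=True)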
    def filter(self, channels, version=None):
        """ return new *MDF* object that contains only the channels listed in
        *channels* argument

        Parameters
        ----------
        channels : list
            list of items to be filtered; each item can be :

            * a channel name string
            * (channel name, group index, channel index) list or tuple
            * (channel name, group index) list or tuple
            * (None, group index, channel index) list or tuple

        version : str
            new mdf file version from ('2.00', '2.10', '2.14', '3.00',
            '3.10', '3.20', '3.30', '4.00', '4.10', '4.11'); default *None*
            and in this case the original file version is used

        Returns
        -------
        mdf : MDF
            new *MDF* file

        Examples
        --------
        >>> from asammdf import MDF, Signal
        >>> import numpy as np
        >>> t = np.arange(5)
        >>> s = np.ones(5)
        >>> mdf = MDF()
        >>> for i in range(4):
        ...     sigs = [Signal(s*(i*10+j), t, name='SIG') for j in range(1,4)]
        ...     mdf.append(sigs)
        ...
        >>> filtered = mdf.filter(['SIG', ('SIG', 3, 1), ['SIG', 2], (None, 1, 2)])
        >>> for gp_nr, ch_nr in filtered.channels_db['SIG']:
        ...     print(filtered.get(group=gp_nr, index=ch_nr))
        ...
        <Signal SIG:
                samples=[ 1. 1. 1. 1. 1.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        <Signal SIG:
                samples=[ 31. 31. 31. 31. 31.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        <Signal SIG:
                samples=[ 21. 21. 21. 21. 21.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        <Signal SIG:
                samples=[ 12. 12. 12. 12. 12.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">

        """
        if version is None:
            version = self.version
        else:
            version = validate_version_argument(version)

        # group channels by group index
        gps = {}
        for item in channels:
            if isinstance(item, (list, tuple)):
                if len(item) not in (2, 3):
                    raise MdfException(
                        "The items used for filtering must be strings, "
                        "or they must match the first 3 arguments of the get "
                        "method"
                    )
                else:
                    group, index = self._validate_channel_selection(*item)
                    if group not in gps:
                        gps[group] = {index}
                    else:
                        gps[group].add(index)
            else:
                name = item
                group, index = self._validate_channel_selection(name)
                if group not in gps:
                    gps[group] = {index}
                else:
                    gps[group].add(index)

        # see if there are excluded channels in the filter list
        for group_index, indexes in gps.items():
            grp = self.groups[group_index]
            included_channels = set(indexes)
            master_index = self.masters_db.get(group_index, None)
            if master_index is not None:
                to_exclude = {master_index}
            else:
                to_exclude = set()
            for index in indexes:
                if self.version in MDF2_VERSIONS + MDF3_VERSIONS:
                    dep = grp.channel_dependencies[index]
                    if dep:
                        for gp_nr, ch_nr in dep.referenced_channels:
                            if gp_nr == group_index:
                                included_channels.remove(ch_nr)
                else:
                    dependencies = grp.channel_dependencies[index]
                    if dependencies is None:
                        continue
                    if all(
                        not isinstance(dep, ChannelArrayBlock)
                        for dep in dependencies
                    ):
                        channels = grp.channels
                        for _, channel in dependencies:
                            to_exclude.add(channel)
                    else:
                        for dep in dependencies:
                            for gp_nr, ch_nr in dep.referenced_channels:
                                if gp_nr == group_index:
                                    to_exclude.add(ch_nr)

            gps[group_index] = sorted(included_channels - to_exclude)

        mdf = MDF(version=version)

        mdf.header.start_time = self.header.start_time

        if self.name:
            origin = self.name.parent
        else:
            origin = "New MDF"

        groups_nr = len(gps)

        if self._callback:
            self._callback(0, groups_nr)

        # append filtered channels to new MDF
        for new_index, (group_index, indexes) in enumerate(gps.items()):
            if version < "4.00":
                encodings = []
            group = self.groups[group_index]

            data = self._load_data(group)
            _, dtypes = self._prepare_record(group)

            for idx, fragment in enumerate(data):
                if dtypes.itemsize:
                    group.record = np.core.records.fromstring(
                        fragment[0], dtype=dtypes
                    )
                else:
                    group.record = None

                # the first fragment triggers an append that will add the
                # metadata for all channels
                if idx == 0:
                    sigs = []
                    for j in indexes:
                        sig = self.get(
                            group=group_index,
                            index=j,
                            data=fragment,
                            raw=True,
                            ignore_invalidation_bits=True,
                            copy_master=False,
                        )
                        if version < "4.00":
                            if sig.samples.dtype.kind == "S":
                                encodings.append(sig.encoding)
                                strsig = self.get(
                                    group=group_index,
                                    index=j,
                                    samples_only=True,
                                    ignore_invalidation_bits=True,
                                )[0]
                                sig.samples = sig.samples.astype(strsig.dtype)
                                del strsig
                                if sig.encoding != "latin-1":
                                    if sig.encoding == "utf-16-le":
                                        sig.samples = (
                                            sig.samples.view(np.uint16)
                                            .byteswap()
                                            .view(sig.samples.dtype)
                                        )
                                        sig.samples = encode(
                                            decode(sig.samples, "utf-16-be"),
                                            "latin-1",
                                        )
                                    else:
                                        sig.samples = encode(
                                            decode(sig.samples, sig.encoding),
                                            "latin-1",
                                        )
                            else:
                                encodings.append(None)
                        if not sig.samples.flags.writeable:
                            sig.samples = sig.samples.copy()
                        sigs.append(sig)
                    source_info = f"Signals filtered from <{origin}>"

                    if sigs:
                        mdf.append(sigs, source_info, common_timebase=True)
                    else:
                        break

                # the other fragments will trigger only the extension of
                # samples records to the data block
                else:
                    sigs = [
                        (
                            self.get_master(
                                group_index, data=fragment, copy_master=False
                            ),
                            None,
                        )
                    ]

                    for k, j in enumerate(indexes):
                        sig = self.get(
                            group=group_index,
                            index=j,
                            data=fragment,
                            samples_only=True,
                            raw=True,
                            ignore_invalidation_bits=True,
                        )
                        if version < "4.00":
                            encoding = encodings[k]
                            samples = sig[0]
                            if encoding:
                                if encoding != "latin-1":
                                    if encoding == "utf-16-le":
                                        samples = (
                                            samples.view(np.uint16)
                                            .byteswap()
                                            .view(samples.dtype)
                                        )
                                        samples = encode(
                                            decode(samples, "utf-16-be"), "latin-1"
                                        )
                                    else:
                                        samples = encode(
                                            decode(samples, encoding), "latin-1"
                                        )
                                sig = (samples, sig[1])
                        sigs.append(sig)

                    if sigs:
                        mdf.extend(new_index, sigs)

            group.record = None

            if self._callback:
                self._callback(new_index + 1, groups_nr)

            if self._terminate:
                return

        mdf._transfer_events(self)
        if self._callback:
            mdf._callback = mdf._mdf._callback = self._callback
        return mdf
    def iter_get(
        self,
        name=None,
        group=None,
        index=None,
        raster=None,
        samples_only=False,
        raw=False,
    ):
        """ iterator over a channel

        This is useful in case of large files with a small number of
        channels.

        If the *raster* keyword argument is not *None* the output is
        interpolated accordingly

        Parameters
        ----------
        name : string
            name of channel
        group : int
            0-based group index
        index : int
            0-based channel index
        raster : float
            time raster in seconds
        samples_only : bool
            if *True* return only the channel samples as numpy array; if
            *False* return a *Signal* object
        raw : bool
            return channel samples without applying the conversion rule;
            default `False`

        """
        gp_nr, ch_nr = self._validate_channel_selection(name, group, index)

        grp = self.groups[gp_nr]

        data = self._load_data(grp)

        for fragment in data:
            yield self.get(
                group=gp_nr,
                index=ch_nr,
                raster=raster,
                samples_only=samples_only,
                data=fragment,
                raw=raw,
            )
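    # Usage sketch for iter_get() (hypothetical file and channel names): the
    # channel is produced fragment by fragment, so peak memory stays bounded
    # even for very large files.
    #
    #     with MDF("measurement.mf4") as mdf:
    #         for piece in mdf.iter_get("EngineSpeed"):
    #             print(len(piece))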
    @staticmethod
    def concatenate(files, version="4.10", sync=True, **kwargs):
        """ concatenates several files. The files must have the same internal
        structure (same number of groups, and same channels in each group)

        Parameters
        ----------
        files : list | tuple
            list of *MDF* file names or *MDF* instances
        version : str
            merged file version
        sync : bool
            sync the files based on the start of measurement, default *True*

        Returns
        -------
        concatenate : MDF
            new *MDF* object with concatenated channels

        Raises
        ------
        MdfException : if there are inconsistencies between the files

        """
        if not files:
            raise MdfException("No files given for merge")

        callback = kwargs.get("callback", None)
        if callback:
            callback(0, 100)

        mdf_nr = len(files)

        versions = []
        if sync:
            timestamps = []
            for file in files:
                if isinstance(file, MDF):
                    timestamps.append(file.header.start_time)
                    versions.append(file.version)
                else:
                    with open(file, "rb") as mdf:
                        mdf.seek(64)
                        blk_id = mdf.read(2)
                        if blk_id == b"HD":
                            header = HeaderV3
                            versions.append("3.00")
                        else:
                            versions.append("4.00")
                            blk_id += mdf.read(2)
                            if blk_id == b"##HD":
                                header = HeaderV4
                            else:
                                raise MdfException(
                                    f'"{file}" is not a valid MDF file'
                                )

                        header = header(address=64, stream=mdf)

                        timestamps.append(header.start_time)

            oldest = timestamps[0]
            offsets = [
                (timestamp - oldest).total_seconds() for timestamp in timestamps
            ]
            offsets = [offset if offset > 0 else 0 for offset in offsets]
        else:
            file = files[0]
            if isinstance(file, MDF):
                oldest = file.header.start_time
                versions.append(file.version)
            else:
                with open(file, "rb") as mdf:
                    mdf.seek(64)
                    blk_id = mdf.read(2)
                    if blk_id == b"HD":
                        versions.append("3.00")
                        header = HeaderV3
                    else:
                        versions.append("4.00")
                        blk_id += mdf.read(2)
                        if blk_id == b"##HD":
                            header = HeaderV4
                        else:
                            raise MdfException(f'"{file}" is not a valid MDF file')

                    header = header(address=64, stream=mdf)

                    oldest = header.start_time

            offsets = [0 for _ in files]

        version = validate_version_argument(version)

        merged = MDF(version=version, callback=callback)

        merged.header.start_time = oldest

        encodings = []
        included_channel_names = []

        for mdf_index, (offset, mdf) in enumerate(zip(offsets, files)):
            if not isinstance(mdf, MDF):
                mdf = MDF(mdf)

            if mdf_index == 0:
                last_timestamps = [None for gp in mdf.groups]
                groups_nr = len(mdf.groups)

            cg_nr = -1

            for i, group in enumerate(mdf.groups):
                included_channels = mdf._included_channels(i)

                if mdf_index == 0:
                    included_channel_names.append(
                        [group.channels[k].name for k in included_channels]
                    )
                else:
                    names = [group.channels[k].name for k in included_channels]
                    if names != included_channel_names[i]:
                        if sorted(names) != sorted(included_channel_names[i]):
                            raise MdfException(
                                f"internal structure of file {mdf_index} is different"
                            )
                        else:
                            logger.warning(
                                f'Different channel order in channel group {i} '
                                f'of file {mdf_index}. Data can be corrupted if '
                                'there are channels with the same name in this '
                                'channel group'
                            )
                            included_channels = [
                                mdf._validate_channel_selection(
                                    name=name_, group=i
                                )[1]
                                for name_ in included_channel_names[i]
                            ]

                if included_channels:
                    cg_nr += 1
                else:
                    continue

                channels_nr = len(group.channels)

                y_axis = MERGE

                idx = np.searchsorted(CHANNEL_COUNT, channels_nr, side="right") - 1
                if idx < 0:
                    idx = 0
                read_size = y_axis[idx]

                idx = 0
                last_timestamp = last_timestamps[i]
                first_timestamp = None
                original_first_timestamp = None

                if read_size:
                    mdf.configure(read_fragment_size=int(read_size))

                parents, dtypes = mdf._prepare_record(group)

                data = mdf._load_data(group)

                for fragment in data:
                    if dtypes.itemsize:
                        group.record = np.core.records.fromstring(
                            fragment[0], dtype=dtypes
                        )
                    else:
                        group.record = None

                    if mdf_index == 0 and idx == 0:
                        encodings_ = []
                        encodings.append(encodings_)
                        signals = []
                        for j in included_channels:
                            sig = mdf.get(
                                group=i,
                                index=j,
                                data=fragment,
                                raw=True,
                                ignore_invalidation_bits=True,
                                copy_master=False,
                            )

                            if version < "4.00":
                                if sig.samples.dtype.kind == "S":
                                    encodings_.append(sig.encoding)
                                    strsig = mdf.get(
                                        group=i,
                                        index=j,
                                        samples_only=True,
                                        ignore_invalidation_bits=True,
                                    )[0]
                                    sig.samples = sig.samples.astype(strsig.dtype)
                                    del strsig
                                    if sig.encoding != "latin-1":
                                        if sig.encoding == "utf-16-le":
                                            sig.samples = (
                                                sig.samples.view(np.uint16)
                                                .byteswap()
                                                .view(sig.samples.dtype)
                                            )
                                            sig.samples = encode(
                                                decode(sig.samples, "utf-16-be"),
                                                "latin-1",
                                            )
                                        else:
                                            sig.samples = encode(
                                                decode(sig.samples, sig.encoding),
                                                "latin-1",
                                            )
                                else:
                                    encodings_.append(None)

                            if not sig.samples.flags.writeable:
                                sig.samples = sig.samples.copy()
                            signals.append(sig)

                        if signals and len(signals[0]):
                            if offset > 0:
                                timestamps = signals[0].timestamps + offset
                                for sig in signals:
                                    sig.timestamps = timestamps
                            last_timestamp = signals[0].timestamps[-1]
                            first_timestamp = signals[0].timestamps[0]
                            original_first_timestamp = first_timestamp

                        if signals:
                            merged.append(signals, common_timebase=True)
                        else:
                            break
                        idx += 1
                    else:
                        master = mdf.get_master(i, fragment)
                        if len(master):
                            if original_first_timestamp is None:
                                original_first_timestamp = master[0]
                            if offset > 0:
                                master = master + offset
                            if last_timestamp is None:
                                last_timestamp = master[-1]
                            else:
                                if last_timestamp >= master[0]:
                                    if len(master) >= 2:
                                        delta = master[1] - master[0]
                                    else:
                                        delta = 0.001
                                    master -= master[0]
                                    master += last_timestamp + delta
                                last_timestamp = master[-1]

                            signals = [(master, None)]

                            for k, j in enumerate(included_channels):
                                sig = mdf.get(
                                    group=i,
                                    index=j,
                                    data=fragment,
                                    raw=True,
                                    samples_only=True,
                                    ignore_invalidation_bits=True,
                                )

                                if version < "4.00":
                                    encoding = encodings[i][k]
                                    samples = sig[0]
                                    if encoding:
                                        if encoding != "latin-1":
                                            if encoding == "utf-16-le":
                                                samples = (
                                                    samples.view(np.uint16)
                                                    .byteswap()
                                                    .view(samples.dtype)
                                                )
                                                samples = encode(
                                                    decode(samples, "utf-16-be"),
                                                    "latin-1",
                                                )
                                            else:
                                                samples = encode(
                                                    decode(samples, encoding),
                                                    "latin-1",
                                                )
                                        sig = (samples, sig[1])
                                signals.append(sig)

                            if signals:
                                merged.extend(cg_nr, signals)

                            if first_timestamp is None:
                                first_timestamp = master[0]
                        idx += 1

                    group.record = None

                last_timestamps[i] = last_timestamp

                if first_timestamp is not None:
                    merged.groups[-1].channel_group.comment += (
                        f"{first_timestamp:.6f}s-{last_timestamp:.6f}s "
                        f'from CG {i} of "{mdf.name.name}" '
                        f"start @ {original_first_timestamp:.6f}s\n"
                    )
                else:
                    merged.groups[
                        -1
                    ].channel_group.comment += (
                        f'there were no samples in channel group {i} '
                        f'of "{mdf.name.name}"\n'
                    )

                if callback:
                    callback(i + 1 + mdf_index * groups_nr, groups_nr * mdf_nr)

                if MDF._terminate:
                    return

            merged._transfer_events(mdf)

        return merged
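    # Usage sketch for concatenate() (hypothetical file names): the files
    # must share the same internal structure; sync=True aligns them on the
    # measurement start times.
    #
    #     merged = MDF.concatenate(
    #         ["part1.mf4", "part2.mf4", "part3.mf4"], version="4.10"
    #     )
    #     merged.save("merged.mf4", overwrite=True)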
    @staticmethod
    def stack(files, version="4.10", sync=True, **kwargs):
        """ stack several files and return the stacked *MDF* object

        Parameters
        ----------
        files : list | tuple
            list of *MDF* file names or *MDF* instances
        version : str
            merged file version
        sync : bool
            sync the files based on the start of measurement, default *True*

        Returns
        -------
        stacked : MDF
            new *MDF* object with stacked channels

        """
        if not files:
            raise MdfException("No files given for stack")

        version = validate_version_argument(version)

        callback = kwargs.get("callback", None)

        stacked = MDF(version=version, callback=callback)

        files_nr = len(files)

        if callback:
            callback(0, files_nr)

        if sync:
            timestamps = []
            for file in files:
                if isinstance(file, MDF):
                    timestamps.append(file.header.start_time)
                else:
                    with open(file, "rb") as mdf:
                        mdf.seek(64)
                        blk_id = mdf.read(2)
                        if blk_id == b"HD":
                            header = HeaderV3
                        else:
                            blk_id += mdf.read(2)
                            if blk_id == b"##HD":
                                header = HeaderV4
                            else:
                                raise MdfException(
                                    f'"{file}" is not a valid MDF file'
                                )

                        header = header(address=64, stream=mdf)

                        timestamps.append(header.start_time)

            oldest = min(timestamps)
            offsets = [
                (timestamp - oldest).total_seconds() for timestamp in timestamps
            ]

            stacked.header.start_time = oldest
        else:
            offsets = [0 for file in files]

        cg_nr = -1

        for offset, mdf in zip(offsets, files):
            if not isinstance(mdf, MDF):
                mdf = MDF(mdf)

            for i, group in enumerate(mdf.groups):
                idx = 0
                if version < "4.00":
                    encodings = []
                included_channels = mdf._included_channels(i)
                if included_channels:
                    cg_nr += 1
                else:
                    continue

                _, dtypes = mdf._prepare_record(group)

                data = mdf._load_data(group)

                for fragment in data:
                    if dtypes.itemsize:
                        group.record = np.core.records.fromstring(
                            fragment[0], dtype=dtypes
                        )
                    else:
                        group.record = None
                    if idx == 0:
                        signals = []
                        for j in included_channels:
                            sig = mdf.get(
                                group=i,
                                index=j,
                                data=fragment,
                                raw=True,
                                ignore_invalidation_bits=True,
                                copy_master=False,
                            )

                            if version < "4.00":
                                if sig.samples.dtype.kind == "S":
                                    encodings.append(sig.encoding)
                                    strsig = mdf.get(
                                        group=i,
                                        index=j,
                                        samples_only=True,
                                        ignore_invalidation_bits=True,
                                    )[0]
                                    sig.samples = sig.samples.astype(strsig.dtype)
                                    del strsig
                                    if sig.encoding != "latin-1":
                                        if sig.encoding == "utf-16-le":
                                            sig.samples = (
                                                sig.samples.view(np.uint16)
                                                .byteswap()
                                                .view(sig.samples.dtype)
                                            )
                                            sig.samples = encode(
                                                decode(sig.samples, "utf-16-be"),
                                                "latin-1",
                                            )
                                        else:
                                            sig.samples = encode(
                                                decode(sig.samples, sig.encoding),
                                                "latin-1",
                                            )
                                else:
                                    encodings.append(None)

                            if not sig.samples.flags.writeable:
                                sig.samples = sig.samples.copy()
                            signals.append(sig)

                        if signals:
                            if sync:
                                timestamps = signals[0].timestamps + offset
                                for sig in signals:
                                    sig.timestamps = timestamps
                            stacked.append(signals, common_timebase=True)
                        idx += 1
                    else:
                        master = mdf.get_master(i, fragment)
                        if sync:
                            master = master + offset
                        if len(master):
                            signals = [(master, None)]

                            for k, j in enumerate(included_channels):
                                sig = mdf.get(
                                    group=i,
                                    index=j,
                                    data=fragment,
                                    raw=True,
                                    samples_only=True,
                                    ignore_invalidation_bits=True,
                                )

                                if version < "4.00":
                                    encoding = encodings[k]
                                    samples = sig[0]
                                    if encoding:
                                        if encoding != "latin-1":
                                            if encoding == "utf-16-le":
                                                samples = (
                                                    samples.view(np.uint16)
                                                    .byteswap()
                                                    .view(samples.dtype)
                                                )
                                                samples = encode(
                                                    decode(samples, "utf-16-be"),
                                                    "latin-1",
                                                )
                                            else:
                                                samples = encode(
                                                    decode(samples, encoding),
                                                    "latin-1",
                                                )
                                        sig = (samples, sig[1])
                                signals.append(sig)

                            if signals:
                                stacked.extend(cg_nr, signals)
                        idx += 1

                    group.record = None

                stacked.groups[
                    -1
                ].channel_group.comment = (
                    f'stacked from channel group {i} of "{mdf.name.parent}"'
                )

                if callback:
                    callback(idx, files_nr)

                if MDF._terminate:
                    return

        return stacked
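    # Usage sketch for stack() (hypothetical file names): unlike
    # concatenate(), each input file contributes its own channel groups, so
    # the files do not need the same internal structure.
    #
    #     stacked = MDF.stack(["vehicle.mf4", "testbench.mdf"], sync=True)
    #     stacked.save("stacked.mf4", overwrite=True)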
    def iter_channels(self, skip_master=True):
        """ generator that yields a *Signal* for each non-master channel

        Parameters
        ----------
        skip_master : bool
            do not yield master channels; default *True*

        """
        for i, group in enumerate(self.groups):
            try:
                master_index = self.masters_db[i]
            except KeyError:
                master_index = -1

            for j, _ in enumerate(group.channels):
                if skip_master and j == master_index:
                    continue
                yield self.get(group=i, index=j)
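    # Usage sketch for iter_channels() (hypothetical file name):
    #
    #     with MDF("measurement.mf4") as mdf:
    #         for sig in mdf.iter_channels():
    #             print(sig.name, len(sig), sig.unit)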
    def iter_groups(self):
        """ generator that yields channel groups as pandas DataFrames. If
        there are multiple occurrences for the same channel name inside a
        channel group, then a counter will be used to make the names unique
        (<original_name>_<counter>)

        """
        for i, _ in enumerate(self.groups):
            yield self.get_group(i)
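    # Usage sketch for iter_groups() (hypothetical file name): one pandas
    # DataFrame per channel group.
    #
    #     with MDF("measurement.mf4") as mdf:
    #         for df in mdf.iter_groups():
    #             print(df.shape)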
    def resample(self, raster, version=None):
        """ resample all channels using the given raster. See *configure* to
        select the interpolation method for integer channels

        Parameters
        ----------
        raster : float
            time raster in seconds
        version : str
            new mdf file version from ('2.00', '2.10', '2.14', '3.00',
            '3.10', '3.20', '3.30', '4.00', '4.10', '4.11'); default *None*
            and in this case the original file version is used

        Returns
        -------
        mdf : MDF
            new *MDF* with resampled channels

        """
        if version is None:
            version = self.version
        else:
            version = validate_version_argument(version)

        interpolation_mode = self._integer_interpolation

        mdf = MDF(version=version)

        mdf.header.start_time = self.header.start_time

        groups_nr = len(self.groups)

        if self._callback:
            self._callback(0, groups_nr)

        # walk through all groups and get all channels
        cg_nr = -1
        for i, group in enumerate(self.groups):
            if version < "4.00":
                encodings = []
            included_channels = self._included_channels(i)
            if included_channels:
                cg_nr += 1
            else:
                continue

            last = None

            data = self._load_data(group)
            for idx, fragment in enumerate(data):
                if idx == 0:
                    master = self.get_master(i, data=fragment)
                    if len(master) > 1:
                        if raster:
                            num = float(
                                np.float32((master[-1] - master[0]) / raster)
                            )
                            if int(num) == num:
                                master = np.linspace(
                                    master[0], master[-1], int(num)
                                )
                            else:
                                master = np.arange(master[0], master[-1], raster)
                            last = master[-1] + raster
                        else:
                            last = master[-1]
                    elif len(master) == 1:
                        last = master[-1] + raster
                    else:
                        last = None
                else:
                    master = self.get_master(i, data=fragment)
                    if len(master):
                        if last is None:
                            last = master[0] + raster
                        if raster:
                            num = float(np.float32((master[-1] - last) / raster))
                            if int(num) == num:
                                master = np.linspace(
                                    master[0], master[-1], int(num)
                                )
                            else:
                                master = np.arange(last, master[-1], raster)
                            last = master[-1] + raster
                        else:
                            last = master[-1]

                if idx == 0:
                    sigs = []
                    for j in included_channels:
                        sig = self.get(
                            group=i,
                            index=j,
                            data=fragment,
                            raw=True,
                            ignore_invalidation_bits=True,
                        )
                        if version < "4.00":
                            if sig.samples.dtype.kind == "S":
                                encodings.append(sig.encoding)
                                strsig = self.get(
                                    group=i,
                                    index=j,
                                    samples_only=True,
                                    ignore_invalidation_bits=True,
                                )[0]
                                sig.samples = sig.samples.astype(strsig.dtype)
                                del strsig
                                if sig.encoding != "latin-1":
                                    if sig.encoding == "utf-16-le":
                                        sig.samples = (
                                            sig.samples.view(np.uint16)
                                            .byteswap()
                                            .view(sig.samples.dtype)
                                        )
                                        sig.samples = encode(
                                            decode(sig.samples, "utf-16-be"),
                                            "latin-1",
                                        )
                                    else:
                                        sig.samples = encode(
                                            decode(sig.samples, sig.encoding),
                                            "latin-1",
                                        )
                            else:
                                encodings.append(None)
                        if len(master):
                            sig = sig.interp(
                                master, interpolation_mode=interpolation_mode
                            )
                        if not sig.samples.flags.writeable:
                            sig.samples = sig.samples.copy()
                        sigs.append(sig)

                    if sigs:
                        mdf.append(
                            sigs, f"Resampled to {raster}s", common_timebase=True
                        )
                    else:
                        break
                else:
                    sigs = [(master, None)]
                    t = self.get_master(i, data=fragment)
                    for k, j in enumerate(included_channels):
                        sig = self.get(
                            group=i,
                            index=j,
                            data=fragment,
                            raw=True,
                            samples_only=True,
                            ignore_invalidation_bits=True,
                        )
                        if version < "4.00":
                            encoding = encodings[k]
                            samples = sig[0]
                            if encoding:
                                if encoding != "latin-1":
                                    if encoding == "utf-16-le":
                                        samples = (
                                            samples.view(np.uint16)
                                            .byteswap()
                                            .view(samples.dtype)
                                        )
                                        samples = encode(
                                            decode(samples, "utf-16-be"), "latin-1"
                                        )
                                    else:
                                        samples = encode(
                                            decode(samples, encoding), "latin-1"
                                        )
                                sig = (samples, sig[1])
                        if len(master):
                            sig = (
                                Signal(sig[0], t, invalidation_bits=sig[1], name='_')
                                .interp(
                                    master, interpolation_mode=interpolation_mode
                                )
                            )
                            sig = sig.samples, sig.invalidation_bits
                        if not sig[0].flags.writeable:
                            sig = sig[0].copy(), sig[1]
                        sigs.append(sig)

                    if sigs:
                        mdf.extend(cg_nr, sigs)

            if self._callback:
                self._callback(cg_nr + 1, groups_nr)

            if self._terminate:
                return

        if self._callback:
            self._callback(groups_nr, groups_nr)

        mdf._transfer_events(self)
        if self._callback:
            mdf._callback = mdf._mdf._callback = self._callback
        return mdf
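    # Usage sketch for resample() (hypothetical file names): put all channel
    # groups on a fixed 10 ms raster; integer channels are interpolated
    # according to the mode selected with configure().
    #
    #     with MDF("measurement.mf4") as mdf:
    #         resampled = mdf.resample(raster=0.01)
    #         resampled.save("measurement_10ms.mf4", overwrite=True)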
[docs]    def select(self, channels, dataframe=False, record_offset=0):
        """ retrieve the channels listed in the *channels* argument as
        *Signal* objects

        Parameters
        ----------
        channels : list
            list of items to be filtered; each item can be :

                * a channel name string
                * (channel name, group index, channel index) list or tuple
                * (channel name, group index) list or tuple
                * (None, group index, channel index) list or tuple

        dataframe : bool
            return a pandas DataFrame instead of a list of *Signals*; in this
            case the signals will be interpolated using the union of all
            timestamps
        record_offset : int
            record number offset; start reading the group samples from the
            given record instead of the first one

        Returns
        -------
        signals : list
            list of *Signal* objects based on the input channel list

        Examples
        --------
        >>> from asammdf import MDF, Signal
        >>> import numpy as np
        >>> t = np.arange(5)
        >>> s = np.ones(5)
        >>> mdf = MDF()
        >>> for i in range(4):
        ...     sigs = [Signal(s*(i*10+j), t, name='SIG') for j in range(1,4)]
        ...     mdf.append(sigs)
        ...
        >>> # select SIG group 0 default index 1 default, SIG group 3 index 1,
        >>> # SIG group 2 index 1 default and channel index 2 from group 1
        ...
        >>> mdf.select(['SIG', ('SIG', 3, 1), ['SIG', 2], (None, 1, 2)])
        [<Signal SIG:
                samples=[ 1.  1.  1.  1.  1.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        , <Signal SIG:
                samples=[ 31.  31.  31.  31.  31.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        , <Signal SIG:
                samples=[ 21.  21.  21.  21.  21.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        , <Signal SIG:
                samples=[ 12.  12.  12.  12.  12.]
                timestamps=[0 1 2 3 4]
                unit=""
                info=None
                comment="">
        ]

        """
        # group the requested channels by group index
        gps = {}

        indexes = []

        for item in channels:
            if isinstance(item, (list, tuple)):
                if len(item) not in (2, 3):
                    raise MdfException(
                        "The items used for filtering must be strings, "
                        "or they must match the first 3 arguments of the get "
                        "method"
                    )
                else:
                    group, index = self._validate_channel_selection(*item)
                    indexes.append((group, index))
                    if group not in gps:
                        gps[group] = {index}
                    else:
                        gps[group].add(index)
            else:
                name = item
                group, index = self._validate_channel_selection(name)
                indexes.append((group, index))
                if group not in gps:
                    gps[group] = {index}
                else:
                    gps[group].add(index)

        signal_parts = {}

        for group in gps:
            grp = self.groups[group]
            data = self._load_data(grp, record_offset=record_offset)
            parents, dtypes = self._prepare_record(grp)

            for fragment in data:
                if dtypes.itemsize:
                    grp.record = np.core.records.fromstring(fragment[0], dtype=dtypes)
                else:
                    grp.record = None

                for index in gps[group]:
                    signal = self.get(
                        group=group, index=index, data=fragment, copy_master=False
                    )
                    if (group, index) not in signal_parts:
                        signal_parts[(group, index)] = [signal]
                    else:
                        signal_parts[(group, index)].append(signal)

                grp.record = None

        signals = []
        for pair in indexes:
            parts = signal_parts[pair]
            signal = Signal(
                np.concatenate([part.samples for part in parts]),
                np.concatenate([part.timestamps for part in parts]),
                unit=parts[0].unit,
                name=parts[0].name,
                comment=parts[0].comment,
                raw=parts[0].raw,
                conversion=parts[0].conversion,
            )
            signals.append(signal)

        if dataframe:
            interpolation_mode = self._integer_interpolation
            times = [s.timestamps for s in signals]
            master = reduce(np.union1d, times).flatten().astype(np.float64)
            signals = [
                s.interp(master, interpolation_mode=interpolation_mode)
                for s in signals
            ]

            df = DataFrame()
            df["time"] = Series(master, index=np.arange(len(master)))
            df.set_index('time', inplace=True)

            used_names = UniqueDB()
            used_names.get_unique_name("time")

            for k, sig in enumerate(signals):
                # byte arrays
                if len(sig.samples.shape) > 1:
                    arr = [sig.samples]
                    types = [(sig.name, sig.samples.dtype, sig.samples.shape[1:])]
                    sig.samples = np.core.records.fromarrays(arr, dtype=types)
                    channel_name = used_names.get_unique_name(sig.name)
                    df[channel_name] = Series(sig.samples, dtype="O")

                # arrays and structures
                elif sig.samples.dtype.names:
                    for name, series in components(sig.samples, sig.name, used_names):
                        df[name] = series

                # scalars
                else:
                    channel_name = used_names.get_unique_name(sig.name)
                    df[channel_name] = Series(sig.samples)

            return df
        else:
            return signals
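Besides the doctest above, *select* can also return a single DataFrame built over the union of the selected signals' timestamps. A short sketch; the file name "measurement.mf4" and the channel names are illustrative:

>>> from asammdf import MDF
>>> with MDF("measurement.mf4") as mdf:
...     signals = mdf.select(['SIG', ('SIG', 3, 1)])
...     # same selection, interpolated on the union of all timestamps
...     df = mdf.select(['SIG', ('SIG', 3, 1)], dataframe=True)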
[docs]    def whereis(self, channel):
        """ get occurrences of channel name in the file

        Parameters
        ----------
        channel : str
            channel name string

        Returns
        -------
        occurrences : tuple
            tuple of (group index, channel index) pairs

        Examples
        --------
        >>> mdf = MDF(file_name)
        >>> mdf.whereis('VehicleSpeed') # "VehicleSpeed" exists in the file
        ((1, 2), (2, 4))
        >>> mdf.whereis('VehicleSPD') # "VehicleSPD" doesn't exist in the file
        ()

        """
        if channel in self:
            return tuple(self.channels_db[channel])
        else:
            return tuple()
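Because *whereis* returns (group index, channel index) pairs, its result can be fed directly to *get* to retrieve every occurrence of a channel, not just the default one. A minimal sketch with an illustrative file name:

>>> from asammdf import MDF
>>> with MDF("measurement.mf4") as mdf:
...     for group, index in mdf.whereis('VehicleSpeed'):
...         sig = mdf.get(group=group, index=index)
...         print(group, index, len(sig))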
[docs]    @staticmethod
    def scramble(name, **kwargs):
        """ scramble text blocks and keep original file structure

        Parameters
        ----------
        name : str | pathlib.Path
            file name

        Returns
        -------
        dst : pathlib.Path
            scrambled file name

        """

        name = Path(name)

        mdf = MDF(name)
        texts = {}

        callback = kwargs.get("callback", None)
        if callback:
            callback(0, 100)

        count = len(mdf.groups)

        if mdf.version >= "4.00":
            ChannelConversion = ChannelConversionV4

            stream = mdf._file

            if mdf.header.comment_addr:
                stream.seek(mdf.header.comment_addr + 8)
                size = UINT64_u(stream.read(8))[0] - 24
                texts[mdf.header.comment_addr] = randomized_string(size)

            for fh in mdf.file_history:
                addr = fh.comment_addr
                if addr and addr not in texts:
                    stream.seek(addr + 8)
                    size = UINT64_u(stream.read(8))[0] - 24
                    texts[addr] = randomized_string(size)

            for ev in mdf.events:
                for addr in (ev.comment_addr, ev.name_addr):
                    if addr and addr not in texts:
                        stream.seek(addr + 8)
                        size = UINT64_u(stream.read(8))[0] - 24
                        texts[addr] = randomized_string(size)

            for idx, gp in enumerate(mdf.groups, 1):

                addr = gp.data_group.comment_addr
                if addr and addr not in texts:
                    stream.seek(addr + 8)
                    size = UINT64_u(stream.read(8))[0] - 24
                    texts[addr] = randomized_string(size)

                cg = gp.channel_group
                if not cg.flags & v4c.FLAG_CG_BUS_EVENT:
                    for addr in (cg.acq_name_addr, cg.comment_addr):
                        if addr and addr not in texts:
                            stream.seek(addr + 8)
                            size = UINT64_u(stream.read(8))[0] - 24
                            texts[addr] = randomized_string(size)

                source = cg.acq_source_addr
                if source:
                    source = SourceInformation(address=source, stream=stream)
                    for addr in (
                        source.name_addr,
                        source.path_addr,
                        source.comment_addr,
                    ):
                        if addr and addr not in texts:
                            stream.seek(addr + 8)
                            size = UINT64_u(stream.read(8))[0] - 24
                            texts[addr] = randomized_string(size)

                for ch in gp.channels:

                    for addr in (ch.name_addr, ch.unit_addr, ch.comment_addr):
                        if addr and addr not in texts:
                            stream.seek(addr + 8)
                            size = UINT64_u(stream.read(8))[0] - 24
                            texts[addr] = randomized_string(size)

                    source = ch.source_addr
                    if source:
                        source = SourceInformation(address=source, stream=stream)
                        for addr in (
                            source.name_addr,
                            source.path_addr,
                            source.comment_addr,
                        ):
                            if addr and addr not in texts:
                                stream.seek(addr + 8)
                                size = UINT64_u(stream.read(8))[0] - 24
                                texts[addr] = randomized_string(size)

                    conv = ch.conversion_addr
                    if conv:
                        conv = ChannelConversion(address=conv, stream=stream)
                        for addr in (conv.name_addr, conv.unit_addr, conv.comment_addr):
                            if addr and addr not in texts:
                                stream.seek(addr + 8)
                                size = UINT64_u(stream.read(8))[0] - 24
                                texts[addr] = randomized_string(size)

                        if conv.conversion_type == v4c.CONVERSION_TYPE_ALG:
                            addr = conv.formula_addr
                            if addr and addr not in texts:
                                stream.seek(addr + 8)
                                size = UINT64_u(stream.read(8))[0] - 24
                                texts[addr] = randomized_string(size)

                        if conv.referenced_blocks:
                            for key, block in conv.referenced_blocks.items():
                                if block:
                                    if block.id == b"##TX":
                                        addr = block.address
                                        if addr not in texts:
                                            stream.seek(addr + 8)
                                            size = block.block_len - 24
                                            texts[addr] = randomized_string(size)

                if callback:
                    callback(int(idx / count * 66), 100)

            mdf.close()

            dst = name.with_suffix(".scrambled.mf4")
            copy(name, dst)

            with open(dst, "rb+") as scrambled:
                count = len(texts)
                chunk = count // 34
                idx = 0
                for index, (addr, bts) in enumerate(texts.items()):
                    # the text payload of TX/MD blocks starts at offset 24
                    scrambled.seek(addr + 24)
                    scrambled.write(bts)
                    if chunk and index % chunk == 0:
                        idx += 1
                        if callback:
                            callback(66 + idx, 100)

            if callback:
                callback(100, 100)

        else:
            ChannelConversion = ChannelConversionV3

            stream = mdf._file

            if mdf.header.comment_addr:
                stream.seek(mdf.header.comment_addr + 2)
                size = UINT16_u(stream.read(2))[0] - 4
                texts[mdf.header.comment_addr + 4] = randomized_string(size)
            texts[36 + 0x40] = randomized_string(32)
            texts[68 + 0x40] = randomized_string(32)
            texts[100 + 0x40] = randomized_string(32)
            texts[132 + 0x40] = randomized_string(32)

            for idx, gp in enumerate(mdf.groups, 1):

                cg = gp.channel_group
                addr = cg.comment_addr
                if addr and addr not in texts:
                    stream.seek(addr + 2)
                    size = UINT16_u(stream.read(2))[0] - 4
                    texts[addr + 4] = randomized_string(size)

                if gp.trigger:
                    addr = gp.trigger.text_addr
                    if addr:
                        stream.seek(addr + 2)
                        size = UINT16_u(stream.read(2))[0] - 4
                        texts[addr + 4] = randomized_string(size)

                for ch in gp.channels:

                    for key in ("long_name_addr", "display_name_addr", "comment_addr"):
                        if hasattr(ch, key):
                            addr = getattr(ch, key)
                        else:
                            addr = 0
                        if addr and addr not in texts:
                            stream.seek(addr + 2)
                            size = UINT16_u(stream.read(2))[0] - 4
                            texts[addr + 4] = randomized_string(size)

                    texts[ch.address + 26] = randomized_string(32)
                    texts[ch.address + 58] = randomized_string(128)

                    source = ch.source_addr
                    if source:
                        source = ChannelExtension(address=source, stream=stream)
                        if source.type == v23c.SOURCE_ECU:
                            texts[source.address + 12] = randomized_string(80)
                            texts[source.address + 92] = randomized_string(32)
                        else:
                            texts[source.address + 14] = randomized_string(36)
                            texts[source.address + 50] = randomized_string(36)

                    addr = ch.conversion_addr
                    if addr:
                        texts[addr + 22] = randomized_string(20)
                        conv = ChannelConversion(address=addr, stream=stream)
                        if conv.conversion_type == v23c.CONVERSION_TYPE_FORMULA:
                            texts[addr + 36] = randomized_string(conv.block_len - 36)

                        if conv.referenced_blocks:
                            for key, block in conv.referenced_blocks.items():
                                if block:
                                    if block.id == b"TX":
                                        addr = block.address
                                        if addr and addr not in texts:
                                            stream.seek(addr + 2)
                                            size = UINT16_u(stream.read(2))[0] - 4
                                            texts[addr + 4] = randomized_string(size)

                if callback:
                    callback(int(idx / count * 66), 100)

            mdf.close()

            dst = name.with_suffix(".scrambled.mdf")
            copy(name, dst)

            with open(dst, "rb+") as scrambled:
                count = len(texts)
                chunk = count // 34
                idx = 0
                for index, (addr, bts) in enumerate(texts.items()):
                    scrambled.seek(addr)
                    scrambled.write(bts)
                    if chunk and index % chunk == 0:
                        idx += 1
                        if callback:
                            callback(66 + idx, 100)

            if callback:
                callback(100, 100)

        return dst
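A short usage sketch for *scramble*; the input name is illustrative, and the scrambled copy is written next to the original with the ".scrambled" suffix:

>>> from asammdf import MDF
>>> dst = MDF.scramble("measurement.mf4")  # creates measurement.scrambled.mf4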
[docs]    def get_group(self, index, raster=None, time_from_zero=False, use_display_names=False):
        """ get channel group as pandas DataFrames. If there are multiple
        occurrences for the same channel name, then a counter will be used to
        make the names unique (<original_name>_<counter>)

        Parameters
        ----------
        index : int
            channel group index
        raster : float
            time raster for resampling; default *None*
        time_from_zero : bool
            adjust time channel to start from 0; default *False*
        use_display_names : bool
            use display name instead of standard channel name, if available

        Returns
        -------
        df : pandas.DataFrame

        """
        interpolation_mode = self._integer_interpolation

        df = DataFrame()

        master = self.get_master(index)

        if raster and len(master):
            if len(master) > 1:
                num = float(np.float32((master[-1] - master[0]) / raster))
                if num.is_integer():
                    master = np.linspace(master[0], master[-1], int(num))
                else:
                    master = np.arange(master[0], master[-1], raster, dtype=np.float64)

        if time_from_zero and len(master):
            df["time"] = Series(master - master[0], index=np.arange(len(master)))
        else:
            df["time"] = Series(master, index=np.arange(len(master)))

        df.set_index('time', inplace=True)

        used_names = UniqueDB()
        used_names.get_unique_name("time")

        included_channels = self._included_channels(index)

        signals = [
            self.get(group=index, index=idx, copy_master=False)
            for idx in included_channels
        ]

        if raster and len(master):
            signals = [
                sig.interp(master, interpolation_mode=interpolation_mode)
                for sig in signals
            ]

        for sig in signals:
            # byte arrays
            if len(sig.samples.shape) > 1:
                arr = [sig.samples]
                types = [(sig.name, sig.samples.dtype, sig.samples.shape[1:])]
                sig.samples = np.core.records.fromarrays(arr, dtype=types)

                if use_display_names:
                    channel_name = sig.display_name or sig.name
                else:
                    channel_name = sig.name
                channel_name = used_names.get_unique_name(channel_name)
                df[channel_name] = Series(sig.samples, dtype="O")

            # arrays and structures
            elif sig.samples.dtype.names:
                for name, series in components(sig.samples, sig.name, used_names):
                    df[name] = series

            # scalars
            else:
                if use_display_names:
                    channel_name = sig.display_name or sig.name
                else:
                    channel_name = sig.name
                channel_name = used_names.get_unique_name(channel_name)
                df[channel_name] = Series(sig.samples)

        return df
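A minimal sketch for *get_group*, assuming an illustrative file name; channel group 0 is resampled to a 0.1 s raster and the time axis is shifted to start at 0:

>>> from asammdf import MDF
>>> with MDF("measurement.mf4") as mdf:
...     df = mdf.get_group(0, raster=0.1, time_from_zero=True)
...     print(df.head())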
[docs]    def to_dataframe(
        self,
        channels=None,
        raster=None,
        time_from_zero=True,
        empty_channels="skip",
        keep_arrays=False,
        use_display_names=False,
    ):
        """ generate pandas DataFrame

        Parameters
        ----------
        channels : list
            filter a subset of channels; default *None*
        raster : float
            time raster for resampling; default *None*
        time_from_zero : bool
            adjust time channel to start from 0
        empty_channels : str
            behaviour for channels without samples; the options are *skip* or
            *zeros*; default is *skip*
        use_display_names : bool
            use display name instead of standard channel name, if available.
        keep_arrays : bool
            keep arrays and structure channels as well as the component
            channels. If *True* this can be very slow. If *False* only the
            component channels are saved, and their names will be prefixed
            with the parent channel.

        Returns
        -------
        dataframe : pandas.DataFrame

        """
        df = DataFrame()

        masters = [self.get_master(i) for i in range(len(self.groups))]
        self._master_channel_cache.clear()
        master = reduce(np.union1d, masters)

        if raster and len(master):
            if len(master) > 1:
                num = float(np.float32((master[-1] - master[0]) / raster))
                if num.is_integer():
                    master = np.linspace(master[0], master[-1], int(num))
                else:
                    master = np.arange(master[0], master[-1], raster, dtype=np.float64)

        if time_from_zero and len(master):
            df["time"] = Series(master - master[0], index=np.arange(len(master)))
        else:
            df["time"] = Series(master, index=np.arange(len(master)))

        df.set_index('time', inplace=True)

        used_names = UniqueDB()
        used_names.get_unique_name("time")

        for i, grp in enumerate(self.groups):
            if grp.channel_group.cycles_nr == 0 and empty_channels == "skip":
                continue

            included_channels = self._included_channels(i)

            # keep a separate name so the *channels* argument is not shadowed
            group_channels = grp.channels
            data = self._load_data(grp)
            _, dtypes = self._prepare_record(grp)

            signals = [[] for _ in included_channels]
            timestamps = []

            for fragment in data:
                if dtypes.itemsize:
                    grp.record = np.core.records.fromstring(fragment[0], dtype=dtypes)
                else:
                    grp.record = None

                for k, index in enumerate(included_channels):
                    signal = self.get(
                        group=i, index=index, data=fragment, samples_only=True
                    )
                    signals[k].append(signal[0])

                timestamps.append(self.get_master(i, data=fragment, copy_master=False))

                grp.record = None

            if len(timestamps):
                signals = [np.concatenate(parts) for parts in signals]
                timestamps = np.concatenate(timestamps)

            signals = [
                Signal(samples, timestamps, name=group_channels[idx].name).interp(
                    master, self._integer_interpolation
                )
                for samples, idx in zip(signals, included_channels)
            ]

            for sig in signals:
                if len(sig) == 0:
                    if empty_channels == "zeros":
                        sig.samples = np.zeros(len(master), dtype=sig.samples.dtype)

            signals = [sig for sig in signals if len(sig)]

            for k, sig in enumerate(signals):
                # byte arrays
                if len(sig.samples.shape) > 1:
                    arr = [sig.samples]
                    types = [(sig.name, sig.samples.dtype, sig.samples.shape[1:])]
                    sig.samples = np.core.records.fromarrays(arr, dtype=types)

                    if use_display_names:
                        channel_name = sig.display_name or sig.name
                    else:
                        channel_name = sig.name
                    channel_name = used_names.get_unique_name(channel_name)
                    df[channel_name] = Series(sig.samples, dtype="O")

                # arrays and structures
                elif sig.samples.dtype.names:
                    for name, series in components(sig.samples, sig.name, used_names):
                        df[name] = series

                # scalars
                else:
                    if use_display_names:
                        channel_name = sig.display_name or sig.name
                    else:
                        channel_name = sig.name
                    channel_name = used_names.get_unique_name(channel_name)
                    df[channel_name] = Series(sig.samples)

        return df
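A short sketch for *to_dataframe*, with an illustrative file name; all channel groups are interpolated on the union of their master channels, and channels without samples become zero-filled columns:

>>> from asammdf import MDF
>>> with MDF("measurement.mf4") as mdf:
...     df = mdf.to_dataframe(
...         raster=0.01,
...         time_from_zero=True,
...         empty_channels='zeros',
...         use_display_names=True,
...     )
...     print(df.shape)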
if __name__ == "__main__":
    pass