API

MDF

This class acts as a proxy for the MDF2, MDF3 and MDF4 classes. All attribute access is delegated to the underlying _mdf attribute (MDF2, MDF3 or MDF4 object). See MDF3 and MDF4 for available extra methods (MDF2 and MDF3 share the same implementation).

An empty MDF file is created if the name argument is not provided. If the name argument is provided then the file must exist in the filesystem, otherwise an exception is raised.

The best practice is to use the MDF as a context manager. This way all resources are released correctly in case of exceptions.

with MDF(r'test.mdf') as mdf_file:
    # do something
class asammdf.mdf.MDF(name=None, version='4.10', **kwargs)[source]

Unified access to MDF v3 and v4 files. Underlying _mdf’s attributes and methods are linked to the MDF object via setattr. This is done to expose them to the user code and for performance considerations.

Parameters:
name : string | BytesIO

mdf file name (if provided it must be a real file name) or file-like object

version : string

mdf file version from (‘2.00’, ‘2.10’, ‘2.14’, ‘3.00’, ‘3.10’, ‘3.20’, ‘3.30’, ‘4.00’, ‘4.10’, ‘4.11’); default ‘4.10’

callback : function

keyword only argument: function to call to update the progress; the function must accept two arguments (the current progress and maximum progress value)

use_display_names : bool

keyword only argument: for MDF4 files parse the XML channel comment to search for the display name; XML parsing is quite expensive so setting this to False can considerably decrease loading times; default False

static concatenate(files, version='4.10', sync=True, **kwargs)[source]

concatenates several files. The files must have the same internal structure (same number of groups, and same channels in each group)

Parameters:
files : list | tuple

list of MDF file names or MDF instances

version : str

merged file version

sync : bool

sync the files based on the start of measurement, default True

Returns:
concatenate : MDF

new MDF object with concatenated channels

Raises:
MdfException : if there are inconsistencies between the files
convert(version)[source]

convert MDF to other version

Parameters:
version : str

new mdf file version from (‘2.00’, ‘2.10’, ‘2.14’, ‘3.00’, ‘3.10’, ‘3.20’, ‘3.30’, ‘4.00’, ‘4.10’, ‘4.11’); default ‘4.10’

Returns:
out : MDF

new MDF object

cut(start=None, stop=None, whence=0, version=None, include_ends=True)[source]

cut MDF file. start and stop limits are absolute values or values relative to the first timestamp depending on the whence argument.

Parameters:
start : float

start time, default None. If None then the start of measurement is used

stop : float

stop time, default None. If None then the end of measurement is used

whence : int

how to search for the start and stop values

  • 0 : absolute
  • 1 : relative to first timestamp
version : str

new mdf file version from (‘2.00’, ‘2.10’, ‘2.14’, ‘3.00’, ‘3.10’, ‘3.20’, ‘3.30’, ‘4.00’, ‘4.10’, ‘4.11’); default None and in this case the original file version is used

include_ends : bool

include the start and stop timestamps after cutting the signal. If start and stop are found in the original timestamps, then the new samples will be computed using interpolation. Default True

Returns:
out : MDF

new MDF object

export(fmt, filename=None, **kwargs)[source]

export MDF to other formats. The MDF file name is used if available, else the filename argument must be provided.

Parameters:
fmt : string

can be one of the following:

  • csv : CSV export that uses the “;” delimiter. This option will generate a new csv file for each data group (<MDFNAME>_DataGroup_<cntr>.csv)
  • hdf5 : HDF5 file output; each MDF data group is mapped to a HDF5 group with the name ‘DataGroup_<cntr>’ (where <cntr> is the index)
  • excel : Excel file output (very slow). This option will generate a new excel file for each data group (<MDFNAME>_DataGroup_<cntr>.xlsx)
  • mat : Matlab .mat version 4, 5 or 7.3 export. If single_time_base==False the channels will be renamed in the mat file to ‘D<cntr>_<channel name>’. The channel group master will be renamed to ‘DM<cntr>_<channel name>’ ( <cntr> is the data group index starting from 0)
  • pandas : export all channels as a single pandas DataFrame
  • parquet : export to Apache parquet format
filename : string | pathlib.Path

export file name

**kwargs
  • single_time_base: resample all channels to common time base, default False (pandas export is by default single based)
  • raster: float time raster for resampling. Valid if single_time_base is True and for pandas export
  • time_from_zero: adjust time channel to start from 0
  • use_display_names: use display name instead of standard channel name, if available.
  • empty_channels: behaviour for channels without samples; the options are skip or zeros; default is skip
  • format: only valid for mat export; can be ‘4’, ‘5’ or ‘7.3’, default is ‘5’
  • oned_as: only valid for mat export; can be ‘row’ or ‘column’
  • keep_arrays : keep arrays and structure channels as well as the component channels. If True this can be very slow. If False only the component channels are saved, and their names will be prefixed with the parent channel.
Returns:
dataframe : pandas.DataFrame

only in case of pandas export

filter(channels, version=None)[source]

return new MDF object that contains only the channels listed in channels argument

Parameters:
channels : list

list of items to be filtered; each item can be :

  • a channel name string
  • (channel name, group index, channel index) list or tuple
  • (channel name, group index) list or tuple
  • (None, group index, channel index) list or tuple
version : str

new mdf file version from (‘2.00’, ‘2.10’, ‘2.14’, ‘3.00’, ‘3.10’, ‘3.20’, ‘3.30’, ‘4.00’, ‘4.10’, ‘4.11’); default None and in this case the original file version is used

Returns:
mdf : MDF

new MDF file

Examples

>>> from asammdf import MDF, Signal
>>> import numpy as np
>>> t = np.arange(5)
>>> s = np.ones(5)
>>> mdf = MDF()
>>> for i in range(4):
...     sigs = [Signal(s*(i*10+j), t, name='SIG') for j in range(1,4)]
...     mdf.append(sigs)
...
>>> filtered = mdf.filter(['SIG', ('SIG', 3, 1), ['SIG', 2], (None, 1, 2)])
>>> for gp_nr, ch_nr in filtered.channels_db['SIG']:
...     print(filtered.get(group=gp_nr, index=ch_nr))
...
<Signal SIG:
        samples=[ 1.  1.  1.  1.  1.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
<Signal SIG:
        samples=[ 31.  31.  31.  31.  31.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
<Signal SIG:
        samples=[ 21.  21.  21.  21.  21.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
<Signal SIG:
        samples=[ 12.  12.  12.  12.  12.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
get_group(index, raster=None, time_from_zero=False, use_display_names=False)[source]

get channel group as a pandas DataFrame. If there are multiple occurrences of the same channel name, then a counter is used to make the names unique (<original_name>_<counter>)

Parameters:
index : int

channel group index

Returns:
df : pandas.DataFrame
iter_channels(skip_master=True)[source]

generator that yields a Signal for each non-master channel

Parameters:
skip_master : bool

do not yield master channels; default True

iter_get(name=None, group=None, index=None, raster=None, samples_only=False, raw=False)[source]

iterator over a channel

This is useful in case of large files with a small number of channels.

If the raster keyword argument is not None the output is interpolated accordingly

Parameters:
name : string

name of channel

group : int

0-based group index

index : int

0-based channel index

raster : float

time raster in seconds

samples_only : bool

if True return only the channel samples as numpy array; if False return a Signal object

raw : bool

return channel samples without applying the conversion rule; default False

iter_groups()[source]

generator that yields channel groups as pandas DataFrames. If there are multiple occurrences of the same channel name inside a channel group, then a counter is used to make the names unique (<original_name>_<counter>)

resample(raster, version=None)[source]

resample all channels using the given raster. See configure to select the interpolation method for integer channels

Parameters:
raster : float

time raster in seconds

version : str

new mdf file version from (‘2.00’, ‘2.10’, ‘2.14’, ‘3.00’, ‘3.10’, ‘3.20’, ‘3.30’, ‘4.00’, ‘4.10’, ‘4.11’); default None and in this case the original file version is used

Returns:
mdf : MDF

new MDF with resampled channels

static scramble(name, **kwargs)[source]

scramble text blocks and keep original file structure

Parameters:
name : str | pathlib.Path

file name

Returns:
name : str

scrambled file name

select(channels, dataframe=False, record_offset=0)[source]

retrieve the channels listed in the channels argument as Signal objects

Parameters:
channels : list

list of items to be filtered; each item can be :

  • a channel name string
  • (channel name, group index, channel index) list or tuple
  • (channel name, group index) list or tuple
  • (None, group index, channel index) list or tuple
dataframe: bool

return a pandas DataFrame instead of a list of Signals; in this case the signals will be interpolated using the union of all timestamps

Returns:
signals : list

list of Signal objects based on the input channel list

Examples

>>> from asammdf import MDF, Signal
>>> import numpy as np
>>> t = np.arange(5)
>>> s = np.ones(5)
>>> mdf = MDF()
>>> for i in range(4):
...     sigs = [Signal(s*(i*10+j), t, name='SIG') for j in range(1,4)]
...     mdf.append(sigs)
...
>>> # select SIG group 0 default index 1 default, SIG group 3 index 1, SIG group 2 index 1 default and channel index 2 from group 1
...
>>> mdf.select(['SIG', ('SIG', 3, 1), ['SIG', 2],  (None, 1, 2)])
[<Signal SIG:
        samples=[ 1.  1.  1.  1.  1.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
, <Signal SIG:
        samples=[ 31.  31.  31.  31.  31.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
, <Signal SIG:
        samples=[ 21.  21.  21.  21.  21.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
, <Signal SIG:
        samples=[ 12.  12.  12.  12.  12.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
]
static stack(files, version='4.10', sync=True, **kwargs)[source]

stack several files and return the stacked MDF object

Parameters:
files : list | tuple

list of MDF file names or MDF instances

version : str

merged file version

sync : bool

sync the files based on the start of measurement, default True

Returns:
stacked : MDF

new MDF object with stacked channels

to_dataframe(channels=None, raster=None, time_from_zero=True, empty_channels='skip', keep_arrays=False, use_display_names=False)[source]

generate pandas DataFrame

Parameters:
* channels : list

filter a subset of channels; default None

* raster : float

time raster for resampling; default None

* time_from_zero : bool

adjust time channel to start from 0

* empty_channels : str

behaviour for channels without samples; the options are skip or zeros; default is skip

* use_display_names : bool

use display name instead of standard channel name, if available.

* keep_arrays : bool

keep arrays and structure channels as well as the component channels. If True this can be very slow. If False only the component channels are saved, and their names will be prefixed with the parent channel.

Returns:
dataframe : pandas.DataFrame
whereis(channel)[source]

get occurrences of channel name in the file

Parameters:
channel : str

channel name string

Returns:
occurrences : tuple

Examples

>>> mdf = MDF(file_name)
>>> mdf.whereis('VehicleSpeed') # "VehicleSpeed" exists in the file
((1, 2), (2, 4))
>>> mdf.whereis('VehicleSPD') # "VehicleSPD" doesn't exist in the file
()

MDF3

class asammdf.blocks.mdf_v3.MDF3(name=None, version='3.30', **kwargs)[source]

The header attribute is a HeaderBlock.

The groups attribute is a list of dicts, each one with the following keys

  • data_group - DataGroup object
  • channel_group - ChannelGroup object
  • channels - list of Channel objects with the same order as found in the mdf file
  • channel_dependencies - list of ChannelArrayBlock in case of channel arrays; list of Channel objects in case of structure channel composition
  • data_block - address of data block
  • data_location - integer code for data location (original file, temporary file or memory)
  • data_block_addr - list of raw samples starting addresses
  • data_block_type - list of codes for data block type
  • data_block_size - list of raw samples block size
  • sorted - sorted indicator flag
  • record_size - dict that maps record ID’s to record sizes in bytes
  • size - total size of data block for the current group
  • trigger - Trigger object for current group
Parameters:
name : string | pathlib.Path

mdf file name (if provided it must be a real file name) or file-like object

version : string

mdf file version (‘2.00’, ‘2.10’, ‘2.14’, ‘3.00’, ‘3.10’, ‘3.20’ or ‘3.30’); default ‘3.30’

callback : function

keyword only argument: function to call to update the progress; the function must accept two arguments (the current progress and maximum progress value)

Attributes:
attachments : list

list of file attachments

channels_db : dict

used for fast channel access by name; for each name key the value is a list of (group index, channel index) tuples

groups : list

list of data group dicts

header : HeaderBlock

mdf file header

identification : FileIdentificationBlock

mdf file start block

masters_db : dict

used for fast master channel access; for each group index key the value is the master channel index

memory : str

memory optimization option

name : string

mdf file name

version : str

mdf version

add_trigger(group, timestamp, pre_time=0, post_time=0, comment='')[source]

add trigger to data group

Parameters:
group : int

group index

timestamp : float

trigger time

pre_time : float

trigger pre time; default 0

post_time : float

trigger post time; default 0

comment : str

trigger comment

append(signals, acquisition_info='Python', common_timebase=False, units=None)[source]

Appends a new data group.

For channel dependencies type Signals, the samples attribute must be a numpy.recarray

Parameters:
signals : list | Signal | pandas.DataFrame

list of Signal objects, or a single Signal object, or a pandas DataFrame object. All bytes columns in the pandas DataFrame must be latin-1 encoded

acquisition_info : str

acquisition information; default ‘Python’

common_timebase : bool

flag to hint that the signals have the same timebase. Only set this if you know for sure that all appended channels share the same time base

units : dict

will contain the signal units mapped to the signal names when appending a pandas DataFrame

Examples

>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> info = {}
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF3('new.mdf')
>>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
>>> # case 2: VTAB conversions from channels inside another file
>>> mdf1 = MDF3('in.mdf')
>>> ch1 = mdf1.get("Channel1_VTAB")
>>> ch2 = mdf1.get("Channel2_VTABR")
>>> sigs = [ch1, ch2]
>>> mdf2 = MDF3('out.mdf')
>>> mdf2.append(sigs, 'created by asammdf v1.1.0')
>>> df = pd.DataFrame.from_dict({'s1': np.array([1, 2, 3, 4, 5]), 's2': np.array([-1, -2, -3, -4, -5])})
>>> units = {'s1': 'V', 's2': 'A'}
>>> mdf2.append(df, units=units)
close()[source]

if the MDF was created with memory='minimum' and new channels have been appended, then this must be called before the object goes out of use, to clean up the temporary file

configure(*, read_fragment_size=None, write_fragment_size=None, use_display_names=None, single_bit_uint_as_bool=None, integer_interpolation=None)[source]

configure MDF parameters

Parameters:
read_fragment_size : int

size hint of split data blocks, default 8MB; if the initial size is smaller, then no data list is used. The actual split size depends on the data groups’ records size

write_fragment_size : int

size hint of split data blocks, default 4MB; if the initial size is smaller, then no data list is used. The actual split size depends on the data groups’ records size. Maximum size is 4MB to ensure compatibility with CANape

use_display_names : bool

search for display name in the Channel XML comment

single_bit_uint_as_bool : bool

return single bit channels as np.bool arrays

integer_interpolation : int

interpolation mode for integer channels:

  • 0 - repeat previous sample
  • 1 - use linear interpolation
extend(index, signals)[source]

Extend a group with new samples. signals contains (values, invalidation_bits) pairs for each extended signal. Since MDF3 does not support invalidation bits, the second item of each pair must be None. The first pair is the master channel’s pair, and the next pairs must respect the same order in which the signals were appended. The samples must have raw or physical values according to the Signals used for the initial append.

Parameters:
index : int

group index

signals : list

list of (numpy.ndarray, None) objects

Examples

>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF3('new.mdf')
>>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
>>> t = np.array([0.006, 0.007, 0.008, 0.009, 0.010])
>>> mdf.extend(0, [(t, None), (s1.samples, None), (s2.samples, None), (s3.samples, None)])
get(name=None, group=None, index=None, raster=None, samples_only=False, data=None, raw=False, ignore_invalidation_bits=False, source=None, record_offset=0, record_count=None, copy_master=True)[source]

Gets channel samples. Channel can be specified in two ways:

  • using the first positional argument name

    • if source is given this will be first used to validate the channel selection
    • if there are multiple occurrences of this channel then the group and index arguments can be used to select a specific group.
    • if there are multiple occurrences of this channel and either the group or index argument is None then a warning is issued
  • using the group number (keyword argument group) and the channel number (keyword argument index). Use info method for group and channel numbers

If the raster keyword argument is not None the output is interpolated accordingly.

Parameters:
name : string

name of channel

group : int

0-based group index

index : int

0-based channel index

raster : float

time raster in seconds

samples_only : bool

if True return only the channel samples as numpy array; if False return a Signal object

data : bytes

prevent redundant data read by providing the raw data group samples

raw : bool

return channel samples without applying the conversion rule; default False

ignore_invalidation_bits : bool

only defined to have the same API with the MDF v4

source : str

source name used to select the channel

record_offset : int

if data=None use this to select the record offset from which the group data should be loaded

copy_master : bool

make a copy of the timebase for this channel

Returns:
res : (numpy.array, None) | Signal

returns Signal if samples_only=False (default option), otherwise returns a (numpy.array, None) tuple (for compatibility with the MDF v4 class).

The Signal samples are

  • numpy recarray for channels that have CDBLOCK or BYTEARRAY type channels
  • numpy array for all the rest
Raises:
MdfException :
* if the channel name is not found
* if the group index is out of range
* if the channel index is out of range

Examples

>>> from asammdf import MDF, Signal
>>> import numpy as np
>>> t = np.arange(5)
>>> s = np.ones(5)
>>> mdf = MDF(version='3.30')
>>> for i in range(4):
...     sigs = [Signal(s*(i*10+j), t, name='Sig') for j in range(1, 4)]
...     mdf.append(sigs)
...
>>> # first group and channel index of the specified channel name
...
>>> mdf.get('Sig')
UserWarning: Multiple occurrences for channel "Sig". Using first occurrence from data group 4. Provide both "group" and "index" arguments to select another data group
<Signal Sig:
        samples=[ 1.  1.  1.  1.  1.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> # first channel index in the specified group
...
>>> mdf.get('Sig', 1)
<Signal Sig:
        samples=[ 11.  11.  11.  11.  11.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> # channel named Sig from group 1 channel index 2
...
>>> mdf.get('Sig', 1, 2)
<Signal Sig:
        samples=[ 12.  12.  12.  12.  12.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> # channel index 1 of group 2
...
>>> mdf.get(None, 2, 1)
<Signal Sig:
        samples=[ 21.  21.  21.  21.  21.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> mdf.get(group=2, index=1)
<Signal Sig:
        samples=[ 21.  21.  21.  21.  21.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> mdf.get('Sig', source='VN7060')
<Signal Sig:
        samples=[ 12.  12.  12.  12.  12.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
get_channel_comment(name=None, group=None, index=None)[source]

Gets channel comment. Channel can be specified in two ways:

  • using the first positional argument name

    • if there are multiple occurrences of this channel then the group and index arguments can be used to select a specific group.
    • if there are multiple occurrences of this channel and either the group or index argument is None then a warning is issued
  • using the group number (keyword argument group) and the channel number (keyword argument index). Use info method for group and channel numbers

Parameters:
name : string

name of channel

group : int

0-based group index

index : int

0-based channel index

Returns:
comment : str

found channel comment

get_channel_name(group, index)[source]

Gets channel name.

Parameters:
group : int

0-based group index

index : int

0-based channel index

Returns:
name : str

found channel name

get_channel_unit(name=None, group=None, index=None)[source]

Gets channel unit.

Channel can be specified in two ways:

  • using the first positional argument name

    • if there are multiple occurrences of this channel then the group and index arguments can be used to select a specific group.
    • if there are multiple occurrences of this channel and either the group or index argument is None then a warning is issued
  • using the group number (keyword argument group) and the channel number (keyword argument index). Use info method for group and channel numbers

Parameters:
name : string

name of channel

group : int

0-based group index

index : int

0-based channel index

Returns:
unit : str

found channel unit

get_master(index, data=None, raster=None, record_offset=0, record_count=None, copy_master=True)[source]

returns master channel samples for given group

Parameters:
index : int

group index

data : (bytes, int)

(data block raw bytes, fragment offset); default None

raster : float

raster to be used for interpolation; default None

record_offset : int

if data=None use this to select the record offset from which the group data should be loaded

Returns:
t : numpy.array

master channel samples

info()[source]

get MDF information as a dict

Examples

>>> mdf = MDF3('test.mdf')
>>> mdf.info()
iter_get_triggers()[source]

generator that yields triggers

Returns:
trigger_info : dict

trigger information with the following keys:

  • comment : trigger comment
  • time : trigger time
  • pre_time : trigger pre time
  • post_time : trigger post time
  • index : trigger index
  • group : data group index of trigger
save(dst, overwrite=False, compression=0)[source]

Save MDF to dst. If overwrite is True then the destination file is overwritten, otherwise the file name is appended with ‘.<cntr>’, where ‘<cntr>’ is the first counter that produces a new file name (one that does not already exist in the filesystem).

Parameters:
dst : str | pathlib.Path

destination file name

overwrite : bool

overwrite flag, default False

compression : int

does nothing for mdf version 3; introduced here to share the same API as mdf version 4 files

Returns:
output_file : str

output file name

MDF4

class asammdf.blocks.mdf_v4.MDF4(name=None, version='4.10', **kwargs)[source]

The header attribute is a HeaderBlock.

The groups attribute is a list of dicts, each one with the following keys:

  • data_group - DataGroup object
  • channel_group - ChannelGroup object
  • channels - list of Channel objects with the same order as found in the mdf file
  • channel_dependencies - list of ChannelArrayBlock in case of channel arrays; list of Channel objects in case of structure channel composition
  • data_block - address of data block
  • data_location - integer code for data location (original file, temporary file or memory)
  • data_block_addr - list of raw samples starting addresses
  • data_block_type - list of codes for data block type
  • data_block_size - list of raw samples block size
  • sorted - sorted indicator flag
  • record_size - dict that maps record ID’s to record sizes in bytes (including invalidation bytes)
  • param - row size used for transposition, in case of transposed zipped blocks
Parameters:
name : string

mdf file name (if provided it must be a real file name) or file-like object

memory : str

memory optimization option:

  • if full the data group binary data block will be memorized in RAM
  • if low the channel data is read from disk on request, and the metadata is memorized into RAM
  • if minimum only minimal data is memorized into RAM
version : string

mdf file version (‘4.00’, ‘4.10’, ‘4.11’); default ‘4.10’

callback : function

keyword only argument: function to call to update the progress; the function must accept two arguments (the current progress and maximum progress value)

use_display_names : bool

keyword only argument: for MDF4 files parse the XML channel comment to search for the display name; XML parsing is quite expensive so setting this to False can considerably decrease loading times; default False

Attributes:
attachments : list

list of file attachments

channels_db : dict

used for fast channel access by name; for each name key the value is a list of (group index, channel index) tuples

events : list

list event blocks

file_comment : TextBlock

file comment TextBlock

file_history : list

list of (FileHistory, TextBlock) pairs

groups : list

list of data group dicts

header : HeaderBlock

mdf file header

identification : FileIdentificationBlock

mdf file start block

masters_db : dict

used for fast master channel access; for each group index key the value is the master channel index

name : string

mdf file name

version : str

mdf version

append(signals, source_info='Python', common_timebase=False, units=None)[source]

Appends a new data group.

For channel dependencies type Signals, the samples attribute must be a numpy.recarray

Parameters:
signals : list | Signal | pandas.DataFrame

list of Signal objects, or a single Signal object, or a pandas DataFrame object. All bytes columns in the pandas DataFrame must be utf-8 encoded

source_info : str

source information; default ‘Python’

common_timebase : bool

flag to hint that the signals have the same timebase. Only set this if you know for sure that all appended channels share the same time base

units : dict

will contain the signal units mapped to the signal names when appending a pandas DataFrame

Examples

>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> names = ['Positive', 'Negative', 'Float']
>>> units = ['+', '-', '.f']
>>> info = {}
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF4('new.mdf')
>>> mdf.append([s1, s2, s3], 'created by asammdf v4.0.0')
>>> # case 2: VTAB conversions from channels inside another file
>>> mdf1 = MDF4('in.mf4')
>>> ch1 = mdf1.get("Channel1_VTAB")
>>> ch2 = mdf1.get("Channel2_VTABR")
>>> sigs = [ch1, ch2]
>>> mdf2 = MDF4('out.mf4')
>>> mdf2.append(sigs, 'created by asammdf v4.0.0')
>>> mdf2.append(ch1, 'just a single channel')
>>> df = pd.DataFrame.from_dict({'s1': np.array([1, 2, 3, 4, 5]), 's2': np.array([-1, -2, -3, -4, -5])})
>>> units = {'s1': 'V', 's2': 'A'}
>>> mdf2.append(df, units=units)
attach(data, file_name=None, comment=None, compression=True, mime='application/octet-stream', embedded=True)[source]

attach embedded attachment as application/octet-stream

Parameters:
data : bytes

data to be attached

file_name : str

string file name

comment : str

attachment comment

compression : bool

use compression for embedded attachment data

mime : str

mime type string

embedded : bool

attachment is embedded in the file

Returns:
index : int

new attachment index

close()[source]

if the MDF was created with memory=False and new channels have been appended, then this must be called before the object goes out of use, to clean up the temporary file

configure(*, read_fragment_size=None, write_fragment_size=None, use_display_names=None, single_bit_uint_as_bool=None, integer_interpolation=None)[source]

configure MDF parameters

Parameters:
read_fragment_size : int

size hint of split data blocks, default 8MB; if the initial size is smaller, then no data list is used. The actual split size depends on the data groups’ records size

write_fragment_size : int

size hint of split data blocks, default 4MB; if the initial size is smaller, then no data list is used. The actual split size depends on the data groups’ records size. Maximum size is 4MB to ensure compatibility with CANape

use_display_names : bool

search for display name in the Channel XML comment

single_bit_uint_as_bool : bool

return single bit channels as np.bool arrays

integer_interpolation : int

interpolation mode for integer channels:

  • 0 - repeat previous sample
  • 1 - use linear interpolation
extend(index, signals)[source]

Extend a group with new samples. signals contains (values, invalidation_bits) pairs for each extended signal. The first pair is the master channel’s pair, and the next pairs must respect the same order in which the signals were appended. The samples must have raw or physical values according to the Signals used for the initial append.

Parameters:
index : int

group index

signals : list

list of (numpy.ndarray, numpy.ndarray) objects

Examples

>>> # case 1 conversion type None
>>> s1 = np.array([1, 2, 3, 4, 5])
>>> s2 = np.array([-1, -2, -3, -4, -5])
>>> s3 = np.array([0.1, 0.04, 0.09, 0.16, 0.25])
>>> t = np.array([0.001, 0.002, 0.003, 0.004, 0.005])
>>> s1 = Signal(samples=s1, timestamps=t, unit='+', name='Positive')
>>> s2 = Signal(samples=s2, timestamps=t, unit='-', name='Negative')
>>> s3 = Signal(samples=s3, timestamps=t, unit='flts', name='Floats')
>>> mdf = MDF3('new.mdf')
>>> mdf.append([s1, s2, s3], 'created by asammdf v1.1.0')
>>> t = np.array([0.006, 0.007, 0.008, 0.009, 0.010])
>>> # extend without invalidation bits
>>> mdf.extend(0, [(t, None), (s1.samples, None), (s2.samples, None), (s3.samples, None)])
>>> # extend with some invalidation bits
>>> s1_inv = np.array([0, 0, 0, 1, 1], dtype=np.bool)
>>> mdf.extend(0, [(t, None), (s1.samples, s1_inv), (s2.samples, None), (s3.samples, None)])
extract_attachment(address=None, index=None)[source]

extract attachment data by original address or by index. If it is an embedded attachment, then this method creates the new file according to the attachment file name information

Parameters:
address : int

attachment address; default None

index : int

attachment index; default None

Returns:
data : (bytes, pathlib.Path)

tuple of attachment data and path

get(name=None, group=None, index=None, raster=None, samples_only=False, data=None, raw=False, ignore_invalidation_bits=False, source=None, record_offset=0, record_count=None, copy_master=True)[source]

Gets channel samples. The raw data group samples are not loaded to memory so it is advised to use filter or select instead of performing several get calls.

Channel can be specified in two ways:

  • using the first positional argument name

    • if source is given this will be first used to validate the channel selection
    • if there are multiple occurrences of this channel then the group and index arguments can be used to select a specific group.
    • if there are multiple occurrences of this channel and either the group or index argument is None then a warning is issued
  • using the group number (keyword argument group) and the channel number (keyword argument index). Use info method for group and channel numbers

If the raster keyword argument is not None the output is interpolated accordingly

Parameters:
name : string

name of channel

group : int

0-based group index

index : int

0-based channel index

raster : float

time raster in seconds

samples_only : bool

if True return only the channel samples as a numpy array; if False return a Signal object

data : bytes

prevent redundant data read by providing the raw data group samples

raw : bool

return channel samples without applying the conversion rule; default False

ignore_invalidation_bits : bool

option to ignore invalidation bits

source : str

source name used to select the channel

record_offset : int

if data=None use this to select the record offset from which the group data should be loaded

record_count : int

number of records to read; default None and in this case all available records are used

copy_master : bool

make a copy of the timebase for this channel

Returns:
res : (numpy.array, numpy.array) | Signal

returns Signal if samples_only=False (default option), otherwise returns a (numpy.array, numpy.array) tuple of samples and invalidation bits. If invalidation bits are not used or if ignore_invalidation_bits is False, then the second item will be None.

The Signal samples are:

  • numpy recarray for channels that have composition/channel array address or for channels of type CANOPENDATE, CANOPENTIME
  • numpy array for all the rest
Raises:
MdfException :
* if the channel name is not found
* if the group index is out of range
* if the channel index is out of range

Examples

>>> from asammdf import MDF, Signal
>>> import numpy as np
>>> t = np.arange(5)
>>> s = np.ones(5)
>>> mdf = MDF(version='4.10')
>>> for i in range(4):
...     sigs = [Signal(s*(i*10+j), t, name='Sig') for j in range(1, 4)]
...     mdf.append(sigs)
...
>>> # first group and channel index of the specified channel name
...
>>> mdf.get('Sig')
UserWarning: Multiple occurances for channel "Sig". Using first occurance from data group 4. Provide both "group" and "index" arguments to select another data group
<Signal Sig:
        samples=[ 1.  1.  1.  1.  1.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> # first channel index in the specified group
...
>>> mdf.get('Sig', 1)
<Signal Sig:
        samples=[ 11.  11.  11.  11.  11.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> # channel named Sig from group 1 channel index 2
...
>>> mdf.get('Sig', 1, 2)
<Signal Sig:
        samples=[ 12.  12.  12.  12.  12.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> # channel index 1 of group 2
...
>>> mdf.get(None, 2, 1)
<Signal Sig:
        samples=[ 21.  21.  21.  21.  21.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> mdf.get(group=2, index=1)
<Signal Sig:
        samples=[ 21.  21.  21.  21.  21.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
>>> # validation using source name
...
>>> mdf.get('Sig', source='VN7060')
<Signal Sig:
        samples=[ 12.  12.  12.  12.  12.]
        timestamps=[0 1 2 3 4]
        unit=""
        info=None
        comment="">
get_can_signal(name, database=None, db=None, ignore_invalidation_bits=False)[source]

get CAN message signal. You can specify an external CAN database (database argument) or a canmatrix database object that has already been loaded from a file (db argument).

The signal name can be specified in the following ways

  • CAN<ID>.<MESSAGE_NAME>.<SIGNAL_NAME> - the ID value starts from 1 and must match the ID found in the measurement (the source CAN bus ID) Example: CAN1.Wheels.FL_WheelSpeed
  • CAN<ID>.CAN_DataFrame_<MESSAGE_ID>.<SIGNAL_NAME> - the ID value starts from 1 and the MESSAGE_ID is the decimal message ID as found in the database. Example: CAN1.CAN_DataFrame_218.FL_WheelSpeed
  • <MESSAGE_NAME>.<SIGNAL_NAME> - in this case the first occurrence of the message name and signal is returned (the same message could be found on multiple CAN buses; for example on CAN1 and CAN3). Example: Wheels.FL_WheelSpeed
  • CAN_DataFrame_<MESSAGE_ID>.<SIGNAL_NAME> - in this case the first occurrence of the message name and signal is returned (the same message could be found on multiple CAN buses; for example on CAN1 and CAN3). Example: CAN_DataFrame_218.FL_WheelSpeed
  • <SIGNAL_NAME> - in this case the first occurrence of the signal name is returned (the same signal name could be found in multiple messages and on multiple CAN buses). Example: FL_WheelSpeed
Parameters:
name : str

signal name

database : str

path of external CAN database file (.dbc or .arxml); default None

db : canmatrix.database

canmatrix CAN database object; default None

ignore_invalidation_bits : bool

option to ignore invalidation bits

Returns:
sig : Signal

Signal object with the physical values
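
For reference, the physical values follow the usual CAN database scaling rule, physical = raw * factor + offset. A minimal standalone sketch of that scaling (the raw values, factor and offset below are made up for illustration, not taken from a real database):

```python
import numpy as np

# hypothetical raw CAN signal values extracted from the message payload
raw = np.array([100, 110, 120], dtype=np.uint16)

# hypothetical dbc scaling, e.g. for a wheel-speed signal
factor, offset = 0.05625, 0.0

# what ends up in the returned Signal's samples
physical = raw * factor + offset
```
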

get_channel_comment(name=None, group=None, index=None)[source]

Gets channel comment.

Channel can be specified in two ways:

  • using the first positional argument name

    • if there are multiple occurrences for this channel then the group and index arguments can be used to select a specific group.
    • if there are multiple occurrences for this channel and either the group or index arguments is None then a warning is issued
  • using the group number (keyword argument group) and the channel number (keyword argument index). Use info method for group and channel numbers

Parameters:
name : string

name of channel

group : int

0-based group index

index : int

0-based channel index

Returns:
comment : str

found channel comment

get_channel_name(group, index)[source]

Gets channel name.

Parameters:
group : int

0-based group index

index : int

0-based channel index

Returns:
name : str

found channel name

get_channel_unit(name=None, group=None, index=None)[source]

Gets channel unit.

Channel can be specified in two ways:

  • using the first positional argument name

    • if there are multiple occurrences for this channel then the group and index arguments can be used to select a specific group.
    • if there are multiple occurrences for this channel and either the group or index arguments is None then a warning is issued
  • using the group number (keyword argument group) and the channel number (keyword argument index). Use info method for group and channel numbers

Parameters:
name : string

name of channel

group : int

0-based group index

index : int

0-based channel index

Returns:
unit : str

found channel unit

get_invalidation_bits(group_index, channel, fragment)[source]

get invalidation indexes for the channel

Parameters:
group_index : int

group index

channel : Channel

channel object

fragment : (bytes, int)

(fragment bytes, fragment offset)

Returns:
invalidation_bits : iterable

iterable of valid channel indexes; if all are valid None is returned
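
A set invalidation bit marks the corresponding sample as invalid. A minimal numpy sketch of how a caller might mask out invalidated samples (illustration only, not asammdf internals):

```python
import numpy as np

samples = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
# True means the sample is invalid
invalidation_bits = np.array([False, False, True, False, True])

# keep only the valid samples
valid_samples = samples[~invalidation_bits]   # -> [1.0, 2.0, 4.0]
```
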

get_master(index, data=None, raster=None, record_offset=0, record_count=None, copy_master=True)[source]

returns master channel samples for given group

Parameters:
index : int

group index

data : (bytes, int)

(data block raw bytes, fragment offset); default None

raster : float

raster to be used for interpolation; default None

record_offset : int

if data=None use this to select the record offset from which the group data should be loaded

record_count : int

number of records to read; default None and in this case all available records are used

copy_master : bool

return a copy of the cached master

Returns:
t : numpy.array

master channel samples

info()[source]

get MDF information as a dict

Examples

>>> mdf = MDF4('test.mdf')
>>> mdf.info()
save(dst, overwrite=False, compression=0)[source]

Save MDF to dst. If overwrite is True then the destination file is overwritten, otherwise the file name is appended with ‘.<cntr>’, where ‘<cntr>’ is the first counter that produces a new file name (one that does not already exist in the filesystem)

Parameters:
dst : str

destination file name, Default ‘’

overwrite : bool

overwrite flag, default False

compression : int

use compressed data blocks, default 0; valid since version 4.10

  • 0 - no compression
  • 1 - deflate (slower, but produces smaller files)
  • 2 - transposition + deflate (slowest, but produces the smallest files)
Returns:
output_file : pathlib.Path

path to saved file
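
The advantage of option 2 comes from grouping each byte position of the record together before deflating, which typically exposes long runs of similar bytes in record-oriented data. A rough standalone illustration with zlib (a sketch of the idea, not the actual MDF4 block layout):

```python
import zlib
import numpy as np

# fake record-oriented data: two interleaved channels, 10 bytes per record
records = np.zeros(1000, dtype=[("t", "<f8"), ("speed", "<u2")])
records["t"] = np.arange(1000) * 0.01
records["speed"] = 500

raw = records.tobytes()

# transposing the byte matrix groups each byte position of the record together
transposed = np.frombuffer(raw, dtype="u1").reshape(1000, 10).T.tobytes()

plain = len(zlib.compress(raw))          # option 1: deflate only
transp = len(zlib.compress(transposed))  # option 2: transposition + deflate
```

For smooth, slowly-varying channel data the transposed stream usually compresses noticeably better, at the cost of the extra transposition pass.
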

Signal

class asammdf.signal.Signal(samples=None, timestamps=None, unit='', name='', conversion=None, comment='', raw=True, master_metadata=None, display_name='', attachment=(), source=None, bit_count=None, stream_sync=False, invalidation_bits=None, encoding=None)[source]

The Signal represents a channel described by its samples and timestamps. It can perform arithmetic operations against other Signal or numeric types. The operations are computed with respect to the timestamps (time correct). Non-float signals are not interpolated; instead, the last value relative to the current timestamp is used. samples, timestamps and name are mandatory arguments.

Parameters:
samples : numpy.array | list | tuple

signal samples

timestamps : numpy.array | list | tuple

signal timestamps

unit : str

signal unit

name : str

signal name

conversion : dict | channel conversion block

dict that contains extra conversion information about the signal; default None

comment : str

signal comment, default ‘’

raw : bool

signal samples are raw values, with no physical conversion applied

master_metadata : list

master name and sync type

display_name : str

display name used by mdf version 3

attachment : bytes, name

channel attachment and name from MDF version 4

source : SignalSource

source information named tuple

bit_count : int

bit count; useful for integer channels

stream_sync : bool

the channel is a synchronisation for the attachment stream (mdf v4 only)

invalidation_bits : numpy.array | None

channel invalidation bits, default None

encoding : str | None

encoding for string signals; default None
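
The time-correct arithmetic can be sketched with plain numpy: before an operation, both operands are brought to a common timebase (a conceptual sketch, not the actual Signal implementation):

```python
import numpy as np

t1 = np.array([0.0, 1.0, 2.0]); s1 = np.array([0.0, 10.0, 20.0])
t2 = np.array([0.5, 1.5, 2.5]); s2 = np.array([1.0, 1.0, 1.0])

# union of the two time axes, used as the common timebase
t = np.union1d(t1, t2)

# float signals are linearly interpolated onto the common timebase
a = np.interp(t, t1, s1)
b = np.interp(t, t2, s2)

total = a + b   # the time-correct sum of the two signals
```
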

astype(np_type)[source]

returns new Signal with samples of dtype np_type

Parameters:
np_type : np.dtype

new numpy dtype

Returns:
signal : Signal

new Signal with the samples of np_type dtype

cut(start=None, stop=None, include_ends=True, interpolation_mode=0)[source]

Cuts the signal according to the start and stop values, by using the insertion indexes in the signal’s time axis.

Parameters:
start : float

start timestamp for cutting

stop : float

stop timestamp for cutting

include_ends : bool

include the start and stop timestamps after cutting the signal. If start and stop are not found in the original timestamps, then the boundary samples are computed using interpolation. Default True

interpolation_mode : int

interpolation mode for integer signals; default 0

  • 0 - repeat previous samples
  • 1 - linear interpolation
Returns:
result : Signal

new Signal cut from the original

Examples

>>> new_sig = old_sig.cut(1.0, 10.5)
>>> new_sig.timestamps[0], new_sig.timestamps[-1]
0.98, 10.48
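
The insertion-index approach can be illustrated with np.searchsorted (a standalone sketch, not the actual cut implementation):

```python
import numpy as np

timestamps = np.arange(0.0, 5.0, 0.5)   # 0.0, 0.5, ..., 4.5
samples = timestamps * 2

start, stop = 1.2, 3.4
# insertion indexes of start and stop in the time axis
lo = np.searchsorted(timestamps, start, side="left")
hi = np.searchsorted(timestamps, stop, side="right")

cut_t = timestamps[lo:hi]   # -> [1.5, 2.0, 2.5, 3.0]
cut_s = samples[lo:hi]      # -> [3.0, 4.0, 5.0, 6.0]
```
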
extend(other)[source]

extend signal with samples from another signal

Parameters:
other : Signal
Returns:
signal : Signal

new extended Signal

interp(new_timestamps, interpolation_mode=0)[source]

returns a new Signal interpolated using the new_timestamps

Parameters:
new_timestamps : np.array

timestamps used for interpolation

interpolation_mode : int

interpolation mode for integer signals; default 0

  • 0 - repeat previous samples
  • 1 - linear interpolation
Returns:
signal : Signal

new interpolated Signal

physical()[source]

get the physical samples values

Returns:
phys : Signal

new Signal with physical values
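
A common conversion rule is the linear one, physical = raw * a + b. A minimal sketch of what physical does for such a conversion (the a and b values below are made up; signals without a conversion are returned unchanged):

```python
import numpy as np

raw = np.array([0, 1, 2, 3], dtype=np.int16)   # raw integer samples
a, b = 0.125, -4.0                             # hypothetical linear conversion

physical = raw * a + b   # -> [-4.0, -3.875, -3.75, -3.625]
```
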

plot()[source]

plot Signal samples. pyqtgraph is used if it is available; in that case see the GUI plot documentation for the available commands