Source code for place.plugins.polytec.polytec

"""The Polytech vibrometer instrument.

This plugin interfaces with the Polytec OFV-5000 vibrometer controller and the
OFV-505 sensor head. The commands it sends are based on the Polytec "RS-232
Interface Commands: OFV-5000 User Manual".

**NOTE** Each Polytec controller may have different decoders installed.
The names of the installed decoders should be stored in your PLACE config
file (``~/.place.cfg``).

--------
Settings
--------

The Polytec plugin will need entries in your ``.place.cfg`` file.
Specifically, it needs the communication port, the baudrate, and names for
any decoders you have installed. Here is an example::

    [Polytec]
    port = /dev/ttyS0
    baudrate = 115200
    dd_300 = DisplDec,0
    dd_900 = DisplDec,1
    vd_08 = VeloDec,0
    vd_09 = VeloDec,1

--------
Metadata
--------

The Polytec plugin will produce the following experimental metadata:

========================= ============== ================================================
Key                       Type           Meaning
========================= ============== ================================================
actual_area_min           int            the actual minimum autofocus range used
                                         (if using custom autofocus)
actual_area_max           int            the actual maximum autofocus range used
                                         (if using custom autofocus)
vd_08_time_delay          float          the decoder time delay (if used)
vd_08_maximum_frequency   float          the decoder maximum frequency (if used)
vd_09_time_delay          float          the decoder time delay (if used)
vd_09_maximum_frequency   float          the decoder maximum frequency (if used)
========================= ============== ================================================

----
Data
----

The Polytec plugin will produce the following experimental data:

+----------------+-------------------------+---------------------------+
| Heading        | Type                    | Meaning                   |
+================+=========================+===========================+
| Polytec-signal | uint64                  | the signal level recorded |
|                |                         | from the vibrometer       |
+----------------+-------------------------+---------------------------+
"""
import ast
import re
from time import sleep

import numpy as np
import serial
from serial import Serial

from place.config import PlaceConfig
from place.plugins.instrument import Instrument

_NUMBER = r'[-+]?\d*\.\d+|\d+'


class Polytec(Instrument):
    """The Polytec vibrometer instrument.

    Communicates with the vibrometer over a serial port. Decoders and the
    optional custom autofocus area are set up in :meth:`config`, the signal
    level is read (optionally after autofocusing) in :meth:`update`, and the
    serial port is closed in :meth:`cleanup`.
    """

    def __init__(self, config, plotter):
        """Initialize the instrument without opening the serial port.

        :param config: configuration forwarded to ``Instrument.__init__``
        :param plotter: plotter forwarded to ``Instrument.__init__``
        """
        Instrument.__init__(self, config, plotter)
        # Serial connection; created in config().
        self._serial = None
        # Signal levels accumulated for plotting; reset on update 0.
        self._signal = None
        # Autofocus area actually used when autofocus is 'custom'.
        self.min_used = None
        self.max_used = None

    def config(self, metadata, total_updates):
        """Configure the vibrometer.

        Opens the serial port using the ``port`` and ``baudrate`` values from
        the PLACE config file, sets up every enabled decoder and, for custom
        autofocus, clamps the requested focus area to the area reported by
        the sensor head and records the result in the metadata.

        :param metadata: experiment metadata
        :type metadata: dict

        :param total_updates: number of updates for the experiment
        :type total_updates: int
        """
        name = self.__class__.__name__
        self._serial = Serial(
            port=PlaceConfig().get_config_value(name, "port"),
            baudrate=PlaceConfig().get_config_value(name, "baudrate"),
            timeout=10,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS)
        # Same decoders, in the same order, as the original chain of ifs.
        for decoder in ('dd_300', 'dd_900', 'vd_08', 'vd_09'):
            if self._config[decoder]:
                self._setup_decoder(metadata, decoder)
        if self._config['autofocus'] == 'custom':
            curr_set = self._write_and_readline(
                'GetDevInfo,SensorHead,0,Focus\n')
            curr_min, curr_max = ast.literal_eval(curr_set)
            # Never ask for a focus area outside what the device reports.
            self.min_used = max(curr_min, self._config['area_min'])
            self.max_used = min(curr_max, self._config['area_max'])
            metadata['actual_area_min'] = self.min_used
            metadata['actual_area_max'] = self.max_used

    def update(self, update_number, progress):
        """Update the vibrometer.

        Autofocuses when enabled (on the first update, or on every update if
        ``autofocus_everytime`` is set), reads the signal level and
        optionally plots it.

        :param update_number: the count of the current update (0-indexed)
        :type update_number: int

        :param progress: a dictionary of values passed back to your Elm app
        :type progress: dict

        :returns: an array containing the signal level
        :rtype: numpy.array dtype='uint64'
        """
        if self._config['autofocus'] != 'none':
            if (update_number == 0
                    or self._config['autofocus_everytime'] is True):
                self._autofocus_vibrometer(
                    span=self._config['autofocus'],
                    timeout=self._config['timeout'])
        signal_level = self._get_signal_level()
        field = '{}-signal'.format(self.__class__.__name__)
        data = np.array([(signal_level,)], dtype=[(field, 'uint64')])
        if self._config['plot']:
            self._draw_plot(signal_level, update_number, progress)
        return data

    def cleanup(self, abort=False):
        """Closes the serial port to the Polytec.

        The port is only closed when the experiment is not being aborted.
        Nothing happens if the port was never opened.

        :param abort: indicates that the experiment is being aborted and is
                      unfinished
        :type abort: bool
        """
        # Guard against cleanup() running before config() opened the port.
        if abort is False and self._serial is not None:
            self._serial.close()

    # PRIVATE METHODS

    def _write(self, message):
        """Send a message

        :param message: message to be sent to the Polytec receiver
        :type message: str
        """
        self._serial.write(message.encode())

    def _write_and_readline(self, message):
        """Send a message and get a response.

        :param message: message to be sent to the Polytec receiver
        :type message: str

        :returns: the decoded response, as read by ``readline`` (undecodable
                  bytes are replaced)
        :rtype: str
        """
        self._write(message)
        return self._serial.readline().decode('ascii', 'replace')

    def _setup_decoder(self, metadata, name):
        """Set the range for the decoder and obtain metadata

        The decoder identification string is looked up in the PLACE config
        file. Time delay and maximum frequency metadata are only recorded
        for the ``vd_08`` and ``vd_09`` decoders.

        :param metadata: experiment metadata
        :type metadata: dict

        :param name: the name to use for the decoder
        :type name: str
        """
        id_ = PlaceConfig().get_config_value(self.__class__.__name__, name)
        self._set_range(id_, self._config[name + '_range'])
        if name in ('vd_08', 'vd_09'):
            metadata[name + '_time_delay'] = self._get_delay(id_)
            metadata[name + '_maximum_frequency'] = \
                self._get_maximum_frequency(id_)

    def _autofocus_vibrometer(self, span='Full', timeout=30):
        """Autofocus the vibrometer.

        With custom autofocus the previously computed area
        (``min_used``/``max_used``) is sent and ``span`` is not used.
        The result is polled once per second until it reads ``Found`` or the
        timeout expires.

        :param span: the range in which the vibrometer should look for focus
        :type span: str

        :param timeout: the number of seconds to wait for focus before failing
        :type timeout: int

        :raises RuntimeError: if focus is not found before timeout
        """
        if self._config['autofocus'] == 'custom':
            self._write('Set,SensorHead,0,AutoFocusArea,{},{}\n'.format(
                self.min_used, self.max_used))
        else:
            self._write('Set,SensorHead,0,AutoFocusSpan,'+span+'\n')
        self._write('Set,SensorHead,0,AutoFocus,Search\n')
        countdown = timeout
        tick = 1
        while countdown > 0:
            sleep(tick)
            countdown -= tick
            if self._write_and_readline(
                    'Get,SensorHead,0,AutoFocusResult\n') == 'Found\n':
                break
        else:
            # Loop ended without a break: focus was never reported.
            raise RuntimeError('autofocus failed')

    def _get_delay(self, id_):
        """Get time delay.

        :param id_: the identification string for the decoder
        :type id_: str

        :returns: the first number found in the instrument's reply
        :rtype: float
        """
        delay_string = self._write_and_readline(
            'Get,' + id_ + ',SignalDelay\n')
        return float(re.findall(_NUMBER, delay_string)[0])

    def _get_maximum_frequency(self, id_):
        """Get the maximum frequency.

        :param id_: the identification string for the decoder
        :type id_: str

        :returns: the frequency value of the selected decoder
        :rtype: float

        :raises ValueError: if maximum frequency is not available
        """
        frequency_string = self._write_and_readline(
            'Get,' + id_ + ',MaxFreq\n')
        # readline() keeps the line terminator, so strip before comparing;
        # otherwise this check can never match.
        if frequency_string.strip() == 'Not Available':
            raise ValueError(
                'maximum frequency for {} not available'.format(id_))
        return _parse_frequency(frequency_string)

    def _get_range(self, name, id_):
        """Get the current range.

        The numeric value is taken from the configured ``<name>_range``
        setting; the unit text is taken from the instrument's reply.

        :param name: the name for the decoder
        :type name: str

        :param id_: the identification string for the decoder
        :type id_: str

        :returns: the range value and units
        :rtype: float, string

        :raises ValueError: if decoder name is not recognized
        """
        # The instrument is queried before the name is validated, matching
        # the original order of operations.
        decoder_range = self._write_and_readline('Get,' + id_ + ',Range\n')
        if name not in ('dd_300', 'dd_900', 'vd_08', 'vd_09'):
            raise ValueError('unknown decoder: ' + name)
        range_num = re.findall(_NUMBER, self._config[name + '_range'])
        # NOTE(review): the offset is based on the number of matches, not
        # the length of the numeric text - confirm this is intended.
        del_num_r = len(range_num)+1
        calib = float(range_num[0])
        calib_unit = decoder_range[del_num_r:].lstrip()
        return calib, calib_unit

    def _set_range(self, id_, range_):
        """Set the range.

        :param id_: the identification string for the decoder
        :type id_: str

        :param range_: the desired decoder range
        :type range_: str
        """
        self._write('Set,' + id_ + ',Range,' + range_ + '\n')

    def _get_signal_level(self):
        """Query the signal level and return the reply parsed as an int."""
        return int(self._write_and_readline('Get,SignalLevel,0,Value\n'))

    def _draw_plot(self, signal_level, update_number, progress):
        """Plot the signal levels recorded so far.

        The history is restarted on update 0 and extended on later updates.

        :param signal_level: the signal level for the current update
        :param update_number: the count of the current update (0-indexed)
        :param progress: not used by this method
        """
        if update_number == 0:
            self._signal = [signal_level]
        else:
            self._signal.append(signal_level)
        title = 'Signal level at each PLACE update'
        self.plotter.view(
            title,
            [
                self.plotter.line(
                    self._signal,
                    color='purple',
                    shape='cross',
                    label='signal'
                )
            ]
        )
# TODO: add axis labels when PLACE supports it # plt.xlabel('trace') # plt.ylabel('signal level') def _parse_frequency(frequency_string): """Calculate a frequency from a string. Takes a frequency string and parses it to a float value. .. doctest:: >>> _parse_frequency('20MHz') 20000000.0 >>> _parse_frequency('20 MHz') 20000000.0 >>> _parse_frequency('5kHz') 5000.0 >>> _parse_frequency('16.6mhz') 16600000.000000002 >>> _parse_frequency('16.6 mhz') 16600000.000000002 :param frequency_string: string to be parsed :type frequency_string: str :returns: the frequency value :rtype: float :raises ValueError: if frequency units are not recognized """ re_match = re.match( r'([-+]?\d*\.\d+|\d+)\s?([kmg]?Hz)', frequency_string, flags=re.IGNORECASE # pylint: disable=no-member ) if re_match is None: raise ValueError( 'could not parse frequency string: ' + frequency_string) else: num_str, unit_str = re_match.groups() if unit_str.lower() == 'hz': return float(num_str) elif unit_str.lower() == 'khz': return float(num_str) * 10**3 elif unit_str.lower() == 'mhz': return float(num_str) * 10**6 elif unit_str.lower() == 'ghz': return float(num_str) * 10**9 else: raise ValueError('could not match units of frequency: ' + unit_str)