Source code for place.plugins.moku_lab.moku_lab

"""The PLACE module for the Moku:Lab"""
import calendar
import time

import numpy as np

from place.config import PlaceConfig
from place.plugins.instrument import Instrument

try:
    from pymoku import Moku
    from pymoku.instruments import BodeAnalyzer
except ImportError:
    pass

# Maximum and minimum number of points per sweep.
MAX_P = 512
MIN_P = 32


[docs]class MokuLab(Instrument): """The MokuLab class for place""" def __init__(self, config, plotter): """Initialize the MokuLab, without configuring. :param config: configuration data (as a parsed JSON object) :type config: dict :param plotter: a plotting object to return plots to the web interface :type plotter: plots.PlacePlotter """ Instrument.__init__(self, config, plotter) self.total_updates = None self.sweeps = None self.moku = None self.bode = None self.time_safety_net = None
[docs] def config(self, metadata, total_updates): """ Called by PLACE at the beginning of the experiment to get everything up and running. """ self.total_updates = total_updates name = self.__class__.__name__ tsn_str = PlaceConfig().get_config_value(name, 'time_safety_net', '3.0') self.time_safety_net = float(tsn_str) metadata['MokuLab-predicted-update-time'] = self._predicted_sweep_time() self.sweeps = _calc_sweeps( f_start=self._config['f_start'], f_end=self._config['f_end'], n_pts=self._config['data_points'] )
[docs] def update(self, update_number, progress): """ Called by PLACE during the experiment, update_number of times. """ framedata = { "freq": [], "ch1_mag": [], "ch2_mag": [], "ch1_phase": [], "ch2_phase": [] } for sweep in self.sweeps: self._set_up_moku_sweep(sweep) framedata = self._get_and_plot_live_data( progress, framedata, sweep ) if self._config['data_points'] % 2 != 0: framedata = cut_last_point(framedata) self._print_statements(update_number) return self._save_data(framedata)
[docs] def cleanup(self, abort=False): """Nothing to cleanup""" pass
def _set_up_moku_sweep(self, sweep): ip_address = PlaceConfig().get_config_value( self.__class__.__name__, 'ip_address') ch1_amp = self._config['ch1_amp'] ch2_amp = self._config['ch2_amp'] self.moku = Moku(ip_address) self.bode = self.moku.deploy_or_connect(BodeAnalyzer) try: # or self._config['data_points'] % 2 != 0: self.bode.set_xmode('sweep') self.bode.set_output(1, ch1_amp) self.bode.set_output(2, ch2_amp) self.bode.set_frontend( channel=1, ac=True, atten=False, fiftyr=False) self.bode.set_frontend( channel=2, ac=True, atten=False, fiftyr=False) except: self.moku.close() raise self.bode.set_sweep( sweep[0] * 1000, sweep[-1] * 1000, len(sweep), False, self._config['averaging_time'], self._config['settling_time'], self._config['averaging_cycles'], self._config['settling_cycles']) self.bode.start_sweep(single=self._config['single_sweep']) def _get_and_plot_live_data(self, progress, framedata, sweep): sweep_time = self._predicted_sweep_time(pts=len(sweep)) then = calendar.timegm(time.gmtime()) now = then frame = self.bode.get_realtime_data() while sweep_time * self.time_safety_net > now - then: flag = 0 sweepdata = { "freq": (np.array(frame.frequency)/1000).tolist(), "ch1_mag": frame.ch1.magnitude_dB, "ch1_phase": frame.ch1.phase, "ch2_mag": frame.ch2.magnitude_dB, "ch2_phase": frame.ch2.phase} sweepdata = replace_none_dictionary(sweepdata) if (sweep == self.sweeps[-1] and (self._config['data_points'] % 2 != 0) and sweepdata['ch1_mag'][-2] and sweepdata['ch2_mag'][-2] and sweepdata['ch1_phase'][-2] and sweepdata['ch2_phase'][-2] is not np.nan): sweepdata = cut_last_point(sweepdata) self._live_progress( channel=1, progress=progress, framedata=merge_dictionaries(framedata, sweepdata) ) self._live_progress( channel=2, progress=progress, framedata=merge_dictionaries(framedata, sweepdata) ) for key in sweepdata: if key != 'freq': if sweepdata[key][-1] is not np.nan: flag = 1 break if flag == 1: break now = calendar.timegm(time.gmtime()) frame = self.bode.get_realtime_data() framedata = merge_dictionaries(framedata, sweepdata) self.moku.close() return framedata def _live_progress(self, channel, progress, framedata): other_channel = 1 if channel == 2 else 2 if self._config['channel'] != 'ch{}'.format(other_channel): raw_mag = framedata['ch{}_mag'.format(channel)] mag = [] for x in raw_mag: if np.isnan(x): break mag.append(x) raw_phase = framedata['ch{}_phase'.format(channel)] phase = [] for x in raw_phase: if np.isnan(x): break phase.append(x) mag_freq = framedata['freq'][:len(mag)] phase_freq = framedata['freq'][:len(phase)] self.plotter.view( 'Channel {} Magnitude'.format(channel), [ self.plotter.line( ydata=mag, xdata=mag_freq, color="green", shape="none", label="magnitude" ) ] ) self.plotter.view( 'Channel {} Phase'.format(channel), [ self.plotter.line( ydata=phase, xdata=phase_freq, color="purple", shape="none", label="phase" ) ] ) def _save_data(self, framedata): freq = framedata['freq'] mag = [framedata['ch1_mag'], framedata['ch2_mag']] phase = [framedata['ch1_phase'], framedata['ch2_phase']] linedata = { "mag": [np.array([[freq[i], mag[0][i]] for i in range( min(len(freq), len(mag[0])))]), np.array([[freq[i], mag[1][i]] for i in range( min(len(freq), len(mag[1])))])], "phase": [np.array([[freq[i], phase[0][i]] for i in range( min(len(freq), len(phase[0])))]), np.array([[freq[i], phase[1][i]] for i in range( min(len(freq), len(phase[1])))])] } fielddata = { "mag": ['{}-ch1_magnitude_data'.format(self.__class__.__name__), '{}-ch2_magnitude_data'.format(self.__class__.__name__)], "phase": ['{}-ch1_phase_data'.format(self.__class__.__name__), '{}-ch2_phase_data'.format(self.__class__.__name__)] } shape = '({},2)float64'.format(self._config['data_points']) if self._config['channel'] == 'ch1': data = np.array( [(linedata['mag'][0], linedata['phase'][0])], dtype=[(fielddata['mag'][0], shape), (fielddata['phase'][0], shape)]) if self._config['channel'] == 'ch2': data = np.array( [(linedata['mag'][1], linedata['phase'][1])], dtype=[(fielddata['mag'][1], shape), (fielddata['phase'][1], shape)]) if self._config['channel'] == 'both': data = np.array( [(linedata['mag'][0], linedata['phase'][0], linedata['mag'][1], linedata['phase'][1])], dtype=[(fielddata['mag'][0], shape), (fielddata['phase'][0], shape), (fielddata['mag'][1], shape), (fielddata['phase'][1], shape)]) return data.copy() def _print_statements(self, update_number): if update_number == self.total_updates - 2: print('Almost there, I have 1 more update to work through.') if self._config['pause'] and self._config['plot'] != 'no': print( 'Double-click the figure when you\'re ready to move on to the next update.') print('Cheers mate.') elif update_number == self.total_updates - 1: print("I've finished the final sweep.") if self._config['pause'] and self._config['plot'] != 'no': print('Please close the plot to wrap up your experiment.') print('May the odds be ever in your favor.') else: print("Don't celebrate yet,") print('I have {} more updates to work through.'.format( self.total_updates - (update_number + 1))) if self._config['pause'] and self._config['plot'] != 'no': print( 'I need you to double-click the figure when you\'re ready for me to continue.') print('Cheers mate.') def _predicted_sweep_time(self, pts=None): """ Empirical equation for sweep time estimates. Note: rough estimate, lower freuqencies take longer (such as around 20kHz) """ a_1 = self._config['averaging_time'] a_2 = self._config['averaging_cycles'] s_1 = self._config['settling_time'] s_2 = self._config['settling_cycles'] if pts is None: pts = self._config['data_points'] return max(a_1*pts, (a_2/10000)*(pts/3.75)) + max(s_1*pts, (s_2/10000)*(pts/3.75))
def _calc_sweeps(f_start, f_end, n_pts): """ Calculates the frequencies that will be sent through the core. Splits them into sweeps of allowed length. """ points, step = np.linspace(f_start, f_end, n_pts, retstep=True) if n_pts % 2 != 0: points = np.linspace(f_start, f_end + step, n_pts + 1) points = points.tolist() sweeps = [points[x:x+MAX_P] for x in range(0, len(points), MAX_P)] if len(sweeps[-1]) < MIN_P: sweeps[-1] = sweeps[-2][-MIN_P:] + sweeps[-1] sweeps[-2] = sweeps[-2][:-MIN_P] return sweeps def _add_to_wiggle_plot(data, freq, shift, axes): wiggle = data / np.amax(np.abs(data)) + shift axes.plot(wiggle, freq, color='black', linewidth=0.5) positive = [not np.isnan(x) and x > shift for x in wiggle] axes.fill_betweenx(freq, wiggle, shift, where=positive, color='black')
[docs]def merge_dictionaries(d_1, d_2): """ Merges dictionaries so that second is integrated into first. Returns extended first dictionary. """ d_3 = {} for key1, value1 in d_1.items(): d_3[key1] = value1 for key2, value2 in d_2.items(): if key1 == key2: d_3[key1] = value1 + value2 break else: d_3[key2] = value2 return d_3
[docs]def replace_none_dictionary(d_1): """ Replaces all None's in dictionary with nan. """ no_none_d_1 = {} for key in d_1: no_none_d_1[key] = [np.nan if x is None else x for x in d_1[key]] return no_none_d_1
[docs]def cut_last_point(d_1): """ Removes last point of final framedata that was added to avoid None for odd points. """ finaldata = {} for key in d_1: finaldata[key] = d_1[key][:-1] return finaldata