"""Run an experiment"""
import datetime
import json
import os
from importlib import import_module
from operator import attrgetter
from time import time
from threading import Event
import copy
import pkg_resources
import numpy as np
from numpy import datetime64 as npdatetime64 # pylint: disable=no-name-in-module
from numpy.lib import recfunctions as rfn
from placeweb.settings import MEDIA_ROOT
from .place_progress import PlaceProgress
from .plots import PlacePlotter
from .plugins.export import Export
from .plugins.instrument import Instrument
from .plugins.postprocessing import PostProcessing
from .utilities import build_single_file
[docs]class AbortExperiment(Exception):
"""Custom exceptions for aborting an experiment"""
pass
[docs]class BasicExperiment:
"""Basic experiment class
This is the first (and so far, only) experiment class in PLACE. It takes
in configuration data for a variety of instruments. Each instrument must
have a priority value. This experiment uses the priority order to execute
a specified number of updates on each instrument. Data is collected from
the instruments and saved as a NumPy record array.
Even if the instruments used do not produce their own data, PLACE will
still output a timestamp (with microsecond precision) into the
experimental data:
+---------------+-------------------------+-------------------------+
| Heading | Type | Meaning |
+===============+=========================+=========================+
| PLACE-time | numpy.datetime64[us] | timestamp from the |
| | | system clock, taken at |
| | | the beginning of each |
| | | update |
+---------------+-------------------------+-------------------------+
"""
def __init__(self, config):
"""Experiment constructor
:param config: a decoded JSON dictionary
:type config: dict
"""
version = pkg_resources.require("place")[0].version
self.abort_event = Event()
self.config = config
self.plugins = []
self.metadata = {
'PLACE_version': version,
'timestamp': int(round(time() * 1000)), # milliseconds since epoch
}
self.progress = PlaceProgress(config)
self.progress.update_time = 0.0
self._create_experiment_directory()
# save config data right away in case we need to reload the settings
with open(self.config['directory'] + '/config.json', 'x') as config_file:
json.dump(_remove_specific_items(self.config),
config_file, indent=2, sort_keys=True)
self.init_phase()
[docs] def run(self):
"""Run the experiment"""
try:
self.config_phase()
self.update_phase()
self.cleanup_phase()
except AbortExperiment:
self.cleanup_phase(abort=True)
[docs] def init_phase(self):
"""Initialize the plugins
During this phase, all plugins receive their configuration data and
should store it. The list of plugins being used by the experiment is
created and sorted by their priority level. No physical configuration
should occur during this phase.
"""
for elm_name, plugin_data in self.config['plugins'].items():
if self.abort_event.is_set():
raise AbortExperiment
# get import details for plugin's Python code
try:
python_module_name = plugin_data['metadata']['python_module_name']
python_class_name = plugin_data['metadata']['python_class_name']
except KeyError:
raise KeyError(
'Could not find key in module: {}, {}'.format(elm_name, plugin_data))
# get the progress dictionary for this plugin
prog = self.progress.experiment['plugins'][elm_name]['progress']
# get the directory (relative to MEDIA ROOT) for storing plots
directory = os.path.relpath(
self.config['directory'], start=MEDIA_ROOT
)
# create a PLACE plotter for the plugin
plotter = PlacePlotter(prog, directory)
# attempt to dynamically import the plugin's Python module
try:
plugin = _programmatic_import(
python_module_name, python_class_name, plugin_data['config'], plotter)
except ModuleNotFoundError:
raise ModuleNotFoundError(
'Cannot find module related to: {}'.format(plugin_data))
plugin.priority = plugin_data['priority']
plugin.elm_module_name = elm_name
self.plugins.append(plugin)
# sort plugins based on priority
self.plugins.sort(key=attrgetter('priority'))
[docs] def config_phase(self):
"""Configure the instruments and post-processing plugins.
During the configuration phase, instruments and post-processing plugins
are provided with their configuration data. Metadata is collected from
all plugins and written to disk.
"""
for plugin in self.plugins:
if self.abort_event.is_set():
raise AbortExperiment
self.progress.log('config', plugin.elm_module_name)
try:
config_func = plugin.config
except AttributeError:
continue
config_func(self.metadata, self.config['updates'])
self.config['metadata'] = self.metadata
# overwrite the config data now that all plugins have submitted their
# metadata
with open(self.config['directory'] + '/config.json', 'w') as config_file:
json.dump(_remove_specific_items(self.config),
config_file, indent=2, sort_keys=True)
[docs] def update_phase(self):
"""Perform all the updates on the plugins.
The update phase occurs *N* times, based on the user configuration for
the experiment. This function loops over the instruments and
post-processing plugins (based on their priority) and calls their
update method.
One NumPy file will be written for each update. If the experiement
completes normally, these files will be merged into a single NumPy
file.
"""
self.progress.update_time = 1.0
for update_number in range(self.config['updates']):
self._run_update(update_number)
self.progress.update_time = 0.0
[docs] def cleanup_phase(self, abort=False):
"""Cleanup the plugins.
During this phase, each module has its cleanup method called. If the
abort flag has not been set in the cleanup call, this will be passed to
the module.
:param abort: signals that the experiment is being aborted
:type abort: bool
"""
if abort:
for plugin in self.plugins:
plugin.cleanup(abort=True)
return
build_single_file(self.config['directory'])
for plugin in self.plugins:
if self.abort_event.is_set():
raise AbortExperiment
self.progress.log('cleanup', plugin.elm_module_name)
if issubclass(plugin.__class__, Export):
plugin.export(self.config['directory'])
else:
plugin.cleanup(abort=False)
with open(self.config['directory'] + '/results.json', 'x') as results_file:
json.dump(self.progress.to_dict(), results_file,
indent=2, sort_keys=True)
def _create_experiment_directory(self):
self.config['directory'] = os.path.abspath(
os.path.expanduser(self.config['directory']))
if not os.path.exists(self.config['directory']):
os.makedirs(self.config['directory'])
else:
for i in range(1, 1000):
if not os.path.exists(self.config['directory'] + '-' + str(i)):
self.config['directory'] += '-' + str(i)
break
print('Experiment path exists - saving to ' +
self.config['directory'])
os.makedirs(self.config['directory'])
def _run_update(self, update_number):
"""Run one update phase"""
then = time()
self.progress.start_update(update_number)
data = np.array([(npdatetime64(datetime.datetime.now()),)],
dtype=[('PLACE-time', 'datetime64[us]')])
for plugin in self.plugins:
if self.abort_event.is_set():
raise AbortExperiment
self.progress.log('update', plugin.elm_module_name)
data = self._run_plugin_update(plugin, update_number, data)
# save data for this update
filename = '{}/data_{:03d}.npy'.format(
self.config['directory'], update_number)
with open(filename, 'xb') as data_file:
np.save(data_file, data.copy(), allow_pickle=False)
now = time()
update_time = now - then
weight = max(0.1, 1 / (update_number + 1))
self.progress.update_time = (
update_time * weight
+ self.progress.update_time * (1 - weight)
)
def _run_plugin_update(self, plugin, update_number, data):
"""Run the update phase on one PLACE plugin"""
class_ = plugin.__class__
elm_name = plugin.elm_module_name
try:
if issubclass(class_, Instrument):
new_data = plugin.update(
update_number, self.progress.experiment['plugins'][elm_name]['progress'])
if new_data is not None:
data = rfn.merge_arrays([data, new_data], flatten=True)
elif issubclass(class_, PostProcessing):
data = plugin.update(update_number, data.copy())
except RuntimeError as err:
self.progress.message = str(err)
self.cleanup_phase(abort=True)
raise
return data
[docs] def get_progress(self):
"""Return the progress message"""
return self.progress.to_dict()
[docs] def abort(self):
"""Tell the experiment to abort"""
self.progress.log("abort", "none")
self.abort_event.set()
def _programmatic_import(module_name, class_name, config, plotter):
"""Import a module based on string input.
This function takes a string for a module and a string for a class and
imports that class from the given module programmatically.
:param module_name: the name of the module to import from
:type module_name: str
:param class_name: the string of the class to import
:type class_name: str
:param config: the JSON configuration data for the module
:type config: dict
:param plotter: a PLACE plotting object which is attached to the PLACE
progress
:type plotter: PlacePlotter
:returns: an instance of the module matching the class and module name
:rtype: Instrument, PostProcessing, or Export object
:raises TypeError: if requested module has not been subclassed correctly
"""
module = import_module('place.plugins.' + module_name)
class_ = getattr(module, class_name)
if (not issubclass(class_, Instrument) and
not issubclass(class_, PostProcessing) and
not issubclass(class_, Export)):
raise TypeError(class_name + " is not a PLACE subclass")
# this calls the plugin's __init__() method
return class_(config, plotter)
def _remove_specific_items(config):
"""removes specific items from the config that should not be saved"""
# delete author, email, maintainer, and other metadata
# so they don't appear in the config.json file that is saved
fixed_config = copy.deepcopy(config)
for _, plugin in fixed_config['plugins'].items():
plugin['metadata'].pop('title', None)
plugin['metadata'].pop('authors', None)
plugin['metadata'].pop('email', None)
plugin['metadata'].pop('maintainer', None)
plugin['metadata'].pop('url', None)
plugin['metadata'].pop('default_priority', None)
return fixed_config