"""
Providing the "UV/VIS Pump IR Probe Split Sample" experiment. In this
experiment, a broad UV/VIS pump pulse is used to excite the sample. An
IR probe pulse is used to scan the sample's response. The UV/VIS stage
is used to move to different delay times between each collection of
data. Phase cycling is not necessary for UV/VIS pump experiments because
the scattered light cannot be observed on the MCT detector. The high
intensity and frequency of the pump light can cause the sample to burn
or create gas bubbles in the sample. An unnecessarily high exposure of
the sample to pump light must thus be avoided. This is achieved by
closing the pump shutter when the delay stage is moving. To distribute
the exposure to pump light a Lissajous scanner or a different
translation device can be used. The split sample method has a sample
holder with two compartments. One compartment contains the sample (the
actual molecules that should be observed) in a solvent which creates a
spectroscopic background that is of no interest. The other compartment
only contains the solvent (background). The goal is to subtract the
background and remove artifacts that drift slowly over time, such as the
pixel response background. For this experiment, the UV/VIS chopper must
be running.
#######################
Step by Step Algorithm:
#######################
**Acquisition:**
1. Preallocate dictionary (data container) which will contain data
and information about scan index, delay index etc.
2. Close Shutter
3. Move micrometer screw to other sample compartment after all
delays have been measured
4. Close shutter
5. Set the number of samples to acquire to account for weights
specified for the current delay as specified in the delay file
6. Move to delay (one at a time)
7. Open Shutter
8. Read the data from the ADC
9. Place data into dictionary and hand it over to
primary processing
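Step 5 of the acquisition can be illustrated with a minimal sketch. It
assumes the delay array layout described in this module (delay in fs in
column 0, weight in column 1). The values of ``delays`` and
``base_samples`` are hypothetical and only serve as an example.

```python
import numpy as np

# Hypothetical delay file: column 0 = delay in fs, column 1 = weight
delays = np.array([[-1000.0, 1.0], [0.0, 2.0], [500.0, 0.5]])
# Hypothetical base number of samples configured on the ADC
base_samples = 1000

# Scale the number of samples for each delay by its weight, as it is
# done before each ADC read in the acquisition loop
samples_per_delay = [int(round(base_samples * weight)) for _, weight in delays]
print(samples_per_delay)
```

A weight of 2 thus doubles the number of laser shots collected at that
delay, a weight of 0.5 halves it.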
**Primary Processing:**
1. Preallocate arrays for data, counts, weights, chopper.
Here there are 2 states. On (chopper high) and off
(chopper low). Here a new index dimension, namely the
different split sample positions are added in addition
to the delay index. Note that the compartment positions
are not "sorted" into separate states because we already
know beforehand which compartment was measured
(analogously to how it is done for the delays)
2. Subtract background from raw data (dark noise)
3. Linearize response of pixels
4. Calculate transmission, or more precisely, relative intensity
(probe intensity / reference intensity) for each laser shot for
each pixel pair
5. Identify the chopper states for all laser shots using the
corresponding channel(s) in the ADCs' data
6. Sort the data (transmissions) for each state and calculate
statistics
7. Average the data by weighting equally
8. Calculate shot to shot difference signal and its statistical
properties
9. Put this information into data container and hand it over to
secondary processing
10. Save data (and raw data) including counts, shot to shot signal,
weights and s2s_std if respective checkboxes on GUI were
checked.
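Steps 4 to 6 can be sketched with plain NumPy on synthetic data. This is
an illustration only and not the implementation of ``dp.sort_data``,
whose internals are not shown here. The array sizes, the 5 V chopper
level and all variable names are assumptions made for the example.

```python
import numpy as np

rng = np.random.default_rng(0)
n_pixels, n_shots = 4, 10

# Synthetic, already background corrected and linearized intensities
# with shape (n_pixels, n_shots)
probe = rng.uniform(1.0, 2.0, size=(n_pixels, n_shots))
reference = rng.uniform(1.0, 2.0, size=(n_pixels, n_shots))

# Step 4: relative intensity for each laser shot and pixel pair
transmission = probe / reference

# Step 5: chopper channel alternating between 0 V and a hypothetical
# high level of 5 V; half of the high level serves as threshold
high_level = 5.0
chopper_voltage = np.tile([0.0, high_level], n_shots // 2)
chopper_states = np.digitize(chopper_voltage, [high_level / 2])  # 0 = low, 1 = high

# Step 6: sort the shots by state and calculate statistics per state
n_states = 2
data = np.zeros((n_pixels, n_states))
counts = np.zeros((n_pixels, n_states))
weights = np.zeros((n_pixels, n_states))
for state in range(n_states):
    shots = transmission[:, chopper_states == state]
    data[:, state] = shots.mean(axis=1)
    counts[:, state] = shots.shape[1]
    weights[:, state] = 1.0 / shots.var(axis=1, ddof=1)  # inverse variance
```

The resulting ``data``, ``counts`` and ``weights`` arrays have the
saving dimension (n_probe_pixels, n_chopper_states).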
**Secondary Processing:**
1. Calculate the absorption (-log10) of the sorted data
2. Calculate the pump probe difference signal (chopper high - chopper low)
3. Calculate the background corrected signal by subtracting
position 0 from position 1
4. Calculate the average intensities and standard deviation of
the intensities for each pixel
5. Put this information into data container
6. Calculate the numbers which are displayed in the
"statistics box" on the GUI
7. Hand over data to Pyqtplotting thread
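Steps 1 to 3 of the secondary processing amount to a few array
operations. The following sketch uses hypothetical transmission values
for a single delay and assumes that chopper state 0 is "low" and state 1
is "high". It is an illustration under these assumptions, not the code
of this module.

```python
import numpy as np

# Hypothetical sorted transmissions for one delay with shape
# (n_sample_positions, n_probe_pixels, n_chopper_states)
sorted_data = np.array(
    [
        [[0.50, 0.49], [0.80, 0.80]],  # position 0
        [[0.50, 0.45], [0.80, 0.78]],  # position 1
    ]
)

# Step 1: absorption of the sorted data
absorption = -np.log10(sorted_data)
# Step 2: pump probe difference signal (chopper high - chopper low)
difference = absorption[..., 1] - absorption[..., 0]
# Step 3: background corrected signal (position 1 - position 0)
corrected = difference[1] - difference[0]
print(corrected)
```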
**Pyqt Plotting:**
1. Remove old plots
2. Setup the plot that displays:
* Difference signal for pos 0 for current delay
* Difference signal for pos 1 for current delay
* Background corrected difference signal for current delay
* Single pos 0 time-signal heatmap
* Single pos 1 time-signal heatmap
* Single background corr time-signal heatmap
* Standard deviation of shot to shot signal
* Intensities and their standard deviation (multiplied by 5)
3. Plot the plots for the first time
4. Update plots
**Saving:**
Note that separate files are saved for each compartment position
(here 0 and 1). Technically it is possible to introduce even more
compartments by giving the micrometer screw more positions to move to.
For this a new experiment would be recommended.
.. code-block::
programming data dimension:
[(2 ([0] is current average scan, [1] is last single scan), n_sample_positions, n_delays, n_probe_pixels, n_chopper_states)]
saving dimension:
[(n_probe_pixels, n_chopper_states)]
raw data dimension:
[(n_channels, samples_to_acquire)]
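The relation between the programming dimension and the saving dimension
can be checked with a short sketch. The sizes used here are hypothetical.

```python
import numpy as np

# Hypothetical sizes for illustration
n_sample_positions, n_delays, n_probe_pixels, n_chopper_states = 2, 5, 32, 2

# Programming dimension: axis 0 holds [0] the current average scan and
# [1] the last single scan
data = np.zeros(
    (2, n_sample_positions, n_delays, n_probe_pixels, n_chopper_states)
)

# Selecting one entry of axis 0, one position and one delay leaves the
# saving dimension (n_probe_pixels, n_chopper_states)
pos_idx, delay_idx = 1, 3
last_scan = data[1, pos_idx, delay_idx]
print(last_scan.shape)
```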
################
Folder Structure
################
In **scans/dXXX** there are two sets of 5 files for each scan. One set
for each of the compartments. The first file in a given set contains the
data in the saving dimensions. The counts file contains the number of
times a given state was observed. This should be used as weights when
averaging equally weighted. The weights file contains the inverse
variances of a given state for all pixels, etc. The dimensions of these
files are as the saving dimension suggests. These files can be used to
average in different ways in order to obtain the complete resulting
spectrum. Check the actual code of the corresponding experiment to
understand how to calculate the desired end result (difference
spectrum). The s2s_std file contains the standard deviation of the shot
to shot difference spectrum. The old software used it as weights to
average different scans. However, this method of averaging is ambiguous
and it has proven difficult to justify when averaging transmissions. The
file is saved so that the option to revert to this averaging method
exists. Note that the dimensionality of the s2s_std file is not as
"saving dimension" suggests since it results from difference spectra.
This means that every dimension except the probe pixel dimension
collapses. In addition to the s2s_std, the shot to shot signal is now
saved, too. This was implemented because Jan Loeffler discovered that
the shot to shot signal has better quality.
The data in **/averaged_data** is averaged equally weighted (using
counts). As with the scan data, the difference spectrum still needs
to be calculated from the transmissions for every state. Because the
array contains the transmissions averaged with counts it is only
intended to be used as a first indicator for the measurement.
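How the counts and weights files can be used to average several scans in
post processing is sketched below. Synthetic arrays stand in for the
loaded files of one delay and one position. All names and values are
assumptions for illustration, and which averaging method is appropriate
remains a decision for the analysis.

```python
import numpy as np

rng = np.random.default_rng(1)
n_scans, n_probe_pixels, n_chopper_states = 4, 3, 2
shape = (n_scans, n_probe_pixels, n_chopper_states)

# Synthetic stand-ins for the per scan files of one delay and position
scan_data = rng.uniform(0.4, 0.6, size=shape)  # averaged transmissions
scan_counts = rng.integers(40, 60, size=shape)  # shots per state
scan_weights = rng.uniform(1e3, 1e4, size=shape)  # inverse variances

# Equal weighting of every laser shot: weight each scan by its counts
counts_average = np.average(scan_data, axis=0, weights=scan_counts)

# Alternative: inverse variance weighting of the scans
variance_average = np.average(scan_data, axis=0, weights=scan_weights)

# The difference spectrum still has to be calculated from the states
# (assuming state 1 = chopper high and state 0 = chopper low)
difference = -np.log10(counts_average[:, 1]) + np.log10(counts_average[:, 0])
```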
The **/figures** folder is currently empty. It is possible to implement
plotting of figures which are saved here while the experiment is
running. This is however not implemented yet.
The **/hardware config** folder holds every file which contains hardware
configuration parameters. This folder is a compressed copy of the
hardware configuration folder in the software's directory. Here it is
possible to look up the ADC configuration to obtain what channel was
connected to which hardware element (e.g. chopper - ai78). It is also
possible to obtain the R-2R values, the FPAS configuration, the
linearization parameters, etc.
The **/raw data** folder is in the experiment's directory only if the
"save raw data" checkbox on the GUI was checked. It contains the raw
(unedited... as raw as it can get) data, i.e. the voltages which
the ADC measured for each channel. The dimensions are as "raw data
dimension" suggests.
The **background.npy** file is a copy of the most recently collected
dark noise background of the MCT detector array. It corrects for dark
noise. If no new background was collected before the experiment was
started the code will use the latest available background and display a
warning in the log.
**setupinfo.txt** is a ReadMe file that contains the most relevant
experimental parameters at first glance. Additionally, the user can
decide to write a comment in the readme editor of the GUI. The content
of this is written to the **notes.txt** file.
**probe_wn_axis.npy** contains the wavenumber axis which is generated by
the spectrometer triax.py class.
**delays.npy** contains the delays including weights.
.. code-block::
username/
├── date1_experimentname1_000/
│ ├── averaged_data
│ │ ├── d000_pos000_date1_experimentname1_000.npy
│ │ ├── d000_pos001_date1_experimentname1_000.npy
│ │ ├── ...
│ │ ├── d999_pos000_date1_experimentname1_000.npy
│ │ └── d999_pos001_date1_experimentname1_000.npy
│ ├── figures
│ ├── hardware config
│ ├── raw data
│ │ ├── delay000
│ │ │ ├── s000000_d000_pos000_date1_experimentname1_000_raw.npy
│ │ │ ├── s000000_d000_pos001_date1_experimentname1_000_raw.npy
│ │ │ ├── ...
│ │ │ ├── s000099_d000_pos000_date1_experimentname1_000_raw.npy
│ │ │ └── s000099_d000_pos001_date1_experimentname1_000_raw.npy
│ │ ├── ...
│ │ └── delay999
│ ├── scans
│ │ ├── delay000
│ │ │ ├── s000000_d000_pos000_date1_experimentname1_000.npy
│ │ │ ├── s000000_d000_pos000_counts_date1_experimentname1_000.npy
│ │ │ ├── s000000_d000_pos000_s2s_std_date1_experimentname1_000.npy
│ │ │ ├── s000000_d000_pos000_weights_date1_experimentname1_000.npy
│ │ │ ├── s000000_d000_pos001_date1_experimentname1_000.npy
│ │ │ ├── s000000_d000_pos001_counts_date1_experimentname1_000.npy
│ │ │ ├── s000000_d000_pos001_s2s_std_date1_experimentname1_000.npy
│ │ │ ├── s000000_d000_pos001_weights_date1_experimentname1_000.npy
│ │ │ ├── ...
│ │ │ └── s000099_d000_pos001_date1_experimentname1_000.npy
│ │ ├── ...
│ │ └── delay999
│ ├── probe_wn_axis_date1_experimentname1_000.npy
│ ├── delays_date1_experimentname1_000.npy
│ ├── setupinfo_date1_experimentname1_000.txt
│ ├── notes_date1_experimentname1_000.txt
│ └── background_date1_experimentname1_000.npy
├── date1_experimentname1_001/
├── date2_experimentname1_000/
└── date2_experimentname2_000/
"""
if __name__ == "__main__":
# Add directories to path for imports
import os, sys, inspect
currentdir = os.path.dirname(
os.path.abspath(inspect.getfile(inspect.currentframe()))
)
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)
sys.path.insert(0, os.path.join(parentdir, "hardware_interfaces"))
sys.path.insert(0, os.path.join(parentdir, "gui"))
import multiprocessing
from multiprocessing import Process, Queue
import sys
import threading
import numpy as np
from numpy import ndarray
# Matplotlib
import matplotlib
from matplotlib import pyplot as plt
from matplotlib.backends.backend_qt5agg import (
FigureCanvasQTAgg,
NavigationToolbar2QT as NavigationToolbar,
)
from matplotlib.figure import Figure
from matplotlib import cm
# PyQTGraph
from pyqtgraph import PlotWidget, plot
import pyqtgraph as pg
sys.path.append(r"C:\Users\hlab\Documents\iris\akb_software\gui")
from PyQt5 import QtWidgets, QtCore # , QtGui
from PyQt5.QtCore import QRunnable, QThreadPool, pyqtSignal, QObject
from qt_multithreading_wrapper import Worker
sys.path.append(r"C:\Users\hlab\Documents\iris\akb_software")
# Data processing
import data_processing as dp
from data_processing import PixelResponseLinearization as PRL
from save_data import SaveData, Background
# Hardware modules
from analog_digital_converter import AnalogDigitalConverter as ADC
from triax import Triax as Spectrometer
from pi_control import PiStage
from newport_control import NewportControl as MuScrew
from shutter import Shutter
class VisPumpIrProbeSplitSample:
"""
Args:
widget_pyqtgraph (QWidget): WidgetPyqtgraph object on which the
plots are going to be displayed. Has methods for plot
manipulation (i.e. removal of plots, autoscale).
delays (ndarray): Array containing the delays in fs that are
supposed to be measured in the 0th column and their
corresponding weights in the 1st column. I.e.: loaded from a
delay file which can be generated via the delay file editor.
* shape: 2D
* E.g.: (number of delays, 2)
positions (ndarray): Array containing the positions (in mm)
which the micrometer screw should move to in order to change
the sample compartment.
* shape: 1D
* E.g.: (2)
adc (ADC): Analog to digital converter hardware object which is
used to communicate with and read data from the ADC.
delay_stage (PiStage): PiStage hardware control object which
provides the interface to the delay stage needed for this
experiment.
mu_screw (MuScrew): NewportControl object that can be used to
move the micrometer screw.
shutter (Shutter): Shutter object that can be used to open and
close the UV/VIS (pump) shutter.
prl (PRL): Pixel response linearization object which grants
the functionality to linearize raw data according to the
linearization parameters specified in the corresponding
*pixel_linearization_fit_parameters.json* file (for each
lab).
chopper_info (dict): Contains the information that is necessary
to identify the different chopper states of the chopper that
chops the pump pulse. It contains the keys "high voltage
level" and "name". "high voltage level" is the voltage read
by the ADC when the chopper reference output is high. It is
needed as a reference for the digitization function that is
used. The "name" key is required to determine to which
channel of the adc the chopper is connected and its value
needs to match the corresponding key in index_dict.
background_handler (Background): Instance of Background class
which can access the most recently collected background.
This background is later subtracted from the raw data as
dark noise correction.
spectrometer (Spectrometer): Spectrometer/Triax hardware class
which grants functionality to control the triax
spectrometer. Needed to obtain e.g. wavenumber axis etc.
info_queue (Queue): Multiprocessing queue object which is used
to transfer/hand over information to lineEdits on GUI.
Contains (if applicable) scan index, delay index, interleave
index, values for statistics groupBox etc.
saver (SaveData): Object that manages saving of data (including
counts, weights, probe wavenumber axis etc.) into their
respective directories. If None is passed, no data is going
to be saved. If the raw data checkbox was checked on the GUI
the saver's raw data attribute is set to True and the raw
data is saved automatically. Defaults to None.
"""
def __init__(
self,
widget_pyqtgraph,
delays: ndarray,
positions: ndarray,
adc: ADC,
delay_stage: PiStage,
mu_screw: MuScrew,
shutter: Shutter,
prl: PRL,
chopper_info: dict,
background_handler: Background,
spectrometer: Spectrometer,
info_queue: Queue,
saver: SaveData = None,
):
# Initialise Queues
self.acq_queue = Queue()
self.processing_queue = Queue()
self.plot_queue = Queue()
# Save wavenumber axis, delay files etc. to file system
if saver:
saver.save_other(spectrometer.wn_axis, "probe_wn_axis")
saver.save_other(delays, "delay_file")
saver.save_other(background_handler.load_background(), "background")
self.acquisition = Acquisition(
delays,
positions,
adc,
delay_stage,
mu_screw,
spectrometer,
shutter,
self.acq_queue,
)
self.primary_processing = PrimaryProcessing(
self.acq_queue,
self.processing_queue,
delays,
positions,
adc.pixel_idx,
adc.probe_pixel_idx,
adc.reference_pixel_idx,
adc.index_dict,
prl,
chopper_info,
background_handler,
saver,
)
self.secondary_processing = SecondaryProcessing(
self.processing_queue,
self.plot_queue,
info_queue,
delays,
positions,
adc.probe_pixel_idx,
saver,
)
self.plotting = PyqtPlotting(widget_pyqtgraph, adc, self.plot_queue, delays)
def start(self):
self.plotting.threadpool.start(self.plotting.work)
self.secondary_processing.start()
self.primary_processing.start()
self.acquisition.start()
class Acquisition(threading.Thread):
"""
Acquisition class (python multithreaded). This is where the actual
experiment is conducted.
This class is used to control the hardware devices required for the
experiment. The sole purpose of it is to collect the data according
to the parameters specified for the experiment by moving the delay
stages, opening and closing shutters etc.
A dictionary which will contain data and other information (scan
index etc.) is instantiated here. The acquisition class passes the
collected information (raw data, scan index etc.) to the primary
processing class.
Note:
**No data processing beyond what is required to conduct the
experiment should be implemented in this class.** The rationale
behind this is to minimize down time/ maximize laser time. Data
processing costs computation time and will, generally speaking,
slow down the measurement process because the computer is busy
while the rest of the hardware is idle. If implemented correctly
the data processing could be carried out while the data
acquisition is waiting for all data to become available. But
even in this scenario the problem that the data processing takes
longer than the acquisition time can occur and is thus best
avoided through parallelisation.
The reason why this class subclasses threading.Thread instead of
multiprocessing.Process is that to use multiprocessing all
objects passed to the process must be picklable. This is not the
case for some of the objects interfacing with the hardware (e.g.
ADC). In an ideal scenario the acquisition, too, would run in its own
process separated from the GUI thread but this would only be
possible with major restructuring of the software.
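The picklability problem can be reproduced with a minimal sketch.
``FakeHardwareHandle`` is a hypothetical stand-in and not one of the
hardware classes of this software. It only assumes that such objects
hold resources like locks or open connections, which pickle refuses to
serialize.

```python
import pickle
import threading


class FakeHardwareHandle:
    # Hypothetical stand-in for a hardware interface object that wraps
    # an operating system resource (here a lock)
    def __init__(self):
        self.lock = threading.Lock()


try:
    pickle.dumps(FakeHardwareHandle())
    picklable = True
except TypeError:
    # Locks cannot be pickled, so the whole object cannot be either
    picklable = False

print(picklable)
```

A thread shares the memory of the GUI process and therefore does not
need to serialize such objects.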
Args:
delays (ndarray): Array containing the delays in fs that are
supposed to be measured in the 0th column and their
corresponding weights in the 1st column. I.e.: loaded from a
delay file which can be generated via the delay file editor.
* shape: 2D
* E.g.: (number of delays, 2)
positions (ndarray): Array containing the positions (in mm)
which the micrometer screw should move to in order to change
the sample compartment.
* shape: 1D
* E.g.: (2).
adc (ADC): Analog to digital converter hardware object which is
used to communicate with and read data from the ADC.
delay_stage (PiStage): PiStage hardware control object which
provides the interface to the delay stage needed for this
experiment.
mu_screw (MuScrew): NewportControl object that can be used to
move the micrometer screw.
spectrometer (Spectrometer): Spectrometer/Triax hardware class
which grants functionality to control the triax
spectrometer. Needed to obtain e.g. wavenumber axis etc.
shutter (Shutter): Shutter object that can be used to open and
close the UV/VIS (pump) shutter.
acq_queue (Queue): Multiprocessing queue object that the
acquisition thread uses to pass data to the primary
processing process.
"""
def __init__(
self,
delays: ndarray,
positions: ndarray,
adc: ADC,
delay_stage: PiStage,
mu_screw: MuScrew,
spectrometer: Spectrometer,
shutter: Shutter,
acq_queue: Queue,
):
threading.Thread.__init__(self)
# Assign attributes
self.delays = delays
self.positions = positions
self.adc = adc
self.delay_stage = delay_stage
self.mu_screw = mu_screw
self.spectrometer = spectrometer
self.shutter = shutter
# Multiprocessing Queue
self.acq_queue = acq_queue
# Save the base amount of samples
# to acquire
# This is needed to reset the weighting
# for each delay
self.base_samples = self.adc.samples_to_acquire
# Initialize scan index
self.scan_idx = 0
# Create dictionary that will hold data that is passed
# to other queues
self.data_container = {}
# Multithreading event to stop experiment
self.exit = threading.Event()
def run(self):
# * You might wonder why this acquisition
# * is structured differently from the acquisition
# * of the other (older/simpler) experiments
# * [Or you might not wonder -
# * Let's hope that someone fixed this discrepancy
# * already]
# * The way it is implemented here (the straight
# * forward way) was chosen because it simplifies
# * the indices counting a lot, and within multiprocessing
# * should not lead to any loss of laser time.
# * Previously, the 0th data set was acquired outside of
# * the loop and was processed while the second
# * acquisition was running
while not self.exit.is_set():
# Iterate all (for now two) positions for
# micrometer screw
for p_idx, position in enumerate(self.positions):
# Move to next position
# First close shutter before moving
# micrometer screw to avoid unnecessary
# exposure to UV/VIS pump light
self.shutter.close()
# Technically (to save laser time)
# It would make sense to not wait for
# screw to complete movement
# But directly starting to move
# delay stage and then wait
# for both
self.mu_screw.move(position)
# We set the positions
# into the data container as
# interleave index so it
# can be displayed on the GUI
self.data_container["interleave index"] = p_idx
# Iterate over all delays
for d_idx, delay in enumerate(self.delays):
# Move to next delay
# First close shutter before moving stage
# to avoid unnecessary exposure to
# UV/VIS pump light
self.shutter.close()
# Move delay stage
self.delay_stage.move(delay[0])
# Set samples to acquire according to weight
# for this delay
samples_to_acquire = int(round(self.base_samples * delay[1]))
self.adc.set_samples_to_acquire(samples_to_acquire)
# To measure data open shutter
self.shutter.open()
# Read data with given parameters
self.adc.read()
# Fill the data container with the acquired data
self.data_container["data"] = self.adc.data.copy()
self.data_container["scan index"] = self.scan_idx
self.data_container["delay index"] = d_idx
self.data_container["probe axis"] = self.spectrometer.wn_axis.copy()
# Hand a copy of the container over to the acquisition queue
self.acq_queue.put(self.data_container.copy())
self.scan_idx += 1
# Tell processes to stop after last data
self.acq_queue.put("stop")
def shutdown(self):
# Setting exit will stop the loop
# within run
self.exit.set()
class PrimaryProcessing(multiprocessing.Process):
"""
Primary processing class (python multiprocess). This class' purpose
is to process the raw data to a state where it can be saved onto the
hard drive as npy (binary) files.
This step generally includes linearization, normalisation, sorting
and averaging. Besides the actual data additional
information that is required for post processing purposes is
calculated and saved. I.e. counts and weights. The last step
of primary processing should always be to pass the data to
secondary processing and save it to the hard disk.
Note:
The actual signal(s) are generally not intended to be calculated
here. Signals and other information that is supposed to be
displayed on the GUI should be calculated in secondary
processing. The main reason for this is minimizing the risk of
an error leading to a crash of the software which then in turn
ruins the measurement. The more code that has to run the more
likely a crash becomes. Other reasons mostly concern open
questions regarding averaging. Generally, shot to shot
normalized intensities that are sorted and averaged by their
state are saved (*m2 method*). From this - for a simple
experiment at least - the signal can be easily calculated while
offering different choices of averaging in post processing. For
more complex experiments e.g. VIPER or time domain experiments
the argument of saving sorted transmissions instead of signals
is even more compelling. In VIPER experiments more than one
signal of interest is present in the different states. Saving
each signal separately would actually increase the amount of
data that has to be saved. For time domain experiments we want
to save the data in the time domain for post processing reasons
like zeropadding and apodization.
The goal of primary processing is to make the data as compact as
possible while keeping as much information and flexibility as
possible. Even if the processes later on crash, the data is
secured and can be analysed in post processing.
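The hand-over between the stages via queues and a "stop" string can be
sketched as follows. To keep the example self-contained it uses
``threading`` and ``queue.Queue`` instead of separate processes. The
function name and the "processed" key are hypothetical.

```python
import queue
import threading

acq_queue = queue.Queue()
processing_queue = queue.Queue()


def primary_processing():
    while True:
        item = acq_queue.get()
        # The string "stop" is the sentinel that ends this stage
        if isinstance(item, str) and item == "stop":
            break
        # Placeholder for the actual processing step
        item["processed"] = True
        processing_queue.put(item)
    # Forward the sentinel so that the next stage stops as well
    processing_queue.put("stop")


worker = threading.Thread(target=primary_processing)
worker.start()
for scan_idx in range(3):
    acq_queue.put({"scan index": scan_idx})
acq_queue.put("stop")
worker.join()

# Drain the queue like a following stage would do
results = []
while True:
    item = processing_queue.get()
    if item == "stop":
        break
    results.append(item["scan index"])
print(results)
```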
Args:
acq_queue (Queue): Multiprocessing queue object that the
acquisition thread uses to pass data to the primary
processing process.
processing_queue (Queue): Multiprocessing queue object that the
primary processing process uses to pass data to the secondary
processing process.
delays (ndarray): Array containing the delays in fs that are
supposed to be measured in the 0th column and their
corresponding weights in the 1st column. I.e.: loaded from a
delay file which can be generated via the delay file editor.
* shape: 2D
* E.g.: (number of delays, 2)
positions (ndarray): Array containing the positions (in mm)
which the micrometer screw should move to in order to change
the sample compartment.
* shape: 1D
* E.g.: (2).
pixel_idx (ndarray): Array that contains the indices of the rows
in the ADCs' data that correspond to pixel input channels.
These are specified in the "analog input configuration.json"
for each laboratory and can be easily accessed with the
attribute "pixel_idx" of the ADC.
* shape: 1D
* E.g.: (64) or (128)
probe_pixel_idx (ndarray): Array that contains the indices of
the rows in the ADCs' data that correspond to *probe* pixel
input channels. These are specified in the "analog input
configuration.json" for each laboratory and can be easily
accessed with the attribute "probe_pixel_idx" of the ADC. It
is highly relevant that the order the pixels are listed in
this array match the order of the array in the
reference_pixel_idx argument. This means that if reference
pixel 3 is listed first in the other array here probe pixel
3 needs to be listed first as well and so on. Otherwise the
intensities on the probe array are not going to be
normalized correctly. For the plotting to work correctly the
pixels also need to be listed in the order of the wavenumber
axis array of the spectrometer.
* shape: 1D
* E.g.: (32) or (64)
reference_pixel_idx (ndarray): Array that contains the indices
of the rows in the ADCs' data that correspond to *reference*
pixel input channels. These are specified in the "analog
input configuration.json" for each laboratory and can be
easily accessed with the attribute "reference_pixel_idx" of
the ADC. It is highly relevant that the order the pixels are
listed in this array match the order of the array in the
probe_pixel_idx argument. This means that if probe pixel 10
is listed first in the other array here reference pixel 10
needs to be listed first as well and so on. Otherwise the
intensities on the probe array are not going to be
normalized correctly. For the plotting to work correctly the
pixels also need to be listed in the order of the wavenumber
axis array of the spectrometer.
* shape: 1D
* E.g.: (32) or (64)
index_dict (dict): Dictionary that maps the names of the input
channels to their corresponding row in the ADCs' data as
they are specified in the "analog input configuration.json"
for each laboratory. I.e.: It contains the information which
entries of the ADCs' data array belong to choppers, wobblers
etc. This dictionary can be easily accessed with the
attribute "index_dict" of the ADC.
prl (PRL): Pixel response linearization object which grants
the functionality to linearize raw data according to the
linearization parameters specified in the corresponding
*pixel_linearization_fit_parameters.json* file (for each
lab).
chopper_info (dict): Contains the information that is necessary
to identify the different chopper states of the chopper that
chops the pump pulse. It contains the keys "high voltage
level" and "name". "high voltage level" is the voltage read
by the ADC when the chopper reference output is high. It is
needed as a reference for the digitization function that is
used. The "name" key is required to determine to which
channel of the adc the chopper is connected and its value
needs to match the corresponding key in index_dict.
background_handler (Background): Instance of Background class
which can access the most recently collected background.
This background is later subtracted from the raw data as
dark noise correction.
saver (SaveData): Object that manages saving of data (including
counts, weights, probe wavenumber axis etc.) into their
respective directories. If None is passed, no data is going
to be saved. If the raw data checkbox was checked on the GUI
the saver's raw data attribute is set to True and the raw
data is saved automatically. Defaults to None.
"""
def __init__(
self,
acq_queue: Queue,
processing_queue: Queue,
delays: ndarray,
positions: ndarray,
pixel_idx: ndarray,
probe_pixel_idx: ndarray,
reference_pixel_idx: ndarray,
index_dict: dict,
prl: PRL,
chopper_info: dict,
background_handler: Background,
saver: SaveData = None,
):
super().__init__()
# Multiprocessing Queue
# Gets data from acquisition class
self.acq_queue = acq_queue
# Gives data to secondary processing class
self.processing_queue = processing_queue
# Pixel response linearisation
self.prl = prl
# Data saving instance
self.saver = saver
# Pixel index is an array which tells us which
# entries in our adc data are pixels
# Append probe and reference pixel indices
self.pixel_idx = pixel_idx
self.probe_pixel_idx = probe_pixel_idx
self.ref_pixel_idx = reference_pixel_idx
self.index_dict = index_dict
# Load background data from file
self.background = background_handler.load_background()
# In this experiment we have two different chopper states
self.n_chopper_states = np.array(2)
# Save chopper high voltage level divided by 2 as list
# We need this for the digitize function that identifies
# the High and Low Chopper
# We select half of the high voltage as the limit
# at which the distinction between states is done
self.chopper_voltage_level = [chopper_info["high voltage level"][0] / 2]
self.chopper_name = chopper_info["name"][0]
# Initialize array that holds both
# temporary and averaged data
# In this case the data is the averaged
# relative intensity
# (transmission (probe/ref))
# for each pixel.
# By convention the averaged is the
# 0 th entry of the 0th axis of the array
# and the temp data is the 1st entry of the 0th
# axis of the array.
# dimensions: 2 = (current average scan, last single scan)
# dimensions: (2, sample_positions, n_delays, n_probe_pixels, n_chopper_states)
self.data = np.zeros(
(
2,
positions.size,
delays.shape[0],
self.probe_pixel_idx.size,
self.n_chopper_states,
)
)
self.counts = np.zeros(self.data.shape)
self.weights = np.zeros(self.data.shape)
def run(self):
while True:
# Get data / information from acquisition process
data_container = self.acq_queue.get()
if isinstance(data_container, str):
if data_container == "stop":
break
# Write data in dictionary into variable
raw_data = data_container["data"]
scan_idx = data_container["scan index"]
delay_idx = data_container["delay index"]
pos_idx = data_container["interleave index"]
# Subtract background from raw data
# of pixels and pixels only
background_corrected_data = (
raw_data[self.pixel_idx] - self.background[self.pixel_idx, np.newaxis]
)
# Linearize response of Pixels (and pixels only)
intensities = self.prl.linearize(background_corrected_data)
# Calculate transmission/ relative intensity
# (probe intensity / reference intensity)
transmission = (
intensities[self.probe_pixel_idx] / intensities[self.ref_pixel_idx]
)
# Get the corresponding chopper state for each shot
chopper_states = np.digitize(
raw_data[self.index_dict[self.chopper_name]], self.chopper_voltage_level
)
# Sort and average data
idx = (1, pos_idx, delay_idx)
(
self.data[idx],
self.weights[idx],
self.counts[idx],
statistics,
) = dp.sort_data(transmission, chopper_states, self.n_chopper_states)
# Create / Average "averaged" data by using weighted average
# between the temp data (weight = 1) and the already
# existing average data (weight = scan_idx) (technically
# it is the number of total samples that were acquired in a
# given state)
self.data[0, pos_idx, delay_idx] = np.average(
self.data[:, pos_idx, delay_idx],
axis=0,
weights=self.counts[:, pos_idx, delay_idx],
)
# Add everything to data container
# and give it to processing queue
data_container["sorted data"] = self.data.copy()
data_container["intensities"] = intensities.copy()
data_container["transmission"] = transmission.copy()
data_container["statistics"] = statistics
data_container["chopper states"] = chopper_states
# * The following code should be moved to
# * secondary processing once it is clear
# * whether or not it is actually needed
# * to average scans in post processing
# Calculate the s2s (shot to shot) difference signal
# and its statistical information
# We are not interested in the 0th element of the tuple
# which represents the shot to shot averaged difference
# signal - we calculated it correctly with the transmissions
# above
(
s2s_signal,
s2s_amp_signal,
s2s_std_signal,
s2s_avg_std_signal,
) = dp.shot_to_shot_signal(
data_container["transmission"],
chopper_state=data_container["chopper states"][0],
)
data_container["std s2s signal"] = s2s_std_signal # s2s: shot to shot
# Average s2s signal amplitude
data_container["s2s signal amplitude"] = s2s_amp_signal # s2s: shot to shot
# Average standard deviation of signal over all wavenumbers
data_container[
"s2s signal average std"
] = s2s_avg_std_signal # s2s: shot to shot
# *-------------
self.processing_queue.put(data_container.copy())
# ----------------------------------------
# Save data (if specified)
if self.saver:
# Use saver class to save data
self.saver.save_scan(
self.data[1, pos_idx, delay_idx],
scan_idx,
delay_idx=delay_idx,
pos=pos_idx,
)
self.saver.save_avg(
self.data[0, pos_idx, delay_idx], delay_idx=delay_idx, pos=pos_idx
)
self.saver.save_counts(
self.counts[1, pos_idx, delay_idx],
scan_idx,
delay_idx=delay_idx,
pos=pos_idx,
)
self.saver.save_weights(
self.weights[1, pos_idx, delay_idx],
scan_idx,
delay_idx=delay_idx,
pos=pos_idx,
)
# *-------------
self.saver.save_s2s_std(
s2s_std_signal, scan_idx, delay_idx=delay_idx, pos=pos_idx
)
self.saver.save_s2s_signal(s2s_signal, scan_idx, delay_idx=delay_idx)
# *-------------
if self.saver.raw_data:
self.saver.save_raw_data(
raw_data, scan_idx, delay_idx=delay_idx, pos=pos_idx
)
# Update counts
self.counts[0, pos_idx, delay_idx] += self.counts[1, pos_idx, delay_idx]
# Once stop signal was received the function breaks out of the loop.
# Now we need to tell the secondary processing to stop.
self.processing_queue.put("stop")
class SecondaryProcessing(multiprocessing.Process):
"""
Secondary processing class (python multiprocess). This class is used
to process the data from the primary processing class such that it
can be displayed on the GUI within plots and lineEdits. This
generally implies (if applicable):
* calculation of signals
* calculation of statistics like standard deviation of
intensities and shot to shot standard deviation of signal
* interpolation for 2D / heatmap plots (see comments in code why
this is necessary)
* Fourier transform and phasing for time domain data
This data is handed over to the plotting thread.
Note:
The feature of saving figures/plots to the hard drive should be
implemented here if it is needed.
Args:
processing_queue (Queue): Multiprocessing queue object that the
primary processing process uses to pass data to the secondary
processing process.
plot_queue (Queue): Multiprocessing queue object that the
secondary processing process uses to pass data to the plot
thread.
info_queue (Queue): Multiprocessing queue object which is used
to transfer/hand over information to lineEdits on GUI.
Contains (if applicable) scan index, delay index, interleave
index, values for statistics groupBox etc.
delays (ndarray): Array containing the delays in fs that are
supposed to be measured in the 0th column and their
corresponding weights in the 1st column. I.e.: loaded from a
delay file which can be generated via the delay file editor.
* shape: 2D
* E.g.: (number of delays, 2)
positions (ndarray): Array containing the positions (in mm)
which the micrometer screw should move to in order to change
the sample compartment.
* shape: 1D
* E.g.: (2)
probe_pixel_idx (ndarray): Array that contains the indices of
the rows in the ADCs' data that correspond to *probe* pixel
input channels. These are specified in the "analog input
configuration.json" for each laboratory and can be easily
accessed with the attribute "probe_pixel_idx" of the ADC. It
is highly relevant that the order the pixels are listed in
this array match the order of the array in the
reference_pixel_idx argument. This means that if reference
pixel 3 is listed first in the other array here probe pixel
3 needs to be listed first as well and so on. Otherwise the
intensities on the probe array are not going to be
normalized correctly. For the plotting to work correctly the
pixels also need to be listed in the order of the wavenumber
axis array of the spectrometer.
* shape: 1D
* E.g.: (32) or (64)
saver (SaveData): Object that manages saving of data (including
counts, weights, probe wavenumber axis etc.) into their
respective directories. If None is passed, no data is going
to be saved. If the raw data checkbox was checked on the GUI
the savers raw data attribute is set to True and the raw
data is saved automatically. Defaults to None.
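Example:
A minimal sketch of the signal calculation carried out in run, using
random numbers in place of measured transmissions. The array sizes and
the layout assumed for "sorted data" (averaged/single scan, position,
delay, pixel, chopper state) are illustrative assumptions inferred
from how the array is indexed, not a specification.

```python
import numpy as np

# Hypothetical dimensions, chosen only for illustration
n_positions, n_delays, n_pixels, n_states = 2, 3, 4, 2

rng = np.random.default_rng(0)
# Assumed layout: (avg/single scan, position, delay, pixel, chopper state)
sorted_data = rng.uniform(
    0.5, 1.0, (2, n_positions, n_delays, n_pixels, n_states)
)

signal = np.zeros((2, n_positions, n_delays, n_pixels))
background_corr_signal = np.zeros((2, n_delays, n_pixels))

for pos_idx in range(n_positions):
    for delay_idx in range(n_delays):
        # Transmission -> absorbance (OD)
        absorption = -np.log10(sorted_data[:, pos_idx, delay_idx, :, :])
        # Pump-probe signal: chopper state 1 minus chopper state 0
        signal[:, pos_idx, delay_idx, :] = (
            absorption[:, :, 1] - absorption[:, :, 0]
        )

# Split sample correction: position 1 minus position 0
background_corr_signal[:] = signal[:, 1] - signal[:, 0]
```

In run the same subtraction is applied to one delay at a time, as
soon as the data for that delay arrives.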
"""
def __init__(
self,
processing_queue: Queue,
plot_queue: Queue,
info_queue: Queue,
delays: ndarray,
positions: ndarray,
probe_pixel_idx: ndarray,
saver: SaveData = None,
):
super().__init__()
# Multiprocessing queues
# Gets data from the primary processing process
self.processing_queue = processing_queue
# Gives data to the plot thread
self.plot_queue = plot_queue
# Give experimental status and statistics to
# GUI
self.info_queue = info_queue
self.delays = delays
# The file saver is required to save the resulting plot
self.saver = saver
# Pixel index is an array which tells us which
# entries in our adc data are pixels
# In this case we only need this to
# preallocate our signal array
self.probe_pixel_idx = probe_pixel_idx
# Preallocate array that will hold signal information
# By convention the averaged data is the
# 0th entry of the 0th axis of the array
# and the temp (single scan) data is the
# 1st entry of the 0th axis of the array.
self.signal = np.zeros(
(2, positions.size, delays.shape[0], self.probe_pixel_idx.size)
)
# Background corrected signal
# where the 0th position will be subtracted
# from the 1st position
# (difference signal of the difference signal)
self.background_corr_signal = np.zeros(
(2, delays.shape[0], self.probe_pixel_idx.size)
)
# Needed for 2D heatmap /contour plot
self.intp_signal = [None, None, None]
# if saver:
# # Initialize matplotlib figure that is going
# # to be saved as pdf
# # fig1: overview over delays
# self.fig, self.axes = plt.subplots(
# nrows = self.delays.shape[0]+1,
# ncols = 3,
# # We chose our figure to be the width of a din a4 page
# # We want for rows of plot per din a4 page
# figsize = (8.27, 11.69*(self.delays.shape[0]+1/4))
# )
# # Setup titles, xlabels, ylabels
# [ax.set_xlabel("wavenumber cm$^{-1}$") for ax in self.axes.flatten()]
# self.axes[0, 0].set_xlabel("delay [fs]")
# self.axes[0, 0].set_ylabel("wavenumber [cm$^{-1}$]")
# self.axes[0, 0].set_title("Difference signal")
# self.axes[0, 1].set_xlabel("delay [fs]")
# self.axes[0, 1].set_ylabel("wavenumber [cm$^{-1}$]")
# self.axes[0, 1].set_title("Average difference signal")
# for i, delay in enumerate(self.delays):
# self.axes[i+1, 0].set_title("Signal of current scan, delay {} fs".format(delay))
# self.axes[i+1, 1].set_title("Average signal, delay {} fs".format(delay))
# self.axes[i+1, 2].set_title("Intensities of current scan, delay {} fs".format(delay))
# self.axes[i+1, 0].set_xlabel("wavenumber [cm$^{-1}$]")
# self.axes[i+1, 0].set_ylabel("difference signal [OD]")
# self.axes[i+1, 1].set_xlabel("wavenumber [cm$^{-1}$]")
# self.axes[i+1, 1].set_ylabel("difference signal [OD]")
# self.axes[i+1, 2].set_xlabel("wavenumber [cm$^{-1}$]")
# self.axes[i+1, 2].set_ylabel("intensity [a.u.]")
def run(self):
while True:
# Get data / information from primary processing process
data_container = self.processing_queue.get()
if isinstance(data_container, str):
if data_container == "stop":
break
# ----------------------------------------
# Calculate information for plotting and GUI
# We first calculate everything that is required
# for plotting and pass it to the plot queue because
# plotting can also cost time. Only then we
# calculate the values that we want to display on
# line edits on the GUI
# Get delay index
delay_idx = data_container["delay index"]
pos_idx = data_container["interleave index"]
# Calculate the pump probe signal from the sorted data
sorted_data = data_container["sorted data"]
# Convert transmission to absorbance (OD). The last axis of the
# sorted data holds the chopper states.
absorption = -np.log10(
sorted_data[:, pos_idx, delay_idx, :, :]
)
self.signal[:, pos_idx, delay_idx, :] = (
absorption[:, :, 1] - absorption[:, :, 0]
)
# subtract position 0 from position 1
# of the data to calculate the
# background corrected signal
# (difference signal of the difference signal)
self.background_corr_signal[:, delay_idx] = (
self.signal[:, 1, delay_idx] - self.signal[:, 0, delay_idx]
)
# Prepare data for 2D heatmap+contour plotting
# Single scan data pos 0
self.intp_signal[0] = dp.generate_img_data(
self.delays[:, 0], data_container["probe axis"], self.signal[1, 0, :].T
)
# Single scan data pos 1
self.intp_signal[1] = dp.generate_img_data(
self.delays[:, 0], data_container["probe axis"], self.signal[1, 1, :].T
)
# Single scan background corrected data
self.intp_signal[2] = dp.generate_img_data(
self.delays[:, 0],
data_container["probe axis"],
self.background_corr_signal[1, :].T,
)
# Calculate the average intensity for each pixel
avg_intensities = np.average(data_container["intensities"], axis=1)
# Calculate the standard deviation of the
# intensities
std_intensities = np.std(data_container["intensities"], axis=1, ddof=1)
# Add everything to data container
# and give to plot queue
data_container["signal"] = self.signal
data_container["background corrected data"] = self.background_corr_signal
data_container["signal (interpol)"] = self.intp_signal
data_container["average intensity"] = avg_intensities
data_container["std intensity"] = std_intensities
self.plot_queue.put(data_container.copy())
# ----------------------------------------
# Calculate everything that is needed for statistical information
# on GUI and add it to data container and give it to info queue
# Calculate the average transmission of each pixel over all shots
data_container["mean state intensity"] = np.average(
data_container["transmission"], axis=1
)
data_container["mean state std"] = data_container["statistics"][1]
self.info_queue.put(data_container.copy())
# if self.saver:
# scan_idx = data_container["scan index"]
# [ax.clear() for ax in self.axes[delay_idx+1,:]]
# self.axes[delay_idx+1,0].plot(data_container["probe axis"], self.signal[1, delay_idx])
# self.axes[delay_idx+1,1].plot(data_container["probe axis"], self.signal[0, delay_idx])
# self.axes[delay_idx+1,2].plot(data_container["probe axis"], data_container["average intensity"][self.probe_pixel_idx])
# # Save figure once last delay was processed
# if delay_idx +1 == self.delays.shape[0]:
# self.axes[0,0].clear()
# self.axes[0,1].clear()
# self.fig.suptitle("UV/VIS Pump - IR Probe: Scan {}".format(scan_idx))
# X, Y = np.meshgrid(self.delays[:,0], data_container["probe axis"]) # Inefficient
# self.axes[0,0].contour(X, Y, self.signal[1].T)
# self.axes[0,1].contour(X, Y, self.signal[0].T)
# save_path = self.saver.save_figures(scan_idx)
# plt.savefig(save_path+".pdf")
# Once stop signal was received the function breaks out of the loop.
# Now we need to tell the plotting and updating of info on GUI to stop.
self.plot_queue.put("stop")
self.info_queue.put("stop")
class PyqtPlotting:
"""
Pyqt Plotting class (Qt multithreaded). This class is necessary for
displaying plots on the GUI. PyQtGraph is used as the plotting
engine. Generally the plots are set up first (type of plot, layout,
title etc.). On the first run, the plots are drawn for the first
time. Then the plots are updated every iteration. We update the same
plot references every time to make it more efficient.
Args:
widget_pyqtgraph (QWidget): WidgetPyqtgraph object on which the
plots are going to be displayed. Has methods for plot
manipulation (i.e. removal of plots, autoscale).
adc (ADC): Analog to digital converter hardware object which is
used to communicate with and read data from the ADC.
plot_queue (Queue): Multiprocessing queue object that the
secondary processing process uses to pass data to the plot
thread.
delays (ndarray): Array containing the delays in fs that are
supposed to be measured in the 0th column and their
corresponding weights in the 1st column. I.e.: loaded from a
delay file which can be generated via the delay file editor.
* shape: 2D
* E.g.: (number of delays, 2)
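Example:
A minimal sketch of the queue loop used in run: items are forwarded
until the string "stop" arrives. The standard library queue.Queue and
threading.Thread stand in for the multiprocessing queue and the Qt
worker, and the helper consume and the list received are hypothetical
names used only for illustration.

```python
import queue
import threading


def consume(plot_queue, handle):
    """Forward items from the queue to handle until "stop" arrives."""
    while True:
        item = plot_queue.get()
        if isinstance(item, str) and item == "stop":
            break
        handle(item)


received = []
plot_queue = queue.Queue()
worker = threading.Thread(target=consume, args=(plot_queue, received.append))
worker.start()

# Producer side: one data container per scan, then the stop signal
for scan_idx in range(3):
    plot_queue.put({"scan index": scan_idx})
plot_queue.put("stop")
worker.join()
```

In the class itself the role of handle is played by emitting the
new_data signal, so that update_plot is triggered through the signal
connection rather than called directly from the worker loop.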
"""
class Signals(QObject):
new_data = pyqtSignal(dict)
def __init__(self, widget_pyqtgraph, adc: ADC, plot_queue, delays: ndarray):
# Assign attributes
self.adc = adc
self.widget_pyqtgraph = widget_pyqtgraph
self.graphics_layout = widget_pyqtgraph.graphics_layout
self.plot_queue = plot_queue
self.delays = delays
# Setup signals and threadpool
self.threadpool = QThreadPool()
self.signals = self.Signals()
# Clear old plots
self.widget_pyqtgraph.remove_plots()
# Create dictionary that holds reference to lines etc.
self.plot_ref = {}
# Add a sub-layout to hold the three signal plots in the first row
self.widget_pyqtgraph.plots["upper layout"] = self.graphics_layout.addLayout(
colspan=4
)
# Setup plot that holds plot for signal of position 0 for current delay
# (0th position of sample container)
self.widget_pyqtgraph.plots[
"signal pos 0 current delay"
] = self.widget_pyqtgraph.plots["upper layout"].addPlot(colspan=2)
self.widget_pyqtgraph.plots["signal pos 0 current delay"].setTitle(
"Position 0: Signal for delay time {} fs"
)
self.widget_pyqtgraph.plots["signal pos 0 current delay"].setLabel(
"bottom", "wavenumber [cm<sup>-1</sup>]"
)
self.widget_pyqtgraph.plots["signal pos 0 current delay"].setLabel(
"left", "difference signal [OD]"
)
self.widget_pyqtgraph.set_style(
self.widget_pyqtgraph.plots["signal pos 0 current delay"]
)
# Setup plot that holds plot for signal of position 1 for current delay
self.widget_pyqtgraph.plots[
"signal pos 1 current delay"
] = self.widget_pyqtgraph.plots["upper layout"].addPlot(colspan=2)
self.widget_pyqtgraph.plots["signal pos 1 current delay"].setTitle(
"Position 1: Signal for delay time {} fs"
)
self.widget_pyqtgraph.plots["signal pos 1 current delay"].setLabel(
"bottom", "wavenumber [cm<sup>-1</sup>]"
)
self.widget_pyqtgraph.plots["signal pos 1 current delay"].setLabel(
"left", "difference signal [OD]"
)
self.widget_pyqtgraph.set_style(
self.widget_pyqtgraph.plots["signal pos 1 current delay"]
)
# Setup plot that holds plot for background corrected signal for current delay
self.widget_pyqtgraph.plots[
"signal background corr current delay"
] = self.widget_pyqtgraph.plots["upper layout"].addPlot(colspan=2)
self.widget_pyqtgraph.plots["signal background corr current delay"].setTitle(
"Position 1 - 0: Difference signal for delay time {} fs"
)
self.widget_pyqtgraph.plots["signal background corr current delay"].setLabel(
"bottom", "wavenumber [cm<sup>-1</sup>]"
)
self.widget_pyqtgraph.plots["signal background corr current delay"].setLabel(
"left", "difference difference signal [OD]"
)
self.widget_pyqtgraph.set_style(
self.widget_pyqtgraph.plots["signal background corr current delay"]
)
# Skip to next row
self.graphics_layout.nextRow()
# Add a sub-layout to hold the 2D plots and the histograms in the second row
self.widget_pyqtgraph.plots["middle layout"] = self.graphics_layout.addLayout(
colspan=4
)
# Set up plots that hold the heatmaps (y-axis: probe wavenumber, x-axis: delay)
# We only plot the single scan data
# 0th position of sample container
self.widget_pyqtgraph.plots[
"single pos 0 time-signal heatmap"
] = self.widget_pyqtgraph.plots["middle layout"].addPlot()
self.widget_pyqtgraph.plots["single pos 0 time-signal heatmap"].setTitle(
"Position 0: Signal with respect to delay time"
)
self.widget_pyqtgraph.plots["single pos 0 time-signal heatmap"].setLabel(
"bottom", "pump-probe delay [fs]"
)
self.widget_pyqtgraph.plots["single pos 0 time-signal heatmap"].setLabel(
"left", "wavenumber [cm<sup>-1</sup>]"
)
self.widget_pyqtgraph.set_style(
self.widget_pyqtgraph.plots["single pos 0 time-signal heatmap"]
)
# Add the HistogramLUTItem to the plotlayout directly after 2D plot
# Also add the item. With that it will be directly drawn at the correct position
self.plot_ref[
"single pos 0 time-signal heatmap histogram"
] = pg.HistogramLUTItem()
self.widget_pyqtgraph.plots["middle layout"].addItem(
self.plot_ref["single pos 0 time-signal heatmap histogram"]
)
# 1st position of sample container
self.widget_pyqtgraph.plots[
"single pos 1 time-signal heatmap"
] = self.widget_pyqtgraph.plots["middle layout"].addPlot()
self.widget_pyqtgraph.plots["single pos 1 time-signal heatmap"].setTitle(
"Position 1: Signal with respect to delay time"
)
self.widget_pyqtgraph.plots["single pos 1 time-signal heatmap"].setLabel(
"bottom", "pump-probe delay [fs]"
)
self.widget_pyqtgraph.plots["single pos 1 time-signal heatmap"].setLabel(
"left", "wavenumber [cm<sup>-1</sup>]"
)
self.widget_pyqtgraph.set_style(
self.widget_pyqtgraph.plots["single pos 1 time-signal heatmap"]
)
# Add the HistogramLUTItem to the plotlayout directly after 2D plot
# Also add the item. With that it will be directly drawn at the correct position
self.plot_ref[
"single pos 1 time-signal heatmap histogram"
] = pg.HistogramLUTItem()
self.widget_pyqtgraph.plots["middle layout"].addItem(
self.plot_ref["single pos 1 time-signal heatmap histogram"]
)
# Background corrected signal
self.widget_pyqtgraph.plots[
"single background corr time-signal heatmap"
] = self.widget_pyqtgraph.plots["middle layout"].addPlot()
self.widget_pyqtgraph.plots[
"single background corr time-signal heatmap"
].setTitle("Position 1 - 0: Signal with respect to delay time")
self.widget_pyqtgraph.plots[
"single background corr time-signal heatmap"
].setLabel("bottom", "pump-probe delay [fs]")
self.widget_pyqtgraph.plots[
"single background corr time-signal heatmap"
].setLabel("left", "wavenumber [cm<sup>-1</sup>]")
self.widget_pyqtgraph.set_style(
self.widget_pyqtgraph.plots["single background corr time-signal heatmap"]
)
# Add the HistogramLUTItem to the plotlayout directly after 2D plot
# Also add the item. With that it will be directly drawn at the correct position
self.plot_ref[
"single background corr time-signal heatmap histogram"
] = pg.HistogramLUTItem()
self.widget_pyqtgraph.plots["middle layout"].addItem(
self.plot_ref["single background corr time-signal heatmap histogram"]
)
# Setup colormap for heatmaps
# Credit: https://github.com/pyqtgraph/pyqtgraph/issues/561
colormap = cm.get_cmap("seismic") # cm.get_cmap("CMRmap")
colormap._init()
# [:-3,:] because the last values of the colormap are fringe
# cases which are matplotlib specific and do not define our
# colormap
self.lut = (colormap._lut * 255).view(np.ndarray)[
:-3, :
] # Convert matplotlib colormap from 0-1 to 0 -255 for Qt
# Skip to the next (last) row of plots, which holds two plots
self.graphics_layout.nextRow()
# Setup plot that displays intensities
self.widget_pyqtgraph.plots["intensities"] = self.graphics_layout.addPlot(
row=2, col=1
)
self.widget_pyqtgraph.plots["intensities"].setTitle("Average intensities")
self.widget_pyqtgraph.plots["intensities"].setLabel(
"bottom", "wavenumber [cm<sup>-1</sup>]"
)
self.widget_pyqtgraph.plots["intensities"].setLabel("left", "intensity [a.u.]")
self.widget_pyqtgraph.set_style(self.widget_pyqtgraph.plots["intensities"])
# Setup plot that displays standard deviation of signal
# Make the title of std signal so that the Average standard deviation
# of signal over all wavenumbers is displayed. HTML is
# used because setTitle works with it.
self.widget_pyqtgraph.plots["std signal"] = self.graphics_layout.addPlot(
row=2, col=0
)
self.widget_pyqtgraph.plots["std signal"].setTitle(
"Standard deviation of pump-probe signal"
)
self.widget_pyqtgraph.plots["std signal"].setLabel(
"bottom", "wavenumber [cm<sup>-1</sup>]"
)
self.widget_pyqtgraph.plots["std signal"].setLabel(
"left", "difference signal [OD]"
)
self.widget_pyqtgraph.set_style(self.widget_pyqtgraph.plots["std signal"])
# Connect signal that data has arrived to update the plot
self.signals.new_data.connect(
lambda data_container: self.update_plot(data_container)
)
# Start loop that gets data from queue in Qt Thread
self.work = Worker(self.run)
def run(self):
while True:
data_container = self.plot_queue.get()
if isinstance(data_container, str):
if data_container == "stop":
break
self.signals.new_data.emit(data_container)
def update_plot(self, data_container):
# Unpack the data needed for plotting
delay_idx = data_container["delay index"]
signal = data_container["signal"]
background_corr_signal = data_container["background corrected data"]
intp_signal = data_container["signal (interpol)"]
probe_axis = data_container["probe axis"]
avg_intensities = data_container["average intensity"]
std_intensities = data_container["std intensity"]
std_signal = data_container["std s2s signal"]
# Update title with current delay
self.widget_pyqtgraph.plots["signal pos 0 current delay"].setTitle(
"Position 0: Signal for delay time {} fs".format(self.delays[delay_idx][0])
)
self.widget_pyqtgraph.plots["signal pos 1 current delay"].setTitle(
"Position 1: Signal for delay time {} fs".format(self.delays[delay_idx][0])
)
self.widget_pyqtgraph.plots["signal background corr current delay"].setTitle(
"Position 1 - 0: Difference signal for delay time {} fs".format(
self.delays[delay_idx][0]
)
)
# Needs to be bigger than three because
# we define three colorbar/histogram items
# in init which are already in plot ref
if len(self.plot_ref) > 3:
# Update signal plots
# 0th position of sample container
# Single scan
self.plot_ref["single signal pos 0 current delay"].setData(
x=probe_axis, y=signal[1, 0, delay_idx]
)
# Scan averaged
self.plot_ref["avg signal pos 0 current delay"].setData(
x=probe_axis, y=signal[0, 0, delay_idx]
)
# 1st position of sample container
# Single scan
self.plot_ref["single signal pos 1 current delay"].setData(
x=probe_axis, y=signal[1, 1, delay_idx]
)
# Scan averaged
self.plot_ref["avg signal pos 1 current delay"].setData(
x=probe_axis, y=signal[0, 1, delay_idx]
)
# Background corrected signal
# Single scan
self.plot_ref["single signal background corr current delay"].setData(
x=probe_axis, y=background_corr_signal[1, delay_idx]
)
# Scan averaged
self.plot_ref["avg signal background corr current delay"].setData(
x=probe_axis, y=background_corr_signal[0, delay_idx]
)
# -----------Start 2D Plots---------------
# Update 2d image: time vs. signal(wavenumber)
# 0th position of sample container
# Get old levels so it does not rescale the colors every time (see below)
levels = self.plot_ref[
"single pos 0 time-signal heatmap histogram"
].getLevels()
self.plot_ref["single pos 0 time-signal heatmap"].setImage(intp_signal[0])
# "Disable" autoscale after 0th scan has run.
if data_container["scan index"] > 0:
# Set to old levels of the histogram
self.plot_ref["single pos 0 time-signal heatmap histogram"].setLevels(
*levels
)
# Update contour lines
dp.update_contour_lines(
intp_signal[0],
self.plot_ref["single pos 0 time-signal heatmap contours"],
)
# 1st position of sample container
# Get old levels so it does not rescale the colors every time (see below)
levels = self.plot_ref[
"single pos 1 time-signal heatmap histogram"
].getLevels()
self.plot_ref["single pos 1 time-signal heatmap"].setImage(intp_signal[1])
# "Disable" autoscale after 0th scan has run.
if data_container["scan index"] > 0:
# Set to old levels of the histogram
self.plot_ref["single pos 1 time-signal heatmap histogram"].setLevels(
*levels
)
# Update contour lines
dp.update_contour_lines(
intp_signal[1],
self.plot_ref["single pos 1 time-signal heatmap contours"],
)
# Background corrected signal (position 1 - position 0)
# Get old levels so it does not rescale the colors every time (see below)
levels = self.plot_ref[
"single background corr time-signal heatmap histogram"
].getLevels()
self.plot_ref["single background corr time-signal heatmap"].setImage(
intp_signal[2]
)
# "Disable" autoscale after 0th scan has run.
if data_container["scan index"] > 0:
# Set to old levels of the histogram
self.plot_ref[
"single background corr time-signal heatmap histogram"
].setLevels(*levels)
# Update contour lines
dp.update_contour_lines(
intp_signal[2],
self.plot_ref["single background corr time-signal heatmap contours"],
)
# ----------End 2D Plots----------------
# Update intensity error bars for probe array
self.plot_ref["probe error bars"].setData(
x=probe_axis,
y=avg_intensities[self.adc.probe_pixel_idx],
height=5 * std_intensities[self.adc.probe_pixel_idx],
)
# Update intensity error bars for reference array
self.plot_ref["ref error bars"].setData(
x=probe_axis,
y=avg_intensities[self.adc.reference_pixel_idx],
height=5 * std_intensities[self.adc.reference_pixel_idx],
)
# Update intensities
self.plot_ref["probe intensities"].setData(
x=probe_axis, y=avg_intensities[self.adc.probe_pixel_idx]
)
self.plot_ref["ref intensities"].setData(
x=probe_axis, y=avg_intensities[self.adc.reference_pixel_idx]
)
# Plot standard deviation of pseudo signal
self.plot_ref["std signal"].setData(x=probe_axis, y=std_signal)
else:
# First time plotting
signal_pen = pg.mkPen(color="#d62728", width=2.5, style=QtCore.Qt.SolidLine)
avg_signal_pen = pg.mkPen(
color="#17becf", width=2.5, style=QtCore.Qt.SolidLine
)
probe_pen = pg.mkPen(color="#1f77b4", width=2.5, style=QtCore.Qt.SolidLine)
reference_pen = pg.mkPen(
color="#ff7f0e", width=2.5, style=QtCore.Qt.SolidLine
)
std_intensity_pen = pg.mkPen(
color="#7f7f7f", width=1.5, style=QtCore.Qt.SolidLine
) # ? dashed lines?
std_signal_pen = pg.mkPen(
color="#2ca02c", width=2.5, style=QtCore.Qt.SolidLine
)
# Histogram colormap
seismic = pg.graphicsItems.GradientEditorItem.Gradients["seismic"]
# Plot signal
# 0th position of sample container
# Single scan
self.plot_ref[
"single signal pos 0 current delay"
] = self.widget_pyqtgraph.plots["signal pos 0 current delay"].plot(
x=probe_axis,
y=signal[1, 0, delay_idx],
name="single scan pump-probe signal pos 0",
pen=signal_pen,
)
# Scan averaged
self.plot_ref[
"avg signal pos 0 current delay"
] = self.widget_pyqtgraph.plots["signal pos 0 current delay"].plot(
x=probe_axis,
y=signal[0, 0, delay_idx],
name="scan averaged pump-probe signal pos 0",
pen=avg_signal_pen,
)
# 1st position of sample container
# Single scan
self.plot_ref[
"single signal pos 1 current delay"
] = self.widget_pyqtgraph.plots["signal pos 1 current delay"].plot(
x=probe_axis,
y=signal[1, 1, delay_idx],
name="single scan pump-probe signal pos 1",
pen=signal_pen,
)
# Scan averaged
self.plot_ref[
"avg signal pos 1 current delay"
] = self.widget_pyqtgraph.plots["signal pos 1 current delay"].plot(
x=probe_axis,
y=signal[0, 1, delay_idx],
name="scan averaged pump-probe signal pos 1",
pen=avg_signal_pen,
)
# Background corrected signal
# Single scan
self.plot_ref[
"single signal background corr current delay"
] = self.widget_pyqtgraph.plots[
"signal background corr current delay"
].plot(
x=probe_axis,
y=background_corr_signal[1, delay_idx],
name="single scan pump-probe signal (background corrected)",
pen=signal_pen,
)
# Scan averaged
self.plot_ref[
"avg signal background corr current delay"
] = self.widget_pyqtgraph.plots[
"signal background corr current delay"
].plot(
x=probe_axis,
y=background_corr_signal[0, delay_idx],
name="scan averaged pump-probe signal (background corrected)",
pen=avg_signal_pen,
)
# Plot 2d image: time vs. signal (wavenumber)
# We only display the non averaged data
# 0th position of sample container
self.plot_ref["single pos 0 time-signal heatmap"] = pg.ImageItem(
intp_signal[0]
)
# Generate a scrollable colorbar.
# This generates a histogram with
# which it is possible to scale the
# data which will be displayed in
# the heatmaps.
# First link the histogram to the image item
# "single pos 0 time-signal heatmap".
# This basically means that the HistogramLUTItem contains the data
# from the heatmap.
self.plot_ref["single pos 0 time-signal heatmap histogram"].setImageItem(
self.plot_ref["single pos 0 time-signal heatmap"]
)
# Set the color levels of the histogram
self.plot_ref[
"single pos 0 time-signal heatmap histogram"
].gradient.restoreState(seismic)
# Scale image to match axes
dp.scale_img(
self.delays[:, 0],
probe_axis,
intp_signal[0],
self.plot_ref["single pos 0 time-signal heatmap"],
)
self.widget_pyqtgraph.plots["single pos 0 time-signal heatmap"].addItem(
self.plot_ref["single pos 0 time-signal heatmap"]
)
self.plot_ref["single pos 0 time-signal heatmap"].setLookupTable(self.lut)
# Generate contour lines
self.plot_ref[
"single pos 0 time-signal heatmap contours"
] = dp.generate_contour_lines(
intp_signal[0], self.plot_ref["single pos 0 time-signal heatmap"]
)
# 1st position of sample container
self.plot_ref["single pos 1 time-signal heatmap"] = pg.ImageItem(
intp_signal[1]
)
self.plot_ref["single pos 1 time-signal heatmap histogram"].setImageItem(
self.plot_ref["single pos 1 time-signal heatmap"]
)
# Set the color levels of the histogram
self.plot_ref[
"single pos 1 time-signal heatmap histogram"
].gradient.restoreState(seismic)
# Scale image to match axes
dp.scale_img(
self.delays[:, 0],
probe_axis,
intp_signal[1],
self.plot_ref["single pos 1 time-signal heatmap"],
)
self.widget_pyqtgraph.plots["single pos 1 time-signal heatmap"].addItem(
self.plot_ref["single pos 1 time-signal heatmap"]
)
self.plot_ref["single pos 1 time-signal heatmap"].setLookupTable(self.lut)
# Generate contour lines
self.plot_ref[
"single pos 1 time-signal heatmap contours"
] = dp.generate_contour_lines(
intp_signal[1], self.plot_ref["single pos 1 time-signal heatmap"]
)
# Background corrected signal
self.plot_ref["single background corr time-signal heatmap"] = pg.ImageItem(
intp_signal[2]
)
self.plot_ref[
"single background corr time-signal heatmap histogram"
].setImageItem(self.plot_ref["single background corr time-signal heatmap"])
# Set the color levels of the histogram
self.plot_ref[
"single background corr time-signal heatmap histogram"
].gradient.restoreState(seismic)
# Scale image to match axes
dp.scale_img(
self.delays[:, 0],
probe_axis,
intp_signal[2],
self.plot_ref["single background corr time-signal heatmap"],
)
self.widget_pyqtgraph.plots[
"single background corr time-signal heatmap"
].addItem(self.plot_ref["single background corr time-signal heatmap"])
self.plot_ref["single background corr time-signal heatmap"].setLookupTable(
self.lut
)
# Generate contour lines
self.plot_ref[
"single background corr time-signal heatmap contours"
] = dp.generate_contour_lines(
intp_signal[2],
self.plot_ref["single background corr time-signal heatmap"],
)
# Create intensity error bars for probe array
self.plot_ref["probe error bars"] = pg.ErrorBarItem(
x=probe_axis,
y=avg_intensities[self.adc.probe_pixel_idx],
height=5 * std_intensities[self.adc.probe_pixel_idx],
beam=0.3,
pen=std_intensity_pen,
)
self.widget_pyqtgraph.plots["intensities"].addItem(
self.plot_ref["probe error bars"]
)
# Create intensity error bars for reference array
self.plot_ref["ref error bars"] = pg.ErrorBarItem(
x=probe_axis,
y=avg_intensities[self.adc.reference_pixel_idx],
height=5 * std_intensities[self.adc.reference_pixel_idx],
beam=0.3,
pen=std_intensity_pen,
)
self.widget_pyqtgraph.plots["intensities"].addItem(
self.plot_ref["ref error bars"]
)
# Plot intensities
self.plot_ref["probe intensities"] = self.widget_pyqtgraph.plots[
"intensities"
].plot(
x=probe_axis,
y=avg_intensities[self.adc.probe_pixel_idx],
name="Average intensities on probe array",
pen=probe_pen,
)
self.plot_ref["ref intensities"] = self.widget_pyqtgraph.plots[
"intensities"
].plot(
x=probe_axis,
y=avg_intensities[self.adc.reference_pixel_idx],
name="Average intensities on reference array",
pen=reference_pen,
)
# Plot standard deviation of pseudo signal
self.plot_ref["std signal"] = self.widget_pyqtgraph.plots[
"std signal"
].plot(
x=probe_axis,
y=std_signal,
name="Standard deviation of pump-probe signal",
pen=std_signal_pen,
)