# Import helper modules
import typing
from typing import List, Tuple
from numpy import ndarray
import logging
import os
from matplotlib import pyplot as plt
# Import essential modules
import serial
import numpy as np
import csv
import time
from pi_control import PiStage
from analog_digital_converter import AnalogDigitalConverter
# Set up logger
logger = logging.getLogger(__name__)
# Set up 'path'-like type
Path = typing.Union[str, bytes, os.PathLike]
AnalogDigitalConverter = (
typing.Any
) # TODO please correct this, I don't know how. Without a definition
# import will fail and so will doc building -GW
ndarray = typing.Any # TODO please correct this also -GW
[docs]class SerialInterferometerCounter:
"""
Connecting to the interferometer counter and preamplifier
electronics from Zürich with PySerial.
This class is contains methods for all serial commands provided in
the manual.
Args:
port (str): COM port to which the controller is connected.
Please check in the device manager of your operating system.
* E.g. "COM7"
name (str, optional): Name / Identifier to give to this
controller. This is relevant for log statements, especially when
there is more than one interferometer in the setup. Defaults to
"serial connection to interferometer".
Attributes:
port (str): COM port to which the controller is connected.
Please check in the device manager of your operating system.
* E.g. "COM7"
name (str, optional): Name / Identifier to give to this
controller. This is relevant for log statements, especially when
there is more than one interferometer in the setup. Defaults to
"serial connection to interferometer".
ser (serial.Serial): Pyserial object which initiates the
communication with the Counter.
References:
| Counter für Interferometer; Anleitung; Manual; Universität Zürich.pdf
| TimingScheme.pdf
| Driver (Win7): https://www.st.com/en/development-tools/stsw-stm32102.html
"""
def __init__(self, port: str, name: str = "serial connection to interferometer"):
# Initialize attributes
self.name = name
self.port = port
# Connecting the controller with pyserial
self.ser = serial.Serial(
port,
115200,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1,
)
[docs] def read(self):
"""
Reads out all bytes from COM port until newline character
appears.
"""
self.ser.readline()
[docs] def end(self):
"""
Closes serial connection to the interferometer counter and
preamplifier.
"""
self.ser.close()
#! Set/ Get commands start here
[docs] def counter_value(self, value: int = None):
"""
Set the counter value
Data type: volatile
Args:
value (int, optional): Value between 0 ... 65535.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
# Choose between sending and getting commands
if value is not None:
logger.info("""Set counter value to {} for {}.""".format(value, self.name))
command = "S 0 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info("""Read current counter value for {}.""".format(self.name))
command = "G 0 \r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def trigger_level_pd1(self, value: int = None):
"""
Trigger Level PD1 ("DAC flag" = enable) -> automatic adjustment
by T0/T1 commands
Data type: volatile
Args:
value (int, optional): Value between 0 ... 4095 (=3.3V). Set
any value if set is not true. Defaults to None. If None, a
Get command is send instead of a Set command.
"""
if value is not None:
logger.info(
"""Set trigger value of PD1 to {} for {}.""".format(value, self.name)
)
command = "S 1 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info(
"""Read current trigger value of PD1 for {}.""".format(self.name)
)
command = "G 1\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def trigger_level_pd2(self, value: int = None):
"""
Trigger Level PD2 ("DAC flag" = enable) -> automatic adjustment
by T0/T1 commands
Data type: volatile
Args:
value (int, optional): Value between 0 ... 4095 (=3.3V).
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Set trigger value of PD2 to {} for {}.""".format(value, self.name)
)
command = "S 2 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info(
"""Read current trigger value of PD2 for {}.""".format(self.name)
)
command = "G 2\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def dac_flag(self, value: int = None):
"""
DAC Flag, internal DAC provides trigger level for PD1/ PD2 ->
reset of device on change is recommended
Data type: non-volatile
Args:
value (int, optional): 0 = disable. 1 = enable.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info("""Set DAC Flag to {} for {}.""".format(value, self.name))
command = "S 3 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info("""Read DAC Flag value for {}.""".format(self.name))
command = "G 3\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def data_mode(self, value: int = None):
"""
RS232 Data Mode -> disable can improve signal to noise ratio
(S/N)
Data type: volatile
Args:
value (int, optional): 0 = disable. 1 = counter value stream 1 sec.
2 = counter on trigger (<1.5kHz).
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info("""Set data mode to {} for {}.""".format(value, self.name))
command = "S 4 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info("""Read data mode for {}.""".format(self.name))
command = "G 4\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def size_counter_table(self, value: int = None):
"""
Size of counter table. Erase counter table and start collecting
counter values.
Data type: volatile
Args:
value (int, optional): 0 ... 8192.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Set the size of the counter table to {} for {}.""".format(
value, self.name
)
)
command = "S 5 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info(
"""Read the size of the counter table for {}.""".format(self.name)
)
command = "G 5\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def output_mode_counter_table(self, value: int = None):
"""
Output mode for counter table (setting for "Read Counter Table"
command) -> in binary mode, high byte first
Data type: non-volatile
Args:
value (int, optional): 0 = ASCII, 1 = binary.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Set the output mode for the counter table to {} for {}.""".format(
value, self.name
)
)
command = "S 6 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info(
"""Read the size of the counter table for {}.""".format(self.name)
)
command = "G 6\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def change_test_pattern(self, value: int = None):
"""
Changing test pattern on parallel counter output for checking
wiring -> do not provide any trigger signal while enabled
Data type: volatile
Args:
value (int, optional): 0 = ASCII, 1 = binary.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Change test pattern to {} for {}.""".format(value, self.name)
)
command = "S 9 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info("""Get current test pattern for {}.""".format(self.name))
command = "G 9\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def select_triggering_counter(self, value: int = None):
"""
Select PD Signal for Triggering Counter. reset of counter ->
refer to chapter in datasheet "Encoder counting modes"
Data type: non-volatile
Args:
value (int, optional): 0 = positive, not inverted
1 = negative, inverted.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Set PD Signal for Triggering Counter to {} for {}.""".format(
value, self.name
)
)
command = "S 20 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info(
"""Get PD Signal for Triggering Counter for {}.""".format(self.name)
)
command = "G 20\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def select_polarity_pd1(self, value: int = None):
"""
Select polarity of PD1 signal. reset of counter -> refer to
chapter of datasheet "Encoder Counting Modes"
Data type: non-volatile
Args:
value (int, optional): 0 = positive, not inverted
1 = negative, inverted.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Set polarity of PD1 signal to {} for {}.""".format(value, self.name)
)
command = "S 21 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info("""Get polarity of PD1 signal for {}.""".format(self.name))
command = "G 21\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def select_polarity_pd2(self, value: int = None):
"""
Select polarity of PD2 signal. reset of counter -> refer to
chapter of datasheet "Encoder Counting Modes"
Data type: non-volatile
Args:
value (int, optional): 0 = positive, not inverted
1 = negative, inverted.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Set polarity of PD2 signal to {} for {}.""".format(value, self.name)
)
command = "S 22 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info("""Get polarity of PD2 signal for {}.""".format(self.name))
command = "G 22\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def parallel_counter_mode(self, value: int = None):
"""
Parallel counter output port mode for counter value -> port
inactive can improve S/N
Data type: non-volatile
Args:
value (int, optional): 0 = positive inactive (force low)
1 = port active.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info(
"""Set parallel counter output port mode for counter value to {} for {}.""".format(
value, self.name
)
)
command = "S 23 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info(
"""Get parallel counter output port mode for counter value for {}.""".format(
self.name
)
)
command = "G 23\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def grabbing_mode_counter_table(self, value: int = None):
"""
Grabbing mode for counter table -> if mode changed, use "size of
counter table" command next
Data type: volatile
Args:
value (int, optional): 0 = grabbing single sequence
1 = continuous.
Defaults to None. If None, a Get command is send instead of
a Set command.
"""
if value is not None:
logger.info("""Set grabbing mode to {} for {}.""".format(value, self.name))
command = "S 24 {}\r\n".format(value)
self.ser.write(command.encode())
elif value is None:
logger.info("""Get current grabbing mode for {}.""".format(self.name))
command = "G 24\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
#! GET Commands start here
[docs] def get_score_list_pd1(self, value: int):
"""
Score list PD1 used for trigger level calculation
Data type: volatile
Args:
value (int): 0 ... 255 = count of values, followed by table
"""
logger.info("""Get score list of PD1 at {} for {}.""".format(value, self.name))
command = "G 10 {}\r\n".format(value)
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def get_score_list_pd2(self, value: int):
"""
Score list PD2 used for trigger level calculation
Data type: volatile
Args:
value (int): 0 ... 255 = count of values, followed by table
"""
logger.info("""Get score list of PD2 at {} for {}.""".format(value, self.name))
command = "G 11 {}\r\n".format(value)
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def read_counter_table(self, value: int):
"""
Read counter table. Memory block label, count of values, select
output mode first. -> in binary mode, high byte first
Data type: volatile
#todo not sure if whole functionality is covered
Args:
value (int): 1 ... 2 '_' 0 ... 8192
-> followed by table
* 1 ... 2 = memory block label
* 0 ... 8192 = count of values
"""
logger.info("""Read counter table {} for {}.""".format(value, self.name))
command = "G 12 {}\r\n".format(
value
) # todo not sure if whole functionality is covered
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def status_counter_table(self, value: int):
"""
Status counter table. status, number of triggers left to
complete counter table
#todo not sure if whole functionality is covered
Args:
value (int): 0 ... 3 '_' 0 ...8192
* 0 ... 3 = status
* 0 = no block ready
* 1 = block 1 ready for readout
* 2 = block 2 ready for readout
* 3 = block write overrun
* 0 ... 8192 = triggers left to complete
"""
logger.info(
"""Read status of counter table {} for {}.""".format(value, self.name)
)
command = "G 13 {}\r\n".format(
value
) # todo not sure if whole functionality is covered
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def compilation_stamp(self, value: int):
"""
Compilation stamp
Data type: non-volatile
Args:
value (int): 21 (count of characters)
-> followed by string
"""
logger.info("""Read compilation stamp {} for {}.""".format(value, self.name))
command = "G 99 {}\r\n".format(value)
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
#! TASK commands start here
[docs] def collect_data_trigger_calculation(self):
"""
Start collecting data for PD trigger level calcultion. "v.s.
offset calculation" (handwritten in datasheet)
"""
logger.info(
"""Start collecting data for PD trigger level calcultion for {}.""".format(
self.name
)
)
command = "T 0\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def stop_collect_data_trigger_calculation(self):
"""
Stop collecting data, calculate new trigger level for PD1+2
using score lists, set new levels with DAC
"""
logger.info(
"""Stop collecting data, calculate new trigger level for PD1+2 for {}.""".format(
self.name
)
)
command = "T 1\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
[docs] def set_nonvolatile_variables_to_default(self):
"""
Set all nonvolatile variable to default. reset of device ist
recommended.
"""
logger.info(
"""Set all nonvolatile variable to default for {}.""".format(self.name)
)
command = "T 97\r\n"
self.ser.write(command.encode())
return self.__check_for_transmission_error(command)
def __check_for_transmission_error(self, command: str) -> bool:
"""
Checks if data transmission was successful.
The microcontroller returns the string that was sent in all
lowercase letters. So to check whether transfer was successful
we check if the command string converted to lowercase letters
corresponds to the string we received from the controller.
Args:
command (str): Command sent to controller.
Returns:
bool: True when successfull, False when not successfull.
"""
# We need to remove white spaces from the response to make it
# comparable E.g. if you send <<S 4 1 \r\n>> it will respond:
# <<s4 1 \r\n>>
response = self.ser.readline().decode().replace(" ", "")
# print(response)
# The interferometer counter returns the command in lower case
# (and with different white spaces (see above))
command = command.lower().replace(" ", "")
if response[0] != "g":
if response == command:
logger.info(
r"Transmission of data {} successful for {}.".format(
repr(command), self.name
)
)
return True
else:
logger.error(
r"Transmission of data {} not successful for {}.".format(
repr(command), self.name
)
)
return False
else:
if command[:-2] in response:
logger.info(
r"Transmission of data {} successful for {}.".format(
repr(command), self.name
)
)
response = response.replace(command[:-2], "")
return int(response)
else:
logger.error(
r"Transmission of data {} not successful for {}.".format(
repr(command), self.name
)
)
return False
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.end()
[docs]class InterferometerCounter:
"""
InterferometerCounter class is used to obtain the interferometer
position for every laser shot.
By sending certain commands the interferometer counter is set to
single sequence mode. The offset of the photodiodes is then
calculated by moving the motor. Then, the direction of the stage is
determined and the polarity of the photodiodes is set respectively.
For this process it is necessary to calculate the counter values
with the calculate_counter_values method.
Args:
port (str): COM port to which the counter is connected. Please
check in the device manager of your operating system.
* E.g. "COM7"
interferometer_stage (PiStage): Object which contains the
functionality of the interferometer stage. Is used for moving
the stage etc.
adc (AnalogDigitalConverter): Object that communicates with ADC
to which MCT detector is connected.
bin_config_path (Path): Path to configuration file where the
calibration values for the calculation of the bins are stored.
The values in this file are obtained by measuring the used R2R
network
name (str, optional): Name / Identifier to give to this
controller. This is relevant for log statements, especially when
there is more than one interferometer in the setup. Defaults to
"Interferometer Counter".
Attributes:
connection(SerialInterferometerCounter): Provides the serial
connection and commands to the interferometer counter.
name (str, optional): Name / Identifier to give to this stage.
This is relevant for log statements, especially when there is
more than one stage in the setup. Defaults to "XYZ Stage".
port (str): COM port to which the controller is connected.
Please check in the device manager of your operating system.
interferometer_stage (PiStage): Connects and initializes the
interferometer stage with the Pi_controll module. Provides the
functionality in PiStage class.
bin_config_path (Path): Path to configuration file. This is
where the binlist and the reference values for the R2R network
are stored
"""
def __init__(
self,
port: str,
interferometer_stage: PiStage,
adc: AnalogDigitalConverter,
bin_config_path: Path,
name: str = "Interferometer Counter",
):
# Initializing attributes
self.connection = SerialInterferometerCounter(port)
self.name = name
self.port = port
self.interferometer_stage = interferometer_stage
self.adc = adc
self.bin_config_path = bin_config_path
# Load bin reference values for R2R
self.__load_bin_reference_values()
# Get the row indices of the ADC data array that hold the data
# of the interferometer counter
self.__find_r2r_indices()
# if True:
# self.different_init()
# return
# print("ATM this should not be printed!")
# ------- This next part is copied from LabView (init_if(...).vi) -------
# We have the impression that the interferometer is set up to be
# working like in Graph 3 of 'Encoder Counting Modes' (p. 3 of
# the manual). The configuration command sequence for that
# setting is: 'S 20 1', 'S 21 0', 'S 22 0' For some reason the
# order of there commands is different than in the manual
# ---- CITATION ----- (LabView Code)
# "The sequence of commands must be followed! Switching up
# commands during initialisation will result in obscure bugs
# like a drifting counter!"
logger.info("Starting initalisation command sequence for {}.".format(self.name))
self.connection.data_mode(2) # Set data mode to count on trigger
self.connection.select_polarity_pd1(
0
) # Set polarity of photo diode 1 to positive, not inverted
self.connection.select_polarity_pd2(
0
) # Set polarity of photo diode 2 to positive, not inverted
self.connection.data_mode(
2
) # Set data mode to count on trigger #! Not clear why sending twice
self.connection.output_mode_counter_table(
0
) # Set output of read_counter_table to return data in ASCII format
self.connection.select_triggering_counter(
1
) # Set trigger to count in specific way. See page 3/10 of manual
self.connection.dac_flag(1) # Set DAC Flag enable
self.connection.grabbing_mode_counter_table(
0
) # Set grabbing mode of counter table to be continuos
logger.info(
"Initalisation command sequence for {} completed.".format(self.name)
)
# Start offset calculation for Photodiodes For this, we need to
# move interferometer motor slowly
logger.info("Starting photodiode offset {} calculation.".format(self.name))
prior_velocity = self.interferometer_stage.velocity
self.interferometer_stage.set_velocity_mm(0.01)
self.interferometer_stage.move(400, wait=False)
self.connection.collect_data_trigger_calculation()
# Wait for process to complete
time.sleep(3)
# Stop offset calculation
self.connection.stop_collect_data_trigger_calculation()
self.interferometer_stage.wait()
logger.info(
"Offset calculation for photodiodes for {} completed".format(self.name)
)
# Check whether counting direction is correct. Meaning that we
# count in the right direction when the interferometer stage is
# moving forward and counting in the opposite direction when the
# stage is moving backwards.
# ---- CITATION ----- (LabView Code)
# "The second version of the Interferometer Counter chip seems
# to switch its counting direction randomly during
# initialisation. To ensure the correct direction it is set to
# zero and driven into the "positive direction". If it counts to
# the negative direction (meaning below zero -> 65535 and lower)
# the polarity of Photo diode 1+2 is inverted what results in
# an inverted counting direction. "
logger.info(
"Check whether counting direction for {} is correct.".format(self.name)
)
self.interferometer_stage.set_velocity_mm(3)
# The .move method moves relative to t_zero in negative
# direction. I.e.: target position = t_zero position - target
# delay This implies that the movement from 500fs to 400 fs is
# actually in positive direction. Note: It is stupid to move to
# 500fs then to 400fs. Instead of just moving to 300fs. It's
# what it's.
self.interferometer_stage.move(500, wait=True)
# Reset counter by setting current position to counter value = 0
self.reset_counter()
self.interferometer_stage.move(400, wait=True) # Moving in positive direction.
prior_samples_to_acquire_value = (
self.adc.samples_to_acquire
) # Save old samples to acquire value to reset it later.
self.adc.set_samples_to_acquire(10)
self.adc.read()
interferometer_positions = self.calculate_counter_values(
self.adc.data
) #! Method needs work!
# We want to check whether the counter counted into the positive
# direction when moving delay "forward". If counter counts
# backwards, its memory will overflow and end up somewhere
# around 65535. 25000 was chosen literally arbitrarily (LabView
# legacy). If the counter is not within this range, we need to
# change the counting direction.
if 0 < interferometer_positions[-1] < 25000:
logger.info("{} is counting in the correct direction.".format(self.name))
else:
logger.info("{} is counting in the wrong direction.".format(self.name))
self.connection.select_polarity_pd1(
2
) # Set polarity of photo diode 1 to negative, inverted
self.connection.select_polarity_pd2(
2
) # Set polarity of photo diode 2 to negative, inverted
logger.info(
"Changed polarity of both photodiodes/counting directions for {}".format(
self.name
)
)
# Reset samples to acquire
self.adc.set_samples_to_acquire(prior_samples_to_acquire_value)
# Reset velocity of stage/motor
self.interferometer_stage.set_velocity_mm(prior_velocity)
# Move motor back to zero position (where interferometer pulses
# overlap temporally)
self.interferometer_stage.move(0, wait=True)
# ------- End of part that is copied from LabView (init_if(...).vi) -------
[docs] def different_init(self):
# ------- This next part is copied from LabView (init_if(...).vi) -------
# We have the impression that the interferometer is set up to be
# working like in Graph 3 of 'Encoder Counting Modes' (p. 3 of
# the manual). The configuration command sequence for that
# setting is: 'S 20 1', 'S 21 0', 'S 22 0' For some reason the
# order of there commands is different than in the manual
# ---- CITATION ----- (LabView Code)
# "The sequence of commands must be followed! Switching up
# commands during initialisation will result in obscure bugs
# like a drifting counter!"
logger.info("Starting initalisation command sequence for {}.".format(self.name))
self.connection.output_mode_counter_table(0) # Set Outputting Mode to ASCII
print(
"Outputting Mode: ", self.connection.output_mode_counter_table()
) # Set Outputting Mode to ASCII
self.connection.parallel_counter_mode(1) # Enable parallel port output
print("Parallel output mode: ", self.connection.parallel_counter_mode())
self.connection.data_mode(2) # Disable RS232 data mode #! ------------?
print("Data mode RS232: ", self.connection.data_mode())
self.connection.select_triggering_counter(1) # Set trigger to count on PD1
print("Count on PD: ", self.connection.select_triggering_counter())
self.connection.select_polarity_pd1(
0
) # Set polarity of photo diode 1 to positive, not inverted
print(
"PD1 Polarity: ", self.connection.select_polarity_pd1()
) # Set polarity of photo diode 1 to positive, not inverted
self.connection.select_polarity_pd2(
0
) # Set polarity of photo diode 2 to positive, not inverted
print(
"PD2 Polarity: ", self.connection.select_polarity_pd2()
) # Set polarity of photo diode 2 to positive, not inverted
print(
"DAC Trigger Level Adjust IS SET TO: ", self.connection.dac_flag()
) # Set DAC Flag enable
self.connection.dac_flag(1) # Set DAC Flag enable
print(
"DAC Trigger Level Adjust: ", self.connection.dac_flag()
) # Set DAC Flag enable
self.connection.grabbing_mode_counter_table(
1
) # Set grabbing mode of counter table to be continuos
print(
"Grabbing mode:", self.connection.grabbing_mode_counter_table()
) # Set grabbing mode of counter table to be continuos
self.connection.size_counter_table(8192)
print("Counter table size: ", self.connection.size_counter_table())
logger.info(
"Initalisation command sequence for {} completed.".format(self.name)
)
# Start offset calculation for Photodiodes For this, we need to
# move interferometer motor slowly
logger.info("Starting photodiode offset {} calculation.".format(self.name))
prior_velocity = self.interferometer_stage.velocity
self.interferometer_stage.set_velocity_mm(0.01)
self.interferometer_stage.move(400, wait=False)
self.connection.collect_data_trigger_calculation()
# Wait for process to complete
time.sleep(3)
# Stop offset calculation
self.connection.stop_collect_data_trigger_calculation()
trig_level_1 = self.connection.trigger_level_pd1()
trig_level_2 = self.connection.trigger_level_pd2()
print("Trigger level PD1: ", trig_level_1)
print("Trigger level PD2: ", trig_level_2)
self.interferometer_stage.wait()
logger.info(
"Offset calculation for photodiodes for {} completed".format(self.name)
)
# Check whether counting direction is correct. Meaning that we
# count in the right direction when the interferometer stage is
# moving forward and counting in the opposite direction when the
# stage is moving backwards.
# ---- CITATION ----- (LabView Code)
# "The second version of the Interferometer Counter chip seems
# to switch its counting direction randomly during
# initialisation. To ensure the correct direction it is set to
# zero and driven into the "positive direction". If it counts to
# the negative direction (meaning below zero -> 65535 and lower)
# the polarity of Photo diode 1+2 is inverted what results in
# an inverted counting direction. "
logger.info(
"Check whether counting direction for {} is correct.".format(self.name)
)
self.interferometer_stage.set_velocity_mm(3)
# The .move method moves relative to t_zero in negative
# direction. I.e.: target position = t_zero position - target
# delay This implies that the movement from 500fs to 400 fs is
# actually in positive direction. Note: It is stupid to move to
# 500fs then to 400fs. Instead of just moving to 300fs. It's
# what it's.
self.interferometer_stage.move(500, wait=True)
# Reset counter by setting current position to counter value = 0
self.reset_counter()
self.interferometer_stage.move(400, wait=True) # Moving in positive direction.
prior_samples_to_acquire_value = (
self.adc.samples_to_acquire
) # Save old samples to acquire value to reset it later.
self.adc.set_samples_to_acquire(100)
self.adc.read()
plt.plot(
self.adc.data[
-4:,
].T
)
plt.show()
interferometer_positions = self.calculate_counter_values(
self.adc.data
) #! Method needs work!
# We want to check whether the counter counted into the positive
# direction when moving delay "forward". If counter counts
# backwards, its memory will overflow and end up somewhere
# around 65535. 25000 was chosen literally arbitrarily (LabView
# legacy). If the counter is not within this range, we need to
# change the counting direction.
if 0 < interferometer_positions[-1] < 25000:
logger.info("{} is counting in the correct direction.".format(self.name))
else:
logger.info("{} is counting in the wrong direction.".format(self.name))
self.connection.select_polarity_pd1(
2
) # Set polarity of photo diode 1 to negative, inverted
self.connection.select_polarity_pd2(
2
) # Set polarity of photo diode 2 to negative, inverted
logger.info(
"Changed polarity of both photodiodes/counting directions for {}".format(
self.name
)
)
# Reset samples to acquire
self.adc.set_samples_to_acquire(prior_samples_to_acquire_value)
# Reset velocity of stage/motor
self.interferometer_stage.set_velocity_mm(
prior_velocity[self.interferometer_stage.axis]
)
# Move motor back to zero position (where interferometer pulses
# overlap temporally)
self.interferometer_stage.move(0, wait=True)
# ------- End of part that is copied from LabView (init_if(...).vi) -------
[docs] def reset_counter(self):
"""
Reset counter by setting current interferometer position to the
counter value 0.
"""
# Reset counter by setting current position to counter value = 0
self.connection.counter_value(0)
[docs] def start_counter(self):
"""
Sets the size of the counter table and is needed to start the
counter in general.
Stop counter method not needed because we set counting mode to
single sequence mode
-> Stops on its own.
"""
if self.adc.samples_to_acquire < 0 or self.adc.samples_to_acquire > 8192:
logger.error(
"Counter table size reaches from 0 to 8192. The number of samples to acquire is either set below 0 or above 8192."
)
self.connection.size_counter_table(self.adc.samples_to_acquire)
[docs] def calculate_counter_values(self, data: ndarray) -> ndarray:
"""
Determines the interferometer position from the (counter) values
that the ADC collected from the R2R-Network.
The counter electronics from Zurich outputs the position via USB
(on command) and also on a 16 line parallel port for every laser
trigger. The results in a 16 digit binary number that is then
converted into 4 digit hexadecimal number. Which is output on a
4 line parallel port (Interferometer counter box BNC R2R 1-4,
BNC R2R 5-8 etc.).
This data is recorded by the ADC. To obtain the actual counter
values we need to decode the analog voltages that the ADC
recorded. This is achieved by binning the data from all four
channels accordingly using an array containing reference values
and the function np.digitize.
These reference values have to be measured manually from the R2R
Network by applying 5V to different R2R inputs and recording the
corresponding output voltages. These values have to be written
in a .csv file (see existing files). The resulting hexadecimal
values for each line are then modified to match their poisition
in the hexadecimal tetrade using the function np.left_shift. BNC
R2R 1-4 is interpreted to be the channel containing the least
significant digit (and so on).
Finally, we get the counter position by summation of all four
hexadecimal numbers. We now need to divide by 2 because
apparently the circuit counts the zero crossings for both
photodiodes. (Technically it only makes to count on one
photodiode, because the actual resolution is determined by the
HeNe Wavelength.) Afterwards we floor the values to obtain
integers. Note this is not mentioned in the datasheet and
apparently is a empirically determined phenomenon copied from
LabView (see dataprocessing where this method can also be found
for an explanation to why this works).
References:
| H-Lab reference values for R2R-Network.csv
| TimingScheme.pdf
| Counter für Interferometer; Anleitung; Manual; Universität Zürich.pdf
Args:
data (ndarray): Complete data set from adc (including pixel
data, wobbler, R2R etc.) The indices/ rows containing the
information of the R2R are selected automatically.
* shape: 2D
* E.g. (number of channels, samples to acquire)
Returns:
ndarray: array containing interferometer position in counts.
* shape: (self.adc.samples_to_acquire)
"""
# Compute corresponding hexadecimal levels (0-15) from R2R
# voltage. The np.digitize function returns the bin number
# (given by bin_reference_values) in which a measured voltage
# value belongs.
inds = np.zeros(data[self.r2r_indices, :].shape, dtype="uint16")
for row in range(4):
inds[row, :] = np.digitize(
data[self.r2r_indices[row], :], self.bin_reference_values[row, :]
)
# Now we have the corresponding digits for each tetrade but they
# are not in the correct hexadecimal position: For example: The
# digit from the R2R line 13-16 might be 0x6 (hexadecimal digit
# 6) but the actual value it corresponds to is 0x6000 (0x
# indicates hexadecimal number). So to shift all values to their
# correct position we shift the bits a corresponding multiple of
# 4 to the left (np.left_shift). We need to transpose inds
# because otherwise numpy can not broadcast the two arrays (inds
# and [0,4,8,12]) together. To obtain the actual counter values
# we now need to add all the values from all lines for each
# point in time together. (E.g.: For each point in time we have
# 4 values, i.e.: [0x1, 0xA0, 0x700, 0xE000] we obtain the
# actual counter value by summing these values: 0xE7A1 = 59297)
counter_values = np.left_shift(inds.T, np.arange(0, 16, 4)).sum(axis=1)
# We now need to divide by 2 because apparently the circuit
# counts the zero crossings for both photodiodes. (Technically
# it only makes sense to count on one photodiode, because the
# actual resolution is determined by the HeNe Wavelength.)
# Afterwards we floor the values to obtain integers. Note this
# is not mentioned in the datasheet and apparently is a
# empirically determined phenomenon copied from LabView.
return np.uint16(counter_values / 2)
def __load_bin_reference_values(self):
"""
Loads the bin reference values from a npy file specified in the
self.bin_config_path. It saves the according reference values
for each bin for each R-2R in a numpy array.
(self.bin_reference_values)
Note:
The npy file must list the reference values a ascending
order, otherwise the np.digitize() function will in the best
case raise an error and in the worst case sort the data in
the wrong order.
"""
logger.info(
"Loading reference values (bins) for R2R Network configuration file for {}".format(
self.name
)
)
self.bin_reference_values = np.load(self.bin_config_path)
#! At this point, this algorithm is basically fucked. This one
#! too.
def __find_r2r_indices(self):
"""
Finds the indices for the self.adc.array where the information
for the R2R Network is located.
Note:
The order in which the according R2R inputs (to the ADC) are
listed is not relevant, since we sort them here. BUT if
naming scheme is changed, this can have catastrophic unseen
consequences. It is necessary that the names e.g.: R2R1,
R2R2... are kept in a way that when sorted alphabetically,
R2R1 is the first entry and so on.
"""
self.__r2r_config = []
for key, val in self.adc.index_dict.items():
if "R2R" in key:
self.__r2r_config.append(val)
self.__r2r_config.sort() # ? Is it really necessary to sort?
self.r2r_indices = np.array(self.__r2r_config)
logger.info(
"The R2R indices/row in the detector data are {}.".format(self.r2r_indices)
)
[docs] def end(self):
self.connection.end()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.end()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# port="COM9"
# counter = SerialInterferometerCounter(port)
# counter.size_counter_table(100)
# # time.sleep(1)
# counter.trigger_level_pd1(2047)
# counter.counter_value(123)
# counter.parallel_counter_mode(1)
# # counter.data_mode(0)
# # # counter.set_nonvolatile_variables_to_default()
# # counter_val = counter.counter_value()
# # trig_level_1 = counter.trigger_level_pd1()
# # trig_level_2 = counter.trigger_level_pd2()
# # dac_mode = counter.dac_flag()
# # data_mode = counter.data_mode()
# # output_mode = counter.output_mode_counter_table()
# # polarity_1 = counter.select_polarity_pd1()
# # polarity_2 = counter.select_polarity_pd2()
# parallel_port_mode = counter.parallel_counter_mode()
# # print("counter_val:",counter_val)
# # print("trig_level_1:",trig_level_1)
# # print("trig_level_2:",trig_level_2)
# # print("dac_mode:",dac_mode)
# # print("data_mode:",data_mode)
# # print("output_mode:",output_mode)
# # print("polarity_1:",polarity_1)
# # print("polarity_2:",polarity_2)
# print("parallel_port_mode:",parallel_port_mode)
# # counter.set_nonvolatile_variables_to_default()
# counter.end()
# Testing the combination of a slow
# movement of the interferometer stage
# and the reading and processing the data
# from the r2r
from matplotlib import pyplot as plt
from analog_digital_converter import AnalogDigitalConverter as ADC
import pi_control
from interferometer_counter import InterferometerCounter
# # Base config:
from hardware_properties import HardwareProperties
import data_processing as dp
path = r"C:\Users\I-LAB-PC\Documents\akb_software\hardware_config_files\i-lab_hardware_properties.json"
hw = HardwareProperties(path)
interferometer_ctrl = hw.pi_instruments.interferometer_controller
interferometer_stage_properties = (
hw.pi_instruments.interferometer_controller.stages.interferometer_stage
)
# Calculate velocity and other params to smoothly record all values
he_ne_wl = hw.he_ne_laser.wavelength
dist = (
25000 // 2 * he_ne_wl
) # 0.11991698320000001 #0xFFFF//2 * he_ne_wl # (dist is approx 21 mm and thus can not be actually over gone with our interferometer)
speed = (
he_ne_wl * hw.adc.laser_frequency / 4
) # (speed is approx 2mm /s whut?) #! This will probably have to be changed to a WAAAY smaller number
time_to_traverse = dist / speed * 0.01
print("time to traverse: ", time_to_traverse)
print("speed: ", speed)
with ADC(
hw.adc.device_name,
hw.adc.input_configuration_path,
hw.adc.samples_to_acquire,
hw.adc.trigger_source,
hw.adc.sampling_rate,
hw.adc.laser_frequency,
) as adc, pi_control.PiController(
interferometer_ctrl.model,
interferometer_ctrl.stage_names,
com_port=interferometer_ctrl.port,
name=interferometer_ctrl.name,
) as interferometer_controller, pi_control.PiStage(
interferometer_controller,
interferometer_stage_properties.axis,
interferometer_stage_properties.t_zero,
interferometer_stage_properties.direction,
interferometer_stage_properties.path_factor,
interferometer_stage_properties.initial_velocity,
name=interferometer_stage_properties.name,
) as interferometer_stage, InterferometerCounter(
hw.interferometer_counter.port,
interferometer_stage,
adc,
hw.interferometer_counter.config_path,
) as interferometer_counter:
# Setup coherence time scan
coherence_time = 4 # ps
frequency = 1.5 # Hz
speedupdown = 500
interferometer_stage.setup_coherence_time_scan(
coherence_time, frequency, speedupdown
)
# Move stage to reset position and reset counter to 0
interferometer_stage.move_mm(interferometer_stage.reset_position)
interferometer_counter.reset_counter()
interferometer_stage.move_mm(interferometer_stage.start_pos)
adc.set_acquisition_time(1 / frequency * 5)
interferometer_stage.start_wavegen()
adc.read(timeout=-1)
interferometer_stage.stop_wavegen()
# Calculate interferometer states the way it is done int ft_2d_ir
interferometer_states = (
int(
np.ceil(
(
interferometer_stage.amplitude
- 2 * interferometer_stage.overshoot_mm
)
// he_ne_wl
)
)
* interferometer_stage.path_factor
)
counter_data = dp.calculate_counter_values(
adc.data,
interferometer_counter.r2r_indices,
interferometer_counter.bin_reference_values,
)
print("Interferometer states that need to be observed: ", interferometer_states)
if (counter_data == 0).any():
print("Lower bound is OK.")
else:
print("!!!Lower bound is NOT OK.!!!")
if (counter_data == interferometer_states).any():
print("Upper bound is OK.")
else:
print("!!!Upper bound is NOT OK.!!!")
# counter_data = interferometer_counter.calculate_counter_values(adc.data)
fig, ax = plt.subplots(nrows=2)
ax[0].plot(counter_data)
ax[1].plot(adc.data[interferometer_counter.r2r_indices, :].T)
plt.show()