# Source code for hardware_interfaces.interferometer_counter

# Import helper modules
import typing
from typing import List, Tuple
from numpy import ndarray
import logging
import os
from matplotlib import pyplot as plt

# Import essential modules
import serial
import numpy as np
import csv
import time
from pi_control import PiStage
from analog_digital_converter import AnalogDigitalConverter

# Set up logger
# Module-level logger shared by both classes in this file; it is named after
# the module so log records can be filtered per hardware interface.
logger = logging.getLogger(__name__)


# Set up 'path'-like type
# Anything that can be handed to a file-opening call: text path, bytes path
# or an object implementing the os.PathLike protocol.
Path = typing.Union[str, bytes, os.PathLike]
# NOTE(review): the next assignment rebinds the name `AnalogDigitalConverter`
# that was imported from `analog_digital_converter` above, so from here on the
# name refers to `typing.Any` and annotations using it are not type-checked.
AnalogDigitalConverter = (
    typing.Any
)  # TODO please correct this, I don't know how. Without a definition
# import will fail and so will doc building -GW
# NOTE(review): likewise this rebinds the `ndarray` name imported from numpy
# above to `typing.Any`. Presumably both overrides exist only to keep the
# documentation build working - confirm before removing them.
ndarray = typing.Any  # TODO please correct this also -GW


class SerialInterferometerCounter:
    """
    Connect to the interferometer counter and preamplifier electronics
    from Zürich with PySerial.

    This class contains methods for all serial commands provided in the
    manual. Every command method writes one command line to the device,
    reads one response line and reports whether the device echoed the
    command correctly (see ``__check_for_transmission_error``).

    Args:
        port (str): COM port to which the controller is connected.
            Please check in the device manager of your operating system.

            * E.g. "COM7"
        name (str, optional): Name / Identifier to give to this
            controller. This is relevant for log statements, especially
            when there is more than one interferometer in the setup.
            Defaults to "serial connection to interferometer".

    Attributes:
        port (str): COM port to which the controller is connected.
        name (str): Name / Identifier used in log statements.
        ser (serial.Serial): Pyserial object which initiates the
            communication with the Counter.

    References:
        | Counter für Interferometer; Anleitung; Manual; Universität Zürich.pdf
        | TimingScheme.pdf
        | Driver (Win7): https://www.st.com/en/development-tools/stsw-stm32102.html
    """

    def __init__(self, port: str, name: str = "serial connection to interferometer"):
        # Initialize attributes
        self.name = name
        self.port = port

        # Connecting the controller with pyserial. The 1 s timeout means
        # that readline() returns an empty byte string when the device
        # does not answer.
        self.ser = serial.Serial(
            port,
            115200,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=1,
        )

    def read(self):
        """
        Reads out all bytes from COM port until newline character appears.

        Returns:
            bytes: The line that was read (empty when the read timed
            out). Earlier versions discarded the line and returned None.
        """
        return self.ser.readline()

    def end(self):
        """
        Closes serial connection to the interferometer counter and
        preamplifier.
        """
        self.ser.close()

    # ------------------------------------------------------------------
    # Private helpers shared by all command methods
    # ------------------------------------------------------------------
    def _send(self, command: str, message: str):
        """Log ``message``, write ``command`` and validate the device echo."""
        logger.info(message)
        self.ser.write(command.encode())
        return self.__check_for_transmission_error(command)

    def _set_or_get(
        self,
        register: int,
        value: typing.Optional[int],
        set_template: str,
        get_template: str,
        get_command: typing.Optional[str] = None,
    ):
        """
        Send ``S <register> <value>`` when a value is given, otherwise
        ``G <register>``.

        Args:
            register (int): Command index from the manual.
            value (int, optional): Value to set; None selects the Get
                command.
            set_template (str): Log template formatted with (value, name).
            get_template (str): Log template formatted with (name).
            get_command (str, optional): Exact Get command line to send
                instead of the default ``"G <register>\\r\\n"``.
        """
        if value is not None:
            command = "S {} {}\r\n".format(register, value)
            message = set_template.format(value, self.name)
        else:
            if get_command is None:
                get_command = "G {}\r\n".format(register)
            command = get_command
            message = get_template.format(self.name)
        return self._send(command, message)

    #! Set/ Get commands start here
    def counter_value(self, value: typing.Optional[int] = None):
        """
        Set the counter value

        Data type: volatile

        Args:
            value (int, optional): Value between 0 ... 65535. Defaults to
                None. If None, a Get command is sent instead of a Set
                command.

        Returns:
            True/False for a Set command (echo matched or not); for a Get
            command the value reported by the device, or False on failure.
        """
        return self._set_or_get(
            0,
            value,
            "Set counter value to {} for {}.",
            "Read current counter value for {}.",
            # The original Get command for register 0 carries a trailing
            # space; it is kept byte-for-byte.
            get_command="G 0 \r\n",
        )

    def trigger_level_pd1(self, value: typing.Optional[int] = None):
        """
        Trigger Level PD1 ("DAC flag" = enable) -> automatic adjustment by
        T0/T1 commands

        Data type: volatile

        Args:
            value (int, optional): Value between 0 ... 4095 (=3.3V).
                Defaults to None. If None, a Get command is sent instead
                of a Set command.
        """
        return self._set_or_get(
            1,
            value,
            "Set trigger value of PD1 to {} for {}.",
            "Read current trigger value of PD1 for {}.",
        )

    def trigger_level_pd2(self, value: typing.Optional[int] = None):
        """
        Trigger Level PD2 ("DAC flag" = enable) -> automatic adjustment by
        T0/T1 commands

        Data type: volatile

        Args:
            value (int, optional): Value between 0 ... 4095 (=3.3V).
                Defaults to None. If None, a Get command is sent instead
                of a Set command.
        """
        return self._set_or_get(
            2,
            value,
            "Set trigger value of PD2 to {} for {}.",
            "Read current trigger value of PD2 for {}.",
        )

    def dac_flag(self, value: typing.Optional[int] = None):
        """
        DAC Flag, internal DAC provides trigger level for PD1/ PD2
        -> reset of device on change is recommended

        Data type: non-volatile

        Args:
            value (int, optional): 0 = disable. 1 = enable. Defaults to
                None. If None, a Get command is sent instead of a Set
                command.
        """
        return self._set_or_get(
            3,
            value,
            "Set DAC Flag to {} for {}.",
            "Read DAC Flag value for {}.",
        )

    def data_mode(self, value: typing.Optional[int] = None):
        """
        RS232 Data Mode -> disable can improve signal to noise ratio (S/N)

        Data type: volatile

        Args:
            value (int, optional): 0 = disable. 1 = counter value stream
                1 sec. 2 = counter on trigger (<1.5kHz). Defaults to None.
                If None, a Get command is sent instead of a Set command.
        """
        return self._set_or_get(
            4,
            value,
            "Set data mode to {} for {}.",
            "Read data mode for {}.",
        )

    def size_counter_table(self, value: typing.Optional[int] = None):
        """
        Size of counter table. Erase counter table and start collecting
        counter values.

        Data type: volatile

        Args:
            value (int, optional): 0 ... 8192. Defaults to None. If None,
                a Get command is sent instead of a Set command.
        """
        return self._set_or_get(
            5,
            value,
            "Set the size of the counter table to {} for {}.",
            "Read the size of the counter table for {}.",
        )

    def output_mode_counter_table(self, value: typing.Optional[int] = None):
        """
        Output mode for counter table (setting for "Read Counter Table"
        command) -> in binary mode, high byte first

        Data type: non-volatile

        Args:
            value (int, optional): 0 = ASCII, 1 = binary. Defaults to
                None. If None, a Get command is sent instead of a Set
                command.
        """
        return self._set_or_get(
            6,
            value,
            "Set the output mode for the counter table to {} for {}.",
            # Fixed: this message used to say "size of the counter table",
            # copied from size_counter_table().
            "Read the output mode for the counter table for {}.",
        )

    def change_test_pattern(self, value: typing.Optional[int] = None):
        """
        Changing test pattern on parallel counter output for checking
        wiring -> do not provide any trigger signal while enabled

        Data type: volatile

        Args:
            value (int, optional): Test pattern selector. Defaults to
                None. If None, a Get command is sent instead of a Set
                command.

                NOTE(review): the previous docstring listed
                "0 = ASCII, 1 = binary", which looks copied from
                output_mode_counter_table - confirm the valid values
                against the manual.
        """
        return self._set_or_get(
            9,
            value,
            "Change test pattern to {} for {}.",
            "Get current test pattern for {}.",
        )

    def select_triggering_counter(self, value: typing.Optional[int] = None):
        """
        Select PD Signal for Triggering Counter. reset of counter
        -> refer to chapter in datasheet "Encoder counting modes"

        Data type: non-volatile

        Args:
            value (int, optional): 0 = positive, not inverted
                1 = negative, inverted. Defaults to None. If None, a Get
                command is sent instead of a Set command.
        """
        return self._set_or_get(
            20,
            value,
            "Set PD Signal for Triggering Counter to {} for {}.",
            "Get PD Signal for Triggering Counter for {}.",
        )

    def select_polarity_pd1(self, value: typing.Optional[int] = None):
        """
        Select polarity of PD1 signal. reset of counter
        -> refer to chapter of datasheet "Encoder Counting Modes"

        Data type: non-volatile

        Args:
            value (int, optional): 0 = positive, not inverted
                1 = negative, inverted. Defaults to None. If None, a Get
                command is sent instead of a Set command.
        """
        return self._set_or_get(
            21,
            value,
            "Set polarity of PD1 signal to {} for {}.",
            "Get polarity of PD1 signal for {}.",
        )

    def select_polarity_pd2(self, value: typing.Optional[int] = None):
        """
        Select polarity of PD2 signal. reset of counter
        -> refer to chapter of datasheet "Encoder Counting Modes"

        Data type: non-volatile

        Args:
            value (int, optional): 0 = positive, not inverted
                1 = negative, inverted. Defaults to None. If None, a Get
                command is sent instead of a Set command.
        """
        return self._set_or_get(
            22,
            value,
            "Set polarity of PD2 signal to {} for {}.",
            "Get polarity of PD2 signal for {}.",
        )

    def parallel_counter_mode(self, value: typing.Optional[int] = None):
        """
        Parallel counter output port mode for counter value
        -> port inactive can improve S/N

        Data type: non-volatile

        Args:
            value (int, optional): 0 = positive inactive (force low)
                1 = port active. Defaults to None. If None, a Get command
                is sent instead of a Set command.
        """
        return self._set_or_get(
            23,
            value,
            "Set parallel counter output port mode for counter value to {} for {}.",
            "Get parallel counter output port mode for counter value for {}.",
        )

    def grabbing_mode_counter_table(self, value: typing.Optional[int] = None):
        """
        Grabbing mode for counter table
        -> if mode changed, use "size of counter table" command next

        Data type: volatile

        Args:
            value (int, optional): 0 = grabbing single sequence
                1 = continuous. Defaults to None. If None, a Get command
                is sent instead of a Set command.
        """
        return self._set_or_get(
            24,
            value,
            "Set grabbing mode to {} for {}.",
            "Get current grabbing mode for {}.",
        )

    #! GET Commands start here
    def get_score_list_pd1(self, value: int):
        """
        Score list PD1 used for trigger level calculation

        Data type: volatile

        Args:
            value (int): 0 ... 255 = count of values, followed by table
        """
        return self._send(
            "G 10 {}\r\n".format(value),
            "Get score list of PD1 at {} for {}.".format(value, self.name),
        )

    def get_score_list_pd2(self, value: int):
        """
        Score list PD2 used for trigger level calculation

        Data type: volatile

        Args:
            value (int): 0 ... 255 = count of values, followed by table
        """
        return self._send(
            "G 11 {}\r\n".format(value),
            "Get score list of PD2 at {} for {}.".format(value, self.name),
        )

    def read_counter_table(self, value: int):
        """
        Read counter table. Memory block label, count of values, select
        output mode first. -> in binary mode, high byte first

        Data type: volatile

        #todo not sure if whole functionality is covered

        Args:
            value (int): 1 ... 2 '_' 0 ... 8192 -> followed by table

                * 1 ... 2 = memory block label
                * 0 ... 8192 = count of values
        """
        # todo not sure if whole functionality is covered: only the first
        # response line is read and evaluated.
        return self._send(
            "G 12 {}\r\n".format(value),
            "Read counter table {} for {}.".format(value, self.name),
        )

    def status_counter_table(self, value: int):
        """
        Status counter table. status, number of triggers left to complete
        counter table

        #todo not sure if whole functionality is covered

        Args:
            value (int): 0 ... 3 '_' 0 ...8192

                * 0 ... 3 = status

                    * 0 = no block ready
                    * 1 = block 1 ready for readout
                    * 2 = block 2 ready for readout
                    * 3 = block write overrun
                * 0 ... 8192 = triggers left to complete
        """
        # todo not sure if whole functionality is covered
        return self._send(
            "G 13 {}\r\n".format(value),
            "Read status of counter table {} for {}.".format(value, self.name),
        )

    def compilation_stamp(self, value: int):
        """
        Compilation stamp

        Data type: non-volatile

        Args:
            value (int): 21 (count of characters) -> followed by string

        Returns:
            The payload reported by the device (a string when it is not
            an integer), or False when the echo did not match.
        """
        return self._send(
            "G 99 {}\r\n".format(value),
            "Read compilation stamp {} for {}.".format(value, self.name),
        )

    #! TASK commands start here
    def collect_data_trigger_calculation(self):
        """
        Start collecting data for PD trigger level calculation.
        "v.s. offset calculation" (handwritten in datasheet)
        """
        return self._send(
            "T 0\r\n",
            "Start collecting data for PD trigger level calcultion for {}.".format(
                self.name
            ),
        )

    def stop_collect_data_trigger_calculation(self):
        """
        Stop collecting data, calculate new trigger level for PD1+2 using
        score lists, set new levels with DAC
        """
        return self._send(
            "T 1\r\n",
            "Stop collecting data, calculate new trigger level for PD1+2 for {}.".format(
                self.name
            ),
        )

    def set_nonvolatile_variables_to_default(self):
        """
        Set all nonvolatile variables to default. Reset of device is
        recommended.
        """
        return self._send(
            "T 97\r\n",
            "Set all nonvolatile variable to default for {}.".format(self.name),
        )

    def __check_for_transmission_error(self, command: str):
        """
        Checks if data transmission was successful.

        The microcontroller returns the string that was sent in all
        lowercase letters. So to check whether transfer was successful we
        check if the command string converted to lowercase letters
        corresponds to the string we received from the controller.

        Args:
            command (str): Command sent to controller.

        Returns:
            * Set/Task commands: True when the echo matches, False
              otherwise.
            * Get commands: the integer payload that follows the echoed
              command (or the payload as a string when it is not an
              integer), False when the echo is missing.

            Because a Get can legitimately return 0, callers that need to
            detect failure should compare with ``is False``.
        """
        # We need to remove white spaces from the response to make it
        # comparable E.g. if you send <<S 4 1 \r\n>> it will respond:
        # <<s4 1 \r\n>>
        response = self.ser.readline().decode().replace(" ", "")

        # The interferometer counter returns the command in lower case
        # (and with different white spaces (see above))
        command = command.lower().replace(" ", "")

        if response.startswith("g"):
            # Get command: the response is the echoed command (without the
            # line ending) followed by the requested payload.
            echo = command[:-2]
            if echo in response:
                logger.info(
                    r"Transmission of data {} successful for {}.".format(
                        repr(command), self.name
                    )
                )
                payload = response.replace(echo, "")
                try:
                    return int(payload)
                except ValueError:
                    # Non-numeric payloads (e.g. the compilation stamp)
                    # used to raise here; hand them back as text instead.
                    return payload.strip()
        elif response == command:
            logger.info(
                r"Transmission of data {} successful for {}.".format(
                    repr(command), self.name
                )
            )
            return True

        # Reached on an echo mismatch and also on an empty response (read
        # timeout), which previously raised IndexError on response[0].
        logger.error(
            r"Transmission of data {} not successful for {}.".format(
                repr(command), self.name
            )
        )
        return False

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Always release the COM port when leaving the ``with`` block.
        self.end()
[docs]class InterferometerCounter: """ InterferometerCounter class is used to obtain the interferometer position for every laser shot. By sending certain commands the interferometer counter is set to single sequence mode. The offset of the photodiodes is then calculated by moving the motor. Then, the direction of the stage is determined and the polarity of the photodiodes is set respectively. For this process it is necessary to calculate the counter values with the calculate_counter_values method. Args: port (str): COM port to which the counter is connected. Please check in the device manager of your operating system. * E.g. "COM7" interferometer_stage (PiStage): Object which contains the functionality of the interferometer stage. Is used for moving the stage etc. adc (AnalogDigitalConverter): Object that communicates with ADC to which MCT detector is connected. bin_config_path (Path): Path to configuration file where the calibration values for the calculation of the bins are stored. The values in this file are obtained by measuring the used R2R network name (str, optional): Name / Identifier to give to this controller. This is relevant for log statements, especially when there is more than one interferometer in the setup. Defaults to "Interferometer Counter". Attributes: connection(SerialInterferometerCounter): Provides the serial connection and commands to the interferometer counter. name (str, optional): Name / Identifier to give to this stage. This is relevant for log statements, especially when there is more than one stage in the setup. Defaults to "XYZ Stage". port (str): COM port to which the controller is connected. Please check in the device manager of your operating system. interferometer_stage (PiStage): Connects and initializes the interferometer stage with the Pi_controll module. Provides the functionality in PiStage class. bin_config_path (Path): Path to configuration file. 
This is where the binlist and the reference values for the R2R network are stored """ def __init__( self, port: str, interferometer_stage: PiStage, adc: AnalogDigitalConverter, bin_config_path: Path, name: str = "Interferometer Counter", ): # Initializing attributes self.connection = SerialInterferometerCounter(port) self.name = name self.port = port self.interferometer_stage = interferometer_stage self.adc = adc self.bin_config_path = bin_config_path # Load bin reference values for R2R self.__load_bin_reference_values() # Get the row indices of the ADC data array that hold the data # of the interferometer counter self.__find_r2r_indices() # if True: # self.different_init() # return # print("ATM this should not be printed!") # ------- This next part is copied from LabView (init_if(...).vi) ------- # We have the impression that the interferometer is set up to be # working like in Graph 3 of 'Encoder Counting Modes' (p. 3 of # the manual). The configuration command sequence for that # setting is: 'S 20 1', 'S 21 0', 'S 22 0' For some reason the # order of there commands is different than in the manual # ---- CITATION ----- (LabView Code) # "The sequence of commands must be followed! Switching up # commands during initialisation will result in obscure bugs # like a drifting counter!" logger.info("Starting initalisation command sequence for {}.".format(self.name)) self.connection.data_mode(2) # Set data mode to count on trigger self.connection.select_polarity_pd1( 0 ) # Set polarity of photo diode 1 to positive, not inverted self.connection.select_polarity_pd2( 0 ) # Set polarity of photo diode 2 to positive, not inverted self.connection.data_mode( 2 ) # Set data mode to count on trigger #! Not clear why sending twice self.connection.output_mode_counter_table( 0 ) # Set output of read_counter_table to return data in ASCII format self.connection.select_triggering_counter( 1 ) # Set trigger to count in specific way. 
See page 3/10 of manual self.connection.dac_flag(1) # Set DAC Flag enable self.connection.grabbing_mode_counter_table( 0 ) # Set grabbing mode of counter table to be continuos logger.info( "Initalisation command sequence for {} completed.".format(self.name) ) # Start offset calculation for Photodiodes For this, we need to # move interferometer motor slowly logger.info("Starting photodiode offset {} calculation.".format(self.name)) prior_velocity = self.interferometer_stage.velocity self.interferometer_stage.set_velocity_mm(0.01) self.interferometer_stage.move(400, wait=False) self.connection.collect_data_trigger_calculation() # Wait for process to complete time.sleep(3) # Stop offset calculation self.connection.stop_collect_data_trigger_calculation() self.interferometer_stage.wait() logger.info( "Offset calculation for photodiodes for {} completed".format(self.name) ) # Check whether counting direction is correct. Meaning that we # count in the right direction when the interferometer stage is # moving forward and counting in the opposite direction when the # stage is moving backwards. # ---- CITATION ----- (LabView Code) # "The second version of the Interferometer Counter chip seems # to switch its counting direction randomly during # initialisation. To ensure the correct direction it is set to # zero and driven into the "positive direction". If it counts to # the negative direction (meaning below zero -> 65535 and lower) # the polarity of Photo diode 1+2 is inverted what results in # an inverted counting direction. " logger.info( "Check whether counting direction for {} is correct.".format(self.name) ) self.interferometer_stage.set_velocity_mm(3) # The .move method moves relative to t_zero in negative # direction. I.e.: target position = t_zero position - target # delay This implies that the movement from 500fs to 400 fs is # actually in positive direction. Note: It is stupid to move to # 500fs then to 400fs. Instead of just moving to 300fs. It's # what it's. 
self.interferometer_stage.move(500, wait=True) # Reset counter by setting current position to counter value = 0 self.reset_counter() self.interferometer_stage.move(400, wait=True) # Moving in positive direction. prior_samples_to_acquire_value = ( self.adc.samples_to_acquire ) # Save old samples to acquire value to reset it later. self.adc.set_samples_to_acquire(10) self.adc.read() interferometer_positions = self.calculate_counter_values( self.adc.data ) #! Method needs work! # We want to check whether the counter counted into the positive # direction when moving delay "forward". If counter counts # backwards, its memory will overflow and end up somewhere # around 65535. 25000 was chosen literally arbitrarily (LabView # legacy). If the counter is not within this range, we need to # change the counting direction. if 0 < interferometer_positions[-1] < 25000: logger.info("{} is counting in the correct direction.".format(self.name)) else: logger.info("{} is counting in the wrong direction.".format(self.name)) self.connection.select_polarity_pd1( 2 ) # Set polarity of photo diode 1 to negative, inverted self.connection.select_polarity_pd2( 2 ) # Set polarity of photo diode 2 to negative, inverted logger.info( "Changed polarity of both photodiodes/counting directions for {}".format( self.name ) ) # Reset samples to acquire self.adc.set_samples_to_acquire(prior_samples_to_acquire_value) # Reset velocity of stage/motor self.interferometer_stage.set_velocity_mm(prior_velocity) # Move motor back to zero position (where interferometer pulses # overlap temporally) self.interferometer_stage.move(0, wait=True) # ------- End of part that is copied from LabView (init_if(...).vi) -------
[docs] def different_init(self): # ------- This next part is copied from LabView (init_if(...).vi) ------- # We have the impression that the interferometer is set up to be # working like in Graph 3 of 'Encoder Counting Modes' (p. 3 of # the manual). The configuration command sequence for that # setting is: 'S 20 1', 'S 21 0', 'S 22 0' For some reason the # order of there commands is different than in the manual # ---- CITATION ----- (LabView Code) # "The sequence of commands must be followed! Switching up # commands during initialisation will result in obscure bugs # like a drifting counter!" logger.info("Starting initalisation command sequence for {}.".format(self.name)) self.connection.output_mode_counter_table(0) # Set Outputting Mode to ASCII print( "Outputting Mode: ", self.connection.output_mode_counter_table() ) # Set Outputting Mode to ASCII self.connection.parallel_counter_mode(1) # Enable parallel port output print("Parallel output mode: ", self.connection.parallel_counter_mode()) self.connection.data_mode(2) # Disable RS232 data mode #! ------------? 
print("Data mode RS232: ", self.connection.data_mode()) self.connection.select_triggering_counter(1) # Set trigger to count on PD1 print("Count on PD: ", self.connection.select_triggering_counter()) self.connection.select_polarity_pd1( 0 ) # Set polarity of photo diode 1 to positive, not inverted print( "PD1 Polarity: ", self.connection.select_polarity_pd1() ) # Set polarity of photo diode 1 to positive, not inverted self.connection.select_polarity_pd2( 0 ) # Set polarity of photo diode 2 to positive, not inverted print( "PD2 Polarity: ", self.connection.select_polarity_pd2() ) # Set polarity of photo diode 2 to positive, not inverted print( "DAC Trigger Level Adjust IS SET TO: ", self.connection.dac_flag() ) # Set DAC Flag enable self.connection.dac_flag(1) # Set DAC Flag enable print( "DAC Trigger Level Adjust: ", self.connection.dac_flag() ) # Set DAC Flag enable self.connection.grabbing_mode_counter_table( 1 ) # Set grabbing mode of counter table to be continuos print( "Grabbing mode:", self.connection.grabbing_mode_counter_table() ) # Set grabbing mode of counter table to be continuos self.connection.size_counter_table(8192) print("Counter table size: ", self.connection.size_counter_table()) logger.info( "Initalisation command sequence for {} completed.".format(self.name) ) # Start offset calculation for Photodiodes For this, we need to # move interferometer motor slowly logger.info("Starting photodiode offset {} calculation.".format(self.name)) prior_velocity = self.interferometer_stage.velocity self.interferometer_stage.set_velocity_mm(0.01) self.interferometer_stage.move(400, wait=False) self.connection.collect_data_trigger_calculation() # Wait for process to complete time.sleep(3) # Stop offset calculation self.connection.stop_collect_data_trigger_calculation() trig_level_1 = self.connection.trigger_level_pd1() trig_level_2 = self.connection.trigger_level_pd2() print("Trigger level PD1: ", trig_level_1) print("Trigger level PD2: ", trig_level_2) 
self.interferometer_stage.wait() logger.info( "Offset calculation for photodiodes for {} completed".format(self.name) ) # Check whether counting direction is correct. Meaning that we # count in the right direction when the interferometer stage is # moving forward and counting in the opposite direction when the # stage is moving backwards. # ---- CITATION ----- (LabView Code) # "The second version of the Interferometer Counter chip seems # to switch its counting direction randomly during # initialisation. To ensure the correct direction it is set to # zero and driven into the "positive direction". If it counts to # the negative direction (meaning below zero -> 65535 and lower) # the polarity of Photo diode 1+2 is inverted what results in # an inverted counting direction. " logger.info( "Check whether counting direction for {} is correct.".format(self.name) ) self.interferometer_stage.set_velocity_mm(3) # The .move method moves relative to t_zero in negative # direction. I.e.: target position = t_zero position - target # delay This implies that the movement from 500fs to 400 fs is # actually in positive direction. Note: It is stupid to move to # 500fs then to 400fs. Instead of just moving to 300fs. It's # what it's. self.interferometer_stage.move(500, wait=True) # Reset counter by setting current position to counter value = 0 self.reset_counter() self.interferometer_stage.move(400, wait=True) # Moving in positive direction. prior_samples_to_acquire_value = ( self.adc.samples_to_acquire ) # Save old samples to acquire value to reset it later. self.adc.set_samples_to_acquire(100) self.adc.read() plt.plot( self.adc.data[ -4:, ].T ) plt.show() interferometer_positions = self.calculate_counter_values( self.adc.data ) #! Method needs work! # We want to check whether the counter counted into the positive # direction when moving delay "forward". If counter counts # backwards, its memory will overflow and end up somewhere # around 65535. 
25000 was chosen literally arbitrarily (LabView # legacy). If the counter is not within this range, we need to # change the counting direction. if 0 < interferometer_positions[-1] < 25000: logger.info("{} is counting in the correct direction.".format(self.name)) else: logger.info("{} is counting in the wrong direction.".format(self.name)) self.connection.select_polarity_pd1( 2 ) # Set polarity of photo diode 1 to negative, inverted self.connection.select_polarity_pd2( 2 ) # Set polarity of photo diode 2 to negative, inverted logger.info( "Changed polarity of both photodiodes/counting directions for {}".format( self.name ) ) # Reset samples to acquire self.adc.set_samples_to_acquire(prior_samples_to_acquire_value) # Reset velocity of stage/motor self.interferometer_stage.set_velocity_mm( prior_velocity[self.interferometer_stage.axis] ) # Move motor back to zero position (where interferometer pulses # overlap temporally) self.interferometer_stage.move(0, wait=True)
# ------- End of part that is copied from LabView (init_if(...).vi) -------
[docs] def reset_counter(self): """ Reset counter by setting current interferometer position to the counter value 0. """ # Reset counter by setting current position to counter value = 0 self.connection.counter_value(0)
[docs] def start_counter(self): """ Sets the size of the counter table and is needed to start the counter in general. Stop counter method not needed because we set counting mode to single sequence mode -> Stops on its own. """ if self.adc.samples_to_acquire < 0 or self.adc.samples_to_acquire > 8192: logger.error( "Counter table size reaches from 0 to 8192. The number of samples to acquire is either set below 0 or above 8192." ) self.connection.size_counter_table(self.adc.samples_to_acquire)
[docs] def calculate_counter_values(self, data: ndarray) -> ndarray: """ Determines the interferometer position from the (counter) values that the ADC collected from the R2R-Network. The counter electronics from Zurich outputs the position via USB (on command) and also on a 16 line parallel port for every laser trigger. The results in a 16 digit binary number that is then converted into 4 digit hexadecimal number. Which is output on a 4 line parallel port (Interferometer counter box BNC R2R 1-4, BNC R2R 5-8 etc.). This data is recorded by the ADC. To obtain the actual counter values we need to decode the analog voltages that the ADC recorded. This is achieved by binning the data from all four channels accordingly using an array containing reference values and the function np.digitize. These reference values have to be measured manually from the R2R Network by applying 5V to different R2R inputs and recording the corresponding output voltages. These values have to be written in a .csv file (see existing files). The resulting hexadecimal values for each line are then modified to match their poisition in the hexadecimal tetrade using the function np.left_shift. BNC R2R 1-4 is interpreted to be the channel containing the least significant digit (and so on). Finally, we get the counter position by summation of all four hexadecimal numbers. We now need to divide by 2 because apparently the circuit counts the zero crossings for both photodiodes. (Technically it only makes to count on one photodiode, because the actual resolution is determined by the HeNe Wavelength.) Afterwards we floor the values to obtain integers. Note this is not mentioned in the datasheet and apparently is a empirically determined phenomenon copied from LabView (see dataprocessing where this method can also be found for an explanation to why this works). 
References: | H-Lab reference values for R2R-Network.csv | TimingScheme.pdf | Counter für Interferometer; Anleitung; Manual; Universität Zürich.pdf Args: data (ndarray): Complete data set from adc (including pixel data, wobbler, R2R etc.) The indices/ rows containing the information of the R2R are selected automatically. * shape: 2D * E.g. (number of channels, samples to acquire) Returns: ndarray: array containing interferometer position in counts. * shape: (self.adc.samples_to_acquire) """ # Compute corresponding hexadecimal levels (0-15) from R2R # voltage. The np.digitize function returns the bin number # (given by bin_reference_values) in which a measured voltage # value belongs. inds = np.zeros(data[self.r2r_indices, :].shape, dtype="uint16") for row in range(4): inds[row, :] = np.digitize( data[self.r2r_indices[row], :], self.bin_reference_values[row, :] ) # Now we have the corresponding digits for each tetrade but they # are not in the correct hexadecimal position: For example: The # digit from the R2R line 13-16 might be 0x6 (hexadecimal digit # 6) but the actual value it corresponds to is 0x6000 (0x # indicates hexadecimal number). So to shift all values to their # correct position we shift the bits a corresponding multiple of # 4 to the left (np.left_shift). We need to transpose inds # because otherwise numpy can not broadcast the two arrays (inds # and [0,4,8,12]) together. To obtain the actual counter values # we now need to add all the values from all lines for each # point in time together. (E.g.: For each point in time we have # 4 values, i.e.: [0x1, 0xA0, 0x700, 0xE000] we obtain the # actual counter value by summing these values: 0xE7A1 = 59297) counter_values = np.left_shift(inds.T, np.arange(0, 16, 4)).sum(axis=1) # We now need to divide by 2 because apparently the circuit # counts the zero crossings for both photodiodes. 
(Technically # it only makes sense to count on one photodiode, because the # actual resolution is determined by the HeNe Wavelength.) # Afterwards we floor the values to obtain integers. Note this # is not mentioned in the datasheet and apparently is a # empirically determined phenomenon copied from LabView. return np.uint16(counter_values / 2)
def __load_bin_reference_values(self):
    """
    Read the R2R bin reference values from disk.

    The array stored in the npy file at ``self.bin_config_path`` is
    loaded with ``np.load`` and kept in ``self.bin_reference_values``.

    Note:
        The npy file must list the reference values in ascending
        order. Otherwise np.digitize() will in the best case raise an
        error and in the worst case assign the measured voltages to
        the wrong bins. Nothing in this method checks that ordering.
    """
    message = "Loading reference values (bins) for R2R Network configuration file for {}".format(
        self.name
    )
    logger.info(message)
    # NOTE(review): the original author marked the algorithm as
    # unreliable at this point; the loaded values are used as they are.
    self.bin_reference_values = np.load(self.bin_config_path)


def __find_r2r_indices(self):
    """
    Collect the rows of the ADC data that belong to the R2R network.

    Every value of ``self.adc.index_dict`` whose key contains "R2R" is
    gathered. The gathered values are sorted in ascending order and
    stored as a numpy array in ``self.r2r_indices``.

    Note:
        The sorting is done on the row indices themselves, not on the
        channel names. NOTE(review): this presumably relies on R2R1,
        R2R2, ... occupying ascending rows of the ADC data - confirm
        against the ADC input configuration before renaming or
        re-ordering R2R channels, because a wrong order would not
        raise any error.
    """
    self.__r2r_config = sorted(
        row for channel, row in self.adc.index_dict.items() if "R2R" in channel
    )
    self.r2r_indices = np.array(self.__r2r_config)
    message = "The R2R indices/row in the detector data are {}.".format(
        self.r2r_indices
    )
    logger.info(message)
def end(self):
    """
    Shut down the connection held by this object.

    The call is forwarded to ``self.connection.end()``; nothing is
    returned. NOTE(review): ``self.connection`` is presumably the
    serial link to the counter electronics - confirm in ``__init__``.
    """
    self.connection.end()
def __enter__(self):
    """Enter a ``with`` block; the object bound by ``as`` is this instance."""
    return self


def __exit__(self, type, value, traceback):
    """
    Leave a ``with`` block and release the connection via ``self.end()``.

    The exception details passed in are not inspected. ``None`` is
    returned, so an exception raised inside the block is not suppressed.
    """
    self.end()
    return None
if __name__ == "__main__":
    # Manual test script: it only runs when this module is executed
    # directly and it needs the real lab hardware and config files.
    logging.basicConfig(level=logging.INFO)

    # Disabled example of driving the counter over its serial port.
    # Kept for reference only.
    # port="COM9"
    # counter = SerialInterferometerCounter(port)
    # counter.size_counter_table(100)
    # # time.sleep(1)
    # counter.trigger_level_pd1(2047)
    # counter.counter_value(123)
    # counter.parallel_counter_mode(1)
    # # counter.data_mode(0)
    # #
    # # counter.set_nonvolatile_variables_to_default()
    # # counter_val = counter.counter_value()
    # # trig_level_1 = counter.trigger_level_pd1()
    # # trig_level_2 = counter.trigger_level_pd2()
    # # dac_mode = counter.dac_flag()
    # # data_mode = counter.data_mode()
    # # output_mode = counter.output_mode_counter_table()
    # # polarity_1 = counter.select_polarity_pd1()
    # # polarity_2 = counter.select_polarity_pd2()
    # parallel_port_mode = counter.parallel_counter_mode()
    # # print("counter_val:",counter_val)
    # # print("trig_level_1:",trig_level_1)
    # # print("trig_level_2:",trig_level_2)
    # # print("dac_mode:",dac_mode)
    # # print("data_mode:",data_mode)
    # # print("output_mode:",output_mode)
    # # print("polarity_1:",polarity_1)
    # # print("polarity_2:",polarity_2)
    # print("parallel_port_mode:",parallel_port_mode)
    # # counter.set_nonvolatile_variables_to_default()
    # counter.end()

    # Test the combination of a slow movement of the interferometer
    # stage with reading and processing the data from the R2R network.
    from matplotlib import pyplot as plt
    from analog_digital_converter import AnalogDigitalConverter as ADC
    import pi_control
    from interferometer_counter import InterferometerCounter

    # Base config:
    from hardware_properties import HardwareProperties
    import data_processing as dp

    # Machine-specific location of the hardware description file.
    path = r"C:\Users\I-LAB-PC\Documents\akb_software\hardware_config_files\i-lab_hardware_properties.json"
    hw = HardwareProperties(path)
    interferometer_ctrl = hw.pi_instruments.interferometer_controller
    interferometer_stage_properties = (
        hw.pi_instruments.interferometer_controller.stages.interferometer_stage
    )

    # Calculate velocity and other params to smoothly record all values.
    # In this script the results are only printed; they are not passed
    # on to the stage or the ADC below.
    he_ne_wl = hw.he_ne_laser.wavelength
    # Original author's notes on dist: a recorded value of
    # 0.11991698320000001 and an earlier variant 0xFFFF//2 * he_ne_wl
    # (approx. 21 mm, which the interferometer cannot actually travel).
    dist = (
        25000 // 2 * he_ne_wl
    )
    # Original author's notes on speed: approx. 2 mm/s, which seemed
    # surprisingly high and will probably have to be changed to a much
    # smaller number.
    speed = (
        he_ne_wl * hw.adc.laser_frequency / 4
    )
    time_to_traverse = dist / speed * 0.01
    print("time to traverse: ", time_to_traverse)
    print("speed: ", speed)

    # Open ADC, stage controller, stage and counter together; each one
    # is handed to the objects created after it.
    with ADC(
        hw.adc.device_name,
        hw.adc.input_configuration_path,
        hw.adc.samples_to_acquire,
        hw.adc.trigger_source,
        hw.adc.sampling_rate,
        hw.adc.laser_frequency,
    ) as adc, pi_control.PiController(
        interferometer_ctrl.model,
        interferometer_ctrl.stage_names,
        com_port=interferometer_ctrl.port,
        name=interferometer_ctrl.name,
    ) as interferometer_controller, pi_control.PiStage(
        interferometer_controller,
        interferometer_stage_properties.axis,
        interferometer_stage_properties.t_zero,
        interferometer_stage_properties.direction,
        interferometer_stage_properties.path_factor,
        interferometer_stage_properties.initial_velocity,
        name=interferometer_stage_properties.name,
    ) as interferometer_stage, InterferometerCounter(
        hw.interferometer_counter.port,
        interferometer_stage,
        adc,
        hw.interferometer_counter.config_path,
    ) as interferometer_counter:
        # Setup coherence time scan
        coherence_time = 4  # ps
        frequency = 1.5  # Hz
        speedupdown = 500
        interferometer_stage.setup_coherence_time_scan(
            coherence_time, frequency, speedupdown
        )

        # Move stage to reset position and reset counter to 0, then go
        # to the start position of the scan.
        interferometer_stage.move_mm(interferometer_stage.reset_position)
        interferometer_counter.reset_counter()
        interferometer_stage.move_mm(interferometer_stage.start_pos)

        # Acquire for five times 1 / frequency while the wave generator
        # is running.
        adc.set_acquisition_time(1 / frequency * 5)
        interferometer_stage.start_wavegen()
        # NOTE(review): timeout=-1 presumably means "wait without a
        # time limit" - confirm against the ADC class.
        adc.read(timeout=-1)
        interferometer_stage.stop_wavegen()

        # Calculate interferometer states the way it is done in ft_2d_ir
        interferometer_states = (
            int(
                np.ceil(
                    (
                        interferometer_stage.amplitude
                        - 2 * interferometer_stage.overshoot_mm
                    )
                    // he_ne_wl
                )
            )
            * interferometer_stage.path_factor
        )

        # Convert the recorded R2R voltages into counter values.
        counter_data = dp.calculate_counter_values(
            adc.data,
            interferometer_counter.r2r_indices,
            interferometer_counter.bin_reference_values,
        )
        print("Interferometer states that need to be observed: ", interferometer_states)

        # The scan counts as covering the lower bound if the value 0
        # occurs in the counter data, and the upper bound if the
        # expected number of states occurs in it.
        if (counter_data == 0).any():
            print("Lower bound is OK.")
        else:
            print("!!!Lower bound is NOT OK.!!!")
        if (counter_data == interferometer_states).any():
            print("Upper bound is OK.")
        else:
            print("!!!Upper bound is NOT OK.!!!")

        # Alternative that uses the method of the counter object:
        # counter_data = interferometer_counter.calculate_counter_values(adc.data)

        # Top panel: counter values; bottom panel: the raw R2R rows of
        # the ADC data.
        fig, ax = plt.subplots(nrows=2)
        ax[0].plot(counter_data)
        ax[1].plot(adc.data[interferometer_counter.r2r_indices, :].T)
        plt.show()