Source code for hardware_interfaces.analog_digital_converter

# Helper modules
import typing
from typing import List, Tuple
import logging
import os

# Import relevant modules
import numpy as np
from numpy import ndarray

# Nidaqmx modules
import nidaqmx as ni
from nidaqmx import task as ni_task
from nidaqmx.system.device import Device

# Needed for both devices
from nidaqmx._task_modules import in_stream
from nidaqmx.constants import AcquisitionType

# NI 6225
from nidaqmx.constants import TerminalConfiguration
from nidaqmx.constants import SampleTimingType
from nidaqmx.constants import Edge
from nidaqmx.stream_readers import AnalogMultiChannelReader

# NI 6533
from nidaqmx.constants import LineGrouping
from nidaqmx.constants import Polarity
from nidaqmx.constants import Level
from nidaqmx.stream_readers import DigitalSingleChannelReader


import json

# Set up logger
logger = logging.getLogger(__name__)

# Set up 'path'-like type
Path = typing.Union[str, bytes, os.PathLike]


class AnalogDigitalConverter:
    """
    Initialise the connection to an ADC and configure its timing and
    triggering properties.

    It is assumed that a National Instruments nidaqmx capable device is used
    as ADC. This class supports the devices **NI USB-6225 (Mass
    Termination)** and **PCI-DIO-32HS (NI-6533)**. Support for new devices
    can be added with relative ease (especially when they do not require
    additional data sorting) by adding a new private method which
    initialises the device with appropriate settings. Also make sure to
    update the product type checks in the relevant methods.

    Args:
        device_name (str): Name of the device / ADC. The name can be found
            and configured in the National Instruments software
            'Measurement and Automation Explorer' (MAX). The device name
            should also be displayed in a pop-up when plugging in the
            device.

            * E.g.: "Dev0", "Dev1" etc.
        input_configuration (Path): Path to the json file which contains
            the mapping of ADC inputs to physical devices.

            * I.e. 'Probe Pixel 0', 'Chopper', 'Wobbler'
        samples_to_acquire (int): Number of samples per channel / input to
            acquire during one sampling process.
        trigger_source (str): Name of the input to which the trigger is
            connected. For our cases the trigger should generally be a TTL
            from the laser.

            * E.g: PFI14, PFI4
        sampling_rate (float): Sampling rate of the ADC in Hz. This is
            different for different ADCs and different laser repetition
            rates. We quote from the nidaqmx documentation: *Specifies the
            sampling rate in samples per channel per second. If you use an
            external source for the Sample Clock, set this input to the
            maximum expected rate of that clock.* So we would expect that
            the sampling rate corresponds to the laser frequency. For some
            reason this causes issues with the NI 6225: when adding too
            many channels, the last added channels do not read the held
            voltage at the sample and hold electronics of the PreAmp. When
            removing the first channels from the task, the last channels
            start working. This problem can be solved by increasing the
            sampling rate. We recommend to set it as high as possible (this
            is just a hunch); for the NI 6225 this is 250000 / number of
            channels in the task. This logic does not apply to the FPAS
            (PCI DIO 32HS), where it has to be set to 1e7 Hz. The PCI DIO
            32HS uses a different timing scheme and is technically not an
            ADC but only a digital input/output channel.
        laser_frequency (float): Frequency of the laser (or more generally
            of the trigger) in Hz.
        name (str, optional): Name / identifier to give to this ADC. This
            is relevant for log statements. Defaults to "analog to digital
            converter".

    Raises:
        ValueError: If the product type reported for ``device_name`` is not
            one of the supported devices.

    Attributes:
        task (object): Represents a DAQmx Task object, through which all
            communication with the device is managed.
        device (object): nidaqmx ``Device`` object created from
            ``device_name``.
        product_type (str): Model name of the device.
            E.g. "USB-6225 (Mass Termination)"
        acquisition_time (float): Time needed to acquire all samples in
            seconds (``samples_to_acquire / laser_frequency``).
        input_configuration (dict): Content of the json configuration file.
            Every entry provides the ADC input ("channel") and the
            "minimum voltage" / "maximum voltage". The last two entries
            indicate to the ADC in what voltage range it needs to convert
            data. Setting these values incorrectly leads to either a lower
            (voltage) resolution or a cutoff of the signal.
        number_of_channels (int): Number of channels / ADC inputs that are
            read.
        data (ndarray): Array containing the data of the latest .read()
            process. If the .read() method has not been called yet, or
            .set_samples_to_acquire() has been used, this array contains
            random data!

            * shape: (number_of_channels, samples_to_acquire)
        entangled_data (ndarray): (If applicable) Array containing the
            entangled data of the FPAS ADC.

            * shape: (128*samples_to_acquire)
        stream (object): Object that exposes an input data stream on a
            DAQmx task. Required to read ADC data directly into a numpy
            array.
        channel_reader (object): Object that reads samples from the input
            channels directly into a numpy array.
        pixel_idx (ndarray): Indices of the rows in the adc data which are
            MCT pixels.
        probe_pixel_idx (ndarray): Indices of the rows in the adc data
            which are MCT probe pixels.
        reference_pixel_idx (ndarray): Indices of the rows in the adc data
            which are MCT reference pixels.
        index_dict (dict): Maps the names of the devices / inputs connected
            to the adc to the index of the corresponding row in the adc
            data.

    References:
        | https://nidaqmx-python.readthedocs.io/en/latest/index.html
        | https://nidaqmx-python.readthedocs.io/en/latest/task.html
        | https://www.ni.com/de-de/support/documentation/supplemental/06/ni-daqmx-installed-documentation.html
    """

    # Product type strings (as reported by nidaqmx) of the supported devices.
    _PRODUCT_NI6225 = "USB-6225 (Mass Termination)"
    _PRODUCT_PCI_DIO_32HS = "PCI-DIO-32HS"

    # FPAS ADC layout: 144 inputs, 128 32-bit words transferred per sample.
    _FPAS_INPUTS = 144
    _FPAS_WORDS_PER_SAMPLE = 128

    def __init__(
        self,
        device_name: str,
        input_configuration: "Path",
        samples_to_acquire: int,
        trigger_source: str,
        sampling_rate: float,
        laser_frequency: float,
        name: str = "analog to digital converter",
    ):
        # Set up attributes
        self.device_name = device_name
        self.input_configuration_path = input_configuration
        self.samples_to_acquire = samples_to_acquire
        self.trigger_source = trigger_source
        self.sampling_rate = sampling_rate
        self.laser_frequency = laser_frequency
        self.name = name
        self.__calculate_acquisition_time()

        # Load input config
        self.__read_input_configuration_file()

        # Indicates whether the task has been started and can be read
        # directly (True) or whether .read() has to call .start() first
        # (False). No task has been started yet.
        self.__task_state = False

        # Create device object to determine the product type of the device.
        self.device = Device(device_name)
        self.product_type = self.device.product_type
        logger.info(
            "Device product name for %s is: %s", self.name, self.product_type
        )

        # Initialise corresponding device.
        if self.product_type == self._PRODUCT_NI6225:
            logger.info("Initialising NI-6225 for %s", self.name)
            self.__initialise_NI6225()
        elif self.product_type == self._PRODUCT_PCI_DIO_32HS:
            logger.info("Initialising PCI-DIO-32HS (NI-6533) for %s", self.name)
            # Generate sorting index map to sort data coming from device
            self._old_idx, self._new_idx = self.__generate_index_map_PCI_DIO_32HS()
            self.__generate_voltage_per_adc_count_array()
            self.__initialise_PCI_DIO_32HS()
        else:
            # Without a task every later call would fail with an obscure
            # AttributeError, so reject unknown devices right away.
            raise ValueError(
                "Unsupported device product type {!r} for {} ({}).".format(
                    self.product_type, self.name, self.device_name
                )
            )

    def __read_input_configuration_file(self):
        """
        Read/Load the names and the input terminals from a json file and
        build the index lookups derived from it.
        """
        logger.info("Loading input configuration file for %s", self.name)
        with open(self.input_configuration_path) as json_file:
            # Load json file into dictionary
            self.input_configuration = json.load(json_file)
        self._index_inputs()

    def _index_inputs(self):
        """
        Build ``index_dict`` and the pixel index arrays from
        ``self.input_configuration``.

        The position of a key in the configuration is the row index of that
        input in the adc data. Keys containing "pixel" are collected in
        ``pixel_idx``; those additionally containing "probe" or
        "reference" are collected in ``probe_pixel_idx`` and
        ``reference_pixel_idx`` respectively.
        """
        pixel_rows = []
        probe_rows = []
        reference_rows = []
        self.index_dict = {}
        for index, key in enumerate(self.input_configuration):
            self.index_dict[key] = index
            # NOTE(review): the probe/reference checks are assumed to be
            # nested under the "pixel" check (source indentation was lost)
            # -- confirm against the original file.
            if "pixel" in key:
                pixel_rows.append(index)
                if "probe" in key:
                    probe_rows.append(index)
                elif "reference" in key:
                    reference_rows.append(index)
        # Force an integer dtype: an empty list would otherwise become a
        # float array, which cannot be used for indexing.
        self.pixel_idx = np.array(pixel_rows, dtype=int)
        self.probe_pixel_idx = np.array(probe_rows, dtype=int)
        self.reference_pixel_idx = np.array(reference_rows, dtype=int)

    def __initialise_NI6225(self):
        """
        Initialises a connection to a NI-6225 (Mass-Termination) device.

        This implicitly sets voltage reference, timing properties and
        trigger properties.

        * Voltage reference is set to RSE for all inputs.
        * Timing is set to acquire a sample on the rising edge of the
          trigger input. The buffer is set to acquire a finite amount of
          samples.
        * A start trigger is configured such that (once a task is started)
          the acquisition commences on the rising edge of the trigger
          input.

        Also, an array to write the data into is preallocated.

        References:
            | https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z0000019QRZSA2&l=de-DE
            | https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z000000P7KdSAK&l=de-DE
            | http://zone.ni.com/reference/en-XX/help/370466AH-01/mxcncpts/smpletimingtype/
        """
        # Initialise nidaqmx task object
        self.task = ni_task.Task(self.name)
        logger.info("Initialized nidaqmx task for %s.", self.name)

        # Add channels to task
        for name, config in self.input_configuration.items():
            channel = config["channel"][0]
            min_voltage = config["minimum voltage"][0]
            max_voltage = config["maximum voltage"][0]
            self.task.ai_channels.add_ai_voltage_chan(
                "/{}/{}".format(self.device_name, channel),
                name_to_assign_to_channel=name,
                terminal_config=TerminalConfiguration.RSE,
                min_val=min_voltage,
                max_val=max_voltage,
            )
        logger.info(
            "Adding all channels successfull for %s (%s)",
            self.name,
            self.product_type,
        )
        # TODO: find out max and min voltages from detector amp.

        # Setting correct timing preferences: time on rising edge
        self.task.timing.cfg_samp_clk_timing(
            # TODO: test whether one can resolve below the Nyquist frequency.
            self.sampling_rate,
            source=self.trigger_source,
            active_edge=Edge.RISING,  # Set timing on rising edge
            sample_mode=AcquisitionType.FINITE,  # Collect finite amount of samples
            samps_per_chan=self.samples_to_acquire,
        )
        logger.info(
            "Set up timing properties for %s(%s)", self.name, self.product_type
        )

        # Setting up start trigger: start acquisition on rising edge
        self.task.triggers.start_trigger.cfg_dig_edge_start_trig(
            trigger_source=self.trigger_source, trigger_edge=Edge.RISING
        )
        logger.info("Set up start trigger for %s(%s)", self.name, self.product_type)

        # Set number of channels in the task as class attribute
        self.number_of_channels = self.task.number_of_channels

        # Preallocate array in which data can be written repeatedly
        self.data = np.empty((self.task.number_of_channels, self.samples_to_acquire))

        # Put task 'into stream' so that data can be read directly into a
        # numpy array.
        self.stream = in_stream.InStream(self.task)

        # Set up AnalogMultiChannelReader so that data can be read directly
        # into a numpy array.
        self.channel_reader = AnalogMultiChannelReader(self.stream)
        logger.info(
            "Put task in stream and instantiated AnalogMultiChannelReader object for %s",
            self.name,
        )

    def __initialise_PCI_DIO_32HS(self):
        """
        Initialises a connection to a PCI-DIO-32HS (NI-6533) device.

        This implicitly sets timing properties and trigger properties.

        * It is not completely clear why the settings are chosen the way
          they are. This code was translated from LabView. I guess: contact
          the company that built the FPAS device.

        Also, arrays to write the data into are preallocated.

        References:
            | https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z000000P7KdSAK&l=de-DE
            | https://www.ni.com/documentation/de/ni-daqmx/latest/mxcncpts/bursthandsignals/
            | http://zone.ni.com/reference/en-XX/help/370466AH-01/mxcncpts/smpletimingtype/
        """
        words_per_sample = self._FPAS_WORDS_PER_SAMPLE

        # Initialise nidaqmx task object
        self.task = ni_task.Task(self.name)
        logger.info("Initialized nidaqmx task for %s.", self.name)

        # Add channel to task (inputs are hard coded in this case;
        # copied from old LabView code)
        self.task.di_channels.add_di_chan(
            "/{}/port0_32".format(self.device_name),
            name_to_assign_to_lines="Entangled {} data".format(self.name),
            line_grouping=LineGrouping.CHAN_FOR_ALL_LINES,
        )

        # Setting correct timing preferences: time on rising edge
        # (copied from old LabView code)
        self.task.timing.cfg_burst_handshaking_timing_export_clock(
            self.sampling_rate,  # for the hlab (FPAS) this should be fixed at 1e7 Hz
            sample_clk_outp_term=self.trigger_source,
            sample_mode=AcquisitionType.FINITE,  # Collect finite amount of samples
            # For the FPAS ADC to read all samples from the buffer we need
            # to multiply by 128
            samps_per_chan=self.samples_to_acquire * words_per_sample,
            # Meaning of the following three settings is unknown (taken
            # over from the LabView code).
            sample_clk_pulse_polarity=Polarity.ACTIVE_HIGH,
            pause_when=Level.HIGH,
            ready_event_active_level=Polarity.ACTIVE_HIGH,
        )

        # Set number of channels in the task as class attribute. In this
        # case (for FPAS) it needs to be set manually.
        self.number_of_channels = self._FPAS_INPUTS

        # Preallocate arrays in which entangled and sorted data can be
        # written repeatedly
        self.data = np.empty((self._FPAS_INPUTS, self.samples_to_acquire))
        self.entangled_data = np.empty(
            self.samples_to_acquire * words_per_sample, dtype="uint32"
        )
        self._sorted_data = np.empty(
            self.samples_to_acquire * words_per_sample * 2, dtype="uint16"
        )

        # Put task 'into stream' so that data can be read directly into a
        # numpy array.
        self.stream = in_stream.InStream(self.task)

        # Set up DigitalSingleChannelReader so that data can be read
        # directly into a numpy array.
        self.channel_reader = DigitalSingleChannelReader(self.stream)
        logger.info(
            "Put task in stream and instantiated DigitalSingleChannelReader object for %s",
            self.name,
        )

    def __generate_index_map_PCI_DIO_32HS(self) -> Tuple[ndarray, ndarray]:
        """
        Generates the arrays that sort indices of the PCI_DIO_32HS device
        data correctly.

        The indices need to be sorted in the following way: starting at 1,
        the 1st and the 16th need to be switched. Then add 32 to both
        indices and swap those as well. And so on. Once you arrive at the
        boundary, start at the first number (i.e. 1), add 2 (i.e. 3) and
        repeat. Do this for a total of eight times.

        This method is equivalent to the following code:

        .. code-block::

            t1 = np.empty(16 * 8)
            t2 = np.empty(16 * 8)
            for i in range(8):
                t1[i * 16 : (i + 1) * 16] = 2 * i + np.array(
                    (1, 16, 33, 48, 65, 80, 97, 112,
                     129, 144, 161, 176, 193, 208, 225, 240)
                )
                t2[i * 16 : (i + 1) * 16] = 2 * i + np.array(
                    (16, 1, 48, 33, 80, 65, 112, 97,
                     144, 129, 176, 161, 208, 193, 240, 225)
                )

        Returns:
            Tuple[ndarray, ndarray]: The first array corresponds to the old
            indices and the second array contains the new indices.
        """
        eight = np.arange(8)  # 0,1,2,3,4,5,6,7
        stack = np.tile([1, 16], 65)  # 1, 16, 1, 16 for a total of 65 times
        # Offset 2*i per group of 16 plus the repeating 0,0,32,32,...,224,224
        base = np.repeat(eight * 2, 16) + np.tile(np.repeat(eight * 32, 2), 8)
        # Shifting the 1/16 pattern by one element yields the swapped partner.
        return base + stack[:128], base + stack[1:129]  # old indices, new indices

    def __generate_voltage_per_adc_count_array(self):
        """
        Generate array to multiply with adc counts to get corresponding
        voltage values.

        This method takes the minimum and maximum voltage from the input
        configuration file and calculates the corresponding voltage range
        for each input. The voltage per adc count is then obtained by
        dividing the voltage range array by 0xFFFF.

        Note:
            Deprecated. The H-Lab FPAS ADC always counts 5V as 0xFFFF even
            when the maximum input voltage is lower. I.e. channel 143 has a
            voltage range of 0-2.5 V. 2.5 V will correspond (approximately)
            to 0xFFFF/2.
        """
        # Preallocate array that holds the voltage range for each input
        # (which are in total 144)
        voltage_range = np.zeros((self._FPAS_INPUTS, 1))

        # Load and calculate voltage range from input configuration for
        # each input
        for idx, config in enumerate(self.input_configuration.values()):
            min_voltage = config["minimum voltage"][0]
            max_voltage = config["maximum voltage"][0]
            voltage_range[idx] = max_voltage - min_voltage

        # Divide by the full-scale count to obtain the voltage (V) per ADC
        # count.
        self.__voltage_per_adc_count = voltage_range / 0xFFFF

    def __calculate_acquisition_time(self):
        """
        Calculates the acquisition time in seconds given the samples that
        are to be acquired and the laser frequency / repetition rate.

        Returns:
            float: The acquisition time, also stored in
            ``self.acquisition_time``.
        """
        self.acquisition_time = self.samples_to_acquire / self.laser_frequency
        return self.acquisition_time

    def sort_PCI_DIO_32HS(self) -> ndarray:
        """
        Takes the entangled data of the class attribute self.entangled_data
        from the FPAS ADC and sorts it.

        This code was translated from a corresponding LabView VI. A 'bug'
        was found in this VI and corrected here (division by 0xFFFF instead
        of 0x10000). The FPAS ADC has a total of 144 inputs: 128 of them
        are pixels, the other 16 are open connectors. The FPAS ADC samples
        with 16 bit. To enable maximum transfer speeds 32 bit data is
        transferred. Each data point can be split into two values: the
        first 16 binary digits correspond to one value of the ADC, the last
        16 binary digits are a different value of the ADC. After separating
        them they have to be assigned to the corresponding input.

        Note:
            ``self.entangled_data`` is replaced by its rolled copy, so
            calling this method again without a new read shifts the data a
            second time.

        Returns:
            ndarray: Sorted data in volts.

            * shape: (144, self.samples_to_acquire), 144 represents the
              number of inputs (self.number_of_channels).
        """
        # Shift data to the left by one element
        self.entangled_data = np.roll(self.entangled_data, -1)

        # Split every 32-bit word into its two 16-bit ADC values.
        self._sorted_data[0::2] = np.bitwise_and(
            self.entangled_data, 0xFFFF
        )  # last 16 binary digits
        self._sorted_data[1::2] = np.right_shift(
            self.entangled_data, 16
        )  # first 16 binary digits

        # One row per sample, 256 columns (16-bit values) per row.
        self._reshaped_data = self._sorted_data.reshape(
            self._sorted_data.size // 256, 256
        )

        # Sort data / assign to correct input. The fancy-indexed right hand
        # side is a copy, so the pairwise swap does not overwrite itself.
        self._reshaped_data[:, self._new_idx] = self._reshaped_data[
            :, self._old_idx
        ]

        # Only the first 144 rows are relevant inputs.
        counts = self._reshaped_data.T[: self._FPAS_INPUTS]

        # We want to have volts, not ADC counts. The FPAS ADC always maps
        # 5 V to 0xFFFF (see the note in
        # __generate_voltage_per_adc_count_array), hence the per-input
        # voltage_per_adc_count array is not used here.
        self.data = counts * (5 / 0xFFFF)
        return self.data

    def __str__(self):
        state = (
            "task is currently idle\n"
            if self.is_done()
            else "task is busy with acquisition\n"
        )
        parts = [
            "Analog to digital converter object, identifier: {}\n".format(self.name),
            "device name: {}\n".format(self.device_name),
            "product type/ model name: {}\n".format(self.product_type),
            "serial no: {}\n".format(self.device.dev_serial_num),
            "samples to acquire: {}\n".format(self.samples_to_acquire),
            "time needed to acquire samples: {} s \n".format(
                self.samples_to_acquire / self.laser_frequency
            ),
            state,
            "laser frequency: {} Hz\n".format(self.laser_frequency),
            "number of channels in task: {}\n".format(self.number_of_channels),
            "trigger input source: {}\n".format(self.trigger_source),
        ]
        return "".join(parts)

    def start(self):
        """
        Starts nidaqmx task and therefore data acquisition.

        Technically speaking, it starts the nidaqmx task and then starts
        acquisition once the trigger activates. This method will not start
        the task when the task is still active.

        Note:
            This method will *not* cause the python interpreter to halt,
            which is advantageous for processes that need to run
            simultaneously. To then read the data from the buffer without
            an acquisition delay, use the self.read() method.
        """
        if self.task.is_task_done():  # Only start if prior task has been completed.
            self.task.stop()  # Stop 'old' task before starting anew
            self.task.start()
            # Remember that the task is running so .read() does not restart it.
            self.__task_state = True
            logger.info("Task for %s was started.", self.name)
        else:
            logger.warning(
                "Task for %s cannot be started, because it is still running.",
                self.name,
            )

    def stop(self):
        """
        Stops nidaqmx task and sets task state to False.

        This is necessary to call before a new task is started and will be
        called at the end of the read method.
        """
        self.task.stop()
        self.__task_state = False

    def set_samples_to_acquire(self, samples_to_acquire: int):
        """
        Sets the number of samples per channel that should be acquired.

        E.g.: If the task is started it will acquire samples_to_acquire
        samples for each channel and write them into a buffer.

        Args:
            samples_to_acquire (int): Number of samples to acquire per
                channel.
        """
        # Check whether samples to acquire actually have to be set.
        if samples_to_acquire == self.samples_to_acquire:
            return

        # Update attributes
        self.samples_to_acquire = samples_to_acquire
        self.__calculate_acquisition_time()

        # Stop the task because the property can only be set when the task
        # is not running. Going through self.stop() also resets the task
        # state, so the next .read() starts the task again.
        self.stop()

        if self.product_type == self._PRODUCT_NI6225:
            self.task.timing.samp_quant_samp_per_chan = self.samples_to_acquire
            # Preallocate array in which data can be written repeatedly
            self.data = np.empty(
                (self.task.number_of_channels, self.samples_to_acquire)
            )
            logger.info("Added new, empty self.data ndarray to %s", self.name)
        elif self.product_type == self._PRODUCT_PCI_DIO_32HS:
            # To set the samples to acquire we need to "re"set all timing
            # settings. If we only change the attribute
            # (samp_quant_samp_per_chan = samples_to_acquire * 128, as for
            # the USB-6225) we run into a bug where the acquisition
            # sometimes is never done.
            # ? Is this also required for USB 6225?
            logger.info(
                "Closing and reinitializing task to change number of samples to acquire for %s.",
                self.name,
            )
            self.task.close()
            self.__initialise_PCI_DIO_32HS()

        logger.info(
            "Set task (buffer etc.) of %s to acquire %s samples per channel.",
            self.name,
            self.samples_to_acquire,
        )

    def set_acquisition_time(self, time: float):
        """
        Set time in seconds for which the acquisition should run.

        Instead of setting the number of samples that should be acquired,
        this method takes the time period in seconds for which the
        acquisition should run and sets the according number of samples
        that should be acquired per channel.

        Args:
            time (float): Time period in seconds for which data should be
                acquired.
        """
        # Round to next lowest integer
        samples_to_acquire = int(time * self.laser_frequency)
        self.set_samples_to_acquire(samples_to_acquire)

    def _resolve_timeout(self, timeout: typing.Optional[float]) -> float:
        """
        Return the timeout to use for a read.

        An explicitly passed timeout (including 0 and -1) is returned
        unchanged. ``None`` selects 1.5 times the acquisition time, but at
        least 0.5 seconds.
        """
        if timeout is not None:
            return timeout
        # NOTE(review): the 0.5 s floor is assumed to apply only to the
        # default (source indentation was lost) -- confirm.
        return max(1.5 * self.acquisition_time, 0.5)

    def read(self, timeout: typing.Optional[float] = None, sort: bool = True) -> ndarray:
        """
        Reads the data from the buffer, saves it to self.data and returns
        it.

        Once called this method will wait until all samples have become
        available, so if not all samples have been read into the buffer,
        this will halt the interpreter. On the other hand, if all samples
        are already available in the buffer this method is not going to
        halt the interpreter. For maximum time efficiency use self.start()
        to write samples into the buffer without halting the interpreter.

        By default, the FPAS ADC data is sorted. This takes about 1 ms per
        1000 samples per channel. If this is too slow, set the sort
        parameter to False. The method will then return the entangled data
        instead. Use the self.sort_PCI_DIO_32HS() method to sort the data
        afterwards.

        Args:
            timeout (float, optional): Specifies the amount of time in
                seconds to wait for samples to become available. If the
                time elapses, the method returns an error and any samples
                read before the timeout elapsed. If you set timeout to
                nidaqmx.constants.WAIT_INFINITELY or to -1 the method
                waits indefinitely. If you set timeout to 0, the method
                tries once to read the requested samples and returns an
                error if it is unable to. Defaults to None, which sets the
                timeout to 1.5 * acquisition time (at least 0.5 s).
            sort (bool, optional): Only relevant for the FPAS ADC. If it is
                set to True this method returns the sorted data. If it is
                not, it returns the entangled data. *This data is
                additionally accessible through self.entangled_data.
                Defaults to True.*

        Warning:
            If sort is set to False, then self.data potentially holds a
            preallocated ndarray containing random data!

        Returns:
            ndarray: ADC data.

            * shape: (self.number_of_channels, self.samples_to_acquire).
              Shape differs for the FPAS ADC if data is requested unsorted.
        """
        timeout = self._resolve_timeout(timeout)

        # Start task if it has not been started yet.
        if not self.__task_state:
            logger.info("Starting task automatically for %s", self.name)
            self.start()

        if self.product_type == self._PRODUCT_NI6225:
            # Read data into self.data numpy array
            samples_read = self.channel_reader.read_many_sample(
                self.data, timeout=timeout
            )
            logger.info(
                "Read %s samples per channel to %s.data", samples_read, self.name
            )
            # Stop task and reset task state
            self.stop()
            return self.data

        elif self.product_type == self._PRODUCT_PCI_DIO_32HS:
            # Read data into self.entangled_data numpy array
            samples_read = self.channel_reader.read_many_sample_port_uint32(
                self.entangled_data, timeout=timeout
            )
            logger.info(
                "Read %s samples to %s.entangled_data", samples_read, self.name
            )
            # Stop task and reset task state
            self.stop()
            if not sort:
                return self.entangled_data
            self.sort_PCI_DIO_32HS()  # Sort data
            logger.info("Sorted %s.entangled_data to %s.data", self.name, self.name)
            return self.data

    def wait(self, timeout: float = 10):
        """
        Wait for measurement/data acquisition to complete by halting the
        interpreter.

        Args:
            timeout (float, optional): Specifies the amount of time in
                seconds to wait for the acquisition to complete. If the
                time elapses, an error is returned. If you set timeout to
                nidaqmx.constants.WAIT_INFINITELY or to -1 the method
                waits indefinitely. If you set timeout to 0, the method
                checks once and returns an error if the acquisition is not
                done. Defaults to 10.
        """
        logger.info("Waiting for acquisition for %s to complete.", self.name)
        self.task.wait_until_done(timeout=timeout)
        logger.info("Acquisition for %s done.", self.name)

    def is_done(self) -> bool:
        """
        Check if measurement/data acquisition is complete.

        Returns:
            bool: True if acquisition is complete, False if not.
        """
        return self.task.is_task_done()

    def end(self):
        """
        Stops and ends nidaqmx task.

        The task is closed even if stopping it raises, so the device
        resources are always released.
        """
        try:
            self.task.stop()
            logger.info("%s task was stopped.", self.name)
        finally:
            self.task.close()
            self.__task_state = False
            logger.info("%s task was closed.", self.name)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        # Always release the task when leaving the with-block.
        self.end()
def timetest(converter=None):
    """
    Print how long waiting for an acquisition and reading its data take.

    The converter is started, the call blocks (without timeout) until the
    acquisition is done and the data is read afterwards. Both durations are
    printed in seconds.

    Args:
        converter (optional): Object providing ``start()``,
            ``wait(timeout=...)`` and ``read()``. Defaults to None, in
            which case the module level variable ``adc`` (created when this
            file is run as a script) is used.
    """
    # Imported locally: at module level `time` is only imported inside the
    # `if __name__ == "__main__"` block.
    import time

    if converter is None:
        converter = adc  # noqa: F821 -- global set up by the script block

    t0 = time.perf_counter()
    converter.start()
    converter.wait(timeout=-1)
    t1 = time.perf_counter()
    converter.read()
    t2 = time.perf_counter()
    print("Time waited: ", t1 - t0, "seconds")
    print("Time to read samples: ", t2 - t1, "seconds")
if __name__ == "__main__":
    # Manual hardware check: acquires five minutes of data and plots the
    # trace and noise spectrum of the second to last input.
    logging.basicConfig(level=logging.INFO)

    import time  # noqa: F401 -- kept as module global for timetest()

    from matplotlib import pyplot as plt
    from scipy.fft import rfft

    # Use the class defined above directly; importing this file again under
    # its module name would execute it a second time.
    ADC = AnalogDigitalConverter

    # NOTE(review): the original comment announced a PCI_DIO_32HS test, but
    # the task is named "NI6225" and uses the I-Lab configuration --
    # confirm which device "Dev1" is.
    with ADC(
        "Dev1",
        r"hardware_config_files\I-Lab analog input configuration.json",
        1000,
        "PFI8",
        3470,
        997,
        name="NI6225",
    ) as adc:
        # Check whether set acquisition time is working:
        t = 60 * 5  # unit in seconds
        adc.set_acquisition_time(t)
        if adc.samples_to_acquire == t * adc.laser_frequency:
            print("Number of channels:", adc.number_of_channels)
            print("Nominal: set_acquisition time is working as intended.")
            print("Acquisition time: ", t, "seconds")
            print("Samples to acquire: ", adc.samples_to_acquire)
        else:
            print("Error: set_acquisition time is NOT working.")

        adc.read()

        # Amplitude spectrum of the second to last input; the frequency
        # resolution is laser_frequency / samples_to_acquire.
        noise_spectrum = np.abs(rfft(adc.data[-2]))
        freq_axis = (
            np.arange(noise_spectrum.size)
            * adc.laser_frequency
            / adc.samples_to_acquire
        )

        fig, ax = plt.subplots(nrows=2)
        ax[0].plot(
            np.linspace(
                0, adc.acquisition_time, adc.samples_to_acquire, endpoint=False
            ),
            adc.data[-2],
        )
        ax[0].set_xlabel("time [s]")
        ax[0].set_ylabel("voltage [V]")
        ax[1].plot(freq_axis, noise_spectrum)
        ax[1].set_xlabel("frequency [Hz]")
        ax[1].set_ylabel("amplitude [a.u.]")
        ax[1].grid()
        plt.show()