# Helper modules
import typing
from typing import List, Tuple
import logging
import os
# Import relevant modules
import numpy as np
from numpy import ndarray
# Nidaqmx modules
import nidaqmx as ni
from nidaqmx import task as ni_task
from nidaqmx.system.device import Device
# Needed for both devices
from nidaqmx._task_modules import in_stream
from nidaqmx.constants import AcquisitionType
# NI 6225
from nidaqmx.constants import TerminalConfiguration
from nidaqmx.constants import SampleTimingType
from nidaqmx.constants import Edge
from nidaqmx.stream_readers import AnalogMultiChannelReader
# NI 6533
from nidaqmx.constants import LineGrouping
from nidaqmx.constants import Polarity
from nidaqmx.constants import Level
from nidaqmx.stream_readers import DigitalSingleChannelReader
import json
# Set up logger
logger = logging.getLogger(__name__)
# Set up 'path'-like type
Path = typing.Union[str, bytes, os.PathLike]
class AnalogDigitalConverter:
    """
    Initialise the connection to an ADC and set up its timing and
    triggering properties accordingly.

    It is assumed that a National Instruments nidaqmx capable device is
    used as ADC. This class supports the devices **NI USB-6225 (Mass
    Termination)** and **PCI-DIO-32HS (NI-6533)**.

    Support for new devices can be added with relative ease (especially
    when they do not require additional data sorting) by adding a new
    private method which initialises the device with appropriate
    settings. Also make sure to update the if clauses in relevant
    methods.

    Instances can be used as context managers; leaving the ``with``
    block calls :meth:`end`.

    Args:
        device_name (str): Name of the device / ADC.
            The name can be found and configured in the National
            Instruments Software: 'Measurement and Automation Explorer'
            (MAX). The device name should also be displayed in a pop-up
            when plugging in the device.

            * E.g.: "Dev0", "Dev1" etc.
        input_configuration (Path): Path to the json file which contains
            the configuration of ADC input to physical device. The keys
            of the file are the input names. Inputs are classified by
            case-sensitive substring matching on the key: keys
            containing ``"pixel"`` are pixels, and among those
            ``"probe"`` / ``"reference"`` select the pixel kind.
        samples_to_acquire (int): Number of samples per channel / input
            to acquire during one sampling process.
        trigger_source (str): Name of input to which the trigger is
            connected. For our cases the trigger should generally be a
            TTL from the Laser.

            * E.g: PFI14, PFI4
        sampling_rate (float): Sampling rate of ADC in Hz. This is
            different for different ADCs and different laser repetition
            rates. We quote from the nidaqmx documentation: *Specifies
            the sampling rate in samples per channel per second. If you
            use an external source for the Sample Clock, set this input
            to the maximum expected rate of that clock.* So we would
            expect that the sampling rate corresponds to the laser
            frequency. For some reason this causes issues with NI 6225.
            When adding too many channels the last added channels do not
            read the held voltage at the sample and hold electronics of
            the PreAmp. When removing the first channels from the task
            the last channels start working. This problem can be solved
            by increasing the sampling rate. We recommend to set it as
            high as possible (for no particular reason (this is just a
            hunch)); for NI 6225 this is 250000/number of channels in
            task. This logic does not apply to FPAS (PCI DIO 32HS) where
            it has to be set to 1e7 Hz. PCI DIO 32HS uses a different
            timing scheme and also is not technically an ADC but only a
            digital input/output channel.
        laser_frequency (float): Frequency of the laser (or more
            generally of the trigger) in Hz.
        name (str, optional): Name / Identifier to give to this ADC.
            This is relevant for log statements. Defaults to "analog to
            digital converter".

    Raises:
        ValueError: If the product type reported by the device is not
            one of the supported devices.

    Attributes:
        task (object): Represents a DAQmx Task object, through which all
            communication with device is managed.
        product_type (str): Model name of device. E.g. "USB-6225 (Mass
            Termination)"
        acquisition_time (float): Time needed to acquire all samples in
            seconds. This is not a constructor argument; it is computed
            as ``samples_to_acquire / laser_frequency``.
        input_configuration (dict): Content of the json configuration
            file. Maps each input name to a dict with the keys
            ``"channel"``, ``"minimum voltage"`` and ``"maximum
            voltage"``; the first list element of each is used. The
            voltage limits indicate to the ADC in what voltage range it
            needs to convert data. Setting these values incorrectly
            leads to either a lower (voltage) resolution or a cutoff of
            the signal.
        number_of_channels (int): Number of channels/ADC inputs that are
            read.
        data (ndarray): Array containing data of latest .read() process.
            If .read() method has not been called yet, or
            .set_samples_to_acquire() has been used, this array contains
            random data!

            * shape: (number_of_channels, samples_to_acquire)
        entangled_data (ndarray): (If applicable) Array containing
            entangled (raw) data of FPAS ADC.

            * shape: (128*samples_to_acquire)
        stream (object): Object that exposes an input data stream on a
            DAQmx task. Required to read ADC data directly into numpy
            array.
        channel_reader (object): Object that reads samples from input
            channels directly into numpy array.
        pixel_idx (ndarray): Integer array holding indices that refer to
            the rows in the adc data which are MCT Pixels, in the order
            in which they appear in the configuration file. It holds the
            probe pixels in the first half and the reference pixels in
            the second half only if the file lists them in that order.
        probe_pixel_idx (ndarray): Integer array holding indices that
            refer to the rows in the adc data which are MCT Probe
            Pixels.
        reference_pixel_idx (ndarray): Integer array holding indices
            that refer to the rows in the adc data which are MCT
            Reference Pixels.
        index_dict (dict): Dictionary that holds names of different
            devices/ inputs that are connected to adc as keys and the
            index of the row in the adc data as value.

    References:
        | https://nidaqmx-python.readthedocs.io/en/latest/index.html
        | https://nidaqmx-python.readthedocs.io/en/latest/task.html
        | https://www.ni.com/de-de/support/documentation/supplemental/06/ni-daqmx-installed-documentation.html
    """

    # Product type strings (as reported by the device) that are supported.
    _PRODUCT_NI6225 = "USB-6225 (Mass Termination)"
    _PRODUCT_PCI_DIO_32HS = "PCI-DIO-32HS"
    # Layout of the FPAS (PCI-DIO-32HS) data: 144 inputs per sample, which
    # are transferred as 128 32-bit words (= 256 16-bit values) per sample.
    _FPAS_INPUTS = 144
    _FPAS_WORDS_PER_SAMPLE = 128

    def __init__(
        self,
        device_name: str,
        input_configuration: Path,
        samples_to_acquire: int,
        trigger_source: str,
        sampling_rate: float,
        laser_frequency: float,
        name: str = "analog to digital converter",
    ):
        # Set up attributes
        self.device_name = device_name
        self.input_configuration_path = input_configuration
        self.samples_to_acquire = samples_to_acquire
        self.trigger_source = trigger_source
        self.sampling_rate = sampling_rate
        self.laser_frequency = laser_frequency
        self.name = name
        self.__calculate_acquisition_time()
        # Load input config
        self.__read_input_configuration_file()
        # Indicates whether the task has been started and can be read
        # directly (True) or whether .read() has to call .start() first
        # (False). Nothing has been started yet.
        self.__task_state = False
        # Create device object to determine the product type of the device.
        self.device = Device(device_name)
        self.product_type = self.device.product_type
        logger.info(
            "Device product name for %s is: %s", self.name, self.product_type
        )
        # Initialise corresponding device.
        if self.product_type == self._PRODUCT_NI6225:
            logger.info("Initialising NI-6225 for %s", self.name)
            self.__initialise_NI6225()
        elif self.product_type == self._PRODUCT_PCI_DIO_32HS:
            logger.info("Initialising PCI-DIO-32HS (NI-6533) for %s", self.name)
            # Generate sorting index map to sort data coming from device
            self._old_idx, self._new_idx = self.__generate_index_map_PCI_DIO_32HS()
            self.__generate_voltage_per_adc_count_array()
            self.__initialise_PCI_DIO_32HS()
        else:
            # Without a task every later method call would fail with an
            # unrelated AttributeError, so fail early and explicitly.
            raise ValueError(
                "Unsupported device product type {!r} for {} (device {}).".format(
                    self.product_type, self.name, self.device_name
                )
            )

    def __read_input_configuration_file(self):
        """
        Read/Load the names and the input terminals from a json file.

        Sets ``input_configuration``, ``index_dict`` and the index arrays
        ``pixel_idx``, ``probe_pixel_idx`` and ``reference_pixel_idx``.
        The row index of an input is its position in the json file.
        """
        logger.info("Loading input configuration file for %s", self.name)
        with open(self.input_configuration_path) as json_file:
            # Load json file into dictionary
            self.input_configuration = json.load(json_file)

        pixel_rows = []
        probe_rows = []
        reference_rows = []
        self.index_dict = {}
        for index, key in enumerate(self.input_configuration):
            self.index_dict[key] = index
            if "pixel" not in key:
                continue
            pixel_rows.append(index)
            if "probe" in key:
                probe_rows.append(index)
            elif "reference" in key:
                reference_rows.append(index)

        # Force an integer dtype: an empty list would otherwise become a
        # float64 array, which numpy refuses to use as an index.
        self.pixel_idx = np.array(pixel_rows, dtype=int)
        self.probe_pixel_idx = np.array(probe_rows, dtype=int)
        self.reference_pixel_idx = np.array(reference_rows, dtype=int)

    def __initialise_NI6225(self):
        """
        Initialises a connection to a NI-6225 (Mass-Termination) device.

        This implicitly sets voltage reference, timing properties and
        trigger properties.

        * Voltage reference is set to RSE for all inputs.
        * Timing is set to acquire a sample on the rising edge of the
          trigger input. The buffer is set to acquire a finite amount of
          samples.
        * A start trigger is configured such that (once a task is
          started) the acquisition commences on the rising edge of the
          trigger input.

        Also, an array to write the data into is preallocated. If any
        configuration step fails, the task is closed again before the
        exception is re-raised.

        References:
            | https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z0000019QRZSA2&l=de-DE
            | https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z000000P7KdSAK&l=de-DE
            | http://zone.ni.com/reference/en-XX/help/370466AH-01/mxcncpts/smpletimingtype/
        """
        # Initialise nidaqmx task object
        self.task = ni_task.Task(self.name)
        logger.info("Initialized nidaqmx task for %s.", self.name)
        try:
            # Add channels to task
            for name, config in self.input_configuration.items():
                self.task.ai_channels.add_ai_voltage_chan(
                    "/{}/{}".format(self.device_name, config["channel"][0]),
                    name_to_assign_to_channel=name,
                    terminal_config=TerminalConfiguration.RSE,
                    min_val=config["minimum voltage"][0],
                    max_val=config["maximum voltage"][0],
                )
            logger.info(
                "Adding all channels successfull for %s (%s)",
                self.name,
                self.product_type,
            )
            # TODO: find out max and min voltages from detector amp.
            # Setting correct timing preferences: time on rising edge.
            # TODO: test whether sampling below the Nyquist frequency
            # can be resolved.
            self.task.timing.cfg_samp_clk_timing(
                self.sampling_rate,
                source=self.trigger_source,
                active_edge=Edge.RISING,  # Set timing on rising edge
                sample_mode=AcquisitionType.FINITE,  # Finite amount of samples
                samps_per_chan=self.samples_to_acquire,
            )
            logger.info(
                "Set up timing properties for %s(%s)", self.name, self.product_type
            )
            # Start acquisition on rising edge of the trigger
            self.task.triggers.start_trigger.cfg_dig_edge_start_trig(
                trigger_source=self.trigger_source, trigger_edge=Edge.RISING
            )
            logger.info(
                "Set up start trigger for %s(%s)", self.name, self.product_type
            )
            # Set number of channels in the task as class attribute
            self.number_of_channels = self.task.number_of_channels
            # Preallocate array in which data can be written repeatedly
            self.data = np.empty(
                (self.task.number_of_channels, self.samples_to_acquire)
            )
            # Stream and reader are needed so that data can be read
            # directly into a numpy array.
            self.stream = in_stream.InStream(self.task)
            self.channel_reader = AnalogMultiChannelReader(self.stream)
        except Exception:
            # Do not leave a half-configured, named task behind.
            self.task.close()
            raise
        logger.info(
            "Put task in stream and instantiated AnalogMultiChannelReader object for %s",
            self.name,
        )

    def __initialise_PCI_DIO_32HS(self):
        """
        Initialises a connection to a PCI-DIO-32HS (NI-6533) device.

        This implicitly sets timing properties and trigger properties.

        * It is not completely clear why the settings are chosen the way
          they are. This code was translated from LabView. I guess:
          contact the company that built the FPAS device.

        Also, arrays to write the data into are preallocated. If any
        configuration step fails, the task is closed again before the
        exception is re-raised.

        References:
            | https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z000000P7KdSAK&l=de-DE
            | https://www.ni.com/documentation/de/ni-daqmx/latest/mxcncpts/bursthandsignals/
            | http://zone.ni.com/reference/en-XX/help/370466AH-01/mxcncpts/smpletimingtype/
        """
        words = self.samples_to_acquire * self._FPAS_WORDS_PER_SAMPLE
        # Initialise nidaqmx task object
        self.task = ni_task.Task(self.name)
        logger.info("Initialized nidaqmx task for %s.", self.name)
        try:
            # Add channel to task; inputs are hard coded (copied from the
            # old LabView code).
            self.task.di_channels.add_di_chan(
                "/{}/port0_32".format(self.device_name),
                name_to_assign_to_lines="Entangled {} data".format(self.name),
                line_grouping=LineGrouping.CHAN_FOR_ALL_LINES,
            )
            # Timing preferences were copied from the old LabView code; the
            # meaning of the polarity/level settings is not known here.
            self.task.timing.cfg_burst_handshaking_timing_export_clock(
                self.sampling_rate,  # for the hlab (FPAS) this should be 1e7 Hz
                sample_clk_outp_term=self.trigger_source,
                sample_mode=AcquisitionType.FINITE,  # Finite amount of samples
                # For the FPAS ADC to read all samples from the buffer the
                # number of samples has to be multiplied by 128.
                samps_per_chan=words,
                sample_clk_pulse_polarity=Polarity.ACTIVE_HIGH,
                pause_when=Level.HIGH,
                ready_event_active_level=Polarity.ACTIVE_HIGH,
            )
            # For FPAS the number of channels needs to be set manually.
            self.number_of_channels = self._FPAS_INPUTS
            # Preallocate arrays in which entangled and sorted data can be
            # written repeatedly.
            self.data = np.empty((self._FPAS_INPUTS, self.samples_to_acquire))
            self.entangled_data = np.empty(words, dtype="uint32")
            self._sorted_data = np.empty(words * 2, dtype="uint16")
            # Stream and reader are needed so that data can be read
            # directly into a numpy array.
            self.stream = in_stream.InStream(self.task)
            self.channel_reader = DigitalSingleChannelReader(self.stream)
        except Exception:
            # Do not leave a half-configured, named task behind.
            self.task.close()
            raise
        logger.info(
            "Put task in stream and instantiated DigitalSingleChannelReader object for %s",
            self.name,
        )

    def __generate_index_map_PCI_DIO_32HS(self) -> Tuple[ndarray, ndarray]:
        """
        Generates the arrays that sort indices of the PCI_DIO_32HS
        device data correctly.

        The indexes need to be sorted in the following way: starting at
        1: the 1st and the 16th need to be switched. Then add 32 to both
        indices and swap those as well. And so on. Once you arrive at
        the boundary, start at the first number (i.e. 1), add 2 (i.e. 3)
        and repeat. Do this for a total of eight times. This method is
        equivalent to following code:

        .. code-block::

            t1 = np.empty(16 * 8)
            t2 = np.empty(16 * 8)
            for i in range(8):
                t1[i * 16 : (i + 1) * 16] = 2 * i + np.array(
                    (1, 16, 33, 48, 65, 80, 97, 112, 129, 144, 161, 176, 193, 208, 225, 240)
                )
                t2[i * 16 : (i + 1) * 16] = 2 * i + np.array(
                    (16, 1, 48, 33, 80, 65, 112, 97, 144, 129, 176, 161, 208, 193, 240, 225)
                )

        Returns:
            Tuple[ndarray, ndarray]: The first array corresponds to the
            old indices and the second array contains the new indices
            (128 integer entries each).
        """
        eight = np.arange(8)  # 0,1,2,3,4,5,6,7
        stack = np.tile([1, 16], 65)  # 1, 16, 1, 16, ... (65 repetitions)
        # Offset of +2 per group of 16 plus offset of +32 per swapped pair.
        base = np.repeat(eight * 2, 16) + np.tile(np.repeat(eight * 32, 2), 8)
        return base + stack[:128], base + stack[1:129]  # old, new indices

    def __generate_voltage_per_adc_count_array(self):
        """
        Generate array to multiply with adc counts to get corresponding
        voltage values.

        This method takes the minimum and maximum voltage from the input
        configuration file and calculates the corresponding voltage
        range for each input. The voltage per adc count is then obtained
        by dividing the voltage range array by 0xFFFF. Inputs that are
        not listed in the configuration keep a range of zero.

        Note:
            Deprecated. The H-Lab FPAS ADC always counts 5V as 0xFFFF
            even when the maximum input voltage is lower. I.e. channel
            143 has a voltage range of 0-2.5 V. 2.5 V will correspond
            (approximately) to 0xFFFF/2.
        """
        # One row per input (144 in total).
        voltage_range = np.zeros((self._FPAS_INPUTS, 1))
        for idx, config in enumerate(self.input_configuration.values()):
            min_voltage = config["minimum voltage"][0]
            max_voltage = config["maximum voltage"][0]
            voltage_range[idx] = max_voltage - min_voltage
        # Divide by the largest ADC count to obtain the voltage (V) per count.
        self.__voltage_per_adc_count = voltage_range / 0xFFFF

    def __calculate_acquisition_time(self):
        """
        Calculates the acquisition time in seconds given the samples
        that are to be acquired and the laser frequency / repetition
        rate, stores it in ``acquisition_time`` and returns it.
        """
        self.acquisition_time = self.samples_to_acquire / self.laser_frequency
        return self.acquisition_time

    def sort_PCI_DIO_32HS(self) -> ndarray:
        """
        Takes the entangled data of the class attribute
        self.entangled_data from FPAS ADC and sorts it.

        This code was translated from a corresponding LabView VI. A
        'bug' was found in this VI and corrected here (Division by
        0xFFFF instead of 0x10000). The FPAS ADC has a total of 144
        inputs: 128 of them are pixels the other 16 are open connectors.
        The FPAS ADC samples with 16bit. To enable maximum
        transfer speeds 32 bit data is transferred. Each data point can
        be split into two values. The first 16 binary digits correspond
        to one value of the ADC the last 16 binary digits are a
        different value of the ADC. After separating them they have to
        be assigned to the corresponding input.

        ``self.entangled_data`` itself is left untouched, so calling
        this method repeatedly on the same raw data yields the same
        result.

        Returns:
            ndarray: Sorted data in volts.

            * shape: (144, self.samples_to_acquire),
              144 represents the number of inputs
              (self.number_of_channels).
        """
        values_per_sample = 2 * self._FPAS_WORDS_PER_SAMPLE
        # Shift data to the left by one element (on a copy).
        shifted = np.roll(self.entangled_data, -1)
        # Lower 16 bits go to the even slots, upper 16 bits to the odd ones.
        self._sorted_data[0::2] = np.bitwise_and(shifted, 0xFFFF)
        self._sorted_data[1::2] = np.right_shift(shifted, 16)
        # One row per sample, 256 16-bit values per row.
        self._reshaped_data = self._sorted_data.reshape(
            self._sorted_data.size // values_per_sample, values_per_sample
        )
        # Assign values to the correct input. The right-hand side is a
        # copy (fancy indexing), so the pairwise swap is safe.
        self._reshaped_data[:, self._new_idx] = self._reshaped_data[
            :, self._old_idx
        ]
        # Only the first 144 rows are relevant inputs. The FPAS ADC always
        # counts 5 V as 0xFFFF (see the note in
        # __generate_voltage_per_adc_count_array), so a fixed factor
        # converts ADC counts to volts.
        self.data = self._reshaped_data.T[: self._FPAS_INPUTS] * (5 / 0xFFFF)
        return self.data

    def __str__(self):
        lines = [
            "Analog to digital converter object, identifier: {}\n".format(self.name),
            "device name: {}\n".format(self.device_name),
            "product type/ model name: {}\n".format(self.product_type),
            "serial no: {}\n".format(self.device.dev_serial_num),
            "samples to acquire: {}\n".format(self.samples_to_acquire),
            "time needed to acquire samples: {} s \n".format(
                self.samples_to_acquire / self.laser_frequency
            ),
            (
                "task is currently idle\n"
                if self.is_done()
                else "task is busy with acquisition\n"
            ),
            "laser frequency: {} Hz\n".format(self.laser_frequency),
            "number of channels in task: {}\n".format(self.number_of_channels),
            "trigger input source: {}\n".format(self.trigger_source),
        ]
        return "".join(lines)

    def start(self):
        """
        Starts nidaqmx task and therefore data acquisition.

        Technically speaking, it starts nidaqmx task and then starts
        acquisition once trigger activates. This method will not start
        the task when the task is still active.

        Note:
            This method will *not* cause the python interpreter to
            halt, which is advantageous for processes that need to run
            simultaneously. To then read the data from the buffer
            without an acquisition delay, use the self.read() method.
        """
        if not self.task.is_task_done():
            logger.warning(
                "Task for %s cannot be started, because it is still running.",
                self.name,
            )
            return
        self.task.stop()  # Stop 'old' task before starting anew
        self.task.start()
        self.__task_state = True
        logger.info("Task for %s was started.", self.name)

    def stop(self):
        """
        Stops nidaqmx task and sets task state to False.

        This is necessary to call before a new task is started and will
        be called at the end of the read method.
        """
        self.task.stop()
        self.__task_state = False

    def set_samples_to_acquire(self, samples_to_acquire: int):
        """
        Sets the number of samples per channel that should be acquired.

        E.g.: If the task is started it will acquire samples_to_acquire
        samples for each channel and write them into a buffer. A running
        task is stopped; the next .read() starts it again. Nothing
        happens if the requested number is already set.

        Args:
            samples_to_acquire (int): Number of samples to acquire per
                channel.
        """
        # Check whether samples to acquire actually have to be set.
        if samples_to_acquire == self.samples_to_acquire:
            return
        # Update Attributes
        self.samples_to_acquire = samples_to_acquire
        self.__calculate_acquisition_time()
        # The property can only be set when the task is not running. Use
        # stop() so that the task state is reset as well; otherwise a
        # later read() would assume the task is still started.
        self.stop()
        if self.product_type == self._PRODUCT_NI6225:
            self.task.timing.samp_quant_samp_per_chan = self.samples_to_acquire
            # Preallocate array in which data can be written repeatedly
            self.data = np.empty(
                (self.task.number_of_channels, self.samples_to_acquire)
            )
            logger.info("Added new, empty self.data ndarray to %s", self.name)
        elif self.product_type == self._PRODUCT_PCI_DIO_32HS:
            # To set the samples to acquire all timing settings need to
            # be "re"set. Only changing the attribute (as done for the
            # USB-6225) runs into a bug where the acquisition sometimes
            # is never done.
            # ? Is this also required for USB 6225?
            logger.info(
                "Closing and reinitializing task to change number "
                "of samples to acquire for %s.",
                self.name,
            )
            self.task.close()
            self.__initialise_PCI_DIO_32HS()
        logger.info(
            "Set task (buffer etc.) of %s to acquire %s samples per channel.",
            self.name,
            self.samples_to_acquire,
        )

    def set_acquisition_time(self, time: float):
        """
        Set time in seconds for which the acquisition should run.

        Instead of setting the number of samples that should be
        acquired, this method takes the time period in seconds for which
        the acquisition should run and sets the according number of
        samples that should be acquired per channel.

        Args:
            time (float): time period in seconds for which data should
                be acquired.
        """
        # int() truncates, i.e. rounds down to the next lowest integer.
        samples_to_acquire = int(time * self.laser_frequency)
        self.set_samples_to_acquire(samples_to_acquire)

    def read(self, timeout: float = None, sort: bool = True) -> ndarray:
        """
        Reads the data from the buffer, saves it to self.data and
        returns it.

        Once called this method will wait until all samples have become
        available, so if not all samples have been read into the buffer,
        this will halt the interpreter. On the other hand, if all
        samples are already available in the buffer this method is not
        going to halt the interpreter. For maximum time efficiency use
        self.start() to write samples into buffer without halting the
        interpreter. By default, the FPAS ADC data is sorted. This takes
        about 1 ms per 1000 samples per channel. If this is too slow,
        set sort parameter to false. The method will then return the
        entangled data instead. Use self.sort_PCI_DIO_32HS() method to
        sort data afterwards.

        Args:
            timeout (float, optional): Specifies the amount of time in
                seconds to wait for samples to become available. If the
                time elapses, the method returns an error and any
                samples read before the timeout elapsed. If you set
                timeout to nidaqmx.constants.WAIT_INFINITELY or to -1
                the method waits indefinitely. If you set timeout to 0,
                the method tries once to read the requested samples and
                returns an error if it is unable to. Defaults to None.
                This will set the timeout to 1.5 * acquisition time, but
                at least 0.5 seconds.
            sort (bool, optional): Only relevant for FPAS ADC. If it is
                set to True this method returns the sorted data. If it
                is not it returns the entangled data. *This data is
                additionally accessible through self.entangled_data.
                Defaults to True.*

        Warning:
            If sort is set to False, then self.data potentially holds a
            preallocated ndarray containing random data!

        Returns:
            ndarray: ADC Data.

            * shape: (self.number_of_channels, self.samples_to_acquire).
              Shape differs for FPAS ADC if data is requested unsorted.
        """
        # Only derive a timeout if none was specified. An explicit 0
        # ("try once") or -1 ("wait forever") must be passed on as is.
        if timeout is None:
            timeout = max(1.5 * self.acquisition_time, 0.5)
        # Start task if has not been started yet.
        if not self.__task_state:
            logger.info("Starting task automatically for %s", self.name)
            self.start()
        if self.product_type == self._PRODUCT_NI6225:
            # Read data into self.data numpy array
            samples_read = self.channel_reader.read_many_sample(
                self.data, timeout=timeout
            )
            logger.info(
                "Read %s samples per channel to %s.data", samples_read, self.name
            )
            # Stop task and reset task state
            self.stop()
            return self.data
        elif self.product_type == self._PRODUCT_PCI_DIO_32HS:
            # Read data into self.entangled_data numpy array
            samples_read = self.channel_reader.read_many_sample_port_uint32(
                self.entangled_data, timeout=timeout
            )
            logger.info(
                "Read %s samples to %s.entangled_data", samples_read, self.name
            )
            # Stop task and reset task state
            self.stop()
            if not sort:
                return self.entangled_data
            self.sort_PCI_DIO_32HS()
            logger.info(
                "Sorted %s.entangled_data to %s.data", self.name, self.name
            )
            return self.data

    def wait(self, timeout: float = 10):
        """
        Wait for measurement/data acquisition to complete by halting the
        interpreter.

        Args:
            timeout (float, optional): Specifies the amount of time in
                seconds to wait for the acquisition to complete; it is
                passed on to the task's wait_until_done. If you set
                timeout to nidaqmx.constants.WAIT_INFINITELY or to -1
                the method waits indefinitely. Defaults to 10.
        """
        logger.info("Waiting for acquisition for %s to complete.", self.name)
        self.task.wait_until_done(timeout=timeout)
        logger.info("Acquisition for %s done.", self.name)

    def is_done(self) -> bool:
        """
        Check if measurement/data acquisition is complete.

        Returns:
            bool: True if acquisition is complete, False if not.
        """
        return self.task.is_task_done()

    def end(self):
        """
        Stops and ends nidaqmx task.

        The task is closed even if stopping it raises, so that the
        task is not leaked; the exception is propagated afterwards.
        """
        try:
            self.task.stop()
            logger.info("%s task was stopped.", self.name)
        finally:
            self.task.close()
            self.__task_state = False
            logger.info("%s task was closed.", self.name)

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, type, value, traceback):
        """Stop and close the task when the ``with`` block is left."""
        self.end()
def timetest():
    """
    Time one acquisition cycle of the module-level ``adc`` object.

    Starts the task, waits (without timeout) until the acquisition is
    done, reads the data and prints how long waiting and reading took.

    Note:
        This helper relies on a global name ``adc`` which is only
        created by the ``__main__`` section of this file.
    """
    # Imported locally: the module itself only imports ``time`` inside
    # the ``__main__`` section.
    import time

    t0 = time.time()
    adc.start()
    adc.wait(timeout=-1)
    t1 = time.time()
    adc.read()
    t2 = time.time()
    print("Time waited: ", t1 - t0, "seconds")
    print("Time to read samples: ", t2 - t1, "seconds")
if __name__ == "__main__":
    # Manual hardware test: acquires data for a fixed time and plots one
    # channel together with its noise spectrum. Requires the device and
    # the configuration file referenced below.
    logging.basicConfig(level=logging.INFO)
    from matplotlib import pyplot as plt
    import time  # used by timetest()
    from scipy.fft import rfft

    # The class is defined in this very file, so alias it instead of
    # importing the module a second time under its file name.
    ADC = AnalogDigitalConverter

    # Test of the device that is named "NI6225" below.
    with ADC(
        "Dev1",
        r"hardware_config_files\I-Lab analog input configuration.json",
        1000,
        "PFI8",
        3470,
        997,
        name="NI6225",
    ) as adc:
        # Check whether set acquisition time is working:
        t = 60 * 5  # unit in seconds
        adc.set_acquisition_time(t)
        if adc.samples_to_acquire == t * adc.laser_frequency:
            print("Number of channels:", adc.number_of_channels)
            print("Nominal: set_acquisition time is working as intended.")
            print("Acquisition time: ", t, "seconds")
            print("Samples to acquire: ", adc.samples_to_acquire)
        else:
            print("Error: set_acquisition time is NOT working.")

        adc.read()
        # Amplitude spectrum of the second to last channel.
        noise_spectrum = np.abs(rfft(adc.data[-2]))
        freq_axis = (
            np.arange(noise_spectrum.size)
            * adc.laser_frequency
            / adc.samples_to_acquire
        )
        fig, ax = plt.subplots(nrows=2)
        ax[0].plot(
            np.linspace(
                0, adc.acquisition_time, adc.samples_to_acquire, endpoint=False
            ),
            adc.data[-2],
        )
        ax[0].set_xlabel("time [s]")
        ax[0].set_ylabel("voltage [V]")
        ax[1].plot(freq_axis, noise_spectrum)
        ax[1].set_xlabel("frequency [Hz]")
        ax[1].set_ylabel("amplitude [a.u.]")
        ax[1].grid()
        plt.show()
# channel = 20
# print(adc.is_done(), "<- This should be True")
# # timetest()
# fig, ax = plt.subplots(nrows=8, ncols=8)
# for _ in range(1000):
# print(_)
# adc.read()
# for i in range(63):
# xy = np.unravel_index(i, (8,8))
# ax[xy].cla()
# ax[xy].plot(adc.data[i])
# fig.show()
# plt.pause(5)
# plt.plot(adc.data[channel])
# plt.show()
# for _ in range(10):
# adc.read()
# print("done")
# timetest()
# plt.plot(adc.data[channel])
# plt.show()
# adc.set_samples_to_acquire(100)
# adc.set_samples_to_acquire(1000)
# adc.set_samples_to_acquire(1100)
# for i in range(1, 6001, 1000):
# adc.set_samples_to_acquire(i)
# print("samples to acquire:", i)
# timetest()
# plt.plot(adc.data[channel])
# plt.show()
# adc.start()
# a = time.time()
# print("When not calling .wait adc is done:", adc.is_done())
# adc.read()
# print("When not waiting for acquisition to complete it takes: ",
# time.time()-a, "seconds to read the data.")
# print("Calling .read() without starting task manually.")
# adc.read()
# # adc.end()
# adc.set_samples_to_acquire(30)
# adc.start()
# print(adc)
# print(adc.is_done())
# t1 = time.time()
# adc.wait(timeout=-1)
# print(time.time()-t1)
# adc.read()
# print("part2")
# # adc.set_acquisition_time(2)
# print("reading without staring task")
# t1 = time.time()
# adc.start()
# adc.read(sort=True, timeout=-1)
# print(time.time()-t1)
# plt.plot(adc.data[136])
# plt.show()
# adc.read(sort=True, timeout=-1)
# plt.plot(adc.data[139])
# plt.show()
# adc.end()
# # Test USB-6225 (Mass Termination)
# ilab = ADC("Dev1", r"hardware_config_files\I-Lab analog input configuration.json", 1000, "PFI14", 1000, name="NI_USB")
# ilab.set_samples_to_acquire(250)
# ilab.start()
# print(ilab)
# print(ilab.is_done())
# ilab.wait(timeout=-1)
# print(time.time()-t1)
# ilab.read()
# print("part2")
# ilab.set_acquisition_time(10.5)
# print("reading without staring task")
# t1 = time.time()
# ilab.read(timeout=-1)
# print(time.time()-t1)
# ilab.end()
# print("completed")