Thursday, December 31, 2020

Data acquisition from a Dataq Instruments DI-2108 with Python

This program is broken into two parts and I am using Jupyter lab.  The first cell handles establishing the serial connection and setting parameters on the DAQ.  The second part handles the data collection.

import serial
import serial.tools.list_ports
import time
"""
for Dataq model DI-2108
0x0000 = Analog channel 0, ±10 V range
0x0001 = Analog channel 1, ±10 V range
srate min = 375
srate max = 65535
dec min = 1
dec max = 512
deca min = 1
deca max = 40,000
dividend = 60,000,000
sample rate (hz) = dividend / (srate * dec * deca)
"""
decimation = 1
srate = 3000
dividend = 60000000
sampleRate = dividend / (decimation * srate)
samplePoints = 500
#for analog channel 0 and 1
slist = [0x0000, 0x0001]
analog_ranges = [10]
serialPort=serial.Serial()
def discover():
    available_ports = list(serial.tools.list_ports.comports())
    hooked_port = "" 
    for p in available_ports:
        # Do we have a DATAQ Instruments device?
        if ("VID:PID=0683" in p.hwid):
            hooked_port = p.device
            break
    if hooked_port:
        print("Found a DATAQ Instruments device on",hooked_port)
        serialPort.timeout = 0
        serialPort.port = hooked_port
        serialPort.baudrate = '115200'
        serialPort.open()
        return True
    else:
        print("Please connect a DATAQ Instruments device")
        return False
    
def send_cmd(command):
    serialPort.write((command+'\r').encode())
    time.sleep(.01)
    
if discover():
    send_cmd("stop")
    send_cmd("encode 0")
    send_cmd("ps 0")
    send_cmd("slist "+ str(0) + " " + str(0))
    send_cmd("slist "+ str(1) + " " + str(1))
    
    send_cmd("dec " + str(decimation))
    send_cmd("srate " + str(srate))
    
    print("sample rate = ", sampleRate)
    serialPort.close()


The second part of the program handles the data collection.  In this example the first two analog channels have a ~500khz signal.  I reversed the polarity on channel 2 so it is inverted.  The program collects 500 data points and the frequency is calculated using Numpy and the fft library.  Since this is not a mixed signal and there is only one fundamental frequency, a fast fourier transform is not the most efficient tool.  Since the signal is centered about zero the crossing values can be found with linear interpolation.  The crossing values are plotted and the frequency can be found from the slope of this line using Numpy polyfit.



import serial
import serial.tools.list_ports
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.ticker import (MultipleLocator, AutoMinorLocator)
import numpy as np
from numpy.fft import fft
import time
samplePoints = 500
serialPort.open()
data1 = []
data2 = []
count = 0
serialPort.reset_input_buffer()
send_cmd("start")
while(serialPort.inWaiting() < 4):
    pass
while (count < samplePoints+10):
    data1.append(int.from_bytes(serialPort.read(2), byteorder='little', signed=True))
    data2.append(int.from_bytes(serialPort.read(2), byteorder='little', signed=True))
    #data2.append(data1[count])
    count+=1     
send_cmd("stop")
serialPort.close()
for badData in range(10):
    data1.pop(0)
    data2.pop(0)
scaledData1=[]
scaledData2=[]
for number in data1:
    scaledData1.append(10 * number / 32768)
for number in data2:
    scaledData2.append(10 * number / 32768)
time=[]
xAxis = np.arange(0, len(data1))
for value in xAxis:
    time.append(value/sampleRate)
fig, ax = plt.subplots(6)
fig.set_size_inches(16, 20)
ax[0].plot(time,scaledData1, linewidth=1, color = 'b', marker='o',ms = 4,  markeredgecolor='k')
ax[1].plot(time,scaledData2, linewidth=1, color = 'b', marker='o',ms = 4,  markeredgecolor='k')
transform1 = fft(scaledData1)
transform2 = fft(scaledData2)
endTime = len(time) / sampleRate
frequencyAxis = np.arange(0, sampleRate, 1/endTime)
partialSample = int(samplePoints/8)
ax[2].bar(frequencyAxis[0:partialSample],abs(transform1)[0:partialSample], width = 10)
ax[2].xaxis.set_minor_locator(AutoMinorLocator())
ax[2].tick_params(which='both', width=2)
ax[2].tick_params(which='major', length=10)
ax[2].tick_params(which='minor', length=5, color='gray')
ax[2].xaxis.set_minor_locator(MultipleLocator(25))
ax[2].xaxis.set_major_locator(MultipleLocator(250))
ax[3].bar(frequencyAxis[0:partialSample],abs(transform2)[0:partialSample], width = 10)
ax[3].xaxis.set_minor_locator(AutoMinorLocator())
ax[3].tick_params(which='both', width=2)
ax[3].tick_params(which='major', length=10)
ax[3].tick_params(which='minor', length=5, color='gray')
ax[3].xaxis.set_minor_locator(MultipleLocator(25))
ax[3].xaxis.set_major_locator(MultipleLocator(250))
def interpolate(x1,x3,y1,y2,y3):
    x2 = x1 + (x3-x1) * (y2-y1) / (y3-y1)
    return x2
crossingValuesList1 = []
for index in range(len(data1)-1):
    if data1[index] * data1[index+1] < 1:
        crossingValuesList1.append(interpolate(time[index], time[index+1], data1[index], 0, data1[index+1]))
ax[4].plot(range(0,len(crossingValuesList1)),crossingValuesList1, linewidth=1, color = 'b', marker='o',ms = 4,  markeredgecolor='k')        
if len(crossingValuesList1) > 0:
    frequency1 = .5/(np.polyfit(range(0,len(crossingValuesList1)), crossingValuesList1, 1))[0]
    ax[0].scatter(crossingValuesList1, [0] * len(crossingValuesList1), marker='x', s=30, c='red')
    
crossingValuesList2 = []
for index in range(len(data2)-1):
    if (data2[index]) * (data2[index+1]) < 1:
        crossingValuesList2.append(interpolate(time[index], time[index+1], data2[index], 0, data2[index+1]))
ax[5].plot(range(0,len(crossingValuesList2)),crossingValuesList2, linewidth=1, color = 'b', marker='o',ms = 4,  markeredgecolor='k')        
if len(crossingValuesList2) > 0:
    frequency2 = .5/(np.polyfit(range(0,len(crossingValuesList2)), crossingValuesList2, 1))[0]
    ax[1].scatter(crossingValuesList2, [0] * len(crossingValuesList2), marker='x', s=30, c='red')
print("Frequency 1: ", "{:.1f}".format(frequency1))
print("Frequency 2: ", "{:.1f}".format(frequency2))
plt.savefig('output.png')    
plt.show()
serialPort.close()

No comments:

Post a Comment