-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLabjackClient.py
168 lines (143 loc) · 7.81 KB
/
LabjackClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from labjack import ljm
import logging
class LabJackT7Driver:
voltage_ranges = {
"10V": 10.0,
"1V": 1.0,
"0.1V": 0.1,
"0.01V": 0.01,
"-1V": 1.0 # Special case for -1V to 1V range
}
def __init__(self, ip_address="172.18.120.132", connection_type="ETHERNET", voltage_range="-1V"):
self.handle = None
self.num_analog_inputs = 16 # Default for 16 analog inputs
self.channel_rms_flags = {} # Store per-channel RMS flags
self.channel_types = {} # Store channel measurement types (single-ended or differential)
self.channel_scaling_factors = {} # Store per-channel scaling factors
self.voltage_range = voltage_range
self.ip_address = ip_address
self.connection_type = connection_type
self.resolution_index = 0 # Auto-resolution by default
def set_scaling_factor(self, channel, scaling_factor):
self.channel_scaling_factors[channel] = scaling_factor
logging.info(f"Set scaling factor for AIN{channel} to {scaling_factor}")
def start(self):
try:
self.handle = ljm.openS("T7", self.connection_type, self.ip_address)
self.device_info = ljm.getHandleInfo(self.handle)
logging.info(f"Opened LabJack T7: {self.device_info}")
for i in range(self.num_analog_inputs):
self.set_range(i, self.voltage_range)
self.set_resolution_index(i, 0) # Auto-resolution index
except ljm.LJMError as e:
logging.error(f"Error opening LabJack T7: {e}")
raise Exception(f"Failed to start LabJack: {e}")
def stop(self):
try:
ljm.eStreamStop(self.handle)
logging.info("Stream stopped.")
except ljm.LJMError as e:
logging.warning(f"Stream not running or error stopping: {e}")
def close(self):
if self.handle:
ljm.close(self.handle)
logging.info("Closed LabJack connection.")
def set_range(self, channel, voltage_range):
if voltage_range not in self.voltage_ranges:
raise ValueError(f"Invalid voltage range: {voltage_range}")
try:
ljm.eWriteName(self.handle, f"AIN{channel}_RANGE", self.voltage_ranges[voltage_range])
logging.info(f"Set AIN{channel} range to {voltage_range}")
except ljm.LJMError as e:
raise Exception(f"Failed to set voltage range for channel {channel}: {e}")
def set_resolution_index(self, channel, resolution_index):
try:
ljm.eWriteName(self.handle, f"AIN{channel}_RESOLUTION_INDEX", resolution_index)
logging.info(f"Set AIN{channel} resolution index to {resolution_index}")
except ljm.LJMError as e:
raise Exception(f"Failed to set resolution index for channel {channel}: {e}")
def configure_measurement_type(self, channel, measurement_type="single-ended", differential_negative_channel=None):
if measurement_type == "single-ended":
negative_channel_val = 199 # GND for single-ended
elif measurement_type == "differential":
if differential_negative_channel is None:
raise ValueError("You must specify a valid negative channel for differential measurements.")
negative_channel_val = differential_negative_channel
else:
raise ValueError("Invalid measurement type. Use 'single-ended' or 'differential'.")
try:
ljm.eWriteName(self.handle, f"AIN{channel}_NEGATIVE_CH", negative_channel_val)
logging.info(f"Configured AIN{channel} for {measurement_type} measurement with negative channel {negative_channel_val}")
self.channel_types[channel] = measurement_type # Store the channel type
except ljm.LJMError as e:
raise Exception(f"Failed to configure measurement type for channel {channel}: {e}")
def set_channel_rms(self, channel, rms_enabled, num_scans=200, scan_time_microseconds=10000):
"""
Enable or disable FlexRMS for a channel by setting the AIN#_EF_INDEX to the FlexRMS feature (index 44).
Also allows setting the number of scans and scan time for FlexRMS functionality.
:param channel: Channel number
:param rms_enabled: Whether to enable or disable RMS
:param num_scans: Number of scans to use for FlexRMS (default 200)
:param scan_time_microseconds: Total time in microseconds for the FlexRMS scan (default 10,000 µs = 10 ms)
"""
if rms_enabled:
try:
# Reset any existing extended feature on this channel before enabling FlexRMS
ljm.eWriteName(self.handle, f"AIN{channel}_EF_INDEX", 0)
logging.info(f"Reset existing extended feature for AIN{channel}")
# Enable FlexRMS mode for this channel (EF_INDEX 10)
ljm.eWriteName(self.handle, f"AIN{channel}_EF_INDEX", 10) # Set to FlexRMS (EF_INDEX 10)
ljm.eWriteName(self.handle, f"AIN{channel}_EF_CONFIG_A", num_scans) # Set number of scans
ljm.eWriteName(self.handle, f"AIN{channel}_EF_CONFIG_B", scan_time_microseconds) # Set scan time
logging.info(f"Enabled FlexRMS for AIN{channel} with {num_scans} scans over {scan_time_microseconds} µs")
except ljm.LJMError as e:
raise Exception(f"Failed to enable FlexRMS for AIN{channel}: {e}")
else:
try:
# Disable FlexRMS mode for this channel
ljm.eWriteName(self.handle, f"AIN{channel}_EF_INDEX", 0) # Disable extended feature
logging.info(f"Disabled FlexRMS for AIN{channel}")
except ljm.LJMError as e:
raise Exception(f"Failed to disable FlexRMS for AIN{channel}: {e}")
self.channel_rms_flags[channel] = rms_enabled
def read_samples(self):
results = {}
for channel in range(self.num_analog_inputs):
try:
if self.channel_rms_flags.get(channel, False):
# Try to read FlexRMS value from the AIN#_EF_READ_A register
try:
value = ljm.eReadName(self.handle, f"AIN{channel}_EF_READ_A")
logging.info(f"Read FlexRMS value for AIN{channel}: {value}")
value = abs(value) # Ensure the FlexRMS value is non-negative
except ljm.LJMError as e:
if "AIN_EF_COULD_NOT_FIND_PERIOD" in str(e):
logging.warning(f"Could not find valid FlexRMS period for AIN{channel}. Returning 0.")
value = 0 # Return 0 or a default value
else:
raise e # Re-raise other errors
else:
# Read normal voltage value
value = ljm.eReadName(self.handle, f"AIN{channel}")
logging.info(f"Read value for AIN{channel}: {value}")
# Apply the scaling factor if it exists
scaling_factor = self.channel_scaling_factors.get(channel, 1)
value *= scaling_factor
results[f"AIN{channel}"] = value
except ljm.LJMError as e:
logging.error(f"Error encountered while reading from AIN{channel}: {e}")
# Restart the device to recover from the error
self.restart_device()
break # Exit the loop to avoid further issues in this read cycle
return results
def restart_device(self):
"""
Attempt to restart the LabJack device connection.
"""
try:
logging.info("Restarting LabJack connection...")
self.close() # Close the existing connection
self.start() # Re-open the connection
logging.info("LabJack connection restarted successfully.")
except ljm.LJMError as e:
logging.error(f"Failed to restart LabJack connection: {e}")