-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Aniket
committed
Jul 3, 2023
0 parents
commit cfe074e
Showing
20 changed files
with
3,589 additions
and
0 deletions.
There are no files selected for viewing
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
""" | ||
Class object to control the Tricontinent C-Series Syringe Pumps | ||
Author: Aniket Chitre | ||
Date: July 2022 | ||
""" | ||
|
||
import serial | ||
import time | ||
|
||
# See configuration jumpers on manual pp.22 | ||
# J2: 1, 5 installed, J9: none installed on all pumps except last - 1,2 installed on final pump | ||
# Followed RS-232 cabling diagram | ||
# Pumps in series communicate over RS-485, hence no RS-485 termination jumpers on all but last pump | ||
|
||
|
||
class C3000_pump: | ||
|
||
baud_rate = 9600 # communications rate - set by J2 jumper #4 | ||
to = 1 # timeout | ||
pause_time = 2 # delay required between writing and reading to the serial object | ||
|
||
# Key strings for pump control | ||
beg_char = '/' # Start of command | ||
end_char = '\r' # End of command | ||
run = 'R' # Run. Execute the command string. Not required for certain classes of commands - e.g., reporting commands | ||
init_CW = 'Z' # Initialise with CW configuration - sets valve output to the right - see diagram pp. 50 | ||
# Pump initialises by default in the "N0" normal increment mode - full stroke = 3000 increments | ||
|
||
valve_to_input = 'I' | ||
valve_to_output = 'O' | ||
valve_to_bypass = 'B' | ||
|
||
# Plunger moves at a start velocity and accelerates to a "top velocity" and decelerates back down - motion controlled by driver automatically | ||
set_start_vel = 'v' # power-up default = 900 increment/sec, range: 1 - 1000 | ||
set_top_vel = 'V' # power-up default = 1400 increment/sec, range: 1 - 6000 | ||
set_ramp_slopes = 'L' # see pp. 66 - default: 14 (= 35,000 increment/s^2) - I think <L10> will be suitable (25,000 increment/s^2) | ||
# n.b. power-up defaults to fast - cause cavitation when using 1/16" tubing | ||
|
||
absolute_pos = 'A' # range: 0 - 3000 - volume dispensed = displacement of plunger | ||
rel_pos_down = 'P' # aspirate | ||
rel_pos_up = 'D' # dispense | ||
|
||
syringe_vol = 1000 # uL | ||
max_N = 3000 # max number of increments for C3000 | ||
|
||
# default start-up status prior to pump connection or initialisation | ||
con_status = False | ||
init_status = False | ||
dose_cmd = '' | ||
|
||
def __init__(self, port, address): | ||
self.ser = None | ||
self.port = port | ||
self.address = address | ||
|
||
def connect(self): | ||
self.ser = serial.Serial(port=self.port, baudrate=self.baud_rate, timeout=self.to) | ||
self.__setattr__('con_status', True) | ||
self.ser.close() # close the connection, so another pump can be opened over the same COM port | ||
return self.con_status | ||
|
||
# all method calls enclosed with the serial port being opened and closed on the first and penultimate lines | ||
# crucial to avoid conflicts on port access | ||
|
||
def initialise(self): | ||
self.ser.open() | ||
init_str = self.beg_char + self.address + self.init_CW + self.run + self.end_char | ||
init_byt = init_str.encode('utf-8') | ||
self.ser.write(init_byt) | ||
self.__setattr__('init_status', True) | ||
self.ser.close() | ||
return self.init_status | ||
|
||
def prime(self, prime_cycles): | ||
self.ser.open() | ||
# g opens the loop - G closes the loop and prime_cycles is the num_iter around this loop | ||
# g IA3000 (set to input valve and fill the syringe fully) OA0 (empty syringe completely) G | ||
prime_str = self.beg_char + self.address + self.init_CW + \ | ||
self.set_start_vel + '50' + self.set_top_vel + '200' + 'L1' + \ | ||
'g' + \ | ||
self.valve_to_input + self.absolute_pos + '3000' \ | ||
+ self.valve_to_output + self.absolute_pos + '0' + \ | ||
'G' + prime_cycles + self.run + self.end_char | ||
prime_byt = prime_str.encode('utf-8') | ||
self.ser.write(prime_byt) | ||
# checks whether the pump is busy executing a command or idle and accordingly prints when priming is complete | ||
status_check_str = self.beg_char + self.address + 'Q' + self.end_char | ||
status_check_byt = status_check_str.encode('utf-8') | ||
pump_status = 'busy' # default value, during the command call, the pump is busy | ||
while pump_status == 'busy': | ||
self.ser.write(status_check_byt) | ||
time.sleep(0.2) | ||
pump_check = str(self.ser.read_until('').decode('utf-8')) | ||
if pump_check[2] == '@': # manual pp.90 shows 3rd character = @ if the pump is busy, or ` if it is free | ||
pass | ||
elif pump_check[2] == '`': | ||
pump_status = 'free' # only break the while loop once the task is complete and pump is idle again | ||
self.ser.close() | ||
return print("Priming of pump complete") | ||
|
||
# Titrating 8 mL formulations, experiments have shown ~ 500 uL acid/base is required for the titration | ||
# 2.5 mL capacity of the syringes, shall be sufficient | ||
|
||
def dose(self, vol, start_vel, top_vel): | ||
self.ser.open() | ||
# checks the current absolute position of the syringe to determine whether it needs to be filled again before dispensing | ||
pos_check_str = self.beg_char + self.address + '?' + self.end_char | ||
pos_check_byt = pos_check_str.encode('utf-8') | ||
self.ser.write(pos_check_byt) | ||
time.sleep(self.pause_time) # pause time important for the instrument to have time to write back to the computer | ||
raw_out = self.ser.read_until().decode('utf-8') | ||
abs_pos = int(raw_out.split('`')[1].split('\x03')[0]) # studied the outputted string and splitting it according to return position | ||
|
||
vol_per_N = self.syringe_vol / self.max_N # technically the finest resolution of the syringe --> 2500/3000 = 0.8333 uL | ||
num_N = int(vol / vol_per_N) # compute the increments for the screw to traverse to dispense the desired volume | ||
|
||
if abs_pos >= 0.8 * self.max_N: # if the syringe is more than 75% full directly dispense the desired amount | ||
disp_str = self.beg_char + self.address + self.set_start_vel + str(start_vel) + self.set_top_vel + str(top_vel) + 'L1' + \ | ||
self.rel_pos_up + str(num_N) + self.run + self.end_char | ||
disp_byt = disp_str.encode('utf-8') | ||
self.ser.write(disp_byt) | ||
self.__setattr__('dose_cmd', disp_str) | ||
|
||
else: # refill the syringe if it is less than 75% full | ||
# crucial that all the desired commands are encoded into a single string (ran into a bug where trying to send 2 strings consecutively fails) | ||
asp_disp_str = self.beg_char + self.address + self.set_start_vel + str(start_vel) + self.set_top_vel + str(top_vel) + 'L1' + \ | ||
self.valve_to_input + self.absolute_pos + '3000' + self.valve_to_output + self.rel_pos_up + str(num_N) + \ | ||
self.run + self.end_char | ||
asp_disp_byt = asp_disp_str.encode('utf-8') | ||
self.ser.write(asp_disp_byt) | ||
self.__setattr__('dose_cmd', asp_disp_str) | ||
# checks whether the pump is busy or idle and complete with dispensing | ||
status_check_str = self.beg_char + self.address + 'Q' + self.end_char | ||
status_check_byt = status_check_str.encode('utf-8') | ||
pump_status = 'busy' | ||
while pump_status == 'busy': | ||
self.ser.write(status_check_byt) | ||
time.sleep(0.2) | ||
pump_check = str(self.ser.read_until('').decode('utf-8')) | ||
#print(pump_check) | ||
if pump_check[2] == '@': # pp. 90 manual @ if busy, ` if free | ||
pass | ||
elif pump_check[2] == '`': | ||
pump_status = 'free' | ||
self.ser.close() | ||
|
||
if self.address =='1': | ||
return print(f"Dispensing {vol} uL base is complete") | ||
else: | ||
return print(f"Dispensing {vol} uL acid is complete") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
""" | ||
Class object to control the Ender platform for cartesian movements of the pH robot. | ||
Author: Leong Chang Jie | ||
Date: June 2022 | ||
""" | ||
import time | ||
import numpy as np | ||
import serial | ||
import serial.tools.list_ports | ||
from debugger import Debugger | ||
debug = Debugger(show_logs=True) | ||
|
||
# %% Serial / CNC | ||
def display_ports(): | ||
""" | ||
Displays available ports. | ||
""" | ||
ports = serial.tools.list_ports.comports() | ||
for port, desc, hwid in sorted(ports): | ||
print("{}: {} [{}]".format(port, desc, hwid)) | ||
if len(ports) == 0: | ||
print("No ports detected!") | ||
print("Simulating platform...") | ||
return | ||
|
||
|
||
class CNC(object): | ||
""" | ||
Controller for cnc xyz-movements. | ||
- address: serial address of cnc Arduino | ||
""" | ||
def __init__(self, address): | ||
self.address = address | ||
# self.cnc = self.connect_cnc(address) | ||
self.current_x = 0 | ||
self.current_y = 0 | ||
self.current_z = 0 | ||
self.space_range = [(0,0,0), (0,0,0)] | ||
self.Z_safe = np.nan | ||
return | ||
|
||
def connect_cnc(self, address): | ||
""" | ||
Establish serial connection to cnc controller. | ||
- address: port address | ||
Return: serial.Serial object | ||
""" | ||
cnc = None | ||
try: | ||
cnc = serial.Serial(address, 115200, timeout=1) | ||
cnc.close() | ||
cnc.open() | ||
|
||
# Start grbl | ||
cnc.write(bytes("\r\n\r\n", 'utf-8')) | ||
time.sleep(2) | ||
cnc.flushInput() | ||
|
||
# Homing cycle | ||
cnc.write(bytes("$H\n", 'utf-8')) | ||
#print(cnc.readline()) | ||
print("CNC ready") | ||
except: | ||
pass | ||
return cnc | ||
|
||
def to_position(self, coord, z_to_safe=True, print_statement= False): | ||
""" | ||
Move cnc to absolute position in 3D | ||
- coord: (X, Y, Z) coordinates of target | ||
""" | ||
|
||
if z_to_safe and self.current_z < self.Z_safe: | ||
try: | ||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
self.cnc.write(bytes(f"G0 Z{self.Z_safe}\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
except: | ||
pass | ||
self.current_z = self.Z_safe | ||
if print_statement == True: | ||
print(f'{self.current_x}, {self.current_y}, {self.current_z}') | ||
|
||
x, y, z = coord | ||
z_first = True if self.current_z < z else False | ||
l_bound, u_bound = np.array(self.space_range) | ||
next_x = x | ||
next_y = y | ||
next_z = z | ||
next_pos = np.array([next_x, next_y, next_z]) | ||
|
||
if all(np.greater_equal(next_pos, l_bound)) and all(np.less_equal(next_pos, u_bound)): | ||
pass | ||
else: | ||
print(f"Range limits reached! {self.space_range}") | ||
return | ||
|
||
positionXY = f'X{x}Y{y}' | ||
position_Z = f'Z{z}' | ||
moves = [position_Z, positionXY] if z_first else [positionXY, position_Z] | ||
try: | ||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
for move in moves: | ||
self.cnc.write(bytes(f"G1 {move} F20000\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
except: | ||
pass | ||
|
||
self.current_x = next_x | ||
self.current_y = next_y | ||
self.current_z = next_z | ||
#print(f'{self.current_x}, {self.current_y}, {self.current_z}') | ||
return | ||
|
||
|
||
class Ender(CNC): | ||
""" | ||
XYZ controls for Ender platform. | ||
- address: serial address of cnc Arduino | ||
- space_range: range of motion of tool | ||
""" | ||
def __init__(self, address, space_range=[(0,0,0), (220,220,250)], Z_safe=90): | ||
super().__init__(address) | ||
self.cnc = self.connect_cnc(address) | ||
self.space_range = space_range | ||
self.Z_safe = Z_safe | ||
self.home() | ||
return | ||
|
||
def connect_cnc(self, address): | ||
""" | ||
Establish serial connection to cnc controller. | ||
- address: port address | ||
Return: serial.Serial object | ||
""" | ||
cnc = None | ||
try: | ||
cnc = serial.Serial(address, 115200) | ||
except: | ||
pass | ||
return cnc | ||
|
||
def home(self): | ||
""" | ||
Homing cycle for Ender platform | ||
""" | ||
try: | ||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
self.cnc.write(bytes("G0 " + f"Z{self.Z_safe}" + "\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
|
||
self.cnc.write(bytes("G28\n", 'utf-8')) | ||
|
||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
self.cnc.write(bytes("G0 " + f"Z{self.Z_safe}" + "\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
self.cnc.write(bytes("G90\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
except: | ||
pass | ||
self.current_x = 0 | ||
self.current_y = 0 | ||
self.current_z = self.Z_safe | ||
print(f'{self.current_x}, {self.current_y}, {self.current_z}') | ||
try: | ||
self.cnc.write(bytes("G1 F5000\n", 'utf-8')) | ||
#print(self.cnc.readline()) | ||
except: | ||
pass | ||
return |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
""" | ||
Class object to query the pH and temperature readings from Sentron's SI 600 pH meter | ||
Author: Aniket Chitre | ||
Date: July 2022 | ||
""" | ||
|
||
import serial # https://pyserial.readthedocs.io/en/latest/pyserial.html | ||
import time | ||
|
||
class SI600_pH: | ||
|
||
baud_rate = 9600 # Communication rates from SI-meter-manual_v3.pdf pp.36: 9600 8 N 1 to receive data from the USB port | ||
pause_time = 1 # delay between transmitting to the pH meter and reading data back; too soon and nothing to read | ||
|
||
def __init__(self, port): | ||
self.port = port # Edit port according to listed port in 'Device Manager' | ||
|
||
def reading(self): | ||
self.ser = serial.Serial(port=self.port, baudrate=self.baud_rate, timeout=1) # open serial object, ensure not open elsewhere | ||
self.ser.write('ACT'.encode('utf-8')) # Manual pp.36 sending the string 'ACT' queries the pH meter | ||
time.sleep(self.pause_time) # require a delay between writing to and reading from the pH meter | ||
reading = self.ser.read_until('\r\n') # Reads data until the end of line | ||
pH = reading[26:33] # see pp. 36 of manual (or print whole string) to see data format | ||
pH = "{:.3f}".format(float(pH)) # format sliced string to obtain pH | ||
temp = reading[34:38] | ||
temp = "{:.1f}".format(float(temp)) # format sliced string to obtain temperature | ||
self.ser.close() # ensure the serial object is closed, it can be re-opened when this method is called. | ||
x = f"pH = {pH}, temp = {temp} deg C" | ||
#print(x) | ||
return x | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
""" | ||
Class object to control the speed of the pH robot stirrer and suction strength of the wash pump | ||
Author: Sarfaraz Ahamed, Aniket Chitre | ||
Date: July 2022 | ||
""" | ||
import serial | ||
import time | ||
|
||
class pH_stirrer_WashPump: | ||
|
||
baud_rate = 9600 # Communication rates for the arduino | ||
|
||
def __init__(self, port): | ||
|
||
self.port = port # Edit port according to listed port in 'Device Manager' | ||
self.ser = serial.Serial(self.port, self.baud_rate, timeout=1) # Open serial object and not elsewhere | ||
|
||
def stir_pump(self, stirrer_speed, inlet_pump, outlet_pump, print_statement=False): | ||
|
||
stir_Washpump_str = str(stirrer_speed) + ';' + str(inlet_pump) + ';' + str(outlet_pump) + ';0' + '/n' # String to send the arduino - Includes speed + separator + suction strength + end character | ||
self.ser.write(stir_Washpump_str.encode('utf-8')) # Convert the string to bytes and sends the encoded string to arduino | ||
if print_statement != False: | ||
print('Stirring at ' + str(stirrer_speed) + ' speed and wash station inlet and outlet pumps at ' + str(inlet_pump) + ' & ' + str(outlet_pump) + ' speeds, respectively.') |
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Oops, something went wrong.