-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
162 additions
and
52 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import socket | ||
|
||
import yaml | ||
from qibo.config import log | ||
|
||
from qibolab.instruments.abstract import Instrument | ||
|
||
|
||
class TemperatureController(Instrument): | ||
"""Bluefors temperature controller. | ||
``` | ||
# Example usage | ||
if __name__ == "__main__": | ||
tc = TemperatureController("XLD1000_Temperature_Controller", "192.168.0.114", 8888) | ||
tc.connect() | ||
temperature_values = tc.read_data() | ||
for temperature_value in temperature_values: | ||
print(temperature_value) | ||
``` | ||
""" | ||
|
||
def __init__(self, name: str, address: str, port: int = 8888): | ||
"""Creation of the controller object. | ||
Args: | ||
name (str): name of the instrument. | ||
address (str): IP address of the board sending cryo temperature data. | ||
port (int): port of the board sending cryo temperature data. | ||
""" | ||
super().__init__(name, address) | ||
self.port = port | ||
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
|
||
def connect(self): | ||
"""Connect to the socket.""" | ||
if self.is_connected: | ||
return | ||
log.info(f"Bluefors connection. IP: {self.address} Port: {self.port}") | ||
self.client_socket.connect((self.address, self.port)) | ||
self.is_connected = True | ||
log.info("Bluefors Temperature Controller Connected") | ||
|
||
def disconnect(self): | ||
"""Disconnect from the socket.""" | ||
if self.is_connected: | ||
self.client_socket.close() | ||
self.is_connected = False | ||
|
||
def setup(self): | ||
"""Required by parent class, but not used here.""" | ||
|
||
def get_data(self) -> dict[str, dict[str, float]]: | ||
"""Connect to the socket and get temperature data. | ||
The typical message looks like this: | ||
flange_name: {'temperature':12.345678, 'timestamp':1234567890.123456} | ||
`timestamp` can be converted to datetime using `datetime.fromtimestamp`. | ||
Returns: | ||
message (dict[str, dict[str, float]]): socket message in this format: | ||
{"flange_name": {'temperature': <value(float)>, 'timestamp':<value(float)>}} | ||
""" | ||
return yaml.safe_load(self.client_socket.recv(1024).decode()) | ||
|
||
def read_data(self): | ||
"""Continously read data from the temperature controller.""" | ||
while True: | ||
yield self.get_data() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from unittest import mock | ||
|
||
import pytest | ||
import yaml | ||
|
||
from qibolab.instruments.bluefors import TemperatureController | ||
|
||
messages = [ | ||
"4K-flange: {'temperature':3.065067, 'timestamp':1710912431.128234}", | ||
"""50K-flange: {'temperature':35.733738, 'timestamp':1710956419.545651} | ||
4K-flange: {'temperature':3.065067, 'timestamp':1710955431.128234}""", | ||
] | ||
|
||
|
||
def test_connect(): | ||
with mock.patch("socket.socket"): | ||
tc = TemperatureController("Test_Temperature_Controller", "") | ||
assert tc.is_connected is False | ||
# if already connected, it should stay connected | ||
for _ in range(2): | ||
tc.connect() | ||
assert tc.is_connected is True | ||
|
||
|
||
@pytest.mark.parametrize("already_connected", [True, False]) | ||
def test_disconnect(already_connected): | ||
with mock.patch("socket.socket"): | ||
tc = TemperatureController("Test_Temperature_Controller", "") | ||
if not already_connected: | ||
tc.connect() | ||
# if already disconnected, it should stay disconnected | ||
for _ in range(2): | ||
tc.disconnect() | ||
assert tc.is_connected is False | ||
|
||
|
||
def test_continuously_read_data(): | ||
with mock.patch( | ||
"qibolab.instruments.bluefors.TemperatureController.get_data", | ||
new=lambda _: yaml.safe_load(messages[0]), | ||
): | ||
tc = TemperatureController("Test_Temperature_Controller", "") | ||
read_temperatures = tc.read_data() | ||
for read_temperature in read_temperatures: | ||
assert read_temperature == yaml.safe_load(messages[0]) | ||
break |