Skip to content

Commit

Permalink
refactor: minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
PiergiorgioButtarini committed Dec 14, 2023
1 parent 00bcbb7 commit 17df3f8
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 181 deletions.
120 changes: 61 additions & 59 deletions src/qibolab/instruments/qblox/cluster_qcm_bb.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,49 +141,52 @@ def __init__(self, name: str, address: str, cluster: Cluster = None):
self._used_sequencers_numbers: list[int] = []
self._unused_sequencers_numbers: list[int] = []

def _initialize_device_params(self):
self.device.set("out0_offset", value=0) # Default after reboot = 0
self.device.set("out1_offset", value=0) # Default after reboot = 0
self.device.set("out2_offset", value=0) # Default after reboot = 0
self.device.set("out3_offset", value=0) # Default after reboot = 0

# initialise the parameters of the default sequencers to the default values,
# the rest of the sequencers are not configured here, but will be configured
# with the same parameters as the default in process_pulse_sequence()
for target in [
self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o3"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o4"]],
]:
target.set("cont_mode_en_awg_path0", False) # Default after reboot = False
target.set("cont_mode_en_awg_path1", False)
target.set("cont_mode_waveform_idx_awg_path0", 0)
target.set("cont_mode_waveform_idx_awg_path1", 0)
target.set("marker_ovr_en", True)
target.set("marker_ovr_value", 15)
target.set("mixer_corr_gain_ratio", 1)
target.set("mixer_corr_phase_offset_degree", 0)
target.set("offset_awg_path0", 0)
target.set("offset_awg_path1", 0)
target.set("sync_en", False)
target.set("upsample_rate_awg_path0", 0)
target.set("upsample_rate_awg_path1", 0)
target.set("connect_out0", "off") # Default after reboot = True
target.set("connect_out1", "off")
target.set("connect_out2", "off")
target.set("connect_out3", "off")

self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]].set("connect_out0", "I")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]].set("connect_out1", "Q")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o3"]].set("connect_out2", "I")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o4"]].set("connect_out3", "Q")

# on initialisation, disconnect all other sequencers from the ports
self._device_num_sequencers = len(self.device.sequencers)
for sequencer in range(4, self._device_num_sequencers):
target = self.device.sequencers[sequencer]
[target.set(f"connect_out{x}", "off") for x in range(4)]
def _set_default_values(self):
if self.device.present():
self.device.set("out0_offset", value=0) # Default after reboot = 0
self.device.set("out1_offset", value=0) # Default after reboot = 0
self.device.set("out2_offset", value=0) # Default after reboot = 0
self.device.set("out3_offset", value=0) # Default after reboot = 0

# initialise the parameters of the default sequencers to the default values,
# the rest of the sequencers are not configured here, but will be configured
# with the same parameters as the default in process_pulse_sequence()
for target in [
self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o3"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o4"]],
]:
target.set("cont_mode_en_awg_path0", False) # Default after reboot = False
target.set("cont_mode_en_awg_path1", False)
target.set("cont_mode_waveform_idx_awg_path0", 0)
target.set("cont_mode_waveform_idx_awg_path1", 0)
target.set("marker_ovr_en", True)
target.set("marker_ovr_value", 15)
target.set("mixer_corr_gain_ratio", 1)
target.set("mixer_corr_phase_offset_degree", 0)
target.set("offset_awg_path0", 0)
target.set("offset_awg_path1", 0)
target.set("sync_en", False)
target.set("upsample_rate_awg_path0", 0)
target.set("upsample_rate_awg_path1", 0)
target.set("connect_out0", "off") # Default after reboot = True
target.set("connect_out1", "off")
target.set("connect_out2", "off")
target.set("connect_out3", "off")

self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]].set("connect_out0", "I")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]].set("connect_out1", "Q")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o3"]].set("connect_out2", "I")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o4"]].set("connect_out3", "Q")

# on initialisation, disconnect all other sequencers from the ports
self._device_num_sequencers = len(self.device.sequencers)
for sequencer in range(4, self._device_num_sequencers):
target = self.device.sequencers[sequencer]
[target.set(f"connect_out{x}", "off") for x in range(4)]
else:
raise ConnectionError(f"Module {self.device.name} not connected to cluster {self._cluster.cluster.name}")

def connect(self):
"""Connects to the instrument using the instrument settings in the runcard.
Expand All @@ -192,24 +195,23 @@ def connect(self):
parameters, and initialises the the underlying device parameters.
It uploads to the module the port settings loaded from the runcard.
"""
if self.is_connected:
return

self._cluster.connect()
self.device: QcmQrm = self._cluster.device.modules[int(self.address.split(":")[1]) - 1]
if not self.is_connected:
if not self.device.present():
raise Exception(f"Module {self.device.name} not connected to cluster {self._cluster.cluster.name}")
self.is_connected = True

# once connected, initialise the parameters of the device to the default values
self._initialize_device_params()
try:
for port in self.settings:
self._sequencers[port] = []
self.ports[port].offset = self.settings[port]["offset"]
self.ports[port].hardware_mod_en = True
self.ports[port].nco_freq = 0
self.ports[port].nco_phase_offs = 0
except:
raise RuntimeError(f"Unable to initialize port parameters on module {self.name}")
# once connected, initialise the parameters of the device to the default values
self._set_default_values()
try:
for port in self.settings:
self._sequencers[port] = []
self.ports[port].offset = self.settings[port]["offset"]
self.ports[port].hardware_mod_en = True
self.ports[port].nco_freq = 0
self.ports[port].nco_phase_offs = 0
except:
raise RuntimeError(f"Unable to initialize port parameters on module {self.name}")
self.is_connected = True

def setup(self, **settings):
"""Cache the settings of the runcard and instantiate the ports of the module.
Expand Down
117 changes: 59 additions & 58 deletions src/qibolab/instruments/qblox/cluster_qcm_rf.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def __init__(self, name: str, address: str, cluster: Cluster):
self.device: QcmQrm = None
self.ports: dict = {}
self.settings = {}

self._debug_folder: str = ""
self._cluster: Cluster = cluster
self._sequencers: dict[Sequencer] = {}
Expand All @@ -163,43 +164,46 @@ def __init__(self, name: str, address: str, cluster: Cluster):
self._used_sequencers_numbers: list[int] = []
self._unused_sequencers_numbers: list[int] = []

def _initialize_device_params(self):
# Default after reboot = 7.625
self.device.set("out0_offset_path0", 0)
self.device.set("out0_offset_path1", 0)
self.device.set("out1_offset_path0", 0)
self.device.set("out1_offset_path1", 0)
# initialise the parameters of the default sequencers to the default values,
# the rest of the sequencers are not configured here, but will be configured
# with the same parameters as the default in process_pulse_sequence()
for target in [
self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]],
]:
target.set("cont_mode_en_awg_path0", False)
target.set("cont_mode_en_awg_path1", False)
target.set("cont_mode_waveform_idx_awg_path0", 0)
target.set("cont_mode_waveform_idx_awg_path1", 0)
target.set("marker_ovr_en", True) # Default after reboot = False
target.set("marker_ovr_value", 15) # Default after reboot = 0
target.set("mixer_corr_gain_ratio", 1)
target.set("mixer_corr_phase_offset_degree", 0)
target.set("offset_awg_path0", 0)
target.set("offset_awg_path1", 0)
target.set("sync_en", False) # Default after reboot = False
target.set("upsample_rate_awg_path0", 0)
target.set("upsample_rate_awg_path1", 0)

self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]].set("connect_out0", "IQ")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]].set("connect_out1", "off")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]].set("connect_out1", "IQ")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]].set("connect_out0", "off")

# on initialisation, disconnect all other sequencers from the ports
self._device_num_sequencers = len(self.device.sequencers)
for sequencer in range(2, self._device_num_sequencers):
self.device.sequencers[sequencer].set("connect_out0", "off")
self.device.sequencers[sequencer].set("connect_out1", "off")
def _set_default_values(self):
if self.device.present():
# Default after reboot = 7.625
self.device.set("out0_offset_path0", 0)
self.device.set("out0_offset_path1", 0)
self.device.set("out1_offset_path0", 0)
self.device.set("out1_offset_path1", 0)
# initialise the parameters of the default sequencers to the default values,
# the rest of the sequencers are not configured here, but will be configured
# with the same parameters as the default in process_pulse_sequence()
for target in [
self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]],
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]],
]:
target.set("cont_mode_en_awg_path0", False)
target.set("cont_mode_en_awg_path1", False)
target.set("cont_mode_waveform_idx_awg_path0", 0)
target.set("cont_mode_waveform_idx_awg_path1", 0)
target.set("marker_ovr_en", True) # Default after reboot = False
target.set("marker_ovr_value", 15) # Default after reboot = 0
target.set("mixer_corr_gain_ratio", 1)
target.set("mixer_corr_phase_offset_degree", 0)
target.set("offset_awg_path0", 0)
target.set("offset_awg_path1", 0)
target.set("sync_en", False) # Default after reboot = False
target.set("upsample_rate_awg_path0", 0)
target.set("upsample_rate_awg_path1", 0)

self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]].set("connect_out0", "IQ")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o1"]].set("connect_out1", "off")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]].set("connect_out1", "IQ")
self.device.sequencers[self.DEFAULT_SEQUENCERS["o2"]].set("connect_out0", "off")

# on initialisation, disconnect all other sequencers from the ports
self._device_num_sequencers = len(self.device.sequencers)
for sequencer in range(2, self._device_num_sequencers):
self.device.sequencers[sequencer].set("connect_out0", "off")
self.device.sequencers[sequencer].set("connect_out1", "off")
else:
raise ConnectionError(f"Module {self.device.name} not connected to cluster {self._cluster.cluster.name}")

def connect(self):
"""Connects to the instrument using the instrument settings in the runcard.
Expand All @@ -208,29 +212,26 @@ def connect(self):
parameters, and initialises the the underlying device parameters.
It uploads to the module the port settings loaded from the runcard.
"""
if self.is_connected:
return

self._cluster.connect()
self.device: QcmQrm = self._cluster.device.modules[int(self.address.split(":")[1]) - 1]
if not self.is_connected:
if not self.device.present():
raise ConnectionError(
f"Module {self.device.name} not connected to cluster {self._cluster.cluster.name}"
)

self.is_connected = True
# once connected, initialise the parameters of the device to the default values
self._initialize_device_params()
try:
for port in self.settings:
self._sequencers[port] = []
if self.settings[port]["lo_frequency"]:
self.ports[port].lo_enabled = True
self.ports[port].lo_frequency = self.settings[port]["lo_frequency"]
self.ports[port].attenuation = self.settings[port]["attenuation"]
self.ports[port].hardware_mod_en = True
self.ports[port].nco_freq = 0
self.ports[port].nco_phase_offs = 0
except:
raise RuntimeError(f"Unable to initialize port parameters on module {self.name}")
# once connected, initialise the parameters of the device to the default values
self._set_default_values()
try:
for port in self.settings:
self._sequencers[port] = []
if self.settings[port]["lo_frequency"]:
self.ports[port].lo_enabled = True
self.ports[port].lo_frequency = self.settings[port]["lo_frequency"]
self.ports[port].attenuation = self.settings[port]["attenuation"]
self.ports[port].hardware_mod_en = True
self.ports[port].nco_freq = 0
self.ports[port].nco_phase_offs = 0
except:
raise RuntimeError(f"Unable to initialize port parameters on module {self.name}")
self.is_connected = True

def setup(self, **settings):
"""Cache the settings of the runcard and instantiate the ports of the module.
Expand Down
Loading

0 comments on commit 17df3f8

Please sign in to comment.