Skip to content

Commit

Permalink
fix: Split readout channels into probe and acquisition
Browse files Browse the repository at this point in the history
  • Loading branch information
alecandido committed Dec 12, 2024
1 parent ef91969 commit f72fde8
Showing 1 changed file with 40 additions and 8 deletions.
48 changes: 40 additions & 8 deletions src/qibolab/_core/instruments/qblox/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import qblox_instruments as qblox
from qblox_instruments.qcodes_drivers.module import Module

from qibolab._core.components.configs import Config
from qibolab._core.components.configs import Config, LogConfig
from qibolab._core.execution_parameters import ExecutionParameters
from qibolab._core.identifier import ChannelId, Result
from qibolab._core.instruments.abstract import Controller
from qibolab._core.pulses.pulse import PulseLike, Readout
from qibolab._core.sequence import PulseSequence
from qibolab._core.serialize import Model
from qibolab._core.sweeper import ParallelSweepers
Expand Down Expand Up @@ -124,6 +125,7 @@ def play(
for ps in sequences:
sequences_ = _prepare(ps, sweepers, options, self.sampling_rate)
if "log" in configs:
assert isinstance(configs["log"], LogConfig)
_dump_sequences(configs["log"].path, sequences_)
sequencers = self._upload(sequences_)
results |= self._execute(sequencers)
Expand Down Expand Up @@ -165,20 +167,50 @@ def _channels_by_module() -> dict[SlotId, list[ChannelId]]:
return {}


def _split_channels(sequence: PulseSequence) -> dict[ChannelId, PulseSequence]:
def unwrap(pulse: PulseLike, output: bool) -> PulseLike:
return (
pulse
if not isinstance(pulse, Readout)
else pulse.probe if output else pulse.acquisition
)

def unwrap_seq(seq: PulseSequence, output: bool) -> PulseSequence:
return PulseSequence((ch, unwrap(p, output)) for ch, p in seq)

def ch_pulses(channel: ChannelId) -> PulseSequence:
return PulseSequence((ch, pulse) for ch, pulse in sequence if ch == channel)

def probe(channel: ChannelId) -> ChannelId:
return channel.split("/")[0] + "/probe"

def split(channel: ChannelId) -> dict[ChannelId, PulseSequence]:
seq = ch_pulses(channel)
readouts = any(isinstance(p, Readout) for _, p in seq)
assert not readouts or probe(channel) not in sequence.channels
return (
{channel: seq}
if not readouts
else {
channel: unwrap_seq(seq, output=False),
probe(channel): unwrap_seq(seq, output=True),
}
)

return {
ch: seq for channel in sequence.channels for ch, seq in split(channel).items()
}


def _prepare(
sequence: PulseSequence,
sweepers: list[ParallelSweepers],
options: ExecutionParameters,
sampling_rate: float,
) -> dict[ChannelId, Sequence]:
def ch_pulses(channel: ChannelId):
return PulseSequence((ch, pulse) for ch, pulse in sequence if ch == channel)

return {
channel: Sequence.from_pulses(
ch_pulses(channel), sweepers, options, sampling_rate
)
for channel in sequence.channels
ch: Sequence.from_pulses(seq, sweepers, options, sampling_rate)
for ch, seq in _split_channels(sequence).items()
}


Expand Down

0 comments on commit f72fde8

Please sign in to comment.