Skip to content

Commit

Permalink
fix(CompositeDevice): use async target attach request to prevent dead…
Browse files Browse the repository at this point in the history
…lock
  • Loading branch information
ShadowApex committed Feb 13, 2025
1 parent ac5f7e0 commit 6e570bb
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 61 deletions.
100 changes: 41 additions & 59 deletions src/input/composite_device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,49 +273,14 @@ impl CompositeDevice {

/// Starts the [CompositeDevice] and listens for events from all source
/// devices to translate the events and send them to the appropriate target.
pub async fn run(
&mut self,
targets: HashMap<String, TargetDeviceClient>,
) -> Result<(), Box<dyn Error>> {
pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
log::debug!("Starting composite device");

let dbus_path = self.dbus_path.clone();

// Start all source devices
self.run_source_devices().await?;

// Keep track of all target devices
for (path, target) in targets.iter() {
if let Err(e) = target.set_composite_device(self.client()).await {
return Err(
format!("Failed to set composite device for target device: {:?}", e).into(),
);
}

// Query the target device for its capabilities
let caps = match target.get_capabilities().await {
Ok(caps) => caps,
Err(e) => {
return Err(format!("Failed to get target capabilities: {e:?}").into());
}
};

// Track the target device by capabilities it has
for cap in caps {
self.target_devices_by_capability
.entry(cap)
.and_modify(|devices| {
devices.insert(path.clone());
})
.or_insert_with(|| {
let mut devices = HashSet::new();
devices.insert(path.clone());
devices
});
}
}
self.target_devices = targets;

// Loop and listen for command events
log::debug!("CompositeDevice started");
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
Expand Down Expand Up @@ -661,7 +626,7 @@ impl CompositeDevice {
log::trace!("Blocking event! {:?}", raw_event);
return Ok(());
}
//log::trace!("Received event: {:?} from {device_id}", raw_event);
log::debug!("Received event: {:?} from {device_id}", raw_event);

// Convert the event into a NativeEvent
let event: NativeEvent = match raw_event {
Expand Down Expand Up @@ -1043,7 +1008,7 @@ impl CompositeDevice {

// Find all target devices capable of handling this event
let Some(target_paths) = self.target_devices_by_capability.get(&cap) else {
log::trace!("No target devices capable of handling this event: {cap}");
log::debug!("No target devices capable of handling this event: {cap}");
return Ok(());
};
let target_devices: Vec<(&str, &TargetDeviceClient)> = target_paths
Expand All @@ -1055,7 +1020,7 @@ impl CompositeDevice {
.collect();

// Only write the event to devices that are capabile of handling it
log::trace!("Emit passed event: {:?}", event);
log::debug!("Emit passed event: {:?}", event);
for (name, target) in target_devices {
if let Err(e) = target.write_event(event.clone()).await {
log::error!("Failed to write event to: {name}: {e:?}");
Expand Down Expand Up @@ -1860,6 +1825,7 @@ impl CompositeDevice {

// Create new target devices using the input manager
for kind in device_types_to_start {
// Ask the input manager to create a target device
log::debug!("Requesting to create device: {kind}");
let (sender, mut receiver) = mpsc::channel(1);
self.manager
Expand All @@ -1878,29 +1844,44 @@ impl CompositeDevice {
}
};

// Attach the target device
// Ask the input manager to attach the target device to this composite
// device. Note that this *must* be run in an async task to prevent
// deadlocking.
log::debug!("Requesting to attach target device {target_path} to {composite_path}");
let (sender, mut receiver) = mpsc::channel(1);
self.manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path.clone(),
composite_path: composite_path.clone(),
sender,
})
.await?;
let Some(response) = receiver.recv().await else {
log::warn!("Channel closed waiting for response from input manager");
continue;
};
if let Err(e) = response {
log::error!("Failed to attach target device: {e:?}");
}
let manager = self.manager.clone();
let target_path_clone = target_path.clone();
let composite_path_clone = composite_path.clone();
tokio::task::spawn(async move {
let (sender, mut receiver) = mpsc::channel(1);
let result = manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path_clone,
composite_path: composite_path_clone,
sender,
})
.await;
if let Err(e) = result {
log::warn!(
"Failed to send attach request to input manager: {}",
e.to_string()
);
return;
}
let Some(response) = receiver.recv().await else {
log::warn!("Channel closed waiting for response from input manager");
return;
};
if let Err(e) = response {
log::error!("Failed to attach target device: {e:?}");
}
});

// Enqueue the target device to wait for the attachment message from
// the input manager to prevent multiple calls to set_target_devices()
// from mangling attachment.
self.target_devices_queued.insert(target_path);
}

// Signal change in target devices to DBus
// TODO: Check this
//self.signal_targets_changed().await;
Expand Down Expand Up @@ -1983,10 +1964,6 @@ impl CompositeDevice {
}
log::debug!("Attached device {path} to {dbus_path}");

// Add the target device
self.target_devices_queued.remove(&path);
self.target_devices.insert(path.clone(), target);

// Track the target device by capabilities it has
for cap in caps {
self.target_devices_by_capability
Expand All @@ -2000,7 +1977,12 @@ impl CompositeDevice {
devices
});
}

// Add the target device
self.target_devices_queued.remove(&path);
self.target_devices.insert(path.clone(), target);
}

// TODO: check this
//self.signal_targets_changed().await;

Expand Down
3 changes: 1 addition & 2 deletions src/input/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,7 @@ impl Manager {
let composite_path_clone = composite_path.clone();
let tx = self.tx.clone();
let task = tokio::spawn(async move {
let targets = HashMap::new();
if let Err(e) = device.run(targets).await {
if let Err(e) = device.run().await {
log::error!("Error running {composite_path}: {}", e.to_string());
}
log::debug!("Composite device stopped running: {composite_path}");
Expand Down

0 comments on commit 6e570bb

Please sign in to comment.