Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(CompositeDevice): move device hiding logic back to device start instead of device add #287

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rootfs/usr/lib/systemd/system/inputplumber.service
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Description=InputPlumber Service

[Service]
Environment=LOG_LEVEL=info
ExecStart=/usr/bin/inputplumber

[Install]
Expand Down
122 changes: 56 additions & 66 deletions src/input/composite_device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ pub struct CompositeDevice {
source_devices: HashMap<String, SourceDeviceClient>,
/// Source devices that this composite device will consume.
source_devices_discovered: Vec<SourceDevice>,
/// Source devices that should be hidden before they are started. This
/// is a list of devnode paths to hide (e.g. ["/dev/input/event10", "/dev/hidraw1"])
source_devices_to_hide: Vec<String>,
/// HashSet of source devices that are blocked from passing their input events to target
/// events.
source_devices_blocked: HashSet<String>,
Expand Down Expand Up @@ -165,7 +168,7 @@ pub struct CompositeDevice {
}

impl CompositeDevice {
pub async fn new(
pub fn new(
conn: Connection,
manager: mpsc::Sender<ManagerCommand>,
config: CompositeDeviceConfig,
Expand Down Expand Up @@ -195,6 +198,7 @@ impl CompositeDevice {
rx,
source_devices: HashMap::new(),
source_devices_discovered: Vec::new(),
source_devices_to_hide: Vec::new(),
source_devices_blocked: HashSet::new(),
source_device_paths: Vec::new(),
source_device_tasks: JoinSet::new(),
Expand Down Expand Up @@ -239,7 +243,7 @@ impl CompositeDevice {
}
}

if let Err(e) = device.add_source_device(device_info).await {
if let Err(e) = device.add_source_device(device_info) {
return Err(e.to_string().into());
}

Expand Down Expand Up @@ -269,49 +273,14 @@ impl CompositeDevice {

/// Starts the [CompositeDevice] and listens for events from all source
/// devices to translate the events and send them to the appropriate target.
pub async fn run(
&mut self,
targets: HashMap<String, TargetDeviceClient>,
) -> Result<(), Box<dyn Error>> {
pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
log::debug!("Starting composite device");

let dbus_path = self.dbus_path.clone();

// Start all source devices
self.run_source_devices().await?;

// Keep track of all target devices
for (path, target) in targets.iter() {
if let Err(e) = target.set_composite_device(self.client()).await {
return Err(
format!("Failed to set composite device for target device: {:?}", e).into(),
);
}

// Query the target device for its capabilities
let caps = match target.get_capabilities().await {
Ok(caps) => caps,
Err(e) => {
return Err(format!("Failed to get target capabilities: {e:?}").into());
}
};

// Track the target device by capabilities it has
for cap in caps {
self.target_devices_by_capability
.entry(cap)
.and_modify(|devices| {
devices.insert(path.clone());
})
.or_insert_with(|| {
let mut devices = HashSet::new();
devices.insert(path.clone());
devices
});
}
}
self.target_devices = targets;

// Loop and listen for command events
log::debug!("CompositeDevice started");
let mut buffer = Vec::with_capacity(BUFFER_SIZE);
Expand Down Expand Up @@ -602,7 +571,14 @@ impl CompositeDevice {
/// Start and run the source devices that this composite device will
/// consume.
async fn run_source_devices(&mut self) -> Result<(), Box<dyn Error>> {
// Keep a list of all the tasks
// Hide the device if specified
for source_path in self.source_devices_to_hide.drain(..) {
log::debug!("Hiding device: {}", source_path);
if let Err(e) = hide_device(source_path.as_str()).await {
log::warn!("Failed to hide device '{source_path}': {e:?}");
}
log::debug!("Finished hiding device: {source_path}");
}

log::debug!("Starting new source devices");
// Start listening for events from all source devices
Expand Down Expand Up @@ -651,7 +627,7 @@ impl CompositeDevice {
log::trace!("Blocking event! {:?}", raw_event);
return Ok(());
}
//log::trace!("Received event: {:?} from {device_id}", raw_event);
log::trace!("Received event: {:?} from {device_id}", raw_event);

// Convert the event into a NativeEvent
let event: NativeEvent = match raw_event {
Expand Down Expand Up @@ -1387,7 +1363,7 @@ impl CompositeDevice {

/// Executed whenever a source device is added to this [CompositeDevice].
async fn on_source_device_added(&mut self, device: UdevDevice) -> Result<(), Box<dyn Error>> {
if let Err(e) = self.add_source_device(device).await {
if let Err(e) = self.add_source_device(device) {
return Err(e.to_string().into());
}
self.run_source_devices().await?;
Expand Down Expand Up @@ -1432,7 +1408,7 @@ impl CompositeDevice {
}

/// Creates and adds a source device using the given [SourceDeviceInfo]
async fn add_source_device(
fn add_source_device(
&mut self,
device: UdevDevice,
) -> Result<(), Box<dyn Error + Send + Sync>> {
Expand All @@ -1456,10 +1432,7 @@ impl CompositeDevice {
let should_hide = !should_passthru && subsystem.as_str() != "iio";
if should_hide {
let source_path = device.devnode();
log::debug!("Hiding device: {}", source_path);
if let Err(e) = hide_device(source_path.as_str()).await {
log::warn!("Failed to hide device '{source_path}': {e:?}");
}
self.source_devices_to_hide.push(source_path);
}

let source_device = match subsystem.as_str() {
Expand Down Expand Up @@ -1853,6 +1826,7 @@ impl CompositeDevice {

// Create new target devices using the input manager
for kind in device_types_to_start {
// Ask the input manager to create a target device
log::debug!("Requesting to create device: {kind}");
let (sender, mut receiver) = mpsc::channel(1);
self.manager
Expand All @@ -1871,29 +1845,44 @@ impl CompositeDevice {
}
};

// Attach the target device
// Ask the input manager to attach the target device to this composite
// device. Note that this *must* be run in an async task to prevent
// deadlocking.
log::debug!("Requesting to attach target device {target_path} to {composite_path}");
let (sender, mut receiver) = mpsc::channel(1);
self.manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path.clone(),
composite_path: composite_path.clone(),
sender,
})
.await?;
let Some(response) = receiver.recv().await else {
log::warn!("Channel closed waiting for response from input manager");
continue;
};
if let Err(e) = response {
log::error!("Failed to attach target device: {e:?}");
}
let manager = self.manager.clone();
let target_path_clone = target_path.clone();
let composite_path_clone = composite_path.clone();
tokio::task::spawn(async move {
let (sender, mut receiver) = mpsc::channel(1);
let result = manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path_clone,
composite_path: composite_path_clone,
sender,
})
.await;
if let Err(e) = result {
log::warn!(
"Failed to send attach request to input manager: {}",
e.to_string()
);
return;
}
let Some(response) = receiver.recv().await else {
log::warn!("Channel closed waiting for response from input manager");
return;
};
if let Err(e) = response {
log::error!("Failed to attach target device: {e:?}");
}
});

// Enqueue the target device to wait for the attachment message from
// the input manager to prevent multiple calls to set_target_devices()
// from mangling attachment.
self.target_devices_queued.insert(target_path);
}

// Signal change in target devices to DBus
// TODO: Check this
//self.signal_targets_changed().await;
Expand Down Expand Up @@ -1976,10 +1965,6 @@ impl CompositeDevice {
}
log::debug!("Attached device {path} to {dbus_path}");

// Add the target device
self.target_devices_queued.remove(&path);
self.target_devices.insert(path.clone(), target);

// Track the target device by capabilities it has
for cap in caps {
self.target_devices_by_capability
Expand All @@ -1993,7 +1978,12 @@ impl CompositeDevice {
devices
});
}

// Add the target device
self.target_devices_queued.remove(&path);
self.target_devices.insert(path.clone(), target);
}

// TODO: check this
//self.signal_targets_changed().await;

Expand Down
20 changes: 16 additions & 4 deletions src/input/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::error::Error;
use std::fs;
use std::path::PathBuf;
use std::time::Duration;

use ::procfs::CpuInfo;
Expand Down Expand Up @@ -210,6 +211,12 @@ impl Manager {
/// Starts listening for [Command] messages to be sent from clients and
/// dispatch those events.
pub async fn run(&mut self) -> Result<(), Box<dyn Error + Send + Sync>> {
// Delay initial discovery by a short amount of time to allow udev
// rules to process for the first time.
// TODO: Figure out a better way to prevent udev from not running hiding
// rules too early in boot.
tokio::time::sleep(Duration::from_millis(4000)).await;

let dbus_for_listen_on_dbus = self.dbus.clone();

let cmd_tx_all_devices = self.tx.clone();
Expand Down Expand Up @@ -546,8 +553,7 @@ impl Manager {
device,
self.next_composite_dbus_path()?,
capability_map,
)
.await?;
)?;

// Check to see if there's already a CompositeDevice for
// these source devices.
Expand Down Expand Up @@ -789,8 +795,7 @@ impl Manager {
let composite_path_clone = composite_path.clone();
let tx = self.tx.clone();
let task = tokio::spawn(async move {
let targets = HashMap::new();
if let Err(e) = device.run(targets).await {
if let Err(e) = device.run().await {
log::error!("Error running {composite_path}: {}", e.to_string());
}
log::debug!("Composite device stopped running: {composite_path}");
Expand Down Expand Up @@ -1553,6 +1558,13 @@ impl Manager {
continue;
};

// Ensure the path is a valid devnode
let full_path = PathBuf::from(format!("{base_path}/{name}"));
if full_path.is_dir() {
log::trace!("Devnode path {base_path}/{name} is a directory. Skipping.");
continue;
}

// Wait until the device has initialized with udev
const MAX_TRIES: u8 = 80;
let mut attempt: u8 = 0;
Expand Down
6 changes: 4 additions & 2 deletions src/udev/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,9 @@ impl Device {
let match_rule = match subsystem.as_str() {
"hidraw" => {
let name = self.name.clone();
Some(format!(r#"SUBSYSTEMS=="{subsystem}", KERNEL=="{name}""#))
Some(format!(
r#"ACTION=="add|change", SUBSYSTEMS=="{subsystem}", KERNEL=="{name}""#
))
}
"input" => {
let rule_fn = || {
Expand All @@ -755,7 +757,7 @@ impl Device {
let pid = self.get_product_id()?;

Some(format!(
r#"SUBSYSTEMS=="{subsystem}", KERNELS=="{device_name}", ATTRS{{id/vendor}}=="{vid}", ATTRS{{id/product}}=="{pid}""#
r#"ACTION=="add|change", SUBSYSTEMS=="{subsystem}", KERNELS=="{device_name}", ATTRS{{id/vendor}}=="{vid}", ATTRS{{id/product}}=="{pid}""#
))
};
rule_fn()
Expand Down
7 changes: 6 additions & 1 deletion src/udev/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ pub async fn hide_device(path: &str) -> Result<(), Box<dyn Error>> {
{match_rule}, GOTO="inputplumber_valid"
GOTO="inputplumber_end"
LABEL="inputplumber_valid"
ACTION=="add|change", KERNEL=="hidraw[0-9]*|js[0-9]*|event[0-9]*", SUBSYSTEM=="{subsystem}", MODE:="0000", GROUP:="root", RUN:="{chmod_cmd} 000 {path}", SYMLINK+="inputplumber/by-hidden/%k"
KERNEL=="js[0-9]*|event[0-9]*", SUBSYSTEM=="{subsystem}", MODE:="0000", GROUP:="root", RUN:="{chmod_cmd} 000 /dev/input/%k", SYMLINK+="inputplumber/by-hidden/%k"
KERNEL=="hidraw[0-9]*", SUBSYSTEM=="{subsystem}", MODE:="0000", GROUP:="root", RUN:="{chmod_cmd} 000 /dev/%k", SYMLINK+="inputplumber/by-hidden/%k"
LABEL="inputplumber_end"
"#
);
Expand Down Expand Up @@ -102,12 +103,14 @@ pub async fn unhide_all() -> Result<(), Box<dyn Error>> {

/// Trigger udev to evaluate rules on the children of the given parent device path
async fn reload_children(parent: String) -> Result<(), Box<dyn Error>> {
log::debug!("Reloading udev rules: udevadm control --reload-rules");
let _ = Command::new("udevadm")
.args(["control", "--reload-rules"])
.output()
.await?;

for action in ["remove", "add"] {
log::debug!("Retriggering udev rules: udevadm trigger --action {action} -b {parent}");
let _ = Command::new("udevadm")
.args(["trigger", "--action", action, "-b", parent.as_str()])
.output()
Expand All @@ -119,11 +122,13 @@ async fn reload_children(parent: String) -> Result<(), Box<dyn Error>> {

/// Trigger udev to evaluate rules on the children of the given parent device path
async fn reload_all() -> Result<(), Box<dyn Error>> {
log::debug!("Reloading udev rules: udevadm control --reload-rules");
let _ = Command::new("udevadm")
.args(["control", "--reload-rules"])
.output()
.await?;

log::debug!("Retriggering udev rules: udevadm trigger");
let _ = Command::new("udevadm").arg("trigger").output().await?;

Ok(())
Expand Down