Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt: handle display demuxing #39

Merged
merged 1 commit into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.nix
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,10 @@ rec {
name = "eyre";
packageId = "eyre";
}
{
name = "parking_lot";
packageId = "parking_lot";
}
{
name = "rumqttc";
packageId = "rumqttc";
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ clap = { version = "4.5.9", features = ["derive"] }
color-eyre = "0.6.3"
edid-rs = "0.1.0"
eyre = "0.6.12"
parking_lot = "0.12.3"
rumqttc = "0.24.0"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
Expand Down
2 changes: 1 addition & 1 deletion src/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{debug, warn};
use wry::WebViewBuilder;

/// Commands to control the browser instance.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "kind")]
pub enum Command {
LoadUrl { url: String },
Expand Down
23 changes: 16 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ struct Cli {

#[arg(long = "default-config")]
default_config_path: Option<String>,

#[arg(long)]
mqtt_topic_prefix: Option<String>,
}

fn main() -> color_eyre::eyre::Result<()> {
Expand Down Expand Up @@ -121,14 +124,20 @@ fn main() -> color_eyre::eyre::Result<()> {

let (tx, rx) = channel();

let listener = mqtt::Listener {
id: config.id.unwrap_or(display_info.serial),
host: config.host,
port: config.port,
sender: tx,
};
let listener = mqtt::Listener::new(
config.id.unwrap_or_else(|| display_info.serial.clone()),
config.host,
config.port,
cli.mqtt_topic_prefix
.unwrap_or_else(|| "screens".to_string()),
)?;

// register our display
// FUTURWORK: multiple display support
listener
.add_display(&display_info, tx)
.context("adding display")?;

listener.start().context("starting the mqtt listener")?;
fossbeamer::spawn_browser(cli.url, rx)?;

Ok(())
Expand Down
164 changes: 121 additions & 43 deletions src/mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,139 @@
use bstr::BStr;
use fossbeamer::Command;
use rumqttc::{Client, ClientError, MqttOptions, Packet, Publish};
use std::{sync::mpsc::Sender, thread, time::Duration};
use eyre::Context;
use fossbeamer::{Command, Info};
use parking_lot::RwLock;
use rumqttc::{Client, MqttOptions, Packet, Publish};
use std::{
collections::HashMap,
sync::{mpsc::Sender, Arc},
thread,
time::Duration,
};
use tracing::{debug, info, warn, Span};

/// Maintains a connection to an MQTT broker.
pub(crate) struct Listener {
pub id: String,
pub host: String,
pub port: u16,
pub sender: Sender<fossbeamer::Command>,
/// The MQTT client
client: rumqttc::Client,

/// The topic that's prepended before IDs in the topic
topic_prefix: String,

/// Senders expecting commands to be sent to, keyed by their topic.
senders: Arc<RwLock<HashMap<String, Sender<fossbeamer::Command>>>>,
}

impl Listener {
pub(crate) fn start(self) -> Result<(), ClientError> {
let (client, mut connection) =
Client::new(MqttOptions::new(&self.id, self.host, self.port), 64);

client.subscribe("screens", rumqttc::QoS::AtLeastOnce)?;
client.subscribe(format!("screens/{}", self.id), rumqttc::QoS::AtLeastOnce)?;

thread::spawn(move || {
for event in connection.iter() {
match event {
Ok(event) => match event {
rumqttc::Event::Incoming(Packet::Publish(Publish {
topic,
payload,
..
})) => {
Span::current().record("topic", &topic);
match serde_json::from_slice::<Command>(&payload) {
Ok(command) => {
info!(?command, "received command");

self.sender.send(command).unwrap();
}
Err(e) => {
warn!(err=%e, payload=%BStr::new(&payload), "received payload that couldn't be parsed");
/// Prepares a connection to the broker, and spawns off a thread dealing
/// with received messages.
/// It spawns off a thread relaying messages to the Senders added in a
/// [add_display] call.
pub fn new(
id: impl Into<String>,
host: impl Into<String>,
port: u16,
topic_prefix: impl Into<String> + Clone,
) -> eyre::Result<Self> {
let (client, mut connection) = Client::new(MqttOptions::new(id, host, port), 64);

let senders = Arc::new(RwLock::new(
HashMap::<String, Sender<fossbeamer::Command>>::new(),
));

let topic_prefix: String = topic_prefix.into();
let catchall_topic = topic_prefix.clone();

thread::spawn({
let senders = senders.clone();
let catchall_topic = catchall_topic.clone();
move || {
for event in connection.iter() {
match event {
Ok(event) => match event {
rumqttc::Event::Incoming(Packet::Publish(Publish {
topic,
payload,
..
})) => {
Span::current().record("topic", &topic);

// parse the command
let command = match serde_json::from_slice::<Command>(&payload) {
Ok(command) => {
info!(?command, "received command");
command
}
Err(e) => {
warn!(err=%e, payload=%BStr::new(&payload), "received payload that couldn't be parsed");
continue;
}
};

if topic == catchall_topic {
for (_topic, sender) in senders.read().iter() {
if let Err(e) = sender.send(command.clone()) {
warn!(err=%e, "unable to send command to tx");
}
}
} else {
match senders.read().get(&topic) {
None => {
warn!("couldn't find topic");
continue;
}
Some(tx) => {
if let Err(e) = tx.send(command) {
warn!(err=%e, "unable to send command to tx");
}
}
}
}
}
rumqttc::Event::Incoming(incoming) => {
debug!(?incoming, "other incoming event");
}
rumqttc::Event::Outgoing(out) => {
debug!(?out, "outgoing event");
}
},
Err(e) => {
warn!(err=%e, "connection error");
// sleep a bit
std::thread::sleep(Duration::from_secs(5));
}
rumqttc::Event::Incoming(incoming) => {
debug!(?incoming, "other incoming event");
}
rumqttc::Event::Outgoing(out) => {
debug!(?out, "outgoing event");
}
},
Err(e) => {
warn!(err=%e, "connection error");
// sleep a bit
std::thread::sleep(Duration::from_secs(5));
}
}
}
});

// subscribe to the catchall
client
.subscribe(catchall_topic, rumqttc::QoS::AtLeastOnce)
.context("subscribing to catchall topic")?;

Ok(Self {
client,
senders,
topic_prefix,
})
}

/// Register a new display, using the passed display_info.
/// `set` requests received are sent to the passed channel.
pub fn add_display(
&self,
display_info: &Info,
tx: Sender<fossbeamer::Command>,
) -> eyre::Result<()> {
let k = &display_info.serial;
let topic_str = format!("{}/{}", self.topic_prefix, k);

self.client
.subscribe(&topic_str, rumqttc::QoS::AtLeastOnce)
.context("subscribing to topic")?;

self.senders.write().insert(topic_str, tx);

Ok(())
}
}
Loading