Skip to content

Commit

Permalink
Bugfix/websockets (#2808)
Browse files Browse the repository at this point in the history
* retry logic for init status

* fix login flashing and sideload hanging

* add logging

* misc backend bugfixes

* use closingObserver instead

* always show reinstall button

* go back to endWith

* show error if sideload fails

* refactor more watch channels

* navigate to services page on sideload complete

* handle error closure events properly

* handle error scenario better in sideload websocket

* remove a clone

---------

Co-authored-by: Matt Hill <[email protected]>
  • Loading branch information
dr-bonez and MattDHill authored Jan 14, 2025
1 parent eb1f3a0 commit 5d759f8
Show file tree
Hide file tree
Showing 22 changed files with 451 additions and 293 deletions.
37 changes: 23 additions & 14 deletions core/startos/src/context/setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::ops::Deref;
use std::path::{Path};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -162,21 +162,30 @@ impl SetupContext {
if let Err(e) = async {
let mut stream =
progress_tracker.stream(Some(Duration::from_millis(100)));
while let Some(progress) = stream.next().await {
ws.send(ws::Message::Text(
serde_json::to_string(&progress)
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
if progress.overall.is_complete() {
break;
loop {
tokio::select! {
progress = stream.next() => {
if let Some(progress) = progress {
ws.send(ws::Message::Text(
serde_json::to_string(&progress)
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
if progress.overall.is_complete() {
return ws.normal_close("complete").await;
}
} else {
return ws.normal_close("complete").await;
}
}
msg = ws.recv() => {
if msg.transpose().with_kind(ErrorKind::Network)?.is_none() {
return Ok(())
}
}
}
}

ws.normal_close("complete").await?;

Ok::<_, Error>(())
}
.await
{
Expand Down
29 changes: 19 additions & 10 deletions core/startos/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,26 @@ pub async fn subscribe(
session,
|mut ws| async move {
if let Err(e) = async {
while let Some(rev) = sub.recv().await {
ws.send(ws::Message::Text(
serde_json::to_string(&rev).with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
loop {
tokio::select! {
rev = sub.recv() => {
if let Some(rev) = rev {
ws.send(ws::Message::Text(
serde_json::to_string(&rev).with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
} else {
return ws.normal_close("complete").await;
}
}
msg = ws.recv() => {
if msg.transpose().with_kind(ErrorKind::Network)?.is_none() {
return Ok(())
}
}
}
}

ws.normal_close("complete").await?;

Ok::<_, Error>(())
}
.await
{
Expand Down
49 changes: 27 additions & 22 deletions core/startos/src/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::ops::Deref;
use std::path::PathBuf;
use std::time::Duration;

use axum::extract::ws;
use clap::builder::ValueParserFactory;
use clap::{value_parser, CommandFactory, FromArgMatches, Parser};
use color_eyre::eyre::eyre;
Expand All @@ -12,7 +13,7 @@ use itertools::Itertools;
use models::{FromStrParser, VersionString};
use reqwest::header::{HeaderMap, CONTENT_LENGTH};
use reqwest::Url;
use rpc_toolkit::yajrc::{GenericRpcMethod, RpcError};
use rpc_toolkit::yajrc::RpcError;
use rpc_toolkit::HandlerArgs;
use rustyline_async::ReadlineEvent;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -188,7 +189,7 @@ pub async fn sideload(
SideloadParams { session }: SideloadParams,
) -> Result<SideloadResponse, Error> {
let (upload, file) = upload(&ctx, session.clone()).await?;
let (err_send, err_recv) = oneshot::channel::<Error>();
let (err_send, mut err_recv) = oneshot::channel::<Error>();
let progress = Guid::new();
let progress_tracker = FullProgressTracker::new();
let mut progress_listener = progress_tracker.stream(Some(Duration::from_millis(200)));
Expand All @@ -198,40 +199,44 @@ pub async fn sideload(
RpcContinuation::ws_authed(
&ctx,
session,
|mut ws| {
use axum::extract::ws::Message;
async move {
if let Err(e) = async {
|mut ws| async move {
if let Err(e) = async {
loop {
tokio::select! {
res = async {
while let Some(progress) = progress_listener.next().await {
ws.send(Message::Text(
progress = progress_listener.next() => {
if let Some(progress) = progress {
ws.send(ws::Message::Text(
serde_json::to_string(&progress)
.with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
if progress.overall.is_complete() {
return ws.normal_close("complete").await;
}
} else {
return ws.normal_close("complete").await;
}
Ok::<_, Error>(())
} => res?,
err = err_recv => {
}
msg = ws.recv() => {
if msg.transpose().with_kind(ErrorKind::Network)?.is_none() {
return Ok(())
}
}
err = (&mut err_recv) => {
if let Ok(e) = err {
ws.close_result(Err::<&str, _>(e.clone_output())).await?;
return Err(e)
}
}
}

ws.normal_close("complete").await?;

Ok::<_, Error>(())
}
.await
{
tracing::error!("Error tracking sideload progress: {e}");
tracing::debug!("{e:?}");
}
}
.await
{
tracing::error!("Error tracking sideload progress: {e}");
tracing::debug!("{e:?}");
}
},
Duration::from_secs(600),
),
Expand All @@ -255,9 +260,9 @@ pub async fn sideload(
}
.await
{
let _ = err_send.send(e.clone_output());
tracing::error!("Error sideloading package: {e}");
tracing::debug!("{e:?}");
let _ = err_send.send(e);
}
});
Ok(SideloadResponse { upload, progress })
Expand Down
47 changes: 21 additions & 26 deletions core/startos/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::error::ResultExt;
use crate::lxc::ContainerId;
use crate::prelude::*;
use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations};
use crate::util::net::WebSocketExt;
use crate::util::serde::Reversible;
use crate::util::Invoke;

Expand Down Expand Up @@ -80,34 +81,28 @@ async fn ws_handler(
.with_kind(ErrorKind::Network)?;
}

let mut ws_closed = false;
while let Some(entry) = tokio::select! {
a = logs.try_next() => Some(a?),
a = stream.try_next() => { a.with_kind(crate::ErrorKind::Network)?; ws_closed = true; None }
} {
if let Some(entry) = entry {
let (_, log_entry) = entry.log_entry()?;
stream
.send(ws::Message::Text(
serde_json::to_string(&log_entry).with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
loop {
tokio::select! {
entry = logs.try_next() => {
if let Some(entry) = entry? {
let (_, log_entry) = entry.log_entry()?;
stream
.send(ws::Message::Text(
serde_json::to_string(&log_entry).with_kind(ErrorKind::Serialization)?,
))
.await
.with_kind(ErrorKind::Network)?;
} else {
return stream.normal_close("complete").await;
}
},
msg = stream.try_next() => {
if msg.with_kind(crate::ErrorKind::Network)?.is_none() {
return Ok(())
}
}
}
}

if !ws_closed {
stream
.send(ws::Message::Close(Some(ws::CloseFrame {
code: ws::close_code::NORMAL,
reason: "Log Stream Finished".into(),
})))
.await
.with_kind(ErrorKind::Network)?;
drop(stream);
}

Ok(())
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
Expand Down
31 changes: 16 additions & 15 deletions core/startos/src/net/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use id_pool::IdPool;
use imbl_value::InternedString;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tokio::sync::{mpsc, watch};
use tokio::sync::mpsc;

use crate::db::model::public::NetworkInterfaceInfo;
use crate::prelude::*;
use crate::util::sync::Watch;
use crate::util::Invoke;

pub const START9_BRIDGE_IFACE: &str = "lxcbr0";
Expand Down Expand Up @@ -147,17 +148,16 @@ pub struct LanPortForwardController {
_thread: NonDetachingJoinHandle<()>,
}
impl LanPortForwardController {
pub fn new(
mut net_iface: watch::Receiver<BTreeMap<InternedString, NetworkInterfaceInfo>>,
) -> Self {
pub fn new(mut ip_info: Watch<BTreeMap<InternedString, NetworkInterfaceInfo>>) -> Self {
let (req_send, mut req_recv) = mpsc::unbounded_channel();
let thread = NonDetachingJoinHandle::from(tokio::spawn(async move {
let mut state = ForwardState::default();
let mut interfaces = net_iface
.borrow_and_update()
.iter()
.map(|(iface, info)| (iface.clone(), info.public()))
.collect();
let mut interfaces = ip_info.peek_and_mark_seen(|ip_info| {
ip_info
.iter()
.map(|(iface, info)| (iface.clone(), info.public()))
.collect()
});
let mut reply: Option<oneshot::Sender<Result<(), Error>>> = None;
loop {
tokio::select! {
Expand All @@ -171,12 +171,13 @@ impl LanPortForwardController {
break;
}
}
_ = net_iface.changed() => {
interfaces = net_iface
.borrow()
.iter()
.map(|(iface, info)| (iface.clone(), info.public()))
.collect();
_ = ip_info.changed() => {
interfaces = ip_info.peek(|ip_info| {
ip_info
.iter()
.map(|(iface, info)| (iface.clone(), info.public()))
.collect()
});
}
}
let res = state.sync(&interfaces).await;
Expand Down
Loading

0 comments on commit 5d759f8

Please sign in to comment.