From 5d759f810ca02cea240392d593afded852165a7e Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Tue, 14 Jan 2025 03:39:52 +0000 Subject: [PATCH] Bugfix/websockets (#2808) * retry logic for init status * fix login flashing and sideload hanging * add logging * misc backend bugfixes * use closingObserver instead * always show reinstall button * go back to endWith * show error if sideload fails * refactor more watch channels * navigate to services page on sideload complete * handle error closure events properly * handle error scenario better in sideload websocket * remove a clone --------- Co-authored-by: Matt Hill --- core/startos/src/context/setup.rs | 37 +++-- core/startos/src/db/mod.rs | 29 ++-- core/startos/src/install/mod.rs | 49 +++--- core/startos/src/logs.rs | 47 +++--- core/startos/src/net/forward.rs | 31 ++-- core/startos/src/net/network_interface.rs | 54 ++----- core/startos/src/net/web_server.rs | 126 +++++++--------- core/startos/src/service/service_actor.rs | 4 +- core/startos/src/update/mod.rs | 26 ++-- core/startos/src/util/net.rs | 21 +-- core/startos/src/util/sync.rs | 141 ++++++++++++++++++ .../src/version/update_details/v0_3_6.md | 29 ++-- web/projects/ui/src/app/app-routing.module.ts | 2 +- .../ui/src/app/pages/init/init.service.ts | 54 +++---- .../marketplace-show-controls.component.html | 20 ++- .../marketplace-show-controls.component.ts | 4 - .../server-routes/sideload/sideload.page.ts | 9 +- .../sideload/sideload.service.ts | 39 ++++- .../app/services/api/embassy-api.service.ts | 3 +- .../services/api/embassy-live-api.service.ts | 4 +- .../services/api/embassy-mock-api.service.ts | 11 +- .../ui/src/app/services/auth.service.ts | 4 +- 22 files changed, 451 insertions(+), 293 deletions(-) diff --git a/core/startos/src/context/setup.rs b/core/startos/src/context/setup.rs index 30ca1ad39..5c2bf8bfc 100644 --- a/core/startos/src/context/setup.rs +++ b/core/startos/src/context/setup.rs @@ -1,5 +1,5 @@ use std::ops::Deref; -use std::path::{Path}; +use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -162,21 +162,30 @@ impl SetupContext { if let Err(e) = async { let mut stream = progress_tracker.stream(Some(Duration::from_millis(100))); - while let Some(progress) = stream.next().await { - ws.send(ws::Message::Text( - serde_json::to_string(&progress) - .with_kind(ErrorKind::Serialization)?, - )) - .await - .with_kind(ErrorKind::Network)?; - if progress.overall.is_complete() { - break; + loop { + tokio::select! { + progress = stream.next() => { + if let Some(progress) = progress { + ws.send(ws::Message::Text( + serde_json::to_string(&progress) + .with_kind(ErrorKind::Serialization)?, + )) + .await + .with_kind(ErrorKind::Network)?; + if progress.overall.is_complete() { + return ws.normal_close("complete").await; + } + } else { + return ws.normal_close("complete").await; + } + } + msg = ws.recv() => { + if msg.transpose().with_kind(ErrorKind::Network)?.is_none() { + return Ok(()) + } + } } } - - ws.normal_close("complete").await?; - - Ok::<_, Error>(()) } .await { diff --git a/core/startos/src/db/mod.rs b/core/startos/src/db/mod.rs index c7e38e81c..135153935 100644 --- a/core/startos/src/db/mod.rs +++ b/core/startos/src/db/mod.rs @@ -198,17 +198,26 @@ pub async fn subscribe( session, |mut ws| async move { if let Err(e) = async { - while let Some(rev) = sub.recv().await { - ws.send(ws::Message::Text( - serde_json::to_string(&rev).with_kind(ErrorKind::Serialization)?, - )) - .await - .with_kind(ErrorKind::Network)?; + loop { + tokio::select! { + rev = sub.recv() => { + if let Some(rev) = rev { + ws.send(ws::Message::Text( + serde_json::to_string(&rev).with_kind(ErrorKind::Serialization)?, + )) + .await + .with_kind(ErrorKind::Network)?; + } else { + return ws.normal_close("complete").await; + } + } + msg = ws.recv() => { + if msg.transpose().with_kind(ErrorKind::Network)?.is_none() { + return Ok(()) + } + } + } } - - ws.normal_close("complete").await?; - - Ok::<_, Error>(()) } .await { diff --git a/core/startos/src/install/mod.rs b/core/startos/src/install/mod.rs index bc971da37..33c2ea6c7 100644 --- a/core/startos/src/install/mod.rs +++ b/core/startos/src/install/mod.rs @@ -2,6 +2,7 @@ use std::ops::Deref; use std::path::PathBuf; use std::time::Duration; +use axum::extract::ws; use clap::builder::ValueParserFactory; use clap::{value_parser, CommandFactory, FromArgMatches, Parser}; use color_eyre::eyre::eyre; @@ -12,7 +13,7 @@ use itertools::Itertools; use models::{FromStrParser, VersionString}; use reqwest::header::{HeaderMap, CONTENT_LENGTH}; use reqwest::Url; -use rpc_toolkit::yajrc::{GenericRpcMethod, RpcError}; +use rpc_toolkit::yajrc::RpcError; use rpc_toolkit::HandlerArgs; use rustyline_async::ReadlineEvent; use serde::{Deserialize, Serialize}; @@ -188,7 +189,7 @@ pub async fn sideload( SideloadParams { session }: SideloadParams, ) -> Result { let (upload, file) = upload(&ctx, session.clone()).await?; - let (err_send, err_recv) = oneshot::channel::(); + let (err_send, mut err_recv) = oneshot::channel::(); let progress = Guid::new(); let progress_tracker = FullProgressTracker::new(); let mut progress_listener = progress_tracker.stream(Some(Duration::from_millis(200))); @@ -198,40 +199,44 @@ pub async fn sideload( RpcContinuation::ws_authed( &ctx, session, - |mut ws| { - use axum::extract::ws::Message; - async move { - if let Err(e) = async { + |mut ws| async move { + if let Err(e) = async { + loop { tokio::select! { - res = async { - while let Some(progress) = progress_listener.next().await { - ws.send(Message::Text( + progress = progress_listener.next() => { + if let Some(progress) = progress { + ws.send(ws::Message::Text( serde_json::to_string(&progress) .with_kind(ErrorKind::Serialization)?, )) .await .with_kind(ErrorKind::Network)?; + if progress.overall.is_complete() { + return ws.normal_close("complete").await; + } + } else { + return ws.normal_close("complete").await; } - Ok::<_, Error>(()) - } => res?, - err = err_recv => { + } + msg = ws.recv() => { + if msg.transpose().with_kind(ErrorKind::Network)?.is_none() { + return Ok(()) + } + } + err = (&mut err_recv) => { if let Ok(e) = err { ws.close_result(Err::<&str, _>(e.clone_output())).await?; return Err(e) } } } - - ws.normal_close("complete").await?; - - Ok::<_, Error>(()) - } - .await - { - tracing::error!("Error tracking sideload progress: {e}"); - tracing::debug!("{e:?}"); } } + .await + { + tracing::error!("Error tracking sideload progress: {e}"); + tracing::debug!("{e:?}"); + } }, Duration::from_secs(600), ), @@ -255,9 +260,9 @@ pub async fn sideload( } .await { - let _ = err_send.send(e.clone_output()); tracing::error!("Error sideloading package: {e}"); tracing::debug!("{e:?}"); + let _ = err_send.send(e); } }); Ok(SideloadResponse { upload, progress }) diff --git a/core/startos/src/logs.rs b/core/startos/src/logs.rs index 1cb3327b6..be5e9cb8f 100644 --- a/core/startos/src/logs.rs +++ b/core/startos/src/logs.rs @@ -30,6 +30,7 @@ use crate::error::ResultExt; use crate::lxc::ContainerId; use crate::prelude::*; use crate::rpc_continuations::{Guid, RpcContinuation, RpcContinuations}; +use crate::util::net::WebSocketExt; use crate::util::serde::Reversible; use crate::util::Invoke; @@ -80,34 +81,28 @@ async fn ws_handler( .with_kind(ErrorKind::Network)?; } - let mut ws_closed = false; - while let Some(entry) = tokio::select! { - a = logs.try_next() => Some(a?), - a = stream.try_next() => { a.with_kind(crate::ErrorKind::Network)?; ws_closed = true; None } - } { - if let Some(entry) = entry { - let (_, log_entry) = entry.log_entry()?; - stream - .send(ws::Message::Text( - serde_json::to_string(&log_entry).with_kind(ErrorKind::Serialization)?, - )) - .await - .with_kind(ErrorKind::Network)?; + loop { + tokio::select! { + entry = logs.try_next() => { + if let Some(entry) = entry? { + let (_, log_entry) = entry.log_entry()?; + stream + .send(ws::Message::Text( + serde_json::to_string(&log_entry).with_kind(ErrorKind::Serialization)?, + )) + .await + .with_kind(ErrorKind::Network)?; + } else { + return stream.normal_close("complete").await; + } + }, + msg = stream.try_next() => { + if msg.with_kind(crate::ErrorKind::Network)?.is_none() { + return Ok(()) + } + } } } - - if !ws_closed { - stream - .send(ws::Message::Close(Some(ws::CloseFrame { - code: ws::close_code::NORMAL, - reason: "Log Stream Finished".into(), - }))) - .await - .with_kind(ErrorKind::Network)?; - drop(stream); - } - - Ok(()) } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] diff --git a/core/startos/src/net/forward.rs b/core/startos/src/net/forward.rs index ba62945a0..6f3abee15 100644 --- a/core/startos/src/net/forward.rs +++ b/core/startos/src/net/forward.rs @@ -8,10 +8,11 @@ use id_pool::IdPool; use imbl_value::InternedString; use serde::{Deserialize, Serialize}; use tokio::process::Command; -use tokio::sync::{mpsc, watch}; +use tokio::sync::mpsc; use crate::db::model::public::NetworkInterfaceInfo; use crate::prelude::*; +use crate::util::sync::Watch; use crate::util::Invoke; pub const START9_BRIDGE_IFACE: &str = "lxcbr0"; @@ -147,17 +148,16 @@ pub struct LanPortForwardController { _thread: NonDetachingJoinHandle<()>, } impl LanPortForwardController { - pub fn new( - mut net_iface: watch::Receiver>, - ) -> Self { + pub fn new(mut ip_info: Watch>) -> Self { let (req_send, mut req_recv) = mpsc::unbounded_channel(); let thread = NonDetachingJoinHandle::from(tokio::spawn(async move { let mut state = ForwardState::default(); - let mut interfaces = net_iface - .borrow_and_update() - .iter() - .map(|(iface, info)| (iface.clone(), info.public())) - .collect(); + let mut interfaces = ip_info.peek_and_mark_seen(|ip_info| { + ip_info + .iter() + .map(|(iface, info)| (iface.clone(), info.public())) + .collect() + }); let mut reply: Option>> = None; loop { tokio::select! { @@ -171,12 +171,13 @@ impl LanPortForwardController { break; } } - _ = net_iface.changed() => { - interfaces = net_iface - .borrow() - .iter() - .map(|(iface, info)| (iface.clone(), info.public())) - .collect(); + _ = ip_info.changed() => { + interfaces = ip_info.peek(|ip_info| { + ip_info + .iter() + .map(|(iface, info)| (iface.clone(), info.public())) + .collect() + }); } } let res = state.sync(&interfaces).await; diff --git a/core/startos/src/net/network_interface.rs b/core/startos/src/net/network_interface.rs index 991765c86..4f7696c97 100644 --- a/core/startos/src/net/network_interface.rs +++ b/core/startos/src/net/network_interface.rs @@ -1,7 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::future::Future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV6}; -use std::pin::Pin; use std::sync::{Arc, Weak}; use std::task::Poll; use std::time::Duration; @@ -19,7 +17,6 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use tokio::process::Command; -use tokio::sync::watch; use ts_rs::TS; use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream}; use zbus::zvariant::{ @@ -35,7 +32,7 @@ use crate::prelude::*; use crate::util::future::Until; use crate::util::io::open_file; use crate::util::serde::{display_serializable, HandlerExtSerde}; -use crate::util::sync::SyncMutex; +use crate::util::sync::{SyncMutex, Watch}; use crate::util::Invoke; pub fn network_interface_api() -> ParentHandler { @@ -112,7 +109,7 @@ pub fn network_interface_api() -> ParentHandler { async fn list_interfaces( ctx: RpcContext, ) -> Result, Error> { - Ok(ctx.net_controller.net_iface.ip_info.borrow().clone()) + Ok(ctx.net_controller.net_iface.ip_info.read()) } #[derive(Debug, Clone, Deserialize, Serialize, Parser, TS)] @@ -322,7 +319,7 @@ impl<'a> StubStream<'a> for SignalStream<'a> { } #[instrument(skip_all)] -async fn watcher(write_to: watch::Sender>) { +async fn watcher(write_to: Watch>) { loop { let res: Result<(), Error> = async { let connection = Connection::system().await?; @@ -425,7 +422,7 @@ async fn watch_ip( connection: &Connection, device_proxy: device::DeviceProxy<'_>, iface: InternedString, - write_to: &watch::Sender>, + write_to: &Watch>, ) -> Result<(), Error> { let mut until = Until::new() .with_stream( @@ -593,13 +590,13 @@ async fn watch_ip( pub struct NetworkInterfaceController { db: TypedPatchDb, - ip_info: watch::Sender>, + ip_info: Watch>, _watcher: NonDetachingJoinHandle<()>, listeners: SyncMutex>>, } impl NetworkInterfaceController { - pub fn subscribe(&self) -> watch::Receiver> { - self.ip_info.subscribe() + pub fn subscribe(&self) -> Watch> { + self.ip_info.clone_unseen() } async fn sync( @@ -667,7 +664,7 @@ impl NetworkInterfaceController { Ok(()) } pub fn new(db: TypedPatchDb) -> Self { - let (ip_info, mut recv) = watch::channel(BTreeMap::new()); + let mut ip_info = Watch::new(BTreeMap::new()); Self { db: db.clone(), ip_info: ip_info.clone(), @@ -695,7 +692,7 @@ impl NetworkInterfaceController { let res: Result<(), Error> = async { loop { if let Err(e) = async { - let ip_info = { recv.borrow().clone() }; + let ip_info = ip_info.read(); Self::sync(&db, &ip_info).boxed().await?; Ok::<_, Error>(()) @@ -706,7 +703,7 @@ impl NetworkInterfaceController { tracing::debug!("{e:?}"); } - let _ = recv.changed().await; + let _ = ip_info.changed().await; } } .await; @@ -733,12 +730,10 @@ impl NetworkInterfaceController { l.insert(port, Arc::downgrade(&arc)); Ok(()) })?; - let mut ip_info = self.ip_info.subscribe(); - ip_info.mark_changed(); + let ip_info = self.ip_info.clone_unseen(); Ok(NetworkInterfaceListener { _arc: arc, ip_info, - changed: None, listeners: ListenerMap::new(port), }) } @@ -760,12 +755,11 @@ impl NetworkInterfaceController { l.insert(port, Arc::downgrade(&arc)); Ok(()) })?; - let mut ip_info = self.ip_info.subscribe(); + let ip_info = self.ip_info.clone_unseen(); ip_info.mark_changed(); Ok(NetworkInterfaceListener { _arc: arc, ip_info, - changed: None, listeners, }) } @@ -985,9 +979,8 @@ impl ListenerMap { } pub struct NetworkInterfaceListener { - ip_info: watch::Receiver>, + ip_info: Watch>, listeners: ListenerMap, - changed: Option + Send + Sync + 'static>>>, _arc: Arc<()>, } impl NetworkInterfaceListener { @@ -995,29 +988,14 @@ impl NetworkInterfaceListener { self.listeners.port } - fn poll_ip_info_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { - let mut changed = if let Some(changed) = self.changed.take() { - changed - } else { - let mut ip_info = self.ip_info.clone(); - Box::pin(async move { - let _ = ip_info.changed().await; - }) - }; - let res = changed.poll_unpin(cx); - if res.is_pending() { - self.changed = Some(changed); - } - res - } - pub fn poll_accept( &mut self, cx: &mut std::task::Context<'_>, public: bool, ) -> Poll> { - if self.poll_ip_info_changed(cx).is_ready() || public != self.listeners.prev_public { - self.listeners.update(&*self.ip_info.borrow(), public)?; + while self.ip_info.poll_changed(cx).is_ready() || public != self.listeners.prev_public { + self.ip_info + .peek(|ip_info| self.listeners.update(ip_info, public))?; } self.listeners.poll_accept(cx) } diff --git a/core/startos/src/net/web_server.rs b/core/startos/src/net/web_server.rs index b38a7ee56..468b5d5db 100644 --- a/core/startos/src/net/web_server.rs +++ b/core/startos/src/net/web_server.rs @@ -7,13 +7,12 @@ use std::task::Poll; use std::time::Duration; use axum::Router; -use futures::future::{BoxFuture, Either}; +use futures::future::Either; use futures::FutureExt; use helpers::NonDetachingJoinHandle; use hyper_util::rt::{TokioIo, TokioTimer}; -use hyper_util::service::TowerToHyperService; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::{oneshot, watch}; +use tokio::sync::oneshot; use crate::context::{DiagnosticContext, InitContext, InstallContext, RpcContext, SetupContext}; use crate::net::network_interface::NetworkInterfaceListener; @@ -23,6 +22,7 @@ use crate::net::static_server::{ }; use crate::prelude::*; use crate::util::actor::background::BackgroundJobQueue; +use crate::util::sync::Watch; pub struct Accepted { pub https_redirect: bool, @@ -76,42 +76,22 @@ impl Accept for Option { #[pin_project::pin_project] pub struct Acceptor { - acceptor: (watch::Sender, watch::Receiver), - changed: Option>, + acceptor: Watch, } impl Acceptor { pub fn new(acceptor: A) -> Self { Self { - acceptor: watch::channel(acceptor), - changed: None, + acceptor: Watch::new(acceptor), } } fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { - let mut changed = if let Some(changed) = self.changed.take() { - changed - } else { - let mut recv = self.acceptor.1.clone(); - async move { - let _ = recv.changed().await; - } - .boxed() - }; - let res = changed.poll_unpin(cx); - if res.is_pending() { - self.changed = Some(changed); - } - res + self.acceptor.poll_changed(cx) } fn poll_accept(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { let _ = self.poll_changed(cx); - let mut res = Poll::Pending; - self.acceptor.0.send_if_modified(|a| { - res = a.poll_accept(cx); - false - }); - res + self.acceptor.peek_mut(|a| a.poll_accept(cx)) } async fn accept(&mut self) -> Result { @@ -139,7 +119,7 @@ impl Acceptor { } pub struct WebServerAcceptorSetter { - acceptor: watch::Sender, + acceptor: Watch, } impl WebServerAcceptorSetter>> { pub fn try_upgrade Result>(&self, f: F) -> Result<(), Error> { @@ -160,7 +140,7 @@ impl WebServerAcceptorSetter>> { } } impl Deref for WebServerAcceptorSetter { - type Target = watch::Sender; + type Target = Watch; fn deref(&self) -> &Self::Target { &self.acceptor } @@ -168,8 +148,8 @@ impl Deref for WebServerAcceptorSetter { pub struct WebServer { shutdown: oneshot::Sender<()>, - router: watch::Sender>, - acceptor: watch::Sender, + router: Watch>, + acceptor: Watch, thread: NonDetachingJoinHandle<()>, } impl WebServer { @@ -180,8 +160,9 @@ impl WebServer { } pub fn new(mut acceptor: Acceptor) -> Self { - let acceptor_send = acceptor.acceptor.0.clone(); - let (router, service) = watch::channel::>(None); + let acceptor_send = acceptor.acceptor.clone(); + let router = Watch::>::new(None); + let service = router.clone_unseen(); let (shutdown, shutdown_recv) = oneshot::channel(); let thread = NonDetachingJoinHandle::from(tokio::spawn(async move { #[derive(Clone)] @@ -201,6 +182,34 @@ impl WebServer { } } + struct SwappableRouter(Watch>, bool); + impl hyper::service::Service> for SwappableRouter { + type Response = , + >>::Response; + type Error = , + >>::Error; + type Future = , + >>::Future; + + fn call(&self, req: hyper::Request) -> Self::Future { + use tower_service::Service; + + if self.1 { + redirecter().call(req) + } else { + let router = self.0.read(); + if let Some(mut router) = router { + router.call(req) + } else { + refresher().call(req) + } + } + } + } + let accept = AtomicBool::new(true); let queue_cell = Arc::new(RwLock::new(None)); let graceful = hyper_util::server::graceful::GracefulShutdown::new(); @@ -224,45 +233,16 @@ impl WebServer { loop { if let Err(e) = async { let accepted = acceptor.accept().await?; - if accepted.https_redirect { - queue.add_job( - graceful.watch( - server - .serve_connection_with_upgrades( - TokioIo::new(accepted.stream), - TowerToHyperService::new(redirecter().into_service()), - ) - .into_owned(), - ), - ); - } else { - let service = { service.borrow().clone() }; - if let Some(service) = service { - queue.add_job( - graceful.watch( - server - .serve_connection_with_upgrades( - TokioIo::new(accepted.stream), - TowerToHyperService::new(service.into_service()), - ) - .into_owned(), - ), - ); - } else { - queue.add_job( - graceful.watch( - server - .serve_connection_with_upgrades( - TokioIo::new(accepted.stream), - TowerToHyperService::new( - refresher().into_service(), - ), - ) - .into_owned(), - ), - ); - } - } + queue.add_job( + graceful.watch( + server + .serve_connection_with_upgrades( + TokioIo::new(accepted.stream), + SwappableRouter(service.clone(), accepted.https_redirect), + ) + .into_owned(), + ), + ); Ok::<_, Error>(()) } @@ -303,7 +283,7 @@ impl WebServer { } pub fn serve_router(&mut self, router: Router) { - self.router.send_replace(Some(router)); + self.router.send(Some(router)) } pub fn serve_main(&mut self, ctx: RpcContext) { diff --git a/core/startos/src/service/service_actor.rs b/core/startos/src/service/service_actor.rs index b88136650..bb30d2a98 100644 --- a/core/startos/src/service/service_actor.rs +++ b/core/startos/src/service/service_actor.rs @@ -66,9 +66,7 @@ impl Actor for ServiceActor { tracing::debug!("{e:?}"); } - if ip_info.changed().await.is_err() { - break; - }; + ip_info.changed().await; } }); } diff --git a/core/startos/src/update/mod.rs b/core/startos/src/update/mod.rs index d88838d4a..7daa4f3b2 100644 --- a/core/startos/src/update/mod.rs +++ b/core/startos/src/update/mod.rs @@ -20,7 +20,7 @@ use ts_rs::TS; use crate::context::{CliContext, RpcContext}; use crate::disk::mount::filesystem::bind::Bind; use crate::disk::mount::filesystem::block_dev::BlockDev; -use crate::disk::mount::filesystem::efivarfs::{ EfiVarFs}; +use crate::disk::mount::filesystem::efivarfs::EfiVarFs; use crate::disk::mount::filesystem::overlayfs::OverlayGuard; use crate::disk::mount::filesystem::MountType; use crate::disk::mount::guard::{GenericMountGuard, MountGuard, TmpMountGuard}; @@ -106,7 +106,7 @@ pub async fn update_system( .with_kind(ErrorKind::Database)?, ) .await; - while { + loop { let progress = ctx .db .peek() @@ -122,14 +122,22 @@ pub async fn update_system( )) .await .with_kind(ErrorKind::Network)?; - progress.is_some() - } { - sub.recv().await; + if progress.is_none() { + return ws.normal_close("complete").await; + } + tokio::select! { + _ = sub.recv() => (), + res = async { + loop { + if ws.recv().await.transpose().with_kind(ErrorKind::Network)?.is_none() { + return Ok(()) + } + } + } => { + return res + } + } } - - ws.normal_close("complete").await?; - - Ok::<_, Error>(()) } .await { diff --git a/core/startos/src/util/net.rs b/core/startos/src/util/net.rs index 9e1beeaba..1189f70f2 100644 --- a/core/startos/src/util/net.rs +++ b/core/startos/src/util/net.rs @@ -19,13 +19,8 @@ pub trait WebSocketExt { } impl WebSocketExt for ws::WebSocket { - async fn normal_close(mut self, msg: impl Into> + Send) -> Result<(), Error> { - self.send(ws::Message::Close(Some(CloseFrame { - code: 1000, - reason: msg.into(), - }))) - .await - .with_kind(ErrorKind::Network) + async fn normal_close(self, msg: impl Into> + Send) -> Result<(), Error> { + self.close_result(Ok::<_, Error>(msg)).await } async fn close_result( mut self, @@ -38,15 +33,23 @@ impl WebSocketExt for ws::WebSocket { reason: msg.into(), }))) .await - .with_kind(ErrorKind::Network), + .with_kind(ErrorKind::Network)?, Err(e) => self .send(ws::Message::Close(Some(CloseFrame { code: 1011, reason: e.to_string().into(), }))) .await - .with_kind(ErrorKind::Network), + .with_kind(ErrorKind::Network)?, } + while !matches!( + self.recv() + .await + .transpose() + .with_kind(ErrorKind::Network)?, + Some(ws::Message::Close(_)) | None + ) {} + Ok(()) } } diff --git a/core/startos/src/util/sync.rs b/core/startos/src/util/sync.rs index 2630858a9..ca7b4c6bb 100644 --- a/core/startos/src/util/sync.rs +++ b/core/startos/src/util/sync.rs @@ -1,3 +1,7 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Poll, Waker}; + #[derive(Debug, Default)] pub struct SyncMutex(std::sync::Mutex); impl SyncMutex { @@ -11,3 +15,140 @@ impl SyncMutex { f(&*self.0.lock().unwrap()) } } + +struct WatchShared { + version: u64, + data: T, + wakers: Vec, +} +impl WatchShared { + fn modified(&mut self) { + self.version += 1; + for waker in self.wakers.drain(..) { + waker.wake(); + } + } +} + +#[pin_project::pin_project] +pub struct Watch { + shared: Arc>>, + version: u64, +} +impl Clone for Watch { + fn clone(&self) -> Self { + Self { + shared: self.shared.clone(), + version: self.version, + } + } +} +impl Watch { + pub fn new(init: T) -> Self { + Self { + shared: Arc::new(SyncMutex::new(WatchShared { + version: 1, + data: init, + wakers: Vec::new(), + })), + version: 0, + } + } + pub fn clone_unseen(&self) -> Self { + Self { + shared: self.shared.clone(), + version: 0, + } + } + pub fn poll_changed(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> { + self.shared.mutate(|shared| { + if shared.version != self.version { + self.version = shared.version; + Poll::Ready(()) + } else { + let waker = cx.waker(); + if !shared.wakers.iter().any(|w| w.will_wake(waker)) { + shared.wakers.push(waker.clone()); + } + Poll::Pending + } + }) + } + pub async fn changed(&mut self) { + futures::future::poll_fn(|cx| self.poll_changed(cx)).await + } + pub fn send_if_modified bool>(&self, modify: F) -> bool { + self.shared.mutate(|shared| { + let changed = modify(&mut shared.data); + if changed { + shared.modified(); + } + changed + }) + } + pub fn send_modify U>(&self, modify: F) -> U { + self.shared.mutate(|shared| { + let res = modify(&mut shared.data); + shared.modified(); + res + }) + } + pub fn send_replace(&self, new: T) -> T { + self.send_modify(|a| std::mem::replace(a, new)) + } + pub fn send(&self, new: T) { + self.send_replace(new); + } + pub fn mark_changed(&self) { + self.shared.mutate(|shared| shared.modified()) + } + pub fn mark_unseen(&mut self) { + self.version = 0; + } + pub fn mark_seen(&mut self) { + self.shared.peek(|shared| { + self.version = shared.version; + }) + } + pub fn peek U>(&self, f: F) -> U { + self.shared.peek(|shared| f(&shared.data)) + } + pub fn peek_and_mark_seen U>(&mut self, f: F) -> U { + self.shared.peek(|shared| { + self.version = shared.version; + f(&shared.data) + }) + } + pub fn peek_mut U>(&self, f: F) -> U { + self.shared.mutate(|shared| f(&mut shared.data)) + } +} +impl Watch { + pub fn read(&self) -> T { + self.peek(|a| a.clone()) + } +} +impl futures::Stream for Watch { + type Item = T; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.project(); + this.shared.mutate(|shared| { + if shared.version != *this.version { + *this.version = shared.version; + Poll::Ready(Some(shared.data.clone())) + } else { + let waker = cx.waker(); + if !shared.wakers.iter().any(|w| w.will_wake(waker)) { + shared.wakers.push(waker.clone()); + } + Poll::Pending + } + }) + } + fn size_hint(&self) -> (usize, Option) { + (1, None) + } +} diff --git a/core/startos/src/version/update_details/v0_3_6.md b/core/startos/src/version/update_details/v0_3_6.md index b93e8ad1a..177845f08 100644 --- a/core/startos/src/version/update_details/v0_3_6.md +++ b/core/startos/src/version/update_details/v0_3_6.md @@ -17,17 +17,16 @@ StartOS v0.3.6 is a complete rewrite of the OS internals (everything you don't s ## Changelog - [Switch to lxc-based container runtime](#lxc) -- [Update s9pk archive format](#new-s9pk-archive-format) -- [Improve config](#better-config) -- [Unify Actions](#unify-actions) +- [Update s9pk archive format](#s9pk-archive-format) +- [Improve Actions](#actions) - [Use squashfs images for OS updates](#squashfs-updates) -- [Introduce Typescript package API and SDK](#typescript-package-api-and-sdk) +- [Introduce Typescript package API and SDK](#typescript-sdk) - [Remove Postgresql](#remove-postgressql) - [Implement detailed progress reporting](#progress-reporting) - [Improve registry protocol](#registry-protocol) - [Replace unique .local URLs with unique ports](#lan-port-forwarding) - [Use start-fs Fuse module for improved backups](#improved-backups) -- [Switch to Exver for versioning](#Exver) +- [Switch to Exver for versioning](#exver) - [Support clearnet hosting via start-cli](#clearnet) ### LXC @@ -38,21 +37,17 @@ StartOS now uses a nested container paradigm based on LXC for the outer containe The S9PK archive format has been overhauled to allow for signature verification of partial downloads, and allow direct mounting of container images without unpacking the s9pk. -### Better config - -Expanded support for input types and a new UI makes configuring services easier and more powerful. - ### Actions -Actions take arbitrary form input _and_ return arbitrary responses, thus satisfying the needs of both Config and Properties, which will be removed in a future release. This gives packages developers the ability to break up Config and Properties into smaller, more specific formats, or to exclude them entirely without polluting the UI. +Actions take arbitrary form input and return arbitrary responses, thus satisfying the needs of both Config and Properties, which have been removed. The new actions API gives packages developers the ability to break up Config and Properties into smaller, more specific formats, or to exclude them entirely without polluting the UI. Improved form design and new input types round out the actions experience. ### Squashfs updates StartOS now uses squashfs images to represent OS updates. This allows for better update verification, and improved reliability over rsync updates. -### Typescript package API and SDK +### Typescript SDK -StartOS now exposes a Typescript API. Package developers can take advantage in a simple, typesafe way using the new start-sdk. A barebones StartOS package (s9pk) can be produced in minutes with minimal knowledge or skill. More advanced developers can use the SDK to create highly customized user experiences with their service. +Package developers can now take advantage of StartOS APIs using the new start-sdk, available in Typescript. A barebones StartOS package (s9pk) can be produced in minutes with minimal knowledge or skill. More advanced developers can use the SDK to create highly customized user experiences with their service. ### Remove PostgresSQL @@ -76,8 +71,14 @@ The new start-fs fuse module unifies file system expectations for various platfo ### Exver -StartOS now uses Extended Versioning (Exver), which consists of three parts, separated by semicolons: (1) a Semver-compliant upstream version, (2) a Semver-compliant wrapper version, and (3) an optional "flavor" prefix. Flavors can be thought of as alternative implementations of services, where a user would only want one or the other installed, and data can feasibly be migrating beetween the two. Another common characteristic of flavors is that they satisfy the same API requirement of dependents, though this is not strictly necessary. A valid Exver looks something like this: `#knots:28.0.:1.0-beta.1`. This would translate to "the first beta release of StartOS wrapper version 1.0 of Bitcoin Knots version 27.0". +StartOS now uses Extended Versioning (Exver), which consists of three parts: (1) a Semver-compliant upstream version, (2) a Semver-compliant wrapper version, and (3) an optional "flavor" prefix. Flavors can be thought of as alternative implementations of services, where a user would only want one or the other installed, and data can feasibly be migrating between the two. Another common characteristic of flavors is that they satisfy the same API requirement of dependents, though this is not strictly necessary. A valid Exver looks something like this: `#knots:28.0.:1.0-beta.1`. This would translate to "the first beta release of StartOS wrapper version 1.0 of Bitcoin Knots version 27.0". ### Clearnet -It is now possible, and quite easy, to expose specific services interfaces to the public Internet on a standard domain using start-cli. This functionality will be expanded upon and moved into the StartOS UI in a future release. +It is now possible, and quite easy, to expose service interfaces to the public Internet on a standard domain using start-cli. In addition to choosing which service interfaces to expose on which domains/subdomains, users have two options: + +1. Open ports on their router. This option is free and easy to accomplish with most routers. The drawback is that the user's home IP address is revealed to anyone accessing the exposes resources. For example, hosting a blog in this way would reveal your home IP address, and therefor your approximate location on Earth, to your readers. + +2. Use a Wireguard VPN to proxy web traffic. This option requires the user to provision a $5-$10/month remote VPS and perform a few, simple commands. The result is the successful obfuscation of the users home IP address. + +The CLI-driven clearnet functionality will be expanded upon and moved into the main StartOS UI in a future release. diff --git a/web/projects/ui/src/app/app-routing.module.ts b/web/projects/ui/src/app/app-routing.module.ts index e7a2036ec..e16085a97 100644 --- a/web/projects/ui/src/app/app-routing.module.ts +++ b/web/projects/ui/src/app/app-routing.module.ts @@ -12,7 +12,7 @@ const routes: Routes = [ }, { path: 'login', - canActivate: [UnauthGuard], + canActivate: [UnauthGuard, stateNot(['error', 'initializing'])], loadChildren: () => import('./pages/login/login.module').then(m => m.LoginPageModule), }, diff --git a/web/projects/ui/src/app/pages/init/init.service.ts b/web/projects/ui/src/app/pages/init/init.service.ts index c58e7777e..9fbcc933d 100644 --- a/web/projects/ui/src/app/pages/init/init.service.ts +++ b/web/projects/ui/src/app/pages/init/init.service.ts @@ -1,10 +1,8 @@ import { inject, Injectable } from '@angular/core' -import { ErrorService } from '@start9labs/shared' import { T } from '@start9labs/start-sdk' import { catchError, defer, - EMPTY, from, map, Observable, @@ -24,40 +22,46 @@ interface MappedProgress { export class InitService extends Observable { private readonly state = inject(StateService) private readonly api = inject(ApiService) - private readonly errorService = inject(ErrorService) private readonly progress$ = defer(() => from(this.api.initGetProgress()), ).pipe( switchMap(({ guid, progress }) => - this.api.openWebsocket$(guid).pipe(startWith(progress)), + this.api + .openWebsocket$(guid, { + closeObserver: { + next: () => { + this.state.syncState() + }, + }, + }) + .pipe(startWith(progress)), ), - map(({ phases, overall }) => { - return { - total: getOverallDecimal(overall), - message: phases - .filter( - ( - p, - ): p is { - name: string - progress: { - done: number - total: number | null - } - } => p.progress !== true && p.progress !== null, - ) - .map(p => `${p.name}${getPhaseBytes(p.progress)}`) - .join(', '), - } - }), + map(({ phases, overall }) => ({ + total: getOverallDecimal(overall), + message: phases + .filter( + ( + p, + ): p is { + name: string + progress: { + done: number + total: number | null + } + } => p.progress !== true && p.progress !== null, + ) + .map(p => `${p.name}${getPhaseBytes(p.progress)}`) + .join(', '), + })), tap(({ total }) => { if (total === 1) { this.state.syncState() } }), - catchError(e => { + catchError((e, caught$) => { console.error(e) - return EMPTY + this.state.syncState() + return caught$ }), ) diff --git a/web/projects/ui/src/app/pages/marketplace-routes/marketplace-show/marketplace-show-controls/marketplace-show-controls.component.html b/web/projects/ui/src/app/pages/marketplace-routes/marketplace-show/marketplace-show-controls/marketplace-show-controls.component.html index bdfe8e3ed..6178ebccc 100644 --- a/web/projects/ui/src/app/pages/marketplace-routes/marketplace-show/marketplace-show-controls/marketplace-show-controls.component.html +++ b/web/projects/ui/src/app/pages/marketplace-routes/marketplace-show/marketplace-show-controls/marketplace-show-controls.component.html @@ -35,19 +35,17 @@ > Downgrade - - - Reinstall - - + + Reinstall + - + console.error(e)) - await firstValueFrom(this.progress$.pipe(filter(Boolean))) + await this.api.uploadPackage(res.upload, this.toUpload.file!) + await firstValueFrom(this.progress$) } catch (e: any) { this.errorService.handleError(e) } finally { diff --git a/web/projects/ui/src/app/pages/server-routes/sideload/sideload.service.ts b/web/projects/ui/src/app/pages/server-routes/sideload/sideload.service.ts index d157272ff..df6ec0868 100644 --- a/web/projects/ui/src/app/pages/server-routes/sideload/sideload.service.ts +++ b/web/projects/ui/src/app/pages/server-routes/sideload/sideload.service.ts @@ -1,6 +1,16 @@ -import { Injectable } from '@angular/core' +import { inject, Injectable } from '@angular/core' +import { Router } from '@angular/router' +import { ErrorService } from '@start9labs/shared' import { T } from '@start9labs/start-sdk' -import { BehaviorSubject, endWith, shareReplay, Subject, switchMap } from 'rxjs' +import { + catchError, + EMPTY, + endWith, + shareReplay, + Subject, + switchMap, + tap, +} from 'rxjs' import { ApiService } from 'src/app/services/api/embassy-api.service' @Injectable({ @@ -8,11 +18,34 @@ import { ApiService } from 'src/app/services/api/embassy-api.service' }) export class SideloadService { private readonly guid$ = new Subject() + private readonly errorService = inject(ErrorService) + private readonly router = inject(Router) readonly progress$ = this.guid$.pipe( switchMap(guid => - this.api.openWebsocket$(guid).pipe(endWith(null)), + this.api + .openWebsocket$(guid, { + closeObserver: { + next: event => { + if (event.code !== 1000) { + this.errorService.handleError(event.reason) + } + }, + }, + }) + .pipe( + tap(p => { + if (p.overall === true) { + this.router.navigate([''], { replaceUrl: true }) + } + }), + endWith(null), + ), ), + catchError(e => { + this.errorService.handleError('Websocket connection broken. Try again.') + return EMPTY + }), shareReplay(1), ) diff --git a/web/projects/ui/src/app/services/api/embassy-api.service.ts b/web/projects/ui/src/app/services/api/embassy-api.service.ts index 280ac79f3..551011533 100644 --- a/web/projects/ui/src/app/services/api/embassy-api.service.ts +++ b/web/projects/ui/src/app/services/api/embassy-api.service.ts @@ -7,6 +7,7 @@ import { GetPackagesRes, MarketplacePkg, } from '@start9labs/marketplace' +import { WebSocketSubject } from 'rxjs/webSocket' export abstract class ApiService { // http @@ -30,7 +31,7 @@ export abstract class ApiService { abstract openWebsocket$( guid: string, config?: RR.WebsocketConfig, - ): Observable + ): WebSocketSubject // state diff --git a/web/projects/ui/src/app/services/api/embassy-live-api.service.ts b/web/projects/ui/src/app/services/api/embassy-live-api.service.ts index 6241eaad7..9893585a4 100644 --- a/web/projects/ui/src/app/services/api/embassy-live-api.service.ts +++ b/web/projects/ui/src/app/services/api/embassy-live-api.service.ts @@ -11,7 +11,7 @@ import { PATCH_CACHE } from 'src/app/services/patch-db/patch-db-source' import { ApiService } from './embassy-api.service' import { RR } from './api.types' import { ConfigService } from '../config.service' -import { webSocket } from 'rxjs/webSocket' +import { webSocket, WebSocketSubject } from 'rxjs/webSocket' import { Observable, filter, firstValueFrom } from 'rxjs' import { AuthService } from '../auth.service' import { DOCUMENT } from '@angular/common' @@ -85,7 +85,7 @@ export class LiveApiService extends ApiService { openWebsocket$( guid: string, config: RR.WebsocketConfig = {}, - ): Observable { + ): WebSocketSubject { const { location } = this.document.defaultView! const protocol = location.protocol === 'http:' ? 'ws' : 'wss' const host = location.host diff --git a/web/projects/ui/src/app/services/api/embassy-mock-api.service.ts b/web/projects/ui/src/app/services/api/embassy-mock-api.service.ts index 93925cc73..23296ad15 100644 --- a/web/projects/ui/src/app/services/api/embassy-mock-api.service.ts +++ b/web/projects/ui/src/app/services/api/embassy-mock-api.service.ts @@ -37,6 +37,7 @@ import { GetPackagesRes, MarketplacePkg, } from '@start9labs/marketplace' +import { WebSocketSubject } from 'rxjs/webSocket' const PROGRESS: T.FullProgress = { overall: { @@ -107,11 +108,11 @@ export class MockApiService extends ApiService { openWebsocket$( guid: string, config: RR.WebsocketConfig = {}, - ): Observable { + ): WebSocketSubject { if (guid === 'db-guid') { return this.mockWsSource$.pipe( shareReplay({ bufferSize: 1, refCount: true }), - ) + ) as WebSocketSubject } else if (guid === 'logs-guid') { return interval(50).pipe( map((_, index) => { @@ -120,16 +121,16 @@ export class MockApiService extends ApiService { if (index === 100) throw new Error('HAAHHA') return Mock.ServerLogs[0] }), - ) + ) as WebSocketSubject } else if (guid === 'init-progress-guid') { return from(this.initProgress()).pipe( startWith(PROGRESS), - ) as Observable + ) as WebSocketSubject } else if (guid === 'sideload-progress-guid') { config.openObserver?.next(new Event('')) return from(this.initProgress()).pipe( startWith(PROGRESS), - ) as Observable + ) as WebSocketSubject } else { throw new Error('invalid guid type') } diff --git a/web/projects/ui/src/app/services/auth.service.ts b/web/projects/ui/src/app/services/auth.service.ts index 9c16d0e26..2db008d8e 100644 --- a/web/projects/ui/src/app/services/auth.service.ts +++ b/web/projects/ui/src/app/services/auth.service.ts @@ -27,9 +27,9 @@ export class AuthService { ) {} init(): void { - const loggedIn = this.storage.get(this.LOGGED_IN_KEY) + const loggedIn = this.storage.get(this.LOGGED_IN_KEY) if (loggedIn) { - this.setVerified() + this.authState$.next(AuthState.VERIFIED) } else { this.setUnverified() }