From 202695096a908a832906ae929cd73c0bbf5fba50 Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Tue, 17 Oct 2023 09:49:58 -0600 Subject: [PATCH] Feature/simple syncdb (#2464) * simplify db sync on rpc endpoints * switch to patch-db master * update fe for websocket only stream * fix api --------- Co-authored-by: Matt Hill --- backend/src/action.rs | 2 +- backend/src/backup/backup_bulk.rs | 4 +- backend/src/backup/mod.rs | 2 +- backend/src/backup/restore.rs | 2 +- backend/src/config/mod.rs | 4 +- backend/src/config/spec.rs | 8 +-- backend/src/context/rpc.rs | 2 +- backend/src/control.rs | 6 +- backend/src/db/mod.rs | 22 ++---- backend/src/db/prelude.rs | 6 +- backend/src/dependencies.rs | 2 +- backend/src/init.rs | 2 +- backend/src/install/cleanup.rs | 4 +- backend/src/install/mod.rs | 8 +-- backend/src/manager/health.rs | 2 +- backend/src/manager/manager_container.rs | 4 +- backend/src/manager/mod.rs | 4 +- backend/src/middleware/db.rs | 46 ++----------- backend/src/notifications.rs | 2 +- backend/src/properties.rs | 2 +- backend/src/system.rs | 2 +- backend/src/update/mod.rs | 2 +- backend/src/version/mod.rs | 2 +- backend/src/version/v0_3_5.rs | 2 +- .../app/services/api/embassy-api.service.ts | 4 +- .../services/api/embassy-live-api.service.ts | 47 +++++-------- .../services/api/embassy-mock-api.service.ts | 68 ++++++++++--------- .../app/services/patch-db/patch-db.factory.ts | 6 +- patch-db | 2 +- 29 files changed, 101 insertions(+), 168 deletions(-) diff --git a/backend/src/action.rs b/backend/src/action.rs index 07fe1759b..3223aaa86 100644 --- a/backend/src/action.rs +++ b/backend/src/action.rs @@ -134,7 +134,7 @@ pub async fn action( let manifest = ctx .db .peek() - .await? + .await .as_package_data() .as_idx(&pkg_id) .or_not_found(&pkg_id)? diff --git a/backend/src/backup/backup_bulk.rs b/backend/src/backup/backup_bulk.rs index 5bab4bb78..84def92f5 100644 --- a/backend/src/backup/backup_bulk.rs +++ b/backend/src/backup/backup_bulk.rs @@ -56,7 +56,7 @@ pub async fn backup_all( package_ids: Option>, #[arg] password: crate::auth::PasswordType, ) -> Result<(), Error> { - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; let old_password_decrypted = old_password .as_ref() .unwrap_or(&password) @@ -265,7 +265,7 @@ async fn perform_backup( } } - let ui = ctx.db.peek().await?.into_ui().de()?; + let ui = ctx.db.peek().await.into_ui().de()?; let mut os_backup_file = AtomicFile::new( backup_guard.lock().await.as_ref().join("os-backup.cbor"), diff --git a/backend/src/backup/mod.rs b/backend/src/backup/mod.rs index 670c01c29..2f3f9bd8f 100644 --- a/backend/src/backup/mod.rs +++ b/backend/src/backup/mod.rs @@ -134,7 +134,7 @@ impl BackupActions { let marketplace_url = ctx .db .peek() - .await? + .await .as_package_data() .as_idx(&pkg_id) .or_not_found(pkg_id)? diff --git a/backend/src/backup/restore.rs b/backend/src/backup/restore.rs index ac8d07f48..0fc0b6224 100644 --- a/backend/src/backup/restore.rs +++ b/backend/src/backup/restore.rs @@ -310,7 +310,7 @@ async fn assure_restoring( let mut insert_packages = BTreeMap::new(); for id in ids { - let peek = ctx.db.peek().await?; + let peek = ctx.db.peek().await; let model = peek.as_package_data().as_idx(&id); diff --git a/backend/src/config/mod.rs b/backend/src/config/mod.rs index 2d3a30fd7..3674142ab 100644 --- a/backend/src/config/mod.rs +++ b/backend/src/config/mod.rs @@ -167,7 +167,7 @@ pub async fn get( #[arg(long = "format")] format: Option, ) -> Result { - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; let manifest = db .as_package_data() .as_idx(&id) @@ -256,7 +256,7 @@ pub async fn configure( id: &PackageId, configure_context: ConfigureContext, ) -> Result, Error> { - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; let package = db .as_package_data() .as_idx(id) diff --git a/backend/src/config/spec.rs b/backend/src/config/spec.rs index b11cefaf2..6a5d96ef1 100644 --- a/backend/src/config/spec.rs +++ b/backend/src/config/spec.rs @@ -1696,7 +1696,6 @@ impl TorAddressPointer { .db .peek() .await - .map_err(|e| ConfigurationError::SystemError(e))? .as_package_data() .as_idx(&self.package_id) .and_then(|pde| pde.as_installed()) @@ -1739,7 +1738,6 @@ impl LanAddressPointer { .db .peek() .await - .map_err(|e| ConfigurationError::SystemError(e))? .as_package_data() .as_idx(&self.package_id) .and_then(|pde| pde.as_installed()) @@ -1775,11 +1773,7 @@ impl ConfigPointer { Ok(self.select(&Value::Object(cfg.clone()))) } else { let id = &self.package_id; - let db = ctx - .db - .peek() - .await - .map_err(|e| ConfigurationError::SystemError(e))?; + let db = ctx.db.peek().await; let manifest = db.as_package_data().as_idx(id).map(|pde| pde.as_manifest()); let cfg_actions = manifest.and_then(|m| m.as_config().transpose_ref()); if let (Some(manifest), Some(cfg_actions)) = (manifest, cfg_actions) { diff --git a/backend/src/context/rpc.rs b/backend/src/context/rpc.rs index cd3f1676c..e4fd66c97 100644 --- a/backend/src/context/rpc.rs +++ b/backend/src/context/rpc.rs @@ -276,7 +276,7 @@ impl RpcContext { }) .await?; - let peek = self.db.peek().await?; + let peek = self.db.peek().await; for (package_id, package) in peek.as_package_data().as_entries()?.into_iter() { let action = match package.as_match() { diff --git a/backend/src/control.rs b/backend/src/control.rs index 6d72a4043..58e39ac14 100644 --- a/backend/src/control.rs +++ b/backend/src/control.rs @@ -12,7 +12,7 @@ use crate::Error; #[command(display(display_none), metadata(sync_db = true))] #[instrument(skip_all)] pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(), Error> { - let peek = ctx.db.peek().await?; + let peek = ctx.db.peek().await; let version = peek .as_package_data() .as_idx(&id) @@ -35,7 +35,7 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<( #[command(display(display_none), metadata(sync_db = true))] pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result { - let peek = ctx.db.peek().await?; + let peek = ctx.db.peek().await; let version = peek .as_package_data() .as_idx(&id) @@ -71,7 +71,7 @@ pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result Result<(), Error> { - let peek = ctx.db.peek().await?; + let peek = ctx.db.peek().await; let version = peek .as_package_data() .as_idx(&id) diff --git a/backend/src/db/mod.rs b/backend/src/db/mod.rs index 245abbb1c..03ad94338 100644 --- a/backend/src/db/mod.rs +++ b/backend/src/db/mod.rs @@ -37,7 +37,7 @@ async fn ws_handler< session: Option<(HasValidSession, HashSessionToken)>, ws_fut: WSFut, ) -> Result<(), Error> { - let (dump, sub) = ctx.db.dump_and_sub().await?; + let (dump, sub) = ctx.db.dump_and_sub().await; let mut stream = ws_fut .await .with_kind(ErrorKind::Network)? @@ -174,7 +174,7 @@ pub async fn subscribe(ctx: RpcContext, req: Request) -> Result Result<(), RpcError> { Ok(()) } @@ -186,20 +186,6 @@ pub enum RevisionsRes { Dump(Dump), } -#[command(display(display_serializable))] -pub async fn revisions( - #[context] ctx: RpcContext, - #[arg] since: u64, - #[allow(unused_variables)] - #[arg(long = "format")] - format: Option, -) -> Result { - Ok(match ctx.db.sync(since).await? { - Ok(revs) => RevisionsRes::Revisions(revs), - Err(dump) => RevisionsRes::Dump(dump), - }) -} - #[instrument(skip_all)] async fn cli_dump( ctx: CliContext, @@ -207,7 +193,7 @@ async fn cli_dump( path: Option, ) -> Result { let dump = if let Some(path) = path { - PatchDb::open(path).await?.dump().await? + PatchDb::open(path).await?.dump().await } else { rpc_toolkit::command_helpers::call_remote( ctx, @@ -235,7 +221,7 @@ pub async fn dump( #[arg] path: Option, ) -> Result { - Ok(ctx.db.dump().await?) + Ok(ctx.db.dump().await) } fn apply_expr(input: jaq_core::Val, expr: &str) -> Result { diff --git a/backend/src/db/prelude.rs b/backend/src/db/prelude.rs index 4fce5fbcb..922a47500 100644 --- a/backend/src/db/prelude.rs +++ b/backend/src/db/prelude.rs @@ -28,7 +28,7 @@ where #[async_trait::async_trait] pub trait PatchDbExt { - async fn peek(&self) -> Result; + async fn peek(&self) -> DatabaseModel; async fn mutate( &self, f: impl FnOnce(&mut DatabaseModel) -> Result + UnwindSafe + Send, @@ -40,8 +40,8 @@ pub trait PatchDbExt { } #[async_trait::async_trait] impl PatchDbExt for PatchDb { - async fn peek(&self) -> Result { - Ok(DatabaseModel::from(self.dump().await?.value)) + async fn peek(&self) -> DatabaseModel { + DatabaseModel::from(self.dump().await.value) } async fn mutate( &self, diff --git a/backend/src/dependencies.rs b/backend/src/dependencies.rs index 299518057..dfddecd93 100644 --- a/backend/src/dependencies.rs +++ b/backend/src/dependencies.rs @@ -170,7 +170,7 @@ pub async fn configure_logic( ctx: RpcContext, (pkg_id, dependency_id): (PackageId, PackageId), ) -> Result { - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; let pkg = db .as_package_data() .as_idx(&pkg_id) diff --git a/backend/src/init.rs b/backend/src/init.rs index 58d888b48..b98a23921 100644 --- a/backend/src/init.rs +++ b/backend/src/init.rs @@ -204,7 +204,7 @@ pub async fn init(cfg: &RpcContextConfig) -> Result { let account = AccountInfo::load(&secret_store).await?; let db = cfg.db(&account).await?; tracing::info!("Opened PatchDB"); - let peek = db.peek().await?; + let peek = db.peek().await; let mut server_info = peek.as_server_info().de()?; // write to ca cert store diff --git a/backend/src/install/cleanup.rs b/backend/src/install/cleanup.rs index e47b7ca6b..9d87f63e9 100644 --- a/backend/src/install/cleanup.rs +++ b/backend/src/install/cleanup.rs @@ -62,7 +62,7 @@ pub async fn cleanup_failed(ctx: &RpcContext, id: &PackageId) -> Result<(), Erro if let Some(version) = match ctx .db .peek() - .await? + .await .as_package_data() .as_idx(id) .or_not_found(id)? @@ -141,7 +141,7 @@ pub async fn uninstall(ctx: &RpcContext, secrets: &mut Ex, id: &PackageId) - where for<'a> &'a mut Ex: Executor<'a, Database = Postgres>, { - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; let entry = db .as_package_data() .as_idx(id) diff --git a/backend/src/install/mod.rs b/backend/src/install/mod.rs index cb21739cc..744e14664 100644 --- a/backend/src/install/mod.rs +++ b/backend/src/install/mod.rs @@ -64,7 +64,7 @@ pub const PKG_WASM_DIR: &str = "package-data/wasm"; #[command(display(display_serializable))] pub async fn list(#[context] ctx: RpcContext) -> Result { - Ok(ctx.db.peek().await?.as_package_data().as_entries()? + Ok(ctx.db.peek().await.as_package_data().as_entries()? .iter() .filter_map(|(id, pde)| { let status = match pde.as_match() { @@ -666,7 +666,7 @@ pub async fn download_install_s9pk( ) -> Result<(), Error> { let pkg_id = &temp_manifest.id; let version = &temp_manifest.version; - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; if let Result::<(), Error>::Err(e) = { let ctx = ctx.clone(); @@ -786,7 +786,7 @@ pub async fn install_s9pk( rdr.validated(); let developer_key = rdr.developer_key().clone(); rdr.reset().await?; - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; tracing::info!("Install {}@{}: Unpacking Manifest", pkg_id, version); let manifest = progress @@ -1017,7 +1017,7 @@ pub async fn install_s9pk( ) .await?; - let peek = ctx.db.peek().await?; + let peek = ctx.db.peek().await; let prev = peek .as_package_data() .as_idx(pkg_id) diff --git a/backend/src/manager/health.rs b/backend/src/manager/health.rs index 17e968f21..30f18051a 100644 --- a/backend/src/manager/health.rs +++ b/backend/src/manager/health.rs @@ -11,7 +11,7 @@ use crate::Error; #[instrument(skip_all)] pub async fn check(ctx: &RpcContext, id: &PackageId) -> Result<(), Error> { let (manifest, started) = { - let peeked = ctx.db.peek().await?; + let peeked = ctx.db.peek().await; let pde = peeked .as_package_data() .as_idx(id) diff --git a/backend/src/manager/manager_container.rs b/backend/src/manager/manager_container.rs index a4f44dbb4..32e11c2e5 100644 --- a/backend/src/manager/manager_container.rs +++ b/backend/src/manager/manager_container.rs @@ -53,7 +53,7 @@ impl ManageContainer { let current_state = Arc::new(watch::channel(StartStop::Stop).0); let desired_state = Arc::new( watch::channel::( - get_status(seed.ctx.db.peek().await?, &seed.manifest).into(), + get_status(seed.ctx.db.peek().await, &seed.manifest).into(), ) .0, ); @@ -103,7 +103,7 @@ impl ManageContainer { &self, seed: &manager_seed::ManagerSeed, ) -> Result<(), Error> { - let current_state = get_status(seed.ctx.db.peek().await?, &seed.manifest); + let current_state = get_status(seed.ctx.db.peek().await, &seed.manifest); self.override_main_status .send_modify(|x| *x = Some(current_state)); Ok(()) diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index 85e302cc5..8294ff586 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -267,7 +267,7 @@ impl Manager { let manage_container = self.manage_container.clone(); let seed = self.seed.clone(); async move { - let peek = seed.ctx.db.peek().await?; + let peek = seed.ctx.db.peek().await; let state_reverter = DesiredStateReverter::new(manage_container.clone()); let override_guard = manage_container.set_override(get_status(peek, &seed.manifest).backing_up())?; @@ -338,7 +338,7 @@ async fn configure( id: PackageId, mut configure_context: ConfigureContext, ) -> Result, Error> { - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; let id = &id; let ctx = &ctx; let overrides = &mut configure_context.overrides; diff --git a/backend/src/middleware/db.rs b/backend/src/middleware/db.rs index eeeeb299a..5eb0b482c 100644 --- a/backend/src/middleware/db.rs +++ b/backend/src/middleware/db.rs @@ -1,4 +1,3 @@ -use color_eyre::eyre::eyre; use futures::future::BoxFuture; use futures::FutureExt; use http::HeaderValue; @@ -11,7 +10,6 @@ use rpc_toolkit::yajrc::RpcMethod; use rpc_toolkit::Metadata; use crate::context::RpcContext; -use crate::{Error, ResultExt}; pub fn db(ctx: RpcContext) -> DynMiddleware { Box::new( @@ -22,49 +20,19 @@ pub fn db(ctx: RpcContext) -> DynMiddleware { async move { let m2: DynMiddlewareStage2 = Box::new(move |req, rpc_req| { async move { - let seq = req.headers.remove("x-patch-sequence"); let sync_db = metadata .get(rpc_req.method.as_str(), "sync_db") .unwrap_or(false); let m3: DynMiddlewareStage3 = Box::new(move |res, _| { async move { - if sync_db && seq.is_some() { - match async { - let seq = seq - .ok_or_else(|| { - Error::new( - eyre!("Missing X-Patch-Sequence"), - crate::ErrorKind::InvalidRequest, - ) - })? - .to_str() - .with_kind(crate::ErrorKind::InvalidRequest)? - .parse()?; - let res = ctx.db.sync(seq).await?; - let json = match res { - Ok(revs) => serde_json::to_vec(&revs), - Err(dump) => serde_json::to_vec(&[dump]), - } - .with_kind(crate::ErrorKind::Serialization)?; - Ok::<_, Error>(base64::encode_config( - &json, - base64::URL_SAFE, - )) - } - .await - { - Ok(a) => res - .headers - .append("X-Patch-Updates", HeaderValue::from_str(&a)?), - Err(e) => res.headers.append( - "X-Patch-Error", - HeaderValue::from_str(&base64::encode_config( - &e.to_string(), - base64::URL_SAFE, - ))?, - ), - }; + if sync_db { + res.headers.append( + "X-Patch-Sequence", + HeaderValue::from_str( + &ctx.db.sequence().await.to_string(), + )?, + ); } Ok(Ok(noop4())) } diff --git a/backend/src/notifications.rs b/backend/src/notifications.rs index afe92c571..73351471c 100644 --- a/backend/src/notifications.rs +++ b/backend/src/notifications.rs @@ -236,7 +236,7 @@ impl NotificationManager { subtype: T, debounce_interval: Option, ) -> Result<(), Error> { - let peek = db.peek().await?; + let peek = db.peek().await; if !self .should_notify(&package_id, &level, &title, debounce_interval) .await diff --git a/backend/src/properties.rs b/backend/src/properties.rs index da90a7370..851033b71 100644 --- a/backend/src/properties.rs +++ b/backend/src/properties.rs @@ -21,7 +21,7 @@ pub async fn properties(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Res #[instrument(skip_all)] pub async fn fetch_properties(ctx: RpcContext, id: PackageId) -> Result { - let peek = ctx.db.peek().await?; + let peek = ctx.db.peek().await; let manifest = peek .as_package_data() diff --git a/backend/src/system.rs b/backend/src/system.rs index b0bcfde8e..aee32b50a 100644 --- a/backend/src/system.rs +++ b/backend/src/system.rs @@ -58,7 +58,7 @@ pub async fn enable_zram() -> Result<(), Error> { #[command(display(display_none))] pub async fn zram(#[context] ctx: RpcContext, #[arg] enable: bool) -> Result<(), Error> { - let db = ctx.db.peek().await?; + let db = ctx.db.peek().await; let zram = db.as_server_info().as_zram().de()?; if enable == zram { diff --git a/backend/src/update/mod.rs b/backend/src/update/mod.rs index 200650166..814a27b1d 100644 --- a/backend/src/update/mod.rs +++ b/backend/src/update/mod.rs @@ -76,7 +76,7 @@ fn display_update_result(status: UpdateResult, _: &ArgMatches) { #[instrument(skip_all)] async fn maybe_do_update(ctx: RpcContext, marketplace_url: Url) -> Result, Error> { - let peeked = ctx.db.peek().await?; + let peeked = ctx.db.peek().await; let latest_version: Version = ctx .client .get(with_query_params( diff --git a/backend/src/version/mod.rs b/backend/src/version/mod.rs index efdf1a387..e0651642d 100644 --- a/backend/src/version/mod.rs +++ b/backend/src/version/mod.rs @@ -163,7 +163,7 @@ where } pub async fn init(db: &PatchDb, secrets: &PgPool) -> Result<(), Error> { - let version = Version::from_util_version(db.peek().await?.as_server_info().as_version().de()?); + let version = Version::from_util_version(db.peek().await.as_server_info().as_version().de()?); match version { Version::V0_3_4(v) => v.0.migrate_to(&Current::new(), db.clone(), secrets).await?, diff --git a/backend/src/version/v0_3_5.rs b/backend/src/version/v0_3_5.rs index 19a85a345..bf48ac02c 100644 --- a/backend/src/version/v0_3_5.rs +++ b/backend/src/version/v0_3_5.rs @@ -28,7 +28,7 @@ impl VersionT for Version { &V0_3_0_COMPAT } async fn up(&self, db: PatchDb, _secrets: &PgPool) -> Result<(), Error> { - let peek = db.peek().await?; + let peek = db.peek().await; let mut url_replacements = BTreeMap::new(); for (_, pde) in peek.as_package_data().as_entries()? { for (dependency, info) in pde diff --git a/frontend/projects/ui/src/app/services/api/embassy-api.service.ts b/frontend/projects/ui/src/app/services/api/embassy-api.service.ts index 7dd234969..17ce2d9d6 100644 --- a/frontend/projects/ui/src/app/services/api/embassy-api.service.ts +++ b/frontend/projects/ui/src/app/services/api/embassy-api.service.ts @@ -1,4 +1,4 @@ -import { Observable, Subject } from 'rxjs' +import { Observable } from 'rxjs' import { Update } from 'patch-db-client' import { RR } from './api.types' import { DataModel } from 'src/app/services/patch-db/data-model' @@ -6,8 +6,6 @@ import { Log } from '@start9labs/shared' import { WebSocketSubjectConfig } from 'rxjs/webSocket' export abstract class ApiService { - readonly patchStream$ = new Subject[]>() - // http // for getting static files: ex icons, instructions, licenses diff --git a/frontend/projects/ui/src/app/services/api/embassy-live-api.service.ts b/frontend/projects/ui/src/app/services/api/embassy-live-api.service.ts index 63b36aa93..18d47e2ce 100644 --- a/frontend/projects/ui/src/app/services/api/embassy-live-api.service.ts +++ b/frontend/projects/ui/src/app/services/api/embassy-live-api.service.ts @@ -1,6 +1,5 @@ import { Inject, Injectable } from '@angular/core' import { - decodeBase64, HttpOptions, HttpService, isRpcError, @@ -14,7 +13,7 @@ import { RR } from './api.types' import { parsePropertiesPermissive } from 'src/app/util/properties.util' import { ConfigService } from '../config.service' import { webSocket, WebSocketSubjectConfig } from 'rxjs/webSocket' -import { Observable } from 'rxjs' +import { Observable, filter, firstValueFrom } from 'rxjs' import { AuthService } from '../auth.service' import { DOCUMENT } from '@angular/common' import { DataModel } from '../patch-db/data-model' @@ -67,7 +66,7 @@ export class LiveApiService extends ApiService { // auth async login(params: RR.LoginReq): Promise { - return this.rpcRequest({ method: 'auth.login', params }, false) + return this.rpcRequest({ method: 'auth.login', params }) } async logout(params: RR.LogoutReq): Promise { @@ -91,7 +90,7 @@ export class LiveApiService extends ApiService { // server async echo(params: RR.EchoReq, urlOverride?: string): Promise { - return this.rpcRequest({ method: 'echo', params }, false, urlOverride) + return this.rpcRequest({ method: 'echo', params }, urlOverride) } openPatchWebsocket$(): Observable> { @@ -434,42 +433,28 @@ export class LiveApiService extends ApiService { private async rpcRequest( options: RPCOptions, - addHeader = true, urlOverride?: string, ): Promise { - if (addHeader) { - options.headers = { - 'x-patch-sequence': String(this.patch.cache$.value.sequence), - ...(options.headers || {}), - } - } - const res = await this.http.rpcRequest(options, urlOverride) - const encodedUpdates = res.headers.get('x-patch-updates') - const encodedError = res.headers.get('x-patch-error') - - if (encodedUpdates) { - const decoded = decodeBase64(encodedUpdates) - const updates: Update[] = JSON.parse(decoded) - this.patchStream$.next(updates) - } + const body = res.body - if (encodedError) { - const error = decodeBase64(encodedError) - console.error(error) - } - - const rpcRes = res.body - - if (isRpcError(rpcRes)) { - if (rpcRes.error.code === 34) { + if (isRpcError(body)) { + if (body.error.code === 34) { console.error('Unauthenticated, logging out') this.auth.setUnverified() } - throw new RpcError(rpcRes.error) + throw new RpcError(body.error) } - return rpcRes.result + const patchSequence = res.headers.get('x-patch-sequence') + if (patchSequence) + await firstValueFrom( + this.patch.cache$.pipe( + filter(({ sequence }) => sequence >= Number(patchSequence)), + ), + ) + + return body.result } private async httpRequest(opts: HttpOptions): Promise { diff --git a/frontend/projects/ui/src/app/services/api/embassy-mock-api.service.ts b/frontend/projects/ui/src/app/services/api/embassy-mock-api.service.ts index 5f11b085f..101d5511f 100644 --- a/frontend/projects/ui/src/app/services/api/embassy-mock-api.service.ts +++ b/frontend/projects/ui/src/app/services/api/embassy-mock-api.service.ts @@ -65,7 +65,6 @@ export class MockApiService extends ApiService { .pipe( tap(() => { this.sequence = 0 - this.patchStream$.next([]) }), switchMap(verified => iif( @@ -108,7 +107,9 @@ export class MockApiService extends ApiService { value: params.value, }, ] - return this.withRevision(patch) + this.mockRevision(patch) + + return null } // auth @@ -289,7 +290,9 @@ export class MockApiService extends ApiService { value: initialProgress, }, ] - return this.withRevision(patch, 'updating') + this.mockRevision(patch) + + return 'updating' } async restartServer( @@ -332,7 +335,9 @@ export class MockApiService extends ApiService { value: params.enable, }, ] - return this.withRevision(patch, null) + this.mockRevision(patch) + + return null } // marketplace URLs @@ -385,7 +390,9 @@ export class MockApiService extends ApiService { value: 0, }, ] - return this.withRevision(patch, Mock.Notifications) + this.mockRevision(patch) + + return Mock.Notifications } async deleteNotification( @@ -564,7 +571,9 @@ export class MockApiService extends ApiService { }, ] - return this.withRevision(originalPatch) + this.mockRevision(originalPatch) + + return null } // package @@ -629,7 +638,9 @@ export class MockApiService extends ApiService { }, }, ] - return this.withRevision(patch) + this.mockRevision(patch) + + return null } async getPackageConfig( @@ -660,7 +671,9 @@ export class MockApiService extends ApiService { value: true, }, ] - return this.withRevision(patch) + this.mockRevision(patch) + + return null } async restorePackages( @@ -684,7 +697,9 @@ export class MockApiService extends ApiService { } }) - return this.withRevision(patch) + this.mockRevision(patch) + + return null } async executePackageAction( @@ -768,7 +783,9 @@ export class MockApiService extends ApiService { }, ] - return this.withRevision(originalPatch) + this.mockRevision(originalPatch) + + return null } async restartPackage( @@ -845,7 +862,9 @@ export class MockApiService extends ApiService { }, ] - return this.withRevision(patch) + this.mockRevision(patch) + + return null } async stopPackage(params: RR.StopPackageReq): Promise { @@ -876,7 +895,9 @@ export class MockApiService extends ApiService { }, ] - return this.withRevision(patch) + this.mockRevision(patch) + + return null } async uninstallPackage( @@ -902,7 +923,9 @@ export class MockApiService extends ApiService { }, ] - return this.withRevision(patch) + this.mockRevision(patch) + + return null } async dryConfigureDependency( @@ -1057,23 +1080,4 @@ export class MockApiService extends ApiService { } this.mockWsSource$.next(revision) } - - private async withRevision( - patch: Operation[], - response: T | null = null, - ): Promise { - if (!this.sequence) { - const { sequence } = this.bootstrapper.init() - this.sequence = sequence - } - - this.patchStream$.next([ - { - id: ++this.sequence, - patch, - }, - ]) - - return response as T - } } diff --git a/frontend/projects/ui/src/app/services/patch-db/patch-db.factory.ts b/frontend/projects/ui/src/app/services/patch-db/patch-db.factory.ts index dc4276f79..51d29edc8 100644 --- a/frontend/projects/ui/src/app/services/patch-db/patch-db.factory.ts +++ b/frontend/projects/ui/src/app/services/patch-db/patch-db.factory.ts @@ -9,7 +9,7 @@ import { } from 'rxjs/operators' import { Update } from 'patch-db-client' import { DataModel } from './data-model' -import { defer, EMPTY, from, interval, merge, Observable } from 'rxjs' +import { defer, EMPTY, from, interval, Observable } from 'rxjs' import { AuthService } from '../auth.service' import { ConnectionService } from '../connection.service' import { ApiService } from '../api/embassy-api.service' @@ -51,9 +51,7 @@ export function sourceFactory( ) return authService.isVerified$.pipe( - switchMap(verified => - verified ? merge(websocket$, api.patchStream$) : EMPTY, - ), + switchMap(verified => (verified ? websocket$ : EMPTY)), ) }) } diff --git a/patch-db b/patch-db index b553ff7b5..6af2221ad 160000 --- a/patch-db +++ b/patch-db @@ -1 +1 @@ -Subproject commit b553ff7b5304c77acd886a0a87e65a01ce91790a +Subproject commit 6af2221add56f0a557b37a268ef9fb2299a05255