Skip to content

Commit

Permalink
Feature/simple syncdb (#2464)
Browse files Browse the repository at this point in the history
* simplify db sync on rpc endpoints

* switch to patch-db master

* update fe for websocket only stream

* fix api

---------

Co-authored-by: Matt Hill <[email protected]>
  • Loading branch information
dr-bonez and MattDHill authored Oct 17, 2023
1 parent afbab29 commit 2026950
Show file tree
Hide file tree
Showing 29 changed files with 101 additions and 168 deletions.
2 changes: 1 addition & 1 deletion backend/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub async fn action(
let manifest = ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(&pkg_id)
.or_not_found(&pkg_id)?
Expand Down
4 changes: 2 additions & 2 deletions backend/src/backup/backup_bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn backup_all(
package_ids: Option<OrdSet<PackageId>>,
#[arg] password: crate::auth::PasswordType,
) -> Result<(), Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let old_password_decrypted = old_password
.as_ref()
.unwrap_or(&password)
Expand Down Expand Up @@ -265,7 +265,7 @@ async fn perform_backup(
}
}

let ui = ctx.db.peek().await?.into_ui().de()?;
let ui = ctx.db.peek().await.into_ui().de()?;

let mut os_backup_file = AtomicFile::new(
backup_guard.lock().await.as_ref().join("os-backup.cbor"),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/backup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl BackupActions {
let marketplace_url = ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(&pkg_id)
.or_not_found(pkg_id)?
Expand Down
2 changes: 1 addition & 1 deletion backend/src/backup/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async fn assure_restoring(
let mut insert_packages = BTreeMap::new();

for id in ids {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;

let model = peek.as_package_data().as_idx(&id);

Expand Down
4 changes: 2 additions & 2 deletions backend/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub async fn get(
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<ConfigRes, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let manifest = db
.as_package_data()
.as_idx(&id)
Expand Down Expand Up @@ -256,7 +256,7 @@ pub async fn configure(
id: &PackageId,
configure_context: ConfigureContext,
) -> Result<BTreeMap<PackageId, String>, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let package = db
.as_package_data()
.as_idx(id)
Expand Down
8 changes: 1 addition & 7 deletions backend/src/config/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,6 @@ impl TorAddressPointer {
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?
.as_package_data()
.as_idx(&self.package_id)
.and_then(|pde| pde.as_installed())
Expand Down Expand Up @@ -1739,7 +1738,6 @@ impl LanAddressPointer {
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?
.as_package_data()
.as_idx(&self.package_id)
.and_then(|pde| pde.as_installed())
Expand Down Expand Up @@ -1775,11 +1773,7 @@ impl ConfigPointer {
Ok(self.select(&Value::Object(cfg.clone())))
} else {
let id = &self.package_id;
let db = ctx
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?;
let db = ctx.db.peek().await;
let manifest = db.as_package_data().as_idx(id).map(|pde| pde.as_manifest());
let cfg_actions = manifest.and_then(|m| m.as_config().transpose_ref());
if let (Some(manifest), Some(cfg_actions)) = (manifest, cfg_actions) {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/context/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl RpcContext {
})
.await?;

let peek = self.db.peek().await?;
let peek = self.db.peek().await;

for (package_id, package) in peek.as_package_data().as_entries()?.into_iter() {
let action = match package.as_match() {
Expand Down
6 changes: 3 additions & 3 deletions backend/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::Error;
#[command(display(display_none), metadata(sync_db = true))]
#[instrument(skip_all)]
pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(), Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
Expand All @@ -35,7 +35,7 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(

#[command(display(display_none), metadata(sync_db = true))]
pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<MainStatus, Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
Expand Down Expand Up @@ -71,7 +71,7 @@ pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<Ma

#[command(display(display_none), metadata(sync_db = true))]
pub async fn restart(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(), Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
Expand Down
22 changes: 4 additions & 18 deletions backend/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn ws_handler<
session: Option<(HasValidSession, HashSessionToken)>,
ws_fut: WSFut,
) -> Result<(), Error> {
let (dump, sub) = ctx.db.dump_and_sub().await?;
let (dump, sub) = ctx.db.dump_and_sub().await;
let mut stream = ws_fut
.await
.with_kind(ErrorKind::Network)?
Expand Down Expand Up @@ -174,7 +174,7 @@ pub async fn subscribe(ctx: RpcContext, req: Request<Body>) -> Result<Response<B
Ok(res)
}

#[command(subcommands(revisions, dump, put, apply))]
#[command(subcommands(dump, put, apply))]
pub fn db() -> Result<(), RpcError> {
Ok(())
}
Expand All @@ -186,28 +186,14 @@ pub enum RevisionsRes {
Dump(Dump),
}

#[command(display(display_serializable))]
pub async fn revisions(
#[context] ctx: RpcContext,
#[arg] since: u64,
#[allow(unused_variables)]
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<RevisionsRes, Error> {
Ok(match ctx.db.sync(since).await? {
Ok(revs) => RevisionsRes::Revisions(revs),
Err(dump) => RevisionsRes::Dump(dump),
})
}

#[instrument(skip_all)]
async fn cli_dump(
ctx: CliContext,
_format: Option<IoFormat>,
path: Option<PathBuf>,
) -> Result<Dump, RpcError> {
let dump = if let Some(path) = path {
PatchDb::open(path).await?.dump().await?
PatchDb::open(path).await?.dump().await
} else {
rpc_toolkit::command_helpers::call_remote(
ctx,
Expand Down Expand Up @@ -235,7 +221,7 @@ pub async fn dump(
#[arg]
path: Option<PathBuf>,
) -> Result<Dump, Error> {
Ok(ctx.db.dump().await?)
Ok(ctx.db.dump().await)
}

fn apply_expr(input: jaq_core::Val, expr: &str) -> Result<jaq_core::Val, Error> {
Expand Down
6 changes: 3 additions & 3 deletions backend/src/db/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ where

#[async_trait::async_trait]
pub trait PatchDbExt {
async fn peek(&self) -> Result<DatabaseModel, Error>;
async fn peek(&self) -> DatabaseModel;
async fn mutate<U: UnwindSafe + Send>(
&self,
f: impl FnOnce(&mut DatabaseModel) -> Result<U, Error> + UnwindSafe + Send,
Expand All @@ -40,8 +40,8 @@ pub trait PatchDbExt {
}
#[async_trait::async_trait]
impl PatchDbExt for PatchDb {
async fn peek(&self) -> Result<DatabaseModel, Error> {
Ok(DatabaseModel::from(self.dump().await?.value))
async fn peek(&self) -> DatabaseModel {
DatabaseModel::from(self.dump().await.value)
}
async fn mutate<U: UnwindSafe + Send>(
&self,
Expand Down
2 changes: 1 addition & 1 deletion backend/src/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub async fn configure_logic(
ctx: RpcContext,
(pkg_id, dependency_id): (PackageId, PackageId),
) -> Result<ConfigDryRes, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let pkg = db
.as_package_data()
.as_idx(&pkg_id)
Expand Down
2 changes: 1 addition & 1 deletion backend/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ pub async fn init(cfg: &RpcContextConfig) -> Result<InitResult, Error> {
let account = AccountInfo::load(&secret_store).await?;
let db = cfg.db(&account).await?;
tracing::info!("Opened PatchDB");
let peek = db.peek().await?;
let peek = db.peek().await;
let mut server_info = peek.as_server_info().de()?;

// write to ca cert store
Expand Down
4 changes: 2 additions & 2 deletions backend/src/install/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn cleanup_failed(ctx: &RpcContext, id: &PackageId) -> Result<(), Erro
if let Some(version) = match ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(id)
.or_not_found(id)?
Expand Down Expand Up @@ -141,7 +141,7 @@ pub async fn uninstall<Ex>(ctx: &RpcContext, secrets: &mut Ex, id: &PackageId) -
where
for<'a> &'a mut Ex: Executor<'a, Database = Postgres>,
{
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let entry = db
.as_package_data()
.as_idx(id)
Expand Down
8 changes: 4 additions & 4 deletions backend/src/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub const PKG_WASM_DIR: &str = "package-data/wasm";

#[command(display(display_serializable))]
pub async fn list(#[context] ctx: RpcContext) -> Result<Value, Error> {
Ok(ctx.db.peek().await?.as_package_data().as_entries()?
Ok(ctx.db.peek().await.as_package_data().as_entries()?
.iter()
.filter_map(|(id, pde)| {
let status = match pde.as_match() {
Expand Down Expand Up @@ -666,7 +666,7 @@ pub async fn download_install_s9pk(
) -> Result<(), Error> {
let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version;
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;

if let Result::<(), Error>::Err(e) = {
let ctx = ctx.clone();
Expand Down Expand Up @@ -786,7 +786,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
rdr.validated();
let developer_key = rdr.developer_key().clone();
rdr.reset().await?;
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;

tracing::info!("Install {}@{}: Unpacking Manifest", pkg_id, version);
let manifest = progress
Expand Down Expand Up @@ -1017,7 +1017,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
)
.await?;

let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let prev = peek
.as_package_data()
.as_idx(pkg_id)
Expand Down
2 changes: 1 addition & 1 deletion backend/src/manager/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::Error;
#[instrument(skip_all)]
pub async fn check(ctx: &RpcContext, id: &PackageId) -> Result<(), Error> {
let (manifest, started) = {
let peeked = ctx.db.peek().await?;
let peeked = ctx.db.peek().await;
let pde = peeked
.as_package_data()
.as_idx(id)
Expand Down
4 changes: 2 additions & 2 deletions backend/src/manager/manager_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl ManageContainer {
let current_state = Arc::new(watch::channel(StartStop::Stop).0);
let desired_state = Arc::new(
watch::channel::<StartStop>(
get_status(seed.ctx.db.peek().await?, &seed.manifest).into(),
get_status(seed.ctx.db.peek().await, &seed.manifest).into(),
)
.0,
);
Expand Down Expand Up @@ -103,7 +103,7 @@ impl ManageContainer {
&self,
seed: &manager_seed::ManagerSeed,
) -> Result<(), Error> {
let current_state = get_status(seed.ctx.db.peek().await?, &seed.manifest);
let current_state = get_status(seed.ctx.db.peek().await, &seed.manifest);
self.override_main_status
.send_modify(|x| *x = Some(current_state));
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions backend/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl Manager {
let manage_container = self.manage_container.clone();
let seed = self.seed.clone();
async move {
let peek = seed.ctx.db.peek().await?;
let peek = seed.ctx.db.peek().await;
let state_reverter = DesiredStateReverter::new(manage_container.clone());
let override_guard =
manage_container.set_override(get_status(peek, &seed.manifest).backing_up())?;
Expand Down Expand Up @@ -338,7 +338,7 @@ async fn configure(
id: PackageId,
mut configure_context: ConfigureContext,
) -> Result<BTreeMap<PackageId, String>, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let id = &id;
let ctx = &ctx;
let overrides = &mut configure_context.overrides;
Expand Down
46 changes: 7 additions & 39 deletions backend/src/middleware/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::FutureExt;
use http::HeaderValue;
Expand All @@ -11,7 +10,6 @@ use rpc_toolkit::yajrc::RpcMethod;
use rpc_toolkit::Metadata;

use crate::context::RpcContext;
use crate::{Error, ResultExt};

pub fn db<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
Box::new(
Expand All @@ -22,49 +20,19 @@ pub fn db<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
async move {
let m2: DynMiddlewareStage2 = Box::new(move |req, rpc_req| {
async move {
let seq = req.headers.remove("x-patch-sequence");
let sync_db = metadata
.get(rpc_req.method.as_str(), "sync_db")
.unwrap_or(false);

let m3: DynMiddlewareStage3 = Box::new(move |res, _| {
async move {
if sync_db && seq.is_some() {
match async {
let seq = seq
.ok_or_else(|| {
Error::new(
eyre!("Missing X-Patch-Sequence"),
crate::ErrorKind::InvalidRequest,
)
})?
.to_str()
.with_kind(crate::ErrorKind::InvalidRequest)?
.parse()?;
let res = ctx.db.sync(seq).await?;
let json = match res {
Ok(revs) => serde_json::to_vec(&revs),
Err(dump) => serde_json::to_vec(&[dump]),
}
.with_kind(crate::ErrorKind::Serialization)?;
Ok::<_, Error>(base64::encode_config(
&json,
base64::URL_SAFE,
))
}
.await
{
Ok(a) => res
.headers
.append("X-Patch-Updates", HeaderValue::from_str(&a)?),
Err(e) => res.headers.append(
"X-Patch-Error",
HeaderValue::from_str(&base64::encode_config(
&e.to_string(),
base64::URL_SAFE,
))?,
),
};
if sync_db {
res.headers.append(
"X-Patch-Sequence",
HeaderValue::from_str(
&ctx.db.sequence().await.to_string(),
)?,
);
}
Ok(Ok(noop4()))
}
Expand Down
Loading

0 comments on commit 2026950

Please sign in to comment.