Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/simple syncdb #2464

Merged
merged 4 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub async fn action(
let manifest = ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(&pkg_id)
.or_not_found(&pkg_id)?
Expand Down
4 changes: 2 additions & 2 deletions backend/src/backup/backup_bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn backup_all(
package_ids: Option<OrdSet<PackageId>>,
#[arg] password: crate::auth::PasswordType,
) -> Result<(), Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let old_password_decrypted = old_password
.as_ref()
.unwrap_or(&password)
Expand Down Expand Up @@ -265,7 +265,7 @@ async fn perform_backup(
}
}

let ui = ctx.db.peek().await?.into_ui().de()?;
let ui = ctx.db.peek().await.into_ui().de()?;

let mut os_backup_file = AtomicFile::new(
backup_guard.lock().await.as_ref().join("os-backup.cbor"),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/backup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl BackupActions {
let marketplace_url = ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(&pkg_id)
.or_not_found(pkg_id)?
Expand Down
2 changes: 1 addition & 1 deletion backend/src/backup/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async fn assure_restoring(
let mut insert_packages = BTreeMap::new();

for id in ids {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;

let model = peek.as_package_data().as_idx(&id);

Expand Down
4 changes: 2 additions & 2 deletions backend/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub async fn get(
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<ConfigRes, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let manifest = db
.as_package_data()
.as_idx(&id)
Expand Down Expand Up @@ -256,7 +256,7 @@ pub async fn configure(
id: &PackageId,
configure_context: ConfigureContext,
) -> Result<BTreeMap<PackageId, String>, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let package = db
.as_package_data()
.as_idx(id)
Expand Down
8 changes: 1 addition & 7 deletions backend/src/config/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,6 @@ impl TorAddressPointer {
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?
.as_package_data()
.as_idx(&self.package_id)
.and_then(|pde| pde.as_installed())
Expand Down Expand Up @@ -1739,7 +1738,6 @@ impl LanAddressPointer {
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?
.as_package_data()
.as_idx(&self.package_id)
.and_then(|pde| pde.as_installed())
Expand Down Expand Up @@ -1775,11 +1773,7 @@ impl ConfigPointer {
Ok(self.select(&Value::Object(cfg.clone())))
} else {
let id = &self.package_id;
let db = ctx
.db
.peek()
.await
.map_err(|e| ConfigurationError::SystemError(e))?;
let db = ctx.db.peek().await;
let manifest = db.as_package_data().as_idx(id).map(|pde| pde.as_manifest());
let cfg_actions = manifest.and_then(|m| m.as_config().transpose_ref());
if let (Some(manifest), Some(cfg_actions)) = (manifest, cfg_actions) {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/context/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl RpcContext {
})
.await?;

let peek = self.db.peek().await?;
let peek = self.db.peek().await;

for (package_id, package) in peek.as_package_data().as_entries()?.into_iter() {
let action = match package.as_match() {
Expand Down
6 changes: 3 additions & 3 deletions backend/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::Error;
#[command(display(display_none), metadata(sync_db = true))]
#[instrument(skip_all)]
pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(), Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
Expand All @@ -35,7 +35,7 @@ pub async fn start(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(

#[command(display(display_none), metadata(sync_db = true))]
pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<MainStatus, Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
Expand Down Expand Up @@ -71,7 +71,7 @@ pub async fn stop(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<Ma

#[command(display(display_none), metadata(sync_db = true))]
pub async fn restart(#[context] ctx: RpcContext, #[arg] id: PackageId) -> Result<(), Error> {
let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let version = peek
.as_package_data()
.as_idx(&id)
Expand Down
22 changes: 4 additions & 18 deletions backend/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn ws_handler<
session: Option<(HasValidSession, HashSessionToken)>,
ws_fut: WSFut,
) -> Result<(), Error> {
let (dump, sub) = ctx.db.dump_and_sub().await?;
let (dump, sub) = ctx.db.dump_and_sub().await;
let mut stream = ws_fut
.await
.with_kind(ErrorKind::Network)?
Expand Down Expand Up @@ -174,7 +174,7 @@ pub async fn subscribe(ctx: RpcContext, req: Request<Body>) -> Result<Response<B
Ok(res)
}

#[command(subcommands(revisions, dump, put, apply))]
#[command(subcommands(dump, put, apply))]
pub fn db() -> Result<(), RpcError> {
Ok(())
}
Expand All @@ -186,28 +186,14 @@ pub enum RevisionsRes {
Dump(Dump),
}

#[command(display(display_serializable))]
pub async fn revisions(
#[context] ctx: RpcContext,
#[arg] since: u64,
#[allow(unused_variables)]
#[arg(long = "format")]
format: Option<IoFormat>,
) -> Result<RevisionsRes, Error> {
Ok(match ctx.db.sync(since).await? {
Ok(revs) => RevisionsRes::Revisions(revs),
Err(dump) => RevisionsRes::Dump(dump),
})
}

#[instrument(skip_all)]
async fn cli_dump(
ctx: CliContext,
_format: Option<IoFormat>,
path: Option<PathBuf>,
) -> Result<Dump, RpcError> {
let dump = if let Some(path) = path {
PatchDb::open(path).await?.dump().await?
PatchDb::open(path).await?.dump().await
} else {
rpc_toolkit::command_helpers::call_remote(
ctx,
Expand Down Expand Up @@ -235,7 +221,7 @@ pub async fn dump(
#[arg]
path: Option<PathBuf>,
) -> Result<Dump, Error> {
Ok(ctx.db.dump().await?)
Ok(ctx.db.dump().await)
}

fn apply_expr(input: jaq_core::Val, expr: &str) -> Result<jaq_core::Val, Error> {
Expand Down
6 changes: 3 additions & 3 deletions backend/src/db/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ where

#[async_trait::async_trait]
pub trait PatchDbExt {
async fn peek(&self) -> Result<DatabaseModel, Error>;
async fn peek(&self) -> DatabaseModel;
async fn mutate<U: UnwindSafe + Send>(
&self,
f: impl FnOnce(&mut DatabaseModel) -> Result<U, Error> + UnwindSafe + Send,
Expand All @@ -40,8 +40,8 @@ pub trait PatchDbExt {
}
#[async_trait::async_trait]
impl PatchDbExt for PatchDb {
async fn peek(&self) -> Result<DatabaseModel, Error> {
Ok(DatabaseModel::from(self.dump().await?.value))
async fn peek(&self) -> DatabaseModel {
DatabaseModel::from(self.dump().await.value)
}
async fn mutate<U: UnwindSafe + Send>(
&self,
Expand Down
2 changes: 1 addition & 1 deletion backend/src/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub async fn configure_logic(
ctx: RpcContext,
(pkg_id, dependency_id): (PackageId, PackageId),
) -> Result<ConfigDryRes, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let pkg = db
.as_package_data()
.as_idx(&pkg_id)
Expand Down
2 changes: 1 addition & 1 deletion backend/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ pub async fn init(cfg: &RpcContextConfig) -> Result<InitResult, Error> {
let account = AccountInfo::load(&secret_store).await?;
let db = cfg.db(&account).await?;
tracing::info!("Opened PatchDB");
let peek = db.peek().await?;
let peek = db.peek().await;
let mut server_info = peek.as_server_info().de()?;

// write to ca cert store
Expand Down
4 changes: 2 additions & 2 deletions backend/src/install/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn cleanup_failed(ctx: &RpcContext, id: &PackageId) -> Result<(), Erro
if let Some(version) = match ctx
.db
.peek()
.await?
.await
.as_package_data()
.as_idx(id)
.or_not_found(id)?
Expand Down Expand Up @@ -141,7 +141,7 @@ pub async fn uninstall<Ex>(ctx: &RpcContext, secrets: &mut Ex, id: &PackageId) -
where
for<'a> &'a mut Ex: Executor<'a, Database = Postgres>,
{
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let entry = db
.as_package_data()
.as_idx(id)
Expand Down
8 changes: 4 additions & 4 deletions backend/src/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub const PKG_WASM_DIR: &str = "package-data/wasm";

#[command(display(display_serializable))]
pub async fn list(#[context] ctx: RpcContext) -> Result<Value, Error> {
Ok(ctx.db.peek().await?.as_package_data().as_entries()?
Ok(ctx.db.peek().await.as_package_data().as_entries()?
.iter()
.filter_map(|(id, pde)| {
let status = match pde.as_match() {
Expand Down Expand Up @@ -666,7 +666,7 @@ pub async fn download_install_s9pk(
) -> Result<(), Error> {
let pkg_id = &temp_manifest.id;
let version = &temp_manifest.version;
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;

if let Result::<(), Error>::Err(e) = {
let ctx = ctx.clone();
Expand Down Expand Up @@ -786,7 +786,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
rdr.validated();
let developer_key = rdr.developer_key().clone();
rdr.reset().await?;
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;

tracing::info!("Install {}@{}: Unpacking Manifest", pkg_id, version);
let manifest = progress
Expand Down Expand Up @@ -1017,7 +1017,7 @@ pub async fn install_s9pk<R: AsyncRead + AsyncSeek + Unpin + Send + Sync>(
)
.await?;

let peek = ctx.db.peek().await?;
let peek = ctx.db.peek().await;
let prev = peek
.as_package_data()
.as_idx(pkg_id)
Expand Down
2 changes: 1 addition & 1 deletion backend/src/manager/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::Error;
#[instrument(skip_all)]
pub async fn check(ctx: &RpcContext, id: &PackageId) -> Result<(), Error> {
let (manifest, started) = {
let peeked = ctx.db.peek().await?;
let peeked = ctx.db.peek().await;
let pde = peeked
.as_package_data()
.as_idx(id)
Expand Down
4 changes: 2 additions & 2 deletions backend/src/manager/manager_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl ManageContainer {
let current_state = Arc::new(watch::channel(StartStop::Stop).0);
let desired_state = Arc::new(
watch::channel::<StartStop>(
get_status(seed.ctx.db.peek().await?, &seed.manifest).into(),
get_status(seed.ctx.db.peek().await, &seed.manifest).into(),
)
.0,
);
Expand Down Expand Up @@ -103,7 +103,7 @@ impl ManageContainer {
&self,
seed: &manager_seed::ManagerSeed,
) -> Result<(), Error> {
let current_state = get_status(seed.ctx.db.peek().await?, &seed.manifest);
let current_state = get_status(seed.ctx.db.peek().await, &seed.manifest);
self.override_main_status
.send_modify(|x| *x = Some(current_state));
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions backend/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl Manager {
let manage_container = self.manage_container.clone();
let seed = self.seed.clone();
async move {
let peek = seed.ctx.db.peek().await?;
let peek = seed.ctx.db.peek().await;
let state_reverter = DesiredStateReverter::new(manage_container.clone());
let override_guard =
manage_container.set_override(get_status(peek, &seed.manifest).backing_up())?;
Expand Down Expand Up @@ -338,7 +338,7 @@ async fn configure(
id: PackageId,
mut configure_context: ConfigureContext,
) -> Result<BTreeMap<PackageId, String>, Error> {
let db = ctx.db.peek().await?;
let db = ctx.db.peek().await;
let id = &id;
let ctx = &ctx;
let overrides = &mut configure_context.overrides;
Expand Down
46 changes: 7 additions & 39 deletions backend/src/middleware/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use color_eyre::eyre::eyre;
use futures::future::BoxFuture;
use futures::FutureExt;
use http::HeaderValue;
Expand All @@ -11,7 +10,6 @@ use rpc_toolkit::yajrc::RpcMethod;
use rpc_toolkit::Metadata;

use crate::context::RpcContext;
use crate::{Error, ResultExt};

pub fn db<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
Box::new(
Expand All @@ -22,49 +20,19 @@ pub fn db<M: Metadata>(ctx: RpcContext) -> DynMiddleware<M> {
async move {
let m2: DynMiddlewareStage2 = Box::new(move |req, rpc_req| {
async move {
let seq = req.headers.remove("x-patch-sequence");
let sync_db = metadata
.get(rpc_req.method.as_str(), "sync_db")
.unwrap_or(false);

let m3: DynMiddlewareStage3 = Box::new(move |res, _| {
async move {
if sync_db && seq.is_some() {
match async {
let seq = seq
.ok_or_else(|| {
Error::new(
eyre!("Missing X-Patch-Sequence"),
crate::ErrorKind::InvalidRequest,
)
})?
.to_str()
.with_kind(crate::ErrorKind::InvalidRequest)?
.parse()?;
let res = ctx.db.sync(seq).await?;
let json = match res {
Ok(revs) => serde_json::to_vec(&revs),
Err(dump) => serde_json::to_vec(&[dump]),
}
.with_kind(crate::ErrorKind::Serialization)?;
Ok::<_, Error>(base64::encode_config(
&json,
base64::URL_SAFE,
))
}
.await
{
Ok(a) => res
.headers
.append("X-Patch-Updates", HeaderValue::from_str(&a)?),
Err(e) => res.headers.append(
"X-Patch-Error",
HeaderValue::from_str(&base64::encode_config(
&e.to_string(),
base64::URL_SAFE,
))?,
),
};
if sync_db {
res.headers.append(
"X-Patch-Sequence",
HeaderValue::from_str(
&ctx.db.sequence().await.to_string(),
)?,
);
}
Ok(Ok(noop4()))
}
Expand Down
Loading
Loading