From fd9685988302800d48250a13a084b3dbdda7f98a Mon Sep 17 00:00:00 2001 From: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Date: Fri, 10 Nov 2023 14:57:21 -0700 Subject: [PATCH] [feature]: s9pk v2 (#2507) * feature: s9pk v2 wip wip wip wip refactor * use WriteQueue * fix proptest * LoopDev eager directory hash verification --- backend/Cargo.lock | 25 ++ backend/Cargo.toml | 2 + backend/src/disk/mount/filesystem/loop_dev.rs | 89 ++++++ backend/src/disk/mount/filesystem/mod.rs | 1 + backend/src/prelude.rs | 1 + .../s9pk/merkle_archive/directory_contents.rs | 199 +++++++++++++ .../src/s9pk/merkle_archive/file_contents.rs | 82 ++++++ backend/src/s9pk/merkle_archive/hash.rs | 97 +++++++ backend/src/s9pk/merkle_archive/mod.rs | 268 ++++++++++++++++++ backend/src/s9pk/merkle_archive/sink.rs | 70 +++++ .../src/s9pk/merkle_archive/source/http.rs | 91 ++++++ backend/src/s9pk/merkle_archive/source/mod.rs | 120 ++++++++ .../source/multi_cursor_file.rs | 84 ++++++ backend/src/s9pk/merkle_archive/test.rs | 138 +++++++++ backend/src/s9pk/merkle_archive/varint.rs | 159 +++++++++++ .../src/s9pk/merkle_archive/write_queue.rs | 47 +++ backend/src/s9pk/mod.rs | 249 +--------------- backend/src/s9pk/specv2.md | 28 -- backend/src/s9pk/{ => v1}/builder.rs | 0 backend/src/s9pk/{ => v1}/docker.rs | 0 backend/src/s9pk/{ => v1}/git_hash.rs | 0 backend/src/s9pk/{ => v1}/header.rs | 0 backend/src/s9pk/{ => v1}/manifest.rs | 0 backend/src/s9pk/v1/mod.rs | 246 ++++++++++++++++ backend/src/s9pk/{ => v1}/reader.rs | 0 backend/src/s9pk/v2/mod.rs | 41 +++ backend/src/s9pk/v2/specv2.md | 89 ++++++ backend/src/util/mod.rs | 24 ++ 28 files changed, 1877 insertions(+), 273 deletions(-) create mode 100644 backend/src/disk/mount/filesystem/loop_dev.rs create mode 100644 backend/src/s9pk/merkle_archive/directory_contents.rs create mode 100644 backend/src/s9pk/merkle_archive/file_contents.rs create mode 100644 backend/src/s9pk/merkle_archive/hash.rs create mode 100644 backend/src/s9pk/merkle_archive/mod.rs create mode 100644 backend/src/s9pk/merkle_archive/sink.rs create mode 100644 backend/src/s9pk/merkle_archive/source/http.rs create mode 100644 backend/src/s9pk/merkle_archive/source/mod.rs create mode 100644 backend/src/s9pk/merkle_archive/source/multi_cursor_file.rs create mode 100644 backend/src/s9pk/merkle_archive/test.rs create mode 100644 backend/src/s9pk/merkle_archive/varint.rs create mode 100644 backend/src/s9pk/merkle_archive/write_queue.rs delete mode 100644 backend/src/s9pk/specv2.md rename backend/src/s9pk/{ => v1}/builder.rs (100%) rename backend/src/s9pk/{ => v1}/docker.rs (100%) rename backend/src/s9pk/{ => v1}/git_hash.rs (100%) rename backend/src/s9pk/{ => v1}/header.rs (100%) rename backend/src/s9pk/{ => v1}/manifest.rs (100%) create mode 100644 backend/src/s9pk/v1/mod.rs rename backend/src/s9pk/{ => v1}/reader.rs (100%) create mode 100644 backend/src/s9pk/v2/mod.rs create mode 100644 backend/src/s9pk/v2/specv2.md diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 1607bf7f2..bc9a4f09a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -452,6 +452,19 @@ dependencies = [ "constant_time_eq", ] +[[package]] +name = "blake3" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0231f06152bf547e9c2b5194f247cd97aacf6dcd8b15d8e5ec0663f64580da87" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if 1.0.0", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -2385,6 +2398,16 @@ dependencies = [ "cfg-if 
1.0.0", ] +[[package]] +name = "integer-encoding" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf" +dependencies = [ + "async-trait", + "tokio", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -4898,6 +4921,7 @@ dependencies = [ "base64 0.21.4", "base64ct", "basic-cookies", + "blake3", "bytes", "chrono", "ciborium", @@ -4929,6 +4953,7 @@ dependencies = [ "include_dir", "indexmap 2.0.2", "indicatif", + "integer-encoding", "ipnet", "iprange", "isocountry", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 444ad3cf3..c7b433f1c 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -52,6 +52,7 @@ base32 = "0.4.0" base64 = "0.21.4" base64ct = "1.6.0" basic-cookies = "0.1.4" +blake3 = "1.5.0" bytes = "1" chrono = { version = "0.4.31", features = ["serde"] } clap = "3.2.25" @@ -89,6 +90,7 @@ imbl-value = { git = "https://github.com/Start9Labs/imbl-value.git" } include_dir = "0.7.3" indexmap = { version = "2.0.2", features = ["serde"] } indicatif = { version = "0.17.7", features = ["tokio"] } +integer-encoding = { version = "4.0.0", features = ["tokio_async"] } ipnet = { version = "2.8.0", features = ["serde"] } iprange = { version = "0.6.7", features = ["serde"] } isocountry = "0.3.2" diff --git a/backend/src/disk/mount/filesystem/loop_dev.rs b/backend/src/disk/mount/filesystem/loop_dev.rs new file mode 100644 index 000000000..28a18597d --- /dev/null +++ b/backend/src/disk/mount/filesystem/loop_dev.rs @@ -0,0 +1,89 @@ +use std::os::unix::ffi::OsStrExt; +use std::path::Path; + +use async_trait::async_trait; +use digest::generic_array::GenericArray; +use digest::{Digest, OutputSizeUser}; +use serde::{Deserialize, Serialize}; +use sha2::Sha256; + +use super::{FileSystem, MountType, ReadOnly}; +use crate::util::Invoke; +use crate::{Error, ResultExt}; + +pub async fn mount( + logicalname: impl AsRef, + offset: u64, + size: u64, + mountpoint: impl AsRef, + mount_type: MountType, +) -> Result<(), Error> { + tokio::fs::create_dir_all(mountpoint.as_ref()).await?; + let mut opts = format!("loop,offset={offset},sizelimit={size}"); + if mount_type == ReadOnly { + opts += ",ro"; + } + + tokio::process::Command::new("mount") + .arg(logicalname.as_ref()) + .arg(mountpoint.as_ref()) + .arg("-o") + .arg(opts) + .invoke(crate::ErrorKind::Filesystem) + .await?; + Ok(()) +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct LoopDev> { + logicalname: LogicalName, + offset: u64, + size: u64, +} +impl> LoopDev { + pub fn new(logicalname: LogicalName, offset: u64, size: u64) -> Self { + Self { + logicalname, + offset, + size, + } + } +} +#[async_trait] +impl + Send + Sync> FileSystem for LoopDev { + async fn mount + Send + Sync>( + &self, + mountpoint: P, + mount_type: MountType, + ) -> Result<(), Error> { + mount( + self.logicalname.as_ref(), + self.offset, + self.size, + mountpoint, + mount_type, + ) + .await + } + async fn source_hash( + &self, + ) -> Result::OutputSize>, Error> { + let mut sha = Sha256::new(); + sha.update("LoopDev"); + sha.update( + tokio::fs::canonicalize(self.logicalname.as_ref()) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + self.logicalname.as_ref().display().to_string(), + ) + })? 
+ .as_os_str() + .as_bytes(), + ); + sha.update(&u64::to_be_bytes(self.offset)[..]); + Ok(sha.finalize()) + } +} diff --git a/backend/src/disk/mount/filesystem/mod.rs b/backend/src/disk/mount/filesystem/mod.rs index 00247e0dd..11a6671df 100644 --- a/backend/src/disk/mount/filesystem/mod.rs +++ b/backend/src/disk/mount/filesystem/mod.rs @@ -14,6 +14,7 @@ pub mod ecryptfs; pub mod efivarfs; pub mod httpdirfs; pub mod label; +pub mod loop_dev; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum MountType { diff --git a/backend/src/prelude.rs b/backend/src/prelude.rs index ab5de1d38..3f70b7a2b 100644 --- a/backend/src/prelude.rs +++ b/backend/src/prelude.rs @@ -1,5 +1,6 @@ pub use color_eyre::eyre::eyre; pub use models::OptionExt; +pub use tracing::instrument; pub use crate::db::prelude::*; pub use crate::ensure_code; diff --git a/backend/src/s9pk/merkle_archive/directory_contents.rs b/backend/src/s9pk/merkle_archive/directory_contents.rs new file mode 100644 index 000000000..f662300b6 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/directory_contents.rs @@ -0,0 +1,199 @@ +use std::collections::BTreeMap; +use std::path::Path; + +use futures::future::BoxFuture; +use futures::FutureExt; +use imbl_value::InternedString; +use tokio::io::AsyncRead; + +use crate::prelude::*; +use crate::s9pk::merkle_archive::hash::{Hash, HashWriter}; +use crate::s9pk::merkle_archive::sink::{Sink, TrackingWriter}; +use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section}; +use crate::s9pk::merkle_archive::write_queue::WriteQueue; +use crate::s9pk::merkle_archive::{varint, Entry, EntryContents}; + +#[derive(Debug)] +pub struct DirectoryContents(BTreeMap>); +impl DirectoryContents { + pub fn new() -> Self { + Self(BTreeMap::new()) + } + + #[instrument(skip_all)] + pub fn get_path(&self, path: impl AsRef) -> Option<&Entry> { + let mut dir = Some(self); + let mut res = None; + for segment in path.as_ref().into_iter() { + let segment = segment.to_str()?; + if segment == "/" { + continue; + } + res = dir?.get(segment); + if let Some(EntryContents::Directory(d)) = res.as_ref().map(|e| e.as_contents()) { + dir = Some(d); + } else { + dir = None + } + } + res + } + + pub fn insert_path(&mut self, path: impl AsRef, entry: Entry) -> Result<(), Error> { + let path = path.as_ref(); + let (parent, Some(file)) = (path.parent(), path.file_name().and_then(|f| f.to_str())) + else { + return Err(Error::new( + eyre!("cannot create file at root"), + ErrorKind::Pack, + )); + }; + let mut dir = self; + for segment in parent.into_iter().flatten() { + let segment = segment + .to_str() + .ok_or_else(|| Error::new(eyre!("non-utf8 path segment"), ErrorKind::Utf8))?; + if segment == "/" { + continue; + } + if !dir.contains_key(segment) { + dir.insert( + segment.into(), + Entry::new(EntryContents::Directory(DirectoryContents::new())), + ); + } + if let Some(EntryContents::Directory(d)) = + dir.get_mut(segment).map(|e| e.as_contents_mut()) + { + dir = d; + } else { + return Err(Error::new(eyre!("failed to insert entry at path {path:?}: ancestor exists and is not a directory"), ErrorKind::Pack)); + } + } + dir.insert(file.into(), entry); + Ok(()) + } + + pub const fn header_size() -> u64 { + 8 // position: u64 BE + + 8 // size: u64 BE + } + + #[instrument(skip_all)] + pub async fn serialize_header(&self, position: u64, w: &mut W) -> Result { + use tokio::io::AsyncWriteExt; + + let size = self.toc_size(); + + w.write_all(&position.to_be_bytes()).await?; + w.write_all(&size.to_be_bytes()).await?; + + Ok(position) + } 
+ + pub fn toc_size(&self) -> u64 { + self.0.iter().fold( + varint::serialized_varint_size(self.0.len() as u64), + |acc, (name, entry)| { + acc + varint::serialized_varstring_size(&**name) + entry.header_size() + }, + ) + } +} +impl DirectoryContents> { + #[instrument(skip_all)] + pub fn deserialize<'a>( + source: &'a S, + header: &'a mut (impl AsyncRead + Unpin + Send), + sighash: Hash, + ) -> BoxFuture<'a, Result> { + async move { + use tokio::io::AsyncReadExt; + + let mut position = [0u8; 8]; + header.read_exact(&mut position).await?; + let position = u64::from_be_bytes(position); + + let mut size = [0u8; 8]; + header.read_exact(&mut size).await?; + let size = u64::from_be_bytes(size); + + let mut toc_reader = source.fetch(position, size).await?; + + let len = varint::deserialize_varint(&mut toc_reader).await?; + let mut entries = BTreeMap::new(); + for _ in 0..len { + entries.insert( + varint::deserialize_varstring(&mut toc_reader).await?.into(), + Entry::deserialize(source, &mut toc_reader).await?, + ); + } + + let res = Self(entries); + + if res.sighash().await? == sighash { + Ok(res) + } else { + Err(Error::new( + eyre!("hash sum does not match"), + ErrorKind::InvalidSignature, + )) + } + } + .boxed() + } +} +impl DirectoryContents { + #[instrument(skip_all)] + pub fn update_hashes<'a>(&'a mut self, only_missing: bool) -> BoxFuture<'a, Result<(), Error>> { + async move { + for (_, entry) in &mut self.0 { + entry.update_hash(only_missing).await?; + } + Ok(()) + } + .boxed() + } + + #[instrument(skip_all)] + pub fn sighash<'a>(&'a self) -> BoxFuture<'a, Result> { + async move { + let mut hasher = TrackingWriter::new(0, HashWriter::new()); + let mut sig_contents = BTreeMap::new(); + for (name, entry) in &self.0 { + sig_contents.insert(name.clone(), entry.to_missing().await?); + } + Self(sig_contents) + .serialize_toc(&mut WriteQueue::new(0), &mut hasher) + .await?; + Ok(hasher.into_inner().finalize()) + } + .boxed() + } + + #[instrument(skip_all)] + pub async fn serialize_toc<'a, W: Sink>( + &'a self, + queue: &mut WriteQueue<'a, S>, + w: &mut W, + ) -> Result<(), Error> { + varint::serialize_varint(self.0.len() as u64, w).await?; + for (name, entry) in self.0.iter() { + varint::serialize_varstring(&**name, w).await?; + entry.serialize_header(queue.add(entry).await?, w).await?; + } + + Ok(()) + } +} +impl std::ops::Deref for DirectoryContents { + type Target = BTreeMap>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl std::ops::DerefMut for DirectoryContents { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/backend/src/s9pk/merkle_archive/file_contents.rs b/backend/src/s9pk/merkle_archive/file_contents.rs new file mode 100644 index 000000000..c02c0e879 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/file_contents.rs @@ -0,0 +1,82 @@ +use tokio::io::AsyncRead; + +use crate::prelude::*; +use crate::s9pk::merkle_archive::hash::{Hash, HashWriter}; +use crate::s9pk::merkle_archive::sink::{Sink, TrackingWriter}; +use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section}; + +#[derive(Debug)] +pub struct FileContents(S); +impl FileContents { + pub fn new(source: S) -> Self { + Self(source) + } + pub const fn header_size() -> u64 { + 8 // position: u64 BE + + 8 // size: u64 BE + } +} +impl FileContents> { + #[instrument(skip_all)] + pub async fn deserialize( + source: &S, + header: &mut (impl AsyncRead + Unpin + Send), + ) -> Result { + use tokio::io::AsyncReadExt; + + let mut position = [0u8; 8]; + header.read_exact(&mut 
position).await?; + let position = u64::from_be_bytes(position); + + let mut size = [0u8; 8]; + header.read_exact(&mut size).await?; + let size = u64::from_be_bytes(size); + + Ok(Self(source.section(position, size))) + } +} +impl FileContents { + pub async fn hash(&self) -> Result { + let mut hasher = TrackingWriter::new(0, HashWriter::new()); + self.serialize_body(&mut hasher, None).await?; + Ok(hasher.into_inner().finalize()) + } + #[instrument(skip_all)] + pub async fn serialize_header(&self, position: u64, w: &mut W) -> Result { + use tokio::io::AsyncWriteExt; + + let size = self.0.size().await?; + + w.write_all(&position.to_be_bytes()).await?; + w.write_all(&size.to_be_bytes()).await?; + + Ok(position) + } + #[instrument(skip_all)] + pub async fn serialize_body( + &self, + w: &mut W, + verify: Option, + ) -> Result<(), Error> { + let start = if verify.is_some() { + Some(w.current_position().await?) + } else { + None + }; + self.0.copy_verify(w, verify).await?; + if let Some(start) = start { + ensure_code!( + w.current_position().await? - start == self.0.size().await?, + ErrorKind::Pack, + "FileSource::copy wrote a number of bytes that does not match FileSource::size" + ); + } + Ok(()) + } +} +impl std::ops::Deref for FileContents { + type Target = S; + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/backend/src/s9pk/merkle_archive/hash.rs b/backend/src/s9pk/merkle_archive/hash.rs new file mode 100644 index 000000000..ae2829012 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/hash.rs @@ -0,0 +1,97 @@ +pub use blake3::Hash; +use blake3::Hasher; +use tokio::io::AsyncWrite; + +use crate::prelude::*; + +#[pin_project::pin_project] +pub struct HashWriter { + hasher: Hasher, +} +impl HashWriter { + pub fn new() -> Self { + Self { + hasher: Hasher::new(), + } + } + pub fn finalize(self) -> Hash { + self.hasher.finalize() + } +} +impl AsyncWrite for HashWriter { + fn poll_write( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.project().hasher.update(buf); + std::task::Poll::Ready(Ok(buf.len())) + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +#[pin_project::pin_project] +pub struct VerifyingWriter { + verify: Option<(Hasher, Hash)>, + #[pin] + writer: W, +} +impl VerifyingWriter { + pub fn new(w: W, verify: Option) -> Self { + Self { + verify: verify.map(|v| (Hasher::new(), v)), + writer: w, + } + } + pub fn verify(self) -> Result { + if let Some((actual, expected)) = self.verify { + ensure_code!( + actual.finalize() == expected, + ErrorKind::InvalidSignature, + "hash sum does not match" + ); + } + Ok(self.writer) + } +} +impl AsyncWrite for VerifyingWriter { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let this = self.project(); + match this.writer.poll_write(cx, buf) { + std::task::Poll::Ready(Ok(written)) => { + if let Some((h, _)) = this.verify { + h.update(&buf[..written]); + } + std::task::Poll::Ready(Ok(written)) + } + a => a, + } + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().writer.poll_flush(cx) + } + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut 
std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        self.project().writer.poll_shutdown(cx)
+    }
+}
diff --git a/backend/src/s9pk/merkle_archive/mod.rs b/backend/src/s9pk/merkle_archive/mod.rs
new file mode 100644
index 000000000..f83cd2464
--- /dev/null
+++ b/backend/src/s9pk/merkle_archive/mod.rs
@@ -0,0 +1,269 @@
+use ed25519_dalek::{Signature, SigningKey, VerifyingKey};
+use tokio::io::AsyncRead;
+
+use crate::prelude::*;
+use crate::s9pk::merkle_archive::directory_contents::DirectoryContents;
+use crate::s9pk::merkle_archive::file_contents::FileContents;
+use crate::s9pk::merkle_archive::hash::Hash;
+use crate::s9pk::merkle_archive::sink::Sink;
+use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section};
+use crate::s9pk::merkle_archive::write_queue::WriteQueue;
+
+pub mod directory_contents;
+pub mod file_contents;
+pub mod hash;
+pub mod sink;
+pub mod source;
+#[cfg(test)]
+mod test;
+pub mod varint;
+pub mod write_queue;
+
+#[derive(Debug)]
+enum Signer {
+    Signed(VerifyingKey, Signature),
+    Signer(SigningKey),
+}
+
+#[derive(Debug)]
+pub struct MerkleArchive<S> {
+    signer: Signer,
+    contents: DirectoryContents<S>,
+}
+impl<S> MerkleArchive<S> {
+    pub fn new(contents: DirectoryContents<S>, signer: SigningKey) -> Self {
+        Self {
+            signer: Signer::Signer(signer),
+            contents,
+        }
+    }
+    pub const fn header_size() -> u64 {
+        32 // pubkey
+         + 64 // signature
+         + 32 // sighash
+         + DirectoryContents::<Section<S>>::header_size()
+    }
+    pub fn contents(&self) -> &DirectoryContents<S> {
+        &self.contents
+    }
+}
+impl<S: ArchiveSource> MerkleArchive<Section<S>> {
+    #[instrument(skip_all)]
+    pub async fn deserialize(
+        source: &S,
+        header: &mut (impl AsyncRead + Unpin + Send),
+    ) -> Result<Self, Error> {
+        use tokio::io::AsyncReadExt;
+
+        let mut pubkey = [0u8; 32];
+        header.read_exact(&mut pubkey).await?;
+        let pubkey = VerifyingKey::from_bytes(&pubkey)?;
+
+        let mut signature = [0u8; 64];
+        header.read_exact(&mut signature).await?;
+        let signature = Signature::from_bytes(&signature);
+
+        let mut sighash = [0u8; 32];
+        header.read_exact(&mut sighash).await?;
+        let sighash = Hash::from_bytes(sighash);
+
+        let contents = DirectoryContents::deserialize(source, header, sighash).await?;
+
+        pubkey.verify_strict(contents.sighash().await?.as_bytes(), &signature)?;
+
+        Ok(Self {
+            signer: Signer::Signed(pubkey, signature),
+            contents,
+        })
+    }
+}
+impl<S: FileSource> MerkleArchive<S> {
+    pub async fn update_hashes(&mut self, only_missing: bool) -> Result<(), Error> {
+        self.contents.update_hashes(only_missing).await
+    }
+    #[instrument(skip_all)]
+    pub async fn serialize<W: Sink>(&self, w: &mut W, verify: bool) -> Result<(), Error> {
+        use tokio::io::AsyncWriteExt;
+
+        let sighash = self.contents.sighash().await?;
+
+        let (pubkey, signature) = match &self.signer {
+            Signer::Signed(pubkey, signature) => (*pubkey, *signature),
+            Signer::Signer(s) => (s.into(), ed25519_dalek::Signer::sign(s, sighash.as_bytes())),
+        };
+
+        w.write_all(pubkey.as_bytes()).await?;
+        w.write_all(&signature.to_bytes()).await?;
+        w.write_all(sighash.as_bytes()).await?;
+        let mut next_pos = w.current_position().await?;
+        next_pos += DirectoryContents::<S>::header_size();
+        self.contents.serialize_header(next_pos, w).await?;
+        next_pos += self.contents.toc_size();
+        let mut queue = WriteQueue::new(next_pos);
+        self.contents.serialize_toc(&mut queue, w).await?;
+        queue.serialize(w, verify).await?;
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct Entry<S> {
+    hash: Option<Hash>,
+    contents: EntryContents<S>,
+}
+impl<S> Entry<S> {
+    pub fn new(contents: EntryContents<S>) -> Self {
+        Self {
+            hash: None,
+            contents,
+        }
+    }
+    pub fn hash(&self) -> Option<Hash> {
+        self.hash
+    }
+    pub fn as_contents(&self) -> &EntryContents<S> {
+        &self.contents
+    }
+    pub fn as_contents_mut(&mut self) -> &mut EntryContents<S> {
+        self.hash = None;
+        &mut self.contents
+    }
+    pub fn into_contents(self) -> EntryContents<S> {
+        self.contents
+    }
+    pub fn header_size(&self) -> u64 {
+        32 // hash
+         + self.contents.header_size()
+    }
+}
+impl<S: ArchiveSource> Entry<Section<S>> {
+    #[instrument(skip_all)]
+    pub async fn deserialize(
+        source: &S,
+        header: &mut (impl AsyncRead + Unpin + Send),
+    ) -> Result<Self, Error> {
+        use tokio::io::AsyncReadExt;
+
+        let mut hash = [0u8; 32];
+        header.read_exact(&mut hash).await?;
+        let hash = Hash::from_bytes(hash);
+
+        let contents = EntryContents::deserialize(source, header, hash).await?;
+
+        Ok(Self {
+            hash: Some(hash),
+            contents,
+        })
+    }
+}
+impl<S: FileSource> Entry<S> {
+    pub async fn to_missing(&self) -> Result<Self, Error> {
+        let hash = if let Some(hash) = self.hash {
+            hash
+        } else {
+            self.contents.hash().await?
+        };
+        Ok(Self {
+            hash: Some(hash),
+            contents: EntryContents::Missing,
+        })
+    }
+    pub async fn update_hash(&mut self, only_missing: bool) -> Result<(), Error> {
+        if let EntryContents::Directory(d) = &mut self.contents {
+            d.update_hashes(only_missing).await?;
+        }
+        self.hash = Some(self.contents.hash().await?);
+        Ok(())
+    }
+    #[instrument(skip_all)]
+    pub async fn serialize_header<W: Sink>(
+        &self,
+        position: u64,
+        w: &mut W,
+    ) -> Result<Option<u64>, Error> {
+        use tokio::io::AsyncWriteExt;
+
+        let hash = if let Some(hash) = self.hash {
+            hash
+        } else {
+            self.contents.hash().await?
+        };
+        w.write_all(hash.as_bytes()).await?;
+        self.contents.serialize_header(position, w).await
+    }
+}
+
+#[derive(Debug)]
+pub enum EntryContents<S> {
+    Missing,
+    File(FileContents<S>),
+    Directory(DirectoryContents<S>),
+}
+impl<S> EntryContents<S> {
+    fn type_id(&self) -> u8 {
+        match self {
+            Self::Missing => 0,
+            Self::File(_) => 1,
+            Self::Directory(_) => 2,
+        }
+    }
+    pub fn header_size(&self) -> u64 {
+        1 // type
+         + match self {
+            Self::Missing => 0,
+            Self::File(_) => FileContents::<S>::header_size(),
+            Self::Directory(_) => DirectoryContents::<S>::header_size(),
+        }
+    }
+}
+impl<S: ArchiveSource> EntryContents<Section<S>> {
+    #[instrument(skip_all)]
+    pub async fn deserialize(
+        source: &S,
+        header: &mut (impl AsyncRead + Unpin + Send),
+        hash: Hash,
+    ) -> Result<Self, Error> {
+        use tokio::io::AsyncReadExt;
+
+        let mut type_id = [0u8];
+        header.read_exact(&mut type_id).await?;
+        match type_id[0] {
+            0 => Ok(Self::Missing),
+            1 => Ok(Self::File(FileContents::deserialize(source, header).await?)),
+            2 => Ok(Self::Directory(
+                DirectoryContents::deserialize(source, header, hash).await?,
+            )),
+            id => Err(Error::new(
+                eyre!("Unknown type id {id} found in MerkleArchive"),
+                ErrorKind::ParseS9pk,
+            )),
+        }
+    }
+}
+impl<S: FileSource> EntryContents<S> {
+    pub async fn hash(&self) -> Result<Hash, Error> {
+        match self {
+            Self::Missing => Err(Error::new(
+                eyre!("Cannot compute hash of missing file"),
+                ErrorKind::Pack,
+            )),
+            Self::File(f) => f.hash().await,
+            Self::Directory(d) => d.sighash().await,
+        }
+    }
+    #[instrument(skip_all)]
+    pub async fn serialize_header<W: Sink>(
+        &self,
+        position: u64,
+        w: &mut W,
+    ) -> Result<Option<u64>, Error> {
+        use tokio::io::AsyncWriteExt;
+
+        w.write_all(&[self.type_id()]).await?;
+        Ok(match self {
+            Self::Missing => None,
+            Self::File(f) => Some(f.serialize_header(position, w).await?),
+            Self::Directory(d) => Some(d.serialize_header(position, w).await?),
+        })
+    }
+}
diff --git a/backend/src/s9pk/merkle_archive/sink.rs b/backend/src/s9pk/merkle_archive/sink.rs
new file mode 100644
index 000000000..c71377808
--- /dev/null
+++ b/backend/src/s9pk/merkle_archive/sink.rs
@@ -0,0 
+1,70 @@ +use tokio::io::{AsyncSeek, AsyncWrite}; + +use crate::prelude::*; + +#[async_trait::async_trait] +pub trait Sink: AsyncWrite + Unpin + Send { + async fn current_position(&mut self) -> Result; +} + +#[async_trait::async_trait] +impl Sink for S { + async fn current_position(&mut self) -> Result { + use tokio::io::AsyncSeekExt; + + Ok(self.stream_position().await?) + } +} + +#[async_trait::async_trait] +impl Sink for TrackingWriter { + async fn current_position(&mut self) -> Result { + Ok(self.position) + } +} + +#[pin_project::pin_project] +pub struct TrackingWriter { + position: u64, + #[pin] + writer: W, +} +impl TrackingWriter { + pub fn new(start: u64, w: W) -> Self { + Self { + position: start, + writer: w, + } + } + pub fn into_inner(self) -> W { + self.writer + } +} +impl AsyncWrite for TrackingWriter { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let this = self.project(); + match this.writer.poll_write(cx, buf) { + std::task::Poll::Ready(Ok(written)) => { + *this.position += written as u64; + std::task::Poll::Ready(Ok(written)) + } + a => a, + } + } + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().writer.poll_flush(cx) + } + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().writer.poll_shutdown(cx) + } +} diff --git a/backend/src/s9pk/merkle_archive/source/http.rs b/backend/src/s9pk/merkle_archive/source/http.rs new file mode 100644 index 000000000..f38fd7028 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/source/http.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; + +use bytes::Bytes; +use futures::stream::BoxStream; +use futures::{StreamExt, TryStreamExt}; +use http::header::{ACCEPT_RANGES, RANGE}; +use reqwest::{Client, Url}; +use tokio::io::AsyncRead; +use tokio::sync::Mutex; +use tokio_util::io::StreamReader; + +use crate::prelude::*; +use crate::s9pk::merkle_archive::source::ArchiveSource; + +#[derive(Clone)] +pub struct HttpSource { + url: Url, + client: Client, + range_support: Result< + (), + (), // Arc>> + >, +} +impl HttpSource { + pub async fn new(client: Client, url: Url) -> Result { + let range_support = client + .head(url.clone()) + .send() + .await + .with_kind(ErrorKind::Network)? + .error_for_status() + .with_kind(ErrorKind::Network)? + .headers() + .get(ACCEPT_RANGES) + .and_then(|s| s.to_str().ok()) + == Some("bytes"); + Ok(Self { + url, + client, + range_support: if range_support { + Ok(()) + } else { + todo!() // Err(Arc::new(Mutex::new(None))) + }, + }) + } +} +#[async_trait::async_trait] +impl ArchiveSource for HttpSource { + type Reader = HttpReader; + async fn fetch(&self, position: u64, size: u64) -> Result { + match self.range_support { + Ok(_) => Ok(HttpReader::Range(StreamReader::new(if size > 0 { + self.client + .get(self.url.clone()) + .header(RANGE, format!("bytes={}-{}", position, position + size - 1)) + .send() + .await + .with_kind(ErrorKind::Network)? + .error_for_status() + .with_kind(ErrorKind::Network)? 
+ .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) + .boxed() + } else { + futures::stream::empty().boxed() + }))), + _ => todo!(), + } + } +} + +#[pin_project::pin_project(project = HttpReaderProj)] +pub enum HttpReader { + Range(#[pin] StreamReader>, Bytes>), + // Rangeless(#[pin] RangelessReader), +} +impl AsyncRead for HttpReader { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.project() { + HttpReaderProj::Range(r) => r.poll_read(cx, buf), + // HttpReaderProj::Rangeless(r) => r.poll_read(cx, buf), + } + } +} + +// type RangelessReader = StreamReader, Bytes>; diff --git a/backend/src/s9pk/merkle_archive/source/mod.rs b/backend/src/s9pk/merkle_archive/source/mod.rs new file mode 100644 index 000000000..3a7d60a40 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/source/mod.rs @@ -0,0 +1,120 @@ +use std::path::PathBuf; +use std::sync::Arc; + +use blake3::Hash; +use tokio::fs::File; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::prelude::*; +use crate::s9pk::merkle_archive::hash::VerifyingWriter; + +pub mod http; +pub mod multi_cursor_file; + +#[async_trait::async_trait] +pub trait FileSource: Send + Sync + Sized + 'static { + type Reader: AsyncRead + Unpin + Send; + async fn size(&self) -> Result; + async fn reader(&self) -> Result; + async fn copy(&self, w: &mut W) -> Result<(), Error> { + tokio::io::copy(&mut self.reader().await?, w).await?; + Ok(()) + } + async fn copy_verify( + &self, + w: &mut W, + verify: Option, + ) -> Result<(), Error> { + let mut w = VerifyingWriter::new(w, verify); + tokio::io::copy(&mut self.reader().await?, &mut w).await?; + w.verify()?; + Ok(()) + } + async fn to_vec(&self, verify: Option) -> Result, Error> { + let mut vec = Vec::with_capacity(self.size().await? as usize); + self.copy_verify(&mut vec, verify).await?; + Ok(vec) + } +} + +#[async_trait::async_trait] +impl FileSource for PathBuf { + type Reader = File; + async fn size(&self) -> Result { + Ok(tokio::fs::metadata(self).await?.len()) + } + async fn reader(&self) -> Result { + Ok(File::open(self).await?) 
+    }
+}
+
+#[async_trait::async_trait]
+impl FileSource for Arc<[u8]> {
+    type Reader = std::io::Cursor<Self>;
+    async fn size(&self) -> Result<u64, Error> {
+        Ok(self.len() as u64)
+    }
+    async fn reader(&self) -> Result<Self::Reader, Error> {
+        Ok(std::io::Cursor::new(self.clone()))
+    }
+    async fn copy<W: AsyncWrite + Unpin + Send + ?Sized>(&self, w: &mut W) -> Result<(), Error> {
+        use tokio::io::AsyncWriteExt;
+
+        w.write_all(&*self).await?;
+        Ok(())
+    }
+}
+
+#[async_trait::async_trait]
+pub trait ArchiveSource: Clone + Send + Sync + Sized + 'static {
+    type Reader: AsyncRead + Unpin + Send;
+    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error>;
+    async fn copy_to<W: AsyncWrite + Unpin + Send + ?Sized>(
+        &self,
+        position: u64,
+        size: u64,
+        w: &mut W,
+    ) -> Result<(), Error> {
+        tokio::io::copy(&mut self.fetch(position, size).await?, w).await?;
+        Ok(())
+    }
+    fn section(&self, position: u64, size: u64) -> Section<Self> {
+        Section {
+            source: self.clone(),
+            position,
+            size,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl ArchiveSource for Arc<[u8]> {
+    type Reader = tokio::io::Take<std::io::Cursor<Self>>;
+    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
+        use tokio::io::AsyncReadExt;
+
+        let mut cur = std::io::Cursor::new(self.clone());
+        cur.set_position(position);
+        Ok(cur.take(size))
+    }
+}
+
+#[derive(Debug)]
+pub struct Section<S> {
+    source: S,
+    position: u64,
+    size: u64,
+}
+#[async_trait::async_trait]
+impl<S: ArchiveSource> FileSource for Section<S> {
+    type Reader = S::Reader;
+    async fn size(&self) -> Result<u64, Error> {
+        Ok(self.size)
+    }
+    async fn reader(&self) -> Result<Self::Reader, Error> {
+        self.source.fetch(self.position, self.size).await
+    }
+    async fn copy<W: AsyncWrite + Unpin + Send + ?Sized>(&self, w: &mut W) -> Result<(), Error> {
+        self.source.copy_to(self.position, self.size, w).await
+    }
+}
diff --git a/backend/src/s9pk/merkle_archive/source/multi_cursor_file.rs b/backend/src/s9pk/merkle_archive/source/multi_cursor_file.rs
new file mode 100644
index 000000000..cda3e5103
--- /dev/null
+++ b/backend/src/s9pk/merkle_archive/source/multi_cursor_file.rs
@@ -0,0 +1,86 @@
+use std::io::SeekFrom;
+use std::os::fd::{AsRawFd, RawFd};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use tokio::fs::File;
+use tokio::io::AsyncRead;
+use tokio::sync::{Mutex, OwnedMutexGuard};
+
+use crate::disk::mount::filesystem::loop_dev::LoopDev;
+use crate::prelude::*;
+use crate::s9pk::merkle_archive::source::{ArchiveSource, Section};
+
+#[derive(Clone)]
+pub struct MultiCursorFile {
+    fd: RawFd,
+    file: Arc<Mutex<File>>,
+}
+impl MultiCursorFile {
+    fn path(&self) -> PathBuf {
+        Path::new("/proc/self/fd").join(self.fd.to_string())
+    }
+}
+impl From<File> for MultiCursorFile {
+    fn from(value: File) -> Self {
+        Self {
+            fd: value.as_raw_fd(),
+            file: Arc::new(Mutex::new(value)),
+        }
+    }
+}
+
+#[pin_project::pin_project]
+pub struct FileSectionReader {
+    #[pin]
+    file: OwnedMutexGuard<File>,
+    remaining: u64,
+}
+impl AsyncRead for FileSectionReader {
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        let this = self.project();
+        if *this.remaining == 0 {
+            return std::task::Poll::Ready(Ok(()));
+        }
+        let mut take = buf.take(*this.remaining as usize);
+        let res = std::pin::Pin::new(&mut **this.file.get_mut()).poll_read(cx, &mut take);
+        let filled = take.filled().len();
+        drop(take);
+        // `take` tracks its fill level independently of `buf`, so the bytes read
+        // through it must be committed back to `buf` for the caller to see them
+        unsafe { buf.assume_init(filled) };
+        buf.advance(filled);
+        *this.remaining = this.remaining.saturating_sub(filled as u64);
+        res
+    }
+}
+
+#[async_trait::async_trait]
+impl ArchiveSource for MultiCursorFile {
+    type Reader = FileSectionReader;
+    async fn fetch(&self, position: u64, size: u64) -> Result<Self::Reader, Error> {
+        use tokio::io::AsyncSeekExt;
+
+        let mut file = if let Ok(file) = 
self.file.clone().try_lock_owned() { + file + } else { + Arc::new(Mutex::new(File::open(self.path()).await?)) + .try_lock_owned() + .expect("freshly created") + }; + file.seek(SeekFrom::Start(position)).await?; + Ok(Self::Reader { + file, + remaining: size, + }) + } +} + +impl From> for LoopDev { + fn from(value: Section) -> Self { + LoopDev::new(value.source.path(), value.position, value.size) + } +} diff --git a/backend/src/s9pk/merkle_archive/test.rs b/backend/src/s9pk/merkle_archive/test.rs new file mode 100644 index 000000000..430ab4f31 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/test.rs @@ -0,0 +1,138 @@ +use std::collections::BTreeMap; +use std::io::Cursor; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use ed25519_dalek::SigningKey; + +use crate::prelude::*; +use crate::s9pk::merkle_archive::directory_contents::DirectoryContents; +use crate::s9pk::merkle_archive::file_contents::FileContents; +use crate::s9pk::merkle_archive::sink::TrackingWriter; +use crate::s9pk::merkle_archive::source::FileSource; +use crate::s9pk::merkle_archive::{Entry, EntryContents, MerkleArchive}; + +/// Creates a MerkleArchive (a1) with the provided files at the provided paths. NOTE: later files can overwrite previous files/directories at the same path +/// Tests: +/// - a1.update_hashes(): returns Ok(_) +/// - a1.serialize(verify: true): returns Ok(s1) +/// - MerkleArchive::deserialize(s1): returns Ok(a2) +/// - a2: contains all expected files with expected content +/// - a2.serialize(verify: true): returns Ok(s2) +/// - s1 == s2 +#[instrument] +fn test(files: Vec<(PathBuf, String)>) -> Result<(), Error> { + let mut root = DirectoryContents::>::new(); + let mut check_set = BTreeMap::::new(); + for (path, content) in files { + if let Err(e) = root.insert_path( + &path, + Entry::new(EntryContents::File(FileContents::new( + content.clone().into_bytes().into(), + ))), + ) { + eprintln!("failed to insert file at {path:?}: {e}"); + } else { + let path = path.strip_prefix("/").unwrap_or(&path); + let mut remaining = check_set.split_off(path); + while { + if let Some((p, s)) = remaining.pop_first() { + if !p.starts_with(path) { + remaining.insert(p, s); + false + } else { + true + } + } else { + false + } + } {} + check_set.append(&mut remaining); + check_set.insert(path.to_owned(), content); + } + } + let key = SigningKey::generate(&mut rand::thread_rng()); + let mut a1 = MerkleArchive::new(root, key); + tokio::runtime::Builder::new_current_thread() + .enable_io() + .build() + .unwrap() + .block_on(async move { + a1.update_hashes(true).await?; + let mut s1 = Vec::new(); + a1.serialize(&mut TrackingWriter::new(0, &mut s1), true) + .await?; + let s1: Arc<[u8]> = s1.into(); + let a2 = MerkleArchive::deserialize(&s1, &mut Cursor::new(s1.clone())).await?; + + for (path, content) in check_set { + match a2 + .contents + .get_path(&path) + .map(|e| (e.as_contents(), e.hash())) + { + Some((EntryContents::File(f), hash)) => { + ensure_code!( + &f.to_vec(hash).await? == content.as_bytes(), + ErrorKind::ParseS9pk, + "File at {path:?} does not match input" + ) + } + _ => { + return Err(Error::new( + eyre!("expected file at {path:?}"), + ErrorKind::ParseS9pk, + )) + } + } + } + + let mut s2 = Vec::new(); + a2.serialize(&mut TrackingWriter::new(0, &mut s2), true) + .await?; + let s2: Arc<[u8]> = s2.into(); + ensure_code!(s1 == s2, ErrorKind::Pack, "s1 does not match s2"); + + Ok(()) + }) +} + +proptest::proptest! 
{ + #[test] + fn property_test(files: Vec<(PathBuf, String)>) { + let files: Vec<(PathBuf, String)> = files.into_iter().filter(|(p, _)| p.file_name().is_some() && p.iter().all(|s| s.to_str().is_some())).collect(); + if let Err(e) = test(files.clone()) { + panic!("{e}\nInput: {files:#?}\n{e:?}"); + } + } +} + +#[test] +fn test_example_1() { + if let Err(e) = test(vec![(Path::new("foo").into(), "bar".into())]) { + panic!("{e}\n{e:?}"); + } +} + +#[test] +fn test_example_2() { + if let Err(e) = test(vec![ + (Path::new("a/a.txt").into(), "a.txt".into()), + (Path::new("a/b/a.txt").into(), "a.txt".into()), + (Path::new("a/b/b/a.txt").into(), "a.txt".into()), + (Path::new("a/b/c.txt").into(), "c.txt".into()), + (Path::new("a/c.txt").into(), "c.txt".into()), + ]) { + panic!("{e}\n{e:?}"); + } +} + +#[test] +fn test_example_3() { + if let Err(e) = test(vec![ + (Path::new("b/a").into(), "𑦪".into()), + (Path::new("a/c/a").into(), "·".into()), + ]) { + panic!("{e}\n{e:?}"); + } +} diff --git a/backend/src/s9pk/merkle_archive/varint.rs b/backend/src/s9pk/merkle_archive/varint.rs new file mode 100644 index 000000000..479b488e6 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/varint.rs @@ -0,0 +1,159 @@ +use integer_encoding::VarInt; +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::prelude::*; + +/// Most-significant byte, == 0x80 +pub const MSB: u8 = 0b1000_0000; + +const MAX_STR_LEN: u64 = 1024 * 1024; // 1 MiB + +pub fn serialized_varint_size(n: u64) -> u64 { + VarInt::required_space(n) as u64 +} + +pub async fn serialize_varint( + n: u64, + w: &mut W, +) -> Result<(), Error> { + use tokio::io::AsyncWriteExt; + + let mut buf = [0 as u8; 10]; + let b = n.encode_var(&mut buf); + w.write_all(&buf[0..b]).await?; + + Ok(()) +} + +pub fn serialized_varstring_size(s: &str) -> u64 { + serialized_varint_size(s.len() as u64) + s.len() as u64 +} + +pub async fn serialize_varstring( + s: &str, + w: &mut W, +) -> Result<(), Error> { + use tokio::io::AsyncWriteExt; + serialize_varint(s.len() as u64, w).await?; + w.write_all(s.as_bytes()).await?; + Ok(()) +} + +#[derive(Default)] +struct VarIntProcessor { + buf: [u8; 10], + maxsize: usize, + i: usize, +} + +impl VarIntProcessor { + fn new() -> VarIntProcessor { + VarIntProcessor { + maxsize: (std::mem::size_of::() * 8 + 7) / 7, + ..VarIntProcessor::default() + } + } + fn push(&mut self, b: u8) -> Result<(), Error> { + if self.i >= self.maxsize { + return Err(Error::new( + eyre!("Unterminated varint"), + ErrorKind::ParseS9pk, + )); + } + self.buf[self.i] = b; + self.i += 1; + Ok(()) + } + fn finished(&self) -> bool { + self.i > 0 && (self.buf[self.i - 1] & MSB == 0) + } + fn decode(&self) -> Option { + Some(u64::decode_var(&self.buf[0..self.i])?.0) + } +} + +pub async fn deserialize_varint(r: &mut R) -> Result { + use tokio::io::AsyncReadExt; + + let mut buf = [0 as u8; 1]; + let mut p = VarIntProcessor::new(); + + while !p.finished() { + r.read_exact(&mut buf).await?; + + p.push(buf[0])?; + } + + p.decode() + .ok_or_else(|| Error::new(eyre!("Reached EOF"), ErrorKind::ParseS9pk)) +} + +pub async fn deserialize_varstring(r: &mut R) -> Result { + use tokio::io::AsyncReadExt; + + let len = std::cmp::min(deserialize_varint(r).await?, MAX_STR_LEN); + let mut res = String::with_capacity(len as usize); + r.take(len).read_to_string(&mut res).await?; + Ok(res) +} + +#[cfg(test)] +mod test { + use std::io::Cursor; + + use crate::prelude::*; + + fn test_int(n: u64) -> Result<(), Error> { + let n1 = n; + tokio::runtime::Builder::new_current_thread() + .enable_io() + 
.build() + .unwrap() + .block_on(async move { + let mut v = Vec::new(); + super::serialize_varint(n1, &mut v).await?; + let n2 = super::deserialize_varint(&mut Cursor::new(v)).await?; + + ensure_code!(n1 == n2, ErrorKind::Deserialization, "n1 does not match n2"); + + Ok(()) + }) + } + + fn test_string(s: &str) -> Result<(), Error> { + let s1 = s; + tokio::runtime::Builder::new_current_thread() + .enable_io() + .build() + .unwrap() + .block_on(async move { + let mut v: Vec = Vec::new(); + super::serialize_varstring(&s1, &mut v).await?; + let s2 = super::deserialize_varstring(&mut Cursor::new(v)).await?; + + ensure_code!( + s1 == &s2, + ErrorKind::Deserialization, + "s1 does not match s2" + ); + + Ok(()) + }) + } + + proptest::proptest! { + #[test] + fn proptest_int(n: u64) { + if let Err(e) = test_int(n) { + panic!("{e}\nInput: {n}\n{e:?}"); + } + } + + #[test] + fn proptest_string(s: String) { + if let Err(e) = test_string(&s) { + panic!("{e}\nInput: {s:?}\n{e:?}"); + } + } + } +} diff --git a/backend/src/s9pk/merkle_archive/write_queue.rs b/backend/src/s9pk/merkle_archive/write_queue.rs new file mode 100644 index 000000000..973ffcf30 --- /dev/null +++ b/backend/src/s9pk/merkle_archive/write_queue.rs @@ -0,0 +1,47 @@ +use std::collections::VecDeque; + +use crate::prelude::*; +use crate::s9pk::merkle_archive::sink::Sink; +use crate::s9pk::merkle_archive::source::FileSource; +use crate::s9pk::merkle_archive::{Entry, EntryContents}; +use crate::util::MaybeOwned; + +pub struct WriteQueue<'a, S> { + next_available_position: u64, + queue: VecDeque<&'a Entry>, +} + +impl<'a, S> WriteQueue<'a, S> { + pub fn new(next_available_position: u64) -> Self { + Self { + next_available_position, + queue: VecDeque::new(), + } + } +} +impl<'a, S: FileSource> WriteQueue<'a, S> { + pub async fn add(&mut self, entry: &'a Entry) -> Result { + let res = self.next_available_position; + let size = match entry.as_contents() { + EntryContents::Missing => return Ok(0), + EntryContents::File(f) => f.size().await?, + EntryContents::Directory(d) => d.toc_size(), + }; + self.next_available_position += size; + self.queue.push_back(entry); + Ok(res) + } + pub async fn serialize(&mut self, w: &mut W, verify: bool) -> Result<(), Error> { + loop { + let Some(next) = self.queue.pop_front() else { + break; + }; + match next.as_contents() { + EntryContents::Missing => (), + EntryContents::File(f) => f.serialize_body(w, next.hash.filter(|_| verify)).await?, + EntryContents::Directory(d) => d.serialize_toc(self, w).await?, + } + } + Ok(()) + } +} diff --git a/backend/src/s9pk/mod.rs b/backend/src/s9pk/mod.rs index e1bf4caba..6720f2999 100644 --- a/backend/src/s9pk/mod.rs +++ b/backend/src/s9pk/mod.rs @@ -1,246 +1,5 @@ -use std::ffi::OsStr; -use std::path::PathBuf; +pub mod merkle_archive; +pub mod v1; +pub mod v2; -use color_eyre::eyre::eyre; -use futures::TryStreamExt; -use imbl::OrdMap; -use rpc_toolkit::command; -use serde_json::Value; -use tokio::io::AsyncRead; -use tracing::instrument; - -use crate::context::SdkContext; -use crate::s9pk::builder::S9pkPacker; -use crate::s9pk::docker::DockerMultiArch; -use crate::s9pk::git_hash::GitHash; -use crate::s9pk::manifest::Manifest; -use crate::s9pk::reader::S9pkReader; -use crate::util::display_none; -use crate::util::io::BufferedWriteReader; -use crate::util::serde::IoFormat; -use crate::volume::Volume; -use crate::{Error, ErrorKind, ResultExt}; - -pub mod builder; -pub mod docker; -pub mod git_hash; -pub mod header; -pub mod manifest; -pub mod reader; - -pub const SIG_CONTEXT: &[u8] 
= b"s9pk"; - -#[command(cli_only, display(display_none))] -#[instrument(skip_all)] -pub async fn pack(#[context] ctx: SdkContext, #[arg] path: Option) -> Result<(), Error> { - use tokio::fs::File; - - let path = if let Some(path) = path { - path - } else { - std::env::current_dir()? - }; - let manifest_value: Value = if path.join("manifest.toml").exists() { - IoFormat::Toml - .from_async_reader(File::open(path.join("manifest.toml")).await?) - .await? - } else if path.join("manifest.yaml").exists() { - IoFormat::Yaml - .from_async_reader(File::open(path.join("manifest.yaml")).await?) - .await? - } else if path.join("manifest.json").exists() { - IoFormat::Json - .from_async_reader(File::open(path.join("manifest.json")).await?) - .await? - } else { - return Err(Error::new( - eyre!("manifest not found"), - crate::ErrorKind::Pack, - )); - }; - - let manifest: Manifest = serde_json::from_value::(manifest_value.clone()) - .with_kind(crate::ErrorKind::Deserialization)? - .with_git_hash(GitHash::from_path(&path).await?); - let extra_keys = - enumerate_extra_keys(&serde_json::to_value(&manifest).unwrap(), &manifest_value); - for k in extra_keys { - tracing::warn!("Unrecognized Manifest Key: {}", k); - } - - let outfile_path = path.join(format!("{}.s9pk", manifest.id)); - let mut outfile = File::create(outfile_path).await?; - S9pkPacker::builder() - .manifest(&manifest) - .writer(&mut outfile) - .license( - File::open(path.join(manifest.assets.license_path())) - .await - .with_ctx(|_| { - ( - crate::ErrorKind::Filesystem, - manifest.assets.license_path().display().to_string(), - ) - })?, - ) - .icon( - File::open(path.join(manifest.assets.icon_path())) - .await - .with_ctx(|_| { - ( - crate::ErrorKind::Filesystem, - manifest.assets.icon_path().display().to_string(), - ) - })?, - ) - .instructions( - File::open(path.join(manifest.assets.instructions_path())) - .await - .with_ctx(|_| { - ( - crate::ErrorKind::Filesystem, - manifest.assets.instructions_path().display().to_string(), - ) - })?, - ) - .docker_images({ - let docker_images_path = path.join(manifest.assets.docker_images_path()); - let res: Box = if tokio::fs::metadata(&docker_images_path).await?.is_dir() { - let tars: Vec<_> = tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(&docker_images_path).await?).try_collect().await?; - let mut arch_info = DockerMultiArch::default(); - for tar in &tars { - if tar.path().extension() == Some(OsStr::new("tar")) { - arch_info.available.insert(tar.path().file_stem().unwrap_or_default().to_str().unwrap_or_default().to_owned()); - } - } - if arch_info.available.contains("aarch64") { - arch_info.default = "aarch64".to_owned(); - } else { - arch_info.default = arch_info.available.iter().next().cloned().unwrap_or_default(); - } - let arch_info_cbor = IoFormat::Cbor.to_vec(&arch_info)?; - Box::new(BufferedWriteReader::new(|w| async move { - let mut docker_images = tokio_tar::Builder::new(w); - let mut multiarch_header = tokio_tar::Header::new_gnu(); - multiarch_header.set_path("multiarch.cbor")?; - multiarch_header.set_size(arch_info_cbor.len() as u64); - multiarch_header.set_cksum(); - docker_images.append(&multiarch_header, std::io::Cursor::new(arch_info_cbor)).await?; - for tar in tars - { - docker_images - .append_path_with_name( - tar.path(), - tar.file_name(), - ) - .await?; - } - Ok::<_, std::io::Error>(()) - }, 1024 * 1024)) - } else { - Box::new(File::open(docker_images_path) - .await - .with_ctx(|_| { - ( - crate::ErrorKind::Filesystem, - 
manifest.assets.docker_images_path().display().to_string(), - ) - })?) - }; - res - }) - .assets({ - let asset_volumes = manifest - .volumes - .iter() - .filter(|(_, v)| matches!(v, &&Volume::Assets {})).map(|(id, _)| id.clone()).collect::>(); - let assets_path = manifest.assets.assets_path().to_owned(); - let path = path.clone(); - - BufferedWriteReader::new(|w| async move { - let mut assets = tokio_tar::Builder::new(w); - for asset_volume in asset_volumes - { - assets - .append_dir_all( - &asset_volume, - path.join(&assets_path).join(&asset_volume), - ) - .await?; - } - Ok::<_, std::io::Error>(()) - }, 1024 * 1024) - }) - .scripts({ - let script_path = path.join(manifest.assets.scripts_path()).join("embassy.js"); - let needs_script = manifest.package_procedures().any(|a| a.is_script()); - let has_script = script_path.exists(); - match (needs_script, has_script) { - (true, true) => Some(File::open(script_path).await?), - (true, false) => { - return Err(Error::new(eyre!("Script is declared in manifest, but no such script exists at ./scripts/embassy.js"), ErrorKind::Pack).into()) - } - (false, true) => { - tracing::warn!("Manifest does not declare any actions that use scripts, but a script exists at ./scripts/embassy.js"); - None - } - (false, false) => None - } - }) - .build() - .pack(&ctx.developer_key()?) - .await?; - outfile.sync_all().await?; - - Ok(()) -} - -#[command(rename = "s9pk", cli_only, display(display_none))] -pub async fn verify(#[arg] path: PathBuf) -> Result<(), Error> { - let mut s9pk = S9pkReader::open(path, true).await?; - s9pk.validate().await?; - - Ok(()) -} - -fn enumerate_extra_keys(reference: &Value, candidate: &Value) -> Vec { - match (reference, candidate) { - (Value::Object(m_r), Value::Object(m_c)) => { - let om_r: OrdMap = m_r.clone().into_iter().collect(); - let om_c: OrdMap = m_c.clone().into_iter().collect(); - let common = om_r.clone().intersection(om_c.clone()); - let top_extra = common.clone().symmetric_difference(om_c.clone()); - let mut all_extra = top_extra - .keys() - .map(|s| format!(".{}", s)) - .collect::>(); - for (k, v) in common { - all_extra.extend( - enumerate_extra_keys(&v, om_c.get(&k).unwrap()) - .into_iter() - .map(|s| format!(".{}{}", k, s)), - ) - } - all_extra - } - (_, Value::Object(m1)) => m1.clone().keys().map(|s| format!(".{}", s)).collect(), - _ => Vec::new(), - } -} - -#[test] -fn test_enumerate_extra_keys() { - use serde_json::json; - let extras = enumerate_extra_keys( - &json!({ - "test": 1, - "test2": null, - }), - &json!({ - "test": 1, - "test2": { "test3": null }, - "test4": null - }), - ); - println!("{:?}", extras) -} +pub use v1::*; diff --git a/backend/src/s9pk/specv2.md b/backend/src/s9pk/specv2.md deleted file mode 100644 index 9bf993463..000000000 --- a/backend/src/s9pk/specv2.md +++ /dev/null @@ -1,28 +0,0 @@ -## Header - -### Magic - -2B: `0x3b3b` - -### Version - -varint: `0x02` - -### Pubkey - -32B: ed25519 pubkey - -### TOC - -- number of sections (varint) -- FOREACH section - - sig (32B: ed25519 signature of BLAKE-3 of rest of section) - - name (varstring) - - TYPE (varint) - - TYPE=FILE (`0x01`) - - mime (varstring) - - pos (32B: u64 BE) - - len (32B: u64 BE) - - hash (32B: BLAKE-3 of file contents) - - TYPE=TOC (`0x02`) - - recursively defined diff --git a/backend/src/s9pk/builder.rs b/backend/src/s9pk/v1/builder.rs similarity index 100% rename from backend/src/s9pk/builder.rs rename to backend/src/s9pk/v1/builder.rs diff --git a/backend/src/s9pk/docker.rs b/backend/src/s9pk/v1/docker.rs similarity index 100% 
rename from backend/src/s9pk/docker.rs rename to backend/src/s9pk/v1/docker.rs diff --git a/backend/src/s9pk/git_hash.rs b/backend/src/s9pk/v1/git_hash.rs similarity index 100% rename from backend/src/s9pk/git_hash.rs rename to backend/src/s9pk/v1/git_hash.rs diff --git a/backend/src/s9pk/header.rs b/backend/src/s9pk/v1/header.rs similarity index 100% rename from backend/src/s9pk/header.rs rename to backend/src/s9pk/v1/header.rs diff --git a/backend/src/s9pk/manifest.rs b/backend/src/s9pk/v1/manifest.rs similarity index 100% rename from backend/src/s9pk/manifest.rs rename to backend/src/s9pk/v1/manifest.rs diff --git a/backend/src/s9pk/v1/mod.rs b/backend/src/s9pk/v1/mod.rs new file mode 100644 index 000000000..e1bf4caba --- /dev/null +++ b/backend/src/s9pk/v1/mod.rs @@ -0,0 +1,246 @@ +use std::ffi::OsStr; +use std::path::PathBuf; + +use color_eyre::eyre::eyre; +use futures::TryStreamExt; +use imbl::OrdMap; +use rpc_toolkit::command; +use serde_json::Value; +use tokio::io::AsyncRead; +use tracing::instrument; + +use crate::context::SdkContext; +use crate::s9pk::builder::S9pkPacker; +use crate::s9pk::docker::DockerMultiArch; +use crate::s9pk::git_hash::GitHash; +use crate::s9pk::manifest::Manifest; +use crate::s9pk::reader::S9pkReader; +use crate::util::display_none; +use crate::util::io::BufferedWriteReader; +use crate::util::serde::IoFormat; +use crate::volume::Volume; +use crate::{Error, ErrorKind, ResultExt}; + +pub mod builder; +pub mod docker; +pub mod git_hash; +pub mod header; +pub mod manifest; +pub mod reader; + +pub const SIG_CONTEXT: &[u8] = b"s9pk"; + +#[command(cli_only, display(display_none))] +#[instrument(skip_all)] +pub async fn pack(#[context] ctx: SdkContext, #[arg] path: Option) -> Result<(), Error> { + use tokio::fs::File; + + let path = if let Some(path) = path { + path + } else { + std::env::current_dir()? + }; + let manifest_value: Value = if path.join("manifest.toml").exists() { + IoFormat::Toml + .from_async_reader(File::open(path.join("manifest.toml")).await?) + .await? + } else if path.join("manifest.yaml").exists() { + IoFormat::Yaml + .from_async_reader(File::open(path.join("manifest.yaml")).await?) + .await? + } else if path.join("manifest.json").exists() { + IoFormat::Json + .from_async_reader(File::open(path.join("manifest.json")).await?) + .await? + } else { + return Err(Error::new( + eyre!("manifest not found"), + crate::ErrorKind::Pack, + )); + }; + + let manifest: Manifest = serde_json::from_value::(manifest_value.clone()) + .with_kind(crate::ErrorKind::Deserialization)? 
+ .with_git_hash(GitHash::from_path(&path).await?); + let extra_keys = + enumerate_extra_keys(&serde_json::to_value(&manifest).unwrap(), &manifest_value); + for k in extra_keys { + tracing::warn!("Unrecognized Manifest Key: {}", k); + } + + let outfile_path = path.join(format!("{}.s9pk", manifest.id)); + let mut outfile = File::create(outfile_path).await?; + S9pkPacker::builder() + .manifest(&manifest) + .writer(&mut outfile) + .license( + File::open(path.join(manifest.assets.license_path())) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + manifest.assets.license_path().display().to_string(), + ) + })?, + ) + .icon( + File::open(path.join(manifest.assets.icon_path())) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + manifest.assets.icon_path().display().to_string(), + ) + })?, + ) + .instructions( + File::open(path.join(manifest.assets.instructions_path())) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + manifest.assets.instructions_path().display().to_string(), + ) + })?, + ) + .docker_images({ + let docker_images_path = path.join(manifest.assets.docker_images_path()); + let res: Box = if tokio::fs::metadata(&docker_images_path).await?.is_dir() { + let tars: Vec<_> = tokio_stream::wrappers::ReadDirStream::new(tokio::fs::read_dir(&docker_images_path).await?).try_collect().await?; + let mut arch_info = DockerMultiArch::default(); + for tar in &tars { + if tar.path().extension() == Some(OsStr::new("tar")) { + arch_info.available.insert(tar.path().file_stem().unwrap_or_default().to_str().unwrap_or_default().to_owned()); + } + } + if arch_info.available.contains("aarch64") { + arch_info.default = "aarch64".to_owned(); + } else { + arch_info.default = arch_info.available.iter().next().cloned().unwrap_or_default(); + } + let arch_info_cbor = IoFormat::Cbor.to_vec(&arch_info)?; + Box::new(BufferedWriteReader::new(|w| async move { + let mut docker_images = tokio_tar::Builder::new(w); + let mut multiarch_header = tokio_tar::Header::new_gnu(); + multiarch_header.set_path("multiarch.cbor")?; + multiarch_header.set_size(arch_info_cbor.len() as u64); + multiarch_header.set_cksum(); + docker_images.append(&multiarch_header, std::io::Cursor::new(arch_info_cbor)).await?; + for tar in tars + { + docker_images + .append_path_with_name( + tar.path(), + tar.file_name(), + ) + .await?; + } + Ok::<_, std::io::Error>(()) + }, 1024 * 1024)) + } else { + Box::new(File::open(docker_images_path) + .await + .with_ctx(|_| { + ( + crate::ErrorKind::Filesystem, + manifest.assets.docker_images_path().display().to_string(), + ) + })?) 
+ }; + res + }) + .assets({ + let asset_volumes = manifest + .volumes + .iter() + .filter(|(_, v)| matches!(v, &&Volume::Assets {})).map(|(id, _)| id.clone()).collect::>(); + let assets_path = manifest.assets.assets_path().to_owned(); + let path = path.clone(); + + BufferedWriteReader::new(|w| async move { + let mut assets = tokio_tar::Builder::new(w); + for asset_volume in asset_volumes + { + assets + .append_dir_all( + &asset_volume, + path.join(&assets_path).join(&asset_volume), + ) + .await?; + } + Ok::<_, std::io::Error>(()) + }, 1024 * 1024) + }) + .scripts({ + let script_path = path.join(manifest.assets.scripts_path()).join("embassy.js"); + let needs_script = manifest.package_procedures().any(|a| a.is_script()); + let has_script = script_path.exists(); + match (needs_script, has_script) { + (true, true) => Some(File::open(script_path).await?), + (true, false) => { + return Err(Error::new(eyre!("Script is declared in manifest, but no such script exists at ./scripts/embassy.js"), ErrorKind::Pack).into()) + } + (false, true) => { + tracing::warn!("Manifest does not declare any actions that use scripts, but a script exists at ./scripts/embassy.js"); + None + } + (false, false) => None + } + }) + .build() + .pack(&ctx.developer_key()?) + .await?; + outfile.sync_all().await?; + + Ok(()) +} + +#[command(rename = "s9pk", cli_only, display(display_none))] +pub async fn verify(#[arg] path: PathBuf) -> Result<(), Error> { + let mut s9pk = S9pkReader::open(path, true).await?; + s9pk.validate().await?; + + Ok(()) +} + +fn enumerate_extra_keys(reference: &Value, candidate: &Value) -> Vec { + match (reference, candidate) { + (Value::Object(m_r), Value::Object(m_c)) => { + let om_r: OrdMap = m_r.clone().into_iter().collect(); + let om_c: OrdMap = m_c.clone().into_iter().collect(); + let common = om_r.clone().intersection(om_c.clone()); + let top_extra = common.clone().symmetric_difference(om_c.clone()); + let mut all_extra = top_extra + .keys() + .map(|s| format!(".{}", s)) + .collect::>(); + for (k, v) in common { + all_extra.extend( + enumerate_extra_keys(&v, om_c.get(&k).unwrap()) + .into_iter() + .map(|s| format!(".{}{}", k, s)), + ) + } + all_extra + } + (_, Value::Object(m1)) => m1.clone().keys().map(|s| format!(".{}", s)).collect(), + _ => Vec::new(), + } +} + +#[test] +fn test_enumerate_extra_keys() { + use serde_json::json; + let extras = enumerate_extra_keys( + &json!({ + "test": 1, + "test2": null, + }), + &json!({ + "test": 1, + "test2": { "test3": null }, + "test4": null + }), + ); + println!("{:?}", extras) +} diff --git a/backend/src/s9pk/reader.rs b/backend/src/s9pk/v1/reader.rs similarity index 100% rename from backend/src/s9pk/reader.rs rename to backend/src/s9pk/v1/reader.rs diff --git a/backend/src/s9pk/v2/mod.rs b/backend/src/s9pk/v2/mod.rs new file mode 100644 index 000000000..be42d0612 --- /dev/null +++ b/backend/src/s9pk/v2/mod.rs @@ -0,0 +1,41 @@ +use crate::prelude::*; +use crate::s9pk::merkle_archive::sink::Sink; +use crate::s9pk::merkle_archive::source::{ArchiveSource, FileSource, Section}; +use crate::s9pk::merkle_archive::MerkleArchive; + +const MAGIC_AND_VERSION: &[u8] = &[0x3b, 0x3b, 0x02]; + +pub struct S9pk(MerkleArchive); +impl S9pk { + pub async fn serialize(&mut self, w: &mut W, verify: bool) -> Result<(), Error> { + use tokio::io::AsyncWriteExt; + + w.write_all(MAGIC_AND_VERSION).await?; + self.0.serialize(w, verify).await?; + + Ok(()) + } +} + +impl S9pk> { + pub async fn deserialize(source: &S) -> Result { + use tokio::io::AsyncReadExt; + + let mut header = 
source
+            .fetch(
+                0,
+                MAGIC_AND_VERSION.len() as u64 + MerkleArchive::<Section<S>>::header_size(),
+            )
+            .await?;
+
+        let mut magic_version = [0u8; 3];
+        header.read_exact(&mut magic_version).await?;
+        ensure_code!(
+            &magic_version == MAGIC_AND_VERSION,
+            ErrorKind::ParseS9pk,
+            "Invalid Magic or Unexpected Version"
+        );
+
+        Ok(Self(MerkleArchive::deserialize(source, &mut header).await?))
+    }
+}
diff --git a/backend/src/s9pk/v2/specv2.md b/backend/src/s9pk/v2/specv2.md
new file mode 100644
index 000000000..08dc3336e
--- /dev/null
+++ b/backend/src/s9pk/v2/specv2.md
@@ -0,0 +1,89 @@
+## Magic
+
+`0x3b3b`
+
+## Version
+
+`0x02` (varint)
+
+## Merkle Archive
+
+### Header
+
+- ed25519 pubkey (32B)
+- ed25519 signature of TOC sighash (64B)
+- TOC sighash (32B)
+- TOC position (8B: u64 BE)
+- TOC size (8B: u64 BE)
+
+### TOC
+
+- number of entries (varint)
+- FOREACH entry
+  - name (varstring)
+  - hash (32B: BLAKE-3 of file contents / TOC sighash)
+  - TYPE (1B)
+    - TYPE=MISSING (`0x00`)
+    - TYPE=FILE (`0x01`)
+      - position (8B: u64 BE)
+      - size (8B: u64 BE)
+    - TYPE=TOC (`0x02`)
+      - position (8B: u64 BE)
+      - size (8B: u64 BE)
+
+#### SigHash
+Hash of the TOC with the contents of every entry replaced with MISSING
+
+### FILE
+
+`<file contents>`
+
+# Example
+
+`foo/bar/baz.txt`
+
+ROOT TOC:
+  - 1 entry
+    - name: foo
+      hash: sighash('a)
+      type: TOC
+      position: 'a
+      size: _
+
+'a:
+  - 1 entry
+    - name: bar
+      hash: sighash('b)
+      type: TOC
+      position: 'b
+      size: _
+
+'b:
+  - 2 entries
+    - name: baz.txt
+      hash: hash('c)
+      type: FILE
+      position: 'c
+      size: _
+    - name: qux
+      hash: `<hash of qux's contents when it was packed>`
+      type: MISSING
+
+'c: `<file contents>`
+
+"foo/"
+hash: _
+size: 15b
+
+"bar.txt"
+hash: _
+size: 5b
+
+`<toc>` (
+  "baz.txt"
+  hash: _
+  size: 2b
+)
+`<file>` ("hello")
+`<file>` ("hi")
+
diff --git a/backend/src/util/mod.rs b/backend/src/util/mod.rs
index 2683f23c8..34c05934b 100644
--- a/backend/src/util/mod.rs
+++ b/backend/src/util/mod.rs
@@ -466,3 +466,27 @@ impl FileLock {
 pub fn assure_send<T: Send>(x: T) -> T {
     x
 }
+
+pub enum MaybeOwned<'a, T> {
+    Borrowed(&'a T),
+    Owned(T),
+}
+impl<'a, T> std::ops::Deref for MaybeOwned<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &Self::Target {
+        match self {
+            Self::Borrowed(a) => *a,
+            Self::Owned(a) => a,
+        }
+    }
+}
+impl<'a, T> From<T> for MaybeOwned<'a, T> {
+    fn from(value: T) -> Self {
+        MaybeOwned::Owned(value)
+    }
+}
+impl<'a, T> From<&'a T> for MaybeOwned<'a, T> {
+    fn from(value: &'a T) -> Self {
+        MaybeOwned::Borrowed(value)
+    }
+}
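For reference, the fixed-size preamble described in `backend/src/s9pk/v2/specv2.md` works out to 147 bytes: 2B magic, 1B version (a single-byte varint, as `MAGIC_AND_VERSION` encodes it), 32B pubkey, 64B signature, 32B TOC sighash, 8B TOC position, and 8B TOC size. Below is a minimal standalone sketch of walking that layout; the struct and function names are illustrative only and are not part of this patch:

```rust
/// Hypothetical helper, not in the patch: decodes the s9pk v2 preamble laid
/// out in specv2.md. Offsets assume the version varint occupies one byte.
struct V2Header {
    pubkey: [u8; 32],
    signature: [u8; 64],
    sighash: [u8; 32],
    toc_position: u64,
    toc_size: u64,
}

fn parse_v2_header(buf: &[u8]) -> Result<V2Header, &'static str> {
    // 2 (magic) + 1 (version) + 32 + 64 + 32 + 8 + 8 = 147 bytes
    if buf.len() < 147 {
        return Err("header truncated");
    }
    if buf[0..2] != [0x3b, 0x3b] {
        return Err("bad magic");
    }
    if buf[2] != 0x02 {
        return Err("unexpected version");
    }
    let mut pubkey = [0u8; 32];
    pubkey.copy_from_slice(&buf[3..35]);
    let mut signature = [0u8; 64];
    signature.copy_from_slice(&buf[35..99]);
    let mut sighash = [0u8; 32];
    sighash.copy_from_slice(&buf[99..131]);
    Ok(V2Header {
        pubkey,
        signature,
        sighash,
        // position and size are big-endian u64s, per the spec
        toc_position: u64::from_be_bytes(buf[131..139].try_into().unwrap()),
        toc_size: u64::from_be_bytes(buf[139..147].try_into().unwrap()),
    })
}
```

In-tree, `S9pk::deserialize` in `backend/src/s9pk/v2/mod.rs` performs the same walk via `MerkleArchive::deserialize`, which additionally verifies the ed25519 signature against the recomputed TOC sighash rather than trusting the stored one.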