From e9929d69657ca85fd240e6ad470b93f475c7c3b8 Mon Sep 17 00:00:00 2001 From: Simon Davies Date: Wed, 6 Oct 2021 11:31:04 +0100 Subject: [PATCH] Emit events for data changes --- Cargo.lock | 254 ++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + bin/server.rs | 23 ++++ docs/README.md | 23 ++++ docs/example-events.md | 98 +++++++++++++++ src/events/defaultsink.rs | 25 ++++ src/events/filesink.rs | 112 +++++++++++++++++ src/events/mod.rs | 54 ++++++++ src/lib.rs | 1 + src/server/handlers.rs | 49 ++++++-- src/server/mod.rs | 178 +++++++++++++++++++++++--- src/server/routes.rs | 46 +++++-- src/testing/mod.rs | 77 +++++++++++- 13 files changed, 900 insertions(+), 41 deletions(-) create mode 100644 docs/example-events.md create mode 100644 src/events/defaultsink.rs create mode 100644 src/events/filesink.rs create mode 100644 src/events/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b0477e4..2cec224 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,117 @@ version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +[[package]] +name = "async-channel" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-mutex", + "blocking", + "futures-lite", + "num_cpus", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" +dependencies = [ + "concurrent-queue", + "futures-lite", + "libc", + "log", + "once_cell", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "winapi", +] + +[[package]] +name = "async-lock" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "num_cpus", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" + [[package]] name = "async-trait" version = "0.1.51" @@ -48,6 +159,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + [[package]] name = "atty" version = "0.2.14" @@ -93,6 +210,7 @@ name = "bindle" version = "0.6.0" dependencies = [ "anyhow", + "async-std", "async-trait", "base64 0.13.0", "bcrypt", @@ -163,6 +281,20 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" +dependencies = [ + "async-channel", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "once_cell", +] + [[package]] name = "blowfish" version = "0.8.0" @@ -202,6 +334,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "cc" version = "1.0.70" @@ -268,6 +406,15 @@ dependencies = [ "syn", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "core-foundation" version = "0.9.1" @@ -325,6 +472,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "ctor" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "curve25519-dalek" version = "3.2.0" @@ -411,6 +568,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + +[[package]] +name = "fastrand" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e" +dependencies = [ + "instant", +] + [[package]] name = "fnv" version = "1.0.7" @@ -500,6 +672,21 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.17" @@ -589,6 +776,19 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gloo-timers" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "h2" version = "0.3.4" @@ -838,6 +1038,15 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -866,6 +1075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ "cfg-if", + "value-bag", ] [[package]] @@ -1176,6 +1386,12 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6acbef58a60fe69ab50510a55bc8cdd4d6cf2283d27ad338f54cb52747a9cf2d" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.2" @@ -1256,6 +1472,19 @@ version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb" +[[package]] +name = "polling" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92341d779fa34ea8437ef4d82d440d5e1ce3f3ff7f824aa64424cd481f9a1f25" +dependencies = [ + "cfg-if", + "libc", + "log", + "wepoll-ffi", + "winapi", +] + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -2249,6 +2478,16 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad9680608df133af2c1ddd5eaf1ddce91d60d61b6bc51494ef326458365a470a" +[[package]] +name = "value-bag" +version = "1.0.0-alpha.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd320e1520f94261153e96f7534476ad869c14022aee1e59af7c778075d840ae" +dependencies = [ + "ctor", + "version_check", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -2267,6 +2506,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.0" @@ -2416,6 +2661,15 @@ dependencies = [ "webpki", ] +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 3a89fd3..2fa743f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ openid = { version = "0.9.3", optional = true } bcrypt = "0.10.1" chrono = { version = "0.4.19", features = ["serde"], optional = true } either = "1.6.1" +async-std = "1.9.0" # NOTE: This is a workaround due to a dependency issue in oauth2: https://github.com/tkaitchuck/ahash/issues/95#issuecomment-903560879 indexmap = "~1.6.2" diff --git a/bin/server.rs b/bin/server.rs index 4c92387..3dfa851 100644 --- a/bin/server.rs +++ b/bin/server.rs @@ -5,6 +5,7 @@ use clap::Clap; use tracing::{info, warn}; use bindle::{ + events::{defaultsink::DefaultEventSink, filesink::FileEventSink, EventSyncs}, invoice::signature::{KeyRing, SignatureRole}, provider, search, server::{server, TlsConfig}, @@ -149,6 +150,15 @@ struct Opts { )] #[serde(default)] unauthenticated: bool, + + #[clap( + name = "events", + long = "events", + env = "BINDLE_EMIT_EVENTS", + about = "Emit events for changes in binlde data" + )] + #[serde(default)] + emit_events: bool, } #[tokio::main] @@ -253,6 +263,12 @@ async fn main() -> anyhow::Result<()> { ); }; + let eventsink = if config.emit_events { + EventSyncs::FileEventSink(FileEventSink::new(bindle_directory.clone())) + } else { + EventSyncs::DefaultEventSink(DefaultEventSink::new()) + }; + // TODO: This is really gnarly, but the associated type on `Authenticator` makes turning it into // a Boxed dynner really difficult. I also tried rolling our own type erasure and ran into // similar issues (though I think it could be fixed, it would be a lot of code). So we might @@ -279,6 +295,7 @@ async fn main() -> anyhow::Result<()> { secret_store, strategy, keyring, + eventsink, ) .await } @@ -297,6 +314,7 @@ async fn main() -> anyhow::Result<()> { secret_store, strategy, keyring, + eventsink, ) .await } @@ -319,6 +337,7 @@ async fn main() -> anyhow::Result<()> { secret_store, strategy, keyring, + eventsink, ) .await } @@ -336,6 +355,7 @@ async fn main() -> anyhow::Result<()> { secret_store, strategy, keyring, + eventsink, ) .await } @@ -356,6 +376,7 @@ async fn main() -> anyhow::Result<()> { secret_store, strategy, keyring, + eventsink, ) .await } @@ -375,6 +396,7 @@ async fn main() -> anyhow::Result<()> { secret_store, strategy, keyring, + eventsink, ) .await } @@ -455,6 +477,7 @@ async fn merged_opts() -> anyhow::Result { bindle_directory: opts.bindle_directory.or(config.bindle_directory), cert_path: opts.cert_path.or(config.cert_path), config_file: opts.config_file, + emit_events: opts.emit_events || config.emit_events, htpasswd_file: opts.htpasswd_file.or(config.htpasswd_file), unauthenticated: opts.unauthenticated || config.unauthenticated, key_path: opts.key_path.or(config.key_path), diff --git a/docs/README.md b/docs/README.md index c124f69..a946c48 100644 --- a/docs/README.md +++ b/docs/README.md @@ -214,6 +214,29 @@ This file can be moved from system to system, just like OpenPGP or SSH key sets. - To create a signing key for a client, use `bindle create-key` - By default, if Bindle does not find an existing keyring, it creates one of these when it first starts. +## Generating Change Events + +Starting bindle with the `--events` flag will cause it to emit an record of the event to a file named `bindle-event.log` in the bindle data directory. + +Each event is serialized as a JSON object in the following format: +``` json +{ + "event_date": "2021-10-05T20:05:51.028318270Z", + "event_data": { + "EventType": EventSpecificData + } +} +``` + +The following event types are recorded: + +- **InvoiceCreated** - occurs when an invoice is created. +- **MissingParcel** - occurs when an invoice is created, but a parcel is missing. +- **ParcelCreated** - occurs when a parcel is created. +- **InvoiceYanked** - occurs when an invoice is yanked. + +Examples of each event can be found [here](example-events.md). + ## Specification 1. The specification for the Bindle format and design begins with the [Bindle Specification](bindle-spec.md). diff --git a/docs/example-events.md b/docs/example-events.md new file mode 100644 index 0000000..371f763 --- /dev/null +++ b/docs/example-events.md @@ -0,0 +1,98 @@ +# Event Types + +These are examples of events that can be emitted when running bindle with the --events flag. + +## Invoice Created + +```json +{ + "event_date": "2021-10-05T20:05:50.997960746Z", + "event_data": { + "InvoiceCreated": { + "bindleVersion": "1.0.0", + "yanked": null, + "yankedSignature": null, + "bindle": { + "name": "enterprise.com/warpcore", + "version": "1.0.0", + "description": "Warp core components", + "authors": [ + "Geordi La Forge " + ] + }, + "annotations": { + "engineering_location": "main" + }, + "parcel": [ + { + "label": { + "sha256": "23f310b54076878fd4c36f0c60ec92011a8b406349b98dd37d08577d17397de5", + "mediaType": "text/plain", + "name": "isolinear_chip.txt", + "size": 9, + "annotations": null, + "feature": null, + "origin": null + }, + "conditions": null + } + ], + "group": null, + "signature": [ + { + "by": "Test ", + "signature": "OxjhtGwTVDMJScoBDovNB1U52RsD2DyMgQoPIVAew0n4UKyY9Cw8S7KEkSsN6Lj71EFt8QPKO1hg1Tsz26MtBg==", + "key": "ccjHo+plTirpq+QJQ40/vrVjrVIycjMQeoDwdZZsbW8=", + "role": "host", + "at": 1633464350 + } + ] + } + } +} +``` + +## Missing Parcel + +```json +{ + "event_date": "2021-10-05T20:05:51.013871309Z", + "event_data": { + "MissingParcel": { + "sha256": "23f310b54076878fd4c36f0c60ec92011a8b406349b98dd37d08577d17397de5", + "mediaType": "text/plain", + "name": "isolinear_chip.txt", + "size": 9, + "annotations": null, + "feature": null, + "origin": null + } + } +} +``` + +## ParcelCreated + +```json + +{ + "event_date": "2021-10-05T20:05:51.028318270Z", + "event_data": { + "ParcelCreated": [ + "enterprise.com/warpcore/1.0.0", + "23f310b54076878fd4c36f0c60ec92011a8b406349b98dd37d08577d17397de5" + ] + } +} +``` + +## Invoice Yanked + +```json +{ + "event_date": "2021-10-05T20:05:51.028318270Z", + "event_data": { + "InvoiceYanked":"enterprise.com/warpcore/1.0.0" + } +} +``` diff --git a/src/events/defaultsink.rs b/src/events/defaultsink.rs new file mode 100644 index 0000000..b8531e3 --- /dev/null +++ b/src/events/defaultsink.rs @@ -0,0 +1,25 @@ +use crate::events::{EventSink, EventType}; +use serde::Serialize; + +pub struct DefaultEventSink {} +impl DefaultEventSink { + pub fn new() -> Self { + DefaultEventSink {} + } +} + +impl Clone for DefaultEventSink { + fn clone(&self) -> Self { + DefaultEventSink {} + } +} + +#[async_trait::async_trait] +impl EventSink for DefaultEventSink { + async fn raise_event(&self, _: EventType) -> Result<(), Box> + where + T: Serialize + Send + Sync, + { + Ok(()) + } +} diff --git a/src/events/filesink.rs b/src/events/filesink.rs new file mode 100644 index 0000000..65d45f4 --- /dev/null +++ b/src/events/filesink.rs @@ -0,0 +1,112 @@ +use async_std::fs::OpenOptions; +use async_std::io::prelude::*; +use serde::Serialize; +use std::path::PathBuf; +use tracing::info; + +use chrono::prelude::*; + +use crate::events::{EventSink, EventType}; + +pub const EVENT_SINK_FILE_NAME: &str = "bindle_event.log"; +pub struct FileEventSink { + file: PathBuf, +} + +impl FileEventSink { + pub fn new(directory: PathBuf) -> Self { + info!("Using FileEventSink"); + Self { + file: directory.join(EVENT_SINK_FILE_NAME), + } + } +} + +impl Clone for FileEventSink { + fn clone(&self) -> Self { + FileEventSink { + file: self.file.clone(), + } + } +} + +#[derive(Serialize)] +struct EventLog { + event_date: DateTime, + event_data: EventType, +} + +#[async_trait::async_trait] +impl EventSink for FileEventSink { + async fn raise_event(&self, event: EventType) -> Result<(), Box> + where + T: Serialize + Send + Sync, + { + let data = EventLog { + event_date: Utc::now(), + event_data: event, + }; + + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.file) + .await?; + let mut buf = serde_json::to_vec_pretty(&data)?; + buf.push(b'\n'); + file.write_all(&buf).await?; + file.flush().await?; + Ok(()) + } +} + +#[cfg(test)] + +mod test { + + use super::*; + use async_std::fs::File; + use tempfile::tempdir; + + use crate::testing::{BindleEvent, InvoiceYankedEventData}; + + #[tokio::test] + async fn test_should_create_and_write_to_file() { + let temp = tempdir().expect("unable to create tempdir"); + let directory = PathBuf::from(temp.path()); + let filename = directory.join(EVENT_SINK_FILE_NAME); + let sink = FileEventSink::new(directory); + sink.raise_event(EventType::InvoiceYanked("test")) + .await + .unwrap(); + let mut file = File::open(filename).await.unwrap(); + let mut buf = String::new(); + file.read_to_string(&mut buf).await.unwrap(); + let event: BindleEvent = serde_json::from_str(&buf).unwrap(); + assert_eq!("test", event.event_data.invoice_yanked); + } + + #[tokio::test] + async fn test_should_create_and_append_to_a_file() { + let temp = tempdir().expect("unable to create tempdir"); + let directory = PathBuf::from(temp.path()); + let filename = directory.join(EVENT_SINK_FILE_NAME); + let sink = FileEventSink::new(directory); + sink.raise_event(EventType::InvoiceYanked("test0")) + .await + .unwrap(); + sink.raise_event(EventType::InvoiceYanked("test1")) + .await + .unwrap(); + let mut file = File::open(filename).await.unwrap(); + let mut buf = String::new(); + file.read_to_string(&mut buf).await.unwrap(); + let deserializer = serde_json::Deserializer::from_str(&buf); + let iterator = deserializer.into_iter::(); + for (i, event) in iterator.enumerate() { + let event: BindleEvent = + serde_json::from_value(event.unwrap()).unwrap(); + assert_eq!(format!("test{}", i), event.event_data.invoice_yanked); + } + } +} diff --git a/src/events/mod.rs b/src/events/mod.rs new file mode 100644 index 0000000..9c989c5 --- /dev/null +++ b/src/events/mod.rs @@ -0,0 +1,54 @@ +//! Types and traits for use in emitting change notifications +pub mod defaultsink; +pub mod filesink; +use super::events::defaultsink::DefaultEventSink; +use super::events::filesink::FileEventSink; +use serde::Serialize; + +#[derive(Debug, Serialize)] +pub enum EventType { + InvoiceCreated(T), + MissingParcel(T), + InvoiceYanked(T), + ParcelCreated(T), +} + +#[async_trait::async_trait] +pub trait EventSink { + // Emits an event. + async fn raise_event(&self, _: EventType) -> Result<(), Box> + where + T: Serialize + Send + Sync; +} + +// This enum exists to enable the type of event sync to be dynamically selected at runtime depending on arguments passed +// to the program. +// It seems like the normal approach of using trait objects does not work as the trait is not object safe as it has a generic type parameter. +// parameter, this feels like a bit of a hack, but it's the only way I can see to do this. + +pub enum EventSyncs { + FileEventSink(FileEventSink), + DefaultEventSink(DefaultEventSink), +} + +impl Clone for EventSyncs { + fn clone(&self) -> Self { + match self { + EventSyncs::FileEventSink(sink) => EventSyncs::FileEventSink(sink.clone()), + EventSyncs::DefaultEventSink(sink) => EventSyncs::DefaultEventSink(sink.clone()), + } + } +} + +#[async_trait::async_trait] +impl EventSink for EventSyncs { + async fn raise_event(&self, event: EventType) -> Result<(), Box> + where + T: Serialize + Send + Sync, + { + match self { + EventSyncs::FileEventSink(sink) => sink.raise_event(event).await, + EventSyncs::DefaultEventSink(sink) => sink.raise_event(event).await, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 5e1711b..2afd7a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,7 @@ pub mod testing; pub mod authn; #[cfg(feature = "server")] pub mod authz; +pub mod events; pub mod filters; #[doc(inline)] diff --git a/src/server/handlers.rs b/src/server/handlers.rs index d5554a2..817001a 100644 --- a/src/server/handlers.rs +++ b/src/server/handlers.rs @@ -13,6 +13,7 @@ pub mod v1 { use super::*; use crate::{ + events::{EventSink, EventType}, signature::{KeyRing, SecretKeyStorage}, LoginParams, QueryOptions, SignatureError, }; @@ -57,10 +58,11 @@ pub mod v1 { )) } - #[instrument(level = "trace", skip(store, secret_store))] - pub async fn create_invoice( + #[instrument(level = "trace", skip(store, secret_store, eventsink))] + pub async fn create_invoice( store: P, secret_store: S, + eventsink: E, strategy: VerificationStrategy, keyring: std::sync::Arc, inv: crate::Invoice, @@ -98,6 +100,14 @@ pub mod v1 { return Ok(reply::into_reply(e)); } }; + + eventsink + .raise_event(EventType::InvoiceCreated(&invoice)) + .await + .unwrap_or_else(|err| { + tracing::log::error!("Failed to raise event InvoiceCreated: {}", err) + }); + // If there are missing parcels that still need to be created, return a 202 to indicate that // things were accepted, but will not be fetchable until further action is taken if !labels.is_empty() { @@ -106,6 +116,14 @@ pub mod v1 { missing = labels.len(), "Newly created invoice is missing parcels", ); + for label in &labels { + eventsink + .raise_event(EventType::MissingParcel(label)) + .await + .unwrap_or_else(|err| { + tracing::log::error!("Failed to raise event MissingParcel: {}", err) + }); + } Ok(warp::reply::with_status( reply::serialized_data( &crate::InvoiceCreateResponse { @@ -162,10 +180,11 @@ pub mod v1 { Ok::, Infallible>(res) } - #[instrument(level = "trace", skip(store), fields(id = tail.as_str()))] - pub async fn yank_invoice( + #[instrument(level = "trace", skip(store, eventsink), fields(id = tail.as_str()))] + pub async fn yank_invoice( tail: warp::path::Tail, store: P, + eventsink: E, accept_header: Option, ) -> Result { let id = tail.as_str(); @@ -174,6 +193,13 @@ pub mod v1 { return Ok(reply::into_reply(e)); } + eventsink + .raise_event(EventType::InvoiceYanked(id)) + .await + .unwrap_or_else(|err| { + tracing::log::error!("Failed to raise event InvoiceYanked: {}", err) + }); + let mut resp = std::collections::HashMap::new(); resp.insert("message", "invoice yanked"); Ok(warp::reply::with_status( @@ -201,17 +227,19 @@ pub mod v1 { } //////////// Parcel Functions //////////// - #[instrument(level = "trace", skip(store, body))] - pub async fn create_parcel( + #[instrument(level = "trace", skip(store, body, eventsink))] + pub async fn create_parcel( (bindle_id, sha): (String, String), body: B, store: P, + eventsink: E, accept_header: Option, ) -> Result where P: Provider + Sync, B: stream::Stream> + Send + Sync + Unpin + 'static, D: bytes::Buf + Send, + E: EventSink + Send + Sync, { trace!("Checking if parcel exists in bindle"); @@ -222,7 +250,7 @@ pub mod v1 { if let Err(e) = store .create_parcel( - bindle_id, + &bindle_id, &sha, body.map(|res| { res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) @@ -234,6 +262,13 @@ pub mod v1 { return Ok(reply::into_reply(e)); } + eventsink + .raise_event(EventType::ParcelCreated((bindle_id, &sha))) + .await + .unwrap_or_else(|err| { + tracing::log::error!("Failed to raise event ParcelCreated: {}", err) + }); + let mut resp = std::collections::HashMap::new(); resp.insert("message", "parcel created"); Ok(warp::reply::with_status( diff --git a/src/server/mod.rs b/src/server/mod.rs index 29059d5..1c68aea 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -14,6 +14,7 @@ use std::path::PathBuf; use tracing::debug; use super::provider::Provider; +use crate::events::EventSink; use crate::signature::KeyRing; use crate::{search::Search, signature::SecretKeyStorage}; @@ -30,9 +31,10 @@ pub struct TlsConfig { /// configuration is given, the server will be configured to use TLS. Otherwise it will use plain /// HTTP #[allow(clippy::too_many_arguments)] -pub async fn server( +pub async fn server( store: P, index: I, + authn: Authn, authz: Authz, addr: impl Into + 'static, @@ -40,6 +42,7 @@ pub async fn server( keystore: S, verification_strategy: crate::VerificationStrategy, keyring: KeyRing, + eventsink: E, ) -> anyhow::Result<()> where P: Provider + Clone + Send + Sync + 'static, @@ -47,6 +50,7 @@ where S: SecretKeyStorage + Clone + Send + Sync + 'static, Authn: crate::authn::Authenticator + Clone + Send + Sync + 'static, Authz: crate::authz::Authorizer + Clone + Send + Sync + 'static, + E: EventSink + Clone + Send + Sync + 'static, { // V1 API paths, currently the only version let api = routes::api( @@ -57,6 +61,7 @@ where keystore, verification_strategy, keyring, + eventsink, ); let server = warp::serve(api); @@ -94,19 +99,28 @@ async fn shutdown_signal() { #[cfg(test)] mod test { + use async_std::fs::File; + use async_std::prelude::*; + use chrono::naive::serde::ts_milliseconds::deserialize; use std::convert::TryInto; + use std::path::PathBuf; use crate::authn::always::AlwaysAuthenticate; use crate::authz::always::AlwaysAuthorize; + use crate::events::{defaultsink::DefaultEventSink, filesink::FileEventSink}; use crate::invoice::{ signature::{KeyRing, SecretKeyEntry}, SignatureRole, VerificationStrategy, }; use crate::provider::Provider; use crate::search::StrictEngine; - use crate::testing::{self, MockKeyStore}; + use crate::testing::{ + self, BindleEvent, InvoiceCreatedEventData, InvoiceYankedEventData, MissingParcelEventData, + MockKeyStore, ParcelCreatedEventData, + }; use rstest::rstest; + use tempfile::tempdir; use testing::Scaffold; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -115,12 +129,16 @@ mod test { async fn test_successful_workflow( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { let bindles = testing::load_all_files().await; - let (store, index, ks) = provider_setup.await; + let (store, index, ks, _) = provider_setup.await; + let temp = tempdir().expect("unable to create tempdir"); + let directory = PathBuf::from(temp.path()); + let filename = directory.join(crate::events::filesink::EVENT_SINK_FILE_NAME); + let es = FileEventSink::new(directory); let api = super::routes::api( store, @@ -130,6 +148,7 @@ mod test { ks, VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); // Now that we can't upload parcels before invoices exist, we need to create a bindle that shares some parcels @@ -158,6 +177,36 @@ mod test { "Invoice should have missing parcels" ); + // Check that we got a Invoice Created event and a missing parcel event. + + let mut file = File::open(filename).await.unwrap(); + let mut buf = [0u8; 4096]; + let bytes = file.read(&mut buf).await.unwrap(); + assert!( + bytes < buf.len(), + "Should have read less than the buffer size" + ); + let json_string = String::from_utf8_lossy(&buf); + let deserializer = serde_json::Deserializer::from_str(&json_string); + let mut iterator = deserializer.into_iter::(); + + let inv = iterator.next().unwrap().unwrap(); + let event: BindleEvent = serde_json::from_value(inv).unwrap(); + assert_eq!( + "enterprise.com/warpcore", + event.event_data.invoice_created.bindle.name + ); + assert_eq!("1.0.0", event.event_data.invoice_created.bindle.version); + + let missing = iterator.next().unwrap().unwrap(); + let event: BindleEvent = serde_json::from_value(missing).unwrap(); + assert_eq!( + "23f310b54076878fd4c36f0c60ec92011a8b406349b98dd37d08577d17397de5", + event.event_data.missing_parcel.sha256 + ); + assert_eq!("text/plain", event.event_data.missing_parcel.media_type); + assert_eq!("isolinear_chip.txt", event.event_data.missing_parcel.name); + // Upload the parcels for one of the invoices for file in valid_v1.parcel_files.values() { @@ -178,6 +227,28 @@ mod test { ); } + // Check that we got a parcel created event for the parcel. + + let bytes = file.read(&mut buf).await.unwrap(); + assert!( + bytes < buf.len(), + "Should have read less than the buffer size" + ); + let json_string = String::from_utf8_lossy(&buf); + let deserializer = serde_json::Deserializer::from_str(&json_string); + let mut iterator = deserializer.into_iter::(); + + let parcel = iterator.next().unwrap().unwrap(); + let event: BindleEvent = serde_json::from_value(parcel).unwrap(); + assert_eq!( + "enterprise.com/warpcore/1.0.0", + event.event_data.parcel_created[0] + ); + assert_eq!( + "23f310b54076878fd4c36f0c60ec92011a8b406349b98dd37d08577d17397de5", + event.event_data.parcel_created[1] + ); + // Now create another invoice that references the same parcel and validated that it returns // the proper response @@ -207,6 +278,25 @@ mod test { "Invoice should not have missing parcels" ); + // Check that we got a Invoice Created event. + + let bytes = file.read(&mut buf).await.unwrap(); + assert!( + bytes < buf.len(), + "Should have read less than the buffer size" + ); + let json_string = String::from_utf8_lossy(&buf); + let deserializer = serde_json::Deserializer::from_str(&json_string); + let mut iterator = deserializer.into_iter::(); + + let inv = iterator.next().unwrap().unwrap(); + let event: BindleEvent = serde_json::from_value(inv).unwrap(); + assert_eq!( + "another.com/bindle", + event.event_data.invoice_created.bindle.name + ); + assert_eq!("1.0.0", event.event_data.invoice_created.bindle.version); + // Create a second version of the same invoice with some missing and already existing // parcels and make sure the correct response is returned let valid_v2 = bindles.get("valid_v2").expect("Missing scaffold"); @@ -234,8 +324,27 @@ mod test { .expect("Should have missing parcels") .len(), 1, - "Invoice should not have missing parcels" + "Invoice should have missing parcels" + ); + + // Check that we got a Invoice Created event and a missing parcel event. + + let bytes = file.read(&mut buf).await.unwrap(); + assert!( + bytes < buf.len(), + "Should have read less than the buffer size" + ); + let json_string = String::from_utf8_lossy(&buf); + let deserializer = serde_json::Deserializer::from_str(&json_string); + let mut iterator = deserializer.into_iter::(); + + let inv = iterator.next().unwrap().unwrap(); + let event: BindleEvent = serde_json::from_value(inv).unwrap(); + assert_eq!( + "enterprise.com/warpcore", + event.event_data.invoice_created.bindle.name ); + assert_eq!("2.0.0", event.event_data.invoice_created.bindle.version); // Get an invoice let res = warp::test::request() @@ -286,11 +395,15 @@ mod test { async fn test_yank( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { - let (store, index, ks) = provider_setup.await; + let (store, index, ks, _) = provider_setup.await; + let temp = tempdir().expect("unable to create tempdir"); + let directory = PathBuf::from(temp.path()); + let filename = directory.join(crate::events::filesink::EVENT_SINK_FILE_NAME); + let es = FileEventSink::new(directory); let api = super::routes::api( store.clone(), @@ -300,6 +413,7 @@ mod test { ks, VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); let sk = SecretKeyEntry::new("test".to_owned(), vec![SignatureRole::Host]); @@ -330,6 +444,26 @@ mod test { String::from_utf8_lossy(res.body()) ); + // Check that we got a Invoice Yanked Event. + + let mut file = File::open(filename).await.unwrap(); + let mut buf = [0u8; 4096]; + let bytes = file.read(&mut buf).await.unwrap(); + assert!( + bytes < buf.len(), + "Should have read less than the buffer size" + ); + let json_string = String::from_utf8_lossy(&buf); + let deserializer = serde_json::Deserializer::from_str(&json_string); + let mut iterator = deserializer.into_iter::(); + + let inv = iterator.next().unwrap().unwrap(); + let event: BindleEvent = serde_json::from_value(inv).unwrap(); + assert_eq!( + "enterprise.com/bridge/1.0.0", + event.event_data.invoice_yanked + ); + // Attempt to fetch the invoice and make sure it doesn't return let res = warp::test::request().path(&inv_path).reply(&api).await; @@ -362,12 +496,12 @@ mod test { async fn test_invoice_validation( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { let bindles = testing::load_all_files().await; - let (store, index, ks) = provider_setup.await; + let (store, index, ks, es) = provider_setup.await; let api = super::routes::api( store.clone(), @@ -377,6 +511,7 @@ mod test { ks, VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); let valid_raw = bindles.get("valid_v1").expect("Missing scaffold"); let valid = testing::Scaffold::from(valid_raw.clone()); @@ -413,11 +548,11 @@ mod test { async fn test_parcel_validation( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { - let (store, index, keystore) = provider_setup.await; + let (store, index, keystore, es) = provider_setup.await; let api = super::routes::api( store.clone(), @@ -427,6 +562,7 @@ mod test { keystore.clone(), VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); // Insert a parcel let scaffold = testing::Scaffold::load("valid_v1").await; @@ -510,12 +646,12 @@ mod test { async fn test_queries( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { // Insert data into store - let (store, index, ks) = provider_setup.await; + let (store, index, ks, es) = provider_setup.await; let api = super::routes::api( store.clone(), @@ -525,6 +661,7 @@ mod test { ks, VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); let bindles_to_insert = vec!["incomplete", "valid_v1", "valid_v2"]; @@ -625,11 +762,11 @@ mod test { async fn test_missing( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { - let (store, index, ks) = provider_setup.await; + let (store, index, ks, es) = provider_setup.await; let api = super::routes::api( store.clone(), @@ -639,6 +776,7 @@ mod test { ks, VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); let scaffold = testing::Scaffold::load("lotsa_parcels").await; @@ -705,11 +843,11 @@ mod test { async fn test_host_signed( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { - let (store, index, ks) = provider_setup.await; + let (store, index, ks, es) = provider_setup.await; let api = super::routes::api( store, @@ -719,6 +857,7 @@ mod test { ks, VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); let scaffold = testing::RawScaffold::load("valid_v1").await; @@ -756,11 +895,11 @@ mod test { async fn test_anonymous_get( #[values(testing::setup(), testing::setup_embedded())] #[future] - provider_setup: (T, StrictEngine, MockKeyStore), + provider_setup: (T, StrictEngine, MockKeyStore, DefaultEventSink), ) where T: Provider + Clone + Send + Sync + 'static, { - let (store, index, ks) = provider_setup.await; + let (store, index, ks, es) = provider_setup.await; let api = super::routes::api( store, @@ -772,6 +911,7 @@ mod test { ks, VerificationStrategy::default(), KeyRing::default(), + es.clone(), ); let scaffold = testing::RawScaffold::load("valid_v1").await; diff --git a/src/server/routes.rs b/src/server/routes.rs index 007ac41..3602d75 100644 --- a/src/server/routes.rs +++ b/src/server/routes.rs @@ -2,11 +2,11 @@ use std::sync::Arc; use warp::Filter; -use crate::{server::filters, signature::KeyRing}; +use crate::{events::EventSink, server::filters, signature::KeyRing}; /// A helper function that aggregates all routes into a complete API filter. If you only wish to /// serve specific endpoints or versions, you can assemble them with the individual submodules -pub fn api( +pub fn api( store: P, index: I, authn: Authn, @@ -14,6 +14,7 @@ pub fn api( secret_store: S, verification_strategy: crate::VerificationStrategy, keyring: KeyRing, + eventsink: E, ) -> impl Filter + Clone where P: crate::provider::Provider + Clone + Send + Sync + 'static, @@ -21,6 +22,7 @@ where S: crate::invoice::signature::SecretKeyStorage + Clone + Send + Sync + 'static, Authn: crate::authn::Authenticator + Clone + Send + Sync + 'static, Authz: crate::authz::Authorizer + Clone + Send + Sync + 'static, + E: EventSink + Clone + Send + Sync + 'static, { let health = warp::path("healthz").map(|| "OK"); @@ -36,6 +38,7 @@ where secret_store.clone(), verification_strategy.clone(), wrapped_keyring.clone(), + eventsink.clone(), )) .boxed() .or(v1::invoice::create_json( @@ -43,15 +46,16 @@ where secret_store, verification_strategy, wrapped_keyring, + eventsink.clone(), )) .boxed() .or(v1::invoice::get(store.clone())) .boxed() .or(v1::invoice::head(store.clone())) .boxed() - .or(v1::invoice::yank(store.clone())) + .or(v1::invoice::yank(store.clone(), eventsink.clone())) .boxed() - .or(v1::parcel::create(store.clone())) + .or(v1::parcel::create(store.clone(), eventsink.clone())) .boxed() .or(v1::parcel::get(store.clone())) .boxed() @@ -78,7 +82,7 @@ pub mod v1 { use crate::provider::Provider; use crate::search::Search; use crate::server::handlers::v1::*; - use crate::server::{filters, routes::with_store}; + use crate::server::{filters, routes::with_eventsink, routes::with_store}; use warp::Filter; @@ -106,6 +110,7 @@ pub mod v1 { pub mod invoice { use crate::{ + events::EventSink, server::routes::with_secret_store, signature::{KeyRing, SecretKeyStorage}, }; @@ -128,21 +133,24 @@ pub mod v1 { .and_then(query_invoices) } - pub fn create_toml( + pub fn create_toml( store: P, secret_store: S, verification_strategy: crate::VerificationStrategy, keyring: Arc, + eventsink: E, ) -> impl Filter + Clone where P: Provider + Clone + Send + Sync, S: SecretKeyStorage + Clone + Send + Sync, + E: EventSink + Clone + Send + Sync, { warp::path("_i") .and(warp::path::end()) .and(warp::post()) .and(with_store(store)) .and(with_secret_store(secret_store)) + .and(with_eventsink(eventsink)) .and(warp::any().map(move || verification_strategy.clone())) .and(warp::any().map(move || keyring.clone())) .and(filters::toml()) @@ -150,21 +158,24 @@ pub mod v1 { .and_then(create_invoice) .recover(filters::handle_deserialize_rejection) } - pub fn create_json( + pub fn create_json( store: P, secret_store: S, verification_strategy: crate::VerificationStrategy, keyring: Arc, + eventsink: E, ) -> impl Filter + Clone where P: Provider + Clone + Send + Sync, S: SecretKeyStorage + Clone + Send + Sync, + E: EventSink + Clone + Send + Sync, { warp::path("_i") .and(warp::path::end()) .and(warp::post()) .and(with_store(store)) .and(with_secret_store(secret_store)) + .and(with_eventsink(eventsink)) .and(warp::any().map(move || verification_strategy.clone())) .and(warp::any().map(move || keyring.clone())) .and(warp::body::json()) @@ -202,16 +213,19 @@ pub mod v1 { .and_then(head_invoice) } - pub fn yank

( + pub fn yank( store: P, + eventsink: E, ) -> impl Filter + Clone where P: Provider + Clone + Send + Sync, + E: EventSink + Clone + Send + Sync, { warp::path("_i") .and(warp::path::tail()) .and(warp::delete()) .and(with_store(store)) + .and(with_eventsink(eventsink)) .and(warp::header::optional::("accept")) .and_then(yank_invoice) } @@ -219,17 +233,21 @@ pub mod v1 { pub mod parcel { use super::*; + use crate::events::EventSink; - pub fn create

( + pub fn create( store: P, + eventsink: E, ) -> impl Filter + Clone where P: Provider + Clone + Send + Sync, + E: EventSink + Clone + Send + Sync, { filters::parcel() .and(warp::post()) .and(warp::body::stream()) .and(with_store(store)) + .and(with_eventsink(eventsink)) .and(warp::header::optional::("accept")) .and_then(create_parcel) } @@ -299,3 +317,13 @@ where // We have to clone for this to be Fn instead of FnOnce warp::any().map(move || store.clone()) } + +pub(crate) fn with_eventsink( + eventsink: E, +) -> impl Filter + Clone +where + E: crate::events::EventSink + Clone + Send, +{ + // We have to clone for this to be Fn instead of FnOnce + warp::any().map(move || eventsink.clone()) +} diff --git a/src/testing/mod.rs b/src/testing/mod.rs index fb443c2..e48f04d 100644 --- a/src/testing/mod.rs +++ b/src/testing/mod.rs @@ -11,10 +11,12 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; +use crate::events::defaultsink::DefaultEventSink; use crate::invoice::signature::{SecretKeyEntry, SecretKeyStorage, SignatureRole}; use crate::provider::embedded::EmbeddedProvider; use crate::provider::file::FileProvider; use crate::search::StrictEngine; +use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use tempfile::tempdir; @@ -153,25 +155,37 @@ impl From for Scaffold { } /// Returns a file `Provider` implementation configured with a temporary directory, strict Search -/// implementation, and a mock key store for use in testing API endpoints -pub async fn setup() -> (FileProvider, StrictEngine, MockKeyStore) { +/// implementation,a mock key store and a default noop event sink for use in testing API endpoints +pub async fn setup() -> ( + FileProvider, + StrictEngine, + MockKeyStore, + DefaultEventSink, +) { let temp = tempdir().expect("unable to create tempdir"); let index = StrictEngine::default(); let store = FileProvider::new(temp.path().to_owned(), index.clone()).await; let kstore = MockKeyStore::new(); - (store, index, kstore) + let eventsink = DefaultEventSink::new(); + (store, index, kstore, eventsink) } /// Returns an embedded `Provider` implementation configured with a temporary directory, strict -/// Search implementation, and a mock key store for use in testing API endpoints -pub async fn setup_embedded() -> (EmbeddedProvider, StrictEngine, MockKeyStore) { +/// Search implementation, a mock key store and a default noop event sink for use in testing API endpoints +pub async fn setup_embedded() -> ( + EmbeddedProvider, + StrictEngine, + MockKeyStore, + DefaultEventSink, +) { let temp = tempdir().expect("unable to create tempdir"); let index = StrictEngine::default(); let store = EmbeddedProvider::new(temp.path().to_owned(), index.clone()) .await .expect("Unable to configure embedded provider"); let kstore = MockKeyStore::new(); - (store, index, kstore) + let eventsink = DefaultEventSink::new(); + (store, index, kstore, eventsink) } /// Loads all scaffolds in the scaffolds directory, returning them as a hashmap with the directory @@ -267,3 +281,54 @@ impl SecretKeyStorage for MockKeyStore { Some(&self.mock_secret_key) } } + +#[derive(Serialize, Deserialize)] +pub struct InvoiceYankedEventData { + #[serde(rename = "InvoiceYanked")] + pub invoice_yanked: String, +} + +#[derive(Serialize, Deserialize)] +pub struct MissingParcelEventData { + #[serde(rename = "MissingParcel")] + pub missing_parcel: ParcelEventData, +} + +#[derive(Serialize, Deserialize)] +pub struct ParcelCreatedEventData { + #[serde(rename = "ParcelCreated")] + pub parcel_created: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct InvoiceCreatedEventData { + #[serde(rename = "InvoiceCreated")] + pub invoice_created: InvoiceEventData, +} + +#[derive(Serialize, Deserialize)] +pub struct InvoiceEventData { + #[serde(rename = "bindleVersion")] + pub bindle_version: String, + pub bindle: BindleEventData, +} + +#[derive(Serialize, Deserialize)] +pub struct BindleEventData { + pub name: String, + pub version: String, +} + +#[derive(Serialize, Deserialize)] +pub struct ParcelEventData { + pub sha256: String, + pub name: String, + #[serde(rename = "mediaType")] + pub media_type: String, +} + +#[derive(Serialize, Deserialize)] +pub struct BindleEvent { + pub event_date: String, + pub event_data: T, +}