From 4be310b19b9b862abd035ebe7c213bad6e5a5ec1 Mon Sep 17 00:00:00 2001 From: Alexander Shishenko Date: Wed, 25 Dec 2024 01:54:55 +0300 Subject: [PATCH] Prepare Recipient for recipient catalog --- Cargo.lock | 1 + notifico-app/src/main.rs | 40 +++++++------- notifico-core/Cargo.toml | 1 + notifico-core/src/engine/plugin/core.rs | 55 +++++++++++-------- notifico-core/src/lib.rs | 1 - notifico-core/src/pipeline/event.rs | 9 +-- notifico-core/src/{ => recipient}/contact.rs | 0 notifico-core/src/recipient/controller.rs | 36 ++++++++++++ .../src/{recipient.rs => recipient/mod.rs} | 6 +- notifico-core/src/simpletransport.rs | 2 +- notificox/src/main.rs | 8 ++- transports/notifico-gotify/src/lib.rs | 2 +- transports/notifico-ntfy/src/lib.rs | 2 +- transports/notifico-pushover/src/lib.rs | 2 +- transports/notifico-slack/src/lib.rs | 2 +- transports/notifico-smpp/src/lib.rs | 2 +- transports/notifico-smtp/src/lib.rs | 2 +- transports/notifico-telegram/src/contact.rs | 2 +- transports/notifico-telegram/src/lib.rs | 2 +- transports/notifico-whatsapp/src/lib.rs | 2 +- 20 files changed, 113 insertions(+), 64 deletions(-) rename notifico-core/src/{ => recipient}/contact.rs (100%) create mode 100644 notifico-core/src/recipient/controller.rs rename notifico-core/src/{recipient.rs => recipient/mod.rs} (92%) diff --git a/Cargo.lock b/Cargo.lock index e92af4e..6eee2d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2050,6 +2050,7 @@ dependencies = [ "async-trait", "axum", "flume", + "futures", "regex", "sea-orm", "serde", diff --git a/notifico-app/src/main.rs b/notifico-app/src/main.rs index 9b7dfc5..fa864bc 100644 --- a/notifico-app/src/main.rs +++ b/notifico-app/src/main.rs @@ -16,6 +16,7 @@ use notifico_core::http::SecretKey; use notifico_core::pipeline::event::EventHandler; use notifico_core::pipeline::executor::PipelineExecutor; use notifico_core::queue::{ReceiverChannel, SenderChannel}; +use notifico_core::recipient::RecipientInlineController; use notifico_core::recorder::BaseRecorder; use notifico_core::transport::TransportRegistry; use notifico_dbpipeline::DbPipelineStorage; @@ -197,28 +198,29 @@ async fn main() { if components.is_empty() || components.contains(COMPONENT_WORKER) { // Create Engine with plugins - let mut engine = Some(Engine::new()); - if let Some(engine) = engine.as_mut() { - engine.add_plugin(Arc::new(CorePlugin::new(pipelines_tx.clone()))); - engine.add_plugin(Arc::new(Templater::new(templater_source.clone()))); - engine.add_plugin(subman.clone()); - - let attachment_plugin = Arc::new(AttachmentPlugin::new(false)); - engine.add_plugin(attachment_plugin.clone()); - - let mut transport_registry = TransportRegistry::new(); - for (engine_plugin, transport_plugin) in all_transports( - credentials.clone(), - recorder.clone(), - attachment_plugin.clone(), - ) { - engine.add_plugin(engine_plugin); - transport_registry.register(transport_plugin); - } + let mut engine = Engine::new(); + engine.add_plugin(Arc::new(CorePlugin::new( + pipelines_tx.clone(), + Arc::new(RecipientInlineController), + ))); + engine.add_plugin(Arc::new(Templater::new(templater_source.clone()))); + engine.add_plugin(subman.clone()); + + let attachment_plugin = Arc::new(AttachmentPlugin::new(false)); + engine.add_plugin(attachment_plugin.clone()); + + let mut transport_registry = TransportRegistry::new(); + for (engine_plugin, transport_plugin) in all_transports( + credentials.clone(), + recorder.clone(), + attachment_plugin.clone(), + ) { + engine.add_plugin(engine_plugin); + transport_registry.register(transport_plugin); } // Main loop - let executor = Arc::new(PipelineExecutor::new(engine.unwrap())); + let executor = Arc::new(PipelineExecutor::new(engine)); let event_handler = Arc::new(EventHandler::new(pipelines.clone(), pipelines_tx.clone())); diff --git a/notifico-core/Cargo.toml b/notifico-core/Cargo.toml index 5096c02..b9ee593 100644 --- a/notifico-core/Cargo.toml +++ b/notifico-core/Cargo.toml @@ -19,3 +19,4 @@ flume = "0.11.1" thiserror = "2.0.6" regex = "1.11.1" serde_with = "3.11.0" +futures = "0.3.31" diff --git a/notifico-core/src/engine/plugin/core.rs b/notifico-core/src/engine/plugin/core.rs index 5730a96..3cef2ee 100644 --- a/notifico-core/src/engine/plugin/core.rs +++ b/notifico-core/src/engine/plugin/core.rs @@ -3,8 +3,10 @@ use crate::error::EngineError; use crate::pipeline::event::RecipientSelector; use crate::pipeline::executor::PipelineTask; use crate::queue::SenderChannel; +use crate::recipient::RecipientController; use crate::step::SerializedStep; use async_trait::async_trait; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::sync::Arc; @@ -13,11 +15,18 @@ use uuid::Uuid; pub struct CorePlugin { pipeline_sender: Arc, + recipient_controller: Arc, } impl CorePlugin { - pub fn new(pipeline_sender: Arc) -> Self { - Self { pipeline_sender } + pub fn new( + pipeline_sender: Arc, + recipient_controller: Arc, + ) -> Self { + Self { + pipeline_sender, + recipient_controller, + } } } @@ -31,21 +40,16 @@ impl EnginePlugin for CorePlugin { let step: Step = step.convert_step()?; match step { - Step::SetRecipients { recipients } => match recipients.len() { - 0 => Ok(StepOutput::Continue), - 1 => { - debug!("Single recipient; no fork"); - let recipient = &recipients[0].clone().resolve(); - context.recipient = Some(recipient.clone()); - Ok(StepOutput::Continue) - } - n => { - debug!("Multiple recipients: {n}; fork"); - for recipient in recipients { - let recipient = recipient.resolve(); + Step::SetRecipients { recipients } => { + let mut recipients = self.recipient_controller.get_recipients(recipients).await?; + let mut recipient_number = 0; + while let Some(recipient) = recipients.next().await { + recipient_number += 1; + if recipient_number == 1 { + context.recipient = Some(recipient.clone()); + } else { let mut context = context.clone(); - context.step_number += 1; context.recipient = Some(recipient.clone()); @@ -55,10 +59,10 @@ impl EnginePlugin for CorePlugin { self.pipeline_sender.send(task).await.unwrap(); } - - Ok(StepOutput::Interrupt) } - }, + debug!("Total recipients: {recipient_number}"); + Ok(StepOutput::Continue) + } } } @@ -79,6 +83,7 @@ pub(crate) const STEPS: &[&str] = &["core.set_recipients"]; #[cfg(test)] mod tests { use super::*; + use crate::recipient::RecipientInlineController; use uuid::Uuid; #[tokio::test] @@ -88,7 +93,7 @@ mod tests { let step = SerializedStep(step.as_object().unwrap().clone()); let (pipeline_tx, pipeline_rx) = flume::unbounded(); - let plugin = CorePlugin::new(Arc::new(pipeline_tx)); + let plugin = CorePlugin::new(Arc::new(pipeline_tx), Arc::new(RecipientInlineController)); let output = plugin.execute_step(&mut context, &step).await.unwrap(); assert_eq!(output, StepOutput::Continue); @@ -115,7 +120,7 @@ mod tests { let step = SerializedStep(step.as_object().unwrap().clone()); let (pipeline_tx, pipeline_rx) = flume::unbounded(); - let plugin = CorePlugin::new(Arc::new(pipeline_tx)); + let plugin = CorePlugin::new(Arc::new(pipeline_tx), Arc::new(RecipientInlineController)); let output = plugin.execute_step(&mut context, &step).await.unwrap(); @@ -138,6 +143,12 @@ mod tests { "abc:1234567890" ] }, + { + "id": Uuid::now_v7(), + "contacts": [ + "abc:1234567890" + ] + }, { "id": Uuid::now_v7(), "contacts": [ @@ -150,11 +161,11 @@ mod tests { let step = SerializedStep(step.as_object().unwrap().clone()); let (pipeline_tx, pipeline_rx) = flume::unbounded(); - let plugin = CorePlugin::new(Arc::new(pipeline_tx)); + let plugin = CorePlugin::new(Arc::new(pipeline_tx), Arc::new(RecipientInlineController)); let output = plugin.execute_step(&mut context, &step).await.unwrap(); - assert_eq!(output, StepOutput::Interrupt); + assert_eq!(output, StepOutput::Continue); assert_eq!(pipeline_rx.len(), 2) } } diff --git a/notifico-core/src/lib.rs b/notifico-core/src/lib.rs index d04e11f..70776f9 100644 --- a/notifico-core/src/lib.rs +++ b/notifico-core/src/lib.rs @@ -1,4 +1,3 @@ -pub mod contact; pub mod credentials; pub mod engine; pub mod error; diff --git a/notifico-core/src/pipeline/event.rs b/notifico-core/src/pipeline/event.rs index 8b86103..e1fbdd7 100644 --- a/notifico-core/src/pipeline/event.rs +++ b/notifico-core/src/pipeline/event.rs @@ -26,17 +26,10 @@ pub struct ProcessEventRequest { #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "snake_case", untagged)] pub enum RecipientSelector { + Id(Uuid), Recipient(Recipient), } -impl RecipientSelector { - pub fn resolve(self) -> Recipient { - match self { - RecipientSelector::Recipient(recipient) => recipient, - } - } -} - pub struct EventHandler { pipeline_storage: Arc, task_tx: Arc, diff --git a/notifico-core/src/contact.rs b/notifico-core/src/recipient/contact.rs similarity index 100% rename from notifico-core/src/contact.rs rename to notifico-core/src/recipient/contact.rs diff --git a/notifico-core/src/recipient/controller.rs b/notifico-core/src/recipient/controller.rs new file mode 100644 index 0000000..92b3f45 --- /dev/null +++ b/notifico-core/src/recipient/controller.rs @@ -0,0 +1,36 @@ +use crate::error::EngineError; +use crate::pipeline::event::RecipientSelector; +use crate::recipient::Recipient; +use async_trait::async_trait; +use futures::stream::{self, BoxStream}; +use futures::StreamExt; +use tracing::warn; + +#[async_trait] +pub trait RecipientController: Send + Sync { + async fn get_recipients( + &self, + sel: Vec, + ) -> Result, EngineError>; +} + +pub struct RecipientInlineController; + +#[async_trait] +impl RecipientController for RecipientInlineController { + async fn get_recipients( + &self, + sel: Vec, + ) -> Result, EngineError> { + Ok( + stream::iter(sel.into_iter().filter_map(|selector| match selector { + RecipientSelector::Recipient(recipient) => Some(recipient), + _ => { + warn!("Invalid recipient selector: {:?}", selector); + None + } + })) + .boxed(), + ) + } +} diff --git a/notifico-core/src/recipient.rs b/notifico-core/src/recipient/mod.rs similarity index 92% rename from notifico-core/src/recipient.rs rename to notifico-core/src/recipient/mod.rs index 6ab9b1d..46afbe7 100644 --- a/notifico-core/src/recipient.rs +++ b/notifico-core/src/recipient/mod.rs @@ -1,4 +1,8 @@ -use crate::contact::{RawContact, TypedContact}; +mod contact; +mod controller; + +pub use contact::*; +pub use controller::*; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; diff --git a/notifico-core/src/simpletransport.rs b/notifico-core/src/simpletransport.rs index 19717bb..2f0a106 100644 --- a/notifico-core/src/simpletransport.rs +++ b/notifico-core/src/simpletransport.rs @@ -1,7 +1,7 @@ -use crate::contact::RawContact; use crate::credentials::{CredentialSelector, CredentialStorage, RawCredential}; use crate::engine::{EnginePlugin, Message, PipelineContext, StepOutput}; use crate::error::EngineError; +use crate::recipient::RawContact; use crate::recorder::Recorder; use crate::step::SerializedStep; use crate::transport::Transport; diff --git a/notificox/src/main.rs b/notificox/src/main.rs index 9fe6b50..1198c58 100644 --- a/notificox/src/main.rs +++ b/notificox/src/main.rs @@ -1,6 +1,5 @@ use clap::{Parser, Subcommand}; use notifico_attachment::AttachmentPlugin; -use notifico_core::contact::RawContact; use notifico_core::credentials::memory::MemoryCredentialStorage; use notifico_core::credentials::RawCredential; use notifico_core::engine::plugin::core::CorePlugin; @@ -9,7 +8,7 @@ use notifico_core::pipeline::event::{EventHandler, ProcessEventRequest, Recipien use notifico_core::pipeline::executor::PipelineExecutor; use notifico_core::pipeline::storage::SinglePipelineStorage; use notifico_core::pipeline::Pipeline; -use notifico_core::recipient::Recipient; +use notifico_core::recipient::{RawContact, Recipient, RecipientInlineController}; use notifico_core::recorder::BaseRecorder; use notifico_core::step::SerializedStep; use notifico_core::transport::TransportRegistry; @@ -228,7 +227,10 @@ async fn main() { let (pipelines_tx, pipelines_rx) = flume::unbounded(); let pipelines_tx = Arc::new(pipelines_tx); - engine.add_plugin(Arc::new(CorePlugin::new(pipelines_tx.clone()))); + engine.add_plugin(Arc::new(CorePlugin::new( + pipelines_tx.clone(), + Arc::new(RecipientInlineController), + ))); let templater_source = Arc::new(FilesystemSource::new(template_dir)); engine.add_plugin(Arc::new(Templater::new(templater_source.clone()))); diff --git a/transports/notifico-gotify/src/lib.rs b/transports/notifico-gotify/src/lib.rs index 83b29dd..65a5fa7 100644 --- a/transports/notifico-gotify/src/lib.rs +++ b/transports/notifico-gotify/src/lib.rs @@ -1,9 +1,9 @@ use crate::credentials::GotifyCredentials; use async_trait::async_trait; -use notifico_core::contact::RawContact; use notifico_core::credentials::RawCredential; use notifico_core::engine::{Message, PipelineContext}; use notifico_core::error::EngineError; +use notifico_core::recipient::RawContact; use notifico_core::simpletransport::SimpleTransport; use notifico_core::templater::RenderedTemplate; use serde::{Deserialize, Serialize}; diff --git a/transports/notifico-ntfy/src/lib.rs b/transports/notifico-ntfy/src/lib.rs index 9effb03..08b08fc 100644 --- a/transports/notifico-ntfy/src/lib.rs +++ b/transports/notifico-ntfy/src/lib.rs @@ -2,10 +2,10 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use notifico_attachment::AttachmentPlugin; -use notifico_core::contact::RawContact; use notifico_core::credentials::{RawCredential, TypedCredential}; use notifico_core::engine::{Message, PipelineContext}; use notifico_core::error::EngineError; +use notifico_core::recipient::RawContact; use notifico_core::simpletransport::SimpleTransport; use notifico_core::templater::RenderedTemplate; use serde::{Deserialize, Serialize}; diff --git a/transports/notifico-pushover/src/lib.rs b/transports/notifico-pushover/src/lib.rs index 372814e..35f9a83 100644 --- a/transports/notifico-pushover/src/lib.rs +++ b/transports/notifico-pushover/src/lib.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; use notifico_attachment::AttachmentPlugin; -use notifico_core::contact::{RawContact, TypedContact}; use notifico_core::credentials::{RawCredential, TypedCredential}; use notifico_core::engine::{Message, PipelineContext}; use notifico_core::error::EngineError; +use notifico_core::recipient::{RawContact, TypedContact}; use notifico_core::simpletransport::SimpleTransport; use notifico_core::templater::RenderedTemplate; use serde::{Deserialize, Serialize}; diff --git a/transports/notifico-slack/src/lib.rs b/transports/notifico-slack/src/lib.rs index 548a2e6..0200160 100644 --- a/transports/notifico-slack/src/lib.rs +++ b/transports/notifico-slack/src/lib.rs @@ -4,10 +4,10 @@ mod slackapi; use async_trait::async_trait; use credentials::SlackCredentials; use notifico_attachment::AttachmentPlugin; -use notifico_core::contact::{RawContact, TypedContact}; use notifico_core::credentials::RawCredential; use notifico_core::engine::{Message, PipelineContext}; use notifico_core::error::EngineError; +use notifico_core::recipient::{RawContact, TypedContact}; use notifico_core::simpletransport::SimpleTransport; use notifico_core::templater::RenderedTemplate; use serde::{Deserialize, Serialize}; diff --git a/transports/notifico-smpp/src/lib.rs b/transports/notifico-smpp/src/lib.rs index 34d598f..da728ab 100644 --- a/transports/notifico-smpp/src/lib.rs +++ b/transports/notifico-smpp/src/lib.rs @@ -6,10 +6,10 @@ use crate::step::{Step, STEPS}; use async_trait::async_trait; use futures_util::sink::SinkExt; use futures_util::StreamExt; -use notifico_core::contact::PhoneContact; use notifico_core::credentials::CredentialStorage; use notifico_core::engine::{EnginePlugin, PipelineContext, StepOutput}; use notifico_core::error::EngineError; +use notifico_core::recipient::PhoneContact; use notifico_core::step::SerializedStep; use notifico_core::templater::RenderedTemplate; use notifico_core::transport::Transport; diff --git a/transports/notifico-smtp/src/lib.rs b/transports/notifico-smtp/src/lib.rs index 55f6606..3fa59bc 100644 --- a/transports/notifico-smtp/src/lib.rs +++ b/transports/notifico-smtp/src/lib.rs @@ -16,9 +16,9 @@ use lettre::{ }; use moka::future::Cache; use notifico_attachment::AttachmentPlugin; -use notifico_core::contact::{RawContact, TypedContact}; use notifico_core::credentials::RawCredential; use notifico_core::engine::Message; +use notifico_core::recipient::{RawContact, TypedContact}; use notifico_core::simpletransport::SimpleTransport; use notifico_core::{engine::PipelineContext, error::EngineError}; use serde::Deserialize; diff --git a/transports/notifico-telegram/src/contact.rs b/transports/notifico-telegram/src/contact.rs index 9640b47..ca86b65 100644 --- a/transports/notifico-telegram/src/contact.rs +++ b/transports/notifico-telegram/src/contact.rs @@ -1,5 +1,5 @@ -use notifico_core::contact::{RawContact, TypedContact}; use notifico_core::error::EngineError; +use notifico_core::recipient::{RawContact, TypedContact}; use serde::Deserialize; #[derive(Debug, Clone, Deserialize)] diff --git a/transports/notifico-telegram/src/lib.rs b/transports/notifico-telegram/src/lib.rs index 30c25e6..28cc59c 100644 --- a/transports/notifico-telegram/src/lib.rs +++ b/transports/notifico-telegram/src/lib.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; use contact::TelegramContact; use notifico_attachment::AttachmentPlugin; -use notifico_core::contact::RawContact; use notifico_core::credentials::RawCredential; use notifico_core::engine::{Message, PipelineContext}; +use notifico_core::recipient::RawContact; use notifico_core::simpletransport::SimpleTransport; use notifico_core::{ credentials::TypedCredential, error::EngineError, templater::RenderedTemplate, diff --git a/transports/notifico-whatsapp/src/lib.rs b/transports/notifico-whatsapp/src/lib.rs index 3f5183c..75601b2 100644 --- a/transports/notifico-whatsapp/src/lib.rs +++ b/transports/notifico-whatsapp/src/lib.rs @@ -1,9 +1,9 @@ use crate::cloudapi::{MessageType, MessagingProduct}; use crate::credentials::WhatsAppCredentials; use async_trait::async_trait; -use notifico_core::contact::{PhoneContact, RawContact}; use notifico_core::credentials::RawCredential; use notifico_core::engine::{Message, PipelineContext}; +use notifico_core::recipient::{PhoneContact, RawContact}; use notifico_core::simpletransport::SimpleTransport; use notifico_core::{error::EngineError, templater::RenderedTemplate}; use serde::{Deserialize, Serialize};