Skip to content

Commit

Permalink
Prepare Recipient for recipient catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
GamePad64 committed Dec 24, 2024
1 parent 831dcef commit 4be310b
Show file tree
Hide file tree
Showing 20 changed files with 113 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 21 additions & 19 deletions notifico-app/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use notifico_core::http::SecretKey;
use notifico_core::pipeline::event::EventHandler;
use notifico_core::pipeline::executor::PipelineExecutor;
use notifico_core::queue::{ReceiverChannel, SenderChannel};
use notifico_core::recipient::RecipientInlineController;
use notifico_core::recorder::BaseRecorder;
use notifico_core::transport::TransportRegistry;
use notifico_dbpipeline::DbPipelineStorage;
Expand Down Expand Up @@ -197,28 +198,29 @@ async fn main() {

if components.is_empty() || components.contains(COMPONENT_WORKER) {
// Create Engine with plugins
let mut engine = Some(Engine::new());
if let Some(engine) = engine.as_mut() {
engine.add_plugin(Arc::new(CorePlugin::new(pipelines_tx.clone())));
engine.add_plugin(Arc::new(Templater::new(templater_source.clone())));
engine.add_plugin(subman.clone());

let attachment_plugin = Arc::new(AttachmentPlugin::new(false));
engine.add_plugin(attachment_plugin.clone());

let mut transport_registry = TransportRegistry::new();
for (engine_plugin, transport_plugin) in all_transports(
credentials.clone(),
recorder.clone(),
attachment_plugin.clone(),
) {
engine.add_plugin(engine_plugin);
transport_registry.register(transport_plugin);
}
let mut engine = Engine::new();
engine.add_plugin(Arc::new(CorePlugin::new(
pipelines_tx.clone(),
Arc::new(RecipientInlineController),
)));
engine.add_plugin(Arc::new(Templater::new(templater_source.clone())));
engine.add_plugin(subman.clone());

let attachment_plugin = Arc::new(AttachmentPlugin::new(false));
engine.add_plugin(attachment_plugin.clone());

let mut transport_registry = TransportRegistry::new();
for (engine_plugin, transport_plugin) in all_transports(
credentials.clone(),
recorder.clone(),
attachment_plugin.clone(),
) {
engine.add_plugin(engine_plugin);
transport_registry.register(transport_plugin);
}

// Main loop
let executor = Arc::new(PipelineExecutor::new(engine.unwrap()));
let executor = Arc::new(PipelineExecutor::new(engine));
let event_handler =
Arc::new(EventHandler::new(pipelines.clone(), pipelines_tx.clone()));

Expand Down
1 change: 1 addition & 0 deletions notifico-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ flume = "0.11.1"
thiserror = "2.0.6"
regex = "1.11.1"
serde_with = "3.11.0"
futures = "0.3.31"
55 changes: 33 additions & 22 deletions notifico-core/src/engine/plugin/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::error::EngineError;
use crate::pipeline::event::RecipientSelector;
use crate::pipeline::executor::PipelineTask;
use crate::queue::SenderChannel;
use crate::recipient::RecipientController;
use crate::step::SerializedStep;
use async_trait::async_trait;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::sync::Arc;
Expand All @@ -13,11 +15,18 @@ use uuid::Uuid;

pub struct CorePlugin {
pipeline_sender: Arc<dyn SenderChannel>,
recipient_controller: Arc<dyn RecipientController>,
}

impl CorePlugin {
pub fn new(pipeline_sender: Arc<dyn SenderChannel>) -> Self {
Self { pipeline_sender }
pub fn new(
pipeline_sender: Arc<dyn SenderChannel>,
recipient_controller: Arc<dyn RecipientController>,
) -> Self {
Self {
pipeline_sender,
recipient_controller,
}
}
}

Expand All @@ -31,21 +40,16 @@ impl EnginePlugin for CorePlugin {
let step: Step = step.convert_step()?;

match step {
Step::SetRecipients { recipients } => match recipients.len() {
0 => Ok(StepOutput::Continue),
1 => {
debug!("Single recipient; no fork");
let recipient = &recipients[0].clone().resolve();
context.recipient = Some(recipient.clone());
Ok(StepOutput::Continue)
}
n => {
debug!("Multiple recipients: {n}; fork");
for recipient in recipients {
let recipient = recipient.resolve();
Step::SetRecipients { recipients } => {
let mut recipients = self.recipient_controller.get_recipients(recipients).await?;
let mut recipient_number = 0;

while let Some(recipient) = recipients.next().await {
recipient_number += 1;
if recipient_number == 1 {
context.recipient = Some(recipient.clone());
} else {
let mut context = context.clone();

context.step_number += 1;
context.recipient = Some(recipient.clone());

Expand All @@ -55,10 +59,10 @@ impl EnginePlugin for CorePlugin {

self.pipeline_sender.send(task).await.unwrap();
}

Ok(StepOutput::Interrupt)
}
},
debug!("Total recipients: {recipient_number}");
Ok(StepOutput::Continue)
}
}
}

Expand All @@ -79,6 +83,7 @@ pub(crate) const STEPS: &[&str] = &["core.set_recipients"];
#[cfg(test)]
mod tests {
use super::*;
use crate::recipient::RecipientInlineController;
use uuid::Uuid;

#[tokio::test]
Expand All @@ -88,7 +93,7 @@ mod tests {
let step = SerializedStep(step.as_object().unwrap().clone());

let (pipeline_tx, pipeline_rx) = flume::unbounded();
let plugin = CorePlugin::new(Arc::new(pipeline_tx));
let plugin = CorePlugin::new(Arc::new(pipeline_tx), Arc::new(RecipientInlineController));

let output = plugin.execute_step(&mut context, &step).await.unwrap();
assert_eq!(output, StepOutput::Continue);
Expand All @@ -115,7 +120,7 @@ mod tests {
let step = SerializedStep(step.as_object().unwrap().clone());

let (pipeline_tx, pipeline_rx) = flume::unbounded();
let plugin = CorePlugin::new(Arc::new(pipeline_tx));
let plugin = CorePlugin::new(Arc::new(pipeline_tx), Arc::new(RecipientInlineController));

let output = plugin.execute_step(&mut context, &step).await.unwrap();

Expand All @@ -138,6 +143,12 @@ mod tests {
"abc:1234567890"
]
},
{
"id": Uuid::now_v7(),
"contacts": [
"abc:1234567890"
]
},
{
"id": Uuid::now_v7(),
"contacts": [
Expand All @@ -150,11 +161,11 @@ mod tests {
let step = SerializedStep(step.as_object().unwrap().clone());

let (pipeline_tx, pipeline_rx) = flume::unbounded();
let plugin = CorePlugin::new(Arc::new(pipeline_tx));
let plugin = CorePlugin::new(Arc::new(pipeline_tx), Arc::new(RecipientInlineController));

let output = plugin.execute_step(&mut context, &step).await.unwrap();

assert_eq!(output, StepOutput::Interrupt);
assert_eq!(output, StepOutput::Continue);
assert_eq!(pipeline_rx.len(), 2)
}
}
1 change: 0 additions & 1 deletion notifico-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod contact;
pub mod credentials;
pub mod engine;
pub mod error;
Expand Down
9 changes: 1 addition & 8 deletions notifico-core/src/pipeline/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,10 @@ pub struct ProcessEventRequest {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "snake_case", untagged)]
pub enum RecipientSelector {
Id(Uuid),
Recipient(Recipient),
}

impl RecipientSelector {
pub fn resolve(self) -> Recipient {
match self {
RecipientSelector::Recipient(recipient) => recipient,
}
}
}

pub struct EventHandler {
pipeline_storage: Arc<dyn PipelineStorage>,
task_tx: Arc<dyn SenderChannel>,
Expand Down
File renamed without changes.
36 changes: 36 additions & 0 deletions notifico-core/src/recipient/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::error::EngineError;
use crate::pipeline::event::RecipientSelector;
use crate::recipient::Recipient;
use async_trait::async_trait;
use futures::stream::{self, BoxStream};
use futures::StreamExt;
use tracing::warn;

#[async_trait]
pub trait RecipientController: Send + Sync {
async fn get_recipients(
&self,
sel: Vec<RecipientSelector>,
) -> Result<BoxStream<Recipient>, EngineError>;
}

pub struct RecipientInlineController;

#[async_trait]
impl RecipientController for RecipientInlineController {
async fn get_recipients(
&self,
sel: Vec<RecipientSelector>,
) -> Result<BoxStream<Recipient>, EngineError> {
Ok(
stream::iter(sel.into_iter().filter_map(|selector| match selector {
RecipientSelector::Recipient(recipient) => Some(recipient),
_ => {
warn!("Invalid recipient selector: {:?}", selector);
None
}
}))
.boxed(),
)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::contact::{RawContact, TypedContact};
mod contact;
mod controller;

pub use contact::*;
pub use controller::*;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
Expand Down
2 changes: 1 addition & 1 deletion notifico-core/src/simpletransport.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::contact::RawContact;
use crate::credentials::{CredentialSelector, CredentialStorage, RawCredential};
use crate::engine::{EnginePlugin, Message, PipelineContext, StepOutput};
use crate::error::EngineError;
use crate::recipient::RawContact;
use crate::recorder::Recorder;
use crate::step::SerializedStep;
use crate::transport::Transport;
Expand Down
8 changes: 5 additions & 3 deletions notificox/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use clap::{Parser, Subcommand};
use notifico_attachment::AttachmentPlugin;
use notifico_core::contact::RawContact;
use notifico_core::credentials::memory::MemoryCredentialStorage;
use notifico_core::credentials::RawCredential;
use notifico_core::engine::plugin::core::CorePlugin;
Expand All @@ -9,7 +8,7 @@ use notifico_core::pipeline::event::{EventHandler, ProcessEventRequest, Recipien
use notifico_core::pipeline::executor::PipelineExecutor;
use notifico_core::pipeline::storage::SinglePipelineStorage;
use notifico_core::pipeline::Pipeline;
use notifico_core::recipient::Recipient;
use notifico_core::recipient::{RawContact, Recipient, RecipientInlineController};
use notifico_core::recorder::BaseRecorder;
use notifico_core::step::SerializedStep;
use notifico_core::transport::TransportRegistry;
Expand Down Expand Up @@ -228,7 +227,10 @@ async fn main() {
let (pipelines_tx, pipelines_rx) = flume::unbounded();
let pipelines_tx = Arc::new(pipelines_tx);

engine.add_plugin(Arc::new(CorePlugin::new(pipelines_tx.clone())));
engine.add_plugin(Arc::new(CorePlugin::new(
pipelines_tx.clone(),
Arc::new(RecipientInlineController),
)));

let templater_source = Arc::new(FilesystemSource::new(template_dir));
engine.add_plugin(Arc::new(Templater::new(templater_source.clone())));
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-gotify/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::credentials::GotifyCredentials;
use async_trait::async_trait;
use notifico_core::contact::RawContact;
use notifico_core::credentials::RawCredential;
use notifico_core::engine::{Message, PipelineContext};
use notifico_core::error::EngineError;
use notifico_core::recipient::RawContact;
use notifico_core::simpletransport::SimpleTransport;
use notifico_core::templater::RenderedTemplate;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-ntfy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use notifico_attachment::AttachmentPlugin;
use notifico_core::contact::RawContact;
use notifico_core::credentials::{RawCredential, TypedCredential};
use notifico_core::engine::{Message, PipelineContext};
use notifico_core::error::EngineError;
use notifico_core::recipient::RawContact;
use notifico_core::simpletransport::SimpleTransport;
use notifico_core::templater::RenderedTemplate;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-pushover/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use async_trait::async_trait;
use notifico_attachment::AttachmentPlugin;
use notifico_core::contact::{RawContact, TypedContact};
use notifico_core::credentials::{RawCredential, TypedCredential};
use notifico_core::engine::{Message, PipelineContext};
use notifico_core::error::EngineError;
use notifico_core::recipient::{RawContact, TypedContact};
use notifico_core::simpletransport::SimpleTransport;
use notifico_core::templater::RenderedTemplate;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-slack/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ mod slackapi;
use async_trait::async_trait;
use credentials::SlackCredentials;
use notifico_attachment::AttachmentPlugin;
use notifico_core::contact::{RawContact, TypedContact};
use notifico_core::credentials::RawCredential;
use notifico_core::engine::{Message, PipelineContext};
use notifico_core::error::EngineError;
use notifico_core::recipient::{RawContact, TypedContact};
use notifico_core::simpletransport::SimpleTransport;
use notifico_core::templater::RenderedTemplate;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-smpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use crate::step::{Step, STEPS};
use async_trait::async_trait;
use futures_util::sink::SinkExt;
use futures_util::StreamExt;
use notifico_core::contact::PhoneContact;
use notifico_core::credentials::CredentialStorage;
use notifico_core::engine::{EnginePlugin, PipelineContext, StepOutput};
use notifico_core::error::EngineError;
use notifico_core::recipient::PhoneContact;
use notifico_core::step::SerializedStep;
use notifico_core::templater::RenderedTemplate;
use notifico_core::transport::Transport;
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-smtp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use lettre::{
};
use moka::future::Cache;
use notifico_attachment::AttachmentPlugin;
use notifico_core::contact::{RawContact, TypedContact};
use notifico_core::credentials::RawCredential;
use notifico_core::engine::Message;
use notifico_core::recipient::{RawContact, TypedContact};
use notifico_core::simpletransport::SimpleTransport;
use notifico_core::{engine::PipelineContext, error::EngineError};
use serde::Deserialize;
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-telegram/src/contact.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use notifico_core::contact::{RawContact, TypedContact};
use notifico_core::error::EngineError;
use notifico_core::recipient::{RawContact, TypedContact};
use serde::Deserialize;

#[derive(Debug, Clone, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion transports/notifico-telegram/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use async_trait::async_trait;
use contact::TelegramContact;
use notifico_attachment::AttachmentPlugin;
use notifico_core::contact::RawContact;
use notifico_core::credentials::RawCredential;
use notifico_core::engine::{Message, PipelineContext};
use notifico_core::recipient::RawContact;
use notifico_core::simpletransport::SimpleTransport;
use notifico_core::{
credentials::TypedCredential, error::EngineError, templater::RenderedTemplate,
Expand Down
Loading

0 comments on commit 4be310b

Please sign in to comment.