diff --git a/.idea/notifico.iml b/.idea/notifico.iml index 2503a98..9844474 100644 --- a/.idea/notifico.iml +++ b/.idea/notifico.iml @@ -25,6 +25,7 @@ + diff --git a/Cargo.lock b/Cargo.lock index d70a545..c45221c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2455,6 +2455,21 @@ dependencies = [ "sea-orm-migration", ] +[[package]] +name = "notifico-transports" +version = "0.1.0" +dependencies = [ + "notifico-core", + "notifico-gotify", + "notifico-pushover", + "notifico-slack", + "notifico-smpp", + "notifico-smtp", + "notifico-telegram", + "notifico-template", + "notifico-whatsapp", +] + [[package]] name = "notifico-userapi" version = "0.1.0" @@ -2533,14 +2548,9 @@ dependencies = [ "log", "notifico-core", "notifico-dbpipeline", - "notifico-pushover", - "notifico-slack", - "notifico-smpp", - "notifico-smtp", "notifico-subscription", - "notifico-telegram", "notifico-template", - "notifico-whatsapp", + "notifico-transports", "ractor", "sea-orm", "serde", @@ -2564,14 +2574,8 @@ dependencies = [ "json5", "log", "notifico-core", - "notifico-gotify", - "notifico-pushover", - "notifico-slack", - "notifico-smpp", - "notifico-smtp", - "notifico-telegram", "notifico-template", - "notifico-whatsapp", + "notifico-transports", "regex", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 6b0993c..855b458 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ members = [ "notifico-project/migration", "notifico-web", "notifico-ingest", - "notificox", "transports/notifico-gotify", + "notificox", "transports/notifico-gotify", "notifico-transports", ] [workspace.dependencies] diff --git a/notifico-transports/Cargo.toml b/notifico-transports/Cargo.toml new file mode 100644 index 0000000..46f4c2d --- /dev/null +++ b/notifico-transports/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "notifico-transports" +version = "0.1.0" +edition = "2021" + +[dependencies] +notifico-core = { path = "../notifico-core" } +notifico-template = { path = "../notifico-template" } +# Transports +notifico-telegram = { path = "../transports/notifico-telegram" } +notifico-smtp = { path = "../transports/notifico-smtp" } +notifico-whatsapp = { path = "../transports/notifico-whatsapp" } +notifico-smpp = { path = "../transports/notifico-smpp" } +notifico-slack = { path = "../transports/notifico-slack" } +notifico-pushover = { path = "../transports/notifico-pushover" } +notifico-gotify = { path = "../transports/notifico-gotify" } diff --git a/notifico-transports/src/lib.rs b/notifico-transports/src/lib.rs new file mode 100644 index 0000000..8ba863c --- /dev/null +++ b/notifico-transports/src/lib.rs @@ -0,0 +1,72 @@ +use notifico_core::credentials::CredentialStorage; +use notifico_core::engine::EnginePlugin; +use notifico_core::recorder::Recorder; +use notifico_core::simpletransport::SimpleTransportWrapper; +use notifico_core::transport::Transport; +use notifico_gotify::GotifyTransport; +use notifico_pushover::PushoverTransport; +use notifico_slack::SlackTransport; +use notifico_smpp::SmppPlugin; +use notifico_smtp::EmailPlugin; +use notifico_telegram::TelegramTransport; +use notifico_whatsapp::WabaTransport; +use std::sync::Arc; + +pub fn all_transports( + credentials: Arc, + recorder: Arc, +) -> Vec<(Arc, Arc)> { + let mut plugins: Vec<(Arc, Arc)> = vec![]; + + // Complicated transports + let email_plugin = Arc::new(EmailPlugin::new(credentials.clone(), recorder.clone())); + plugins.push((email_plugin.clone(), email_plugin.clone())); + + let smpp_plugin = Arc::new(SmppPlugin::new(credentials.clone())); + plugins.push((smpp_plugin.clone(), smpp_plugin.clone())); + + // Simple transports + let telegram_transport = Arc::new(TelegramTransport::new()); + let telegram_plugin = Arc::new(SimpleTransportWrapper::new( + telegram_transport, + credentials.clone(), + recorder.clone(), + )); + plugins.push((telegram_plugin.clone(), telegram_plugin.clone())); + + let waba_transport = Arc::new(WabaTransport::new()); + let waba_plugin = Arc::new(SimpleTransportWrapper::new( + waba_transport, + credentials.clone(), + recorder.clone(), + )); + plugins.push((waba_plugin.clone(), waba_plugin.clone())); + + let slack_transport = Arc::new(SlackTransport::new()); + let slack_plugin = Arc::new(SimpleTransportWrapper::new( + slack_transport, + credentials.clone(), + recorder.clone(), + )); + plugins.push((slack_plugin.clone(), slack_plugin.clone())); + + let pushover_transport = Arc::new(PushoverTransport::new()); + let pushover_plugin = Arc::new(SimpleTransportWrapper::new( + pushover_transport, + credentials.clone(), + recorder.clone(), + )); + plugins.push((pushover_plugin.clone(), pushover_plugin.clone())); + + let gotify_transport = Arc::new(GotifyTransport::new()); + let gotify_plugin = Arc::new(SimpleTransportWrapper::new( + gotify_transport, + credentials.clone(), + recorder.clone(), + )); + plugins.push((gotify_plugin.clone(), gotify_plugin.clone())); + + // Add more transports here... + + plugins +} diff --git a/notifico-worker/Cargo.toml b/notifico-worker/Cargo.toml index 310488a..452038a 100644 --- a/notifico-worker/Cargo.toml +++ b/notifico-worker/Cargo.toml @@ -5,12 +5,7 @@ edition = "2021" [dependencies] notifico-core = { path = "../notifico-core" } -notifico-telegram = { path = "../transports/notifico-telegram" } -notifico-smtp = { path = "../transports/notifico-smtp" } -notifico-whatsapp = { path = "../transports/notifico-whatsapp" } -notifico-smpp = { path = "../transports/notifico-smpp" } -notifico-slack = { path = "../transports/notifico-slack" } -notifico-pushover = { path = "../transports/notifico-pushover" } +notifico-transports = { path = "../notifico-transports" } notifico-template = { path = "../notifico-template" } notifico-subscription = { path = "../notifico-subscription" } diff --git a/notifico-worker/src/main.rs b/notifico-worker/src/main.rs index 30ed0a1..1d208e9 100644 --- a/notifico-worker/src/main.rs +++ b/notifico-worker/src/main.rs @@ -12,16 +12,12 @@ use notifico_core::pipeline::event::EventHandler; use notifico_core::pipeline::executor::PipelineExecutor; use notifico_core::queue::ReceiverChannel; use notifico_core::recorder::BaseRecorder; +use notifico_core::transport::TransportRegistry; use notifico_dbpipeline::DbPipelineStorage; -use notifico_pushover::PushoverPlugin; -use notifico_slack::SlackPlugin; -use notifico_smpp::SmppPlugin; -use notifico_smtp::EmailPlugin; use notifico_subscription::SubscriptionManager; -use notifico_telegram::TelegramPlugin; use notifico_template::db::DbTemplateSource; use notifico_template::Templater; -use notifico_whatsapp::WaBusinessPlugin; +use notifico_transports::all_transports; use sea_orm::{ConnectOptions, Database}; use std::path::PathBuf; use std::sync::Arc; @@ -115,27 +111,11 @@ async fn main() { let templater_source = Arc::new(DbTemplateSource::new(db_connection.clone())); engine.add_plugin(Arc::new(Templater::new(templater_source.clone()))); - engine.add_plugin(Arc::new(TelegramPlugin::new( - credentials.clone(), - recorder.clone(), - ))); - engine.add_plugin(Arc::new(EmailPlugin::new( - credentials.clone(), - recorder.clone(), - ))); - engine.add_plugin(Arc::new(WaBusinessPlugin::new( - credentials.clone(), - recorder.clone(), - ))); - engine.add_plugin(Arc::new(SmppPlugin::new(credentials.clone()))); - engine.add_plugin(Arc::new(SlackPlugin::new( - credentials.clone(), - recorder.clone(), - ))); - engine.add_plugin(Arc::new(PushoverPlugin::new( - credentials.clone(), - recorder.clone(), - ))); + let mut transport_registry = TransportRegistry::new(); + for (engine_plugin, transport_plugin) in all_transports(credentials.clone(), recorder.clone()) { + engine.add_plugin(engine_plugin); + transport_registry.register(transport_plugin); + } let subman = Arc::new(SubscriptionManager::new( db_connection, diff --git a/notificox/Cargo.toml b/notificox/Cargo.toml index 4a2f0d3..9cf8b2d 100644 --- a/notificox/Cargo.toml +++ b/notificox/Cargo.toml @@ -6,14 +6,7 @@ edition = "2021" [dependencies] notifico-core = { path = "../notifico-core" } notifico-template = { path = "../notifico-template" } -# Transports -notifico-telegram = { path = "../transports/notifico-telegram" } -notifico-smtp = { path = "../transports/notifico-smtp" } -notifico-whatsapp = { path = "../transports/notifico-whatsapp" } -notifico-smpp = { path = "../transports/notifico-smpp" } -notifico-slack = { path = "../transports/notifico-slack" } -notifico-pushover = { path = "../transports/notifico-pushover" } -notifico-gotify = { path = "../transports/notifico-gotify" } +notifico-transports = { path = "../notifico-transports" } # Other deps serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0.133" diff --git a/notificox/src/main.rs b/notificox/src/main.rs index 1bc3c32..f039c10 100644 --- a/notificox/src/main.rs +++ b/notificox/src/main.rs @@ -2,7 +2,7 @@ use clap::{Parser, Subcommand}; use log::info; use notifico_core::config::credentials::MemoryCredentialStorage; use notifico_core::contact::Contact; -use notifico_core::credentials::{Credential, CredentialStorage}; +use notifico_core::credentials::Credential; use notifico_core::engine::plugin::core::CorePlugin; use notifico_core::engine::Engine; use notifico_core::pipeline::event::{EventHandler, ProcessEventRequest, RecipientSelector}; @@ -10,19 +10,12 @@ use notifico_core::pipeline::executor::PipelineExecutor; use notifico_core::pipeline::storage::SinglePipelineStorage; use notifico_core::pipeline::Pipeline; use notifico_core::recipient::Recipient; -use notifico_core::recorder::{BaseRecorder, Recorder}; -use notifico_core::simpletransport::SimpleTransportWrapper; +use notifico_core::recorder::BaseRecorder; use notifico_core::step::SerializedStep; use notifico_core::transport::TransportRegistry; -use notifico_gotify::GotifyTransport; -use notifico_pushover::PushoverPlugin; -use notifico_slack::SlackPlugin; -use notifico_smpp::SmppPlugin; -use notifico_smtp::EmailPlugin; -use notifico_telegram::TelegramPlugin; use notifico_template::source::DummyTemplateSource; use notifico_template::Templater; -use notifico_whatsapp::WaBusinessPlugin; +use notifico_transports::all_transports; use serde_json::{json, Map, Value}; use std::sync::Arc; use tokio::task::JoinSet; @@ -89,12 +82,12 @@ async fn main() { Arc::new(credentials) }; - add_transports( - &mut engine, - &mut transport_registry, - credentials.clone(), - recorder.clone(), - ); + for (engine_plugin, transport_plugin) in + all_transports(credentials.clone(), recorder.clone()) + { + engine.add_plugin(engine_plugin); + transport_registry.register(transport_plugin); + } let pipeline = { let mut pipeline = Pipeline { @@ -189,43 +182,3 @@ async fn main() { } } } - -fn add_transports( - engine: &mut Engine, - transport_registry: &mut TransportRegistry, - credentials: Arc, - recorder: Arc, -) { - let telegram_plugin = Arc::new(TelegramPlugin::new(credentials.clone(), recorder.clone())); - engine.add_plugin(telegram_plugin.clone()); - transport_registry.register(telegram_plugin); - - let email_plugin = Arc::new(EmailPlugin::new(credentials.clone(), recorder.clone())); - engine.add_plugin(email_plugin.clone()); - transport_registry.register(email_plugin); - - let whatsapp_plugin = Arc::new(WaBusinessPlugin::new(credentials.clone(), recorder.clone())); - engine.add_plugin(whatsapp_plugin.clone()); - transport_registry.register(whatsapp_plugin); - - let smpp_plugin = Arc::new(SmppPlugin::new(credentials.clone())); - engine.add_plugin(smpp_plugin.clone()); - transport_registry.register(smpp_plugin); - - let slack_plugin = Arc::new(SlackPlugin::new(credentials.clone(), recorder.clone())); - engine.add_plugin(slack_plugin.clone()); - transport_registry.register(slack_plugin); - - let pushover_plugin = Arc::new(PushoverPlugin::new(credentials.clone(), recorder.clone())); - engine.add_plugin(pushover_plugin.clone()); - transport_registry.register(pushover_plugin); - - let gotify_transport = Arc::new(GotifyTransport::new()); - let gotify_plugin = Arc::new(SimpleTransportWrapper::new( - gotify_transport.clone(), - credentials.clone(), - recorder.clone(), - )); - engine.add_plugin(gotify_plugin.clone()); - transport_registry.register(gotify_plugin); -} diff --git a/transports/notifico-pushover/src/lib.rs b/transports/notifico-pushover/src/lib.rs index 7d81855..359f0e8 100644 --- a/transports/notifico-pushover/src/lib.rs +++ b/transports/notifico-pushover/src/lib.rs @@ -1,18 +1,10 @@ -mod step; - -use crate::step::{Step, STEPS}; use async_trait::async_trait; use notifico_core::contact::{Contact, TypedContact}; -use notifico_core::credentials::{Credential, CredentialStorage, TypedCredential}; -use notifico_core::engine::{EnginePlugin, PipelineContext, StepOutput}; +use notifico_core::credentials::{Credential, TypedCredential}; use notifico_core::error::EngineError; -use notifico_core::recorder::Recorder; -use notifico_core::step::SerializedStep; +use notifico_core::simpletransport::SimpleTransport; use notifico_core::templater::RenderedTemplate; -use notifico_core::transport::Transport; use serde::{Deserialize, Serialize}; -use std::borrow::Cow; -use std::sync::Arc; use url::Url; #[derive(Debug, Serialize, Deserialize)] @@ -67,99 +59,58 @@ struct PushoverMessageRequest { url_title: Option, } -pub struct PushoverPlugin { +pub struct PushoverTransport { client: reqwest::Client, - credentials: Arc, - recorder: Arc, } -impl PushoverPlugin { - pub fn new(credentials: Arc, recorder: Arc) -> Self { +impl PushoverTransport { + pub fn new() -> Self { Self { client: reqwest::Client::new(), - credentials, - recorder, } } } #[async_trait] -impl EnginePlugin for PushoverPlugin { - async fn execute_step( +impl SimpleTransport for PushoverTransport { + async fn send_message( &self, - context: &mut PipelineContext, - step: &SerializedStep, - ) -> Result { - let step: Step = step.clone().convert_step()?; - - match step { - Step::Send { credential } => { - let credential: PushoverCredentials = self - .credentials - .resolve(context.project_id, credential) - .await?; - - let contacts: Vec = context.get_recipient()?.get_contacts(); - - for contact in contacts { - for message in context.messages.iter().cloned() { - let content: Message = message.content.try_into()?; - let request = PushoverMessageRequest { - token: credential.token.clone(), - user: contact.user.clone(), - message: content.text, - attachment_base64: None, - attachment_type: None, - device: None, - html: Some(1), - priority: None, - sound: None, - timestamp: None, - title: Some(content.title), - ttl: None, - url: None, - url_title: None, - }; - - let result = self - .client - .post("https://api.pushover.net/1/messages.json") - .body(serde_urlencoded::to_string(request).unwrap_or_default()) - .send() - .await; - - match result { - Ok(_) => self.recorder.record_message_sent( - context.event_id, - context.notification_id, - message.id, - ), - Err(e) => self.recorder.record_message_failed( - context.event_id, - context.notification_id, - message.id, - &e.to_string(), - ), - } - } - } - Ok(StepOutput::Continue) - } - } - } - - fn steps(&self) -> Vec> { - STEPS.iter().map(|&s| s.into()).collect() - } -} - -impl Transport for PushoverPlugin { - fn name(&self) -> Cow<'static, str> { - "pushover".into() + credential: Credential, + contact: Contact, + message: RenderedTemplate, + ) -> Result<(), EngineError> { + let credential: PushoverCredentials = credential.try_into()?; + let contact: PushoverContact = contact.try_into()?; + let message: Message = message.try_into()?; + + let request = PushoverMessageRequest { + token: credential.token.clone(), + user: contact.user.clone(), + message: message.text, + attachment_base64: None, + attachment_type: None, + device: None, + html: Some(1), + priority: None, + sound: None, + timestamp: None, + title: Some(message.title), + ttl: None, + url: None, + url_title: None, + }; + + self.client + .post("https://api.pushover.net/1/messages.json") + .body(serde_urlencoded::to_string(request).unwrap_or_default()) + .send() + .await + .map_err(|e| EngineError::InternalError(e.into()))?; + Ok(()) } - fn send_step(&self) -> Cow<'static, str> { - "pushover.send".into() + fn name(&self) -> &'static str { + "pushover" } } diff --git a/transports/notifico-pushover/src/step.rs b/transports/notifico-pushover/src/step.rs deleted file mode 100644 index 9f9227e..0000000 --- a/transports/notifico-pushover/src/step.rs +++ /dev/null @@ -1,11 +0,0 @@ -use notifico_core::credentials::CredentialSelector; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize)] -#[serde(tag = "step")] -pub enum Step { - #[serde(rename = "pushover.send")] - Send { credential: CredentialSelector }, -} - -pub(crate) const STEPS: &[&str] = &["pushover.send"]; diff --git a/transports/notifico-slack/src/lib.rs b/transports/notifico-slack/src/lib.rs index 6b115ef..a0504f6 100644 --- a/transports/notifico-slack/src/lib.rs +++ b/transports/notifico-slack/src/lib.rs @@ -1,101 +1,53 @@ mod credentials; mod slackapi; -mod step; -use crate::step::{Step, STEPS}; use async_trait::async_trait; use credentials::SlackCredentials; use notifico_core::contact::{Contact, TypedContact}; -use notifico_core::credentials::CredentialStorage; -use notifico_core::engine::{EnginePlugin, PipelineContext, StepOutput}; +use notifico_core::credentials::Credential; use notifico_core::error::EngineError; -use notifico_core::recorder::Recorder; -use notifico_core::step::SerializedStep; +use notifico_core::simpletransport::SimpleTransport; use notifico_core::templater::RenderedTemplate; -use notifico_core::transport::Transport; use serde::{Deserialize, Serialize}; -use std::borrow::Cow; -use std::sync::Arc; -pub struct SlackPlugin { +pub struct SlackTransport { client: slackapi::SlackApi, - credentials: Arc, - recorder: Arc, } -impl SlackPlugin { - pub fn new(credentials: Arc, recorder: Arc) -> Self { - SlackPlugin { +impl SlackTransport { + pub fn new() -> Self { + SlackTransport { client: slackapi::SlackApi::new(), - credentials, - recorder, } } } #[async_trait] -impl EnginePlugin for SlackPlugin { - async fn execute_step( +impl SimpleTransport for SlackTransport { + async fn send_message( &self, - context: &mut PipelineContext, - step: &SerializedStep, - ) -> Result { - let step: Step = step.clone().convert_step()?; - - match step { - Step::Send { credential } => { - let credential: SlackCredentials = self - .credentials - .resolve(context.project_id, credential) - .await?; - - let contact: Vec = context.get_recipient()?.get_contacts(); - - for contact in contact { - for message in context.messages.iter().cloned() { - let content: SlackMessage = message.content.try_into()?; - let slack_message = slackapi::SlackMessage::Text { - channel: contact.channel_id.clone(), - text: content.text, - }; - - let result = self - .client - .chat_post_message(&credential.token, slack_message) - .await; - - match result { - Ok(_) => self.recorder.record_message_sent( - context.event_id, - context.notification_id, - message.id, - ), - Err(e) => self.recorder.record_message_failed( - context.event_id, - context.notification_id, - message.id, - &e.to_string(), - ), - } - } - } - Ok(StepOutput::Continue) - } - } - } - - fn steps(&self) -> Vec> { - STEPS.iter().map(|&s| s.into()).collect() - } -} - -impl Transport for SlackPlugin { - fn name(&self) -> Cow<'static, str> { - "slack".into() + credential: Credential, + contact: Contact, + message: RenderedTemplate, + ) -> Result<(), EngineError> { + let credential: SlackCredentials = credential.try_into()?; + let contact: SlackContact = contact.try_into()?; + let content: SlackMessage = message.try_into()?; + + let slack_message = slackapi::SlackMessage::Text { + channel: contact.channel_id.clone(), + text: content.text, + }; + + self.client + .chat_post_message(&credential.token, slack_message) + .await + .map_err(|e| EngineError::InternalError(e.into()))?; + Ok(()) } - fn send_step(&self) -> Cow<'static, str> { - "slack.send".into() + fn name(&self) -> &'static str { + "slack" } } diff --git a/transports/notifico-slack/src/step.rs b/transports/notifico-slack/src/step.rs deleted file mode 100644 index 6e5df8a..0000000 --- a/transports/notifico-slack/src/step.rs +++ /dev/null @@ -1,11 +0,0 @@ -use notifico_core::credentials::CredentialSelector; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize)] -#[serde(tag = "step")] -pub enum Step { - #[serde(rename = "slack.send")] - Send { credential: CredentialSelector }, -} - -pub(crate) const STEPS: &[&str] = &["slack.send"]; diff --git a/transports/notifico-telegram/src/lib.rs b/transports/notifico-telegram/src/lib.rs index c34dfbc..8b858bb 100644 --- a/transports/notifico-telegram/src/lib.rs +++ b/transports/notifico-telegram/src/lib.rs @@ -1,27 +1,17 @@ -use crate::step::STEPS; use async_trait::async_trait; use contact::TelegramContact; +use notifico_core::contact::Contact; use notifico_core::credentials::Credential; -use notifico_core::recorder::Recorder; -use notifico_core::step::SerializedStep; -use notifico_core::transport::Transport; +use notifico_core::simpletransport::SimpleTransport; use notifico_core::{ - credentials::{CredentialStorage, TypedCredential}, - engine::PipelineContext, - engine::{EnginePlugin, StepOutput}, - error::EngineError, - templater::RenderedTemplate, + credentials::TypedCredential, error::EngineError, templater::RenderedTemplate, }; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::borrow::Cow; -use std::sync::Arc; -use step::Step; use teloxide::prelude::Requester; use teloxide::Bot; mod contact; -mod step; #[derive(Debug, Serialize, Deserialize)] struct TelegramBotCredentials { @@ -55,80 +45,36 @@ impl TypedCredential for TelegramBotCredentials { const TRANSPORT_NAME: &'static str = "telegram"; } -pub struct TelegramPlugin { - credentials: Arc, - recorder: Arc, -} +pub struct TelegramTransport {} -impl TelegramPlugin { - pub fn new(credentials: Arc, recorder: Arc) -> Self { - Self { - credentials, - recorder, - } +impl TelegramTransport { + pub fn new() -> Self { + Self {} } } #[async_trait] -impl EnginePlugin for TelegramPlugin { - async fn execute_step( +impl SimpleTransport for TelegramTransport { + async fn send_message( &self, - context: &mut PipelineContext, - step: &SerializedStep, - ) -> Result { - let step: Step = step.clone().convert_step()?; - - match step { - Step::Send { credential } => { - let credential: TelegramBotCredentials = self - .credentials - .resolve(context.project_id, credential) - .await?; - let bot = Bot::new(credential.token); - let contacts: Vec = context.get_recipient()?.get_contacts(); - - for contact in contacts { - for message in context.messages.iter().cloned() { - let content: TelegramContent = message.content.try_into().unwrap(); - - // Send - let result = bot - .send_message(contact.clone().into_recipient(), content.body) - .await; - - match result { - Ok(_) => self.recorder.record_message_sent( - context.event_id, - context.notification_id, - message.id, - ), - Err(e) => self.recorder.record_message_failed( - context.event_id, - context.notification_id, - message.id, - &e.to_string(), - ), - } - } - } - } - } - - Ok(StepOutput::Continue) + credential: Credential, + contact: Contact, + message: RenderedTemplate, + ) -> Result<(), EngineError> { + let credential: TelegramBotCredentials = credential.try_into()?; + let bot = Bot::new(credential.token); + let contact: TelegramContact = contact.try_into()?; + let content: TelegramContent = message.try_into()?; + + // Send + bot.send_message(contact.clone().into_recipient(), content.body) + .await + .map_err(|e| EngineError::InternalError(e.into()))?; + Ok(()) } - fn steps(&self) -> Vec> { - STEPS.iter().map(|&s| s.into()).collect() - } -} - -impl Transport for TelegramPlugin { - fn name(&self) -> Cow<'static, str> { - "telegram".into() - } - - fn send_step(&self) -> Cow<'static, str> { - "telegram.send".into() + fn name(&self) -> &'static str { + "telegram" } } @@ -138,9 +84,10 @@ struct TelegramContent { } impl TryFrom for TelegramContent { - type Error = (); + type Error = EngineError; fn try_from(value: RenderedTemplate) -> Result { - serde_json::from_value(Value::from_iter(value.0)).map_err(|_| ()) + serde_json::from_value(Value::from_iter(value.0)) + .map_err(|e| EngineError::InvalidRenderedTemplateFormat(e.into())) } } diff --git a/transports/notifico-telegram/src/step.rs b/transports/notifico-telegram/src/step.rs deleted file mode 100644 index 5e47f00..0000000 --- a/transports/notifico-telegram/src/step.rs +++ /dev/null @@ -1,11 +0,0 @@ -use notifico_core::credentials::CredentialSelector; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize)] -#[serde(tag = "step")] -pub enum Step { - #[serde(rename = "telegram.send")] - Send { credential: CredentialSelector }, -} - -pub(crate) const STEPS: &[&str] = &["telegram.send"]; diff --git a/transports/notifico-whatsapp/src/lib.rs b/transports/notifico-whatsapp/src/lib.rs index 129f851..9f55e10 100644 --- a/transports/notifico-whatsapp/src/lib.rs +++ b/transports/notifico-whatsapp/src/lib.rs @@ -1,121 +1,67 @@ use crate::cloudapi::{MessageType, MessagingProduct}; use crate::credentials::WhatsAppCredentials; -use crate::step::{Step, STEPS}; use async_trait::async_trait; -use notifico_core::contact::MobilePhoneContact; -use notifico_core::recorder::Recorder; -use notifico_core::step::SerializedStep; -use notifico_core::transport::Transport; -use notifico_core::{ - credentials::CredentialStorage, - engine::PipelineContext, - engine::{EnginePlugin, StepOutput}, - error::EngineError, - templater::RenderedTemplate, -}; +use notifico_core::contact::{Contact, MobilePhoneContact}; +use notifico_core::credentials::Credential; +use notifico_core::simpletransport::SimpleTransport; +use notifico_core::{error::EngineError, templater::RenderedTemplate}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::borrow::Cow; -use std::sync::Arc; mod cloudapi; mod credentials; -mod step; -pub struct WaBusinessPlugin { - credentials: Arc, - recorder: Arc, +pub struct WabaTransport { client: reqwest::Client, } -impl WaBusinessPlugin { - pub fn new(credentials: Arc, recorder: Arc) -> Self { +impl WabaTransport { + pub fn new() -> Self { Self { - credentials, - recorder, client: reqwest::Client::new(), } } } #[async_trait] -impl EnginePlugin for WaBusinessPlugin { - async fn execute_step( +impl SimpleTransport for WabaTransport { + async fn send_message( &self, - context: &mut PipelineContext, - step: &SerializedStep, - ) -> Result { - let step: Step = step.clone().convert_step()?; - - match step { - Step::Send { credential } => { - let contacts: Vec = context.get_recipient()?.get_contacts(); - - // Send - let credential: WhatsAppCredentials = self - .credentials - .resolve(context.project_id, credential) - .await?; - - let url = format!( - "https://graph.facebook.com/v20.0/{}/messages", - credential.phone_id - ); - - for contact in contacts { - for message in context.messages.iter().cloned() { - let wa_message: WhatsAppContent = message.content.try_into().unwrap(); - - let wamessage = cloudapi::Message { - messaging_product: MessagingProduct::Whatsapp, - to: contact.number.clone(), - language: "en_US".into(), - message: MessageType::Text { - preview_url: false, - body: wa_message.body, - }, - }; - - let result = self - .client - .post(url.clone()) - .header("Authorization", format!("Bearer {}", credential.token)) - .json(&wamessage) - .send() - .await; - match result { - Ok(_) => self.recorder.record_message_sent( - context.event_id, - context.notification_id, - message.id, - ), - Err(e) => self.recorder.record_message_failed( - context.event_id, - context.notification_id, - message.id, - &e.to_string(), - ), - } - } - } - } - } - - Ok(StepOutput::Continue) - } - - fn steps(&self) -> Vec> { - STEPS.iter().map(|&s| s.into()).collect() - } -} - -impl Transport for WaBusinessPlugin { - fn name(&self) -> Cow<'static, str> { - "waba".into() + credential: Credential, + contact: Contact, + message: RenderedTemplate, + ) -> Result<(), EngineError> { + let credential: WhatsAppCredentials = credential.try_into()?; + let contact: MobilePhoneContact = contact.try_into()?; + let message: WhatsAppContent = message.try_into()?; + + let url = format!( + "https://graph.facebook.com/v20.0/{}/messages", + credential.phone_id + ); + + let request = cloudapi::Message { + messaging_product: MessagingProduct::Whatsapp, + to: contact.number.clone(), + language: "en_US".into(), + message: MessageType::Text { + preview_url: false, + body: message.body, + }, + }; + + self.client + .post(url) + .header("Authorization", format!("Bearer {}", credential.token)) + .json(&request) + .send() + .await + .map_err(|e| EngineError::InternalError(e.into()))?; + Ok(()) } - fn send_step(&self) -> Cow<'static, str> { - "waba.send".into() + fn name(&self) -> &'static str { + "waba" } } @@ -125,9 +71,10 @@ struct WhatsAppContent { } impl TryFrom for WhatsAppContent { - type Error = (); + type Error = EngineError; fn try_from(value: RenderedTemplate) -> Result { - serde_json::from_value(Value::from_iter(value.0)).map_err(|_| ()) + serde_json::from_value(Value::from_iter(value.0)) + .map_err(|e| EngineError::InternalError(e.into())) } } diff --git a/transports/notifico-whatsapp/src/step.rs b/transports/notifico-whatsapp/src/step.rs deleted file mode 100644 index 72cbeea..0000000 --- a/transports/notifico-whatsapp/src/step.rs +++ /dev/null @@ -1,11 +0,0 @@ -use notifico_core::credentials::CredentialSelector; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize)] -#[serde(tag = "step")] -pub enum Step { - #[serde(rename = "waba.send")] - Send { credential: CredentialSelector }, -} - -pub const STEPS: &[&str] = &["waba.send"];