Skip to content

Commit

Permalink
feat: add PubSub SubscribeFilter functionality (#120)
Browse files Browse the repository at this point in the history
* Add filter functionality

* Add example and docs

* Move complex type to type alias and refactor filter code
  • Loading branch information
hirschenberger authored Jan 20, 2025
1 parent febad08 commit bc4c631
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 6 deletions.
92 changes: 92 additions & 0 deletions examples/pubsub_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use kameo::{
actor::pubsub::{PubSub, Publish, Subscribe, SubscribeFilter},
message::{Context, Message},
Actor,
};
use tracing_subscriber::EnvFilter;

#[derive(Clone, Debug)]
struct PrintActorID(String);

#[derive(Actor, Default)]
struct ActorA;

impl Message<PrintActorID> for ActorA {
type Reply = ();

async fn handle(
&mut self,
m: PrintActorID,
ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
println!("ActorA: {} - {:?}", ctx.actor_ref().id(), m);
}
}

#[derive(Actor, Default)]
struct ActorB;

impl Message<PrintActorID> for ActorB {
type Reply = ();

async fn handle(
&mut self,
m: PrintActorID,
ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
println!("ActorB: {} - {:?}", ctx.actor_ref().id(), m);
}
}

#[derive(Actor, Default)]
struct ActorC;

impl Message<PrintActorID> for ActorC {
type Reply = ();

async fn handle(
&mut self,
m: PrintActorID,
ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
println!("ActorC: {} - {:?}", ctx.actor_ref().id(), m);
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("warn".parse::<EnvFilter>().unwrap())
.without_time()
.with_target(false)
.init();

let pubsub = kameo::spawn(PubSub::<PrintActorID>::new());
let actor_a = kameo::spawn(ActorA);
let actor_b = kameo::spawn(ActorB);
let actor_c = kameo::spawn(ActorC);
pubsub
.ask(SubscribeFilter(actor_a, |m: &PrintActorID| {
m.0.starts_with("TopicA:")
}))
.await?;
pubsub
.ask(SubscribeFilter(actor_b, |m: &PrintActorID| {
m.0.starts_with("TopicB:")
}))
.await?;
pubsub.ask(Subscribe(actor_c)).await?;

pubsub
.ask(Publish(PrintActorID(
"TopicA: Some important note".to_string(),
)))
.await?;
pubsub
.ask(Publish(PrintActorID(
"TopicB: Some very important note".to_string(),
)))
.await?;

Ok(())
}
94 changes: 88 additions & 6 deletions src/actor/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//! - **Publish-Subscribe Pattern**: Actors can subscribe to the `PubSub` actor to receive broadcast messages.
//! - **Message Broadcasting**: Messages published to the `PubSub` actor are sent to all subscribed actors.
//! - **Subscriber Management**: Actors can subscribe and unsubscribe dynamically, allowing flexible message routing.
//! - **Message Filtering**: Messages can be filtered out with a filter-function to allow topic subscription or conditional sending.
//!
//! # Example
//!
Expand Down Expand Up @@ -58,6 +59,9 @@ use crate::{

use super::{ActorID, ActorRef};

type SubscriberMap<M> =
HashMap<ActorID, (Box<dyn MessageSubscriber<M> + Send + Sync>, fn(&M) -> bool)>;

/// A publish-subscribe (pubsub) actor that allows message broadcasting to multiple subscribers.
///
/// `PubSub` can be used as a standalone object or spawned as an actor. When spawned, messages can
Expand All @@ -66,7 +70,7 @@ use super::{ActorID, ActorRef};
/// to manage it directly or interact with it via messages.
#[allow(missing_debug_implementations)]
pub struct PubSub<M> {
subscribers: HashMap<ActorID, Box<dyn MessageSubscriber<M> + Send + Sync>>,
subscribers: SubscriberMap<M>,
}

impl<M> PubSub<M> {
Expand Down Expand Up @@ -104,10 +108,16 @@ impl<M> PubSub<M> {
where
M: Clone + Send + 'static,
{
let results = join_all(self.subscribers.iter().map(|(id, subscriber)| {
let msg = msg.clone();
async move { (*id, subscriber.tell(msg).await) }
}))
let results = join_all(
self.subscribers
.iter()
.filter_map(|(id, (subscriber, filter))| {
filter(&msg).then_some({
let msg = msg.clone();
async move { (*id, subscriber.tell(msg).await) }
})
}),
)
.await;
for (id, result) in results.into_iter() {
match result {
Expand Down Expand Up @@ -160,7 +170,52 @@ impl<M> PubSub<M> {
for<'a> TellRequest<LocalTellRequest<'a, A, A::Mailbox>, A::Mailbox, M, WithoutRequestTimeout>:
MessageSend<Ok = (), Error = SendError<M, <A::Reply as Reply>::Error>>,
{
self.subscribers.insert(actor_ref.id(), Box::new(actor_ref));
self.subscribers
.insert(actor_ref.id(), (Box::new(actor_ref), |_| true));
}

/// Subscribes an actor to receive only messages published by the pubsub actor that pass the given
/// filter function.
///
/// Once subscribed, the actor will receive only the messages sent to the pubsub actor via the `publish` method where
/// the given filter function returns `true`.
/// The actor reference is stored in the list of subscribers, and messages are sent to the actor asynchronously.
///
/// # Example
///
/// ```
/// # use kameo::Actor;
/// use kameo::actor::pubsub::PubSub;
/// # use kameo::message::{Context, Message};
///
/// # #[derive(Actor)]
/// # struct MyActor;
/// #
/// # impl Message<Msg> for MyActor {
/// # type Reply = ();
/// # async fn handle(&mut self, msg: Msg, ctx: Context<'_, Self, Self::Reply>) -> Self::Reply { }
/// # }
/// #
/// #[derive(Clone)]
/// struct Msg(String);
///
/// # tokio_test::block_on(async {
/// let mut pubsub = PubSub::new();
///
/// let actor_ref = kameo::spawn(MyActor);
/// pubsub.subscribe_filter(actor_ref, |m| m.0.starts_with("my-topic:"));
/// # })
/// ```
#[inline]
pub fn subscribe_filter<A>(&mut self, actor_ref: ActorRef<A>, filter: fn(&M) -> bool)
where
A: Actor + Message<M>,
M: Send + 'static,
for<'a> TellRequest<LocalTellRequest<'a, A, A::Mailbox>, A::Mailbox, M, WithoutRequestTimeout>:
MessageSend<Ok = (), Error = SendError<M, <A::Reply as Reply>::Error>>,
{
self.subscribers
.insert(actor_ref.id(), (Box::new(actor_ref), filter));
}
}

Expand Down Expand Up @@ -221,6 +276,33 @@ where
}
}

/// A message used to subscribe an actor and filter a bessage before sending to a `PubSub` actor.
///
/// This struct wraps an `ActorRef` and is used to subscribe an actor to a pubsub actor. Before sending
/// the message is passed to the given function and only sent if this function returns `true`.
///
/// Once subscribed, the actor will receive all published and unfiltered messages from the pubsub actor.
#[derive(Clone, Debug)]
pub struct SubscribeFilter<A: Actor, M: Send + 'static>(pub ActorRef<A>, pub fn(&M) -> bool);

impl<A, M> Message<SubscribeFilter<A, M>> for PubSub<M>
where
A: Actor + Message<M>,
M: Send + 'static,
for<'a> TellRequest<LocalTellRequest<'a, A, A::Mailbox>, A::Mailbox, M, WithoutRequestTimeout>:
MessageSend<Ok = (), Error = SendError<M, <A::Reply as Reply>::Error>>,
{
type Reply = ();

async fn handle(
&mut self,
SubscribeFilter(actor_ref, filter): SubscribeFilter<A, M>,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.subscribe_filter(actor_ref, filter)
}
}

trait MessageSubscriber<M> {
fn tell(&self, msg: M) -> BoxFuture<'_, Result<(), SendError<M, ()>>>;
}
Expand Down

0 comments on commit bc4c631

Please sign in to comment.