-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP: rough subscription manager API proposal
- Loading branch information
Showing
6 changed files
with
369 additions
and
0 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,269 @@ | ||
#![allow(unused)] | ||
|
||
use futures::StreamExt; | ||
use std::cmp::Ordering; | ||
use std::collections::BTreeMap; | ||
use std::error::Error; | ||
use std::fmt; | ||
use thiserror::Error; | ||
|
||
use enostr::Filter; | ||
use nostrdb::{self, Config, Ndb, NoteKey, Subscription, SubscriptionStream}; | ||
|
||
/// The Subscription Manager | ||
/// | ||
/// NOTE - This interface wishes it was called Subscriptions but there | ||
/// already is one. Using a lame (but short) placeholder name instead | ||
/// for now ... | ||
/// | ||
/// ```no_run | ||
/// use std::error::Error; | ||
/// | ||
/// use nostrdb::{Config, Ndb}; | ||
/// use enostr::Filter; | ||
/// use notedeck::submgr::{SubConstraint, SubMgr, SubSpecBuilder, SubError}; | ||
/// | ||
/// #[tokio::main] | ||
/// async fn main() -> Result<(), Box<dyn Error>> { | ||
/// let mut ndb = Ndb::new("the/db/path/", &Config::new())?; | ||
/// let mut submgr = SubMgr::new(&mut ndb); | ||
/// | ||
/// // Define a filter and build the subscription specification | ||
/// let filter = Filter::new().kinds(vec![1, 2, 3]).build(); | ||
/// let spec = SubSpecBuilder::new() | ||
/// .filters(vec![filter]) | ||
/// .constraint(SubConstraint::Local) | ||
/// .build(); | ||
/// | ||
/// // Subscribe and obtain a SubReceiver | ||
/// let mut receiver = submgr.subscribe(spec)?; | ||
/// | ||
/// // Process incoming note keys | ||
/// loop { | ||
/// match receiver.next().await { | ||
/// Ok(note_keys) => { | ||
/// // Process the note keys | ||
/// println!("Received note keys: {:?}", note_keys); | ||
/// }, | ||
/// Err(SubError::StreamEnded) => { | ||
/// // Not really an error; we should clean up | ||
/// break; | ||
/// }, | ||
/// Err(err) => { | ||
/// // Handle other errors | ||
/// eprintln!("Error: {:?}", err); | ||
/// break; | ||
/// }, | ||
/// } | ||
/// } | ||
/// | ||
/// // Unsubscribe when the subscription is no longer needed | ||
/// submgr.unsubscribe(&receiver)?; | ||
/// | ||
/// Ok(()) | ||
/// } | ||
/// ``` | ||
#[derive(Debug, Error)] | ||
pub enum SubError { | ||
#[error("Stream ended")] | ||
StreamEnded, | ||
|
||
#[error("Internal error: {0}")] | ||
InternalError(String), | ||
|
||
#[error("nostrdb error: {0}")] | ||
NdbError(#[from] nostrdb::Error), | ||
} | ||
|
||
pub type SubResult<T> = Result<T, SubError>; | ||
|
||
#[derive(Debug, Clone, Copy)] | ||
pub struct SubId(nostrdb::Subscription); | ||
|
||
impl From<Subscription> for SubId { | ||
fn from(subscription: Subscription) -> Self { | ||
SubId(subscription) | ||
} | ||
} | ||
|
||
impl Ord for SubId { | ||
fn cmp(&self, other: &Self) -> Ordering { | ||
self.0.id().cmp(&other.0.id()) | ||
} | ||
} | ||
|
||
impl PartialOrd for SubId { | ||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { | ||
Some(self.cmp(other)) | ||
} | ||
} | ||
|
||
impl PartialEq for SubId { | ||
fn eq(&self, other: &Self) -> bool { | ||
self.0.id() == other.0.id() | ||
} | ||
} | ||
|
||
impl Eq for SubId {} | ||
|
||
#[derive(Debug, Clone)] | ||
pub enum SubConstraint { | ||
OneShot, // terminate subscription after initial query | ||
Local, // only query the local db, no remote subs | ||
OutboxRelays(Vec<String>), // ensure one of these is in the active relay set | ||
AllowedRelays(Vec<String>), // if not empty, only use these relays | ||
BlockedRelays(Vec<String>), // if not empty, don't use these relays | ||
} | ||
|
||
#[derive(Debug, Default)] | ||
pub struct SubSpecBuilder { | ||
rmtid: Option<String>, | ||
filters: Vec<Filter>, | ||
constraints: Vec<SubConstraint>, | ||
} | ||
|
||
impl SubSpecBuilder { | ||
pub fn new() -> Self { | ||
SubSpecBuilder::default() | ||
} | ||
pub fn rmtid(mut self, id: String) -> Self { | ||
self.rmtid = Some(id); | ||
self | ||
} | ||
pub fn filters(mut self, filters: Vec<Filter>) -> Self { | ||
self.filters.extend(filters); | ||
self | ||
} | ||
pub fn constraint(mut self, constraint: SubConstraint) -> Self { | ||
self.constraints.push(constraint); | ||
self | ||
} | ||
pub fn build(self) -> SubSpec { | ||
let mut outbox_relays = Vec::new(); | ||
let mut allowed_relays = Vec::new(); | ||
let mut blocked_relays = Vec::new(); | ||
let mut is_oneshot = false; | ||
let mut is_local = false; | ||
|
||
for constraint in self.constraints { | ||
match constraint { | ||
SubConstraint::OneShot => is_oneshot = true, | ||
SubConstraint::Local => is_local = true, | ||
SubConstraint::OutboxRelays(relays) => outbox_relays.extend(relays), | ||
SubConstraint::AllowedRelays(relays) => allowed_relays.extend(relays), | ||
SubConstraint::BlockedRelays(relays) => blocked_relays.extend(relays), | ||
} | ||
} | ||
|
||
SubSpec { | ||
rmtid: self.rmtid, | ||
filters: self.filters, | ||
outbox_relays, | ||
allowed_relays, | ||
blocked_relays, | ||
is_oneshot, | ||
is_local, | ||
} | ||
} | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct SubSpec { | ||
rmtid: Option<String>, | ||
filters: Vec<Filter>, | ||
outbox_relays: Vec<String>, | ||
allowed_relays: Vec<String>, | ||
blocked_relays: Vec<String>, | ||
is_oneshot: bool, | ||
is_local: bool, | ||
} | ||
|
||
pub struct SubMgr<'a> { | ||
ndb: &'a mut Ndb, | ||
subs: BTreeMap<SubId, SubSpec>, | ||
} | ||
|
||
impl<'a> SubMgr<'a> { | ||
pub fn new(ndb: &'a mut Ndb) -> Self { | ||
SubMgr { | ||
ndb, | ||
subs: BTreeMap::new(), | ||
} | ||
} | ||
|
||
pub fn subscribe(&mut self, spec: SubSpec) -> SubResult<SubReceiver> { | ||
let receiver = self.make_subscription(&spec)?; | ||
self.subs.insert(receiver.id, spec); | ||
Ok(receiver) | ||
} | ||
|
||
pub fn unsubscribe(&mut self, rcvr: &SubReceiver) -> SubResult<()> { | ||
self.subs.remove(&rcvr.id); | ||
Ok(()) | ||
} | ||
|
||
fn make_subscription(&mut self, sub: &SubSpec) -> SubResult<SubReceiver> { | ||
let subscription = self.ndb.subscribe(&sub.filters)?; | ||
let mut stream = subscription.stream(self.ndb).notes_per_await(1); | ||
Ok(SubReceiver::new(subscription.into(), stream)) | ||
} | ||
|
||
// Unfortunately this is needed so the caller doesn't need | ||
// concurrent mutable and immutable references to ndb. I think | ||
// making process_event use a mutable reference to ndb might be a | ||
// more obvious fix | ||
#[cfg(test)] | ||
pub fn process_event(&mut self, raw_msg: &str) -> SubResult<()> { | ||
Ok(self.ndb.process_event(raw_msg)?) | ||
} | ||
} | ||
|
||
pub struct SubReceiver { | ||
id: SubId, | ||
stream: SubscriptionStream, | ||
} | ||
|
||
impl SubReceiver { | ||
pub fn new(id: SubId, stream: SubscriptionStream) -> Self { | ||
SubReceiver { id, stream } | ||
} | ||
|
||
pub async fn next(&mut self) -> SubResult<Vec<nostrdb::NoteKey>> { | ||
self.stream.next().await.ok_or(SubError::StreamEnded) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
||
use crate::testdbs_path_async; | ||
use crate::util::test_util::{raw_msg, test_keypair, ScopedNdb}; | ||
|
||
#[tokio::test] | ||
async fn test_submgr_sub() -> Result<(), Box<dyn std::error::Error>> { | ||
// setup an ndb and submgr to test | ||
let mut sndb = ScopedNdb::new(&testdbs_path_async!()); | ||
let mut submgr = SubMgr::new(&mut sndb.ndb); | ||
|
||
// subscribe to some stuff | ||
let filter = Filter::new().kinds(vec![1]).build(); | ||
let spec = SubSpecBuilder::new() | ||
.filters(vec![filter]) | ||
.constraint(SubConstraint::Local) | ||
.build(); | ||
let mut receiver = submgr.subscribe(spec)?; | ||
|
||
// process a test event that matches the subscription | ||
let keys1 = test_keypair(1); | ||
submgr.process_event(&raw_msg("rndsubid", &keys1, 1, "hello world"))?; | ||
|
||
// receiver should now see the msg | ||
let notekeys = receiver.next().await?; | ||
assert_eq!(notekeys, vec![NoteKey::new(1)]); | ||
|
||
submgr.unsubscribe(&receiver)?; | ||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
#[allow(missing_docs)] | ||
#[cfg(test)] | ||
#[macro_use] | ||
pub mod test_util; |
Oops, something went wrong.