Skip to content

Commit

Permalink
Unit tests & unified subscribe function
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Apr 23, 2024
1 parent aa843c2 commit a8d4101
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 37 deletions.
135 changes: 99 additions & 36 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1551,41 +1551,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
}
}

pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}

let (sender, receiver) = mpsc::channel(10);
let subscription = ChangeSubscription {
entries: valid_entries,
sender,
permissions: self.permissions.clone(),
};

{
// Send everything subscribed to in an initial notification
let db = self.broker.database.read().await;
if subscription.notify(None, &db).await.is_err() {
warn!("Failed to create initial notification");
}
}

self.broker
.subscriptions
.write()
.await
.add_change_subscription(subscription);

let stream = ReceiverStream::new(receiver);
Ok(stream)
}

// only supportede by the new API
pub async fn subscribe_interval(
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
interval_ms: Option<u64>,
Expand Down Expand Up @@ -3123,7 +3090,7 @@ mod tests {
}

#[tokio::test]
async fn test_subscribe_and_get() {
async fn test_subscribe_and_interval_and_get() {
let broker = DataBroker::default();
let broker = broker.authorized_access(&permissions::ALLOW_ALL);

Expand All @@ -3139,9 +3106,23 @@ mod tests {
)
.await
.expect("Register datapoint should succeed");

let id2 = broker
.add_entry(
"test.datapoint2".to_owned(),
DataType::Int32,
ChangeType::Continuous,
EntryType::Sensor,
"Test datapoint 2".to_owned(),
None,
None,
)
.await
.expect("Register datapoint should succeed");


let mut stream = broker
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), None)
.await
.expect("subscription should succeed");

Expand Down Expand Up @@ -3202,6 +3183,88 @@ mod tests {
}
}

let mut continous_stream = broker
.subscribe(HashMap::from([(id2, HashSet::from([Field::Datapoint]))]), Some(1000))
.await
.expect("subscription should succeed");

// Stream should yield initial notification with current values i.e. NotAvailable
match continous_stream.next().await {
Some(next) => {
assert_eq!(next.updates.len(), 1);
assert_eq!(
next.updates[0].update.path,
Some("test.datapoint2".to_string())
);
assert_eq!(
next.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::NotAvailable
);
}
None => {
panic!("did not expect stream end")
}
}

broker
.update_entries([(
id2,
EntryUpdate {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(101),
}),
actuator_target: None,
entry_type: None,
data_type: None,
description: None,
allowed: None,
unit: None,
},
)])
.await
.expect("setting datapoint #1");

// await next event to start the timer right
match continous_stream.next().await {
Some(next) => {
assert_eq!(next.updates.len(), 1);
assert_eq!(
next.updates[0].update.path,
Some("test.datapoint2".to_string())
);
assert_eq!(
next.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::Int32(101)
);
}
None => {
panic!("did not expect stream end")
}
}

let time = SystemTime::now();
match continous_stream.next().await {
Some(next) => {
// some delay is expected so check if its small enough
assert!(time.elapsed().unwrap() - Duration::from_millis(1000) < Duration::from_millis(20));
assert_eq!(next.updates.len(), 1);
assert_eq!(
next.updates[0].update.path,
Some("test.datapoint2".to_string())
);
assert_eq!(
next.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::Int32(101)
);
}
None => {
panic!("did not expect stream end")
}
}

// Check that the data point has been stored as well
match broker.get_datapoint(id1).await {
Ok(datapoint) => {
Expand Down
2 changes: 1 addition & 1 deletion databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl proto::val_server::Val for broker::DataBroker {
}

match broker
.subscribe_interval(entries, request.interval_ms)
.subscribe(entries, request.interval_ms)
.await
{
Ok(stream) => {
Expand Down

0 comments on commit a8d4101

Please sign in to comment.