Skip to content

Commit

Permalink
Change from hertz to interval in ms
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Apr 23, 2024
1 parent ac9c15a commit aa843c2
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
14 changes: 7 additions & 7 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ impl Subscriptions {
pub fn add_continuous_subscription(
&mut self,
subscription: ContinuousSubscription,
frequency: u64,
interval: u64,
) {
let handle = tokio::spawn(async move {
// no need to check token expiration in cleanup method because we constantly call notify. Notify is handling it.
Expand All @@ -746,7 +746,7 @@ impl Subscriptions {
Err(_) => break,
}
// wait for some time to meet frequency parameter (it is a u64)
tokio::time::sleep(Duration::from_millis(1000 / frequency)).await;
tokio::time::sleep(Duration::from_millis(interval)).await;
}
});
self.continuous_subscriptions.push(handle);
Expand Down Expand Up @@ -1585,10 +1585,10 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
}

// only supportede by the new API
pub async fn subscribe_freq(
pub async fn subscribe_interval(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
frequency: Option<u64>,
interval_ms: Option<u64>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
Expand All @@ -1597,8 +1597,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
let db_read = self.broker.database.read().await;

let (sender, receiver) = mpsc::channel(10);
match frequency {
Some(freq) => {
match interval_ms {
Some(interval) => {
let subscription_continuous = ContinuousSubscription {
entries: valid_entries,
sender,
Expand All @@ -1616,7 +1616,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.subscriptions
.write()
.await
.add_continuous_subscription(subscription_continuous, freq);
.add_continuous_subscription(subscription_continuous, interval);
}
None => {
let subscription = ChangeSubscription {
Expand Down
2 changes: 1 addition & 1 deletion databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl proto::val_server::Val for broker::DataBroker {
}

match broker
.subscribe_freq(entries, request.frequency_hertz)
.subscribe_interval(entries, request.interval_ms)
.await
{
Ok(stream) => {
Expand Down
6 changes: 3 additions & 3 deletions lib/kuksa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl KuksaClient {

let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
interval_ms: None,
};

match client.subscribe(req).await {
Expand Down Expand Up @@ -326,7 +326,7 @@ impl KuksaClient {

let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
interval_ms: None,
};

match client.subscribe(req).await {
Expand Down Expand Up @@ -354,7 +354,7 @@ impl KuksaClient {

let req = proto::v1::SubscribeRequest {
entries,
frequency_hertz: None,
interval_ms: None,
};

match client.subscribe(req).await {
Expand Down
2 changes: 1 addition & 1 deletion proto/kuksa/val/v1/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ message SubscribeEntry {
// Subscribe to changes in datapoints.
message SubscribeRequest {
repeated SubscribeEntry entries = 1;
optional uint64 frequency_hertz = 2;
optional uint64 interval_ms = 2;
}

// A subscription response
Expand Down

0 comments on commit aa843c2

Please sign in to comment.