Skip to content

Commit

Permalink
fix(webserver): Add ack_policy for historical data consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Dec 28, 2024
1 parent eff04e5 commit 8992ab5
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/fuel-streams-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ anyhow = { workspace = true }
async-nats = { workspace = true }
async-trait = { workspace = true }
displaydoc = { workspace = true }
dotenvy = { workspace = true }
fuel-core = { workspace = true, default-features = false, features = [
"p2p",
"relayer",
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-streams-core/src/stream/fuel_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl FuelStreams {
&self,
sub_subject: &str,
subscription_config: Option<SubscriptionConfig>,
) -> Result<BoxStream<'_, Vec<u8>>, StreamError> {
) -> Result<BoxStream<'_, (Vec<u8>, NatsMessage)>, StreamError> {
match sub_subject {
Transaction::NAME => {
self.transactions.subscribe_raw(subscription_config).await
Expand Down
36 changes: 24 additions & 12 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use async_nats::{
jetstream::{
Expand All @@ -15,6 +15,13 @@ use tokio::sync::OnceCell;

use crate::prelude::*;

pub static MAX_ACK_PENDING: LazyLock<usize> = LazyLock::new(|| {
dotenvy::var("MAX_ACK_PENDING")
.ok()
.and_then(|val| val.parse().ok())
.unwrap_or(5)
});

#[derive(Debug, Clone)]
pub struct PublishPacket<T: Streamable> {
pub subject: Arc<dyn IntoSubject>,
Expand All @@ -34,7 +41,6 @@ impl<T: Streamable> PublishPacket<T> {
format!("{}.json.zstd", subject.replace('.', "/"))
}
}

pub trait StreamEncoder: DataEncoder<Err = StreamError> {}
impl<T: DataEncoder<Err = StreamError>> StreamEncoder for T {}

Expand Down Expand Up @@ -220,7 +226,7 @@ impl<S: Streamable> Stream<S> {
pub async fn subscribe_raw(
&self,
subscription_config: Option<SubscriptionConfig>,
) -> Result<BoxStream<'_, Vec<u8>>, StreamError> {
) -> Result<BoxStream<'_, (Vec<u8>, NatsMessage)>, StreamError> {
let config = self.get_consumer_config(subscription_config);
let config = self.prefix_filter_subjects(config);
let consumer = self.store.stream.create_consumer(config).await?;
Expand All @@ -229,16 +235,19 @@ impl<S: Streamable> Stream<S> {

Ok(messages
.then(move |message| {
let nats_payload =
message.expect("Message must be valid").payload.to_vec();
let message = message.expect("Message must be valid");
let nats_payload = message.payload.to_vec();
let storage = storage.clone();
async move {
let s3_path = String::from_utf8(nats_payload)
.expect("Must be S3 path");
storage
.retrieve(&s3_path)
.await
.expect("S3 object must exist")
(
storage
.retrieve(&s3_path)
.await
.expect("S3 object must exist"),
message,
)
}
})
.boxed())
Expand All @@ -250,8 +259,10 @@ impl<S: Streamable> Stream<S> {
) -> Result<BoxStream<'_, S>, StreamError> {
let raw_stream = self.subscribe_raw(subscription_config).await?;
Ok(raw_stream
.then(|s3_data| async move {
S::decode(&s3_data).await.expect("Failed to decode")
.then(|(s3_data, message)| async move {
let item = S::decode(&s3_data).await.expect("Failed to decode");
let _ = message.ack().await;
item
})
.boxed())
}
Expand All @@ -271,7 +282,8 @@ impl<S: Streamable> Stream<S> {
PullConsumerConfig {
filter_subjects,
deliver_policy: delivery_policy,
ack_policy: AckPolicy::None,
ack_policy: AckPolicy::Explicit,
max_ack_pending: *MAX_ACK_PENDING as i64,
..Default::default()
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-nats/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use async_nats::{
kv::Config as KvStoreConfig,
stream::Config as NatsStreamConfig,
Context as JetStreamContext,
Message as NatsMessage,
},
Client as AsyncNatsClient,
ConnectOptions as NatsConnectOpts,
Expand Down
5 changes: 4 additions & 1 deletion crates/sv-webserver/src/server/ws/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ async fn handle_binary_message(
};

// consume and forward to the ws
while let Some(s3_serialized_payload) = sub.next().await {
while let Some((s3_serialized_payload, message)) =
sub.next().await
{
// decode and serialize back to ws payload
let serialized_ws_payload = match decode(
&subject_wildcard,
Expand All @@ -239,6 +241,7 @@ async fn handle_binary_message(

// send the payload over the stream
let _ = stream_session.binary(serialized_ws_payload).await;
let _ = message.ack().await;
}
});
Ok(())
Expand Down

0 comments on commit 8992ab5

Please sign in to comment.