Skip to content

Commit

Permalink
fix(webserver): Adjust DeliverPolicy according passed through WebSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Dec 23, 2024
1 parent e71fc1c commit 042c1f8
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 8,117 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ jobs:
repo
deps
release
core
data-parser
networks
fuel-streams
core
executors
macros
publisher
nats
storage
ws
consumer
publisher
webserver
lockfile:
name: Validate Lockfile
Expand Down
46 changes: 13 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ homepage = "https://fuel.network/"
license = "Apache-2.0"
repository = "https://github.com/fuellabs/data-systems"
rust-version = "1.81.0"
version = "0.0.14"
version = "0.0.13"

[workspace.dependencies]
actix-cors = "0.7"
Expand Down Expand Up @@ -69,18 +69,17 @@ tracing-subscriber = "0.3"
tracing-actix-web = "0.7"
thiserror = "2.0"

fuel-streams = { version = "0.0.14", path = "crates/fuel-streams" }
fuel-data-parser = { version = "0.0.14", path = "crates/fuel-data-parser" }
fuel-streams-core = { version = "0.0.14", path = "crates/fuel-streams-core" }
fuel-streams-macros = { version = "0.0.14", path = "crates/fuel-streams-macros" }
fuel-streams-nats = { version = "0.0.14", path = "crates/fuel-streams-nats" }
fuel-streams-storage = { version = "0.0.14", path = "crates/fuel-streams-storage" }
fuel-streams-executors = { version = "0.0.14", path = "crates/fuel-streams-executors" }
subject-derive = { version = "0.0.14", path = "crates/fuel-streams-macros/subject-derive" }
sv-publisher = { version = "0.0.14", path = "crates/sv-publisher" }
sv-consumer = { version = "0.0.14", path = "crates/sv-consumer" }
sv-emitter = { version = "0.0.14", path = "crates/sv-emitter" }
sv-webserver = { version = "0.0.14", path = "crates/sv-webserver" }
fuel-streams = { version = "0.0.13", path = "crates/fuel-streams" }
fuel-data-parser = { version = "0.0.13", path = "crates/fuel-data-parser" }
fuel-streams-core = { version = "0.0.13", path = "crates/fuel-streams-core" }
fuel-streams-macros = { version = "0.0.13", path = "crates/fuel-streams-macros" }
fuel-streams-nats = { version = "0.0.13", path = "crates/fuel-streams-nats" }
fuel-streams-storage = { version = "0.0.13", path = "crates/fuel-streams-storage" }
fuel-streams-executors = { version = "0.0.13", path = "crates/fuel-streams-executors" }
subject-derive = { version = "0.0.13", path = "crates/fuel-streams-macros/subject-derive" }
sv-publisher = { version = "0.0.13", path = "crates/sv-publisher" }
sv-consumer = { version = "0.0.13", path = "crates/sv-consumer" }
sv-webserver = { version = "0.0.13", path = "crates/sv-webserver" }

# Workspace projects
[workspace.metadata.cargo-machete]
Expand Down
46 changes: 22 additions & 24 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,7 @@ impl<S: Streamable> Stream<S> {
&self,
subscription_config: Option<SubscriptionConfig>,
) -> Result<BoxStream<'_, Vec<u8>>, StreamError> {
let mut config = PullConsumerConfig {
filter_subjects: vec![S::WILDCARD_LIST[0].to_string()],
deliver_policy: NatsDeliverPolicy::All,
ack_policy: AckPolicy::None,
..Default::default()
};

if let Some(subscription_config) = subscription_config {
config.filter_subjects = subscription_config.filter_subjects;
config.deliver_policy = subscription_config.deliver_policy;
}

let config = self.get_consumer_config(subscription_config);
let config = self.prefix_filter_subjects(config);
let consumer = self.store.stream.create_consumer(config).await?;

Expand Down Expand Up @@ -261,18 +250,7 @@ impl<S: Streamable> Stream<S> {
&self,
subscription_config: Option<SubscriptionConfig>,
) -> Result<BoxStream<'_, S>, StreamError> {
let mut config = PullConsumerConfig {
filter_subjects: vec![S::WILDCARD_LIST[0].to_string()],
deliver_policy: NatsDeliverPolicy::All,
ack_policy: AckPolicy::None,
..Default::default()
};

if let Some(subscription_config) = subscription_config {
config.filter_subjects = subscription_config.filter_subjects;
config.deliver_policy = subscription_config.deliver_policy;
}

let config = self.get_consumer_config(subscription_config);
let config = self.prefix_filter_subjects(config);
let consumer = self.store.stream.create_consumer(config).await?;

Expand Down Expand Up @@ -301,6 +279,26 @@ impl<S: Streamable> Stream<S> {
.boxed())
}

pub fn get_consumer_config(
&self,
subscription_config: Option<SubscriptionConfig>,
) -> PullConsumerConfig {
let filter_subjects = match subscription_config.clone() {
Some(subscription_config) => subscription_config.filter_subjects,
None => vec![S::WILDCARD_LIST[0].to_string()],
};
let delivery_policy = match subscription_config.clone() {
Some(subscription_config) => subscription_config.deliver_policy,
None => NatsDeliverPolicy::New,
};
PullConsumerConfig {
filter_subjects,
deliver_policy: delivery_policy,
ack_policy: AckPolicy::None,
..Default::default()
}
}

#[cfg(feature = "test-helpers")]
/// Fetch all old messages from this stream
pub async fn catchup(
Expand Down
13 changes: 13 additions & 0 deletions crates/fuel-streams/src/networks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub enum FuelNetworkUserRole {
pub enum FuelNetwork {
#[default]
Local,
Staging,
Testnet,
Mainnet,
}
Expand All @@ -26,6 +27,7 @@ impl FromStr for FuelNetwork {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"local" => Ok(FuelNetwork::Local),
"staging" => Ok(FuelNetwork::Staging),
"testnet" => Ok(FuelNetwork::Testnet),
"mainnet" => Ok(FuelNetwork::Mainnet),
_ => Err(format!("unknown network: {}", s)),
Expand All @@ -37,6 +39,7 @@ impl std::fmt::Display for FuelNetwork {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FuelNetwork::Local => write!(f, "local"),
FuelNetwork::Staging => write!(f, "staging"),
FuelNetwork::Testnet => write!(f, "testnet"),
FuelNetwork::Mainnet => write!(f, "mainnet"),
}
Expand All @@ -48,13 +51,15 @@ impl FuelNetwork {
match std::env::var("NETWORK").as_deref() {
Ok("testnet") => FuelNetwork::Testnet,
Ok("mainnet") => FuelNetwork::Mainnet,
Ok("staging") => FuelNetwork::Staging,
_ => FuelNetwork::Local,
}
}

pub fn to_nats_url(&self) -> String {
match self {
FuelNetwork::Local => "nats://localhost:4222",
FuelNetwork::Staging => "nats://stream-staging.fuel.network:4222",
FuelNetwork::Testnet => "nats://stream-testnet.fuel.network:4222",
FuelNetwork::Mainnet => "nats://stream.fuel.network:4222",
}
Expand All @@ -66,6 +71,10 @@ impl FuelNetwork {
FuelNetwork::Local => {
Url::parse("http://localhost:9003").expect("working url")
}
FuelNetwork::Staging => {
Url::parse("http://stream-staging.fuel.network:9003")
.expect("working url")
}
FuelNetwork::Testnet => {
Url::parse("http://stream-testnet.fuel.network:9003")
.expect("working url")
Expand All @@ -82,6 +91,10 @@ impl FuelNetwork {
FuelNetwork::Local => {
Url::parse("ws://0.0.0.0:9003").expect("working url")
}
FuelNetwork::Staging => {
Url::parse("ws://stream-staging.fuel.network:9003")
.expect("working url")
}
FuelNetwork::Testnet => {
Url::parse("ws://stream-testnet.fuel.network:9003")
.expect("working url")
Expand Down
Loading

0 comments on commit 042c1f8

Please sign in to comment.