Skip to content

Commit

Permalink
fix(webserver): Adjust DeliverPolicy to receive from WebSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Dec 23, 2024
1 parent e71fc1c commit 96adca3
Show file tree
Hide file tree
Showing 22 changed files with 95 additions and 8,135 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ jobs:
repo
deps
release
core
data-parser
networks
fuel-streams
core
executors
macros
publisher
nats
storage
ws
consumer
publisher
webserver
lockfile:
name: Validate Lockfile
Expand Down
46 changes: 13 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 12 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ homepage = "https://fuel.network/"
license = "Apache-2.0"
repository = "https://github.com/fuellabs/data-systems"
rust-version = "1.81.0"
version = "0.0.14"
version = "0.0.13"

[workspace.dependencies]
actix-cors = "0.7"
Expand Down Expand Up @@ -69,18 +69,17 @@ tracing-subscriber = "0.3"
tracing-actix-web = "0.7"
thiserror = "2.0"

fuel-streams = { version = "0.0.14", path = "crates/fuel-streams" }
fuel-data-parser = { version = "0.0.14", path = "crates/fuel-data-parser" }
fuel-streams-core = { version = "0.0.14", path = "crates/fuel-streams-core" }
fuel-streams-macros = { version = "0.0.14", path = "crates/fuel-streams-macros" }
fuel-streams-nats = { version = "0.0.14", path = "crates/fuel-streams-nats" }
fuel-streams-storage = { version = "0.0.14", path = "crates/fuel-streams-storage" }
fuel-streams-executors = { version = "0.0.14", path = "crates/fuel-streams-executors" }
subject-derive = { version = "0.0.14", path = "crates/fuel-streams-macros/subject-derive" }
sv-publisher = { version = "0.0.14", path = "crates/sv-publisher" }
sv-consumer = { version = "0.0.14", path = "crates/sv-consumer" }
sv-emitter = { version = "0.0.14", path = "crates/sv-emitter" }
sv-webserver = { version = "0.0.14", path = "crates/sv-webserver" }
fuel-streams = { version = "0.0.13", path = "crates/fuel-streams" }
fuel-data-parser = { version = "0.0.13", path = "crates/fuel-data-parser" }
fuel-streams-core = { version = "0.0.13", path = "crates/fuel-streams-core" }
fuel-streams-macros = { version = "0.0.13", path = "crates/fuel-streams-macros" }
fuel-streams-nats = { version = "0.0.13", path = "crates/fuel-streams-nats" }
fuel-streams-storage = { version = "0.0.13", path = "crates/fuel-streams-storage" }
fuel-streams-executors = { version = "0.0.13", path = "crates/fuel-streams-executors" }
subject-derive = { version = "0.0.13", path = "crates/fuel-streams-macros/subject-derive" }
sv-publisher = { version = "0.0.13", path = "crates/sv-publisher" }
sv-consumer = { version = "0.0.13", path = "crates/sv-consumer" }
sv-webserver = { version = "0.0.13", path = "crates/sv-webserver" }

# Workspace projects
[workspace.metadata.cargo-machete]
Expand Down
18 changes: 9 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,23 @@ version:
@echo "Current version: $(VERSION)"

bump-version:
@if [ -z "$(NEW_VERSION)" ]; then \
echo "Error: NEW_VERSION is required"; \
echo "Usage: make bump-version NEW_VERSION=X.Y.Z"; \
@if [ -z "$(VERSION)" ]; then \
echo "Error: VERSION is required"; \
echo "Usage: make bump-version VERSION=X.Y.Z"; \
exit 1; \
fi
@echo "Bumping version to $(NEW_VERSION)..."
cargo set-version --workspace "$(NEW_VERSION)"
@echo "Bumping version to $(VERSION)..."
cargo set-version --workspace "$(VERSION)"
cargo update --workspace
$(MAKE) fmt

release: validate-env test lint
@if [ -z "$(NEW_VERSION)" ]; then \
echo "Error: NEW_VERSION is required"; \
echo "Usage: make release NEW_VERSION=X.Y.Z [dry_run=true]"; \
@if [ -z "$(VERSION)" ]; then \
echo "Error: VERSION is required"; \
echo "Usage: make release VERSION=X.Y.Z [dry_run=true]"; \
exit 1; \
fi
$(MAKE) bump-version NEW_VERSION=$(NEW_VERSION)
$(MAKE) bump-version VERSION=$(VERSION)
knope prepare-release $(if $(filter true,$(dry_run)),--dry-run,)

# ------------------------------------------------------------
Expand Down
46 changes: 22 additions & 24 deletions crates/fuel-streams-core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,7 @@ impl<S: Streamable> Stream<S> {
&self,
subscription_config: Option<SubscriptionConfig>,
) -> Result<BoxStream<'_, Vec<u8>>, StreamError> {
let mut config = PullConsumerConfig {
filter_subjects: vec![S::WILDCARD_LIST[0].to_string()],
deliver_policy: NatsDeliverPolicy::All,
ack_policy: AckPolicy::None,
..Default::default()
};

if let Some(subscription_config) = subscription_config {
config.filter_subjects = subscription_config.filter_subjects;
config.deliver_policy = subscription_config.deliver_policy;
}

let config = self.get_consumer_config(subscription_config);
let config = self.prefix_filter_subjects(config);
let consumer = self.store.stream.create_consumer(config).await?;

Expand Down Expand Up @@ -261,18 +250,7 @@ impl<S: Streamable> Stream<S> {
&self,
subscription_config: Option<SubscriptionConfig>,
) -> Result<BoxStream<'_, S>, StreamError> {
let mut config = PullConsumerConfig {
filter_subjects: vec![S::WILDCARD_LIST[0].to_string()],
deliver_policy: NatsDeliverPolicy::All,
ack_policy: AckPolicy::None,
..Default::default()
};

if let Some(subscription_config) = subscription_config {
config.filter_subjects = subscription_config.filter_subjects;
config.deliver_policy = subscription_config.deliver_policy;
}

let config = self.get_consumer_config(subscription_config);
let config = self.prefix_filter_subjects(config);
let consumer = self.store.stream.create_consumer(config).await?;

Expand Down Expand Up @@ -301,6 +279,26 @@ impl<S: Streamable> Stream<S> {
.boxed())
}

pub fn get_consumer_config(
&self,
subscription_config: Option<SubscriptionConfig>,
) -> PullConsumerConfig {
let filter_subjects = match subscription_config.clone() {
Some(subscription_config) => subscription_config.filter_subjects,
None => vec![S::WILDCARD_LIST[0].to_string()],
};
let delivery_policy = match subscription_config.clone() {
Some(subscription_config) => subscription_config.deliver_policy,
None => NatsDeliverPolicy::New,
};
PullConsumerConfig {
filter_subjects,
deliver_policy: delivery_policy,
ack_policy: AckPolicy::None,
..Default::default()
}
}

#[cfg(feature = "test-helpers")]
/// Fetch all old messages from this stream
pub async fn catchup(
Expand Down
16 changes: 8 additions & 8 deletions crates/fuel-streams/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[package]
name = "fuel-streams"
description = "A library for working with streams of Fuel blockchain data"
authors = { workspace = true }
keywords = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
rust-version = { workspace = true }
authors = ["Fuel Labs <[email protected]>"]
keywords = ["data-stream", "blockchain", "cryptocurrencies"]
edition = "2021"
homepage = "https://fuel.network/"
license = "Apache-2.0"
repository = "https://github.com/fuellabs/data-systems"
rust-version = "1.81.0"
version = "0.0.13"

[dependencies]
displaydoc = { workspace = true }
Expand Down
13 changes: 13 additions & 0 deletions crates/fuel-streams/src/networks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub enum FuelNetworkUserRole {
pub enum FuelNetwork {
#[default]
Local,
Staging,
Testnet,
Mainnet,
}
Expand All @@ -26,6 +27,7 @@ impl FromStr for FuelNetwork {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"local" => Ok(FuelNetwork::Local),
"staging" => Ok(FuelNetwork::Staging),
"testnet" => Ok(FuelNetwork::Testnet),
"mainnet" => Ok(FuelNetwork::Mainnet),
_ => Err(format!("unknown network: {}", s)),
Expand All @@ -37,6 +39,7 @@ impl std::fmt::Display for FuelNetwork {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FuelNetwork::Local => write!(f, "local"),
FuelNetwork::Staging => write!(f, "staging"),
FuelNetwork::Testnet => write!(f, "testnet"),
FuelNetwork::Mainnet => write!(f, "mainnet"),
}
Expand All @@ -48,13 +51,15 @@ impl FuelNetwork {
match std::env::var("NETWORK").as_deref() {
Ok("testnet") => FuelNetwork::Testnet,
Ok("mainnet") => FuelNetwork::Mainnet,
Ok("staging") => FuelNetwork::Staging,
_ => FuelNetwork::Local,
}
}

pub fn to_nats_url(&self) -> String {
match self {
FuelNetwork::Local => "nats://localhost:4222",
FuelNetwork::Staging => "nats://stream-staging.fuel.network:4222",
FuelNetwork::Testnet => "nats://stream-testnet.fuel.network:4222",
FuelNetwork::Mainnet => "nats://stream.fuel.network:4222",
}
Expand All @@ -66,6 +71,10 @@ impl FuelNetwork {
FuelNetwork::Local => {
Url::parse("http://localhost:9003").expect("working url")
}
FuelNetwork::Staging => {
Url::parse("http://stream-staging.fuel.network:9003")
.expect("working url")
}
FuelNetwork::Testnet => {
Url::parse("http://stream-testnet.fuel.network:9003")
.expect("working url")
Expand All @@ -82,6 +91,10 @@ impl FuelNetwork {
FuelNetwork::Local => {
Url::parse("ws://0.0.0.0:9003").expect("working url")
}
FuelNetwork::Staging => {
Url::parse("ws://stream-staging.fuel.network:9003")
.expect("working url")
}
FuelNetwork::Testnet => {
Url::parse("ws://stream-testnet.fuel.network:9003")
.expect("working url")
Expand Down
Loading

0 comments on commit 96adca3

Please sign in to comment.