Skip to content

Commit

Permalink
feat(repo): Small change to websocket subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
0xterminator committed Dec 15, 2024
1 parent a199a62 commit a99cc84
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 57 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ tests:
- it: should configure ports correctly
set:
webserver.enabled: true
webserver.service.port: 8082
webserver.service.port: 9003
webserver.ports:
- name: metrics
containerPort: 9090
containerPort: 9003
protocol: TCP
asserts:
- lengthEqual:
Expand All @@ -79,13 +79,13 @@ tests:
path: spec.template.spec.containers[0].ports
content:
name: webserver
containerPort: 8082
containerPort: 9003
protocol: TCP
- contains:
path: spec.template.spec.containers[0].ports
content:
name: metrics
containerPort: 9090
containerPort: 9003
protocol: TCP

- it: should set replicas when autoscaling is disabled
Expand Down
49 changes: 46 additions & 3 deletions cluster/charts/fuel-streams/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ publisher:
# -------------------------------------------------------------------------------------------------

webserver:
enabled: false
port: 8082
enabled: true
port: 9003

image:
repository: fuel-streams-webserver
repository: fuel-streams-ws
pullPolicy: Never
tag: "latest"

Expand Down Expand Up @@ -227,6 +227,49 @@ webserver:
targetCPUUtilizationPercentage: 80
targetMemoryUtilizationPercentage: 80

env:
STREAMER_MAX_WORKERS: "10"
STREAMER_API_PORT: 9003
JWT_AUTH_SECRET: "secret"
USE_ELASTIC_LOGGING: false
USE_METRICS: true
AWS_S3_ENABLED: true
NATS_URL: "fuel-streams-nats-publisher:4222"
NETWORK: testnet

# Additional environment variables with complex structures
# extraEnv: []
# - name: AWS_ACCESS_KEY_ID
# valueFrom:
# secretKeyRef:
# name: fuel-streams-webserver
# key: AWS_ACCESS_KEY_ID
# - name: AWS_SECRET_ACCESS_KEY
# valueFrom:
# secretKeyRef:
# name: fuel-streams-webserver
# key: AWS_SECRET_ACCESS_KEY
# - name: AWS_REGION
# valueFrom:
# secretKeyRef:
# name: fuel-streams-webserver
# key: AWS_REGION
# - name: AWS_S3_BUCKET_NAME
# valueFrom:
# secretKeyRef:
# name: fuel-streams-webserver
# key: AWS_S3_BUCKET_NAME
# - name: AWS_ENDPOINT_URL
# valueFrom:
# secretKeyRef:
# name: fuel-streams-webserver
# key: AWS_ENDPOINT_URL
# Optional: Bulk environment references
# envFrom: {}
# - configMapRef:
# name: additional-config
# - secretRef:
# name: additional-secrets
# -------------------------------------------------------------------------------------------------
# NATS Core configuration
# -------------------------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions cluster/docker/fuel-streams-ws.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ ARG STREAMER_API_PORT=9003

ENV STREAMER_API_PORT=$STREAMER_API_PORT
ENV NATS_URL=
ENV NETWORK=
ENV USE_METRICS=
ENV USE_ELASTIC_LOGGING=
ENV AWS_S3_ENABLED=
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-streams-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ serde_json = { workspace = true }
serde_prometheus = { version = "0.2" }
sysinfo = { version = "0.29" }
thiserror = "2.0"
time = { version = "0.3", features = ["serde"] }
tokio = { workspace = true }
tokio-tungstenite = "0.24.0"
toml = "0.8.19"
Expand Down
51 changes: 33 additions & 18 deletions crates/fuel-streams-ws/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use fuel_streams::{
utxos::Utxo,
Streamable,
};
use fuel_streams_storage::DeliverPolicy;
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt,
Expand Down Expand Up @@ -42,9 +41,9 @@ use crate::server::{
errors::WsSubscriptionError,
models::{
ClientMessage,
DeliverPolicy,
ServerMessage,
SubscriptionPayload,
SubscriptionType,
},
socket::verify_and_extract_subject_name,
},
Expand All @@ -69,8 +68,11 @@ pub struct WebSocketClient {
>,
>,
>,
jwt_token: String,
jwt_token: Option<String>,
ws_url: Url,
network: FuelNetwork,
username: String,
password: String,
}

impl WebSocketClient {
Expand All @@ -86,8 +88,11 @@ impl WebSocketClient {
Ok(Self {
read_stream: None,
write_sink: None,
jwt_token,
jwt_token: Some(jwt_token),
ws_url,
network,
username: username.to_string(),
password: password.to_string(),
})
}

Expand Down Expand Up @@ -123,18 +128,29 @@ impl WebSocketClient {
}
}

pub async fn refresh_jwt(&mut self) -> anyhow::Result<()> {
let jwt_token =
Self::fetch_jwt(self.network, &self.username, &self.password)
.await?;
self.jwt_token = Some(jwt_token);
Ok(())
}

pub async fn connect(&mut self) -> anyhow::Result<()> {
let host = self
.ws_url
.host_str()
.ok_or(anyhow::anyhow!("Unparsable ws host url"))?;

let jwt_token = self
.jwt_token
.clone()
.ok_or(anyhow::anyhow!("JWT token is missing"))?;

let mut request = self.ws_url.as_str().into_client_request()?;
let headers_map = request.headers_mut();
headers_map.insert(
AUTHORIZATION,
format!("Bearer {}", self.jwt_token).parse()?,
);
headers_map
.insert(AUTHORIZATION, format!("Bearer {}", jwt_token).parse()?);
headers_map.insert(HOST, host.parse()?);
headers_map.insert(UPGRADE, "websocket".parse()?);
headers_map.insert(CONNECTION, "Upgrade".parse().unwrap());
Expand Down Expand Up @@ -166,11 +182,11 @@ impl WebSocketClient {

pub async fn subscribe(
&mut self,
subject: impl IntoSubject,
wildcard: impl IntoSubject,
deliver_policy: DeliverPolicy,
) -> anyhow::Result<()> {
let message = ClientMessage::Subscribe(SubscriptionPayload {
topic: SubscriptionType::Stream(subject.parse()),
wildcard: wildcard.parse(),
deliver_policy,
});
self.send_client_message(message).await?;
Expand All @@ -179,11 +195,11 @@ impl WebSocketClient {

pub async fn unsubscribe(
&mut self,
subject: impl IntoSubject,
wildcard: impl IntoSubject,
deliver_policy: DeliverPolicy,
) -> anyhow::Result<()> {
let message = ClientMessage::Unsubscribe(SubscriptionPayload {
topic: SubscriptionType::Stream(subject.parse()),
wildcard: wildcard.parse(),
deliver_policy,
});
self.send_client_message(message).await?;
Expand Down Expand Up @@ -224,16 +240,15 @@ impl WebSocketClient {
match &server_message {
ServerMessage::Subscribed(sub) => {
println!(
"Subscribed to topic: {:?}",
sub.topic
"Subscribed to wildcard: {:?}",
sub.wildcard
);
let SubscriptionType::Stream(sub) = &sub.topic;
subscription_topic = sub.clone();
subscription_topic = sub.wildcard.clone();
}
ServerMessage::Unsubscribed(sub) => {
println!(
"Unsubscribed from topic: {:?}",
sub.topic
"Unsubscribed from wildcard: {:?}",
sub.wildcard
);
}
ServerMessage::Update(update) => {
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ pub mod telemetry;

use std::{env, sync::LazyLock};

pub static MAX_THREADS: LazyLock<usize> = LazyLock::new(|| {
pub static STREAMER_MAX_WORKERS: LazyLock<usize> = LazyLock::new(|| {
let available_cpus = num_cpus::get();
let default_threads = 2 * available_cpus;

env::var("MAX_THREADS")
env::var("STREAMER_MAX_WORKERS")
.ok()
.and_then(|val| val.parse().ok())
.unwrap_or(default_threads)
Expand Down
6 changes: 2 additions & 4 deletions crates/fuel-streams-ws/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use super::{
state::ServerState,
ws::socket::get_ws,
};
use crate::config::Config;

const MAX_WORKERS: usize = 10;
use crate::{config::Config, STREAMER_MAX_WORKERS};

const API_VERSION: &str = "v1";

Expand Down Expand Up @@ -82,7 +80,7 @@ pub fn create_api(
)
})
.bind(server_addr)?
.workers(MAX_WORKERS)
.workers(*STREAMER_MAX_WORKERS)
.shutdown_timeout(20)
.run();

Expand Down
68 changes: 55 additions & 13 deletions crates/fuel-streams-ws/src/server/ws/models.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,62 @@
use fuel_streams_storage::DeliverPolicy;
use fuel_streams_storage::DeliverPolicy as NatsDeliverPolicy;
use serde::{Deserialize, Serialize};

#[derive(Eq, PartialEq, Debug, Deserialize, Serialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[serde(rename_all = "camelCase")]
pub enum SubscriptionType {
Stream(String),
pub enum DeliverPolicy {
All,
Last,
New,
ByStartSequence {
#[serde(rename = "optStartSeq")]
start_sequence: u64,
},
ByStartTime {
#[serde(rename = "optStartTime")]
start_time: time::OffsetDateTime,
},
LastPerSubject,
}

impl From<DeliverPolicy> for NatsDeliverPolicy {
fn from(policy: DeliverPolicy) -> Self {
match policy {
DeliverPolicy::All => NatsDeliverPolicy::All,
DeliverPolicy::Last => NatsDeliverPolicy::Last,
DeliverPolicy::New => NatsDeliverPolicy::New,
DeliverPolicy::ByStartSequence { start_sequence } => {
NatsDeliverPolicy::ByStartSequence { start_sequence }
}
DeliverPolicy::ByStartTime { start_time } => {
NatsDeliverPolicy::ByStartTime { start_time }
}
DeliverPolicy::LastPerSubject => NatsDeliverPolicy::LastPerSubject,
}
}
}

impl From<NatsDeliverPolicy> for DeliverPolicy {
fn from(policy: NatsDeliverPolicy) -> Self {
match policy {
NatsDeliverPolicy::All => DeliverPolicy::All,
NatsDeliverPolicy::Last => DeliverPolicy::Last,
NatsDeliverPolicy::New => DeliverPolicy::New,
NatsDeliverPolicy::ByStartSequence { start_sequence } => {
DeliverPolicy::ByStartSequence { start_sequence }
}
NatsDeliverPolicy::ByStartTime { start_time } => {
DeliverPolicy::ByStartTime { start_time }
}
NatsDeliverPolicy::LastPerSubject => DeliverPolicy::LastPerSubject,
}
}
}

#[derive(Eq, PartialEq, Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionPayload {
pub topic: SubscriptionType,
pub wildcard: String,
pub deliver_policy: DeliverPolicy,
}

Expand All @@ -32,25 +78,21 @@ pub enum ServerMessage {

#[cfg(test)]
mod tests {
use fuel_streams_storage::DeliverPolicy;

use super::{ClientMessage, SubscriptionPayload, SubscriptionType};
use super::{ClientMessage, DeliverPolicy, SubscriptionPayload};

#[test]
fn test_sub_ser() {
let stream_topic_wildcard = "blocks.*.*".to_owned();
let msg = ClientMessage::Subscribe(SubscriptionPayload {
topic: SubscriptionType::Stream(stream_topic_wildcard.clone()),
wildcard: stream_topic_wildcard.clone(),
deliver_policy: DeliverPolicy::All,
});
let ser_str_value = serde_json::to_string(&msg).unwrap();
println!("Ser value {:?}", ser_str_value);
let expected_value = serde_json::json!({
"subscribe": {
"topic": {
"stream": stream_topic_wildcard,
"deliver_policy": "all"
}
"wildcard": stream_topic_wildcard,
"deliverPolicy": "all"
}
});
let deser_msg_val =
Expand Down
Loading

0 comments on commit a99cc84

Please sign in to comment.