Skip to content

Commit

Permalink
Merge branch 'main' into feat/eugene/wss
Browse files Browse the repository at this point in the history
  • Loading branch information
0xterminator committed Dec 11, 2024
2 parents fb1415a + c7244cc commit 56df498
Show file tree
Hide file tree
Showing 18 changed files with 456 additions and 221 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ lcov.info
node_modules/
docker/db*
tmp
/docker/db*
**/**/docker/db*
build_rs_cov.profraw
profile.json
.rust-version
Expand Down
36 changes: 32 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,46 @@ run-publisher: check-network
$(if $(TELEMETRY_PORT),--telemetry-port $(TELEMETRY_PORT),) \
$(if $(extra_args),--extra-args "$(extra_args)",)

run-mainnet-dev:
run-publisher-mainnet-dev:
$(MAKE) run-publisher NETWORK=mainnet MODE=dev

run-mainnet-profiling:
run-publisher-mainnet-profiling:
$(MAKE) run-publisher NETWORK=mainnet MODE=profiling

run-testnet-dev:
run-publisher-testnet-dev:
$(MAKE) run-publisher NETWORK=testnet MODE=dev

run-testnet-profiling:
run-publisher-testnet-profiling:
$(MAKE) run-publisher NETWORK=testnet MODE=profiling

# ------------------------------------------------------------
# Streamer Run Commands
# ------------------------------------------------------------

run-streamer-local:
cargo run --package fuel-streams-ws --bin ws-streamer -- --config-path crates/fuel-streams-ws/config.toml

run-client-local:
cargo run --package fuel-streams-ws --bin ws-client -- --config-path crates/fuel-streams-ws/config.toml

run-streamer: check-network
@./scripts/run_streamer.sh \
--mode $(MODE) \
$(if $(CONFIG_PATH),--config-path $(CONFIG_PATH),) \
$(if $(extra_args),--extra-args "$(extra_args)",)

run-streamer-mainnet-dev:
$(MAKE) run-streamer NETWORK=mainnet MODE=dev CONFIG_PATH=crates/fuel-streams-ws/config.toml

run-streamer-mainnet-profiling:
$(MAKE) run-streamer NETWORK=mainnet MODE=profiling CONFIG_PATH=crates/fuel-streams-ws/config.toml

run-streamer-testnet-dev:
$(MAKE) run-streamer NETWORK=testnet MODE=dev CONFIG_PATH=crates/fuel-streams-ws/config.toml

run-streamer-testnet-profiling:
$(MAKE) run-streamer NETWORK=testnet MODE=profiling CONFIG_PATH=crates/fuel-streams-ws/config.toml

# ------------------------------------------------------------
# Docker Compose
# ------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ custom_build(
# Build streamer ws image with proper configuration for Minikube
custom_build(
ref='fuel-streams-ws:latest',
command=['./cluster/scripts/build_ws_streamer.sh'],
command=['./cluster/scripts/build_streamer.sh'],
deps=[
'./src',
'./Cargo.toml',
Expand Down
File renamed without changes.
20 changes: 18 additions & 2 deletions crates/fuel-streams-core/src/nats/nats_client_opts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;
use std::{str::FromStr, time::Duration};

use async_nats::ConnectOptions;
use serde::{Deserialize, Serialize};

use super::NatsNamespace;

Expand All @@ -11,14 +12,29 @@ pub enum NatsUserRole {
Default,
}

#[derive(Debug, Copy, Clone, Default, clap::ValueEnum)]
#[derive(
Debug, Copy, Clone, Default, clap::ValueEnum, Serialize, Deserialize,
)]
#[serde(rename_all = "lowercase")]
pub enum FuelNetwork {
Local,
#[default]
Testnet,
Mainnet,
}

impl FromStr for FuelNetwork {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"local" => Ok(FuelNetwork::Local),
"testnet" => Ok(FuelNetwork::Testnet),
"mainnet" => Ok(FuelNetwork::Mainnet),
_ => Err(()),
}
}
}

impl FuelNetwork {
pub fn to_url(&self) -> String {
match self {
Expand Down
3 changes: 3 additions & 0 deletions crates/fuel-streams-ws/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ port = 9003
[nats]
url = "nats://localhost:4222"

[fuel]
network = "mainnet"

[s3]
enabled = false
region = "us-west-rack1"
Expand Down
158 changes: 16 additions & 142 deletions crates/fuel-streams-ws/src/bin/ws-client.rs
Original file line number Diff line number Diff line change
@@ -1,157 +1,31 @@
use fuel_streams_ws::server::{
http::models::{LoginRequest, LoginResponse},
ws::models::{
use fuel_streams::types::FuelNetwork;
use fuel_streams_ws::{
client::WebSocketClient,
server::ws::models::{
ClientMessage,
ServerMessage,
SubscriptionPayload,
SubscriptionType,
},
};
use reqwest::header::{ACCEPT, CONTENT_TYPE};
use tungstenite::{
handshake::client::{generate_key, Request},
Message,
};
use url::Url;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// get jwt token
let jwt_url = Url::parse("http://localhost:9003/api/v1/jwt").unwrap();
let client = reqwest::Client::new();
let json_body = serde_json::to_string(&LoginRequest {
username: "client".to_string(),
password: "client".to_string(),
})?;
let response = client
.get(jwt_url)
.header(ACCEPT, "application/json")
.header(CONTENT_TYPE, "application/json")
.body(json_body)
.send()
.await?;
let jwt = if response.status().is_success() {
let json_body = response.json::<LoginResponse>().await?;
println!("Jwt endpoint response JSON: {:?}", json_body);
json_body.jwt_token
} else {
panic!("Failed to fetch jwt data: {}", response.status());
};

// open websocket connection
let url = Url::parse("ws://localhost:9003/api/v1/ws").unwrap();
println!("Using websocket url: {:?}", url.as_str());

// url.query_pairs_mut()
// .append_pair("Authorization", &urlencoding::encode(&format!("Bearer {}", jwt)));
// let (mut socket, response) = connect(url.as_str()).expect("Can't connect to ws");
async fn main() -> anyhow::Result<()> {
let mut client =
WebSocketClient::new(FuelNetwork::Mainnet, "admin", "admin").await?;

// Create the WebSocket request with the Authorization header
let host = url.host_str().expect("Invalid host");
let request = Request::builder()
.uri(url.as_str())
.header("Authorization", format!("Bearer {}", jwt))
.header("Host", host)
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", generate_key())
.header("Sec-WebSocket-Version", "13")
.body(())
.expect("Failed to build request");
client.connect()?;

let (mut socket, response) =
match tungstenite::client::connect_with_config(request, None, 5) {
Ok((socket, response)) => (socket, response),
Err(err) => {
eprintln!(
"Failed to connect to the server: {:?}",
err.to_string()
);
panic!("Failed to connect to the server");
}
};
client.send_message(ClientMessage::Subscribe(SubscriptionPayload {
topic: SubscriptionType::Stream("blocks.*.*".to_string()),
from: None,
to: None,
}))?;

for (header, value) in response.headers() {
println!("* {}: {:?}", header, value);
}
let mut receiver = client.listen()?;

println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
println!("* {}", header);
while let Some(message) = receiver.recv().await {
println!("Received: {:?}", message);
}

let stream_topic_wildcard = "blocks.*.*".to_owned();
let msg = ClientMessage::Subscribe(SubscriptionPayload {
topic: SubscriptionType::Stream(stream_topic_wildcard.clone()),
});
socket
.send(Message::Text(serde_json::to_string(&msg).unwrap()))
.unwrap();

socket
.send(Message::Binary(serde_json::to_vec(&msg).unwrap()))
.unwrap();

socket
.send(Message::Ping(serde_json::to_vec(&msg).unwrap()))
.unwrap();

socket
.send(Message::Pong(serde_json::to_vec(&msg).unwrap()))
.unwrap();

let bad_sub = serde_json::json!({
"subscribe": {
"topics": {
"stream": stream_topic_wildcard
}
}
});
socket
.send(Message::Binary(serde_json::to_vec(&bad_sub).unwrap()))
.unwrap();

let jh = tokio::spawn(async move {
loop {
let msg = socket.read();
println!("Received: {:?}", msg);
match msg {
Ok(msg) => match msg {
Message::Text(text) => {
println!("Received text: {:?}", text);
}
Message::Binary(bin) => {
println!("Received binary: {:?}", bin);
let decoded =
serde_json::from_slice::<ServerMessage>(&bin)
.unwrap();
println!("Received server message: {:?}", decoded);
}
Message::Ping(ping) => {
println!("Received ping: {:?}", ping);
}
Message::Pong(pong) => {
println!("Received pong: {:?}", pong);
}
Message::Close(close) => {
println!("Received close: {:?}", close);
break;
}
_ => {
println!("Received unknown message type");
}
},
Err(e) => {
println!("Error reading message: {:?}", e);
break;
}
}
}
});

jh.await?;

Ok(())
}
Loading

0 comments on commit 56df498

Please sign in to comment.