Skip to content

Commit

Permalink
feat(repo): Switched to tokio-tungestenite
Browse files Browse the repository at this point in the history
  • Loading branch information
0xterminator committed Dec 14, 2024
1 parent 48dc78d commit a199a62
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 119 deletions.
16 changes: 14 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fuel-streams-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ serde_prometheus = { version = "0.2" }
sysinfo = { version = "0.29" }
thiserror = "2.0"
tokio = { workspace = true }
tokio-tungstenite = "0.24.0"
toml = "0.8.19"
tracing = { workspace = true }
tracing-actix-web = { workspace = true }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tungstenite = "0.24.0"
url = "2.5"
urlencoding = "2.1"
uuid = { version = "1.11.0", features = ["serde", "v4"] }
Expand Down
243 changes: 134 additions & 109 deletions crates/fuel-streams-ws/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ use fuel_streams::{
Streamable,
};
use fuel_streams_storage::DeliverPolicy;
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt,
StreamExt,
};
use reqwest::{
header::{
ACCEPT,
Expand All @@ -19,11 +24,15 @@ use reqwest::{
},
Client as HttpClient,
};
use tokio::sync::mpsc;
use tungstenite::{
handshake::client::generate_key,
protocol::Message,
stream::MaybeTlsStream,
use tokio::sync::{mpsc, RwLock};
use tokio_tungstenite::{
connect_async,
tungstenite::{
client::IntoClientRequest,
handshake::client::generate_key,
protocol::Message,
},
MaybeTlsStream,
};
use url::Url;

Expand All @@ -43,7 +52,23 @@ use crate::server::{

#[derive(Debug)]
pub struct WebSocketClient {
socket: Option<tungstenite::WebSocket<MaybeTlsStream<std::net::TcpStream>>>,
read_stream: Option<
SplitStream<
tokio_tungstenite::WebSocketStream<
MaybeTlsStream<tokio::net::TcpStream>,
>,
>,
>,
write_sink: Option<
RwLock<
SplitSink<
tokio_tungstenite::WebSocketStream<
MaybeTlsStream<tokio::net::TcpStream>,
>,
Message,
>,
>,
>,
jwt_token: String,
ws_url: Url,
}
Expand All @@ -59,7 +84,8 @@ impl WebSocketClient {
let ws_url = network.to_ws_url().join("/api/v1/ws")?;

Ok(Self {
socket: None,
read_stream: None,
write_sink: None,
jwt_token,
ws_url,
})
Expand Down Expand Up @@ -97,42 +123,48 @@ impl WebSocketClient {
}
}

pub fn connect(&mut self) -> anyhow::Result<()> {
pub async fn connect(&mut self) -> anyhow::Result<()> {
let host = self
.ws_url
.host_str()
.ok_or(anyhow::anyhow!("Unparsable ws host url"))?;
let request = tungstenite::handshake::client::Request::builder()
.uri(self.ws_url.as_str())
.header(AUTHORIZATION, format!("Bearer {}", self.jwt_token))
.header(HOST, host)
.header(UPGRADE, "websocket")
.header(CONNECTION, "Upgrade")
.header(SEC_WEBSOCKET_KEY, generate_key())
.header(SEC_WEBSOCKET_VERSION, "13")
.body(())
.expect("Failed to build request");

let (socket, _response) = tungstenite::connect(request)?;
self.socket = Some(socket);
let mut request = self.ws_url.as_str().into_client_request()?;
let headers_map = request.headers_mut();
headers_map.insert(
AUTHORIZATION,
format!("Bearer {}", self.jwt_token).parse()?,
);
headers_map.insert(HOST, host.parse()?);
headers_map.insert(UPGRADE, "websocket".parse()?);
headers_map.insert(CONNECTION, "Upgrade".parse().unwrap());
headers_map.insert(SEC_WEBSOCKET_KEY, generate_key().parse()?);
headers_map.insert(SEC_WEBSOCKET_VERSION, "13".parse()?);

let (socket, _response) = connect_async(request).await?;
let (write, read) = socket.split();

self.read_stream = Some(read);
self.write_sink = Some(RwLock::new(write));

Ok(())
}

fn send_client_message(
async fn send_client_message(
&mut self,
message: ClientMessage,
) -> anyhow::Result<()> {
let socket = self
.socket
.as_mut()
let write_sink = self
.write_sink
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Socket not connected"))?;
let mut write_guard = write_sink.write().await;
let serialized = serde_json::to_vec(&message)?;
socket.send(Message::Binary(serialized))?;
write_guard.send(Message::Binary(serialized)).await?;
Ok(())
}

pub fn subscribe(
pub async fn subscribe(
&mut self,
subject: impl IntoSubject,
deliver_policy: DeliverPolicy,
Expand All @@ -141,10 +173,11 @@ impl WebSocketClient {
topic: SubscriptionType::Stream(subject.parse()),
deliver_policy,
});
self.send_client_message(message)
self.send_client_message(message).await?;
Ok(())
}

pub fn unsubscribe(
pub async fn unsubscribe(
&mut self,
subject: impl IntoSubject,
deliver_policy: DeliverPolicy,
Expand All @@ -153,104 +186,96 @@ impl WebSocketClient {
topic: SubscriptionType::Stream(subject.parse()),
deliver_policy,
});
self.send_client_message(message)
self.send_client_message(message).await?;
Ok(())
}

pub fn listen(
pub async fn listen(
&mut self,
) -> anyhow::Result<mpsc::UnboundedReceiver<ServerMessage>> {
let mut socket = self
.socket
let read_stream = self
.read_stream
.take()
.ok_or_else(|| anyhow::anyhow!("Socket not connected"))?;
let (tx, rx) = mpsc::unbounded_channel::<ServerMessage>();
// TODO: the reason for using this type of channel is due to the fact that Streamable cannot be currently
// converted into a dynamic object trait, hence this approach of switching between types
tokio::spawn(async move {
let mut subscription_topic = String::new();
loop {
let msg = socket.read();
let mut read_stream = read_stream;
while let Some(Ok(msg)) = read_stream.next().await {
match msg {
Ok(msg) => match msg {
Message::Text(text) => {
println!("Received text: {:?} bytes", text.len());
}
Message::Binary(bin) => {
let server_message =
match serde_json::from_slice::<ServerMessage>(
&bin,
) {
Ok(server_message) => server_message,
Err(e) => {
eprintln!(
"Unparsable server message: {:?}",
e
);
continue;
}
};
Message::Text(text) => {
println!("Received text: {:?} bytes", text.len());
}
Message::Binary(bin) => {
let server_message = match serde_json::from_slice::<
ServerMessage,
>(
&bin
) {
Ok(server_message) => server_message,
Err(e) => {
eprintln!("Unparsable server message: {:?}", e);
continue;
}
};

match &server_message {
ServerMessage::Subscribed(sub) => {
println!(
"Subscribed to topic: {:?}",
sub.topic
);
let SubscriptionType::Stream(sub) =
&sub.topic;
subscription_topic = sub.clone();
}
ServerMessage::Unsubscribed(sub) => {
println!(
"Unsubscribed from topic: {:?}",
sub.topic
);
}
ServerMessage::Update(update) => {
let _ = decode_print(
&subscription_topic,
update.clone(),
)
.ok();
// send server message over a channel to receivers
if tx.send(server_message).is_err() {
break;
}
}
ServerMessage::Error(err) => {
println!(
"Received error from ws: {:?}",
err
);
match &server_message {
ServerMessage::Subscribed(sub) => {
println!(
"Subscribed to topic: {:?}",
sub.topic
);
let SubscriptionType::Stream(sub) = &sub.topic;
subscription_topic = sub.clone();
}
ServerMessage::Unsubscribed(sub) => {
println!(
"Unsubscribed from topic: {:?}",
sub.topic
);
}
ServerMessage::Update(update) => {
let _ = decode_print(
&subscription_topic,
update.clone(),
)
.ok();
// send server message over a channel to receivers
if tx.send(server_message).is_err() {
break;
}
}
ServerMessage::Error(err) => {
println!("Received error from ws: {:?}", err);
break;
}
}
Message::Ping(ping) => {
println!("Received ping: {:?} bytes", ping.len());
}
Message::Pong(pong) => {
println!("Received pong: {:?} bytes", pong.len());
}
Message::Close(close) => {
let close_code = close
.as_ref()
.map(|c| c.code.to_string())
.unwrap_or_default();
let close_reason = close
.as_ref()
.map(|c| c.reason.to_string())
.unwrap_or_default();
println!("Received close with code: {:?} and reason: {:?}", close_code, close_reason);
break;
}
_ => {
eprintln!("Received unknown message type");
break;
}
},
Err(e) => {
eprintln!("Error reading message from ws: {:?}", e);
}
Message::Ping(ping) => {
println!("Received ping: {:?} bytes", ping.len());
}
Message::Pong(pong) => {
println!("Received pong: {:?} bytes", pong.len());
}
Message::Close(close) => {
let close_code = close
.as_ref()
.map(|c| c.code.to_string())
.unwrap_or_default();
let close_reason = close
.as_ref()
.map(|c| c.reason.to_string())
.unwrap_or_default();
println!(
"Received close with code: {:?} and reason: {:?}",
close_code, close_reason
);
break;
}
_ => {
eprintln!("Received unknown message type");
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-streams-ws/src/server/ws/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ pub async fn get_ws(
return;
}

// TODO: implement unsubscribe and session management
// TODO: implement session management for the same user_id

// send a message to the client to confirm unsubscribing
send_message_to_socket(
Expand Down
Loading

0 comments on commit a199a62

Please sign in to comment.