Skip to content

Commit

Permalink
Merge pull request #84 from rainshowerLabs/db-ws-fixes
Browse files Browse the repository at this point in the history
Db ws fixes
  • Loading branch information
makemake-kbo authored Jun 20, 2024
2 parents a99d829 + 93ca7d7 commit 4528c47
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 205 deletions.
389 changes: 203 additions & 186 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY . /app

RUN apt-get update && apt-get install -y libssl-dev pkg-config
# Docker is a pos
RUN cargo build --profile maxperf
RUN cargo build --profile maxperf --features debug-verbose

FROM debian:bookworm

Expand Down
18 changes: 9 additions & 9 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
valgrind
python311Packages.requests
python311Packages.websocket-client
(rust-bin.stable.latest.default.override {
(rust-bin.nightly.latest.default.override {
extensions = [ "rust-src" "rustfmt-preview" "rust-analyzer" ];
})
];
Expand Down
5 changes: 4 additions & 1 deletion src/balancer/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ pub fn replace_block_tags(
// Check if the block number is a named tag
let nn = has_named_number(&block_number);
if nn != NamedNumber::Null {
let rwlock_guard = named_blocknumbers.read().unwrap();
let rwlock_guard = named_blocknumbers.read().unwrap_or_else(|e| {
// Handle the case where the RwLock is poisoned
e.into_inner()
});

// Replace the named block tag with its corresponding hex value
match nn {
Expand Down
2 changes: 0 additions & 2 deletions src/health/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ fn escape_poverty(
to_send = LiveReadyUpdate::Health(HealthState::Healthy);
} else if !is_pov_empty && !is_rpc_empty {
to_send = LiveReadyUpdate::Health(HealthState::MissingRpcs);
} else if is_rpc_empty {
to_send = LiveReadyUpdate::Health(HealthState::Unhealthy);
} else {
to_send = LiveReadyUpdate::Health(HealthState::Unhealthy);
}
Expand Down
5 changes: 4 additions & 1 deletion src/health/safe_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ pub async fn subscribe_to_new_heads(
let mut nn_rwlock = cache_args.named_numbers.write().unwrap();
let a = hex_to_decimal(sub["params"]["result"]["number"].as_str().unwrap())
.unwrap();
subscription_id = sub["params"]["subscription"].as_str().unwrap().to_owned();
sub["params"]["subscription"]
.as_str()
.unwrap()
.clone_into(&mut subscription_id);
log_info!("New chain head: {}", a);
let _ = blocknum_tx.send(a);
nn_rwlock.latest = a;
Expand Down
50 changes: 46 additions & 4 deletions src/websocket/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,24 @@ pub async fn ws_conn_manager(
// Initialize WebSocket connections
update_ws_connections(&rpc_list, &ws_handles, &broadcast_tx, &ws_error_tx).await;

// Buffer for WS subscriptions when all nodes are ded
let mut ws_buffer: Vec<Value> = Vec::new();

while let Some(message) = incoming_rx.recv().await {
match message {
WsconnMessage::Message(incoming, specified_index) => {
handle_incoming_message(&ws_handles, &rpc_list, incoming, specified_index).await;
handle_incoming_message(
&ws_handles,
&rpc_list,
incoming,
specified_index,
&mut ws_buffer,
)
.await;
}
WsconnMessage::Reconnect() => {
update_ws_connections(&rpc_list, &ws_handles, &broadcast_tx, &ws_error_tx).await;
unload_buffer(&rpc_list, &ws_handles, &mut ws_buffer).await;
}
}
}
Expand All @@ -97,6 +108,19 @@ async fn update_ws_connections(
*ws_handle_guard = ws_vec;
}

/// Dispatches buffered WS subscriptions out to nodes.
async fn unload_buffer(
rpc_list: &Arc<RwLock<Vec<Rpc>>>,
ws_handles: &Arc<RwLock<Vec<Option<mpsc::UnboundedSender<Value>>>>>,
ws_buffer: &mut Vec<Value>,
) {
for i in 0..ws_buffer.len() {
let incoming = ws_buffer[i].clone();
handle_incoming_message(ws_handles, rpc_list, incoming, None, ws_buffer).await;
}
ws_buffer.clear();
}

/// Sends an incoming request to a WS connection.
///
/// Indexes can be specified via the `specified_index` param.
Expand All @@ -105,19 +129,29 @@ async fn handle_incoming_message(
rpc_list: &Arc<RwLock<Vec<Rpc>>>,
incoming: Value,
specified_index: Option<usize>,
ws_buffer: &mut Vec<Value>,
) {
let rpc_position = if let Some(index) = specified_index {
index
} else {
let mut rpc_list_guard = rpc_list.write().unwrap_or_else(|e| {
// Handle the case where the rpc_list RwLock is poisoned
log_err!("{}", e);
log_err!("handle_incoming_message poison: {}", e);
e.into_inner()
});

match pick(&mut rpc_list_guard).1 {
Some(position) => position,
None => {
// Check if the incoming content is a subscription.
//
// We do this because we want to send it to a buffer
// in case we have no available RPCs.
if incoming["method"] == "eth_subscription" || incoming["method"] == "eth_subscribe"
{
println!("in none: {:?}", incoming);
ws_buffer.push(incoming);
}
log_err!("No RPC position available");
return;
}
Expand Down Expand Up @@ -461,8 +495,16 @@ mod tests {
let (tx, mut rx) = mpsc::unbounded_channel();
let ws_handles = Arc::new(RwLock::new(vec![Some(tx)]));
let incoming = json!({"type": "test"});

handle_incoming_message(&ws_handles, &rpc_list, incoming.clone(), Some(0)).await;
let mut ws_buffer: Vec<Value> = Vec::new();

handle_incoming_message(
&ws_handles,
&rpc_list,
incoming.clone(),
Some(0),
&mut ws_buffer,
)
.await;

// Check if the message was sent through the channel
let received = rx.recv().await;
Expand Down

0 comments on commit 4528c47

Please sign in to comment.