Skip to content

Commit

Permalink
feat(ethexe): Introduce stream returning events from the ethexe-rpc (
Browse files Browse the repository at this point in the history
  • Loading branch information
techraed authored Feb 4, 2025
1 parent 48e00ab commit 7c19334
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
43 changes: 38 additions & 5 deletions ethexe/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,25 @@ use apis::{
};
use ethexe_db::Database;
use ethexe_observer::MockBlobReader;
use futures::FutureExt;
use futures::{stream::FusedStream, FutureExt, Stream};
use jsonrpsee::{
server::{
serve_with_graceful_shutdown, stop_channel, Server, ServerHandle, StopHandle,
TowerServiceBuilder,
},
Methods, RpcModule as JsonrpcModule,
};
use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
use std::{
mem,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
net::TcpListener,
sync::mpsc::{self, UnboundedReceiver},
};
use tower::Service;

mod apis;
Expand Down Expand Up @@ -77,7 +86,12 @@ impl RpcService {
self.config.listen_addr.port()
}

pub async fn run_server(self) -> Result<ServerHandle> {
pub async fn run_server(self) -> Result<(ServerHandle, RpcReceiver)> {
let (rpc_sender, rpc_receiver) = mpsc::unbounded_channel();

// TODO: Temporary solution, will be changed with introducing tx pool.
mem::forget(rpc_sender);

let listener = TcpListener::bind(self.config.listen_addr).await?;

let cors = util::try_into_cors(self.config.cors)?;
Expand Down Expand Up @@ -166,6 +180,25 @@ impl RpcService {
}
});

Ok(server_handle)
Ok((server_handle, RpcReceiver(rpc_receiver)))
}
}

pub struct RpcReceiver(UnboundedReceiver<RpcEvent>);

impl Stream for RpcReceiver {
type Item = RpcEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_recv(cx)
}
}

impl FusedStream for RpcReceiver {
fn is_terminated(&self) -> bool {
self.0.is_closed()
}
}

#[derive(Debug)]
pub enum RpcEvent {}
11 changes: 7 additions & 4 deletions ethexe/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,14 @@ impl Service {
mut prometheus,
rpc,
} = self;
let mut rpc_handle = if let Some(rpc) = rpc {
let (mut rpc_handle, mut rpc_receiver) = if let Some(rpc) = rpc {
log::info!("🌐 Rpc server starting at: {}", rpc.port());

let rpc_run = rpc.run_server().await?;
let (rpc_run, rpc_receiver) = rpc.run_server().await?;

Some(tokio::spawn(rpc_run.stopped()))
(Some(tokio::spawn(rpc_run.stopped())), Some(rpc_receiver))
} else {
None
(None, None)
};

let mut roles = "Observer".to_string();
Expand Down Expand Up @@ -714,6 +714,9 @@ impl Service {
}
}
}
event = rpc_receiver.maybe_next_some() => {
log::info!("Received RPC event {event:#?}");
}
_ = rpc_handle.as_mut().maybe() => {
log::info!("`RPCWorker` has terminated, shutting down...");
}
Expand Down

0 comments on commit 7c19334

Please sign in to comment.