Skip to content

Commit

Permalink
refactor(dispatcher): use RwLock<RunMode> instead of `Mutex<RunMode…
Browse files Browse the repository at this point in the history
…>` (#680)
  • Loading branch information
cppcoffee authored Jan 28, 2025
1 parent 8f86d58 commit 58f1118
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
16 changes: 8 additions & 8 deletions clash_lib/src/app/dispatcher/dispatcher_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
collections::HashMap,
fmt::{Debug, Formatter},
net::SocketAddr,
sync::{Arc, Mutex},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
Expand All @@ -35,7 +35,7 @@ pub struct Dispatcher {
outbound_manager: ThreadSafeOutboundManager,
router: ThreadSafeRouter,
resolver: ThreadSafeDNSResolver,
mode: Arc<Mutex<RunMode>>,
mode: Arc<RwLock<RunMode>>,

manager: Arc<Manager>,
}
Expand All @@ -59,19 +59,19 @@ impl Dispatcher {
outbound_manager,
router,
resolver,
mode: Arc::new(Mutex::new(mode)),
mode: Arc::new(RwLock::new(mode)),
manager: statistics_manager,
}
}

pub async fn set_mode(&self, mode: RunMode) {
info!("run mode switched to {}", mode);

*self.mode.lock().unwrap() = mode;
*self.mode.write().await = mode;
}

pub async fn get_mode(&self) -> RunMode {
*self.mode.lock().unwrap()
*self.mode.read().await
}

#[instrument(skip(self, sess, lhs))]
Expand Down Expand Up @@ -120,7 +120,7 @@ impl Dispatcher {

sess.destination = dest.clone();

let mode = *self.mode.lock().unwrap();
let mode = *self.mode.read().await;
let (outbound_name, rule) = match mode {
RunMode::Global => (PROXY_GLOBAL, None),
RunMode::Rule => self.router.match_route(&mut sess).await,
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Dispatcher {
/// Dispatch a UDP packet to outbound handler
/// returns the close sender
#[instrument]
pub fn dispatch_datagram(
pub async fn dispatch_datagram(
&self,
sess: Session,
udp_inbound: AnyInboundDatagram,
Expand Down Expand Up @@ -311,7 +311,7 @@ impl Dispatcher {
// do Ip though?
packet.dst_addr = dest;

let mode = *mode.lock().unwrap();
let mode = *mode.read().await;

let (outbound_name, rule) = match mode {
RunMode::Global => (PROXY_GLOBAL, None),
Expand Down
3 changes: 2 additions & 1 deletion clash_lib/src/proxy/socks/inbound/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ pub async fn handle_tcp<'a>(

tokio::spawn(async move {
let handle = dispatcher_cloned
.dispatch_datagram(sess, Box::new(InboundUdp::new(framed)));
.dispatch_datagram(sess, Box::new(InboundUdp::new(framed)))
.await;
close_listener.await.ok();
handle.send(0).ok();
});
Expand Down
4 changes: 3 additions & 1 deletion clash_lib/src/proxy/tproxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ async fn handle_inbound_datagram(
..Default::default()
};

let closer = dispatcher.dispatch_datagram(sess, Box::new(udp_stream));
let closer = dispatcher
.dispatch_datagram(sess, Box::new(udp_stream))
.await;

// dispatcher -> tproxy
let responder = socket.clone();
Expand Down
4 changes: 3 additions & 1 deletion clash_lib/src/proxy/tun/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ async fn handle_inbound_datagram(
..Default::default()
};

let closer = dispatcher.dispatch_datagram(sess, Box::new(udp_stream));
let closer = dispatcher
.dispatch_datagram(sess, Box::new(udp_stream))
.await;

// dispatcher -> tun
let fut1 = tokio::spawn(async move {
Expand Down

0 comments on commit 58f1118

Please sign in to comment.