From 58f1118728bc7d820cf94b2f2783cf9a44734f68 Mon Sep 17 00:00:00 2001 From: Xiaobo Liu Date: Tue, 28 Jan 2025 18:29:17 +0800 Subject: [PATCH] refactor(dispatcher): use `RwLock` instead of `Mutex` (#680) --- clash_lib/src/app/dispatcher/dispatcher_impl.rs | 16 ++++++++-------- clash_lib/src/proxy/socks/inbound/stream.rs | 3 ++- clash_lib/src/proxy/tproxy/mod.rs | 4 +++- clash_lib/src/proxy/tun/inbound.rs | 4 +++- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/clash_lib/src/app/dispatcher/dispatcher_impl.rs b/clash_lib/src/app/dispatcher/dispatcher_impl.rs index 2d942f82f..66805878d 100644 --- a/clash_lib/src/app/dispatcher/dispatcher_impl.rs +++ b/clash_lib/src/app/dispatcher/dispatcher_impl.rs @@ -17,7 +17,7 @@ use std::{ collections::HashMap, fmt::{Debug, Formatter}, net::SocketAddr, - sync::{Arc, Mutex}, + sync::Arc, time::{Duration, Instant}, }; use tokio::{ @@ -35,7 +35,7 @@ pub struct Dispatcher { outbound_manager: ThreadSafeOutboundManager, router: ThreadSafeRouter, resolver: ThreadSafeDNSResolver, - mode: Arc>, + mode: Arc>, manager: Arc, } @@ -59,7 +59,7 @@ impl Dispatcher { outbound_manager, router, resolver, - mode: Arc::new(Mutex::new(mode)), + mode: Arc::new(RwLock::new(mode)), manager: statistics_manager, } } @@ -67,11 +67,11 @@ impl Dispatcher { pub async fn set_mode(&self, mode: RunMode) { info!("run mode switched to {}", mode); - *self.mode.lock().unwrap() = mode; + *self.mode.write().await = mode; } pub async fn get_mode(&self) -> RunMode { - *self.mode.lock().unwrap() + *self.mode.read().await } #[instrument(skip(self, sess, lhs))] @@ -120,7 +120,7 @@ impl Dispatcher { sess.destination = dest.clone(); - let mode = *self.mode.lock().unwrap(); + let mode = *self.mode.read().await; let (outbound_name, rule) = match mode { RunMode::Global => (PROXY_GLOBAL, None), RunMode::Rule => self.router.match_route(&mut sess).await, @@ -241,7 +241,7 @@ impl Dispatcher { /// Dispatch a UDP packet to outbound handler /// returns the close sender #[instrument] - pub fn dispatch_datagram( + pub async fn dispatch_datagram( &self, sess: Session, udp_inbound: AnyInboundDatagram, @@ -311,7 +311,7 @@ impl Dispatcher { // do Ip though? packet.dst_addr = dest; - let mode = *mode.lock().unwrap(); + let mode = *mode.read().await; let (outbound_name, rule) = match mode { RunMode::Global => (PROXY_GLOBAL, None), diff --git a/clash_lib/src/proxy/socks/inbound/stream.rs b/clash_lib/src/proxy/socks/inbound/stream.rs index 2294fe04e..cce992b36 100644 --- a/clash_lib/src/proxy/socks/inbound/stream.rs +++ b/clash_lib/src/proxy/socks/inbound/stream.rs @@ -187,7 +187,8 @@ pub async fn handle_tcp<'a>( tokio::spawn(async move { let handle = dispatcher_cloned - .dispatch_datagram(sess, Box::new(InboundUdp::new(framed))); + .dispatch_datagram(sess, Box::new(InboundUdp::new(framed))) + .await; close_listener.await.ok(); handle.send(0).ok(); }); diff --git a/clash_lib/src/proxy/tproxy/mod.rs b/clash_lib/src/proxy/tproxy/mod.rs index 083c33f5f..127082849 100644 --- a/clash_lib/src/proxy/tproxy/mod.rs +++ b/clash_lib/src/proxy/tproxy/mod.rs @@ -122,7 +122,9 @@ async fn handle_inbound_datagram( ..Default::default() }; - let closer = dispatcher.dispatch_datagram(sess, Box::new(udp_stream)); + let closer = dispatcher + .dispatch_datagram(sess, Box::new(udp_stream)) + .await; // dispatcher -> tproxy let responder = socket.clone(); diff --git a/clash_lib/src/proxy/tun/inbound.rs b/clash_lib/src/proxy/tun/inbound.rs index e20c2945b..9b781ec05 100644 --- a/clash_lib/src/proxy/tun/inbound.rs +++ b/clash_lib/src/proxy/tun/inbound.rs @@ -88,7 +88,9 @@ async fn handle_inbound_datagram( ..Default::default() }; - let closer = dispatcher.dispatch_datagram(sess, Box::new(udp_stream)); + let closer = dispatcher + .dispatch_datagram(sess, Box::new(udp_stream)) + .await; // dispatcher -> tun let fut1 = tokio::spawn(async move {