diff --git a/examples/proxy.rs b/examples/proxy.rs index 20fb416..d4998b9 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -77,7 +77,6 @@ async fn test_proxy( let mut res = match client.send_now(req.into_type()).await { Ok(res) => { if is_failed { - println!("status {:?}", res.status()); assert!(res.status() != 200); return ; } diff --git a/src/helper.rs b/src/helper.rs index ef07ce7..431efc6 100644 --- a/src/helper.rs +++ b/src/helper.rs @@ -425,6 +425,10 @@ impl Helper { } } + pub fn calc_sock_map(server_id: u32, sock_map: u32) -> u64 { + ((server_id as u64) << 32) + (sock_map as u64) + } + // pub async fn udp_recv_from(socket: &UdpSocket, buf: &mut [u8]) -> io::Result { // let (s, addr) = socket.recv_from(&mut buf).await?; // unsafe { diff --git a/src/option.rs b/src/option.rs index d6341df..083b8c3 100644 --- a/src/option.rs +++ b/src/option.rs @@ -706,11 +706,9 @@ impl ConfigOption { if let Some(http) = &mut self.http { http.after_load_option()?; } - if let Some(stream) = &mut self.stream { stream.copy_to_child(); } - println!("options = {:?}", self); Ok(()) } diff --git a/src/prot/close.rs b/src/prot/close.rs index 29e75d8..d1f3edd 100644 --- a/src/prot/close.rs +++ b/src/prot/close.rs @@ -22,31 +22,29 @@ use super::{ProtFrameHeader, read_short_string, write_short_string}; /// 旧的Socket连接关闭, 接收到则关闭掉当前的连接 #[derive(Debug)] pub struct ProtClose { - server_id: u32, - sock_map: u32, + sock_map: u64, reason: String, } impl ProtClose { - pub fn new(server_id: u32, sock_map: u32) -> ProtClose { - ProtClose { server_id, sock_map, reason: String::new() } + pub fn new(sock_map: u64) -> ProtClose { + ProtClose { sock_map, reason: String::new() } } - pub fn new_by_reason(server_id: u32, sock_map: u32, reason: String) -> ProtClose { - ProtClose { server_id, sock_map, reason } + pub fn new_by_reason(sock_map: u64, reason: String) -> ProtClose { + ProtClose { sock_map, reason } } pub fn parse(header: ProtFrameHeader, mut buf: T) -> ProxyResult { let reason = read_short_string(&mut buf)?; Ok(ProtClose { - server_id: header.server_id(), sock_map: header.sock_map(), reason, }) } pub fn encode(self, buf: &mut B) -> ProxyResult { - let mut head = ProtFrameHeader::new(ProtKind::Close, ProtFlag::zero(), self.sock_map, self.server_id); + let mut head = ProtFrameHeader::new(ProtKind::Close, ProtFlag::zero(), self.sock_map); head.length = self.reason.as_bytes().len() as u32 + 1; let mut size = 0; size += head.encode(buf)?; @@ -54,7 +52,7 @@ impl ProtClose { Ok(size) } - pub fn sock_map(&self) -> u32 { + pub fn sock_map(&self) -> u64 { self.sock_map } diff --git a/src/prot/create.rs b/src/prot/create.rs index 00146f2..fb4bb31 100644 --- a/src/prot/create.rs +++ b/src/prot/create.rs @@ -1,11 +1,11 @@ // Copyright 2022 - 2023 Wenmeng See the COPYRIGHT // file at the top-level directory of this distribution. -// +// // Licensed under the Apache License, Version 2.0 , at your // option. This file may not be copied, modified, or distributed // except according to those terms. -// +// // Author: tickbh // ----- // Created Date: 2023/09/22 10:28:28 @@ -19,21 +19,19 @@ use crate::{ use super::ProtFrameHeader; -/// 新的Socket连接请求, +/// 新的Socket连接请求, /// 接收方创建一个虚拟链接来对应该Socket的读取写入 #[derive(Debug)] #[allow(dead_code)] pub struct ProtCreate { - server_id: u32, - sock_map: u32, + sock_map: u64, mode: u8, domain: Option, } impl ProtCreate { - pub fn new(server_id: u32,sock_map: u32, domain: Option) -> Self { + pub fn new(sock_map: u64, domain: Option) -> Self { Self { - server_id, sock_map, mode: 0, domain, @@ -51,7 +49,6 @@ impl ProtCreate { domain = Some(String::from_utf8_lossy(data).to_string()); } Ok(ProtCreate { - server_id: header.server_id(), sock_map: header.sock_map(), mode: 0, domain, @@ -59,8 +56,16 @@ impl ProtCreate { } pub fn encode(self, buf: &mut B) -> ProxyResult { - let mut head = ProtFrameHeader::new(ProtKind::Create, ProtFlag::zero(), self.sock_map, self.server_id); - let domain_len = self.domain.as_ref().map(|s| s.as_bytes().len() as u32).unwrap_or(0); + let mut head = ProtFrameHeader::new( + ProtKind::Create, + ProtFlag::zero(), + self.sock_map, + ); + let domain_len = self + .domain + .as_ref() + .map(|s| s.as_bytes().len() as u32) + .unwrap_or(0); head.length = 1 + domain_len; let mut size = 0; size += head.encode(buf)?; @@ -71,7 +76,7 @@ impl ProtCreate { Ok(size) } - pub fn sock_map(&self) -> u32 { + pub fn sock_map(&self) -> u64 { self.sock_map } diff --git a/src/prot/data.rs b/src/prot/data.rs index ecc4724..c71c1b4 100644 --- a/src/prot/data.rs +++ b/src/prot/data.rs @@ -22,20 +22,18 @@ use super::ProtFrameHeader; /// Socket的数据消息包 #[derive(Debug)] pub struct ProtData { - server_id: u32, - sock_map: u32, + sock_map: u64, data: Vec, } impl ProtData { - pub fn new(server_id: u32,sock_map: u32, data: Vec) -> ProtData { - Self { server_id, sock_map, data } + pub fn new(sock_map: u64, data: Vec) -> ProtData { + Self { sock_map, data } } pub fn parse(header: ProtFrameHeader, mut buf: T) -> ProxyResult { log::trace!("代理中心: 解码Data数据长度={}", header.length); Ok(Self { - server_id: header.server_id(), sock_map: header.sock_map(), data: buf.advance_chunk(header.length as usize).to_vec(), }) @@ -43,7 +41,7 @@ impl ProtData { pub fn encode(mut self, buf: &mut B) -> ProxyResult { log::trace!("代理中心: 编码Data数据长度={}", self.data.len()); - let mut head = ProtFrameHeader::new(ProtKind::Data, ProtFlag::zero(), self.sock_map, self.server_id); + let mut head = ProtFrameHeader::new(ProtKind::Data, ProtFlag::zero(), self.sock_map); head.length = self.data.len() as u32; let mut size = 0; size += head.encode(buf)?; @@ -55,7 +53,7 @@ impl ProtData { &self.data } - pub fn sock_map(&self) -> u32 { + pub fn sock_map(&self) -> u64 { self.sock_map } } diff --git a/src/prot/frame.rs b/src/prot/frame.rs index 4eb8193..f1edac3 100644 --- a/src/prot/frame.rs +++ b/src/prot/frame.rs @@ -11,10 +11,9 @@ // Created Date: 2023/09/22 10:30:10 -use tokio_util::bytes::buf; use webparse::{Buf, http2::frame::{read_u24, encode_u24}, BufMut}; -use crate::{ProxyResult, MappingConfig}; +use crate::{Helper, MappingConfig, ProxyResult}; use super::{ProtCreate, ProtClose, ProtData, ProtFlag, ProtKind, ProtMapping, ProtToken}; @@ -27,10 +26,8 @@ pub struct ProtFrameHeader { kind: ProtKind, /// 包体的标识, 如是否为响应包等 flag: ProtFlag, - /// 3个字节, socket在内存中相应的句柄, 客户端发起为单数, 服务端发起为双数 - sock_map: u32, - /// 服务器的id - server_id: u32, + /// 前32位表示server_id, 后四位3个字节表示id, socket在内存中相应的句柄, 客户端发起为单数, 服务端发起为双数 + sock_map: u64, } #[derive(Debug)] @@ -50,21 +47,16 @@ pub enum ProtFrame { impl ProtFrameHeader { pub const FRAME_HEADER_BYTES: usize = 12; - pub fn new(kind: ProtKind, flag: ProtFlag, sock_map: u32, server_id: u32) -> ProtFrameHeader { + pub fn new(kind: ProtKind, flag: ProtFlag, sock_map: u64) -> ProtFrameHeader { ProtFrameHeader { length: 0, kind, flag, sock_map, - server_id, } } - - pub fn server_id(&self) -> u32 { - self.server_id - } - pub fn sock_map(&self) -> u32 { + pub fn sock_map(&self) -> u64 { self.sock_map } @@ -94,8 +86,7 @@ impl ProtFrameHeader { length, kind: ProtKind::new(kind), flag: ProtFlag::new(flag), - sock_map, - server_id, + sock_map: Helper::calc_sock_map(server_id, sock_map), }) } @@ -105,8 +96,8 @@ impl ProtFrameHeader { size += encode_u24(buffer, self.length); size += buffer.put_u8(self.kind.encode()); size += buffer.put_u8(self.flag.bits()); - size += encode_u24(buffer, self.sock_map); - size += buffer.put_u32(self.server_id); + size += encode_u24(buffer, self.sock_map as u32); + size += buffer.put_u32((self.sock_map >> 32) as u32); Ok(size) } @@ -144,23 +135,23 @@ impl ProtFrame { Ok(size) } - pub fn new_create(server_id: u32, sock_map: u32, domain: Option) -> Self { - Self::Create(ProtCreate::new(server_id, sock_map, domain)) + pub fn new_create(sock_map: u64, domain: Option) -> Self { + Self::Create(ProtCreate::new(sock_map, domain)) } - pub fn new_close(sock_map: u32) -> Self { + pub fn new_close(sock_map: u64) -> Self { Self::Close(ProtClose::new(sock_map)) } - pub fn new_close_reason(sock_map: u32, reason: String) -> Self { + pub fn new_close_reason(sock_map: u64, reason: String) -> Self { Self::Close(ProtClose::new_by_reason(sock_map, reason)) } - pub fn new_data(sock_map: u32, data: Vec) -> Self { + pub fn new_data(sock_map: u64, data: Vec) -> Self { Self::Data(ProtData::new(sock_map, data)) } - pub fn new_mapping(sock_map: u32, mappings: Vec) -> Self { + pub fn new_mapping(sock_map: u64, mappings: Vec) -> Self { Self::Mapping(ProtMapping::new(sock_map, mappings)) } @@ -196,7 +187,7 @@ impl ProtFrame { } } - pub fn sock_map(&self) -> u32 { + pub fn sock_map(&self) -> u64 { match self { ProtFrame::Data(s) => s.sock_map(), ProtFrame::Create(s) => s.sock_map(), diff --git a/src/prot/mapping.rs b/src/prot/mapping.rs index 498e3d5..a546cc3 100644 --- a/src/prot/mapping.rs +++ b/src/prot/mapping.rs @@ -23,12 +23,12 @@ use super::{ProtFrameHeader, read_short_string, write_short_string}; /// 接收方创建一个虚拟链接来对应该Socket的读取写入 #[derive(Debug)] pub struct ProtMapping { - sock_map: u32, + sock_map: u64, pub mappings: Vec, } impl ProtMapping { - pub fn new(sock_map: u32, mappings: Vec) -> Self { + pub fn new(sock_map: u64, mappings: Vec) -> Self { Self { sock_map, mappings, @@ -87,7 +87,7 @@ impl ProtMapping { Ok(size) } - pub fn sock_map(&self) -> u32 { + pub fn sock_map(&self) -> u64 { self.sock_map } diff --git a/src/prot/token.rs b/src/prot/token.rs index 7996ebb..09bf8a8 100644 --- a/src/prot/token.rs +++ b/src/prot/token.rs @@ -65,7 +65,7 @@ impl ProtToken { return true } - pub fn sock_map(&self) -> u32 { + pub fn sock_map(&self) -> u64 { 0 } } diff --git a/src/streams/center_client.rs b/src/streams/center_client.rs index ce558b0..530d36e 100644 --- a/src/streams/center_client.rs +++ b/src/streams/center_client.rs @@ -133,7 +133,7 @@ impl CenterClient { where T: AsyncRead + AsyncWrite + Unpin, { - let mut map = HashMap::>::new(); + let mut map = HashMap::>::new(); let mut read_buf = BinaryMut::new(); let mut write_buf = BinaryMut::new(); let (mut reader, mut writer) = split(stream); @@ -362,10 +362,10 @@ impl CenterClient { Ok(()) } - fn calc_next_id(&mut self) -> u32 { + fn calc_next_id(&mut self) -> u64 { let id = self.next_id; self.next_id += 2; - id + Helper::calc_sock_map(self.option.server_id, id) } pub async fn deal_new_stream(&mut self, inbound: T) -> ProxyResult<()> diff --git a/src/streams/center_server.rs b/src/streams/center_server.rs index 9fba9cc..b9be05c 100644 --- a/src/streams/center_server.rs +++ b/src/streams/center_server.rs @@ -83,10 +83,10 @@ impl CenterServer { self.sender.is_closed() } - pub fn calc_next_id(&mut self) -> u32 { + pub fn calc_next_id(&mut self) -> u64 { let id = self.next_id; self.next_id += 2; - id + Helper::calc_sock_map(self.option.server_id, id) } pub async fn inner_serve( @@ -100,7 +100,7 @@ impl CenterServer { where T: AsyncRead + AsyncWrite + Unpin, { - let mut map = HashMap::>::new(); + let mut map = HashMap::>::new(); let mut read_buf = BinaryMut::new(); let mut write_buf = BinaryMut::new(); let mut verify_succ = option.username.is_none() && option.password.is_none(); diff --git a/src/streams/trans_stream.rs b/src/streams/trans_stream.rs index 532758b..923c2c8 100644 --- a/src/streams/trans_stream.rs +++ b/src/streams/trans_stream.rs @@ -33,7 +33,7 @@ where // 流有相应的AsyncRead + AsyncWrite + Unpin均可 stream: T, // sock绑定的句柄 - id: u32, + id: u64, // 读取的数据缓存,将转发成ProtFrame read: BinaryMut, // 写的数据缓存,直接写入到stream下,从ProtFrame转化而来 @@ -50,7 +50,7 @@ where { pub fn new( stream: T, - id: u32, + id: u64, in_sender: Sender, out_receiver: Receiver, ) -> Self { diff --git a/src/streams/virtual_stream.rs b/src/streams/virtual_stream.rs index 7759013..1bb5b85 100644 --- a/src/streams/virtual_stream.rs +++ b/src/streams/virtual_stream.rs @@ -27,7 +27,7 @@ use crate::{prot::ProtFrame}; pub struct VirtualStream { // sock绑定的句柄 - id: u32, + id: u64, // 收到数据通过sender发送给中心端 sender: PollSender, // 收到中心端的写入请求,转成write @@ -40,7 +40,7 @@ pub struct VirtualStream impl VirtualStream { - pub fn new(id: u32, sender: Sender, receiver: Receiver) -> Self { + pub fn new(id: u64, sender: Sender, receiver: Receiver) -> Self { Self { id, sender: PollSender::new(sender), diff --git a/src/trans/http.rs b/src/trans/http.rs index cca8516..0f9c4a3 100644 --- a/src/trans/http.rs +++ b/src/trans/http.rs @@ -42,7 +42,7 @@ impl HttpTrait for Operate { pub struct TransHttp { sender: Sender, sender_work: Sender<(ProtCreate, Sender)>, - sock_map: u32, + sock_map: u64, mappings: Arc>>, } @@ -51,7 +51,7 @@ struct HttpOper { pub sender: Sender>, pub virtual_sender: Option>, pub sender_work: Sender<(ProtCreate, Sender)>, - pub sock_map: u32, + pub sock_map: u64, pub mappings: Arc>>, pub http_map: Option, } @@ -60,7 +60,7 @@ impl TransHttp { pub fn new( sender: Sender, sender_work: Sender<(ProtCreate, Sender)>, - sock_map: u32, + sock_map: u64, mappings: Arc>>, ) -> Self { Self { diff --git a/src/trans/tcp.rs b/src/trans/tcp.rs index 5dddf06..09bd45c 100644 --- a/src/trans/tcp.rs +++ b/src/trans/tcp.rs @@ -22,7 +22,7 @@ use crate::{ProtFrame, TransStream, ProxyError, ProtCreate, MappingConfig}; pub struct TransTcp { sender: Sender, sender_work: Sender<(ProtCreate, Sender)>, - sock_map: u32, + sock_map: u64, mappings: Arc>>, } @@ -30,7 +30,7 @@ impl TransTcp { pub fn new( sender: Sender, sender_work: Sender<(ProtCreate, Sender)>, - sock_map: u32, + sock_map: u64, mappings: Arc>>, ) -> Self { Self { diff --git a/tests/mapping.rs b/tests/mapping.rs index ec9f28b..a056f75 100644 --- a/tests/mapping.rs +++ b/tests/mapping.rs @@ -217,6 +217,7 @@ mod tests { let mut result = BinaryMut::new(); res.body_mut().read_all(&mut result).await; + println!("result = {:?}", result.chunk()); assert_eq!(result.remaining(), HELLO_WORLD.as_bytes().len()); assert_eq!(result.as_slice(), HELLO_WORLD.as_bytes()); assert_eq!(res.version(), Version::Http2); diff --git a/tests/proxy.rs b/tests/proxy.rs index 5ec3907..0937fcc 100644 --- a/tests/proxy.rs +++ b/tests/proxy.rs @@ -81,7 +81,6 @@ mod tests { let mut res = match client.send_now(req.into_type()).await { Ok(res) => { if is_failed { - println!("status {:?}", res.status()); assert!(res.status() != 200); return ; }