Skip to content

Commit

Permalink
fix server id
Browse files Browse the repository at this point in the history
  • Loading branch information
tickbh committed Jan 25, 2024
1 parent de8e93a commit fb90973
Show file tree
Hide file tree
Showing 17 changed files with 67 additions and 74 deletions.
1 change: 0 additions & 1 deletion examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ async fn test_proxy(
let mut res = match client.send_now(req.into_type()).await {
Ok(res) => {
if is_failed {
println!("status {:?}", res.status());
assert!(res.status() != 200);
return ;
}
Expand Down
4 changes: 4 additions & 0 deletions src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ impl Helper {
}
}

pub fn calc_sock_map(server_id: u32, sock_map: u32) -> u64 {
((server_id as u64) << 32) + (sock_map as u64)
}

// pub async fn udp_recv_from(socket: &UdpSocket, buf: &mut [u8]) -> io::Result<usize> {
// let (s, addr) = socket.recv_from(&mut buf).await?;
// unsafe {
Expand Down
2 changes: 0 additions & 2 deletions src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,9 @@ impl ConfigOption {
if let Some(http) = &mut self.http {
http.after_load_option()?;
}

if let Some(stream) = &mut self.stream {
stream.copy_to_child();
}
println!("options = {:?}", self);
Ok(())
}

Expand Down
16 changes: 7 additions & 9 deletions src/prot/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,37 @@ use super::{ProtFrameHeader, read_short_string, write_short_string};
/// 旧的Socket连接关闭, 接收到则关闭掉当前的连接
#[derive(Debug)]
pub struct ProtClose {
server_id: u32,
sock_map: u32,
sock_map: u64,
reason: String,
}

impl ProtClose {
pub fn new(server_id: u32, sock_map: u32) -> ProtClose {
ProtClose { server_id, sock_map, reason: String::new() }
pub fn new(sock_map: u64) -> ProtClose {
ProtClose { sock_map, reason: String::new() }
}

pub fn new_by_reason(server_id: u32, sock_map: u32, reason: String) -> ProtClose {
ProtClose { server_id, sock_map, reason }
pub fn new_by_reason(sock_map: u64, reason: String) -> ProtClose {
ProtClose { sock_map, reason }
}

pub fn parse<T: Buf>(header: ProtFrameHeader, mut buf: T) -> ProxyResult<ProtClose> {
let reason = read_short_string(&mut buf)?;
Ok(ProtClose {
server_id: header.server_id(),
sock_map: header.sock_map(),
reason,
})
}

pub fn encode<B: Buf + BufMut>(self, buf: &mut B) -> ProxyResult<usize> {
let mut head = ProtFrameHeader::new(ProtKind::Close, ProtFlag::zero(), self.sock_map, self.server_id);
let mut head = ProtFrameHeader::new(ProtKind::Close, ProtFlag::zero(), self.sock_map);
head.length = self.reason.as_bytes().len() as u32 + 1;
let mut size = 0;
size += head.encode(buf)?;
size += write_short_string(buf, &self.reason)?;
Ok(size)
}

pub fn sock_map(&self) -> u32 {
pub fn sock_map(&self) -> u64 {
self.sock_map
}

Expand Down
27 changes: 16 additions & 11 deletions src/prot/create.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2022 - 2023 Wenmeng See the COPYRIGHT
// file at the top-level directory of this distribution.
//
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//
//
// Author: tickbh
// -----
// Created Date: 2023/09/22 10:28:28
Expand All @@ -19,21 +19,19 @@ use crate::{

use super::ProtFrameHeader;

/// 新的Socket连接请求,
/// 新的Socket连接请求,
/// 接收方创建一个虚拟链接来对应该Socket的读取写入
#[derive(Debug)]
#[allow(dead_code)]
pub struct ProtCreate {
server_id: u32,
sock_map: u32,
sock_map: u64,
mode: u8,
domain: Option<String>,
}

impl ProtCreate {
pub fn new(server_id: u32,sock_map: u32, domain: Option<String>) -> Self {
pub fn new(sock_map: u64, domain: Option<String>) -> Self {
Self {
server_id,
sock_map,
mode: 0,
domain,
Expand All @@ -51,16 +49,23 @@ impl ProtCreate {
domain = Some(String::from_utf8_lossy(data).to_string());
}
Ok(ProtCreate {
server_id: header.server_id(),
sock_map: header.sock_map(),
mode: 0,
domain,
})
}

pub fn encode<B: Buf + BufMut>(self, buf: &mut B) -> ProxyResult<usize> {
let mut head = ProtFrameHeader::new(ProtKind::Create, ProtFlag::zero(), self.sock_map, self.server_id);
let domain_len = self.domain.as_ref().map(|s| s.as_bytes().len() as u32).unwrap_or(0);
let mut head = ProtFrameHeader::new(
ProtKind::Create,
ProtFlag::zero(),
self.sock_map,
);
let domain_len = self
.domain
.as_ref()
.map(|s| s.as_bytes().len() as u32)
.unwrap_or(0);
head.length = 1 + domain_len;
let mut size = 0;
size += head.encode(buf)?;
Expand All @@ -71,7 +76,7 @@ impl ProtCreate {
Ok(size)
}

pub fn sock_map(&self) -> u32 {
pub fn sock_map(&self) -> u64 {
self.sock_map
}

Expand Down
12 changes: 5 additions & 7 deletions src/prot/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,26 @@ use super::ProtFrameHeader;
/// Socket的数据消息包
#[derive(Debug)]
pub struct ProtData {
server_id: u32,
sock_map: u32,
sock_map: u64,
data: Vec<u8>,
}

impl ProtData {
pub fn new(server_id: u32,sock_map: u32, data: Vec<u8>) -> ProtData {
Self { server_id, sock_map, data }
pub fn new(sock_map: u64, data: Vec<u8>) -> ProtData {
Self { sock_map, data }
}

pub fn parse<T: Buf>(header: ProtFrameHeader, mut buf: T) -> ProxyResult<ProtData> {
log::trace!("代理中心: 解码Data数据长度={}", header.length);
Ok(Self {
server_id: header.server_id(),
sock_map: header.sock_map(),
data: buf.advance_chunk(header.length as usize).to_vec(),
})
}

pub fn encode<B: Buf + BufMut>(mut self, buf: &mut B) -> ProxyResult<usize> {
log::trace!("代理中心: 编码Data数据长度={}", self.data.len());
let mut head = ProtFrameHeader::new(ProtKind::Data, ProtFlag::zero(), self.sock_map, self.server_id);
let mut head = ProtFrameHeader::new(ProtKind::Data, ProtFlag::zero(), self.sock_map);
head.length = self.data.len() as u32;
let mut size = 0;
size += head.encode(buf)?;
Expand All @@ -55,7 +53,7 @@ impl ProtData {
&self.data
}

pub fn sock_map(&self) -> u32 {
pub fn sock_map(&self) -> u64 {
self.sock_map
}
}
39 changes: 15 additions & 24 deletions src/prot/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
// Created Date: 2023/09/22 10:30:10


use tokio_util::bytes::buf;
use webparse::{Buf, http2::frame::{read_u24, encode_u24}, BufMut};

use crate::{ProxyResult, MappingConfig};
use crate::{Helper, MappingConfig, ProxyResult};

use super::{ProtCreate, ProtClose, ProtData, ProtFlag, ProtKind, ProtMapping, ProtToken};

Expand All @@ -27,10 +26,8 @@ pub struct ProtFrameHeader {
kind: ProtKind,
/// 包体的标识, 如是否为响应包等
flag: ProtFlag,
/// 3个字节, socket在内存中相应的句柄, 客户端发起为单数, 服务端发起为双数
sock_map: u32,
/// 服务器的id
server_id: u32,
/// 前32位表示server_id, 后四位3个字节表示id, socket在内存中相应的句柄, 客户端发起为单数, 服务端发起为双数
sock_map: u64,
}

#[derive(Debug)]
Expand All @@ -50,21 +47,16 @@ pub enum ProtFrame {
impl ProtFrameHeader {
pub const FRAME_HEADER_BYTES: usize = 12;

pub fn new(kind: ProtKind, flag: ProtFlag, sock_map: u32, server_id: u32) -> ProtFrameHeader {
pub fn new(kind: ProtKind, flag: ProtFlag, sock_map: u64) -> ProtFrameHeader {
ProtFrameHeader {
length: 0,
kind,
flag,
sock_map,
server_id,
}
}

pub fn server_id(&self) -> u32 {
self.server_id
}

pub fn sock_map(&self) -> u32 {
pub fn sock_map(&self) -> u64 {
self.sock_map
}

Expand Down Expand Up @@ -94,8 +86,7 @@ impl ProtFrameHeader {
length,
kind: ProtKind::new(kind),
flag: ProtFlag::new(flag),
sock_map,
server_id,
sock_map: Helper::calc_sock_map(server_id, sock_map),
})
}

Expand All @@ -105,8 +96,8 @@ impl ProtFrameHeader {
size += encode_u24(buffer, self.length);
size += buffer.put_u8(self.kind.encode());
size += buffer.put_u8(self.flag.bits());
size += encode_u24(buffer, self.sock_map);
size += buffer.put_u32(self.server_id);
size += encode_u24(buffer, self.sock_map as u32);
size += buffer.put_u32((self.sock_map >> 32) as u32);
Ok(size)
}

Expand Down Expand Up @@ -144,23 +135,23 @@ impl ProtFrame {
Ok(size)
}

pub fn new_create(server_id: u32, sock_map: u32, domain: Option<String>) -> Self {
Self::Create(ProtCreate::new(server_id, sock_map, domain))
pub fn new_create(sock_map: u64, domain: Option<String>) -> Self {
Self::Create(ProtCreate::new(sock_map, domain))
}

pub fn new_close(sock_map: u32) -> Self {
pub fn new_close(sock_map: u64) -> Self {
Self::Close(ProtClose::new(sock_map))
}

pub fn new_close_reason(sock_map: u32, reason: String) -> Self {
pub fn new_close_reason(sock_map: u64, reason: String) -> Self {
Self::Close(ProtClose::new_by_reason(sock_map, reason))
}

pub fn new_data(sock_map: u32, data: Vec<u8>) -> Self {
pub fn new_data(sock_map: u64, data: Vec<u8>) -> Self {
Self::Data(ProtData::new(sock_map, data))
}

pub fn new_mapping(sock_map: u32, mappings: Vec<MappingConfig>) -> Self {
pub fn new_mapping(sock_map: u64, mappings: Vec<MappingConfig>) -> Self {
Self::Mapping(ProtMapping::new(sock_map, mappings))
}

Expand Down Expand Up @@ -196,7 +187,7 @@ impl ProtFrame {
}
}

pub fn sock_map(&self) -> u32 {
pub fn sock_map(&self) -> u64 {
match self {
ProtFrame::Data(s) => s.sock_map(),
ProtFrame::Create(s) => s.sock_map(),
Expand Down
6 changes: 3 additions & 3 deletions src/prot/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use super::{ProtFrameHeader, read_short_string, write_short_string};
/// 接收方创建一个虚拟链接来对应该Socket的读取写入
#[derive(Debug)]
pub struct ProtMapping {
sock_map: u32,
sock_map: u64,
pub mappings: Vec<MappingConfig>,
}

impl ProtMapping {
pub fn new(sock_map: u32, mappings: Vec<MappingConfig>) -> Self {
pub fn new(sock_map: u64, mappings: Vec<MappingConfig>) -> Self {
Self {
sock_map,
mappings,
Expand Down Expand Up @@ -87,7 +87,7 @@ impl ProtMapping {
Ok(size)
}

pub fn sock_map(&self) -> u32 {
pub fn sock_map(&self) -> u64 {
self.sock_map
}

Expand Down
2 changes: 1 addition & 1 deletion src/prot/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl ProtToken {
return true
}

pub fn sock_map(&self) -> u32 {
pub fn sock_map(&self) -> u64 {
0
}
}
6 changes: 3 additions & 3 deletions src/streams/center_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl CenterClient {
where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut map = HashMap::<u32, Sender<ProtFrame>>::new();
let mut map = HashMap::<u64, Sender<ProtFrame>>::new();
let mut read_buf = BinaryMut::new();
let mut write_buf = BinaryMut::new();
let (mut reader, mut writer) = split(stream);
Expand Down Expand Up @@ -362,10 +362,10 @@ impl CenterClient {
Ok(())
}

fn calc_next_id(&mut self) -> u32 {
fn calc_next_id(&mut self) -> u64 {
let id = self.next_id;
self.next_id += 2;
id
Helper::calc_sock_map(self.option.server_id, id)
}

pub async fn deal_new_stream<T>(&mut self, inbound: T) -> ProxyResult<()>
Expand Down
6 changes: 3 additions & 3 deletions src/streams/center_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ impl CenterServer {
self.sender.is_closed()
}

pub fn calc_next_id(&mut self) -> u32 {
pub fn calc_next_id(&mut self) -> u64 {
let id = self.next_id;
self.next_id += 2;
id
Helper::calc_sock_map(self.option.server_id, id)
}

pub async fn inner_serve<T>(
Expand All @@ -100,7 +100,7 @@ impl CenterServer {
where
T: AsyncRead + AsyncWrite + Unpin,
{
let mut map = HashMap::<u32, Sender<ProtFrame>>::new();
let mut map = HashMap::<u64, Sender<ProtFrame>>::new();
let mut read_buf = BinaryMut::new();
let mut write_buf = BinaryMut::new();
let mut verify_succ = option.username.is_none() && option.password.is_none();
Expand Down
4 changes: 2 additions & 2 deletions src/streams/trans_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where
// 流有相应的AsyncRead + AsyncWrite + Unpin均可
stream: T,
// sock绑定的句柄
id: u32,
id: u64,
// 读取的数据缓存,将转发成ProtFrame
read: BinaryMut,
// 写的数据缓存,直接写入到stream下,从ProtFrame转化而来
Expand All @@ -50,7 +50,7 @@ where
{
pub fn new(
stream: T,
id: u32,
id: u64,
in_sender: Sender<ProtFrame>,
out_receiver: Receiver<ProtFrame>,
) -> Self {
Expand Down
Loading

0 comments on commit fb90973

Please sign in to comment.