Skip to content

Commit

Permalink
增加批量grpc链接测试工具,并打包grpc管理的链接数 #94
Browse files Browse the repository at this point in the history
  • Loading branch information
heqingpan committed May 28, 2024
1 parent eef220b commit 837dc6b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 8 deletions.
32 changes: 32 additions & 0 deletions loadtest/src/bin/multiple_grpc_conn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#![allow(unused_imports, unreachable_code)]
use nacos_rust_client::client::naming_client::{InstanceDefaultListener, ServiceInstanceKey};
use nacos_rust_client::conn_manage;
use std::sync::Arc;

use std::time::Duration;

use nacos_rust_client::client::naming_client::{Instance, NamingClient, QueryInstanceListParams};
use nacos_rust_client::client::{AuthInfo, ClientBuilder, HostInfo};

pub(crate) const CONN_COUNT: usize = 100;

#[tokio::main]
async fn main() {
std::env::set_var("RUST_LOG", "INFO");
env_logger::init();
let namespace_id = "public".to_owned();
let mut list = vec![];
for i in 0..CONN_COUNT {
let auth_info = None;
let client = ClientBuilder::new()
.set_endpoint_addrs("127.0.0.1:8848")
.set_auth_info(auth_info)
.set_tenant(namespace_id.clone())
.set_use_grpc(true)
.build_naming_client();
list.push(client);
}
tokio::signal::ctrl_c()
.await
.expect("failed to listen for event");
}
32 changes: 27 additions & 5 deletions src/grpc/bistream_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ impl BiStreamConn {
}

pub fn receive(&mut self, ctx: &mut Context<Self>) {
println!("BiStreamConn start receive");
if let Some(mut receiver_stream) = self.receiver_stream.replace(None) {
let manage = self.manage.clone();
let client_id = self.client_id.clone();
async move {
if let Some(Ok(_payload)) = receiver_stream.next().await {
//println!("BiStreamConn receive frist msg:{}",PayloadUtils::get_payload_string(&payload));
}
//if let Some(Ok(_payload)) = receiver_stream.next().await {
//println!("BiStreamConn receive frist msg:{}",PayloadUtils::get_payload_string(&payload));
//}
while let Some(Ok(payload)) = receiver_stream.next().await {
//println!("BiStreamConn receive msg:{}",PayloadUtils::get_payload_string(&payload));
manage.do_send(BiStreamManageCmd::Response(client_id.clone(), payload));
Expand All @@ -50,6 +51,8 @@ impl BiStreamConn {
}
.into_actor(self)
.map(|_, _, ctx| {
//debug
println!("stop at receive!");
ctx.stop();
})
.spawn(ctx);
Expand All @@ -65,21 +68,40 @@ impl BiStreamConn {
.map(|_, _, _| {})
.spawn(ctx);
}

fn close_stream_and_stop(&mut self, ctx: &mut Context<Self>) {
let sender = self.sender.clone();
async move {
//debug
println!("close_stream_and_stop! 01");
sender.send(Err(tonic::Status::cancelled("close"))).await
}
.into_actor(self)
.map(|_, _, ctx| {
//debug
println!("stop at close_stream_and_stop!");
ctx.stop();
})
.wait(ctx);
}
}

impl Actor for BiStreamConn {
type Context = Context<Self>;

fn started(&mut self, _ctx: &mut Self::Context) {
fn started(&mut self, ctx: &mut Self::Context) {
//log::info!("BiStreamConn started");
self.receive(ctx);
}
}

/*
impl Supervised for BiStreamConn {
fn restarting(&mut self, _ctx: &mut <Self as Actor>::Context) {
log::warn!("BiStreamConn restart ...");
}
}
*/

#[derive(Debug, Message)]
#[rtype(result = "Result<BiStreamSenderResult,std::io::Error>")]
Expand Down Expand Up @@ -129,7 +151,7 @@ impl Handler<BiStreamSenderCmd> for BiStreamConn {
self.send_payload(ctx, payload.as_ref().to_owned());
}
BiStreamSenderCmd::Close => {
ctx.stop();
self.close_stream_and_stop(ctx);
}
}
Ok(BiStreamSenderResult::None)
Expand Down
2 changes: 2 additions & 0 deletions src/grpc/bistream_manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ impl Handler<BiStreamManageCmd> for BiStreamManage {
if let Some(naming_addr) = &self.naming_addr {
naming_addr.do_send(NamingCmd::RemoveClient(client_id));
}
println!("|ConnClose|conn size: {}",self.conn_cache.len());
}
BiStreamManageCmd::AddConn(client_id, conn) => {
self.add_conn(client_id, conn.start());
println!("|AddConn|conn size: {}",self.conn_cache.len());
}
BiStreamManageCmd::ActiveClinet(client_id) => {
self.active_client(client_id)?;
Expand Down
6 changes: 3 additions & 3 deletions src/raft/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// raft缓存数据

use std::{convert::TryInto, sync::Arc};
use std::time::Duration;
use std::{convert::TryInto, sync::Arc};

use actix::prelude::*;
use bean_factory::{bean, Inject};
Expand Down Expand Up @@ -42,7 +42,7 @@ type KvPair = (Vec<u8>, Vec<u8>);
impl CacheManager {
pub fn new() -> Self {
Self {
cache: MemCache::default(),
cache: MemCache::new(),
//default_timeout: 1200,
raft_table_route: None,
table_manager: None,
Expand Down Expand Up @@ -135,7 +135,7 @@ impl Inject for CacheManager {
self.load(ctx).ok();
//增加每10秒触发缓存清理
self.cache.mode = MemCacheMode::None;
ctx.run_interval(Duration::from_millis(10000),|act,_|{
ctx.run_interval(Duration::from_millis(10000), |act, _| {
act.cache.clear_time_out();
});
}
Expand Down

0 comments on commit 837dc6b

Please sign in to comment.