Skip to content

Commit

Permalink
Merge branch '0.0.1.78.s.onlyfix' into 82.79
Browse files Browse the repository at this point in the history
在79.1版本基础上包含fix
  • Loading branch information
viciousstar committed Aug 9, 2023
2 parents d2b5c1b + 0764f9c commit 7763497
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 25 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ debug = false
codegen-units = 1
lto = "fat"
opt-level = 3
strip = "none"


[profile.release-stable]
inherits = "release"
strip = "symbols"

[profile.release-perf]
Expand Down
20 changes: 20 additions & 0 deletions agent/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use context::Context;
pub(super) fn init(ctx: &Context) {
init_panic_hook();
init_signal();
init_limit(&ctx);
init_log(&ctx);
init_local_ip(&ctx);
Expand Down Expand Up @@ -64,3 +65,22 @@ pub(crate) fn init_local_ip(ctx: &Context) {
pub(crate) fn start_metrics_register_task(_ctx: &Context) {
rt::spawn(metrics::MetricRegister::default());
}

use tokio::signal::unix::{signal, SignalKind};
fn init_signal() {
let stream = signal(SignalKind::terminate());
match stream {
Ok(mut stream) => {
rt::spawn(async move {
loop {
stream.recv().await;
println!("got signal terminate");
}
});
}
Err(e) => {
println!("init signal failed: {:?}", e);
return;
}
}
}
7 changes: 5 additions & 2 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ use protocol::Result;

// 默认支持
fn main() -> Result<()> {
tokio::runtime::Builder::new_multi_thread()
let result = tokio::runtime::Builder::new_multi_thread()
.worker_threads(context::get().thread_num as usize)
.thread_name("breeze-w")
.thread_stack_size(2 * 1024 * 1024)
.enable_all()
.build()
.unwrap()
.block_on(async { run().await })
.block_on(async { run().await });

println!("exit {:?}", result);
result
}

async fn run() -> Result<()> {
Expand Down
3 changes: 1 addition & 2 deletions ds/src/mem/ring_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ impl RingSlice {

// 特殊情况下,打印合法字节,以及buff中全部的字节
pub unsafe fn data_dump(&self) -> &[u8] {
let oft_start = self.mask(self.start());
from_raw_parts(self.ptr().sub(oft_start), self.cap())
from_raw_parts(self.ptr(), self.cap())
}
#[inline(always)]
pub fn fold<I>(&self, mut init: I, mut v: impl FnMut(&mut I, u8)) -> I {
Expand Down
9 changes: 5 additions & 4 deletions protocol/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct CallbackContext {
request: HashedCommand,
response: MaybeUninit<Command>,
start: Instant, // 请求的开始时间
waker: *const AtomicWaker,
waker: *const Arc<AtomicWaker>,
callback: CallbackPtr,
quota: Option<BackendQuota>,
}
Expand All @@ -51,7 +51,7 @@ impl CallbackContext {
#[inline]
pub fn new(
req: HashedCommand,
waker: &AtomicWaker,
waker: *const Arc<AtomicWaker>,
cb: CallbackPtr,
first: bool,
last: bool,
Expand Down Expand Up @@ -143,10 +143,11 @@ impl CallbackContext {
// 需要重试或回写
return self.goon();
}
//防止markdone后,在pipeline中req被释放,req和waker被覆写
let waker = unsafe { self.waker.as_ref().unwrap().clone() };
self.mark_done();
if !self.async_mode {
// 说明有请求在pending
unsafe { (&*self.waker).wake() }
waker.wake()
}
}

Expand Down
26 changes: 13 additions & 13 deletions stream/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,19 @@ where
req.on_complete(cmd);
}
Err(e) => match e {
Error::UnexpectedData => {
let req = self
.pending
.iter()
.map(|(r, _)| r.data())
.collect::<Vec<_>>();
let rsp_data = self.s.slice();
let rsp_buf = unsafe { rsp_data.data_dump() };
panic!(
"unexpected:{:?} rsp:{:?} buff:{:?} pending req:[{:?}] ",
self, rsp_data, rsp_buf, req
);
}
// Error::UnexpectedData => {
// let req = self
// .pending
// .iter()
// .map(|(r, _)| r.data())
// .collect::<Vec<_>>();
// let rsp_data = self.s.slice();
// let rsp_buf = unsafe { rsp_data.data_dump() };
// panic!(
// "unexpected:{:?} rsp:{:?} buff:{:?} pending req:[{:?}] ",
// self, rsp_data, rsp_buf, req
// );
// }
_ => {
return Poll::Ready(Err(e.into()));
}
Expand Down
8 changes: 4 additions & 4 deletions stream/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
client,
parser,
pending: VecDeque::with_capacity(15),
waker: AtomicWaker::default(),
waker: Arc::new(AtomicWaker::default()),
flush: false,
start: Instant::now(),
start_init: false,
Expand All @@ -57,7 +57,7 @@ pub struct CopyBidirectional<C, P, T> {
client: C,
parser: P,
pending: VecDeque<CallbackContextPtr>,
waker: AtomicWaker,
waker: Arc<AtomicWaker>,

metrics: Arc<StreamMetrics>,
// 上一次请求的开始时间。用在multiget时计算整体耗时。
Expand Down Expand Up @@ -232,7 +232,7 @@ where
// struct Visitor<'a, P, T> {
struct Visitor<'a, T> {
pending: &'a mut VecDeque<CallbackContextPtr>,
waker: &'a AtomicWaker,
waker: &'a Arc<AtomicWaker>,
top: &'a T,
// parser: &'a P,
first: &'a mut bool,
Expand All @@ -254,7 +254,7 @@ impl<'a, T: Topology<Item = Request> + TopologyCheck> protocol::RequestProcessor
let cb = self.top.callback();
let ctx = self
.arena
.alloc(CallbackContext::new(cmd, &self.waker, cb, first, last));
.alloc(CallbackContext::new(cmd, self.waker, cb, first, last));
let mut ctx = CallbackContextPtr::from(ctx, self.arena);

// pendding 会move走ctx,所以提前把req给封装好
Expand Down

0 comments on commit 7763497

Please sign in to comment.