diff --git a/endpoint/src/cacheservice/topo.rs b/endpoint/src/cacheservice/topo.rs index 1715668c7..8f880b7f9 100644 --- a/endpoint/src/cacheservice/topo.rs +++ b/endpoint/src/cacheservice/topo.rs @@ -1,7 +1,6 @@ use crate::select::Distance; use crate::{Endpoint, Endpoints, Topology}; use discovery::TopologyWrite; -use protocol::memcache::Binary; use protocol::{Protocol, Request, Resource::Memcache}; use sharding::hash::{Hash, HashKey, Hasher}; @@ -111,8 +110,10 @@ where }; req.try_next(try_next); req.write_back(write_back); - // TODO 有点怪异,先实现,晚点调整,这个属性直接从request获取更佳? fishermen - req.retry_on_rsp_notok(req.can_retry_on_rsp_notok()); + + // TODO 有点怪异,先实现,晚点调整,这个属性直接从request获取更佳? 预计2024.6.1后可清理 fishermen + // req.retry_on_rsp_notok(req.can_retry_on_rsp_notok()); + *req.mut_context() = ctx.ctx; log::debug!("+++ request sent prepared:{} - {} {}", idx, req, self); assert!(idx < self.streams.len(), "{} {} => {:?}", idx, self, req); diff --git a/endpoint/src/msgque/topo.rs b/endpoint/src/msgque/topo.rs index a73cee711..35ef3cb7b 100644 --- a/endpoint/src/msgque/topo.rs +++ b/endpoint/src/msgque/topo.rs @@ -144,7 +144,9 @@ where }; req.try_next(try_next); - req.retry_on_rsp_notok(true); + // TODO 设计原则:协议性质的属性,在构建时一次性设置,测试完毕后清理,预计2024.6.1后可清理 fishermen + // req.retry_on_rsp_notok(true); + *req.mut_context() = ctx.ctx; log::debug!( @@ -341,7 +343,7 @@ where let sec = self.last_updated_time.elapsed().as_secs(); write!( f, - "mq - {} rstrategy:{}, wstrategy:{}, backends/{:?}, writes/{:?}, changed: {}", + "mq - {} rstrategy:{}, wstrategy:{}, backends/{:?}, writes/{:?}, changed_time: {}", self.service, self.reader_strategy, self.writer_strategy, backends, self.writers, sec ) } diff --git a/protocol/src/callback.rs b/protocol/src/callback.rs index 5cd43460e..4e055cd3c 100644 --- a/protocol/src/callback.rs +++ b/protocol/src/callback.rs @@ -8,7 +8,7 @@ use std::{ }, }; -use crate::BackendQuota; +use crate::{BackendQuota, Protocol}; use ds::{time::Instant, AtomicWaker}; use 
crate::{request::Request, Command, Error, HashedCommand}; @@ -32,15 +32,16 @@ impl Callback { pub struct CallbackContext { pub(crate) flag: crate::Context, - async_mode: bool, // 是否是异步请求 - done: AtomicBool, // 当前模式请求是否完成 - inited: AtomicBool, // response是否已经初始化 - pub(crate) try_next: bool, // 请求失败后,topo层面是否允许重试 - pub(crate) retry_on_rsp_notok: bool, // 有响应且响应不ok时,协议层面是否允许重试 - pub(crate) write_back: bool, // 请求结束后,是否需要回写。 - pub(crate) max_tries: OnceCell<u8>, // 最大重试次数 - first: bool, // 当前请求是否是所有子请求的第一个 - last: bool, // 当前请求是否是所有子请求的最后一个 + async_mode: bool, // whether this is an async request + done: AtomicBool, // whether the request has completed in the current mode + inited: AtomicBool, // whether `response` has been initialized + pub(crate) try_next: bool, // after a failure, whether the topo layer allows a retry + pub(crate) write_back: bool, // whether a write-back is needed once the request finishes + // TODO(fishermen): consolidate retry control in the protocol context so it is read and written locally; to be discussed + retry_on_rsp_notok: bool, // on a not-ok response, whether the protocol layer allows a retry; private, written only by with_retry_strategy + max_tries: OnceCell<u8>, // max number of tries; set once by with_retry_strategy + first: bool, // whether this request is the first of all sub-requests + last: bool, // whether this request is the last of all sub-requests tries: AtomicU8, request: HashedCommand, response: MaybeUninit, @@ -58,8 +59,8 @@ impl CallbackContext { cb: CallbackPtr, first: bool, last: bool, - retry_on_rsp_notok: bool, - max_tries: u8, + // retry_on_rsp_notok: bool, + // max_tries: u8, ) -> Self { log::debug!("request prepared:{}", req); let now = Instant::now(); @@ -71,9 +72,9 @@ impl CallbackContext { inited: AtomicBool::new(false), async_mode: false, try_next: false, - retry_on_rsp_notok, + retry_on_rsp_notok: false, // conservative default; with_retry_strategy overwrites it from the protocol + max_tries: OnceCell::new(), write_back: false, - max_tries: OnceCell::from(max_tries), request: req, response: MaybeUninit::uninit(), callback: cb, @@ -146,10 +147,11 @@ impl CallbackContext { if unsafe { self.unchecked_response().ok() } { return false; } + // TODO(fishermen): retire retry_on_rsp_notok once the ok/retry semantics are reworked and tested; until then this check stays live because it is the only reader of the flag //有响应并且!ok,配置了!retry_on_rsp_notok,不需要重试,比如mysql if !self.retry_on_rsp_notok { return false; } } let max_tries = *self.max_tries.get().expect("max tries"); self.try_next &&
 self.tries.fetch_add(1, Release) < max_tries @@ -290,6 +292,23 @@ impl CallbackContext { pub fn quota(&mut self, quota: BackendQuota) { self.quota = Some(quota); } + + /// Binds the protocol's retry policy to this context and returns the context (builder style). + /// Sets `max_tries` from `parser.max_tries` for the request's operation, and + /// `retry_on_rsp_notok` from `parser.retry_on_rsp_notok` for the request. + /// Panics if `max_tries` was already set, so call it exactly once, right after `new`. + #[inline] + pub fn with_retry_strategy<P>(mut self, parser: &P) -> Self + where + P: Protocol, + { + self.max_tries + .set(parser.max_tries(self.request.operation())) + .expect("retry"); + // TODO(fishermen): rework the `ok` semantics so that ok means no retry; express protocol-level correctness through an extended flag + self.retry_on_rsp_notok = parser.retry_on_rsp_notok(&self.request); + self + } } impl Drop for CallbackContext { diff --git a/protocol/src/memcache/binary/mod.rs b/protocol/src/memcache/binary/mod.rs index 2e84e1813..aee410ea2 100644 --- a/protocol/src/memcache/binary/mod.rs +++ b/protocol/src/memcache/binary/mod.rs @@ -20,7 +20,7 @@ impl Protocol for MemcacheBinary { #[inline] fn config(&self) -> crate::Config { crate::Config { - retry_on_rsp_notok: true, + // retry_on_rsp_notok: true, ..Default::default() } } @@ -190,6 +190,12 @@ impl Protocol for MemcacheBinary { None } } + + /// On a not-ok response, commands such as cas/casq/add are not retried; all other commands are retried. + #[inline] + fn retry_on_rsp_notok(&self, req: &HashedCommand) -> bool { + req.can_retry_on_rsp_notok() + } } impl MemcacheBinary { // 根据req构建response,status为mc协议status,共11种 diff --git a/protocol/src/memcache/binary/packet.rs b/protocol/src/memcache/binary/packet.rs index 93c3c2b79..53731e599 100644 --- a/protocol/src/memcache/binary/packet.rs +++ b/protocol/src/memcache/binary/packet.rs @@ -174,6 +174,7 @@ pub(crate) fn is_quiet_get(op_code: u8) -> bool { pub trait Binary { type Item; + fn is_request(&self) -> bool; fn op(&self) -> u8; fn operation(&self) -> Operation; fn noop(&self) -> bool; @@ -211,6 +212,13 @@ pub trait Op {} use ds::RingSlice; impl Binary for RingSlice { type Item = Self; + + /// Returns whether this packet is a request, i.e. its magic byte equals `REQUEST_MAGIC`. + #[inline] + fn is_request(&self) -> bool { + debug_assert!(self.len() >= HEADER_LEN); + self.at(PacketPos::Magic as usize) == REQUEST_MAGIC + } #[inline(always)] fn op(&self) -> u8 { debug_assert!(self.len() >= HEADER_LEN); @@ -395,6 +403,9 @@ impl Binary for RingSlice { #[inline(always)] fn can_retry_on_rsp_notok(&self) -> bool { + // this method is only meaningful for requests + assert!(self.is_request(), "{}", self); + let op = self.op(); assert!((op
as usize) < RETRY_TABLE.len()); diff --git a/protocol/src/msgque/mod.rs b/protocol/src/msgque/mod.rs index 37f1a609d..d2425e77c 100644 --- a/protocol/src/msgque/mod.rs +++ b/protocol/src/msgque/mod.rs @@ -25,8 +25,13 @@ impl Protocol for McqText { process: &mut P, ) -> Result<()> { let data = stream.slice(); + log::debug!("+++ recv mq msg:{}", data); let mut oft = 0; + while let Some(mut lfcr) = data.find_lf_cr(oft) { + if data.len() < oft + 4 { // head4 is read at `oft`, so four bytes must be available from there, not merely in the whole buffer + return Err(Error::ProtocolIncomplete); + } let head4 = data.u32_le(oft); let (op_code, op) = if head4 == u32::from_le_bytes(*b"get ") { (OP_GET, Get) @@ -75,6 +80,7 @@ impl Protocol for McqText { #[inline] fn parse_response(&self, stream: &mut S) -> Result> { let data = stream.slice(); + log::debug!("+++ will parse mq rsp:{}", data); let Some(mut lfcr) = data.find_lf_cr(0) else { return Ok(None); }; @@ -170,6 +176,12 @@ impl Protocol for McqText { _ => 1, } } + + /// For mq, whatever the command, any error response must be retried. + #[inline] + fn retry_on_rsp_notok(&self, _req: &HashedCommand) -> bool { + true + } } impl McqText { diff --git a/protocol/src/parser.rs b/protocol/src/parser.rs index 97b1e7ddf..aef55339d 100644 --- a/protocol/src/parser.rs +++ b/protocol/src/parser.rs @@ -62,7 +62,7 @@ pub struct ResOption { pub struct Config { pub need_auth: bool, pub pipeline: bool, - pub retry_on_rsp_notok: bool, + // pub retry_on_rsp_notok: bool, } pub enum HandShake { @@ -128,6 +128,15 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static { fn max_tries(&self, _req_op: Operation) -> u8 { 1_u8 } + + /// TODO: drop this policy; a status of ok / not-ok should only say whether the rsp itself is ok, + /// with a separate retry attribute stating whether to retry when the status is not ok. + /// Whether to retry when the received rsp is not ok; the default is to never retry. + /// Precondition: a rsp was received and it is not ok; the no-response case is outside this policy. + #[inline] + fn retry_on_rsp_notok(&self, _req: &HashedCommand) -> bool { + false + } } pub trait RequestProcessor { diff --git a/protocol/src/req.rs b/protocol/src/req.rs index 4577248bc..dd07446d2 100644 --- a/protocol/src/req.rs +++ b/protocol/src/req.rs @@ -63,8 +63,11 @@ pub trait
Request: //fn is_write_back(&self) -> bool; // 请求失败后,topo层面是否允许进行重试 fn try_next(&mut self, goon: bool); + + // TODO 测试完毕后清理 fishermen // 请求失败后,协议层面是否允许进行重试 - fn retry_on_rsp_notok(&mut self, retry: bool); + // fn retry_on_rsp_notok(&mut self, retry: bool); + // 初始化quota fn quota(&mut self, quota: BackendQuota); } diff --git a/protocol/src/request.rs b/protocol/src/request.rs index f376095d4..a676c9c8d 100644 --- a/protocol/src/request.rs +++ b/protocol/src/request.rs @@ -49,10 +49,10 @@ impl crate::Request for Request { fn try_next(&mut self, goon: bool) { self.ctx().try_next = goon; } - #[inline] - fn retry_on_rsp_notok(&mut self, retry: bool) { - self.ctx().retry_on_rsp_notok = retry; - } + // #[inline] + // fn retry_on_rsp_notok(&mut self, retry: bool) { + // self.ctx().retry_on_rsp_notok = retry; + // } #[inline] fn quota(&mut self, quota: BackendQuota) { self.ctx().quota(quota); diff --git a/stream/src/pipeline.rs b/stream/src/pipeline.rs index f0e8490c2..96b08bd21 100644 --- a/stream/src/pipeline.rs +++ b/stream/src/pipeline.rs @@ -122,7 +122,7 @@ where top: &self.top, first: &mut self.first, arena: &mut self.arena, - retry_on_rsp_notok: self.parser.config().retry_on_rsp_notok, + // retry_on_rsp_notok: self.parser.config().retry_on_rsp_notok, parser: &self.parser, }; @@ -221,7 +221,7 @@ struct Visitor<'a, T, P> { parser: &'a P, first: &'a mut bool, arena: &'a mut CallbackContextArena, - retry_on_rsp_notok: bool, + // retry_on_rsp_notok: bool, } impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::RequestProcessor @@ -234,16 +234,10 @@ impl<'a, T: Topology + TopologyCheck, P: Protocol> protocol::Req // 否则下一个请求是子请求。 *self.first = last; let cb = self.top.callback(); - let req_op = cmd.operation(); - let ctx = self.arena.alloc(CallbackContext::new( - cmd, - self.waker, - cb, - first, - last, - self.retry_on_rsp_notok, - self.parser.max_tries(req_op), - )); + // let req_op = cmd.operation(); + let ctx = self.arena.alloc( + CallbackContext::new(cmd, 
self.waker, cb, first, last).with_retry_strategy(self.parser), + ); let mut ctx = CallbackContextPtr::from(ctx, self.arena); // pendding 会move走ctx,所以提前把req给封装好 diff --git a/tests_integration/src/mq/mod.rs b/tests_integration/src/mq/mod.rs index b6a7a98a3..a71b5fe58 100644 --- a/tests_integration/src/mq/mod.rs +++ b/tests_integration/src/mq/mod.rs @@ -10,6 +10,8 @@ fn msgque_write_read() { let key = "k2"; let count = 5; + // let count = 1000; + // const QSIZES: [usize; 3] = [512, 4096, 32768]; const QSIZES: [usize; 2] = [512, 4096]; for i in 0..count { @@ -48,6 +50,7 @@ fn msgque_write() { let key = "k2"; let count = 5; + // const QSIZES: [usize; 3] = [512, 4096, 32768]; const QSIZES: [usize; 2] = [512, 4096]; for i in 0..count { @@ -64,6 +67,7 @@ fn msgque_write() { fn msgque_read() { let mq_client = mc_get_text_conn(MQ); + // const COUNT: i32 = 1000; const COUNT: i32 = 5; let key = "k2";