Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

重构retry策略,将所有对应逻辑收归到协议context中 #458

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions endpoint/src/cacheservice/topo.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::select::Distance;
use crate::{Endpoint, Endpoints, Topology};
use discovery::TopologyWrite;
use protocol::memcache::Binary;
use protocol::{Protocol, Request, Resource::Memcache};
use sharding::hash::{Hash, HashKey, Hasher};

Expand Down Expand Up @@ -111,8 +110,10 @@ where
};
req.try_next(try_next);
req.write_back(write_back);
// TODO 有点怪异,先实现,晚点调整,这个属性直接从request获取更佳? fishermen
req.retry_on_rsp_notok(req.can_retry_on_rsp_notok());

// TODO 有点怪异,先实现,晚点调整,这个属性直接从request获取更佳? 预计2024.6.1后可清理 fishermen
// req.retry_on_rsp_notok(req.can_retry_on_rsp_notok());

*req.mut_context() = ctx.ctx;
log::debug!("+++ request sent prepared:{} - {} {}", idx, req, self);
assert!(idx < self.streams.len(), "{} {} => {:?}", idx, self, req);
Expand Down
6 changes: 4 additions & 2 deletions endpoint/src/msgque/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ where
};

req.try_next(try_next);
req.retry_on_rsp_notok(true);
// TODO 设计原则:协议性质的属性,在构建时一次性设置,测试完毕后清理,预计2024.6.1后可清理 fishermen
// req.retry_on_rsp_notok(true);

*req.mut_context() = ctx.ctx;

log::debug!(
Expand Down Expand Up @@ -341,7 +343,7 @@ where
let sec = self.last_updated_time.elapsed().as_secs();
write!(
f,
"mq - {} rstrategy:{}, wstrategy:{}, backends/{:?}, writes/{:?}, changed: {}",
"mq - {} rstrategy:{}, wstrategy:{}, backends/{:?}, writes/{:?}, changed_time: {}",
self.service, self.reader_strategy, self.writer_strategy, backends, self.writers, sec
)
}
Expand Down
49 changes: 32 additions & 17 deletions protocol/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
},
};

use crate::BackendQuota;
use crate::{BackendQuota, Protocol};
use ds::{time::Instant, AtomicWaker};

use crate::{request::Request, Command, Error, HashedCommand};
Expand All @@ -32,15 +32,16 @@ impl Callback {

pub struct CallbackContext {
pub(crate) flag: crate::Context,
async_mode: bool, // 是否是异步请求
done: AtomicBool, // 当前模式请求是否完成
inited: AtomicBool, // response是否已经初始化
pub(crate) try_next: bool, // 请求失败后,topo层面是否允许重试
pub(crate) retry_on_rsp_notok: bool, // 有响应且响应不ok时,协议层面是否允许重试
pub(crate) write_back: bool, // 请求结束后,是否需要回写。
pub(crate) max_tries: OnceCell<u8>, // 最大重试次数
first: bool, // 当前请求是否是所有子请求的第一个
last: bool, // 当前请求是否是所有子请求的最后一个
async_mode: bool, // 是否是异步请求
done: AtomicBool, // 当前模式请求是否完成
inited: AtomicBool, // response是否已经初始化
pub(crate) try_next: bool, // 请求失败后,topo层面是否允许重试
pub(crate) write_back: bool, // 请求结束后,是否需要回写。
// TODO 收拢try权限到协议context层面,读写都在本地进行,待讨论 fishermen
// retry_on_rsp_notok: bool, // 有响应且响应不ok时,协议层面是否允许重试
max_tries: OnceCell<u8>, // 最大重试次数
first: bool, // 当前请求是否是所有子请求的第一个
last: bool, // 当前请求是否是所有子请求的最后一个
tries: AtomicU8,
request: HashedCommand,
response: MaybeUninit<Command>,
Expand All @@ -58,8 +59,8 @@ impl CallbackContext {
cb: CallbackPtr,
first: bool,
last: bool,
retry_on_rsp_notok: bool,
max_tries: u8,
// retry_on_rsp_notok: bool,
// max_tries: u8,
) -> Self {
log::debug!("request prepared:{}", req);
let now = Instant::now();
Expand All @@ -71,9 +72,9 @@ impl CallbackContext {
inited: AtomicBool::new(false),
async_mode: false,
try_next: false,
retry_on_rsp_notok,
// retry_on_rsp_notok: false,
max_tries: OnceCell::new(),
write_back: false,
max_tries: OnceCell::from(max_tries),
request: req,
response: MaybeUninit::uninit(),
callback: cb,
Expand Down Expand Up @@ -146,10 +147,11 @@ impl CallbackContext {
if unsafe { self.unchecked_response().ok() } {
return false;
}
// TODO 去掉retry_on_rsp_notok,调整逻辑,待测试OK后再清理 fishermen
//有响应并且!ok,配置了!retry_on_rsp_notok,不需要重试,比如mysql
if !self.retry_on_rsp_notok {
return false;
}
// if !self.retry_on_rsp_notok {
// return false;
// }
}
let max_tries = *self.max_tries.get().expect("max tries");
self.try_next && self.tries.fetch_add(1, Release) < max_tries
Expand Down Expand Up @@ -290,6 +292,19 @@ impl CallbackContext {
pub fn quota(&mut self, quota: BackendQuota) {
self.quota = Some(quota);
}

#[inline]
pub fn with_retry_strategy<P>(mut self, parser: &P) -> Self
where
P: Protocol,
{
self.max_tries
.set(parser.max_tries(self.request.operation()))
.expect("retry");
// TODO ok语意调整,不retry;协议是否正确用扩展的flag来表示 fishermer
self.retry_on_rsp_notok = parser.retry_on_rsp_notok(&self.request);
self
}
}

impl Drop for CallbackContext {
Expand Down
8 changes: 7 additions & 1 deletion protocol/src/memcache/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Protocol for MemcacheBinary {
#[inline]
fn config(&self) -> crate::Config {
crate::Config {
retry_on_rsp_notok: true,
// retry_on_rsp_notok: true,
..Default::default()
}
}
Expand Down Expand Up @@ -190,6 +190,12 @@ impl Protocol for MemcacheBinary {
None
}
}

/// 在收到notok的响应时,对cas/casq/add等指令不重试,其他指令需重试
#[inline]
fn retry_on_rsp_notok(&self, req: &HashedCommand) -> bool {
req.can_retry_on_rsp_notok()
}
}
impl MemcacheBinary {
// 根据req构建response,status为mc协议status,共11种
Expand Down
11 changes: 11 additions & 0 deletions protocol/src/memcache/binary/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub(crate) fn is_quiet_get(op_code: u8) -> bool {

pub trait Binary {
type Item;
fn is_request(&self) -> bool;
fn op(&self) -> u8;
fn operation(&self) -> Operation;
fn noop(&self) -> bool;
Expand Down Expand Up @@ -211,6 +212,13 @@ pub trait Op {}
use ds::RingSlice;
impl Binary for RingSlice {
type Item = Self;

/// check 该指令是否是request请求
#[inline]
fn is_request(&self) -> bool {
debug_assert!(self.len() >= HEADER_LEN);
self.at(PacketPos::Magic as usize) == REQUEST_MAGIC
}
#[inline(always)]
fn op(&self) -> u8 {
debug_assert!(self.len() >= HEADER_LEN);
Expand Down Expand Up @@ -395,6 +403,9 @@ impl Binary for RingSlice {

#[inline(always)]
fn can_retry_on_rsp_notok(&self) -> bool {
// 这个方法只对request才生效
assert!(self.is_request(), "{}", self);

let op = self.op();
assert!((op as usize) < RETRY_TABLE.len());

Expand Down
12 changes: 12 additions & 0 deletions protocol/src/msgque/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ impl Protocol for McqText {
process: &mut P,
) -> Result<()> {
let data = stream.slice();
log::debug!("+++ recv mq msg:{}", data);
let mut oft = 0;

while let Some(mut lfcr) = data.find_lf_cr(oft) {
if data.len() < 4 {
return Err(Error::ProtocolIncomplete);
}
let head4 = data.u32_le(oft);
let (op_code, op) = if head4 == u32::from_le_bytes(*b"get ") {
(OP_GET, Get)
Expand Down Expand Up @@ -75,6 +80,7 @@ impl Protocol for McqText {
#[inline]
fn parse_response<S: Stream>(&self, stream: &mut S) -> Result<Option<Command>> {
let data = stream.slice();
log::debug!("+++ will parse mq rsp:{}", data);
let Some(mut lfcr) = data.find_lf_cr(0) else {
return Ok(None);
};
Expand Down Expand Up @@ -170,6 +176,12 @@ impl Protocol for McqText {
_ => 1,
}
}

/// 对于mq,不管什么指令,只要返回错误rsp,都需要重试
#[inline]
fn retry_on_rsp_notok(&self, _req: &HashedCommand) -> bool {
true
}
}

impl McqText {
Expand Down
11 changes: 10 additions & 1 deletion protocol/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct ResOption {
pub struct Config {
pub need_auth: bool,
pub pipeline: bool,
pub retry_on_rsp_notok: bool,
// pub retry_on_rsp_notok: bool,
}

pub enum HandShake {
Expand Down Expand Up @@ -128,6 +128,15 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static {
fn max_tries(&self, _req_op: Operation) -> u8 {
1_u8
}

/// TODO 去掉该策略,status的ok、不ok只表示rsp本身是否是ok的;
/// 额外增加一个retry属性,表示在status not ok时,是否进行重试
/// 在收到的rsp为notok时,是否重试,默认都不重试;
/// precondition:得收到rsp,且rsp为notok;没收到响应,不属于该策略的处理范围
#[inline]
fn retry_on_rsp_notok(&self, _req: &HashedCommand) -> bool {
false
}
}

pub trait RequestProcessor {
Expand Down
5 changes: 4 additions & 1 deletion protocol/src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ pub trait Request:
//fn is_write_back(&self) -> bool;
// 请求失败后,topo层面是否允许进行重试
fn try_next(&mut self, goon: bool);

// TODO 测试完毕后清理 fishermen
// 请求失败后,协议层面是否允许进行重试
fn retry_on_rsp_notok(&mut self, retry: bool);
// fn retry_on_rsp_notok(&mut self, retry: bool);

// 初始化quota
fn quota(&mut self, quota: BackendQuota);
}
8 changes: 4 additions & 4 deletions protocol/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ impl crate::Request for Request {
fn try_next(&mut self, goon: bool) {
self.ctx().try_next = goon;
}
#[inline]
fn retry_on_rsp_notok(&mut self, retry: bool) {
self.ctx().retry_on_rsp_notok = retry;
}
// #[inline]
// fn retry_on_rsp_notok(&mut self, retry: bool) {
// self.ctx().retry_on_rsp_notok = retry;
// }
#[inline]
fn quota(&mut self, quota: BackendQuota) {
self.ctx().quota(quota);
Expand Down
18 changes: 6 additions & 12 deletions stream/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ where
top: &self.top,
first: &mut self.first,
arena: &mut self.arena,
retry_on_rsp_notok: self.parser.config().retry_on_rsp_notok,
// retry_on_rsp_notok: self.parser.config().retry_on_rsp_notok,
parser: &self.parser,
};

Expand Down Expand Up @@ -221,7 +221,7 @@ struct Visitor<'a, T, P> {
parser: &'a P,
first: &'a mut bool,
arena: &'a mut CallbackContextArena,
retry_on_rsp_notok: bool,
// retry_on_rsp_notok: bool,
}

impl<'a, T: Topology<Item = Request> + TopologyCheck, P: Protocol> protocol::RequestProcessor
Expand All @@ -234,16 +234,10 @@ impl<'a, T: Topology<Item = Request> + TopologyCheck, P: Protocol> protocol::Req
// 否则下一个请求是子请求。
*self.first = last;
let cb = self.top.callback();
let req_op = cmd.operation();
let ctx = self.arena.alloc(CallbackContext::new(
cmd,
self.waker,
cb,
first,
last,
self.retry_on_rsp_notok,
self.parser.max_tries(req_op),
));
// let req_op = cmd.operation();
let ctx = self.arena.alloc(
CallbackContext::new(cmd, self.waker, cb, first, last).with_retry_strategy(self.parser),
);
let mut ctx = CallbackContextPtr::from(ctx, self.arena);

// pendding 会move走ctx,所以提前把req给封装好
Expand Down
4 changes: 4 additions & 0 deletions tests_integration/src/mq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ fn msgque_write_read() {

let key = "k2";
let count = 5;
// let count = 1000;
// const QSIZES: [usize; 3] = [512, 4096, 32768];
const QSIZES: [usize; 2] = [512, 4096];

for i in 0..count {
Expand Down Expand Up @@ -48,6 +50,7 @@ fn msgque_write() {

let key = "k2";
let count = 5;
// const QSIZES: [usize; 3] = [512, 4096, 32768];
const QSIZES: [usize; 2] = [512, 4096];

for i in 0..count {
Expand All @@ -64,6 +67,7 @@ fn msgque_write() {
fn msgque_read() {
let mq_client = mc_get_text_conn(MQ);

// const COUNT: i32 = 1000;
const COUNT: i32 = 5;

let key = "k2";
Expand Down