Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis协议可重入优化 #479

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ pub enum Error {
ChanDisabled,
ChanWriteClosed,
ChanReadClosed,
ProtocolIncomplete,
//协议完整至少还需要x个字节
ProtocolIncomplete(usize),
RequestInvalidMagic,
ResponseInvalidMagic,
RequestProtocolInvalid,
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/kv/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Into<crate::Error> for Error {
log::warn!("found unhandle response: {}", packet.utf8());
crate::Error::ResponseProtocolInvalid
}
Self::ProtocolIncomplete => crate::Error::ProtocolIncomplete,
Self::ProtocolIncomplete => crate::Error::ProtocolIncomplete(0),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocol/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Protocol for Kv {
) -> crate::Result<HandShake> {
match self.handshake_inner(stream, option) {
Ok(h) => Ok(h),
Err(crate::Error::ProtocolIncomplete) => Ok(HandShake::Continue),
Err(crate::Error::ProtocolIncomplete(0)) => Ok(HandShake::Continue),
Err(e) => {
log::warn!("+++ found err when shake hand:{:?}", e);
Err(e)
Expand Down Expand Up @@ -143,7 +143,7 @@ impl Protocol for Kv {
// 解析完毕rsp后,除了数据未读完的场景,其他不管是否遇到err,都要进行take
match self.parse_response_inner(&mut rsp_packet) {
Ok(cmd) => Ok(Some(cmd)),
Err(crate::Error::ProtocolIncomplete) => Ok(None),
Err(crate::Error::ProtocolIncomplete(0)) => Ok(None),
Err(e) => {
// 非MysqlError需要日志并外层断连处理
log::error!("+++ err when parse mysql response: {:?}", e);
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/kv/rsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl<'a, S: crate::Stream> ResponsePacket<'a, S> {
// fn _next_packet(&mut self) -> Result<RingSlice> {
// match self.try_next_packet() {
// Ok(pld) => Ok(pld),
// Err(Error::ProtocolIncomplete) => Err(crate::Error::ProtocolIncomplete),
// Err(Error::ProtocolIncomplete(0)) => Err(crate::Error::ProtocolIncomplete(0)),
// Err(e) => {
// // 发现异常,说明异常数据已读完,此处统一take
// self.take();
Expand Down
6 changes: 3 additions & 3 deletions protocol/src/msgque/mcq_bk/text/mcreqpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down Expand Up @@ -331,7 +331,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
ReqPacketState::Val => {
m = self.oft + self.vlen;
if m >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
if self.data.at(m) == CR {
self.skip(self.vlen)?;
Expand Down Expand Up @@ -435,7 +435,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
}
}
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

#[inline]
Expand Down
8 changes: 4 additions & 4 deletions protocol/src/msgque/mcq_bk/text/mcrsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
let mut state = self.state;

if self.data.len() < 2 {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

if self.current().is_ascii_digit() {
Expand Down Expand Up @@ -252,7 +252,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
m = self.oft + self.vlen;
// TODO 提升解析性能 speedup fishermen
if m >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
match self.data.at(m) {
CR => {
Expand Down Expand Up @@ -317,7 +317,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {

self.skip(1)?;
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

#[inline]
Expand All @@ -340,7 +340,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down
4 changes: 2 additions & 2 deletions protocol/src/msgque/mcq_bk/text/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Protocol for McqText {
) -> Result<()> {
match self.parse_request_inner(stream, alg, process) {
Ok(_) => Ok(()),
Err(Error::ProtocolIncomplete) => Ok(()),
Err(Error::ProtocolIncomplete(0)) => Ok(()),
e => e,
}
}
Expand All @@ -106,7 +106,7 @@ impl Protocol for McqText {
fn parse_response<S: Stream>(&self, data: &mut S) -> Result<Option<Command>> {
match self.parse_response_inner(data) {
Ok(cmd) => Ok(cmd),
Err(Error::ProtocolIncomplete) => Ok(None),
Err(Error::ProtocolIncomplete(0)) => Ok(None),
e => e,
}
}
Expand Down
10 changes: 5 additions & 5 deletions protocol/src/msgque/mcq_bk/text/reqpacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down Expand Up @@ -229,7 +229,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
ReqPacketState::Val => {
m = self.oft + vlen;
if m >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
if self.data.at(m) == CR {
self.skip(vlen)?;
Expand All @@ -251,7 +251,7 @@ impl<'a, S: crate::Stream> RequestPacket<'a, S> {
// 当前字节处理完毕,继续下一个字节
self.skip(1)?;
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

#[inline]
Expand Down Expand Up @@ -324,7 +324,7 @@ impl Packet for RingSlice {
*oft = idx + 2;
Ok(())
} else {
Err(crate::Error::ProtocolIncomplete)
Err(crate::Error::ProtocolIncomplete(0))
}
}

Expand All @@ -351,7 +351,7 @@ impl Packet for RingSlice {
return Ok(());
}
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}
}
// mcq 解析时状态
Expand Down
10 changes: 5 additions & 5 deletions protocol/src/msgque/mcq_bk/text/rsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
#[inline]
fn start_with(&self, oft: usize, s: &[u8]) -> Result<bool> {
if oft + s.len() > self.data.len() {
Err(crate::Error::ProtocolIncomplete)
Err(crate::Error::ProtocolIncomplete(0))
} else {
Ok(self.data.start_with(oft, s))
}
Expand All @@ -64,7 +64,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
// memcache rsponse 解析的状态机,后续考虑优化 fishermen
pub(super) fn parse(&mut self) -> Result<()> {
if self.data.len() < 2 {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

let mut state = RspPacketState::RspStr;
Expand Down Expand Up @@ -209,7 +209,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
RspPacketState::Val => {
token = self.oft + vlen;
if token >= self.data.len() {
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}
self.skip(vlen)?;
match self.current() {
Expand Down Expand Up @@ -276,7 +276,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {

self.skip(1)?;
}
Err(super::Error::ProtocolIncomplete)
Err(super::Error::ProtocolIncomplete(0))
}

pub(super) fn delay_metric(&mut self) -> Result<()> {
Expand Down Expand Up @@ -344,7 +344,7 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
if self.oft <= self.data.len() {
return Ok(());
}
return Err(super::Error::ProtocolIncomplete);
return Err(super::Error::ProtocolIncomplete(0));
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/redis/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl CommandHasher {
}
h.hash(slice[i]);
}
Err(crate::Error::ProtocolIncomplete)
Err(crate::Error::ProtocolIncomplete(0))
}
}

Expand Down
37 changes: 10 additions & 27 deletions protocol/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,32 +70,23 @@ impl Redis {
}

#[inline]
fn parse_response_inner<S: Stream>(&self, s: &mut S) -> Result<Option<Command>> {
pub fn parse_response_inner<S: Stream>(&self, s: &mut S) -> Result<Option<Command>> {
let data: Packet = s.slice().into();
let ctx: &mut ResponseContext = transmute(s.context());
log::debug!("+++ will parse redis rsp:{:?}", data);
// data.check_onetoken(*oft)?;

match data.at(0) {
b'-' | b':' | b'+' => data.line(&mut ctx.oft)?,
b'$' => ctx.oft += data.num_of_string(&mut ctx.oft)? + 2,
b'*' => data.skip_multibulks(ctx)?,
b'$' => data.skip_string_check(&mut ctx.oft)?,
b'*' => data.skip_multibulks_with_ctx(ctx)?,
_ => return Err(RedisError::RespInvalid.into()),
}

let oft = ctx.oft;
ctx.oft = 0; // 响应消息是b'$',若数据未接收完整,下次需要从起始位置开始解析
match oft <= data.len() {
true => Ok(Some(Command::from_ok(s.take(oft)))),
false => Err(Error::ProtocolIncomplete),
}
// Ok((*oft <= data.len()).then(|| Command::from_ok(s.take(*oft))))
}
#[inline(always)]
fn left_bytes<S: Stream>(&self, s: &mut S) -> usize {
let ctx = transmute(s.context());
// 64是经验值
ctx.bulk * 64
assert!(oft != 0);
assert!(oft <= data.len());
ctx.oft = 0;
Ok(Some(Command::from_ok(s.take(oft))))
}
}

Expand All @@ -117,7 +108,7 @@ impl Protocol for Redis {
let mut packet = RequestPacket::new(stream);
match self.parse_request_inner(&mut packet, alg, process) {
Ok(_) => Ok(()),
Err(Error::ProtocolIncomplete) => {
Err(Error::ProtocolIncomplete(0)) => {
// 如果解析数据不够,提前reserve stream的空间
packet.reserve_stream_buff();
Ok(())
Expand All @@ -134,18 +125,10 @@ impl Protocol for Redis {
fn parse_response<S: Stream>(&self, data: &mut S) -> Result<Option<Command>> {
match self.parse_response_inner(data) {
Ok(cmd) => Ok(cmd),
Err(Error::ProtocolIncomplete) => {
let ctx = transmute(data.context());
let oft = ctx.oft;
//assert!(oft + 3 >= data.len(), "oft:{} => {:?}", oft, data.slice());
if ctx.bulk > 0 {
// 响应消息是array场景
let left = self.left_bytes(data);
Err(Error::ProtocolIncomplete(left)) => {
if left > 0 {
data.reserve(left);
} else if oft > data.len() {
data.reserve(oft - data.len());
}

Ok(None)
}
e => e,
Expand Down
Loading
Loading