Skip to content

Commit

Permalink
Merge pull request #467 from weibocom/notfound
Browse files Browse the repository at this point in the history
msq 协议优化
  • Loading branch information
viciousstar authored May 24, 2024
2 parents d22867a + 72efc6b commit d323df5
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 78 deletions.
12 changes: 12 additions & 0 deletions ds/src/mem/ring_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,18 @@ impl RingSlice {
None
}

// 从后往前找
#[inline]
pub fn rfind_r(&self, r: impl Range, mut f: impl Visit) -> Option<usize> {
let (start, end) = r.range(self);
for i in (start..end).rev() {
if f.check(self[i], i) {
return Some(i);
}
}
None
}

//找第几个
#[inline]
pub fn find_r_n(&self, r: impl Range, mut f: impl Visit, mut num: usize) -> Option<usize> {
Expand Down
139 changes: 66 additions & 73 deletions protocol/src/msgque/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@ pub const OP_STATS: u16 = 2;
pub const OP_VERSION: u16 = 3;
pub const OP_QUIT: u16 = 4;

pub type MsgQue = McqText;
const GETBYTE: u32 = u32::from_le_bytes(*b"get ");
const SETBYTE: u32 = u32::from_le_bytes(*b"set ");
const STATSBYTE: u32 = u32::from_le_bytes(*b"stat");
const VERSIONBYTE: u32 = u32::from_le_bytes(*b"vers");
const QUITBYTE: u32 = u32::from_le_bytes(*b"quit");

const END: u32 = u32::from_le_bytes(*b"END\r");
const VALUE: u32 = u32::from_le_bytes(*b"VALU");
const STORED: u32 = u32::from_le_bytes(*b"STOR");

#[derive(Clone, Default)]
pub struct McqText;
pub struct MsgQue;

impl Protocol for McqText {
impl Protocol for MsgQue {
#[inline]
fn parse_request<S: Stream, H: Hash, P: RequestProcessor>(
&self,
Expand All @@ -28,38 +36,27 @@ impl Protocol for McqText {
let mut oft = 0;
while let Some(mut lfcr) = data.find_lf_cr(oft) {
let head4 = data.u32_le(oft);
let (op_code, op) = if head4 == u32::from_le_bytes(*b"get ") {
(OP_GET, Get)
} else if head4 == u32::from_le_bytes(*b"set ") {
// <command name> <key> <flags> <exptime> <bytes>\r\n
let line_oft = lfcr + 2;
// 命令之后第四个空格是数据长度
let Some(space) = data.find_r_n(oft + 4..line_oft, b' ', 3) else {
return Err(Error::ProtocolNotSupported);
};
let Some(val_len) = data.try_str_num(space + 1..lfcr) else {
return Err(Error::ProtocolNotSupported);
};
// 大value一次申请
lfcr = line_oft + val_len;
if data.len() < lfcr + 2 {
stream.reserve(lfcr + 2 - data.len());
return Ok(());
let (op_code, op) = match head4 {
GETBYTE => (OP_GET, Get),
SETBYTE => {
// <command name> <key> <flags> <exptime> <bytes>\r\n
// 最后一个空格后是数据长度
let val_len = Self::val_len(data, oft + 4, lfcr)?;
// 大value一次申请
lfcr = lfcr + 2 + val_len;
if data.len() < lfcr + 2 {
stream.reserve(lfcr + 2 - data.len());
return Ok(());
}
if data[lfcr] != b'\r' || data[lfcr + 1] != b'\n' {
return Err(Error::ProtocolNotSupported);
}
(OP_SET, Store)
}
if data[lfcr] != b'\r' || data[lfcr + 1] != b'\n' {
return Err(Error::ProtocolNotSupported);
}
(OP_SET, Store)
} else if head4 == u32::from_le_bytes(*b"stat") {
//stats
(OP_STATS, Meta)
} else if head4 == u32::from_le_bytes(*b"vers") {
//version
(OP_VERSION, Meta)
} else if head4 == u32::from_le_bytes(*b"quit") {
(OP_QUIT, Meta)
} else {
return Err(Error::ProtocolNotSupported);
STATSBYTE => (OP_STATS, Meta),
VERSIONBYTE => (OP_VERSION, Meta),
QUITBYTE => (OP_QUIT, Meta),
_ => return Err(Error::ProtocolNotSupported),
};

let cmd = stream.take(lfcr + 2 - oft);
Expand All @@ -83,38 +80,30 @@ impl Protocol for McqText {
return Err(crate::Error::UnexpectedData);
}
let head4 = data.u32_le(0);
let ok = if head4 == u32::from_le_bytes(*b"END\r") {
false
// VALUE <key> <flags> <bytes> [<cas unique>]\r\n
} else if head4 == u32::from_le_bytes(*b"VALU") {
let line_oft = lfcr + 2;
let Some(space) = data.find_r_n(4..line_oft, b' ', 3) else {
return Err(Error::UnexpectedData);
};
let Some(val_len) = data.try_str_num(space + 1..lfcr) else {
return Err(Error::UnexpectedData);
};
lfcr = line_oft + val_len;
//数据之后是\r\nend\r\n
if data.len() < lfcr + 2 + 5 {
stream.reserve(lfcr + 2 + 5 - data.len());
return Ok(None);
}
if !data.start_with(lfcr, b"\r\nEND\r\n") {
return Err(Error::UnexpectedData);
let ok = match head4 {
END => false,
VALUE => {
// VALUE <key> <flags> <bytes> [<cas unique>]\r\n
let val_len = Self::val_len(data, 4, lfcr)?;
let line_oft = lfcr + 2;
lfcr = line_oft + val_len;
//数据之后是\r\nend\r\n
if data.len() < lfcr + 2 + 5 {
stream.reserve(lfcr + 2 + 5 - data.len());
return Ok(None);
}
if !data.start_with(lfcr, b"\r\nEND\r\n") {
return Err(Error::UnexpectedData);
}
lfcr = lfcr + 5;
true
}
lfcr = lfcr + 5;
true
} else if head4 == u32::from_le_bytes(*b"STOR") {
//STORED
true
} else {
return Err(Error::UnexpectedData);
STORED => true,
_ => return Err(Error::UnexpectedData),
};
return Ok(Some(Command::from(ok, stream.take(lfcr + 2))));
}

// mc协议比较简单,除了quit直接断连接,其他指令直接发送即可
#[inline]
fn write_response<C, W, M, I>(
&self,
Expand All @@ -131,27 +120,21 @@ impl Protocol for McqText {
let request = ctx.request();
let op_code = request.op_code();
match op_code {
OP_QUIT => {
return Err(crate::Error::Quit);
}
OP_STATS => {
w.write(b"STAT supported later\r\nEND\r\n")?;
}
OP_VERSION => {
w.write(b"VERSION 0.0.1\r\n")?;
}
OP_QUIT => Err(crate::Error::Quit),
OP_STATS => w.write(b"STAT supported later\r\nEND\r\n"),
OP_VERSION => w.write(b"VERSION 0.0.1\r\n"),
_ => {
if let Some(rsp) = response {
w.write_slice(rsp, 0)?;
self.metrics(request, rsp, ctx);
Ok(())
} else {
//协议要求服务端错误断连
w.write(b"SERVER_ERROR mcq not available\r\n")?;
return Err(Error::Quit);
Err(Error::Quit)
}
}
}
Ok(())
}

fn on_sent(&self, req_op: crate::Operation, metrics: &mut crate::HostMetric) {
Expand All @@ -172,7 +155,7 @@ impl Protocol for McqText {
}
}

impl McqText {
impl MsgQue {
// 响应发送时,统计请求最终成功的qps
#[inline]
fn metrics<C, M, I>(&self, request: &HashedCommand, response: &Command, metrics: &C)
Expand All @@ -189,4 +172,14 @@ impl McqText {
}
}
}
#[inline]
fn val_len(data: ds::RingSlice, start: usize, end: usize) -> Result<usize> {
let Some(space) = data.rfind_r(start..end, b' ') else {
return Err(Error::ProtocolNotSupported);
};
let Some(val_len) = data.try_str_num(space + 1..end) else {
return Err(Error::ProtocolNotSupported);
};
Ok(val_len)
}
}
10 changes: 5 additions & 5 deletions tests/src/mq/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::cell::UnsafeCell;

use ds::BufWriter;
use protocol::{
msgque::{McqText, OP_GET, OP_QUIT, OP_SET, OP_STATS, OP_VERSION},
msgque::{MsgQue, OP_GET, OP_QUIT, OP_SET, OP_STATS, OP_VERSION},
AsyncBufRead, BufRead, Commander, Error, HashedCommand, Metric, Proto, RequestProcessor,
Stream, Writer,
};
Expand Down Expand Up @@ -106,7 +106,7 @@ impl Hash for Alg {
fn test_req_reenter() {
let getset = b"get key1\r\nget key2\r\nset key3 0 9999 10\r\n1234567890\r\nset key4 0 9999 10\r\n1234567890\r\nget key1\r\nget key2\r\n";

let proto = McqText;
let proto = MsgQue;
let alg = &Alg {};
for i in 0..getset.len() {
let mut process = Process { reqs: Vec::new() };
Expand Down Expand Up @@ -176,7 +176,7 @@ fn test_req_reenter() {

#[test]
fn test_meta() {
let proto = McqText;
let proto = MsgQue;
let alg = &Alg {};

let mut process = Process { reqs: Vec::new() };
Expand Down Expand Up @@ -224,7 +224,7 @@ fn test_meta() {

#[test]
fn test_rsp() {
let proto = McqText;
let proto = MsgQue;

let rspstr = b"END\r\n";
for i in 0..rspstr.len() {
Expand Down Expand Up @@ -372,7 +372,7 @@ impl Commander<TestMetric, TestMetricItem> for TestCtx {

#[test]
fn test_write_response() {
let proto = McqText;
let proto = MsgQue;
let alg = &Alg {};

let mut process = Process { reqs: Vec::new() };
Expand Down

0 comments on commit d323df5

Please sign in to comment.