Skip to content

Commit

Permalink
Merge pull request #462 from weibocom/main_mcq_update_read_strategy
Browse files Browse the repository at this point in the history
调整mcq读取策略
  • Loading branch information
hustfisher authored May 28, 2024
2 parents d323df5 + 062f985 commit 6c87ff4
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 12 deletions.
3 changes: 2 additions & 1 deletion endpoint/src/msgque/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl Context {
Some(idx as usize)
}

/// 记录本次qid,retry时需要
#[inline]
fn update_qid(&mut self, qid: u16) {
let lower = self.ctx as u8;
Expand All @@ -81,7 +82,7 @@ pub trait WriteStrategy {

pub trait ReadStrategy {
fn new(reader_len: usize) -> Self;
fn get_read_idx(&self) -> usize;
fn get_read_idx(&self, last_idx: Option<usize>) -> usize;
}

#[derive(Debug, Clone, Default)]
Expand Down
46 changes: 42 additions & 4 deletions endpoint/src/msgque/strategy/round_robbin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use std::fmt::{Display, Formatter};
use crate::{msgque::ReadStrategy, CloneableAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;

const HITS_BITS: u32 = 8;

/// 依次轮询队列列表,注意整个列表在初始化时需要进行随机乱序处理
#[derive(Debug, Clone, Default)]
pub struct RoundRobbin {
que_len: usize,
// 低8bits放连续hits次数,其他bits放索引位置
current_pos: CloneableAtomicUsize,
}

Expand All @@ -17,15 +20,28 @@ impl ReadStrategy for RoundRobbin {
let rand: usize = rand::random();
Self {
que_len: reader_len,
// current_pos: Arc::new(AtomicUsize::new(rand)),
current_pos: CloneableAtomicUsize::new(rand),
}
}
/// 实现策略很简单:持续轮询
#[inline]
fn get_read_idx(&self) -> usize {
let pos = self.current_pos.fetch_add(1, Relaxed);
pos.wrapping_rem(self.que_len)
fn get_read_idx(&self, last_idx: Option<usize>) -> usize {
let origin_pos = self.current_pos.fetch_add(1, Relaxed);
let pos = match last_idx {
None => origin_pos,
Some(lidx) => {
// 将pos向后移动一个位置,如果已经被移动了,则不再移动
if lidx == origin_pos.que_idx(self.que_len) {
let new_pos = (lidx + 1).pos();
self.current_pos.store(new_pos, Relaxed);
new_pos
} else {
origin_pos
}
}
};

pos.que_idx(self.que_len)
}
}

Expand All @@ -39,3 +55,25 @@ impl Display for RoundRobbin {
)
}
}

/// pos:低8位为单个idx的持续读取计数,高56位为队列的idx序号
trait Pos {
fn que_idx(&self, que_len: usize) -> usize;
}

impl Pos for usize {
fn que_idx(&self, que_len: usize) -> usize {
self.wrapping_shr(HITS_BITS).wrapping_rem(que_len)
}
}

/// idx是队列的idx序号,通过将idx左移8位来构建一个新的pos
trait Idx {
fn pos(&self) -> usize;
}

impl Idx for usize {
fn pos(&self) -> usize {
self.wrapping_shl(HITS_BITS)
}
}
8 changes: 4 additions & 4 deletions endpoint/src/msgque/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ where

// 将访问次数加一,并返回之前的访问次数
let tried_count = ctx.get_and_incr_tried_count();
let last_qid = ctx.get_last_qid(inited);

// 队列始终不需要write back,即写成功后不需要继回写
assert!(!req.is_write_back());

// 对于读请求:顺序读取队列,如果队列都去了到数据,就连续读N个,如果没读到,则尝试下一个ip,直到轮询完所有的ip
// 注意空读后的最后一次请求,会概率尝试访问offline
let (qid, try_next) = if req.operation().is_retrival() {
let qid = self.reader_strategy.get_read_idx();
let qid = self.reader_strategy.get_read_idx(last_qid);
ctx.update_qid(qid as u16);
let try_next = (tried_count + 1) < self.backends.len();
(qid, try_next)
} else {
debug_assert!(req.operation().is_store());

let last_wid = ctx.get_last_qid(inited);
let wid = self.writer_strategy.get_write_idx(req.len(), last_wid);
let wid = self.writer_strategy.get_write_idx(req.len(), last_qid);
ctx.update_qid(wid as u16);
let try_next = (wid + 1) < self.writers.len();

Expand Down
3 changes: 3 additions & 0 deletions protocol/src/msgque/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ impl Protocol for MsgQue {
process: &mut P,
) -> Result<()> {
let data = stream.slice();
log::debug!("+++ will parse req:{}", data);

let mut oft = 0;
while let Some(mut lfcr) = data.find_lf_cr(oft) {
let head4 = data.u32_le(oft);
Expand Down Expand Up @@ -72,6 +74,7 @@ impl Protocol for MsgQue {
#[inline]
fn parse_response<S: Stream>(&self, stream: &mut S) -> Result<Option<Command>> {
let data = stream.slice();
log::debug!("+++ will parse rsp:{}", data);
let Some(mut lfcr) = data.find_lf_cr(0) else {
return Ok(None);
};
Expand Down
5 changes: 4 additions & 1 deletion tests/src/mq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use endpoint::msgque::strategy::Fixed;
use endpoint::msgque::strategy::RoundRobbin;
use endpoint::msgque::ReadStrategy;
use endpoint::msgque::WriteStrategy;
use rand::random;

mod protocol;
/// 轮询读取40次,预期把每个节点都读一遍
Expand All @@ -15,10 +16,12 @@ fn mq_read_strategy() {

let mut count = 0;
let mut readed = HashSet::with_capacity(READER_COUNT);
let mut last_idx = Some(random());
loop {
count += 1;
let idx = rstrategy.get_read_idx();
let idx = rstrategy.get_read_idx(last_idx);
readed.insert(idx);
last_idx = Some(idx);

if readed.len() == READER_COUNT {
// println!("read strategy loop all: {}/{}", count, readed.len());
Expand Down
48 changes: 47 additions & 1 deletion tests_integration/src/mq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn msgque_write() {
fn msgque_read() {
let mq_client = mc_get_text_conn(MQ);

const COUNT: i32 = 5;
const COUNT: i32 = 1000;

let key = "k2";
let mut read_count = 0;
Expand Down Expand Up @@ -97,6 +97,52 @@ fn msgque_read() {
}
}

#[test]
fn msgque_strategy_check() {
let mq_client = mc_get_text_conn(MQ);

let key = "k2";
let count = 100;
const QSIZES: [usize; 1] = [512];

for i in 0..count {
let msg_len = QSIZES[i % QSIZES.len()] * 8 / 10;
let value = build_msg(msg_len);
println!("will set mcq msg {} with len:{}", i, value.len());
mq_client.set(key, value, 0).unwrap();
}

println!("mq write {} msgs done", count);
let mut read_count = 0;
let mut hits = 0;
loop {
let msg: Option<String> = mq_client.get(key).unwrap();
read_count += 1;

if msg.is_some() {
hits += 1;
println!(
"mq len/{}, hits:{}/{}",
msg.unwrap().len(),
hits,
read_count
);
if hits >= count {
println!("read all mq msgs count:{}/{}", hits, read_count);
break;
}
}
}

let hits_percent = (hits as f64) / (read_count as f64);
assert!(
hits_percent >= 0.9,
"check read strategy:{}/{}",
hits,
read_count
);
}

/// 构建所需长度的msg
fn build_msg(len: usize) -> String {
let mut msg = format!("msg-{} ", len);
Expand Down
2 changes: 1 addition & 1 deletion tests_integration/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ fn test_mset_reenter() {
assert_eq!(con.recv_response().unwrap(), redis::Value::Okay);
let key = ("test_mset_reenter1", "test_mset_reenter2");
assert_eq!(
con.mget::<(&str, &str), (usize, usize)>(key).unwrap(),
con.get::<(&str, &str), (usize, usize)>(key).unwrap(),
(mid, mid)
);
}
Expand Down

0 comments on commit 6c87ff4

Please sign in to comment.