Skip to content

Commit

Permalink
Merge pull request #473 from weibocom/mcq_loop_write_20240701
Browse files Browse the repository at this point in the history
mcq改为均衡写入
  • Loading branch information
viciousstar authored Jul 17, 2024
2 parents 0d43637 + 7279de3 commit ae2ec0a
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 89 deletions.
69 changes: 66 additions & 3 deletions endpoint/src/msgque/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use core::fmt;
use std::sync::atomic::Ordering::Relaxed;
use std::{
fmt::{Display, Formatter},
ops::Deref,
};

use crate::Endpoint;
use crate::{CloneableAtomicUsize, Endpoint};

pub(crate) mod config;
pub mod strategy;
Expand Down Expand Up @@ -76,8 +77,13 @@ impl Context {
}

pub trait WriteStrategy {
fn new(queue_len: usize, qsize_pos: &Vec<(usize, usize)>) -> Self;
fn get_write_idx(&self, msg_size: usize, last_idx: Option<usize>) -> usize;
fn new(que_len: usize, sized_que_infos: Vec<SizedQueueInfo>) -> Self;
fn get_write_idx(
&self,
msg_len: usize,
last_idx: Option<usize>,
tried_count: usize,
) -> (usize, bool);
}

pub trait ReadStrategy {
Expand Down Expand Up @@ -115,3 +121,60 @@ where
write!(f, "{}/{}", self.endpoint.addr(), self.qsize)
}
}

/// Describes the group of queues that share one configured queue size.
///
/// The queues of a group occupy `len` consecutive slots of the overall queue
/// list, beginning at `start_pos`.
#[derive(Debug, Clone, Default)]
pub struct SizedQueueInfo {
// Queue size shared by every queue in this group.
qsize: usize,
// Position in the overall queue list where this group's queues begin.
start_pos: usize,
// Number of queues in this group.
len: usize,
// Access counter for this group; `next_idx` bumps it on every call to rotate
// through the group's queues.
pub(crate) sequence: CloneableAtomicUsize,
}

impl SizedQueueInfo {
    /// Builds the descriptor for one size group.
    ///
    /// * `qsize` - queue size shared by the group.
    /// * `start_pos` - index of the group's first queue in the overall queue list.
    /// * `len` - number of queues in the group.
    ///
    /// The access sequence starts at 0.
    pub fn new(qsize: usize, start_pos: usize, len: usize) -> Self {
        Self {
            qsize,
            start_pos,
            len,
            sequence: CloneableAtomicUsize::new(0),
        }
    }

    /// Queue size shared by every queue in this group.
    #[inline]
    pub fn qsize(&self) -> usize {
        self.qsize
    }

    /// Index of this group's first queue in the overall queue list.
    #[inline]
    pub fn start_pos(&self) -> usize {
        self.start_pos
    }

    /// Number of queues in this group.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the group holds no queues.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Round-robin pick: bumps the group's access sequence and returns the
    /// absolute index of the queue to use for a first attempt.
    ///
    /// For an empty group (`len == 0`, e.g. a `Default` value) this returns
    /// `start_pos` instead of panicking on a modulo by zero; the caller is
    /// expected to bounds-check the returned index.
    #[inline]
    pub fn next_idx(&self) -> usize {
        let seq = self.sequence.fetch_add(1, Relaxed);
        self.start_pos + self.offset_in_group(seq)
    }

    /// Retry pick: returns the absolute index of the queue that follows
    /// `last_idx` inside this group, wrapping back to `start_pos` after the
    /// group's last queue.
    ///
    /// For an empty group this returns `start_pos` (see [`Self::next_idx`]).
    ///
    /// # Panics
    ///
    /// Panics if `last_idx` lies before `start_pos`.
    #[inline]
    pub fn next_retry_idx(&self, last_idx: usize) -> usize {
        assert!(last_idx >= self.start_pos, "{}:{:?}", last_idx, self);
        self.start_pos + self.offset_in_group(last_idx - self.start_pos + 1)
    }

    /// Folds an unbounded counter into an offset within the group; an empty
    /// group yields offset 0 rather than a division-by-zero panic.
    #[inline]
    fn offset_in_group(&self, n: usize) -> usize {
        n.checked_rem(self.len).unwrap_or(0)
    }
}
75 changes: 59 additions & 16 deletions endpoint/src/msgque/strategy/fixed.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,87 @@
use std::fmt::{self, Display, Formatter};

use crate::msgque::SizedQueueInfo;

/// 写策略:对同一个size,总是从固定的队列位置开始访问,但同一个size的队列在初始化时需要进行随机初始化;
#[derive(Debug, Clone, Default)]
pub struct Fixed {
que_len: usize,
// 存储的内容:(que_size,起始位置);按照que size排序,方便查找
qsize_pos: Vec<(usize, usize)>,
sized_que_infos: Vec<SizedQueueInfo>,
}

impl crate::msgque::WriteStrategy for Fixed {
#[inline]
fn new(que_len: usize, qsize_pos: &Vec<(usize, usize)>) -> Self {
fn new(que_len: usize, sized_que_infos: Vec<SizedQueueInfo>) -> Self {
Self {
que_len: que_len,
qsize_pos: qsize_pos.clone(),
que_len,
sized_que_infos,
}
}

/**
* 第一次总是轮询位置,确保均衡写;
* 失败后,后续的重复请求,则按照上次的位置继续向后访问,当轮询完本size的queue列表后,进入到下一个size的queue;
* 返回:(que_idx, try_next),que_idx:当前队列的位置; try_next: 如果失败是否需要继续轮询;
*/
#[inline]
fn get_write_idx(&self, msg_len: usize, last_idx: Option<usize>) -> usize {
fn get_write_idx(
&self,
msg_len: usize,
last_idx: Option<usize>,
tried_count: usize,
) -> (usize, bool) {
match last_idx {
None => {
// 使用loop原因:短消息是大概率;size小于8时,list loop 性能比hash类算法性能更佳 fishermen
for (qsize, idx) in self.qsize_pos.iter() {
if *qsize > msg_len {
log::debug!("+++ msg len:{}, qsize:{}, idx:{}", msg_len, *qsize, *idx);
return *idx;
}
let que_info = self.get_que_info(msg_len);
// 第一次写队列消息,永远使用对应msg size的que list中的队列,且循环使用
let idx = que_info.next_idx();
let try_next = self.can_try_next(tried_count, &que_info);
log::debug!("+++ mcqw/{}, {}/{}/{:?}", msg_len, try_next, idx, que_info);
(idx, try_next)
}
Some(last_idx) => {
// 重试写队列消息时,首先轮询当前size的queue列表;在当前size的queue已经轮询完后,则进入后续更大size的queue;
let que_info = self.get_que_info(msg_len);
let try_next = self.can_try_next(tried_count, &que_info);
if tried_count < que_info.len() {
// 首先重试当前len的所有queues
let idx = que_info.next_retry_idx(last_idx);
log::debug!("+++ mcqw retry wdix {}:{}/{}", msg_len, idx, last_idx);
(idx, try_next)
} else {
let idx = last_idx + 1;
(idx.wrapping_rem(self.que_len), try_next)
}
// 如果所有size均小于消息长度,则返回最大size的queue
log::warn!("+++ msg too big {}", msg_len);
self.qsize_pos.last().expect("queue").1
}
Some(last_idx) => (last_idx + 1) % self.que_len,
}
}
}

impl Fixed {
    /// Selects the size group for a message of `msg_len` bytes: the first
    /// group whose queue size is strictly greater than `msg_len`, or the last
    /// group when no queue size is large enough.
    ///
    /// NOTE(review): picking the "first" match only yields the smallest
    /// fitting size if `sized_que_infos` is ordered by ascending queue size —
    /// confirm against the code that builds the list.
    ///
    /// # Panics
    ///
    /// Panics if no size group is configured.
    #[inline]
    fn get_que_info(&self, msg_len: usize) -> &SizedQueueInfo {
        // A linear scan is deliberate: short messages are the common case, and
        // with fewer than 8 sizes a list walk beats hash-style lookups.
        self.sized_que_infos
            .iter()
            .find(|qi| qi.qsize > msg_len)
            .unwrap_or_else(|| self.sized_que_infos.last().expect("que info"))
    }

    /// Tells whether a failed write may move on to another queue: retries are
    /// allowed while `tried_count` is below `que_len - start_pos - 1`.
    ///
    /// The subtraction saturates at zero, so a group starting at or beyond
    /// `que_len` answers `false` instead of panicking (debug builds) or
    /// wrapping around to "always retry" (release builds).
    #[inline]
    fn can_try_next(&self, tried_count: usize, sized_que_info: &SizedQueueInfo) -> bool {
        let remaining = self
            .que_len
            .saturating_sub(sized_que_info.start_pos())
            .saturating_sub(1);
        remaining > tried_count
    }
}

impl Display for Fixed {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "mq fixed: {}/{:?}", self.que_len, self.qsize_pos)
write!(f, "mq fixed: {}/{:?}", self.que_len, self.sized_que_infos)
}
}
72 changes: 37 additions & 35 deletions endpoint/src/msgque/strategy/round_robbin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::fmt::{Display, Formatter};
use crate::{msgque::ReadStrategy, CloneableAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;

const HITS_BITS: u32 = 8;
// const HITS_BITS: u32 = 8;

/// 依次轮询队列列表,注意整个列表在初始化时需要进行随机乱序处理
#[derive(Debug, Clone, Default)]
pub struct RoundRobbin {
que_len: usize,
// 低8bits放连续hits次数,其他bits放索引位置
// 去掉8bit的盯读策略,写改为轮询写,读也对应改为轮询读,同时miss后不重试,ip72看效果明显,后续继续观察 fishermen
current_pos: CloneableAtomicUsize,
}

Expand All @@ -25,23 +25,25 @@ impl ReadStrategy for RoundRobbin {
}
/// 实现策略很简单:持续轮询
#[inline]
fn get_read_idx(&self, last_idx: Option<usize>) -> usize {
fn get_read_idx(&self, _last_idx: Option<usize>) -> usize {
let origin_pos = self.current_pos.fetch_add(1, Relaxed);
let pos = match last_idx {
None => origin_pos,
Some(lidx) => {
// 将pos向后移动一个位置,如果已经被移动了,则不再移动
if lidx == origin_pos.que_idx(self.que_len) {
let new_pos = (lidx + 1).pos();
self.current_pos.store(new_pos, Relaxed);
new_pos
} else {
origin_pos
}
}
};
origin_pos.wrapping_rem(self.que_len)

pos.que_idx(self.que_len)
// TODO 去掉8bit盯读策略,先注释掉,目前灰度观察OK,线上没问题后,再删除,预计2024.9后可以清理 fishermen
// let pos = match last_idx {
// None => origin_pos,
// Some(lidx) => {
// // 将pos向后移动一个位置,如果已经被移动了,则不再移动
// if lidx == origin_pos.que_idx(self.que_len) {
// let new_pos = (lidx + 1).pos();
// self.current_pos.store(new_pos, Relaxed);
// new_pos
// } else {
// origin_pos
// }
// }
// };
// pos.que_idx(self.que_len)
}
}

Expand All @@ -56,24 +58,24 @@ impl Display for RoundRobbin {
}
}

/// pos:低8位为单个idx的持续读取计数,高56位为队列的idx序号
trait Pos {
fn que_idx(&self, que_len: usize) -> usize;
}
// /// pos:低8位为单个idx的持续读取计数,高56位为队列的idx序号
// trait Pos {
// fn que_idx(&self, que_len: usize) -> usize;
// }

impl Pos for usize {
fn que_idx(&self, que_len: usize) -> usize {
self.wrapping_shr(HITS_BITS).wrapping_rem(que_len)
}
}
// impl Pos for usize {
// fn que_idx(&self, que_len: usize) -> usize {
// self.wrapping_shr(HITS_BITS).wrapping_rem(que_len)
// }
// }

/// idx是队列的idx序号,通过将idx左移8位来构建一个新的pos
trait Idx {
fn pos(&self) -> usize;
}
// /// idx是队列的idx序号,通过将idx左移8位来构建一个新的pos
// trait Idx {
// fn pos(&self) -> usize;
// }

impl Idx for usize {
fn pos(&self) -> usize {
self.wrapping_shl(HITS_BITS)
}
}
// impl Idx for usize {
// fn pos(&self) -> usize {
// self.wrapping_shl(HITS_BITS)
// }
// }
12 changes: 7 additions & 5 deletions endpoint/src/msgque/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::{
ReadStrategy, WriteStrategy,
};
use crate::dns::{DnsConfig, DnsLookup};
use crate::msgque::SizedQueueInfo;
use crate::{CloneAbleAtomicBool, Endpoint, Endpoints, Timeout, Topology};
use sharding::hash::{Hash, HashKey, Hasher, Padding};

Expand Down Expand Up @@ -135,9 +136,10 @@ where
(qid, try_next)
} else {
debug_assert!(req.operation().is_store());
let wid = self.writer_strategy.get_write_idx(req.len(), last_qid);
let (wid, try_next) =
self.writer_strategy
.get_write_idx(req.len(), last_qid, tried_count);
ctx.update_qid(wid as u16);
let try_next = (wid + 1) < self.writers.len();

assert!(wid < self.writers.len(), "{}/{}", wid, self);
(*self.writers.get(wid).expect("mq write"), try_next)
Expand Down Expand Up @@ -258,11 +260,11 @@ where

// 将按size分的ip列表按顺序放置,记录每个size的que的起始位置
let mut ordered_ques = Vec::with_capacity(qaddrs.len());
let mut qsize_poses = Vec::with_capacity(qaddrs.len());
let mut sized_qinfos = Vec::with_capacity(qaddrs.len());
let mut rng = thread_rng();
for (i, mut adrs) in qaddrs.into_iter().enumerate() {
let qs = qsizes[i];
qsize_poses.push((qs, ordered_ques.len()));
sized_qinfos.push(SizedQueueInfo::new(qs, ordered_ques.len(), adrs.len()));

// 对每个size的ip列表进行随机排序
adrs.shuffle(&mut rng);
Expand All @@ -276,7 +278,7 @@ where

// 设置读写策略
self.reader_strategy = RoundRobbin::new(self.backends.len());
self.writer_strategy = Fixed::new(self.writers.len(), &qsize_poses);
self.writer_strategy = Fixed::new(self.writers.len(), sized_qinfos);

log::debug!("+++ mq loaded: {}", self);

Expand Down
14 changes: 11 additions & 3 deletions protocol/src/msgque/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const QUITBYTE: u32 = u32::from_le_bytes(*b"quit");
const END: u32 = u32::from_le_bytes(*b"END\r");
const VALUE: u32 = u32::from_le_bytes(*b"VALU");
const STORED: u32 = u32::from_le_bytes(*b"STOR");
const SERVER_ERROR: u32 = u32::from_le_bytes(*b"SERV");

#[derive(Clone, Default)]
pub struct MsgQue;
Expand All @@ -33,7 +34,7 @@ impl Protocol for MsgQue {
process: &mut P,
) -> Result<()> {
let data = stream.slice();
log::debug!("+++ will parse req:{}", data);
log::debug!("+++ will parse req:{:?}", data);

let mut oft = 0;
while let Some(mut lfcr) = data.find_lf_cr(oft) {
Expand Down Expand Up @@ -84,7 +85,7 @@ impl Protocol for MsgQue {
}
let head4 = data.u32_le(0);
let ok = match head4 {
END => false,
END => true, // get miss 后,改为不重试,因为写已经改为了轮询写,每个节点概率相同,观察效果 2024.7.11 fishermen
VALUE => {
// VALUE <key> <flags> <bytes> [<cas unique>]\r\n
let val_len = Self::val_len(data, 4, lfcr)?;
Expand All @@ -102,7 +103,14 @@ impl Protocol for MsgQue {
true
}
STORED => true,
_ => return Err(Error::UnexpectedData),
SERVER_ERROR => {
log::warn!("+++ server err mcq rsp: {:?} \r\n", data);
false
}
_ => {
log::warn!("+++ unknown err mcq rsp: {:?} \r\n", data);
return Err(Error::UnexpectedData);
}
};
return Ok(Some(Command::from(ok, stream.take(lfcr + 2))));
}
Expand Down
Loading

0 comments on commit ae2ec0a

Please sign in to comment.