Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into dev_ci
Browse files Browse the repository at this point in the history
  • Loading branch information
viciousstar committed Nov 2, 2022
2 parents 7f816f9 + 9986a91 commit fae7021
Show file tree
Hide file tree
Showing 40 changed files with 809 additions and 368 deletions.
2 changes: 1 addition & 1 deletion agent/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn _process_one(
if let Err(e) = copy_bidirectional(top, metrics, client, p).await {
match e {
//protocol::Error::Quit => {} // client发送quit协议退出
//protocol::Error::ReadEof => {}
//protocol::Error::Eof => {}
protocol::Error::ProtocolNotSupported => unsupport_cmd += 1,
// 发送异常信息给client
_e => log::debug!("{:?} disconnected. {:?}", _path, _e),
Expand Down
8 changes: 4 additions & 4 deletions ds/src/mem/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ impl RingBuffer {
.offset(self.mask(self.read + idx) as isize)
}
}
#[inline]
pub(crate) fn raw(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.data.as_ptr(), self.size) }
}
//#[inline]
//pub(crate) fn raw(&self) -> &[u8] {
// unsafe { std::slice::from_raw_parts(self.data.as_ptr(), self.size) }
//}
}

impl Drop for RingBuffer {
Expand Down
8 changes: 4 additions & 4 deletions ds/src/mem/guarded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ impl GuardedBuffer {
fn offset(&self, oft: usize) -> usize {
self.pending() + oft
}
#[inline]
pub fn raw(&self) -> &[u8] {
self.inner.raw()
}
//#[inline]
//pub fn raw(&self) -> &[u8] {
// self.inner.raw()
//}
}
use std::fmt::{self, Display, Formatter};
impl Display for GuardedBuffer {
Expand Down
96 changes: 62 additions & 34 deletions ds/src/mem/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ const BUF_MIN: usize = 1024;
use std::time::{Duration, Instant};
// 内存需要缩容时的策略
// 为了避免频繁的缩容,需要设置一个最小频繁,通常使用最小间隔时间
#[allow(dead_code)]
pub struct MemPolicy {
ticks: usize,
last: Instant, // 上一次tick返回true的时间
secs: u16, // 每两次tick返回true的最小间隔时间

// 下面两个变量为了输出日志
direction: &'static str, // 方向: true为tx, false为rx. 打日志用
id: usize,
start: Instant,
trace: trace::Trace,
}

impl MemPolicy {
Expand All @@ -25,16 +22,12 @@ impl MemPolicy {
Self::from(Duration::from_secs(600), direction)
}
fn from(delay: Duration, direction: &'static str) -> Self {
static ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(1);
let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let secs = delay.as_secs().max(1).min(u16::MAX as u64) as u16;
Self {
ticks: 0,
secs,
last: Instant::now(),
id,
direction,
start: Instant::now(),
trace: direction.into(),
}
}
#[inline(always)]
Expand Down Expand Up @@ -82,15 +75,7 @@ impl MemPolicy {
.max(cap)
.max(BUF_MIN)
.next_power_of_two();
log::info!(
"{} buf grow: {} {} + {} => {} id:{}",
self.direction,
len,
cap,
reserve,
new,
self.id
);
log::info!("grow: {} {} > {} => {} {}", len, reserve, cap, new, self);
new
}
// 确认缩容的size:
Expand All @@ -102,28 +87,71 @@ impl MemPolicy {
#[inline]
pub fn shrink(&mut self, len: usize, cap: usize) -> usize {
let new = (cap / 2).max(BUF_MIN).max(len).next_power_of_two();
log::info!(
"{} buf shrink: {} {} => {} ticks:{} elapse:{} secs id:{}",
self.direction,
len,
cap,
new,
self.ticks,
self.last.elapsed().as_secs(),
self.id
);
log::info!("shrink: {} < {} => {} {}", len, cap, new, self);
self.ticks = 0;
new
}
}

impl Drop for MemPolicy {
fn drop(&mut self) {
log::info!(
"{} buf policy drop. lifetime:{:?} id:{}",
self.direction,
self.start.elapsed(),
self.id
);
log::info!("buf policy drop => {}", self);
}
}
use std::fmt::{self, Display, Formatter};
impl Display for MemPolicy {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"buf policy: ticks: {} last: {:?} secs: {}{:?}",
self.ticks,
self.last.elapsed(),
self.secs,
self.trace
)
}
}

#[cfg(debug_assertions)]
mod trace {
use std::fmt::{self, Debug, Formatter};
use std::time::Instant;
pub(super) struct Trace {
direction: &'static str, // 方向: true为tx, false为rx. 打日志用
id: usize,
start: Instant,
}

impl From<&'static str> for Trace {
fn from(direction: &'static str) -> Self {
static ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(1);
let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
direction,
id,
start: Instant::now(),
}
}
}
impl Debug for Trace {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
" id: {} lifetime:{:?} => {}",
self.id,
self.start.elapsed(),
self.direction
)
}
}
}
#[cfg(not(debug_assertions))]
mod trace {
#[derive(Debug)]
pub(super) struct Trace;
impl From<&'static str> for Trace {
fn from(_direction: &'static str) -> Self {
Self
}
}
}
2 changes: 1 addition & 1 deletion ds/src/mem/resized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ResizedRingBuffer {
}
}
#[inline]
pub fn resize(&mut self, cap: usize) {
fn resize(&mut self, cap: usize) {
assert!(cap <= self.max as usize);
assert!(cap >= self.min as usize);
let new = self.inner.resize(cap);
Expand Down
12 changes: 7 additions & 5 deletions ds/src/mem/ring_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,16 @@ impl Debug for RingSlice {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
use crate::Utf8;
let data = if self.len() > 1024 {
format!("[hidden for too long len:{}]", self.len())
} else {
self.to_vec().utf8()
};

write!(
f,
"ptr:{} start:{} end:{} cap:{} => {:?}",
self.ptr,
self.start,
self.end,
self.cap,
self.to_vec().utf8()
self.ptr, self.start, self.end, self.cap, data
)
}
}
2 changes: 1 addition & 1 deletion endpoint/src/cacheservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ where
return;
}
}
log::debug!("request sent prepared:{} {} {}", idx, req, self);
log::debug!("+++ request sent prepared:{} - {} {}", idx, req, self);
assert!(
idx < self.streams.len(),
"{} < {} => {:?}",
Expand Down
26 changes: 17 additions & 9 deletions endpoint/src/msgque/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
mod config;
mod strategy;

pub mod topo;

use strategy::QID;

// 0-7: 读/写次数;
const COUNT_BITS: u8 = 8;
// 8-23:读写的位置索引
const IDX_SHIFT: u8 = 0 + COUNT_BITS;
const IDX_BITS: u8 = 16;
const QID_SHIFT: u8 = 0 + COUNT_BITS;
const QID_BITS: u8 = 16;
// 24-31: 写入队列的size的block数,即:size/512;
const WRITE_SIZE_BLOCK_SHIFT: u8 = IDX_SHIFT + IDX_BITS;
const WRITE_SIZE_BLOCK_SHIFT: u8 = QID_SHIFT + QID_BITS;
const WRITE_SIZE_BLOCK_BITS: u8 = 8;
// 32-63: 保留
const DATA_RESERVE_SHIFT: u8 = WRITE_SIZE_BLOCK_SHIFT + WRITE_SIZE_BLOCK_BITS;
Expand Down Expand Up @@ -55,20 +59,24 @@ impl Context {

// read/write 的idx位置相同
#[inline]
fn get_idx(&self) -> usize {
fn get_last_qid(&self) -> QID {
// idx 占16个字节
let idx = (self.ctx >> IDX_SHIFT) as u16;
idx as usize
let idx = (self.ctx >> QID_SHIFT) as u16;
idx as QID
}

#[inline]
fn update_idx(&mut self, read_idx: u16) {
self.ctx |= (read_idx << IDX_SHIFT) as u64;
fn update_qid(&mut self, qid: u16) {
let lower = self.ctx as u8;
// 不直接使用qid后面字段的shit,因为context的字段可能会变
let high_shift = QID_SHIFT + QID_BITS;
let high = self.ctx >> high_shift << high_shift;
self.ctx = lower as u64 | (qid << QID_SHIFT) as u64 | high;
}

#[inline]
fn update_write_size(&mut self, wsize: usize) {
let lower = self.ctx & (1 << WRITE_SIZE_BLOCK_SHIFT - 1);
let lower = self.ctx & ((1 << WRITE_SIZE_BLOCK_SHIFT) - 1);
let high = self.ctx >> DATA_RESERVE_SHIFT << DATA_RESERVE_SHIFT;
let block = wsize / BLOCK_SIZE;
self.ctx = lower | (block << WRITE_SIZE_BLOCK_SHIFT) as u64 | high;
Expand Down
Loading

0 comments on commit fae7021

Please sign in to comment.