Skip to content

Commit

Permalink
Merge pull request #173 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
FicoHu authored Nov 2, 2022
2 parents 229486c + 9f8d738 commit 347f29b
Show file tree
Hide file tree
Showing 45 changed files with 1,829 additions and 1,051 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ env:
redis: localhost:56810
counterservice: localhost:9302
mc: localhost:9301
phantom: localhost:9303
min_key: 1
max_key: 10000

Expand All @@ -20,8 +21,13 @@ jobs:

steps:
- name: Prepare Vintage_MC_Redis
run: docker run -d -v /home/runner/work/breeze:/data1/resource/breeze --net="host" --name breeze_github_ci ficohu/breeze:githubci0011
run: docker run -d -v /home/runner/work/breeze:/data1/resource/breeze --net="host" --name breeze_github_ci 12323312323223/breeze:githubci105
- uses: actions/checkout@v3
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- name: Build
run: cargo build
- name: Check Vintage
Expand All @@ -30,6 +36,8 @@ jobs:
curl http://127.0.0.1:8080/config/cloud/redis/testbreeze/redismeshtest
curl http://127.0.0.1:8080/config/v1/cache.service.testbreeze.pool.yf/all
curl http://127.0.0.1:8080/config/cloud/counterservice/testbreeze/meshtest
curl http://127.0.0.1:8080/config/cloud/phantom/testbreeze/phantomtest
curl http://127.0.0.1:8080/config/cloud/mq/testbreeze/mcqmeshtest
- name: Create Socks
run: |
#ps -aux|grep breeze
Expand All @@ -39,6 +47,7 @@ jobs:
touch /home/runner/work/breeze/socks/config+cloud+redis+testbreeze+redismeshtest@redis:56810@rs
touch /home/runner/work/breeze/socks/config+v1+cache.service.testbreeze.pool.yf+all:meshtest@mc:9301@cs
touch /home/runner/work/breeze/socks/config+cloud+counterservice+testbreeze+meshtest@redis:9302@rs
touch /home/runner/work/breeze/socks/config+cloud+phantom+testbreeze+phantomtest@phantom:9303@pt
ls -all /home/runner/work/breeze/snapshot
ls -all /home/runner/work/breeze/socks
ls -all /home/runner/work/breeze/logs
Expand All @@ -57,5 +66,10 @@ jobs:
tail -10 /home/runner/work/breeze/logs/breeze.log
#cat /home/runner/work/breeze/logs/log.file
#cat /home/runner/work/breeze/logs/breeze.log
- name: Run cargo-tarpaulin
uses: actions-rs/[email protected]
with:
version: '0.15.0'
args: '-v'
- name: Run tests
run: cargo test
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ members = [
"ds",
"rt",
"tests",
"tests_ci",
"tests_integration",
]

exclude = ["api", "tests"]
Expand Down
2 changes: 1 addition & 1 deletion agent/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn _process_one(
if let Err(e) = copy_bidirectional(top, metrics, client, p).await {
match e {
//protocol::Error::Quit => {} // client发送quit协议退出
//protocol::Error::ReadEof => {}
//protocol::Error::Eof => {}
protocol::Error::ProtocolNotSupported => unsupport_cmd += 1,
// 发送异常信息给client
_e => log::debug!("{:?} disconnected. {:?}", _path, _e),
Expand Down
8 changes: 4 additions & 4 deletions ds/src/mem/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ impl RingBuffer {
.offset(self.mask(self.read + idx) as isize)
}
}
#[inline]
pub(crate) fn raw(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.data.as_ptr(), self.size) }
}
//#[inline]
//pub(crate) fn raw(&self) -> &[u8] {
// unsafe { std::slice::from_raw_parts(self.data.as_ptr(), self.size) }
//}
}

impl Drop for RingBuffer {
Expand Down
8 changes: 4 additions & 4 deletions ds/src/mem/guarded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ impl GuardedBuffer {
fn offset(&self, oft: usize) -> usize {
self.pending() + oft
}
#[inline]
pub fn raw(&self) -> &[u8] {
self.inner.raw()
}
//#[inline]
//pub fn raw(&self) -> &[u8] {
// self.inner.raw()
//}
}
use std::fmt::{self, Display, Formatter};
impl Display for GuardedBuffer {
Expand Down
96 changes: 62 additions & 34 deletions ds/src/mem/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ const BUF_MIN: usize = 1024;
use std::time::{Duration, Instant};
// 内存需要缩容时的策略
// 为了避免频繁的缩容,需要设置一个最小频繁,通常使用最小间隔时间
#[allow(dead_code)]
pub struct MemPolicy {
ticks: usize,
last: Instant, // 上一次tick返回true的时间
secs: u16, // 每两次tick返回true的最小间隔时间

// 下面两个变量为了输出日志
direction: &'static str, // 方向: true为tx, false为rx. 打日志用
id: usize,
start: Instant,
trace: trace::Trace,
}

impl MemPolicy {
Expand All @@ -25,16 +22,12 @@ impl MemPolicy {
Self::from(Duration::from_secs(600), direction)
}
fn from(delay: Duration, direction: &'static str) -> Self {
static ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(1);
let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let secs = delay.as_secs().max(1).min(u16::MAX as u64) as u16;
Self {
ticks: 0,
secs,
last: Instant::now(),
id,
direction,
start: Instant::now(),
trace: direction.into(),
}
}
#[inline(always)]
Expand Down Expand Up @@ -82,15 +75,7 @@ impl MemPolicy {
.max(cap)
.max(BUF_MIN)
.next_power_of_two();
log::info!(
"{} buf grow: {} {} + {} => {} id:{}",
self.direction,
len,
cap,
reserve,
new,
self.id
);
log::info!("grow: {} {} > {} => {} {}", len, reserve, cap, new, self);
new
}
// 确认缩容的size:
Expand All @@ -102,28 +87,71 @@ impl MemPolicy {
#[inline]
pub fn shrink(&mut self, len: usize, cap: usize) -> usize {
let new = (cap / 2).max(BUF_MIN).max(len).next_power_of_two();
log::info!(
"{} buf shrink: {} {} => {} ticks:{} elapse:{} secs id:{}",
self.direction,
len,
cap,
new,
self.ticks,
self.last.elapsed().as_secs(),
self.id
);
log::info!("shrink: {} < {} => {} {}", len, cap, new, self);
self.ticks = 0;
new
}
}

impl Drop for MemPolicy {
fn drop(&mut self) {
log::info!(
"{} buf policy drop. lifetime:{:?} id:{}",
self.direction,
self.start.elapsed(),
self.id
);
log::info!("buf policy drop => {}", self);
}
}
use std::fmt::{self, Display, Formatter};
impl Display for MemPolicy {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"buf policy: ticks: {} last: {:?} secs: {}{:?}",
self.ticks,
self.last.elapsed(),
self.secs,
self.trace
)
}
}

#[cfg(debug_assertions)]
mod trace {
use std::fmt::{self, Debug, Formatter};
use std::time::Instant;
pub(super) struct Trace {
direction: &'static str, // 方向: true为tx, false为rx. 打日志用
id: usize,
start: Instant,
}

impl From<&'static str> for Trace {
fn from(direction: &'static str) -> Self {
static ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(1);
let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
direction,
id,
start: Instant::now(),
}
}
}
impl Debug for Trace {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
" id: {} lifetime:{:?} => {}",
self.id,
self.start.elapsed(),
self.direction
)
}
}
}
#[cfg(not(debug_assertions))]
mod trace {
#[derive(Debug)]
pub(super) struct Trace;
impl From<&'static str> for Trace {
fn from(_direction: &'static str) -> Self {
Self
}
}
}
2 changes: 1 addition & 1 deletion ds/src/mem/resized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ResizedRingBuffer {
}
}
#[inline]
pub fn resize(&mut self, cap: usize) {
fn resize(&mut self, cap: usize) {
assert!(cap <= self.max as usize);
assert!(cap >= self.min as usize);
let new = self.inner.resize(cap);
Expand Down
12 changes: 7 additions & 5 deletions ds/src/mem/ring_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,16 @@ impl Debug for RingSlice {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
use crate::Utf8;
let data = if self.len() > 1024 {
format!("[hidden for too long len:{}]", self.len())
} else {
self.to_vec().utf8()
};

write!(
f,
"ptr:{} start:{} end:{} cap:{} => {:?}",
self.ptr,
self.start,
self.end,
self.cap,
self.to_vec().utf8()
self.ptr, self.start, self.end, self.cap, data
)
}
}
3 changes: 3 additions & 0 deletions metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use std::fmt::Debug;
use std::ops::AddAssign;
use std::sync::Arc;

// tests only
pub use item::Item;

pub struct Metric {
id: Arc<Id>,
item: ItemRc,
Expand Down
8 changes: 4 additions & 4 deletions protocol/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

#[derive(Debug)]
pub enum Error {
ReadEof,
Eof,
QueueClosed,
NotInit,
Closed,
Expand All @@ -19,13 +19,13 @@ pub enum Error {
RequestProtocolInvalidNoReturn(&'static str),
ResponseProtocolInvalid,
ProtocolNotSupported,
IndexOutofBound,
Inner,
//IndexOutofBound,
//Inner,
TopChanged,
WriteResponseErr,
NoResponseFound,
// CommandNotSupported,
ResponseBufferFull,
BufferFull,
Quit,
Timeout(Duration),
Pending, // 在连接退出时,仍然有请求在队列中没有发送。
Expand Down
4 changes: 0 additions & 4 deletions protocol/src/msgque/mcq/text/rsppacket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,6 @@ impl<'a, S: crate::Stream> RspPacket<'a, S> {
Err(super::Error::ProtocolIncomplete)
}

pub(super) fn is_empty(&self) -> bool {
self.rsp_type == RspType::End
}

#[inline]
pub(super) fn take(&mut self) -> ds::MemGuard {
assert!(
Expand Down
13 changes: 7 additions & 6 deletions protocol/src/redis/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,13 @@ pub(super) static SUPPORTED: Commands = {
//("ping", "ping" , -1, Meta, 0, 0, 0, 2, false, true, false, false, false),
//// 不支持select 0以外的请求。所有的select请求直接返回,默认使用db0
//("select", "select" , 2, Meta, 0, 0, 0, 1, false, true, false, false, false),
//("hello", "hello" , 2, Meta, 0, 0, 0, 4, false, true, false, false, false),
//("hello", "hello" , -1, Meta, 0, 0, 0, 4, false, true, false, false, false),
//("quit", "quit" , 1, Meta, 0, 0, 0, 0, false, true, false, false, false),
// hello 参数应该是-1,可以不带或者带多个
Cmd::new("command").arity(-1).op(Meta).padding(1).nofwd(),
Cmd::new("ping").arity(-1).op(Meta).padding(2).nofwd(),
Cmd::new("select").arity(2).op(Meta).padding(1).nofwd(),
Cmd::new("hello").arity(2).op(Meta).padding(4).nofwd(),
Cmd::new("hello").arity(-1).op(Meta).padding(4).nofwd(),
// quit、master的指令token数/arity应该都是1
Cmd::new("quit").arity(1).op(Meta).nofwd(),
Cmd::new("master").arity(1).op(Meta).nofwd().master().swallow(),
Expand Down Expand Up @@ -502,14 +503,14 @@ pub(super) static SUPPORTED: Commands = {

// geo 相关指令
//("geoadd", "geoadd", -5, Store, 1, 1, 1, 3, false, false, true, true, false),
//("georadius", "georadius", -6, Get, 1, 1, 1, 3, false, false, true, false, false),
//("georadiusbymember", "georadiusbymember", -5, Get, 1, 1, 1, 3, false, false, true, false, false),
//("georadius", "georadius", -6, Store, 1, 1, 1, 3, false, false, true, false, false),
//("georadiusbymember", "georadiusbymember", -5, Store, 1, 1, 1, 3, false, false, true, false, false),
//("geohash", "geohash", -2, Get, 1, 1, 1, 3, false, false, true, false, false),
//("geopos", "geopos", -2, Get, 1, 1, 1, 3, false, false, true, false, false),
//("geodist", "geodist", -4, Get, 1, 1, 1, 3, false, false, true, false, false),
Cmd::new("geoadd").arity(-5).op(Store).first(1).last(1).step(1).padding(3).key().val(),
Cmd::new("georadius").arity(-6).op(Get).first(1).last(1).step(1).padding(3).key(),
Cmd::new("georadiusbymember").arity(-5).op(Get).first(1).last(1).step(1).padding(3).key(),
Cmd::new("georadius").arity(-6).op(Store).first(1).last(1).step(1).padding(3).key(),
Cmd::new("georadiusbymember").arity(-5).op(Store).first(1).last(1).step(1).padding(3).key(),
Cmd::new("geohash").arity(-2).op(Get).first(1).last(1).step(1).padding(3).key(),
Cmd::new("geopos").arity(-2).op(Get).first(1).last(1).step(1).padding(3).key(),
Cmd::new("geodist").arity(-4).op(Get).first(1).last(1).step(1).padding(3).key(),
Expand Down
3 changes: 3 additions & 0 deletions protocol/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,6 @@ fn calculate_hash<H: Hash>(alg: &H, key: &RingSlice) -> i64 {
fn default_hash() -> i64 {
AUTO.fetch_add(1, Ordering::Relaxed)
}

// tests only
pub use packet::RequestContext;
2 changes: 1 addition & 1 deletion protocol/src/redis/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const CRLF_LEN: usize = b"\r\n".len();
// 必须是u64长度的。
#[repr(C)]
#[derive(Debug)]
struct RequestContext {
pub struct RequestContext {
bulk: u16,
op_code: u16,
first: bool, // 在multi-get请求中是否是第一个请求。
Expand Down
6 changes: 4 additions & 2 deletions rt/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ impl<F: Future<Output = Result<()>> + Unpin + ReEnter + Debug> Entry<F> {
} else {
self.last_rx = now;
}
let ret = Pin::new(&mut self.inner).poll(cx);

// 发现异常立即返回处理
let ret = Pin::new(&mut self.inner).poll(cx)?;
let (tx_post, rx_post) = (self.inner.num_tx(), self.inner.num_rx());
if tx_post > rx_post {
self.last = Instant::now();
Expand All @@ -132,7 +134,7 @@ impl<F: Future<Output = Result<()>> + Unpin + ReEnter + Debug> Entry<F> {
}
}
//}
ret
ret.map(|rs| Ok(rs))
}
}
}
Expand Down
Loading

0 comments on commit 347f29b

Please sign in to comment.