Skip to content

Commit

Permalink
Merge pull request #492 from weibocom/metric_err_20241011
Browse files Browse the repository at this point in the history
增加异常响应的统计
  • Loading branch information
hustfisher authored Oct 18, 2024
2 parents c4a1725 + f412973 commit 34db998
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 4 deletions.
6 changes: 6 additions & 0 deletions protocol/src/memcache/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ impl Protocol for MemcacheBinary {
None
}
}

// mc目前不需要统计error,因为mc的error基本都是get miss,del not-found这种,这种错误不需要统计
#[inline]
fn metric_err(&self) -> bool {
false
}
}
impl MemcacheBinary {
// 根据req构建response,status为mc协议status,共11种
Expand Down
4 changes: 4 additions & 0 deletions protocol/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ pub trait Proto: Unpin + Clone + Send + Sync + 'static {
fn max_tries(&self, _req_op: Operation) -> u8 {
1_u8
}

fn metric_err(&self) -> bool {
true
}
}

pub trait RequestProcessor {
Expand Down
9 changes: 8 additions & 1 deletion stream/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Handler<'r, Req, P, S> {
s: S,
parser: P,
rtt: Metric,
err: Metric,
host_metric: HostMetric,

num: Number,
Expand Down Expand Up @@ -63,12 +64,14 @@ where
data.enable();
let name = path.clone();
let rtt = path.rtt("req");
let err = path.qps("be_err");
Self {
data,
pending: VecDeque::with_capacity(31),
s,
parser,
rtt,
err,
host_metric: HostMetric::from(path),
num: Number::default(),
req_buf: Vec::with_capacity(4),
Expand Down Expand Up @@ -142,8 +145,12 @@ where
}
let (req, start) = self.pending.pop_front().expect("take response");
self.num.rx();
// 统计请求耗时
// 统计请求耗时、异常响应
self.rtt += start.elapsed();
if self.parser.metric_err() && !cmd.ok() {
self.err += 1;
}

self.parser.check(&*req, &cmd);
req.on_complete(cmd);
continue;
Expand Down
8 changes: 7 additions & 1 deletion stream/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,16 @@ where

let op = ctx.request().operation();
if let Some(rsp) = response {
if ctx.is_write_back() && rsp.ok() {
let rsp_ok = rsp.ok();
if ctx.is_write_back() && rsp_ok {
ctx.async_write_back(&self.parser, rsp, self.top.exp_sec(), &mut self.metrics);
self.async_pending.push_back(ctx);
}

// 不区分是否last,这样更精确的感知总的异常响应数量
if self.parser.metric_err() && !rsp_ok {
*self.metrics.err() += 1;
}
}

// 数据写完,统计耗时。当前数据只写入到buffer中,
Expand Down
10 changes: 8 additions & 2 deletions tests/src/bkdrsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ use sharding::{
fn bkdrsub_one() {
let hasher = Hasher::from("bkdrsub");

let key1 = "mfh15d#3940964349989430";
let key1 = "otdn#1042015:carSubBrand_e4ab74c125e9e95edad691ffe9820118";
let hash1 = hasher.hash(&key1.as_bytes());
println!("key:{}, hash:{}, idx:{}", key1, hash1, hash1 % 180);

let shards = 1080;
let servers = vec!["padding".to_string(); shards];
let dist = Distribute::from("modrange-8640", &servers);
let dist_idx = dist.index(hash1);

println!("key:{}, hash:{}, idx:{}", key1, hash1, dist_idx);
}

// TODO 临时批量文件的hash、dist校验测试,按需打开
Expand Down

0 comments on commit 34db998

Please sign in to comment.