Skip to content

Commit

Permalink
Merge pull request #257 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
FicoHu authored Mar 16, 2023
2 parents ffb30ff + 4adc188 commit 4091b8f
Show file tree
Hide file tree
Showing 46 changed files with 822 additions and 580 deletions.
2 changes: 2 additions & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ rocket = { version = "0.5.0-rc.2", features = ["json"], default-features = false
#rocket_async_compression = "0.1.2"
reqwest = { version = "0.11.4", default-features = false }

proc-macro2 = "<= 1.0.51"

[features]
default = ["prometheus"]
http = []
Expand Down
2 changes: 1 addition & 1 deletion context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ds = { path = "../ds" }

clap = {version = "3.1.1", features = ["derive"] }
url = "2.2.2"
tokio = { version = "1.24.2", features = ["rt", "net", "rt-multi-thread", "time"], default-features = false }
tokio = { version = "1.24.2", features = ["rt", "net", "rt-multi-thread", "time", "fs"], default-features = false }
git-version = "0.3.5"
lazy_static = "1.4.0"

Expand Down
5 changes: 1 addition & 4 deletions discovery/src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,14 @@ pub async fn start_dns_resolver_refresher() {
let mut cache = dns_resolver.tx;
let mut rx = dns_resolver.reg_rx;
let mut resolver = dns_resolver.resolver;
use std::task::Poll;
let noop = noop_waker::noop_waker();
let mut ctx = std::task::Context::from_waker(&noop);
use ds::time::{Duration, Instant};
const BATCH_CNT: usize = 128;
let mut tick = tokio::time::interval(Duration::from_secs(1));
//let mut last = Instant::now(); // 上一次刷新的时间
let mut idx = 0;
loop {
let mut regs = Vec::new();
while let Poll::Ready(Some(reg)) = rx.poll_recv(&mut ctx) {
while let Ok(reg) = rx.try_recv() {
regs.push(reg);
}
if regs.len() > 0 {
Expand Down
24 changes: 15 additions & 9 deletions ds/src/mem/guarded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ impl GuardedBuffer {
{
self.gc();
self.inner.copy_from(r)
//loop {
// let b = self.inner.as_mut_bytes();
// let cap = b.len();
// let (n, out) = r.read(b);
// self.inner.advance_write(n);
// if cap > n || n == 0 {
// return out;
// }
//}
}
#[inline]
pub fn read(&self) -> RingSlice {
Expand Down Expand Up @@ -103,6 +94,15 @@ impl Display for GuardedBuffer {
)
}
}
impl fmt::Debug for GuardedBuffer {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"taken:{} {} guarded:{}",
self.taken, self.inner, self.guards
)
}
}

pub struct MemGuard {
mem: RingSlice,
Expand Down Expand Up @@ -187,6 +187,12 @@ impl Display for MemGuard {
write!(f, "data:{} guarded:{}", self.mem, !self.guard.is_null())
}
}
impl fmt::Debug for MemGuard {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "data:{:?} guarded:{}", self.mem, !self.guard.is_null())
}
}
impl Drop for GuardedBuffer {
#[inline]
fn drop(&mut self) {
Expand Down
1 change: 1 addition & 0 deletions ds/src/mem/policy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const BUF_MIN: usize = 2 * 1024;
// 内存需要缩容时的策略
// 为了避免频繁的缩容,需要设置一个最小频繁,通常使用最小间隔时间
#[derive(Debug)]
pub struct MemPolicy {
max: u32, // 最近一个周期内,最大的内存使用量
cycles: u32, // 连续多少次tick返回true
Expand Down
5 changes: 2 additions & 3 deletions endpoint/src/cacheservice/topo.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::{Builder, Endpoint, Topology};
use discovery::TopologyWrite;
use ds::time::Duration;
use protocol::{Protocol, Request, Resource, TryNextType};
use sharding::hash::{Hash, HashKey, Hasher};
use sharding::Distance;
use std::collections::HashMap;

use crate::shards::Shards;
use crate::TimeoutAdjust;
use crate::Timeout;

#[derive(Clone)]
pub struct CacheService<B, E, Req, P> {
Expand Down Expand Up @@ -240,7 +239,7 @@ where
addrs: Vec<String>,
dist: &str,
name: &str,
timeout: Duration,
timeout: Timeout,
) -> Shards<E, Req> {
Shards::from(dist, addrs, |addr| {
old.remove(addr).map(|e| e).unwrap_or_else(|| {
Expand Down
39 changes: 24 additions & 15 deletions endpoint/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,35 @@ mod refresh;
pub use refresh::{CheckedTopology, RefreshTopology};

// 不同资源默认的超时时间
const TO_PHANTOM_M: Duration = Duration::from_millis(200);
const TO_REDIS_M: Duration = Duration::from_millis(500);
const TO_REDIS_S: Duration = Duration::from_millis(200);
const TO_MC_M: Duration = Duration::from_millis(500);
const TO_MC_S: Duration = Duration::from_millis(80);
const TO_PHANTOM_M: Timeout = Timeout::from_millis(200);
const TO_REDIS_M: Timeout = Timeout::from_millis(500);
const TO_REDIS_S: Timeout = Timeout::from_millis(200);
const TO_MC_M: Timeout = Timeout::from_millis(500);
const TO_MC_S: Timeout = Timeout::from_millis(80);

trait TimeoutAdjust: Sized {
fn adjust(&mut self, ms: u32);
fn to(mut self, ms: u32) -> Self {
#[derive(Copy, Clone, Debug)]
pub struct Timeout {
ms: u16,
}
impl Timeout {
const fn from_millis(ms: u16) -> Self {
Self { ms }
}
pub fn adjust(&mut self, ms: u32) {
self.ms = ms.max(100).min(6000) as u16;
}
pub fn to(mut self, ms: u32) -> Self {
self.adjust(ms);
self
}
pub fn ms(&self) -> u16 {
self.ms
}
}

use ds::time::Duration;
impl TimeoutAdjust for Duration {
fn adjust(&mut self, ms: u32) {
if ms > 0 {
*self = Duration::from_millis(ms.max(100) as u64);
}
use std::time::Duration;
impl Into<Duration> for Timeout {
fn into(self) -> Duration {
Duration::from_millis(self.ms as u64)
}
}

20 changes: 10 additions & 10 deletions endpoint/src/msgque/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ pub struct Namespace {
pub sized_queue: BTreeMap<usize, Vec<String>>,

#[serde(default)]
timeout_read: u64,
pub(crate) timeout_read: u32,
#[serde(default)]
timeout_write: u64,
pub(crate) timeout_write: u32,

#[serde(default)]
pub offline_idle_time: Duration,
Expand Down Expand Up @@ -85,13 +85,13 @@ impl Namespace {
}
}

#[inline]
pub(crate) fn timeout_read(&self) -> Duration {
Duration::from_millis(200.max(self.timeout_read))
}
//#[inline]
//pub(crate) fn timeout_read(&self) -> Duration {
// Duration::from_millis(200.max(self.timeout_read))
//}

#[inline]
pub(crate) fn timeout_write(&self) -> Duration {
Duration::from_millis(300.max(self.timeout_write))
}
//#[inline]
//pub(crate) fn timeout_write(&self) -> Duration {
// Duration::from_millis(300.max(self.timeout_write))
//}
}
23 changes: 10 additions & 13 deletions endpoint/src/msgque/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ use std::{
};
use tokio::time::Instant;

use std::{
collections::{BTreeMap, HashMap},
time::Duration,
};
use std::collections::{BTreeMap, HashMap};

use crate::{Builder, Endpoint, Topology};
use crate::{Builder, Endpoint, Timeout, Topology};
use sharding::hash::{Hash, HashKey, Hasher, Padding};

use crate::msgque::strategy::hitfirst::Node;
Expand Down Expand Up @@ -52,8 +49,8 @@ pub struct MsgQue<B, E, Req, P> {
// 占位hasher,暂时不需要真实计算
max_size: usize,

timeout_write: Duration,
timeout_read: Duration,
timeout_write: Timeout,
timeout_read: Timeout,
_marker: std::marker::PhantomData<(B, Req)>,
}

Expand All @@ -71,8 +68,8 @@ impl<B, E, Req, P> From<P> for MsgQue<B, E, Req, P> {
write_strategy: Default::default(),
parser,
max_size: super::BLOCK_SIZE,
timeout_write: Duration::from_millis(200),
timeout_read: Duration::from_millis(100),
timeout_write: Timeout::from_millis(200),
timeout_read: Timeout::from_millis(100),
_marker: Default::default(),
}
}
Expand Down Expand Up @@ -287,7 +284,7 @@ where
old: &mut HashMap<String, E>,
addrs: &BTreeMap<usize, Vec<String>>,
name: &str,
timeout: Duration,
timeout: Timeout,
) -> Vec<(String, E, usize)> {
let mut streams = Vec::with_capacity(addrs.len());
for (size, servs) in addrs.iter() {
Expand Down Expand Up @@ -317,7 +314,7 @@ where
&mut self,
addrs: &BTreeMap<usize, Vec<String>>,
name: &str,
timeout: Duration,
timeout: Timeout,
) -> BTreeMap<usize, Vec<(String, E)>> {
let mut old_streams = HashMap::with_capacity(self.streams_write.len() * 3);
if self.streams_write.len() > 0 {
Expand Down Expand Up @@ -393,8 +390,8 @@ where

self.service = name.to_string();

self.timeout_read = ns.timeout_read();
self.timeout_write = ns.timeout_write();
self.timeout_read.adjust(ns.timeout_read);
self.timeout_write.adjust(ns.timeout_write);

let old_r = self.streams_read.split_off(0);
let mut old_streams_read: HashMap<String, E> =
Expand Down
7 changes: 3 additions & 4 deletions endpoint/src/phantomservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

use crate::{Builder, Endpoint, Topology};
Expand All @@ -21,7 +20,7 @@ use sharding::{
};

use super::config::PhantomNamespace;
use crate::TimeoutAdjust;
use crate::Timeout;

const CONFIG_UPDATED_KEY: &str = "__config__";

Expand All @@ -36,7 +35,7 @@ pub struct PhantomService<B, E, Req, P> {
distribution: Range,
parser: P,
service: String,
timeout: Duration,
timeout: Timeout,
_mark: PhantomData<(B, Req)>,
}

Expand Down Expand Up @@ -178,7 +177,7 @@ where
E: Endpoint<Item = Req>,
{
#[inline]
fn take_or_build(&self, old: &mut HashMap<String, Vec<E>>, addr: &str, timeout: Duration) -> E {
fn take_or_build(&self, old: &mut HashMap<String, Vec<E>>, addr: &str, timeout: Timeout) -> E {
match old.get_mut(addr).map(|endpoints| endpoints.pop()) {
Some(Some(end)) => end,
_ => B::build(
Expand Down
6 changes: 4 additions & 2 deletions endpoint/src/redisservice/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ pub(super) mod config;
pub mod topo;

struct Context {
runs: u32, // 运行的次数
idx: u32,
runs: u16, // 运行的次数
idx: u16, //最多有65535个主从
shard_idx: u16,
_ignore: u16,
}

#[inline]
Expand Down
Loading

0 comments on commit 4091b8f

Please sign in to comment.