Skip to content

Commit

Permalink
Merge pull request #58 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
FicoHu authored Jan 20, 2022
2 parents 401b918 + 68234fb commit 7007e93
Show file tree
Hide file tree
Showing 31 changed files with 237 additions and 406 deletions.
4 changes: 2 additions & 2 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ elog = { path = "../log", package = "log" }
stream = { path = "../stream" }
metrics = { path = "../metrics" }
rt = { path = "../rt" }
ds = { path = "../ds" }

crossbeam-channel = "0.5.1"
log = "0.4.14"
ds = { path = "../ds" }
backtrace = "0.3.63"

tokio = {version = "1.15.0", features = ["rt", "net", "rt-multi-thread", "time", "macros"]}
mimalloc = { version = "*", default-features = false }

rlimit = "0.3.0"
7 changes: 5 additions & 2 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ static GLOBAL: MiMalloc = MiMalloc;

mod service;
use context::Context;
use crossbeam_channel::bounded;
use discovery::*;

use rt::spawn;
Expand Down Expand Up @@ -44,6 +43,10 @@ async fn main() -> Result<()> {
}));
let ctx = Context::from_os_args();
ctx.check()?;
// set number of file
if let Err(e) = rlimit::setrlimit(rlimit::Resource::NOFILE, ctx.no_file, ctx.no_file) {
log::info!("set rlimit to {} failed:{:?}", ctx.no_file, e);
}

let _l = service::listener_for_supervisor(ctx.port()).await?;
elog::init(ctx.log_dir(), &ctx.log_level)?;
Expand All @@ -57,7 +60,7 @@ async fn main() -> Result<()> {
spawner.spawn(discovery::dns::start_dns_resolver_refresher());
use discovery::watch_discovery;
let discovery = Discovery::from_url(ctx.discovery());
let (tx, rx) = bounded(128);
let (tx, rx) = ds::chan::bounded(128);
let snapshot = ctx.snapshot().to_string();
let tick = ctx.tick();
let mut fix = discovery::Fixed::default();
Expand Down
6 changes: 3 additions & 3 deletions agent/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::sync::Arc;
use std::time::Duration;

use context::Quadruple;
use crossbeam_channel::Sender;
use discovery::TopologyWriteGuard;
use ds::chan::Sender;
use metrics::Path;
use protocol::callback::{Callback, CallbackPtr};
use protocol::{Parser, Result};
Expand Down Expand Up @@ -52,7 +52,6 @@ pub(super) async fn process_one(
}
switcher.off();

// TODO 延迟一秒,释放top内存。
// 因为回调,有可能在连接释放的时候,还在引用top。
tokio::time::sleep(Duration::from_secs(3)).await;
Ok(())
Expand All @@ -78,6 +77,7 @@ async fn _process_one(
let p = p.clone();
let cb = cb.clone();
let metrics = StreamMetrics::new(path);
let path = format!("{:?}", path);
log::debug!("connection established:{:?}", path);
spawn(async move {
use protocol::Topology;
Expand All @@ -87,7 +87,7 @@ async fn _process_one(
match e {
Error::Quit => {} // client发送quit协议退出
Error::ReadEof => {}
e => log::info!("disconnected. {:?} ", e),
e => log::info!("{:?} disconnected. {:?}", path, e),
}
}
});
Expand Down
5 changes: 4 additions & 1 deletion context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub struct Context {
#[clap(long, help("port for suvervisor"), default_value("9984"))]
port: u16,

#[clap(long, help("number of open file"), default_value("204800"))]
pub no_file: u64,

#[clap(
short,
long,
Expand Down Expand Up @@ -144,7 +147,7 @@ impl ListenerIter {
continue;
}
if let Some(one) = Quadruple::parse(&name) {
log::info!("service parsed :{}", one);
log::debug!("service parsed :{}", one);
listeners.push(one);
self.processed.insert(name.to_string(), ());
}
Expand Down
1 change: 0 additions & 1 deletion discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ serde_yaml = "0.8.17"
serde_json = "1.0.65"
rand = "0.8.4"
futures = "0.3.16"
crossbeam-channel = "0.5.1"
md5 = "0.7"
bs58 = "0.4"
trust-dns-resolver = "0.20.0"
Expand Down
2 changes: 1 addition & 1 deletion discovery/src/update.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// 定期更新discovery.
use super::{Discover, ServiceId, TopologyWrite};
use crossbeam_channel::Receiver;
use ds::chan::Receiver;
use std::time::{Duration, Instant};
use tokio::time::interval;

Expand Down
3 changes: 1 addition & 2 deletions ds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ rand = "0.8.4"
cache_line_size = "1.0.0"
log = "0.4.14"
crossbeam-queue = "0.3.2"
crossbeam-channel = "0.5.1"
cfg-if = "1"
futures = "0.3.17"
atomic-waker = "*"

backtrace = "0.3.63"

3 changes: 3 additions & 0 deletions ds/src/chan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub type Sender<T> = crossbeam_channel::Sender<T>;
pub type Receiver<T> = crossbeam_channel::Receiver<T>;
pub use crossbeam_channel::bounded;
1 change: 1 addition & 0 deletions ds/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod chan;
mod cow;
mod mem;
pub mod queue;
Expand Down
2 changes: 2 additions & 0 deletions ds/src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod pinned;
pub use pinned::PinnedQueue;
File renamed without changes.
Empty file added ds/src/queue/receiver.rs
Empty file.
Empty file added ds/src/queue/sender.rs
Empty file.
26 changes: 22 additions & 4 deletions endpoint/src/cacheservice/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Serialize, Deserialize, Clone, Debug, Default, Hash)]
pub struct Namespace {
Expand All @@ -18,20 +19,37 @@ pub struct Namespace {
pub slave: Vec<String>,
#[serde(default)]
pub slave_l1: Vec<Vec<String>>,

#[serde(default)]
pub timeout_ms_master: u32,
#[serde(default)]
pub timeout_ms_slave: u32,
}

impl Namespace {
pub(crate) fn parse<F: FnMut(Self)>(cfg: &str, namespace: &str, mut f: F) {
pub(crate) fn try_from(cfg: &str, namespace: &str) -> Option<Self> {
log::debug!("namespace:{} cfg:{} updating", namespace, cfg);
match serde_yaml::from_str::<Namespace>(cfg) {
Ok(ns) => {
f(ns);
}
Err(e) => {
log::warn!("parse namespace error. {} msg:{:?}", namespace, e);
None
}
Ok(ns) => {
if ns.master.len() == 0 {
log::info!("cache service master empty. namespace:{}", namespace);
None
} else {
Some(ns)
}
}
}
}
pub(super) fn timeout_master(&self) -> Duration {
Duration::from_millis(250.max(self.timeout_ms_master as u64))
}
pub(super) fn timeout_slave(&self) -> Duration {
Duration::from_millis(100.max(self.timeout_ms_slave as u64))
}
}

impl Namespace {}
Expand Down
15 changes: 6 additions & 9 deletions endpoint/src/cacheservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,7 @@ where
{
#[inline]
fn update(&mut self, namespace: &str, cfg: &str) {
super::config::Namespace::parse(cfg, namespace, |ns| {
if ns.master.len() == 0 {
log::info!("cache service master empty. namespace:{}", namespace);
return;
}
if let Some(ns) = super::config::Namespace::try_from(cfg, namespace) {
self.hasher = Hasher::from(&ns.hash);
let dist = &ns.distribution;

Expand All @@ -179,12 +175,13 @@ where
old.insert(e.1, e.0);
}
}
let mto = Duration::from_millis(500);
let mto = ns.timeout_master();
let rto = ns.timeout_slave();

// 准备master
let master = self.build(old, ns.master, dist, namespace, mto);
self.streams.push(master);

let rto = Duration::from_millis(150);
// master_l1
self.has_l1 = ns.master_l1.len() > 0;
for l1 in ns.master_l1 {
Expand All @@ -203,8 +200,8 @@ where
let g = self.build(old, sl1, dist, namespace, rto);
self.streams.push(g);
}
// old 会被dopped
});
}
// old 会被dopped
}
// 不同的业务共用一个配置。把不同的业务配置给拆分开
#[inline]
Expand Down
Loading

0 comments on commit 7007e93

Please sign in to comment.