Merge pull request #53 from weibocom/dev
Dev
FicoHu authored Jan 13, 2022
2 parents eeffe38 + bb399da commit f0e241f
Showing 162 changed files with 9,692 additions and 5,648 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -11,6 +11,7 @@ members = [
"log",
"metrics",
"ds",
"rt",
"tests",
]

9 changes: 7 additions & 2 deletions agent/Cargo.toml
@@ -2,22 +2,27 @@
name = "agent"
version = "0.0.1"
authors = ["icy"]
edition = "2018"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
context = { path = "../context" }
sharding = { path = "../sharding" }
net = { path = "../net"}
protocol = { path = "../protocol" }
endpoint = { path = "../endpoint" }
discovery = { path = "../discovery" }
elog = { path = "../log", package = "log" }
stream = { path = "../stream" }
metrics = { path = "../metrics" }
rt = { path = "../rt" }

crossbeam-channel = "0.5.1"
log = "0.4.14"
ds = { path = "../ds" }
backtrace = "0.3.63"

tokio = {version = "1.12.0", features = ["rt", "net", "rt-multi-thread", "time", "macros"]}
tokio = {version = "1.15.0", features = ["rt", "net", "rt-multi-thread", "time", "macros"]}
mimalloc = { version = "*", default-features = false }

72 changes: 55 additions & 17 deletions agent/src/main.rs
@@ -1,5 +1,4 @@
use mimalloc::MiMalloc;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

@@ -8,38 +7,77 @@ use context::Context;
use crossbeam_channel::bounded;
use discovery::*;

use std::io::Result;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use rt::spawn;
use std::time::Duration;
use tokio::spawn;

//#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
#[tokio::main(flavor = "current_thread")]
use protocol::Result;

use std::ops::Deref;
use std::panic;

use backtrace::Backtrace;

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
//#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
panic::set_hook(Box::new(|panic_info| {
let (filename, line) = panic_info
.location()
.map(|loc| (loc.file(), loc.line()))
.unwrap_or(("<unknown>", 0));

let cause = panic_info
.payload()
.downcast_ref::<String>()
.map(String::deref);

let cause = cause.unwrap_or_else(|| {
panic_info
.payload()
.downcast_ref::<&str>()
.map(|s| *s)
.unwrap_or("<cause unknown>")
});

log::error!("A panic occurred at {}:{}: {}", filename, line, cause);
log::error!("panic backtrace: {:?}", Backtrace::new())
}));
let ctx = Context::from_os_args();
ctx.check()?;

let _l = service::listener_for_supervisor(ctx.port()).await?;
elog::init(ctx.log_dir(), &ctx.log_level)?;
metrics::init(&ctx.metrics_url());
metrics::init_local_ip(&ctx.metrics_probe);

// Initialize the four periodic tasks on a dedicated thread.
metrics::init_local_ip(&ctx.metrics_probe);
let mut spawner = rt::DedicatedSpawner::new();
let metric_cycle = Duration::from_secs(10);
spawner.spawn(metrics::Sender::new(&ctx.metrics_url(), metric_cycle));
spawner.spawn(metrics::MetricRegister::default());
spawner.spawn(discovery::dns::start_dns_resolver_refresher());
use discovery::watch_discovery;
let discovery = Discovery::from_url(ctx.discovery());
let (tx_disc, rx_disc) = bounded(512);
let (tx, rx) = bounded(128);
let snapshot = ctx.snapshot().to_string();
let tick = ctx.tick();
let mut fix = discovery::Fixed::default();
fix.register(ctx.idc_path(), sharding::build_refresh_idc());
spawner.spawn(watch_discovery(snapshot, discovery, rx, tick, fix));
spawner.spawn(stream::start_delay_drop());
log::info!("starting a dedicated thread for periodic tasks");
spawner.start_on_dedicated_thread();

// Start the thread that periodically refreshes the resource configuration
discovery::start_watch_discovery(ctx.snapshot(), discovery, rx_disc, ctx.tick());
// Some resources need a delayed drop.

let mut listeners = ctx.listeners();
log::info!("====> server inited <====");

let session_id = Arc::new(AtomicUsize::new(0));
let mut listeners = ctx.listeners();
loop {
for quard in listeners.scan().await {
let discovery = tx_disc.clone();
let session_id = session_id.clone();
let discovery = tx.clone();
spawn(async move {
let session_id = session_id.clone();
match service::process_one(&quard, discovery, session_id).await {
match service::process_one(&quard, discovery).await {
Ok(_) => log::info!("service complete:{}", quard),
Err(e) => log::warn!("service failed. {} err:{:?}", quard, e),
}
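
For context on the panic hook added above: the two downcasts exist because Rust panics carry payloads of different types; panic!("literal") stores a &'static str, while panic!("{}", value) formats into a String. A minimal, self-contained sketch of the same pattern (not part of this commit, using eprintln! instead of the agent's logger):

    use std::ops::Deref;
    use std::panic;

    fn install_hook() {
        panic::set_hook(Box::new(|info| {
            // Panic location, with a placeholder when unavailable.
            let (file, line) = info
                .location()
                .map(|loc| (loc.file(), loc.line()))
                .unwrap_or(("<unknown>", 0));
            // Try the String payload first, then fall back to the &str payload.
            let cause = info
                .payload()
                .downcast_ref::<String>()
                .map(String::deref)
                .or_else(|| info.payload().downcast_ref::<&str>().copied())
                .unwrap_or("<cause unknown>");
            eprintln!("panic at {}:{}: {}", file, line, cause);
        }));
    }

    fn main() {
        install_hook();
        // Both payload shapes are reported by the hook above.
        let _: Result<(), _> = panic::catch_unwind(|| panic!("plain &str payload"));
        let _: Result<(), _> = panic::catch_unwind(|| panic!("formatted {} payload", "String"));
    }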
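The DedicatedSpawner used in main.rs comes from the new rt crate, whose source is not included in this excerpt. The following is a rough sketch of the idea only, under the assumption that the spawned items are plain futures driven on a single-threaded runtime owned by one OS thread; the real rt implementation may differ:

    use std::future::Future;
    use std::pin::Pin;

    type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

    struct SpawnerSketch {
        tasks: Vec<BoxedTask>,
    }

    impl SpawnerSketch {
        fn new() -> Self {
            Self { tasks: Vec::new() }
        }

        // Collect a future to be driven later on the dedicated thread.
        fn spawn<F: Future<Output = ()> + Send + 'static>(&mut self, f: F) {
            self.tasks.push(Box::pin(f));
        }

        // Move all collected tasks onto a single-threaded runtime owned by one OS thread,
        // keeping periodic housekeeping off the main worker pool.
        fn start_on_dedicated_thread(self) {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("build dedicated runtime");
                rt.block_on(async move {
                    let handles: Vec<_> = self.tasks.into_iter().map(tokio::spawn).collect();
                    for h in handles {
                        let _ = h.await;
                    }
                });
            });
        }
    }

    fn main() {
        let mut spawner = SpawnerSketch::new();
        // A stand-in for a periodic task such as the metrics sender (bounded so the demo ends).
        spawner.spawn(async {
            for _ in 0..3 {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });
        spawner.start_on_dedicated_thread();
        // The calling thread is free to continue; park briefly so the demo task can run.
        std::thread::sleep(std::time::Duration::from_secs(5));
    }
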
106 changes: 46 additions & 60 deletions agent/src/service.rs
@@ -1,24 +1,27 @@
use net::listener::Listener;
use std::io::{Error, ErrorKind, Result};
use std::sync::atomic::{AtomicUsize, Ordering};
use rt::spawn;
use std::sync::Arc;
use std::time::Duration;
use tokio::spawn;

use context::Quadruple;
use crossbeam_channel::Sender;
use discovery::*;
use metrics::MetricName;
use protocol::Protocols;
use stream::io::{copy_bidirectional, ConnectStatus};
use discovery::TopologyWriteGuard;
use metrics::Path;
use protocol::callback::{Callback, CallbackPtr};
use protocol::{Parser, Result};
use stream::pipeline::copy_bidirectional;
use stream::Builder;

use stream::Request;
type Endpoint = Arc<stream::Backend<Request>>;
type Topology = endpoint::Topology<Builder<Parser, Request>, Endpoint, Request, Parser>;
// Keep trying to listen until listening succeeds or is cancelled (cancellation is not supported yet)
// 1. Before attempting to listen, make sure the service configuration has finished updating
pub(super) async fn process_one(
quard: &Quadruple,
discovery: Sender<discovery::TopologyWriteGuard<endpoint::Topology<Protocols>>>,
session_id: Arc<AtomicUsize>,
discovery: Sender<TopologyWriteGuard<Topology>>,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let p = Protocols::try_from(&quard.protocol())?;
let p = Parser::try_from(&quard.protocol())?;
let top = endpoint::Topology::try_from(p.clone(), quard.endpoint())?;
let (tx, rx) = discovery::topology(top, &quard.service());
// Register; the configuration is then refreshed periodically
@@ -28,86 +31,69 @@ pub(super) async fn process_one(
let mut tries = 0usize;
while !rx.inited() {
if tries >= 2 {
log::info!("waiting inited. {} ", quard);
log::warn!("waiting inited. {} ", quard);
}
tokio::time::sleep(Duration::from_secs(1 << (tries.min(10)))).await;
tries += 1;
}
log::info!("service inited. {} ", quard);
log::debug!("service inited. {} ", quard);
let switcher = ds::Switcher::from(true);
let top = Arc::new(RefreshTopology::new(rx, switcher.clone()));
let receiver = top.as_ref() as *const RefreshTopology<Topology> as usize;
let cb = RefreshTopology::<Topology>::static_send;
let path = Path::new(vec![quard.protocol(), &quard.biz()]);
let cb = Callback::new(receiver, cb);
let cb_ptr: CallbackPtr = (&cb).into();

// Service registration is complete; keep listening on the port until it succeeds.
while let Err(e) = _process_one(quard, p.clone(), rx.clone(), session_id.clone()).await {
while let Err(e) = _process_one(quard, &p, &top, cb_ptr.clone(), &path).await {
log::warn!("service process failed. {}, err:{:?}", quard, e);
tokio::time::sleep(Duration::from_secs(6)).await;
}
switcher.off();

// TODO: delay for a second before releasing top's memory.
// Because of the callback, top may still be referenced while a connection is being released.
tokio::time::sleep(Duration::from_secs(3)).await;
Ok(())
}

use endpoint::RefreshTopology;
async fn _process_one(
quard: &Quadruple,
p: Protocols,
top: discovery::TopologyReadGuard<endpoint::Topology<Protocols>>,
session_id: Arc<AtomicUsize>,
p: &Parser,
top: &Arc<RefreshTopology<Topology>>,
cb: CallbackPtr,
path: &Path,
) -> Result<()> {
let l = Listener::bind(&quard.family(), &quard.address()).await?;

let mid = metrics::register!(quard.protocol(), &quard.biz());
let metric_id = mid.id();
log::info!("service started. {}", quard);
use stream::StreamMetrics;

loop {
let top = top.clone();
// Wait for initialization to succeed
let (client, _addr) = l.accept().await?;
let agent = quard.endpoint().to_owned();
let p = p.clone();
let session_id = session_id.fetch_add(1, Ordering::AcqRel);
let cb = cb.clone();
let metrics = StreamMetrics::new(path);
log::debug!("connection established:{:?}", path);
spawn(async move {
metrics::qps("conn", 1, metric_id);
metrics::count("conn", 1, metric_id);
if let Err(e) =
process_one_connection(client, top, agent, p, session_id, metric_id).await
{
log::debug!("{} disconnected. {:?} ", metric_id.name(), e);
use protocol::Topology;
let hasher = top.hasher();
use protocol::Error;
if let Err(e) = copy_bidirectional(cb, metrics, hasher, client, p).await {
match e {
Error::Quit => {} // the client sent the quit command to exit
Error::ReadEof => {}
e => log::info!("disconnected. {:?} ", e),
}
}
metrics::count("conn", -1, metric_id);
});
}
}

async fn process_one_connection(
mut client: net::Stream,
top: TopologyReadGuard<endpoint::Topology<Protocols>>,
endpoint: String,
p: Protocols,
session_id: usize,
metric_id: usize,
) -> Result<()> {
use endpoint::Endpoint;
let ticker = top.tick();
loop {
let agent = Endpoint::from_discovery(&endpoint, p.clone(), top.clone())
.await?
.ok_or_else(|| {
Error::new(
ErrorKind::NotFound,
format!("'{}' is not a valid endpoint type", endpoint),
)
})?;
let ticker = ticker.clone();
match copy_bidirectional(agent, &mut client, p.clone(), session_id, metric_id, ticker)
.await?
{
ConnectStatus::EOF => break,
ConnectStatus::Reuse => {
log::info!("{} connection({}) reused.", metric_id.name(), session_id);
metrics::qps("conn_reuse", 1, metric_id);
}
}
}
Ok(())
}

use tokio::net::TcpListener;
// Listen on a port, mainly for process supervision
pub(super) async fn listener_for_supervisor(port: u16) -> Result<TcpListener> {
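listener_for_supervisor, truncated above, only needs to bind and hold a port so an external supervisor can probe that the agent process is alive. A hypothetical sketch of that idea (the bind address and port below are assumptions for illustration, not values from this commit):

    use tokio::net::TcpListener;

    async fn listener_for_supervisor(port: u16) -> std::io::Result<TcpListener> {
        // Holding the returned listener keeps the port reserved for the process lifetime.
        TcpListener::bind(("127.0.0.1", port)).await
    }

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let _keepalive = listener_for_supervisor(9100).await?;
        // Park forever; in the real agent the listener simply lives as long as main does.
        std::future::pending::<()>().await;
        Ok(())
    }
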
6 changes: 3 additions & 3 deletions context/Cargo.toml
@@ -2,16 +2,16 @@
name = "context"
version = "0.1.0"
authors = ["icy"]
edition = "2018"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
#log = { path = "../log" }

clap = "^3.0.0-beta.5"
clap = {version = "3.0.0-rc.9", features = ["derive"] }
url = "2.2.2"
tokio = { version = "1.12.0", features = ["fs"] }
tokio = { version = "1.15.0", features = ["fs"] }
log = "0.4.14"

[features]
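The clap dependency above now enables the derive feature, which lets command-line options be declared as an annotated struct instead of a builder chain. An illustrative sketch in that style (the struct and flags below are made up for the example and are not the actual options defined in the context crate):

    use clap::Parser;

    #[derive(Parser, Debug)]
    struct Opts {
        /// Port the agent listens on (hypothetical flag).
        #[clap(long, default_value = "8080")]
        port: u16,

        /// Directory for log output (hypothetical flag).
        #[clap(long, default_value = "logs")]
        log_dir: String,
    }

    fn main() {
        let opts = Opts::parse();
        println!("parsed: {:?}", opts);
    }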
