Skip to content

Commit

Permalink
Merge pull request #126 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
icycrystal4 authored Aug 12, 2022
2 parents 042f287 + 03a23aa commit a19c08f
Show file tree
Hide file tree
Showing 48 changed files with 1,737 additions and 117 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"metrics",
"ds",
"rt",
"api",
"tests",
]

Expand All @@ -23,3 +24,4 @@ members = [
#codegen-units = 1
#lto = "fat"
#opt-level = 3

5 changes: 5 additions & 0 deletions Rocket.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## defaults for _all_ profiles
[default]
address = "0.0.0.0"
port = 8000
workers = 8
7 changes: 7 additions & 0 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ stream = { path = "../stream" }
metrics = { path = "../metrics" }
rt = { path = "../rt" }
ds = { path = "../ds" }
api = { path = "../api" }

log = "0.4.14"
backtrace = "0.3.63"
Expand All @@ -26,3 +27,9 @@ tokio = {version = "1.17.0", features = ["rt", "net", "rt-multi-thread", "time",
mimalloc = { version = "*", default-features = false }

rlimit = "0.3.0"

# rocket for restful api
rocket = { version = "0.5.0-rc.2", features = ["json"] }

[features]
restful_api_enable = []
74 changes: 72 additions & 2 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use discovery::dns::DnsResolver;
use mimalloc::MiMalloc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand All @@ -13,15 +14,34 @@ use protocol::Result;

use std::ops::Deref;
use std::panic;
use std::path::Path;

use backtrace::Backtrace;

#[cfg(feature = "restful_api_enable")]
#[macro_use]
extern crate rocket;

#[cfg(feature = "restful_api_enable")]
use rocket::{Build, Rocket};

use api;
use api::props;

// 默认支持
#[cfg(not(feature = "restful_api_enable"))]
fn main() -> Result<()> {
let ctx = Context::from_os_args();
ctx.check()?;
set_rlimit(ctx.no_file);
set_panic_hook();

// 提前初始化log,避免延迟导致的异常
if let Err(e) = elog::init(ctx.log_dir(), &ctx.log_level) {
panic!("log init failed: {:?}", e);
}

log::info!("launch without rocket!");
let threads = ctx.thread_num as usize;
tokio::runtime::Builder::new_multi_thread()
.worker_threads(threads)
Expand All @@ -33,9 +53,44 @@ fn main() -> Result<()> {
.block_on(async { run(ctx).await })
}

// 支持 restful api 的启动入口
// 注意:api所有逻辑只在api module中进行实现,不要扩散到其他mod fishermen
#[cfg(feature = "restful_api_enable")]
#[launch]
async fn main_launch() -> Rocket<Build> {
let ctx = Context::from_os_args();
if let Err(e) = ctx.check() {
panic!("context check args failed, err: {:?}", e);
}
set_rlimit(ctx.no_file);
set_panic_hook();

// 提前初始化log,避免延迟导致的异常
if let Err(e) = elog::init(ctx.log_dir(), &ctx.log_level) {
panic!("log init failed: {:?}", e);
}

// set env props for api
set_env_props(&ctx);

// 启动api的白名单探测
rt::spawn(api::start_whitelist_refresh(ctx.whitelist_host.clone()));

// 启动核心任务
rt::spawn(async {
if let Err(e) = run(ctx).await {
panic!("start breeze core failed: {:?}", e);
}
});

log::info!("launch with rocket!");

api::routes()
}

async fn run(ctx: Context) -> Result<()> {
let _l = service::listener_for_supervisor(ctx.port()).await?;
elog::init(ctx.log_dir(), &ctx.log_level)?;
// elog::init(ctx.log_dir(), &ctx.log_level)?;
metrics::init_local_ip(&ctx.metrics_probe);

rt::spawn(metrics::Sender::new(
Expand All @@ -44,7 +99,11 @@ async fn run(ctx: Context) -> Result<()> {
Duration::from_secs(10),
));
rt::spawn(metrics::MetricRegister::default());
rt::spawn(discovery::dns::start_dns_resolver_refresher());

// 将dns resolver的初始化放到外层,提前进行,避免并发场景下顺序错乱 fishermen
let dns_resolver = DnsResolver::new();
rt::spawn(discovery::dns::start_dns_resolver_refresher(dns_resolver));

let discovery = discovery::Discovery::from_url(ctx.discovery());
let (tx, rx) = ds::chan::bounded(128);
let snapshot = ctx.snapshot().to_string();
Expand Down Expand Up @@ -105,3 +164,14 @@ fn set_panic_hook() {
log::error!("panic backtrace: {:?}", Backtrace::new())
}));
}

// 设置需要的变量到evns中
pub fn set_env_props(ctx: &Context) {
let sp_name = ctx.service_path();
let path = Path::new(&sp_name);
let base_path = path.parent().unwrap();
props::set_prop("base_path", base_path.to_str().unwrap());

// 设置version
props::set_prop("version", context::get_short_version());
}
3 changes: 3 additions & 0 deletions agent/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ async fn _process_one(
// 监听成功协议计数减1,让监听失败数重新置零
*protocol_metrics.listen_failed() -= 1;

// 记录监听的端口,方便api查询
api::props::add_listener(quard.service().to_string(), quard.address());

log::info!("started. {}", quard);

loop {
Expand Down
27 changes: 27 additions & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "api"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
context = { path = "../context" }
metrics = { path = "../metrics" }

log = "0.4.14"
tokio = {version = "1.17.0", features = ["fs"]}

trust-dns-resolver = "0.20.0"

# rocket for restful api
rocket = { version = "0.5.0-rc.2", features = ["json"] }
lazy_static = "1.4.0"

# for serde json
serde = { version = "1.0.126", features = ["derive"] }
serde_derive = "1.0.126"
serde_json = "1.0.65"

redis = { version = "0.17.0", features = ["tokio-comp"] }
memcache = "*"
79 changes: 79 additions & 0 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
mod meta;
pub mod props;
mod protocol;

use std::{collections::HashSet, time::Duration};

use trust_dns_resolver::{AsyncResolver, TokioConnection, TokioConnectionProvider, TokioHandle};
type Resolver = AsyncResolver<TokioConnection, TokioConnectionProvider>;

use rocket::{Build, Rocket};

#[macro_use]
extern crate rocket;

#[macro_use]
extern crate lazy_static;

use metrics::Path;

const API_PATH: &str = "api";

// 整合所有routers
pub fn routes() -> Rocket<Build> {
let mut rocket = rocket::build();

// 元数据相关routes
rocket = meta::routes(rocket);

// 各种协议 cmd相关routes
protocol::routes(rocket)
}

// 定期刷新白名单域名
pub async fn start_whitelist_refresh(host: String) {
let resolver: Resolver =
AsyncResolver::from_system_conf(TokioHandle).expect("crate api dns resolver");

// 每10分钟刷新一次
let mut tick = tokio::time::interval(Duration::from_secs(10 * 60));
loop {
tick.tick().await;

let mut whitelist = HashSet::with_capacity(2);
match resolver.lookup_ip(host.clone()).await {
Ok(ips) => {
for ip in ips.iter() {
whitelist.insert(ip.to_string());
}
}
Err(err) => {
log::warn!("api - parse whitelist host {} failed: {:?}", host, err);
}
}
if whitelist.len() > 0 {
// 合法域名时,同时将localhost加入,支持本地访问
whitelist.insert("127.0.0.1".to_string());
props::update_whitelist(whitelist);
}
}
}

// 统计
fn qps_incr(name: &'static str) {
let mut opts = Path::new(vec![API_PATH]).qps(name);
opts += 1;
}

// 校验client,并统计接口qps, 当前只检查ip白名单
fn verify_client(client_ip: &String, api_name: &'static str) -> bool {
// 统计qps
qps_incr(api_name);

// 检查白名单
if props::is_in_whitelist(client_ip) {
return true;
}
log::info!("api - found illegal user: {}", client_ip);
false
}
Loading

0 comments on commit a19c08f

Please sign in to comment.