Skip to content

Commit

Permalink
Merge pull request #175 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
FicoHu authored Nov 7, 2022
2 parents 347f29b + 29e7819 commit 325e04d
Show file tree
Hide file tree
Showing 69 changed files with 726 additions and 509 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
ls -all ./target/debug/agent
- name: Run
run: nohup ./target/debug/agent --discovery vintage://127.0.0.1:8080 --snapshot /home/runner/work/breeze/snapshot --service-path /home/runner/work/breeze/socks --log-dir /home/runner/work/breeze/logs --port 9984 --metrics-probe 8.8.8.8:53 --log-level info > /home/runner/work/breeze/logs/log.file 2>&1 &
- name: Check Port
- name: Check Status
run: |
sleep 6s
#netstat -nat|grep LISTEN
Expand All @@ -71,5 +71,5 @@ jobs:
with:
version: '0.15.0'
args: '-v'
- name: Run tests
run: cargo test
#- name: Run tests
# run: cargo test
1 change: 0 additions & 1 deletion agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ backtrace = "0.3.63"
lazy_static = "1.4.0"

tokio = { version = "1.21.2", features = ["rt", "net", "rt-multi-thread", "time"], default-features = false }
mimalloc = { version = "*", default-features = false }
once_cell = "*"

rlimit = "0.8.3"
Expand Down
4 changes: 1 addition & 3 deletions agent/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ pub(super) fn start_http_server(ctx: &context::Context) {
c.log_level = LogLevel::Critical;
c.workers = 4;
let mut rocket = rocket::custom(c);
{
rocket = crate::console::init_routes(rocket, ctx);
}
rocket = crate::console::init_routes(rocket, ctx);
rocket = crate::prometheus::init_routes(rocket);
//rocket = rocket.attach(rocket_async_compression::Compression::fairing());
rt::spawn(async {
Expand Down
6 changes: 3 additions & 3 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use mimalloc::MiMalloc;
use ds::BrzMalloc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
static GLOBAL: BrzMalloc = BrzMalloc {};

#[macro_use]
extern crate rocket;
Expand All @@ -13,7 +13,7 @@ use discovery::*;
mod init;

use rt::spawn;
use std::time::Duration;
use ds::time::Duration;

use protocol::Result;

Expand Down
4 changes: 2 additions & 2 deletions agent/src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct PrometheusMetricsResponse {}

use ds::lock::Lock;
use lazy_static::lazy_static;
use std::time::Instant;
use ds::time::Instant;

lazy_static! {
static ref LAST: Lock<Instant> = Instant::now().into();
Expand Down Expand Up @@ -71,7 +71,7 @@ pub(crate) fn register_target(ctx: &context::Context) {
port
);
let client = reqwest::Client::new();
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
let mut interval = tokio::time::interval(ds::time::Duration::from_secs(60));
let mut q = vec![("refresh", true)];
loop {
let body = body.clone();
Expand Down
8 changes: 3 additions & 5 deletions agent/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ use context::Quadruple;
use net::Listener;
use rt::spawn;
use std::sync::Arc;
use std::time::Duration;
use ds::time::Duration;

use discovery::TopologyWriteGuard;
use ds::chan::Sender;
use metrics::Path;
use protocol::{Parser, Result};
use stream::pipeline::copy_bidirectional;
use stream::Builder;
use stream::StreamMetrics;
use stream::{Backend, Builder, Request, StreamMetrics};

use stream::Request;
type Endpoint = Arc<stream::Backend<Request>>;
type Endpoint = Arc<Backend<Request>>;
type Topology = endpoint::Topology<Builder<Parser, Request>, Endpoint, Request, Parser>;
// 一直侦听,直到成功侦听或者取消侦听(当前尚未支持取消侦听)
// 1. 尝试侦听之前,先确保服务配置信息已经更新完成
Expand Down
1 change: 1 addition & 0 deletions context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"

[dependencies]
log = { path = "../log" }
ds = { path = "../ds" }

clap = {version = "3.1.1", features = ["derive"] }
url = "2.2.2"
Expand Down
9 changes: 5 additions & 4 deletions context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ lazy_static! {
let fields:Vec<&str> = full.split('-').collect();
let len = fields.len();
let last = *fields.get(len-1).unwrap_or(&"");
let build = if cfg!(debug_assertions) { "_debug" } else { "" };
if last == "modified" {
let second_last = fields.get(len-2).unwrap_or(&"");
format!("{}_{}", second_last, last)
format!("{}_{}{}", second_last, last, build)
} else {
last.to_string()
last.to_string() + build
}
};
static ref CONTEXT: Context = {
Expand Down Expand Up @@ -133,9 +134,9 @@ impl ContextOption {
Ok(())
}

pub fn tick(&self) -> std::time::Duration {
pub fn tick(&self) -> ds::time::Duration {
assert!(self.tick_sec >= 1 && self.tick_sec <= 60);
std::time::Duration::from_secs(self.tick_sec as u64)
ds::time::Duration::from_secs(self.tick_sec as u64)
}
// 如果是以升级模式启动,则会将原有的端口先关闭。
pub fn listeners(&self) -> ListenerIter {
Expand Down
2 changes: 1 addition & 1 deletion context/src/quadruple.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::path::Path;
use std::time::{Duration, Instant};
use ds::time::{Duration, Instant};
#[derive(Debug, Clone, Eq)]
pub struct Quadruple {
parsed_at: Instant,
Expand Down
2 changes: 1 addition & 1 deletion discovery/src/cfg.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::{Error, ErrorKind, Result};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use ds::time::{Duration, Instant};

use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand Down
2 changes: 1 addition & 1 deletion discovery/src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub async fn start_dns_resolver_refresher() {
use std::task::Poll;
let noop = noop_waker::noop_waker();
let mut ctx = std::task::Context::from_waker(&noop);
use std::time::{Duration, Instant};
use ds::time::{Duration, Instant};
const CYCLE: Duration = Duration::from_secs(79);
let mut tick = tokio::time::interval(Duration::from_secs(1));
let mut last = Instant::now(); // 上一次刷新的时间
Expand Down
2 changes: 1 addition & 1 deletion discovery/src/update.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// 定期更新discovery.
use super::{Discover, ServiceId, TopologyWrite};
use ds::chan::Receiver;
use std::time::{Duration, Instant};
use ds::time::{Duration, Instant};
use tokio::time::interval;

use crate::cache::DiscoveryCache;
Expand Down
6 changes: 6 additions & 0 deletions ds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ tokio = { version = "1.21.2", features = ["rt", "net", "rt-multi-thread", "time"
byteorder = "1.4.3"
rand = "0.8.4"
atomic-waker = "*"
mimalloc = { version = "*", default-features = false }
cache-padded = "1.2.0"

[features]
default = ["heap-stats"]
heap-stats = []
10 changes: 10 additions & 0 deletions ds/src/asserts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#[macro_export]
macro_rules! assert {
($cond:expr $(,)?) => {{ std::assert!($cond $(,)?) }};
($cond:expr, $($arg:tt)+) => {{ std::assert!($cond, $($arg)+) }};
}
#[macro_export]
macro_rules! assert_eq {
($cond:expr $(,)?) => {{ std::assert!($cond $(,)?) }};
($cond:expr, $($arg:tt)+) => {{ std::assert!($cond, $($arg)+) }};
}
5 changes: 5 additions & 0 deletions ds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub use switcher::Switcher;
pub use utf8::*;
pub use waker::AtomicWaker;

pub mod time;

mod asserts;
pub use asserts::*;

pub const NUM_STR_TBL: [&'static str; 32] = [
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16",
"17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31",
Expand Down
2 changes: 1 addition & 1 deletion ds/src/mem/guarded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl MemGuard {
unsafe { assert_eq!((&*guard).load(Ordering::Acquire), 0) };
Self {
mem: data,
guard: guard,
guard,
cap: 0,
}
}
Expand Down
67 changes: 67 additions & 0 deletions ds/src/mem/malloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
pub struct HeapStats {
pub total: usize,
pub used: usize,
pub total_objects: usize,
pub used_objects: usize,
}
pub use inner::*;
#[cfg(feature = "heap-stats")]
mod inner {
use cache_padded::CachePadded;
use mimalloc::MiMalloc;
use std::alloc::{GlobalAlloc, Layout};
use std::sync::atomic::{AtomicU64, Ordering::*};
static ALLOC: CachePadded<AtomicU64> = CachePadded::new(AtomicU64::new(0));
static FREE: CachePadded<AtomicU64> = CachePadded::new(AtomicU64::new(0));
static ALLOC_OBJ: CachePadded<AtomicU64> = CachePadded::new(AtomicU64::new(0));
static FREE_OBJ: CachePadded<AtomicU64> = CachePadded::new(AtomicU64::new(0));

struct Stats;
impl Stats {
#[inline(always)]
fn alloc(&self, size: usize) {
ALLOC.fetch_add(size as u64, Relaxed);
ALLOC_OBJ.fetch_add(1, Relaxed);
}
#[inline(always)]
fn free(&self, size: usize) {
FREE.fetch_add(size as u64, Relaxed);
FREE_OBJ.fetch_add(1, Relaxed);
}
}
pub struct BrzMalloc;
unsafe impl GlobalAlloc for BrzMalloc {
#[inline]
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
Stats.alloc(layout.size());
MiMalloc.alloc(layout)
}

#[inline]
unsafe fn dealloc(&self, ptr: *mut u8, _layout: Layout) {
Stats.free(_layout.size());
MiMalloc.dealloc(ptr, _layout)
}
}

pub fn heap() -> Option<super::HeapStats> {
let alloc = ALLOC.load(Relaxed);
let free = FREE.load(Relaxed);
let alloc_objects = ALLOC_OBJ.load(Relaxed);
let free_objects = FREE_OBJ.load(Relaxed);
Some(super::HeapStats {
total: alloc as usize,
used: (alloc - free) as usize,
total_objects: alloc_objects as usize,
used_objects: (alloc_objects - free_objects) as usize,
})
}
}
#[cfg(not(feature = "heap-stats"))]
mod inner {
pub struct HeapStats;
pub type BrzMalloc = mimalloc::MiMalloc;
pub fn heap() -> Option<super::HeapStats> {
None
}
}
3 changes: 3 additions & 0 deletions ds/src/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ pub use guarded::*;

mod policy;
pub use policy::*;

mod malloc;
pub use malloc::*;
Loading

0 comments on commit 325e04d

Please sign in to comment.