Commit: add max concurrency, some refactoring

blind-oracle committed Jun 26, 2024
1 parent e2cb81f commit 66c5db5

Showing 6 changed files with 47 additions and 32 deletions.
14 changes: 10 additions & 4 deletions src/cli.rs
@@ -368,14 +368,15 @@ pub struct Vector {
pub log_vector_pass: Option<String>,

/// Vector batch size
#[clap(env, long, default_value = "25000")]
#[clap(env, long, default_value = "50000")]
pub log_vector_batch: usize,

/// Vector batch flush interval
#[clap(env, long, default_value = "5s", value_parser = parse_duration)]
pub log_vector_interval: Duration,

- /// Vector buffer size to account for ingest problems
+ /// Vector buffer size (in number of events) to account for ingest problems.
+ /// If the buffer is full then new events will be dropped.
#[clap(env, long, default_value = "131072")]
pub log_vector_buffer: usize,
}
@@ -393,6 +394,11 @@ pub struct Load {
/// It tries to keep the request latency less than this.
#[clap(env, long, default_value = "1500ms", value_parser = parse_duration)]
pub load_shed_target_latency: Duration,

+ /// Maximum number of concurrent requests to process.
+ /// If more are coming in - they will be throttled.
+ #[clap(env, long)]
+ pub load_max_concurrency: Option<usize>,
}

#[derive(Args)]
@@ -410,8 +416,8 @@ pub struct Misc {
#[clap(env, long)]
pub geoip_db: Option<PathBuf>,

- /// Number of Tokio threads to use to serve requests
- /// Default to the number of CPUs
+ /// Number of Tokio threads to use to serve requests.
+ /// Defaults to the number of CPUs
#[clap(env, long)]
pub threads: Option<usize>,
}
9 changes: 8 additions & 1 deletion src/core.rs
@@ -1,4 +1,4 @@
- use std::sync::Arc;
+ use std::sync::{Arc, OnceLock};

use anyhow::{anyhow, Context, Error};
use axum::Router;
@@ -18,7 +18,14 @@ use crate::{
pub const SERVICE_NAME: &str = "ic_gateway";
pub const AUTHOR_NAME: &str = "Boundary Node Team <[email protected]>";

+ // Store env/hostname in statics so that we don't have to clone them
+ pub static ENV: OnceLock<String> = OnceLock::new();
+ pub static HOSTNAME: OnceLock<String> = OnceLock::new();

pub async fn main(cli: &Cli) -> Result<(), Error> {
+ ENV.set(cli.misc.env.clone()).unwrap();
+ HOSTNAME.set(cli.misc.hostname.clone()).unwrap();

// Make a list of all supported domains
let mut domains = cli.domain.domain.clone();
domains.extend_from_slice(&cli.domain.domain_system);
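
As a side note, a minimal sketch (outside of this codebase) of the `OnceLock` pattern used here: the value is written once at startup and then read back as a `&'static str`, which is what lets downstream code such as the metrics `Row` hold borrowed fields instead of cloning `String`s:

```rust
use std::sync::OnceLock;

static ENV: OnceLock<String> = OnceLock::new();

fn init(env: String) {
    // set() fails only if the cell was already initialized
    ENV.set(env).expect("ENV already set");
}

fn env() -> &'static str {
    // get() returns Option<&String>; borrowing from the static yields &'static str
    ENV.get().expect("ENV not initialized").as_str()
}

fn main() {
    init("prod".to_string());
    assert_eq!(env(), "prod");
}
```
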
4 changes: 2 additions & 2 deletions src/metrics/clickhouse.rs
@@ -11,8 +11,8 @@ use crate::cli;

#[derive(clickhouse::Row, Serialize, Deserialize)]
pub struct Row {
- pub env: String,
- pub hostname: String,
+ pub env: &'static str,
+ pub hostname: &'static str,
#[serde(with = "clickhouse::serde::time::datetime")]
pub date: time::OffsetDateTime,
#[serde(with = "clickhouse::serde::uuid")]
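
A quick standalone check (not from this crate, and using JSON only for illustration) that a `&'static str` field serializes exactly like an owned `String`, so the switch above removes per-row allocations without changing what gets written:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Borrowed {
    env: &'static str,
}

#[derive(Serialize)]
struct Owned {
    env: String,
}

fn main() {
    let b = serde_json::to_string(&Borrowed { env: "prod" }).unwrap();
    let o = serde_json::to_string(&Owned { env: "prod".to_string() }).unwrap();
    // Both produce {"env":"prod"}
    assert_eq!(b, o);
}
```
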
16 changes: 5 additions & 11 deletions src/metrics/mod.rs
@@ -26,6 +26,7 @@ use tower_http::compression::CompressionLayer;
use tracing::info;

use crate::{
+ core::{ENV, HOSTNAME},
http::{
calc_headers_size, http_method, http_version,
server::{ConnInfo, TlsInfo},
@@ -82,9 +83,6 @@ pub fn setup(

#[derive(Clone)]
pub struct HttpMetrics {
- pub env: String,
- pub hostname: String,

pub requests: IntCounterVec,
pub duration: HistogramVec,
pub duration_full: HistogramVec,
@@ -98,8 +96,6 @@ pub struct HttpMetrics {
impl HttpMetrics {
pub fn new(
registry: &Registry,
- env: String,
- hostname: String,
clickhouse: Option<Arc<Clickhouse>>,
vector: Option<Arc<Vector>>,
) -> Self {
@@ -115,8 +111,6 @@ impl HttpMetrics {
];

Self {
- env,
- hostname,
clickhouse,
vector,

@@ -357,8 +351,8 @@ pub async fn middleware(
let meta = meta.clone();

let row = Row {
- env: state.env.clone(),
- hostname: state.hostname.clone(),
+ env: ENV.get().unwrap().as_str(),
+ hostname: HOSTNAME.get().unwrap().as_str(),
date: timestamp,
request_id: request_id.0,
conn_id: conn_info.id,
@@ -401,8 +395,8 @@

if let Some(v) = &state.vector {
let val = json!({
- "env": state.env.clone(),
- "hostname": state.hostname.clone(),
+ "env": ENV.get().unwrap().as_str(),
+ "hostname": HOSTNAME.get().unwrap().as_str(),
"date": timestamp.unix_timestamp(),
"request_id": request_id.to_string(),
"conn_id": conn_info.id.to_string(),
18 changes: 12 additions & 6 deletions src/metrics/vector.rs
@@ -25,7 +25,7 @@ use crate::{cli, http};
#[allow(clippy::declare_interior_mutable_const)]
const CONTENT_TYPE_OCTET_STREAM: HeaderValue = HeaderValue::from_static("application/octet-stream");

- // Encodes Vector events into a native format with length delimiting
+ /// Encodes Vector events into a native format with length delimiting
#[derive(Clone)]
struct EventEncoder {
framer: LengthDelimitedCodec,
@@ -41,6 +41,7 @@ impl EventEncoder {
}

// Encodes the event into provided buffer and adds framing
+ #[inline(always)]
fn encode_event(&mut self, event: Event, buf: &mut BytesMut) -> Result<(), Error> {
// Serialize
let len = buf.len();
@@ -80,6 +81,7 @@ impl Vector {
let (tx, rx) = channel(cli.log_vector_buffer);
let token = CancellationToken::new();

+ // Prepare auth header
let auth = cli
.log_vector_user
.map(|x| http::client::basic_auth(x, cli.log_vector_pass));
@@ -129,7 +131,7 @@ struct VectorActor {
}

impl VectorActor {
- async fn buffer_event(&mut self, event: Event) -> Result<(), Error> {
+ async fn add_to_batch(&mut self, event: Event) -> Result<(), Error> {
self.batch.push(event);

if self.batch.len() == self.batch.capacity() {
@@ -173,13 +175,15 @@ impl VectorActor {
return Ok(());
}

+ // Encode the batch
let mut encoder = self.encoder.clone();
let body = encoder
.encode_batch(&mut self.batch)
.context("unable to encode batch")?;

// Retry until we succeed or token is cancelled
- let mut interval = interval(Duration::from_secs(3));
+ // TODO make configurable
+ let mut interval = interval(Duration::from_secs(1));
let mut retries = 3;
let drain = self.token.is_cancelled();

@@ -188,6 +192,7 @@
biased;

_ = interval.tick() => {
+ // Bytes is cheap to clone
if let Err(e) = self.send(body.clone()).await {
warn!("Vector: unable to flush batch: {e:#}");

@@ -205,6 +210,7 @@
return Ok(())
}

+ // If we're draining then the token is already cancelled, so exclude this branch
() = self.token.cancelled(), if !drain => {
warn!("Vector: exiting, aborting batch sending");
return Ok(());
@@ -228,9 +234,9 @@

// Drain the buffer
while let Some(v) = self.rx.recv().await {
- if let Err(e) = self.buffer_event(v).await {
+ if let Err(e) = self.add_to_batch(v).await {
warn!("Vector: unable to drain: {e:#}");
- return;
+ break;
}
}

@@ -250,7 +256,7 @@

event = self.rx.recv() => {
if let Some(v) = event {
- if let Err(e) = self.buffer_event(v).await {
+ if let Err(e) = self.add_to_batch(v).await {
warn!("Vector: unable to flush: {e:#}");
}
}
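
For context on the framing the `EventEncoder` relies on, a minimal standalone sketch (assuming `tokio-util` with the `codec` feature plus the `bytes` crate; not code from this commit) showing how `LengthDelimitedCodec` prefixes each payload with a 4-byte big-endian length so a batch of events can be concatenated into one body:

```rust
use bytes::{Bytes, BytesMut};
use tokio_util::codec::{Encoder, LengthDelimitedCodec};

fn main() {
    let mut framer = LengthDelimitedCodec::new();
    let mut buf = BytesMut::new();

    // Each call appends one frame: 4 length bytes followed by the payload
    for payload in ["event-1", "event-2"] {
        framer
            .encode(Bytes::from(payload.as_bytes().to_vec()), &mut buf)
            .expect("encoding into a growable buffer should not fail");
    }

    // Two frames of 7 payload bytes each, plus 4 length bytes per frame
    assert_eq!(buf.len(), 2 * (4 + 7));
}
```
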
18 changes: 10 additions & 8 deletions src/routing/mod.rs
@@ -27,7 +27,7 @@ use little_loadshedder::{LoadShedLayer, LoadShedResponse};
use middleware::cache::{self, Cache};
use prometheus::Registry;
use strum::{Display, IntoStaticStr};
- use tower::{util::MapResponseLayer, ServiceBuilder, ServiceExt};
+ use tower::{limit::ConcurrencyLimitLayer, util::MapResponseLayer, ServiceBuilder, ServiceExt};

use crate::{
cli::Cli,
@@ -174,16 +174,17 @@ pub fn setup_router(

// Metrics
let metrics_mw = from_fn_with_state(
- Arc::new(metrics::HttpMetrics::new(
- registry,
- cli.misc.env.clone(),
- cli.misc.hostname.clone(),
- clickhouse,
- vector,
- )),
+ Arc::new(metrics::HttpMetrics::new(registry, clickhouse, vector)),
metrics::middleware,
);

+ // Concurrency
+ let concurrency_limit_mw = option_layer(
+ cli.load
+ .load_max_concurrency
+ .map(ConcurrencyLimitLayer::new),
+ );

// Load shedder
let load_shedder_mw = option_layer(cli.load.load_shed_ewma_param.map(|x| {
ServiceBuilder::new()
@@ -310,6 +311,7 @@ pub fn setup_router(
.layer(from_fn(request_id::middleware))
.layer(from_fn(headers::middleware))
.layer(metrics_mw)
+ .layer(concurrency_limit_mw)
.layer(geoip_mw)
.layer(load_shedder_mw)
.layer(from_fn_with_state(domain_resolver, validate::middleware));
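
To illustrate what the new middleware does, a minimal standalone tower example (assuming `tower` with the `limit` and `util` features plus `tokio`; this is a sketch, not the router setup above): `ConcurrencyLimitLayer::new(n)` holds a semaphore permit per in-flight request, so once `n` requests are active, additional calls wait in `poll_ready` until a permit frees up:

```rust
use std::{convert::Infallible, time::Duration};
use tower::{limit::ConcurrencyLimitLayer, service_fn, Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() {
    // A toy service wrapped with a concurrency limit of 2; a third concurrent
    // call would be held in poll_ready until one of the first two finishes.
    let mut svc = ServiceBuilder::new()
        .layer(ConcurrencyLimitLayer::new(2))
        .service(service_fn(|req: u32| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok::<_, Infallible>(req * 2)
        }));

    let resp = svc.ready().await.unwrap().call(21).await.unwrap();
    assert_eq!(resp, 42);
}
```

Unlike the load shedder, which rejects requests when latency climbs past the target, the concurrency limit applies backpressure: excess callers simply wait for a permit rather than receiving an error.
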
