Skip to content

Commit

Permalink
Metrics (#205)
Browse files Browse the repository at this point in the history
* fix tests

* started implementing metrics

* fmt

* fix fmt

* added metrics

* metrics

* added metrics endpoints

* fix result

* cleanup

* make listen address optional

* split http port from metrics

* bump version
  • Loading branch information
lassemand authored Sep 11, 2024
1 parent febb822 commit bd092db
Show file tree
Hide file tree
Showing 4 changed files with 390 additions and 142 deletions.
121 changes: 120 additions & 1 deletion notification-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion notification-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
authors = ["Concordium AG [email protected]"]
edition = "2021"
name = "notification-server"
version = "0.2.4"
version = "0.2.5"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
axum = "0.7"
axum-prometheus = "0.7"
backoff = { version = "0.4", features = ["tokio"] }
bytes = "1.6"
clap = { version = "4.5", features = ["derive", "env"] }
Expand Down
79 changes: 75 additions & 4 deletions notification-server/src/bin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ use axum::{
extract::{Json, State},
http::StatusCode,
response::IntoResponse,
routing::put,
routing::{get, put},
Router,
};
use axum_prometheus::{
metrics_exporter_prometheus::PrometheusHandle, GenericMetricLayer, Handle,
PrometheusMetricLayerBuilder,
};
use backoff::ExponentialBackoff;
use clap::Parser;
use concordium_rust_sdk::base::contracts_common::AccountAddress;
Expand Down Expand Up @@ -39,7 +43,11 @@ struct Args {
default_value = "0.0.0.0:3030"
)]
listen_address: std::net::SocketAddr,
/// Logging level of the application
#[arg(
long = "prometheus-address",
env = "NOTIFICATION_SERVER_PROMETHEUS_ADDRESS"
)]
prometheus_address: Option<std::net::SocketAddr>,
#[arg(long = "log-level", default_value_t = log::LevelFilter::Info)]
log_level: log::LevelFilter,
#[arg(
Expand Down Expand Up @@ -91,6 +99,35 @@ lazy_static! {
static ref MAX_PREFERENCES_LENGTH: usize = all::<Preference>().collect::<Vec<_>>().len();
}

fn setup_prometheus(
prometheus_address: std::net::SocketAddr,
) -> (
GenericMetricLayer<'static, PrometheusHandle, Handle>,
tokio::task::JoinHandle<Result<(), anyhow::Error>>,
) {
let (prometheus_layer, metric_handle) = PrometheusMetricLayerBuilder::new()
.with_prefix("notification_server")
.with_default_metrics()
.build_pair();
let prometheus_api =
Router::new().route("/metrics", get(|| async move { metric_handle.render() }));

let prometheus_handle = tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(prometheus_address)
.await
.with_context(|| {
format!(
"Could not create tcp listener on address: {}",
prometheus_address
)
})?;
axum::serve(listener, prometheus_api)
.await
.context("Prometheus server has shut down")
});
(prometheus_layer, prometheus_handle)
}

/// Processes a device subscription by validating and updating the device's
/// accounts.
///
Expand Down Expand Up @@ -259,7 +296,41 @@ async fn main() -> anyhow::Result<()> {
let app = Router::new()
.route("/api/v1/subscription", put(upsert_account_device))
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(args.listen_address).await?;
axum::serve(listener, app).await?;

let (app, prometheus_handle) = if let Some(prometheus_address) = args.prometheus_address {
let (prometheus_layer, prometheus_handle) = setup_prometheus(prometheus_address);
(app.layer(prometheus_layer), Some(prometheus_handle))
} else {
(app, None)
};

let listen_address = args.listen_address;
let http_handle = tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(listen_address)
.await
.with_context(|| {
format!(
"Could not create tcp listener on address: {}",
listen_address
)
})?;

info!("Listening for requests at {}", listen_address);
axum::serve(listener, app)
.await
.context("HTTP server has shut down")
});
if let Some(prometheus_handle) = prometheus_handle {
tokio::select! {
result = prometheus_handle => {
result.context("Prometheus task panicked")??;
},
result = http_handle => {
result??;
}
}
} else {
http_handle.await??;
}
Ok(())
}
Loading

0 comments on commit bd092db

Please sign in to comment.