Skip to content

Commit

Permalink
Switch to upstream launchdarkly sdk
Browse files Browse the repository at this point in the history
Switch to the upstream launchdarkly-server-sdk crate instead of our own
fork. This comes at the expense of losing some metrics that we added to
our crate but never attempted to upstream. It is not easy to replicate
the metrics without creating our own wrapper of the http library (or
continue to maintain our own fork). I'd rather lose the metrics over
continuing to maintain our own fork because the maintenance will cost time.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Feb 7, 2025
1 parent 2f15d55 commit 786ab4e
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 204 deletions.
289 changes: 149 additions & 140 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,6 @@ serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" }
# upstream.
tracing-opentelemetry = { git = "https://github.com/MaterializeInc/tracing-opentelemetry.git" }

# Waiting on https://github.com/launchdarkly/rust-server-sdk/pull/20 to make
# it into a release.
# Also bumps lru to 0.12.0
# Needs a more complex update to 2.0.0, with a custom TLS connector.
launchdarkly-server-sdk = { git = "https://github.com/MaterializeInc/rust-server-sdk", rev = "87c0f67aa51fa936d3a6a49605e8e3f565d15281" }

# Waiting on https://github.com/edenhill/librdkafka/pull/4051.
rdkafka = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" }
rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" }
Expand Down
6 changes: 3 additions & 3 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
governor = "0.6.0"
hex = "0.4.3"
hyper = { version = "0.14.32", features = ["client", "http1"] }
hyper-tls = "0.5.0"
http = "1.1.0"
ipnet = "2.5.0"
itertools = "0.12.1"
launchdarkly-server-sdk = { version = "1.0.0", default-features = false, features = [
"hypertls",
] }
launchdarkly-server-sdk = { version = "2.4.1", default-features = false }
maplit = "1.0.2"
mz-adapter-types = { path = "../adapter-types" }
mz-audit-log = { path = "../audit-log" }
Expand Down
17 changes: 1 addition & 16 deletions src/adapter/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use std::collections::BTreeMap;

use mz_build_info::BuildInfo;
use mz_ore::metric;
use mz_ore::metrics::{MetricsRegistry, UIntGauge};
use mz_ore::now::NowFn;
use mz_ore::metrics::MetricsRegistry;
use mz_sql::catalog::EnvironmentId;
use prometheus::IntCounter;

Expand All @@ -35,8 +34,6 @@ pub struct SystemParameterSyncConfig {
build_info: &'static BuildInfo,
/// Parameter sync metrics.
metrics: Metrics,
/// Function to return the current time.
now_fn: NowFn,
/// The SDK key.
ld_sdk_key: String,
/// A map from parameter names to LaunchDarkly feature keys
Expand All @@ -51,15 +48,13 @@ impl SystemParameterSyncConfig {
env_id: EnvironmentId,
build_info: &'static BuildInfo,
registry: &MetricsRegistry,
now_fn: NowFn,
ld_sdk_key: String,
ld_key_map: BTreeMap<String, String>,
) -> Self {
Self {
env_id,
build_info,
metrics: Metrics::register_into(registry),
now_fn,
ld_sdk_key,
ld_key_map,
}
Expand All @@ -68,22 +63,12 @@ impl SystemParameterSyncConfig {

#[derive(Debug, Clone)]
pub(super) struct Metrics {
pub last_cse_time_seconds: UIntGauge,
pub last_sse_time_seconds: UIntGauge,
pub params_changed: IntCounter,
}

impl Metrics {
pub(super) fn register_into(registry: &MetricsRegistry) -> Self {
Self {
last_cse_time_seconds: registry.register(metric!(
name: "mz_parameter_frontend_last_cse_time_seconds",
help: "The last known time when the LaunchDarkly client sent an event to the LaunchDarkly server (as unix timestamp).",
)),
last_sse_time_seconds: registry.register(metric!(
name: "mz_parameter_frontend_last_sse_time_seconds",
help: "The last known time when the LaunchDarkly client received an event from the LaunchDarkly server (as unix timestamp).",
)),
params_changed: registry.register(metric!(
name: "mz_parameter_frontend_params_changed",
help: "The number of parameter changes pulled from the LaunchDarkly frontend.",
Expand Down
42 changes: 11 additions & 31 deletions src/adapter/src/config/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use derivative::Derivative;
use launchdarkly_server_sdk as ld;
use mz_build_info::BuildInfo;
use mz_cloud_provider::CloudProvider;
use mz_ore::now::NowFn;
use mz_sql::catalog::EnvironmentId;
use tokio::time;

use crate::config::{Metrics, SynchronizedParameters, SystemParameterSyncConfig};

Expand All @@ -37,8 +34,6 @@ pub struct SystemParameterFrontend {
ld_key_map: BTreeMap<String, String>,
/// Frontend metrics.
ld_metrics: Metrics,
/// Function to return the current time.
now_fn: NowFn,
}

impl SystemParameterFrontend {
Expand All @@ -53,7 +48,6 @@ impl SystemParameterFrontend {
ld_ctx: ld_ctx(&sync_config.env_id, sync_config.build_info)?,
ld_key_map: sync_config.ld_key_map.clone(),
ld_metrics: sync_config.metrics.clone(),
now_fn: sync_config.now_fn.clone(),
})
}

Expand Down Expand Up @@ -90,43 +84,29 @@ impl SystemParameterFrontend {
}
}

fn ld_config(sync_config: &SystemParameterSyncConfig) -> ld::Config {
fn ld_config(sync_config: &SystemParameterSyncConfig) -> Result<ld::Config, ld::ConfigBuildError> {
ld::ConfigBuilder::new(&sync_config.ld_sdk_key)
.event_processor(ld::EventProcessorBuilder::new().on_success({
let last_cse_time_seconds = sync_config.metrics.last_cse_time_seconds.clone();
Arc::new(move |result| {
if let Ok(ts) = u64::try_from(result.time_from_server / 1000) {
last_cse_time_seconds.set(ts);
} else {
tracing::warn!("Cannot convert time_from_server / 1000 from u128 to u64");
}
})
}))
.event_processor(
ld::EventProcessorBuilder::new().https_connector(hyper_tls::HttpsConnector::new()),
)
.data_source(&ld::StreamingDataSourceBuilder::<
hyper_tls::HttpsConnector<hyper::client::HttpConnector>,
>::new())
.build()
}

async fn ld_client(sync_config: &SystemParameterSyncConfig) -> Result<ld::Client, anyhow::Error> {
let ld_client = ld::Client::build(ld_config(sync_config))?;
let ld_client = ld::Client::build(ld_config(sync_config)?)?;

tracing::info!("waiting for SystemParameterFrontend to initialize");

// Start and initialize LD client for the frontend. The callback passed
// will export the last time when an SSE event from the LD server was
// received in a Prometheus metric.
ld_client.start_with_default_executor_and_callback({
let last_sse_time_seconds = sync_config.metrics.last_sse_time_seconds.clone();
let now_fn = sync_config.now_fn.clone();
Arc::new(move |_ev| {
let ts = now_fn() / 1000;
last_sse_time_seconds.set(ts);
})
});
// Start and initialize LD client for the frontend.
ld_client.start_with_default_executor();

let max_backoff = Duration::from_secs(60);
let mut backoff = Duration::from_secs(5);
while !ld_client.initialized_async().await {
while ld_client.wait_for_initialization(backoff).await != Some(true) {
tracing::warn!("SystemParameterFrontend failed to initialize");
time::sleep(backoff).await;
backoff = (backoff * 2).min(max_backoff);
}

Expand Down
2 changes: 1 addition & 1 deletion src/balancerd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ hyper = { version = "1.4.1", features = ["http1", "server"] }
hyper-openssl = "0.10.2"
hyper-util = "0.1.6"
jsonwebtoken = "9.2.0"
launchdarkly-server-sdk = { version = "1.0.0", default-features = false }
launchdarkly-server-sdk = { version = "2.4.1", default-features = false }
mz-alloc = { path = "../alloc" }
mz-alloc-default = { path = "../alloc-default", optional = true }
mz-build-info = { path = "../build-info" }
Expand Down
4 changes: 1 addition & 3 deletions src/dyncfg-launchdarkly/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ workspace = true
[dependencies]
anyhow = { version = "1.0.66", features = ["backtrace"] }
humantime = "2.1.0"
launchdarkly-server-sdk = { version = "1.0.0", default-features = false, features = [
"hypertls",
] }
launchdarkly-server-sdk = { version = "2.4.1", default-features = false }
mz-build-info = { path = "../build-info" }
mz-dyncfg = { path = "../dyncfg" }
mz-ore = { path = "../ore", default-features = false }
Expand Down
5 changes: 2 additions & 3 deletions src/dyncfg-launchdarkly/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,13 @@ where
let _ = dyn_into_flag(entry.val())?;
}
let ld_client = if let Some(key) = launchdarkly_sdk_key {
let client = ld::Client::build(ld::ConfigBuilder::new(key).build())?;
let client = ld::Client::build(ld::ConfigBuilder::new(key).build()?)?;
client.start_with_default_executor();
let init = async {
let max_backoff = Duration::from_secs(60);
let mut backoff = Duration::from_secs(5);
while !client.initialized_async().await {
while client.wait_for_initialization(backoff).await != Some(true) {
tracing::warn!("SyncedConfigSet failed to initialize");
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(max_backoff);
}
};
Expand Down
1 change: 0 additions & 1 deletion src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ impl Listeners {
config.environment_id.clone(),
&BUILD_INFO,
&config.metrics_registry,
config.now.clone(),
ld_sdk_key,
config.launchdarkly_key_map,
))
Expand Down

0 comments on commit 786ab4e

Please sign in to comment.