From d24a08c647ffc7a0ef464fe4d8fcbb4cc7c78f18 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Wed, 27 Nov 2024 16:34:37 -0300 Subject: [PATCH 1/4] feat: dynamic creation and removal of workers feat: dynamic creation and removal of workers fix: don't allow worker duplication, don't panic on invalid action --- Cargo.lock | 12 +++++++ bin/tx-prover/Cargo.toml | 1 + bin/tx-prover/src/commands/mod.rs | 7 ++++ bin/tx-prover/src/proxy/mod.rs | 55 ++++++++++++++++++++++++++++--- 4 files changed, 70 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 52797f843..0984137c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1734,6 +1734,7 @@ dependencies = [ "prost-build", "protox", "serde", + "serde_qs", "tokio", "tokio-stream", "toml", @@ -2887,6 +2888,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_qs" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd34f36fe4c5ba9654417139a9b3a20d2e1de6012ee678ad14d240c22c78d8d6" +dependencies = [ + "percent-encoding", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "serde_spanned" version = "0.6.8" diff --git a/bin/tx-prover/Cargo.toml b/bin/tx-prover/Cargo.toml index ac1e34efd..2b3fa2544 100644 --- a/bin/tx-prover/Cargo.toml +++ b/bin/tx-prover/Cargo.toml @@ -49,6 +49,7 @@ miden-objects = { workspace = true, default-features = false } miden-tx = { workspace = true, default-features = false } prost = { version = "0.13", default-features = false, features = ["derive"] } serde = { version = "1.0", features = ["derive"] } +serde_qs = { version = "0.13" } tokio = { version = "1.38", optional = true, features = ["full"] } tokio-stream = { version = "0.1", optional = true, features = [ "net" ]} toml = { version = "0.8" } diff --git a/bin/tx-prover/src/commands/mod.rs b/bin/tx-prover/src/commands/mod.rs index 047e7d851..22f8ac865 100644 --- a/bin/tx-prover/src/commands/mod.rs +++ b/bin/tx-prover/src/commands/mod.rs @@ -109,3 +109,10 @@ impl Cli { } } } + +#[derive(Debug, PartialEq, Deserialize, Serialize, Parser)] +pub struct UpdateWorkers { + // Add validations, actions should be "add" or "remove" + pub action: String, + pub workers: Vec, +} diff --git a/bin/tx-prover/src/proxy/mod.rs b/bin/tx-prover/src/proxy/mod.rs index c60b6702a..bdef5ba85 100644 --- a/bin/tx-prover/src/proxy/mod.rs +++ b/bin/tx-prover/src/proxy/mod.rs @@ -11,11 +11,11 @@ use pingora_core::{upstreams::peer::HttpPeer, Result}; use pingora_limits::rate::Rate; use pingora_proxy::{ProxyHttp, Session}; use tokio::sync::RwLock; -use tracing::{error, info}; +use tracing::{error, info, warn}; use uuid::Uuid; use crate::{ - commands::ProxyConfig, + commands::{ProxyConfig, UpdateWorkers}, utils::{create_queue_full_response, create_too_many_requests_response}, }; @@ -66,6 +66,33 @@ impl LoadBalancer { info!("Worker {} is now available", worker.addr); available_workers.push(worker); } + + pub async fn update_workers(&self, update_workers: UpdateWorkers) { + let mut available_workers = self.available_workers.write().await; + info!("Current workers: {:?}", available_workers); + + let action = update_workers.action.as_str(); + let workers: Vec<_> = update_workers + .workers + .iter() + .map(|worker| Backend::new(worker)) + .collect::>() + .expect("Failed to create backend"); + + match action { + "add" => { + for worker in workers { + if !available_workers.contains(&worker) { + available_workers.push(worker); + } + } + }, + "remove" => available_workers.retain(|w| !workers.contains(w)), + _ => warn!("Invalid action: {}", action), + } + + info!("Workers updated: {:?}", available_workers); + } } /// Rate limiter @@ -179,7 +206,24 @@ impl ProxyHttp for LoadBalancer { Self::CTX: Send + Sync, { let client_addr = session.client_addr(); - let user_id = client_addr.map(|addr| addr.to_string()); + + // Extract the address string early to drop the reference to session + let client_addr_str = client_addr.expect("No socket address").to_string(); + + info!("Client address: {:?}", client_addr_str); + + if client_addr_str.contains("127.0.0.1") { + let session = session.as_downstream_mut(); + session.read_request().await?; + if let Some(query_params) = session.req_header().as_ref().uri.query() { + let update_workers: UpdateWorkers = + serde_qs::from_str(query_params).expect("Failed to parse query params"); + self.update_workers(update_workers).await; + return Ok(true); + }; + }; + + let user_id = Some(client_addr_str); // Retrieve the current window requests let curr_window_requests = RATE_LIMITER.observe(&user_id, 1); @@ -315,7 +359,8 @@ impl ProxyHttp for LoadBalancer { } // Mark the worker as available - self.add_available_worker(ctx.worker.take().expect("Failed to get worker")) - .await; + if let Some(worker) = ctx.worker.take() { + self.add_available_worker(worker).await; + } } } From e8cdc64fb10cbd6e61103274046f219cce7525d6 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Wed, 27 Nov 2024 18:44:38 -0300 Subject: [PATCH 2/4] feat: add command to update workers feat: add command to add/remove workers improve update workers response improve error handling add status for workers docs: update README --- Cargo.lock | 911 +++++++++++++++---- bin/tx-prover/Cargo.toml | 1 + bin/tx-prover/README.md | 21 + bin/tx-prover/src/commands/mod.rs | 63 +- bin/tx-prover/src/commands/proxy.rs | 4 +- bin/tx-prover/src/commands/update_workers.rs | 64 ++ bin/tx-prover/src/proxy/mod.rs | 214 ++++- bin/tx-prover/src/utils.rs | 43 +- 8 files changed, 1097 insertions(+), 224 deletions(-) create mode 100644 bin/tx-prover/src/commands/update_workers.rs diff --git a/Cargo.lock b/Cargo.lock index 0984137c2..5ffd44176 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,9 +56,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "anes" @@ -173,7 +173,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -184,7 +184,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -229,10 +229,10 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.5.1", "hyper-util", "itoa", "matchit", @@ -262,8 +262,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -298,6 +298,12 @@ dependencies = [ "backtrace", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -348,9 +354,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7" +checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e" dependencies = [ "arrayref", "arrayvec", @@ -409,9 +415,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cast" @@ -421,9 +427,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" +checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" dependencies = [ "jobserver", "libc", @@ -533,7 +539,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -553,9 +559,9 @@ checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" [[package]] name = "cmake" -version = "0.1.51" +version = "0.1.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a" +checksum = "c682c223677e0e5b6b7f63a64b9351844c3f1b1678a68b7ee617e30fb082620e" dependencies = [ "cc", ] @@ -572,6 +578,22 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "cpufeatures" version = "0.2.16" @@ -741,6 +763,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "dissimilar" version = "1.0.9" @@ -762,6 +795,15 @@ dependencies = [ "log", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -770,12 +812,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -827,6 +869,21 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -892,7 +949,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -979,6 +1036,25 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.7.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.7" @@ -990,8 +1066,8 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", - "indexmap 2.6.0", + "http 1.1.0", + "indexmap 2.7.0", "slab", "tokio", "tokio-util", @@ -1016,9 +1092,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ "allocator-api2", "equivalent", @@ -1046,12 +1122,6 @@ dependencies = [ "libc", ] -[[package]] -name = "hermit-abi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" - [[package]] name = "hermit-abi" version = "0.4.0" @@ -1075,6 +1145,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.1.0" @@ -1086,6 +1167,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -1093,7 +1185,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.1.0", ] [[package]] @@ -1104,8 +1196,8 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -1121,6 +1213,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.5.1" @@ -1130,9 +1246,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.7", + "http 1.1.0", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -1148,13 +1264,26 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.5.1", "hyper-util", "pin-project-lite", "tokio", "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.31", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -1164,9 +1293,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.1", "pin-project-lite", "socket2", "tokio", @@ -1174,6 +1303,145 @@ dependencies = [ "tracing", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + +[[package]] +name = "idna" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "indenter" version = "0.3.3" @@ -1192,12 +1460,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", ] [[package]] @@ -1206,6 +1474,12 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "ipnet" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + [[package]] name = "is-terminal" version = "0.4.13" @@ -1258,9 +1532,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jobserver" @@ -1273,10 +1547,11 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.72" +version = "0.3.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +checksum = "a865e038f7f6ed956f788f0d7d60c541fff74c7bd74272c5d4cf15c63743e705" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -1324,9 +1599,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.164" +version = "0.2.167" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" [[package]] name = "libm" @@ -1366,6 +1641,12 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "litemap" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" + [[package]] name = "lock_api" version = "0.4.12" @@ -1384,18 +1665,18 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "logos" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c6b6e02facda28ca5fb8dbe4b152496ba3b1bd5a4b40bb2b1b2d8ad74e0f39b" +checksum = "4b6aa86787fd2da255f97a4425799c8d1fd39951f5798a1192fc1b956581f605" dependencies = [ "logos-derive", ] [[package]] name = "logos-codegen" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32eb6b5f26efacd015b000bfc562186472cd9b34bdba3f6b264e2a052676d10" +checksum = "5f3303189202bb8a052bcd93d66b6c03e6fe70d9c7c47c0ea5e974955e54c876" dependencies = [ "beef", "fnv", @@ -1403,14 +1684,15 @@ dependencies = [ "proc-macro2", "quote", "regex-syntax 0.8.5", - "syn 2.0.89", + "rustc_version 0.4.1", + "syn 2.0.90", ] [[package]] name = "logos-derive" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e5d0c5463c911ef55624739fc353238b4e310f0144be1f875dc42fec6bfd5ec" +checksum = "774a1c225576486e4fdf40b74646f672c542ca3608160d348749693ae9d456e6" dependencies = [ "logos-codegen", ] @@ -1434,7 +1716,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown 0.15.1", + "hashbrown 0.15.2", ] [[package]] @@ -1601,8 +1883,8 @@ dependencies = [ "supports-color", "supports-hyperlinks", "supports-unicode", - "syn 2.0.89", - "terminal_size", + "syn 2.0.90", + "terminal_size 0.3.0", "textwrap", "trybuild", "unicode-width 0.1.14", @@ -1616,7 +1898,7 @@ checksum = "1cc759f0a2947acae217a2f32f722105cacc57d17d5f93bc16362142943a4edd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -1691,7 +1973,7 @@ checksum = "0ee4176a0f2e7d29d2a8ee7e60b6deb14ce67a20e94c3e2c7275cdb8804e1862" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -1733,6 +2015,7 @@ dependencies = [ "prost", "prost-build", "protox", + "reqwest", "serde", "serde_qs", "tokio", @@ -1762,9 +2045,9 @@ dependencies = [ [[package]] name = "miette" -version = "7.2.0" +version = "7.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4edc8853320c2a0dab800fbda86253c8938f6ea88510dc92c5f1ed20e794afc1" +checksum = "317f146e2eb7021892722af37cf1b971f0a70c8406f487e24952667616192c64" dependencies = [ "backtrace", "backtrace-ext", @@ -1774,7 +2057,7 @@ dependencies = [ "supports-color", "supports-hyperlinks", "supports-unicode", - "terminal_size", + "terminal_size 0.4.1", "textwrap", "thiserror 1.0.69", "unicode-width 0.1.14", @@ -1782,13 +2065,13 @@ dependencies = [ [[package]] name = "miette-derive" -version = "7.2.0" +version = "7.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf09caffaac8068c346b6df2a7fc27a177fd20b39421a39ce0a211bde679a6c" +checksum = "23c9b935fbe1d6cbd1dac857b54a688145e2d93f48db36010514d0f612d0ad67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -1808,11 +2091,10 @@ dependencies = [ [[package]] name = "mio" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ - "hermit-abi 0.3.9", "libc", "wasi", "windows-sys 0.52.0", @@ -1824,6 +2106,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -1893,7 +2192,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -1958,12 +2257,50 @@ version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -2031,7 +2368,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -2047,7 +2384,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.6.0", + "indexmap 2.7.0", ] [[package]] @@ -2076,7 +2413,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -2115,7 +2452,7 @@ dependencies = [ "blake2", "bytes", "hex", - "http", + "http 1.1.0", "httparse", "httpdate", "indexmap 1.9.3", @@ -2154,8 +2491,8 @@ dependencies = [ "daemonize", "flate2", "futures", - "h2", - "http", + "h2 0.4.7", + "http 1.1.0", "httparse", "httpdate", "libc", @@ -2201,7 +2538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcb3f62d852da015e76ced56e93e6d52941679a9825281c90f2897841129e59d" dependencies = [ "bytes", - "http", + "http 1.1.0", "httparse", "pingora-error", "pingora-http", @@ -2217,7 +2554,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70202f126056f366549afc804741e12dd9f419cfc79a0063ab15653007a0f4c6" dependencies = [ "bytes", - "http", + "http 1.1.0", "pingora-error", ] @@ -2250,7 +2587,7 @@ dependencies = [ "derivative", "fnv", "futures", - "http", + "http 1.1.0", "log", "pingora-core", "pingora-error", @@ -2268,7 +2605,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb50f65f06c4b81ccb3edcceaa54bb9439608506b0b3b8c048798169a64aad8e" dependencies = [ "arrayvec", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "parking_lot", "rand", ] @@ -2298,8 +2635,8 @@ dependencies = [ "bytes", "clap 3.2.25", "futures", - "h2", - "http", + "h2 0.4.7", + "http 1.1.0", "log", "once_cell", "pingora-cache", @@ -2364,7 +2701,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -2417,7 +2754,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", "version_check", "yansi", ] @@ -2464,7 +2801,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.89", + "syn 2.0.90", "tempfile", ] @@ -2478,7 +2815,7 @@ dependencies = [ "itertools 0.13.0", "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -2665,6 +3002,46 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "rmp" version = "0.8.14" @@ -2713,7 +3090,7 @@ dependencies = [ "regex", "relative-path", "rustc_version 0.4.1", - "syn 2.0.89", + "syn 2.0.90", "unicode-ident", ] @@ -2764,6 +3141,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustracing" version = "0.5.1" @@ -2812,6 +3198,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -2824,6 +3219,29 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "0.9.0" @@ -2862,7 +3280,7 @@ checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -2871,7 +3289,7 @@ version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ - "indexmap 2.6.0", + "indexmap 2.7.0", "itoa", "memchr", "ryu", @@ -2939,7 +3357,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27daf6ed3fc7ffd5ea3ce9f684fe351c47e50f2fdbb6236e2bad0b440dbe408" dependencies = [ "data-encoding", - "indexmap 2.6.0", + "indexmap 2.7.0", "rust_decimal", ] @@ -3006,9 +3424,9 @@ checksum = "b7c388c1b5e93756d0c740965c41e8822f866621d41acbdf6336a6a168f8840c" [[package]] name = "socket2" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" dependencies = [ "libc", "windows-sys 0.52.0", @@ -3020,6 +3438,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "string_cache" version = "0.8.7" @@ -3073,7 +3497,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -3084,18 +3508,18 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "supports-color" -version = "3.0.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8775305acf21c96926c900ad056abeef436701108518cf890020387236ac5a77" +checksum = "c64fc7232dd8d2e4ac5ce4ef302b1d81e0b80d055b9d77c7c4f51f6aa4c867d6" dependencies = [ "is_ci", ] [[package]] name = "supports-hyperlinks" -version = "3.0.0" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c0a1e5168041f5f3ff68ff7d95dcb9c8749df29f6e7e89ada40dd4c9de404ee" +checksum = "804f44ed3c63152de6a9f90acbea1a110441de43006ea51bcce8f436196a288b" [[package]] name = "supports-unicode" @@ -3116,9 +3540,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.89" +version = "2.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e" +checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" dependencies = [ "proc-macro2", "quote", @@ -3137,6 +3561,38 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "target-triple" version = "0.1.3" @@ -3186,6 +3642,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "terminal_size" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5352447f921fda68cf61b4101566c0bdb5104eff6804d0678e5227580ab6a4e9" +dependencies = [ + "rustix", + "windows-sys 0.59.0", +] + [[package]] name = "textwrap" version = "0.16.1" @@ -3223,7 +3689,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -3234,7 +3700,7 @@ checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -3266,6 +3732,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -3278,9 +3754,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.41.1" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ "backtrace", "bytes", @@ -3302,7 +3778,17 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", +] + +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", ] [[package]] @@ -3369,7 +3855,7 @@ version = "0.22.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ - "indexmap 2.6.0", + "indexmap 2.7.0", "serde", "serde_spanned", "toml_datetime", @@ -3385,13 +3871,13 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.22.1", "bytes", - "h2", - "http", - "http-body", + "h2 0.4.7", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.5.1", "hyper-timeout", "hyper-util", "percent-encoding", @@ -3417,7 +3903,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -3426,10 +3912,10 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5299dd20801ad736dccb4a5ea0da7376e59cd98f213bf1c3d478cf53f4834b58" dependencies = [ - "base64", + "base64 0.22.1", "bytes", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "pin-project", "tokio-stream", @@ -3446,12 +3932,12 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef5ca6e7bdd0042c440d36b6df97c1436f1d45871ce18298091f114004b1beb4" dependencies = [ - "base64", + "base64 0.22.1", "byteorder", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "httparse", "js-sys", @@ -3509,8 +3995,8 @@ checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "bitflags 2.6.0", "bytes", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "pin-project-lite", "tower-layer", @@ -3531,9 +4017,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", @@ -3543,20 +4029,20 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", @@ -3575,9 +4061,9 @@ dependencies = [ [[package]] name = "tracing-serde" -version = "0.1.3" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" dependencies = [ "serde", "tracing-core", @@ -3585,9 +4071,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.18" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", "nu-ansi-term", @@ -3706,6 +4192,29 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "url" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -3727,6 +4236,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -3780,9 +4295,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.95" +version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +checksum = "d15e63b4482863c109d70a7b8706c1e364eb6ea449b201a76c5b89cedcec2d5c" dependencies = [ "cfg-if", "once_cell", @@ -3791,36 +4306,37 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.95" +version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +checksum = "8d36ef12e3aaca16ddd3f67922bc63e48e953f126de60bd33ccc0101ef9998cd" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.45" +version = "0.4.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" +checksum = "9dfaf8f50e5f293737ee323940c7d8b08a66a95a419223d9f41610ca08b0833d" dependencies = [ "cfg-if", "js-sys", + "once_cell", "wasm-bindgen", "web-sys", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.95" +version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +checksum = "705440e08b42d3e4b36de7d66c944be628d579796b8090bfa3471478a2260051" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3828,22 +4344,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.95" +version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +checksum = "98c9ae5a76e46f4deecd0f0255cc223cfa18dc9b261213b8aa0c7b36f61b3f1d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.95" +version = "0.2.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +checksum = "6ee99da9c5ba11bd675621338ef6fa52296b76b83305e9b6e5c77d4c286d6d49" [[package]] name = "wasm-streams" @@ -3860,9 +4376,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.72" +version = "0.3.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +checksum = "a98bc3c33f0fe7e59ad7cd041b89034fa82a7c2d4365ca538dda6cdaf513863c" dependencies = [ "js-sys", "wasm-bindgen", @@ -3930,7 +4446,7 @@ checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -3941,7 +4457,7 @@ checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -4120,6 +4636,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "winter-air" version = "0.10.3" @@ -4172,7 +4698,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be43529f43f70306437d2c2c9f9e2b3a4d39b42e86702d8d7577f2357ea32fa6" dependencies = [ "quote", - "syn 2.0.89", + "syn 2.0.90", ] [[package]] @@ -4222,6 +4748,18 @@ dependencies = [ "winter-utils", ] +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "yaml-rust" version = "0.4.5" @@ -4237,6 +4775,30 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" +[[package]] +name = "yoke" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -4255,7 +4817,50 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.89", + "syn 2.0.90", +] + +[[package]] +name = "zerofrom" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", + "synstructure", +] + +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", ] [[package]] diff --git a/bin/tx-prover/Cargo.toml b/bin/tx-prover/Cargo.toml index 2b3fa2544..ae37e36c7 100644 --- a/bin/tx-prover/Cargo.toml +++ b/bin/tx-prover/Cargo.toml @@ -48,6 +48,7 @@ miden-lib = { workspace = true, default-features = false } miden-objects = { workspace = true, default-features = false } miden-tx = { workspace = true, default-features = false } prost = { version = "0.13", default-features = false, features = ["derive"] } +reqwest = { version = "0.11" } serde = { version = "1.0", features = ["derive"] } serde_qs = { version = "0.13" } tokio = { version = "1.38", optional = true, features = ["full"] } diff --git a/bin/tx-prover/README.md b/bin/tx-prover/README.md index e07a5c174..049731a13 100644 --- a/bin/tx-prover/README.md +++ b/bin/tx-prover/README.md @@ -81,6 +81,27 @@ This command will start the proxy using the workers defined in the configuration At the moment, when a worker added to the proxy stops working and can not connect to it for a request, the connection is marked as retriable meaning that the proxy will try reaching the following worker in a round-robin fashion. The amount of retries is configurable changing the `max_retries_per_request` value in the configuration file. +## Updating workers on a running proxy + +To update the workers on a running proxy, you can use the `update-workers` command. This command will update the workers on the proxy and will not require a restart. To use this command, you will need to run: + +```bash +miden-tx-prover update-workers [add|remove] [worker1] [worker2] ... [workerN] +``` + +For example: + +```bash +# To add 0.0.0.0:8085 and 200.58.70.4:50051 to the workers list: +miden-tx-prover update-workers add 0.0.0.0:8085 200.58.70.4:50051 +# To remove 158.12.12.3:8080 and 122.122.6.6:50051 from the workers list: +miden-tx-prover update-workers remove 158.12.12.3:8080 122.122.6.6:50051 +``` + +This changes will be persisted to the configuration file. + +Note that, in order to update the workers, the proxy must be running in the same computer as the command is being executed because it will check if the client address is localhost to avoid any security issues. + Both the worker and the proxy will use the `info` log level by default, but it can be changed by setting the `RUST_LOG` environment variable. ## Features diff --git a/bin/tx-prover/src/commands/mod.rs b/bin/tx-prover/src/commands/mod.rs index 22f8ac865..89845dc94 100644 --- a/bin/tx-prover/src/commands/mod.rs +++ b/bin/tx-prover/src/commands/mod.rs @@ -1,11 +1,20 @@ +use std::{fs::File, io::Write}; + use clap::Parser; +use figment::{ + providers::{Format, Toml}, + Figment, +}; use init::Init; +use miden_tx_prover::PROVER_SERVICE_CONFIG_FILE_NAME; use proxy::StartProxy; use serde::{Deserialize, Serialize}; +use update_workers::UpdateWorkers; use worker::StartWorker; pub mod init; pub mod proxy; +pub mod update_workers; pub mod worker; /// Configuration of the proxy. @@ -50,6 +59,45 @@ impl Default for ProxyConfig { } } +impl ProxyConfig { + /// Loads config file from current directory and default filename and returns it + /// + /// This function will look for the configuration file with the name defined at the + /// [PROVER_SERVICE_CONFIG_FILE_NAME] constant in the current directory. + pub(crate) fn load_config_from_file() -> Result { + let mut current_dir = std::env::current_dir().map_err(|err| err.to_string())?; + current_dir.push(PROVER_SERVICE_CONFIG_FILE_NAME); + let config_path = current_dir.as_path(); + + Figment::from(Toml::file(config_path)) + .extract() + .map_err(|err| format!("Failed to load {} config file: {err}", config_path.display())) + } + + pub(crate) fn save_to_config_file(&self) -> Result<(), String> { + let mut current_dir = std::env::current_dir().map_err(|err| err.to_string())?; + current_dir.push(PROVER_SERVICE_CONFIG_FILE_NAME); + let config_path = current_dir.as_path(); + + let config_as_toml_string = toml::to_string_pretty(self) + .map_err(|err| format!("error formatting config: {err}"))?; + + let mut file_handle = File::options() + .write(true) + .truncate(true) + .open(config_path) + .map_err(|err| format!("error opening the file: {err}"))?; + + file_handle + .write(config_as_toml_string.as_bytes()) + .map_err(|err| format!("error writing to file: {err}"))?; + + println!("Config updated successfully"); + + Ok(()) + } +} + /// Configuration for a worker #[derive(Serialize, Deserialize)] pub struct WorkerConfig { @@ -58,7 +106,7 @@ pub struct WorkerConfig { } impl WorkerConfig { - fn new(host: &str, port: u16) -> Self { + pub fn new(host: &str, port: u16) -> Self { Self { host: host.into(), port } } } @@ -89,6 +137,11 @@ pub enum Command { StartWorker(StartWorker), /// Starts the proxy defined in the config file. StartProxy(StartProxy), + /// Updates the workers defined in the config file. + /// + /// This method will make a request to the proxy defined in the config file to update the + /// workers. It will update the configuration file with the new list of workers. + UpdateWorkers(UpdateWorkers), } /// CLI entry point @@ -106,13 +159,7 @@ impl Cli { // Init does not require async, so run directly init.execute() }, + Command::UpdateWorkers(update_workers) => update_workers.execute(), } } } - -#[derive(Debug, PartialEq, Deserialize, Serialize, Parser)] -pub struct UpdateWorkers { - // Add validations, actions should be "add" or "remove" - pub action: String, - pub workers: Vec, -} diff --git a/bin/tx-prover/src/commands/proxy.rs b/bin/tx-prover/src/commands/proxy.rs index 0152c6bf2..834b9cc8f 100644 --- a/bin/tx-prover/src/commands/proxy.rs +++ b/bin/tx-prover/src/commands/proxy.rs @@ -2,7 +2,7 @@ use clap::Parser; use pingora::{apps::HttpServerOptions, lb::Backend, prelude::Opt, server::Server}; use pingora_proxy::http_proxy_service; -use crate::{proxy::LoadBalancer, utils::load_config_from_file}; +use crate::proxy::LoadBalancer; /// Starts the proxy defined in the config file. #[derive(Debug, Parser)] @@ -17,7 +17,7 @@ impl StartProxy { let mut server = Server::new(Some(Opt::default())).expect("Failed to create server"); server.bootstrap(); - let proxy_config = load_config_from_file()?; + let proxy_config = super::ProxyConfig::load_config_from_file()?; let workers = proxy_config .workers diff --git a/bin/tx-prover/src/commands/update_workers.rs b/bin/tx-prover/src/commands/update_workers.rs new file mode 100644 index 000000000..865acbc66 --- /dev/null +++ b/bin/tx-prover/src/commands/update_workers.rs @@ -0,0 +1,64 @@ +use clap::Parser; +use reqwest::Client; +use serde::{Deserialize, Serialize}; + +#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize)] +pub enum Action { + Add, + Remove, +} + +impl Action { + pub fn as_str(&self) -> &str { + match self { + Action::Add => "add", + Action::Remove => "remove", + } + } +} + +#[derive(Debug, Parser, Clone, Serialize, Deserialize)] +pub struct UpdateWorkers { + pub action: Action, + pub workers: Vec, +} + +impl UpdateWorkers { + pub fn execute(&self) -> Result<(), String> { + // Define a runtime + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Failed to create runtime: {:?}", e))?; + + let query_params = serde_qs::to_string(&self).map_err(|err| err.to_string())?; + + println!("Action: {:?}, with workers: {:?}", self.action, self.workers); + + let url = format!("http://0.0.0.0:8082?{}", query_params); + + // Create an HTTP/2 client + let client = Client::builder() + .http2_prior_knowledge() + .build() + .map_err(|err| err.to_string())?; + + // Make the request + let response = rt.block_on(client.get(url).send()).map_err(|err| err.to_string())?; + + // Check status code + if !response.status().is_success() { + return Err(format!("Request failed with status code: {}", response.status())); + } + + // Read the X-Workers-Amount header + let workers_amount = response + .headers() + .get("X-Workers-Amount") + .ok_or("Missing X-Workers-Amount header")? + .to_str() + .map_err(|err| err.to_string())?; + + println!("New amount of workers: {}", workers_amount); + + Ok(()) + } +} diff --git a/bin/tx-prover/src/proxy/mod.rs b/bin/tx-prover/src/proxy/mod.rs index bdef5ba85..619f3e358 100644 --- a/bin/tx-prover/src/proxy/mod.rs +++ b/bin/tx-prover/src/proxy/mod.rs @@ -15,8 +15,14 @@ use tracing::{error, info, warn}; use uuid::Uuid; use crate::{ - commands::{ProxyConfig, UpdateWorkers}, - utils::{create_queue_full_response, create_too_many_requests_response}, + commands::{ + update_workers::{Action, UpdateWorkers}, + ProxyConfig, WorkerConfig, + }, + utils::{ + create_queue_full_response, create_response_with_error_message, + create_too_many_requests_response, create_workers_updated_response, + }, }; // LOAD BALANCER @@ -24,7 +30,7 @@ use crate::{ /// Load balancer that uses a round robin strategy pub struct LoadBalancer { - available_workers: Arc>>, + total_workers: Arc>>, timeout_secs: Duration, connection_timeout_secs: Duration, max_queue_items: usize, @@ -36,8 +42,10 @@ pub struct LoadBalancer { impl LoadBalancer { /// Create a new load balancer pub fn new(workers: Vec, config: &ProxyConfig) -> Self { + let workers = workers.into_iter().map(|worker| (worker, WorkerStatus::Available)).collect(); + Self { - available_workers: Arc::new(RwLock::new(workers)), + total_workers: Arc::new(RwLock::new(workers)), timeout_secs: Duration::from_secs(config.timeout_secs), connection_timeout_secs: Duration::from_secs(config.connection_timeout_secs), max_queue_items: config.max_queue_items, @@ -49,49 +57,172 @@ impl LoadBalancer { } } - /// Gets an available worker and removes it from the list of available workers. + /// Gets an available worker from the list of available workers and marks it as busy. /// /// If no worker is available, it will return None. pub async fn pop_available_worker(&self) -> Option { - self.available_workers.write().await.pop() + let mut available_workers = self.total_workers.write().await; + + // Find the first available worker + let worker = available_workers.iter_mut().find_map(|(worker, status)| { + if *status == WorkerStatus::Available { + *status = WorkerStatus::Busy; + Some(worker.clone()) + } else { + None + } + }); + + worker } - /// Adds the provided worker to the list of available workers. + /// Changes the status of a worker to available. /// - /// # Panics - /// Panics if the provided worker is already in the list of available workers. + /// If the worker is Removed, it will be ignored. pub async fn add_available_worker(&self, worker: Backend) { - let mut available_workers = self.available_workers.write().await; - assert!(!available_workers.contains(&worker), "Worker already available"); - info!("Worker {} is now available", worker.addr); - available_workers.push(worker); + let mut available_workers = self.total_workers.write().await; + + if let Some((_, status)) = available_workers.iter_mut().find(|(w, _)| w == &worker) { + if *status == WorkerStatus::Busy { + *status = WorkerStatus::Available; + } + } } - pub async fn update_workers(&self, update_workers: UpdateWorkers) { - let mut available_workers = self.available_workers.write().await; + /// Updates the list of available workers. + /// + /// The `action` field can be either "add" or "remove". + /// If the action is "add" and the worker did not exist in the list of available workers or + /// was marked as Removed, it will be added with the status set to available. If it already + /// exists, it will be ignored. + /// If the action is "remove", the worker will be marked as Removed. + /// Then, it will persist the changes to the workers in the configuration file. + pub async fn update_workers( + &self, + update_workers: UpdateWorkers, + ) -> std::result::Result<(), String> { + let mut available_workers = self.total_workers.write().await; + info!("Current workers: {:?}", available_workers); - let action = update_workers.action.as_str(); let workers: Vec<_> = update_workers .workers .iter() .map(|worker| Backend::new(worker)) .collect::>() - .expect("Failed to create backend"); + .map_err(|err| format!("Failed to create backend: {}", err))?; - match action { - "add" => { + match update_workers.action { + Action::Add => { + for worker in workers { + if let Some((_, status)) = + available_workers.iter_mut().find(|(w, _)| w == &worker) + { + if *status == WorkerStatus::Removed { + *status = WorkerStatus::Available; + } + } else { + available_workers.push((worker, WorkerStatus::Available)); + } + } + }, + Action::Remove => { for worker in workers { - if !available_workers.contains(&worker) { - available_workers.push(worker); + if let Some((_, status)) = + available_workers.iter_mut().find(|(w, _)| w == &worker) + { + *status = WorkerStatus::Removed; } } }, - "remove" => available_workers.retain(|w| !workers.contains(w)), - _ => warn!("Invalid action: {}", action), } + // Persist the changes to the workers in the configuration file + let mut configuration = ProxyConfig::load_config_from_file() + .map_err(|err| format!("Failed to load configuration: {}", err))?; + + let workers = available_workers + .iter() + .map(|(worker, _)| { + worker.as_inet().ok_or_else(|| "Failed to get worker address".to_string()).map( + |worker_addr| { + WorkerConfig::new(&worker_addr.ip().to_string(), worker_addr.port()) + }, + ) + }) + .collect::, _>>()?; + + configuration.workers = workers; + + configuration + .save_to_config_file() + .map_err(|err| format!("Failed to save configuration: {}", err))?; + info!("Workers updated: {:?}", available_workers); + + Ok(()) + } + + /// Get the total number of operative workers. + /// + /// A worker is considered operative if it is available or busy. + pub async fn operative_workers_count(&self) -> usize { + let available_workers = self.total_workers.read().await; + available_workers + .iter() + .filter(|(_, status)| *status != WorkerStatus::Removed) + .count() + } + + /// Handle the update workers request + /// + /// If an error is encountered, it will return a response with the error message. + /// If the workers are updated successfully, it will return a response with the number of + /// total workers. + pub async fn handle_update_workers_request( + &self, + session: &mut Session, + ) -> Option> { + let http_session = session.as_downstream_mut(); + + // Attempt to read the HTTP request + if let Err(err) = http_session.read_request().await { + let error_message = format!("Failed to read request: {}", err); + error!("{}", error_message); + return Some(create_response_with_error_message(session, error_message).await); + } + + // Extract and parse query parameters, if there are not any, return early to continue + // processing the request as a regular proving request. + let query_params = match http_session.req_header().as_ref().uri.query() { + Some(params) => params, + None => { + return None; + }, + }; + + // Parse the query parameters + let update_workers: Result = serde_qs::from_str(query_params); + let update_workers = match update_workers { + Ok(workers) => workers, + Err(err) => { + let error_message = format!("Failed to parse query parameters: {}", err); + error!("{}", error_message); + return Some(create_response_with_error_message(session, error_message).await); + }, + }; + + // Update workers and handle potential errors + if let Err(err) = self.update_workers(update_workers).await { + let error_message = format!("Failed to update workers: {}", err); + error!("{}", error_message); + return Some(create_response_with_error_message(session, error_message).await); + } + + // Successfully updated workers + info!("Workers updated successfully"); + let available_workers_count = self.operative_workers_count().await; + Some(create_workers_updated_response(session, available_workers_count).await) } } @@ -205,25 +336,28 @@ impl ProxyHttp for LoadBalancer { where Self::CTX: Send + Sync, { - let client_addr = session.client_addr(); - - // Extract the address string early to drop the reference to session - let client_addr_str = client_addr.expect("No socket address").to_string(); - - info!("Client address: {:?}", client_addr_str); - - if client_addr_str.contains("127.0.0.1") { - let session = session.as_downstream_mut(); - session.read_request().await?; - if let Some(query_params) = session.req_header().as_ref().uri.query() { - let update_workers: UpdateWorkers = - serde_qs::from_str(query_params).expect("Failed to parse query params"); - self.update_workers(update_workers).await; - return Ok(true); - }; + // Extract the client address early + let client_addr = match session.client_addr() { + Some(addr) => addr.to_string(), + None => { + return create_response_with_error_message( + session, + "No socket address".to_string(), + ) + .await; + }, }; - let user_id = Some(client_addr_str); + info!("Client address: {:?}", client_addr); + + // Special handling for localhost + if client_addr.contains("127.0.0.1") { + if let Some(response) = self.handle_update_workers_request(session).await { + return response; + } + } + + let user_id = Some(client_addr); // Retrieve the current window requests let curr_window_requests = RATE_LIMITER.observe(&user_id, 1); diff --git a/bin/tx-prover/src/utils.rs b/bin/tx-prover/src/utils.rs index b23660eaf..cbba39e1c 100644 --- a/bin/tx-prover/src/utils.rs +++ b/bin/tx-prover/src/utils.rs @@ -1,29 +1,8 @@ -use figment::{ - providers::{Format, Toml}, - Figment, -}; -use miden_tx_prover::PROVER_SERVICE_CONFIG_FILE_NAME; use pingora::{http::ResponseHeader, Error, ErrorType}; use pingora_proxy::Session; -use crate::commands::ProxyConfig; - const RESOURCE_EXHAUSTED_CODE: u16 = 8; -/// Loads config file from current directory and default filename and returns it -/// -/// This function will look for the configuration file with the name defined at the -/// [PROVER_SERVICE_CONFIG_FILE_NAME] constant in the current directory. -pub(crate) fn load_config_from_file() -> Result { - let mut current_dir = std::env::current_dir().map_err(|err| err.to_string())?; - current_dir.push(PROVER_SERVICE_CONFIG_FILE_NAME); - let config_path = current_dir.as_path(); - - Figment::from(Toml::file(config_path)) - .extract() - .map_err(|err| format!("Failed to load {} config file: {err}", config_path.display())) -} - pub(crate) fn setup_tracing() { // Set a default log level if `RUST_LOG` is not set if std::env::var("RUST_LOG").is_err() { @@ -67,3 +46,25 @@ pub async fn create_too_many_requests_response( session.write_response_header(Box::new(header), true).await?; Ok(true) } + +pub async fn create_workers_updated_response( + session: &mut Session, + workers: usize, +) -> pingora_core::Result { + let mut header = ResponseHeader::build(200, None)?; + header.insert_header("X-Workers-Amount", workers.to_string())?; + session.set_keepalive(None); + session.write_response_header(Box::new(header), true).await?; + Ok(true) +} + +pub async fn create_response_with_error_message( + session: &mut Session, + error_msg: String, +) -> pingora_core::Result { + let mut header = ResponseHeader::build(400, None)?; + header.insert_header("X-Error-Message", error_msg)?; + session.set_keepalive(None); + session.write_response_header(Box::new(header), true).await?; + Ok(true) +} From dea672f2b1bfc27c0543fe822667e6ca0618298a Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Thu, 28 Nov 2024 18:42:00 -0300 Subject: [PATCH 3/4] refactor: use two lists for workers in load balancer feat: use two lists instead of Worker Status fix: use proxy config from file in update-worker docs: improve code documentation docs: add entry to changelog use vec::retain update readme adding logging sections --- CHANGELOG.md | 1 + bin/tx-prover/README.md | 2 + bin/tx-prover/src/commands/update_workers.rs | 8 +- bin/tx-prover/src/proxy/mod.rs | 135 +++++++++---------- 4 files changed, 77 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4a390314..5f9fa61da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Implemented serialization for `AccountHeader` (#996). - Updated Pingora crates to 0.4 and added polling time to the configuration file (#997). +- Added support for `miden-tx-prover` proxy to update workers on a running proxy (#989). - Refactored `miden-tx-prover` proxy load balancing strategy (#976). - [BREAKING] Better error display when queues are full in the prover service (#967). - [BREAKING] Remove `AccountBuilder::build_testing` and make `Account::initialize_from_components` private (#969). diff --git a/bin/tx-prover/README.md b/bin/tx-prover/README.md index 049731a13..381af4c3d 100644 --- a/bin/tx-prover/README.md +++ b/bin/tx-prover/README.md @@ -102,6 +102,8 @@ This changes will be persisted to the configuration file. Note that, in order to update the workers, the proxy must be running in the same computer as the command is being executed because it will check if the client address is localhost to avoid any security issues. +## Logging + Both the worker and the proxy will use the `info` log level by default, but it can be changed by setting the `RUST_LOG` environment variable. ## Features diff --git a/bin/tx-prover/src/commands/update_workers.rs b/bin/tx-prover/src/commands/update_workers.rs index 865acbc66..a444dea47 100644 --- a/bin/tx-prover/src/commands/update_workers.rs +++ b/bin/tx-prover/src/commands/update_workers.rs @@ -2,6 +2,8 @@ use clap::Parser; use reqwest::Client; use serde::{Deserialize, Serialize}; +use crate::commands::ProxyConfig; + #[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize)] pub enum Action { Add, @@ -33,7 +35,11 @@ impl UpdateWorkers { println!("Action: {:?}, with workers: {:?}", self.action, self.workers); - let url = format!("http://0.0.0.0:8082?{}", query_params); + // Get the proxy url from the configuration file. + let proxy_config = ProxyConfig::load_config_from_file()?; + + // Create the full URL + let url = format!("http://{}:{}?{}", proxy_config.host, proxy_config.port, query_params); // Create an HTTP/2 client let client = Client::builder() diff --git a/bin/tx-prover/src/proxy/mod.rs b/bin/tx-prover/src/proxy/mod.rs index 619f3e358..80300898e 100644 --- a/bin/tx-prover/src/proxy/mod.rs +++ b/bin/tx-prover/src/proxy/mod.rs @@ -11,7 +11,7 @@ use pingora_core::{upstreams::peer::HttpPeer, Result}; use pingora_limits::rate::Rate; use pingora_proxy::{ProxyHttp, Session}; use tokio::sync::RwLock; -use tracing::{error, info, warn}; +use tracing::{error, info}; use uuid::Uuid; use crate::{ @@ -30,7 +30,8 @@ use crate::{ /// Load balancer that uses a round robin strategy pub struct LoadBalancer { - total_workers: Arc>>, + available_workers: Arc>>, + current_workers: Arc>>, timeout_secs: Duration, connection_timeout_secs: Duration, max_queue_items: usize, @@ -42,10 +43,9 @@ pub struct LoadBalancer { impl LoadBalancer { /// Create a new load balancer pub fn new(workers: Vec, config: &ProxyConfig) -> Self { - let workers = workers.into_iter().map(|worker| (worker, WorkerStatus::Available)).collect(); - Self { - total_workers: Arc::new(RwLock::new(workers)), + available_workers: Arc::new(RwLock::new(workers.clone())), + current_workers: Arc::new(RwLock::new(workers)), timeout_secs: Duration::from_secs(config.timeout_secs), connection_timeout_secs: Duration::from_secs(config.connection_timeout_secs), max_queue_items: config.max_queue_items, @@ -57,51 +57,53 @@ impl LoadBalancer { } } - /// Gets an available worker from the list of available workers and marks it as busy. + /// Gets an available worker and removes it from the list of available workers. /// /// If no worker is available, it will return None. pub async fn pop_available_worker(&self) -> Option { - let mut available_workers = self.total_workers.write().await; - - // Find the first available worker - let worker = available_workers.iter_mut().find_map(|(worker, status)| { - if *status == WorkerStatus::Available { - *status = WorkerStatus::Busy; - Some(worker.clone()) - } else { - None - } - }); - - worker + self.available_workers.write().await.pop() } - /// Changes the status of a worker to available. + /// Adds a worker to the list of available workers. /// - /// If the worker is Removed, it will be ignored. + /// If the worker is already in the list of available workers, it won't be added. + /// If the worker is not in the list of current workers, it won't be added. pub async fn add_available_worker(&self, worker: Backend) { - let mut available_workers = self.total_workers.write().await; - - if let Some((_, status)) = available_workers.iter_mut().find(|(w, _)| w == &worker) { - if *status == WorkerStatus::Busy { - *status = WorkerStatus::Available; - } + let mut available_workers = self.available_workers.write().await; + let current_workers = self.current_workers.read().await; + if !available_workers.contains(&worker) && current_workers.contains(&worker) { + available_workers.push(worker); } } - /// Updates the list of available workers. + /// Updates the list of available workers based on the given action ("add" or "remove"). + /// + /// # Behavior + /// + /// ## Add Action + /// - If the worker exists in the current workers list, do nothing. + /// - If the worker does not exist in the current workers list, add it to both the current and + /// available workers lists. /// - /// The `action` field can be either "add" or "remove". - /// If the action is "add" and the worker did not exist in the list of available workers or - /// was marked as Removed, it will be added with the status set to available. If it already - /// exists, it will be ignored. - /// If the action is "remove", the worker will be marked as Removed. - /// Then, it will persist the changes to the workers in the configuration file. + /// ## Remove Action + /// - If the worker exists in the available workers list, remove it. + /// - If the worker exists in the current workers list, remove it. + /// - Otherwise, do nothing. + /// + /// Finally, updates the configuration file with the new list of workers. + /// + /// # Errors + /// + /// Returns an error if: + /// - The worker address cannot be cast into [Backend]. + /// - The configuration file cannot be loaded or saved. + /// - A worker address cannot be retrieved. pub async fn update_workers( &self, update_workers: UpdateWorkers, ) -> std::result::Result<(), String> { - let mut available_workers = self.total_workers.write().await; + let mut available_workers = self.available_workers.write().await; + let mut current_workers = self.current_workers.write().await; info!("Current workers: {:?}", available_workers); @@ -115,35 +117,27 @@ impl LoadBalancer { match update_workers.action { Action::Add => { for worker in workers { - if let Some((_, status)) = - available_workers.iter_mut().find(|(w, _)| w == &worker) - { - if *status == WorkerStatus::Removed { - *status = WorkerStatus::Available; - } - } else { - available_workers.push((worker, WorkerStatus::Available)); + if current_workers.contains(&worker) { + continue; } + current_workers.push(worker.clone()); + available_workers.push(worker); } }, Action::Remove => { for worker in workers { - if let Some((_, status)) = - available_workers.iter_mut().find(|(w, _)| w == &worker) - { - *status = WorkerStatus::Removed; - } + available_workers.retain(|w| w != &worker); + current_workers.retain(|w| w != &worker); } }, } - // Persist the changes to the workers in the configuration file let mut configuration = ProxyConfig::load_config_from_file() .map_err(|err| format!("Failed to load configuration: {}", err))?; - let workers = available_workers + let new_list_of_workers = current_workers .iter() - .map(|(worker, _)| { + .map(|worker| { worker.as_inet().ok_or_else(|| "Failed to get worker address".to_string()).map( |worker_addr| { WorkerConfig::new(&worker_addr.ip().to_string(), worker_addr.port()) @@ -152,7 +146,7 @@ impl LoadBalancer { }) .collect::, _>>()?; - configuration.workers = workers; + configuration.workers = new_list_of_workers; configuration .save_to_config_file() @@ -163,22 +157,25 @@ impl LoadBalancer { Ok(()) } - /// Get the total number of operative workers. - /// - /// A worker is considered operative if it is available or busy. - pub async fn operative_workers_count(&self) -> usize { - let available_workers = self.total_workers.read().await; - available_workers - .iter() - .filter(|(_, status)| *status != WorkerStatus::Removed) - .count() + /// Get the total number of current workers. + pub async fn current_workers_count(&self) -> usize { + self.current_workers.read().await.len() } - /// Handle the update workers request + /// Handles the update workers request. + /// + /// # Behavior + /// - Reads the HTTP request from the session. + /// - If query parameters are present, attempts to parse them as an `UpdateWorkers` object. + /// - If the parsing fails, returns an error response. + /// - If successful, updates the list of workers by calling `update_workers`. + /// - If the update is successful, returns the count of available workers. /// - /// If an error is encountered, it will return a response with the error message. - /// If the workers are updated successfully, it will return a response with the number of - /// total workers. + /// # Errors + /// - If the HTTP request cannot be read. + /// - If the query parameters cannot be parsed. + /// - If the workers cannot be updated. + /// - If the response cannot be created. pub async fn handle_update_workers_request( &self, session: &mut Session, @@ -221,7 +218,7 @@ impl LoadBalancer { // Successfully updated workers info!("Workers updated successfully"); - let available_workers_count = self.operative_workers_count().await; + let available_workers_count = self.current_workers_count().await; Some(create_workers_updated_response(session, available_workers_count).await) } } @@ -306,7 +303,8 @@ impl RequestContext { /// /// At the backend-level, a request lifecycle works as follows: /// - When a new requests arrives, [LoadBalancer::request_filter()] method is called. In this method -/// we apply IP-based rate-limiting to the request and check if the request queue is full. +/// we apply IP-based rate-limiting to the request and check if the request queue is full. In this +/// method we also handle the special case update workers request. /// - Next, the [Self::upstream_peer()] method is called. We use it to figure out which worker will /// process the request. Inside `upstream_peer()`, we add the request to the queue of requests. /// Once the request gets to the front of the queue, we forward it to an available worker. This @@ -327,7 +325,8 @@ impl ProxyHttp for LoadBalancer { RequestContext::new() } - /// Decide whether to filter the request or not. + /// Decide whether to filter the request or not. Also, handle the special case of the update + /// workers request. /// /// Here we apply IP-based rate-limiting to the request. We also check if the queue is full. /// From e5097fe2976c512a9a88bc8de3f5fad447bd5978 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Mon, 2 Dec 2024 12:22:59 -0300 Subject: [PATCH 4/4] fix: separate update workers command in two fix: separate update workers command in two commands refactor: use only one vec to store workers chore: use X-Workers-Count, add doc for function fix: use correct header chore: rename X-Worker-Count, fix doc chore: rename current_workers, rename current_workers_count feat: implement TryInto for &Worker --- bin/tx-prover/README.md | 10 +- bin/tx-prover/src/commands/mod.rs | 28 +++- bin/tx-prover/src/commands/update_workers.rs | 80 ++++++++++-- bin/tx-prover/src/proxy/mod.rs | 129 +++++++++++-------- bin/tx-prover/src/utils.rs | 8 +- 5 files changed, 175 insertions(+), 80 deletions(-) diff --git a/bin/tx-prover/README.md b/bin/tx-prover/README.md index 381af4c3d..d9c58e273 100644 --- a/bin/tx-prover/README.md +++ b/bin/tx-prover/README.md @@ -83,19 +83,19 @@ At the moment, when a worker added to the proxy stops working and can not connec ## Updating workers on a running proxy -To update the workers on a running proxy, you can use the `update-workers` command. This command will update the workers on the proxy and will not require a restart. To use this command, you will need to run: +To update the workers on a running proxy, two commands are provided: `add-worker` and `remove-worker`. These commands will update the workers on the proxy and will not require a restart. To use these commands, you will need to run: ```bash -miden-tx-prover update-workers [add|remove] [worker1] [worker2] ... [workerN] +miden-tx-prover add-worker [worker1] [worker2] ... [workerN] +miden-tx-prover remove-worker [worker1] [worker2] ... [workerN] ``` - For example: ```bash # To add 0.0.0.0:8085 and 200.58.70.4:50051 to the workers list: -miden-tx-prover update-workers add 0.0.0.0:8085 200.58.70.4:50051 +miden-tx-prover add-workers 0.0.0.0:8085 200.58.70.4:50051 # To remove 158.12.12.3:8080 and 122.122.6.6:50051 from the workers list: -miden-tx-prover update-workers remove 158.12.12.3:8080 122.122.6.6:50051 +miden-tx-prover remove-workers 158.12.12.3:8080 122.122.6.6:50051 ``` This changes will be persisted to the configuration file. diff --git a/bin/tx-prover/src/commands/mod.rs b/bin/tx-prover/src/commands/mod.rs index 89845dc94..e188006bf 100644 --- a/bin/tx-prover/src/commands/mod.rs +++ b/bin/tx-prover/src/commands/mod.rs @@ -9,7 +9,7 @@ use init::Init; use miden_tx_prover::PROVER_SERVICE_CONFIG_FILE_NAME; use proxy::StartProxy; use serde::{Deserialize, Serialize}; -use update_workers::UpdateWorkers; +use update_workers::{AddWorkers, RemoveWorkers, UpdateWorkers}; use worker::StartWorker; pub mod init; @@ -74,6 +74,10 @@ impl ProxyConfig { .map_err(|err| format!("Failed to load {} config file: {err}", config_path.display())) } + /// Saves the configuration to the config file + /// + /// This method will serialize the configuration to a TOML string and write it to the file with + /// the name defined at the [PROVER_SERVICE_CONFIG_FILE_NAME] constant in the current directory. pub(crate) fn save_to_config_file(&self) -> Result<(), String> { let mut current_dir = std::env::current_dir().map_err(|err| err.to_string())?; current_dir.push(PROVER_SERVICE_CONFIG_FILE_NAME); @@ -137,11 +141,16 @@ pub enum Command { StartWorker(StartWorker), /// Starts the proxy defined in the config file. StartProxy(StartProxy), - /// Updates the workers defined in the config file. + /// Adds workers to the proxy. + /// + /// This method will make a request to the proxy defined in the config file to add workers. It + /// will update the configuration file with the new list of workers. + AddWorkers(AddWorkers), + /// Removes workers from the proxy. /// - /// This method will make a request to the proxy defined in the config file to update the - /// workers. It will update the configuration file with the new list of workers. - UpdateWorkers(UpdateWorkers), + /// This method will make a request to the proxy defined in the config file to remove workers. + /// It will update the configuration file with the new list of workers. + RemoveWorkers(RemoveWorkers), } /// CLI entry point @@ -159,7 +168,14 @@ impl Cli { // Init does not require async, so run directly init.execute() }, - Command::UpdateWorkers(update_workers) => update_workers.execute(), + Command::AddWorkers(update_workers) => { + let update_workers: UpdateWorkers = update_workers.clone().into(); + update_workers.execute() + }, + Command::RemoveWorkers(update_workers) => { + let update_workers: UpdateWorkers = update_workers.clone().into(); + update_workers.execute() + }, } } } diff --git a/bin/tx-prover/src/commands/update_workers.rs b/bin/tx-prover/src/commands/update_workers.rs index a444dea47..d3a92f2dd 100644 --- a/bin/tx-prover/src/commands/update_workers.rs +++ b/bin/tx-prover/src/commands/update_workers.rs @@ -4,21 +4,35 @@ use serde::{Deserialize, Serialize}; use crate::commands::ProxyConfig; +// ADD WORKERS +// ================================================================================================ + +/// Add workers to the proxy +#[derive(Debug, Parser, Clone, Serialize, Deserialize)] +pub struct AddWorkers { + workers: Vec, +} + +// REMOVE WORKERS +// ================================================================================================ + +/// Remove workers from the proxy +#[derive(Debug, Parser, Clone, Serialize, Deserialize)] +pub struct RemoveWorkers { + workers: Vec, +} + +// UPDATE WORKERS +// ================================================================================================ + +/// Action to perform on the workers #[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize)] pub enum Action { Add, Remove, } -impl Action { - pub fn as_str(&self) -> &str { - match self { - Action::Add => "add", - Action::Remove => "remove", - } - } -} - +/// Update workers in the proxy performing the specified [Action] #[derive(Debug, Parser, Clone, Serialize, Deserialize)] pub struct UpdateWorkers { pub action: Action, @@ -26,6 +40,23 @@ pub struct UpdateWorkers { } impl UpdateWorkers { + /// Makes a requests to the proxy to update the workers. + /// + /// It works by sending a GET request to the proxy with the query parameters. The query + /// parameters are serialized from the struct fields. + /// + /// This method will work only if the proxy is running and the user is in the same computer as + /// the proxy, since the proxy checks for the source IP address and checks that the sender is + /// localhost. + /// + /// The request will return the new number of workers in the X-Worker-Count header. + /// + /// # Errors + /// - If a tokio runtime cannot be created. + /// - If the query parameters cannot be serialized. + /// - If the request fails. + /// - If the status code is not successful. + /// - If the X-Worker-Count header is missing. pub fn execute(&self) -> Result<(), String> { // Define a runtime let rt = tokio::runtime::Runtime::new() @@ -55,16 +86,37 @@ impl UpdateWorkers { return Err(format!("Request failed with status code: {}", response.status())); } - // Read the X-Workers-Amount header - let workers_amount = response + // Read the X-Worker-Count header + let workers_count = response .headers() - .get("X-Workers-Amount") - .ok_or("Missing X-Workers-Amount header")? + .get("X-Worker-Count") + .ok_or("Missing X-Worker-Count header")? .to_str() .map_err(|err| err.to_string())?; - println!("New amount of workers: {}", workers_amount); + println!("New number of workers: {}", workers_count); Ok(()) } } + +// CONVERSIONS +// ================================================================================================ + +impl From for UpdateWorkers { + fn from(remove_workers: RemoveWorkers) -> Self { + UpdateWorkers { + action: Action::Remove, + workers: remove_workers.workers, + } + } +} + +impl From for UpdateWorkers { + fn from(add_workers: AddWorkers) -> Self { + UpdateWorkers { + action: Action::Add, + workers: add_workers.workers, + } + } +} diff --git a/bin/tx-prover/src/proxy/mod.rs b/bin/tx-prover/src/proxy/mod.rs index 80300898e..dfa749efe 100644 --- a/bin/tx-prover/src/proxy/mod.rs +++ b/bin/tx-prover/src/proxy/mod.rs @@ -25,13 +25,41 @@ use crate::{ }, }; +/// Localhost address +const LOCALHOST_ADDR: &str = "127.0.0.1"; + +// WORKER +// ================================================================================================ + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Worker { + worker: Backend, + is_available: bool, +} + +impl Worker { + pub fn new(worker: Backend) -> Self { + Self { worker, is_available: true } + } +} + +impl TryInto for &Worker { + type Error = String; + + fn try_into(self) -> std::result::Result { + self.worker + .as_inet() + .ok_or_else(|| "Failed to get worker address".to_string()) + .map(|worker_addr| WorkerConfig::new(&worker_addr.ip().to_string(), worker_addr.port())) + } +} + // LOAD BALANCER // ================================================================================================ /// Load balancer that uses a round robin strategy pub struct LoadBalancer { - available_workers: Arc>>, - current_workers: Arc>>, + workers: Arc>>, timeout_secs: Duration, connection_timeout_secs: Duration, max_queue_items: usize, @@ -43,9 +71,10 @@ pub struct LoadBalancer { impl LoadBalancer { /// Create a new load balancer pub fn new(workers: Vec, config: &ProxyConfig) -> Self { + let workers: Vec = workers.into_iter().map(Worker::new).collect(); + Self { - available_workers: Arc::new(RwLock::new(workers.clone())), - current_workers: Arc::new(RwLock::new(workers)), + workers: Arc::new(RwLock::new(workers.clone())), timeout_secs: Duration::from_secs(config.timeout_secs), connection_timeout_secs: Duration::from_secs(config.connection_timeout_secs), max_queue_items: config.max_queue_items, @@ -57,22 +86,24 @@ impl LoadBalancer { } } - /// Gets an available worker and removes it from the list of available workers. + /// Gets an available worker and marks it as unavailable. /// /// If no worker is available, it will return None. - pub async fn pop_available_worker(&self) -> Option { - self.available_workers.write().await.pop() + pub async fn pop_available_worker(&self) -> Option { + let mut available_workers = self.workers.write().await; + available_workers.iter_mut().find(|w| w.is_available).map(|w| { + w.is_available = false; + w.clone() + }) } - /// Adds a worker to the list of available workers. + /// Marks the given worker as available. /// - /// If the worker is already in the list of available workers, it won't be added. - /// If the worker is not in the list of current workers, it won't be added. + /// If the worker is not in the list, it won't be added. pub async fn add_available_worker(&self, worker: Backend) { - let mut available_workers = self.available_workers.write().await; - let current_workers = self.current_workers.read().await; - if !available_workers.contains(&worker) && current_workers.contains(&worker) { - available_workers.push(worker); + let mut available_workers = self.workers.write().await; + if let Some(w) = available_workers.iter_mut().find(|w| w.worker == worker) { + w.is_available = true; } } @@ -82,52 +113,47 @@ impl LoadBalancer { /// /// ## Add Action /// - If the worker exists in the current workers list, do nothing. - /// - If the worker does not exist in the current workers list, add it to both the current and - /// available workers lists. + /// - Otherwise, add it and mark it as available. /// /// ## Remove Action - /// - If the worker exists in the available workers list, remove it. /// - If the worker exists in the current workers list, remove it. /// - Otherwise, do nothing. /// /// Finally, updates the configuration file with the new list of workers. /// /// # Errors - /// - /// Returns an error if: - /// - The worker address cannot be cast into [Backend]. - /// - The configuration file cannot be loaded or saved. - /// - A worker address cannot be retrieved. + /// - If the worker cannot be created. + /// - If the configuration cannot be loaded. + /// - If the configuration cannot be saved. pub async fn update_workers( &self, update_workers: UpdateWorkers, ) -> std::result::Result<(), String> { - let mut available_workers = self.available_workers.write().await; - let mut current_workers = self.current_workers.write().await; + let mut workers = self.workers.write().await; - info!("Current workers: {:?}", available_workers); + info!("Current workers: {:?}", workers); - let workers: Vec<_> = update_workers + let workers_to_update: Vec = update_workers .workers .iter() .map(|worker| Backend::new(worker)) - .collect::>() - .map_err(|err| format!("Failed to create backend: {}", err))?; + .collect::, _>>() + .map_err(|err| format!("Failed to create backend: {}", err))? + .into_iter() + .map(Worker::new) + .collect(); match update_workers.action { Action::Add => { - for worker in workers { - if current_workers.contains(&worker) { - continue; + for worker in workers_to_update { + if !workers.iter().any(|w| w.worker == worker.worker) { + workers.push(worker); } - current_workers.push(worker.clone()); - available_workers.push(worker); } }, Action::Remove => { - for worker in workers { - available_workers.retain(|w| w != &worker); - current_workers.retain(|w| w != &worker); + for worker in workers_to_update { + workers.retain(|w| w.worker != worker.worker); } }, } @@ -135,16 +161,8 @@ impl LoadBalancer { let mut configuration = ProxyConfig::load_config_from_file() .map_err(|err| format!("Failed to load configuration: {}", err))?; - let new_list_of_workers = current_workers - .iter() - .map(|worker| { - worker.as_inet().ok_or_else(|| "Failed to get worker address".to_string()).map( - |worker_addr| { - WorkerConfig::new(&worker_addr.ip().to_string(), worker_addr.port()) - }, - ) - }) - .collect::, _>>()?; + let new_list_of_workers = + workers.iter().map(|worker| worker.try_into()).collect::, _>>()?; configuration.workers = new_list_of_workers; @@ -152,14 +170,14 @@ impl LoadBalancer { .save_to_config_file() .map_err(|err| format!("Failed to save configuration: {}", err))?; - info!("Workers updated: {:?}", available_workers); + info!("Workers updated: {:?}", workers); Ok(()) } /// Get the total number of current workers. - pub async fn current_workers_count(&self) -> usize { - self.current_workers.read().await.len() + pub async fn num_workers(&self) -> usize { + self.workers.read().await.len() } /// Handles the update workers request. @@ -218,8 +236,8 @@ impl LoadBalancer { // Successfully updated workers info!("Workers updated successfully"); - let available_workers_count = self.current_workers_count().await; - Some(create_workers_updated_response(session, available_workers_count).await) + let workers_count = self.num_workers().await; + Some(create_workers_updated_response(session, workers_count).await) } } @@ -350,7 +368,7 @@ impl ProxyHttp for LoadBalancer { info!("Client address: {:?}", client_addr); // Special handling for localhost - if client_addr.contains("127.0.0.1") { + if client_addr.contains(LOCALHOST_ADDR) { if let Some(response) = self.handle_update_workers_request(session).await { return response; } @@ -406,8 +424,11 @@ impl ProxyHttp for LoadBalancer { // Check if there is an available worker if let Some(worker) = self.pop_available_worker().await { - info!("Worker {} picked up the request with ID: {}", worker.addr, request_id); - ctx.set_worker(worker); + info!( + "Worker {} picked up the request with ID: {}", + worker.worker.addr, request_id + ); + ctx.set_worker(worker.worker); break; } info!("All workers are busy"); diff --git a/bin/tx-prover/src/utils.rs b/bin/tx-prover/src/utils.rs index cbba39e1c..8ddeb4d82 100644 --- a/bin/tx-prover/src/utils.rs +++ b/bin/tx-prover/src/utils.rs @@ -47,17 +47,23 @@ pub async fn create_too_many_requests_response( Ok(true) } +/// Create a 200 response for updated workers +/// +/// It will set the X-Worker-Count header to the number of workers. pub async fn create_workers_updated_response( session: &mut Session, workers: usize, ) -> pingora_core::Result { let mut header = ResponseHeader::build(200, None)?; - header.insert_header("X-Workers-Amount", workers.to_string())?; + header.insert_header("X-Worker-Count", workers.to_string())?; session.set_keepalive(None); session.write_response_header(Box::new(header), true).await?; Ok(true) } +/// Create a 400 response with an error message +/// +/// It will set the X-Error-Message header to the error message. pub async fn create_response_with_error_message( session: &mut Session, error_msg: String,