From e8d40017e1152254c31e12a3d55d8433089d91e8 Mon Sep 17 00:00:00 2001 From: Michal Rus Date: Wed, 27 Nov 2024 11:27:33 +0100 Subject: [PATCH 1/5] feat: use `testgen-hs` for deserializing `ApplyTxErr`ors Related to #42. --- docker-compose.yml | 5 +- flake.lock | 18 ++++ flake.nix | 13 ++- nix/devshells.nix | 5 + nix/internal/unix.nix | 4 + src/cbor.rs | 1 + src/cbor/fallback_decoder.rs | 180 +++++++++++++++++++++++++++++++++++ 7 files changed, 218 insertions(+), 8 deletions(-) create mode 100644 src/cbor/fallback_decoder.rs diff --git a/docker-compose.yml b/docker-compose.yml index 5ad7a43..7698f41 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,6 @@ services: volumes: - ./nix/mithril-entrypoint.sh:/app/bin/entrypoint.sh - node-db:/data - cardano-node: image: ghcr.io/intersectmbo/cardano-node:9.2.0 environment: @@ -21,7 +20,6 @@ services: depends_on: init-node: condition: service_completed_successfully - blockfrost-platform: image: ghcr.io/blockfrost/blockfrost-platform:latest build: @@ -35,7 +33,7 @@ services: restart: on-failure ports: - 3000:3000 - profiles: [''] + profiles: [""] volumes: - node-ipc:/ipc entrypoint: @@ -48,7 +46,6 @@ services: - $REWARD_ADDRESS - --node-socket-path - /ipc/node.socket - blockfrost-platform-solitary: image: ghcr.io/blockfrost/blockfrost-platform:latest build: diff --git a/flake.lock b/flake.lock index d0a54c0..1d48378 100644 --- a/flake.lock +++ b/flake.lock @@ -140,9 +140,27 @@ "flake-compat": "flake-compat", "flake-parts": "flake-parts", "nixpkgs": "nixpkgs", + "testgen-hs": "testgen-hs", "treefmt-nix": "treefmt-nix" } }, + "testgen-hs": { + "flake": false, + "locked": { + "lastModified": 1732706647, + "narHash": "sha256-0eEzmUUcjQqsCY3FDLi0eQY+Mc1mAo/2e6w/tST9pLw=", + "owner": "input-output-hk", + "repo": "testgen-hs", + "rev": "263b69ca80d455967145774707f8a97f079fbd36", + "type": "github" + }, + "original": { + "owner": "input-output-hk", + "ref": "10.1.2.1", + "repo": "testgen-hs", + "type": "github" + } + }, "treefmt-nix": { "inputs": { "nixpkgs": [ diff --git a/flake.nix b/flake.nix index 46aeb01..a529d83 100644 --- a/flake.nix +++ b/flake.nix @@ -9,6 +9,8 @@ flake-compat.flake = false; cardano-node.url = "github:IntersectMBO/cardano-node/10.1.2"; cardano-node.flake = false; # otherwise, +2k dependencies we don’t really use + testgen-hs.url = "github:input-output-hk/testgen-hs/10.1.2.1"; # make sure it follows cardano-node + testgen-hs.flake = false; # otherwise, +2k dependencies we don’t really use devshell.url = "github:numtide/devshell"; devshell.inputs.nixpkgs.follows = "nixpkgs"; cardano-playground.url = "github:input-output-hk/cardano-playground/b4f47fd78beec0ea1ed880d6f0b794919e0c0463"; @@ -37,9 +39,12 @@ pkgs, ... }: { - packages = + packages = let + internal = inputs.self.internal.${system}; + in { - default = inputs.self.internal.${system}.package; + default = internal.package; + inherit (internal) tx-build cardano-address testgen-hs; } // (inputs.nixpkgs.lib.optionalAttrs (system == "x86_64-linux") { default-x86_64-windows = inputs.self.internal.x86_64-windows.package; @@ -49,7 +54,7 @@ treefmt = {pkgs, ...}: { projectRootFile = "flake.nix"; - programs.alejandra.enable = true; + programs.alejandra.enable = true; # Nix programs.prettier.enable = true; settings.formatter.prettier.options = [ "--config" @@ -60,7 +65,7 @@ ]; programs.rustfmt.enable = true; programs.yamlfmt.enable = true; - programs.taplo.enable = true; + programs.taplo.enable = true; # TOML programs.shfmt.enable = true; }; }; diff --git a/nix/devshells.nix b/nix/devshells.nix index fb2cc96..47ec4e9 100644 --- a/nix/devshells.nix +++ b/nix/devshells.nix @@ -50,6 +50,11 @@ in { category = "handy"; package = internal.tx-build; } + { + category = "handy"; + name = "testgen-hs"; + package = internal.testgen-hs; + } ]; language.c.compiler = diff --git a/nix/internal/unix.nix b/nix/internal/unix.nix index 5b1a0a7..68151b5 100644 --- a/nix/internal/unix.nix +++ b/nix/internal/unix.nix @@ -72,6 +72,10 @@ in rec { path = inputs.cardano-playground + "/static/book.play.dev.cardano.org/environments"; }; + testgen-hs-flake = (import inputs.flake-compat {src = inputs.testgen-hs;}).defaultNix; + + testgen-hs = testgen-hs-flake.packages.${targetSystem}.default; + stateDir = if pkgs.stdenv.isDarwin then "Library/Application Support/blockfrost-platform" diff --git a/src/cbor.rs b/src/cbor.rs index 5aecfda..7551447 100644 --- a/src/cbor.rs +++ b/src/cbor.rs @@ -1,2 +1,3 @@ pub mod codec; +pub mod fallback_decoder; pub mod haskell_types; diff --git a/src/cbor/fallback_decoder.rs b/src/cbor/fallback_decoder.rs new file mode 100644 index 0000000..1835c4c --- /dev/null +++ b/src/cbor/fallback_decoder.rs @@ -0,0 +1,180 @@ +use std::io::{BufRead, BufReader, Write}; +use std::process as proc; +use std::thread; +use tokio::sync::{mpsc, oneshot}; +use tracing::error; + +#[derive(Clone)] +pub struct FallbackDecoder { + sender: mpsc::Sender, +} + +struct FDRequest { + cbor: Vec, + response_tx: oneshot::Sender>, +} + +impl FallbackDecoder { + /// Starts a new child process. + pub fn spawn() -> Self { + let (sender, mut receiver) = mpsc::channel::(128); + + thread::spawn(move || { + // For retries: + let mut last_unfulfilled_request: Option = None; + + loop { + let single_run = Self::behavior(&mut receiver, &mut last_unfulfilled_request); + let restart_delay = std::time::Duration::from_secs(1); + error!( + "FallbackDecoder: will restart in {:?} because of a subprocess error: {:?}", + restart_delay, single_run + ); + std::thread::sleep(restart_delay); + } + }); + + Self { sender } + } + + fn behavior( + receiver: &mut mpsc::Receiver, + last_unfulfilled_request: &mut Option, + ) -> Result<(), String> { + // FIXME: _find_ the exe_path + // FIXME: make a release with LineBuffering + let exe_path = "/nix/store/4y2jqhw3c2i407m8rmkvlja9wdr1kqhq-testgen-hs-exe-testgen-hs-x86_64-unknown-linux-musl-10.1.2.1/bin/testgen-hs"; + + let child = proc::Command::new(exe_path) + .arg("deserialize-stream") + .stdin(proc::Stdio::piped()) + .stdout(proc::Stdio::piped()) + .spawn() + .map_err(|err| format!("couldn’t start the child: {:?}", err))?; + + let mut stdin = child.stdin.ok_or("couldn’t grab stdin".to_string())?; + let stdout = child.stdout.ok_or("couldn’t grab stdout".to_string())?; + let stdout_reader = BufReader::new(stdout); + let mut stdout_lines = stdout_reader.lines(); + + while let Some(request) = last_unfulfilled_request.take().or(receiver.blocking_recv()) { + let cbor_hex = hex::encode(&request.cbor); + *last_unfulfilled_request = Some(request); + + writeln!(stdin, "{}", cbor_hex) + .map_err(|err| format!("couldn’t write to stdin: {:?}", err))?; + + let result: Result = match stdout_lines.next() { + Some(Ok(line)) => Self::parse_json(&line), + Some(Err(e)) => Err(format!("failed to read from subprocess: {}", e))?, + None => Err("no output from subprocess".to_string())?, + }; + + let request = last_unfulfilled_request.take().unwrap(); + + // unwrap is safe, the other side would have to drop for a + // panic – can’t happen: + request.response_tx.send(result).unwrap(); + } + + Err("reached EOF".to_string()) + } + + fn parse_json(input: &str) -> Result { + let mut parsed: serde_json::Value = + serde_json::from_str(input).map_err(|e| e.to_string())?; + + parsed + .as_object() + .and_then(|obj| { + if obj.len() == 1 { + obj.get("error") + .and_then(|v| v.as_str()) + .map(|s| Err(s.to_string())) + } else { + None + } + }) + .unwrap_or_else(|| { + parsed + .get_mut("json") + .map(serde_json::Value::take) + .ok_or_else(|| "Missing 'json' field".to_string()) + }) + } + + /// Decodes a CBOR error using the child process. + pub async fn decode(&self, cbor: &[u8]) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.sender + .send(FDRequest { + cbor: cbor.to_vec(), + response_tx, + }) + .await + .map_err(|err| format!("FallbackDecoder: failed to send request: {:?}", err))?; + + response_rx.await.map_err(|err| { + format!( + "FallbackDecoder: worker thread dropped (won’t happen): {:?}", + err + ) + })? + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[tokio::test] + async fn test_deserialization() { + let wrapper = FallbackDecoder::spawn(); + let input = hex::decode("8182068182028200a0").unwrap(); + let result = wrapper.decode(&input).await; + assert_eq!( + result, + Ok(serde_json::json!({ + "contents": { + "contents": { + "contents": { + "era": "ShelleyBasedEraConway", + "error": [ + "ConwayCertsFailure (WithdrawalsNotInRewardsCERTS (fromList []))" + ], + "kind": "ShelleyTxValidationError" + }, + "tag": "TxValidationErrorInCardanoMode" + }, + "tag": "TxCmdTxSubmitValidationError" + }, + "tag": "TxSubmitFail" + })) + ); + + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // FIXME: zombie process and hangs, when we kill it now + + let input = hex::decode("8182068183051a000c275b1a000b35ec").unwrap(); + let result = wrapper.decode(&input).await; + assert_eq!( + result, + Ok(serde_json::json!({ + "contents": { + "contents": { + "contents": { + "era": "ShelleyBasedEraConway", + "error": [ + "ConwayTreasuryValueMismatch (Coin 796507) (Coin 734700)" + ], + "kind": "ShelleyTxValidationError" + }, + "tag": "TxValidationErrorInCardanoMode" + }, + "tag": "TxCmdTxSubmitValidationError" + }, + "tag": "TxSubmitFail" + })) + ); + } +} From 8e60e29a7198dc5d19f264fe88b8705a5dd84262 Mon Sep 17 00:00:00 2001 From: Michal Rus Date: Thu, 28 Nov 2024 11:29:07 +0100 Subject: [PATCH 2/5] feat: handle subprocess lifetimes correctly --- Cargo.lock | 143 +++++++++++++++++++++++++++++++++-- Cargo.toml | 2 + docs/next.config.mjs | 22 +++--- src/cbor/fallback_decoder.rs | 118 +++++++++++++++++++++-------- 4 files changed, 235 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 443daea..16906b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -316,6 +316,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "sysinfo", "thiserror", "tokio", "toml", @@ -324,6 +325,7 @@ dependencies = [ "tower-layer", "tracing", "tracing-subscriber", + "tracing-test", ] [[package]] @@ -467,6 +469,16 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -891,7 +903,7 @@ checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba" dependencies = [ "cfg-if", "libc", - "windows", + "windows 0.52.0", ] [[package]] @@ -1037,7 +1049,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -1140,9 +1152,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.158" +version = "0.2.166" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "c2ccc108bbc0b1331bd061864e7cd823c0cab660bbe6970e66e2c0614decde36" [[package]] name = "linux-raw-sys" @@ -1295,6 +1307,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1906,6 +1927,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "regex" version = "1.10.6" @@ -2460,6 +2501,20 @@ dependencies = [ "futures-core", ] +[[package]] +name = "sysinfo" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c33cd241af0f2e9e3b5c32163b873b29956890b5342e6745b917ce9d490f4af" +dependencies = [ + "core-foundation-sys", + "libc", + "memchr", + "ntapi", + "rayon", + "windows 0.57.0", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -2830,6 +2885,27 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -3072,7 +3148,17 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ - "windows-core", + "windows-core 0.52.0", + "windows-targets", +] + +[[package]] +name = "windows" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" +dependencies = [ + "windows-core 0.57.0", "windows-targets", ] @@ -3085,17 +3171,60 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "windows-core" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result 0.1.2", + "windows-targets", +] + +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-registry" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ - "windows-result", + "windows-result 0.2.0", "windows-strings", "windows-targets", ] +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-result" version = "0.2.0" @@ -3111,7 +3240,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" dependencies = [ - "windows-result", + "windows-result 0.2.0", "windows-targets", ] diff --git a/Cargo.toml b/Cargo.toml index 7becf91..2e28c26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,8 @@ metrics-exporter-prometheus = { version = "0.16", default-features = false } chrono = "0.4" deadpool = "0.12.1" serde_with = "3.11.0" +tracing-test = "0.2.5" +sysinfo = "0.32.1" [dev-dependencies] rstest = "0.22.0" diff --git a/docs/next.config.mjs b/docs/next.config.mjs index d5eeab4..e33e2d1 100644 --- a/docs/next.config.mjs +++ b/docs/next.config.mjs @@ -1,19 +1,19 @@ -import nextra from 'nextra' - +import nextra from "nextra"; + /** * @type {import('next').NextConfig} */ const nextConfig = { - output: 'export', + output: "export", images: { - unoptimized: true // mandatory, otherwise won't export - } + unoptimized: true, // mandatory, otherwise won't export + }, // Optional: Change the output directory `out` -> `dist` // distDir: "build" -} +}; const withNextra = nextra({ - theme: 'nextra-theme-docs', - themeConfig: './theme.config.tsx' -}) - -export default withNextra(nextConfig) + theme: "nextra-theme-docs", + themeConfig: "./theme.config.tsx", +}); + +export default withNextra(nextConfig); diff --git a/src/cbor/fallback_decoder.rs b/src/cbor/fallback_decoder.rs index 1835c4c..ac97b66 100644 --- a/src/cbor/fallback_decoder.rs +++ b/src/cbor/fallback_decoder.rs @@ -1,5 +1,9 @@ use std::io::{BufRead, BufReader, Write}; use std::process as proc; +use std::sync::{ + atomic::{self, AtomicU32}, + Arc, +}; use std::thread; use tokio::sync::{mpsc, oneshot}; use tracing::error; @@ -7,6 +11,7 @@ use tracing::error; #[derive(Clone)] pub struct FallbackDecoder { sender: mpsc::Sender, + current_child_pid: Arc, } struct FDRequest { @@ -17,6 +22,8 @@ struct FDRequest { impl FallbackDecoder { /// Starts a new child process. pub fn spawn() -> Self { + let current_child_pid = Arc::new(AtomicU32::new(u32::MAX)); + let current_child_pid_clone = current_child_pid.clone(); let (sender, mut receiver) = mpsc::channel::(128); thread::spawn(move || { @@ -24,7 +31,11 @@ impl FallbackDecoder { let mut last_unfulfilled_request: Option = None; loop { - let single_run = Self::behavior(&mut receiver, &mut last_unfulfilled_request); + let single_run = Self::spawn_child( + &mut receiver, + &mut last_unfulfilled_request, + ¤t_child_pid_clone, + ); let restart_delay = std::time::Duration::from_secs(1); error!( "FallbackDecoder: will restart in {:?} because of a subprocess error: {:?}", @@ -34,30 +45,86 @@ impl FallbackDecoder { } }); - Self { sender } + Self { + sender, + current_child_pid, + } + } + + /// Decodes a CBOR error using the child process. + pub async fn decode(&self, cbor: &[u8]) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.sender + .send(FDRequest { + cbor: cbor.to_vec(), + response_tx, + }) + .await + .map_err(|err| format!("FallbackDecoder: failed to send request: {:?}", err))?; + + response_rx.await.map_err(|err| { + format!( + "FallbackDecoder: worker thread dropped (won’t happen): {:?}", + err + ) + })? + } + + /// Returns the current child PID: + pub fn child_pid(&self) -> Option { + match self.current_child_pid.load(atomic::Ordering::Relaxed) { + u32::MAX => None, + pid => Some(pid), + } } - fn behavior( + fn spawn_child( receiver: &mut mpsc::Receiver, last_unfulfilled_request: &mut Option, + current_child_pid: &Arc, ) -> Result<(), String> { // FIXME: _find_ the exe_path // FIXME: make a release with LineBuffering let exe_path = "/nix/store/4y2jqhw3c2i407m8rmkvlja9wdr1kqhq-testgen-hs-exe-testgen-hs-x86_64-unknown-linux-musl-10.1.2.1/bin/testgen-hs"; - let child = proc::Command::new(exe_path) + let mut child = proc::Command::new(exe_path) .arg("deserialize-stream") .stdin(proc::Stdio::piped()) .stdout(proc::Stdio::piped()) .spawn() .map_err(|err| format!("couldn’t start the child: {:?}", err))?; - let mut stdin = child.stdin.ok_or("couldn’t grab stdin".to_string())?; - let stdout = child.stdout.ok_or("couldn’t grab stdout".to_string())?; + current_child_pid.store(child.id(), atomic::Ordering::Relaxed); + + let result = Self::process_requests(&mut child, receiver, last_unfulfilled_request); + + child + .wait() + .map_err(|err| format!("couldn’t reap the child: {:?}", err))?; + + result + } + + fn process_requests( + child: &mut proc::Child, + receiver: &mut mpsc::Receiver, + last_unfulfilled_request: &mut Option, + ) -> Result<(), String> { + let stdin = child + .stdin + .as_mut() + .ok_or("couldn’t grab stdin".to_string())?; + let stdout = child + .stdout + .as_mut() + .ok_or("couldn’t grab stdout".to_string())?; let stdout_reader = BufReader::new(stdout); let mut stdout_lines = stdout_reader.lines(); - while let Some(request) = last_unfulfilled_request.take().or(receiver.blocking_recv()) { + while let Some(request) = last_unfulfilled_request + .take() + .or_else(|| receiver.blocking_recv()) + { let cbor_hex = hex::encode(&request.cbor); *last_unfulfilled_request = Some(request); @@ -70,10 +137,11 @@ impl FallbackDecoder { None => Err("no output from subprocess".to_string())?, }; + // unwrap is safe, we wrote there right before the writeln!() let request = last_unfulfilled_request.take().unwrap(); // unwrap is safe, the other side would have to drop for a - // panic – can’t happen: + // panic – can’t happen, because we control it: request.response_tx.send(result).unwrap(); } @@ -102,35 +170,17 @@ impl FallbackDecoder { .ok_or_else(|| "Missing 'json' field".to_string()) }) } - - /// Decodes a CBOR error using the child process. - pub async fn decode(&self, cbor: &[u8]) -> Result { - let (response_tx, response_rx) = oneshot::channel(); - self.sender - .send(FDRequest { - cbor: cbor.to_vec(), - response_tx, - }) - .await - .map_err(|err| format!("FallbackDecoder: failed to send request: {:?}", err))?; - - response_rx.await.map_err(|err| { - format!( - "FallbackDecoder: worker thread dropped (won’t happen): {:?}", - err - ) - })? - } } #[cfg(test)] mod tests { use super::*; #[tokio::test] + #[tracing_test::traced_test] async fn test_deserialization() { - let wrapper = FallbackDecoder::spawn(); + let decoder = FallbackDecoder::spawn(); let input = hex::decode("8182068182028200a0").unwrap(); - let result = wrapper.decode(&input).await; + let result = decoder.decode(&input).await; assert_eq!( result, Ok(serde_json::json!({ @@ -151,12 +201,16 @@ mod tests { })) ); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + // Now, kill our child to test the restart logic: + sysinfo::System::new_all() + .process(sysinfo::Pid::from_u32(decoder.child_pid().unwrap())) + .unwrap() + .kill(); - // FIXME: zombie process and hangs, when we kill it now + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; let input = hex::decode("8182068183051a000c275b1a000b35ec").unwrap(); - let result = wrapper.decode(&input).await; + let result = decoder.decode(&input).await; assert_eq!( result, Ok(serde_json::json!({ From 4299ebae436637311f8c364fe51bf3a11c111e9d Mon Sep 17 00:00:00 2001 From: Michal Rus Date: Thu, 28 Nov 2024 12:20:53 +0100 Subject: [PATCH 3/5] feat: update `testgen-hs` and find it on `PATH`/own dir --- .github/workflows/ci.yaml | 25 ++++++++++++++++ flake.lock | 4 +-- src/cbor/fallback_decoder.rs | 55 +++++++++++++++++++++++++++++++++++- 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e2ecf56..30ecde1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -29,6 +29,31 @@ jobs: components: rustfmt, rust-src, clippy - name: Build run: cargo build --release --verbose + - name: Download `testgen-hs` + shell: bash + run: | + set -euo pipefail + version="10.1.2.1" + case "${{ matrix.os }}" in + macos-latest) + target=testgen-hs-"$version"-aarch64-darwin.tar.bz2 + curl -fsSL -O https://github.com/input-output-hk/testgen-hs/releases/download/"$version"/"$target" + tar -xjf "$target" + ./testgen-hs/testgen-hs --version + ;; + ubuntu-latest) + target=testgen-hs-"$version"-x86_64-linux.tar.bz2 + curl -fsSL -O https://github.com/input-output-hk/testgen-hs/releases/download/"$version"/"$target" + tar -xjf "$target" + ./testgen-hs/testgen-hs --version + ;; + windows-latest) + target=testgen-hs-"$version"-x86_64-windows.zip + curl -fsSL -O https://github.com/input-output-hk/testgen-hs/releases/download/"$version"/"$target" + unzip "$target" + ./testgen-hs/testgen-hs.exe --version + ;; + esac - name: Test run: cargo test --verbose timeout-minutes: 10 diff --git a/flake.lock b/flake.lock index 1d48378..a8e3863 100644 --- a/flake.lock +++ b/flake.lock @@ -148,10 +148,10 @@ "flake": false, "locked": { "lastModified": 1732706647, - "narHash": "sha256-0eEzmUUcjQqsCY3FDLi0eQY+Mc1mAo/2e6w/tST9pLw=", + "narHash": "sha256-761CbbEvdVbp+51kYtPrK4rgHjojYk4mncrFNQURQ54=", "owner": "input-output-hk", "repo": "testgen-hs", - "rev": "263b69ca80d455967145774707f8a97f079fbd36", + "rev": "1266c36e2aee0a97e10fb3854bdf801e78650a5d", "type": "github" }, "original": { diff --git a/src/cbor/fallback_decoder.rs b/src/cbor/fallback_decoder.rs index ac97b66..372b2e1 100644 --- a/src/cbor/fallback_decoder.rs +++ b/src/cbor/fallback_decoder.rs @@ -19,6 +19,8 @@ struct FDRequest { response_tx: oneshot::Sender>, } +const CHILD_EXE_NAME: &str = "testgen-hs"; + impl FallbackDecoder { /// Starts a new child process. pub fn spawn() -> Self { @@ -70,6 +72,55 @@ impl FallbackDecoder { })? } + /// A heuristic to find the child binary that we’ll use. + pub fn locate_child_binary() -> Result { + use std::env; + + let binary_name = if cfg!(windows) { + format!("{}.exe", CHILD_EXE_NAME) + } else { + CHILD_EXE_NAME.to_string() + }; + + // Check the directory of the current binary: + if let Ok(current_exe) = env::current_exe() { + if let Some(current_dir) = current_exe.parent() { + let potential_path = current_dir.join(&binary_name); + if potential_path.is_file() { + return Ok(potential_path.to_string_lossy().into_owned()); + } + } + } + + // Check PATH: + if let Ok(paths) = env::var("PATH") { + for path in env::split_paths(&paths) { + let potential_path = path.join(&binary_name); + if potential_path.is_file() { + return Ok(potential_path.to_string_lossy().into_owned()); + } + } + } + + // Check CHILD_EXE_NAME/CHILD_EXE_NAME.exe in the current directory if + // running tests and it contains Cargo.toml: + if cfg!(test) { + if let Ok(current_dir) = env::current_dir() { + if current_dir.join("Cargo.toml").is_file() { + let potential_path = current_dir.join(CHILD_EXE_NAME).join(&binary_name); + if potential_path.is_file() { + return Ok(potential_path.to_string_lossy().into_owned()); + } + } + } + } + + Err(format!( + "Could not find binary '{}' in the current directory or on PATH", + binary_name + )) + } + /// Returns the current child PID: pub fn child_pid(&self) -> Option { match self.current_child_pid.load(atomic::Ordering::Relaxed) { @@ -85,7 +136,7 @@ impl FallbackDecoder { ) -> Result<(), String> { // FIXME: _find_ the exe_path // FIXME: make a release with LineBuffering - let exe_path = "/nix/store/4y2jqhw3c2i407m8rmkvlja9wdr1kqhq-testgen-hs-exe-testgen-hs-x86_64-unknown-linux-musl-10.1.2.1/bin/testgen-hs"; + let exe_path = Self::locate_child_binary().unwrap_or(CHILD_EXE_NAME.to_string()); let mut child = proc::Command::new(exe_path) .arg("deserialize-stream") @@ -178,6 +229,8 @@ mod tests { #[tokio::test] #[tracing_test::traced_test] async fn test_deserialization() { + FallbackDecoder::locate_child_binary().unwrap(); + let decoder = FallbackDecoder::spawn(); let input = hex::decode("8182068182028200a0").unwrap(); let result = decoder.decode(&input).await; From 5275170eaffb1dcce0069bc85330cfd8e8028942 Mon Sep 17 00:00:00 2001 From: Michal Rus Date: Thu, 28 Nov 2024 15:59:18 +0100 Subject: [PATCH 4/5] feat: integrate `FallbackDecoder` with the API, add it to artifacts Artifacts being: GitHub Workflow artifacts (including Windows), Nix packages, Docker image. --- .github/workflows/ci.yaml | 1 + Dockerfile | 8 ++++++-- nix/internal/unix.nix | 10 ++++++++++ src/cbor/fallback_decoder.rs | 4 ++-- src/errors.rs | 21 +++++++++++++++----- src/main.rs | 10 +++++++++- src/node/connection.rs | 6 +++++- src/node/pool.rs | 5 +++-- src/node/pool_manager.rs | 4 +++- src/node/transactions.rs | 37 +++++++++++++++++++++--------------- 10 files changed, 77 insertions(+), 29 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 30ecde1..b12c0e5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -66,6 +66,7 @@ jobs: run: | mkdir -p artifacts cp target/release/blockfrost-platform* artifacts/ + cp -r testgen-hs artifacts/ - name: Upload artifacts if: github.event_name == 'workflow_dispatch' && github.event.inputs.upload_artifacts == 'true' uses: actions/upload-artifact@v4 diff --git a/Dockerfile b/Dockerfile index 722c79a..d8571a3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM lukemathwalker/cargo-chef:latest-rust-slim-bookworm AS base -RUN apt update ; apt install sccache pkg-config libssl-dev -y +RUN apt update ; apt install sccache pkg-config libssl-dev bzip2 -y ENV RUSTC_WRAPPER=sccache SCCACHE_DIR=/sccache WORKDIR /app @@ -10,6 +10,10 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ --mount=type=cache,target=$SCCACHE_DIR,sharing=locked \ ls -l ; cargo chef prepare --recipe-path recipe.json +FROM base AS downloader +ADD https://github.com/input-output-hk/testgen-hs/releases/download/10.1.2.1/testgen-hs-10.1.2.1-x86_64-linux.tar.bz2 /app/ +RUN tar -xjf testgen-hs-*.tar.* && /app/testgen-hs/testgen-hs --version + FROM base AS builder COPY --from=planner /app/recipe.json recipe.json RUN --mount=type=cache,target=/usr/local/cargo/registry \ @@ -21,9 +25,9 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ --mount=type=cache,target=$SCCACHE_DIR,sharing=locked \ cargo build --release - FROM gcr.io/distroless/cc-debian12 as runtime COPY --from=builder /app/target/release/blockfrost-platform /app/ +COPY --from=downloader /app/testgen-hs /app/testgen-hs EXPOSE 3000/tcp STOPSIGNAL SIGINT ENTRYPOINT ["/app/blockfrost-platform"] diff --git a/nix/internal/unix.nix b/nix/internal/unix.nix index 68151b5..2603090 100644 --- a/nix/internal/unix.nix +++ b/nix/internal/unix.nix @@ -36,6 +36,16 @@ in rec { package = craneLib.buildPackage (commonArgs // { inherit cargoArtifacts; + preCheck = '' + export PATH=${lib.makeBinPath [testgen-hs]}:"$PATH" + ''; + postInstall = '' + chmod -R +w $out + mv $out/bin $out/libexec + ln -sf ${testgen-hs}/bin $out/libexec/testgen-hs + mkdir -p $out/bin + ln -sf $out/libexec/blockfrost-platform $out/bin/ + ''; }); cardano-node-flake = let diff --git a/src/cbor/fallback_decoder.rs b/src/cbor/fallback_decoder.rs index 372b2e1..1632ae6 100644 --- a/src/cbor/fallback_decoder.rs +++ b/src/cbor/fallback_decoder.rs @@ -82,10 +82,10 @@ impl FallbackDecoder { CHILD_EXE_NAME.to_string() }; - // Check the directory of the current binary: + // Check in the CHILD_EXE_NAME subdirectory in the directory of the current binary: if let Ok(current_exe) = env::current_exe() { if let Some(current_dir) = current_exe.parent() { - let potential_path = current_dir.join(&binary_name); + let potential_path = current_dir.join(CHILD_EXE_NAME).join(&binary_name); if potential_path.is_file() { return Ok(potential_path.to_string_lossy().into_owned()); } diff --git a/src/errors.rs b/src/errors.rs index 413f2ac..38e18c2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -31,6 +31,7 @@ pub struct BlockfrostError { pub status_code: u16, pub error: String, pub message: String, + pub details: Option, } impl From for BlockfrostError { @@ -105,6 +106,7 @@ impl BlockfrostError { error: "Not Found".to_string(), message: "The requested component has not been found.".to_string(), status_code: 404, + details: None, } } @@ -114,6 +116,17 @@ impl BlockfrostError { error: "Bad Request".to_string(), message, status_code: 400, + details: None, + } + } + + /// Our custom 400 error + pub fn custom_400_details(message: String, details: serde_json::Value) -> Self { + Self { + error: "Bad Request".to_string(), + message, + status_code: 400, + details: Some(details), } } @@ -123,6 +136,7 @@ impl BlockfrostError { error: "Internal Server Error".to_string(), message: error, status_code: 500, + details: None, } } @@ -132,6 +146,7 @@ impl BlockfrostError { error: "Internal Server Error".to_string(), message: "An unexpected response was received from the backend.".to_string(), status_code: 500, + details: None, } } @@ -158,11 +173,7 @@ impl IntoResponse for BlockfrostError { error!("Error occurred: {} - {}", self.error, self.message); - let error_response = Self { - error: self.error.clone(), - message: self.message.clone(), - status_code: self.status_code, - }; + let error_response = self.clone(); (status_code, Json(error_response)).into_response() } diff --git a/src/main.rs b/src/main.rs index 7bf9fc6..b76c7b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use axum::{ use blockfrost_platform::{ api::{self, metrics::setup_metrics_recorder, root, tx_submit}, background_tasks::node_health_check_task, + cbor::fallback_decoder::FallbackDecoder, cli::{Args, Config}, errors::{AppError, BlockfrostError}, icebreakers_api::IcebreakersAPI, @@ -27,7 +28,14 @@ async fn main() -> Result<(), AppError> { // Setup logging setup_tracing(&config); - let node_conn_pool = NodePool::new(&config)?; + // Set up FallbackDecoder + info!( + "Using {} as a fallback CBOR error decoder", + FallbackDecoder::locate_child_binary().map_err(AppError::Server)? + ); + let fallback_decoder = FallbackDecoder::spawn(); + + let node_conn_pool = NodePool::new(&config, fallback_decoder)?; let icebreakers_api = IcebreakersAPI::new(&config).await?; let prometheus_handle = setup_metrics_recorder(); diff --git a/src/node/connection.rs b/src/node/connection.rs index 4878375..96b8555 100644 --- a/src/node/connection.rs +++ b/src/node/connection.rs @@ -1,4 +1,7 @@ -use crate::{cbor::haskell_types::TxValidationError, BlockfrostError}; +use crate::{ + cbor::fallback_decoder::FallbackDecoder, cbor::haskell_types::TxValidationError, + BlockfrostError, +}; use pallas_codec::minicbor::{display, Decoder}; use pallas_network::{ facades::NodeClient as NodeClientFacade, miniprotocols::localstate, multiplexer::Error, @@ -13,6 +16,7 @@ pub struct NodeClient { /// *always* [`Some`]. See [`NodeConnPoolManager::recycle`] for an /// explanation. pub(in crate::node) client: Option, + pub(in crate::node) fallback_decoder: FallbackDecoder, } impl NodeClient { diff --git a/src/node/pool.rs b/src/node/pool.rs index 0769cc1..e7a60a6 100644 --- a/src/node/pool.rs +++ b/src/node/pool.rs @@ -1,5 +1,5 @@ use super::pool_manager::NodePoolManager; -use crate::{cli::Config, AppError}; +use crate::{cbor::fallback_decoder::FallbackDecoder, cli::Config, AppError}; use deadpool::managed::{Object, Pool}; /// This represents a pool of Node2Client connections to a single `cardano-node`. @@ -13,10 +13,11 @@ pub struct NodePool { impl NodePool { /// Creates a new pool of [`NodeConn`] connections. - pub fn new(config: &Config) -> Result { + pub fn new(config: &Config, fallback_decoder: FallbackDecoder) -> Result { let manager = NodePoolManager { network_magic: config.network_magic, socket_path: config.node_socket_path.to_string(), + fallback_decoder, }; let pool_manager = deadpool::managed::Pool::builder(manager) .max_size(config.max_pool_connections) diff --git a/src/node/pool_manager.rs b/src/node/pool_manager.rs index ff37aa4..b3967bd 100644 --- a/src/node/pool_manager.rs +++ b/src/node/pool_manager.rs @@ -1,5 +1,5 @@ use super::connection::NodeClient; -use crate::AppError; +use crate::{cbor::fallback_decoder::FallbackDecoder, AppError}; use deadpool::managed::{Manager, Metrics, RecycleError, RecycleResult}; use metrics::gauge; use pallas_network::facades::NodeClient as NodeClientFacade; @@ -8,6 +8,7 @@ use tracing::{error, info}; pub struct NodePoolManager { pub network_magic: u64, pub socket_path: String, + pub fallback_decoder: FallbackDecoder, } impl Manager for NodePoolManager { @@ -27,6 +28,7 @@ impl Manager for NodePoolManager { Ok(NodeClient { client: Some(connection), + fallback_decoder: self.fallback_decoder.clone(), }) } Err(err) => { diff --git a/src/node/transactions.rs b/src/node/transactions.rs index 17cfc3a..eb994f0 100644 --- a/src/node/transactions.rs +++ b/src/node/transactions.rs @@ -41,17 +41,23 @@ impl NodeClient { Ok(txid) } Ok(Response::Rejected(reason)) => { - let reason = reason.0; - let msg_res = Self::try_decode_error(&reason); - - match msg_res { - Ok(decoded_error) => { - let error_response = Self::generate_error_response(decoded_error); - let error_message = serde_json::to_string(&error_response) - .unwrap_or_else(|_| "Failed to serialize error response".to_string()); - warn!(error_message); - - Err(BlockfrostError::custom_400(error_message)) + // The [2..] is a Pallas bug, cf. . + let reason = &reason.0[2..]; + + match self.fallback_decoder.decode(reason).await { + Ok(submit_api_json) => { + let error_message = "TxSubmitFail".to_string(); + warn!( + "{}: {} ~ {:?}", + error_message, + hex::encode(reason), + submit_api_json + ); + + Err(BlockfrostError::custom_400_details( + error_message, + submit_api_json, + )) } Err(e) => { @@ -73,7 +79,7 @@ impl NodeClient { } /// Mimicks the data structure of the error response from the cardano-submit-api - fn generate_error_response(error: TxValidationError) -> TxSubmitFail { + fn _unused_i_i_i_i_i_i_i_generate_error_response(error: TxValidationError) -> TxSubmitFail { use crate::cbor::haskell_types::{ TxCmdError::TxCmdTxSubmitValidationError, TxSubmitFail::TxSubmitFail, TxValidationErrorInCardanoMode::TxValidationErrorInCardanoMode, @@ -103,9 +109,10 @@ mod tests { era: ShelleyBasedEraConway, }; - let error_string = - serde_json::to_string(&NodeClient::generate_error_response(validation_error)) - .expect("Failed to convert error to JSON"); + let error_string = serde_json::to_string( + &NodeClient::_unused_i_i_i_i_i_i_i_generate_error_response(validation_error), + ) + .expect("Failed to convert error to JSON"); let expected_error_string = r#"{"tag":"TxSubmitFail","contents":{"tag":"TxCmdTxSubmitValidationError","contents":{"tag":"TxValidationErrorInCardanoMode","contents":{"kind":"ShelleyTxValidationError","error":["MempoolFailure (error1)","MempoolFailure (error2)"],"era":"ShelleyBasedEraConway"}}}}"#; assert_eq!(error_string, expected_error_string); From df8f27d9c9531652bdf7af3a9276cc00bf75da67 Mon Sep 17 00:00:00 2001 From: Michal Rus Date: Thu, 28 Nov 2024 16:22:48 +0100 Subject: [PATCH 5/5] feat: add a sanity check at startup --- src/cbor/fallback_decoder.rs | 56 ++++++++++++++++++++++-------------- src/main.rs | 4 +++ 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/src/cbor/fallback_decoder.rs b/src/cbor/fallback_decoder.rs index 1632ae6..de19f49 100644 --- a/src/cbor/fallback_decoder.rs +++ b/src/cbor/fallback_decoder.rs @@ -72,6 +72,37 @@ impl FallbackDecoder { })? } + /// This function is called at startup, so that we make sure that the worker is reasonable. + pub async fn startup_sanity_test(&self) -> Result<(), String> { + let input = hex::decode("8182068182028200a0").map_err(|err| err.to_string())?; + let result = self.decode(&input).await; + let expected = serde_json::json!({ + "contents": { + "contents": { + "contents": { + "era": "ShelleyBasedEraConway", + "error": [ + "ConwayCertsFailure (WithdrawalsNotInRewardsCERTS (fromList []))" + ], + "kind": "ShelleyTxValidationError" + }, + "tag": "TxValidationErrorInCardanoMode" + }, + "tag": "TxCmdTxSubmitValidationError" + }, + "tag": "TxSubmitFail" + }); + + if result == Ok(expected) { + Ok(()) + } else { + Err(format!( + "FallbackDecoder: startup_sanity_test failed: {:?}", + result + )) + } + } + /// A heuristic to find the child binary that we’ll use. pub fn locate_child_binary() -> Result { use std::env; @@ -228,31 +259,12 @@ mod tests { use super::*; #[tokio::test] #[tracing_test::traced_test] - async fn test_deserialization() { + async fn test_fallback_decoder() { FallbackDecoder::locate_child_binary().unwrap(); let decoder = FallbackDecoder::spawn(); - let input = hex::decode("8182068182028200a0").unwrap(); - let result = decoder.decode(&input).await; - assert_eq!( - result, - Ok(serde_json::json!({ - "contents": { - "contents": { - "contents": { - "era": "ShelleyBasedEraConway", - "error": [ - "ConwayCertsFailure (WithdrawalsNotInRewardsCERTS (fromList []))" - ], - "kind": "ShelleyTxValidationError" - }, - "tag": "TxValidationErrorInCardanoMode" - }, - "tag": "TxCmdTxSubmitValidationError" - }, - "tag": "TxSubmitFail" - })) - ); + + decoder.startup_sanity_test().await.unwrap(); // Now, kill our child to test the restart logic: sysinfo::System::new_all() diff --git a/src/main.rs b/src/main.rs index b76c7b7..e7a1278 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,6 +34,10 @@ async fn main() -> Result<(), AppError> { FallbackDecoder::locate_child_binary().map_err(AppError::Server)? ); let fallback_decoder = FallbackDecoder::spawn(); + fallback_decoder + .startup_sanity_test() + .await + .map_err(AppError::Server)?; let node_conn_pool = NodePool::new(&config, fallback_decoder)?; let icebreakers_api = IcebreakersAPI::new(&config).await?;