From 9a2d9dc19bbf31fddb713961675e8d3cdbf7c553 Mon Sep 17 00:00:00 2001 From: Jamey Sharp Date: Fri, 28 Jun 2024 12:41:20 -0700 Subject: [PATCH] Allow capturing logging endpoint messages Some integration tests need to capture log output in order to verify that logging endpoints work correctly from guests, but it's also potentially useful for people using Viceroy as a library. The previous way that the integration tests did this was not reliable when multiple tests ran in parallel, because there was only one global hook for capturing logs. Exposing this as a configuration option on execution contexts instead allows each test to set independent capture buffers. I guess this should be considered a breaking API change since `viceroy_lib::logging::LOG_WRITER` was exported publicly from the crate. --- cli/tests/integration/common.rs | 19 +++++++++++--- cli/tests/integration/logging.rs | 23 ++++++---------- lib/src/execute.rs | 35 ++++++++++++++++++------- lib/src/linking.rs | 4 +-- lib/src/logging.rs | 45 +++++++++++++++----------------- lib/src/session.rs | 16 ++++++++++-- 6 files changed, 86 insertions(+), 56 deletions(-) diff --git a/cli/tests/integration/common.rs b/cli/tests/integration/common.rs index f9069a3b..d7c4e8d0 100644 --- a/cli/tests/integration/common.rs +++ b/cli/tests/integration/common.rs @@ -2,10 +2,14 @@ use futures::stream::StreamExt; use hyper::{service, Body as HyperBody, Request, Response, Server, Uri}; -use std::net::Ipv4Addr; use std::{ - collections::HashSet, convert::Infallible, future::Future, net::SocketAddr, path::PathBuf, - sync::Arc, + collections::HashSet, + convert::Infallible, + future::Future, + io::Write, + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, + sync::{Arc, Mutex}, }; use tracing_subscriber::filter::EnvFilter; use viceroy_lib::config::UnknownImportBehavior; @@ -79,6 +83,7 @@ pub struct Test { geolocation: Geolocation, object_stores: ObjectStores, secret_stores: SecretStores, + capture_logs: Arc>, log_stdout: bool, log_stderr: bool, via_hyper: bool, @@ -100,6 +105,7 @@ impl Test { geolocation: Geolocation::new(), object_stores: ObjectStores::new(), secret_stores: SecretStores::new(), + capture_logs: Arc::new(Mutex::new(std::io::stdout())), log_stdout: false, log_stderr: false, via_hyper: false, @@ -121,6 +127,7 @@ impl Test { geolocation: Geolocation::new(), object_stores: ObjectStores::new(), secret_stores: SecretStores::new(), + capture_logs: Arc::new(Mutex::new(std::io::stdout())), log_stdout: false, log_stderr: false, via_hyper: false, @@ -235,6 +242,11 @@ impl Test { self } + pub fn capture_logs(mut self, capture_logs: Arc>) -> Self { + self.capture_logs = capture_logs; + self + } + /// Treat stderr as a logging endpoint for this test. pub fn log_stderr(self) -> Self { Self { @@ -322,6 +334,7 @@ impl Test { .with_geolocation(self.geolocation.clone()) .with_object_stores(self.object_stores.clone()) .with_secret_stores(self.secret_stores.clone()) + .with_capture_logs(self.capture_logs.clone()) .with_log_stderr(self.log_stderr) .with_log_stdout(self.log_stdout); diff --git a/cli/tests/integration/logging.rs b/cli/tests/integration/logging.rs index be4326ec..192ae3af 100644 --- a/cli/tests/integration/logging.rs +++ b/cli/tests/integration/logging.rs @@ -3,19 +3,16 @@ use { hyper::StatusCode, std::{ io::{self, Write}, - sync::mpsc, + sync::{Arc, Mutex}, }, - viceroy_lib::logging, }; -struct LogWriter(mpsc::Sender>); +struct LogWriter(Vec>); impl Write for LogWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - match self.0.send(buf.to_owned()) { - Ok(()) => Ok(buf.len()), - Err(_) => Err(io::ErrorKind::ConnectionReset.into()), - } + self.0.push(buf.to_owned()); + Ok(buf.len()) } fn flush(&mut self) -> io::Result<()> { @@ -23,16 +20,11 @@ impl Write for LogWriter { } } -fn setup_log_writer() -> mpsc::Receiver> { - let (send, recv) = mpsc::channel(); - *logging::LOG_WRITER.lock().unwrap() = Box::new(LogWriter(send)); - recv -} - #[tokio::test(flavor = "multi_thread")] async fn logging_works() -> TestResult { - let log_recv = setup_log_writer(); + let log_writer = Arc::new(Mutex::new(LogWriter(Vec::new()))); let resp = Test::using_fixture("logging.wasm") + .capture_logs(log_writer.clone()) .log_stderr() .log_stdout() .against_empty() @@ -40,7 +32,8 @@ async fn logging_works() -> TestResult { assert_eq!(resp.status(), StatusCode::OK); - let read_log_line = || String::from_utf8(log_recv.recv().unwrap()).unwrap(); + let mut logs = std::mem::take(&mut log_writer.lock().unwrap().0).into_iter(); + let mut read_log_line = || String::from_utf8(logs.next().unwrap()).unwrap(); assert_eq!(read_log_line(), "inigo :: Who are you?\n"); assert_eq!(read_log_line(), "mib :: No one of consequence.\n"); diff --git a/lib/src/execute.rs b/lib/src/execute.rs index 7654bf18..923a0435 100644 --- a/lib/src/execute.rs +++ b/lib/src/execute.rs @@ -1,17 +1,14 @@ //! Guest code execution. -use std::time::SystemTime; - -use wasmtime::GuestProfiler; - -use crate::config::UnknownImportBehavior; - use { crate::{ adapt, body::Body, component as compute, - config::{Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation}, + config::{ + Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation, + UnknownImportBehavior, + }, downstream::prepare_request, error::ExecutionError, linking::{create_store, link_host_functions, ComponentCtx, WasmCtx}, @@ -25,18 +22,19 @@ use { std::{ collections::HashSet, fs, + io::Write, net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::atomic::{AtomicBool, AtomicU64, Ordering}, - sync::Arc, + sync::{Arc, Mutex}, thread::{self, JoinHandle}, - time::{Duration, Instant}, + time::{Duration, Instant, SystemTime}, }, tokio::sync::oneshot::{self, Sender}, tracing::{event, info, info_span, warn, Instrument, Level}, wasmtime::{ component::{self, Component}, - Engine, InstancePre, Linker, Module, ProfilingStrategy, + Engine, GuestProfiler, InstancePre, Linker, Module, ProfilingStrategy, }, wasmtime_wasi::I32Exit, }; @@ -80,6 +78,8 @@ pub struct ExecuteCtx { dictionaries: Arc, /// Path to the config, defaults to None config_path: Arc>, + /// Where to direct logging endpoint messages, defaults to stdout + capture_logs: Arc>, /// Whether to treat stdout as a logging endpoint log_stdout: bool, /// Whether to treat stderr as a logging endpoint @@ -217,6 +217,7 @@ impl ExecuteCtx { tls_config: TlsConfig::new()?, dictionaries: Arc::new(Dictionaries::default()), config_path: Arc::new(None), + capture_logs: Arc::new(Mutex::new(std::io::stdout())), log_stdout: false, log_stderr: false, next_req_id: Arc::new(AtomicU64::new(0)), @@ -295,6 +296,18 @@ impl ExecuteCtx { self } + /// Where to direct logging endpoint messages. Defaults to stdout. + pub fn capture_logs(&self) -> Arc> { + self.capture_logs.clone() + } + + /// Set where to direct logging endpoint messages for this execution + /// context. Defaults to stdout. + pub fn with_capture_logs(mut self, capture_logs: Arc>) -> Self { + self.capture_logs = capture_logs; + self + } + /// Whether to treat stdout as a logging endpoint. pub fn log_stdout(&self) -> bool { self.log_stdout @@ -427,6 +440,7 @@ impl ExecuteCtx { req, sender, remote, + &self, self.backends.clone(), self.device_detection.clone(), self.geolocation.clone(), @@ -580,6 +594,7 @@ impl ExecuteCtx { req, sender, remote, + &self, self.backends.clone(), self.device_detection.clone(), self.geolocation.clone(), diff --git a/lib/src/linking.rs b/lib/src/linking.rs index 2e65edfe..f23bb6f5 100644 --- a/lib/src/linking.rs +++ b/lib/src/linking.rs @@ -259,13 +259,13 @@ fn make_wasi_ctx(ctx: &ExecuteCtx, session: &Session) -> WasiCtxBuilder { .env("FASTLY_TRACE_ID", &format!("{:032x}", session.req_id())); if ctx.log_stdout() { - wasi_ctx.stdout(LogEndpoint::new(b"stdout")); + wasi_ctx.stdout(LogEndpoint::new(b"stdout", ctx.capture_logs())); } else { wasi_ctx.inherit_stdout(); } if ctx.log_stderr() { - wasi_ctx.stderr(LogEndpoint::new(b"stderr")); + wasi_ctx.stderr(LogEndpoint::new(b"stderr", ctx.capture_logs())); } else { wasi_ctx.inherit_stderr(); } diff --git a/lib/src/logging.rs b/lib/src/logging.rs index 572578fe..27ea9219 100644 --- a/lib/src/logging.rs +++ b/lib/src/logging.rs @@ -1,26 +1,23 @@ -use { - lazy_static::lazy_static, - std::{ - io::{self, Write}, - sync::Mutex, - }, +use std::{ + io::{self, Write}, + sync::{Arc, Mutex}, }; -/// A logging endpoint, which for Viceroy is just a name. -pub struct LogEndpoint(Vec); - -lazy_static! { - /// The underlying writer to use for all log messages. It defaults to `stdout`, - /// but can be redirected for tests. We make this a static, rather than e.g. - /// a field in `ExecuteCtx`, because the `Write` implementation for `LogEndpoint` - /// doesn't have direct access to context data. - pub static ref LOG_WRITER: Mutex> = Mutex::new(Box::new(io::stdout())); +/// A named logging endpoint. +#[derive(Clone)] +pub struct LogEndpoint { + name: Vec, + writer: Arc>, } impl LogEndpoint { - /// Allocate a new `LogEndpoint` with the given name. - pub fn new(name: &[u8]) -> LogEndpoint { - LogEndpoint(name.to_owned()) + /// Allocate a new `LogEndpoint` with the given name, with log messages sent + /// to the given writer. + pub fn new(name: &[u8], writer: Arc>) -> LogEndpoint { + LogEndpoint { + name: name.to_owned(), + writer, + } } /// Write a log entry to this endpoint. @@ -28,7 +25,7 @@ impl LogEndpoint { /// Log entries are prefixed with the endpoint name and terminated with a newline. /// Any newlines in the message will be escaped to the string r"\n". /// - /// The entry is written atomically to `LOG_WRITER`. + /// The entry is written atomically to the writer given to [`LogEndpoint::new`]. pub fn write_entry(&self, mut msg: &[u8]) -> io::Result<()> { const LOG_ENDPOINT_DELIM: &[u8] = b" :: "; @@ -44,9 +41,9 @@ impl LogEndpoint { // Accumulate log entry into a buffer before writing, while escaping newlines let mut to_write = - Vec::with_capacity(msg.len() + self.0.len() + LOG_ENDPOINT_DELIM.len() + 1); + Vec::with_capacity(msg.len() + self.name.len() + LOG_ENDPOINT_DELIM.len() + 1); - to_write.extend_from_slice(&self.0); + to_write.extend_from_slice(&self.name); to_write.extend_from_slice(LOG_ENDPOINT_DELIM); for &byte in msg { if byte == b'\n' { @@ -57,7 +54,7 @@ impl LogEndpoint { } to_write.push(b'\n'); - LOG_WRITER.lock().unwrap().write_all(&to_write) + self.writer.lock().unwrap().write_all(&to_write) } } @@ -68,13 +65,13 @@ impl Write for LogEndpoint { } fn flush(&mut self) -> io::Result<()> { - LOG_WRITER.lock().unwrap().flush() + self.writer.lock().unwrap().flush() } } impl wasmtime_wasi::StdoutStream for LogEndpoint { fn stream(&self) -> Box { - Box::new(LogEndpoint(self.0.clone())) + Box::new(self.clone()) } fn isatty(&self) -> bool { diff --git a/lib/src/session.rs b/lib/src/session.rs index 1cf1acbf..adc03d88 100644 --- a/lib/src/session.rs +++ b/lib/src/session.rs @@ -23,11 +23,19 @@ use { ObjectStoreHandle, PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle, }, + ExecuteCtx, }, cranelift_entity::{entity_impl, PrimaryMap}, futures::future::{self, FutureExt}, http::{request, response, HeaderMap, Request, Response}, - std::{collections::HashMap, future::Future, net::IpAddr, path::PathBuf, sync::Arc}, + std::{ + collections::HashMap, + future::Future, + io::Write, + net::IpAddr, + path::PathBuf, + sync::{Arc, Mutex}, + }, tokio::sync::oneshot::Sender, }; @@ -65,6 +73,8 @@ pub struct Session { /// [parts]: https://docs.rs/http/latest/http/response/struct.Parts.html /// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html resp_parts: PrimaryMap>, + /// Where to direct logging endpoint messages. + capture_logs: Arc>, /// A handle map for logging endpoints. log_endpoints: PrimaryMap, /// A by-name map for logging endpoints. @@ -131,6 +141,7 @@ impl Session { req: Request, resp_sender: Sender>, client_ip: IpAddr, + ctx: &ExecuteCtx, backends: Arc, device_detection: Arc, geolocation: Arc, @@ -158,6 +169,7 @@ impl Session { req_parts, resp_parts: PrimaryMap::new(), downstream_resp: DownstreamResponse::new(resp_sender), + capture_logs: ctx.capture_logs(), log_endpoints: PrimaryMap::new(), log_endpoints_by_name: HashMap::new(), backends, @@ -523,7 +535,7 @@ impl Session { if let Some(handle) = self.log_endpoints_by_name.get(name).copied() { return handle; } - let endpoint = LogEndpoint::new(name); + let endpoint = LogEndpoint::new(name, self.capture_logs.clone()); let handle = self.log_endpoints.push(endpoint); self.log_endpoints_by_name.insert(name.to_owned(), handle); handle