diff --git a/Cargo.lock b/Cargo.lock index cc23722b..be81ac6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2422,6 +2422,7 @@ dependencies = [ "hyper", "itertools 0.10.5", "lazy_static", + "pin-project", "regex", "rustls", "rustls-native-certs", diff --git a/Cargo.toml b/Cargo.toml index 10705ddf..c2fde00e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ base64 = "0.21.2" clap = { version = "^4.0.18", features = ["derive"] } hyper = { version = "=0.14.26", features = ["full"] } itertools = "0.10.5" +pin-project = "1.0.8" rustls = { version = "0.21.5", features = ["dangerous_configuration"] } rustls-pemfile = "1.0.3" serde_json = "1.0.59" diff --git a/cli/tests/integration/main.rs b/cli/tests/integration/main.rs index 1896211b..9e303406 100644 --- a/cli/tests/integration/main.rs +++ b/cli/tests/integration/main.rs @@ -25,3 +25,4 @@ mod upstream; mod upstream_async; mod upstream_dynamic; mod upstream_streaming; +mod vcpu_time; diff --git a/cli/tests/integration/vcpu_time.rs b/cli/tests/integration/vcpu_time.rs new file mode 100644 index 00000000..610b719d --- /dev/null +++ b/cli/tests/integration/vcpu_time.rs @@ -0,0 +1,29 @@ +use crate::{ + common::{Test, TestResult}, + viceroy_test, +}; +use hyper::{Request, Response, StatusCode}; + +viceroy_test!(vcpu_time_getter_works, |is_component| { + let req = Request::get("/") + .header("Accept", "text/html") + .body("Hello, world!") + .unwrap(); + + let resp = Test::using_fixture("vcpu_time_test.wasm") + .adapt_component(is_component) + .backend("slow-server", "/", None, |_| { + std::thread::sleep(std::time::Duration::from_millis(3000)); + Response::builder() + .status(StatusCode::OK) + .body(vec![]) + .unwrap() + }) + .await + .against(req) + .await?; + + println!("resp.status() = {}", resp.status()); + assert_eq!(resp.status(), StatusCode::OK); + Ok(()) +}); diff --git a/cli/tests/trap-test/Cargo.lock b/cli/tests/trap-test/Cargo.lock index 5fe5a907..da8838c7 100644 --- a/cli/tests/trap-test/Cargo.lock +++ b/cli/tests/trap-test/Cargo.lock @@ -2346,6 +2346,7 @@ dependencies = [ "hyper", "itertools 0.10.5", "lazy_static", + "pin-project", "regex", "rustls", "rustls-native-certs", diff --git a/crates/adapter/src/fastly/core.rs b/crates/adapter/src/fastly/core.rs index 493d3cc0..15d0df7e 100644 --- a/crates/adapter/src/fastly/core.rs +++ b/crates/adapter/src/fastly/core.rs @@ -268,6 +268,24 @@ pub mod fastly_abi { } } +pub mod fastly_compute_runtime { + use super::*; + + #[export_name = "fastly_compute_runtime#get_vcpu_ms"] + pub fn get_vcpu_ms(vcpu_time_ms_out: *mut u64) -> FastlyStatus { + match crate::bindings::fastly::api::compute_runtime::get_vcpu_ms() { + Ok(time) => { + unsafe { + *vcpu_time_ms_out = time; + }; + FastlyStatus::OK + } + + Err(e) => e.into(), + } + } +} + pub mod fastly_uap { use super::*; use crate::bindings::fastly::api::uap; diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 83a8a2fd..3ad5be1b 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -40,6 +40,7 @@ http-body = "^0.4.5" hyper = { workspace = true } itertools = { workspace = true } lazy_static = "^1.4.0" +pin-project = { workspace = true } regex = "^1.3.9" rustls = "^0.21.1" rustls-native-certs = "^0.6.3" diff --git a/lib/compute-at-edge-abi/compute-at-edge.witx b/lib/compute-at-edge-abi/compute-at-edge.witx index 5aa0731d..de18f80f 100644 --- a/lib/compute-at-edge-abi/compute-at-edge.witx +++ b/lib/compute-at-edge-abi/compute-at-edge.witx @@ -965,3 +965,9 @@ (result $err (expected (error $fastly_status))) ) ) + +(module $fastly_compute_runtime + (@interface func (export "get_vcpu_ms") + (result $err (expected $vcpu_ms (error $fastly_status))) + ) +) diff --git a/lib/compute-at-edge-abi/typenames.witx b/lib/compute-at-edge-abi/typenames.witx index f3d5752d..f8fbfc9a 100644 --- a/lib/compute-at-edge-abi/typenames.witx +++ b/lib/compute-at-edge-abi/typenames.witx @@ -358,3 +358,4 @@ (typename $has u32) (typename $body_length u64) +(typename $vcpu_ms u64) diff --git a/lib/data/viceroy-component-adapter.wasm b/lib/data/viceroy-component-adapter.wasm index 000703bd..303c4243 100755 Binary files a/lib/data/viceroy-component-adapter.wasm and b/lib/data/viceroy-component-adapter.wasm differ diff --git a/lib/src/component/compute_runtime.rs b/lib/src/component/compute_runtime.rs new file mode 100644 index 00000000..9c4bf962 --- /dev/null +++ b/lib/src/component/compute_runtime.rs @@ -0,0 +1,10 @@ +use super::fastly::api::{types, compute_runtime}; +use crate::session::Session; +use std::sync::atomic::Ordering; + +#[async_trait::async_trait] +impl compute_runtime::Host for Session { + async fn get_vcpu_ms(&mut self) -> Result { + Ok(self.active_cpu_time_us.load(Ordering::SeqCst) / 1000) + } +} diff --git a/lib/src/component/mod.rs b/lib/src/component/mod.rs index 56c5a6e1..c667056c 100644 --- a/lib/src/component/mod.rs +++ b/lib/src/component/mod.rs @@ -59,6 +59,7 @@ pub fn link_host_functions(linker: &mut component::Linker) -> anyh fastly::api::types::add_to_linker(linker, |x| x.session())?; fastly::api::uap::add_to_linker(linker, |x| x.session())?; fastly::api::config_store::add_to_linker(linker, |x| x.session())?; + fastly::api::compute_runtime::add_to_linker(linker, |x| x.session())?; Ok(()) } @@ -66,6 +67,7 @@ pub fn link_host_functions(linker: &mut component::Linker) -> anyh pub mod async_io; pub mod backend; pub mod cache; +pub mod compute_runtime; pub mod config_store; pub mod device_detection; pub mod dictionary; diff --git a/lib/src/execute.rs b/lib/src/execute.rs index 9cd982ba..46108a9d 100644 --- a/lib/src/execute.rs +++ b/lib/src/execute.rs @@ -18,13 +18,19 @@ use { upstream::TlsConfig, Error, }, + futures::{ + task::{Context, Poll}, + Future, + }, hyper::{Request, Response}, + pin_project::pin_project, std::{ collections::HashSet, fs, io::Write, net::{Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, + pin::Pin, sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::{Arc, Mutex}, thread::{self, JoinHandle}, @@ -365,13 +371,22 @@ impl ExecuteCtx { let req_id = self .next_req_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let active_cpu_time_us = Arc::new(AtomicU64::new(0)); // Spawn a separate task to run the guest code. That allows _this_ method to return a response early // if the guest sends one, while the guest continues to run afterward within its task. - let guest_handle = tokio::task::spawn( - self.run_guest(req, req_id, sender, local, remote) - .instrument(info_span!("request", id = req_id)), - ); + let guest_handle = tokio::task::spawn(CpuTimeTracking::new( + active_cpu_time_us.clone(), + self.run_guest( + req, + req_id, + sender, + local, + remote, + active_cpu_time_us.clone(), + ) + .instrument(info_span!("request", id = req_id)), + )); let resp = match receiver.await { Ok(resp) => (resp, None), @@ -428,6 +443,7 @@ impl ExecuteCtx { sender: Sender>, local: SocketAddr, remote: SocketAddr, + active_cpu_time_us: Arc, ) -> Result<(), ExecutionError> { info!("handling request {} {}", req.method(), req.uri()); let start_timestamp = Instant::now(); @@ -437,6 +453,7 @@ impl ExecuteCtx { sender, local, remote, + active_cpu_time_us, &self, self.backends.clone(), self.device_detection.clone(), @@ -586,6 +603,7 @@ impl ExecuteCtx { let (sender, receiver) = oneshot::channel(); let local = (Ipv4Addr::LOCALHOST, 80).into(); let remote = (Ipv4Addr::LOCALHOST, 0).into(); + let active_cpu_time_us = Arc::new(AtomicU64::new(0)); let session = Session::new( req_id, @@ -593,6 +611,7 @@ impl ExecuteCtx { sender, local, remote, + active_cpu_time_us.clone(), &self, self.backends.clone(), self.device_detection.clone(), @@ -637,7 +656,8 @@ impl ExecuteCtx { .map_err(ExecutionError::Typechecking)?; // Invoke the entrypoint function and collect its exit code - let result = main_func.call_async(&mut store, ()).await; + let result = + CpuTimeTracking::new(active_cpu_time_us, main_func.call_async(&mut store, ())).await; // If we collected a profile, write it to the file write_profile(&mut store, self.guest_profile_path.as_ref().as_ref()); @@ -710,3 +730,32 @@ fn configure_wasmtime( config } + +#[pin_project] +struct CpuTimeTracking { + #[pin] + future: F, + time_spent: Arc, +} + +impl CpuTimeTracking { + fn new(time_spent: Arc, future: F) -> Self { + CpuTimeTracking { future, time_spent } + } +} + +impl>> Future for CpuTimeTracking { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + + let start = Instant::now(); + let result = me.future.poll(cx); + // 2^64 microseconds is over half a million years, so I'm not terribly + // worried about this cast. + let runtime = start.elapsed().as_micros() as u64; + let _ = me.time_spent.fetch_add(runtime, Ordering::SeqCst); + result + } +} diff --git a/lib/src/linking.rs b/lib/src/linking.rs index f23bb6f5..e81bed4d 100644 --- a/lib/src/linking.rs +++ b/lib/src/linking.rs @@ -303,6 +303,7 @@ pub fn link_host_functions( wiggle_abi::fastly_uap::add_to_linker(linker, WasmCtx::session)?; wiggle_abi::fastly_async_io::add_to_linker(linker, WasmCtx::session)?; wiggle_abi::fastly_backend::add_to_linker(linker, WasmCtx::session)?; + wiggle_abi::fastly_compute_runtime::add_to_linker(linker, WasmCtx::session)?; link_legacy_aliases(linker)?; Ok(()) } diff --git a/lib/src/session.rs b/lib/src/session.rs index 8b1b5615..eaaa47c6 100644 --- a/lib/src/session.rs +++ b/lib/src/session.rs @@ -12,6 +12,7 @@ use std::future::Future; use std::io::Write; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; +use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; use { @@ -47,6 +48,8 @@ pub struct Session { downstream_client_addr: SocketAddr, /// The IP address and port that received this session. downstream_server_addr: SocketAddr, + /// The amount of time we've spent on this session in ms. + pub active_cpu_time_us: Arc, /// The compliance region that this request was received in. /// /// For now this is just always `"none"`, but we place the field in the session @@ -150,6 +153,7 @@ impl Session { resp_sender: Sender>, server_addr: SocketAddr, client_addr: SocketAddr, + active_cpu_time_us: Arc, ctx: &ExecuteCtx, backends: Arc, device_detection: Arc, @@ -176,6 +180,7 @@ impl Session { downstream_req_handle, downstream_req_body_handle, downstream_req_original_headers, + active_cpu_time_us, async_items, req_parts, resp_parts: PrimaryMap::new(), diff --git a/lib/src/wiggle_abi.rs b/lib/src/wiggle_abi.rs index dacc0ec5..bd76b3b0 100644 --- a/lib/src/wiggle_abi.rs +++ b/lib/src/wiggle_abi.rs @@ -51,6 +51,7 @@ macro_rules! multi_value_result { mod backend_impl; mod body_impl; mod cache; +mod compute_runtime; mod config_store; mod device_detection_impl; mod dictionary_impl; diff --git a/lib/src/wiggle_abi/compute_runtime.rs b/lib/src/wiggle_abi/compute_runtime.rs new file mode 100644 index 00000000..b672aaca --- /dev/null +++ b/lib/src/wiggle_abi/compute_runtime.rs @@ -0,0 +1,14 @@ +use crate::error::Error; +use crate::session::Session; +use crate::wiggle_abi::fastly_compute_runtime::FastlyComputeRuntime; +use std::sync::atomic::Ordering; +use wiggle::GuestMemory; + +impl FastlyComputeRuntime for Session { + fn get_vcpu_ms(&mut self, _memory: &mut GuestMemory<'_>) -> Result { + // we internally track microseconds, because our wasmtime tick length + // is too short for ms to work. but we want to shrink this to ms to + // try to minimize timing attacks. + Ok(self.active_cpu_time_us.load(Ordering::SeqCst) / 1000) + } +} diff --git a/lib/wit/deps/fastly/compute.wit b/lib/wit/deps/fastly/compute.wit index a7dcb7ce..bf76171d 100644 --- a/lib/wit/deps/fastly/compute.wit +++ b/lib/wit/deps/fastly/compute.wit @@ -1086,6 +1086,14 @@ interface reactor { serve: func(req: request-handle, body: body-handle) -> result; } +interface compute-runtime { + use types.{error}; + + type vcpu-ms = u64; + + get-vcpu-ms: func() -> result; +} + world compute { import wasi:clocks/wall-clock@0.2.0; import wasi:clocks/monotonic-clock@0.2.0; @@ -1101,6 +1109,7 @@ world compute { import async-io; import backend; import cache; + import compute-runtime; import dictionary; import geo; import device-detection; diff --git a/test-fixtures/Cargo.lock b/test-fixtures/Cargo.lock index f861e605..0e17f73e 100644 --- a/test-fixtures/Cargo.lock +++ b/test-fixtures/Cargo.lock @@ -29,6 +29,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bytes" version = "1.6.1" @@ -50,6 +59,16 @@ dependencies = [ "libc", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "digest" version = "0.9.0" @@ -59,6 +78,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer 0.10.4", + "crypto-common", +] + [[package]] name = "fastly" version = "0.10.2" @@ -76,7 +105,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sha2", + "sha2 0.9.9", "thiserror", "time", "url", @@ -138,6 +167,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "hex-literal" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" + [[package]] name = "http" version = "1.1.0" @@ -277,13 +312,24 @@ version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if", "cpufeatures", - "digest", + "digest 0.9.0", "opaque-debug", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "syn" version = "1.0.109" @@ -310,14 +356,17 @@ dependencies = [ name = "test-fixtures" version = "0.1.0" dependencies = [ + "anyhow", "base64", "bytes", "fastly", "fastly-shared", "fastly-sys", + "hex-literal", "http", "rustls-pemfile", "serde", + "sha2 0.10.8", ] [[package]] diff --git a/test-fixtures/Cargo.toml b/test-fixtures/Cargo.toml index 226b9546..fa143029 100644 --- a/test-fixtures/Cargo.toml +++ b/test-fixtures/Cargo.toml @@ -8,11 +8,14 @@ license = "Apache-2.0 WITH LLVM-exception" publish = false [dependencies] +anyhow = "1.0.86" base64 = "0.21.2" fastly = "0.10.1" fastly-shared = "0.10.1" fastly-sys = "0.10.1" +hex-literal = "0.4.1" bytes = "1.0.0" http = "1.1.0" rustls-pemfile = "1.0.3" serde = "1.0.114" +sha2 = "0.10.8" diff --git a/test-fixtures/src/bin/vcpu_time_test.rs b/test-fixtures/src/bin/vcpu_time_test.rs new file mode 100644 index 00000000..65401691 --- /dev/null +++ b/test-fixtures/src/bin/vcpu_time_test.rs @@ -0,0 +1,70 @@ +use anyhow::anyhow; +use fastly::{Error, Request, Response}; +use fastly_shared::FastlyStatus; +use hex_literal::hex; +use sha2::{Sha512, Digest}; +use std::time::{Duration, Instant}; + +#[link(wasm_import_module = "fastly_compute_runtime")] +extern "C" { + #[link_name = "get_vcpu_ms"] + pub fn get_vcpu_ms(ms_out: *mut u64) -> FastlyStatus; +} + +fn current_vcpu_ms() -> Result { + let mut vcpu_time = 0u64; + let vcpu_time_result = unsafe { get_vcpu_ms(&mut vcpu_time) }; + if vcpu_time_result != FastlyStatus::OK { + return Err(anyhow!("Got bad response from get_vcpu_ms: {:?}", vcpu_time_result)); + } + Ok(vcpu_time) +} + +fn test_that_waiting_for_servers_increases_only_wall_time(client_req: Request) -> Result<(), Error> { + let wall_initial_time = Instant::now(); + let vcpu_initial_time = current_vcpu_ms()?; + let Ok(_) = client_req.send("slow-server") else { + Response::from_status(500).send_to_client(); + return Ok(()); + }; + let wall_elapsed_time = wall_initial_time.elapsed().as_millis(); + let vcpu_final_time = current_vcpu_ms()?; + + assert!( (vcpu_final_time - vcpu_initial_time) < 1000 ); + assert!(wall_elapsed_time > 3000 ); + + Ok(()) +} + +fn test_that_computing_factorial_increases_vcpu_time() -> Result<(), Error> { + let vcpu_initial_time = current_vcpu_ms()?; + + let block = vec![0; 4096]; + let mut written = 0; + let mut hasher = Sha512::new(); + while written < (1024 * 1024 * 1024) { + hasher.update(&block); + written += block.len(); + } + let result = hasher.finalize(); + assert_eq!(result[..], hex!(" +c5041ae163cf0f65600acfe7f6a63f212101687 +d41a57a4e18ffd2a07a452cd8175b8f5a4868dd +2330bfe5ae123f18216bdbc9e0f80d131e64b94 +913a7b40bb5 +")[..]); + + let vcpu_final_time = current_vcpu_ms()?; + assert!(vcpu_final_time - vcpu_initial_time > 10000); + Ok(()) +} + +fn main() -> Result<(), Error> { + let client_req = Request::from_client(); + + test_that_waiting_for_servers_increases_only_wall_time(client_req); + test_that_computing_factorial_increases_vcpu_time(); + + Response::from_status(200).send_to_client(); + Ok(()) +}