Commit
Merge branch 'awick/get-vcpu-ms' into awick/testing-ci
acw committed Aug 12, 2024
2 parents 19d38db + ff726ec commit 20825f1
Showing 21 changed files with 280 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ base64 = "0.21.2"
clap = { version = "^4.0.18", features = ["derive"] }
hyper = { version = "=0.14.26", features = ["full"] }
itertools = "0.10.5"
pin-project = "1.0.8"
rustls = { version = "0.21.5", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.3"
serde_json = "1.0.59"
1 change: 1 addition & 0 deletions cli/tests/integration/main.rs
@@ -25,3 +25,4 @@ mod upstream;
mod upstream_async;
mod upstream_dynamic;
mod upstream_streaming;
mod vcpu_time;
29 changes: 29 additions & 0 deletions cli/tests/integration/vcpu_time.rs
@@ -0,0 +1,29 @@
use crate::{
common::{Test, TestResult},
viceroy_test,
};
use hyper::{Request, Response, StatusCode};

viceroy_test!(vcpu_time_getter_works, |is_component| {
let req = Request::get("/")
.header("Accept", "text/html")
.body("Hello, world!")
.unwrap();

let resp = Test::using_fixture("vcpu_time_test.wasm")
.adapt_component(is_component)
.backend("slow-server", "/", None, |_| {
std::thread::sleep(std::time::Duration::from_millis(3000));
Response::builder()
.status(StatusCode::OK)
.body(vec![])
.unwrap()
})
.await
.against(req)
.await?;

println!("resp.status() = {}", resp.status());
assert_eq!(resp.status(), StatusCode::OK);
Ok(())
});
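Note: the guest fixture vcpu_time_test.wasm that this test drives is not part of this diff. As a purely illustrative sketch, a Rust guest could bind the new hostcall directly through the raw ABI added in this commit (import module fastly_compute_runtime, export get_vcpu_ms); the signature mirrors the adapter export in crates/adapter/src/fastly/core.rs, but this binding and the helper name are assumptions, not the fixture's actual source.

// Hypothetical guest-side binding for the new hostcall; the real
// vcpu_time_test.wasm fixture is not shown in this diff.
#[link(wasm_import_module = "fastly_compute_runtime")]
extern "C" {
    // The host writes the accumulated vCPU time, in milliseconds, through
    // the out-pointer and returns a status code (0 on success).
    fn get_vcpu_ms(vcpu_ms_out: *mut u64) -> i32;
}

fn vcpu_time_ms() -> Option<u64> {
    let mut ms = 0u64;
    // SAFETY: the pointer is valid for the duration of the call and the
    // host only writes a u64 through it.
    let status = unsafe { get_vcpu_ms(&mut ms) };
    (status == 0).then_some(ms)
}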
1 change: 1 addition & 0 deletions cli/tests/trap-test/Cargo.lock

18 changes: 18 additions & 0 deletions crates/adapter/src/fastly/core.rs
@@ -268,6 +268,24 @@ pub mod fastly_abi {
}
}

pub mod fastly_compute_runtime {
use super::*;

#[export_name = "fastly_compute_runtime#get_vcpu_ms"]
pub fn get_vcpu_ms(vcpu_time_ms_out: *mut u64) -> FastlyStatus {
match crate::bindings::fastly::api::compute_runtime::get_vcpu_ms() {
Ok(time) => {
unsafe {
*vcpu_time_ms_out = time;
};
FastlyStatus::OK
}

Err(e) => e.into(),
}
}
}

pub mod fastly_uap {
use super::*;
use crate::bindings::fastly::api::uap;
1 change: 1 addition & 0 deletions lib/Cargo.toml
@@ -40,6 +40,7 @@ http-body = "^0.4.5"
hyper = { workspace = true }
itertools = { workspace = true }
lazy_static = "^1.4.0"
pin-project = { workspace = true }
regex = "^1.3.9"
rustls = "^0.21.1"
rustls-native-certs = "^0.6.3"
6 changes: 6 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
@@ -965,3 +965,9 @@
(result $err (expected (error $fastly_status)))
)
)

(module $fastly_compute_runtime
(@interface func (export "get_vcpu_ms")
(result $err (expected $vcpu_ms (error $fastly_status)))
)
)
1 change: 1 addition & 0 deletions lib/compute-at-edge-abi/typenames.witx
@@ -358,3 +358,4 @@
(typename $has u32)

(typename $body_length u64)
(typename $vcpu_ms u64)
Binary file modified lib/data/viceroy-component-adapter.wasm
10 changes: 10 additions & 0 deletions lib/src/component/compute_runtime.rs
@@ -0,0 +1,10 @@
use super::fastly::api::{types, compute_runtime};
use crate::session::Session;
use std::sync::atomic::Ordering;

#[async_trait::async_trait]
impl compute_runtime::Host for Session {
async fn get_vcpu_ms(&mut self) -> Result<u64, types::Error> {
Ok(self.active_cpu_time_us.load(Ordering::SeqCst) / 1000)
}
}
2 changes: 2 additions & 0 deletions lib/src/component/mod.rs
@@ -59,13 +59,15 @@ pub fn link_host_functions(linker: &mut component::Linker<ComponentCtx>) -> anyh
fastly::api::types::add_to_linker(linker, |x| x.session())?;
fastly::api::uap::add_to_linker(linker, |x| x.session())?;
fastly::api::config_store::add_to_linker(linker, |x| x.session())?;
fastly::api::compute_runtime::add_to_linker(linker, |x| x.session())?;

Ok(())
}

pub mod async_io;
pub mod backend;
pub mod cache;
pub mod compute_runtime;
pub mod config_store;
pub mod device_detection;
pub mod dictionary;
59 changes: 54 additions & 5 deletions lib/src/execute.rs
@@ -18,13 +18,19 @@ use {
upstream::TlsConfig,
Error,
},
futures::{
task::{Context, Poll},
Future,
},
hyper::{Request, Response},
pin_project::pin_project,
std::{
collections::HashSet,
fs,
io::Write,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
pin::Pin,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{Arc, Mutex},
thread::{self, JoinHandle},
@@ -365,13 +371,22 @@ impl ExecuteCtx {
let req_id = self
.next_req_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let active_cpu_time_us = Arc::new(AtomicU64::new(0));

// Spawn a separate task to run the guest code. That allows _this_ method to return a response early
// if the guest sends one, while the guest continues to run afterward within its task.
let guest_handle = tokio::task::spawn(
self.run_guest(req, req_id, sender, local, remote)
.instrument(info_span!("request", id = req_id)),
);
let guest_handle = tokio::task::spawn(CpuTimeTracking::new(
active_cpu_time_us.clone(),
self.run_guest(
req,
req_id,
sender,
local,
remote,
active_cpu_time_us.clone(),
)
.instrument(info_span!("request", id = req_id)),
));

let resp = match receiver.await {
Ok(resp) => (resp, None),
@@ -428,6 +443,7 @@
sender: Sender<Response<Body>>,
local: SocketAddr,
remote: SocketAddr,
active_cpu_time_us: Arc<AtomicU64>,
) -> Result<(), ExecutionError> {
info!("handling request {} {}", req.method(), req.uri());
let start_timestamp = Instant::now();
@@ -437,6 +453,7 @@
sender,
local,
remote,
active_cpu_time_us,
&self,
self.backends.clone(),
self.device_detection.clone(),
@@ -586,13 +603,15 @@
let (sender, receiver) = oneshot::channel();
let local = (Ipv4Addr::LOCALHOST, 80).into();
let remote = (Ipv4Addr::LOCALHOST, 0).into();
let active_cpu_time_us = Arc::new(AtomicU64::new(0));

let session = Session::new(
req_id,
req,
sender,
local,
remote,
active_cpu_time_us.clone(),
&self,
self.backends.clone(),
self.device_detection.clone(),
Expand Down Expand Up @@ -637,7 +656,8 @@ impl ExecuteCtx {
.map_err(ExecutionError::Typechecking)?;

// Invoke the entrypoint function and collect its exit code
let result = main_func.call_async(&mut store, ()).await;
let result =
CpuTimeTracking::new(active_cpu_time_us, main_func.call_async(&mut store, ())).await;

// If we collected a profile, write it to the file
write_profile(&mut store, self.guest_profile_path.as_ref().as_ref());
@@ -710,3 +730,32 @@

config
}

#[pin_project]
struct CpuTimeTracking<F> {
#[pin]
future: F,
time_spent: Arc<AtomicU64>,
}

impl<F> CpuTimeTracking<F> {
fn new(time_spent: Arc<AtomicU64>, future: F) -> Self {
CpuTimeTracking { future, time_spent }
}
}

impl<E, F: Future<Output = Result<(), E>>> Future for CpuTimeTracking<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();

let start = Instant::now();
let result = me.future.poll(cx);
// 2^64 microseconds is over half a million years, so I'm not terribly
// worried about this cast.
let runtime = start.elapsed().as_micros() as u64;
let _ = me.time_spent.fetch_add(runtime, Ordering::SeqCst);
result
}
}
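For readers following along: CpuTimeTracking approximates active vCPU time by timing each call to poll, so time the wrapped future spends parked between polls is never counted. Below is a minimal, self-contained sketch of the same pattern; the names are illustrative and it uses boxed pinning instead of pin-project, so it is not the implementation above, just the idea.

use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::Instant,
};

// Illustrative stand-in for CpuTimeTracking: accumulates the wall-clock time
// spent inside poll() into a shared counter, in microseconds.
struct TimedFuture<F> {
    inner: Pin<Box<F>>, // boxed so the wrapper stays Unpin without pin-project
    spent_us: Arc<AtomicU64>,
}

impl<F: Future> Future for TimedFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut(); // TimedFuture is Unpin, so this is allowed
        let start = Instant::now();
        let result = this.inner.as_mut().poll(cx);
        // Only time spent inside poll() is counted.
        this.spent_us
            .fetch_add(start.elapsed().as_micros() as u64, Ordering::SeqCst);
        result
    }
}

In the diff above, the same counter (active_cpu_time_us) is shared with the Session, which is what lets the get_vcpu_ms hostcall report the guest's own accumulated time back to it.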
1 change: 1 addition & 0 deletions lib/src/linking.rs
@@ -303,6 +303,7 @@
wiggle_abi::fastly_uap::add_to_linker(linker, WasmCtx::session)?;
wiggle_abi::fastly_async_io::add_to_linker(linker, WasmCtx::session)?;
wiggle_abi::fastly_backend::add_to_linker(linker, WasmCtx::session)?;
wiggle_abi::fastly_compute_runtime::add_to_linker(linker, WasmCtx::session)?;
link_legacy_aliases(linker)?;
Ok(())
}
5 changes: 5 additions & 0 deletions lib/src/session.rs
@@ -12,6 +12,7 @@ use std::future::Future;
use std::io::Write;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

use {
@@ -47,6 +48,8 @@ pub struct Session {
downstream_client_addr: SocketAddr,
/// The IP address and port that received this session.
downstream_server_addr: SocketAddr,
/// The amount of active CPU time we've spent on this session, in microseconds.
pub active_cpu_time_us: Arc<AtomicU64>,
/// The compliance region that this request was received in.
///
/// For now this is just always `"none"`, but we place the field in the session
@@ -150,6 +153,7 @@ impl Session {
resp_sender: Sender<Response<Body>>,
server_addr: SocketAddr,
client_addr: SocketAddr,
active_cpu_time_us: Arc<AtomicU64>,
ctx: &ExecuteCtx,
backends: Arc<Backends>,
device_detection: Arc<DeviceDetection>,
@@ -176,6 +180,7 @@
downstream_req_handle,
downstream_req_body_handle,
downstream_req_original_headers,
active_cpu_time_us,
async_items,
req_parts,
resp_parts: PrimaryMap::new(),
1 change: 1 addition & 0 deletions lib/src/wiggle_abi.rs
@@ -51,6 +51,7 @@ macro_rules! multi_value_result {
mod backend_impl;
mod body_impl;
mod cache;
mod compute_runtime;
mod config_store;
mod device_detection_impl;
mod dictionary_impl;
14 changes: 14 additions & 0 deletions lib/src/wiggle_abi/compute_runtime.rs
@@ -0,0 +1,14 @@
use crate::error::Error;
use crate::session::Session;
use crate::wiggle_abi::fastly_compute_runtime::FastlyComputeRuntime;
use std::sync::atomic::Ordering;
use wiggle::GuestMemory;

impl FastlyComputeRuntime for Session {
fn get_vcpu_ms(&mut self, _memory: &mut GuestMemory<'_>) -> Result<u64, Error> {
// We track microseconds internally, because our wasmtime tick length
// is too short for milliseconds to be useful, but we shrink the value
// to milliseconds here to try to minimize timing attacks.
Ok(self.active_cpu_time_us.load(Ordering::SeqCst) / 1000)
}
}
9 changes: 9 additions & 0 deletions lib/wit/deps/fastly/compute.wit
@@ -1086,6 +1086,14 @@ interface reactor {
serve: func(req: request-handle, body: body-handle) -> result;
}

interface compute-runtime {
use types.{error};

type vcpu-ms = u64;

get-vcpu-ms: func() -> result<vcpu-ms, error>;
}

world compute {
import wasi:clocks/wall-clock@0.2.0;
import wasi:clocks/monotonic-clock@0.2.0;
@@ -1101,6 +1109,7 @@ world compute {
import async-io;
import backend;
import cache;
import compute-runtime;
import dictionary;
import geo;
import device-detection;