Skip to content

Commit

Permalink
Add downstream_server_ip_addr hostcall (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulyssa authored Jul 3, 2024
1 parent 5647b08 commit bf635be
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 64 deletions.
25 changes: 18 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,11 @@ impl Test {
.build()
.unwrap();
*req.uri_mut() = new_uri;
let local = (Ipv4Addr::LOCALHOST, 80).into();
let remote = (Ipv4Addr::LOCALHOST, 0).into();
let resp = ctx
.clone()
.handle_request(req.map(Into::into), Ipv4Addr::LOCALHOST.into())
.handle_request(req.map(Into::into), local, remote)
.await
.map(|result| {
match result {
Expand Down
27 changes: 19 additions & 8 deletions cli/tests/trap-test/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion cli/tests/trap-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ async fn fatal_error_traps_impl(adapt_core_wasm: bool) -> TestResult {
adapt_core_wasm,
)?;
let req = Request::get("http://127.0.0.1:7676/").body(Body::from(""))?;
let local = "127.0.0.1:80".parse().unwrap();
let remote = "127.0.0.1:0".parse().unwrap();
let resp = ctx
.handle_request_with_runtime_error(req, "127.0.0.1".parse().unwrap())
.handle_request_with_runtime_error(req, local, remote)
.await?;

// The Guest was terminated and so should return a 500.
Expand Down
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ bytesize = "^1.1.0"
cfg-if = "^1.0"
clap = { workspace = true }
cranelift-entity = "^0.88.1"
fastly-shared = "^0.9.11"
fastly-shared = "^0.10.1"
flate2 = "^1.0.24"
futures = { workspace = true }
http = "^0.2.8"
Expand Down
6 changes: 6 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@
(result $err (expected $num_bytes (error $fastly_status)))
)

(@interface func (export "downstream_server_ip_addr")
;; must be a 16-byte array
(param $addr_octets_out (@witx pointer (@witx char8)))
(result $err (expected $num_bytes (error $fastly_status)))
)

(@interface func (export "downstream_client_h2_fingerprint")
(param $h2fp_out (@witx pointer (@witx char8)))
(param $h2fp_max_len (@witx usize))
Expand Down
15 changes: 13 additions & 2 deletions lib/src/component/http_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
request::Request,
Method, Uri,
},
std::net::IpAddr,
std::str::FromStr,
};

Expand Down Expand Up @@ -107,7 +108,6 @@ impl http_req::Host for Session {
}

async fn downstream_client_ip_addr(&mut self) -> Result<Vec<u8>, types::Error> {
use std::net::IpAddr;
match self.downstream_client_ip() {
IpAddr::V4(addr) => {
let octets = addr.octets();
Expand All @@ -123,7 +123,18 @@ impl http_req::Host for Session {
}

async fn downstream_server_ip_addr(&mut self) -> Result<Vec<u8>, types::Error> {
Err(Error::NotAvailable("Downstream server ip address").into())
match self.downstream_server_ip() {
IpAddr::V4(addr) => {
let octets = addr.octets();
debug_assert_eq!(octets.len(), 4);
Ok(Vec::from(octets))
}
IpAddr::V6(addr) => {
let octets = addr.octets();
debug_assert_eq!(octets.len(), 16);
Ok(Vec::from(octets))
}
}
}

async fn downstream_tls_cipher_openssl_name(
Expand Down
24 changes: 16 additions & 8 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use {
collections::HashSet,
fs,
io::Write,
net::{IpAddr, Ipv4Addr},
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{Arc, Mutex},
Expand Down Expand Up @@ -347,14 +347,17 @@ impl ExecuteCtx {
/// # let req = Request::new(Body::from(""));
/// let adapt_core_wasm = false;
/// let ctx = ExecuteCtx::new("path/to/a/file.wasm", ProfilingStrategy::None, HashSet::new(), None, Default::default(), adapt_core_wasm)?;
/// let resp = ctx.handle_request(req, "127.0.0.1".parse().unwrap()).await?;
/// let local = "127.0.0.1:80".parse().unwrap();
/// let remote = "127.0.0.1:0".parse().unwrap();
/// let resp = ctx.handle_request(req, local, remote).await?;
/// # Ok(())
/// # }
/// ```
pub async fn handle_request(
self,
incoming_req: Request<hyper::Body>,
remote: IpAddr,
local: SocketAddr,
remote: SocketAddr,
) -> Result<(Response<Body>, Option<anyhow::Error>), Error> {
let req = prepare_request(incoming_req)?;
let (sender, receiver) = oneshot::channel();
Expand All @@ -366,7 +369,7 @@ impl ExecuteCtx {
// Spawn a separate task to run the guest code. That allows _this_ method to return a response early
// if the guest sends one, while the guest continues to run afterward within its task.
let guest_handle = tokio::task::spawn(
self.run_guest(req, req_id, sender, remote)
self.run_guest(req, req_id, sender, local, remote)
.instrument(info_span!("request", id = req_id)),
);

Expand Down Expand Up @@ -400,9 +403,10 @@ impl ExecuteCtx {
pub async fn handle_request_with_runtime_error(
self,
incoming_req: Request<hyper::Body>,
remote: IpAddr,
local: SocketAddr,
remote: SocketAddr,
) -> Result<Response<Body>, Error> {
let result = self.handle_request(incoming_req, remote).await?;
let result = self.handle_request(incoming_req, local, remote).await?;
let resp = match result.1 {
None => result.0,
Some(err) => {
Expand All @@ -422,14 +426,16 @@ impl ExecuteCtx {
req: Request<Body>,
req_id: u64,
sender: Sender<Response<Body>>,
remote: IpAddr,
local: SocketAddr,
remote: SocketAddr,
) -> Result<(), ExecutionError> {
info!("handling request {} {}", req.method(), req.uri());
let start_timestamp = Instant::now();
let session = Session::new(
req_id,
req,
sender,
local,
remote,
&self,
self.backends.clone(),
Expand Down Expand Up @@ -578,12 +584,14 @@ impl ExecuteCtx {
let req = Request::get("http://example.com/").body(Body::empty())?;
let req_id = 0;
let (sender, receiver) = oneshot::channel();
let remote = Ipv4Addr::LOCALHOST.into();
let local = (Ipv4Addr::LOCALHOST, 80).into();
let remote = (Ipv4Addr::LOCALHOST, 0).into();

let session = Session::new(
req_id,
req,
sender,
local,
remote,
&self,
self.backends.clone(),
Expand Down
27 changes: 20 additions & 7 deletions lib/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
std::{
convert::Infallible,
future::Future,
net::{IpAddr, SocketAddr},
net::SocketAddr,
pin::Pin,
task::{self, Poll},
},
Expand Down Expand Up @@ -54,7 +54,7 @@ impl ViceroyService {
}

/// An internal helper, create a [`RequestService`](struct.RequestService.html).
fn make_service(&self, remote: IpAddr) -> RequestService {
fn make_service(&self, remote: &AddrStream) -> RequestService {
RequestService::new(self.ctx.clone(), remote)
}

Expand All @@ -81,7 +81,7 @@ impl<'addr> Service<&'addr AddrStream> for ViceroyService {
}

fn call(&mut self, addr: &'addr AddrStream) -> Self::Future {
future::ok(self.make_service(addr.remote_addr().ip()))
future::ok(self.make_service(addr))
}
}

Expand All @@ -102,13 +102,21 @@ impl<'addr> Service<&'addr AddrStream> for ViceroyService {
#[derive(Clone)]
pub struct RequestService {
ctx: ExecuteCtx,
remote_addr: IpAddr,
local_addr: SocketAddr,
remote_addr: SocketAddr,
}

impl RequestService {
/// Create a new request service.
fn new(ctx: ExecuteCtx, remote_addr: IpAddr) -> Self {
Self { ctx, remote_addr }
fn new(ctx: ExecuteCtx, addr: &AddrStream) -> Self {
let local_addr = addr.local_addr();
let remote_addr = addr.remote_addr();

Self {
ctx,
local_addr,
remote_addr,
}
}
}

Expand All @@ -126,9 +134,14 @@ impl Service<Request<hyper::Body>> for RequestService {
fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
// Request handling currently takes ownership of the context, which is cheaply cloneable.
let ctx = self.ctx.clone();
let local = self.local_addr;
let remote = self.remote_addr;

// Now, use the execution context to handle the request.
Box::pin(async move { ctx.handle_request(req, remote).await.map(|result| result.0) })
Box::pin(async move {
ctx.handle_request(req, local, remote)
.await
.map(|result| result.0)
})
}
}
Loading

0 comments on commit bf635be

Please sign in to comment.