diff --git a/crates/adapter/src/fastly/core.rs b/crates/adapter/src/fastly/core.rs index 9982e4bc..493d3cc0 100644 --- a/crates/adapter/src/fastly/core.rs +++ b/crates/adapter/src/fastly/core.rs @@ -1974,6 +1974,30 @@ pub mod fastly_http_resp { pub fn close(resp_handle: ResponseHandle) -> FastlyStatus { convert_result(fastly::api::http_resp::close(resp_handle)) } + + #[export_name = "fastly_http_resp#get_addr_dest_ip"] + pub fn get_addr_dest_ip( + resp_handle: ResponseHandle, + addr_octets_out: *mut u8, + nwritten_out: *mut usize, + ) -> FastlyStatus { + alloc_result!(addr_octets_out, 16, nwritten_out, { + fastly::api::http_resp::get_addr_dest_ip(resp_handle) + }) + } + + #[export_name = "fastly_http_resp#get_addr_dest_port"] + pub fn get_addr_dest_port(resp_handle: ResponseHandle, port_out: *mut u16) -> FastlyStatus { + match fastly::api::http_resp::get_addr_dest_port(resp_handle) { + Ok(port) => { + unsafe { + *port_out = port; + } + FastlyStatus::OK + } + Err(e) => e.into(), + } + } } pub mod fastly_dictionary { diff --git a/lib/compute-at-edge-abi/compute-at-edge.witx b/lib/compute-at-edge-abi/compute-at-edge.witx index bd339e14..3306aac3 100644 --- a/lib/compute-at-edge-abi/compute-at-edge.witx +++ b/lib/compute-at-edge-abi/compute-at-edge.witx @@ -631,6 +631,24 @@ (param $mode $http_keepalive_mode) (result $err (expected (error $fastly_status))) ) + + ;;; Hostcall for getting the destination IP used for this request. + ;;; + ;;; The buffer for the IP address must be 16 bytes. This will return + ;;; the number of bytes written to the buffer: 4 for IPv4 addresses, + ;;; and 16 for IPv6. + (@interface func (export "get_addr_dest_ip") + (param $h $response_handle) + ;; must be a 16-byte array + (param $addr_octets_out (@witx pointer (@witx char8))) + (result $err (expected $num_bytes (error $fastly_status))) + ) + + ;;; Hostcall for getting the destination port used for this request. + (@interface func (export "get_addr_dest_port") + (param $h $response_handle) + (result $err (expected $port (error $fastly_status))) + ) ) (module $fastly_dictionary diff --git a/lib/src/cache.rs b/lib/src/cache.rs new file mode 100644 index 00000000..c39b909a --- /dev/null +++ b/lib/src/cache.rs @@ -0,0 +1,68 @@ +use crate::wiggle_abi::types::CacheOverrideTag; +use http::HeaderValue; + +/// Optional override for response caching behavior. +#[derive(Clone, Debug, Default)] +pub enum CacheOverride { + /// Do not override the behavior specified in the origin response's cache control headers. + #[default] + None, + /// Do not cache the response to this request, regardless of the origin response's headers. + Pass, + /// Override particular cache control settings. + /// + /// The origin response's cache control headers will be used for ttl and stale_while_revalidate if `None`. + Override { + ttl: Option, + stale_while_revalidate: Option, + pci: bool, + surrogate_key: Option, + }, +} + +impl CacheOverride { + pub fn is_pass(&self) -> bool { + if let Self::Pass = self { + true + } else { + false + } + } + + /// Convert from the representation suitable for passing across the ABI boundary. + /// + /// Returns `None` if the tag is not recognized. Depending on the tag, some of the values may be + /// ignored. + pub fn from_abi( + tag: u32, + ttl: u32, + swr: u32, + surrogate_key: Option, + ) -> Option { + CacheOverrideTag::from_bits(tag).map(|tag| { + if tag.contains(CacheOverrideTag::PASS) { + return CacheOverride::Pass; + } + if tag.is_empty() && surrogate_key.is_none() { + return CacheOverride::None; + } + let ttl = if tag.contains(CacheOverrideTag::TTL) { + Some(ttl) + } else { + None + }; + let stale_while_revalidate = if tag.contains(CacheOverrideTag::STALE_WHILE_REVALIDATE) { + Some(swr) + } else { + None + }; + let pci = tag.contains(CacheOverrideTag::PCI); + CacheOverride::Override { + ttl, + stale_while_revalidate, + pci, + surrogate_key, + } + }) + } +} diff --git a/lib/src/component/http_resp.rs b/lib/src/component/http_resp.rs index dd3756d4..e196d03d 100644 --- a/lib/src/component/http_resp.rs +++ b/lib/src/component/http_resp.rs @@ -1,10 +1,11 @@ use { super::fastly::api::{http_resp, http_types, types}, super::{headers::write_values, types::TrappableError}, - crate::{error::Error, session::Session}, + crate::{error::Error, session::Session, upstream}, cfg_if::cfg_if, http::{HeaderName, HeaderValue}, hyper::http::response::Response, + std::net::IpAddr, }; const MAX_HEADER_NAME_LEN: usize = (1 << 16) - 1; @@ -286,4 +287,41 @@ impl http_resp::Host for Session { http_resp::KeepaliveMode::Automatic => Ok(()), } } + + async fn get_addr_dest_ip( + &mut self, + resp_handle: http_types::ResponseHandle, + ) -> Result, types::Error> { + let resp = self.response_parts(resp_handle.into())?; + let md = resp + .extensions + .get::() + .ok_or(Error::ValueAbsent)?; + + match md.remote_addr.ip() { + IpAddr::V4(addr) => { + let octets = addr.octets(); + debug_assert_eq!(octets.len(), 4); + Ok(Vec::from(octets)) + } + IpAddr::V6(addr) => { + let octets = addr.octets(); + debug_assert_eq!(octets.len(), 16); + Ok(Vec::from(octets)) + } + } + } + + async fn get_addr_dest_port( + &mut self, + resp_handle: http_types::ResponseHandle, + ) -> Result { + let resp = self.response_parts(resp_handle.into())?; + let md = resp + .extensions + .get::() + .ok_or(Error::ValueAbsent)?; + let port = md.remote_addr.port(); + Ok(port) + } } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 5f03d7ae..6bf2811c 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -15,6 +15,7 @@ pub mod adapt; pub mod body; +pub mod cache; pub mod config; pub mod error; pub mod logging; diff --git a/lib/src/upstream.rs b/lib/src/upstream.rs index add4e2d5..c6705210 100644 --- a/lib/src/upstream.rs +++ b/lib/src/upstream.rs @@ -1,5 +1,6 @@ use crate::{ body::{Body, Chunk}, + cache::CacheOverride, config::Backend, error::Error, headers::filter_outgoing_headers, @@ -12,6 +13,7 @@ use hyper::{client::HttpConnector, header, Client, HeaderMap, Request, Response, use rustls::client::ServerName; use std::{ io, + net::SocketAddr, pin::Pin, str::FromStr, sync::Arc, @@ -96,8 +98,23 @@ impl BackendConnector { type BoxError = Box; pub enum Connection { - Http(TcpStream), - Https(Box>), + Http(TcpStream, ConnMetadata), + Https(Box>, ConnMetadata), +} + +impl Connection { + fn metadata(&self) -> &ConnMetadata { + match self { + Connection::Http(_, md) => &md, + Connection::Https(_, md) => &md, + } + } +} + +#[derive(Clone)] +pub struct ConnMetadata { + pub direct_pass: bool, + pub remote_addr: SocketAddr, } impl hyper::service::Service for BackendConnector { @@ -138,7 +155,13 @@ impl hyper::service::Service for BackendConnector { Box::pin(async move { let tcp = connect_fut.await.map_err(Box::new)?; - if backend.uri.scheme_str() == Some("https") { + let remote_addr = tcp.peer_addr()?; + let metadata = ConnMetadata { + direct_pass: false, + remote_addr, + }; + + let conn = if backend.uri.scheme_str() == Some("https") { let mut config = if let Some(certed_key) = &backend.client_cert { config.with_client_auth_cert(certed_key.certs(), certed_key.key())? } else { @@ -181,10 +204,12 @@ impl hyper::service::Service for BackendConnector { } } - Ok(Connection::Https(Box::new(tls))) + Connection::Https(Box::new(tls), metadata) } else { - Ok(Connection::Http(tcp)) - } + Connection::Http(tcp, metadata) + }; + + Ok(conn) }) } } @@ -288,7 +313,13 @@ pub fn send_request( builder.http2_only(true); } - let basic_response = builder + let is_pass = req + .extensions() + .get::() + .map(CacheOverride::is_pass) + .unwrap_or_default(); + + let mut basic_response = builder .set_host(false) .http2_only(h2only) .build(connector) @@ -299,6 +330,11 @@ pub fn send_request( e })?; + if let Some(md) = basic_response.extensions_mut().get_mut::() { + // This is used later to create similar behaviour between Compute and Viceroy. + md.direct_pass = is_pass; + } + if try_decompression && basic_response .headers() @@ -346,7 +382,7 @@ pub struct SelectTarget { impl hyper::client::connect::Connection for Connection { fn connected(&self) -> hyper::client::connect::Connected { - hyper::client::connect::Connected::new() + hyper::client::connect::Connected::new().extra(self.metadata().clone()) } } @@ -357,8 +393,8 @@ impl AsyncRead for Connection { buf: &mut ReadBuf<'_>, ) -> Poll> { match Pin::get_mut(self) { - Connection::Http(s) => Pin::new(s).poll_read(cx, buf), - Connection::Https(s) => Pin::new(s).poll_read(cx, buf), + Connection::Http(s, _) => Pin::new(s).poll_read(cx, buf), + Connection::Https(s, _) => Pin::new(s).poll_read(cx, buf), } } } @@ -370,22 +406,22 @@ impl AsyncWrite for Connection { buf: &[u8], ) -> Poll> { match Pin::get_mut(self) { - Connection::Http(s) => Pin::new(s).poll_write(cx, buf), - Connection::Https(s) => Pin::new(s).poll_write(cx, buf), + Connection::Http(s, _) => Pin::new(s).poll_write(cx, buf), + Connection::Https(s, _) => Pin::new(s).poll_write(cx, buf), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::get_mut(self) { - Connection::Http(s) => Pin::new(s).poll_flush(cx), - Connection::Https(s) => Pin::new(s).poll_flush(cx), + Connection::Http(s, _) => Pin::new(s).poll_flush(cx), + Connection::Https(s, _) => Pin::new(s).poll_flush(cx), } } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::get_mut(self) { - Connection::Http(s) => Pin::new(s).poll_shutdown(cx), - Connection::Https(s) => Pin::new(s).poll_shutdown(cx), + Connection::Http(s, _) => Pin::new(s).poll_shutdown(cx), + Connection::Https(s, _) => Pin::new(s).poll_shutdown(cx), } } } diff --git a/lib/src/wiggle_abi/req_impl.rs b/lib/src/wiggle_abi/req_impl.rs index 2a6ae71b..8af66e2e 100644 --- a/lib/src/wiggle_abi/req_impl.rs +++ b/lib/src/wiggle_abi/req_impl.rs @@ -2,6 +2,7 @@ use super::types::SendErrorDetail; use super::SecretStoreError; +use crate::cache::CacheOverride; use crate::config::ClientCertInfo; use crate::secret_store::SecretLookup; @@ -40,20 +41,24 @@ impl FastlyHttpReq for Session { Ok((req_handle, body_handle)) } - #[allow(unused_variables)] // FIXME KTM 2020-06-25: Remove this directive once implemented. fn cache_override_set( &mut self, - memory: &mut GuestMemory<'_>, + _memory: &mut GuestMemory<'_>, req_handle: RequestHandle, tag: CacheOverrideTag, ttl: u32, stale_while_revalidate: u32, ) -> Result<(), Error> { - // For now, we ignore caching directives because we never cache anything + let overrides = CacheOverride::from_abi(u32::from(tag), ttl, stale_while_revalidate, None) + .ok_or(Error::InvalidArgument)?; + + self.request_parts_mut(req_handle)? + .extensions + .insert(overrides); + Ok(()) } - #[allow(unused_variables)] // FIXME KTM 2020-06-25: Remove this directive once implemented. fn cache_override_v2_set( &mut self, memory: &mut GuestMemory<'_>, @@ -63,7 +68,21 @@ impl FastlyHttpReq for Session { stale_while_revalidate: u32, sk: GuestPtr<[u8]>, ) -> Result<(), Error> { - // For now, we ignore caching directives because we never cache anything + let sk = if sk.len() > 0 { + let sk = memory.as_slice(sk)?.ok_or(Error::SharedMemory)?; + let sk = HeaderValue::from_bytes(&sk).map_err(|_| Error::InvalidArgument)?; + Some(sk) + } else { + None + }; + + let overrides = CacheOverride::from_abi(u32::from(tag), ttl, stale_while_revalidate, sk) + .ok_or(Error::InvalidArgument)?; + + self.request_parts_mut(req_handle)? + .extensions + .insert(overrides); + Ok(()) } diff --git a/lib/src/wiggle_abi/resp_impl.rs b/lib/src/wiggle_abi/resp_impl.rs index 4e3ac90c..2eae9dc1 100644 --- a/lib/src/wiggle_abi/resp_impl.rs +++ b/lib/src/wiggle_abi/resp_impl.rs @@ -4,6 +4,7 @@ use { crate::{ error::Error, session::Session, + upstream, wiggle_abi::{ fastly_http_resp::FastlyHttpResp, headers::HttpHeaders, @@ -15,6 +16,7 @@ use { }, cfg_if::cfg_if, hyper::http::response::Response, + std::net::IpAddr, wiggle::{GuestMemory, GuestPtr}, }; @@ -225,4 +227,65 @@ impl FastlyHttpResp for Session { HttpKeepaliveMode::Automatic => Ok(()), } } + + fn get_addr_dest_ip( + &mut self, + memory: &mut GuestMemory<'_>, + resp_handle: ResponseHandle, + addr_octets_ptr: GuestPtr, + ) -> Result { + let resp = self.response_parts(resp_handle)?; + let md = resp + .extensions + .get::() + .ok_or(Error::ValueAbsent)?; + + if !md.direct_pass { + // Compute currently only returns this value when we are doing + // direct pass, so we skip returning a value here for now, even + // if we have one, so that guest code doesn't come to expect it + // during local testing. + return Err(Error::ValueAbsent); + } + + match md.remote_addr.ip() { + IpAddr::V4(addr) => { + let octets = addr.octets(); + let octets_bytes = octets.len() as u32; + debug_assert_eq!(octets_bytes, 4); + memory.copy_from_slice(&octets, addr_octets_ptr.as_array(octets_bytes))?; + Ok(octets_bytes) + } + IpAddr::V6(addr) => { + let octets = addr.octets(); + let octets_bytes = octets.len() as u32; + debug_assert_eq!(octets_bytes, 16); + memory.copy_from_slice(&octets, addr_octets_ptr.as_array(octets_bytes))?; + Ok(octets_bytes) + } + } + } + + fn get_addr_dest_port( + &mut self, + _memory: &mut GuestMemory<'_>, + resp_handle: ResponseHandle, + ) -> Result { + let resp = self.response_parts(resp_handle)?; + let md = resp + .extensions + .get::() + .ok_or(Error::ValueAbsent)?; + + if !md.direct_pass { + // Compute currently only returns this value when we are doing + // direct pass, so we skip returning a value here for now, even + // if we have one, so that guest code doesn't come to expect it + // during local testing. + return Err(Error::ValueAbsent); + } + + let port = md.remote_addr.port(); + Ok(port) + } } diff --git a/lib/wit/deps/fastly/compute.wit b/lib/wit/deps/fastly/compute.wit index 07675e5b..a7dcb7ce 100644 --- a/lib/wit/deps/fastly/compute.wit +++ b/lib/wit/deps/fastly/compute.wit @@ -545,6 +545,10 @@ interface http-resp { http-keepalive-mode-set: func(h: response-handle, mode: keepalive-mode) -> result<_, error>; + + get-addr-dest-ip: func(h: response-handle) -> result, error>; + + get-addr-dest-port: func(h: response-handle) -> result; } /*