Skip to content

Commit

Permalink
Add get_addr_dest_{ip,port} hostcalls (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulyssa authored Jul 8, 2024
1 parent bf635be commit c5c4e35
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 22 deletions.
24 changes: 24 additions & 0 deletions crates/adapter/src/fastly/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,30 @@ pub mod fastly_http_resp {
pub fn close(resp_handle: ResponseHandle) -> FastlyStatus {
convert_result(fastly::api::http_resp::close(resp_handle))
}

#[export_name = "fastly_http_resp#get_addr_dest_ip"]
pub fn get_addr_dest_ip(
resp_handle: ResponseHandle,
addr_octets_out: *mut u8,
nwritten_out: *mut usize,
) -> FastlyStatus {
alloc_result!(addr_octets_out, 16, nwritten_out, {
fastly::api::http_resp::get_addr_dest_ip(resp_handle)
})
}

#[export_name = "fastly_http_resp#get_addr_dest_port"]
pub fn get_addr_dest_port(resp_handle: ResponseHandle, port_out: *mut u16) -> FastlyStatus {
match fastly::api::http_resp::get_addr_dest_port(resp_handle) {
Ok(port) => {
unsafe {
*port_out = port;
}
FastlyStatus::OK
}
Err(e) => e.into(),
}
}
}

pub mod fastly_dictionary {
Expand Down
18 changes: 18 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,24 @@
(param $mode $http_keepalive_mode)
(result $err (expected (error $fastly_status)))
)

;;; Hostcall for getting the destination IP used for this request.
;;;
;;; The buffer for the IP address must be 16 bytes. This will return
;;; the number of bytes written to the buffer: 4 for IPv4 addresses,
;;; and 16 for IPv6.
(@interface func (export "get_addr_dest_ip")
(param $h $response_handle)
;; must be a 16-byte array
(param $addr_octets_out (@witx pointer (@witx char8)))
(result $err (expected $num_bytes (error $fastly_status)))
)

;;; Hostcall for getting the destination port used for this request.
(@interface func (export "get_addr_dest_port")
(param $h $response_handle)
(result $err (expected $port (error $fastly_status)))
)
)

(module $fastly_dictionary
Expand Down
68 changes: 68 additions & 0 deletions lib/src/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use crate::wiggle_abi::types::CacheOverrideTag;
use http::HeaderValue;

/// Optional override for response caching behavior.
#[derive(Clone, Debug, Default)]
pub enum CacheOverride {
/// Do not override the behavior specified in the origin response's cache control headers.
#[default]
None,
/// Do not cache the response to this request, regardless of the origin response's headers.
Pass,
/// Override particular cache control settings.
///
/// The origin response's cache control headers will be used for ttl and stale_while_revalidate if `None`.
Override {
ttl: Option<u32>,
stale_while_revalidate: Option<u32>,
pci: bool,
surrogate_key: Option<HeaderValue>,
},
}

impl CacheOverride {
pub fn is_pass(&self) -> bool {
if let Self::Pass = self {
true
} else {
false
}
}

/// Convert from the representation suitable for passing across the ABI boundary.
///
/// Returns `None` if the tag is not recognized. Depending on the tag, some of the values may be
/// ignored.
pub fn from_abi(
tag: u32,
ttl: u32,
swr: u32,
surrogate_key: Option<HeaderValue>,
) -> Option<Self> {
CacheOverrideTag::from_bits(tag).map(|tag| {
if tag.contains(CacheOverrideTag::PASS) {
return CacheOverride::Pass;
}
if tag.is_empty() && surrogate_key.is_none() {
return CacheOverride::None;
}
let ttl = if tag.contains(CacheOverrideTag::TTL) {
Some(ttl)
} else {
None
};
let stale_while_revalidate = if tag.contains(CacheOverrideTag::STALE_WHILE_REVALIDATE) {
Some(swr)
} else {
None
};
let pci = tag.contains(CacheOverrideTag::PCI);
CacheOverride::Override {
ttl,
stale_while_revalidate,
pci,
surrogate_key,
}
})
}
}
40 changes: 39 additions & 1 deletion lib/src/component/http_resp.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use {
super::fastly::api::{http_resp, http_types, types},
super::{headers::write_values, types::TrappableError},
crate::{error::Error, session::Session},
crate::{error::Error, session::Session, upstream},
cfg_if::cfg_if,
http::{HeaderName, HeaderValue},
hyper::http::response::Response,
std::net::IpAddr,
};

const MAX_HEADER_NAME_LEN: usize = (1 << 16) - 1;
Expand Down Expand Up @@ -286,4 +287,41 @@ impl http_resp::Host for Session {
http_resp::KeepaliveMode::Automatic => Ok(()),
}
}

async fn get_addr_dest_ip(
&mut self,
resp_handle: http_types::ResponseHandle,
) -> Result<Vec<u8>, types::Error> {
let resp = self.response_parts(resp_handle.into())?;
let md = resp
.extensions
.get::<upstream::ConnMetadata>()
.ok_or(Error::ValueAbsent)?;

match md.remote_addr.ip() {
IpAddr::V4(addr) => {
let octets = addr.octets();
debug_assert_eq!(octets.len(), 4);
Ok(Vec::from(octets))
}
IpAddr::V6(addr) => {
let octets = addr.octets();
debug_assert_eq!(octets.len(), 16);
Ok(Vec::from(octets))
}
}
}

async fn get_addr_dest_port(
&mut self,
resp_handle: http_types::ResponseHandle,
) -> Result<u16, types::Error> {
let resp = self.response_parts(resp_handle.into())?;
let md = resp
.extensions
.get::<upstream::ConnMetadata>()
.ok_or(Error::ValueAbsent)?;
let port = md.remote_addr.port();
Ok(port)
}
}
1 change: 1 addition & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

pub mod adapt;
pub mod body;
pub mod cache;
pub mod config;
pub mod error;
pub mod logging;
Expand Down
68 changes: 52 additions & 16 deletions lib/src/upstream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
body::{Body, Chunk},
cache::CacheOverride,
config::Backend,
error::Error,
headers::filter_outgoing_headers,
Expand All @@ -12,6 +13,7 @@ use hyper::{client::HttpConnector, header, Client, HeaderMap, Request, Response,
use rustls::client::ServerName;
use std::{
io,
net::SocketAddr,
pin::Pin,
str::FromStr,
sync::Arc,
Expand Down Expand Up @@ -96,8 +98,23 @@ impl BackendConnector {
type BoxError = Box<dyn std::error::Error + Send + Sync>;

pub enum Connection {
Http(TcpStream),
Https(Box<TlsStream<TcpStream>>),
Http(TcpStream, ConnMetadata),
Https(Box<TlsStream<TcpStream>>, ConnMetadata),
}

impl Connection {
fn metadata(&self) -> &ConnMetadata {
match self {
Connection::Http(_, md) => &md,
Connection::Https(_, md) => &md,
}
}
}

#[derive(Clone)]
pub struct ConnMetadata {
pub direct_pass: bool,
pub remote_addr: SocketAddr,
}

impl hyper::service::Service<Uri> for BackendConnector {
Expand Down Expand Up @@ -138,7 +155,13 @@ impl hyper::service::Service<Uri> for BackendConnector {
Box::pin(async move {
let tcp = connect_fut.await.map_err(Box::new)?;

if backend.uri.scheme_str() == Some("https") {
let remote_addr = tcp.peer_addr()?;
let metadata = ConnMetadata {
direct_pass: false,
remote_addr,
};

let conn = if backend.uri.scheme_str() == Some("https") {
let mut config = if let Some(certed_key) = &backend.client_cert {
config.with_client_auth_cert(certed_key.certs(), certed_key.key())?
} else {
Expand Down Expand Up @@ -181,10 +204,12 @@ impl hyper::service::Service<Uri> for BackendConnector {
}
}

Ok(Connection::Https(Box::new(tls)))
Connection::Https(Box::new(tls), metadata)
} else {
Ok(Connection::Http(tcp))
}
Connection::Http(tcp, metadata)
};

Ok(conn)
})
}
}
Expand Down Expand Up @@ -288,7 +313,13 @@ pub fn send_request(
builder.http2_only(true);
}

let basic_response = builder
let is_pass = req
.extensions()
.get::<CacheOverride>()
.map(CacheOverride::is_pass)
.unwrap_or_default();

let mut basic_response = builder
.set_host(false)
.http2_only(h2only)
.build(connector)
Expand All @@ -299,6 +330,11 @@ pub fn send_request(
e
})?;

if let Some(md) = basic_response.extensions_mut().get_mut::<ConnMetadata>() {
// This is used later to create similar behaviour between Compute and Viceroy.
md.direct_pass = is_pass;
}

if try_decompression
&& basic_response
.headers()
Expand Down Expand Up @@ -346,7 +382,7 @@ pub struct SelectTarget {

impl hyper::client::connect::Connection for Connection {
fn connected(&self) -> hyper::client::connect::Connected {
hyper::client::connect::Connected::new()
hyper::client::connect::Connected::new().extra(self.metadata().clone())
}
}

Expand All @@ -357,8 +393,8 @@ impl AsyncRead for Connection {
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self) {
Connection::Http(s) => Pin::new(s).poll_read(cx, buf),
Connection::Https(s) => Pin::new(s).poll_read(cx, buf),
Connection::Http(s, _) => Pin::new(s).poll_read(cx, buf),
Connection::Https(s, _) => Pin::new(s).poll_read(cx, buf),
}
}
}
Expand All @@ -370,22 +406,22 @@ impl AsyncWrite for Connection {
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match Pin::get_mut(self) {
Connection::Http(s) => Pin::new(s).poll_write(cx, buf),
Connection::Https(s) => Pin::new(s).poll_write(cx, buf),
Connection::Http(s, _) => Pin::new(s).poll_write(cx, buf),
Connection::Https(s, _) => Pin::new(s).poll_write(cx, buf),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self) {
Connection::Http(s) => Pin::new(s).poll_flush(cx),
Connection::Https(s) => Pin::new(s).poll_flush(cx),
Connection::Http(s, _) => Pin::new(s).poll_flush(cx),
Connection::Https(s, _) => Pin::new(s).poll_flush(cx),
}
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self) {
Connection::Http(s) => Pin::new(s).poll_shutdown(cx),
Connection::Https(s) => Pin::new(s).poll_shutdown(cx),
Connection::Http(s, _) => Pin::new(s).poll_shutdown(cx),
Connection::Https(s, _) => Pin::new(s).poll_shutdown(cx),
}
}
}
29 changes: 24 additions & 5 deletions lib/src/wiggle_abi/req_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use super::types::SendErrorDetail;
use super::SecretStoreError;
use crate::cache::CacheOverride;
use crate::config::ClientCertInfo;
use crate::secret_store::SecretLookup;

Expand Down Expand Up @@ -40,20 +41,24 @@ impl FastlyHttpReq for Session {
Ok((req_handle, body_handle))
}

#[allow(unused_variables)] // FIXME KTM 2020-06-25: Remove this directive once implemented.
fn cache_override_set(
&mut self,
memory: &mut GuestMemory<'_>,
_memory: &mut GuestMemory<'_>,
req_handle: RequestHandle,
tag: CacheOverrideTag,
ttl: u32,
stale_while_revalidate: u32,
) -> Result<(), Error> {
// For now, we ignore caching directives because we never cache anything
let overrides = CacheOverride::from_abi(u32::from(tag), ttl, stale_while_revalidate, None)
.ok_or(Error::InvalidArgument)?;

self.request_parts_mut(req_handle)?
.extensions
.insert(overrides);

Ok(())
}

#[allow(unused_variables)] // FIXME KTM 2020-06-25: Remove this directive once implemented.
fn cache_override_v2_set(
&mut self,
memory: &mut GuestMemory<'_>,
Expand All @@ -63,7 +68,21 @@ impl FastlyHttpReq for Session {
stale_while_revalidate: u32,
sk: GuestPtr<[u8]>,
) -> Result<(), Error> {
// For now, we ignore caching directives because we never cache anything
let sk = if sk.len() > 0 {
let sk = memory.as_slice(sk)?.ok_or(Error::SharedMemory)?;
let sk = HeaderValue::from_bytes(&sk).map_err(|_| Error::InvalidArgument)?;
Some(sk)
} else {
None
};

let overrides = CacheOverride::from_abi(u32::from(tag), ttl, stale_while_revalidate, sk)
.ok_or(Error::InvalidArgument)?;

self.request_parts_mut(req_handle)?
.extensions
.insert(overrides);

Ok(())
}

Expand Down
Loading

0 comments on commit c5c4e35

Please sign in to comment.