Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async inserts for kv store #329

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,19 @@
(param $body_handle $body_handle)
(result $err (expected (error $fastly_status)))
)

(@interface func (export "insert_async")
(param $store $object_store_handle)
(param $key string)
(param $body_handle $body_handle)
(param $pending_handle_out (@witx pointer $pending_kv_insert_handle))
(result $err (expected (error $fastly_status)))
)

(@interface func (export "pending_insert_wait")
(param $pending_objstr_handle $pending_kv_insert_handle)
(result $err (expected (error $fastly_status)))
)
)

(module $fastly_secret_store
Expand Down
4 changes: 3 additions & 1 deletion lib/compute-at-edge-abi/typenames.witx
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@
(typename $dictionary_handle (handle))
;;; A handle to an Object Store.
(typename $object_store_handle (handle))
;;; A handle to a pending KV request.
;;; A handle to a pending KV lookup request.
(typename $pending_kv_lookup_handle (handle))
;;; A handle to a pending KV insert request.
(typename $pending_kv_insert_handle (handle))
;;; A handle to a Secret Store.
(typename $secret_store_handle (handle))
;;; A handle to an individual secret.
Expand Down
4 changes: 4 additions & 0 deletions lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ pub enum HandleError {
#[error("Invalid pending KV lookup handle: {0}")]
InvalidPendingKvLookupHandle(crate::wiggle_abi::types::PendingKvLookupHandle),

/// A insert handle was not valid.
#[error("Invalid pending KV insert handle: {0}")]
InvalidPendingKvInsertHandle(crate::wiggle_abi::types::PendingKvInsertHandle),

/// A dictionary handle was not valid.
#[error("Invalid dictionary handle: {0}")]
InvalidDictionaryHandle(crate::wiggle_abi::types::DictionaryHandle),
Expand Down
74 changes: 68 additions & 6 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod async_item;
mod downstream;

pub use async_item::{AsyncItem, PeekableTask, PendingKvTask};
pub use async_item::{AsyncItem, PeekableTask, PendingKvInsertTask, PendingKvLookupTask};

use {
self::downstream::DownstreamResponse,
Expand All @@ -18,8 +18,8 @@ use {
upstream::{SelectTarget, TlsConfig},
wiggle_abi::types::{
self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle,
ObjectStoreHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle,
ResponseHandle, SecretHandle, SecretStoreHandle,
ObjectStoreHandle, PendingKvInsertHandle, PendingKvLookupHandle, PendingRequestHandle,
RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle,
},
},
cranelift_entity::{entity_impl, PrimaryMap},
Expand Down Expand Up @@ -631,6 +631,53 @@ impl Session {
) -> Result<(), ObjectStoreError> {
self.object_store.insert(obj_store_key, obj_key, obj)
}

/// Insert a [`PendingKvInsert`] into the session.
///
/// This method returns a new [`PendingKvInsertHandle`], which can then be used to access
/// and mutate the pending insert.
pub fn insert_pending_kv_insert(
&mut self,
pending: PendingKvInsertTask,
) -> PendingKvInsertHandle {
self.async_items
.push(Some(AsyncItem::PendingKvInsert(pending)))
.into()
}

/// Take ownership of a [`PendingKvInsert`], given its [`PendingKvInsertHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a pending insert in the
/// session.
pub fn take_pending_kv_insert(
&mut self,
handle: PendingKvInsertHandle,
) -> Result<PendingKvInsertTask, HandleError> {
// check that this is a pending request before removing it
let _ = self.pending_kv_insert(handle)?;

self.async_items
.get_mut(handle.into())
.and_then(Option::take)
.and_then(AsyncItem::into_pending_kv_insert)
.ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
}

/// Get a reference to a [`PendingInsert`], given its [`PendingKvInsertHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a insert in the
/// session.
pub fn pending_kv_insert(
&self,
handle: PendingKvInsertHandle,
) -> Result<&PendingKvInsertTask, HandleError> {
self.async_items
.get(handle.into())
.and_then(Option::as_ref)
.and_then(AsyncItem::as_pending_kv_insert)
.ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
}

pub fn obj_lookup(
&self,
obj_store_key: &ObjectStoreKey,
Expand All @@ -643,7 +690,10 @@ impl Session {
///
/// This method returns a new [`PendingKvLookupHandle`], which can then be used to access
/// and mutate the pending lookup.
pub fn insert_pending_kv_lookup(&mut self, pending: PendingKvTask) -> PendingKvLookupHandle {
pub fn insert_pending_kv_lookup(
&mut self,
pending: PendingKvLookupTask,
) -> PendingKvLookupHandle {
self.async_items
.push(Some(AsyncItem::PendingKvLookup(pending)))
.into()
Expand All @@ -656,7 +706,7 @@ impl Session {
pub fn take_pending_kv_lookup(
&mut self,
handle: PendingKvLookupHandle,
) -> Result<PendingKvTask, HandleError> {
) -> Result<PendingKvLookupTask, HandleError> {
// check that this is a pending request before removing it
let _ = self.pending_kv_lookup(handle)?;

Expand All @@ -674,7 +724,7 @@ impl Session {
pub fn pending_kv_lookup(
&self,
handle: PendingKvLookupHandle,
) -> Result<&PendingKvTask, HandleError> {
) -> Result<&PendingKvLookupTask, HandleError> {
self.async_items
.get(handle.into())
.and_then(Option::as_ref)
Expand Down Expand Up @@ -972,3 +1022,15 @@ impl From<AsyncItemHandle> for PendingKvLookupHandle {
PendingKvLookupHandle::from(h.as_u32())
}
}

impl From<PendingKvInsertHandle> for AsyncItemHandle {
fn from(h: PendingKvInsertHandle) -> AsyncItemHandle {
AsyncItemHandle::from_u32(h.into())
}
}

impl From<AsyncItemHandle> for PendingKvInsertHandle {
fn from(h: AsyncItemHandle) -> PendingKvInsertHandle {
PendingKvInsertHandle::from(h.as_u32())
}
}
35 changes: 29 additions & 6 deletions lib/src/session/async_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use futures::FutureExt;
use http::Response;
use tokio::sync::oneshot;

pub type PendingKvTask = PeekableTask<Result<Vec<u8>, ObjectStoreError>>;
pub type PendingKvLookupTask = PeekableTask<Result<Vec<u8>, ObjectStoreError>>;
pub type PendingKvInsertTask = PeekableTask<Result<(), ObjectStoreError>>;

/// Represents either a full body, or the write end of a streaming body.
///
Expand All @@ -17,7 +18,8 @@ pub enum AsyncItem {
Body(Body),
StreamingBody(StreamingBody),
PendingReq(PeekableTask<Response<Body>>),
PendingKvLookup(PendingKvTask),
PendingKvLookup(PendingKvLookupTask),
PendingKvInsert(PendingKvInsertTask),
}

impl AsyncItem {
Expand Down Expand Up @@ -74,20 +76,34 @@ impl AsyncItem {
}
}

pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvTask> {
pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvLookupTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}

pub fn into_pending_kv_lookup(self) -> Option<PendingKvTask> {
pub fn into_pending_kv_lookup(self) -> Option<PendingKvLookupTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}

pub fn as_pending_kv_insert(&self) -> Option<&PendingKvInsertTask> {
match self {
Self::PendingKvInsert(req) => Some(req),
_ => None,
}
}

pub fn into_pending_kv_insert(self) -> Option<PendingKvInsertTask> {
match self {
Self::PendingKvInsert(req) => Some(req),
_ => None,
}
}

pub fn as_pending_req(&self) -> Option<&PeekableTask<Response<Body>>> {
match self {
Self::PendingReq(req) => Some(req),
Expand Down Expand Up @@ -115,6 +131,7 @@ impl AsyncItem {
Self::Body(body) => body.await_ready().await,
Self::PendingReq(req) => req.await_ready().await,
Self::PendingKvLookup(obj) => obj.await_ready().await,
Self::PendingKvInsert(obj) => obj.await_ready().await,
}
}

Expand All @@ -129,12 +146,18 @@ impl From<PeekableTask<Response<Body>>> for AsyncItem {
}
}

impl From<PendingKvTask> for AsyncItem {
fn from(task: PendingKvTask) -> Self {
impl From<PendingKvLookupTask> for AsyncItem {
fn from(task: PendingKvLookupTask) -> Self {
Self::PendingKvLookup(task)
}
}

impl From<PendingKvInsertTask> for AsyncItem {
fn from(task: PendingKvInsertTask) -> Self {
Self::PendingKvInsert(task)
}
}

#[derive(Debug)]
pub enum PeekableTask<T> {
Waiting(oneshot::Receiver<Result<T, Error>>),
Expand Down
2 changes: 1 addition & 1 deletion lib/src/wiggle_abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ wiggle::from_witx!({
errors: { fastly_status => Error },
async: {
fastly_async_io::{select},
fastly_object_store::{insert, lookup_async, pending_lookup_wait},
fastly_object_store::{insert, insert_async, pending_insert_wait, lookup_async, pending_lookup_wait},
fastly_http_body::{append, read, write},
fastly_http_req::{
pending_req_select, pending_req_select_v2, pending_req_poll, pending_req_poll_v2,
Expand Down
28 changes: 27 additions & 1 deletion lib/src/wiggle_abi/obj_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! fastly_obj_store` hostcall implementations.

use super::types::PendingKvLookupHandle;
use super::types::{PendingKvInsertHandle, PendingKvLookupHandle};
use crate::session::PeekableTask;

use {
Expand Down Expand Up @@ -101,4 +101,30 @@ impl FastlyObjectStore for Session {

Ok(())
}

async fn insert_async<'a>(
&mut self,
store: ObjectStoreHandle,
key: &GuestPtr<str>,
body_handle: BodyHandle,
opt_pending_body_handle_out: &GuestPtr<PendingKvInsertHandle>,
) -> Result<(), Error> {
let store = self.get_obj_store_key(store).unwrap().clone();
let key = ObjectKey::new(&*key.as_str()?.ok_or(Error::SharedMemory)?)?;
let bytes = self.take_body(body_handle)?.read_into_vec().await?;
let fut = futures::future::ok(self.obj_insert(store, key, bytes));
let task = PeekableTask::spawn(fut).await;
opt_pending_body_handle_out.write(self.insert_pending_kv_insert(task))?;
Ok(())
}

async fn pending_insert_wait(
&mut self,
pending_insert_handle: PendingKvInsertHandle,
) -> Result<(), Error> {
Ok((self
.take_pending_kv_insert(pending_insert_handle)?
.recv()
.await?)?)
}
}