-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add the object-store interface, and rework kv-store to match the host…
…calls
- Loading branch information
Showing
6 changed files
with
330 additions
and
121 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,140 +1,102 @@ | ||
use { | ||
super::fastly::api::{http_types, kv_store, types}, | ||
crate::{ | ||
body::Body, | ||
object_store::{ObjectKey, ObjectStoreError}, | ||
session::{ | ||
PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask, Session, | ||
}, | ||
}, | ||
super::fastly::api::{http_body, kv_store, types}, | ||
crate::session::Session, | ||
}; | ||
|
||
pub struct LookupResult; | ||
|
||
#[async_trait::async_trait] | ||
impl kv_store::Host for Session { | ||
async fn open(&mut self, name: String) -> Result<Option<kv_store::Handle>, types::Error> { | ||
if self.object_store.store_exists(&name)? { | ||
let handle = self.obj_store_handle(&name)?; | ||
Ok(Some(handle.into())) | ||
} else { | ||
Ok(None) | ||
} | ||
impl kv_store::HostLookupResult for Session { | ||
async fn body( | ||
&mut self, | ||
_self_: wasmtime::component::Resource<kv_store::LookupResult>, | ||
) -> http_body::BodyHandle { | ||
todo!() | ||
} | ||
|
||
async fn lookup( | ||
async fn metadata( | ||
&mut self, | ||
store: kv_store::Handle, | ||
key: String, | ||
) -> Result<Option<kv_store::BodyHandle>, types::Error> { | ||
let store = self.get_obj_store_key(store.into()).unwrap(); | ||
let key = ObjectKey::new(&key)?; | ||
match self.obj_lookup(store, &key) { | ||
Ok(obj) => { | ||
let new_handle = self.insert_body(Body::from(obj)); | ||
Ok(Some(new_handle.into())) | ||
} | ||
// Don't write to the invalid handle as the SDK will return Ok(None) | ||
// if the object does not exist. We need to return `Ok(())` here to | ||
// make sure Viceroy does not crash | ||
Err(ObjectStoreError::MissingObject) => Ok(None), | ||
Err(err) => Err(err.into()), | ||
} | ||
_self_: wasmtime::component::Resource<kv_store::LookupResult>, | ||
) -> Option<Vec<u8>> { | ||
todo!() | ||
} | ||
|
||
async fn lookup_async( | ||
async fn generation( | ||
&mut self, | ||
store: kv_store::Handle, | ||
key: String, | ||
) -> Result<kv_store::PendingLookupHandle, types::Error> { | ||
let store = self.get_obj_store_key(store.into()).unwrap(); | ||
let key = ObjectKey::new(key)?; | ||
// just create a future that's already ready | ||
let fut = futures::future::ok(self.obj_lookup(store, &key)); | ||
let task = PendingKvLookupTask::new(PeekableTask::spawn(fut).await); | ||
Ok(self.insert_pending_kv_lookup(task).into()) | ||
_self_: wasmtime::component::Resource<kv_store::LookupResult>, | ||
) -> u32 { | ||
todo!() | ||
} | ||
|
||
async fn pending_lookup_wait( | ||
fn drop( | ||
&mut self, | ||
pending: kv_store::PendingLookupHandle, | ||
) -> Result<Option<kv_store::BodyHandle>, types::Error> { | ||
let pending_obj = self | ||
.take_pending_kv_lookup(pending.into())? | ||
.task() | ||
.recv() | ||
.await?; | ||
// proceed with the normal match from lookup() | ||
match pending_obj { | ||
Ok(obj) => Ok(Some(self.insert_body(Body::from(obj)).into())), | ||
Err(ObjectStoreError::MissingObject) => Ok(None), | ||
Err(err) => Err(err.into()), | ||
} | ||
_rep: wasmtime::component::Resource<kv_store::LookupResult>, | ||
) -> wasmtime::Result<()> { | ||
todo!() | ||
} | ||
} | ||
|
||
async fn insert( | ||
#[async_trait::async_trait] | ||
impl kv_store::Host for Session { | ||
async fn open(&mut self, _name: String) -> Result<Option<kv_store::Handle>, types::Error> { | ||
todo!() | ||
} | ||
|
||
async fn lookup( | ||
&mut self, | ||
store: kv_store::Handle, | ||
key: String, | ||
body_handle: http_types::BodyHandle, | ||
) -> Result<(), types::Error> { | ||
let store = self.get_obj_store_key(store.into()).unwrap().clone(); | ||
let key = ObjectKey::new(&key)?; | ||
let bytes = self.take_body(body_handle.into())?.read_into_vec().await?; | ||
self.obj_insert(store, key, bytes)?; | ||
_store: kv_store::Handle, | ||
_key: String, | ||
) -> Result<Option<kv_store::BodyHandle>, types::Error> { | ||
todo!() | ||
} | ||
|
||
Ok(()) | ||
async fn lookup_wait( | ||
&mut self, | ||
_handle: kv_store::LookupHandle, | ||
) -> Result<Option<wasmtime::component::Resource<kv_store::LookupResult>>, types::Error> { | ||
todo!() | ||
} | ||
|
||
async fn insert_async( | ||
async fn insert( | ||
&mut self, | ||
store: kv_store::Handle, | ||
key: String, | ||
body_handle: http_types::BodyHandle, | ||
) -> Result<kv_store::PendingInsertHandle, types::Error> { | ||
let store = self.get_obj_store_key(store.into()).unwrap().clone(); | ||
let key = ObjectKey::new(&key)?; | ||
let bytes = self.take_body(body_handle.into())?.read_into_vec().await?; | ||
let fut = futures::future::ok(self.obj_insert(store, key, bytes)); | ||
let task = PeekableTask::spawn(fut).await; | ||
_store: kv_store::Handle, | ||
_key: String, | ||
_body_handle: kv_store::BodyHandle, | ||
_mask: kv_store::InsertConfigOptions, | ||
_config: kv_store::InsertConfig, | ||
) -> Result<kv_store::InsertHandle, types::Error> { | ||
todo!() | ||
} | ||
|
||
Ok(self | ||
.insert_pending_kv_insert(PendingKvInsertTask::new(task)) | ||
.into()) | ||
async fn insert_wait(&mut self, _handle: kv_store::InsertHandle) -> Result<(), types::Error> { | ||
todo!() | ||
} | ||
|
||
async fn pending_insert_wait( | ||
async fn delete( | ||
&mut self, | ||
handle: kv_store::PendingInsertHandle, | ||
) -> Result<(), types::Error> { | ||
Ok((self | ||
.take_pending_kv_insert(handle.into())? | ||
.task() | ||
.recv() | ||
.await?)?) | ||
_store: kv_store::Handle, | ||
_key: String, | ||
) -> Result<kv_store::DeleteHandle, types::Error> { | ||
todo!() | ||
} | ||
|
||
async fn delete_async( | ||
&mut self, | ||
store: kv_store::Handle, | ||
key: String, | ||
) -> Result<kv_store::PendingDeleteHandle, types::Error> { | ||
let store = self.get_obj_store_key(store.into()).unwrap().clone(); | ||
let key = ObjectKey::new(&key)?; | ||
let fut = futures::future::ok(self.obj_delete(store, key)); | ||
let task = PeekableTask::spawn(fut).await; | ||
async fn delete_wait(&mut self, _handle: kv_store::DeleteHandle) -> Result<(), types::Error> { | ||
todo!() | ||
} | ||
|
||
Ok(self | ||
.insert_pending_kv_delete(PendingKvDeleteTask::new(task)) | ||
.into()) | ||
async fn list( | ||
&mut self, | ||
_store: kv_store::Handle, | ||
_mask: kv_store::ListConfigOptions, | ||
_options: kv_store::ListConfig, | ||
) -> Result<kv_store::ListHandle, types::Error> { | ||
todo!() | ||
} | ||
|
||
async fn pending_delete_wait( | ||
async fn list_wait( | ||
&mut self, | ||
handle: kv_store::PendingDeleteHandle, | ||
) -> Result<(), types::Error> { | ||
Ok((self | ||
.take_pending_kv_delete(handle.into())? | ||
.task() | ||
.recv() | ||
.await?)?) | ||
_handle: kv_store::ListHandle, | ||
) -> Result<kv_store::BodyHandle, types::Error> { | ||
todo!() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.