Skip to content

Commit

Permalink
Merge pull request #7 from restatedev/other-e2e-tests
Browse files Browse the repository at this point in the history
Other e2e tests
  • Loading branch information
slinkydeveloper authored Aug 19, 2024
2 parents 0be7406 + 6c78a34 commit a0be271
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 46 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ version = "0.0.1"
edition = "2021"

[features]
default = ["http"]
default = ["http", "anyhow"]
http = ["hyper", "http-body-util", "hyper-util", "tokio/net", "tokio/signal", "restate-sdk-shared-core/http"]

[dependencies]
anyhow = {version = "1.0", optional = true}
bytes = "1.6.1"
futures = "0.3"
http-body-util = { version = "0.1", optional = true }
Expand Down
8 changes: 8 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ impl StdError for HandlerErrorInner {
#[derive(Debug)]
pub struct HandlerError(pub(crate) HandlerErrorInner);

impl HandlerError {
#[cfg(feature = "anyhow")]
pub fn from_anyhow(err: anyhow::Error) -> Self {
Self(HandlerErrorInner::Retryable(err.into()))
}
}

impl<E: StdError + Send + Sync + 'static> From<E> for HandlerError {
fn from(value: E) -> Self {
Self(HandlerErrorInner::Retryable(Box::new(value)))
Expand All @@ -44,6 +51,7 @@ impl From<TerminalError> for HandlerError {
}
}

// Took from anyhow
impl AsRef<dyn StdError + Send + Sync> for HandlerError {
fn as_ref(&self) -> &(dyn StdError + Send + Sync + 'static) {
&self.0
Expand Down
2 changes: 1 addition & 1 deletion src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl_serde_primitives!(f64);

// --- Json responses

pub struct Json<T>(T);
pub struct Json<T>(pub T);

impl<T> Json<T> {
pub fn into_inner(self) -> T {
Expand Down
2 changes: 2 additions & 0 deletions test-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ edition = "2021"
publish = false

[dependencies]
anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
tracing-subscriber = "0.3"
futures = "0.3"
restate-sdk = { path = ".." }
serde = { version = "1", features = ["derive"] }
tracing = "0.1.40"
78 changes: 34 additions & 44 deletions test-services/exclusions.yaml
Original file line number Diff line number Diff line change
@@ -1,49 +1,39 @@
exclusions:
"alwaysSuspending":
- "dev.restate.sdktesting.tests.AwaitTimeout"
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication"
- "dev.restate.sdktesting.tests.SideEffect"
- "dev.restate.sdktesting.tests.Sleep"
- "dev.restate.sdktesting.tests.SleepWithFailures"
- "dev.restate.sdktesting.tests.State"
- "dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation"
- "dev.restate.sdktesting.tests.UpgradeWithNewInvocation"
- "dev.restate.sdktesting.tests.UserErrors"
- "dev.restate.sdktesting.tests.WorkflowAPI"
- "dev.restate.sdktesting.tests.AwaitTimeout"
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication"
- "dev.restate.sdktesting.tests.SideEffect"
- "dev.restate.sdktesting.tests.Sleep"
- "dev.restate.sdktesting.tests.SleepWithFailures"
- "dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation"
- "dev.restate.sdktesting.tests.UpgradeWithNewInvocation"
- "dev.restate.sdktesting.tests.UserErrors"
- "dev.restate.sdktesting.tests.WorkflowAPI"
"default":
- "dev.restate.sdktesting.tests.AwaitTimeout"
- "dev.restate.sdktesting.tests.CallOrdering"
- "dev.restate.sdktesting.tests.CancelInvocation"
- "dev.restate.sdktesting.tests.Ingress"
- "dev.restate.sdktesting.tests.KafkaIngress"
- "dev.restate.sdktesting.tests.KillInvocation"
- "dev.restate.sdktesting.tests.PrivateService"
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication"
- "dev.restate.sdktesting.tests.Sleep"
- "dev.restate.sdktesting.tests.SleepWithFailures"
- "dev.restate.sdktesting.tests.State"
- "dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation"
- "dev.restate.sdktesting.tests.UpgradeWithNewInvocation"
- "dev.restate.sdktesting.tests.UserErrors"
- "dev.restate.sdktesting.tests.WorkflowAPI"
"lazyState":
- "dev.restate.sdktesting.tests.State"
- "dev.restate.sdktesting.tests.AwaitTimeout"
- "dev.restate.sdktesting.tests.CallOrdering"
- "dev.restate.sdktesting.tests.CancelInvocation"
- "dev.restate.sdktesting.tests.Ingress"
- "dev.restate.sdktesting.tests.KillInvocation"
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication"
- "dev.restate.sdktesting.tests.Sleep"
- "dev.restate.sdktesting.tests.SleepWithFailures"
- "dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation"
- "dev.restate.sdktesting.tests.UpgradeWithNewInvocation"
- "dev.restate.sdktesting.tests.UserErrors"
- "dev.restate.sdktesting.tests.WorkflowAPI"
"persistedTimers":
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication"
- "dev.restate.sdktesting.tests.Sleep"
- "dev.restate.sdktesting.tests.Sleep"
"singleThreadSinglePartition":
- "dev.restate.sdktesting.tests.AwaitTimeout"
- "dev.restate.sdktesting.tests.CallOrdering"
- "dev.restate.sdktesting.tests.CancelInvocation"
- "dev.restate.sdktesting.tests.Ingress"
- "dev.restate.sdktesting.tests.KafkaIngress"
- "dev.restate.sdktesting.tests.KillInvocation"
- "dev.restate.sdktesting.tests.PrivateService"
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication"
- "dev.restate.sdktesting.tests.Sleep"
- "dev.restate.sdktesting.tests.SleepWithFailures"
- "dev.restate.sdktesting.tests.State"
- "dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation"
- "dev.restate.sdktesting.tests.UpgradeWithNewInvocation"
- "dev.restate.sdktesting.tests.UserErrors"
- "dev.restate.sdktesting.tests.WorkflowAPI"
- "dev.restate.sdktesting.tests.AwaitTimeout"
- "dev.restate.sdktesting.tests.CallOrdering"
- "dev.restate.sdktesting.tests.CancelInvocation"
- "dev.restate.sdktesting.tests.Ingress"
- "dev.restate.sdktesting.tests.KillInvocation"
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication"
- "dev.restate.sdktesting.tests.Sleep"
- "dev.restate.sdktesting.tests.SleepWithFailures"
- "dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation"
- "dev.restate.sdktesting.tests.UpgradeWithNewInvocation"
- "dev.restate.sdktesting.tests.UserErrors"
- "dev.restate.sdktesting.tests.WorkflowAPI"
8 changes: 8 additions & 0 deletions test-services/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod counter;
mod map_object;
mod proxy;

use restate_sdk::prelude::{Endpoint, HyperServer};
use std::env;
Expand All @@ -14,6 +16,12 @@ async fn main() {
if services == "*" || services.contains("Counter") {
builder = builder.with_service(counter::Counter::serve(counter::CounterImpl))
}
if services == "*" || services.contains("Proxy") {
builder = builder.with_service(proxy::Proxy::serve(proxy::ProxyImpl))
}
if services == "*" || services.contains("MapObject") {
builder = builder.with_service(map_object::MapObject::serve(map_object::MapObjectImpl))
}

HyperServer::new(builder.build())
.listen_and_serve(format!("0.0.0.0:{port}").parse().unwrap())
Expand Down
55 changes: 55 additions & 0 deletions test-services/src/map_object.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use anyhow::anyhow;
use restate_sdk::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Entry {
key: String,
value: String,
}

#[restate_sdk::object]
#[name = "MapObject"]
pub(crate) trait MapObject {
#[name = "set"]
async fn set(entry: Json<Entry>) -> HandlerResult<()>;
#[name = "get"]
async fn get(key: String) -> HandlerResult<String>;
#[name = "clearAll"]
async fn clear_all() -> HandlerResult<Json<Vec<Entry>>>;
}

pub(crate) struct MapObjectImpl;

impl MapObject for MapObjectImpl {
async fn set(
&self,
ctx: ObjectContext<'_>,
Json(Entry { key, value }): Json<Entry>,
) -> HandlerResult<()> {
ctx.set(&key, value);
Ok(())
}

async fn get(&self, ctx: ObjectContext<'_>, key: String) -> HandlerResult<String> {
Ok(ctx.get(&key).await?.unwrap_or_default())
}

async fn clear_all(&self, ctx: ObjectContext<'_>) -> HandlerResult<Json<Vec<Entry>>> {
let keys = ctx.get_keys().await?;

let mut entries = vec![];
for k in keys {
let value = ctx
.get(&k)
.await?
.ok_or_else(|| HandlerError::from_anyhow(anyhow!("Missing key {k}")))?;
entries.push(Entry { key: k, value })
}

ctx.clear_all();

Ok(entries.into())
}
}
107 changes: 107 additions & 0 deletions test-services/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use futures::future::BoxFuture;
use futures::FutureExt;
use restate_sdk::context::RequestTarget;
use restate_sdk::prelude::*;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ProxyRequest {
service_name: String,
virtual_object_key: Option<String>,
handler_name: String,
message: Vec<u8>,
delay_millis: Option<u64>,
}

impl ProxyRequest {
fn to_target(&self) -> RequestTarget {
if let Some(key) = &self.virtual_object_key {
RequestTarget::Object {
name: self.service_name.clone(),
key: key.clone(),
handler: self.handler_name.clone(),
}
} else {
RequestTarget::Service {
name: self.service_name.clone(),
handler: self.handler_name.clone(),
}
}
}
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ManyCallRequest {
proxy_request: ProxyRequest,
one_way_call: bool,
await_at_the_end: bool,
}

#[restate_sdk::service]
#[name = "Proxy"]
pub(crate) trait Proxy {
#[name = "call"]
async fn call(req: Json<ProxyRequest>) -> HandlerResult<Json<Vec<u8>>>;
#[name = "oneWayCall"]
async fn one_way_call(req: Json<ProxyRequest>) -> HandlerResult<()>;
#[name = "manyCalls"]
async fn many_calls(req: Json<Vec<ManyCallRequest>>) -> HandlerResult<()>;
}

pub(crate) struct ProxyImpl;

impl Proxy for ProxyImpl {
async fn call(
&self,
ctx: Context<'_>,
Json(req): Json<ProxyRequest>,
) -> HandlerResult<Json<Vec<u8>>> {
Ok(ctx.call(req.to_target(), req.message).await?)
}

async fn one_way_call(
&self,
ctx: Context<'_>,
Json(req): Json<ProxyRequest>,
) -> HandlerResult<()> {
ctx.send(
req.to_target(),
req.message,
req.delay_millis.map(Duration::from_millis),
);
Ok(())
}

async fn many_calls(
&self,
ctx: Context<'_>,
Json(requests): Json<Vec<ManyCallRequest>>,
) -> HandlerResult<()> {
let mut futures: Vec<BoxFuture<'_, Result<Vec<u8>, TerminalError>>> = vec![];

for req in requests {
if req.one_way_call {
ctx.send(
req.proxy_request.to_target(),
req.proxy_request.message,
req.proxy_request.delay_millis.map(Duration::from_millis),
);
} else {
let fut = ctx
.call::<_, Vec<u8>>(req.proxy_request.to_target(), req.proxy_request.message);
if req.await_at_the_end {
futures.push(fut.boxed())
}
}
}

for fut in futures {
fut.await?;
}

Ok(())
}
}

0 comments on commit a0be271

Please sign in to comment.