From 22fcef1b96c3b03378a72f5112f3791e8123d5cf Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Wed, 8 Jan 2025 18:07:40 +0200 Subject: [PATCH] Add an end-to-end test for trim gap handling using snapshots (#2463) --- Cargo.lock | 4 +- Cargo.toml | 1 + crates/local-cluster-runner/src/node/mod.rs | 30 +- server/Cargo.toml | 3 + server/tests/common/replicated_loglet.rs | 5 +- server/tests/snapshots.rs | 169 -------- server/tests/trim_gap_handling.rs | 345 ++++++++++++++++ tools/mock-service-endpoint/Cargo.toml | 4 +- tools/mock-service-endpoint/src/handler.rs | 393 +++++++++++++++++++ tools/mock-service-endpoint/src/lib.rs | 12 + tools/mock-service-endpoint/src/listener.rs | 57 +++ tools/mock-service-endpoint/src/main.rs | 414 +------------------- 12 files changed, 849 insertions(+), 588 deletions(-) delete mode 100644 server/tests/snapshots.rs create mode 100644 server/tests/trim_gap_handling.rs create mode 100644 tools/mock-service-endpoint/src/handler.rs create mode 100644 tools/mock-service-endpoint/src/lib.rs create mode 100644 tools/mock-service-endpoint/src/listener.rs diff --git a/Cargo.lock b/Cargo.lock index 1af9e57c8..fb012d63e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4380,7 +4380,6 @@ dependencies = [ "serde_json", "thiserror 2.0.6", "tokio", - "tokio-stream", "tracing", "tracing-subscriber", "workspace-hack", @@ -6770,8 +6769,10 @@ dependencies = [ "googletest", "humantime", "hyper-util", + "mock-service-endpoint", "pin-project", "regex", + "reqwest", "restate-admin", "restate-bifrost", "restate-core", @@ -6788,6 +6789,7 @@ dependencies = [ "rust-rocksdb", "schemars", "serde", + "serde_json", "serde_with", "tempfile", "test-log", diff --git a/Cargo.toml b/Cargo.toml index eae5041b5..297ebcec3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ description = "Restate makes distributed applications easy!" [workspace.dependencies] # Own crates codederror = { path = "crates/codederror" } +mock-service-endpoint = { path = "tools/mock-service-endpoint" } restate-admin = { path = "crates/admin" } restate-admin-rest-model = { path = "crates/admin-rest-model" } restate-base64-util = { path = "crates/base64-util" } diff --git a/crates/local-cluster-runner/src/node/mod.rs b/crates/local-cluster-runner/src/node/mod.rs index be3fe722c..87650240d 100644 --- a/crates/local-cluster-runner/src/node/mod.rs +++ b/crates/local-cluster-runner/src/node/mod.rs @@ -225,7 +225,7 @@ impl Node { nodes } - /// Start this Node, providing the base_dir and the cluster_name of the cluster its + /// Start this Node, providing the base_dir and the cluster_name of the cluster it's /// expected to attach to. All relative file paths addresses specified in the node config /// (eg, nodename/node.sock) will be absolutized against the base path, and the base dir /// and cluster name present in config will be overwritten. @@ -557,7 +557,7 @@ impl StartedNode { pid ); match nix::sys::signal::kill( - nix::unistd::Pid::from_raw(pid.try_into().unwrap()), + nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGKILL, ) { Ok(()) => (&mut self.status).await, @@ -582,7 +582,7 @@ impl StartedNode { pid ); match nix::sys::signal::kill( - nix::unistd::Pid::from_raw(pid.try_into().unwrap()), + nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), nix::sys::signal::SIGTERM, ) { Err(nix::errno::Errno::ESRCH) => { @@ -807,6 +807,30 @@ impl StartedNode { } } +impl Drop for StartedNode { + fn drop(&mut self) { + if let StartedNodeStatus::Running { pid, .. } = self.status { + warn!( + "Node {} (pid {}) dropped without explicit shutdown", + self.config.node_name(), + pid, + ); + match nix::sys::signal::kill( + nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")), + nix::sys::signal::SIGKILL, + ) { + Ok(()) | Err(nix::errno::Errno::ESRCH) => {} + err => error!( + "Failed to send SIGKILL to running node {} (pid {}): {:?}", + self.config.node_name(), + pid, + err, + ), + } + } + } +} + #[derive(Debug, Clone, Copy)] pub enum HealthCheck { Admin, diff --git a/server/Cargo.toml b/server/Cargo.toml index 9bfb0af3c..59821a449 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -70,6 +70,7 @@ restate-core = { workspace = true, features = ["test-util"] } restate-local-cluster-runner = { workspace = true } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } +mock-service-endpoint = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } @@ -81,6 +82,8 @@ test-log = { workspace = true } tonic = { workspace = true, features = ["transport", "prost"] } tower = { workspace = true } tracing-subscriber = { workspace = true } +reqwest = { workspace = true } +serde_json = { workspace = true } url = { workspace = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index c11c9a1a0..45032bdc9 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -89,7 +89,6 @@ pub struct TestEnv { pub loglet: Arc, pub metadata_writer: MetadataWriter, pub metadata_store_client: MetadataStoreClient, - pub cluster: StartedCluster, } pub async fn run_in_test_env( @@ -118,7 +117,7 @@ where RocksDbManager::init(Configuration::mapped_updateable(|c| &c.common)); - let cluster = Cluster::builder() + let mut cluster = Cluster::builder() .base_dir(base_dir.as_path().to_owned()) .nodes(nodes) .build() @@ -165,12 +164,12 @@ where future(TestEnv { bifrost, loglet, - cluster, metadata_writer, metadata_store_client, }) .await?; + cluster.graceful_shutdown(Duration::from_secs(1)).await?; TaskCenter::shutdown_node("test completed", 0).await; RocksDbManager::get().shutdown().await; Ok(()) diff --git a/server/tests/snapshots.rs b/server/tests/snapshots.rs deleted file mode 100644 index 5f0fdc584..000000000 --- a/server/tests/snapshots.rs +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::time::Duration; - -use enumset::enum_set; -use futures_util::StreamExt; -use googletest::fail; -use hyper_util::rt::TokioIo; -use regex::Regex; -use tempfile::TempDir; -use test_log::test; -use tokio::io; -use tokio::net::UnixStream; -use tonic::codec::CompressionEncoding; -use tonic::transport::{Channel, Endpoint, Uri}; -use tower::service_fn; -use url::Url; - -use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; -use restate_admin::cluster_controller::protobuf::{ - ClusterStateRequest, CreatePartitionSnapshotRequest, -}; -use restate_local_cluster_runner::{ - cluster::Cluster, - node::{BinarySource, Node}, -}; -use restate_types::config::{LogFormat, MetadataStoreClient}; -use restate_types::net::AdvertisedAddress; -use restate_types::protobuf::cluster::node_state::State; -use restate_types::{config::Configuration, nodes_config::Role}; - -mod common; - -#[test(restate_core::test)] -async fn create_and_restore_snapshot() -> googletest::Result<()> { - let mut base_config = Configuration::default(); - base_config.common.bootstrap_num_partitions = 1.try_into()?; - base_config.common.log_filter = "restate=debug,warn".to_owned(); - base_config.common.log_format = LogFormat::Compact; - - let snapshots_dir = TempDir::new()?; - base_config.worker.snapshots.destination = Some( - Url::from_file_path(snapshots_dir.path()) - .unwrap() - .to_string(), - ); - - let nodes = Node::new_test_nodes_with_metadata( - base_config.clone(), - BinarySource::CargoTest, - enum_set!(Role::Worker), - 1, - ); - - let mut partition_ready = nodes[1].lines(Regex::new("Won the leadership campaign")?); - - let cluster = Cluster::builder() - .temp_base_dir() - .nodes(nodes.clone()) - .build() - .start() - .await?; - - cluster.wait_healthy(Duration::from_secs(30)).await?; - assert!(partition_ready.next().await.is_some()); - - let mut client = - ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?) - .accept_compressed(CompressionEncoding::Gzip); - - any_partition_active(&mut client, Duration::from_secs(5)).await?; - - let snapshot_response = client - .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 }) - .await? - .into_inner(); - - let mut node_2 = Node::new_test_node( - "node-2", - base_config, - BinarySource::CargoTest, - enum_set!(Role::Worker), - ); - *node_2.metadata_store_client_mut() = MetadataStoreClient::Embedded { - address: cluster.nodes[0].node_address().clone(), - }; - - let mut snapshot_restored = node_2.lines( - format!( - "Importing partition store snapshot.*{}", - snapshot_response.snapshot_id - ) - .parse()?, - ); - - node_2 - .start_clustered(cluster.base_dir(), cluster.cluster_name()) - .await?; - - assert!(snapshot_restored.next().await.is_some()); - Ok(()) -} - -async fn any_partition_active( - client: &mut ClusterCtrlSvcClient, - timeout: Duration, -) -> googletest::Result<()> { - let deadline = tokio::time::Instant::now() + timeout; - loop { - let cluster_state = client - .get_cluster_state(ClusterStateRequest {}) - .await? - .into_inner() - .cluster_state - .unwrap(); - - if cluster_state.nodes.values().any(|n| { - n.state.as_ref().is_some_and(|s| match s { - State::Alive(s) => s - .partitions - .values() - .any(|p| p.effective_mode.cmp(&1).is_eq()), - _ => false, - }) - }) { - break; // partition is ready; we can request snapshot - } - if tokio::time::Instant::now() > deadline { - fail!( - "Partition processor did not become ready within {:?}", - timeout - )?; - } - tokio::time::sleep(Duration::from_millis(250)).await; - } - Ok(()) -} - -async fn grpc_connect(address: AdvertisedAddress) -> Result { - match address { - AdvertisedAddress::Uds(uds_path) => { - // dummy endpoint required to specify an uds connector, it is not used anywhere - Endpoint::try_from("http://127.0.0.1") - .expect("/ should be a valid Uri") - .connect_with_connector(service_fn(move |_: Uri| { - let uds_path = uds_path.clone(); - async move { - Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) - } - })).await - } - AdvertisedAddress::Http(uri) => { - Channel::builder(uri) - .connect_timeout(Duration::from_secs(2)) - .timeout(Duration::from_secs(2)) - .http2_adaptive_window(true) - .connect() - .await - } - } -} diff --git a/server/tests/trim_gap_handling.rs b/server/tests/trim_gap_handling.rs new file mode 100644 index 000000000..3d7b3f346 --- /dev/null +++ b/server/tests/trim_gap_handling.rs @@ -0,0 +1,345 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::net::SocketAddr; +use std::time::Duration; + +use enumset::enum_set; +use futures_util::StreamExt; +use googletest::fail; +use hyper_util::rt::TokioIo; +use tempfile::TempDir; +use tokio::io; +use tokio::net::UnixStream; +use tonic::codec::CompressionEncoding; +use tonic::transport::{Channel, Endpoint, Uri}; +use tower::service_fn; +use tracing::{error, info}; +use url::Url; + +use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; +use restate_admin::cluster_controller::protobuf::{ + ClusterStateRequest, CreatePartitionSnapshotRequest, DescribeLogRequest, TrimLogRequest, +}; +use restate_local_cluster_runner::{ + cluster::Cluster, + node::{BinarySource, Node}, +}; +use restate_types::config::{LogFormat, MetadataStoreClient}; +use restate_types::identifiers::PartitionId; +use restate_types::logs::metadata::ProviderKind::Replicated; +use restate_types::logs::{LogId, Lsn}; +use restate_types::net::AdvertisedAddress; +use restate_types::protobuf::cluster::node_state::State; +use restate_types::protobuf::cluster::RunMode; +use restate_types::retries::RetryPolicy; +use restate_types::{config::Configuration, nodes_config::Role}; + +mod common; + +#[test_log::test(tokio::test)] +async fn fast_forward_over_trim_gap() -> googletest::Result<()> { + let mut base_config = Configuration::default(); + base_config.common.bootstrap_num_partitions = 1.try_into()?; + base_config.bifrost.default_provider = Replicated; + base_config.common.log_filter = "restate=debug,warn".to_owned(); + base_config.common.log_format = LogFormat::Compact; + + let no_snapshot_repository_config = base_config.clone(); + + let snapshots_dir = TempDir::new()?; + base_config.worker.snapshots.destination = Some( + Url::from_file_path(snapshots_dir.path()) + .unwrap() + .to_string(), + ); + + let nodes = Node::new_test_nodes_with_metadata( + base_config.clone(), + BinarySource::CargoTest, + enum_set!(Role::Worker | Role::LogServer), + 2, + ); + let admin_node = &nodes[0]; + + let worker_1 = &nodes[1]; + let worker_2 = &nodes[2]; + + let mut worker_1_ready = worker_1.lines("PartitionProcessor starting event loop".parse()?); + let mut worker_2_ready = worker_2.lines("PartitionProcessor starting event loop".parse()?); + + let mut cluster = Cluster::builder() + .temp_base_dir() + .nodes(nodes.clone()) + .build() + .start() + .await?; + + cluster.wait_healthy(Duration::from_secs(10)).await?; + tokio::time::timeout(Duration::from_secs(10), worker_1_ready.next()).await?; + tokio::time::timeout(Duration::from_secs(10), worker_2_ready.next()).await?; + + let mut client = + ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?) + .accept_compressed(CompressionEncoding::Gzip); + + tokio::time::timeout(Duration::from_secs(5), any_partition_active(&mut client)).await??; + + let addr: SocketAddr = "127.0.0.1:9080".parse()?; + tokio::spawn(async move { + info!("Starting mock service on http://{}", addr); + if let Err(e) = mock_service_endpoint::listener::run_listener(addr).await { + error!("Error running listener: {:?}", e); + } + }); + + let http_client = reqwest::Client::new(); + let registration_response = http_client + .post(format!( + "http://{}/deployments", + admin_node.config().admin.bind_address + )) + .header("content-type", "application/json") + .json(&serde_json::json!({ "uri": "http://localhost:9080" })) + .send() + .await?; + assert!(registration_response.status().is_success()); + + let ingress_url = format!( + "http://{}/Counter/0/get", + worker_1.config().ingress.bind_address + ); + + // It takes a little bit for the service to become available for invocations + let mut retry = RetryPolicy::fixed_delay(Duration::from_millis(500), Some(10)).into_iter(); + loop { + let invoke_response = http_client.post(ingress_url.clone()).send().await?; + if invoke_response.status().is_success() { + break; + } + if let Some(delay) = retry.next() { + tokio::time::sleep(delay).await; + } else { + fail!("Failed to invoke worker")?; + } + } + + let snapshot_response = client + .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 }) + .await? + .into_inner(); + + // todo(pavel): if create snapshot returned an LSN, we could trim the log to that specific LSN instead of guessing + tokio::time::timeout( + Duration::from_secs(3), + trim_log(&mut client, LogId::new(0), Lsn::new(3)), + ) + .await??; + + let mut worker_3 = Node::new_test_node( + "node-3", + no_snapshot_repository_config, + BinarySource::CargoTest, + enum_set!(Role::HttpIngress | Role::Worker), + ); + *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { + address: cluster.nodes[0].node_address().clone(), + }; + + let mut trim_gap_encountered = + worker_3.lines("Partition processor stopped due to a log trim gap, and no snapshot repository is configured".parse()?); + + info!("Waiting for partition processor to encounter log trim gap"); + let mut worker_3 = worker_3 + .start_clustered(cluster.base_dir(), cluster.cluster_name()) + .await?; + assert!( + tokio::time::timeout(Duration::from_secs(20), trim_gap_encountered.next()) + .await + .is_ok() + ); + worker_3.graceful_shutdown(Duration::from_secs(1)).await?; + + info!("Re-starting additional node with snapshot repository configured"); + let mut worker_3 = Node::new_test_node( + "node-3", + base_config.clone(), + BinarySource::CargoTest, + enum_set!(Role::HttpIngress | Role::Worker), + ); + *worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded { + address: cluster.nodes[0].node_address().clone(), + }; + + let ingress_url = format!( + "http://{}/Counter/0/get", + worker_3.config().ingress.bind_address + ); + let mut worker_3_imported_snapshot = worker_3.lines( + format!( + "Importing partition store snapshot.*{}", + snapshot_response.snapshot_id + ) + .parse()?, + ); + let mut worker_3 = worker_3 + .start_clustered(cluster.base_dir(), cluster.cluster_name()) + .await?; + assert!( + tokio::time::timeout(Duration::from_secs(20), worker_3_imported_snapshot.next()) + .await + .is_ok() + ); + + // todo(pavel): promote node 3 to be the leader for partition 0 and invoke the service again + // right now, all we are asserting is that the new node is applying newly appended log records + assert!(http_client + .post(ingress_url) + .send() + .await? + .status() + .is_success()); + tokio::time::timeout( + Duration::from_secs(5), + applied_lsn_converged(&mut client, 3, PartitionId::from(0)), + ) + .await??; + + worker_3.graceful_shutdown(Duration::from_secs(1)).await?; + cluster.graceful_shutdown(Duration::from_secs(1)).await?; + Ok(()) +} + +async fn any_partition_active( + client: &mut ClusterCtrlSvcClient, +) -> googletest::Result<()> { + loop { + let cluster_state = client + .get_cluster_state(ClusterStateRequest {}) + .await? + .into_inner() + .cluster_state + .unwrap(); + + if cluster_state.nodes.values().any(|n| { + n.state.as_ref().is_some_and(|s| match s { + State::Alive(s) => s.partitions.values().any(|p| { + RunMode::try_from(p.effective_mode).is_ok_and(|m| m == RunMode::Leader) + }), + _ => false, + }) + }) { + break; + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + Ok(()) +} + +async fn trim_log( + client: &mut ClusterCtrlSvcClient, + log_id: LogId, + trim_point: Lsn, +) -> googletest::Result<()> { + loop { + // We have to keep retrying the trim operation as the admin node may decide to no-op it if + // the trim point is after the known global tail. + client + .trim_log(TrimLogRequest { + log_id: log_id.into(), + trim_point: trim_point.as_u64(), + }) + .await?; + + let response = client + .describe_log(DescribeLogRequest { + log_id: log_id.into(), + }) + .await? + .into_inner(); + if response.trim_point >= trim_point.as_u64() { + break; + } + + tokio::time::sleep(Duration::from_millis(250)).await; + } + Ok(()) +} + +async fn applied_lsn_converged( + client: &mut ClusterCtrlSvcClient, + expected_processors: usize, + partition_id: PartitionId, +) -> googletest::Result<()> { + assert!(expected_processors > 0); + info!( + "Waiting for {} partition processors to converge on the same applied LSN", + expected_processors + ); + loop { + let cluster_state = client + .get_cluster_state(ClusterStateRequest {}) + .await? + .into_inner() + .cluster_state + .unwrap(); + + let applied_lsn: Vec<_> = cluster_state + .nodes + .values() + .filter_map(|n| { + n.state + .as_ref() + .map(|s| match s { + State::Alive(s) => s + .partitions + .get(&partition_id.into()) + .map(|p| p.last_applied_log_lsn) + .unwrap_or_default() + .map(|lsn| (partition_id, lsn.value)), + _ => None, + }) + .unwrap_or_default() + }) + .collect(); + + if applied_lsn.len() == expected_processors + && applied_lsn.iter().all(|(_, lsn)| *lsn == applied_lsn[0].1) + { + break; + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + Ok(()) +} + +async fn grpc_connect(address: AdvertisedAddress) -> Result { + match address { + AdvertisedAddress::Uds(uds_path) => { + // dummy endpoint required to specify an uds connector, it is not used anywhere + Endpoint::try_from("http://127.0.0.1") + .expect("/ should be a valid Uri") + .connect_with_connector(service_fn(move |_: Uri| { + let uds_path = uds_path.clone(); + async move { + Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) + } + })).await + } + AdvertisedAddress::Http(uri) => { + Channel::builder(uri) + .connect_timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(2)) + .http2_adaptive_window(true) + .connect() + .await + } + } +} diff --git a/tools/mock-service-endpoint/Cargo.toml b/tools/mock-service-endpoint/Cargo.toml index f779cda79..9945efdf6 100644 --- a/tools/mock-service-endpoint/Cargo.toml +++ b/tools/mock-service-endpoint/Cargo.toml @@ -22,8 +22,10 @@ prost = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -tokio-stream = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } thiserror = { workspace = true } workspace-hack = { version = "0.1", path = "../../workspace-hack" } + +[lib] +path = "src/lib.rs" \ No newline at end of file diff --git a/tools/mock-service-endpoint/src/handler.rs b/tools/mock-service-endpoint/src/handler.rs new file mode 100644 index 000000000..e579442fa --- /dev/null +++ b/tools/mock-service-endpoint/src/handler.rs @@ -0,0 +1,393 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::convert::Infallible; +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use assert2::let_assert; +use async_stream::{stream, try_stream}; +use bytes::Bytes; +use futures::{pin_mut, Stream, StreamExt}; +use http_body_util::{BodyStream, Either, Empty, StreamBody}; +use hyper::body::{Frame, Incoming}; +use hyper::{Request, Response}; +use prost::Message; +use tracing::{debug, error}; + +use restate_service_protocol::codec::ProtobufRawEntryCodec; +use restate_service_protocol::message::{Decoder, Encoder, EncodingError, ProtocolMessage}; +use restate_types::errors::codes; +use restate_types::journal::raw::{EntryHeader, PlainRawEntry, RawEntryCodecError}; +use restate_types::journal::{Entry, EntryType, InputEntry}; +use restate_types::service_protocol::start_message::StateEntry; +use restate_types::service_protocol::{ + self, get_state_entry_message, output_entry_message, ServiceProtocolVersion, StartMessage, +}; + +#[derive(Debug, thiserror::Error)] +enum FrameError { + #[error(transparent)] + EncodingError(EncodingError), + #[error(transparent)] + Hyper(hyper::Error), + #[error("Stream ended before finished replay")] + UnexpectedEOF, + #[error("Journal does not contain expected messages")] + InvalidJournal, + #[error(transparent)] + RawEntryCodecError(#[from] RawEntryCodecError), + #[error(transparent)] + Serde(#[from] serde_json::Error), +} + +pub async fn serve( + req: Request, +) -> Result< + Response< + Either, StreamBody, Infallible>>>>, + >, + Infallible, +> { + let (req_head, req_body) = req.into_parts(); + let mut split = req_head.uri.path().rsplit('/'); + let handler_name = if let Some(handler_name) = split.next() { + handler_name + } else { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + }; + if let Some("Counter") = split.next() { + } else { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + }; + if let Some("invoke") = split.next() { + } else { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + }; + + let req_body = BodyStream::new(req_body); + let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); + let encoder = Encoder::new(ServiceProtocolVersion::V1); + + let incoming = stream! { + for await frame in req_body { + match frame { + Ok(frame) => { + if let Ok(data) = frame.into_data() { + decoder.push(data); + loop { + match decoder.consume_next() { + Ok(Some((_header, message))) => yield Ok(message), + Ok(None) => { + break + }, + Err(err) => yield Err(FrameError::EncodingError(err)), + } + } + } + }, + Err(err) => yield Err(FrameError::Hyper(err)), + }; + } + }; + + let handler: Handler = match handler_name.parse() { + Ok(handler) => handler, + Err(_err) => { + return Ok(Response::builder() + .status(404) + .body(Either::Left(Empty::new())) + .unwrap()); + } + }; + + let outgoing = handler.handle(incoming).map(move |message| match message { + Ok(message) => Ok(Frame::data(encoder.encode(message))), + Err(err) => { + error!("Error handling stream: {err:?}"); + Ok(Frame::data(encoder.encode(error(err)))) + } + }); + + Ok(Response::builder() + .status(200) + .header("content-type", "application/vnd.restate.invocation.v1") + .body(Either::Right(StreamBody::new(outgoing))) + .unwrap()) +} + +pub enum Handler { + Get, + Add, +} + +#[derive(Debug, thiserror::Error)] +#[error("Invalid handler")] +pub struct InvalidHandler; + +impl FromStr for Handler { + type Err = InvalidHandler; + + fn from_str(s: &str) -> Result { + match s { + "get" => Ok(Self::Get), + "add" => Ok(Self::Add), + _ => Err(InvalidHandler), + } + } +} + +impl Display for Handler { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Get => write!(f, "get"), + Self::Add => write!(f, "add"), + } + } +} + +impl Handler { + fn handle( + self, + incoming: impl Stream>, + ) -> impl Stream> { + try_stream! { + pin_mut!(incoming); + match (incoming.next().await, incoming.next().await) { + (Some(Ok(ProtocolMessage::Start(start_message))), Some(Ok(ProtocolMessage::UnparsedEntry(input)))) if input.ty() == EntryType::Input => { + let input = input.deserialize_entry_ref::()?; + let_assert!( + Entry::Input(input) = input + ); + + let replay_count = start_message.known_entries as usize - 1; + let mut replayed = Vec::with_capacity(replay_count); + for _ in 0..replay_count { + let message = incoming.next().await.ok_or(FrameError::UnexpectedEOF)??; + replayed.push(message); + } + + debug!("Handling request to {self} with {} known entries", start_message.known_entries); + + match self { + Handler::Get => { + for await message in Self::handle_get(start_message, input, replayed, incoming) { + yield message? + } + }, + Handler::Add => { + for await message in Self::handle_add(start_message, input, replayed, incoming) { + yield message? + } + }, + }; + }, + _ => {Err(FrameError::InvalidJournal)?; return}, + }; + } + } + + fn handle_get( + start_message: StartMessage, + _input: InputEntry, + replayed: Vec, + _incoming: impl Stream>, + ) -> impl Stream> { + try_stream! { + let counter = read_counter(&start_message.state_map); + match replayed.len() { + 0 => { + yield get_state(counter.clone()); + yield output(counter.unwrap_or("0".into())); + yield end(); + }, + 1 => { + yield output(counter.unwrap_or("0".into())); + yield end(); + } + 2=> { + yield end(); + } + _ => {Err(FrameError::InvalidJournal)?; return}, + } + } + } + + fn handle_add( + start_message: StartMessage, + input: InputEntry, + replayed: Vec, + _incoming: impl Stream>, + ) -> impl Stream> { + try_stream! { + let counter = read_counter(&start_message.state_map); + match replayed.len() { + 0 => { + yield get_state(counter.clone()); + + let next_value = match counter { + Some(ref counter) => { + let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; + let current: i32 = serde_json::from_slice(counter.as_ref())?; + + serde_json::to_vec(&(to_add + current))?.into() + } + None => input.value, + }; + + yield set_state(next_value.clone()); + yield output(next_value); + yield end(); + }, + 1 => { + let next_value = match counter { + Some(ref counter) => { + let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; + let current: i32 = serde_json::from_slice(counter.as_ref())?; + + serde_json::to_vec(&(to_add + current))?.into() + } + None => input.value, + }; + + yield set_state(next_value.clone()); + yield output(next_value); + yield end(); + } + 2 => { + let set_value = match &replayed[1] { + ProtocolMessage::UnparsedEntry(set) if set.ty() == EntryType::SetState => { + let set = set.deserialize_entry_ref::()?; + let_assert!( + Entry::SetState(set) = set + ); + set.value.clone() + }, + _ => {Err(FrameError::InvalidJournal)?; return}, + }; + yield output(set_value); + yield end(); + } + 3 => { + yield end(); + } + _ => {Err(FrameError::InvalidJournal)?; return}, + } + } + } +} + +fn read_counter(state_map: &[StateEntry]) -> Option { + let entry = state_map + .iter() + .find(|entry| entry.key.as_ref() == b"counter")?; + Some(entry.value.clone()) +} + +fn get_state(counter: Option) -> ProtocolMessage { + debug!( + "Yielding GetStateEntryMessage with value {}", + LossyDisplay(counter.as_deref()) + ); + + ProtocolMessage::UnparsedEntry(PlainRawEntry::new( + EntryHeader::GetState { is_completed: true }, + service_protocol::GetStateEntryMessage { + name: String::new(), + key: "counter".into(), + result: Some(match counter { + Some(ref counter) => get_state_entry_message::Result::Value(counter.clone()), + None => get_state_entry_message::Result::Empty(service_protocol::Empty {}), + }), + } + .encode_to_vec() + .into(), + )) +} + +fn set_state(value: Bytes) -> ProtocolMessage { + debug!( + "Yielding SetStateEntryMessage with value {}", + LossyDisplay(Some(&value)) + ); + + ProtocolMessage::UnparsedEntry(PlainRawEntry::new( + EntryHeader::SetState, + service_protocol::SetStateEntryMessage { + name: String::new(), + key: "counter".into(), + value: value.clone(), + } + .encode_to_vec() + .into(), + )) +} + +fn output(value: Bytes) -> ProtocolMessage { + debug!( + "Yielding OutputEntryMessage with result {}", + LossyDisplay(Some(&value)) + ); + + ProtocolMessage::UnparsedEntry(PlainRawEntry::new( + EntryHeader::Output, + service_protocol::OutputEntryMessage { + name: String::new(), + result: Some(output_entry_message::Result::Value(value)), + } + .encode_to_vec() + .into(), + )) +} + +fn end() -> ProtocolMessage { + debug!("Yielding EndMessage"); + + ProtocolMessage::End(service_protocol::EndMessage {}) +} + +fn error(err: FrameError) -> ProtocolMessage { + let code = match err { + FrameError::EncodingError(_) => codes::PROTOCOL_VIOLATION, + FrameError::Hyper(_) => codes::INTERNAL, + FrameError::UnexpectedEOF => codes::PROTOCOL_VIOLATION, + FrameError::InvalidJournal => codes::JOURNAL_MISMATCH, + FrameError::RawEntryCodecError(_) => codes::PROTOCOL_VIOLATION, + FrameError::Serde(_) => codes::INTERNAL, + }; + ProtocolMessage::Error(service_protocol::ErrorMessage { + code: code.into(), + description: err.to_string(), + message: String::new(), + related_entry_index: None, + related_entry_name: None, + related_entry_type: None, + next_retry_delay: None, + }) +} + +struct LossyDisplay<'a>(Option<&'a [u8]>); +impl<'a> Display for LossyDisplay<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self.0 { + Some(bytes) => write!(f, "{}", String::from_utf8_lossy(bytes)), + None => write!(f, ""), + } + } +} diff --git a/tools/mock-service-endpoint/src/lib.rs b/tools/mock-service-endpoint/src/lib.rs new file mode 100644 index 000000000..6e95bd1cc --- /dev/null +++ b/tools/mock-service-endpoint/src/lib.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod handler; +pub mod listener; diff --git a/tools/mock-service-endpoint/src/listener.rs b/tools/mock-service-endpoint/src/listener.rs new file mode 100644 index 000000000..9a5e662f5 --- /dev/null +++ b/tools/mock-service-endpoint/src/listener.rs @@ -0,0 +1,57 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::convert::Infallible; +use std::error::Error; +use std::net::SocketAddr; + +use bytes::Bytes; +use http_body_util::{Either, Full}; +use hyper::server::conn::http2; +use hyper::service::service_fn; +use hyper::Response; +use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; +use tokio::net::TcpListener; +use tracing::error; + +use crate::handler::serve; + +pub async fn run_listener(address: SocketAddr) -> Result<(), Box> { + let listener = TcpListener::bind(address).await?; + + loop { + tokio::select! { + incoming = listener.accept() => { + let (tcp, _) = incoming?; + let io = TokioIo::new(tcp); + tokio::task::spawn(async move { + if let Err(err) = http2::Builder::new(TokioExecutor::new()) + .timer(TokioTimer::new()) + .serve_connection(io, service_fn(|req| async { + if req.uri().path() == "/discover" { + return Ok(Response::builder() + .header("content-type", "application/vnd.restate.endpointmanifest.v1+json") + .body(Either::Left(Full::new(Bytes::from( + r#"{"protocolMode":"BIDI_STREAM","minProtocolVersion":1,"maxProtocolVersion":1,"services":[{"name":"Counter","ty":"VIRTUAL_OBJECT","handlers":[{"name":"add","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"},{"name":"get","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"}]}]}"# + )))).unwrap()); + } + + let (head, body) = serve(req).await?.into_parts(); + Result::<_, Infallible>::Ok(Response::from_parts(head, Either::Right(body))) + })) + .await + { + error!("Error serving connection: {:?}", err); + } + }); + } + } + } +} diff --git a/tools/mock-service-endpoint/src/main.rs b/tools/mock-service-endpoint/src/main.rs index 8891881c0..88ba6332b 100644 --- a/tools/mock-service-endpoint/src/main.rs +++ b/tools/mock-service-endpoint/src/main.rs @@ -8,395 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::convert::Infallible; -use std::fmt::{Display, Formatter}; use std::net::SocketAddr; -use std::str::FromStr; -use assert2::let_assert; -use async_stream::{stream, try_stream}; -use bytes::Bytes; -use futures::{pin_mut, Stream, StreamExt}; -use http_body_util::{BodyStream, Either, Empty, Full, StreamBody}; -use hyper::body::{Frame, Incoming}; -use hyper::server::conn::http2; -use hyper::service::service_fn; -use hyper::{Request, Response}; -use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; -use prost::Message; -use tokio::net::TcpListener; -use tracing::{debug, error, info}; +use tracing::info; use tracing_subscriber::filter::LevelFilter; -use restate_service_protocol::codec::ProtobufRawEntryCodec; -use restate_service_protocol::message::{Decoder, Encoder, EncodingError, ProtocolMessage}; -use restate_types::errors::codes; -use restate_types::journal::raw::{EntryHeader, PlainRawEntry, RawEntryCodecError}; -use restate_types::journal::{Entry, EntryType, InputEntry}; -use restate_types::service_protocol::start_message::StateEntry; -use restate_types::service_protocol::{ - self, get_state_entry_message, output_entry_message, ServiceProtocolVersion, StartMessage, -}; - -#[derive(Debug, thiserror::Error)] -enum FrameError { - #[error(transparent)] - EncodingError(EncodingError), - #[error(transparent)] - Hyper(hyper::Error), - #[error("Stream ended before finished replay")] - UnexpectedEOF, - #[error("Journal does not contain expected messages")] - InvalidJournal, - #[error(transparent)] - RawEntryCodecError(#[from] RawEntryCodecError), - #[error(transparent)] - Serde(#[from] serde_json::Error), -} - -async fn serve( - req: Request, -) -> Result< - Response< - Either, StreamBody, Infallible>>>>, - >, - Infallible, -> { - let (req_head, req_body) = req.into_parts(); - let mut split = req_head.uri.path().rsplit('/'); - let handler_name = if let Some(handler_name) = split.next() { - handler_name - } else { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - }; - if let Some("Counter") = split.next() { - } else { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - }; - if let Some("invoke") = split.next() { - } else { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - }; - - let req_body = BodyStream::new(req_body); - let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); - let encoder = Encoder::new(ServiceProtocolVersion::V1); - - let incoming = stream! { - for await frame in req_body { - match frame { - Ok(frame) => { - if let Ok(data) = frame.into_data() { - decoder.push(data); - loop { - match decoder.consume_next() { - Ok(Some((_header, message))) => yield Ok(message), - Ok(None) => { - break - }, - Err(err) => yield Err(FrameError::EncodingError(err)), - } - } - } - }, - Err(err) => yield Err(FrameError::Hyper(err)), - }; - } - }; - - let handler: Handler = match handler_name.parse() { - Ok(handler) => handler, - Err(_err) => { - return Ok(Response::builder() - .status(404) - .body(Either::Left(Empty::new())) - .unwrap()); - } - }; - - let outgoing = handler.handle(incoming).map(move |message| match message { - Ok(message) => Ok(Frame::data(encoder.encode(message))), - Err(err) => { - error!("Error handling stream: {err:?}"); - Ok(Frame::data(encoder.encode(error(err)))) - } - }); - - Ok(Response::builder() - .status(200) - .header("content-type", "application/vnd.restate.invocation.v1") - .body(Either::Right(StreamBody::new(outgoing))) - .unwrap()) -} - -enum Handler { - Get, - Add, -} - -#[derive(Debug, thiserror::Error)] -#[error("Invalid handler")] -struct InvalidHandler; - -impl FromStr for Handler { - type Err = InvalidHandler; - - fn from_str(s: &str) -> Result { - match s { - "get" => Ok(Self::Get), - "add" => Ok(Self::Add), - _ => Err(InvalidHandler), - } - } -} - -impl Display for Handler { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Get => write!(f, "get"), - Self::Add => write!(f, "add"), - } - } -} - -impl Handler { - fn handle( - self, - incoming: impl Stream>, - ) -> impl Stream> { - try_stream! { - pin_mut!(incoming); - match (incoming.next().await, incoming.next().await) { - (Some(Ok(ProtocolMessage::Start(start_message))), Some(Ok(ProtocolMessage::UnparsedEntry(input)))) if input.ty() == EntryType::Input => { - let input = input.deserialize_entry_ref::()?; - let_assert!( - Entry::Input(input) = input - ); - - let replay_count = start_message.known_entries as usize - 1; - let mut replayed = Vec::with_capacity(replay_count); - for _ in 0..replay_count { - let message = incoming.next().await.ok_or(FrameError::UnexpectedEOF)??; - replayed.push(message); - } - - debug!("Handling request to {self} with {} known entries", start_message.known_entries); - - match self { - Handler::Get => { - for await message in Self::handle_get(start_message, input, replayed, incoming) { - yield message? - } - }, - Handler::Add => { - for await message in Self::handle_add(start_message, input, replayed, incoming) { - yield message? - } - }, - }; - }, - _ => {Err(FrameError::InvalidJournal)?; return}, - }; - } - } - - fn handle_get( - start_message: StartMessage, - _input: InputEntry, - replayed: Vec, - _incoming: impl Stream>, - ) -> impl Stream> { - try_stream! { - let counter = read_counter(&start_message.state_map); - match replayed.len() { - 0 => { - yield get_state(counter.clone()); - yield output(counter.unwrap_or("0".into())); - yield end(); - }, - 1 => { - yield output(counter.unwrap_or("0".into())); - yield end(); - } - 2=> { - yield end(); - } - _ => {Err(FrameError::InvalidJournal)?; return}, - } - } - } - - fn handle_add( - start_message: StartMessage, - input: InputEntry, - replayed: Vec, - _incoming: impl Stream>, - ) -> impl Stream> { - try_stream! { - let counter = read_counter(&start_message.state_map); - match replayed.len() { - 0 => { - yield get_state(counter.clone()); - - let next_value = match counter { - Some(ref counter) => { - let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; - let current: i32 = serde_json::from_slice(counter.as_ref())?; - - serde_json::to_vec(&(to_add + current))?.into() - } - None => input.value, - }; - - yield set_state(next_value.clone()); - yield output(next_value); - yield end(); - }, - 1 => { - let next_value = match counter { - Some(ref counter) => { - let to_add: i32 = serde_json::from_slice(input.value.as_ref())?; - let current: i32 = serde_json::from_slice(counter.as_ref())?; - - serde_json::to_vec(&(to_add + current))?.into() - } - None => input.value, - }; - - yield set_state(next_value.clone()); - yield output(next_value); - yield end(); - } - 2 => { - let set_value = match &replayed[1] { - ProtocolMessage::UnparsedEntry(set) if set.ty() == EntryType::SetState => { - let set = set.deserialize_entry_ref::()?; - let_assert!( - Entry::SetState(set) = set - ); - set.value.clone() - }, - _ => {Err(FrameError::InvalidJournal)?; return}, - }; - yield output(set_value); - yield end(); - } - 3 => { - yield end(); - } - _ => {Err(FrameError::InvalidJournal)?; return}, - } - } - } -} - -fn read_counter(state_map: &[StateEntry]) -> Option { - let entry = state_map - .iter() - .find(|entry| entry.key.as_ref() == b"counter")?; - Some(entry.value.clone()) -} - -fn get_state(counter: Option) -> ProtocolMessage { - debug!( - "Yielding GetStateEntryMessage with value {}", - LossyDisplay(counter.as_deref()) - ); - - ProtocolMessage::UnparsedEntry(PlainRawEntry::new( - EntryHeader::GetState { is_completed: true }, - service_protocol::GetStateEntryMessage { - name: String::new(), - key: "counter".into(), - result: Some(match counter { - Some(ref counter) => get_state_entry_message::Result::Value(counter.clone()), - None => get_state_entry_message::Result::Empty(service_protocol::Empty {}), - }), - } - .encode_to_vec() - .into(), - )) -} - -fn set_state(value: Bytes) -> ProtocolMessage { - debug!( - "Yielding SetStateEntryMessage with value {}", - LossyDisplay(Some(&value)) - ); - - ProtocolMessage::UnparsedEntry(PlainRawEntry::new( - EntryHeader::SetState, - service_protocol::SetStateEntryMessage { - name: String::new(), - key: "counter".into(), - value: value.clone(), - } - .encode_to_vec() - .into(), - )) -} - -fn output(value: Bytes) -> ProtocolMessage { - debug!( - "Yielding OutputEntryMessage with result {}", - LossyDisplay(Some(&value)) - ); - - ProtocolMessage::UnparsedEntry(PlainRawEntry::new( - EntryHeader::Output, - service_protocol::OutputEntryMessage { - name: String::new(), - result: Some(output_entry_message::Result::Value(value)), - } - .encode_to_vec() - .into(), - )) -} - -fn end() -> ProtocolMessage { - debug!("Yielding EndMessage"); - - ProtocolMessage::End(service_protocol::EndMessage {}) -} - -fn error(err: FrameError) -> ProtocolMessage { - let code = match err { - FrameError::EncodingError(_) => codes::PROTOCOL_VIOLATION, - FrameError::Hyper(_) => codes::INTERNAL, - FrameError::UnexpectedEOF => codes::PROTOCOL_VIOLATION, - FrameError::InvalidJournal => codes::JOURNAL_MISMATCH, - FrameError::RawEntryCodecError(_) => codes::PROTOCOL_VIOLATION, - FrameError::Serde(_) => codes::INTERNAL, - }; - ProtocolMessage::Error(service_protocol::ErrorMessage { - code: code.into(), - description: err.to_string(), - message: String::new(), - related_entry_index: None, - related_entry_name: None, - related_entry_type: None, - next_retry_delay: None, - }) -} - -struct LossyDisplay<'a>(Option<&'a [u8]>); -impl<'a> Display for LossyDisplay<'a> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self.0 { - Some(bytes) => write!(f, "{}", String::from_utf8_lossy(bytes)), - None => write!(f, ""), - } - } -} +use mock_service_endpoint::listener::run_listener; #[tokio::main] pub async fn main() -> Result<(), Box> { @@ -413,31 +30,6 @@ pub async fn main() -> Result<(), Box> { let addr: SocketAddr = ([127, 0, 0, 1], 9080).into(); - let listener = TcpListener::bind(addr).await?; info!("Listening on http://{}", addr); - loop { - let (tcp, _) = listener.accept().await?; - let io = TokioIo::new(tcp); - - tokio::task::spawn(async move { - if let Err(err) = http2::Builder::new(TokioExecutor::new()) - .timer(TokioTimer::new()) - .serve_connection(io, service_fn(|req| async { - if req.uri().path() == "/discover" { - return Ok(Response::builder() - .header("content-type", "application/vnd.restate.endpointmanifest.v1+json") - .body(Either::Left(Full::new(Bytes::from( - r#"{"protocolMode":"BIDI_STREAM","minProtocolVersion":1,"maxProtocolVersion":1,"services":[{"name":"Counter","ty":"VIRTUAL_OBJECT","handlers":[{"name":"add","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"},{"name":"get","input":{"required":false,"contentType":"application/json"},"output":{"setContentTypeIfEmpty":false,"contentType":"application/json"},"ty":"EXCLUSIVE"}]}]}"# - )))).unwrap()); - } - - let (head, body) = serve(req).await?.into_parts(); - Result::<_, Infallible>::Ok(Response::from_parts(head, Either::Right(body))) - })) - .await - { - println!("Error serving connection: {:?}", err); - } - }); - } + run_listener(addr).await }