From 44357e249829aed11e70bb20542e1e3cba613944 Mon Sep 17 00:00:00 2001 From: Hui Zhu Date: Thu, 9 Jan 2025 11:05:39 +0800 Subject: [PATCH] examples: Add example for server_send_stream Add example for server_send_stream to show server how to use stream send data to client. Signed-off-by: Hui Zhu --- example/async-stream-client.rs | 22 ++++++++++++++++++++-- example/async-stream-server.rs | 19 +++++++++++++++++++ example/protocols/protos/streaming.proto | 1 + 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/example/async-stream-client.rs b/example/async-stream-client.rs index 04f8c7b6..9e3ec216 100644 --- a/example/async-stream-client.rs +++ b/example/async-stream-client.rs @@ -44,9 +44,12 @@ async fn main() { let sc1 = sc.clone(); let t6 = tokio::spawn(echo_null_stream(sc1)); - let t7 = tokio::spawn(echo_default_value(sc)); + let sc1 = sc.clone(); + let t7 = tokio::spawn(echo_default_value(sc1)); + + let t8 = tokio::spawn(server_send_stream(sc)); - let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7); + let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8); } fn default_ctx() -> Context { @@ -201,3 +204,18 @@ async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) { assert_eq!(received.seq, 0); assert_eq!(received.msg, ""); } + +#[cfg(unix)] +async fn server_send_stream(cli: streaming_ttrpc::StreamingClient) { + let mut stream = cli + .server_send_stream(default_ctx(), &Default::default()) + .await + .unwrap(); + + let mut seq = 0; + while let Some(received) = stream.recv().await.unwrap() { + assert_eq!(received.seq, seq); + assert_eq!(received.msg, "hello"); + seq += 1; + } +} diff --git a/example/async-stream-server.rs b/example/async-stream-server.rs index 4dcba884..a04f27d2 100644 --- a/example/async-stream-server.rs +++ b/example/async-stream-server.rs @@ -152,6 +152,25 @@ impl streaming_ttrpc::Streaming for StreamingService { Ok(()) } + + async fn server_send_stream( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + _: empty::Empty, + s: ::ttrpc::r#async::ServerStreamSender, + ) -> ::ttrpc::Result<()> { + let mut seq = 0; + while seq < 10 { + sleep(std::time::Duration::from_millis(100)).await; + let mut e = streaming::EchoPayload::new(); + e.seq = seq; + e.msg = format!("hello"); + s.send(&e).await.unwrap(); + seq += 1; + } + + Ok(()) + } } #[cfg(windows)] diff --git a/example/protocols/protos/streaming.proto b/example/protocols/protos/streaming.proto index 3aa7d850..8a32cd66 100644 --- a/example/protocols/protos/streaming.proto +++ b/example/protocols/protos/streaming.proto @@ -33,6 +33,7 @@ service Streaming { rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty); rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty); rpc EchoDefaultValue(EchoPayload) returns (stream EchoPayload); + rpc ServerSendStream(google.protobuf.Empty) returns (stream EchoPayload); } message EchoPayload {