Skip to content

Commit

Permalink
chore(reflection): Refactor ServerReflectionInfoStream
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Jan 17, 2025
1 parent 6839a39 commit 074051d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
21 changes: 14 additions & 7 deletions tonic-reflection/src/server/v1.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fmt, sync::Arc};

use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};

use super::ReflectionServiceState;
Expand Down Expand Up @@ -92,9 +92,7 @@ impl ServerReflection for ReflectionService {
}
});

Ok(Response::new(ServerReflectionInfoStream(
ReceiverStream::new(resp_rx),
)))
Ok(Response::new(ServerReflectionInfoStream::new(resp_rx)))
}
}

Expand All @@ -107,7 +105,16 @@ impl From<ReflectionServiceState> for ReflectionService {
}

/// A response stream.
pub struct ServerReflectionInfoStream(ReceiverStream<Result<ServerReflectionResponse, Status>>);
pub struct ServerReflectionInfoStream {
inner: tokio_stream::wrappers::ReceiverStream<Result<ServerReflectionResponse, Status>>,
}

impl ServerReflectionInfoStream {
fn new(resp_rx: mpsc::Receiver<Result<ServerReflectionResponse, Status>>) -> Self {
let inner = tokio_stream::wrappers::ReceiverStream::new(resp_rx);
Self { inner }
}
}

impl Stream for ServerReflectionInfoStream {
type Item = Result<ServerReflectionResponse, Status>;
Expand All @@ -116,11 +123,11 @@ impl Stream for ServerReflectionInfoStream {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0).poll_next(cx)
std::pin::Pin::new(&mut self.inner).poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
self.inner.size_hint()
}
}

Expand Down
21 changes: 14 additions & 7 deletions tonic-reflection/src/server/v1alpha.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{fmt, sync::Arc};

use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};

use super::ReflectionServiceState;
Expand Down Expand Up @@ -92,9 +92,7 @@ impl ServerReflection for ReflectionService {
}
});

Ok(Response::new(ServerReflectionInfoStream(
ReceiverStream::new(resp_rx),
)))
Ok(Response::new(ServerReflectionInfoStream::new(resp_rx)))
}
}

Expand All @@ -107,7 +105,16 @@ impl From<ReflectionServiceState> for ReflectionService {
}

/// A response stream.
pub struct ServerReflectionInfoStream(ReceiverStream<Result<ServerReflectionResponse, Status>>);
pub struct ServerReflectionInfoStream {
inner: tokio_stream::wrappers::ReceiverStream<Result<ServerReflectionResponse, Status>>,
}

impl ServerReflectionInfoStream {
fn new(resp_rx: mpsc::Receiver<Result<ServerReflectionResponse, Status>>) -> Self {
let inner = tokio_stream::wrappers::ReceiverStream::new(resp_rx);
Self { inner }
}
}

impl Stream for ServerReflectionInfoStream {
type Item = Result<ServerReflectionResponse, Status>;
Expand All @@ -116,11 +123,11 @@ impl Stream for ServerReflectionInfoStream {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0).poll_next(cx)
std::pin::Pin::new(&mut self.inner).poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
self.inner.size_hint()
}
}

Expand Down

0 comments on commit 074051d

Please sign in to comment.