Skip to content

Commit

Permalink
feat: Support setting deadline per-call (#1838)
Browse files Browse the repository at this point in the history
Adds a setDeadline method to the request builder, so that it can optionally be
specified per call.
  • Loading branch information
longshorej authored Oct 14, 2023
1 parent 4d569c2 commit cdd9e63
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# New APIs for setting deadline per call

ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.SingleResponseRequestBuilder.setDeadline")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.StreamResponseRequestBuilder.setDeadline")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.SingleResponseRequestBuilder.setDeadline")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.StreamResponseRequestBuilder.setDeadline")
78 changes: 76 additions & 2 deletions runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

package akka.grpc.internal

import java.util.concurrent.CompletionStage

import java.time.{ Duration => JDuration }
import java.util.concurrent.{ CompletionStage, TimeUnit }
import akka.NotUsed
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.dispatch.ExecutionContexts
import akka.grpc.scaladsl.SingleResponseRequestBuilder
import akka.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcServiceException, GrpcSingleResponse }
import akka.stream.{ Graph, Materializer, SourceShape }
import akka.stream.javadsl.{ Source => JavaSource }
Expand All @@ -17,6 +18,7 @@ import akka.util.ByteString
import io.grpc._

import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Future }

/**
Expand Down Expand Up @@ -52,6 +54,15 @@ final class ScalaUnaryRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaUnaryRequestBuilder[I, O] =
new ScalaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): SingleResponseRequestBuilder[I, O] =
new ScalaUnaryRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -84,6 +95,15 @@ final class JavaUnaryRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaUnaryRequestBuilder[I, O] =
new JavaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaUnaryRequestBuilder[I, O] =
new JavaUnaryRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -152,6 +172,15 @@ final class ScalaClientStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaClientStreamingRequestBuilder[I, O] =
new ScalaClientStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): ScalaClientStreamingRequestBuilder[I, O] =
new ScalaClientStreamingRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -195,6 +224,15 @@ final class JavaClientStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaClientStreamingRequestBuilder[I, O] =
new JavaClientStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaClientStreamingRequestBuilder[I, O] =
new JavaClientStreamingRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -242,6 +280,15 @@ final class ScalaServerStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaServerStreamingRequestBuilder[I, O] =
new ScalaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): ScalaServerStreamingRequestBuilder[I, O] =
new ScalaServerStreamingRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -285,6 +332,15 @@ final class JavaServerStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaServerStreamingRequestBuilder[I, O] =
new JavaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaServerStreamingRequestBuilder[I, O] =
new JavaServerStreamingRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -333,6 +389,15 @@ final class ScalaBidirectionalStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaBidirectionalStreamingRequestBuilder[I, O] =
new ScalaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): ScalaBidirectionalStreamingRequestBuilder[I, O] =
new ScalaBidirectionalStreamingRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -377,6 +442,15 @@ final class JavaBidirectionalStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaBidirectionalStreamingRequestBuilder[I, O] =
new JavaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaBidirectionalStreamingRequestBuilder[I, O] =
new JavaBidirectionalStreamingRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down
13 changes: 13 additions & 0 deletions runtime/src/main/scala/akka/grpc/javadsl/RequestBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.grpc.javadsl

import java.time.Duration
import java.util.concurrent.CompletionStage

import akka.NotUsed
Expand Down Expand Up @@ -48,6 +49,12 @@ trait SingleResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): CompletionStage[GrpcSingleResponse[Res]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): SingleResponseRequestBuilder[Req, Res]
}

/**
Expand Down Expand Up @@ -86,4 +93,10 @@ trait StreamResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): Source[Res, CompletionStage[GrpcResponseMetadata]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): StreamResponseRequestBuilder[Req, Res]
}
13 changes: 13 additions & 0 deletions runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.Duration

/**
* Request builder for requests providing per call specific metadata capabilities in
Expand Down Expand Up @@ -48,6 +49,12 @@ trait SingleResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): Future[GrpcSingleResponse[Res]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): SingleResponseRequestBuilder[Req, Res]
}

/**
Expand Down Expand Up @@ -88,4 +95,10 @@ trait StreamResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): Source[Res, Future[GrpcResponseMetadata]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): StreamResponseRequestBuilder[Req, Res]
}

0 comments on commit cdd9e63

Please sign in to comment.