Skip to content

Commit

Permalink
HttpResource for server
Browse files Browse the repository at this point in the history
new type alias for HttpResource.

This seems to be binary compatible. Needs to be investigated if it is.
  • Loading branch information
hamnis committed Jan 30, 2025
1 parent 3a2c626 commit 9cbce7d
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 19 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/http4s/netty/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.http4s

import cats.effect.Async
import cats.effect.Resource
import cats.syntax.all._
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
Expand Down Expand Up @@ -69,4 +70,6 @@ package object netty {
val _ = a
()
}

type HttpResource[F[_]] = Request[F] => Resource[F, Response[F]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import io.netty.channel._
import io.netty.handler.codec.TooLongFrameException
import io.netty.handler.codec.http._
import io.netty.handler.timeout.IdleStateEvent
import org.http4s.HttpApp
import org.http4s.netty.server.Http4sNettyHandler.RFC7231InstantFormatter
import org.http4s.server.ServiceErrorHandler
import org.http4s.server.websocket.WebSocketBuilder2
Expand Down Expand Up @@ -256,13 +255,13 @@ object Http4sNettyHandler {
private[netty] case object InvalidMessageException extends Exception with NoStackTrace

private class WebsocketHandler[F[_]](
appFn: WebSocketBuilder2[F] => HttpApp[F],
appFn: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
maxWSPayloadLength: Int,
dispatcher: Dispatcher[F]
)(implicit
F: Async[F],
D: Defer[F]
D: Defer[Resource[F, *]]
) extends Http4sNettyHandler[F](dispatcher) {

private[this] val converter: ServerNettyModelConversion[F] = new ServerNettyModelConversion[F]
Expand All @@ -273,13 +272,14 @@ object Http4sNettyHandler {
dateString: String
): Resource[F, DefaultHttpResponse] =
Resource.eval(WebSocketBuilder2[F]).flatMap { b =>
val app = appFn(b).run
val app = appFn(b)
logger.trace("Http request received by netty: " + request)
converter
.fromNettyRequest(channel, request)
.flatMap { req =>
Resource
.eval(D.defer(app(req)).recoverWith(serviceErrorHandler(req)))
val pf = serviceErrorHandler(req)
D.defer(app(req))
.recoverWith { case t if pf.isDefinedAt(t) => Resource.eval(pf(t)) }
.flatMap(
converter.toNettyResponseWithWebsocket(
b.webSocketKey,
Expand All @@ -292,7 +292,7 @@ object Http4sNettyHandler {
}

def websocket[F[_]: Async](
app: WebSocketBuilder2[F] => HttpApp[F],
app: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
maxWSPayloadLength: Int,
dispatcher: Dispatcher[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import cats.effect.std.Dispatcher
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.ssl.ApplicationProtocolNames
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler
import org.http4s.HttpApp
import org.http4s.server.ServiceErrorHandler
import org.http4s.server.websocket.WebSocketBuilder2

import scala.concurrent.duration.Duration

private[server] class NegotiationHandler[F[_]: Async](
config: NegotiationHandler.Config,
httpApp: WebSocketBuilder2[F] => HttpApp[F],
httpApp: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
dispatcher: Dispatcher[F]
) extends ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder
import io.netty.handler.codec.http2.Http2MultiplexHandler
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec
import io.netty.handler.timeout.IdleStateHandler
import org.http4s.HttpApp
import org.http4s.netty.HttpResource
import org.http4s.netty.void
import org.http4s.server.ServiceErrorHandler
import org.http4s.server.websocket.WebSocketBuilder2
Expand All @@ -40,7 +40,7 @@ private object NettyPipelineHelpers {
def buildHttp2Pipeline[F[_]: Async](
pipeline: ChannelPipeline,
config: NegotiationHandler.Config,
httpApp: WebSocketBuilder2[F] => HttpApp[F],
httpApp: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
dispatcher: Dispatcher[F]): Unit = void {
// H2, being a multiplexed protocol, needs to always be reading events in case
Expand All @@ -62,7 +62,7 @@ private object NettyPipelineHelpers {
def buildHttp1Pipeline[F[_]: Async](
pipeline: ChannelPipeline,
config: NegotiationHandler.Config,
httpApp: WebSocketBuilder2[F] => HttpApp[F],
httpApp: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
dispatcher: Dispatcher[F]): Unit = void {
// For HTTP/1.x pipelines the only backpressure we can exert is via the TCP
Expand All @@ -83,7 +83,7 @@ private object NettyPipelineHelpers {
private[this] def addHttp4sHandlers[F[_]: Async](
pipeline: ChannelPipeline,
config: NegotiationHandler.Config,
httpApp: WebSocketBuilder2[F] => HttpApp[F],
httpApp: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
dispatcher: Dispatcher[F]): Unit = void {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import io.netty.incubator.channel.uring.IOUring
import io.netty.incubator.channel.uring.IOUringEventLoopGroup
import io.netty.incubator.channel.uring.IOUringServerSocketChannel
import org.http4s.HttpApp
import org.http4s.Request
import org.http4s.Response
import org.http4s.server.Server
import org.http4s.server.ServiceErrorHandler
import org.http4s.server.defaults
Expand All @@ -65,7 +67,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

final class NettyServerBuilder[F[_]] private (
httpApp: WebSocketBuilder2[F] => HttpApp[F],
httpApp: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
socketAddress: InetSocketAddress,
idleTimeout: Duration,
Expand All @@ -84,7 +86,7 @@ final class NettyServerBuilder[F[_]] private (
type Self = NettyServerBuilder[F]

private def copy(
httpApp: WebSocketBuilder2[F] => HttpApp[F] = httpApp,
httpApp: WebSocketBuilder2[F] => HttpResource[F] = httpApp,
serviceErrorHandler: ServiceErrorHandler[F] = serviceErrorHandler,
socketAddress: InetSocketAddress = socketAddress,
idleTimeout: Duration = idleTimeout,
Expand Down Expand Up @@ -166,8 +168,12 @@ final class NettyServerBuilder[F[_]] private (
case _ => None
}

def withHttpApp(httpApp: HttpApp[F]): Self = copy(httpApp = _ => httpApp)
def withHttpApp(httpApp: HttpApp[F]): Self =
copy(httpApp = _ => (req: Request[F]) => Resource.eval(httpApp.run(req)))
def withHttpWebSocketApp(httpApp: WebSocketBuilder2[F] => HttpApp[F]): Self =
copy(httpApp = httpApp.andThen(app => (req: Request[F]) => Resource.eval(app.run(req))))

def withHttpWebSocketResource(httpApp: WebSocketBuilder2[F] => HttpResource[F]): Self =
copy(httpApp = httpApp)
def bindSocketAddress(address: InetSocketAddress): Self = copy(socketAddress = address)

Expand Down Expand Up @@ -347,7 +353,7 @@ object NettyServerBuilder {

def apply[F[_]](implicit F: Async[F]): NettyServerBuilder[F] =
new NettyServerBuilder[F](
httpApp = _ => HttpApp.notFound[F],
httpApp = _ => _ => Resource.pure(Response.notFound[F]),
serviceErrorHandler = org.http4s.server.DefaultServiceErrorHandler[F],
socketAddress = org.http4s.server.defaults.IPv4SocketAddress,
idleTimeout = org.http4s.server.defaults.IdleTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPipeline
import io.netty.handler.codec.ByteToMessageDecoder
import io.netty.handler.codec.http2.Http2CodecUtil
import org.http4s.HttpApp
import org.http4s.server.ServiceErrorHandler
import org.http4s.server.websocket.WebSocketBuilder2
import org.log4s.getLogger
Expand All @@ -34,7 +33,7 @@ import java.util

private class PriorKnowledgeDetectionHandler[F[_]: Async](
config: NegotiationHandler.Config,
httpApp: WebSocketBuilder2[F] => HttpApp[F],
httpApp: WebSocketBuilder2[F] => HttpResource[F],
serviceErrorHandler: ServiceErrorHandler[F],
dispatcher: Dispatcher[F]
) extends ByteToMessageDecoder {
Expand Down

0 comments on commit 9cbce7d

Please sign in to comment.