Skip to content
This repository has been archived by the owner on Jul 2, 2020. It is now read-only.

Substitute headers #46

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/diffy/ApiController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class ApiController @Inject()(
JoinedEndpoint: JoinedEndpoint,
includeWeights: Boolean,
excludeNoise: Boolean
) =
) : Map[String, Map[String, Any]] =
Map(
"endpoint" -> Renderer.endpoint(JoinedEndpoint.endpoint),
"fields" ->
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/diffy/proxy/ClientService.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.twitter.diffy.proxy

import com.twitter.finagle.{Addr, Name, Resolver, Service}
import com.twitter.finagle.thrift.ThriftClientRequest
import com.twitter.finagle.{Addr, Name, Service}
import com.twitter.util.{Time, Var}
import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse}

Expand Down Expand Up @@ -42,7 +42,7 @@ case class ThriftService(
}
}

private[this] def sizeChange(size: Int) {
private[this] def sizeChange(size: Int) : Unit = {
changeCount += 1
if (changeCount > 1) {
changedAt = Some(Time.now)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.twitter.diffy.proxy

import com.twitter.finagle.Filter
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{HttpHeaders, DefaultHttpRequest, HttpResponse, HttpRequest}
import scala.collection.JavaConverters._

object CloneHttpRequestFilter {

type Req = HttpRequest
type Res = HttpResponse

private def mkCloneRequest(reqIn:Req, filter: (Req => Future[Res])):Future[Res] = {
def copyHeader(headers:HttpHeaders)(k: String, v:String) : Unit = { headers.add(k, v)}

val reqOut: DefaultHttpRequest = new DefaultHttpRequest(reqIn.getProtocolVersion, reqIn.getMethod, reqIn.getUri)
reqOut.setChunked(reqIn.isChunked)
reqOut.setContent(reqIn.getContent)

val headers = for {entry <- reqIn.headers().asScala} yield (entry.getKey, entry.getValue)
headers.foreach((copyHeader(reqOut.headers)_).tupled)

filter(reqOut)
}

def apply(): Filter[Req, Res, Req, Res] = Filter.mk(mkCloneRequest)
}
16 changes: 10 additions & 6 deletions src/main/scala/com/twitter/diffy/proxy/HttpDifferenceProxy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package com.twitter.diffy.proxy

import java.net.SocketAddress

import com.twitter.diffy.analysis.{DifferenceAnalyzer, JoinedDifferences, InMemoryDifferenceCollector}
import com.twitter.diffy.analysis.{DifferenceAnalyzer, InMemoryDifferenceCollector, JoinedDifferences}
import com.twitter.diffy.lifter.{HttpLifter, Message}
import com.twitter.diffy.proxy.DifferenceProxy.NoResponseException
import com.twitter.finagle.{Service, Http, Filter}
import com.twitter.finagle.http.{Status, Response, Method, Request}
import com.twitter.util.{Try, Future}
import org.jboss.netty.handler.codec.http.{HttpResponse, HttpRequest}
import com.twitter.finagle.http.{Method, Request, Response, Status}
import com.twitter.finagle.{Filter, Http, Service}
import com.twitter.util.{Future, Try}
import org.jboss.netty.handler.codec.http.{HttpRequest, HttpResponse}

object HttpDifferenceProxy {
val okResponse = Future.value(Response(Status.Ok))
Expand All @@ -34,8 +34,12 @@ trait HttpDifferenceProxy extends DifferenceProxy {
override type Rep = HttpResponse
override type Srv = HttpService


override def serviceFactory(serverset: String, label: String) =
HttpService(Http.newClient(serverset, label).toService)
HttpService(
CloneHttpRequestFilter.apply.
andThen(RefineHttpHeadersByLabel(label)).
Copy link
Contributor

@puneetkhanduri puneetkhanduri Jul 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a step back, RefineHttpHeadersByLabel might be better written as two separate filters:

RewriteHeadersForLabel(label)
RemoveHeadersForLabels(labels)

and then composing them as:

CloneHttpRequestFilter.apply.
  andThen RewriteHttpHeadersForLabel(label)
  andThen RemoveHeadersForLabels(DifferenceProxy.AllLabels filter { !_.equalsIgnoreCase(label) })

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the desire to partition the logic. I'd prefer not to perform multiple runs through the http header set.

Let's see if we can have one filter, but aggregate the header effects.

andThen(Http.newClient(serverset, label).toService))

override lazy val server =
Http.serve(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.twitter.diffy.proxy

import com.twitter.finagle.Filter
import com.twitter.finagle.http.Request
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{HttpResponse, HttpHeaders, HttpRequest}
import scala.collection.JavaConverters._

object RefineHttpHeadersByLabel {
val knownLabels: Seq[String] = Seq("primary", "secondary", "candidate")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These constants should be maintained in DifferenceProxy.scala to ensure that the labels always match. The current implementation requires these labels to be maintained in two places.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.


def substitute(inclusions: Seq[String], exclusion: Seq[String])(req: HttpRequest) : HttpRequest = {

def orFn[T](f: T => Boolean, g: T => Boolean): T => Boolean = { t => f(t) || g(t) }
def constFalse[T](t: T): Boolean = false

def renameHeaders(headers: HttpHeaders)(nameOld: String, nameNew: String): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't really need headers as an arg since it's closured

val temp = headers.get(nameOld)
headers.remove(nameOld)
headers.set(nameNew, temp)
}


val removeExclusion = exclusion.
map(_+"_").
map( x => (_:String).startsWith(x)).
foldLeft(constFalse[String]_)(orFn)

val reqOut = Request.apply(req)
val headers: HttpHeaders = reqOut.headers
//k => Option[Function[k => k]]
val incomingHeaders = headers.names().asScala.toSet

val rewrites = for {
name <- incomingHeaders
inclusion <- inclusions if name.startsWith(inclusion + "_")
} yield name->name.substring(inclusion.length+1)

incomingHeaders.filter( removeExclusion).foreach( headers.remove )
Copy link
Contributor

@puneetkhanduri puneetkhanduri Jul 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can ignore this but I am leaving it in to share my thought process through the review.
Consider making this slightly procedural and reducing the total lines of code:

incomingHeaders foreach { name =>
  val shouldExclude = exclusion exists { ex => name.startsWith(s"$ex_") }
  if (shouldExclude) {
    headers.remove(name)
  }

  val shouldInclude = inclusion exists { in => name.startsWith(s"$in_") }
  if (shouldInclude) {
    renameHeaders(headers)(name, name.substring(inclusion.length + 1))
  }
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reflections - Since the header set is mutable, we should apply all the changes in a single operation.
this makes sense. It's counter to splitting the filters, unless we just compose aggregates of headerEffects.

rewrites.foreach((renameHeaders(headers) _ ).tupled)

reqOut
}

def apply(label:String): Filter[HttpRequest, HttpResponse, HttpRequest, HttpResponse] = {
def mkFilter(inclusions: Seq[String], exclusion: Seq[String])
(req:HttpRequest, filter: (HttpRequest => Future[HttpResponse]) ) =
filter(substitute(inclusions, exclusion)(req))

val (inclusions, exclusion) = knownLabels.partition(_.equalsIgnoreCase(label))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expected behavior is that "inclusions" should always be equal to "Seq(label)". You can simplify this code by using that observation and defining "exclusions" as "knownLabels filter { !_.equalsIgnoreCase(label) }" and eliminating inclusions by directly using label instead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

Filter.mk( mkFilter(inclusions, exclusion))
}

}