Skip to content

Commit

Permalink
Add http transport (#449)
Browse files Browse the repository at this point in the history
* Add http transport

* Implement review feedback

* Update libthrift to get properly working servlet support

for spinning up a thrift http server for integration testing. The servlet namespace was renamed from javax.servlet to jakarta.servlet so that's why we need the newer version.

* refactor: move socket based test server implementation behind abstraction

in order to be able to introduce new, http based server for testing as well.

* Add integration test for http protocol (client)

* Add missing license headers

---------

Co-authored-by: Ben Bader <[email protected]>
  • Loading branch information
luqasn and benjamin-bader authored Oct 12, 2023
1 parent 8371641 commit 436e7f2
Show file tree
Hide file tree
Showing 11 changed files with 589 additions and 156 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ okio = "3.3.0"

[libraries]
antlr = "org.antlr:antlr4:4.9.3"
apacheThrift = "org.apache.thrift:libthrift:0.17.0"
apacheThrift = "org.apache.thrift:libthrift:0.19.0"
clikt = "com.github.ajalt.clikt:clikt:3.1.0"
dokka = "org.jetbrains.dokka:dokka-gradle-plugin:1.7.20"
guava = "com.google.guava:guava:31.1-jre"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import com.microsoft.thrifty.testing.ServerProtocol
import com.microsoft.thrifty.testing.ServerTransport
import com.microsoft.thrifty.testing.TestServer
import com.microsoft.thrifty.transport.FramedTransport
import com.microsoft.thrifty.transport.HttpTransport
import com.microsoft.thrifty.transport.SocketTransport
import com.microsoft.thrifty.transport.Transport
import io.kotest.assertions.fail
Expand Down Expand Up @@ -73,6 +74,9 @@ class NonblockingCompactCoroutineConformanceTest : CoroutineConformanceTests()
@ServerConfig(transport = ServerTransport.NON_BLOCKING, protocol = ServerProtocol.JSON)
class NonblockingJsonCoroutineConformanceTest : CoroutineConformanceTests()

@ServerConfig(transport = ServerTransport.HTTP, protocol = ServerProtocol.JSON)
class HttpJsonCoroutineConformanceTest : CoroutineConformanceTests()

/**
* A test of auto-generated service code for the standard ThriftTest
* service.
Expand Down Expand Up @@ -103,12 +107,7 @@ abstract class CoroutineConformanceTests {
@BeforeAll
@JvmStatic
fun beforeAll() {
val port = testServer.port()
val transport = SocketTransport.Builder("localhost", port)
.readTimeout(2000)
.build()

transport.connect()
val transport = getTransportImpl()

this.transport = decorateTransport(transport)
this.protocol = createProtocol(this.transport)
Expand All @@ -123,6 +122,18 @@ abstract class CoroutineConformanceTests {
})
}

private fun getTransportImpl(): Transport {
return when(testServer.transport) {
ServerTransport.BLOCKING, ServerTransport.NON_BLOCKING ->
return SocketTransport.Builder("localhost", testServer.port())
.readTimeout(2000)
.build()
.apply { connect() }

ServerTransport.HTTP -> HttpTransport("http://localhost:${testServer.port()}/test/service")
}
}

/**
* When overridden in a derived class, wraps the given transport
* in a decorator, e.g. a framed transport.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import com.microsoft.thrifty.testing.ServerProtocol
import com.microsoft.thrifty.testing.ServerTransport
import com.microsoft.thrifty.testing.TestServer
import com.microsoft.thrifty.transport.FramedTransport
import com.microsoft.thrifty.transport.HttpTransport
import com.microsoft.thrifty.transport.SocketTransport
import com.microsoft.thrifty.transport.Transport
import io.kotest.matchers.should
Expand Down Expand Up @@ -72,6 +73,9 @@ class NonblockingCompactConformanceTest : KotlinConformanceTest()
@ServerConfig(transport = ServerTransport.NON_BLOCKING, protocol = ServerProtocol.JSON)
class NonblockingJsonConformanceTest : KotlinConformanceTest()

@ServerConfig(transport = ServerTransport.HTTP, protocol = ServerProtocol.JSON)
class HttpJsonConformanceTest : KotlinConformanceTest()

/**
* A test of auto-generated service code for the standard ThriftTest
* service.
Expand Down Expand Up @@ -102,12 +106,7 @@ abstract class KotlinConformanceTest {
@BeforeAll
@JvmStatic
fun beforeAll() {
val port = testServer.port()
val transport = SocketTransport.Builder("localhost", port)
.readTimeout(2000)
.build()

transport.connect()
val transport = getTransportImpl()

this.transport = decorateTransport(transport)
this.protocol = createProtocol(this.transport)
Expand All @@ -122,6 +121,18 @@ abstract class KotlinConformanceTest {
})
}

private fun getTransportImpl(): Transport {
return when(testServer.transport) {
ServerTransport.BLOCKING, ServerTransport.NON_BLOCKING ->
return SocketTransport.Builder("localhost", testServer.port())
.readTimeout(2000)
.build()
.apply { connect() }

ServerTransport.HTTP -> HttpTransport("http://localhost:${testServer.port()}/test/service")
}
}

/**
* When overridden in a derived class, wraps the given transport
* in a decorator, e.g. a framed transport.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Thrifty
*
* Copyright (c) Microsoft Corporation
*
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the License);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING
* WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE,
* FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
*
* See the Apache Version 2.0 License for specific language governing permissions and limitations under the License.
*/

// Adapted from Thrift sources; original license header follows:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.microsoft.thrifty.transport


import com.microsoft.thrifty.internal.ProtocolException
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.net.HttpURLConnection
import java.net.URL

/**
* HTTP implementation of the TTransport interface. Used for working with a
* Thrift web services implementation (using for example TServlet).
*
* THIS IMPLEMENTATION IS NOT THREAD-SAFE !!!
*
* Based on the official thrift java THttpTransport with the apache client support removed.
* Both due to wanting to avoid the additional dependency as well as it being a bit weird to have two
* implementations to switch between in the same class.
*
* Uses HttpURLConnection internally
*
* Also note that under high load, the HttpURLConnection implementation
* may exhaust the open file descriptor limit.
*
* @see [THRIFT-970](https://issues.apache.org/jira/browse/THRIFT-970)
*/
open class HttpTransport(url: String) : Transport {
private val url: URL = URL(url)
private var currentState: Transport = Writing()
private var connectTimeout: Int? = null
private var readTimeout: Int? = null
private val customHeaders = mutableMapOf<String, String>()
private val sendBuffer = ByteArrayOutputStream()

private inner class Writing : Transport {
override fun read(buffer: ByteArray, offset: Int, count: Int): Int {
throw ProtocolException("Currently in writing state")
}

override fun write(buffer: ByteArray, offset: Int, count: Int) {
sendBuffer.write(buffer, offset, count)
}

override fun flush() {
val bytesToSend = sendBuffer.toByteArray()
sendBuffer.reset()
send(bytesToSend)
}

override fun close() {
// do nothing
}
}

private inner class Reading(val inputStream: InputStream) : Transport {
override fun read(buffer: ByteArray, offset: Int, count: Int): Int {
val ret = inputStream.read(buffer, offset, count)
if (ret == -1) {
throw ProtocolException("No more data available.")
}
return ret
}

override fun write(buffer: ByteArray, offset: Int, count: Int) {
throw ProtocolException("currently in reading state")
}

override fun flush() {
throw ProtocolException("currently in reading state")
}

override fun close() {
inputStream.close()
}
}

fun send(data: ByteArray) {
// Create connection object
val connection = url.openConnection() as HttpURLConnection

prepareConnection(connection)
// Make the request
connection.connect()
connection.outputStream.write(data)
val responseCode = connection.responseCode
if (responseCode != HttpURLConnection.HTTP_OK) {
throw ProtocolException("HTTP Response code: $responseCode")
}

// Read the response
this.currentState = Reading(connection.inputStream)
}

protected open fun prepareConnection(connection: HttpURLConnection) {
// Timeouts, only if explicitly set
connectTimeout?.let { connection.connectTimeout = it }
readTimeout?.let { connection.readTimeout = it }

connection.requestMethod = "POST"
connection.setRequestProperty("Content-Type", "application/x-thrift")
connection.setRequestProperty("Accept", "application/x-thrift")
connection.setRequestProperty("User-Agent", "Java/THttpClient")
for ((key, value) in customHeaders) {
connection.setRequestProperty(key, value)
}
connection.doOutput = true
}

fun setConnectTimeout(timeout: Int) {
connectTimeout = timeout
}

fun setReadTimeout(timeout: Int) {
readTimeout = timeout
}

fun setCustomHeaders(headers: Map<String, String>) {
customHeaders.clear()
customHeaders.putAll(headers)
}

fun setCustomHeader(key: String, value: String) {
customHeaders[key] = value
}

override fun close() {
currentState.close()
}

override fun read(buffer: ByteArray, offset: Int, count: Int): Int = currentState.read(buffer, offset, count)

override fun write(buffer: ByteArray, offset: Int, count: Int) {
// this mirrors the original behaviour, though it is not very elegant.
// we don't know when the user is done reading, so when they start writing again,
// we just go with it.
if (currentState is Reading) {
currentState.close()
currentState = Writing()
}
currentState.write(buffer, offset, count)
}

override fun flush() {
currentState.flush()
}
}
1 change: 1 addition & 0 deletions thrifty-test-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ dependencies {
implementation "commons-codec:commons-codec:1.15"
implementation "org.apache.httpcomponents:httpclient:4.5.13"
implementation "org.slf4j:slf4j-api:2.0.5"
implementation "org.apache.tomcat.embed:tomcat-embed-core:10.1.4"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Thrifty
*
* Copyright (c) Microsoft Corporation
*
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the License);
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING
* WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE,
* FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
*
* See the Apache Version 2.0 License for specific language governing permissions and limitations under the License.
*/
package com.microsoft.thrifty.testing;

import org.apache.catalina.LifecycleException;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.startup.Tomcat;

import static com.microsoft.thrifty.testing.TestServer.getProtocolFactory;

public class HttpServer implements TestServerInterface {
private Tomcat tomcat;

@Override
public void run(ServerProtocol protocol, ServerTransport transport) {
if (transport != ServerTransport.HTTP) {
throw new IllegalArgumentException("only http transport supported");
}
this.tomcat = new Tomcat();
tomcat.setBaseDir(System.getProperty("user.dir") + "\\build");
tomcat.setPort(0);
tomcat.getHost().setAutoDeploy(false);

String contextPath = "/test";
StandardContext context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());
tomcat.getHost().addChild(context);
tomcat.addServlet(contextPath, "testServlet", new TestServlet(getProtocolFactory(protocol)));
context.addServletMappingDecoded("/service", "testServlet");
try {
tomcat.start();
} catch (LifecycleException e) {
throw new RuntimeException(e);
}
}

@Override
public int port() {
return tomcat.getConnector().getLocalPort();
}

@Override
public void close() {
try {
tomcat.stop();
} catch (LifecycleException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ public enum ServerTransport {
/**
* A framed, non-blocking server socket,i.e. TNonblockingServerTransport.
*/
NON_BLOCKING
NON_BLOCKING,
HTTP
}
Loading

0 comments on commit 436e7f2

Please sign in to comment.