Skip to content

Commit

Permalink
Wait for service availability at the ingress before running tests
Browse files Browse the repository at this point in the history
This commit makes the RestateDeployer wait for the availability of registered
services at the ingress. The deployer checks whether the ingress is serving
the registered services via calling grpc.health.v1.Health/Check and checking
that the response contains status = SERVING.

This fixes #266.
  • Loading branch information
tillrohrmann committed Feb 14, 2024
1 parent 1632e6f commit 551fe3d
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 72 deletions.
2 changes: 2 additions & 0 deletions test-utils/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ dependencies {
implementation(libs.jackson.databind)
implementation(libs.jackson.yaml)

implementation(libs.awaitility)

testImplementation(libs.junit.all)
testImplementation(libs.assertj)
}
Expand Down
43 changes: 43 additions & 0 deletions test-utils/src/main/kotlin/dev/restate/e2e/utils/JsonUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

package dev.restate.e2e.utils

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.charset.StandardCharsets

object JsonUtils {
private val objMapper = ObjectMapper()
private val httpClient = HttpClient.newHttpClient()

fun jacksonBodyHandler(): HttpResponse.BodyHandler<JsonNode> {
return HttpResponse.BodyHandler {
HttpResponse.BodySubscribers.mapping(
HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8), objMapper::readTree)
}
}

fun jacksonBodyPublisher(value: Any): HttpRequest.BodyPublisher {
return HttpRequest.BodyPublishers.ofString(objMapper.writeValueAsString(value))
}

fun postJsonRequest(uri: String, reqBody: Any): HttpResponse<JsonNode> {
val req =
HttpRequest.newBuilder(URI.create(uri))
.headers("Content-Type", "application/json")
.POST(jacksonBodyPublisher(reqBody))
.build()
return httpClient.send(req, jacksonBodyHandler())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import java.io.File
import java.lang.reflect.Method
import java.net.URI
import java.net.URL
import java.nio.file.Path
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
Expand Down Expand Up @@ -87,6 +88,8 @@ private constructor(

private const val RESTATE_URI_ENV = "RESTATE_URI"

const val HEALTH_CHECK_SERVICE = "grpc.health.v1.Health/Check"

private val logger = LogManager.getLogger(RestateDeployer::class.java)

@JvmStatic
Expand Down Expand Up @@ -250,6 +253,8 @@ private constructor(
.setPort(getContainerPort(RESTATE_RUNTIME, RUNTIME_META_ENDPOINT_PORT)))
serviceContainers.values.forEach { (spec, _) -> discoverDeployment(client, spec) }

waitForServicesBeingAvailable()

// Log environment
writeEnvironmentReport(testReportDir)
}
Expand Down Expand Up @@ -382,13 +387,28 @@ private constructor(
RUNTIME_META_ENDPOINT_PORT,
)
proxyContainer.waitHttp(
Wait.forHttp("/grpc.health.v1.Health/Check"),
Wait.forHttp("/${HEALTH_CHECK_SERVICE}"),
RESTATE_RUNTIME,
RUNTIME_GRPC_INGRESS_ENDPOINT_PORT,
)
logger.debug("Runtime META and Ingress healthy")
}

private fun waitForServicesBeingAvailable() {
val adminUrl =
URL(
"http",
"127.0.0.1",
getContainerPort(RESTATE_RUNTIME, RUNTIME_GRPC_INGRESS_ENDPOINT_PORT),
"")

for ((spec, _) in serviceContainers.values) {
if (!spec.skipRegistration) {
waitForServicesBeingAvailable(spec.services, adminUrl)
}
}
}

fun discoverDeployment(client: DeploymentApi, spec: ServiceSpec) {
val url = spec.getEndpointUrl()
if (spec.skipRegistration) {
Expand Down
12 changes: 9 additions & 3 deletions test-utils/src/main/kotlin/dev/restate/e2e/utils/ServiceSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ data class ServiceSpec(
internal val containerImage: String,
val hostName: String,
internal val envs: Map<String, String>,
val services: Set<String>,
internal val port: Int,
internal val skipRegistration: Boolean,
internal val dependencies: List<Startable>,
Expand All @@ -44,6 +45,7 @@ data class ServiceSpec(
private var port: Int = 8080,
private var skipRegistration: Boolean = false,
private var dependencies: MutableList<Startable> = mutableListOf(),
private var services: MutableSet<String> = mutableSetOf(),
) {
fun withHostName(hostName: String) = apply { this.hostName = hostName }

Expand All @@ -54,11 +56,14 @@ data class ServiceSpec(

fun withEnvs(envs: Map<String, String>) = apply { this.envs.putAll(envs) }

fun withServices(vararg services: String) = apply { this.services.addAll(services) }

fun skipRegistration() = apply { this.skipRegistration = true }

fun dependsOn(container: Startable) = apply { this.dependencies.add(container) }

fun build() = ServiceSpec(containerImage, hostName, envs, port, skipRegistration, dependencies)
fun build() =
ServiceSpec(containerImage, hostName, envs, services, port, skipRegistration, dependencies)
}

fun toBuilder(): Builder {
Expand All @@ -67,13 +72,14 @@ data class ServiceSpec(
hostName,
envs = envs.toMutableMap(),
port,
dependencies = dependencies.toMutableList())
dependencies = dependencies.toMutableList(),
services = services.toMutableSet())
}

internal fun toContainer(): GenericContainer<*> {
return GenericContainer(DockerImageName.parse(containerImage))
.withEnv("PORT", port.toString())
.withEnv(envs)
.withEnv(envs + ("SERVICES" to services.joinToString(",")))
.dependsOn(dependencies)
}

Expand Down
32 changes: 28 additions & 4 deletions test-utils/src/main/kotlin/dev/restate/e2e/utils/utils.kt
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE
// https://github.com/restatedev/sdk-java/blob/main/LICENSE

package dev.restate.e2e.utils

import com.github.dockerjava.api.command.InspectContainerResponse
import java.net.URL
import org.awaitility.kotlin.await
import org.awaitility.kotlin.matches
import org.awaitility.kotlin.untilCallTo
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget

internal open class NotCachedContainerInfo(private val delegate: WaitStrategyTarget) :
Expand All @@ -29,3 +33,23 @@ internal class WaitOnSpecificPortsTarget(
return ports.toMutableList()
}
}

fun waitForServicesBeingAvailable(services: Collection<String>, ingressURL: URL) {
val healthCheckURL =
URL(
ingressURL.protocol,
ingressURL.host,
ingressURL.port,
"/${RestateDeployer.HEALTH_CHECK_SERVICE}")

for (service in services) {
val body = mapOf("service" to service)

await
.untilCallTo { JsonUtils.postJsonRequest(healthCheckURL.toString(), body) }
.matches { response ->
response!!.statusCode() == 200 &&
response!!.body().get("status").asText().equals("SERVING")
}
}
}
4 changes: 2 additions & 2 deletions tests/src/test/kotlin/dev/restate/e2e/Containers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object Containers {
fun javaServicesContainer(hostName: String, vararg services: String): ServiceSpec.Builder {
assert(services.isNotEmpty())
return ServiceSpec.builder("restatedev/e2e-java-services")
.withEnv("SERVICES", services.joinToString(","))
.withServices(*services)
.withHostName(hostName)
}

Expand Down Expand Up @@ -82,7 +82,7 @@ object Containers {
fun nodeServicesContainer(hostName: String, vararg services: String): ServiceSpec.Builder {
assert(services.isNotEmpty())
return ServiceSpec.builder("restatedev/e2e-node-services")
.withEnv("SERVICES", services.joinToString(","))
.withServices(*services)
.withEnv("RESTATE_DEBUG_LOGGING", "JOURNAL")
.withHostName(hostName)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/src/test/kotlin/dev/restate/e2e/KafkaIngressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ class NodeHandlerAPIKafkaIngressTest {
HttpRequest.newBuilder(
URI.create(
"${httpEndpointURL}${Containers.HANDLER_API_COUNTER_SERVICE_NAME}/get"))
.POST(Utils.jacksonBodyPublisher(mapOf("key" to counter)))
.POST(JsonUtils.jacksonBodyPublisher(mapOf("key" to counter)))
.headers("Content-Type", "application/json")
.build()

val response = client.send(req, Utils.jacksonBodyHandler())
val response = client.send(req, JsonUtils.jacksonBodyHandler())

assertThat(response.statusCode()).isEqualTo(200)
assertThat(response.headers().firstValue("content-type"))
Expand Down
32 changes: 2 additions & 30 deletions tests/src/test/kotlin/dev/restate/e2e/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,18 @@
package dev.restate.e2e

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.charset.StandardCharsets
import dev.restate.e2e.utils.JsonUtils
import org.assertj.core.api.Assertions.assertThat

object Utils {

private val objMapper = ObjectMapper()
private val httpClient = HttpClient.newHttpClient()

fun jacksonBodyHandler(): HttpResponse.BodyHandler<JsonNode> {
return HttpResponse.BodyHandler {
HttpResponse.BodySubscribers.mapping(
HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8), objMapper::readTree)
}
}

fun jacksonBodyPublisher(value: Any): HttpRequest.BodyPublisher {
return HttpRequest.BodyPublishers.ofString(objMapper.writeValueAsString(value))
}

fun postJsonRequest(uri: String, reqBody: Any): HttpResponse<JsonNode> {
val req =
HttpRequest.newBuilder(URI.create(uri))
.headers("Content-Type", "application/json")
.POST(jacksonBodyPublisher(reqBody))
.build()
return httpClient.send(req, jacksonBodyHandler())
}

fun doJsonRequestToService(
restateEndpoint: String,
service: String,
method: String,
reqBody: Any
): JsonNode {
val res = postJsonRequest("${restateEndpoint}${service}/${method}", reqBody)
val res = JsonUtils.postJsonRequest("${restateEndpoint}${service}/${method}", reqBody)
assertThat(res.statusCode()).isEqualTo(200)
assertThat(res.headers().firstValue("content-type"))
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import dev.restate.e2e.Containers.EMBEDDED_HANDLER_SERVER_HOSTNAME
import dev.restate.e2e.Containers.EMBEDDED_HANDLER_SERVER_PORT
import dev.restate.e2e.Containers.HANDLER_API_COUNTER_SERVICE_NAME
import dev.restate.e2e.Containers.nodeServicesContainer
import dev.restate.e2e.Utils.postJsonRequest
import dev.restate.e2e.utils.*
import java.net.URL
import java.util.UUID
Expand Down Expand Up @@ -55,7 +54,7 @@ class EmbeddedHandlerApiTest {

for (i in 0..2) {
val response =
postJsonRequest(
JsonUtils.postJsonRequest(
"http://localhost:${embeddedHandlerServerPort}/increment_counter_test",
mapOf("id" to operationUuid, "input" to counterUuid))
assertThat(response.statusCode()).isEqualTo(200)
Expand All @@ -64,7 +63,7 @@ class EmbeddedHandlerApiTest {
}

val response =
postJsonRequest(
JsonUtils.postJsonRequest(
"${httpEndpointURL}$HANDLER_API_COUNTER_SERVICE_NAME/get", mapOf("key" to counterUuid))
assertThat(response.statusCode()).isEqualTo(200)
assertThat(response.body().get("response").get("counter").asInt()).isEqualTo(1)
Expand Down Expand Up @@ -100,12 +99,11 @@ class EmbeddedHandlerApiTest {
@InjectContainerPort(
hostName = EMBEDDED_HANDLER_SERVER_HOSTNAME, port = EMBEDDED_HANDLER_SERVER_PORT)
embeddedHandlerServerPort: Int,
@InjectGrpcIngressURL httpEndpointURL: URL
) {
val operationUuid = UUID.randomUUID().toString()

val response =
postJsonRequest(
JsonUtils.postJsonRequest(
"http://localhost:${embeddedHandlerServerPort}/side_effect_and_awakeable",
mapOf("id" to operationUuid, "itemsNumber" to 10))
assertThat(response.statusCode()).isEqualTo(200)
Expand All @@ -122,12 +120,11 @@ class EmbeddedHandlerApiTest {
@InjectContainerPort(
hostName = EMBEDDED_HANDLER_SERVER_HOSTNAME, port = EMBEDDED_HANDLER_SERVER_PORT)
embeddedHandlerServerPort: Int,
@InjectGrpcIngressURL httpEndpointURL: URL
) {
val operationUuid = UUID.randomUUID().toString()

val response =
postJsonRequest(
JsonUtils.postJsonRequest(
"http://localhost:${embeddedHandlerServerPort}/consecutive_side_effects",
mapOf("id" to operationUuid))
assertThat(response.statusCode()).isEqualTo(200)
Expand All @@ -144,7 +141,7 @@ class EmbeddedHandlerApiTest {

for (i in 0..2) {
val response =
postJsonRequest(
JsonUtils.postJsonRequest(
"http://localhost:${embeddedHandlerServerPort}/${path}",
mapOf("id" to operationUuid, "input" to counterUuid))
assertThat(response.statusCode()).isEqualTo(200)
Expand All @@ -153,7 +150,7 @@ class EmbeddedHandlerApiTest {
await untilAsserted
{
val response =
postJsonRequest(
JsonUtils.postJsonRequest(
"${httpEndpointURL}$HANDLER_API_COUNTER_SERVICE_NAME/get",
mapOf("key" to counterUuid))
assertThat(response.statusCode()).isEqualTo(200)
Expand Down
Loading

0 comments on commit 551fe3d

Please sign in to comment.