Skip to content

Commit

Permalink
feat: respect http status 429 (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
gastonfournier authored Jul 15, 2024
1 parent 687ddfd commit 8289f0c
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ class LocalBackup(
private fun id(context: UnleashContext): String {
return context.hashCode().toString()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ class CacheDirectoryProvider(private val config: LocalStorageConfig, private val
private class DeleteFileShutdownHook(file: File) : Thread(Runnable {
file.deleteRecursively()
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.getunleash.android.http

import android.util.Log
import java.net.HttpURLConnection
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.max
import kotlin.math.min


class Throttler(
private val intervalLengthInSeconds: Long,
longestAcceptableIntervalSeconds: Long,
private val target: String
) {
companion object {
private const val TAG = "Throttler"
}
private val maxSkips = max(
longestAcceptableIntervalSeconds / max(
intervalLengthInSeconds, 1
), 1
)

private val skips = AtomicLong(0)
private val failures = AtomicLong(0)

/**
* We've had one successful call, so if we had 10 failures in a row, this will reduce the skips
* down to 9, so that we gradually start polling more often, instead of doing max load
* immediately after a sequence of errors.
*/
internal fun decrementFailureCountAndResetSkips() {
if (failures.get() > 0) {
skips.set(max(failures.decrementAndGet(), 0L))
}
}

/**
* We've gotten the message to back off (usually a 429 or a 50x). If we have successive
* failures, failure count here will be incremented higher and higher which will handle
* increasing our backoff, since we set the skip count to the failure count after every reset
*/
private fun increaseSkipCount() {
skips.set(min(failures.incrementAndGet(), maxSkips))
}

/**
* We've received an error code that we don't expect to change, which means we've already logged
* an ERROR. To avoid hammering the server that just told us we did something wrong and to avoid
* flooding the logs, we'll increase our skip count to maximum
*/
private fun maximizeSkips() {
skips.set(maxSkips)
failures.incrementAndGet()
}

fun performAction(): Boolean {
return skips.get() <= 0
}

fun skipped() {
skips.decrementAndGet()
}

internal fun handleHttpErrorCodes(responseCode: Int) {
if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED
|| responseCode == HttpURLConnection.HTTP_FORBIDDEN
) {
maximizeSkips()
Log.e(TAG,
"Client was not authorized to talk to the Unleash API at $target. Backing off to $maxSkips times our poll interval (of $intervalLengthInSeconds seconds) to avoid overloading server",
)
}
if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
maximizeSkips()
Log.e(TAG,
"Server said that the endpoint at $target does not exist. Backing off to $maxSkips times our poll interval (of $intervalLengthInSeconds seconds) to avoid overloading server",
)
} else if (responseCode == 429) {
increaseSkipCount()
Log.i(TAG,
"RATE LIMITED for the ${failures.get()}. time. Further backing off. Current backoff at ${skips.get()} times our interval (of $intervalLengthInSeconds seconds)",
)
} else if (responseCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
increaseSkipCount()
Log.i(TAG,
"Server failed with a $responseCode status code. Backing off. Current backoff at ${skips.get()} times our poll interval (of $intervalLengthInSeconds seconds)",
)
}
}

fun getSkips(): Long {
return skips.get()
}

fun getFailures(): Long {
return failures.get()
}

fun handle(statusCode: Int) {
if (statusCode in 200..399) {
decrementFailureCountAndResetSkips();
}
if (statusCode >= 400) {
handleHttpErrorCodes(statusCode);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.getunleash.android.data.CountBucket
import io.getunleash.android.data.MetricsPayload
import io.getunleash.android.data.Parser
import io.getunleash.android.data.Variant
import io.getunleash.android.http.Throttler
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Headers.Companion.toHeaders
Expand All @@ -18,6 +19,7 @@ import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.Response
import java.io.IOException
import java.util.Date
import java.util.concurrent.TimeUnit

class MetricsSender(
private val config: UnleashConfig,
Expand All @@ -27,32 +29,48 @@ class MetricsSender(
companion object {
private const val TAG: String = "MetricsSender"
}
private val metricsUrl = config.proxyUrl.toHttpUrl().newBuilder().addPathSegment("client").addPathSegment("metrics").build()
private val metricsUrl = config.proxyUrl.toHttpUrl().newBuilder().addPathSegment("client")
.addPathSegment("metrics").build()
private var bucket: CountBucket = CountBucket(start = Date())
private val throttler =
Throttler(
TimeUnit.MILLISECONDS.toSeconds(config.metricsStrategy.interval),
longestAcceptableIntervalSeconds = 300,
metricsUrl.toString()
)

override suspend fun sendMetrics() {
val toReport = swapAndFreeze()
val payload = MetricsPayload(
appName = config.appName,
instanceId = config.instanceId,
bucket = toReport
)
val request = Request.Builder()
.headers(applicationHeaders.toHeaders())
.url(metricsUrl).post(
Parser.jackson.writeValueAsString(payload).toRequestBody("application/json".toMediaType())
).build()
httpClient.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
Log.i(TAG, "Failed to report metrics for interval", e)
}
if (throttler.performAction()) {
val toReport = swapAndFreeze()
val payload = MetricsPayload(
appName = config.appName,
instanceId = config.instanceId,
bucket = toReport
)
val request = Request.Builder()
.headers(applicationHeaders.toHeaders())
.url(metricsUrl).post(
Parser.jackson.writeValueAsString(payload)
.toRequestBody("application/json".toMediaType())
).build()
httpClient.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
Log.i(TAG, "Failed to report metrics for interval", e)
}

override fun onResponse(call: Call, response: Response) {
Log.d(TAG, "Received status code ${response.code} from ${request.method} $metricsUrl")
response.body.use { // Need to consume body to ensure we don't keep connection open
override fun onResponse(call: Call, response: Response) {
Log.d(
TAG,
"Received status code ${response.code} from ${request.method} $metricsUrl"
)
throttler.handle(response.code)
response.body.use { // Need to consume body to ensure we don't keep connection open
}
}
}
})
})
} else {
throttler.skipped();
}
}

private fun swapAndFreeze(): Bucket {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.getunleash.android.data.UnleashContext
import io.getunleash.android.data.UnleashState
import io.getunleash.android.errors.NoBodyException
import io.getunleash.android.errors.NotAuthorizedException
import io.getunleash.android.http.Throttler
import io.getunleash.android.unleashScope
import io.getunleash.errors.ServerException
import kotlinx.coroutines.Dispatchers
Expand All @@ -34,6 +35,7 @@ import okhttp3.Response
import okhttp3.internal.closeQuietly
import java.io.Closeable
import java.io.IOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
Expand Down Expand Up @@ -63,6 +65,12 @@ open class UnleashFetcher(
)
private val coroutineContextForContextChange: CoroutineContext = Dispatchers.IO
private val currentCall = AtomicReference<Call?>(null)
private val throttler =
Throttler(
TimeUnit.MILLISECONDS.toSeconds(unleashConfig.pollingStrategy.interval),
longestAcceptableIntervalSeconds = 300,
proxyUrl.toString()
)

fun getFeaturesReceivedFlow() = featuresReceivedFlow.asSharedFlow()

Expand All @@ -78,8 +86,12 @@ open class UnleashFetcher(
}

suspend fun refreshToggles(): ToggleResponse {
Log.d(TAG, "Refreshing toggles")
return refreshTogglesWithContext(unleashContext.value)
if (throttler.performAction()) {
Log.d(TAG, "Refreshing toggles")
return refreshTogglesWithContext(unleashContext.value)
}
Log.i(TAG, "Skipping refresh toggles due to throttling")
return ToggleResponse(Status.FAILED)
}

internal suspend fun refreshTogglesWithContext(ctx: UnleashContext): ToggleResponse {
Expand Down Expand Up @@ -133,6 +145,7 @@ open class UnleashFetcher(
val response = call.await()
response.use { res ->
Log.d(TAG, "Received status code ${res.code} from $contextUrl")
throttler.handle(response.code)
return when {
res.isSuccessful -> {
etag = res.header("ETag")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package io.getunleash.android

import android.content.Context
import androidx.lifecycle.Lifecycle
import io.getunleash.android.backup.LocalBackup
import io.getunleash.android.cache.ToggleCache
import io.getunleash.android.data.Toggle
import io.getunleash.android.data.UnleashState
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import org.mockito.Mockito.mock
import java.io.File

class DefaultUnleashTest: BaseTest() {
private val testCache = object: ToggleCache {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.getunleash.android.http

import io.getunleash.android.BaseTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.net.MalformedURLException

class ThrottlerTest : BaseTest(){

@Test
@Throws(MalformedURLException::class)
fun shouldNeverDecrementFailuresOrSkipsBelowZero() {
val throttler =
Throttler(10, 300, "https://localhost:1500/api");
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
assertThat(throttler.getSkips()).isEqualTo(0);
assertThat(throttler.getFailures()).isEqualTo(0);
}

@Test
@Throws(MalformedURLException::class)
fun setToMaxShouldReduceDownEventually() {
val throttler =
Throttler(150, 300, "https://localhost:1500/api");
throttler.handleHttpErrorCodes(404);
assertThat(throttler.getSkips()).isEqualTo(2);
assertThat(throttler.getFailures()).isEqualTo(1);
throttler.skipped();
assertThat(throttler.getSkips()).isEqualTo(1);
assertThat(throttler.getFailures()).isEqualTo(1);
throttler.skipped();
assertThat(throttler.getSkips()).isEqualTo(0);
assertThat(throttler.getFailures()).isEqualTo(1);
throttler.decrementFailureCountAndResetSkips();
assertThat(throttler.getSkips()).isEqualTo(0);
assertThat(throttler.getFailures()).isEqualTo(0);
throttler.decrementFailureCountAndResetSkips();
assertThat(throttler.getSkips()).isEqualTo(0);
assertThat(throttler.getFailures()).isEqualTo(0);
}

@Test
@Throws(MalformedURLException::class)
fun handleIntermittentFailures() {
val throttler =
Throttler(50, 300, "https://localhost:1500/api");
throttler.handleHttpErrorCodes(429);
throttler.handleHttpErrorCodes(429);
throttler.handleHttpErrorCodes(503);
throttler.handleHttpErrorCodes(429);
assertThat(throttler.getSkips()).isEqualTo(4);
assertThat(throttler.getFailures()).isEqualTo(4);
throttler.decrementFailureCountAndResetSkips();
assertThat(throttler.getSkips()).isEqualTo(3);
assertThat(throttler.getFailures()).isEqualTo(3);
throttler.handleHttpErrorCodes(429);
assertThat(throttler.getSkips()).isEqualTo(4);
assertThat(throttler.getFailures()).isEqualTo(4);
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
throttler.decrementFailureCountAndResetSkips();
assertThat(throttler.getSkips()).isEqualTo(0);
assertThat(throttler.getFailures()).isEqualTo(0);
}
}

0 comments on commit 8289f0c

Please sign in to comment.