Skip to content

Commit

Permalink
Added retryWhen operator
Browse files Browse the repository at this point in the history
  • Loading branch information
arkivanov committed Aug 22, 2024
1 parent d089611 commit ff29dd2
Show file tree
Hide file tree
Showing 10 changed files with 956 additions and 0 deletions.
16 changes: 16 additions & 0 deletions reaktive/api/android/reaktive.api
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public final class com/badoo/reaktive/completable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -492,6 +496,10 @@ public final class com/badoo/reaktive/maybe/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -861,6 +869,10 @@ public final class com/badoo/reaktive/observable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/SampleKt {
public static final fun sample-8Mi8wO0 (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;)Lcom/badoo/reaktive/observable/Observable;
}
Expand Down Expand Up @@ -1246,6 +1258,10 @@ public final class com/badoo/reaktive/single/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/single/Single;
}

public final class com/badoo/reaktive/single/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/single/Single;
}

public abstract interface class com/badoo/reaktive/single/Single : com/badoo/reaktive/base/Source {
}

Expand Down
16 changes: 16 additions & 0 deletions reaktive/api/jvm/reaktive.api
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public final class com/badoo/reaktive/completable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable;
}

public final class com/badoo/reaktive/completable/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -492,6 +496,10 @@ public final class com/badoo/reaktive/maybe/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/maybe/Maybe;
}

public final class com/badoo/reaktive/maybe/SubscribeKt {
public static final fun subscribe (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/disposable/Disposable;
public static synthetic fun subscribe$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable;
Expand Down Expand Up @@ -861,6 +869,10 @@ public final class com/badoo/reaktive/observable/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/SampleKt {
public static final fun sample-8Mi8wO0 (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;)Lcom/badoo/reaktive/observable/Observable;
}
Expand Down Expand Up @@ -1246,6 +1258,10 @@ public final class com/badoo/reaktive/single/RetryKt {
public static synthetic fun retry$default (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/single/Single;
}

public final class com/badoo/reaktive/single/RetryWhenKt {
public static final fun retryWhen (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/single/Single;
}

public abstract interface class com/badoo/reaktive/single/Single : com/badoo/reaktive/base/Source {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.badoo.reaktive.completable

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.asCompletable
import com.badoo.reaktive.observable.retryWhen

/**
* Returns a [Completable] that automatically resubscribes to this [Completable] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#retryWhen-io.reactivex.functions.Function-).
*/
fun Completable.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Completable =
asObservable()
.retryWhen(handler)
.asCompletable()
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.badoo.reaktive.maybe

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.firstOrComplete
import com.badoo.reaktive.observable.retryWhen

/**
* Returns a [Maybe] that automatically resubscribes to this [Maybe] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Maybe.html#retryWhen-io.reactivex.functions.Function-).
*/
fun <T> Maybe<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Maybe<T> =
asObservable()
.retryWhen(handler)
.firstOrComplete()
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.CompleteCallback
import com.badoo.reaktive.base.ValueCallback
import com.badoo.reaktive.completable.CompletableCallbacks
import com.badoo.reaktive.disposable.CompositeDisposable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.SerialDisposable
import com.badoo.reaktive.disposable.plusAssign
import com.badoo.reaktive.subject.publish.PublishSubject
import com.badoo.reaktive.utils.atomic.AtomicBoolean

/**
* Returns an [Observable] that automatically resubscribes to this [Observable] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#retryWhen-io.reactivex.functions.Function-).
*/
fun <T> Observable<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Observable<T> =
observable { emitter ->
val disposables = CompositeDisposable()
emitter.setDisposable(disposables)

val errorSubject = PublishSubject<Throwable>()
val isError = AtomicBoolean()

val disposableObserver =
object : SerialDisposable(), ObservableObserver<T>, ValueCallback<T> by emitter, CompleteCallback by emitter {
override fun onSubscribe(disposable: Disposable) {
replace(disposable)
}

override fun onError(error: Throwable) {
replace(null)
isError.value = true
errorSubject.onNext(error)
}
}

disposables += disposableObserver

handler(errorSubject).subscribe(
object : ObservableObserver<Any?>, CompletableCallbacks by emitter {
override fun onSubscribe(disposable: Disposable) {
disposables += disposable
}

override fun onNext(value: Any?) {
if (isError.compareAndSet(true, false)) {
subscribe(disposableObserver)
}
}
}
)

subscribe(disposableObserver)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.badoo.reaktive.single

import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.firstOrError
import com.badoo.reaktive.observable.retryWhen

/**
* Returns a [Single] that automatically resubscribes to this [Single] if it signals `onError`
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable].
*
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#retryWhen-io.reactivex.functions.Function-).
*/
fun <T> Single<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Single<T> =
asObservable()
.retryWhen(handler)
.firstOrError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package com.badoo.reaktive.completable

import com.badoo.reaktive.observable.flatMap
import com.badoo.reaktive.observable.observableOf
import com.badoo.reaktive.observable.retryWhen
import com.badoo.reaktive.test.base.assertError
import com.badoo.reaktive.test.base.assertNotError
import com.badoo.reaktive.test.base.hasSubscribers
import com.badoo.reaktive.test.completable.TestCompletable
import com.badoo.reaktive.test.completable.assertComplete
import com.badoo.reaktive.test.completable.test
import com.badoo.reaktive.test.observable.TestObservable
import com.badoo.reaktive.test.observable.TestObservableObserver
import com.badoo.reaktive.test.observable.assertValue
import com.badoo.reaktive.test.observable.test
import kotlin.test.Ignore
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue

class RetryWhenTest : CompletableToCompletableTests by CompletableToCompletableTestsImpl({ retryWhen { it } }) {

private val upstream = TestCompletable()
private val retryObservable = TestObservable<Any?>()

@Ignore
@Test
override fun disposes_downstream_disposable_WHEN_upstream_produced_error() {
// Not applicable
}

@Ignore
@Test
override fun produces_error_WHEN_upstream_produced_error() {
// Not applicable
}

@Test
fun calls_handler_WHEN_subscribed() {
var isCalled = false

upstream
.retryWhen {
isCalled = true
retryObservable
}
.test()

assertTrue(isCalled)
}

@Test
fun produces_error_WHEN_handler_throws() {
val exception = Exception()

val observer = upstream.retryWhen { throw exception }.test()

observer.assertError(exception)
}

@Test
fun subscribes_to_retry_observable_WHEN_subscribed() {
upstream.retryWhen { retryObservable }.test()

assertEquals(1, retryObservable.subscriptionCount)
}

@Test
fun disposes_retry_observable_WHEN_disposed() {
val observer = upstream.retryWhen { retryObservable }.test()

observer.dispose()

assertFalse(retryObservable.hasSubscribers)
}

@Test
fun disposes_retry_observable_WHEN_upstream_completed() {
upstream.retryWhen { retryObservable }.test()

upstream.onComplete()

assertFalse(retryObservable.hasSubscribers)
}

@Test
fun does_not_dispose_retry_observable_WHEN_upstream_produced_error() {
upstream.retryWhen { retryObservable }.test()

upstream.onError(Exception())

assertEquals(1, retryObservable.subscriptionCount)
}

@Test
fun does_not_produce_error_WHEN_upstream_produced_error() {
val observer = upstream.retryWhen { retryObservable }.test()

upstream.onError(Exception())

assertNull(observer.error)
}

@Test
fun does_not_complete_WHEN_upstream_produced_error() {
val observer = upstream.retryWhen { retryObservable }.test()

upstream.onError(Exception())

assertFalse(observer.isComplete)
}

@Test
fun error_observable_emits_exception_WHEN_upstream_produced_error() {
val exception = Exception()
lateinit var errorObserver: TestObservableObserver<Any?>

upstream
.retryWhen {
errorObserver = it.test()
retryObservable
}
.test()

upstream.onError(exception)

errorObserver.assertValue(exception)
}

@Test
fun completes_WHEN_retry_observable_completed() {
val observer = upstream.retryWhen { retryObservable }.test()

retryObservable.onComplete()

observer.assertComplete()
}

@Test
fun produces_error_WHEN_retry_observable_produced_error() {
val exception = Exception()
val observer = upstream.retryWhen { retryObservable }.test()

retryObservable.onError(exception)

observer.assertError(exception)
}

@Test
fun does_not_produce_error_WHEN_retry_observable_emitted_value() {
val observer = upstream.retryWhen { retryObservable }.test()

retryObservable.onNext(1)

observer.assertNotError()
}

@Test
fun subscribes_to_upstream_WHEN_upstream_produced_error_and_retry_observable_emitted_value() {
upstream.retryWhen { retryObservable }.test()
upstream.onError(Exception())
upstream.reset()

retryObservable.onNext(1)

assertEquals(1, upstream.subscriptionCount)
}

@Test
fun completes_WHEN_resubscribed_after_error_and_upstream_completed() {
val observer = upstream.retryWhen { retryObservable }.test()
upstream.onError(Exception())
upstream.reset()
retryObservable.onNext(1)

upstream.onComplete()

observer.assertComplete()
}

@Test
fun resubscribes_to_upstream_only_once_WHEN_upstream_produced_error_and_retry_observable_emitted_multiple_values_synchronously() {
upstream
.retryWhen { errors ->
errors.flatMap {
upstream.reset()
observableOf(1, 2)
}
}.test()

upstream.onError(Exception())

assertEquals(1, upstream.subscriptionCount)
}
}
Loading

0 comments on commit ff29dd2

Please sign in to comment.