From ff29dd2f6578166a4fb5eed9af962708b2cb6188 Mon Sep 17 00:00:00 2001 From: Arkadii Ivanov Date: Wed, 21 Aug 2024 12:47:39 +0300 Subject: [PATCH] Added retryWhen operator --- reaktive/api/android/reaktive.api | 16 ++ reaktive/api/jvm/reaktive.api | 16 ++ .../badoo/reaktive/completable/RetryWhen.kt | 16 ++ .../com/badoo/reaktive/maybe/RetryWhen.kt | 16 ++ .../badoo/reaktive/observable/RetryWhen.kt | 57 +++++ .../com/badoo/reaktive/single/RetryWhen.kt | 16 ++ .../reaktive/completable/RetryWhenTest.kt | 197 ++++++++++++++++ .../com/badoo/reaktive/maybe/RetryWhenTest.kt | 219 ++++++++++++++++++ .../reaktive/observable/RetryWhenTest.kt | 215 +++++++++++++++++ .../badoo/reaktive/single/RetryWhenTest.kt | 188 +++++++++++++++ 10 files changed, 956 insertions(+) create mode 100644 reaktive/src/commonMain/kotlin/com/badoo/reaktive/completable/RetryWhen.kt create mode 100644 reaktive/src/commonMain/kotlin/com/badoo/reaktive/maybe/RetryWhen.kt create mode 100644 reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/RetryWhen.kt create mode 100644 reaktive/src/commonMain/kotlin/com/badoo/reaktive/single/RetryWhen.kt create mode 100644 reaktive/src/commonTest/kotlin/com/badoo/reaktive/completable/RetryWhenTest.kt create mode 100644 reaktive/src/commonTest/kotlin/com/badoo/reaktive/maybe/RetryWhenTest.kt create mode 100644 reaktive/src/commonTest/kotlin/com/badoo/reaktive/observable/RetryWhenTest.kt create mode 100644 reaktive/src/commonTest/kotlin/com/badoo/reaktive/single/RetryWhenTest.kt diff --git a/reaktive/api/android/reaktive.api b/reaktive/api/android/reaktive.api index e43502ba6..9f51343fd 100644 --- a/reaktive/api/android/reaktive.api +++ b/reaktive/api/android/reaktive.api @@ -201,6 +201,10 @@ public final class com/badoo/reaktive/completable/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/completable/Completable; } +public final class com/badoo/reaktive/completable/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable; +} + public final class com/badoo/reaktive/completable/SubscribeKt { public static final fun subscribe (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;)Lcom/badoo/reaktive/disposable/Disposable; public static synthetic fun subscribe$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable; @@ -492,6 +496,10 @@ public final class com/badoo/reaktive/maybe/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/maybe/Maybe; } +public final class com/badoo/reaktive/maybe/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/maybe/Maybe; +} + public final class com/badoo/reaktive/maybe/SubscribeKt { public static final fun subscribe (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/disposable/Disposable; public static synthetic fun subscribe$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable; @@ -861,6 +869,10 @@ public final class com/badoo/reaktive/observable/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable; } +public final class com/badoo/reaktive/observable/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable; +} + public final class com/badoo/reaktive/observable/SampleKt { public static final fun sample-8Mi8wO0 (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;)Lcom/badoo/reaktive/observable/Observable; } @@ -1246,6 +1258,10 @@ public final class com/badoo/reaktive/single/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/single/Single; } +public final class com/badoo/reaktive/single/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/single/Single; +} + public abstract interface class com/badoo/reaktive/single/Single : com/badoo/reaktive/base/Source { } diff --git a/reaktive/api/jvm/reaktive.api b/reaktive/api/jvm/reaktive.api index e43502ba6..9f51343fd 100644 --- a/reaktive/api/jvm/reaktive.api +++ b/reaktive/api/jvm/reaktive.api @@ -201,6 +201,10 @@ public final class com/badoo/reaktive/completable/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/completable/Completable; } +public final class com/badoo/reaktive/completable/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/completable/Completable; +} + public final class com/badoo/reaktive/completable/SubscribeKt { public static final fun subscribe (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;)Lcom/badoo/reaktive/disposable/Disposable; public static synthetic fun subscribe$default (Lcom/badoo/reaktive/completable/Completable;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable; @@ -492,6 +496,10 @@ public final class com/badoo/reaktive/maybe/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/maybe/Maybe; } +public final class com/badoo/reaktive/maybe/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/maybe/Maybe; +} + public final class com/badoo/reaktive/maybe/SubscribeKt { public static final fun subscribe (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/disposable/Disposable; public static synthetic fun subscribe$default (Lcom/badoo/reaktive/maybe/Maybe;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/badoo/reaktive/disposable/Disposable; @@ -861,6 +869,10 @@ public final class com/badoo/reaktive/observable/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable; } +public final class com/badoo/reaktive/observable/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/observable/Observable; +} + public final class com/badoo/reaktive/observable/SampleKt { public static final fun sample-8Mi8wO0 (Lcom/badoo/reaktive/observable/Observable;JLcom/badoo/reaktive/scheduler/Scheduler;)Lcom/badoo/reaktive/observable/Observable; } @@ -1246,6 +1258,10 @@ public final class com/badoo/reaktive/single/RetryKt { public static synthetic fun retry$default (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lcom/badoo/reaktive/single/Single; } +public final class com/badoo/reaktive/single/RetryWhenKt { + public static final fun retryWhen (Lcom/badoo/reaktive/single/Single;Lkotlin/jvm/functions/Function1;)Lcom/badoo/reaktive/single/Single; +} + public abstract interface class com/badoo/reaktive/single/Single : com/badoo/reaktive/base/Source { } diff --git a/reaktive/src/commonMain/kotlin/com/badoo/reaktive/completable/RetryWhen.kt b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/completable/RetryWhen.kt new file mode 100644 index 000000000..297e63eee --- /dev/null +++ b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/completable/RetryWhen.kt @@ -0,0 +1,16 @@ +package com.badoo.reaktive.completable + +import com.badoo.reaktive.observable.Observable +import com.badoo.reaktive.observable.asCompletable +import com.badoo.reaktive.observable.retryWhen + +/** + * Returns a [Completable] that automatically resubscribes to this [Completable] if it signals `onError` + * and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. + * + * Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#retryWhen-io.reactivex.functions.Function-). + */ +fun Completable.retryWhen(handler: (Observable) -> Observable<*>): Completable = + asObservable() + .retryWhen(handler) + .asCompletable() diff --git a/reaktive/src/commonMain/kotlin/com/badoo/reaktive/maybe/RetryWhen.kt b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/maybe/RetryWhen.kt new file mode 100644 index 000000000..cf1e319f1 --- /dev/null +++ b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/maybe/RetryWhen.kt @@ -0,0 +1,16 @@ +package com.badoo.reaktive.maybe + +import com.badoo.reaktive.observable.Observable +import com.badoo.reaktive.observable.firstOrComplete +import com.badoo.reaktive.observable.retryWhen + +/** + * Returns a [Maybe] that automatically resubscribes to this [Maybe] if it signals `onError` + * and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. + * + * Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Maybe.html#retryWhen-io.reactivex.functions.Function-). + */ +fun Maybe.retryWhen(handler: (Observable) -> Observable<*>): Maybe = + asObservable() + .retryWhen(handler) + .firstOrComplete() diff --git a/reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/RetryWhen.kt b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/RetryWhen.kt new file mode 100644 index 000000000..ddad524a6 --- /dev/null +++ b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/RetryWhen.kt @@ -0,0 +1,57 @@ +package com.badoo.reaktive.observable + +import com.badoo.reaktive.base.CompleteCallback +import com.badoo.reaktive.base.ValueCallback +import com.badoo.reaktive.completable.CompletableCallbacks +import com.badoo.reaktive.disposable.CompositeDisposable +import com.badoo.reaktive.disposable.Disposable +import com.badoo.reaktive.disposable.SerialDisposable +import com.badoo.reaktive.disposable.plusAssign +import com.badoo.reaktive.subject.publish.PublishSubject +import com.badoo.reaktive.utils.atomic.AtomicBoolean + +/** + * Returns an [Observable] that automatically resubscribes to this [Observable] if it signals `onError` + * and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. + * + * Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#retryWhen-io.reactivex.functions.Function-). + */ +fun Observable.retryWhen(handler: (Observable) -> Observable<*>): Observable = + observable { emitter -> + val disposables = CompositeDisposable() + emitter.setDisposable(disposables) + + val errorSubject = PublishSubject() + val isError = AtomicBoolean() + + val disposableObserver = + object : SerialDisposable(), ObservableObserver, ValueCallback by emitter, CompleteCallback by emitter { + override fun onSubscribe(disposable: Disposable) { + replace(disposable) + } + + override fun onError(error: Throwable) { + replace(null) + isError.value = true + errorSubject.onNext(error) + } + } + + disposables += disposableObserver + + handler(errorSubject).subscribe( + object : ObservableObserver, CompletableCallbacks by emitter { + override fun onSubscribe(disposable: Disposable) { + disposables += disposable + } + + override fun onNext(value: Any?) { + if (isError.compareAndSet(true, false)) { + subscribe(disposableObserver) + } + } + } + ) + + subscribe(disposableObserver) + } diff --git a/reaktive/src/commonMain/kotlin/com/badoo/reaktive/single/RetryWhen.kt b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/single/RetryWhen.kt new file mode 100644 index 000000000..5f0a9acb2 --- /dev/null +++ b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/single/RetryWhen.kt @@ -0,0 +1,16 @@ +package com.badoo.reaktive.single + +import com.badoo.reaktive.observable.Observable +import com.badoo.reaktive.observable.firstOrError +import com.badoo.reaktive.observable.retryWhen + +/** + * Returns a [Single] that automatically resubscribes to this [Single] if it signals `onError` + * and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. + * + * Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#retryWhen-io.reactivex.functions.Function-). + */ +fun Single.retryWhen(handler: (Observable) -> Observable<*>): Single = + asObservable() + .retryWhen(handler) + .firstOrError() diff --git a/reaktive/src/commonTest/kotlin/com/badoo/reaktive/completable/RetryWhenTest.kt b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/completable/RetryWhenTest.kt new file mode 100644 index 000000000..9c8a3fa63 --- /dev/null +++ b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/completable/RetryWhenTest.kt @@ -0,0 +1,197 @@ +package com.badoo.reaktive.completable + +import com.badoo.reaktive.observable.flatMap +import com.badoo.reaktive.observable.observableOf +import com.badoo.reaktive.observable.retryWhen +import com.badoo.reaktive.test.base.assertError +import com.badoo.reaktive.test.base.assertNotError +import com.badoo.reaktive.test.base.hasSubscribers +import com.badoo.reaktive.test.completable.TestCompletable +import com.badoo.reaktive.test.completable.assertComplete +import com.badoo.reaktive.test.completable.test +import com.badoo.reaktive.test.observable.TestObservable +import com.badoo.reaktive.test.observable.TestObservableObserver +import com.badoo.reaktive.test.observable.assertValue +import com.badoo.reaktive.test.observable.test +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class RetryWhenTest : CompletableToCompletableTests by CompletableToCompletableTestsImpl({ retryWhen { it } }) { + + private val upstream = TestCompletable() + private val retryObservable = TestObservable() + + @Ignore + @Test + override fun disposes_downstream_disposable_WHEN_upstream_produced_error() { + // Not applicable + } + + @Ignore + @Test + override fun produces_error_WHEN_upstream_produced_error() { + // Not applicable + } + + @Test + fun calls_handler_WHEN_subscribed() { + var isCalled = false + + upstream + .retryWhen { + isCalled = true + retryObservable + } + .test() + + assertTrue(isCalled) + } + + @Test + fun produces_error_WHEN_handler_throws() { + val exception = Exception() + + val observer = upstream.retryWhen { throw exception }.test() + + observer.assertError(exception) + } + + @Test + fun subscribes_to_retry_observable_WHEN_subscribed() { + upstream.retryWhen { retryObservable }.test() + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun disposes_retry_observable_WHEN_disposed() { + val observer = upstream.retryWhen { retryObservable }.test() + + observer.dispose() + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun disposes_retry_observable_WHEN_upstream_completed() { + upstream.retryWhen { retryObservable }.test() + + upstream.onComplete() + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun does_not_dispose_retry_observable_WHEN_upstream_produced_error() { + upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun does_not_produce_error_WHEN_upstream_produced_error() { + val observer = upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertNull(observer.error) + } + + @Test + fun does_not_complete_WHEN_upstream_produced_error() { + val observer = upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertFalse(observer.isComplete) + } + + @Test + fun error_observable_emits_exception_WHEN_upstream_produced_error() { + val exception = Exception() + lateinit var errorObserver: TestObservableObserver + + upstream + .retryWhen { + errorObserver = it.test() + retryObservable + } + .test() + + upstream.onError(exception) + + errorObserver.assertValue(exception) + } + + @Test + fun completes_WHEN_retry_observable_completed() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onComplete() + + observer.assertComplete() + } + + @Test + fun produces_error_WHEN_retry_observable_produced_error() { + val exception = Exception() + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onError(exception) + + observer.assertError(exception) + } + + @Test + fun does_not_produce_error_WHEN_retry_observable_emitted_value() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onNext(1) + + observer.assertNotError() + } + + @Test + fun subscribes_to_upstream_WHEN_upstream_produced_error_and_retry_observable_emitted_value() { + upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + + retryObservable.onNext(1) + + assertEquals(1, upstream.subscriptionCount) + } + + @Test + fun completes_WHEN_resubscribed_after_error_and_upstream_completed() { + val observer = upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + retryObservable.onNext(1) + + upstream.onComplete() + + observer.assertComplete() + } + + @Test + fun resubscribes_to_upstream_only_once_WHEN_upstream_produced_error_and_retry_observable_emitted_multiple_values_synchronously() { + upstream + .retryWhen { errors -> + errors.flatMap { + upstream.reset() + observableOf(1, 2) + } + }.test() + + upstream.onError(Exception()) + + assertEquals(1, upstream.subscriptionCount) + } +} diff --git a/reaktive/src/commonTest/kotlin/com/badoo/reaktive/maybe/RetryWhenTest.kt b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/maybe/RetryWhenTest.kt new file mode 100644 index 000000000..5c7bbbdd1 --- /dev/null +++ b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/maybe/RetryWhenTest.kt @@ -0,0 +1,219 @@ +package com.badoo.reaktive.maybe + +import com.badoo.reaktive.observable.flatMap +import com.badoo.reaktive.observable.observableOf +import com.badoo.reaktive.observable.retryWhen +import com.badoo.reaktive.test.base.assertError +import com.badoo.reaktive.test.base.assertNotError +import com.badoo.reaktive.test.base.hasSubscribers +import com.badoo.reaktive.test.maybe.TestMaybe +import com.badoo.reaktive.test.maybe.assertComplete +import com.badoo.reaktive.test.maybe.assertSuccess +import com.badoo.reaktive.test.maybe.test +import com.badoo.reaktive.test.observable.TestObservable +import com.badoo.reaktive.test.observable.TestObservableObserver +import com.badoo.reaktive.test.observable.assertValue +import com.badoo.reaktive.test.observable.test +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class RetryWhenTest : MaybeToMaybeTests by MaybeToMaybeTestsImpl({ retryWhen { it } }) { + + private val upstream = TestMaybe() + private val retryObservable = TestObservable() + + @Ignore + @Test + override fun disposes_downstream_disposable_WHEN_upstream_produced_error() { + // Not applicable + } + + @Ignore + @Test + override fun produces_error_WHEN_upstream_produced_error() { + // Not applicable + } + + @Test + fun calls_handler_WHEN_subscribed() { + var isCalled = false + + upstream + .retryWhen { + isCalled = true + retryObservable + } + .test() + + assertTrue(isCalled) + } + + @Test + fun produces_error_WHEN_handler_throws() { + val exception = Exception() + + val observer = upstream.retryWhen { throw exception }.test() + + observer.assertError(exception) + } + + @Test + fun subscribes_to_retry_observable_WHEN_subscribed() { + upstream.retryWhen { retryObservable }.test() + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun disposes_retry_observable_WHEN_disposed() { + val observer = upstream.retryWhen { retryObservable }.test() + + observer.dispose() + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun disposes_retry_observable_WHEN_upstream_completed() { + upstream.retryWhen { retryObservable }.test() + + upstream.onComplete() + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun disposes_retry_observable_WHEN_upstream_succeeded() { + upstream.retryWhen { retryObservable }.test() + + upstream.onSuccess(1) + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun does_not_dispose_retry_observable_WHEN_upstream_produced_error() { + upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun does_not_produce_error_WHEN_upstream_produced_error() { + val observer = upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertNull(observer.error) + } + + @Test + fun does_not_complete_WHEN_upstream_produced_error() { + val observer = upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertFalse(observer.isComplete) + } + + @Test + fun error_observable_emits_exception_WHEN_upstream_produced_error() { + val exception = Exception() + lateinit var errorObserver: TestObservableObserver + + upstream + .retryWhen { + errorObserver = it.test() + retryObservable + } + .test() + + upstream.onError(exception) + + errorObserver.assertValue(exception) + } + + @Test + fun completes_WHEN_retry_observable_completed() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onComplete() + + observer.assertComplete() + } + + @Test + fun produces_error_WHEN_retry_observable_produced_error() { + val exception = Exception() + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onError(exception) + + observer.assertError(exception) + } + + @Test + fun does_not_produce_error_WHEN_retry_observable_emitted_value() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onNext(1) + + observer.assertNotError() + } + + @Test + fun subscribes_to_upstream_WHEN_upstream_produced_error_and_retry_observable_emitted_value() { + upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + + retryObservable.onNext(1) + + assertEquals(1, upstream.subscriptionCount) + } + + @Test + fun succeeds_WHEN_resubscribed_after_error_and_upstream_succeeded() { + val observer = upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + retryObservable.onNext(1) + + upstream.onSuccess(1) + + observer.assertSuccess(1) + } + + @Test + fun completes_WHEN_resubscribed_after_error_and_upstream_completed() { + val observer = upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + retryObservable.onNext(1) + + upstream.onComplete() + + observer.assertComplete() + } + + @Test + fun resubscribes_to_upstream_only_once_WHEN_upstream_produced_error_and_retry_observable_emitted_multiple_values_synchronously() { + upstream + .retryWhen { errors -> + errors.flatMap { + upstream.reset() + observableOf(1, 2) + } + }.test() + + upstream.onError(Exception()) + + assertEquals(1, upstream.subscriptionCount) + } +} diff --git a/reaktive/src/commonTest/kotlin/com/badoo/reaktive/observable/RetryWhenTest.kt b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/observable/RetryWhenTest.kt new file mode 100644 index 000000000..f8f19be35 --- /dev/null +++ b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/observable/RetryWhenTest.kt @@ -0,0 +1,215 @@ +package com.badoo.reaktive.observable + +import com.badoo.reaktive.test.base.assertError +import com.badoo.reaktive.test.base.assertNotError +import com.badoo.reaktive.test.base.hasSubscribers +import com.badoo.reaktive.test.observable.TestObservable +import com.badoo.reaktive.test.observable.TestObservableObserver +import com.badoo.reaktive.test.observable.assertComplete +import com.badoo.reaktive.test.observable.assertValue +import com.badoo.reaktive.test.observable.assertValues +import com.badoo.reaktive.test.observable.onNext +import com.badoo.reaktive.test.observable.test +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class RetryWhenTest : ObservableToObservableTests by ObservableToObservableTestsImpl({ retryWhen { it } }) { + + private val upstream = TestObservable() + private val retryObservable = TestObservable() + + @Ignore + @Test + override fun disposes_downstream_disposable_WHEN_upstream_produced_error() { + // Not applicable + } + + @Ignore + @Test + override fun produces_error_WHEN_upstream_produced_error() { + // Not applicable + } + + @Test + fun calls_handler_WHEN_subscribed() { + var isCalled = false + + upstream + .retryWhen { + isCalled = true + retryObservable + } + .test() + + assertTrue(isCalled) + } + + @Test + fun produces_error_WHEN_handler_throws() { + val exception = Exception() + + val observer = upstream.retryWhen { throw exception }.test() + + observer.assertError(exception) + } + + @Test + fun subscribes_to_retry_observable_WHEN_subscribed() { + upstream.retryWhen { retryObservable }.test() + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun disposes_retry_observable_WHEN_disposed() { + val observer = upstream.retryWhen { retryObservable }.test() + + observer.dispose() + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun disposes_retry_observable_WHEN_upstream_completed() { + upstream.retryWhen { retryObservable }.test() + + upstream.onComplete() + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun does_not_dispose_retry_observable_WHEN_upstream_produced_value() { + upstream.retryWhen { retryObservable }.test() + + upstream.onNext(1) + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun does_not_dispose_retry_observable_WHEN_upstream_produced_error() { + upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun does_not_produce_error_WHEN_upstream_produced_error() { + val observer = upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertNull(observer.error) + } + + @Test + fun does_not_complete_WHEN_upstream_produced_error() { + val observer = upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertFalse(observer.isComplete) + } + + @Test + fun error_observable_emits_exception_WHEN_upstream_produced_error() { + val exception = Exception() + lateinit var errorObserver: TestObservableObserver + + upstream + .retryWhen { + errorObserver = it.test() + retryObservable + } + .test() + + upstream.onError(exception) + + errorObserver.assertValue(exception) + } + + @Test + fun completes_WHEN_retry_observable_completed() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onComplete() + + observer.assertComplete() + } + + @Test + fun produces_error_WHEN_retry_observable_produced_error() { + val exception = Exception() + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onError(exception) + + observer.assertError(exception) + } + + @Test + fun does_not_produce_error_WHEN_retry_observable_emitted_value() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onNext(1) + + observer.assertNotError() + } + + @Test + fun subscribes_to_upstream_WHEN_upstream_produced_error_and_retry_observable_emitted_value() { + upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + + retryObservable.onNext(1) + + assertEquals(1, upstream.subscriptionCount) + } + + @Test + fun emits_values_from_upstream_WHEN_resubscribed_after_error_and_upstream_emitted_values() { + val observer = upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + retryObservable.onNext(1) + + upstream.onNext(1, 2, 3) + + observer.assertValues(1, 2, 3) + } + + @Test + fun completes_WHEN_resubscribed_after_error_and_upstream_completed() { + val observer = upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + retryObservable.onNext(1) + + upstream.onComplete() + + observer.assertComplete() + } + + @Test + fun resubscribes_to_upstream_only_once_WHEN_upstream_produced_error_and_retry_observable_emitted_multiple_values_synchronously() { + upstream + .retryWhen { errors -> + errors.flatMap { + upstream.reset() + observableOf(1, 2) + } + }.test() + + upstream.onError(Exception()) + + assertEquals(1, upstream.subscriptionCount) + } +} diff --git a/reaktive/src/commonTest/kotlin/com/badoo/reaktive/single/RetryWhenTest.kt b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/single/RetryWhenTest.kt new file mode 100644 index 000000000..bc665a353 --- /dev/null +++ b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/single/RetryWhenTest.kt @@ -0,0 +1,188 @@ +package com.badoo.reaktive.single + +import com.badoo.reaktive.observable.flatMap +import com.badoo.reaktive.observable.observableOf +import com.badoo.reaktive.observable.retryWhen +import com.badoo.reaktive.test.base.assertError +import com.badoo.reaktive.test.base.assertNotError +import com.badoo.reaktive.test.base.hasSubscribers +import com.badoo.reaktive.test.observable.TestObservable +import com.badoo.reaktive.test.observable.TestObservableObserver +import com.badoo.reaktive.test.observable.assertValue +import com.badoo.reaktive.test.observable.test +import com.badoo.reaktive.test.single.TestSingle +import com.badoo.reaktive.test.single.assertSuccess +import com.badoo.reaktive.test.single.test +import kotlin.test.Ignore +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNull +import kotlin.test.assertTrue + +class RetryWhenTest : SingleToSingleTests by SingleToSingleTestsImpl({ retryWhen { it } }) { + + private val upstream = TestSingle() + private val retryObservable = TestObservable() + + @Ignore + @Test + override fun disposes_downstream_disposable_WHEN_upstream_produced_error() { + // Not applicable + } + + @Ignore + @Test + override fun produces_error_WHEN_upstream_produced_error() { + // Not applicable + } + + @Test + fun calls_handler_WHEN_subscribed() { + var isCalled = false + + upstream + .retryWhen { + isCalled = true + retryObservable + } + .test() + + assertTrue(isCalled) + } + + @Test + fun produces_error_WHEN_handler_throws() { + val exception = Exception() + + val observer = upstream.retryWhen { throw exception }.test() + + observer.assertError(exception) + } + + @Test + fun subscribes_to_retry_observable_WHEN_subscribed() { + upstream.retryWhen { retryObservable }.test() + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun disposes_retry_observable_WHEN_disposed() { + val observer = upstream.retryWhen { retryObservable }.test() + + observer.dispose() + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun disposes_retry_observable_WHEN_upstream_succeeded() { + upstream.retryWhen { retryObservable }.test() + + upstream.onSuccess(1) + + assertFalse(retryObservable.hasSubscribers) + } + + @Test + fun does_not_dispose_retry_observable_WHEN_upstream_produced_error() { + upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertEquals(1, retryObservable.subscriptionCount) + } + + @Test + fun does_not_produce_error_WHEN_upstream_produced_error() { + val observer = upstream.retryWhen { retryObservable }.test() + + upstream.onError(Exception()) + + assertNull(observer.error) + } + + @Test + fun error_observable_emits_exception_WHEN_upstream_produced_error() { + val exception = Exception() + lateinit var errorObserver: TestObservableObserver + + upstream + .retryWhen { + errorObserver = it.test() + retryObservable + } + .test() + + upstream.onError(exception) + + errorObserver.assertValue(exception) + } + + @Test + fun produces_error_WHEN_retry_observable_completed() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onComplete() + + observer.assertError { it is NoSuchElementException } + } + + @Test + fun produces_error_WHEN_retry_observable_produced_error() { + val exception = Exception() + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onError(exception) + + observer.assertError(exception) + } + + @Test + fun does_not_produce_error_WHEN_retry_observable_emitted_value() { + val observer = upstream.retryWhen { retryObservable }.test() + + retryObservable.onNext(1) + + observer.assertNotError() + } + + @Test + fun subscribes_to_upstream_WHEN_upstream_produced_error_and_retry_observable_emitted_value() { + upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + + retryObservable.onNext(1) + + assertEquals(1, upstream.subscriptionCount) + } + + @Test + fun succeeds_WHEN_resubscribed_after_error_and_upstream_succeeded() { + val observer = upstream.retryWhen { retryObservable }.test() + upstream.onError(Exception()) + upstream.reset() + retryObservable.onNext(1) + + upstream.onSuccess(1) + + observer.assertSuccess(1) + } + + @Test + fun resubscribes_to_upstream_only_once_WHEN_upstream_produced_error_and_retry_observable_emitted_multiple_values_synchronously() { + upstream + .retryWhen { errors -> + errors.flatMap { + upstream.reset() + observableOf(1, 2) + } + }.test() + + upstream.onError(Exception()) + + assertEquals(1, upstream.subscriptionCount) + } +}