-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #781 from arkivanov/retryWhen
Added retryWhen operator
- Loading branch information
Showing
10 changed files
with
956 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
16 changes: 16 additions & 0 deletions
16
reaktive/src/commonMain/kotlin/com/badoo/reaktive/completable/RetryWhen.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package com.badoo.reaktive.completable | ||
|
||
import com.badoo.reaktive.observable.Observable | ||
import com.badoo.reaktive.observable.asCompletable | ||
import com.badoo.reaktive.observable.retryWhen | ||
|
||
/** | ||
* Returns a [Completable] that automatically resubscribes to this [Completable] if it signals `onError` | ||
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. | ||
* | ||
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#retryWhen-io.reactivex.functions.Function-). | ||
*/ | ||
fun Completable.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Completable = | ||
asObservable() | ||
.retryWhen(handler) | ||
.asCompletable() |
16 changes: 16 additions & 0 deletions
16
reaktive/src/commonMain/kotlin/com/badoo/reaktive/maybe/RetryWhen.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package com.badoo.reaktive.maybe | ||
|
||
import com.badoo.reaktive.observable.Observable | ||
import com.badoo.reaktive.observable.firstOrComplete | ||
import com.badoo.reaktive.observable.retryWhen | ||
|
||
/** | ||
* Returns a [Maybe] that automatically resubscribes to this [Maybe] if it signals `onError` | ||
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. | ||
* | ||
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Maybe.html#retryWhen-io.reactivex.functions.Function-). | ||
*/ | ||
fun <T> Maybe<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Maybe<T> = | ||
asObservable() | ||
.retryWhen(handler) | ||
.firstOrComplete() |
57 changes: 57 additions & 0 deletions
57
reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/RetryWhen.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
package com.badoo.reaktive.observable | ||
|
||
import com.badoo.reaktive.base.CompleteCallback | ||
import com.badoo.reaktive.base.ValueCallback | ||
import com.badoo.reaktive.completable.CompletableCallbacks | ||
import com.badoo.reaktive.disposable.CompositeDisposable | ||
import com.badoo.reaktive.disposable.Disposable | ||
import com.badoo.reaktive.disposable.SerialDisposable | ||
import com.badoo.reaktive.disposable.plusAssign | ||
import com.badoo.reaktive.subject.publish.PublishSubject | ||
import com.badoo.reaktive.utils.atomic.AtomicBoolean | ||
|
||
/** | ||
* Returns an [Observable] that automatically resubscribes to this [Observable] if it signals `onError` | ||
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. | ||
* | ||
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#retryWhen-io.reactivex.functions.Function-). | ||
*/ | ||
fun <T> Observable<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Observable<T> = | ||
observable { emitter -> | ||
val disposables = CompositeDisposable() | ||
emitter.setDisposable(disposables) | ||
|
||
val errorSubject = PublishSubject<Throwable>() | ||
val isError = AtomicBoolean() | ||
|
||
val disposableObserver = | ||
object : SerialDisposable(), ObservableObserver<T>, ValueCallback<T> by emitter, CompleteCallback by emitter { | ||
override fun onSubscribe(disposable: Disposable) { | ||
replace(disposable) | ||
} | ||
|
||
override fun onError(error: Throwable) { | ||
replace(null) | ||
isError.value = true | ||
errorSubject.onNext(error) | ||
} | ||
} | ||
|
||
disposables += disposableObserver | ||
|
||
handler(errorSubject).subscribe( | ||
object : ObservableObserver<Any?>, CompletableCallbacks by emitter { | ||
override fun onSubscribe(disposable: Disposable) { | ||
disposables += disposable | ||
} | ||
|
||
override fun onNext(value: Any?) { | ||
if (isError.compareAndSet(true, false)) { | ||
subscribe(disposableObserver) | ||
} | ||
} | ||
} | ||
) | ||
|
||
subscribe(disposableObserver) | ||
} |
16 changes: 16 additions & 0 deletions
16
reaktive/src/commonMain/kotlin/com/badoo/reaktive/single/RetryWhen.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package com.badoo.reaktive.single | ||
|
||
import com.badoo.reaktive.observable.Observable | ||
import com.badoo.reaktive.observable.firstOrError | ||
import com.badoo.reaktive.observable.retryWhen | ||
|
||
/** | ||
* Returns a [Single] that automatically resubscribes to this [Single] if it signals `onError` | ||
* and the [Observable] returned by the [handler] function emits a value for that specific [Throwable]. | ||
* | ||
* Please refer to the corresponding RxJava [document](https://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#retryWhen-io.reactivex.functions.Function-). | ||
*/ | ||
fun <T> Single<T>.retryWhen(handler: (Observable<Throwable>) -> Observable<*>): Single<T> = | ||
asObservable() | ||
.retryWhen(handler) | ||
.firstOrError() |
197 changes: 197 additions & 0 deletions
197
reaktive/src/commonTest/kotlin/com/badoo/reaktive/completable/RetryWhenTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
package com.badoo.reaktive.completable | ||
|
||
import com.badoo.reaktive.observable.flatMap | ||
import com.badoo.reaktive.observable.observableOf | ||
import com.badoo.reaktive.observable.retryWhen | ||
import com.badoo.reaktive.test.base.assertError | ||
import com.badoo.reaktive.test.base.assertNotError | ||
import com.badoo.reaktive.test.base.hasSubscribers | ||
import com.badoo.reaktive.test.completable.TestCompletable | ||
import com.badoo.reaktive.test.completable.assertComplete | ||
import com.badoo.reaktive.test.completable.test | ||
import com.badoo.reaktive.test.observable.TestObservable | ||
import com.badoo.reaktive.test.observable.TestObservableObserver | ||
import com.badoo.reaktive.test.observable.assertValue | ||
import com.badoo.reaktive.test.observable.test | ||
import kotlin.test.Ignore | ||
import kotlin.test.Test | ||
import kotlin.test.assertEquals | ||
import kotlin.test.assertFalse | ||
import kotlin.test.assertNull | ||
import kotlin.test.assertTrue | ||
|
||
class RetryWhenTest : CompletableToCompletableTests by CompletableToCompletableTestsImpl({ retryWhen { it } }) { | ||
|
||
private val upstream = TestCompletable() | ||
private val retryObservable = TestObservable<Any?>() | ||
|
||
@Ignore | ||
@Test | ||
override fun disposes_downstream_disposable_WHEN_upstream_produced_error() { | ||
// Not applicable | ||
} | ||
|
||
@Ignore | ||
@Test | ||
override fun produces_error_WHEN_upstream_produced_error() { | ||
// Not applicable | ||
} | ||
|
||
@Test | ||
fun calls_handler_WHEN_subscribed() { | ||
var isCalled = false | ||
|
||
upstream | ||
.retryWhen { | ||
isCalled = true | ||
retryObservable | ||
} | ||
.test() | ||
|
||
assertTrue(isCalled) | ||
} | ||
|
||
@Test | ||
fun produces_error_WHEN_handler_throws() { | ||
val exception = Exception() | ||
|
||
val observer = upstream.retryWhen { throw exception }.test() | ||
|
||
observer.assertError(exception) | ||
} | ||
|
||
@Test | ||
fun subscribes_to_retry_observable_WHEN_subscribed() { | ||
upstream.retryWhen { retryObservable }.test() | ||
|
||
assertEquals(1, retryObservable.subscriptionCount) | ||
} | ||
|
||
@Test | ||
fun disposes_retry_observable_WHEN_disposed() { | ||
val observer = upstream.retryWhen { retryObservable }.test() | ||
|
||
observer.dispose() | ||
|
||
assertFalse(retryObservable.hasSubscribers) | ||
} | ||
|
||
@Test | ||
fun disposes_retry_observable_WHEN_upstream_completed() { | ||
upstream.retryWhen { retryObservable }.test() | ||
|
||
upstream.onComplete() | ||
|
||
assertFalse(retryObservable.hasSubscribers) | ||
} | ||
|
||
@Test | ||
fun does_not_dispose_retry_observable_WHEN_upstream_produced_error() { | ||
upstream.retryWhen { retryObservable }.test() | ||
|
||
upstream.onError(Exception()) | ||
|
||
assertEquals(1, retryObservable.subscriptionCount) | ||
} | ||
|
||
@Test | ||
fun does_not_produce_error_WHEN_upstream_produced_error() { | ||
val observer = upstream.retryWhen { retryObservable }.test() | ||
|
||
upstream.onError(Exception()) | ||
|
||
assertNull(observer.error) | ||
} | ||
|
||
@Test | ||
fun does_not_complete_WHEN_upstream_produced_error() { | ||
val observer = upstream.retryWhen { retryObservable }.test() | ||
|
||
upstream.onError(Exception()) | ||
|
||
assertFalse(observer.isComplete) | ||
} | ||
|
||
@Test | ||
fun error_observable_emits_exception_WHEN_upstream_produced_error() { | ||
val exception = Exception() | ||
lateinit var errorObserver: TestObservableObserver<Any?> | ||
|
||
upstream | ||
.retryWhen { | ||
errorObserver = it.test() | ||
retryObservable | ||
} | ||
.test() | ||
|
||
upstream.onError(exception) | ||
|
||
errorObserver.assertValue(exception) | ||
} | ||
|
||
@Test | ||
fun completes_WHEN_retry_observable_completed() { | ||
val observer = upstream.retryWhen { retryObservable }.test() | ||
|
||
retryObservable.onComplete() | ||
|
||
observer.assertComplete() | ||
} | ||
|
||
@Test | ||
fun produces_error_WHEN_retry_observable_produced_error() { | ||
val exception = Exception() | ||
val observer = upstream.retryWhen { retryObservable }.test() | ||
|
||
retryObservable.onError(exception) | ||
|
||
observer.assertError(exception) | ||
} | ||
|
||
@Test | ||
fun does_not_produce_error_WHEN_retry_observable_emitted_value() { | ||
val observer = upstream.retryWhen { retryObservable }.test() | ||
|
||
retryObservable.onNext(1) | ||
|
||
observer.assertNotError() | ||
} | ||
|
||
@Test | ||
fun subscribes_to_upstream_WHEN_upstream_produced_error_and_retry_observable_emitted_value() { | ||
upstream.retryWhen { retryObservable }.test() | ||
upstream.onError(Exception()) | ||
upstream.reset() | ||
|
||
retryObservable.onNext(1) | ||
|
||
assertEquals(1, upstream.subscriptionCount) | ||
} | ||
|
||
@Test | ||
fun completes_WHEN_resubscribed_after_error_and_upstream_completed() { | ||
val observer = upstream.retryWhen { retryObservable }.test() | ||
upstream.onError(Exception()) | ||
upstream.reset() | ||
retryObservable.onNext(1) | ||
|
||
upstream.onComplete() | ||
|
||
observer.assertComplete() | ||
} | ||
|
||
@Test | ||
fun resubscribes_to_upstream_only_once_WHEN_upstream_produced_error_and_retry_observable_emitted_multiple_values_synchronously() { | ||
upstream | ||
.retryWhen { errors -> | ||
errors.flatMap { | ||
upstream.reset() | ||
observableOf(1, 2) | ||
} | ||
}.test() | ||
|
||
upstream.onError(Exception()) | ||
|
||
assertEquals(1, upstream.subscriptionCount) | ||
} | ||
} |
Oops, something went wrong.