diff --git a/Operators.md b/Operators.md index 3c562d8..bb4ff6d 100644 --- a/Operators.md +++ b/Operators.md @@ -5,6 +5,8 @@ ReactiveX的每种编程语言的实现都实现了一组操作符的集合。 本文首先会给出ReactiveX的核心操作符列表和对应的文档链接,后面还有一个决策树用于帮助你根据具体的场景选择合适的操作符。最后有一个语言特定实现的按字母排序的操作符列表。 +如果你想实现你自己的操作符,可以参考这里:[实现自定义操作符](topics/Implementing-Your-Own-Operators.md) + ## 操作符目录 ### 创建操作 diff --git a/images/operators/catch.png b/images/operators/catch.png new file mode 100644 index 0000000..2c40180 Binary files /dev/null and b/images/operators/catch.png differ diff --git a/images/operators/retry.C.png b/images/operators/retry.C.png new file mode 100644 index 0000000..1934ded Binary files /dev/null and b/images/operators/retry.C.png differ diff --git a/images/operators/retryWhen.f.png b/images/operators/retryWhen.f.png new file mode 100644 index 0000000..e117f7f Binary files /dev/null and b/images/operators/retryWhen.f.png differ diff --git a/operators/Catch.md b/operators/Catch.md new file mode 100644 index 0000000..9dc81a2 --- /dev/null +++ b/operators/Catch.md @@ -0,0 +1,52 @@ +## Catch + +从`onError`通知中恢复发射数据 + +![catch](../images/operators/catch.png) + +`Catch`操作符拦截原始Observable的`onError`通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。 + +在某些ReactiveX的实现中,有一个叫`onErrorResumeNext`的操作符,它的行为与`Catch`相似。 + +RxJava将`Catch`实现为三个不同的操作符: + +**`onErrorReturn`** + +让Observable遇到错误时发射一个特殊的项并且正常终止。 + +**`onErrorResumeNext`** + +让Observable在遇到错误时开始发射第二个Observable的数据序列。 + +**`onExceptionResumeNext`** + +让Observable在遇到错误时继续发射后面的数据项。 + +### onErrorReturn + +![onErrorReturn](../images/operators/onErrorReturn.png) + +`onErrorReturn`方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的`onError`调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的`onComleted`方法。 + +* Javadoc: [onErrorReturn(Func1)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#onErrorReturn(rx.functions.Func1)) + +### onErrorResumeNext + +![onErrorResumeNext](../images/operators/onErrorResumeNext.png) + +`onErrorResumeNext`方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的`onError`调用,不会将错误传递给观察者,作为替代,它会开始镜像另一个,备用的Observable。 + +* Javadoc: [onErrorResumeNext(Func1)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#onErrorResumeNext(rx.functions.Func1)) +* Javadoc: [onErrorResumeNext(Observable)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#onErrorResumeNext(rx.Observable)) + +### onExceptionResumeNext + +![onExceptionResumeNext](../images/operators/onExceptionResumeNextViaObservable.png) + +和`onErrorResumeNext`类似,`onExceptionResumeNext`方法返回一个镜像原有Observable行为的新Observable,也使用一个备用的Observable,不同的是,如果`onError`收到的`Throwable`不是一个`Exception`,它会将错误传递给观察者的`onError`方法,不会使用备用的Observable。 + +* Javadoc: [onExceptionResumeNext(Observable)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#onExceptionResumeNext(rx.Observable)) + + + + diff --git a/operators/Error-Handling-Operators.md b/operators/Error-Handling-Operators.md index 914c212..99615ee 100644 --- a/operators/Error-Handling-Operators.md +++ b/operators/Error-Handling-Operators.md @@ -7,8 +7,8 @@ 这是操作符列表: -* [**`onErrorResumeNext( )`**](Catch.md) — 指示Observable在遇到错误时发送一个数据序列 -* [**`onErrorReturn( )`**](Catch.md) — 指示Observable在遇到错误时发送一个特定的数据 -* [**`onExceptionResumeNext( )`**](Catch.md) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)指示Observable遇到错误时继续发送数据 -* [**`retry( )`**](Retry.md) — 指示Observable遇到错误时重试 -* [**`retryWhen( )`**](Retry.md) — 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable +* [**`onErrorResumeNext( )`**](Catch.md#onErrorResumeNext) — 指示Observable在遇到错误时发送一个数据序列 +* [**`onErrorReturn( )`**](Catch.md#onErrorReturn) — 指示Observable在遇到错误时发送一个特定的数据 +* [**`onExceptionResumeNext( )`**](Catch.md#onExceptionResumeNext) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)指示Observable遇到错误时继续发送数据 +* [**`retry( )`**](Retry.md#retry) — 指示Observable遇到错误时重试 +* [**`retryWhen( )`**](Retry.md#retryWhen) — 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable diff --git a/operators/Retry.md b/operators/Retry.md new file mode 100644 index 0000000..3f36dfd --- /dev/null +++ b/operators/Retry.md @@ -0,0 +1,60 @@ +## Retry + +如果原始Observable遇到错误,重新订阅它期望它能正常终止 + +![retry](../images/operators/retry.C.png) + +`Retry`操作符不会将原始Observable的`onError`通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。`Retry`总是传递`onNext`通知给观察者,由于重新订阅,可能会造成数据项重复,如上图所示。 + +RxJava中的实现为`retry`和`retryWhen`。 + +无论收到多少次`onError`通知,无参数版本的`retry`都会继续订阅并反射原始Observable。 + +接受单个`count`参数的`retry`会最多重新订阅指定的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个`onError`通知传递给它的观察者。 + +还有一个版本的`retry`接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发送`onError`通知的`Throwable`。这个函数返回一个布尔值,如果返回`true`,`retry`应该再次订阅和镜像原始的Observable,如果返回`false`,`retry`会将最新的一个`onError`通知传递给它的观察者。 + +`retry`操作符默认在`trampoline`调度器上执行。 + +* Javadoc: [retry()](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retry()) +* Javadoc: [retry(long)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retry(long)) +* Javadoc: [retry(Func2)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retry(rx.functions.Func2)) + +### retryWhen + +![retryWhen](../images/operators/retryWhen.f.png) + +`retryWhen`和`retry`类似,区别是,`retryWhen`将`onError`中的`Throwable`传递给一个函数,这个函数产生另一个Observable,`retryWhen`观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发送的是`onError`通知,它就将这个通知传递给观察者然后终止。 + +`retryWhen`默认在`trampoline`调度器上执行,你可以通过参数指定其它的调度器。 + +示例代码 + +```java + +Observable.create((Subscriber s) -> { + System.out.println("subscribing"); + s.onError(new RuntimeException("always fails")); + }).retryWhen(attempts -> { + return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { + System.out.println("delay retry by " + i + " second(s)"); + return Observable.timer(i, TimeUnit.SECONDS); + }); + }).toBlocking().forEach(System.out::println); + +``` + +输出 + +``` +subscribing +delay retry by 1 second(s) +subscribing +delay retry by 2 second(s) +subscribing +delay retry by 3 second(s) +subscribing +``` + +* Javadoc: [retryWhen(Func1)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen(rx.functions.Func1)) +* Javadoc: [retryWhen(Func1,Scheduler)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#retryWhen(rx.functions.Func1,%20rx.Scheduler))