What's different in 3.0
- Introduction
-
Behavior changes
- More undeliverable errors
- Connectable source reset
- Flowable.publish pause
- Processor.offer null-check
- MulticastProcessor.offer fusion-check
- Group abandonment in groupBy
- Backpressure in groupBy
- Window abandonment in window
- CompositeException cause generation
- Parameter validation exception change
- From-callbacks upfront cancellation
- Using cleanup order
-
API changes
- Functional interfaces
- New Types
- Moved components
- API promotions
-
API additions
- replay with eagerTruncate
- concatMap with Scheduler
- Schedulers.from fair mode
- blockingForEach with buffer size
- blockingSubscribe
- Maybe.delay with delayError
- onErrorComplete
- Completable.onErrorResumeWith
- retryUntil
- switchOnNext
- Maybe.dematerialize
- from conversions
- timestamp and timeInterval
- toFuture
- ofType
- doOnLifecycle
- concatMap to another type
- concat with delayError
- Java 8 additions
- API renames
- API signature changes
- API removals
- Miscellaneous
Welcome to the new major release of RxJava, a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
As with every such release, there have been quite a lot of trivial and non-trivial changes, cleanups and improvements all across the codebase, which warrant detailed and comprehensive explanations.
With each major release, we take the liberty to introduce potential and actual binary- and behavior-incompatible changes so that past mistakes can be corrected and technical debt can be repaid.
Please read this guide to its full extent before posting any issue about "why X no longer compiles". Please also take note of sentences marked with
RxJava 3 lives in the group `io.reactivex.rxjava3` with artifact ID `rxjava`. Official language/platform adaptors will also be located under the group `io.reactivex.rxjava3`.
The following examples demonstrate the typical import statements. Please consider the latest version and replace `3.0.0` with the numbers from the badge:
dependencies {
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.0</version>
</dependency>
ℹ️ Further references: PR #6421
The 3.x documentation of the various components can be found at
Sub-version specific documentation is available under a version tag, for example
(replace 3.0.0-RC9
with the numbers from the badge: ).
The documentation of the current snapshot is under
For a long time, RxJava was limited to Java 6 API due to how Android was lagging behind in its runtime support. This changed with the upcoming Android Studio 4 previews where a process called desugaring is able to turn many Java 7 and 8 features into Java 6 compatible ones transparently.
This allowed us to increase the baseline of RxJava to Java 8 and add official support for many Java 8 constructs:
- `Stream`: use `java.util.stream.Stream` as a source or expose sequences as blocking `Stream`s.
- Stream `Collector`s: aggregate items into collections specified by standard transformations.
- `Optional`: helps with the non-nullness requirement of RxJava.
- `CompletableFuture`: consume `CompletableFuture`s non-blockingly or expose single results as `CompletableFuture`s.
- Use site non-null annotation: allows some functional types to return null in specific circumstances.
However, some features won't be supported:
- `java.time.Duration`: would add a lot of overloads; can always be decomposed into the `time` + `unit` manually.
- `java.util.function`: these can't throw `Throwable`s, overloads would create bloat and/or ambiguity.
Consequently, one has to change the project's compilation target settings to Java 8:
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
or
android {
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
Due to the state of the Android Desugar tooling, as of writing this page, internals of pre-existing, non Java 8-related RxJava operators do not use Java 8 constructs or types. This allows using these "older" operators with Android API levels where the desugaring tool doesn't provide automatic Java 8 backports of various constructs.
ℹ️ Further references: Issue #6695, PR #6765, other PRs
RxJava 3 components are located under the `io.reactivex.rxjava3` package (RxJava 1 has `rx` and RxJava 2 is just `io.reactivex`). This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (`Flowable`, `Observer`, etc.) have been moved to `io.reactivex.rxjava3.core`.
Component | RxJava 2 | RxJava 3 |
---|---|---|
Core | `io.reactivex` | `io.reactivex.rxjava3.core` |
Annotations | `io.reactivex.annotations` | `io.reactivex.rxjava3.annotations` |
Disposables | `io.reactivex.disposables` | `io.reactivex.rxjava3.disposables` |
Exceptions | `io.reactivex.exceptions` | `io.reactivex.rxjava3.exceptions` |
Functions | `io.reactivex.functions` | `io.reactivex.rxjava3.functions` |
Flowables | `io.reactivex.flowables` | `io.reactivex.rxjava3.flowables` |
Observables | `io.reactivex.observables` | `io.reactivex.rxjava3.observables` |
Subjects | `io.reactivex.subjects` | `io.reactivex.rxjava3.subjects` |
Processors | `io.reactivex.processors` | `io.reactivex.rxjava3.processors` |
Observers | `io.reactivex.observers` | `io.reactivex.rxjava3.observers` |
Subscribers | `io.reactivex.subscribers` | `io.reactivex.rxjava3.subscribers` |
Parallel | `io.reactivex.parallel` | `io.reactivex.rxjava3.parallel` |
Internal | `io.reactivex.internal` | `io.reactivex.rxjava3.internal` |
Due to naming matches, IDEs tend to import `java.util.Observable` instead of picking RxJava's `io.reactivex.rxjava3.core.Observable`. One can usually have the IDE ignore `java.util.Observable` and `java.util.Observer`, or otherwise specify an explicit `import io.reactivex.rxjava3.core.Observable;` in the affected files.
Also, since RxJava 3 now requires a Java 8 runtime, the standard library functional interfaces, such as `java.util.function.Function`, may be picked instead of `io.reactivex.rxjava3.functions.Function`. IDEs tend to give non-descriptive errors such as "Function can't be converted to Function", omitting the fact about the package differences.
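When both `Function` types are needed in the same file, one possible arrangement is to import the RxJava type and spell out the JDK type in full. The following is a minimal sketch; the class, field and method names are illustrative only.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Function;

import java.util.List;

public class ImportClashExample {

    // RxJava's Function: apply() is declared with "throws Throwable"
    static final Function<Integer, Integer> RX_DOUBLE = v -> v * 2;

    // The JDK's Function, written fully qualified to avoid the simple-name clash
    static final java.util.function.Function<Integer, Integer> JDK_INCREMENT = v -> v + 1;

    static List<Integer> doubledThenIncremented() {
        return Observable.range(1, 3)
                .map(RX_DOUBLE)              // operators expect the RxJava type
                .map(JDK_INCREMENT::apply)   // a method reference adapts the JDK type
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(doubledThenIncremented()); // [3, 5, 7]
    }
}
```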
ℹ️ Further references: PR #6621
Sometimes, the design of components and operators turns out to be inadequate, too limited or wrong in some circumstances. Major releases such as this allow us to make the necessary changes that would have caused all sorts of problems in a patch release.
With RxJava 2.x, the goal was set to not let any errors slip away in case the sequence is no longer able to deliver them to the consumers for some reason. Despite our best efforts, errors could still have been lost in various race conditions across dozens of operators.
Fixing this in a 2.x patch would have caused too much trouble, so the fix was postponed to the 3.x release, which was already changing considerably. Now, cancelling an operator that delays errors internally will signal those errors to the global error handler via `RxJavaPlugins.onError()`.
RxJavaPlugins.setErrorHandler(error -> System.out.println(error));
PublishProcessor<Integer> main = PublishProcessor.create();
PublishProcessor<Integer> inner = PublishProcessor.create();
// switchMapDelayError will delay all errors
TestSubscriber<Integer> ts = main.switchMapDelayError(v -> inner).test();
main.onNext(1);
// the inner fails
inner.onError(new IOException());
// the consumer is still clueless
ts.assertEmpty();
// the consumer cancels
ts.cancel();
// console prints
// io.reactivex.rxjava3.exceptions.UndeliverableException:
// The exception could not be delivered to the consumer because
// it has already canceled/disposed the flow or the exception has
// nowhere to go to begin with. Further reading:
// https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling
// | java.io.IOException
ℹ️ Further references: PRs
The purpose of the connectable types (`ConnectableFlowable` and `ConnectableObservable`) is to allow one or more consumers to be prepared before the actual upstream gets streamed to them upon calling `connect()`. This worked correctly the first time, but had some trouble if the upstream terminated instead of getting disconnected. In this terminating case, depending on whether the connectable was created with `replay()` or `publish()`, fresh consumers would either be unable to receive items from a new connection or would miss items altogether.
With 3.x, connectables have to be reset explicitly when they terminate. This extra step allows consumers to receive cached items or be prepared for a fresh connection.
With publish
, if the connectable terminates, consumers subscribing later will only receive the terminal event. One has to call reset()
so that a late consumer will receive items from a fresh connection.
ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).publish();
// prepare consumers, nothing is signaled yet
connectable.subscribe(/* ... */);
connectable.subscribe(/* ... */);
// connect, current consumers will receive items
connectable.connect();
// let it terminate
Thread.sleep(2000);
// late consumers now will receive a terminal event
connectable.subscribe(
item -> { },
error -> { },
() -> System.out.println("Done!"));
// reset the connectable to appear fresh again
connectable.reset();
// fresh consumers, they will also be ready to receive
connectable.subscribe(
System.out::println,
error -> { },
() -> System.out.println("Done!")
);
// connect, the fresh consumer now gets the new items
connectable.connect();
With replay
, if the connectable terminates, consumers subscribing later will receive the cached items. One has to call reset
to discard this cache so that late consumers can then receive fresh items.
ConnectableFlowable<Integer> connectable = Flowable.range(1, 10).replay();
// prepare consumers, nothing is signaled yet
connectable.subscribe(System.out::println);
connectable.subscribe(System.out::println);
// connect, current consumers will receive items
connectable.connect();
// let it terminate
Thread.sleep(2000);
// late consumers will still receive the cached items
connectable.subscribe(
System.out::println,
error -> { },
() -> System.out.println("Done!"));
// reset the connectable to appear fresh again
connectable.reset();
// fresh consumers, they will also be ready to receive
connectable.subscribe(
System.out::println,
error -> { },
() -> System.out.println("Done!")
);
// connect, the fresh consumer now gets the new items
connectable.connect();
ℹ️ Further references: Issue #5628, PR #6519
The implementation of Flowable.publish
hosts an internal queue to support backpressure from its downstream. In 2.x, this queue, and consequently the upstream source, was slowly draining on its own if all of the resulting ConnectableFlowable
's consumers have cancelled. This caused unexpected item loss when the lack of consumers was only temporary.
With 3.x, the implementation pauses and items already in the internal queue will be immediately available to consumers subscribing a bit later.
ConnectableFlowable<Integer> connectable = Flowable.range(1, 200).publish();
connectable.connect();
// the first consumer takes only 50 items and cancels
connectable.take(50).test().assertValueCount(50);
// with 3.x, the remaining items will be still available
connectable.test().assertValueCount(150);
ℹ️ Further references: Issue #5899, PR #6519
Calling PublishProcessor.offer()
, BehaviorProcessor.offer()
or MulticastProcessor.offer
with a null argument now throws a NullPointerException
instead of signaling it via onError
and thus terminating the processor. This now matches the behavior of the onNext
method required by the Reactive Streams specification.
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = pp.test();
try {
pp.offer(null);
} catch (NullPointerException expected) {
}
// no error received
ts.assertEmpty();
pp.offer(1);
// consumers are still there to receive proper items
ts.assertValuesOnly(1);
ℹ️ Further references: PR #6799
`MulticastProcessor` was designed to be a processor that coordinates backpressure like the `Flowable.publish` operators do. It includes internal optimizations such as operator-fusion when subscribing it to the right kind of source.
Since users can retain the reference to the processor itself, they could, in concept, call the `onXXX` methods and possibly cause trouble. The same is true for the `offer` method which, when called while the aforementioned fusion is taking place, led to undefined behavior in 2.x.
With 3.x, the `offer` method will throw an `IllegalStateException` and not disturb the internal state of the processor.
ℹ️ Further references: PR #6799
The groupBy
operator is one of the peculiar operators that signals reactive sources as its main output where consumers are expected to subscribe to these inner sources as well. Consequently, if the main sequence gets cancelled (i.e., the Flowable<GroupedFlowable<T>>
itself), the consumers should still keep receiving items on their groups but no new groups should be created. The original source can then only be cancelled if all of such inner consumers have cancelled as well.
However, in 2.x, there is nothing forcing the consumption of the inner sources and thus groups may be simply ignored altogether, preventing the cancellation of the original source and possibly leading to resource leaks.
With 3.x, the behavior of groupBy
has been changed so that when it emits a group, the downstream has to subscribe to it synchronously. Otherwise, the group is considered "abandoned" and terminated. This way, abandoned groups won't prevent the cancellation of the original source. If a late consumer still subscribes to the group, the item that triggered the group creation will be still available.
Synchronous subscription means the following flow setups will cause abandonment and possibly group re-creation:
// observeOn creates a time gap between group emission
// and subscription
source.groupBy(v -> v)
.observeOn(Schedulers.computation())
.flatMap(g -> g)
// subscribeOn creates a time gap too
source.groupBy(v -> v)
.flatMap(g -> g.subscribeOn(Schedulers.computation()))
Since the groups are essentially hot sources, one should use observeOn
to move the processing of the items safely to another thread anyway:
source.groupBy(v -> v)
.flatMap(g ->
g.observeOn(Schedulers.computation())
.map(v -> v + 1)
)
ℹ️ Further references: Issue #6596, PR #6642
The Flowable.groupBy
operator is even more peculiar in a way that it has to coordinate backpressure from the consumers of its inner group and request from its original Flowable
. The complication is, such requests can lead to a creation of a new group, a new item for the group that itself requested or a new item for a completely different group altogether. Therefore, groups can affect each other's ability to receive items and can possibly hang the sequence, especially if some groups don't get to be consumed at all.
This latter can happen when groups are merged together with `flatMap` where the number of individual groups is greater than the `flatMap`'s concurrency level (default 128) so fresh groups won't get subscribed to and old ones may not complete to make room. With `concatMap`, the same issue can manifest immediately.
Since RxJava is non-blocking, such silent hangs are really difficult to detect and diagnose (i.e., no thread is apparently blocking in groupBy
or flatMap
). Therefore, 3.x changed the behavior of groupBy
so that if the immediate downstream is unable to receive a new group, the sequence terminates with MissingBackpressureException
:
Flowable.range(1, 1000)
.groupBy(v -> v)
.flatMap(v -> v, 16)
.test()
.assertError(MissingBackpressureException.class);
The error message will also indicate the group index:
Unable to emit a new group (#16) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
Increasing the concurrency level to the right amount (or Integer.MAX_VALUE
if the number of groups is not known upfront) should resolve the problem:
.flatMap(v -> v, 1000)
ℹ️ Further references: Issue #6641, PR #6740
Similar to groupBy, the window
operator emits inner reactive sequences that should still keep receiving items when the outer sequence is cancelled (i.e., working with only a limited set of windows). Similarly, when all window consumers cancel, the original source should be cancelled as well.
However, in 2.x, there is nothing forcing the consumption of the inner sources and thus windows may be simply ignored altogether, preventing the cancellation of the original source and possibly leading to resource leaks.
With 3.x, the behavior of all `window` operators has been changed so that when they emit a window, the downstream has to subscribe to it synchronously. Otherwise, the window is considered "abandoned" and terminated. This way, abandoned windows won't prevent the cancellation of the original source. If a late consumer still subscribes to the window, the item that triggered the window creation may be still available.
Synchronous subscription means the following flow setups will cause abandonment:
// observeOn creates a time gap between window emission
// and subscription
source.window(10, 5)
.observeOn(Schedulers.computation())
.flatMap(g -> g)
// subscribeOn creates a time gap too
source.window(1, TimeUnit.SECONDS)
.flatMap(g -> g.subscribeOn(Schedulers.computation()))
Since the windows are essentially hot sources, one should use observeOn
to move the processing of the items safely to another thread anyway:
source.window(1, TimeUnit.SECONDS)
.flatMap(g ->
g.observeOn(Schedulers.computation())
.map(v -> v + 1)
)
ℹ️ Further references: PR #6758, PR #6761, PR #6762
In 1.x and 2.x, calling the CompositeException.getCause()
method resulted in a generation of a chain of exceptions from the internal list of aggregated exceptions. This was mainly done because Java 6 lacks the suppressed exception feature of Java 7+ exceptions. However, the implementation was possibly mutating exceptions or, sometimes, unable to establish a chain at all. Given the source of the original contribution of the method, it was risky to fix the issues with it in 2.x.
With 3.x, the method constructs a cause exception that when asked for a stacktrace, generates an output without touching the aggregated exceptions (which is IDE friendly and should be navigable):
Multiple exceptions (2)
|-- io.reactivex.rxjava3.exceptions.TestException: ex3
at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:341)
|-- io.reactivex.rxjava3.exceptions.TestException: ex4
at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:342)
|-- io.reactivex.rxjava3.exceptions.CompositeException: 2 exceptions occurred.
at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:337)
|-- io.reactivex.rxjava3.exceptions.CompositeException.ExceptionOverview:
Multiple exceptions (2)
|-- io.reactivex.rxjava3.exceptions.TestException: ex1
at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:335)
|-- io.reactivex.rxjava3.exceptions.TestException: ex2
at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:336)
ℹ️ Further references: Issue #6747, PR #6748
Some standard operators in 2.x throw IndexOutOfBoundsException
when the respective argument was invalid. For consistency with other parameter validation exceptions, the following operators now throw IllegalArgumentException
instead:
skip
skipLast
takeLast
takeLastTimed
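The change can be observed with a small probe such as the one below. It is a sketch only: the helper `thrownBy` and the class name are made up for illustration, and the negative counts are simply the easiest invalid arguments to pass.

```java
import io.reactivex.rxjava3.core.Flowable;

public class ParameterValidationExample {

    // Returns the simple class name of whatever the call throws, or "none"
    static String thrownBy(Runnable call) {
        try {
            call.run();
            return "none";
        } catch (RuntimeException ex) {
            return ex.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        Flowable<Integer> source = Flowable.range(1, 5);

        // Both calls fail at assembly time, before anything is subscribed
        System.out.println(thrownBy(() -> source.skipLast(-1)));
        System.out.println(thrownBy(() -> source.takeLast(-1)));
    }
}
```

Code that used to catch `IndexOutOfBoundsException` around these operators would need its `catch` clause adjusted accordingly.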
ℹ️ Further references: PR #6831, PR #6835
In 2.x, cancelling sequences created via fromRunnable
and fromAction
were inconsistent with other fromX
sequences when the downstream cancelled/disposed the sequence immediately.
In 3.x, such upfront cancellation will not execute the given callback.
Runnable run = mock(Runnable.class);
Completable.fromRunnable(run)
.test(true); // cancel upfront
verify(run, never()).run();
ℹ️ Further references: PR #6873
The operator `using` has an `eager` parameter to determine when the resource should be cleaned up: `true` means before-termination and `false` means after-termination. Unfortunately, this setting didn't affect the cleanup order upon downstream cancellation and was always cleaning up the resource before cancelling the upstream.
In 3.x, the cleanup order is now consistent when the sequence terminates or gets cancelled: `true` means before-termination or before cancelling the upstream, `false` means after-termination or after cancelling the upstream.
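The ordering on cancellation can be made visible by logging both side effects. The following is a sketch under the assumption that a never-terminating inner source with `doOnCancel` is a fair stand-in for a real upstream; the class name, method name and log strings are illustrative.

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.ArrayList;
import java.util.List;

public class UsingCleanupOrderExample {

    // Cancels a using() flow right away and records the order of the two side effects
    static List<String> cancelWith(boolean eager) {
        List<String> log = new ArrayList<>();

        Flowable.using(
                () -> "resource",
                resource -> Flowable.<Integer>never()
                        .doOnCancel(() -> log.add("upstream cancelled")),
                resource -> log.add("resource cleaned up"),
                eager)
            .test()     // subscribes with a TestSubscriber
            .cancel();  // downstream cancellation

        return log;
    }

    public static void main(String[] args) {
        System.out.println("eager = true:  " + cancelWith(true));
        System.out.println("eager = false: " + cancelWith(false));
    }
}
```

With `eager` set to `true` the cleanup entry is expected first, with `false` the cancellation entry is expected first.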
ℹ️ Further references: Issue #6347, PR #6534
A major release gives the opportunity to clean up and improve the API surface by adding, changing or removing elements all across. With 3.x, there are several of such changes that require some explanations.
RxJava 2.x introduced a custom set of functional interfaces in io.reactivex.functions
so that use of the library is possible with the same types on Java 6 and Java 8. A secondary reason for such custom types is that the standard Java 8 function types do not support throwing any checked exceptions, which in itself can result in some inconvenience when using RxJava operators.
Despite RxJava 3 being based on Java 8, the issues with the standard Java 8 functional interfaces persist, now with possible [desugaring](https://developer.android.com/studio/preview/features#j8-desugar) issues on Android and their inability to throw checked exceptions. Therefore, 3.x kept the custom interfaces, but the `@FunctionalInterface` annotation has been applied to them (which is safe/ignored on Android).
@FunctionalInterface
interface Function<@NonNull T, @NonNull R> {
R apply(T t) throws Throwable;
}
In addition, Java 8 allows declaring annotations on type argument and type argument use individually and thus all functional interfaces have received nullability annotations.
ℹ️ Further references: PR #6840, PR #6791, PR #6795
One small drawback with the custom throws Exception
in the functional interfaces is that some 3rd party APIs may throw a checked exception that is not a descendant of Exception
, or simply throw Throwable
.
Therefore, with 3.x, the functional interfaces as well as other support interfaces have been widened and declared with throws Throwable
in their signature.
This widening should be inconsequential for lambda-based or class-implementations provided to the RxJava methods:
source.map(v -> {
if (v == 0) {
throw new Exception();
}
return v;
});
source.filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer v) throws Exception {
        throw new IOException();
    }
});
I.e., there is no need to change throws Exception
to throws Throwable
just for the sake of it.
However, if one uses these functional interfaces outside:
static Integer callFunction(
        Function<Integer, Integer> function, Integer value) throws Exception {
    return function.apply(value);
}
the widening of throws
will have to be propagated:
static Integer callFunction(
        Function<Integer, Integer> function, Integer value) throws Throwable {
    return function.apply(value);
}
ℹ️ Further references: PR #6511, PR #6579
RxJava 2.x already supported the standard `java.util.concurrent.Callable` whose `call` method is declared with `throws Exception` by default. Unfortunately, when our custom functional interfaces were widened to `throws Throwable`, it was impossible to widen `Callable` because in Java, implementations can't widen the `throws` clause, only narrow or abandon it.
Therefore, 3.x introduces the io.reactivex.rxjava3.functions.Supplier
interface that defines the widest throws
possible:
interface Supplier<@NonNull R> {
R get() throws Throwable;
}
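The rule about `throws` clauses mentioned above can be sketched in plain Java without any RxJava types. `WideSupplier`, `wrap` and `getUnchecked` below are hypothetical names used only for this illustration; `WideSupplier` merely mirrors the shape of the interface shown above.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class ThrowsClauseExample {

    // Hypothetical stand-in for an interface declared with the widest clause
    interface WideSupplier<R> {
        R get() throws Throwable;
    }

    // An implementation may narrow the clause to a subtype...
    static final WideSupplier<String> NARROWED = new WideSupplier<String>() {
        @Override
        public String get() throws IOException {
            return "narrowed";
        }
    };

    // ...or abandon it entirely.
    static final WideSupplier<String> ABANDONED = new WideSupplier<String>() {
        @Override
        public String get() {
            return "abandoned";
        }
    };

    // The opposite direction is rejected by the compiler:
    // new Callable<String>() {
    //     @Override public String call() throws Throwable { ... } // does not compile
    // };

    // A Callable still fits where the wider type is expected
    static <R> WideSupplier<R> wrap(Callable<R> callable) {
        return callable::call;
    }

    // Calls a WideSupplier and rethrows any failure as an unchecked exception
    static <R> R getUnchecked(WideSupplier<R> supplier) {
        try {
            return supplier.get();
        } catch (Throwable ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(getUnchecked(NARROWED));
        System.out.println(getUnchecked(ABANDONED));
        System.out.println(getUnchecked(wrap(() -> "from callable")));
    }
}
```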
Due to naming matches, IDEs tend to import `java.util.function.Supplier` instead of picking RxJava's `io.reactivex.rxjava3.functions.Supplier`. Also, IDEs tend to give non-descriptive errors such as "Supplier can't be converted to Supplier", omitting the fact about the package differences.
To comply with the support for wider throws functional interfaces, many operators that used to take `java.util.concurrent.Callable` now take `io.reactivex.rxjava3.functions.Supplier` instead. If the operator was used with a lambda, only a recompilation is needed:
Flowable.defer(() -> Flowable.just(Math.random()));
However, if explicit implementation was used:
Flowable.defer(new Callable<Publisher<Double>>() {
    @Override
    public Publisher<Double> call() throws Exception {
        return Flowable.just(Math.random());
    }
});
the interface type (`Callable` -> `Supplier`) and the method name (`call` -> `get`) have to be adjusted:
Flowable.defer(new Supplier<Publisher<Double>>() {
    @Override
    public Publisher<Double> get() throws Exception {
        return Flowable.just(Math.random());
    }
});
See the API signature changes section on which operators are affected.
ℹ️ Further references: PR #6511
In 2.x, the `to()` operator used the generic `Function` to allow assembly-time conversion of flows into arbitrary types. The drawback of this approach was that each base reactive type had the same `Function` interface in their method signature, thus it was impossible to implement multiple converters for different reactive types within the same class. To work around this issue, the `as` operator and `XConverter` interfaces were introduced in 2.x; these interfaces are distinct and can be implemented on the same class. Changing the signature of `to` in 2.x was not possible due to the pledged binary compatibility of the library.
From 3.x, the `as()` methods have been removed and the `to()` methods now each work with their respective `XConverter` interfaces (hosted in package `io.reactivex.rxjava3.core`):
- `Flowable.to(Function<Flowable<T>, R>)` -> `Flowable.to(FlowableConverter<T, R>)`
- `Observable.to(Function<Observable<T>, R>)` -> `Observable.to(ObservableConverter<T, R>)`
- `Maybe.to(Function<Maybe<T>, R>)` -> `Maybe.to(MaybeConverter<T, R>)`
- `Single.to(Function<Single<T>, R>)` -> `Single.to(SingleConverter<T, R>)`
- `Completable.to(Function<Completable, R>)` -> `Completable.to(CompletableConverter<R>)`
- `ParallelFlowable.to(Function<ParallelFlowable<T>, R>)` -> `ParallelFlowable.to(ParallelFlowableConverter<T, R>)`
If one was using these methods with a lambda expression, only a recompilation is needed:
// before
source.to(flowable -> flowable.blockingFirst());
// after
source.to(flowable -> flowable.blockingFirst());
If one was implementing a `Function` interface (typically anonymously), the interface type, type arguments and the `throws` clause have to be adjusted:
// before
source.to(new Function<Flowable<Integer>, Integer>() {
@Override
public Integer apply(Flowable<Integer> t) throws Exception {
return t.blockingFirst();
}
});
// after
source.to(new FlowableConverter<Integer, Integer>() {
@Override
public Integer apply(Flowable<Integer> t) {
return t.blockingFirst();
}
});
ℹ️ Further references: Issue #5654, PR #6514
Moving to Java 8 and Android's desugaring tooling allows the use of static interface methods instead of separate factory classes. The support class io.reactivex.disposables.Disposables
was a prime candidate for moving all of its methods into the Disposable
interface itself (io.reactivex.rxjava3.disposables.Disposable
).
Uses of the factory methods:
Disposable d = Disposables.empty();
should now be turned into:
Disposable d = Disposable.empty();
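The other factory methods follow the same pattern. A short sketch of three of them, with an illustrative class and method name:

```java
import io.reactivex.rxjava3.disposables.Disposable;

public class DisposableFactoryExample {

    // Returns { empty.isDisposed(), disposed.isDisposed(), callbackRan }
    static boolean[] states() {
        boolean[] callbackRan = { false };

        Disposable empty = Disposable.empty();        // fresh, not yet disposed
        Disposable disposed = Disposable.disposed();  // already disposed
        Disposable fromRunnable = Disposable.fromRunnable(() -> callbackRan[0] = true);

        fromRunnable.dispose(); // runs the callback

        return new boolean[] { empty.isDisposed(), disposed.isDisposed(), callbackRan[0] };
    }

    public static void main(String[] args) {
        boolean[] s = states();
        System.out.println(s[0] + " " + s[1] + " " + s[2]); // false true true
    }
}
```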
ℹ️ Further references: PR #6781
Internally, RxJava 2.x uses an abstraction of a disposable container instead of using CompositeDisposable
everywhere, allowing a more appropriate container type to be used. This is achieved via an internal DisposableContainer
implemented by CompositeDisposable
as well as other internal components. Unfortunately, since the public class referenced an internal interface, RxJava was causing warnings in OSGi environments.
In RxJava 3, the DisposableContainer
is now part of the public API under io.reactivex.rxjava3.disposables.DisposableContainer
and no longer causes OSGi issues.
ℹ️ Further references: Issue #6742, PR #6745
The RxJava 2.2.x line still has a couple of experimental operators (but no beta operators), which have been promoted to standard with 3.x:
ℹ️ Further references: PR #6537
RxJava 3 received a considerable amount of new operators and methods across its API surface. Brand new operators introduced are marked with in their respective Available in: listings
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
A limitation with the bounded replay
operator is that to allow continuous item delivery to slow consumers, a linked list of the cached items has to be maintained. By default, the head node of this list is moved forward when the boundary condition (size, time) mandates it. This setup avoids allocation in exchange for retaining one "invisible" item in the linked list. However, sometimes this retention is unwanted and the allocation overhead of a clean node is acceptable. In 2.x, the ReplaySubject
and ReplayProcessor
implementations already allowed for such behavior, but the instance replay()
operators did not.
With 3.x, the replay
operators (both connectable and multicasting variants) received overloads, defining an eagerTruncate
option that performs this type of head node cleanup.
replay(int, boolean)
replay(long, TimeUnit, Scheduler, boolean)
replay(int, long, TimeUnit, Scheduler, boolean)
replay(Function, int, boolean)
replay(Function, long, TimeUnit, Scheduler, boolean)
replay(Function, int, long, TimeUnit, Scheduler, boolean)
replay(int, boolean)
replay(long, TimeUnit, Scheduler, boolean)
replay(int, long, TimeUnit, Scheduler, boolean)
replay(Function, int, boolean)
replay(Function, long, TimeUnit, Scheduler, boolean)
replay(Function, int, long, TimeUnit, Scheduler, boolean)
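A minimal usage sketch of the `replay(int, boolean)` overload on `Observable` follows; the class and method names are illustrative. The flag only changes what the operator retains internally, so the items a late observer sees are the same as with the non-eager overload.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;

import java.util.List;

public class ReplayEagerTruncateExample {

    static List<Integer> lateObserverItems() {
        // bufferSize = 2, eagerTruncate = true
        ConnectableObservable<Integer> connectable =
                Observable.range(1, 5).replay(2, true);

        // run the source to completion with nobody subscribed yet
        connectable.connect();

        // a late observer receives only the retained tail of the sequence
        return connectable.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(lateObserverItems()); // [4, 5]
    }
}
```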
ℹ️ Further references: Issue #6475, PR #6532
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
A property of the concatMap
operator is that the mapper
function may be invoked either on the subscriber's thread or the currently completing inner source's thread. There is no good way to control this thread of invocation from the outside, therefore, new overloads have been added in 3.x with an additional Scheduler
parameter:
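As a sketch of how such an overload can be used, the example below runs the mapper on a dedicated, named thread and records where each invocation happened. The thread name `mapper-thread`, the prefetch value of 2 and the class name are arbitrary choices for this illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcatMapSchedulerExample {

    // Returns the names of the threads on which the mapper function was invoked
    static List<String> mapperThreads() {
        ExecutorService exec = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "mapper-thread"));
        Scheduler scheduler = Schedulers.from(exec);
        List<String> threads = new CopyOnWriteArrayList<>();
        try {
            Flowable.range(1, 3)
                    .concatMap(v -> {
                        threads.add(Thread.currentThread().getName());
                        return Flowable.just(v * 10);
                    }, 2, scheduler)       // mapper, prefetch, scheduler
                    .blockingSubscribe();  // wait for completion
        } finally {
            exec.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(mapperThreads());
    }
}
```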
ℹ️ Further references: Issue #6447, PR #6538
By default, `Schedulers.from` executes work on the supplied `Executor` in an eager mode, running as many tasks as available. This can cause some unwanted lack of interleaving between these tasks and external tasks submitted to the same executor. To remedy the situation, a new mode and overload have been added so that the `Scheduler` returned by `Schedulers.from` runs tasks one by one, allowing other external tasks to be interleaved.
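A minimal sketch of opting into this mode through the three-argument `Schedulers.from(Executor, boolean, boolean)` overload is shown below. It only demonstrates how the scheduler is created and used; the interleaving itself depends on what else is submitted to the executor. The class and method names are illustrative.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FairSchedulerExample {

    static List<Integer> squaresOnFairScheduler() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // interruptibleWorker = false, fair = true
            Scheduler fair = Schedulers.from(exec, false, true);

            return Flowable.range(1, 5)
                    .observeOn(fair)
                    .map(v -> v * v)
                    .toList()
                    .blockingGet();
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squaresOnFairScheduler()); // [1, 4, 9, 16, 25]
    }
}
```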
ℹ️ Further references: Issue #6696, Issue #6697, PR #6744
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
The underlying `blockingIterable` operator already had the option to specify the internal buffer size (and prefetch amounts), which is now exposed via new `blockingForEach` overloads.
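A short sketch of the `Flowable` overload, with a deliberately small buffer size of 2 chosen only for illustration:

```java
import io.reactivex.rxjava3.core.Flowable;

import java.util.ArrayList;
import java.util.List;

public class BlockingForEachExample {

    static List<Integer> collect() {
        List<Integer> seen = new ArrayList<>();

        // the second argument is the internal buffer size (prefetch amount)
        Flowable.range(1, 10)
                .blockingForEach(v -> seen.add(v), 2);

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```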
ℹ️ Further references: Issue #6784, PR #6800
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
For API consistency, the callback-based blockingSubscribe
methods have been introduced to Maybe
, Single
and Completable
respectively.
blockingSubscribe()
blockingSubscribe(Consumer)
blockingSubscribe(Consumer, Consumer)
blockingSubscribe(Consumer, Consumer, Action)
blockingSubscribe(MaybeObserver)
blockingSubscribe()
blockingSubscribe(Consumer)
blockingSubscribe(Consumer, Consumer)
blockingSubscribe(SingleObserver)
blockingSubscribe()
blockingSubscribe(Action)
blockingSubscribe(Action, Consumer)
blockingSubscribe(CompletableObserver)
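The callback-based variants can be sketched as follows, one call per reactive type; the class name and log strings are illustrative.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;

import java.util.ArrayList;
import java.util.List;

public class BlockingSubscribeExample {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Single: success callback only
        Single.just(1)
                .blockingSubscribe(v -> log.add("single: " + v));

        // Maybe: success, error and completion callbacks
        Maybe.<Integer>empty()
                .blockingSubscribe(
                        v -> log.add("maybe: " + v),
                        e -> log.add("maybe error"),
                        () -> log.add("maybe: empty"));

        // Completable: completion callback only
        Completable.complete()
                .blockingSubscribe(() -> log.add("completable: done"));

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```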
ℹ️ Further references: Issue #6852, PR #6862
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
The option to also delay errors, which was already available in every other reactive type, was missing from Maybe.
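The effect can be sketched with a TestScheduler, assuming the Maybe.delay(long, TimeUnit, Scheduler, boolean) overload where the last flag is delayError. The class and method names are hypothetical.

```java
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of RxJava.
public class MaybeDelayErrorSketch {

    /** Returns whether the error was observed before and after the delay elapsed. */
    public static boolean[] errorSeenBeforeAndAfter() {
        TestScheduler scheduler = new TestScheduler();
        AtomicReference<Throwable> error = new AtomicReference<>();

        Maybe.<Integer>error(new IllegalStateException("boom"))
                .delay(1, TimeUnit.SECONDS, scheduler, true) // delayError = true
                .subscribe(v -> { }, error::set);

        boolean before = error.get() != null;
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS); // move virtual time forward
        boolean after = error.get() != null;
        return new boolean[] { before, after };
    }
}
```

With delayError set, the error should only reach the consumer once the virtual time has advanced by the full delay.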
ℹ️ Further references: Issue #6863, PR #6864
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
Upon an error, the sequence is completed (conditionally) instead of signaling the error.
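A minimal sketch of the conditional form, assuming the onErrorComplete(Predicate) overload on Flowable; the class name and the choice of IOException are illustrative only.

```java
import io.reactivex.rxjava3.core.Flowable;

import java.io.IOException;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class OnErrorCompleteSketch {

    /** Emits 1 and 2, then swallows the IOException and completes normally. */
    public static List<Integer> items() {
        return Flowable.just(1, 2)
                .concatWith(Flowable.<Integer>error(new IOException("boom")))
                .onErrorComplete(e -> e instanceof IOException)
                .toList()
                .blockingGet();
    }
}
```

Errors that do not match the predicate would still be signaled to the consumer.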
ℹ️ Further references: Issue #6852, PR #6867
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
This operator (previously named onErrorResumeNext, now renamed) was already available everywhere else and was accidentally left out of Completable.
ℹ️ Further references: Issue #6852, PR #6868
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
The operator was missing from Single
and Completable
.
ℹ️ Further references: Issue #6852, PR #6869
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
Added the static version of the switchMap
operator, switchOnNext
and switchOnNextDelayError
, to Maybe
, Single
and Completable
.
Maybe.switchOnNext(Publisher)
Single.switchOnNext(Publisher)
Completable.switchOnNext(Publisher)
Maybe.switchOnNextDelayError(Publisher)
Single.switchOnNextDelayError(Publisher)
Completable.switchOnNextDelayError(Publisher)
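As a rough illustration, Single.switchOnNext takes a Publisher of inner Single sources and returns a Flowable of their results. The class name below is hypothetical, and the inner sources are synchronous, so no actual switching away from a running source occurs here.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class SwitchOnNextSketch {

    /** Flattens a sequence of Singles, always following the most recent one. */
    public static List<Integer> items() {
        Flowable<Single<Integer>> sources =
                Flowable.just(Single.just(1), Single.just(2));
        return Single.switchOnNext(sources)
                .toList()
                .blockingGet();
    }
}
```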
ℹ️ Further references: Issue #6852, PR #6870
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
The operator was already added to the other reactive types before.
ℹ️ Further references: Issue #6852, PR #6871
Several operators have been added across:
Operator | F | O | M | S | C |
---|---|---|---|---|---|
fromAction
|
(23) | ||||
fromCompletable
|
(72) | (73) | |||
fromMaybe
|
(73) | ||||
fromObservable
|
(73) | ||||
fromPublisher
|
|||||
fromRunnable
|
(23) | ||||
fromSingle
|
(73) |
fromAction(Action)
fromCompletable(CompletableSource)
fromMaybe(MaybeSource)
fromObservable(ObservableSource, BackpressureStrategy)
fromRunnable(Runnable)
fromSingle(SingleSource)
fromAction(Action)
fromCompletable(CompletableSource)
fromMaybe(MaybeSource)
fromRunnable(Runnable)
fromSingle(SingleSource)
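Two of the conversions listed above, sketched with illustrative names: Observable.fromSingle wraps a Single, and Flowable.fromRunnable runs the given Runnable on subscription and then completes without items.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of RxJava.
public class FromConversionsSketch {

    /** Converts a Single into an Observable that emits its one item. */
    public static List<Integer> fromSingle() {
        return Observable.fromSingle(Single.just(7))
                .toList()
                .blockingGet();
    }

    /** Returns true if the Runnable was executed when the Flowable was subscribed to. */
    public static boolean fromRunnableRuns() {
        AtomicBoolean ran = new AtomicBoolean();
        Flowable.fromRunnable(() -> ran.set(true))
                .blockingSubscribe();
        return ran.get();
    }
}
```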
ℹ️ Further references: Issue #6852, PR #6873
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
These operators were already available for Flowable
and Observable
, now added to Single
and Maybe
.
timeInterval()
timeInterval(TimeUnit)
timeInterval(Scheduler)
timeInterval(TimeUnit, Scheduler)
timestamp()
timestamp(TimeUnit)
timestamp(Scheduler)
timestamp(TimeUnit, Scheduler)
timeInterval()
timeInterval(TimeUnit)
timeInterval(Scheduler)
timeInterval(TimeUnit, Scheduler)
timestamp()
timestamp(TimeUnit)
timestamp(Scheduler)
timestamp(TimeUnit, Scheduler)
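A sketch of the timestamp(TimeUnit, Scheduler) overload on Single, using a TestScheduler so the time source is deterministic. The class name and the 5 second offset are arbitrary.

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.schedulers.Timed;

import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
public class SingleTimestampSketch {

    /** Pairs the success value with the scheduler's current time in milliseconds. */
    public static Timed<Integer> timestamped() {
        TestScheduler scheduler = new TestScheduler();
        scheduler.advanceTimeBy(5, TimeUnit.SECONDS); // virtual clock now reads 5 s

        return Single.just(1)
                .timestamp(TimeUnit.MILLISECONDS, scheduler)
                .blockingGet();
    }
}
```

The returned Timed wrapper exposes the original item via value() and the recorded time via time().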
ℹ️ Further references: Issue #6852, PR #6874
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
This operator was already available elsewhere, now added to Maybe
and Completable
.
ℹ️ Further references: Issue #6852, PR #6875
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
This operator was already available in Flowable
and Observable
, now added to Maybe
and Single
.
ℹ️ Further references: Issue #6852, PR #6876
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
This operator was already available in Flowable
and Observable
, now added to Maybe
, Single
and Completable
.
ℹ️ Further references: Issue #6852, PR #6877
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
Added various concatMap-based transformations between Maybe, Single and Completable for Maybe and Single. These are essentially aliases of the respective flatMap operators for better discoverability.
Maybe.concatMapCompletable(Function)
Maybe.concatMapSingle(Function)
Single.concatMapCompletable(Function)
Single.concatMapMaybe(Function)
Single.concatMap(Function)
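For instance, two of the aliases listed above can be chained like this (a sketch; the class name and the values are made up for the example):

```java
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;

// Hypothetical demo class, not part of RxJava.
public class ConcatMapAliasSketch {

    /** Maps a Single to another Single, then to a Maybe. */
    public static Integer answer() {
        return Single.just(20)
                .concatMap(v -> Single.just(v * 2))      // Single -> Single
                .concatMapMaybe(v -> Maybe.just(v + 2))  // Single -> Maybe
                .blockingGet();
    }
}
```

Since a Single or Maybe has at most one item, there is nothing to reorder, which is why these behave like their flatMap counterparts.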
ℹ️ Further references: Issue #6852, PR #6879
Available in: Flowable
, Observable
, Maybe
, Single
, Completable
The delayError variants of the concat operator were missing across the reactive types.
Single.concatArrayDelayError(Single...)
Single.concatArrayEagerDelayError
Single.concatDelayError(Iterable)
Single.concatDelayError(Publisher)
Single.concatDelayError(Publisher, int)
Completable.concatArrayDelayError(Completable...)
Completable.concatDelayError(Iterable)
Completable.concatDelayError(Publisher)
Completable.concatDelayError(Publisher, int)
ℹ️ Further references: Issue #6852, PR #6881
The startWith overloads were ambiguous and/or invited wrong usage in other languages. They have now been renamed to startWithArray, startWithIterable and startWithItem.
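The three renamed operators can be sketched on Observable as follows; the class name is hypothetical. Each call prepends its items to whatever the upstream chain emits.

```java
import io.reactivex.rxjava3.core.Observable;

import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class StartWithRenamesSketch {

    /** Builds the sequence 0..5 by repeatedly prepending items. */
    public static List<Integer> items() {
        return Observable.just(5)
                .startWithItem(4)                        // single item
                .startWithArray(2, 3)                    // varargs / array
                .startWithIterable(Arrays.asList(0, 1))  // Iterable
                .toList()
                .blockingGet();
    }
}
```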
ℹ️ Further references: Issue #6122, PR #6530
The method was ambiguous and/or invited wrong usage in other languages. It has now been renamed to onErrorResumeWith across all types:
Flowable.onErrorResumeWith()
Observable.onErrorResumeWith()
Maybe.onErrorResumeWith()
Single.onErrorResumeWith()
Completable.onErrorResumeWith()
ℹ️ Further references: Issue #6551, PR #6556
Renamed to plain zip to match the naming convention of other operators (i.e., Iterable/Source versions are named plainly, array versions receive an Array postfix).
ℹ️ Further references: Issue #6610, PR #6638
Renamed to combineLatestArray and combineLatestArrayDelayError to match the naming convention of other operators (i.e., Iterable/Source versions are named plainly, array versions receive an Array postfix).
Flowable.combineLatestArray()
Flowable.combineLatestArrayDelayError()
Observable.combineLatestArray()
Observable.combineLatestArrayDelayError()
ℹ️ Further references: Issue #6820, PR #6640, PR #6838
Renamed to sequenceEqual
to match the naming in the other reactive classes.
ℹ️ Further references: Issue #6854, PR #6856
Operators accepting a java.util.concurrent.Callable have been changed to accept io.reactivex.rxjava3.functions.Supplier instead to enable the callbacks to throw any kind of exception.
If the operator was used with a lambda, only a recompilation is needed:
Flowable.defer(() -> Flowable.just(Math.random()));
However, if an explicit implementation was used:
Flowable.defer(new Callable<Flowable<Double>>() {
    @Override
    public Flowable<Double> call() throws Exception {
        return Flowable.just(Math.random());
    }
});
the interface type (Callable -> Supplier) and the method name (call -> get) have to be adjusted:
Flowable.defer(new Supplier<Flowable<Double>>() {
    @Override
    public Flowable<Double> get() throws Throwable {
        return Flowable.just(Math.random());
    }
});
(Across all reactive types, multiple overloads.)
defer | error | using |
generate | buffer | collect |
distinct | reduceWith | scanWith |
toMap | toMultimap |
ℹ️ Further references: PR #6511
The getValues() and getValues(T[]) methods were a remnant from a time when Subject and FlowableProcessor were unifying all state-peeking methods for every kind of subject/processor. These were marked as @Deprecated in 2.x and are now removed from 3.x. They can be trivially replaced with getValue() if necessary, for example:
Object value = subject.getValue();
if (value == null) {
    // no current value: return an empty array
    return new Object[0];
}
return new Object[] { value };
ℹ️ Further references: Issue #5622, PR #6516
Use Maybe.defaultIfEmpty(T)
instead.
ℹ️ Further references: PR #6517
Removed from Flowable and Observable. The 4th argument, the Subscription/Disposable callback, was more or less useless. Use Flowable.doOnSubscribe() and Observable.doOnSubscribe() instead.
ℹ️ Further references: PR #6517
Use the better-named ignoreElement() instead.
ℹ️ Further references: PR #6517
The behavior and signature were confusing (i.e., returning null or a Throwable). Use blockingAwait() instead.
ℹ️ Further references: PR #6517
Based on user feedback, the following methods have been removed from TestSubscriber
and TestObserver
respectively due to being less useful outside testing RxJava itself:
assertErrorMessage | assertFailure(Predicate, T...) | assertFailureAndMessage |
assertNever(Predicate) | assertNever(T) | assertNoTimeout |
assertNotSubscribed | assertNotTerminated | assertSubscribed |
assertTerminated | assertTimeout | assertValueSequenceOnly |
assertValueSet | assertValueSetOnly | awaitCount(int, Runnable) |
awaitCount(int, Runnable, long) | awaitTerminalEvent | awaitTerminalEvent(long, TimeUnit) |
clearTimeout | completions | errorCount |
errors | getEvents | isTerminated |
isTimeout | lastThread | valueCount |
assertOf |
ℹ️ Further references: Issue #6153, PR #6526
The replay(Scheduler) and other overloads were carried over from the original Rx.NET API set but appear to be unused. Most use cases capture the connectable anyway, so there is not much benefit from inlining an observeOn into a connectable:
ConnectableFlowable<Integer> connectable = source.replay();
Flowable<Integer> flowable = connectable.observeOn(Schedulers.io());
// hand flowable to consumers
flowable.subscribe();
connectable.connect();
ℹ️ Further references: PR #6539
The Flowable.dematerialize() and Observable.dematerialize() operators were inherently type-unsafe and have been removed. In Rx.NET, the extension methods allowed dematerialize() to be applied to Observable<Notification<T>> only, but there is no way to do this in Java as it has no extension methods and one can't restrict a method to appear only with a certain type argument scheme.
Use dematerialize(Function) instead.
Observable<Notification<Integer>> source = ...
Observable<Integer> result = source.dematerialize(v -> v);
ℹ️ Further references: PR #6539
The operator was apparently not used anywhere and has been removed from all types. Its function can be emulated via onErrorResumeNext:
source.onErrorResumeNext(
    error -> error instanceof Exception
        ? fallback : Observable.error(error))
ℹ️ Further references: Issue #6554, PR #6564, PR #6844
This operator did not see much use and has been removed from Flowable and Observable. It can be emulated with the plain sourced version:
source.buffer(Observable.defer(supplier).take(1).repeat())
ℹ️ Further references: Issue #6555, PR #6564
Both the vararg overloaded versions of combineLatest
and combineLatestDelayError
were awkward to use from other JVM languages and have been removed. Use combineLatestArray
and combineLatestArrayDelayError
instead.
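A sketch of the array-based replacement, assuming the Observable.combineLatestArray(ObservableSource[], Function) overload. The class name is hypothetical, and the unchecked array creation is the usual Java workaround for generic arrays.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableSource;

import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class CombineLatestArraySketch {

    /** Combines the latest values of two sources by adding them. */
    @SuppressWarnings("unchecked")
    public static List<Integer> sums() {
        ObservableSource<Integer>[] sources = new ObservableSource[] {
                Observable.just(1), Observable.just(10)
        };
        return Observable.<Integer, Integer>combineLatestArray(sources, values -> {
            int a = (Integer) values[0]; // the combiner receives an Object[]
            int b = (Integer) values[1];
            return a + b;
        }).toList().blockingGet();
    }
}
```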
ℹ️ Further references: Issue #6634, PR #6635
Zip requires a known number of sources to work with. These overloads were just collecting up the inner sources for another overload. Removed from both Flowable
and Observable
. They can be emulated via composition:
Observable<Observable<Integer>> sources = ...
sources.toList().flatMapObservable(list -> Observable.zip(list, zipper));
ℹ️ Further references: PR #6638
These were just convenience overloads for fromFuture().subscribeOn()
all across. Apply subscribeOn
explicitly from now on.
Flowable.fromFuture(future).subscribeOn(Schedulers.io());
Flowable.fromFuture(future, 5, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io());
ℹ️ Further references: Issue #6811, PR #6814
This overload had no effect because there is no buffering happening inside the operator (unlike in the Flowable
variant). Use the Observable.concatMapIterable(Function)
overload instead.
Copyright (c) 2016-present, RxJava Contributors.