From b14afef68afb098209e7b112c96341425e5a7ec5 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 13 Sep 2019 18:46:40 +0300 Subject: [PATCH 1/6] Promote reactive adapters for Flow to stable API --- reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt | 2 -- reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt | 2 -- 2 files changed, 4 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 559068f578..1a3edc842a 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -23,14 +23,12 @@ import kotlin.coroutines.* * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements * are discarded. */ -@ExperimentalCoroutinesApi public fun Publisher.asFlow(): Flow = PublisherAsFlow(this, 1) /** * Transforms the given flow to a spec-compliant [Publisher]. */ -@ExperimentalCoroutinesApi public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) private class PublisherAsFlow( diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt index 6b031ed453..c852c7667f 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt @@ -4,7 +4,6 @@ package kotlinx.coroutines.reactor -import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.reactive.FlowSubscription @@ -15,7 +14,6 @@ import reactor.core.publisher.Flux * Converts the given flow to a cold flux. * The original flow is cancelled when the flux subscriber is disposed. */ -@ExperimentalCoroutinesApi public fun Flow.asFlux(): Flux = FlowAsFlux(this) private class FlowAsFlux(private val flow: Flow) : Flux() { From d870a1625dc5af60944957e39050ef69c17acd47 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 13 Sep 2019 18:49:16 +0300 Subject: [PATCH 2/6] Promote Publisher.collect to stable API --- reactive/kotlinx-coroutines-reactive/src/Channel.kt | 1 - reactive/kotlinx-coroutines-reactive/src/Publish.kt | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index 6cf11b7aa7..74ea9d8e32 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -36,7 +36,6 @@ public suspend inline fun Publisher.consumeEach(action: (T) -> Unit) = * Subscribes to this [Publisher] and performs the specified action for each received element. * Cancels subscription if any exception happens during collect. */ -@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public suspend inline fun Publisher.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index a7d53876d6..0e3a7b8c11 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -48,7 +48,7 @@ public fun publish( @Deprecated( message = "CoroutineScope.publish is deprecated in favour of top-level publish", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("publish(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring @LowPriorityInOverloadResolution From 52b97b9aa42954383556a63006dcb82d5409b7d1 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 19 Sep 2019 16:10:08 +0300 Subject: [PATCH 3/6] Promote rx2 extensions to stable, increase deprecation level for obsolete reactive primitives --- .../src/Channel.kt | 3 +- .../kotlinx-coroutines-reactor/src/Flux.kt | 2 +- .../kotlinx-coroutines-reactor/src/Mono.kt | 2 +- .../test/FluxMultiTest.kt | 2 +- .../kotlinx-coroutines-rx2/src/RxChannel.kt | 8 ++--- .../src/RxCompletable.kt | 2 +- .../kotlinx-coroutines-rx2/src/RxFlowable.kt | 2 +- .../kotlinx-coroutines-rx2/src/RxMaybe.kt | 2 +- .../src/RxObservable.kt | 2 +- .../kotlinx-coroutines-rx2/src/RxSingle.kt | 2 +- .../test/CompletableTest.kt | 4 +-- .../kotlinx-coroutines-rx2/test/MaybeTest.kt | 4 +-- .../test/ObservableMultiTest.kt | 12 +++---- .../test/ObservableSingleTest.kt | 32 +++++++++---------- .../kotlinx-coroutines-rx2/test/SingleTest.kt | 4 +-- 15 files changed, 39 insertions(+), 44 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index 74ea9d8e32..df2131b630 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -20,7 +20,6 @@ import org.reactivestreams.* * @param request how many items to request from publisher in advance (optional, one by default). */ @ObsoleteCoroutinesApi -@Suppress("CONFLICTING_OVERLOADS") public fun Publisher.openSubscription(request: Int = 1): ReceiveChannel { val channel = SubscriptionChannel(request) subscribe(channel) @@ -28,7 +27,7 @@ public fun Publisher.openSubscription(request: Int = 1): ReceiveChannel Publisher.consumeEach(action: (T) -> Unit) = openSubscription().consumeEach(action) diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 18b84ac117..389428df11 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -50,7 +50,7 @@ public fun flux( @Deprecated( message = "CoroutineScope.flux is deprecated in favour of top-level flux", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("flux(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index b218f6d0c5..76f0418ea6 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -39,7 +39,7 @@ public fun mono( @Deprecated( message = "CoroutineScope.mono is deprecated in favour of top-level mono", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("mono(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt index ae23d3c23d..7203120dae 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxMultiTest.kt @@ -73,7 +73,7 @@ class FluxMultiTest : TestBase() { val mono = mono { var result = "" try { - flux.consumeEach { result += it } + flux.collect { result += it } } catch(e: IOException) { result += e.message } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index b038e539fb..dc852b9024 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -19,7 +19,6 @@ import kotlinx.coroutines.internal.* * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @ObsoleteCoroutinesApi -@Suppress("CONFLICTING_OVERLOADS") public fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) @@ -34,7 +33,6 @@ public fun MaybeSource.openSubscription(): ReceiveChannel { * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). */ @ObsoleteCoroutinesApi -@Suppress("CONFLICTING_OVERLOADS") public fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) @@ -42,12 +40,12 @@ public fun ObservableSource.openSubscription(): ReceiveChannel { } // Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)")) +@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) public suspend inline fun MaybeSource.consumeEach(action: (T) -> Unit) = openSubscription().consumeEach(action) // Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.collect(action)")) +@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) public suspend inline fun ObservableSource.consumeEach(action: (T) -> Unit) = openSubscription().consumeEach(action) @@ -55,7 +53,6 @@ public suspend inline fun ObservableSource.consumeEach(action: (T) -> Uni * Subscribes to this [MaybeSource] and performs the specified action for each received element. * Cancels subscription if any exception happens during collect. */ -@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public suspend inline fun MaybeSource.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) @@ -63,7 +60,6 @@ public suspend inline fun MaybeSource.collect(action: (T) -> Unit) = * Subscribes to this [ObservableSource] and performs the specified action for each received element. * Cancels subscription if any exception happens during collect. */ -@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public suspend inline fun ObservableSource.collect(action: (T) -> Unit) = openSubscription().consumeEach(action) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index 0da06776a5..c59b4bd6b5 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -36,7 +36,7 @@ public fun rxCompletable( @Deprecated( message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxCompletable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt index beee40eedb..30a1ed7e92 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt @@ -45,7 +45,7 @@ public fun rxFlowable( @Deprecated( message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxFlowable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index fbc366c6df..9f176e938c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -37,7 +37,7 @@ public fun rxMaybe( @Deprecated( message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxMaybe(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 3d0ccd824d..6ccf0f0bae 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -46,7 +46,7 @@ public fun rxObservable( @Deprecated( message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxObservable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index b6cebf097c..f3573ee6eb 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -36,7 +36,7 @@ public fun rxSingle( @Deprecated( message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle", - level = DeprecationLevel.WARNING, + level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("rxSingle(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index a7caea4793..9a12bafb1d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -150,7 +150,7 @@ class CompletableTest : TestBase() { @Test fun testFatalExceptionInSubscribe() = runTest { - GlobalScope.rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { expect(1) 42 }.subscribe({ throw LinkageError() }) @@ -159,7 +159,7 @@ class CompletableTest : TestBase() { @Test fun testFatalExceptionInSingle() = runTest { - GlobalScope.rxCompletable(Dispatchers.Unconfined) { + rxCompletable(Dispatchers.Unconfined) { throw LinkageError() }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) finish(2) diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index dcd66638e5..326c83e45c 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -280,7 +280,7 @@ class MaybeTest : TestBase() { @Test fun testFatalExceptionInSubscribe() = runTest { - GlobalScope.rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { + rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) { expect(1) 42 }.subscribe({ throw LinkageError() }) @@ -289,7 +289,7 @@ class MaybeTest : TestBase() { @Test fun testFatalExceptionInSingle() = runTest { - GlobalScope.rxMaybe(Dispatchers.Unconfined) { + rxMaybe(Dispatchers.Unconfined) { throw LinkageError() }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) }) finish(2) diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt index 75f79de5ca..6971918723 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableMultiTest.kt @@ -18,7 +18,7 @@ class ObservableMultiTest : TestBase() { @Test fun testNumbers() { val n = 100 * stressTestMultiplier - val observable = GlobalScope.rxObservable { + val observable = rxObservable { repeat(n) { send(it) } } checkSingleValue(observable.toList()) { list -> @@ -30,7 +30,7 @@ class ObservableMultiTest : TestBase() { @Test fun testConcurrentStress() { val n = 10_000 * stressTestMultiplier - val observable = GlobalScope.rxObservable { + val observable = rxObservable { newCoroutineContext(coroutineContext) // concurrent emitters (many coroutines) val jobs = List(n) { @@ -51,7 +51,7 @@ class ObservableMultiTest : TestBase() { @Test fun testIteratorResendUnconfined() { val n = 10_000 * stressTestMultiplier - val observable = GlobalScope.rxObservable(Dispatchers.Unconfined) { + val observable = rxObservable(Dispatchers.Unconfined) { Observable.range(0, n).collect { send(it) } } checkSingleValue(observable.toList()) { list -> @@ -62,7 +62,7 @@ class ObservableMultiTest : TestBase() { @Test fun testIteratorResendPool() { val n = 10_000 * stressTestMultiplier - val observable = GlobalScope.rxObservable { + val observable = rxObservable { Observable.range(0, n).collect { send(it) } } checkSingleValue(observable.toList()) { list -> @@ -72,14 +72,14 @@ class ObservableMultiTest : TestBase() { @Test fun testSendAndCrash() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send("O") throw IOException("K") } val single = rxSingle { var result = "" try { - observable.consumeEach { result += it } + observable.collect { result += it } } catch(e: IOException) { result += e.message } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt index 6b5d7451a4..2fa1d9b0df 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt @@ -13,7 +13,7 @@ import java.util.concurrent.* class ObservableSingleTest { @Test fun testSingleNoWait() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send("OK") } @@ -29,7 +29,7 @@ class ObservableSingleTest { @Test fun testSingleEmitAndAwait() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O").awaitSingle() + "K") } @@ -40,7 +40,7 @@ class ObservableSingleTest { @Test fun testSingleWithDelay() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K") } @@ -51,7 +51,7 @@ class ObservableSingleTest { @Test fun testSingleException() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "K").awaitSingle() + "K") } @@ -62,7 +62,7 @@ class ObservableSingleTest { @Test fun testAwaitFirst() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "#").awaitFirst() + "K") } @@ -73,7 +73,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrDefault() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrDefault("O") + "K") } @@ -84,7 +84,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrDefaultWithValues() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K") } @@ -95,7 +95,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrNull() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrNull() ?: "OK") } @@ -106,7 +106,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrNullWithValues() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K") } @@ -117,7 +117,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrElse() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrElse { "O" } + "K") } @@ -128,7 +128,7 @@ class ObservableSingleTest { @Test fun testAwaitFirstOrElseWithValues() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K") } @@ -139,7 +139,7 @@ class ObservableSingleTest { @Test fun testAwaitLast() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { send(Observable.just("#", "O").awaitLast() + "K") } @@ -150,7 +150,7 @@ class ObservableSingleTest { @Test fun testExceptionFromObservable() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { try { send(Observable.error(RuntimeException("O")).awaitFirst()) } catch (e: RuntimeException) { @@ -165,7 +165,7 @@ class ObservableSingleTest { @Test fun testExceptionFromCoroutine() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { error(Observable.just("O").awaitSingle() + "K") } @@ -177,7 +177,7 @@ class ObservableSingleTest { @Test fun testObservableIteration() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { var result = "" Observable.just("O", "K").collect { result += it } send(result) @@ -190,7 +190,7 @@ class ObservableSingleTest { @Test fun testObservableIterationFailure() { - val observable = GlobalScope.rxObservable { + val observable = rxObservable { try { Observable.error(RuntimeException("OK")).collect { fail("Should not be here") } send("Fail") diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index fce772347b..9251149375 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -201,7 +201,7 @@ class SingleTest : TestBase() { @Test fun testFatalExceptionInSubscribe() = runTest { - GlobalScope.rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) { + rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) { expect(1) 42 }.subscribe(Consumer { @@ -212,7 +212,7 @@ class SingleTest : TestBase() { @Test fun testFatalExceptionInSingle() = runTest { - GlobalScope.rxSingle(Dispatchers.Unconfined) { + rxSingle(Dispatchers.Unconfined) { throw LinkageError() }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) From 5378b80adb1de61eceba1d8f13257f1af57e4ccf Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 19 Sep 2019 17:39:08 +0300 Subject: [PATCH 4/6] Deprecate obsolete reactive API, improve existing Reactive documentation * Remove obsolete rx-example module * Properly document interoperability with Reactor context * Update reactive readme --- .../resources/api.properties | 2 +- build.gradle | 2 +- .../kotlinx-coroutines-reactive/README.md | 24 +++++---- .../src/Channel.kt | 19 ++++--- .../src/Convert.kt | 9 ++-- .../src/ReactiveFlow.kt | 10 +++- reactive/kotlinx-coroutines-reactor/README.md | 17 ++++-- .../kotlinx-coroutines-reactor/src/Convert.kt | 8 ++- .../src/ReactorContext.kt | 54 +++++++++---------- .../src/ReactorFlow.kt | 2 + .../build.gradle | 15 ------ .../kotlinx-coroutines-rx-example/src/main.kt | 52 ------------------ reactive/kotlinx-coroutines-rx2/README.md | 9 ++++ .../kotlinx-coroutines-rx2/src/RxChannel.kt | 13 +++-- .../kotlinx-coroutines-rx2/src/RxConvert.kt | 10 ++-- settings.gradle | 2 - 16 files changed, 104 insertions(+), 144 deletions(-) delete mode 100644 reactive/kotlinx-coroutines-rx-example/build.gradle delete mode 100644 reactive/kotlinx-coroutines-rx-example/src/main.kt diff --git a/binary-compatibility-validator/resources/api.properties b/binary-compatibility-validator/resources/api.properties index 9fa115b984..e15ad21a02 100644 --- a/binary-compatibility-validator/resources/api.properties +++ b/binary-compatibility-validator/resources/api.properties @@ -4,6 +4,6 @@ module.roots=/ integration reactive ui module.marker=build.gradle -module.ignore=kotlinx-coroutines-rx-example stdlib-stubs benchmarks knit binary-compatibility-validator site publication-validator kotlinx-coroutines-bom +module.ignore=stdlib-stubs benchmarks knit binary-compatibility-validator site publication-validator kotlinx-coroutines-bom packages.internal=kotlinx.coroutines.internal \ No newline at end of file diff --git a/build.gradle b/build.gradle index 4fe2225e6b..d5ed5b9736 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ def coreModule = "kotlinx-coroutines-core" def sourceless = ['kotlinx.coroutines', 'site', 'kotlinx-coroutines-bom'] def internal = ['kotlinx.coroutines', 'site', 'benchmarks', 'knit', 'js-stub', 'stdlib-stubs', 'binary-compatibility-validator'] // Not published -def unpublished = internal + ['kotlinx-coroutines-rx-example', 'example-frontend-js', 'android-unit-tests'] +def unpublished = internal + ['example-frontend-js', 'android-unit-tests'] static def platformOf(project) { def name = project.name diff --git a/reactive/kotlinx-coroutines-reactive/README.md b/reactive/kotlinx-coroutines-reactive/README.md index 69691e8e77..b38202bbdf 100644 --- a/reactive/kotlinx-coroutines-reactive/README.md +++ b/reactive/kotlinx-coroutines-reactive/README.md @@ -8,6 +8,16 @@ Coroutine builders: | --------------- | ----------------------------- | ---------------- | --------------- | [publish] | `Publisher` | [ProducerScope] | Cold reactive publisher that starts coroutine on subscribe +Integration with [Flow]: + +| **Name** | **Result** | **Description** +| --------------- | -------------- | --------------- +| [Publisher.asFlow] | `Flow` | Converts the given publisher to flow +| [Flow.asPublisher] | `Publisher` | Converts the given flow to the TCK-compliant publisher + +If these adapters are used along with `kotlinx-coroutines-reactor` in the classpath, then Reactor's `Context` is properly +propagated as coroutine context element (`ReactorContext`) and vice versa. + Suspending extension functions and suspending iteration: | **Name** | **Description** @@ -18,29 +28,23 @@ Suspending extension functions and suspending iteration: | [Publisher.awaitFirstOrNull][org.reactivestreams.Publisher.awaitFirstOrNull] | Returns the first value from the given publisher or null | [Publisher.awaitLast][org.reactivestreams.Publisher.awaitFirst] | Returns the last value from the given publisher | [Publisher.awaitSingle][org.reactivestreams.Publisher.awaitSingle] | Returns the single value from the given publisher -| [Publisher.openSubscription][org.reactivestreams.Publisher.openSubscription] | Subscribes to publisher and returns [ReceiveChannel] - -Conversion functions: - -| **Name** | **Description** -| -------- | --------------- -| [ReceiveChannel.asPublisher][kotlinx.coroutines.channels.ReceiveChannel.asPublisher] | Converts streaming channel to hot publisher + +[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html [ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-producer-scope/index.html -[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html [publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html +[Publisher.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/as-flow.html +[Flow.asPublisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.flow.-flow/as-publisher.html [org.reactivestreams.Publisher.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first.html [org.reactivestreams.Publisher.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-default.html [org.reactivestreams.Publisher.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-else.html [org.reactivestreams.Publisher.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-first-or-null.html [org.reactivestreams.Publisher.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/await-single.html -[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html -[kotlinx.coroutines.channels.ReceiveChannel.asPublisher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.channels.-receive-channel/as-publisher.html # Package kotlinx.coroutines.reactive diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index df2131b630..f27665b702 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -5,21 +5,28 @@ package kotlinx.coroutines.reactive import kotlinx.atomicfu.* -import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import org.reactivestreams.* /** * Subscribes to this [Publisher] and returns a channel to receive elements emitted by it. * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - * + * @param request how many items to request from publisher in advance (optional, one by default). + * + * This method is deprecated in the favor of [Flow]. + * Instead of iterating over the resulting channel please use [collect][Flow.collect]: + * ``` + * asFlow().collect { value -> + * // process value + * } + * ``` */ -@ObsoleteCoroutinesApi +@Deprecated( + message = "Transforming publisher to channel is deprecated, use asFlow() instead", + level = DeprecationLevel.WARNING) // Will be error in 1.4 public fun Publisher.openSubscription(request: Int = 1): ReceiveChannel { val channel = SubscriptionChannel(request) subscribe(channel) diff --git a/reactive/kotlinx-coroutines-reactive/src/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/Convert.kt index a7ae128ea5..f1ebd23a73 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Convert.kt @@ -4,7 +4,6 @@ package kotlinx.coroutines.reactive -import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import org.reactivestreams.* import kotlin.coroutines.* @@ -14,13 +13,11 @@ import kotlin.coroutines.* * * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers, * they'll receive values in round-robin way. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - * * @param context -- the coroutine context from which the resulting observable is going to be signalled */ -@ObsoleteCoroutinesApi +@Deprecated(message = "Deprecated in the favour of consumeAsFlow()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()")) public fun ReceiveChannel.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher = publish(context) { for (t in this@asPublisher) send(t) diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 1a3edc842a..b3b764a43c 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -17,17 +17,23 @@ import kotlin.coroutines.* /** * Transforms the given reactive [Publisher] into [Flow]. * Use [buffer] operator on the resulting flow to specify the size of the backpressure. - * More precisely, to it specifies the value of the subscription's [request][Subscription.request]. + * More precisely, it specifies the value of the subscription's [request][Subscription.request]. * `1` is used by default. * * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements * are discarded. + * + * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, + * see its documentation for additional details. */ public fun Publisher.asFlow(): Flow = PublisherAsFlow(this, 1) /** - * Transforms the given flow to a spec-compliant [Publisher]. + * Transforms the given flow to a reactive specification compliant [Publisher]. + * + * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, + * see its documentation for additional details. */ public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) diff --git a/reactive/kotlinx-coroutines-reactor/README.md b/reactive/kotlinx-coroutines-reactor/README.md index 153148844e..db1af0dab2 100644 --- a/reactive/kotlinx-coroutines-reactor/README.md +++ b/reactive/kotlinx-coroutines-reactor/README.md @@ -4,15 +4,23 @@ Utilities for [Reactor](https://projectreactor.io). Coroutine builders: -| **Name** | **Result** | **Scope** | **Description** -| --------------- | -------------------------------------- | ---------------- | --------------- -| [mono] | `Mono` | [CoroutineScope] | Cold mono that starts coroutine on subscribe -| [flux] | `Flux` | [CoroutineScope] | Cold flux that starts coroutine on subscribe +| **Name** | **Result** | **Scope** | **Description** +| --------------- | ------------| ---------------- | --------------- +| [mono] | `Mono` | [CoroutineScope] | Cold mono that starts coroutine on subscribe +| [flux] | `Flux` | [CoroutineScope] | Cold flux that starts coroutine on subscribe Note that `Mono` and `Flux` are a subclass of [Reactive Streams](https://www.reactive-streams.org) `Publisher` and extensions for it are covered by [kotlinx-coroutines-reactive](../kotlinx-coroutines-reactive) module. +Integration with [Flow]: + +| **Name** | **Result** | **Description** +| --------------- | -------------- | --------------- +| [Flow.asFlux] | `Flux` | Converts the given flow to the TCK-compliant Flux. + +This adapter is integrated with Reactor's `Context` and coroutines [ReactiveContext]. + Conversion functions: | **Name** | **Description** @@ -31,6 +39,7 @@ Conversion functions: [mono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/mono.html [flux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/flux.html +[Flow.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.flow.-flow/as-flux.html [kotlinx.coroutines.Job.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-job/as-mono.html [kotlinx.coroutines.Deferred.asMono]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.-deferred/as-mono.html [kotlinx.coroutines.channels.ReceiveChannel.asFlux]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/kotlinx.coroutines.channels.-receive-channel/as-flux.html diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt index cf6b65de92..3d9aa13e43 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt @@ -43,13 +43,11 @@ public fun Deferred.asMono(context: CoroutineContext): Mono = mono(co * * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers, * they'll receive values in round-robin way. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - * * @param context -- the coroutine context from which the resulting flux is going to be signalled */ -@ObsoleteCoroutinesApi +@Deprecated(message = "Deprecated in the favour of consumeAsFlow()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.consumeAsFlow().asFlux()")) public fun ReceiveChannel.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux = flux(context) { for (t in this@asFlux) send(t) diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 942ba7b66c..19240c42ad 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -3,48 +3,48 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.ExperimentalCoroutinesApi import reactor.util.context.Context import kotlin.coroutines.* +import kotlinx.coroutines.reactive.* /** * Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines. - * * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext]. + * Coroutine context element that propagates information about Reactor's [Context] through coroutines. * - * Reactor builders [mono] and [flux] use this context element to enhance the resulting `subscriberContext`. + * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux], + * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. + * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the + * subscriber's [Context]. + ** + * ### Examples of Reactive context integration. * - * ### Usages - * Passing reactor context from coroutine builder to reactor entity: - * ``` - * launch(Context.of("key", "value").asCoroutineContext()) { - * mono { - * println(coroutineContext[ReactorContext]) // Prints { "key": "value" } - * }.subscribe() - * } + * #### Propagating ReactorContext to Reactor's Context * ``` + * val flux = myDatabaseService.getUsers() + * .subscriberContext() { ctx -> println(ctx); ctx } + * flux.await() // Will print "null" * - * Accessing modified reactor context enriched from the downstream: - * ``` - * launch { - * mono { - * println(coroutineContext[ReactorContext]) // Prints { "key": "value" } - * }.subscriberContext(Context.of("key", "value")) - * .subscribe() + * // Now add ReactorContext + * withContext(Context.of("answer", "42").asCoroutineContext()) { + * flux.await() // Will print "Context{'key'='value'}" * } * ``` * - * [CoroutineContext] of a suspendable function that awaits a value from [Mono] or [Flux] instance - * is propagated into [mono] and [flux] Reactor builders: + * #### Propagating subscriber's Context to ReactorContext: * ``` - * launch(Context.of("key", "value").asCoroutineContext()) { - * assertEquals(bar().awaitFirst(), "value") - * } - * - * fun bar(): Mono = mono { - * coroutineContext[ReactorContext]!!.context.get("key") + * val flow = flow { + * println("Reactor context in Flow: " + coroutineContext[ReactorContext]) * } + * // No context + * flow.asFlux() + * .subscribe() // Will print 'Reactor context in Flow: null' + * // Add subscriber's context + * flow.asFlux() + * .subscriberContext { ctx -> ctx.put("answer", 42) } + * .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}" * ``` */ @ExperimentalCoroutinesApi -public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) { +public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) { companion object Key : CoroutineContext.Key } @@ -53,4 +53,4 @@ public class ReactorContext(val context: Context) : AbstractCoroutineContextElem * and later used via `coroutineContext[ReactorContext]`. */ @ExperimentalCoroutinesApi -public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this) \ No newline at end of file +public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this) diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt index c852c7667f..57260cfbb3 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt @@ -13,6 +13,8 @@ import reactor.core.publisher.Flux /** * Converts the given flow to a cold flux. * The original flow is cancelled when the flux subscriber is disposed. + * + * This function is integrated with [ReactorContext], see its documentation for additional details. */ public fun Flow.asFlux(): Flux = FlowAsFlux(this) diff --git a/reactive/kotlinx-coroutines-rx-example/build.gradle b/reactive/kotlinx-coroutines-rx-example/build.gradle deleted file mode 100644 index 9ca90ede98..0000000000 --- a/reactive/kotlinx-coroutines-rx-example/build.gradle +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -apply plugin: 'application' - -mainClassName = 'MainKt' - -dependencies { - ext.retrofit_version = '2.3.0' - compile project(':kotlinx-coroutines-rx2') - compile "com.squareup.retrofit2:retrofit:$retrofit_version" - compile "com.squareup.retrofit2:converter-gson:$retrofit_version" - compile "com.squareup.retrofit2:adapter-rxjava2:$retrofit_version" -} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx-example/src/main.kt b/reactive/kotlinx-coroutines-rx-example/src/main.kt deleted file mode 100644 index 0c306ce919..0000000000 --- a/reactive/kotlinx-coroutines-rx-example/src/main.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -import io.reactivex.Single -import kotlinx.coroutines.* -import kotlinx.coroutines.rx2.await -import retrofit2.Retrofit -import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory -import retrofit2.converter.gson.GsonConverterFactory -import retrofit2.http.GET -import retrofit2.http.Path - -interface GitHub { - @GET("/repos/{owner}/{repo}/contributors") - fun contributors( - @Path("owner") owner: String, - @Path("repo") repo: String - ): Single> - - @GET("users/{user}/repos") - fun listRepos(@Path("user") user: String): Single> -} - -data class Contributor(val login: String, val contributions: Int) -data class Repo(val name: String) - -fun main(args: Array) = runBlocking { - println("Making GitHub API request") - - val retrofit = Retrofit.Builder().apply { - baseUrl("https://api.github.com") - addConverterFactory(GsonConverterFactory.create()) - addCallAdapterFactory(RxJava2CallAdapterFactory.create()) - }.build() - - val github = retrofit.create(GitHub::class.java) - - val contributors = - github.contributors("JetBrains", "Kotlin") - .await().take(10) - - for ((name, contributions) in contributors) { - println("$name has $contributions contributions, other repos: ") - - val otherRepos = - github.listRepos(name).await() - .map(Repo::name).joinToString(", ") - - println(otherRepos) - } -} diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index fbdf1b35af..b3874b05d3 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -12,6 +12,13 @@ Coroutine builders: | [rxObservable] | `Observable` | [ProducerScope] | Cold observable that starts coroutine on subscribe | [rxFlowable] | `Flowable` | [ProducerScope] | Cold observable that starts coroutine on subscribe with **backpressure** support +Integration with [Flow]: + +| **Name** | **Result** | **Description** +| --------------- | -------------- | --------------- +| [Flow.asFlowable] | `Flowable` | Converts the given flow to a cold Flowable. +| [Flow.asObservable] | `Observable` | Converts the given flow to a cold Observable. + Suspending extension functions and suspending iteration: | **Name** | **Description** @@ -56,6 +63,8 @@ Conversion functions: [rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html [rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-observable.html [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html +[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html +[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html [io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html [io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html [io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index dc852b9024..0a19eda860 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -7,7 +7,6 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* import kotlinx.atomicfu.* -import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* @@ -15,10 +14,10 @@ import kotlinx.coroutines.internal.* * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source. * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). + * This API is deprecated in the favour of [Flow]. + * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. */ -@ObsoleteCoroutinesApi +@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4 public fun MaybeSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) @@ -29,10 +28,10 @@ public fun MaybeSource.openSubscription(): ReceiveChannel { * Subscribes to this [ObservableSource] and returns a channel to receive elements emitted by it. * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this source. * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). + * This API is deprecated in the favour of [Flow]. + * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. */ -@ObsoleteCoroutinesApi +@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4 public fun ObservableSource.openSubscription(): ReceiveChannel { val channel = SubscriptionChannel() subscribe(channel) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 4b12127189..8df6caeeed 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -67,13 +67,11 @@ public fun Deferred.asSingle(context: CoroutineContext): Single * * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers, * they'll receive values in round-robin way. - * - * **Note: This API will become obsolete in future updates with introduction of lazy asynchronous streams.** - * See [issue #254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). - * - * @param context -- the coroutine context from which the resulting observable is going to be signalled */ -@ObsoleteCoroutinesApi +@Deprecated( + message = "Deprecated in the favour of Flow", + level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("this.consumeAsFlow().asObservable()") +) public fun ReceiveChannel.asObservable(context: CoroutineContext): Observable = rxObservable(context) { for (t in this@asObservable) send(t) diff --git a/settings.gradle b/settings.gradle index aa5c68f99c..e161cfeada 100644 --- a/settings.gradle +++ b/settings.gradle @@ -36,8 +36,6 @@ module('integration/kotlinx-coroutines-play-services') module('reactive/kotlinx-coroutines-reactive') module('reactive/kotlinx-coroutines-reactor') module('reactive/kotlinx-coroutines-rx2') -module('reactive/kotlinx-coroutines-rx-example') - module('ui/kotlinx-coroutines-android') module('ui/kotlinx-coroutines-android/android-unit-tests') module('ui/kotlinx-coroutines-javafx') From c99704ae8be53dfeb4cb8e527cadff5eb36f4659 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 24 Sep 2019 19:30:49 +0300 Subject: [PATCH 5/6] Remove outdated reactive guide and add section about Reactive Streams to flow.md --- README.md | 1 - coroutines-guide.md | 1 + docs/coroutines-guide.md | 1 - docs/flow.md | 13 + reactive/coroutines-guide-reactive.md | 1078 ----------------- .../test/guide/example-reactive-basic-01.kt | 31 - .../test/guide/example-reactive-basic-02.kt | 32 - .../test/guide/example-reactive-basic-03.kt | 26 - .../test/guide/example-reactive-basic-04.kt | 20 - .../test/guide/example-reactive-basic-05.kt | 30 - .../test/guide/example-reactive-basic-06.kt | 18 - .../test/guide/example-reactive-basic-07.kt | 22 - .../test/guide/example-reactive-basic-08.kt | 25 - .../test/guide/example-reactive-basic-09.kt | 24 - .../test/guide/example-reactive-context-01.kt | 23 - .../test/guide/example-reactive-context-02.kt | 24 - .../test/guide/example-reactive-context-03.kt | 26 - .../test/guide/example-reactive-context-04.kt | 24 - .../test/guide/example-reactive-context-05.kt | 27 - .../guide/example-reactive-operators-01.kt | 19 - .../guide/example-reactive-operators-02.kt | 32 - .../guide/example-reactive-operators-03.kt | 39 - .../guide/example-reactive-operators-04.kt | 37 - .../test/guide/test/GuideReactiveTest.kt | 191 --- .../test/guide/test/ReactiveTestBase.kt | 47 - 25 files changed, 14 insertions(+), 1797 deletions(-) delete mode 100644 reactive/coroutines-guide-reactive.md delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-03.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-01.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt delete mode 100644 reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt diff --git a/README.md b/README.md index e31d6f242f..862ce16232 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,6 @@ suspend fun main() = coroutineScope { * Guides and manuals: * [Guide to kotlinx.coroutines by example](https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html) (**read it first**) * [Guide to UI programming with coroutines](ui/coroutines-guide-ui.md) - * [Guide to reactive streams with coroutines](reactive/coroutines-guide-reactive.md) * [Debugging capabilities in kotlinx.coroutines](docs/debugging.md) * [Compatibility policy and experimental annotations](docs/compatibility.md) * [Change log for kotlinx.coroutines](CHANGES.md) diff --git a/coroutines-guide.md b/coroutines-guide.md index 4a7fb67af3..bebd0d0e3c 100644 --- a/coroutines-guide.md +++ b/coroutines-guide.md @@ -79,6 +79,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m * [Upstream exceptions only](docs/flow.md#upstream-exceptions-only) * [Imperative versus declarative](docs/flow.md#imperative-versus-declarative) * [Launching flow](docs/flow.md#launching-flow) + * [Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams) * [Channels](docs/channels.md#channels) * [Channel basics](docs/channels.md#channel-basics) diff --git a/docs/coroutines-guide.md b/docs/coroutines-guide.md index 5f41d06000..e3f18d208e 100644 --- a/docs/coroutines-guide.md +++ b/docs/coroutines-guide.md @@ -28,6 +28,5 @@ In order to use coroutines as well as follow the examples in this guide, you nee ## Additional references * [Guide to UI programming with coroutines](../ui/coroutines-guide-ui.md) -* [Guide to reactive streams with coroutines](../reactive/coroutines-guide-reactive.md) * [Coroutines design document (KEEP)](https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md) * [Full kotlinx.coroutines API reference](https://kotlin.github.io/kotlinx.coroutines) diff --git a/docs/flow.md b/docs/flow.md index 2a3ffb598b..7837a54058 100644 --- a/docs/flow.md +++ b/docs/flow.md @@ -58,6 +58,7 @@ class FlowGuideTest { * [Upstream exceptions only](#upstream-exceptions-only) * [Imperative versus declarative](#imperative-versus-declarative) * [Launching flow](#launching-flow) + * [Flow and Reactive Streams](#flow-and-reactive-streams) @@ -1794,6 +1795,18 @@ as cancellation and structured concurrency serve this purpose. Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection coroutine only without cancelling the whole scope or to [join][Job.join] it. +### Flow and Reactive Streams + +For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor, +design of the Flow may look very familiar. + +Indeed, its design was inspired by Reactive Streams and its various implementations. But Flow main goal is to have as simple design as possible, +be Kotlin and suspension friendly and respect structured concurrency. Achieving this goal would be impossible without reactive pioneers and their tremendous work. You can read the complete story in [Reactive Streams and Kotlin Flows](https://medium.com/@elizarov/reactive-streams-and-kotlin-flows-bfd12772cda4) article. + +While being different, conceptually, Flow *is* a reactive stream and it is possible to convert it to the reactive (spec and TCK compliant) Publisher and vice versa. +Such converters are provided by `kotlinx.coroutines` out-of-the-box and can be found in corresponding reactive modules (`kotlinx-coroutines-reactive` for Reactive Streams, `kotlinx-coroutines-reactor` for Project Reactor and `kotlinx-coroutines-rx2` for RxJava2). +Integration modules include conversions from and to `Flow`, integration with Reactor's `Context` and suspension-friendly ways to work with various reactive entities. + [collections]: https://kotlinlang.org/docs/reference/collections-overview.html diff --git a/reactive/coroutines-guide-reactive.md b/reactive/coroutines-guide-reactive.md deleted file mode 100644 index 0eff27b1ea..0000000000 --- a/reactive/coroutines-guide-reactive.md +++ /dev/null @@ -1,1078 +0,0 @@ - - - - -# Guide to reactive streams with coroutines - -This guide explains the key differences between Kotlin coroutines and reactive streams and shows -how they can be used together for the greater good. Prior familiarity with the basic coroutine concepts -that are covered in [Guide to kotlinx.coroutines](../docs/coroutines-guide.md) is not required, -but is a big plus. If you are familiar with reactive streams, you may find this guide -a better introduction into the world of coroutines. - -There are several modules in `kotlinx.coroutines` project that are related to reactive streams: - -* [kotlinx-coroutines-reactive](kotlinx-coroutines-reactive) -- utilities for [Reactive Streams](https://www.reactive-streams.org) -* [kotlinx-coroutines-reactor](kotlinx-coroutines-reactor) -- utilities for [Reactor](https://projectreactor.io) -* [kotlinx-coroutines-rx2](kotlinx-coroutines-rx2) -- utilities for [RxJava 2.x](https://github.com/ReactiveX/RxJava) - -This guide is mostly based on [Reactive Streams](https://www.reactive-streams.org) specification and uses -its `Publisher` interface with some examples based on [RxJava 2.x](https://github.com/ReactiveX/RxJava), -which implements reactive streams specification. - -You are welcome to clone -[`kotlinx.coroutines` project](https://github.com/Kotlin/kotlinx.coroutines) -from GitHub to your workstation in order to -run all the presented examples. They are contained in -[reactive/kotlinx-coroutines-rx2/test/guide](kotlinx-coroutines-rx2/test/guide) -directory of the project. - -## Table of contents - - - -* [Differences between reactive streams and channels](#differences-between-reactive-streams-and-channels) - * [Basics of iteration](#basics-of-iteration) - * [Subscription and cancellation](#subscription-and-cancellation) - * [Backpressure](#backpressure) - * [Rx Subject vs BroadcastChannel](#rx-subject-vs-broadcastchannel) -* [Operators](#operators) - * [Range](#range) - * [Fused filter-map hybrid](#fused-filter-map-hybrid) - * [Take until](#take-until) - * [Merge](#merge) -* [Coroutine context](#coroutine-context) - * [Threads with Rx](#threads-with-rx) - * [Threads with coroutines](#threads-with-coroutines) - * [Rx observeOn](#rx-observeon) - * [Coroutine context to rule them all](#coroutine-context-to-rule-them-all) - * [Unconfined context](#unconfined-context) - - - -## Differences between reactive streams and channels - -This section outlines key differences between reactive streams and coroutine-based channels. - -### Basics of iteration - -The [Channel] is somewhat similar concept to the following reactive stream classes: - -* Reactive stream [Publisher](https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Publisher.java); -* Rx Java 1.x [Observable](https://reactivex.io/RxJava/javadoc/rx/Observable.html); -* Rx Java 2.x [Flowable](https://reactivex.io/RxJava/2.x/javadoc/), which implements `Publisher`. - -They all describe an asynchronous stream of elements (aka items in Rx), either infinite or finite, -and all of them support backpressure. - -However, the `Channel` always represents a _hot_ stream of items, using Rx terminology. Elements are being sent -into the channel by producer coroutines and are received from it by consumer coroutines. -Every [receive][ReceiveChannel.receive] invocation consumes an element from the channel. -Let us illustrate it with the following example: - - - -```kotlin -fun main() = runBlocking { - // create a channel that produces numbers from 1 to 3 with 200ms delays between them - val source = produce { - println("Begin") // mark the beginning of this coroutine in output - for (x in 1..3) { - delay(200) // wait for 200ms - send(x) // send number x to the channel - } - } - // print elements from the source - println("Elements:") - source.consumeEach { // consume elements from it - println(it) - } - // print elements from the source AGAIN - println("Again:") - source.consumeEach { // consume elements from it - println(it) - } -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt). - -This code produces the following output: - -```text -Elements: -Begin -1 -2 -3 -Again: -``` - - - -Notice how the "Begin" line was printed just once, because the [produce] _coroutine builder_, when it is executed, -launches one coroutine to produce a stream of elements. All the produced elements are consumed -with [ReceiveChannel.consumeEach][consumeEach] -extension function. There is no way to receive the elements from this -channel again. The channel is closed when the producer coroutine is over and an attempt to receive -from it again cannot receive anything. - -Let us rewrite this code using the [publish] coroutine builder from `kotlinx-coroutines-reactive` module -instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same, -but where `source` used to have the [ReceiveChannel] type, it now has the reactive streams' -[Publisher](https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html) -type, and where [consumeEach] was used to _consume_ elements from the channel, -now [collect][org.reactivestreams.Publisher.collect] is used to _collect_ elements from the publisher. - - - -```kotlin -fun main() = runBlocking { - // create a publisher that produces numbers from 1 to 3 with 200ms delays between them - val source = publish { - // ^^^^^^^ <--- Difference from the previous examples is here - println("Begin") // mark the beginning of this coroutine in output - for (x in 1..3) { - delay(200) // wait for 200ms - send(x) // send number x to the channel - } - } - // print elements from the source - println("Elements:") - source.collect { // collect elements from it - println(it) - } - // print elements from the source AGAIN - println("Again:") - source.collect { // collect elements from it - println(it) - } -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt). - -Now the output of this code changes to: - -```text -Elements: -Begin -1 -2 -3 -Again: -Begin -1 -2 -3 -``` - - - -This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order -functional concept. While the channel _is_ a stream of elements, the reactive stream defines a recipe on how the stream of -elements is produced. It becomes the actual stream of elements when _collected_. Each collector may receive the same or -a different stream of elements, depending on how the corresponding implementation of `Publisher` works. - -The [publish] coroutine builder used in the above example does not launch a coroutine, -but every [collect][org.reactivestreams.Publisher.collect] invocation does. -There are two of them here and that is why we see "Begin" printed twice. - -In Rx lingo, this kind of publisher is called _cold_. Many standard Rx operators produce cold streams, too. We can collect -them from a coroutine, and every collector gets the same stream of elements. - -> Note that we can replicate the same behaviour that we saw with channels by using Rx -[publish](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#publish()) -operator and [connect](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/flowables/ConnectableFlowable.html#connect()) -method with it. - -### Subscription and cancellation - -In the second example from the previous section, `source.collect { ... }` was used to collect all elements. -Instead, we can open a channel using [openSubscription][org.reactivestreams.Publisher.openSubscription] -and iterate over it. In this way, we can have finer-grained control over our iteration -(using `break`, for example), as shown below: - - - -```kotlin -fun main() = runBlocking { - val source = Flowable.range(1, 5) // a range of five numbers - .doOnSubscribe { println("OnSubscribe") } // provide some insight - .doOnComplete { println("OnComplete") } // ... - .doFinally { println("Finally") } // ... into what's going on - var cnt = 0 - source.openSubscription().consume { // open channel to the source - for (x in this) { // iterate over the channel to receive elements from it - println(x) - if (++cnt >= 3) break // break when 3 elements are printed - } - // Note: `consume` cancels the channel when this block of code is complete - } -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-03.kt). - -It produces the following output: - -```text -OnSubscribe -1 -2 -3 -Finally -``` - - - -With an explicit `openSubscription` we should [cancel][ReceiveChannel.cancel] the corresponding -subscription to unsubscribe from the source, but there is no need to call `cancel` explicitly -- -[consume] does that for us under the hood. -The installed -[doFinally](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action)) -listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete" -is never printed because we did not consume all of the elements. - -We do not need to use an explicit `cancel` either if we `collect` all the elements: - - - -```kotlin -fun main() = runBlocking { - val source = Flowable.range(1, 5) // a range of five numbers - .doOnSubscribe { println("OnSubscribe") } // provide some insight - .doOnComplete { println("OnComplete") } // ... - .doFinally { println("Finally") } // ... into what's going on - // collect the source fully - source.collect { println(it) } -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt). - -We get the following output: - -```text -OnSubscribe -1 -2 -3 -OnComplete -Finally -4 -5 -``` - - - -Notice how "OnComplete" and "Finally" are printed before the lasts elements "4" and "5". -It happens because our `main` function in this -example is a coroutine that we start with the [runBlocking] coroutine builder. -Our main coroutine receives on the flowable using the `source.collect { ... }` expression. -The main coroutine is _suspended_ while it waits for the source to emit an item. -When the last items are emitted by `Flowable.range(1, 5)` it -_resumes_ the main coroutine, which gets dispatched onto the main thread to print this - last element at a later point in time, while the source completes and prints "Finally". - -### Backpressure - -Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can -_suspend_ and they provide a natural answer to handling backpressure. - -In Rx Java 2.x, the backpressure-capable class is called -[Flowable](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html). -In the following example, we use [rxFlowable] coroutine builder from `kotlinx-coroutines-rx2` module to define a -flowable that sends three integers from 1 to 3. -It prints a message to the output before invocation of the -suspending [send][SendChannel.send] function, so that we can study how it operates. - -The integers are generated in the context of the main thread, but the subscription is shifted -to another thread using Rx -[observeOn](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int)) -operator with a buffer of size 1. -The subscriber is slow. It takes 500 ms to process each item, which is simulated using `Thread.sleep`. - - - -```kotlin -fun main() = runBlocking { - // coroutine -- fast producer of elements in the context of the main thread - val source = rxFlowable { - for (x in 1..3) { - send(x) // this is a suspending function - println("Sent $x") // print after successfully sent item - } - } - // subscribe on another thread with a slow subscriber using Rx - source - .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item - .doOnComplete { println("Complete") } - .subscribe { x -> - Thread.sleep(500) // 500ms to process each item - println("Processed $x") - } - delay(2000) // suspend the main thread for a few seconds -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt). - -The output of this code nicely illustrates how backpressure works with coroutines: - -```text -Sent 1 -Processed 1 -Sent 2 -Processed 2 -Sent 3 -Processed 3 -Complete -``` - - - -We see here how the producer coroutine puts the first element in the buffer and is suspended while trying to send another -one. Only after the consumer processes the first item, the producer sends the second one and resumes, etc. - - -### Rx Subject vs BroadcastChannel - -RxJava has a concept of [Subject](https://github.com/ReactiveX/RxJava/wiki/Subject) which is an object that -effectively broadcasts elements to all its subscribers. The matching concept in the coroutines world is called a -[BroadcastChannel]. There is a variety of subjects in Rx with -[BehaviorSubject](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html) being -the one used to manage state: - - - -```kotlin -fun main() { - val subject = BehaviorSubject.create() - subject.onNext("one") - subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost - // now subscribe to this subject and print everything - subject.subscribe(System.out::println) - subject.onNext("three") - subject.onNext("four") -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt). - -This code prints the current state of the subject on subscription and all its further updates: - - -```text -two -three -four -``` - - - -You can subscribe to subjects from a coroutine just as with any other reactive stream: - - - -```kotlin -fun main() = runBlocking { - val subject = BehaviorSubject.create() - subject.onNext("one") - subject.onNext("two") - // now launch a coroutine to print everything - GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context - subject.collect { println(it) } - } - subject.onNext("three") - subject.onNext("four") -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt). - -The result is the same: - -```text -two -three -four -``` - - - -Here we use the [Dispatchers.Unconfined] coroutine context to launch a consuming coroutine with the same behavior as subscription in Rx. -It basically means that the launched coroutine is going to be immediately executed in the same thread that -is emitting elements. Contexts are covered in more details in a [separate section](#coroutine-context). - -The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates. -A typical UI application does not need to react to every state change. Only the most recent state is relevant. -A sequence of back-to-back updates to the application state needs to get reflected in UI only once, -as soon as the UI thread is free. For the following example we are going to simulate this by launching -a consuming coroutine in the context of the main thread and use the [yield] function to simulate a break in the -sequence of updates and to release the main thread: - - - -```kotlin -fun main() = runBlocking { - val subject = BehaviorSubject.create() - subject.onNext("one") - subject.onNext("two") - // now launch a coroutine to print the most recent update - launch { // use the context of the main thread for a coroutine - subject.collect { println(it) } - } - subject.onNext("three") - subject.onNext("four") - yield() // yield the main thread to the launched coroutine <--- HERE - subject.onComplete() // now complete the subject's sequence to cancel the consumer, too -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt). - -Now the coroutine processes (prints) only the most recent update: - -```text -four -``` - - - -The corresponding behavior in the pure coroutines world is implemented by [ConflatedBroadcastChannel] -that provides the same logic on top of coroutine channels directly, -without going through the bridge to the reactive streams: - - - -```kotlin -fun main() = runBlocking { - val broadcast = ConflatedBroadcastChannel() - broadcast.offer("one") - broadcast.offer("two") - // now launch a coroutine to print the most recent update - launch { // use the context of the main thread for a coroutine - broadcast.consumeEach { println(it) } - } - broadcast.offer("three") - broadcast.offer("four") - yield() // yield the main thread to the launched coroutine - broadcast.close() // now close the broadcast channel to cancel the consumer, too -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt). - -It produces the same output as the previous example based on `BehaviorSubject`: - -```text -four -``` - - - -Another implementation of [BroadcastChannel] is `ArrayBroadcastChannel` with an array-based buffer of -a specified `capacity`. It can be created with `BroadcastChannel(capacity)`. -It delivers every event to every -subscriber as soon as their corresponding subscriptions are opened. It corresponds to -[PublishSubject](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html) in Rx. -The capacity of the buffer in the constructor of `ArrayBroadcastChannel` controls the numbers of elements -that can be sent before the sender is suspended waiting for a receiver to receive those elements. - -## Operators - -Full-featured reactive stream libraries, like Rx, come with -[a very large set of operators](https://reactivex.io/documentation/operators.html) to create, transform, combine -and otherwise process the corresponding streams. Creating your own operators with support for -back-pressure is [notoriously](https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html) -[difficult](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0). - -Coroutines and channels are designed to provide an opposite experience. There are no built-in operators, -but processing streams of elements is extremely simple and back-pressure is supported automatically -without you having to explicitly think about it. - -This section shows a coroutine-based implementation of several reactive stream operators. - -### Range - -Let's roll out own implementation of -[range](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int)) -operator for the reactive streams' `Publisher` interface. The asynchronous clean-slate implementation of this operator for -reactive streams is explained in -[this blog post](https://akarnokd.blogspot.ru/2017/03/java-9-flow-api-asynchronous-integer.html). -It takes a lot of code. -Here is the corresponding code with coroutines: - - - -```kotlin -fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish(context) { - for (x in start until start + count) send(x) -} -``` - -Here, `CoroutineScope` and `context` are used instead of an `Executor` and all the backpressure aspects are taken care -of by the coroutines machinery. Note that this implementation depends only on the small reactive streams library -that defines the `Publisher` interface and its friends. - -Using it from a coroutine is straightforward: - -```kotlin -fun main() = runBlocking { - // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default - range(Dispatchers.Default, 1, 5).collect { println(it) } -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt). - -The result of this code is quite expected: - -```text -1 -2 -3 -4 -5 -``` - - - -### Fused filter-map hybrid - -Reactive operators like -[filter](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#filter(io.reactivex.functions.Predicate)) and -[map](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#map(io.reactivex.functions.Function)) -are trivial to implement with coroutines. For a bit of challenge and showcase, let us combine them -into the single `fusedFilterMap` operator: - - - -```kotlin -fun Publisher.fusedFilterMap( - context: CoroutineContext, // the context to execute this coroutine in - predicate: (T) -> Boolean, // the filter predicate - mapper: (T) -> R // the mapper function -) = publish(context) { - collect { // collect the source stream - if (predicate(it)) // filter part - send(mapper(it)) // map part - } -} -``` - -Using `range` from the previous example we can test our `fusedFilterMap` -by filtering for even numbers and mapping them to strings: - - - -```kotlin -fun main() = runBlocking { - range(1, 5) - .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" }) - .collect { println(it) } // print all the resulting strings -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt). - -It is not hard to see that the result is going to be: - -```text -2 is even -4 is even -``` - - - -### Take until - -Let's implement our own version of -[takeUntil](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#takeUntil(org.reactivestreams.Publisher)) -operator. It is quite [tricky](https://akarnokd.blogspot.ru/2015/05/pitfalls-of-operator-implementations.html) -as subscriptions to two streams need to be tracked and managed. -We need to relay all the elements from the source stream until the other stream either completes or -emits anything. However, we have the [select] expression to rescue us in the coroutines implementation: - - - -```kotlin -fun Publisher.takeUntil(context: CoroutineContext, other: Publisher) = publish(context) { - this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher - val current = this - other.openSubscription().consume { // explicitly open channel to Publisher - val other = this - whileSelect { - other.onReceive { false } // bail out on any received element from `other` - current.onReceive { send(it); true } // resend element from this channel and continue - } - } - } -} -``` - -This code is using [whileSelect] as a nicer shortcut to `while(select{...}) {}` loop and Kotlin's -[consume] expressions to close the channels on exit, which unsubscribes from the corresponding publishers. - -The following hand-written combination of -[range](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#range(int,%20int)) with -[interval](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#interval(long,%20java.util.concurrent.TimeUnit,%20io.reactivex.Scheduler)) -is used for testing. It is coded using a `publish` coroutine builder -(its pure-Rx implementation is shown in later sections): - -```kotlin -fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish { - for (x in start until start + count) { - delay(time) // wait before sending each number - send(x) - } -} -``` - -The following code shows how `takeUntil` works: - -```kotlin -fun main() = runBlocking { - val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval - val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms - slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt). - -Producing - -```text -1 -2 -``` - - - -### Merge - -There are always at least two ways for processing multiple streams of data with coroutines. One way involving -[select] was shown in the previous example. The other way is just to launch multiple coroutines. Let -us implement -[merge](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#merge(org.reactivestreams.Publisher)) -operator using the latter approach: - - - -```kotlin -fun Publisher>.merge(context: CoroutineContext) = publish(context) { - collect { pub -> // for each publisher collected - launch { // launch a child coroutine - pub.collect { send(it) } // resend all element from this publisher - } - } -} -``` - -Notice that all the coroutines that are -being launched here are the children of the `publish` -coroutine and will get cancelled when the `publish` coroutine is cancelled or is otherwise completed. -Moreover, since the parent coroutine waits until all the children are complete, this implementation fully -merges all the received streams. - -For a test, let us start with the `rangeWithInterval` function from the previous example and write a -producer that sends its results twice with some delay: - - - -```kotlin -fun CoroutineScope.testPub() = publish> { - send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms - delay(100) // wait for 100 ms - send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms - delay(1100) // wait for 1.1s - done in 1.2 sec after start -} -``` - -The test code is to use `merge` on `testPub` and to display the results: - -```kotlin -fun main() = runBlocking { - testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt). - -And the results should be: - -```text -1 -2 -11 -3 -4 -12 -13 -``` - - - -## Coroutine context - -All the example operators that are shown in the previous section have an explicit -[CoroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/-coroutine-context/) -parameter. In the Rx world it roughly corresponds to -a [Scheduler](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Scheduler.html). - -### Threads with Rx - -The following example shows the basics of threading context management with Rx. -Here `rangeWithIntervalRx` is an implementation of `rangeWithInterval` function using Rx -`zip`, `range`, and `interval` operators. - - - -```kotlin -fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable = - Flowable.zip( - Flowable.range(start, count), - Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), - BiFunction { x, _ -> x }) - -fun main() { - rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .subscribe { println("$it on thread ${Thread.currentThread().name}") } - Thread.sleep(1000) -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-01.kt). - -We are explicitly passing the -[Schedulers.computation()](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#computation()) -scheduler to our `rangeWithIntervalRx` operator and -it is going to be executed in Rx computation thread pool. The output is going to be similar to the following one: - -```text -1 on thread RxComputationThreadPool-1 -2 on thread RxComputationThreadPool-1 -3 on thread RxComputationThreadPool-1 -``` - - - -### Threads with coroutines - -In the world of coroutines `Schedulers.computation()` roughly corresponds to [Dispatchers.Default], -so the previous example is similar to the following one: - - - -```kotlin -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { - for (x in start until start + count) { - delay(time) // wait before sending each number - send(x) - } -} - -fun main() { - Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3)) - .subscribe { println("$it on thread ${Thread.currentThread().name}") } - Thread.sleep(1000) -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt). - -The produced output is going to be similar to: - -```text -1 on thread ForkJoinPool.commonPool-worker-1 -2 on thread ForkJoinPool.commonPool-worker-1 -3 on thread ForkJoinPool.commonPool-worker-1 -``` - - - -Here we've used Rx -[subscribe](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#subscribe(io.reactivex.functions.Consumer)) -operator that does not have its own scheduler and operates on the same thread that the publisher -- on a default -shared pool of threads in this example. - -### Rx observeOn - -In Rx you use special operators to modify the threading context for operations in the chain. You -can find some [good guides](https://tomstechnicalblog.blogspot.ru/2016/02/rxjava-understanding-observeon-and.html) -about them, if you are not familiar. - -For example, there is -[observeOn](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler)) -operator. Let us modify the previous example to observe using `Schedulers.computation()`: - - - -```kotlin -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { - for (x in start until start + count) { - delay(time) // wait before sending each number - send(x) - } -} - -fun main() { - Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3)) - .observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED - .subscribe { println("$it on thread ${Thread.currentThread().name}") } - Thread.sleep(1000) -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt). - -Here is the difference in output, notice "RxComputationThreadPool": - -```text -1 on thread RxComputationThreadPool-1 -2 on thread RxComputationThreadPool-1 -3 on thread RxComputationThreadPool-1 -``` - - - -### Coroutine context to rule them all - -A coroutine is always working in some context. For example, let us start a coroutine -in the main thread with [runBlocking] and iterate over the result of the Rx version of `rangeWithIntervalRx` operator, -instead of using Rx `subscribe` operator: - - - -```kotlin -fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable = - Flowable.zip( - Flowable.range(start, count), - Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), - BiFunction { x, _ -> x }) - -fun main() = runBlocking { - rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .collect { println("$it on thread ${Thread.currentThread().name}") } -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt). - -The resulting messages are going to be printed in the main thread: - -```text -1 on thread main -2 on thread main -3 on thread main -``` - - - -### Unconfined context - -Most Rx operators do not have any specific thread (scheduler) associated with them and are working -in whatever thread they happen to be invoked. We've seen it in the example with the `subscribe` operator -in the [threads with Rx](#threads-with-rx) section. - -In the world of coroutines, [Dispatchers.Unconfined] context serves a similar role. Let us modify our previous example, -but instead of iterating over the source `Flowable` from the `runBlocking` coroutine that is confined -to the main thread, we launch a new coroutine in the `Dispatchers.Unconfined` context, while the main coroutine -simply waits for its completion using [Job.join]: - - - -```kotlin -fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable = - Flowable.zip( - Flowable.range(start, count), - Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), - BiFunction { x, _ -> x }) - -fun main() = runBlocking { - val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool) - rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .collect { println("$it on thread ${Thread.currentThread().name}") } - } - job.join() // wait for our coroutine to complete -} -``` - -> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt). - -Now, the output shows that the code of the coroutine is executing in the Rx computation thread pool, just -like our initial example using the Rx `subscribe` operator. - -```text -1 on thread RxComputationThreadPool-1 -2 on thread RxComputationThreadPool-1 -3 on thread RxComputationThreadPool-1 -``` - - - -Note that the [Dispatchers.Unconfined] context should be used with care. It may improve the overall performance on certain tests, -due to the increased stack-locality of operations and less scheduling overhead, but it also produces deeper stacks -and makes it harder to reason about asynchronicity of the code that is using it. - -If a coroutine sends an element to a channel, then the thread that invoked the -[send][SendChannel.send] may start executing the code of the coroutine with the [Dispatchers.Unconfined] dispatcher. -The original producer coroutine that invoked `send` is paused until the unconfined consumer coroutine hits its next -suspension point. This is very similar to a lock-step single-threaded `onNext` execution in the Rx world in the absense -of thread-shifting operators. It is a normal default for Rx, because operators are usually doing very small chunks -of work and you have to combine many operators for a complex processing. However, this is unusual with coroutines, -where you can have an arbitrary complex processing in a coroutine. Usually, you only need to chain stream-processing -coroutines for complex pipelines with fan-in and fan-out between multiple worker coroutines. - - - -[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html -[Dispatchers.Unconfined]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-unconfined.html -[yield]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/yield.html -[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html -[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html - -[Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html -[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html -[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html -[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html -[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html -[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html -[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html -[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html -[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html -[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html - -[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html -[whileSelect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/while-select.html - - -[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/publish.html -[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html -[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html - - -[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html - - - diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt deleted file mode 100644 index f3bc344d58..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-01.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic01 - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlin.coroutines.* - -fun main() = runBlocking { - // create a channel that produces numbers from 1 to 3 with 200ms delays between them - val source = produce { - println("Begin") // mark the beginning of this coroutine in output - for (x in 1..3) { - delay(200) // wait for 200ms - send(x) // send number x to the channel - } - } - // print elements from the source - println("Elements:") - source.consumeEach { // consume elements from it - println(it) - } - // print elements from the source AGAIN - println("Again:") - source.consumeEach { // consume elements from it - println(it) - } -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt deleted file mode 100644 index 0e0ff2e58e..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-02.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic02 - -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import kotlin.coroutines.* - -fun main() = runBlocking { - // create a publisher that produces numbers from 1 to 3 with 200ms delays between them - val source = publish { - // ^^^^^^^ <--- Difference from the previous examples is here - println("Begin") // mark the beginning of this coroutine in output - for (x in 1..3) { - delay(200) // wait for 200ms - send(x) // send number x to the channel - } - } - // print elements from the source - println("Elements:") - source.collect { // collect elements from it - println(it) - } - // print elements from the source AGAIN - println("Again:") - source.collect { // collect elements from it - println(it) - } -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-03.kt deleted file mode 100644 index b84fc08f53..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-03.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic03 - -import io.reactivex.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.reactive.* - -fun main() = runBlocking { - val source = Flowable.range(1, 5) // a range of five numbers - .doOnSubscribe { println("OnSubscribe") } // provide some insight - .doOnComplete { println("OnComplete") } // ... - .doFinally { println("Finally") } // ... into what's going on - var cnt = 0 - source.openSubscription().consume { // open channel to the source - for (x in this) { // iterate over the channel to receive elements from it - println(x) - if (++cnt >= 3) break // break when 3 elements are printed - } - // Note: `consume` cancels the channel when this block of code is complete - } -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt deleted file mode 100644 index a08c41fce5..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-04.kt +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic04 - -import io.reactivex.* -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import kotlin.coroutines.* - -fun main() = runBlocking { - val source = Flowable.range(1, 5) // a range of five numbers - .doOnSubscribe { println("OnSubscribe") } // provide some insight - .doOnComplete { println("OnComplete") } // ... - .doFinally { println("Finally") } // ... into what's going on - // collect the source fully - source.collect { println(it) } -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt deleted file mode 100644 index e6428b9260..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic05 - -import io.reactivex.schedulers.* -import kotlinx.coroutines.* -import kotlinx.coroutines.rx2.* -import kotlin.coroutines.* - -fun main() = runBlocking { - // coroutine -- fast producer of elements in the context of the main thread - val source = rxFlowable { - for (x in 1..3) { - send(x) // this is a suspending function - println("Sent $x") // print after successfully sent item - } - } - // subscribe on another thread with a slow subscriber using Rx - source - .observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item - .doOnComplete { println("Complete") } - .subscribe { x -> - Thread.sleep(500) // 500ms to process each item - println("Processed $x") - } - delay(2000) // suspend the main thread for a few seconds -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt deleted file mode 100644 index 1f3747f4be..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic06 - -import io.reactivex.subjects.BehaviorSubject - -fun main() { - val subject = BehaviorSubject.create() - subject.onNext("one") - subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost - // now subscribe to this subject and print everything - subject.subscribe(System.out::println) - subject.onNext("three") - subject.onNext("four") -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt deleted file mode 100644 index b4cc9fc937..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic07 - -import io.reactivex.subjects.BehaviorSubject -import kotlinx.coroutines.* -import kotlinx.coroutines.rx2.collect - -fun main() = runBlocking { - val subject = BehaviorSubject.create() - subject.onNext("one") - subject.onNext("two") - // now launch a coroutine to print everything - GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context - subject.collect { println(it) } - } - subject.onNext("three") - subject.onNext("four") -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt deleted file mode 100644 index 8e17ac9cd8..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic08 - -import io.reactivex.subjects.* -import kotlinx.coroutines.* -import kotlinx.coroutines.rx2.* -import kotlin.coroutines.* - -fun main() = runBlocking { - val subject = BehaviorSubject.create() - subject.onNext("one") - subject.onNext("two") - // now launch a coroutine to print the most recent update - launch { // use the context of the main thread for a coroutine - subject.collect { println(it) } - } - subject.onNext("three") - subject.onNext("four") - yield() // yield the main thread to the launched coroutine <--- HERE - subject.onComplete() // now complete the subject's sequence to cancel the consumer, too -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt deleted file mode 100644 index 738c4aba29..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.basic09 - -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.* -import kotlin.coroutines.* - -fun main() = runBlocking { - val broadcast = ConflatedBroadcastChannel() - broadcast.offer("one") - broadcast.offer("two") - // now launch a coroutine to print the most recent update - launch { // use the context of the main thread for a coroutine - broadcast.consumeEach { println(it) } - } - broadcast.offer("three") - broadcast.offer("four") - yield() // yield the main thread to the launched coroutine - broadcast.close() // now close the broadcast channel to cancel the consumer, too -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-01.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-01.kt deleted file mode 100644 index b12e92ae10..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-01.kt +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.context01 - -import io.reactivex.* -import io.reactivex.functions.BiFunction -import io.reactivex.schedulers.Schedulers -import java.util.concurrent.TimeUnit - -fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable = - Flowable.zip( - Flowable.range(start, count), - Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), - BiFunction { x, _ -> x }) - -fun main() { - rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .subscribe { println("$it on thread ${Thread.currentThread().name}") } - Thread.sleep(1000) -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt deleted file mode 100644 index b87849a562..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-02.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.context02 - -import io.reactivex.* -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import kotlin.coroutines.CoroutineContext - -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { - for (x in start until start + count) { - delay(time) // wait before sending each number - send(x) - } -} - -fun main() { - Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3)) - .subscribe { println("$it on thread ${Thread.currentThread().name}") } - Thread.sleep(1000) -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt deleted file mode 100644 index 1a214ce308..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-03.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.context03 - -import io.reactivex.* -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import io.reactivex.schedulers.Schedulers -import kotlin.coroutines.CoroutineContext - -fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = publish(context) { - for (x in start until start + count) { - delay(time) // wait before sending each number - send(x) - } -} - -fun main() { - Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3)) - .observeOn(Schedulers.computation()) // <-- THIS LINE IS ADDED - .subscribe { println("$it on thread ${Thread.currentThread().name}") } - Thread.sleep(1000) -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt deleted file mode 100644 index 3c5d3fb53a..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-04.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.context04 - -import io.reactivex.* -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import io.reactivex.functions.BiFunction -import io.reactivex.schedulers.Schedulers -import java.util.concurrent.TimeUnit - -fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable = - Flowable.zip( - Flowable.range(start, count), - Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), - BiFunction { x, _ -> x }) - -fun main() = runBlocking { - rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .collect { println("$it on thread ${Thread.currentThread().name}") } -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt deleted file mode 100644 index 61b54b2b4d..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-context-05.kt +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.context05 - -import io.reactivex.* -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import io.reactivex.functions.BiFunction -import io.reactivex.schedulers.Schedulers -import java.util.concurrent.TimeUnit - -fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable = - Flowable.zip( - Flowable.range(start, count), - Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler), - BiFunction { x, _ -> x }) - -fun main() = runBlocking { - val job = launch(Dispatchers.Unconfined) { // launch a new coroutine in Unconfined context (without its own thread pool) - rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3) - .collect { println("$it on thread ${Thread.currentThread().name}") } - } - job.join() // wait for our coroutine to complete -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt deleted file mode 100644 index 8268ef27cd..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-01.kt +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.operators01 - -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import kotlin.coroutines.CoroutineContext - -fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish(context) { - for (x in start until start + count) send(x) -} - -fun main() = runBlocking { - // Range inherits parent job from runBlocking, but overrides dispatcher with Dispatchers.Default - range(Dispatchers.Default, 1, 5).collect { println(it) } -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt deleted file mode 100644 index 5f07ba4972..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-02.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.operators02 - -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import org.reactivestreams.* -import kotlin.coroutines.* - -fun Publisher.fusedFilterMap( - context: CoroutineContext, // the context to execute this coroutine in - predicate: (T) -> Boolean, // the filter predicate - mapper: (T) -> R // the mapper function -) = publish(context) { - collect { // collect the source stream - if (predicate(it)) // filter part - send(mapper(it)) // map part - } -} - -fun CoroutineScope.range(start: Int, count: Int) = publish { - for (x in start until start + count) send(x) -} - -fun main() = runBlocking { - range(1, 5) - .fusedFilterMap(Dispatchers.Unconfined, { it % 2 == 0}, { "$it is even" }) - .collect { println(it) } // print all the resulting strings -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt deleted file mode 100644 index 818a792bac..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-03.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.operators03 - -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import kotlinx.coroutines.selects.* -import org.reactivestreams.* -import kotlin.coroutines.* - -fun Publisher.takeUntil(context: CoroutineContext, other: Publisher) = publish(context) { - this@takeUntil.openSubscription().consume { // explicitly open channel to Publisher - val current = this - other.openSubscription().consume { // explicitly open channel to Publisher - val other = this - whileSelect { - other.onReceive { false } // bail out on any received element from `other` - current.onReceive { send(it); true } // resend element from this channel and continue - } - } - } -} - -fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish { - for (x in start until start + count) { - delay(time) // wait before sending each number - send(x) - } -} - -fun main() = runBlocking { - val slowNums = rangeWithInterval(200, 1, 10) // numbers with 200ms interval - val stop = rangeWithInterval(500, 1, 10) // the first one after 500ms - slowNums.takeUntil(Dispatchers.Unconfined, stop).collect { println(it) } // let's test it -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt b/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt deleted file mode 100644 index 12d9c1f647..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/example-reactive-operators-04.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.operators04 - -import kotlinx.coroutines.* -import kotlinx.coroutines.reactive.* -import org.reactivestreams.* -import kotlin.coroutines.* - -fun Publisher>.merge(context: CoroutineContext) = publish(context) { - collect { pub -> // for each publisher collected - launch { // launch a child coroutine - pub.collect { send(it) } // resend all element from this publisher - } - } -} - -fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish { - for (x in start until start + count) { - delay(time) // wait before sending each number - send(x) - } -} - -fun CoroutineScope.testPub() = publish> { - send(rangeWithInterval(250, 1, 4)) // number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms - delay(100) // wait for 100 ms - send(rangeWithInterval(500, 11, 3)) // number 11 at 600ms, 12 at 1100ms, 13 at 1600ms - delay(1100) // wait for 1.1s - done in 1.2 sec after start -} - -fun main() = runBlocking { - testPub().merge(Dispatchers.Unconfined).collect { println(it) } // print the whole stream -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt b/reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt deleted file mode 100644 index cebfc7b429..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/test/GuideReactiveTest.kt +++ /dev/null @@ -1,191 +0,0 @@ -// This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. -package kotlinx.coroutines.rx2.guide.test - -import kotlinx.coroutines.guide.test.* -import org.junit.Test - -class GuideReactiveTest : ReactiveTestBase() { - - @Test - fun testKotlinxCoroutinesRx2GuideBasic01() { - test("KotlinxCoroutinesRx2GuideBasic01") { kotlinx.coroutines.rx2.guide.basic01.main() }.verifyLines( - "Elements:", - "Begin", - "1", - "2", - "3", - "Again:" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic02() { - test("KotlinxCoroutinesRx2GuideBasic02") { kotlinx.coroutines.rx2.guide.basic02.main() }.verifyLines( - "Elements:", - "Begin", - "1", - "2", - "3", - "Again:", - "Begin", - "1", - "2", - "3" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic03() { - test("KotlinxCoroutinesRx2GuideBasic03") { kotlinx.coroutines.rx2.guide.basic03.main() }.verifyLines( - "OnSubscribe", - "1", - "2", - "3", - "Finally" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic04() { - test("KotlinxCoroutinesRx2GuideBasic04") { kotlinx.coroutines.rx2.guide.basic04.main() }.verifyLines( - "OnSubscribe", - "1", - "2", - "3", - "OnComplete", - "Finally", - "4", - "5" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic05() { - test("KotlinxCoroutinesRx2GuideBasic05") { kotlinx.coroutines.rx2.guide.basic05.main() }.verifyLines( - "Sent 1", - "Processed 1", - "Sent 2", - "Processed 2", - "Sent 3", - "Processed 3", - "Complete" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic06() { - test("KotlinxCoroutinesRx2GuideBasic06") { kotlinx.coroutines.rx2.guide.basic06.main() }.verifyLines( - "two", - "three", - "four" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic07() { - test("KotlinxCoroutinesRx2GuideBasic07") { kotlinx.coroutines.rx2.guide.basic07.main() }.verifyLines( - "two", - "three", - "four" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic08() { - test("KotlinxCoroutinesRx2GuideBasic08") { kotlinx.coroutines.rx2.guide.basic08.main() }.verifyLines( - "four" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideBasic09() { - test("KotlinxCoroutinesRx2GuideBasic09") { kotlinx.coroutines.rx2.guide.basic09.main() }.verifyLines( - "four" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideOperators01() { - test("KotlinxCoroutinesRx2GuideOperators01") { kotlinx.coroutines.rx2.guide.operators01.main() }.verifyLines( - "1", - "2", - "3", - "4", - "5" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideOperators02() { - test("KotlinxCoroutinesRx2GuideOperators02") { kotlinx.coroutines.rx2.guide.operators02.main() }.verifyLines( - "2 is even", - "4 is even" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideOperators03() { - test("KotlinxCoroutinesRx2GuideOperators03") { kotlinx.coroutines.rx2.guide.operators03.main() }.verifyLines( - "1", - "2" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideOperators04() { - test("KotlinxCoroutinesRx2GuideOperators04") { kotlinx.coroutines.rx2.guide.operators04.main() }.verifyLines( - "1", - "2", - "11", - "3", - "4", - "12", - "13" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideContext01() { - test("KotlinxCoroutinesRx2GuideContext01") { kotlinx.coroutines.rx2.guide.context01.main() }.verifyLinesFlexibleThread( - "1 on thread RxComputationThreadPool-1", - "2 on thread RxComputationThreadPool-1", - "3 on thread RxComputationThreadPool-1" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideContext02() { - test("KotlinxCoroutinesRx2GuideContext02") { kotlinx.coroutines.rx2.guide.context02.main() }.verifyLinesStart( - "1 on thread ForkJoinPool.commonPool-worker-1", - "2 on thread ForkJoinPool.commonPool-worker-1", - "3 on thread ForkJoinPool.commonPool-worker-1" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideContext03() { - test("KotlinxCoroutinesRx2GuideContext03") { kotlinx.coroutines.rx2.guide.context03.main() }.verifyLinesFlexibleThread( - "1 on thread RxComputationThreadPool-1", - "2 on thread RxComputationThreadPool-1", - "3 on thread RxComputationThreadPool-1" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideContext04() { - test("KotlinxCoroutinesRx2GuideContext04") { kotlinx.coroutines.rx2.guide.context04.main() }.verifyLinesStart( - "1 on thread main", - "2 on thread main", - "3 on thread main" - ) - } - - @Test - fun testKotlinxCoroutinesRx2GuideContext05() { - test("KotlinxCoroutinesRx2GuideContext05") { kotlinx.coroutines.rx2.guide.context05.main() }.verifyLinesStart( - "1 on thread RxComputationThreadPool-1", - "2 on thread RxComputationThreadPool-1", - "3 on thread RxComputationThreadPool-1" - ) - } -} diff --git a/reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt b/reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt deleted file mode 100644 index 3bedb62b6f..0000000000 --- a/reactive/kotlinx-coroutines-rx2/test/guide/test/ReactiveTestBase.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.rx2.guide.test - -import io.reactivex.* -import io.reactivex.disposables.* -import io.reactivex.plugins.* -import kotlinx.coroutines.* -import kotlinx.coroutines.guide.test.* -import org.junit.* -import java.util.concurrent.* - -open class ReactiveTestBase { - @Before - fun setup() { - RxJavaPlugins.setIoSchedulerHandler(Handler) - RxJavaPlugins.setComputationSchedulerHandler(Handler) - ignoreLostThreads( - "RxComputationThreadPool-", - "RxCachedThreadScheduler-", - "RxCachedWorkerPoolEvictor-", - "RxSchedulerPurge-") - } - - @After - fun tearDown() { - RxJavaPlugins.setIoSchedulerHandler(null) - RxJavaPlugins.setComputationSchedulerHandler(null) - } -} - -private object Handler : io.reactivex.functions.Function { - override fun apply(t: Scheduler): Scheduler = WrapperScheduler(t) -} - -private class WrapperScheduler(private val scheduler: Scheduler) : Scheduler() { - override fun createWorker(): Worker = WrapperWorker(scheduler.createWorker()) -} - -private class WrapperWorker(private val worker: Scheduler.Worker) : Scheduler.Worker() { - override fun isDisposed(): Boolean = worker.isDisposed - override fun dispose() = worker.dispose() - override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable = - worker.schedule(wrapTask(run), delay, unit) -} From 89bae52e9768e9626b055f0c5ff03aa37fbe71b6 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 25 Sep 2019 17:09:36 +0300 Subject: [PATCH 6/6] Version 1.3.2 --- CHANGES.md | 9 +++++++++ README.md | 16 ++++++++-------- gradle.properties | 2 +- kotlinx-coroutines-debug/README.md | 4 ++-- kotlinx-coroutines-test/README.md | 2 +- ui/coroutines-guide-ui.md | 2 +- .../animation-app/gradle.properties | 2 +- .../example-app/gradle.properties | 2 +- 8 files changed, 24 insertions(+), 15 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 4db08d9d48..8f08a9d58b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,14 @@ # Change log for kotlinx.coroutines +## Version 1.3.2 + +This is a maintenance release that does not include any new features or bug fixes. + +* Reactive integrations for `Flow` are promoted to stable API. +* Obsolete reactive API is deprecated. +* Deprecation level for API deprecated in 1.3.0 is increased. +* Various documentation improvements. + ## Version 1.3.1 This is a minor update with various fixes: diff --git a/README.md b/README.md index 862ce16232..ad5f5ddefd 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![official JetBrains project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0) -[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.1) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.1) +[![Download](https://api.bintray.com/packages/kotlin/kotlinx/kotlinx.coroutines/images/download.svg?version=1.3.2) ](https://bintray.com/kotlin/kotlinx/kotlinx.coroutines/1.3.2) Library support for Kotlin coroutines with [multiplatform](#multiplatform) support. This is a companion version for Kotlin `1.3.50` release. @@ -82,7 +82,7 @@ Add dependencies (you can also add other modules that you need): org.jetbrains.kotlinx kotlinx-coroutines-core - 1.3.1 + 1.3.2 ``` @@ -100,7 +100,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.1' + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2' } ``` @@ -126,7 +126,7 @@ Add dependencies (you can also add other modules that you need): ```groovy dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.1") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2") } ``` @@ -145,7 +145,7 @@ Make sure that you have either `jcenter()` or `mavenCentral()` in the list of re Core modules of `kotlinx.coroutines` are also available for [Kotlin/JS](#js) and [Kotlin/Native](#native). In common code that should get compiled for different platforms, add dependency to -[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.1/jar) +[`kotlinx-coroutines-core-common`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-common/1.3.2/jar) (follow the link to get the dependency declaration snippet). ### Android @@ -154,7 +154,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android) module as dependency when using `kotlinx.coroutines` on Android: ```groovy -implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.1' +implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2' ``` This gives you access to Android [Dispatchers.Main] @@ -173,7 +173,7 @@ R8 is a replacement for ProGuard in Android ecosystem, it is enabled by default ### JS [Kotlin/JS](https://kotlinlang.org/docs/reference/js-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.1/jar) +[`kotlinx-coroutines-core-js`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.3.2/jar) (follow the link to get the dependency declaration snippet). You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotlinx-coroutines-core) package via NPM. @@ -181,7 +181,7 @@ You can also use [`kotlinx-coroutines-core`](https://www.npmjs.com/package/kotli ### Native [Kotlin/Native](https://kotlinlang.org/docs/reference/native-overview.html) version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.1/jar) +[`kotlinx-coroutines-core-native`](https://search.maven.org/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-native/1.3.2/jar) (follow the link to get the dependency declaration snippet). Only single-threaded code (JS-style) on Kotlin/Native is currently supported. diff --git a/gradle.properties b/gradle.properties index 335c999977..949a2a5223 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ # # Kotlin -version=1.3.1-SNAPSHOT +version=1.3.2-SNAPSHOT group=org.jetbrains.kotlinx kotlin_version=1.3.50 diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md index ca2a05b628..b7a15e8995 100644 --- a/kotlinx-coroutines-debug/README.md +++ b/kotlinx-coroutines-debug/README.md @@ -18,7 +18,7 @@ of coroutines hierarchy referenced by a [Job] or [CoroutineScope] instances usin Add `kotlinx-coroutines-debug` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.1' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.3.2' } ``` @@ -57,7 +57,7 @@ stacktraces will be dumped to the console. ### Using as JVM agent It is possible to use this module as a standalone JVM agent to enable debug probes on the application startup. -You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.1.jar`. +You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.3.2.jar`. Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines. diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md index 014c53b702..a634fecc68 100644 --- a/kotlinx-coroutines-test/README.md +++ b/kotlinx-coroutines-test/README.md @@ -9,7 +9,7 @@ This package provides testing utilities for effectively testing coroutines. Add `kotlinx-coroutines-test` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.1' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.3.2' } ``` diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md index 049c944121..ffaf1ae9f4 100644 --- a/ui/coroutines-guide-ui.md +++ b/ui/coroutines-guide-ui.md @@ -165,7 +165,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { . `app/build.gradle` file: ```groovy -implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.1" +implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2" ``` You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your diff --git a/ui/kotlinx-coroutines-android/animation-app/gradle.properties b/ui/kotlinx-coroutines-android/animation-app/gradle.properties index 8e119d7159..da09045551 100644 --- a/ui/kotlinx-coroutines-android/animation-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/animation-app/gradle.properties @@ -19,5 +19,5 @@ org.gradle.jvmargs=-Xmx1536m kotlin.coroutines=enable kotlin_version=1.3.50 -coroutines_version=1.3.1 +coroutines_version=1.3.2 diff --git a/ui/kotlinx-coroutines-android/example-app/gradle.properties b/ui/kotlinx-coroutines-android/example-app/gradle.properties index 8e119d7159..da09045551 100644 --- a/ui/kotlinx-coroutines-android/example-app/gradle.properties +++ b/ui/kotlinx-coroutines-android/example-app/gradle.properties @@ -19,5 +19,5 @@ org.gradle.jvmargs=-Xmx1536m kotlin.coroutines=enable kotlin_version=1.3.50 -coroutines_version=1.3.1 +coroutines_version=1.3.2