diff --git a/src/main/scala/com/raquo/airstream/core/BaseObservable.scala b/src/main/scala/com/raquo/airstream/core/BaseObservable.scala index 9f869e74..23d8d811 100644 --- a/src/main/scala/com/raquo/airstream/core/BaseObservable.scala +++ b/src/main/scala/com/raquo/airstream/core/BaseObservable.scala @@ -1,7 +1,7 @@ package com.raquo.airstream.core import com.raquo.airstream.debug.Debugger -import com.raquo.airstream.flatten.FlattenStrategy +import com.raquo.airstream.flatten.{AllowFlatMap, FlattenStrategy, MergingStrategy, SwitchingStrategy} import com.raquo.airstream.ownership.{Owner, Subscription} import com.raquo.ew.JsArray @@ -53,11 +53,44 @@ trait BaseObservable[+Self[+_] <: Observable[_], +A] extends Source[A] with Name def mapToUnit: Self[Unit] = map(_ => ()) - /** @param compose Note: guarded against exceptions */ - @inline def flatMap[B, Inner[_], Output[+_] <: Observable[_]](compose: A => Inner[B])( - implicit strategy: FlattenStrategy[Self, Inner, Output] + /** #WARNING: DO NOT USE THIS METHOD. + * See https://github.com/raquo/Airstream/#flattening-observables + */ + @inline def flatMap[B, Inner[_], Output[+_] <: Observable[_]]( + project: A => Inner[B] + )( + implicit strategy: SwitchingStrategy[Self, Inner, Output], + allowFlatMap: AllowFlatMap + ): Output[B] = { + strategy.flatten(map(project)) + } + + /** Alias to flatMapSwitch(_ => s) */ + @inline def flatMapTo[B, Inner[_], Output[+_] <: Observable[_]](s: Inner[B])( + implicit strategy: SwitchingStrategy[Self, Inner, Output] + ): Output[B] = { + strategy.flatten(map(_ => s)) + } + + /** @param project Note: guarded against exceptions */ + @inline def flatMapSwitch[B, Inner[_], Output[+_] <: Observable[_]](project: A => Inner[B])( + implicit strategy: SwitchingStrategy[Self, Inner, Output] + ): Output[B] = { + strategy.flatten(map(project)) + } + + /** @param project Note: guarded against exceptions */ + @inline def flatMapMerge[B, Inner[_], Output[+_] <: Observable[_]](project: A => Inner[B])( + implicit strategy: MergingStrategy[Self, Inner, Output] + ): Output[B] = { + strategy.flatten(map(project)) + } + + /** @param project Note: guarded against exceptions */ + @inline def flatMapCustom[B, Inner[_], Output[+_] <: Observable[_]](project: A => Inner[B])( + strategy: FlattenStrategy[Self, Inner, Output] ): Output[B] = { - strategy.flatten(map(compose)) + strategy.flatten(map(project)) } /** Distinct events (but keep all errors) by == (equals) comparison */ diff --git a/src/main/scala/com/raquo/airstream/core/Observable.scala b/src/main/scala/com/raquo/airstream/core/Observable.scala index c6cbd465..9694caea 100644 --- a/src/main/scala/com/raquo/airstream/core/Observable.scala +++ b/src/main/scala/com/raquo/airstream/core/Observable.scala @@ -1,8 +1,8 @@ package com.raquo.airstream.core import com.raquo.airstream.debug.DebuggableObservable -import com.raquo.airstream.flatten.FlattenStrategy import com.raquo.airstream.flatten.FlattenStrategy._ +import com.raquo.airstream.flatten.{AllowFlatten, FlattenStrategy, MergingStrategy, SwitchingStrategy} // @TODO[Scala3] Put this trait together with BaseObservable in the same file, and make BaseObservable sealed. @@ -23,20 +23,41 @@ object Observable extends ObservableLowPriorityImplicits { ) extends AnyVal { @inline def flatten[Output[+_] <: Observable[_]]( - implicit strategy: FlattenStrategy[Outer, Inner, Output] + implicit strategy: SwitchingStrategy[Outer, Inner, Output], + allowFlatMap: AllowFlatten + ): Output[A] = { + strategy.flatten(parent) + } + + @inline def flattenSwitch[Output[+_] <: Observable[_]]( + implicit strategy: SwitchingStrategy[Outer, Inner, Output] + ): Output[A] = { + strategy.flatten(parent) + } + + @inline def flattenMerge[Output[+_] <: Observable[_]]( + implicit strategy: MergingStrategy[Outer, Inner, Output] + ): Output[A] = { + strategy.flatten(parent) + } + + @inline def flattenCustom[Output[+_] <: Observable[_]]( + strategy: FlattenStrategy[Outer, Inner, Output] ): Output[A] = { strategy.flatten(parent) } } - implicit val switchStreamStrategy: FlattenStrategy[Observable, EventStream, EventStream] = SwitchStreamStrategy + implicit val switchStreamStrategy: SwitchingStrategy[Observable, EventStream, EventStream] = SwitchStreamStrategy + + implicit val switchSignalStreamStrategy: SwitchingStrategy[EventStream, Signal, EventStream] = SwitchSignalStreamStrategy - implicit val switchSignalStreamStrategy: FlattenStrategy[EventStream, Signal, EventStream] = SwitchSignalStreamStrategy + implicit val switchSignalStrategy: SwitchingStrategy[Signal, Signal, Signal] = SwitchSignalStrategy - implicit val switchSignalStrategy: FlattenStrategy[Signal, Signal, Signal] = SwitchSignalStrategy + implicit val mergeStreamsStrategy: MergingStrategy[Observable, EventStream, EventStream] = ConcurrentStreamStrategy } trait ObservableLowPriorityImplicits { - implicit val switchSignalObservableStrategy: FlattenStrategy[Observable, Signal, Observable] = SwitchSignalObservableStrategy + implicit val switchSignalObservableStrategy: SwitchingStrategy[Observable, Signal, Observable] = SwitchSignalObservableStrategy } diff --git a/src/main/scala/com/raquo/airstream/flatten/ConcurrentFutureStream.scala b/src/main/scala/com/raquo/airstream/flatten/ConcurrentFutureStream.scala deleted file mode 100644 index e69de29b..00000000 diff --git a/src/main/scala/com/raquo/airstream/flatten/FlattenStrategy.scala b/src/main/scala/com/raquo/airstream/flatten/FlattenStrategy.scala index c9cf2a3a..5d24e1db 100644 --- a/src/main/scala/com/raquo/airstream/flatten/FlattenStrategy.scala +++ b/src/main/scala/com/raquo/airstream/flatten/FlattenStrategy.scala @@ -2,43 +2,75 @@ package com.raquo.airstream.flatten import com.raquo.airstream.core.{EventStream, Observable, Signal} -/** [[Observable.MetaObservable.flatten]] needs an instance of this trait to know how exactly to do the flattening. */ -trait FlattenStrategy[-Outer[+_] <: Observable[_], -Inner[_], +Output[+_] <: Observable[_]] { +import scala.annotation.implicitNotFound + +/** [[Observable.MetaObservable.switchFlatten]] needs an instance of this trait to know how exactly to do the flattening. */ +trait FlattenStrategy[ + -Outer[+_] <: Observable[_], + -Inner[_], + +Output[+_] <: Observable[_] +] { /** Must not throw */ def flatten[A](parent: Outer[Inner[A]]): Output[A] } +/** Flatten strategies with semantics of mirroring the latest emitted observable. */ +trait SwitchingStrategy[ + -Outer[+_] <: Observable[_], + -Inner[_], + +Output[+_] <: Observable[_] +] extends FlattenStrategy[Outer, Inner, Output] + +/** Flatten strategies with semantics of merging all of the emitted observables. */ +trait MergingStrategy[ + -Outer[+_] <: Observable[_], + -Inner[_], + +Output[+_] <: Observable[_] +] extends FlattenStrategy[Outer, Inner, Output] + +@implicitNotFound("\nYou are trying to use Airstream's flatMap operator.\nIt was renamed to flatMapSwitch / flatMapMerge to discourage incorrect usage, especially in for-comprehensions.\nSee https://github.com/raquo/Airstream/#flattening-observables\n\n") +trait AllowFlatMap + +@implicitNotFound("\nYou are trying to use Airstream's flatten operator.\nIt was renamed to flattenSwitch / flattenMerge to clarify intent and discourage incorrect usage, similarly to flatMap.\nSee https://github.com/raquo/Airstream/#flattening-observables\n\n") +trait AllowFlatten + object FlattenStrategy { + @deprecated("You are using Airstream's deprecated flatMap operator using FlattenStrategy.flatMapAllowed import. This migration helper will be removed in the next version of Airstream. See https://github.com/raquo/Airstream/#flattening-observables", since = "17.0.0") + implicit lazy val allowFlatMap: AllowFlatMap = new AllowFlatMap {} + + @deprecated("You are using Airstream's deprecated flatten operator using FlattenStrategy.flattenAllowed import. This migration helper will be removed in the next version of Airstream. See https://github.com/raquo/Airstream/#flattening-observables", since = "17.0.0") + implicit lazy val allowFlatten: AllowFlatten = new AllowFlatten {} + /** See docs for [[SwitchStream]] */ - object SwitchStreamStrategy extends FlattenStrategy[Observable, EventStream, EventStream] { + object SwitchStreamStrategy extends SwitchingStrategy[Observable, EventStream, EventStream] { override def flatten[A](parent: Observable[EventStream[A]]): EventStream[A] = { new SwitchStream[EventStream[A], A](parent = parent, makeStream = identity) } } /** See docs for [[ConcurrentStream]] */ - object ConcurrentStreamStrategy extends FlattenStrategy[Observable, EventStream, EventStream] { + object ConcurrentStreamStrategy extends MergingStrategy[Observable, EventStream, EventStream] { override def flatten[A](parent: Observable[EventStream[A]]): EventStream[A] = { new ConcurrentStream[A](parent = parent) } } /** See docs for [[SwitchSignalStream]] */ - object SwitchSignalStreamStrategy extends FlattenStrategy[EventStream, Signal, EventStream] { + object SwitchSignalStreamStrategy extends SwitchingStrategy[EventStream, Signal, EventStream] { override def flatten[A](parent: EventStream[Signal[A]]): EventStream[A] = { new SwitchSignalStream(parent) } } /** See docs for [[SwitchSignal]] */ - object SwitchSignalStrategy extends FlattenStrategy[Signal, Signal, Signal] { + object SwitchSignalStrategy extends SwitchingStrategy[Signal, Signal, Signal] { override def flatten[A](parent: Signal[Signal[A]]): Signal[A] = { new SwitchSignal(parent) } } - object SwitchSignalObservableStrategy extends FlattenStrategy[Observable, Signal, Observable] { + object SwitchSignalObservableStrategy extends SwitchingStrategy[Observable, Signal, Observable] { override def flatten[A](parent: Observable[Signal[A]]): Observable[A] = { parent.matchStreamOrSignal( ifStream = SwitchSignalStreamStrategy.flatten, diff --git a/src/main/scala/com/raquo/airstream/web/FetchStream.scala b/src/main/scala/com/raquo/airstream/web/FetchStream.scala index 3995136c..f2465a0c 100644 --- a/src/main/scala/com/raquo/airstream/web/FetchStream.scala +++ b/src/main/scala/com/raquo/airstream/web/FetchStream.scala @@ -99,8 +99,8 @@ class FetchBuilder[In, Out]( maybeAbortStream, shouldAbortOnStop, emitOnce - ).flatMap { promise => - EventStream.fromJsPromise(promise).flatMap(decodeResponse) + ).flatMapSwitch { promise => + EventStream.fromJsPromise(promise).flatMapSwitch(decodeResponse) } } } diff --git a/src/test/scala/com/raquo/airstream/core/GlitchSpec.scala b/src/test/scala/com/raquo/airstream/core/GlitchSpec.scala index 321fb2db..956d61f3 100644 --- a/src/test/scala/com/raquo/airstream/core/GlitchSpec.scala +++ b/src/test/scala/com/raquo/airstream/core/GlitchSpec.scala @@ -426,7 +426,7 @@ class GlitchSpec extends UnitSpec { val stateVar = Var(State(Nil)) var n = 0 - val actions: EventStream[Action] = clickBus.events.flatMap { _ => + val actions: EventStream[Action] = clickBus.events.flatMapSwitch { _ => n += 2 EventStream.merge( EventStream.fromValue(n - 2, emitOnce = true), diff --git a/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala b/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala index f72d9165..11575227 100644 --- a/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala @@ -1045,7 +1045,7 @@ class PullResetSignalSpec extends UnitSpec { case _ => smallSignal } .setDisplayName("MetaSignal") - .flatten + .flattenSwitch .setDisplayName("FlatSignal") .map(Calculation.log("flat", calculations)) .setDisplayName("FlatSignal--LOG") @@ -1136,7 +1136,7 @@ class PullResetSignalSpec extends UnitSpec { EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true).setDisplayName("BigSeqStream") ).setDisplayName("BigMergeStream").startWith("big-0").setDisplayName("BigSignal") - val flatSignal = outerBus.events.startWith(0).setDisplayName("OuterBus.startWith").flatMap { + val flatSignal = outerBus.events.startWith(0).setDisplayName("OuterBus.startWith").flatMapSwitch { case i if i >= 10 => bigSignal case _ => smallSignal }.setDisplayName("FlatSignal").map(Calculation.log("flat", calculations)).setDisplayName("FlatSignal--LOG") diff --git a/src/test/scala/com/raquo/airstream/errors/EventStreamErrorSpec.scala b/src/test/scala/com/raquo/airstream/errors/EventStreamErrorSpec.scala index fadd073b..5ff67c60 100644 --- a/src/test/scala/com/raquo/airstream/errors/EventStreamErrorSpec.scala +++ b/src/test/scala/com/raquo/airstream/errors/EventStreamErrorSpec.scala @@ -347,7 +347,7 @@ class EventStreamErrorSpec extends UnitSpec with BeforeAndAfter { val bus = new EventBus[Int] - val stream = bus.events.flatMap(EventStream.fromValue(_, emitOnce = true)) + val stream = bus.events.flatMapSwitch(EventStream.fromValue(_, emitOnce = true)) val effects = mutable.Buffer[Effect[_]]() @@ -387,7 +387,7 @@ class EventStreamErrorSpec extends UnitSpec with BeforeAndAfter { val myVar = Var.fromTry[Int](Failure(err)) - val stream = myVar.signal.flatMap(EventStream.fromValue(_, emitOnce = true)) + val stream = myVar.signal.flatMapSwitch(EventStream.fromValue(_, emitOnce = true)) val effects = mutable.Buffer[Effect[_]]() diff --git a/src/test/scala/com/raquo/airstream/errors/SignalErrorSpec.scala b/src/test/scala/com/raquo/airstream/errors/SignalErrorSpec.scala index 5346d552..c54453c6 100644 --- a/src/test/scala/com/raquo/airstream/errors/SignalErrorSpec.scala +++ b/src/test/scala/com/raquo/airstream/errors/SignalErrorSpec.scala @@ -354,7 +354,7 @@ class SignalErrorSpec extends UnitSpec with BeforeAndAfter { val myVar = Var(0) - val stream = myVar.signal.flatMap(Val(_)) + val stream = myVar.signal.flatMapSwitch(Val(_)) val effects = mutable.Buffer[Effect[_]]() @@ -400,7 +400,7 @@ class SignalErrorSpec extends UnitSpec with BeforeAndAfter { val myVar = Var.fromTry[Int](Failure(err)) // @TODO[Airstream] Add Signal.fromValue / fromTry that creates a Val - val stream = myVar.signal.flatMap(Val(_)) + val stream = myVar.signal.flatMapSwitch(Val(_)) val effects = mutable.Buffer[Effect[_]]() diff --git a/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenFutureSpec.scala b/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenFutureSpec.scala index 19b395d5..022996dc 100644 --- a/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenFutureSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenFutureSpec.scala @@ -37,7 +37,7 @@ class EventStreamFlattenFutureSpec extends AsyncUnitSpec { val promise5 = makePromise() val futureBus = new EventBus[Future[Int]]() - val stream = futureBus.events.flatMap(EventStream.fromFuture(_)) + val stream = futureBus.events.flatMapSwitch(EventStream.fromFuture(_)) stream.addObserver(obs) diff --git a/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala b/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala index 84ab05f5..f914f3ab 100644 --- a/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala @@ -1,16 +1,16 @@ package com.raquo.airstream.flatten -import com.raquo.airstream.AsyncUnitSpec +import com.raquo.airstream.{AsyncUnitSpec, Matchers} import com.raquo.airstream.core.{EventStream, Observer} import com.raquo.airstream.eventbus.EventBus import com.raquo.airstream.fixtures.{Calculation, Effect, TestableOwner} -import com.raquo.airstream.flatten.FlattenStrategy.ConcurrentStreamStrategy import com.raquo.airstream.ownership.Owner import com.raquo.airstream.state.Var +import scala.annotation.nowarn import scala.collection.mutable -class EventStreamFlattenSpec extends AsyncUnitSpec { +class EventStreamFlattenSpec extends AsyncUnitSpec with Matchers { private val done = assert(true) @@ -25,7 +25,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { .map { v => EventStream.fromSeq(Seq(v * 3), emitOnce = true).setDisplayName(s"INT-FS-$v") }.setDisplayName("META") - .flatten.setDisplayName("FLAT") + .flattenSwitch.setDisplayName("FLAT") val effects = mutable.Buffer[Effect[_]]() val obs0 = Observer[Int](newValue => effects += Effect("obs0", newValue)).setDisplayName("obs0") @@ -47,7 +47,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { EventStream.fromValue(v * 3, emitOnce = true).setDisplayName(s"S-${v}") } .setDisplayName("MO") - .flatten.setDisplayName("FS") + .flattenSwitch.setDisplayName("FS") val effects = mutable.Buffer[Effect[_]]() val obs = Observer[Int](v => effects += Effect("obs0", v)).setDisplayName("obs") @@ -77,9 +77,9 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { .map { v => EventStream.fromSeq(Seq(v * 3), emitOnce = true).map { vv => EventStream.fromSeq(Seq(vv * 7), emitOnce = true) - }.flatten + }.flattenSwitch } - .flatten + .flattenSwitch val effects = mutable.Buffer[Effect[_]]() val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue)) @@ -114,7 +114,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { .map { v => delayedStream(range2, interval = 6, _ * v) } - .flatten + .flattenSwitch val effects = mutable.Buffer[Effect[_]]() val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue)) @@ -146,9 +146,9 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { .map { v => delayedStream(range2, interval = 6, _ * v).map { vv => EventStream.fromFuture(delay(1)(vv * 7)) - }.flatten + }.flattenSwitch } - .flatten + .flattenSwitch val effects = mutable.Buffer[Effect[_]]() val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue)) @@ -161,7 +161,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { } } - it("sync flatMap") { + it("sync flatMapSwitch") { implicit val owner: Owner = new TestableOwner @@ -169,7 +169,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { val stream = EventStream.fromSeq(range, emitOnce = true) val flatStream = stream - .flatMap { v => + .flatMapSwitch { v => EventStream.fromSeq(Seq(v * 3), emitOnce = true) } @@ -180,7 +180,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { effects.toList shouldBe range.map(i => Effect("obs0", i * 3)) } - it("sync three-level flatMap") { + it("sync three-level flatMapSwitch") { implicit val owner: Owner = new TestableOwner @@ -188,8 +188,8 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { val stream = EventStream.fromSeq(range, emitOnce = true) val flatStream = stream - .flatMap { v => - EventStream.fromSeq(Seq(v * 3), emitOnce = true).flatMap { vv => + .flatMapSwitch { v => + EventStream.fromSeq(Seq(v * 3), emitOnce = true).flatMapSwitch { vv => EventStream.fromSeq(Seq(vv * 7), emitOnce = true) } } @@ -205,7 +205,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { * emitted by the inner delayedStream are processed. Just because the interval is set to 6ms * does not mean that this is what it will be. It's merely the lower bound. */ - it("from-future flatMap") { + it("from-future flatMapSwitch") { implicit val owner: Owner = new TestableOwner val range1 = 1 to 3 @@ -214,7 +214,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { val flatStream = stream - .flatMap { v => + .flatMapSwitch { v => delayedStream(range2, interval = 6, _ * v) } @@ -236,7 +236,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { * emitted by the inner delayedStream are processed. Just because the interval is set to 6ms * does not mean that this is what it will be. It's merely the lower bound. */ - it("three-level from-future flatMap") { + it("three-level from-future flatMapSwitch") { implicit val owner: Owner = new TestableOwner val range1 = 1 to 3 @@ -245,8 +245,8 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { val flatStream = stream - .flatMap { v => - delayedStream(range2, interval = 6, _ * v).flatMap { vv => + .flatMapSwitch { v => + delayedStream(range2, interval = 6, _ * v).flatMapSwitch { vv => EventStream.fromFuture(delay(1)(vv * 7)) } } @@ -277,7 +277,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { val mergeBus = new EventBus[EventStream[Int]] - val mergeStream = mergeBus.events.flatten(ConcurrentStreamStrategy).map(Calculation.log("merge", calculations)) + val mergeStream = mergeBus.events.flattenMerge.map(Calculation.log("merge", calculations)) val sub1 = mergeStream.addObserver(Observer.empty) @@ -396,7 +396,7 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { val mergeSignal = streamVar .signal .distinct - .flatten(ConcurrentStreamStrategy) + .flattenMerge .map(Calculation.log("merge", calculations)) val sub1 = mergeSignal.addObserver(Observer.empty) @@ -501,4 +501,26 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { done } + it("legacy flatMap and flatten methods") { + + val bus = new EventBus[Int] + + assertTypeError("bus.events.flatMap(_ => EventStream.fromValue(1))") + + @nowarn("cat=deprecation") + def flatMapCompileCheck() = { + import com.raquo.airstream.flatten.FlattenStrategy.allowFlatMap + bus.events.flatMap(_ => EventStream.fromValue(1)) + } + + assertTypeError("bus.events.map(_ => EventStream.fromValue(1)).flatten") + + @nowarn("cat=deprecation") + def flattenCompileCheck() = { + import com.raquo.airstream.flatten.FlattenStrategy.allowFlatten + bus.events.map(_ => EventStream.fromValue(1)).flatten + } + + done + } } diff --git a/src/test/scala/com/raquo/airstream/flatten/SignalFlattenFutureSpec.scala b/src/test/scala/com/raquo/airstream/flatten/SignalFlattenFutureSpec.scala index 380e04ca..96570a83 100644 --- a/src/test/scala/com/raquo/airstream/flatten/SignalFlattenFutureSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/SignalFlattenFutureSpec.scala @@ -36,7 +36,7 @@ class SignalFlattenFutureSpec extends AsyncUnitSpec { val futureBus = new EventBus[Future[Int]]() val signal = futureBus.events .startWith(promise0.future) - .flatMap(Signal.fromFuture(_, initial = -200)) + .flatMapSwitch(Signal.fromFuture(_, initial = -200)) signal.addObserver(obs) @@ -93,7 +93,7 @@ class SignalFlattenFutureSpec extends AsyncUnitSpec { val futureBus = new EventBus[Future[Int]]() - val signal = futureBus.events.startWith(promise0.future).flatMap(Signal.fromFuture(_, initial = -200)) + val signal = futureBus.events.startWith(promise0.future).flatMapSwitch(Signal.fromFuture(_, initial = -200)) promise0.success(-100) signal.addObserver(obs) @@ -148,7 +148,7 @@ class SignalFlattenFutureSpec extends AsyncUnitSpec { promise0.success(-100) delay { - val signal = futureBus.events.startWith(promise0.future).flatMap(Signal.fromFuture(_, initial = -200)) + val signal = futureBus.events.startWith(promise0.future).flatMapSwitch(Signal.fromFuture(_, initial = -200)) signal.addObserver(obs) effects shouldBe mutable.Buffer(Effect("obs", -200)) diff --git a/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala b/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala index 4088706f..19a78280 100644 --- a/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala @@ -26,7 +26,7 @@ class SwitchSignalSpec extends UnitSpec { val metaVar = Var(sourceSignals(0)) - val $latestNumber = metaVar.signal.flatten // SwitchSignalStrategy is the default (provided implicitly) + val $latestNumber = metaVar.signal.flattenSwitch // SwitchSignalStrategy is the default (provided implicitly) val flattenObserver = Observer[Int](effects += Effect("flattened-obs", _)) @@ -228,7 +228,7 @@ class SwitchSignalSpec extends UnitSpec { val flatSignal = outerBus.events.setDisplayName("outerBus.events").startWith(0).setDisplayName("outerBus.signal").map { case i if i >= 10 => bigSignal case _ => smallSignal - }.setDisplayName("outerBus.meta").flatten.setDisplayName("flatSignal").map(Calculation.log("flat", calculations)) + }.setDisplayName("outerBus.meta").flattenSwitch.setDisplayName("flatSignal").map(Calculation.log("flat", calculations)) // -- @@ -374,7 +374,7 @@ class SwitchSignalSpec extends UnitSpec { val brokenSignal = intSignal - .flatMap { num => + .flatMapSwitch { num => if (num < 1000) { smallI += 1 intSignal.map("small: " + _).setDisplayName(s"small-$smallI") //.debugLogLifecycle() diff --git a/src/test/scala/com/raquo/airstream/flatten/SwitchSignalStreamSpec.scala b/src/test/scala/com/raquo/airstream/flatten/SwitchSignalStreamSpec.scala index 2d531716..f1bb3558 100644 --- a/src/test/scala/com/raquo/airstream/flatten/SwitchSignalStreamSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/SwitchSignalStreamSpec.scala @@ -27,7 +27,7 @@ class SwitchSignalStreamSpec extends UnitSpec { val metaBus = new EventBus[Signal[Int]] - val $latestNumber = metaBus.events.flatten // SwitchSignalStreamStrategy is the default (provided implicitly) + val $latestNumber = metaBus.events.flattenSwitch // SwitchSignalStreamStrategy is the default (provided implicitly) val flattenObserver = Observer[Int](effects += Effect("flattened-obs", _)) @@ -213,7 +213,7 @@ class SwitchSignalStreamSpec extends UnitSpec { val metaBus = new EventBus[Signal[Int]] - val $latestNumber = metaBus.events.flatten // SwitchSignalStreamStrategy is the default (provided implicitly) + val $latestNumber = metaBus.events.flattenSwitch // SwitchSignalStreamStrategy is the default (provided implicitly) val flattenObserver = Observer[Int](effects += Effect("flattened-obs", _)) diff --git a/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala b/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala index 44318b00..857cd166 100644 --- a/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala @@ -27,7 +27,7 @@ class SwitchStreamSpec extends UnitSpec { case (bus, index) => bus.events.map(Calculation.log(s"source-$index", calculations)) } - val $latestNumber = metaBus.events.flatten // SwitchStreamStrategy is the default (provided implicitly) + val $latestNumber = metaBus.events.flattenSwitch // SwitchStreamStrategy is the default (provided implicitly) val flattenObserver = Observer[Int](effects += Effect("flattened-obs", _)) @@ -156,7 +156,7 @@ class SwitchStreamSpec extends UnitSpec { val metaVar = Var[EventStream[Int]](sourceStreams(0)) - val $latestNumber = metaVar.signal.flatten(SwitchStreamStrategy) + val $latestNumber = metaVar.signal.flattenSwitch(SwitchStreamStrategy) val flattenObserver = Observer[Int](effects += Effect("flattened-obs", _)) @@ -313,7 +313,7 @@ class SwitchStreamSpec extends UnitSpec { EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true) ) - val flatStream = outerBus.events.flatMap { + val flatStream = outerBus.events.flatMapSwitch { case i if i >= 10 => bigStream case _ => smallStream }.map(Calculation.log("flat", calculations)) @@ -457,7 +457,7 @@ class SwitchStreamSpec extends UnitSpec { case i if i >= 10 => bigStream case _ => smallStream }.setDisplayName("outer-meta") - .flatten.setDisplayName("outer-flat") + .flattenSwitch.setDisplayName("outer-flat") .map(Calculation.log("flat", calculations)).setDisplayName("outer-flat-map") // -- @@ -598,7 +598,7 @@ class SwitchStreamSpec extends UnitSpec { val brokenSignal = intStream - .flatMap { num => + .flatMapSwitch { num => if (num < 1000) { smallI += 1 intStream.map("small: " + _).setDisplayName(s"small-$smallI") //.debugLogLifecycle() diff --git a/src/test/scala/com/raquo/airstream/syntax/SyntaxSpec.scala b/src/test/scala/com/raquo/airstream/syntax/SyntaxSpec.scala index 1875edd6..bc4853f0 100644 --- a/src/test/scala/com/raquo/airstream/syntax/SyntaxSpec.scala +++ b/src/test/scala/com/raquo/airstream/syntax/SyntaxSpec.scala @@ -78,12 +78,12 @@ class SyntaxSpec extends UnitSpec { val bus = new EventBus[Int] locally { - val flatStream = bus.events.flatMap(a => EventStream.fromFuture(Future.successful(a))) + val flatStream = bus.events.flatMapSwitch(a => EventStream.fromFuture(Future.successful(a))) flatStream: EventStream[Int] } locally { - val flatSignal = bus.events.startWith(0).flatMap(a => Signal.fromFuture(Future.successful(a), initial = 0)) + val flatSignal = bus.events.startWith(0).flatMapSwitch(a => Signal.fromFuture(Future.successful(a), initial = 0)) flatSignal: Signal[Int] } }