From 107351457e27843cad6a58068844407793c3a73b Mon Sep 17 00:00:00 2001 From: Nikita Gazarov Date: Wed, 10 Jul 2024 23:35:39 -0700 Subject: [PATCH] New: Splittable Vars (#112) --- .../raquo/airstream/core/BaseObservable.scala | 3 + .../airstream/split/MutableSplittable.scala | 103 +++ .../raquo/airstream/split/SplitSignal.scala | 72 +- .../raquo/airstream/split/Splittable.scala | 67 +- .../airstream/split/SplittableOneStream.scala | 2 +- .../raquo/airstream/split/SplittableVar.scala | 83 ++ .../airstream/state/LazyDerivedVar.scala | 10 +- .../state/LazyDerivedVarSignal.scala | 33 - .../airstream/state/LazyStrictSignal.scala | 44 ++ .../com/raquo/airstream/state/SourceVar.scala | 8 +- .../scala/com/raquo/airstream/state/Var.scala | 10 +- .../airstream/misc/SplitSignalSpec.scala | 16 +- .../raquo/airstream/misc/SplitVarSpec.scala | 715 ++++++++++++++++++ 13 files changed, 1105 insertions(+), 61 deletions(-) create mode 100644 src/main/scala/com/raquo/airstream/split/MutableSplittable.scala create mode 100644 src/main/scala/com/raquo/airstream/split/SplittableVar.scala delete mode 100644 src/main/scala/com/raquo/airstream/state/LazyDerivedVarSignal.scala create mode 100644 src/main/scala/com/raquo/airstream/state/LazyStrictSignal.scala create mode 100644 src/test/scala/com/raquo/airstream/misc/SplitVarSpec.scala diff --git a/src/main/scala/com/raquo/airstream/core/BaseObservable.scala b/src/main/scala/com/raquo/airstream/core/BaseObservable.scala index 8e690816..04a679b0 100644 --- a/src/main/scala/com/raquo/airstream/core/BaseObservable.scala +++ b/src/main/scala/com/raquo/airstream/core/BaseObservable.scala @@ -208,6 +208,9 @@ trait BaseObservable[+Self[+_] <: Observable[_], +A] extends Source[A] with Name /** Child observable should call this method on its parents when it is started. * This observable calls [[onStart]] if this action has given it its first observer (internal or external). + * + * I think `shouldCallMaybeWillStart` should be `false` when we're calling this from onStart. + * In that case, `maybeWillStart` was already called. #TODO But `maybeWillStart` checks for willStartDone status, so does it actually matter? */ protected[airstream] def addInternalObserver(observer: InternalObserver[A], shouldCallMaybeWillStart: Boolean): Unit diff --git a/src/main/scala/com/raquo/airstream/split/MutableSplittable.scala b/src/main/scala/com/raquo/airstream/split/MutableSplittable.scala new file mode 100644 index 00000000..d0270379 --- /dev/null +++ b/src/main/scala/com/raquo/airstream/split/MutableSplittable.scala @@ -0,0 +1,103 @@ +package com.raquo.airstream.split + +import com.raquo.ew.JsArray + +import scala.collection.mutable +import scala.scalajs.js + +trait MutableSplittable[M[_]] { + + val splittable: Splittable[M] + + def size[A](items: M[A]): Int + + /** Equivalent of array(index) */ + def getByIndex[A](items: M[A], index: Int): A + + /** Equivalent of array.update(index, newValue) */ + def updateAtIndex[A](items: M[A], index: Int, newItem: A): Unit + + /** Equivalent of array.find(predicate).foreach(array.update(foundIndex, newItem)) */ + def findUpdateInPlace[A](items: M[A], predicate: A => Boolean, newItem: A): Boolean = { + // #Warning: This implementation assumes cheap O(1) index access. + // Override this method for efficiency as needed (e.g. for ListBuffer – see below). + var found = false + var index = 0 + val len = size(items) + while (!found && index < len) { + if (predicate(getByIndex(items, index))) { + updateAtIndex(items, index, newItem) + found = true + } + index += 1 + } + found + } +} + +object MutableSplittable { + + implicit object JsArrayMutableSplittable extends MutableSplittable[JsArray] { + + override val splittable: Splittable[JsArray] = Splittable.JsArraySplittable + + override def size[A](items: JsArray[A]): Int = items.length + + override def getByIndex[A](items: JsArray[A], index: Int): A = items(index) + + override def updateAtIndex[A](items: JsArray[A], index: Int, newItem: A): Unit = { + items.update(index, newItem) + } + } + + implicit object ScalaJsArrayMutableSplittable extends MutableSplittable[js.Array] { + + override val splittable: Splittable[js.Array] = Splittable.ScalaJsArraySplittable + + override def size[A](items: js.Array[A]): Int = items.length + + override def getByIndex[A](items: js.Array[A], index: Int): A = items(index) + + override def updateAtIndex[A](items: js.Array[A], index: Int, newItem: A): Unit = { + items.update(index, newItem) + } + } + + implicit object BufferMutableSplittable extends MutableSplittable[mutable.Buffer] { + + override val splittable: Splittable[mutable.Buffer] = Splittable.BufferSplittable + + override def size[A](items: mutable.Buffer[A]): Int = items.size + + override def getByIndex[A](items: mutable.Buffer[A], index: Int): A = items(index) + + override def updateAtIndex[A](items: mutable.Buffer[A], index: Int, newItem: A): Unit = { + items.update(index, newItem) + } + + override def findUpdateInPlace[A]( + items: mutable.Buffer[A], + predicate: A => Boolean, + newItem: A + ): Boolean = { + items match { + case _: mutable.ListBuffer[A @unchecked] => + // This implementation is more efficient when accessing elements by index is costly. + val iterator = items.iterator + var found = false + var index = 0 + while (!found && iterator.hasNext) { + if (predicate(iterator.next())) { + updateAtIndex(items, index, newItem) + found = true + } + index += 1 + } + found + case _ => + // All other Buffer implementations are indexed (I think) + super.findUpdateInPlace(items, predicate, newItem) + } + } + } +} diff --git a/src/main/scala/com/raquo/airstream/split/SplitSignal.scala b/src/main/scala/com/raquo/airstream/split/SplitSignal.scala index bcf9eec6..9b76e717 100644 --- a/src/main/scala/com/raquo/airstream/split/SplitSignal.scala +++ b/src/main/scala/com/raquo/airstream/split/SplitSignal.scala @@ -1,6 +1,6 @@ package com.raquo.airstream.split -import com.raquo.airstream.common.SingleParentSignal +import com.raquo.airstream.common.{InternalTryObserver, SingleParentSignal} import com.raquo.airstream.core.{AirstreamError, Protected, Signal, Transaction} import com.raquo.airstream.timing.SyncDelayStream @@ -29,7 +29,8 @@ class SplitSignal[M[_], Input, Output, Key]( distinctCompose: Signal[Input] => Signal[Input], project: (Key, Input, Signal[Input]) => Output, splittable: Splittable[M], - duplicateKeysConfig: DuplicateKeysConfig = DuplicateKeysConfig.default + duplicateKeysConfig: DuplicateKeysConfig = DuplicateKeysConfig.default, + strict: Boolean = false // #TODO `false` default for now to keep compatibility with 17.0.0 - consider changing in 18.0.0 #nc ) extends SingleParentSignal[M[Input], M[Output]] { override protected val topoRank: Int = Protected.topoRank(parent) + 1 @@ -43,8 +44,8 @@ class SplitSignal[M[_], Input, Output, Key]( // likely help in most popular use cases. // - However, don't bother until we can benchmark how much time we spend // in memoization, and how much we'll gain from switching to JS Maps. - /** key -> (inputValue, outputValue, lastParentUpdateId) */ - private[this] val memoized: mutable.Map[Key, (Input, Output, Int)] = mutable.Map.empty + /** key -> (inputValue, inputSignal, outputValue, lastParentUpdateId) */ + private[this] val memoized: mutable.Map[Key, (Input, Signal[Input], Output, Int)] = mutable.Map.empty override protected def onTry(nextParentValue: Try[M[Input]], transaction: Transaction): Unit = { super.onTry(nextParentValue, transaction) @@ -56,6 +57,10 @@ class SplitSignal[M[_], Input, Output, Key]( private[this] val sharedDelayedParent = new SyncDelayStream(parent, this) + private[this] val emptyObserver = new InternalTryObserver[Input] { + override protected def onTry(nextValue: Try[Input], transaction: Transaction): Unit = () + } + private[this] def memoizedProject(nextInputs: M[Input]): M[Output] = { // Any keys not in this set by the end of this function will be removed from `memoized` map // This ensures that previously memoized values are forgotten once the source observables stops emitting their inputs @@ -74,9 +79,9 @@ class SplitSignal[M[_], Input, Output, Key]( nextKeys += memoizedKey - val cachedOutput = memoized.get(memoizedKey).map(_._2) + val cachedSignalAndOutput = memoized.get(memoizedKey).map(t => (t._2, t._3)) - val nextOutput = cachedOutput.getOrElse { + val nextSignalAndOutput = cachedSignalAndOutput.getOrElse { val initialInput = nextInput // @warning !!! DANGER ZONE !!! @@ -109,18 +114,25 @@ class SplitSignal[M[_], Input, Output, Key]( new SplitChildSignal[M, Input]( sharedDelayedParent, initialValue = Some((initialInput, Protected.lastUpdateId(parent))), - () => memoized.get(memoizedKey).map(t => (t._1, t._3)) + () => memoized.get(memoizedKey).map(t => (t._1, t._4)) ) ) + if (isStarted && strict) { + inputSignal.addInternalObserver(emptyObserver, shouldCallMaybeWillStart = true) + } + val newOutput = project(memoizedKey, initialInput, inputSignal) - newOutput + (inputSignal, newOutput) } + val inputSignal = nextSignalAndOutput._1 + val nextOutput = nextSignalAndOutput._2 + // Cache this key for the first time, or update the input so that inputSignal can fetch it // dom.console.log(s"${this} memoized.update ${memoizedKey} -> ${nextInput}") - memoized.update(memoizedKey, (nextInput, nextOutput, Protected.lastUpdateId(parent))) + memoized.update(memoizedKey, (nextInput, inputSignal, nextOutput, Protected.lastUpdateId(parent))) nextOutput }) @@ -128,6 +140,10 @@ class SplitSignal[M[_], Input, Output, Key]( memoized.keys.foreach { memoizedKey => if (!nextKeys.contains(memoizedKey)) { // dom.console.log(s"${this} memoized.remove ${memoizedKey}") + if (strict) { + val inputSignal = memoized(memoizedKey)._2 + inputSignal.removeInternalObserver(emptyObserver) + } memoized.remove(memoizedKey) } } @@ -140,4 +156,42 @@ class SplitSignal[M[_], Input, Output, Key]( nextOutputs } + + override protected[this] def onStart(): Unit = { + if (strict) { + parent.tryNow().foreach { inputs => + // splittable.foreach is perhaps less efficient that memoized.keys, + // but it has a predictable order that users expect. + splittable.foreach(inputs, (input: Input) => { + val memoizedKey = key(input) + val maybeInputSignal = memoized.get(memoizedKey).map(_._2) + maybeInputSignal.foreach { inputSignal => + // - If inputSignal not found because `parent.tryNow()` and `memoized` + // are temporarily out of sync, it should be added later just fine + // when they sync up. + // - Typical pattern is to call Protected.maybeWillStart(inputSignal) in onWillStart, + // but our SplitSignal does not actually depend on these inputSignal-s, so I think + // it's ok to do both onWillStart and onStart for them here. + inputSignal.addInternalObserver(emptyObserver, shouldCallMaybeWillStart = true) + } + }) + } + } + super.onStart() + } + + override protected[this] def onStop(): Unit = { + if (strict) { + // memoized.keys has no defined order, so we don't want to + // use it for starting subscriptions, but for stopping them, + // seems fine. + // This way we make sure to remove observers from exactly + // all items that are in `memoized`, no more, no less. + memoized.keys.foreach { memoizedKey => + val inputSignal = memoized(memoizedKey)._2 + inputSignal.removeInternalObserver(emptyObserver) + } + } + super.onStop() + } } diff --git a/src/main/scala/com/raquo/airstream/split/Splittable.scala b/src/main/scala/com/raquo/airstream/split/Splittable.scala index 9e226f4c..0aec6e4f 100644 --- a/src/main/scala/com/raquo/airstream/split/Splittable.scala +++ b/src/main/scala/com/raquo/airstream/split/Splittable.scala @@ -2,14 +2,40 @@ package com.raquo.airstream.split import com.raquo.ew.{JsArray, JsVector} -import scala.collection.immutable +import scala.collection.{immutable, mutable} import scala.scalajs.js /** The `split` operator needs an implicit instance of Splittable[M] in order to work on observables of M[_] */ trait Splittable[M[_]] { + /** Equivalent of list.map(project) */ def map[A, B](inputs: M[A], project: A => B): M[B] + /** Equivalent of list.foreach(f) */ + def foreach[A](inputs: M[A], f: A => Unit): Unit = { + // #TODO[API] This default implementation is inefficient, + // we're only adding it to satisfy binary compatibility in 17.1.0. + // #nc Remove this implementation later. + map(inputs, f) + } + + /** Find the FIRST item matching predicate, and return a collection with it updated */ + def findUpdate[A](inputs: M[A], predicate: A => Boolean, newItem: A): M[A] = { + // #TODO should we try to provide more efficient implementations for: + // Vector, ArraySeq, ArrayBuffer, JsArray, js.Array, JsVector? + // - but what if we have e.g. ArrayBuffer typed as Buffer? Don't want runtime type checks... + // - is the impl below actually less efficient than using indexWhere? + var found = false // keys must be unique, so we can break early + map[A, A](inputs, item => { + if (!found && predicate(item)) { + found = true + newItem + } else { + item + } + }) + } + def zipWithIndex[A](inputs: M[A]): M[(A, Int)] = { var ix = -1 map(inputs, (input: A) => { @@ -27,6 +53,8 @@ object Splittable extends LowPrioritySplittableImplicits { override def map[A, B](inputs: List[A], project: A => B): List[B] = inputs.map(project) + override def foreach[A](inputs: List[A], f: A => Unit): Unit = inputs.foreach(f) + override def empty[A]: List[A] = Nil } @@ -34,6 +62,8 @@ object Splittable extends LowPrioritySplittableImplicits { override def map[A, B](inputs: Vector[A], project: A => B): Vector[B] = inputs.map(project) + override def foreach[A](inputs: Vector[A], f: A => Unit): Unit = inputs.foreach(f) + override def empty[A]: Vector[A] = Vector.empty } @@ -41,6 +71,8 @@ object Splittable extends LowPrioritySplittableImplicits { override def map[A, B](inputs: Set[A], project: A => B): Set[B] = inputs.map(project) + override def foreach[A](inputs: Set[A], f: A => Unit): Unit = inputs.foreach(f) + override def empty[A]: Set[A] = Set.empty } @@ -48,13 +80,26 @@ object Splittable extends LowPrioritySplittableImplicits { override def map[A, B](inputs: Option[A], project: A => B): Option[B] = inputs.map(project) + override def foreach[A](inputs: Option[A], f: A => Unit): Unit = inputs.foreach(f) + override def empty[A]: Option[A] = None } + implicit object BufferSplittable extends Splittable[mutable.Buffer] { + + override def map[A, B](inputs: mutable.Buffer[A], project: A => B): mutable.Buffer[B] = inputs.map(project) + + override def foreach[A](inputs: mutable.Buffer[A], f: A => Unit): Unit = inputs.foreach(f) + + override def empty[A]: mutable.Buffer[A] = mutable.Buffer() + } + implicit object JsArraySplittable extends Splittable[JsArray] { override def map[A, B](inputs: JsArray[A], project: A => B): JsArray[B] = inputs.map(project) + override def foreach[A](inputs: JsArray[A], f: A => Unit): Unit = inputs.forEach(f) + override def empty[A]: JsArray[A] = JsArray() } @@ -62,6 +107,8 @@ object Splittable extends LowPrioritySplittableImplicits { override def map[A, B](inputs: JsVector[A], project: A => B): JsVector[B] = inputs.map(project) + override def foreach[A](inputs: JsVector[A], f: A => Unit): Unit = inputs.forEach(f) + override def empty[A]: JsVector[A] = JsVector() } @@ -69,6 +116,8 @@ object Splittable extends LowPrioritySplittableImplicits { override def map[A, B](inputs: js.Array[A], project: A => B): js.Array[B] = inputs.map(project) + override def foreach[A](inputs: js.Array[A], f: A => Unit): Unit = inputs.foreach(f) + override def empty[A]: js.Array[A] = js.Array() } } @@ -88,6 +137,14 @@ trait LowPrioritySplittableImplicits extends LowestPrioritySplittableImplicits { strictInputs.map(project) } + override def foreach[A](inputs: Seq[A], f: A => Unit): Unit = { + val strictInputs = inputs match { + case lazyList: LazyList[A @unchecked] => lazyList.toList + case _ => inputs + } + strictInputs.foreach(f) + } + override def empty[A]: immutable.Seq[A] = Nil } } @@ -104,6 +161,14 @@ trait LowestPrioritySplittableImplicits { strictInputs.map(project) } + override def foreach[A](inputs: collection.Seq[A], f: A => Unit): Unit = { + val strictInputs = inputs match { + case lazyList: LazyList[A @unchecked] => lazyList.toList + case _ => inputs + } + strictInputs.foreach(f) + } + override def empty[A]: collection.Seq[A] = Nil } } diff --git a/src/main/scala/com/raquo/airstream/split/SplittableOneStream.scala b/src/main/scala/com/raquo/airstream/split/SplittableOneStream.scala index 80e7a51d..88600431 100644 --- a/src/main/scala/com/raquo/airstream/split/SplittableOneStream.scala +++ b/src/main/scala/com/raquo/airstream/split/SplittableOneStream.scala @@ -28,7 +28,7 @@ class SplittableOneStream[Input](val stream: EventStream[Input]) extends AnyVal /** This operator lets you "split" EventStream[Input] into two branches: * - processing of Signal[Input] into Output, and * - the initial value of Output. - * This is a nice shorthand to to signal.splitOption in cases + * This is a nice shorthand to signal.splitOption in cases * when signal is actually stream.toWeakSignal or stream.startWith(initial) */ def splitStart[Output]( diff --git a/src/main/scala/com/raquo/airstream/split/SplittableVar.scala b/src/main/scala/com/raquo/airstream/split/SplittableVar.scala new file mode 100644 index 00000000..721cef5d --- /dev/null +++ b/src/main/scala/com/raquo/airstream/split/SplittableVar.scala @@ -0,0 +1,83 @@ +package com.raquo.airstream.split + +import com.raquo.airstream.core.Signal +import com.raquo.airstream.state.{LazyDerivedVar, LazyStrictSignal, Var} + +class SplittableVar[M[_], Input](val v: Var[M[Input]]) extends AnyVal { + + /** This `split` operator works on Vars, and gives you a */ + def split[Output, Key]( + key: Input => Key, + distinctCompose: Signal[Input] => Signal[Input] = _.distinct, + duplicateKeys: DuplicateKeysConfig = DuplicateKeysConfig.default + )( + project: (Key, Input, Var[Input]) => Output + )( + implicit splittable: Splittable[M] + ): Signal[M[Output]] = { + new SplitSignal[M, Input, Output, Key]( + parent = v.signal, + key, + distinctCompose, + project = (thisKey, initial, signal) => { + val displayNameSuffix = s".split(key = ${key})" + val childVar = new LazyDerivedVar[M[Input], Input]( + parent = v, + signal = new LazyStrictSignal[Input, Input]( + signal, identity, signal.displayName, displayNameSuffix + ".signal" + ), + zoomOut = (inputs, newInput) => { + splittable.findUpdate(inputs, key(_) == thisKey, newInput) + }, + displayNameSuffix = displayNameSuffix + ) + project(thisKey, initial, childVar) + }, + splittable, + duplicateKeys, + strict = true + ) + } + + /** This variation of the `split` operator is designed for Var-s of + * mutable collections. It works like the usual split, except that + * it updates the mutable collection in-place instead of creating + * a modified copy of it, like the regular `split` operator does. + * + * Note that the regular `split` operators work fine with both mutable + * and immutable collections, treating both of them as immutable. + */ + def splitMutate[Output, Key]( + key: Input => Key, + distinctCompose: Signal[Input] => Signal[Input] = _.distinct, + duplicateKeys: DuplicateKeysConfig = DuplicateKeysConfig.default + )( + project: (Key, Input, Var[Input]) => Output + )( + implicit splittable: MutableSplittable[M] + ): Signal[M[Output]] = { + new SplitSignal[M, Input, Output, Key]( + parent = v.signal, + key, + distinctCompose, + project = (thisKey, initial, signal) => { + val displayNameSuffix = s".splitMutate(key = ${key})" + val childVar = new LazyDerivedVar[M[Input], Input]( + parent = v, + signal = new LazyStrictSignal[Input, Input]( + signal, identity, signal.displayName, displayNameSuffix + ".signal" + ), + zoomOut = (inputs, newInput) => { + splittable.findUpdateInPlace[Input](inputs, key(_) == thisKey, newInput) + inputs + }, + displayNameSuffix = displayNameSuffix + ) + project(thisKey, initial, childVar) + }, + splittable.splittable, + duplicateKeys, + strict = true + ) + } +} diff --git a/src/main/scala/com/raquo/airstream/state/LazyDerivedVar.scala b/src/main/scala/com/raquo/airstream/state/LazyDerivedVar.scala index f4aebe65..bb1207fb 100644 --- a/src/main/scala/com/raquo/airstream/state/LazyDerivedVar.scala +++ b/src/main/scala/com/raquo/airstream/state/LazyDerivedVar.scala @@ -12,18 +12,18 @@ import scala.util.{Failure, Success, Try} * Unlike the regular DerivedVar, you don't need to provide an Owner * to create LazyDerivedVar, and you're allowed to update this Var * even if its signal has no subscribers. + * + * @param zoomOut (currentParentValue, nextValue) => nextParentValue. */ class LazyDerivedVar[A, B]( parent: Var[A], - zoomIn: A => B, + override val signal: StrictSignal[B], zoomOut: (A, B) => A, displayNameSuffix: String ) extends Var[B] { override private[state] def underlyingVar: SourceVar[_] = parent.underlyingVar - private[this] val _varSignal = new LazyDerivedVarSignal(parent, zoomIn, displayName) - // #Note this getCurrentValue implementation is different from SourceVar // - SourceVar's getCurrentValue looks at an internal currentValue variable // - That currentValue gets updated immediately before the signal (in an already existing transaction) @@ -34,7 +34,7 @@ class LazyDerivedVar[A, B]( override private[state] def getCurrentValue: Try[B] = signal.tryNow() override private[state] def setCurrentValue(value: Try[B], transaction: Transaction): Unit = { - parent.tryNow() match { + parent.signal.tryNow() match { case Success(parentValue) => // This can update the parent without causing an infinite loop because // the parent updates this derived var's signal, it does not call @@ -47,7 +47,5 @@ class LazyDerivedVar[A, B]( } } - override val signal: StrictSignal[B] = _varSignal - override protected def defaultDisplayName: String = parent.displayName + displayNameSuffix } diff --git a/src/main/scala/com/raquo/airstream/state/LazyDerivedVarSignal.scala b/src/main/scala/com/raquo/airstream/state/LazyDerivedVarSignal.scala deleted file mode 100644 index 0e88206b..00000000 --- a/src/main/scala/com/raquo/airstream/state/LazyDerivedVarSignal.scala +++ /dev/null @@ -1,33 +0,0 @@ -package com.raquo.airstream.state - -import com.raquo.airstream.core.Protected -import com.raquo.airstream.misc.MapSignal - -import scala.util.Try - -class LazyDerivedVarSignal[I, O]( - parent: Var[I], - zoomIn: I => O, - parentDisplayName: => String -) extends MapSignal[I, O](parent.signal, project = zoomIn, recover = None) with StrictSignal[O] { self => - - // Note that even if owner kills subscription, this signal might remain due to other listeners - // override protected[state] def isStarted: Boolean = super.isStarted - - override protected def defaultDisplayName: String = parentDisplayName + ".signal" - - override def tryNow(): Try[O] = { - val newParentLastUpdateId = Protected.lastUpdateId(parent.signal) - if (newParentLastUpdateId != _parentLastUpdateId) { - // This branch can only run if !isStarted - val nextValue = currentValueFromParent() - updateCurrentValueFromParent(nextValue, newParentLastUpdateId) - nextValue - } else { - super.tryNow() - } - } - - override protected[state] def updateCurrentValueFromParent(nextValue: Try[O], nextParentLastUpdateId: Int): Unit = - super.updateCurrentValueFromParent(nextValue, nextParentLastUpdateId) -} diff --git a/src/main/scala/com/raquo/airstream/state/LazyStrictSignal.scala b/src/main/scala/com/raquo/airstream/state/LazyStrictSignal.scala new file mode 100644 index 00000000..45f32f65 --- /dev/null +++ b/src/main/scala/com/raquo/airstream/state/LazyStrictSignal.scala @@ -0,0 +1,44 @@ +package com.raquo.airstream.state + +import com.raquo.airstream.core.{Protected, Signal} +import com.raquo.airstream.misc.MapSignal + +import scala.util.Try + +/** #TODO[Naming,Org] Messy + * + * This signal offers the API of a [[StrictSignal]] but is actually + * lazy. All it does is let you PULL the signal's current value. + * This mostly works fine if your signal does not depend on any streams. + * + * We use this signal internally for derived Var use cases where we know + * that it should work fine. we may change the naming and structure of + * this class when we implement settle on a long term strategy for + * peekNow / pullNow https://github.com/raquo/Laminar/issues/130 + */ +class LazyStrictSignal[I, O]( + parentSignal: Signal[I], + zoomIn: I => O, + parentDisplayName: => String, + displayNameSuffix: String +) extends MapSignal[I, O](parentSignal, project = zoomIn, recover = None) with StrictSignal[O] { self => + + override protected def defaultDisplayName: String = parentDisplayName + displayNameSuffix + + override def tryNow(): Try[O] = { + val newParentLastUpdateId = Protected.lastUpdateId(parentSignal) + // #TODO This comparison only works when parentSignal is started or strict. + // - e.g. it does not help us in `split`, it only helps us with lazyZoom. + if (newParentLastUpdateId != _parentLastUpdateId) { + // This branch can only run if !isStarted + val nextValue = currentValueFromParent() + updateCurrentValueFromParent(nextValue, newParentLastUpdateId) + nextValue + } else { + super.tryNow() + } + } + + override protected[state] def updateCurrentValueFromParent(nextValue: Try[O], nextParentLastUpdateId: Int): Unit = + super.updateCurrentValueFromParent(nextValue, nextParentLastUpdateId) +} diff --git a/src/main/scala/com/raquo/airstream/state/SourceVar.scala b/src/main/scala/com/raquo/airstream/state/SourceVar.scala index becd7837..fe4decd2 100644 --- a/src/main/scala/com/raquo/airstream/state/SourceVar.scala +++ b/src/main/scala/com/raquo/airstream/state/SourceVar.scala @@ -6,7 +6,8 @@ import scala.util.Try /** The regular Var that's created with `Var.apply`. * - * See also DerivedVar, created with `myVar.zoom(a => b)(b => a)(owner)` + * See also DerivedVar, created with `myVar.zoom(a => b)((a, b) => a)(owner)`, + * and LazyDerivedVar, created with `myVar.zoomLazy(a => b)((a, b) => a)` */ class SourceVar[A] private[state](initial: Try[A]) extends Var[A] { @@ -22,7 +23,10 @@ class SourceVar[A] private[state](initial: Try[A]) extends Var[A] { override private[state] def getCurrentValue: Try[A] = currentValue - override private[state] def setCurrentValue(value: Try[A], transaction: Transaction): Unit = { + override private[state] def setCurrentValue( + value: Try[A], + transaction: Transaction + ): Unit = { currentValue = value _varSignal.onTry(value, transaction) } diff --git a/src/main/scala/com/raquo/airstream/state/Var.scala b/src/main/scala/com/raquo/airstream/state/Var.scala index b39a348b..d42d17bd 100644 --- a/src/main/scala/com/raquo/airstream/state/Var.scala +++ b/src/main/scala/com/raquo/airstream/state/Var.scala @@ -4,6 +4,7 @@ import com.raquo.airstream.core.AirstreamError.VarError import com.raquo.airstream.core.Source.SignalSource import com.raquo.airstream.core.{AirstreamError, Named, Observer, Signal, Sink, Transaction} import com.raquo.airstream.ownership.Owner +import com.raquo.airstream.split.SplittableVar import com.raquo.airstream.util.hasDuplicateTupleKeys import scala.util.{Failure, Success, Try} @@ -100,7 +101,12 @@ trait Var[A] extends SignalSource[A] with Sink[A] with Named { * as they may not get called if the Var's value is not observed. */ def zoomLazy[B](in: A => B)(out: (A, B) => A): Var[B] = { - new LazyDerivedVar[A, B](this, in, out, displayNameSuffix = ".zoomLazy") + val zoomedSignal = new LazyStrictSignal(signal, in, displayName, displayNameSuffix = ".zoomLazy.signal") + new LazyDerivedVar[A, B]( + parent = this, + signal = zoomedSignal, + zoomOut = (currValue, nextZoomedValue) => out(currValue, nextZoomedValue), + displayNameSuffix = ".zoomLazy") } def setTry(tryValue: Try[A]): Unit = writer.onTry(tryValue) @@ -270,5 +276,7 @@ object Var { hasDuplicateTupleKeys(tuples.map(t => t.copy(_1 = t._1.underlyingVar))) } + /** Provides methods on Var: split, splitMutate */ + implicit def toSplittableVar[M[_], Input](signal: Var[M[Input]]): SplittableVar[M, Input] = new SplittableVar(signal) } diff --git a/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala b/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala index f44cd750..cd9f8552 100644 --- a/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala @@ -1185,7 +1185,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { var ownersById = Map[String, ManualOwner]() var fooSById = Map[String, Signal[Foo]]() - var mapFooSById = Map[String, Signal[Foo]]() + var mapFooSById = Map[String, Signal[Option[Foo]]]() val splitSignal = foosVar.signal.split(_.id)((id, _, fooS) => { ownersById.get(id).foreach(_.killSubscriptions()) @@ -1195,7 +1195,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { fooSById = fooSById.updated(id, fooS) - val mapFooS = foosVar.signal.map(_.find(_.id == id).get) + val mapFooS = foosVar.signal.map(_.find(_.id == id)) mapFooSById = mapFooSById.updated(id, mapFooS) }) @@ -1225,7 +1225,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { assert(mapFooSById("a") eq mapFooS_A) assert(fooS_A_observed_1.now() == Foo("a", 2)) - assert(mapFooS_A_observed_1.now() == Foo("a", 2)) + assert(mapFooS_A_observed_1.now().contains(Foo("a", 2))) // -- @@ -1247,13 +1247,13 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { // Verifying that these signals don't update after their subs getting killed assert(fooS_A_observed_1.now() == Foo("a", 2)) - assert(mapFooS_A_observed_1.now() == Foo("a", 2)) + assert(mapFooS_A_observed_1.now().contains(Foo("a", 2))) // Verifying that if the start of the child signal is delayed, // the child signal still picks up the most recent value, // and not the initial value that it was instantiated with. assert(fooS_B_observed_1.now() == Foo("b", 2)) - assert(mapFooS_B_observed_1.now() == (Foo("b", 2))) + assert(mapFooS_B_observed_1.now().contains(Foo("b", 2))) // -- @@ -1264,7 +1264,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { assert(mapFooSById("a") eq mapFooS_A) assert(fooS_B_observed_1.now() == Foo("b", 3)) - assert(mapFooS_B_observed_1.now() == Foo("b", 3)) + assert(mapFooS_B_observed_1.now().contains(Foo("b", 3))) // -- @@ -1272,7 +1272,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { val mapFooS_A_observed_2 = mapFooS_A.observe(owner) assert(fooS_A_observed_2.now() == Foo("a", 4)) - assert(mapFooS_A_observed_2.now() == Foo("a", 4)) + assert(mapFooS_A_observed_2.now().contains(Foo("a", 4))) // -- @@ -1283,7 +1283,7 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter { assert(mapFooSById("a") eq mapFooS_A) assert(fooS_A_observed_2.now() == Foo("a", 5)) - assert(mapFooS_A_observed_2.now() == Foo("a", 5)) + assert(mapFooS_A_observed_2.now().contains(Foo("a", 5))) } it("child split signal re-syncs with parent stream") { diff --git a/src/test/scala/com/raquo/airstream/misc/SplitVarSpec.scala b/src/test/scala/com/raquo/airstream/misc/SplitVarSpec.scala new file mode 100644 index 00000000..71047e36 --- /dev/null +++ b/src/test/scala/com/raquo/airstream/misc/SplitVarSpec.scala @@ -0,0 +1,715 @@ +package com.raquo.airstream.misc + +import com.raquo.airstream.UnitSpec +import com.raquo.airstream.core.{Observer, Signal, Transaction} +import com.raquo.airstream.eventbus.EventBus +import com.raquo.airstream.fixtures.{Effect, TestableOwner} +import com.raquo.airstream.ownership.{DynamicOwner, DynamicSubscription, ManualOwner} +import com.raquo.airstream.split.DuplicateKeysConfig +import com.raquo.airstream.state.Var +import com.raquo.ew.JsArray +import org.scalatest.{Assertion, BeforeAndAfter} + +import scala.collection.mutable + +// #Warning: this test is not in the `split` package to make sure that Scala 2.13 specific implicits +// in the split package will be resolved correctly even outside of that package. + +class SplitVarSpec extends UnitSpec with BeforeAndAfter { + + case class Foo(id: String, version: Int) + + case class Bar(id: String) + + case class Element(id: String, fooSignal: Signal[Foo]) { + override def toString: String = s"Element($id, fooSignal)" + } + + private val originalDuplicateKeysConfig = DuplicateKeysConfig.default + + after { + DuplicateKeysConfig.setDefault(originalDuplicateKeysConfig) + } + + def withOrWithoutDuplicateKeyWarnings(code: => Assertion): Assertion = { + // This wrapper checks that behaviour is identical in both modes + DuplicateKeysConfig.setDefault(DuplicateKeysConfig.noWarnings) + withClue("DuplicateKeysConfig.shouldWarn=false")(code) + DuplicateKeysConfig.setDefault(DuplicateKeysConfig.warnings) + withClue("DuplicateKeysConfig.shouldWarn=true")(code) + } + + it("split signal - raw semantics and lifecycle (1)") { + withOrWithoutDuplicateKeyWarnings { + val effects = mutable.Buffer[Effect[String]]() + + val myVar = Var[List[Foo]](Foo("initial", 1) :: Nil) + + val outerDynamicOwner = new DynamicOwner(() => throw new Exception("split outer dynamic owner accessed after it was killed")) + + val innerDynamicOwner = new DynamicOwner(() => throw new Exception("split inner dynamic owner accessed after it was killed")) + + // #Note: important to activate now, we're testing this (see comments below) + outerDynamicOwner.activate() + innerDynamicOwner.activate() + + // #Note: `identity` here means we're not using `distinct` to filter out redundancies in fooSignal + // We test like this to make sure that the underlying splitting machinery works correctly without this crutch + val signal = myVar.split( + key = _.id, distinctCompose = identity + )( + (key, initialFoo, fooVar) => { + assert(key == initialFoo.id, "Key does not match initial value") + effects += Effect(s"init-child-$key", key + "-" + initialFoo.version.toString) + // @Note keep foreach / addObserver here – this is important. + // It tests that SplitSignal does not cause an infinite loop trying to evaluate its initialValue. + DynamicSubscription.subscribeCallback( + innerDynamicOwner, + owner => fooVar.signal.foreach { foo => + assert(key == foo.id, "Subsequent value does not match initial key") + effects += Effect(s"update-child-$key", foo.id + "-" + foo.version.toString) + }(owner) + ) + Bar(key) + }) + + DynamicSubscription.subscribeCallback( + outerDynamicOwner, + owner => signal.foreach { result => + effects += Effect("result", result.toString) + }(owner) + ) + + effects shouldBe mutable.Buffer( + Effect("init-child-initial", "initial-1"), + Effect("update-child-initial", "initial-1"), + Effect("result", "List(Bar(initial))") + ) + + effects.clear() + + // -- + + myVar.writer.onNext(Foo("b", 1) :: Foo("a", 3) :: Nil) + + effects shouldBe mutable.Buffer( + Effect("init-child-b", "b-1"), + Effect("update-child-b", "b-1"), + Effect("init-child-a", "a-3"), + Effect("update-child-a", "a-3"), + Effect("result", "List(Bar(b), Bar(a))") + ) + + effects.clear() + + // -- + + outerDynamicOwner.deactivate() + innerDynamicOwner.deactivate() + + effects shouldBe mutable.Buffer() + + // -- + + outerDynamicOwner.activate() + innerDynamicOwner.activate() + + effects shouldBe mutable.Buffer( + // #Note `initial` is here because our code created an inner subscription for `initial` + // and kept it alive even after the element was removed. This inner signal itself will + // not receive any updates until "initial" key is added to the inputs again (actually + // it might cause issues in this pattern if this happens), but this inner signal's + // current value is still sent to the observer when we re-activate its dynamic owner. + Effect("result", "List(Bar(b), Bar(a))"), + Effect("update-child-initial", "initial-1"), + Effect("update-child-b", "b-1"), + Effect("update-child-a", "a-3"), + ) + + effects.clear() + + // -- + + myVar.writer.onNext(Foo("b", 2) :: Foo("a", 3) :: Nil) + + // This assertion makes sure that `resetOnStop` is set correctly in `drop(1, resetOnStop = false)` + effects shouldBe mutable.Buffer( + Effect("result", "List(Bar(b), Bar(a))"), + Effect("update-child-b", "b-2"), + Effect("update-child-a", "a-3"), + ) + + effects.clear() + + // -- + + outerDynamicOwner.deactivate() + + effects shouldBe mutable.Buffer() + + // -- + + outerDynamicOwner.activate() + + effects shouldBe mutable.Buffer( + Effect("result", "List(Bar(b), Bar(a))") + ) + + effects.clear() + + // -- + + myVar.writer.onNext(Foo("a", 4) :: Foo("b", 3) :: Nil) + + effects shouldBe mutable.Buffer( + Effect("result", "List(Bar(a), Bar(b))"), + Effect("update-child-b", "b-3"), + Effect("update-child-a", "a-4"), + ) + + //effects.clear() + } + } + + it("split signal - raw semantics and lifecycle (2)") { + withOrWithoutDuplicateKeyWarnings { + val effects = mutable.Buffer[Effect[String]]() + + val myVar = Var[List[Foo]](Foo("initial", 1) :: Nil) + + val outerDynamicOwner = new DynamicOwner(() => throw new Exception("split outer dynamic owner accessed after it was killed")) + + val innerDynamicOwner = new DynamicOwner(() => throw new Exception("split inner dynamic owner accessed after it was killed")) + + // #Note: important to NOT activate the inner subscription right away, we're testing this (see comments below) + outerDynamicOwner.activate() + //innerDynamicOwner.activate() + + // #Note: `identity` here means we're not using `distinct` to filter out redundancies in fooSignal + // We test like this to make sure that the underlying splitting machinery works correctly without this crutch + val signal = myVar.signal.split(_.id, distinctCompose = identity)(project = (key, initialFoo, fooSignal) => { + assert(key == initialFoo.id, "Key does not match initial value") + effects += Effect(s"init-child-$key", key + "-" + initialFoo.version.toString) + DynamicSubscription.subscribeCallback( + innerDynamicOwner, + owner => fooSignal.foreach { foo => + assert(key == foo.id, "Subsequent value does not match initial key") + effects += Effect(s"update-child-$key", foo.id + "-" + foo.version.toString) + }(owner) + ) + // #Note: Test that our dropping logic works does not break events scheduled after transaction boundary + Transaction { _ => + DynamicSubscription.subscribeCallback( + innerDynamicOwner, + owner => fooSignal.foreach { foo => + assert(key == foo.id, "Subsequent value does not match initial key [new-trx]") + effects += Effect(s"new-trx-update-child-$key", foo.id + "-" + foo.version.toString) + }(owner) + ) + } + Bar(key) + }) + + DynamicSubscription.subscribeCallback( + outerDynamicOwner, + owner => signal.foreach { result => + effects += Effect("result", result.toString) + }(owner) + ) + + effects shouldBe mutable.Buffer( + Effect("init-child-initial", "initial-1"), + Effect("result", "List(Bar(initial))") + ) + + effects.clear() + + // -- + + innerDynamicOwner.activate() + + effects shouldBe mutable.Buffer( + Effect("update-child-initial", "initial-1"), + Effect("new-trx-update-child-initial", "initial-1"), + ) + + effects.clear() + + // -- + + myVar.writer.onNext(Foo("b", 1) :: Foo("a", 3) :: Nil) + + effects shouldBe mutable.Buffer( + Effect("init-child-b", "b-1"), + Effect("update-child-b", "b-1"), + Effect("init-child-a", "a-3"), + Effect("update-child-a", "a-3"), + Effect("result", "List(Bar(b), Bar(a))"), + Effect("new-trx-update-child-b", "b-1"), + Effect("new-trx-update-child-a", "a-3") + ) + + //effects.clear() + } + } + + it("split signal - raw semantics and lifecycle (3)") { + withOrWithoutDuplicateKeyWarnings { + val effects = mutable.Buffer[Effect[String]]() + + val myVar = Var[List[Foo]](Foo("initial", 1) :: Nil) + + val outerDynamicOwner = new DynamicOwner(() => throw new Exception("split outer dynamic owner accessed after it was killed")) + + val innerDynamicOwner = new DynamicOwner(() => throw new Exception("split inner dynamic owner accessed after it was killed")) + + // #Note: `identity` here means we're not using `distinct` to filter out redundancies in fooSignal + // We test like this to make sure that the underlying splitting machinery works correctly without this crutch + val signal = myVar.signal.split(_.id, distinctCompose = identity)(project = (key, initialFoo, fooSignal) => { + assert(key == initialFoo.id, "Key does not match initial value") + effects += Effect(s"init-child-$key", key + "-" + initialFoo.version.toString) + DynamicSubscription.subscribeCallback( + innerDynamicOwner, + owner => fooSignal.foreach { foo => + assert(key == foo.id, "Subsequent value does not match initial key") + effects += Effect(s"update-child-$key", foo.id + "-" + foo.version.toString) + }(owner) + ) + Bar(key) + }) + + DynamicSubscription.subscribeCallback( + outerDynamicOwner, + owner => signal.foreach { result => + effects += Effect("result", result.toString) + }(owner) + ) + + effects shouldBe mutable.Buffer() + + // -- + + outerDynamicOwner.activate() + innerDynamicOwner.activate() + + effects shouldBe mutable.Buffer( + Effect("init-child-initial", "initial-1"), + Effect("result", "List(Bar(initial))"), + Effect("update-child-initial", "initial-1") + ) + + effects.clear() + + // -- + + myVar.writer.onNext(Foo("b", 1) :: Foo("a", 3) :: Nil) + + effects shouldBe mutable.Buffer( + Effect("init-child-b", "b-1"), + Effect("update-child-b", "b-1"), + Effect("init-child-a", "a-3"), + Effect("update-child-a", "a-3"), + Effect("result", "List(Bar(b), Bar(a))") + ) + + //effects.clear() + } + } + + it("split signal - raw semantics and lifecycle (4)") { + withOrWithoutDuplicateKeyWarnings { + val effects = mutable.Buffer[Effect[String]]() + + val myVar = Var[List[Foo]](Foo("initial", 1) :: Nil) + + val dynamicOwner = new DynamicOwner(() => throw new Exception("split outer dynamic owner accessed after it was killed")) + + // #Note: `identity` here means we're not using `distinct` to filter out redundancies in fooSignal + // We test like this to make sure that the underlying splitting machinery works correctly without this crutch + val signal = myVar.signal.split(_.id, distinctCompose = identity)(project = (key, initialFoo, fooSignal) => { + assert(key == initialFoo.id, "Key does not match initial value") + effects += Effect(s"init-child-$key", key + "-" + initialFoo.version.toString) + Element(key, fooSignal) + }) + + DynamicSubscription.subscribeCallback( + dynamicOwner, + owner => signal.foreach { result => + effects += Effect("result", result.toString) + result.foreach { element => + DynamicSubscription.subscribeCallback( + dynamicOwner, + owner => element.fooSignal.foreach { foo => + assert(element.id == foo.id, "Subsequent value does not match initial key") + effects += Effect(s"update-child-${element.id}", foo.id + "-" + foo.version.toString) + }(owner) + ) + } + }(owner) + ) + + effects shouldBe mutable.Buffer() + + // -- + + dynamicOwner.activate() + //innerDynamicOwner.activate() + + effects shouldBe mutable.Buffer( + Effect("init-child-initial", "initial-1"), + Effect("result", "List(Element(initial, fooSignal))"), + Effect("update-child-initial", "initial-1") + ) + + effects.clear() + + // -- + + myVar.writer.onNext(Foo("b", 1) :: Foo("a", 3) :: Nil) + + effects shouldBe mutable.Buffer( + Effect("init-child-b", "b-1"), + Effect("init-child-a", "a-3"), + Effect("result", "List(Element(b, fooSignal), Element(a, fooSignal))"), + Effect("update-child-b", "b-1"), + Effect("update-child-a", "a-3") + ) + + //effects.clear() + } + } + + it("child split signal re-syncs with parent signal") { + // https://github.com/raquo/Airstream/issues/120 + + val owner = new ManualOwner + + val foosVar = Var[List[Foo]](Nil) + + var ownersById = Map[String, ManualOwner]() + var fooSById = Map[String, Signal[Foo]]() + var mapFooSById = Map[String, Signal[Option[Foo]]]() + + val splitSignal = foosVar.signal.split(_.id)((id, _, fooS) => { + ownersById.get(id).foreach(_.killSubscriptions()) + + val newOwner = new ManualOwner + ownersById = ownersById.updated(id, newOwner) + + fooSById = fooSById.updated(id, fooS) + + val mapFooS = foosVar.signal.map(_.find(_.id == id)) + mapFooSById = mapFooSById.updated(id, mapFooS) + }) + + // -- + + val splitSub = splitSignal.addObserver(Observer.empty)(owner) + + assert(ownersById.isEmpty) + assert(fooSById.isEmpty) + assert(mapFooSById.isEmpty) + + // -- + + foosVar.set(Foo("a", 1) :: Nil) + + val owner_A = ownersById("a") + val fooS_A = fooSById("a") + val mapFooS_A = mapFooSById("a") + + val fooS_A_observed_1 = fooS_A.observe(owner) + val mapFooS_A_observed_1 = mapFooS_A.observe(owner) + + foosVar.set(Foo("a", 2) :: Foo("b", 1) :: Nil) + + assert(ownersById("a") eq owner_A) + assert(fooSById("a") eq fooS_A) + assert(mapFooSById("a") eq mapFooS_A) + + assert(fooS_A_observed_1.now() == Foo("a", 2)) + assert(mapFooS_A_observed_1.now().contains(Foo("a", 2))) + + // -- + + fooS_A_observed_1.killOriginalSubscription() + mapFooS_A_observed_1.killOriginalSubscription() + + foosVar.set(Foo("b", 2) :: Foo("a", 3) :: Nil) + + val owner_B = ownersById("b") + val fooS_B = fooSById("b") + val mapFooS_B = mapFooSById("b") + + val fooS_B_observed_1 = fooS_B.observe(owner_B) + val mapFooS_B_observed_1 = mapFooS_B.observe(owner_B) + + assert(ownersById("b") eq owner_B) + assert(fooSById("b") eq fooS_B) + assert(mapFooSById("b") eq mapFooS_B) + + // Verifying that these signals don't update after their subs getting killed + assert(fooS_A_observed_1.now() == Foo("a", 2)) + assert(mapFooS_A_observed_1.now().contains(Foo("a", 2))) + + // Verifying that if the start of the child signal is delayed, + // the child signal still picks up the most recent value, + // and not the initial value that it was instantiated with. + assert(fooS_B_observed_1.now() == Foo("b", 2)) + assert(mapFooS_B_observed_1.now().contains(Foo("b", 2))) + + // -- + + foosVar.set(Foo("a", 4) :: Foo("b", 3) :: Nil) + + assert(ownersById("a") eq owner_A) + assert(fooSById("a") eq fooS_A) + assert(mapFooSById("a") eq mapFooS_A) + + assert(fooS_B_observed_1.now() == Foo("b", 3)) + assert(mapFooS_B_observed_1.now().contains(Foo("b", 3))) + + // -- + + val fooS_A_observed_2 = fooS_A.observe(owner) + val mapFooS_A_observed_2 = mapFooS_A.observe(owner) + + assert(fooS_A_observed_2.now() == Foo("a", 4)) + assert(mapFooS_A_observed_2.now().contains(Foo("a", 4))) + + // -- + + foosVar.set(Foo("a", 5) :: Nil) + + assert(ownersById("a") eq owner_A) + assert(fooSById("a") eq fooS_A) + assert(mapFooSById("a") eq mapFooS_A) + + assert(fooS_A_observed_2.now() == Foo("a", 5)) + assert(mapFooS_A_observed_2.now().contains(Foo("a", 5))) + } + + it("child split signal re-syncs with parent stream") { + // https://github.com/raquo/Airstream/issues/120 + + val owner = new ManualOwner + + val foosBus = new EventBus[List[Foo]] + + var ownersById = Map[String, ManualOwner]() + var fooSById = Map[String, Signal[Foo]]() + var mapFooSById = Map[String, Signal[Option[Foo]]]() + + val splitSignal = foosBus.stream.split(_.id)((id, _, fooS) => { + ownersById.get(id).foreach(_.killSubscriptions()) + + val newOwner = new ManualOwner + ownersById = ownersById.updated(id, newOwner) + + fooSById = fooSById.updated(id, fooS) + + val mapFooS = foosBus.stream.startWith(Nil).map(_.find(_.id == id)) + mapFooSById = mapFooSById.updated(id, mapFooS) + }) + + // -- + + val splitSub = splitSignal.addObserver(Observer.empty)(owner) + + assert(ownersById.isEmpty) + assert(fooSById.isEmpty) + assert(mapFooSById.isEmpty) + + // -- + + foosBus.emit(Foo("a", 1) :: Nil) + + val owner_A = ownersById("a") + val fooS_A = fooSById("a") + val mapFooS_A = mapFooSById("a") + + val fooS_A_observed_1 = fooS_A.observe(owner_A) + val mapFooS_A_observed_1 = mapFooS_A.observe(owner_A) + + foosBus.emit(Foo("a", 2) :: Foo("b", 1) :: Nil) + + assert(ownersById("a") eq owner_A) + assert(fooSById("a") eq fooS_A) + assert(mapFooSById("a") eq mapFooS_A) + + assert(fooS_A_observed_1.now() == Foo("a", 2)) + assert(mapFooS_A_observed_1.now().contains(Foo("a", 2))) + + // -- + + fooS_A_observed_1.killOriginalSubscription() + mapFooS_A_observed_1.killOriginalSubscription() + + foosBus.emit(Foo("b", 2) :: Foo("a", 3) :: Nil) + + val owner_B = ownersById("b") + val fooS_B = fooSById("b") + val mapFooS_B = mapFooSById("b") + + val fooS_B_observed_1 = fooS_B.observe(owner_B) + val mapFooS_B_observed_1 = mapFooS_B.observe(owner_B) + + assert(ownersById("b") eq owner_B) + assert(fooSById("b") eq fooS_B) + assert(mapFooSById("b") eq mapFooS_B) + + // Verifying that these signals don't update after their subs getting killed + assert(fooS_A_observed_1.now() == Foo("a", 2)) + assert(mapFooS_A_observed_1.now().contains(Foo("a", 2))) + + // Verifying that if the start of the child signal is delayed, + // the child signal still picks up the most recent value, + // and not the initial value that it was instantiated with. + assert(fooS_B_observed_1.now() == Foo("b", 2)) + assert(mapFooS_B_observed_1.now().isEmpty) // this is based on stream so it can't actually re-sync + + // -- + + foosBus.emit(Foo("a", 4) :: Foo("b", 3) :: Nil) + + assert(ownersById("a") eq owner_A) + assert(fooSById("a") eq fooS_A) + assert(mapFooSById("a") eq mapFooS_A) + + assert(fooS_B_observed_1.now() == Foo("b", 3)) + assert(mapFooS_B_observed_1.now().contains(Foo("b", 3))) + + // -- + + val fooS_A_observed_2 = fooS_A.observe(owner) + val mapFooS_A_observed_2 = mapFooS_A.observe(owner) + + assert(fooS_A_observed_2.now() == Foo("a", 4)) + assert(mapFooS_A_observed_2.now().contains(Foo("a", 2))) // this is based on stream so it can't actually re-sync + + // -- + + foosBus.emit(Foo("a", 5) :: Nil) + + assert(ownersById("a") eq owner_A) + assert(fooSById("a") eq fooS_A) + assert(mapFooSById("a") eq mapFooS_A) + + assert(fooS_A_observed_2.now() == Foo("a", 5)) + assert(mapFooS_A_observed_2.now().contains(Foo("a", 5))) + } + + it("split mutable array") { + val effects = mutable.Buffer[Effect[String]]() + + // #TODO[Test] Would be nice to also verify this with immutable.Seq + // as their implementations are separate. + + val arr = JsArray(Foo("initial", 1)) + + val myVar = Var(arr) + + val owner = new TestableOwner + + val signal = myVar.signal.split(_.id)(project = (key, initialFoo, fooSignal) => { + assert(key == initialFoo.id, "Key does not match initial value") + effects += Effect("init-child", key + "-" + initialFoo.version.toString) + fooSignal.foreach { foo => + assert(key == foo.id, "Subsequent value does not match initial key") + effects += Effect("update-child", foo.id + "-" + foo.version.toString) + }(owner) + Bar(key) + }) + + signal.foreach { result => + effects += Effect("result", result.toString) + }(owner) + + effects shouldBe mutable.Buffer( + Effect("init-child", "initial-1"), + Effect("update-child", "initial-1"), + Effect("result", "Bar(initial)") // Single item JS Array is printed this way + ) + + effects.clear() + + // -- + + arr.update(0, Foo("a", 1)) + + myVar.set(arr) + + effects shouldBe mutable.Buffer( + Effect("init-child", "a-1"), + Effect("update-child", "a-1"), + Effect("result", "Bar(a)") // Single item JS Array is printed this way + ) + + effects.clear() + + // -- + + arr.update(0, Foo("a", 2)) + + myVar.set(arr) + + effects shouldBe mutable.Buffer( + Effect("result", "Bar(a)"), // Single item JS Array is printed this way + Effect("update-child", "a-2") + ) + + effects.clear() + + // -- + + arr.update(0, Foo("a", 3)) + arr.push(Foo("b", 1)) + + myVar.set(arr) + + effects shouldBe mutable.Buffer( + Effect("init-child", "b-1"), + Effect("update-child", "b-1"), + Effect("result", "Bar(a),Bar(b)"), // Multi item JS Array is printed this way + Effect("update-child", "a-3") + ) + + effects.clear() + + // -- + + arr.reverse() + + myVar.set(arr) + + effects shouldBe mutable.Buffer( + Effect("result", "Bar(b),Bar(a)") // Multi item JS Array is printed this way + ) + + effects.clear() + + // -- + + arr.update(0, Foo("b", 2)) + arr.pop() + + myVar.set(arr) + + effects shouldBe mutable.Buffer( + Effect("result", "Bar(b)"), // Single item JS Array is printed this way + Effect("update-child", "b-2") + ) + + effects.clear() + + // -- + + myVar.writer.onNext(arr) + + effects shouldBe mutable.Buffer( + Effect("result", "Bar(b)") // Single item JS Array is printed this way + ) + + //effects.clear() + } +}