Skip to content

Commit

Permalink
New: Splittable Vars (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
raquo committed Jul 11, 2024
1 parent 679292a commit 1073514
Show file tree
Hide file tree
Showing 13 changed files with 1,105 additions and 61 deletions.
3 changes: 3 additions & 0 deletions src/main/scala/com/raquo/airstream/core/BaseObservable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ trait BaseObservable[+Self[+_] <: Observable[_], +A] extends Source[A] with Name

/** Child observable should call this method on its parents when it is started.
* This observable calls [[onStart]] if this action has given it its first observer (internal or external).
*
* I think `shouldCallMaybeWillStart` should be `false` when we're calling this from onStart.
* In that case, `maybeWillStart` was already called. #TODO But `maybeWillStart` checks for willStartDone status, so does it actually matter?
*/
protected[airstream] def addInternalObserver(observer: InternalObserver[A], shouldCallMaybeWillStart: Boolean): Unit

Expand Down
103 changes: 103 additions & 0 deletions src/main/scala/com/raquo/airstream/split/MutableSplittable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.raquo.airstream.split

import com.raquo.ew.JsArray

import scala.collection.mutable
import scala.scalajs.js

trait MutableSplittable[M[_]] {

val splittable: Splittable[M]

def size[A](items: M[A]): Int

/** Equivalent of array(index) */
def getByIndex[A](items: M[A], index: Int): A

/** Equivalent of array.update(index, newValue) */
def updateAtIndex[A](items: M[A], index: Int, newItem: A): Unit

/** Equivalent of array.find(predicate).foreach(array.update(foundIndex, newItem)) */
def findUpdateInPlace[A](items: M[A], predicate: A => Boolean, newItem: A): Boolean = {
// #Warning: This implementation assumes cheap O(1) index access.
// Override this method for efficiency as needed (e.g. for ListBuffer – see below).
var found = false
var index = 0
val len = size(items)
while (!found && index < len) {
if (predicate(getByIndex(items, index))) {
updateAtIndex(items, index, newItem)
found = true
}
index += 1
}
found
}
}

object MutableSplittable {

implicit object JsArrayMutableSplittable extends MutableSplittable[JsArray] {

override val splittable: Splittable[JsArray] = Splittable.JsArraySplittable

override def size[A](items: JsArray[A]): Int = items.length

override def getByIndex[A](items: JsArray[A], index: Int): A = items(index)

override def updateAtIndex[A](items: JsArray[A], index: Int, newItem: A): Unit = {
items.update(index, newItem)
}
}

implicit object ScalaJsArrayMutableSplittable extends MutableSplittable[js.Array] {

override val splittable: Splittable[js.Array] = Splittable.ScalaJsArraySplittable

override def size[A](items: js.Array[A]): Int = items.length

override def getByIndex[A](items: js.Array[A], index: Int): A = items(index)

override def updateAtIndex[A](items: js.Array[A], index: Int, newItem: A): Unit = {
items.update(index, newItem)
}
}

implicit object BufferMutableSplittable extends MutableSplittable[mutable.Buffer] {

override val splittable: Splittable[mutable.Buffer] = Splittable.BufferSplittable

override def size[A](items: mutable.Buffer[A]): Int = items.size

override def getByIndex[A](items: mutable.Buffer[A], index: Int): A = items(index)

override def updateAtIndex[A](items: mutable.Buffer[A], index: Int, newItem: A): Unit = {
items.update(index, newItem)
}

override def findUpdateInPlace[A](
items: mutable.Buffer[A],
predicate: A => Boolean,
newItem: A
): Boolean = {
items match {
case _: mutable.ListBuffer[A @unchecked] =>
// This implementation is more efficient when accessing elements by index is costly.
val iterator = items.iterator
var found = false
var index = 0
while (!found && iterator.hasNext) {
if (predicate(iterator.next())) {
updateAtIndex(items, index, newItem)
found = true
}
index += 1
}
found
case _ =>
// All other Buffer implementations are indexed (I think)
super.findUpdateInPlace(items, predicate, newItem)
}
}
}
}
72 changes: 63 additions & 9 deletions src/main/scala/com/raquo/airstream/split/SplitSignal.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.raquo.airstream.split

import com.raquo.airstream.common.SingleParentSignal
import com.raquo.airstream.common.{InternalTryObserver, SingleParentSignal}
import com.raquo.airstream.core.{AirstreamError, Protected, Signal, Transaction}
import com.raquo.airstream.timing.SyncDelayStream

Expand Down Expand Up @@ -29,7 +29,8 @@ class SplitSignal[M[_], Input, Output, Key](
distinctCompose: Signal[Input] => Signal[Input],
project: (Key, Input, Signal[Input]) => Output,
splittable: Splittable[M],
duplicateKeysConfig: DuplicateKeysConfig = DuplicateKeysConfig.default
duplicateKeysConfig: DuplicateKeysConfig = DuplicateKeysConfig.default,
strict: Boolean = false // #TODO `false` default for now to keep compatibility with 17.0.0 - consider changing in 18.0.0 #nc
) extends SingleParentSignal[M[Input], M[Output]] {

override protected val topoRank: Int = Protected.topoRank(parent) + 1
Expand All @@ -43,8 +44,8 @@ class SplitSignal[M[_], Input, Output, Key](
// likely help in most popular use cases.
// - However, don't bother until we can benchmark how much time we spend
// in memoization, and how much we'll gain from switching to JS Maps.
/** key -> (inputValue, outputValue, lastParentUpdateId) */
private[this] val memoized: mutable.Map[Key, (Input, Output, Int)] = mutable.Map.empty
/** key -> (inputValue, inputSignal, outputValue, lastParentUpdateId) */
private[this] val memoized: mutable.Map[Key, (Input, Signal[Input], Output, Int)] = mutable.Map.empty

override protected def onTry(nextParentValue: Try[M[Input]], transaction: Transaction): Unit = {
super.onTry(nextParentValue, transaction)
Expand All @@ -56,6 +57,10 @@ class SplitSignal[M[_], Input, Output, Key](

private[this] val sharedDelayedParent = new SyncDelayStream(parent, this)

private[this] val emptyObserver = new InternalTryObserver[Input] {
override protected def onTry(nextValue: Try[Input], transaction: Transaction): Unit = ()
}

private[this] def memoizedProject(nextInputs: M[Input]): M[Output] = {
// Any keys not in this set by the end of this function will be removed from `memoized` map
// This ensures that previously memoized values are forgotten once the source observables stops emitting their inputs
Expand All @@ -74,9 +79,9 @@ class SplitSignal[M[_], Input, Output, Key](

nextKeys += memoizedKey

val cachedOutput = memoized.get(memoizedKey).map(_._2)
val cachedSignalAndOutput = memoized.get(memoizedKey).map(t => (t._2, t._3))

val nextOutput = cachedOutput.getOrElse {
val nextSignalAndOutput = cachedSignalAndOutput.getOrElse {
val initialInput = nextInput

// @warning !!! DANGER ZONE !!!
Expand Down Expand Up @@ -109,25 +114,36 @@ class SplitSignal[M[_], Input, Output, Key](
new SplitChildSignal[M, Input](
sharedDelayedParent,
initialValue = Some((initialInput, Protected.lastUpdateId(parent))),
() => memoized.get(memoizedKey).map(t => (t._1, t._3))
() => memoized.get(memoizedKey).map(t => (t._1, t._4))
)
)

if (isStarted && strict) {
inputSignal.addInternalObserver(emptyObserver, shouldCallMaybeWillStart = true)
}

val newOutput = project(memoizedKey, initialInput, inputSignal)

newOutput
(inputSignal, newOutput)
}

val inputSignal = nextSignalAndOutput._1
val nextOutput = nextSignalAndOutput._2

// Cache this key for the first time, or update the input so that inputSignal can fetch it
// dom.console.log(s"${this} memoized.update ${memoizedKey} -> ${nextInput}")
memoized.update(memoizedKey, (nextInput, nextOutput, Protected.lastUpdateId(parent)))
memoized.update(memoizedKey, (nextInput, inputSignal, nextOutput, Protected.lastUpdateId(parent)))

nextOutput
})

memoized.keys.foreach { memoizedKey =>
if (!nextKeys.contains(memoizedKey)) {
// dom.console.log(s"${this} memoized.remove ${memoizedKey}")
if (strict) {
val inputSignal = memoized(memoizedKey)._2
inputSignal.removeInternalObserver(emptyObserver)
}
memoized.remove(memoizedKey)
}
}
Expand All @@ -140,4 +156,42 @@ class SplitSignal[M[_], Input, Output, Key](

nextOutputs
}

override protected[this] def onStart(): Unit = {
if (strict) {
parent.tryNow().foreach { inputs =>
// splittable.foreach is perhaps less efficient that memoized.keys,
// but it has a predictable order that users expect.
splittable.foreach(inputs, (input: Input) => {
val memoizedKey = key(input)
val maybeInputSignal = memoized.get(memoizedKey).map(_._2)
maybeInputSignal.foreach { inputSignal =>
// - If inputSignal not found because `parent.tryNow()` and `memoized`
// are temporarily out of sync, it should be added later just fine
// when they sync up.
// - Typical pattern is to call Protected.maybeWillStart(inputSignal) in onWillStart,
// but our SplitSignal does not actually depend on these inputSignal-s, so I think
// it's ok to do both onWillStart and onStart for them here.
inputSignal.addInternalObserver(emptyObserver, shouldCallMaybeWillStart = true)
}
})
}
}
super.onStart()
}

override protected[this] def onStop(): Unit = {
if (strict) {
// memoized.keys has no defined order, so we don't want to
// use it for starting subscriptions, but for stopping them,
// seems fine.
// This way we make sure to remove observers from exactly
// all items that are in `memoized`, no more, no less.
memoized.keys.foreach { memoizedKey =>
val inputSignal = memoized(memoizedKey)._2
inputSignal.removeInternalObserver(emptyObserver)
}
}
super.onStop()
}
}
67 changes: 66 additions & 1 deletion src/main/scala/com/raquo/airstream/split/Splittable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,40 @@ package com.raquo.airstream.split

import com.raquo.ew.{JsArray, JsVector}

import scala.collection.immutable
import scala.collection.{immutable, mutable}
import scala.scalajs.js

/** The `split` operator needs an implicit instance of Splittable[M] in order to work on observables of M[_] */
trait Splittable[M[_]] {

/** Equivalent of list.map(project) */
def map[A, B](inputs: M[A], project: A => B): M[B]

/** Equivalent of list.foreach(f) */
def foreach[A](inputs: M[A], f: A => Unit): Unit = {
// #TODO[API] This default implementation is inefficient,
// we're only adding it to satisfy binary compatibility in 17.1.0.
// #nc Remove this implementation later.
map(inputs, f)
}

/** Find the FIRST item matching predicate, and return a collection with it updated */
def findUpdate[A](inputs: M[A], predicate: A => Boolean, newItem: A): M[A] = {
// #TODO should we try to provide more efficient implementations for:
// Vector, ArraySeq, ArrayBuffer, JsArray, js.Array, JsVector?
// - but what if we have e.g. ArrayBuffer typed as Buffer? Don't want runtime type checks...
// - is the impl below actually less efficient than using indexWhere?
var found = false // keys must be unique, so we can break early
map[A, A](inputs, item => {
if (!found && predicate(item)) {
found = true
newItem
} else {
item
}
})
}

def zipWithIndex[A](inputs: M[A]): M[(A, Int)] = {
var ix = -1
map(inputs, (input: A) => {
Expand All @@ -27,48 +53,71 @@ object Splittable extends LowPrioritySplittableImplicits {

override def map[A, B](inputs: List[A], project: A => B): List[B] = inputs.map(project)

override def foreach[A](inputs: List[A], f: A => Unit): Unit = inputs.foreach(f)

override def empty[A]: List[A] = Nil
}

implicit object VectorSplittable extends Splittable[Vector] {

override def map[A, B](inputs: Vector[A], project: A => B): Vector[B] = inputs.map(project)

override def foreach[A](inputs: Vector[A], f: A => Unit): Unit = inputs.foreach(f)

override def empty[A]: Vector[A] = Vector.empty
}

implicit object SetSplittable extends Splittable[Set] {

override def map[A, B](inputs: Set[A], project: A => B): Set[B] = inputs.map(project)

override def foreach[A](inputs: Set[A], f: A => Unit): Unit = inputs.foreach(f)

override def empty[A]: Set[A] = Set.empty
}

implicit object OptionSplittable extends Splittable[Option] {

override def map[A, B](inputs: Option[A], project: A => B): Option[B] = inputs.map(project)

override def foreach[A](inputs: Option[A], f: A => Unit): Unit = inputs.foreach(f)

override def empty[A]: Option[A] = None
}

implicit object BufferSplittable extends Splittable[mutable.Buffer] {

override def map[A, B](inputs: mutable.Buffer[A], project: A => B): mutable.Buffer[B] = inputs.map(project)

override def foreach[A](inputs: mutable.Buffer[A], f: A => Unit): Unit = inputs.foreach(f)

override def empty[A]: mutable.Buffer[A] = mutable.Buffer()
}

implicit object JsArraySplittable extends Splittable[JsArray] {

override def map[A, B](inputs: JsArray[A], project: A => B): JsArray[B] = inputs.map(project)

override def foreach[A](inputs: JsArray[A], f: A => Unit): Unit = inputs.forEach(f)

override def empty[A]: JsArray[A] = JsArray()
}

implicit object JsVectorSplittable extends Splittable[JsVector] {

override def map[A, B](inputs: JsVector[A], project: A => B): JsVector[B] = inputs.map(project)

override def foreach[A](inputs: JsVector[A], f: A => Unit): Unit = inputs.forEach(f)

override def empty[A]: JsVector[A] = JsVector()
}

implicit object ScalaJsArraySplittable extends Splittable[js.Array] {

override def map[A, B](inputs: js.Array[A], project: A => B): js.Array[B] = inputs.map(project)

override def foreach[A](inputs: js.Array[A], f: A => Unit): Unit = inputs.foreach(f)

override def empty[A]: js.Array[A] = js.Array()
}
}
Expand All @@ -88,6 +137,14 @@ trait LowPrioritySplittableImplicits extends LowestPrioritySplittableImplicits {
strictInputs.map(project)
}

override def foreach[A](inputs: Seq[A], f: A => Unit): Unit = {
val strictInputs = inputs match {
case lazyList: LazyList[A @unchecked] => lazyList.toList
case _ => inputs
}
strictInputs.foreach(f)
}

override def empty[A]: immutable.Seq[A] = Nil
}
}
Expand All @@ -104,6 +161,14 @@ trait LowestPrioritySplittableImplicits {
strictInputs.map(project)
}

override def foreach[A](inputs: collection.Seq[A], f: A => Unit): Unit = {
val strictInputs = inputs match {
case lazyList: LazyList[A @unchecked] => lazyList.toList
case _ => inputs
}
strictInputs.foreach(f)
}

override def empty[A]: collection.Seq[A] = Nil
}
}
Loading

0 comments on commit 1073514

Please sign in to comment.