Skip to content

Commit

Permalink
API: Use Transaction.apply instead of constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
raquo committed Nov 21, 2023
1 parent 9a00b3a commit e3aab7e
Show file tree
Hide file tree
Showing 24 changed files with 107 additions and 108 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ println(s"After set: ${myVar.now()}")
myVar.update(_ + 1)
println(s"After update: ${myVar.now()}")

Transaction {
Transaction { _ =>
println(s"After trx: ${myVar.now()}")
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/raquo/airstream/combine/MergeStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ class MergeStream[A](
if (lastFiredInTrx.contains(transaction)) {
//println("- syncFire in new trx")
nextValue.fold(
nextError => new Transaction(fireError(nextError, _)),
nextEvent => new Transaction(fireValue(nextEvent, _))
nextError => Transaction(fireError(nextError, _)),
nextEvent => Transaction(fireValue(nextEvent, _))
)
} else {
lastFiredInTrx = transaction
Expand Down
13 changes: 6 additions & 7 deletions src/main/scala/com/raquo/airstream/core/Transaction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ class Transaction(private[Transaction] var code: Transaction => Any) {

object Transaction {

/** Create new transaction (typically used in internal observable code) */
def apply(code: Transaction => Unit): Unit = new Transaction(code)

/** Create new transaction (typically used in end user code).
/** Create new transaction.
*
* Warning: It is rare that you need to manually create transactions.
* Typically used in internal observable code.
*
* Warning: It is rare that end users need to manually create transactions.
* Example of legitimate use case: [[https://github.com/raquo/Airstream/#var-transaction-delay Var transaction delay]]
*/
def apply(code: => Unit): Unit = new Transaction(_ => code)
def apply(code: Transaction => Unit): Unit = new Transaction(code)

/** This object holds a queue of callbacks that should be executed
* when all observables finish starting. This lets `signal.changes`
Expand Down Expand Up @@ -158,7 +157,7 @@ object Transaction {
//println("- no pending callbacks")
if (postStartTransactions.length > 0) {
//println(s"> CREATE ALT RESOLVE TRX. Num trx-s = ${postStartTransactions.length}")
Transaction {
Transaction { _ =>
while (postStartTransactions.length > 0) {
pendingTransactions.add(postStartTransactions.shift())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class CustomSignalSource[A] (
) extends WritableSignal[A] with CustomSource[A] {

override protected[this] val config: Config = makeConfig(
value => new Transaction(fireTry(value, _)),
value => Transaction(fireTry(value, _)),
() => tryNow(),
() => startIndex,
() => isStarted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait CustomSource[A] extends WritableObservable[A] {

override protected[this] def onStart(): Unit = {
Try(config.onStart()).recover[Unit] {
case err: Throwable => new Transaction(fireError(err, _))
case err: Throwable => Transaction(fireError(err, _))
}
super.onStart()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@ class CustomStreamSource[A] (
) extends WritableStream[A] with CustomSource[A] {

override protected[this] val config: Config = makeConfig(
value => {
new Transaction(fireValue(value, _))
},
err => {
new Transaction(fireError(err, _))
},
value => Transaction(fireValue(value, _)),
err => Transaction(fireError(err, _)),
() => startIndex,
() => isStarted
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class EventBusStream[A] private[eventbus] () extends WritableStream[A] with Inte

//println(s"> init trx from EventBusStream(${nextValue})")

new Transaction(fireValue(nextValue, _))
Transaction(fireValue(nextValue, _))
}

/** Helper method to support batch emit using `WriteBus.emit` / `WriteBus.emitTry` */
Expand All @@ -55,7 +55,7 @@ class EventBusStream[A] private[eventbus] () extends WritableStream[A] with Inte
}

override protected def onError(nextError: Throwable, transaction: Transaction): Unit = {
new Transaction(fireError(nextError, _))
Transaction(fireError(nextError, _))
}

override protected def onWillStart(): Unit = {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/raquo/airstream/eventbus/WriteBus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object WriteBus {
if (hasDuplicateTupleKeys(values.map(_.tuple))) {
throw new Exception("Unable to {EventBus,WriteBus}.emit: the provided list of event buses has duplicates. You can't make an observable emit more than one event per transaction.")
}
new Transaction(trx => values.foreach(emitValue(_, trx)))
Transaction(trx => values.foreach(emitValue(_, trx)))
}

/** Emit events into several WriteBus-es at once (in the same transaction)
Expand All @@ -107,7 +107,7 @@ object WriteBus {
if (hasDuplicateTupleKeys(values.map(_.tuple))) {
throw new Exception("Unable to {EventBus,WriteBus}.emitTry: the provided list of event buses has duplicates. You can't make an observable emit more than one event per transaction.")
}
new Transaction(trx => values.foreach(emitTryValue(_, trx)))
Transaction(trx => values.foreach(emitTryValue(_, trx)))
}

@inline private def emitValue[A](tuple: BusTuple[A], transaction: Transaction): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class ConcurrentStream[A](
private val accumulatedStreams: JsArray[EventStream[A]] = JsArray()

private val internalEventObserver: InternalObserver[A] = InternalObserver[A](
onNext = (nextEvent, _) => new Transaction(fireValue(nextEvent, _)),
onError = (nextError, _) => new Transaction(fireError(nextError, _))
onNext = (nextEvent, _) => Transaction(fireValue(nextEvent, _)),
onError = (nextError, _) => Transaction(fireError(nextError, _))
)

override protected val topoRank: Int = 1
Expand Down Expand Up @@ -51,7 +51,7 @@ class ConcurrentStream[A](
case Failure(err) =>
// @TODO[API] Not 100% sure that we should emit this error, but since
// we expect to use signal's current value, I think this is right.
new Transaction(fireError(err, _)) // #Note[onStart,trx,loop]
Transaction(fireError(err, _)) // #Note[onStart,trx,loop]
case _ => ()
}
case _ => ()
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SwitchSignal[A](
onTry = (nextTry, _) => {
//println(s"> init trx from $this SwitchSignal.onValue($nextTry)")
innerSignalLastSeenUpdateId = Protected.lastUpdateId(currentSignalTry.get)
new Transaction(fireTry(nextTry, _))
Transaction(fireTry(nextTry, _))
}
)

Expand Down Expand Up @@ -104,7 +104,7 @@ class SwitchSignal[A](
// Update this signal's value with nextSignal's current value (or an error if we don't have nextSignal)

//println(s"> init trx from SwitchSignal.onTry (new signal)")
new Transaction(trx => {
Transaction { trx =>

// #Note: Timing is important here.
// 1. Create the `trx` transaction, since we need that boundary when flattening
Expand All @@ -130,7 +130,7 @@ class SwitchSignal[A](

nextSignalTry.foreach(_.addInternalObserver(internalEventObserver, shouldCallMaybeWillStart = false))
}
})
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class SwitchSignalStream[A](
private[this] val internalEventObserver: InternalObserver[A] = InternalObserver.fromTry[A](
onTry = (nextTry, _) => {
//println(s"> init trx from SwitchSignalStream.onValue($nextTry)")
new Transaction(trx => {
Transaction { trx =>
if (isStarted) {
fireTry(nextTry, trx)
maybeCurrentSignalTry.foreach { _.foreach { currentSignal =>
lastSeenSignalUpdateId = Protected.lastUpdateId(currentSignal)
}}
}
})
}
}
)

Expand Down Expand Up @@ -70,7 +70,7 @@ class SwitchSignalStream[A](
lastSeenSignalUpdateId = -1

//println(s"> init trx from SwitchSignalStream.onTry (new signal)")
new Transaction(trx => {
Transaction { trx =>
if (isStarted) {
// #Note: Timing is important here.
// 1. Create the `trx` transaction, since we need that boundary when flattening
Expand All @@ -92,7 +92,7 @@ class SwitchSignalStream[A](
nextSignal.addInternalObserver(internalEventObserver, shouldCallMaybeWillStart = false)
}
}
})
}
}
}

Expand All @@ -103,12 +103,12 @@ class SwitchSignalStream[A](
val newSignalLastUpdateId = Protected.lastUpdateId(currentSignal)
if (newSignalLastUpdateId != lastSeenSignalUpdateId) {
//println(s"> init trx from SwitchSignalStream.onTry (same signal)")
new Transaction(trx => {
Transaction { trx =>
if (isStarted) {
fireTry(currentSignal.tryNow(), trx) // #Note[onStart,trx,loop]
lastSeenSignalUpdateId = newSignalLastUpdateId
}
})
}
}
currentSignal.addInternalObserver(internalEventObserver, shouldCallMaybeWillStart = false)
})
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/raquo/airstream/flatten/SwitchStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ class SwitchStream[I, O](
private[this] val internalEventObserver: InternalObserver[O] = InternalObserver[O](
onNext = (nextEvent, _) => {
//println(s"> init trx from SwitchEventStream.onValue(${nextEvent})")
new Transaction(fireValue(nextEvent, _))
Transaction(fireValue(nextEvent, _))
},
onError = (nextError, _) => {
new Transaction(fireError(nextError, _))
Transaction(fireError(nextError, _))
}
)

Expand Down Expand Up @@ -113,7 +113,7 @@ class SwitchStream[I, O](
private def switchToNextError(nextError: Throwable, transaction: Option[Transaction]): Unit = {
removeInternalObserverFromCurrentEventStream()
maybeCurrentEventStreamTry = Failure(nextError)
transaction.fold[Unit](new Transaction(fireError(nextError, _)))(fireError(nextError, _)) // #Note[onStart,trx,loop]
transaction.fold[Unit](Transaction(fireError(nextError, _)))(fireError(nextError, _)) // #Note[onStart,trx,loop]
}

private def removeInternalObserverFromCurrentEventStream(): Unit = {
Expand Down
Loading

0 comments on commit e3aab7e

Please sign in to comment.