From 02aeaac6a4a853f1cf1815a98800fdecc9d4463c Mon Sep 17 00:00:00 2001 From: Ajay Chandran Date: Sun, 27 Dec 2020 21:27:02 +0530 Subject: [PATCH] Added Websocket event sources for #49 --- README.md | 33 ++++- .../com/raquo/airstream/web/DomError.scala | 8 ++ .../airstream/web/WebSocketEventStream.scala | 115 ++++++++++++++++++ .../com/raquo/airstream/web/package.scala | 18 +++ 4 files changed, 172 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/com/raquo/airstream/web/DomError.scala create mode 100644 src/main/scala/com/raquo/airstream/web/WebSocketEventStream.scala create mode 100644 src/main/scala/com/raquo/airstream/web/package.scala diff --git a/README.md b/README.md index 330295e7..e36e2e31 100644 --- a/README.md +++ b/README.md @@ -587,11 +587,40 @@ The implementation follows that of `org.scalajs.dom.ext.ajax.apply`, but is adju ### Websockets -Airstream has no official websockets integration yet. +Airstream supports uni-directional and bi-directional websockets. -For several users' implementations, search Laminar gitter room, and the issues in this repo. +```scala +import com.raquo.airstream.eventstream.EventStream +import com.raquo.airstream.web.WebSocketEventStream +import org.scalajs.dom + +import scala.scalajs.js.typedarray.ArrayBuffer + +// absolute URL is required +// use com.raquo.airstream.web.websocketPath to construct an absolute URL from a relative one +val url: String = ??? + +// uni-directional, server -> client +val s1: EventStream[dom.MessageEvent] = WebSocketEventStream(url) +// bi-directional, transmit text from client -> server +val src2: EventStream[String] = ??? +val s2: EventStream[dom.MessageEvent] = WebSocketEventStream(url, src2) + +// bi-directional, transmit binary from client -> server +val src3: EventStream[ArrayBuffer] = ??? +val s3: EventStream[dom.MessageEvent] = WebSocketEventStream(url, src3) + +// bi-directional, transmit blob from client -> server +val src4: EventStream[dom.Blob] = ??? +val s4: EventStream[dom.MessageEvent] = WebSocketEventStream(url, src4) +``` +The behavior of the returned stream is as follows: + - A new websocket connection is established when the stream is started. + - Upstream messages, if any, are transmitted on this connection. + - Server messages and connection errors (including termination) are propagated downstream. + - The connection is closed when this stream is stopped. ### DOM Events diff --git a/src/main/scala/com/raquo/airstream/web/DomError.scala b/src/main/scala/com/raquo/airstream/web/DomError.scala new file mode 100644 index 00000000..ce501293 --- /dev/null +++ b/src/main/scala/com/raquo/airstream/web/DomError.scala @@ -0,0 +1,8 @@ +package com.raquo.airstream.web + +import org.scalajs.dom + +/** + * Wraps a [[dom.Event DOM error event]]. + */ +final case class DomError(event: dom.Event) extends Exception diff --git a/src/main/scala/com/raquo/airstream/web/WebSocketEventStream.scala b/src/main/scala/com/raquo/airstream/web/WebSocketEventStream.scala new file mode 100644 index 00000000..e233966c --- /dev/null +++ b/src/main/scala/com/raquo/airstream/web/WebSocketEventStream.scala @@ -0,0 +1,115 @@ +package com.raquo.airstream.web + +import com.raquo.airstream.core.Transaction +import com.raquo.airstream.eventstream.EventStream +import com.raquo.airstream.features.{InternalNextErrorObserver, SingleParentObservable} +import com.raquo.airstream.web.WebSocketEventStream.Transmitter +import org.scalajs.dom + +import scala.scalajs.js + +/** + * [[WebSocketEventStream]] emits messages from a [[dom.WebSocket]] connection. + * + * Lifecycle: + * - A new connection is established when this stream is started. + * - Upstream messages, if any, are transmitted on this connection. + * - Server [[dom.MessageEvent messages]] and connection [[DomError errors]] are propagated downstream. + * - The connection is closed when this stream is stopped. + */ +class WebSocketEventStream[A](override val parent: EventStream[A], url: String)(implicit T: Transmitter[A]) + extends EventStream[dom.MessageEvent] + with SingleParentObservable[A, dom.MessageEvent] + with InternalNextErrorObserver[A] { + + protected[airstream] val topoRank: Int = 1 + + private var jsSocket: js.UndefOr[dom.WebSocket] = js.undefined + + protected[airstream] def onError(nextError: Throwable, transaction: Transaction): Unit = { + // noop + } + + protected[airstream] def onNext(nextValue: A, transaction: Transaction): Unit = { + // transmit upstream message, no guard required since transmitter is trusted + jsSocket.foreach(T.transmit(_, nextValue)) + } + + override protected[this] def onStart(): Unit = { + + val socket = new dom.WebSocket(url) + + // initialize new socket + T.initialize(socket) + + // propagate connection termination error + socket.onclose = + (e: dom.CloseEvent) => if (jsSocket.nonEmpty) { + jsSocket = js.undefined + new Transaction(fireError(DomError(e), _)) + } + + // propagate connection error + socket.onerror = + (e: dom.Event) => if (jsSocket.nonEmpty) new Transaction(fireError(DomError(e), _)) + + // propagate message received + socket.onmessage = + (e: dom.MessageEvent) => if (jsSocket.nonEmpty) new Transaction(fireValue(e, _)) + + // update local reference + socket.onopen = + (_: dom.Event) => if (jsSocket.isEmpty) jsSocket = socket + + super.onStart() + } + + override protected[this] def onStop(): Unit = { + // Is "close" async? + // just to be safe, reset local reference before closing to prevent error propagation in "onclose" + val socket = jsSocket + jsSocket = js.undefined + socket.foreach(_.close()) + super.onStop() + } +} + +object WebSocketEventStream { + + /** + * Returns an [[EventStream]] that emits [[dom.MessageEvent messages]] from a [[dom.WebSocket]] connection. + * + * Websocket [[dom.Event errors]], including [[dom.CloseEvent termination]], are propagated as [[DomError]]s. + * + * @param url '''absolute''' URL of the websocket endpoint, + * use [[websocketPath]] to construct an absolute URL from a relative one + * @param transmit stream of messages to be transmitted to the websocket endpoint + */ + def apply[A: Transmitter](url: String, transmit: EventStream[A] = EventStream.empty): EventStream[dom.MessageEvent] = + new WebSocketEventStream(transmit, url) + + sealed abstract class Transmitter[A] { + + def initialize(socket: dom.WebSocket): Unit + def transmit(socket: dom.WebSocket, data: A): Unit + } + + object Transmitter { + + private def binary[A](send: (dom.WebSocket, A) => Unit, binaryType: String): Transmitter[A] = + new Transmitter[A] { + final def initialize(socket: dom.WebSocket): Unit = socket.binaryType = binaryType + final def transmit(socket: dom.WebSocket, data: A): Unit = send(socket, data) + } + + private def simple[A](send: (dom.WebSocket, A) => Unit): Transmitter[A] = + new Transmitter[A] { + final def initialize(socket: dom.WebSocket): Unit = () + final def transmit(socket: dom.WebSocket, data: A): Unit = send(socket, data) + } + + implicit val binaryTransmitter: Transmitter[js.typedarray.ArrayBuffer] = binary(_ send _, "arraybuffer") + implicit val blobTransmitter: Transmitter[dom.Blob] = binary(_ send _, "blob") + implicit val stringTransmitter: Transmitter[String] = simple(_ send _) + } +} diff --git a/src/main/scala/com/raquo/airstream/web/package.scala b/src/main/scala/com/raquo/airstream/web/package.scala new file mode 100644 index 00000000..42194001 --- /dev/null +++ b/src/main/scala/com/raquo/airstream/web/package.scala @@ -0,0 +1,18 @@ +package com.raquo.airstream + +import org.scalajs.dom + +package object web { + + /** + * Constructs and returns an absolute websocket URL from a relative one. + */ + def websocketPath(relative: String): String = { + val prefix = dom.document.location.protocol match { + case "https:" => "wss:" + case _ => "ws:" + } + val suffix = if (relative.startsWith("/")) relative else s"/$relative" + s"$prefix//${dom.document.location.hostname}:${dom.document.location.port}$suffix" + } +}