Skip to content

Commit

Permalink
Added Websocket event sources for #49
Browse files Browse the repository at this point in the history
  • Loading branch information
ajaychandran committed Dec 27, 2020
1 parent 167e2db commit 02aeaac
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 2 deletions.
33 changes: 31 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,11 +587,40 @@ The implementation follows that of `org.scalajs.dom.ext.ajax.apply`, but is adju

### Websockets

Airstream has no official websockets integration yet.
Airstream supports uni-directional and bi-directional websockets.

For several users' implementations, search Laminar gitter room, and the issues in this repo.
```scala
import com.raquo.airstream.eventstream.EventStream
import com.raquo.airstream.web.WebSocketEventStream
import org.scalajs.dom

import scala.scalajs.js.typedarray.ArrayBuffer

// absolute URL is required
// use com.raquo.airstream.web.websocketPath to construct an absolute URL from a relative one
val url: String = ???

// uni-directional, server -> client
val s1: EventStream[dom.MessageEvent] = WebSocketEventStream(url)

// bi-directional, transmit text from client -> server
val src2: EventStream[String] = ???
val s2: EventStream[dom.MessageEvent] = WebSocketEventStream(url, src2)

// bi-directional, transmit binary from client -> server
val src3: EventStream[ArrayBuffer] = ???
val s3: EventStream[dom.MessageEvent] = WebSocketEventStream(url, src3)

// bi-directional, transmit blob from client -> server
val src4: EventStream[dom.Blob] = ???
val s4: EventStream[dom.MessageEvent] = WebSocketEventStream(url, src4)
```

The behavior of the returned stream is as follows:
- A new websocket connection is established when the stream is started.
- Upstream messages, if any, are transmitted on this connection.
- Server messages and connection errors (including termination) are propagated downstream.
- The connection is closed when this stream is stopped.

### DOM Events

Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/com/raquo/airstream/web/DomError.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.raquo.airstream.web

import org.scalajs.dom

/**
* Wraps a [[dom.Event DOM error event]].
*/
final case class DomError(event: dom.Event) extends Exception
115 changes: 115 additions & 0 deletions src/main/scala/com/raquo/airstream/web/WebSocketEventStream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.raquo.airstream.web

import com.raquo.airstream.core.Transaction
import com.raquo.airstream.eventstream.EventStream
import com.raquo.airstream.features.{InternalNextErrorObserver, SingleParentObservable}
import com.raquo.airstream.web.WebSocketEventStream.Transmitter
import org.scalajs.dom

import scala.scalajs.js

/**
* [[WebSocketEventStream]] emits messages from a [[dom.WebSocket]] connection.
*
* Lifecycle:
* - A new connection is established when this stream is started.
* - Upstream messages, if any, are transmitted on this connection.
* - Server [[dom.MessageEvent messages]] and connection [[DomError errors]] are propagated downstream.
* - The connection is closed when this stream is stopped.
*/
class WebSocketEventStream[A](override val parent: EventStream[A], url: String)(implicit T: Transmitter[A])
extends EventStream[dom.MessageEvent]
with SingleParentObservable[A, dom.MessageEvent]
with InternalNextErrorObserver[A] {

protected[airstream] val topoRank: Int = 1

private var jsSocket: js.UndefOr[dom.WebSocket] = js.undefined

protected[airstream] def onError(nextError: Throwable, transaction: Transaction): Unit = {
// noop
}

protected[airstream] def onNext(nextValue: A, transaction: Transaction): Unit = {
// transmit upstream message, no guard required since transmitter is trusted
jsSocket.foreach(T.transmit(_, nextValue))
}

override protected[this] def onStart(): Unit = {

val socket = new dom.WebSocket(url)

// initialize new socket
T.initialize(socket)

// propagate connection termination error
socket.onclose =
(e: dom.CloseEvent) => if (jsSocket.nonEmpty) {
jsSocket = js.undefined
new Transaction(fireError(DomError(e), _))
}

// propagate connection error
socket.onerror =
(e: dom.Event) => if (jsSocket.nonEmpty) new Transaction(fireError(DomError(e), _))

// propagate message received
socket.onmessage =
(e: dom.MessageEvent) => if (jsSocket.nonEmpty) new Transaction(fireValue(e, _))

// update local reference
socket.onopen =
(_: dom.Event) => if (jsSocket.isEmpty) jsSocket = socket

super.onStart()
}

override protected[this] def onStop(): Unit = {
// Is "close" async?
// just to be safe, reset local reference before closing to prevent error propagation in "onclose"
val socket = jsSocket
jsSocket = js.undefined
socket.foreach(_.close())
super.onStop()
}
}

object WebSocketEventStream {

/**
* Returns an [[EventStream]] that emits [[dom.MessageEvent messages]] from a [[dom.WebSocket]] connection.
*
* Websocket [[dom.Event errors]], including [[dom.CloseEvent termination]], are propagated as [[DomError]]s.
*
* @param url '''absolute''' URL of the websocket endpoint,
* use [[websocketPath]] to construct an absolute URL from a relative one
* @param transmit stream of messages to be transmitted to the websocket endpoint
*/
def apply[A: Transmitter](url: String, transmit: EventStream[A] = EventStream.empty): EventStream[dom.MessageEvent] =
new WebSocketEventStream(transmit, url)

sealed abstract class Transmitter[A] {

def initialize(socket: dom.WebSocket): Unit
def transmit(socket: dom.WebSocket, data: A): Unit
}

object Transmitter {

private def binary[A](send: (dom.WebSocket, A) => Unit, binaryType: String): Transmitter[A] =
new Transmitter[A] {
final def initialize(socket: dom.WebSocket): Unit = socket.binaryType = binaryType
final def transmit(socket: dom.WebSocket, data: A): Unit = send(socket, data)
}

private def simple[A](send: (dom.WebSocket, A) => Unit): Transmitter[A] =
new Transmitter[A] {
final def initialize(socket: dom.WebSocket): Unit = ()
final def transmit(socket: dom.WebSocket, data: A): Unit = send(socket, data)
}

implicit val binaryTransmitter: Transmitter[js.typedarray.ArrayBuffer] = binary(_ send _, "arraybuffer")
implicit val blobTransmitter: Transmitter[dom.Blob] = binary(_ send _, "blob")
implicit val stringTransmitter: Transmitter[String] = simple(_ send _)
}
}
18 changes: 18 additions & 0 deletions src/main/scala/com/raquo/airstream/web/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.raquo.airstream

import org.scalajs.dom

package object web {

/**
* Constructs and returns an absolute websocket URL from a relative one.
*/
def websocketPath(relative: String): String = {
val prefix = dom.document.location.protocol match {
case "https:" => "wss:"
case _ => "ws:"
}
val suffix = if (relative.startsWith("/")) relative else s"/$relative"
s"$prefix//${dom.document.location.hostname}:${dom.document.location.port}$suffix"
}
}

0 comments on commit 02aeaac

Please sign in to comment.