Skip to content

Commit

Permalink
implement MessagePort and MessageChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
anonrig committed Jan 14, 2025
1 parent 7053d6c commit e66571f
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 61 deletions.
14 changes: 14 additions & 0 deletions src/workerd/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ filegroup(
"rtti.c++",
"url.c++",
"util.c++",
"message-channel.c++",
],
),
visibility = ["//visibility:public"],
Expand All @@ -47,6 +48,7 @@ filegroup(
"rtti.h",
"url.h",
"util.h",
"message-channel.h",
],
),
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -198,6 +200,18 @@ wd_cc_library(
],
)

wd_cc_library(
name = "message-channel",
srcs = ["message-channel.c++"],
hdrs = ["message-channel.h"],
visibility = ["//visibility:public"],
deps = [
"//src/workerd/io",
"//src/workerd/jsg:memory-tracker",
"@capnp-cpp//src/kj",
],
)

wd_cc_library(
name = "deferred-proxy",
hdrs = ["deferred-proxy.h"],
Expand Down
67 changes: 66 additions & 1 deletion src/workerd/api/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,70 @@

namespace workerd::api {

class CloseEvent: public Event {
public:
CloseEvent(): Event("close") {}

CloseEvent(uint code, kj::String reason, bool clean)
: Event("close"),
code(code),
reason(kj::mv(reason)),
clean(clean) {}
CloseEvent(kj::String type, int code, kj::String reason, bool clean)
: Event(kj::mv(type)),
code(code),
reason(kj::mv(reason)),
clean(clean) {}

struct Initializer {
jsg::Optional<int> code;
jsg::Optional<kj::String> reason;
jsg::Optional<bool> wasClean;

JSG_STRUCT(code, reason, wasClean);
JSG_STRUCT_TS_OVERRIDE(CloseEventInit);
};
static jsg::Ref<CloseEvent> constructor(jsg::Optional<kj::String> type = kj::none,
jsg::Optional<Initializer> initializer = kj::none) {
KJ_IF_SOME(t, type) {
Initializer init = kj::mv(initializer).orDefault({});
return jsg::alloc<CloseEvent>(kj::mv(t), init.code.orDefault(0),
kj::mv(init.reason).orDefault(nullptr), init.wasClean.orDefault(false));
}

return jsg::alloc<CloseEvent>();
}

int getCode() {
return code;
}
kj::Maybe<kj::StringPtr> getReason() {
return reason;
}
bool getWasClean() {
return clean;
}

JSG_RESOURCE_TYPE(CloseEvent) {
JSG_INHERIT(Event);

JSG_READONLY_INSTANCE_PROPERTY(code, getCode);
JSG_READONLY_INSTANCE_PROPERTY(reason, getReason);
JSG_READONLY_INSTANCE_PROPERTY(wasClean, getWasClean);

JSG_TS_ROOT();
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("reason", reason);
}

private:
kj::uint code{};
kj::Maybe<kj::String> reason;
bool clean{};
};

class ErrorEvent: public Event {
public:
struct ErrorEventInit {
Expand Down Expand Up @@ -48,6 +112,7 @@ class ErrorEvent: public Event {
void visitForGc(jsg::GcVisitor& visitor);
};

#define EW_EVENTS_ISOLATE_TYPES api::ErrorEvent, api::ErrorEvent::ErrorEventInit
#define EW_EVENTS_ISOLATE_TYPES \
api::ErrorEvent, api::ErrorEvent::ErrorEventInit, api::CloseEvent, api::CloseEvent::Initializer

} // namespace workerd::api
1 change: 1 addition & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace workerd::api {
class TailEvent;
class Cache;
class CacheStorage;
class CloseEvent;
class Crypto;
class CryptoKey;
class ErrorEvent;
Expand Down
66 changes: 66 additions & 0 deletions src/workerd/api/message-channel.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include "message-channel.h"

#include <workerd/api/events.h>
#include <workerd/jsg/jsg.h>

namespace workerd::api {

MessagePort::~MessagePort() noexcept(false) {
// Technically we should dispatch a close event whenever this object
// is garbage collected. Unfortunately, we cannot guarantee that the destructor
// will always have the isolate lock.
// Let's keep in mind that the destructor might be running during v8 GC
// in which you shouldn't execute any javascript at all.
// dispatchEvent(js, Event::constructor(kj::str("close"), kj::none));
}

jsg::Ref<MessagePort> MessagePort::constructor() {
return jsg::alloc<MessagePort>();
}

void MessagePort::disentangle(jsg::Lock &js) {
KJ_IF_SOME(e, entangledWith) {
// Fire an event named close at otherPort.
e->dispatchEvent(js, CloseEvent::constructor());
e->entangledWith = kj::none;
}
entangledWith = kj::none;
}

void MessagePort::entangle(jsg::Lock &js, jsg::Ref<MessagePort> port) {
disentangle(js);
entangledWith = port.addRef();
port->entangledWith = JSG_THIS;
}

void MessagePort::postMessage(jsg::Lock &js,
jsg::Value message,
kj::OneOf<kj::Maybe<StructuredSerializeOptions>, kj::Array<jsg::Value>> options) {}

void MessagePort::start(jsg::Lock &js) {
// The start() method steps are to enable this's port message queue, if it is not already enabled.
if (messageQueue == kj::none) {
messageQueue = kj::Vector<Message>();
}
}

void MessagePort::stop(jsg::Lock &js) {}

void MessagePort::close(jsg::Lock &js) {
// Set this's [[Detached]] internal slot value to true.
detached = true;
// If this is entangled, disentangle it.
disentangle(js);
// The close event will be fired even if the port is not explicitly closed.
dispatchEvent(js, CloseEvent::constructor());
}

MessageChannel::MessageChannel(jsg::Lock &js)
: port1(MessagePort::constructor()),
port2(MessagePort::constructor()) {}

jsg::Ref<MessageChannel> MessageChannel::constructor(jsg::Lock &js) {
return jsg::alloc<MessageChannel>(js);
}

} // namespace workerd::api
116 changes: 116 additions & 0 deletions src/workerd/api/message-channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#pragma once

#include <workerd/api/basics.h>
#include <workerd/jsg/jsg.h>

#include <kj/array.h>

namespace workerd::api {

// Implements MessagePort web-spec
// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#message-ports
class MessagePort: public EventTarget {
public:
MessagePort() = default;
~MessagePort() noexcept(false);
KJ_DISALLOW_COPY(MessagePort);

static jsg::Ref<MessagePort> constructor();

struct StructuredSerializeOptions {
kj::Array<jsg::Object> transfer{};

JSG_STRUCT(transfer);
};

void postMessage(jsg::Lock& js,
jsg::Value message,
kj::OneOf<kj::Maybe<StructuredSerializeOptions>, kj::Array<jsg::Value>> options);
void start(jsg::Lock& js);
void stop(jsg::Lock& js);
void close(jsg::Lock& js);

JSG_RESOURCE_TYPE(MessagePort) {
JSG_NESTED_TYPE(EventTarget);
JSG_METHOD(postMessage);
JSG_METHOD(start);
JSG_METHOD(stop);
JSG_METHOD(close);
}

// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#disentangle
void disentangle(jsg::Lock& js);

// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#entangle
void entangle(jsg::Lock& js, jsg::Ref<MessagePort> port);

private:
bool detached = false;

kj::Maybe<jsg::HashableV8Ref<v8::Object>> onmessage;
kj::Maybe<jsg::HashableV8Ref<v8::Object>> onmessageerror;

// Each MessagePort object can be entangled with another (a symmetric relationship)
kj::Maybe<jsg::Ref<MessagePort>> entangledWith{};

class Message {
public:
Message() = default;
};

bool isMessageQueueEnabled() const {
return messageQueue != kj::none;
}

// Each MessagePort object also has a task source called the port message queue,
// A port message queue can be enabled or disabled, and is initially disabled.
// Once enabled, a port can never be disabled again
kj::Maybe<kj::Vector<Message>> messageQueue{};

bool hasBeenShipped = false;
};

// Implements MessageChannel web-spec
// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#message-channels
class MessageChannel: public jsg::Object {
public:
explicit MessageChannel(jsg::Lock& js);

static jsg::Ref<MessageChannel> constructor(jsg::Lock& js);

jsg::Ref<MessagePort> getPort1() {
return port1.addRef();
}

jsg::Ref<MessagePort> getPort2() {
return port2.addRef();
}

JSG_RESOURCE_TYPE(MessageChannel) {
JSG_READONLY_PROTOTYPE_PROPERTY(port1, getPort1);
JSG_READONLY_PROTOTYPE_PROPERTY(port2, getPort2);
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("port1", port1);
tracker.trackField("port2", port2);
}

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(port1);
visitor.visit(port2);
}

private:
jsg::Ref<MessagePort> port1;
jsg::Ref<MessagePort> port2;
};

#define EW_MESSAGE_CHANNEL_ISOLATE_TYPES \
api::MessageChannel, api::MessagePort, api::MessagePort::StructuredSerializeOptions

} // namespace workerd::api
61 changes: 1 addition & 60 deletions src/workerd/api/web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,64 +98,6 @@ class MessageEvent: public Event {
}
};

class CloseEvent: public Event {
public:
CloseEvent(uint code, kj::String reason, bool clean)
: Event("close"),
code(code),
reason(kj::mv(reason)),
clean(clean) {}
CloseEvent(kj::String type, int code, kj::String reason, bool clean)
: Event(kj::mv(type)),
code(code),
reason(kj::mv(reason)),
clean(clean) {}

struct Initializer {
jsg::Optional<int> code;
jsg::Optional<kj::String> reason;
jsg::Optional<bool> wasClean;

JSG_STRUCT(code, reason, wasClean);
JSG_STRUCT_TS_OVERRIDE(CloseEventInit);
};
static jsg::Ref<CloseEvent> constructor(kj::String type, jsg::Optional<Initializer> initializer) {
Initializer init = kj::mv(initializer).orDefault({});
return jsg::alloc<CloseEvent>(kj::mv(type), init.code.orDefault(0),
kj::mv(init.reason).orDefault(nullptr), init.wasClean.orDefault(false));
}

int getCode() {
return code;
}
kj::StringPtr getReason() {
return reason;
}
bool getWasClean() {
return clean;
}

JSG_RESOURCE_TYPE(CloseEvent) {
JSG_INHERIT(Event);

JSG_READONLY_INSTANCE_PROPERTY(code, getCode);
JSG_READONLY_INSTANCE_PROPERTY(reason, getReason);
JSG_READONLY_INSTANCE_PROPERTY(wasClean, getWasClean);

JSG_TS_ROOT();
// CloseEvent will be referenced from the `WebSocketEventMap` define
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("reason", reason);
}

private:
int code;
kj::String reason;
bool clean;
};

// The forward declaration is necessary so we can make some
// WebSocket methods accessible to WebSocketPair via friend declaration.
class WebSocket;
Expand Down Expand Up @@ -695,8 +637,7 @@ class WebSocket: public EventTarget {
};

#define EW_WEBSOCKET_ISOLATE_TYPES \
api::CloseEvent, api::CloseEvent::Initializer, api::MessageEvent, \
api::MessageEvent::Initializer, api::WebSocket, api::WebSocketPair, \
api::MessageEvent, api::MessageEvent::Initializer, api::WebSocket, api::WebSocketPair, \
api::WebSocketPair::PairIterator, \
api::WebSocketPair::PairIterator:: \
Next // The list of websocket.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ wd_cc_library(
"//src/workerd/api:html-rewriter",
"//src/workerd/api:hyperdrive",
"//src/workerd/api:memory-cache",
"//src/workerd/api:message-channel",
"//src/workerd/api:pyodide",
"//src/workerd/api:r2",
"//src/workerd/api:rtti",
Expand Down
Loading

0 comments on commit e66571f

Please sign in to comment.