Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement MessagePort and MessageChannel #3336

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/workerd/api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ filegroup(
"rtti.c++",
"url.c++",
"util.c++",
"message-channel.c++",
jasnell marked this conversation as resolved.
Show resolved Hide resolved
],
),
visibility = ["//visibility:public"],
Expand All @@ -47,6 +48,7 @@ filegroup(
"rtti.h",
"url.h",
"util.h",
"message-channel.h",
],
),
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -198,6 +200,18 @@ wd_cc_library(
],
)

wd_cc_library(
name = "message-channel",
srcs = ["message-channel.c++"],
hdrs = ["message-channel.h"],
visibility = ["//visibility:public"],
deps = [
"//src/workerd/io",
"//src/workerd/jsg:memory-tracker",
"@capnp-cpp//src/kj",
],
)

wd_cc_library(
name = "deferred-proxy",
hdrs = ["deferred-proxy.h"],
Expand Down
67 changes: 66 additions & 1 deletion src/workerd/api/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,70 @@

namespace workerd::api {

class CloseEvent: public Event {
jasnell marked this conversation as resolved.
Show resolved Hide resolved
public:
CloseEvent(): Event("close") {}

CloseEvent(uint code, kj::String reason, bool clean)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth adding code comments here indicating that these constructor variations are specific to websockets.

: Event("close"),
code(code),
reason(kj::mv(reason)),
clean(clean) {}
CloseEvent(kj::String type, int code, kj::String reason, bool clean)
: Event(kj::mv(type)),
code(code),
reason(kj::mv(reason)),
clean(clean) {}

struct Initializer {
jsg::Optional<int> code;
jsg::Optional<kj::String> reason;
jsg::Optional<bool> wasClean;

JSG_STRUCT(code, reason, wasClean);
JSG_STRUCT_TS_OVERRIDE(CloseEventInit);
};
static jsg::Ref<CloseEvent> constructor(jsg::Optional<kj::String> type = kj::none,
jsg::Optional<Initializer> initializer = kj::none) {
KJ_IF_SOME(t, type) {
Initializer init = kj::mv(initializer).orDefault({});
return jsg::alloc<CloseEvent>(kj::mv(t), init.code.orDefault(0),
kj::mv(init.reason).orDefault(nullptr), init.wasClean.orDefault(false));
}

return jsg::alloc<CloseEvent>();
}

int getCode() {
return code;
}
kj::Maybe<kj::StringPtr> getReason() {
return reason;
}
bool getWasClean() {
return clean;
}

JSG_RESOURCE_TYPE(CloseEvent) {
JSG_INHERIT(Event);

JSG_READONLY_INSTANCE_PROPERTY(code, getCode);
JSG_READONLY_INSTANCE_PROPERTY(reason, getReason);
JSG_READONLY_INSTANCE_PROPERTY(wasClean, getWasClean);

JSG_TS_ROOT();
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("reason", reason);
}

private:
kj::uint code{};
kj::Maybe<kj::String> reason;
bool clean{};
};

class ErrorEvent: public Event {
public:
struct ErrorEventInit {
Expand Down Expand Up @@ -48,6 +112,7 @@ class ErrorEvent: public Event {
void visitForGc(jsg::GcVisitor& visitor);
};

#define EW_EVENTS_ISOLATE_TYPES api::ErrorEvent, api::ErrorEvent::ErrorEventInit
#define EW_EVENTS_ISOLATE_TYPES \
api::ErrorEvent, api::ErrorEvent::ErrorEventInit, api::CloseEvent, api::CloseEvent::Initializer

} // namespace workerd::api
1 change: 1 addition & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace workerd::api {
class TailEvent;
class Cache;
class CacheStorage;
class CloseEvent;
class Crypto;
class CryptoKey;
class ErrorEvent;
Expand Down
66 changes: 66 additions & 0 deletions src/workerd/api/message-channel.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include "message-channel.h"

#include <workerd/api/events.h>
#include <workerd/jsg/jsg.h>

namespace workerd::api {

MessagePort::~MessagePort() noexcept(false) {
// Technically we should dispatch a close event whenever this object
// is garbage collected. Unfortunately, we cannot guarantee that the destructor
// will always have the isolate lock.
// Let's keep in mind that the destructor might be running during v8 GC
// in which you shouldn't execute any javascript at all.
// dispatchEvent(js, Event::constructor(kj::str("close"), kj::none));
}

jsg::Ref<MessagePort> MessagePort::constructor() {
return jsg::alloc<MessagePort>();
}
Comment on lines +17 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessagePort is not user-constructible so shouldn't implement this

Suggested change
jsg::Ref<MessagePort> MessagePort::constructor() {
return jsg::alloc<MessagePort>();
}


void MessagePort::disentangle(jsg::Lock &js) {
KJ_IF_SOME(e, entangledWith) {
// Fire an event named close at otherPort.
e->dispatchEvent(js, CloseEvent::constructor());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general practice, the static constructor is meant to the be implementation of the new T constructor in javascript. For internal construction, use jsg::alloc<T> instead.

Suggested change
e->dispatchEvent(js, CloseEvent::constructor());
e->dispatchEvent(js, jsg::alloc<CloseEvent>());

e->entangledWith = kj::none;
}
entangledWith = kj::none;
}

void MessagePort::entangle(jsg::Lock &js, jsg::Ref<MessagePort> port) {
disentangle(js);
entangledWith = port.addRef();
port->entangledWith = JSG_THIS;
}

void MessagePort::postMessage(jsg::Lock &js,
jsg::Value message,
kj::OneOf<kj::Maybe<StructuredSerializeOptions>, kj::Array<jsg::Value>> options) {}
jasnell marked this conversation as resolved.
Show resolved Hide resolved

void MessagePort::start(jsg::Lock &js) {
// The start() method steps are to enable this's port message queue, if it is not already enabled.
if (messageQueue == kj::none) {
messageQueue = kj::Vector<Message>();
}
}

void MessagePort::stop(jsg::Lock &js) {}

void MessagePort::close(jsg::Lock &js) {
// Set this's [[Detached]] internal slot value to true.
detached = true;
// If this is entangled, disentangle it.
disentangle(js);
// The close event will be fired even if the port is not explicitly closed.
dispatchEvent(js, CloseEvent::constructor());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dispatchEvent(js, CloseEvent::constructor());
dispatchEvent(js, jsg::alloc<CloseEvent>());

}

MessageChannel::MessageChannel(jsg::Lock &js)
: port1(MessagePort::constructor()),
port2(MessagePort::constructor()) {}
Comment on lines +58 to +60
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
MessageChannel::MessageChannel(jsg::Lock &js)
: port1(MessagePort::constructor()),
port2(MessagePort::constructor()) {}
MessageChannel::MessageChannel(jsg::Lock &js)
: port1(jsg::alloc<MessagePort>()),
port2(jsg::alloc<MessagePort>()) {}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be entangled with each other immediately after creation, shouldn't they?


jsg::Ref<MessageChannel> MessageChannel::constructor(jsg::Lock &js) {
return jsg::alloc<MessageChannel>(js);
}

} // namespace workerd::api
116 changes: 116 additions & 0 deletions src/workerd/api/message-channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#pragma once

#include <workerd/api/basics.h>
#include <workerd/jsg/jsg.h>

#include <kj/array.h>

namespace workerd::api {

// Implements MessagePort web-spec
// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#message-ports
class MessagePort: public EventTarget {
public:
MessagePort() = default;
~MessagePort() noexcept(false);
KJ_DISALLOW_COPY(MessagePort);

static jsg::Ref<MessagePort> constructor();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the spec, new MessagePort() should fail with an error (it is not user constructible). This should be removed.

Suggested change
static jsg::Ref<MessagePort> constructor();


struct StructuredSerializeOptions {
kj::Array<jsg::Object> transfer{};
Copy link
Member

@jasnell jasnell Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jsg::Object instances must always be held by a jsg::Ref... this should also be an optional argument also.

Suggested change
kj::Array<jsg::Object> transfer{};
jsg::Optional<kj::Array<jsg::Ref<jsg::Object>>> transfer{};


JSG_STRUCT(transfer);
};

void postMessage(jsg::Lock& js,
jsg::Value message,
kj::OneOf<kj::Maybe<StructuredSerializeOptions>, kj::Array<jsg::Value>> options);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be inverted and made into a jsg::Optional ...

jsg::Optional<kj::OneOf<StructuredSerializeOptions, kj::Array<jsg::Value>> options

void start(jsg::Lock& js);
void stop(jsg::Lock& js);
void close(jsg::Lock& js);

JSG_RESOURCE_TYPE(MessagePort) {
JSG_NESTED_TYPE(EventTarget);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
JSG_NESTED_TYPE(EventTarget);
JSG_INHERIT(EventTarget)

JSG_METHOD(postMessage);
JSG_METHOD(start);
JSG_METHOD(stop);
JSG_METHOD(close);
}

// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#disentangle
void disentangle(jsg::Lock& js);

// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#entangle
void entangle(jsg::Lock& js, jsg::Ref<MessagePort> port);

private:
bool detached = false;

kj::Maybe<jsg::HashableV8Ref<v8::Object>> onmessage;
kj::Maybe<jsg::HashableV8Ref<v8::Object>> onmessageerror;

// Each MessagePort object can be entangled with another (a symmetric relationship)
kj::Maybe<jsg::Ref<MessagePort>> entangledWith{};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll need the visitForGc method implemented in this class also visiting each of these fields (onmessage, onmessageerror, and entagledWith)


class Message {
public:
Message() = default;
};

bool isMessageQueueEnabled() const {
return messageQueue != kj::none;
}

// Each MessagePort object also has a task source called the port message queue,
// A port message queue can be enabled or disabled, and is initially disabled.
// Once enabled, a port can never be disabled again
kj::Maybe<kj::Vector<Message>> messageQueue{};

bool hasBeenShipped = false;
jasnell marked this conversation as resolved.
Show resolved Hide resolved
};

// Implements MessageChannel web-spec
// Ref: https://html.spec.whatwg.org/multipage/web-messaging.html#message-channels
class MessageChannel: public jsg::Object {
public:
explicit MessageChannel(jsg::Lock& js);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the jsg::Lock& argument is not used.


static jsg::Ref<MessageChannel> constructor(jsg::Lock& js);

jsg::Ref<MessagePort> getPort1() {
return port1.addRef();
}

jsg::Ref<MessagePort> getPort2() {
return port2.addRef();
}

JSG_RESOURCE_TYPE(MessageChannel) {
JSG_READONLY_PROTOTYPE_PROPERTY(port1, getPort1);
JSG_READONLY_PROTOTYPE_PROPERTY(port2, getPort2);
Comment on lines +94 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the spec require these to be properties on the MessageChannel prototype? If not, using lazy instance properties would be more efficient. Not critical tho.

Suggested change
JSG_READONLY_PROTOTYPE_PROPERTY(port1, getPort1);
JSG_READONLY_PROTOTYPE_PROPERTY(port2, getPort2);
JSG_READONLY_LAZY_INSTANCE_PROPERTY(port1, getPort1);
JSG_READONLY_LAZY_INSTANCE_PROPERTY(port2, getPort2);

}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("port1", port1);
tracker.trackField("port2", port2);
}
anonrig marked this conversation as resolved.
Show resolved Hide resolved

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(port1);
visitor.visit(port2);
}

private:
jsg::Ref<MessagePort> port1;
jsg::Ref<MessagePort> port2;
};

#define EW_MESSAGE_CHANNEL_ISOLATE_TYPES \
api::MessageChannel, api::MessagePort, api::MessagePort::StructuredSerializeOptions

} // namespace workerd::api
61 changes: 1 addition & 60 deletions src/workerd/api/web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,64 +98,6 @@ class MessageEvent: public Event {
}
};

class CloseEvent: public Event {
public:
CloseEvent(uint code, kj::String reason, bool clean)
: Event("close"),
code(code),
reason(kj::mv(reason)),
clean(clean) {}
CloseEvent(kj::String type, int code, kj::String reason, bool clean)
: Event(kj::mv(type)),
code(code),
reason(kj::mv(reason)),
clean(clean) {}

struct Initializer {
jsg::Optional<int> code;
jsg::Optional<kj::String> reason;
jsg::Optional<bool> wasClean;

JSG_STRUCT(code, reason, wasClean);
JSG_STRUCT_TS_OVERRIDE(CloseEventInit);
};
static jsg::Ref<CloseEvent> constructor(kj::String type, jsg::Optional<Initializer> initializer) {
Initializer init = kj::mv(initializer).orDefault({});
return jsg::alloc<CloseEvent>(kj::mv(type), init.code.orDefault(0),
kj::mv(init.reason).orDefault(nullptr), init.wasClean.orDefault(false));
}

int getCode() {
return code;
}
kj::StringPtr getReason() {
return reason;
}
bool getWasClean() {
return clean;
}

JSG_RESOURCE_TYPE(CloseEvent) {
JSG_INHERIT(Event);

JSG_READONLY_INSTANCE_PROPERTY(code, getCode);
JSG_READONLY_INSTANCE_PROPERTY(reason, getReason);
JSG_READONLY_INSTANCE_PROPERTY(wasClean, getWasClean);

JSG_TS_ROOT();
// CloseEvent will be referenced from the `WebSocketEventMap` define
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("reason", reason);
}

private:
int code;
kj::String reason;
bool clean;
};

// The forward declaration is necessary so we can make some
// WebSocket methods accessible to WebSocketPair via friend declaration.
class WebSocket;
Expand Down Expand Up @@ -695,8 +637,7 @@ class WebSocket: public EventTarget {
};

#define EW_WEBSOCKET_ISOLATE_TYPES \
api::CloseEvent, api::CloseEvent::Initializer, api::MessageEvent, \
api::MessageEvent::Initializer, api::WebSocket, api::WebSocketPair, \
api::MessageEvent, api::MessageEvent::Initializer, api::WebSocket, api::WebSocketPair, \
api::WebSocketPair::PairIterator, \
api::WebSocketPair::PairIterator:: \
Next // The list of websocket.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ wd_cc_library(
"//src/workerd/api:html-rewriter",
"//src/workerd/api:hyperdrive",
"//src/workerd/api:memory-cache",
"//src/workerd/api:message-channel",
"//src/workerd/api:pyodide",
"//src/workerd/api:r2",
"//src/workerd/api:rtti",
Expand Down
Loading
Loading