From 4fbdd92b8802d2866faa9c3845a53a536e0ae573 Mon Sep 17 00:00:00 2001 From: Yi Yang Date: Mon, 20 Mar 2023 00:25:41 +0800 Subject: [PATCH] Refactor @aomao/plugin-yjs-websocket/server * Export `getYDoc` method from `startServer` * Add `broadcastCustomMessage` method to `WSSharedDoc` * Add `requestListener` option to extend `http.Server `service Improve` auth` option to return a unique ID for the document * Remove `onConnection` option --- docs/config/yjs.md | 92 +++++++++++++++++- docs/config/yjs.zh-CN.md | 96 +++++++++++++++++-- plugins/yjs-websocket/src/server.ts | 7 +- .../yjs-websocket/src/server/persistence.ts | 14 +-- plugins/yjs-websocket/src/server/start.ts | 40 +++++--- plugins/yjs-websocket/src/server/types.ts | 1 + plugins/yjs-websocket/src/server/utils.ts | 41 ++++---- 7 files changed, 241 insertions(+), 50 deletions(-) diff --git a/docs/config/yjs.md b/docs/config/yjs.md index c302f10f..462f67d0 100644 --- a/docs/config/yjs.md +++ b/docs/config/yjs.md @@ -110,10 +110,11 @@ startServer({ host: string; // Port to listen on, default is 1234 port: number; - // http server request listener - requestListener?: http.RequestListener; // Custom authentication, connection will be terminated if code !== 200 is returned - auth?: (request: http.IncomingMessage, ws: WebSocket) => Promise; + // The document ID needs to be returned, and by default, it is extracted from the ws link in the format of ws:domain.com/docname, where docname is the document ID. + auth?: (request: http.IncomingMessage, ws: WebSocket) => Promise; // Persistence options, false means no persistence /** * Default is leveldb @@ -132,7 +133,88 @@ startServer({ contentField?: string; // Update callback callback?: UpdateCallback; - // Connection callback - onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void; }) ``` + +#### Configuration Options + +##### `auth` Custom authentication + +The connection will be terminated when code !== 200. + +The document ID needs to be returned, and by default, it is extracted from the ws link in the format of ws:domain.com/docname, where docname is the document ID. + +```ts +const auth = async (request: http.IncomingMessage, ws: WebSocket) => { + const { url } = request; + const docname = url.split('/').pop(); + if (!docname) return { code: 400, data: 'Not found' }; + return docname; +}; + +startServer({ auth }); +``` + +##### `requestListener` is used for customizing the request listener of the http server, which can be used for custom routing. + +```ts +const app = express(); +app.get('/doc/:name', (req, res) => { + res.send('hello world'); +}); + +startServer(app); +``` + +##### `persistenceOptions` is used for customizing the persistence method. Currently supports leveldb and mongodb. + +```ts +startServer({ + persistenceOptions: { + provider: 'leveldb', + dir: './db', + }, +}); +``` + +```ts +startServer({ + persistenceOptions: { + provider: 'mongodb', + url: 'mongodb://localhost:27017', + }, +}); +``` + +##### `contentField` is used to customize the field name of the document content. The default is `content`. + +```ts +startServer({ + contentField: 'content', +}); +``` + +##### `callback` is used for customizing the update callback. + +```ts +startServer({ + callback: { + // Or use `action: string` to receive post requests via a url + action: (data: Record) => { + // `data` is the updated data + }, + // Timeout duration, defaulting to 5000ms + timeout: 5000; + // ContentType can be "Array" | "Map" | "Text" | "XmlFragment" + // Corresponding data types to be sent + objects?: Record; + }, +}); +``` + +##### `startServer` returns an `http.Server` instance, and you can get the corresponding `Y.Doc` instance via `server.getYDoc(name)`. + +```ts +const server = startServer(); +const doc = server.getYDoc('docname'); +``` diff --git a/docs/config/yjs.zh-CN.md b/docs/config/yjs.zh-CN.md index 0b645a4a..2520e2d3 100644 --- a/docs/config/yjs.zh-CN.md +++ b/docs/config/yjs.zh-CN.md @@ -105,16 +105,17 @@ startServer(); ```ts startServer({ // 监听的 host,默认为 0.0.0.0 - host: string; + host?: string; // 监听的端口,默认为 1234 - port: number; - // http server request listener - requestListener?: http.RequestListener; + port?: number; // 自定义效验,返回 code !== 200 时,会终止连接 + // 需要返回文档的 id,默认取 ws 链接中的 ws:domain.com/docname 其中 docname为文档 id auth?: ( request: http.IncomingMessage, ws: WebSocket, - ) => Promise; + ) => Promise<{ code: number; data: string } | string>; + // http server request listener + requestListener?: http.RequestListener; // 持久化选项,false 为不持久化 /** * 默认为 leveldb @@ -133,7 +134,88 @@ startServer({ contentField?: string; // 更新回调 callback?: UpdateCallback; - // 连接回调 - onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void; }) ``` + +#### 使用配置 + +##### `auth` 用于自定义效验 + +返回 code !== 200 时,会终止连接 + +需要返回文档的 id,默认取 ws 链接中的 ws:domain.com/docname 其中 docname 为文档 id + +```ts +const auth = async (request: http.IncomingMessage, ws: WebSocket) => { + const { url } = request; + const docname = url.split('/').pop(); + if (!docname) return { code: 400, data: '文档id不能为空' }; + return docname; +}; + +startServer({ auth }); +``` + +##### `requestListener` 用于自定义 http server 的 request listener,可以用于自定义路由 + +```ts +const app = express(); +app.get('/doc/:name', (req, res) => { + res.send('hello world'); +}); + +startServer(app); +``` + +##### `persistenceOptions` 用于自定义持久化方式,目前支持 `leveldb` 和 `mongodb` + +```ts +startServer({ + persistenceOptions: { + provider: 'leveldb', + dir: './db', + }, +}); +``` + +```ts +startServer({ + persistenceOptions: { + provider: 'mongodb', + url: 'mongodb://localhost:27017', + }, +}); +``` + +##### `contentField` 用于自定义文档内容字段,默认为 `content` + +```ts +startServer({ + contentField: 'content', +}); +``` + +##### `callback` 用于自定义更新回调 + +```ts +startServer({ + callback: { + // 或则 action: string, 使用一个url来接收post请求 + action: (data: Record) => { + // data 为更新的数据 + }, + // 超时时间,默认为 5000 + timeout: 5000; + // ContentType 为 "Array" | "Map" | "Text" | "XmlFragment" + // 需要发送的对应数据类型 + objects?: Record; + }, +}); +``` + +##### `startServer` 会反应一个 `http.Server` 实例,可以通过 `server.getYDoc(name)` 获取对应的 `Y.Doc` 实例 + +```ts +const server = startServer(); +const doc = server.getYDoc('docname'); +``` diff --git a/plugins/yjs-websocket/src/server.ts b/plugins/yjs-websocket/src/server.ts index 1e62d580..7417635a 100644 --- a/plugins/yjs-websocket/src/server.ts +++ b/plugins/yjs-websocket/src/server.ts @@ -4,8 +4,13 @@ export default (options: Partial = {}) => { const { host = process.env.HOST || '0.0.0.0', port = parseInt(process.env.PORT || '1234') || 1234, + auth = (request) => + request.url + ? Promise.resolve(request.url.slice(1).split('?')[0] || '') + : Promise.reject('auth not implemented'), } = options; - startServer({ + return startServer({ + auth, ...options, host, port, diff --git a/plugins/yjs-websocket/src/server/persistence.ts b/plugins/yjs-websocket/src/server/persistence.ts index ab8c1daa..e0a4f10f 100644 --- a/plugins/yjs-websocket/src/server/persistence.ts +++ b/plugins/yjs-websocket/src/server/persistence.ts @@ -16,7 +16,7 @@ interface Persistence { bindState: ( docname: string, doc: WSSharedDoc, - onInitialValue?: (content: Y.XmlElement) => Promise | void, + onInitialValue?: (doc: WSSharedDoc) => Promise | void, ) => void; writeState: ( docname: string, @@ -72,11 +72,11 @@ export const initPersistence = async ( persistence = { provider: db, - bindState: async (docName, ydoc, onInitialValue) => { + bindState: async (docname, ydoc, onInitialValue) => { if (!db) return; - const persistedYdoc = await db.getYDoc(docName); + const persistedYdoc = await db.getYDoc(docname); const newUpdates = Y.encodeStateAsUpdate(ydoc); - db.storeUpdate(docName, newUpdates); + db.storeUpdate(docname, newUpdates); const content = persistedYdoc.get( contentField, Y.XmlElement, @@ -88,15 +88,15 @@ export const initPersistence = async ( Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc)); ydoc.on('update', (update) => { - db?.storeUpdate(docName, update); + db?.storeUpdate(docname, update); }); // init empty content if (content._length === 0 && updateContent._length === 0) { - if (onInitialValue) await onInitialValue(updateContent); + if (onInitialValue) await onInitialValue(ydoc); } }, - writeState: async (docName, ydoc) => { + writeState: async (docname, ydoc) => { // This is called when all connections to the document are closed. // In the future, this method might also be called in intervals or after a certain number of updates. return new Promise((resolve) => { diff --git a/plugins/yjs-websocket/src/server/start.ts b/plugins/yjs-websocket/src/server/start.ts index 78f0c44e..28a7a491 100644 --- a/plugins/yjs-websocket/src/server/start.ts +++ b/plugins/yjs-websocket/src/server/start.ts @@ -1,6 +1,6 @@ import WebSocket from 'ws'; import http from 'http'; -import { setupWSConnection, UpdateCallback } from './utils'; +import { getYDoc, setupWSConnection, UpdateCallback } from './utils'; import { initPersistence, PersistenceOptions } from './persistence'; import { WSSharedDoc } from './types'; @@ -11,15 +11,13 @@ export interface ServerOptions { host: string; // http server port port: number; - // http server request listener - requestListener?: http.RequestListener; // 效验 - auth?: ( + auth: ( request: http.IncomingMessage, ws: WebSocket, - ) => Promise; - // 连接回调 - onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void; + ) => Promise<{ code: number; data: string } | string>; + // http server request listener + requestListener?: http.RequestListener; // 持久化选项,false 为不持久化 persistenceOptions?: PersistenceOptions | false; // 文档内容字段,默认为 content @@ -30,11 +28,16 @@ export interface ServerOptions { const SERVER_OPTIONS_WEAKMAP = new WeakMap(); +declare module 'http' { + interface Server { + getYDoc: (name: string) => WSSharedDoc; + } +} + export const startServer = (options: ServerOptions) => { const { - auth = () => Promise.resolve(), + auth, requestListener, - onConnection, host, port, persistenceOptions = { provider: 'leveldb' }, @@ -50,12 +53,14 @@ export const startServer = (options: ServerOptions) => { }); SERVER_OPTIONS_WEAKMAP.set(server, options); - + const DOC_NAME_WEAKMAP = new WeakMap(); wss.on('connection', (conn, req) => { const { callback } = SERVER_OPTIONS_WEAKMAP.get(server) ?? {}; + const name = DOC_NAME_WEAKMAP.get(conn); + if (!name) throw new Error('doc name not found'); setupWSConnection(conn, req, { callback, - onConnection, + docname: name, }); }); @@ -64,9 +69,12 @@ export const startServer = (options: ServerOptions) => { // See https://github.com/websockets/ws#client-authentication const handleAuth = (ws: WebSocket) => { auth(request, ws).then((res) => { - if (res && res.code !== 200) { - ws.close(res.code, res.data); + const resObject = + typeof res === 'object' ? res : { code: 200, data: res }; + if (resObject.code !== 200) { + ws.close(resObject.code, resObject.data); } else { + DOC_NAME_WEAKMAP.set(ws, resObject.data); wss.emit('connection', ws, request); } }); @@ -81,5 +89,11 @@ export const startServer = (options: ServerOptions) => { server.listen(port, host, () => { console.log(`running at '${host}' on port ${port}`); }); + + server.getYDoc = (name) => { + const { callback } = SERVER_OPTIONS_WEAKMAP.get(server) ?? {}; + return getYDoc(name, undefined, undefined, callback); + }; + return server; }; diff --git a/plugins/yjs-websocket/src/server/types.ts b/plugins/yjs-websocket/src/server/types.ts index 912cd891..912c5c4f 100644 --- a/plugins/yjs-websocket/src/server/types.ts +++ b/plugins/yjs-websocket/src/server/types.ts @@ -7,6 +7,7 @@ export interface WSSharedDoc extends Doc { conns: Map>; awareness: awarenessProtocol.Awareness; sendCustomMessage: (conn: WebSocket, message: Record) => void; + broadcastCustomMessage: (message: Record) => void; } export type ContentType = 'Array' | 'Map' | 'Text' | 'XmlFragment'; diff --git a/plugins/yjs-websocket/src/server/utils.ts b/plugins/yjs-websocket/src/server/utils.ts index c6814bef..94340904 100644 --- a/plugins/yjs-websocket/src/server/utils.ts +++ b/plugins/yjs-websocket/src/server/utils.ts @@ -128,6 +128,16 @@ class WSSharedDoc extends Y.Doc implements WSSharedDocInterface { send(this, conn, encoding.toUint8Array(encoder)); } + broadcastCustomMessage(message: Record) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageCustom); + encoding.writeAny(encoder, message); + const buff = encoding.toUint8Array(encoder); + this.conns.forEach((_, conn) => { + send(this, conn, buff); + }); + } + destroy(): void { super.destroy(); } @@ -139,7 +149,7 @@ class WSSharedDoc extends Y.Doc implements WSSharedDocInterface { export const getYDoc = ( docname: string, gc: boolean = true, - onInitialValue?: (content: Y.XmlElement) => Promise | void, + onInitialValue?: (doc: WSSharedDocInterface) => Promise | void, callback?: UpdateCallback, ): WSSharedDocInterface => map.setIfUndefined(docs, docname, () => { @@ -251,44 +261,41 @@ const send = ( }; interface SetupWSConnectionOptions { - docName?: string; + docname: string; gc?: boolean; pingTimeout?: number; callback?: UpdateCallback; - onConnection?: ( - doc: WSSharedDocInterface, - conn: WebSocket.WebSocket, - ) => void; } +const INIT_VALUE_WAS_SENT = new WeakSet(); + export const setupWSConnection = ( conn: WebSocket.WebSocket, req: http.IncomingMessage, options?: SetupWSConnectionOptions, ) => { const { - docName = req.url!.slice(1).split('?')[0], + docname, gc = true, pingTimeout = 30000, callback, - onConnection, - } = options ?? {}; + } = options ?? { docname: 'default' }; conn.binaryType = 'arraybuffer'; // get doc, initialize if it does not exist yet const doc = getYDoc( - docName, + docname, gc, - () => { - doc.sendCustomMessage(conn, { - action: 'initValue', - }); + (doc) => { + if (!INIT_VALUE_WAS_SENT.has(doc)) { + INIT_VALUE_WAS_SENT.add(doc); + doc.sendCustomMessage(conn, { + action: 'initValue', + }); + } }, callback, ); doc.conns.set(conn, new Set()); - if (onConnection) { - onConnection(doc, conn); - } // listen and reply to events conn.on('message', (message: ArrayBuffer) => messageListener(conn, doc, new Uint8Array(message)),