Skip to content

Commit

Permalink
Refactor @aomao/plugin-yjs-websocket/server
Browse files Browse the repository at this point in the history
* Export `getYDoc` method from `startServer`
* Add `broadcastCustomMessage` method to `WSSharedDoc`
* Add `requestListener` option to extend `http.Server `service
Improve` auth` option to return a unique ID for the document
* Remove `onConnection` option
  • Loading branch information
big-camel committed Mar 19, 2023
1 parent ccc1164 commit 4fbdd92
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 50 deletions.
92 changes: 87 additions & 5 deletions docs/config/yjs.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,11 @@ startServer({
host: string;
// Port to listen on, default is 1234
port: number;
// http server request listener
requestListener?: http.RequestListener;
// Custom authentication, connection will be terminated if code !== 200 is returned
auth?: (request: http.IncomingMessage, ws: WebSocket) => Promise<void | { code: number; data: string | Buffer }>;
// The document ID needs to be returned, and by default, it is extracted from the ws link in the format of ws:domain.com/docname, where docname is the document ID.
auth?: (request: http.IncomingMessage, ws: WebSocket) => Promise<void | { code: number; data: string |
// http server request listener
requestListener?: http.RequestListener;Buffer }>;
// Persistence options, false means no persistence
/**
* Default is leveldb
Expand All @@ -132,7 +133,88 @@ startServer({
contentField?: string;
// Update callback
callback?: UpdateCallback;
// Connection callback
onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void;
})
```

#### Configuration Options

##### `auth` Custom authentication

The connection will be terminated when code !== 200.

The document ID needs to be returned, and by default, it is extracted from the ws link in the format of ws:domain.com/docname, where docname is the document ID.

```ts
const auth = async (request: http.IncomingMessage, ws: WebSocket) => {
const { url } = request;
const docname = url.split('/').pop();
if (!docname) return { code: 400, data: 'Not found' };
return docname;
};

startServer({ auth });
```

##### `requestListener` is used for customizing the request listener of the http server, which can be used for custom routing.

```ts
const app = express();
app.get('/doc/:name', (req, res) => {
res.send('hello world');
});

startServer(app);
```

##### `persistenceOptions` is used for customizing the persistence method. Currently supports leveldb and mongodb.

```ts
startServer({
persistenceOptions: {
provider: 'leveldb',
dir: './db',
},
});
```

```ts
startServer({
persistenceOptions: {
provider: 'mongodb',
url: 'mongodb://localhost:27017',
},
});
```

##### `contentField` is used to customize the field name of the document content. The default is `content`.

```ts
startServer({
contentField: 'content',
});
```

##### `callback` is used for customizing the update callback.

```ts
startServer({
callback: {
// Or use `action: string` to receive post requests via a url
action: (data: Record<string, any>) => {
// `data` is the updated data
},
// Timeout duration, defaulting to 5000ms
timeout: 5000;
// ContentType can be "Array" | "Map" | "Text" | "XmlFragment"
// Corresponding data types to be sent
objects?: Record<string, ContentType>;
},
});
```

##### `startServer` returns an `http.Server` instance, and you can get the corresponding `Y.Doc` instance via `server.getYDoc(name)`.

```ts
const server = startServer();
const doc = server.getYDoc('docname');
```
96 changes: 89 additions & 7 deletions docs/config/yjs.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,17 @@ startServer();
```ts
startServer({
// 监听的 host,默认为 0.0.0.0
host: string;
host?: string;
// 监听的端口,默认为 1234
port: number;
// http server request listener
requestListener?: http.RequestListener;
port?: number;
// 自定义效验,返回 code !== 200 时,会终止连接
// 需要返回文档的 id,默认取 ws 链接中的 ws:domain.com/docname 其中 docname为文档 id
auth?: (
request: http.IncomingMessage,
ws: WebSocket,
) => Promise<void | { code: number; data: string | Buffer }>;
) => Promise<{ code: number; data: string } | string>;
// http server request listener
requestListener?: http.RequestListener;
// 持久化选项,false 为不持久化
/**
* 默认为 leveldb
Expand All @@ -133,7 +134,88 @@ startServer({
contentField?: string;
// 更新回调
callback?: UpdateCallback;
// 连接回调
onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void;
})
```

#### 使用配置

##### `auth` 用于自定义效验

返回 code !== 200 时,会终止连接

需要返回文档的 id,默认取 ws 链接中的 ws:domain.com/docname 其中 docname 为文档 id

```ts
const auth = async (request: http.IncomingMessage, ws: WebSocket) => {
const { url } = request;
const docname = url.split('/').pop();
if (!docname) return { code: 400, data: '文档id不能为空' };
return docname;
};

startServer({ auth });
```

##### `requestListener` 用于自定义 http server 的 request listener,可以用于自定义路由

```ts
const app = express();
app.get('/doc/:name', (req, res) => {
res.send('hello world');
});

startServer(app);
```

##### `persistenceOptions` 用于自定义持久化方式,目前支持 `leveldb``mongodb`

```ts
startServer({
persistenceOptions: {
provider: 'leveldb',
dir: './db',
},
});
```

```ts
startServer({
persistenceOptions: {
provider: 'mongodb',
url: 'mongodb://localhost:27017',
},
});
```

##### `contentField` 用于自定义文档内容字段,默认为 `content`

```ts
startServer({
contentField: 'content',
});
```

##### `callback` 用于自定义更新回调

```ts
startServer({
callback: {
// 或则 action: string, 使用一个url来接收post请求
action: (data: Record<string, any>) => {
// data 为更新的数据
},
// 超时时间,默认为 5000
timeout: 5000;
// ContentType 为 "Array" | "Map" | "Text" | "XmlFragment"
// 需要发送的对应数据类型
objects?: Record<string, ContentType>;
},
});
```

##### `startServer` 会反应一个 `http.Server` 实例,可以通过 `server.getYDoc(name)` 获取对应的 `Y.Doc` 实例

```ts
const server = startServer();
const doc = server.getYDoc('docname');
```
7 changes: 6 additions & 1 deletion plugins/yjs-websocket/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ export default (options: Partial<ServerOptions> = {}) => {
const {
host = process.env.HOST || '0.0.0.0',
port = parseInt(process.env.PORT || '1234') || 1234,
auth = (request) =>
request.url
? Promise.resolve(request.url.slice(1).split('?')[0] || '')
: Promise.reject('auth not implemented'),
} = options;
startServer({
return startServer({
auth,
...options,
host,
port,
Expand Down
14 changes: 7 additions & 7 deletions plugins/yjs-websocket/src/server/persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interface Persistence {
bindState: (
docname: string,
doc: WSSharedDoc,
onInitialValue?: (content: Y.XmlElement) => Promise<void> | void,
onInitialValue?: (doc: WSSharedDoc) => Promise<void> | void,
) => void;
writeState: (
docname: string,
Expand Down Expand Up @@ -72,11 +72,11 @@ export const initPersistence = async (

persistence = {
provider: db,
bindState: async (docName, ydoc, onInitialValue) => {
bindState: async (docname, ydoc, onInitialValue) => {
if (!db) return;
const persistedYdoc = await db.getYDoc(docName);
const persistedYdoc = await db.getYDoc(docname);
const newUpdates = Y.encodeStateAsUpdate(ydoc);
db.storeUpdate(docName, newUpdates);
db.storeUpdate(docname, newUpdates);
const content = persistedYdoc.get(
contentField,
Y.XmlElement,
Expand All @@ -88,15 +88,15 @@ export const initPersistence = async (

Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));
ydoc.on('update', (update) => {
db?.storeUpdate(docName, update);
db?.storeUpdate(docname, update);
});

// init empty content
if (content._length === 0 && updateContent._length === 0) {
if (onInitialValue) await onInitialValue(updateContent);
if (onInitialValue) await onInitialValue(ydoc);
}
},
writeState: async (docName, ydoc) => {
writeState: async (docname, ydoc) => {
// This is called when all connections to the document are closed.
// In the future, this method might also be called in intervals or after a certain number of updates.
return new Promise((resolve) => {
Expand Down
40 changes: 27 additions & 13 deletions plugins/yjs-websocket/src/server/start.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import WebSocket from 'ws';
import http from 'http';
import { setupWSConnection, UpdateCallback } from './utils';
import { getYDoc, setupWSConnection, UpdateCallback } from './utils';
import { initPersistence, PersistenceOptions } from './persistence';
import { WSSharedDoc } from './types';

Expand All @@ -11,15 +11,13 @@ export interface ServerOptions {
host: string;
// http server port
port: number;
// http server request listener
requestListener?: http.RequestListener;
// 效验
auth?: (
auth: (
request: http.IncomingMessage,
ws: WebSocket,
) => Promise<void | { code: number; data: string | Buffer }>;
// 连接回调
onConnection?: (doc: WSSharedDoc, conn: WebSocket.WebSocket) => void;
) => Promise<{ code: number; data: string } | string>;
// http server request listener
requestListener?: http.RequestListener;
// 持久化选项,false 为不持久化
persistenceOptions?: PersistenceOptions | false;
// 文档内容字段,默认为 content
Expand All @@ -30,11 +28,16 @@ export interface ServerOptions {

const SERVER_OPTIONS_WEAKMAP = new WeakMap<http.Server, ServerOptions>();

declare module 'http' {
interface Server {
getYDoc: (name: string) => WSSharedDoc;
}
}

export const startServer = (options: ServerOptions) => {
const {
auth = () => Promise.resolve(),
auth,
requestListener,
onConnection,
host,
port,
persistenceOptions = { provider: 'leveldb' },
Expand All @@ -50,12 +53,14 @@ export const startServer = (options: ServerOptions) => {
});

SERVER_OPTIONS_WEAKMAP.set(server, options);

const DOC_NAME_WEAKMAP = new WeakMap<WebSocket.WebSocket, string>();
wss.on('connection', (conn, req) => {
const { callback } = SERVER_OPTIONS_WEAKMAP.get(server) ?? {};
const name = DOC_NAME_WEAKMAP.get(conn);
if (!name) throw new Error('doc name not found');
setupWSConnection(conn, req, {
callback,
onConnection,
docname: name,
});
});

Expand All @@ -64,9 +69,12 @@ export const startServer = (options: ServerOptions) => {
// See https://github.com/websockets/ws#client-authentication
const handleAuth = (ws: WebSocket) => {
auth(request, ws).then((res) => {
if (res && res.code !== 200) {
ws.close(res.code, res.data);
const resObject =
typeof res === 'object' ? res : { code: 200, data: res };
if (resObject.code !== 200) {
ws.close(resObject.code, resObject.data);
} else {
DOC_NAME_WEAKMAP.set(ws, resObject.data);
wss.emit('connection', ws, request);
}
});
Expand All @@ -81,5 +89,11 @@ export const startServer = (options: ServerOptions) => {
server.listen(port, host, () => {
console.log(`running at '${host}' on port ${port}`);
});

server.getYDoc = (name) => {
const { callback } = SERVER_OPTIONS_WEAKMAP.get(server) ?? {};
return getYDoc(name, undefined, undefined, callback);
};

return server;
};
1 change: 1 addition & 0 deletions plugins/yjs-websocket/src/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export interface WSSharedDoc extends Doc {
conns: Map<WebSocket, Set<number>>;
awareness: awarenessProtocol.Awareness;
sendCustomMessage: (conn: WebSocket, message: Record<string, any>) => void;
broadcastCustomMessage: (message: Record<string, any>) => void;
}

export type ContentType = 'Array' | 'Map' | 'Text' | 'XmlFragment';
Loading

0 comments on commit 4fbdd92

Please sign in to comment.