Skip to content

Commit

Permalink
feat(nbstore): improve nbstore
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Jan 3, 2025
1 parent 2074bda commit 3e2cbdd
Show file tree
Hide file tree
Showing 96 changed files with 3,363 additions and 3,331 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions blocksuite/framework/store/shim.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ declare module 'y-protocols/awareness.js' {
value: State[Field]
): void;
}
export { applyAwarenessUpdate, encodeAwarenessUpdate, modifyAwarenessUpdate, removeAwarenessStates } from 'y-protocols/awareness'
}
3 changes: 3 additions & 0 deletions packages/common/env/src/ua-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ export class UaHelper {
}

private isStandaloneMode() {
if (typeof window === 'undefined') {
return false;
}
if ('standalone' in window.navigator) {
return !!window.navigator.standalone;
}
Expand Down
11 changes: 7 additions & 4 deletions packages/common/infra/src/op/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class OpClient<Ops extends OpSchema> extends AutoMessageHandler {
private readonly pendingCalls = new Map<string, PendingCall>();
private readonly obs = new Map<string, Observer<any>>();
private readonly options: OpClientOptions = {
timeout: 3000,
timeout: Infinity,
};

constructor(port: MessageCommunicapable, options: OpClientOptions = {}) {
Expand Down Expand Up @@ -139,9 +139,12 @@ export class OpClient<Ops extends OpSchema> extends AutoMessageHandler {
raise('canceled');
};

const timeout = setTimeout(() => {
raise('timeout');
}, this.options.timeout);
const timeout =
this.options.timeout === Infinity
? 0
: setTimeout(() => {
raise('timeout');
}, this.options.timeout);

const transferables = fetchTransferables(payload);

Expand Down
3 changes: 2 additions & 1 deletion packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"sideEffects": false,
"exports": {
".": "./src/index.ts",
"./worker": "./src/worker/index.ts",
"./worker/client": "./src/worker/client.ts",
"./worker/consumer": "./src/worker/consumer.ts",
"./idb": "./src/impls/idb/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts",
"./cloud": "./src/impls/cloud/index.ts",
Expand Down
29 changes: 18 additions & 11 deletions packages/common/nbstore/src/__tests__/frontend.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { DocFrontend } from '../frontend/doc';
import { BroadcastChannelAwarenessStorage } from '../impls/broadcast-channel/awareness';
import { IndexedDBDocStorage } from '../impls/idb';
import { AwarenessSyncImpl } from '../sync/awareness';
import { DocSyncImpl } from '../sync/doc';
import { expectYjsEqual } from './utils';

test('doc', async () => {
Expand All @@ -19,15 +20,15 @@ test('doc', async () => {

const docStorage = new IndexedDBDocStorage({
id: 'ws1',
peer: 'a',
flavour: 'a',
type: 'workspace',
});

docStorage.connection.connect();

await docStorage.connection.waitForConnected();

const frontend1 = new DocFrontend(docStorage, null);
const frontend1 = new DocFrontend(docStorage, DocSyncImpl.dummy);
frontend1.start();
frontend1.addDoc(doc1);
await vitest.waitFor(async () => {
Expand All @@ -42,7 +43,7 @@ test('doc', async () => {
const doc2 = new YDoc({
guid: 'test-doc',
});
const frontend2 = new DocFrontend(docStorage, null);
const frontend2 = new DocFrontend(docStorage, DocSyncImpl.dummy);
frontend2.start();
frontend2.addDoc(doc2);

Expand All @@ -57,15 +58,11 @@ test('doc', async () => {

test('awareness', async () => {
const storage1 = new BroadcastChannelAwarenessStorage({
id: 'ws1',
peer: 'a',
type: 'workspace',
id: 'ws1:a',
});

const storage2 = new BroadcastChannelAwarenessStorage({
id: 'ws1',
peer: 'b',
type: 'workspace',
id: 'ws1:b',
});

storage1.connection.connect();
Expand All @@ -90,13 +87,23 @@ test('awareness', async () => {
const awarenessC = new Awareness(docC);

{
const sync = new AwarenessSyncImpl(storage1, [storage2]);
const sync = new AwarenessSyncImpl({
local: storage1,
remotes: {
b: storage2,
},
});
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessA);
frontend.connect(awarenessB);
}
{
const sync = new AwarenessSyncImpl(storage2, [storage1]);
const sync = new AwarenessSyncImpl({
local: storage2,
remotes: {
a: storage1,
},
});
const frontend = new AwarenessFrontend(sync);
frontend.connect(awarenessC);
}
Expand Down
55 changes: 40 additions & 15 deletions packages/common/nbstore/src/__tests__/sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,37 @@ test('doc', async () => {

const peerADoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'a',
flavour: 'a',
type: 'workspace',
});

const peerASync = new IndexedDBSyncStorage({
id: 'ws1',
peer: 'a',
flavour: 'a',
type: 'workspace',
});

const peerBDoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'b',
flavour: 'b',
type: 'workspace',
});
const peerCDoc = new IndexedDBDocStorage({
id: 'ws1',
peer: 'c',
flavour: 'c',
type: 'workspace',
});

const peerA = new SpaceStorage([peerADoc, peerASync]);
const peerB = new SpaceStorage([peerBDoc]);
const peerC = new SpaceStorage([peerCDoc]);
const peerA = new SpaceStorage({
doc: peerADoc,
sync: peerASync,
});
const peerB = new SpaceStorage({
doc: peerBDoc,
});
const peerC = new SpaceStorage({
doc: peerCDoc,
});

peerA.connect();
peerB.connect();
Expand All @@ -57,7 +64,13 @@ test('doc', async () => {
bin: update,
});

const sync = new Sync(peerA, [peerB, peerC]);
const sync = new Sync({
local: peerA,
remotes: {
b: peerB,
c: peerC,
},
});
sync.start();

await new Promise(resolve => setTimeout(resolve, 1000));
Expand Down Expand Up @@ -109,25 +122,31 @@ test('doc', async () => {
test('blob', async () => {
const a = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'a',
flavour: 'a',
type: 'workspace',
});

const b = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'b',
flavour: 'b',
type: 'workspace',
});

const c = new IndexedDBBlobStorage({
id: 'ws1',
peer: 'c',
flavour: 'c',
type: 'workspace',
});

const peerA = new SpaceStorage([a]);
const peerB = new SpaceStorage([b]);
const peerC = new SpaceStorage([c]);
const peerA = new SpaceStorage({
blob: a,
});
const peerB = new SpaceStorage({
blob: b,
});
const peerC = new SpaceStorage({
blob: c,
});

peerA.connect();
peerB.connect();
Expand All @@ -151,7 +170,13 @@ test('blob', async () => {
createdAt: new Date(100),
});

const sync = new Sync(peerA, [peerB, peerC]);
const sync = new Sync({
local: peerA,
remotes: {
b: peerB,
c: peerC,
},
});
sync.start();

await new Promise(resolve => setTimeout(resolve, 1000));
Expand Down
1 change: 1 addition & 0 deletions packages/common/nbstore/src/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export abstract class AutoReconnectConnection<T = any>
})
.catch(error => {
if (!this.connectingAbort?.signal.aborted) {
console.error('failed to connect', error);
this.setStatus('error', error as any);
}
});
Expand Down
2 changes: 1 addition & 1 deletion packages/common/nbstore/src/frontend/awareness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
applyAwarenessUpdate,
type Awareness,
encodeAwarenessUpdate,
} from 'y-protocols/awareness.js';
} from 'y-protocols/awareness';

import type { AwarenessRecord } from '../storage/awareness';
import type { AwarenessSync } from '../sync/awareness';
Expand Down
24 changes: 18 additions & 6 deletions packages/common/nbstore/src/frontend/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,33 @@ import type { BlobSync } from '../sync/blob';

export class BlobFrontend {
constructor(
readonly storage: BlobStorage,
readonly sync?: BlobSync
public readonly storage: BlobStorage,
private readonly sync: BlobSync
) {}

get(blobId: string) {
return this.sync
? this.sync.downloadBlob(blobId)
: this.storage.get(blobId);
return this.sync.downloadBlob(blobId);
}

set(blob: BlobRecord) {
return this.sync ? this.sync.uploadBlob(blob) : this.storage.set(blob);
return this.sync.uploadBlob(blob);
}

fullSync() {
return this.sync.fullSync();
}

addPriority(_id: string, _priority: number) {
// not support yet
}

readonly state$ = this.sync.state$;

setMaxBlobSize(max: number) {
this.sync.setMaxBlobSize(max);
}

onReachedMaxBlobSize(cb: (byteSize: number) => void): () => void {
return this.sync.onReachedMaxBlobSize(cb);
}
}
Loading

0 comments on commit 3e2cbdd

Please sign in to comment.