Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6520 Implementation for chunking in WebRTC Plugin #6521

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 101 additions & 4 deletions src/plugins/replication-webrtc/connection-handler-simple-peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const Peer = _Peer as Peer
import type { RxError, RxTypeError } from '../../types/index.d.ts';
import { newRxError } from '../../rx-error.ts';

import { DEFAULT_CHUNK_SIZE } from './webrtc-helper';

export type SimplePeer = SimplePeerInstance & {
// add id to make debugging easier
id: string;
Expand Down Expand Up @@ -126,6 +128,8 @@ export function getConnectionHandlerSimplePeer({

const creator: WebRTCConnectionHandlerCreator<SimplePeer> = async (options: SyncOptionsWebRTC<any, SimplePeer>) => {

const chunkSize = options.chunkSize ? options.chunkSize : DEFAULT_CHUNK_SIZE;

const connect$ = new Subject<SimplePeer>();
const disconnect$ = new Subject<SimplePeer>();
const message$ = new Subject<PeerWithMessage<SimplePeer>>();
Expand Down Expand Up @@ -204,14 +208,77 @@ export function getConnectionHandlerSimplePeer({
});
});

newSimplePeer.on('data', (messageOrResponse: any) => {
newSimplePeer.on('data', async (messageOrResponse: any) => {
messageOrResponse = JSON.parse(messageOrResponse.toString());
if (messageOrResponse.result) {

if (messageOrResponse.type === 'data-request') {
const dataId = messageOrResponse.dataId; // Unique identifier for the data
const data = messageOrResponse.data; // The data to be sent


const dataString = JSON.stringify(data);
const totalBytes = dataString.length;

let offset = 0;
while (offset < totalBytes) {
const chunk = dataString.substring(offset, offset + chunkSize);
newSimplePeer.send(JSON.stringify({
type: 'data-chunk',
dataId: dataId,
chunk: chunk,
offset: offset,
totalBytes: totalBytes
}));
offset += chunkSize;
await promiseWait(0) // yield to avoid blocking the event loop
}

newSimplePeer.send(JSON.stringify({
type: 'data-end',
dataId: dataId
}));

} else if (messageOrResponse.type === 'data-chunk') {

const dataId = messageOrResponse.dataId;
let messageChunks = reassembledMessages.get(dataId);
if (!messageChunks) {
messageChunks = { chunks: [], receivedBytes: 0, totalBytes: messageOrResponse.totalBytes };
reassembledMessages.set(dataId, messageChunks);
}
messageChunks.chunks[messageOrResponse.offset / chunkSize] = messageOrResponse.chunk;
messageChunks.receivedBytes += messageOrResponse.chunk.length;

} else if (messageOrResponse.type === 'data-end') {

const dataId = messageOrResponse.dataId;
const messageChunks = reassembledMessages.get(dataId);

if (messageChunks) {
const combinedMessageString = messageChunks.chunks.join('');
const reconstructedMessage = JSON.parse(combinedMessageString); // Reconstruct the data
if (reconstructedMessage.result) {
response$.next({
peer: newSimplePeer,
response: reconstructedMessage
});
} else {
message$.next({
peer: newSimplePeer,
message: reconstructedMessage
});
}

reassembledMessages.delete(dataId)
}
} else if (messageOrResponse.result) {
//Handle messages that weren't chunked
response$.next({
peer: newSimplePeer,
response: messageOrResponse
});
} else {
} else {
//Handle messages that weren't chunked
message$.next({
peer: newSimplePeer,
message: messageOrResponse
Expand Down Expand Up @@ -270,7 +337,33 @@ export function getConnectionHandlerSimplePeer({
message$,
response$,
async send(peer: SimplePeer, message: WebRTCMessage) {
await peer.send(JSON.stringify(message));
const dataId = randomCouchString(10);

if (JSON.stringify(message).length > chunkSize) {
const messageString = JSON.stringify(message);
const totalBytes = messageString.length;
let offset = 0;

while (offset < totalBytes) {
const chunk = messageString.substring(offset, offset + chunkSize);
await peer.send(JSON.stringify({
type: 'data-chunk',
dataId: dataId,
chunk: chunk,
offset: offset,
totalBytes: totalBytes
} as any)); // Cast as any to bypass type checking
offset += chunkSize;
await promiseWait(0); // Yield to avoid blocking
}

await peer.send(JSON.stringify({
type: 'data-end',
dataId: dataId
} as any));
} else {
await peer.send(JSON.stringify(message));
}
},
destroy() {
closed = true;
Expand All @@ -283,6 +376,10 @@ export function getConnectionHandlerSimplePeer({
return PROMISE_RESOLVE_VOID;
}
};

const reassembledMessages = new Map<string, { chunks: string[]; receivedBytes: number; totalBytes: number }>();


return handler;
};
return creator;
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/replication-webrtc/webrtc-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type {
} from './webrtc-types.ts';
import { filter, firstValueFrom, map } from 'rxjs';


export const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64KB chunks

/**
* To deterministically define which peer is master and
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/replication-webrtc/webrtc-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ export type SyncOptionsWebRTC<RxDocType, PeerType> = Omit<
*/
topic: string;
connectionHandlerCreator: WebRTCConnectionHandlerCreator<PeerType>;

/**
* The size of each chunk to be sent.
*/
chunkSize?: number;

/**
* Run on new peers so that bad peers can be blocked.
* If returns true, the peer is valid and it will replicate.
Expand Down
Loading