Skip to content

Commit

Permalink
chore(2631): Refactor storage service to separate channel and message…
Browse files Browse the repository at this point in the history
… logic (#2713)

* Create channels service and small fix for loading chain on app start

* Separate logic further into channelstore and messagesservice

* Add temp placeholders for encryption/decryption

* Minor clean up

* Expand messages service, tweak getting messages, fix/add tests

* Fix subscribing not getting old messages on join

* Fix peer list not updating

* Add compounderror

* Update auth

* Update CHANGELOG.md
  • Loading branch information
islathehut committed Feb 4, 2025
1 parent fef02c0 commit 3c33f9d
Show file tree
Hide file tree
Showing 18 changed files with 1,409 additions and 630 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
### Chores

* Add `trace` level logs to `@quiet/logger` ([#2716](https://github.com/TryQuiet/quiet/issues/2716))
* Refactor the `StorageService` and create `ChannelService`, `MessageService` and `ChannelStore` for handling channel-related persistence ([#2631](https://github.com/TryQuiet/quiet/issues/2631))

## [3.0.0]

Expand Down
1 change: 0 additions & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
"mock-fs": "^5.1.2",
"pvutils": "^1.1.3",
"tmp": "^0.2.1",
"pvutils": "^1.1.3",
"ts-jest": "^29.0.3",
"ts-loader": "9.4.2",
"ts-node": "10.9.1",
Expand Down
6 changes: 3 additions & 3 deletions packages/backend/src/nest/common/types.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { type EventsType } from '@orbitdb/core'
import { type ChannelMessage, type PublicChannel } from '@quiet/types'
import { type PublicChannel } from '@quiet/types'
import { ChannelStore } from '../storage/channels/channel.store'

export interface PublicChannelsRepo {
db: EventsType<ChannelMessage>
store: ChannelStore
eventsAttached: boolean
}

Expand Down
8 changes: 3 additions & 5 deletions packages/backend/src/nest/common/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fs from 'fs'
import fsAsync from 'fs/promises'
import getPort from 'get-port'
import path from 'path'
import { Server } from 'socket.io'
Expand Down Expand Up @@ -298,19 +299,16 @@ export async function createPeerId(): Promise<CreatedLibp2pPeerId> {
}
}

export const createArbitraryFile = (filePath: string, sizeBytes: number) => {
const stream = fs.createWriteStream(filePath)
export const createArbitraryFile = async (filePath: string, sizeBytes: number) => {
const maxChunkSize = 1048576 // 1MB

let remainingSize = sizeBytes

while (remainingSize > 0) {
const chunkSize = Math.min(maxChunkSize, remainingSize)
stream.write(crypto.randomBytes(chunkSize))
await fsAsync.appendFile(filePath, crypto.randomBytes(chunkSize))
remainingSize -= chunkSize
}

stream.end()
}

export async function* asyncGeneratorFromIterator<T>(asyncIterator: AsyncIterable<T>): AsyncGenerator<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, {
certificates: await this.storageService?.loadAllCertificates(),
})
await this.storageService?.loadAllChannels()
await this.storageService?.channels.loadAllChannels()
}
})
this.socketService.on(
Expand Down Expand Up @@ -994,7 +994,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.socketService.on(
SocketActionTypes.CREATE_CHANNEL,
async (args: CreateChannelPayload, callback: (response?: CreateChannelResponse) => void) => {
callback(await this.storageService?.subscribeToChannel(args.channel))
callback(await this.storageService?.channels.subscribeToChannel(args.channel))
}
)
this.socketService.on(
Expand All @@ -1003,39 +1003,39 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
payload: { channelId: string; ownerPeerId: string },
callback: (response: DeleteChannelResponse) => void
) => {
callback(await this.storageService?.deleteChannel(payload))
callback(await this.storageService?.channels.deleteChannel(payload))
}
)
this.socketService.on(
SocketActionTypes.DELETE_FILES_FROM_CHANNEL,
async (payload: DeleteFilesFromChannelSocketPayload) => {
this.logger.info(`socketService - ${SocketActionTypes.DELETE_FILES_FROM_CHANNEL}`)
await this.storageService?.deleteFilesFromChannel(payload)
await this.storageService?.channels.deleteFilesFromChannel(payload)
// await this.deleteFilesFromTemporaryDir() //crashes on mobile, will be fixes in next versions
}
)
this.socketService.on(SocketActionTypes.SEND_MESSAGE, async (args: SendMessagePayload) => {
await this.storageService?.sendMessage(args.message)
await this.storageService?.channels.sendMessage(args.message)
})
this.socketService.on(
SocketActionTypes.GET_MESSAGES,
async (payload: GetMessagesPayload, callback: (response?: MessagesLoadedPayload) => void) => {
callback(await this.storageService?.getMessages(payload.channelId, payload.ids))
callback(await this.storageService?.channels.getMessages(payload.channelId, payload.ids))
}
)

// Files
this.socketService.on(SocketActionTypes.DOWNLOAD_FILE, async (metadata: FileMetadata) => {
await this.storageService?.downloadFile(metadata)
await this.storageService?.channels.downloadFile(metadata)
})
this.socketService.on(SocketActionTypes.UPLOAD_FILE, async (metadata: FileMetadata) => {
await this.storageService?.uploadFile(metadata)
await this.storageService?.channels.uploadFile(metadata)
})
this.socketService.on(SocketActionTypes.FILE_UPLOADED, async (args: FileMetadata) => {
await this.storageService?.uploadFile(args)
await this.storageService?.channels.uploadFile(args)
})
this.socketService.on(SocketActionTypes.CANCEL_DOWNLOAD, mid => {
this.storageService?.cancelDownload(mid)
this.storageService?.channels.cancelDownload(mid)
})

// System
Expand All @@ -1051,46 +1051,48 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI

private attachStorageListeners() {
if (!this.storageService) return
this.storageService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
})
this.storageService.on(StorageEvents.CERTIFICATES_STORED, (payload: SendCertificatesResponse) => {
this.logger.info(`Storage - ${StorageEvents.CERTIFICATES_STORED}`)
this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, payload)
})
this.storageService.on(StorageEvents.CHANNELS_STORED, (payload: ChannelsReplicatedPayload) => {
// Channel and Message Events
this.storageService.channels.on(StorageEvents.CHANNELS_STORED, (payload: ChannelsReplicatedPayload) => {
this.serverIoProvider.io.emit(SocketActionTypes.CHANNELS_STORED, payload)
})
this.storageService.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => {
this.storageService.channels.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => {
this.serverIoProvider.io.emit(SocketActionTypes.MESSAGES_STORED, payload)
})
this.storageService.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => {
this.storageService.channels.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => {
if (payload.ids.length === 0) {
return
}
this.serverIoProvider.io.emit(SocketActionTypes.MESSAGE_IDS_STORED, payload)
})
this.storageService.on(StorageEvents.CHANNEL_SUBSCRIBED, (payload: ChannelSubscribedPayload) => {
this.storageService.channels.on(StorageEvents.CHANNEL_SUBSCRIBED, (payload: ChannelSubscribedPayload) => {
this.serverIoProvider.io.emit(SocketActionTypes.CHANNEL_SUBSCRIBED, payload)
})
this.storageService.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, (payload: RemoveDownloadStatus) => {
this.storageService.channels.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, (payload: RemoveDownloadStatus) => {
this.serverIoProvider.io.emit(SocketActionTypes.REMOVE_DOWNLOAD_STATUS, payload)
})
this.storageService.on(StorageEvents.FILE_UPLOADED, (payload: UploadFilePayload) => {
this.storageService.channels.on(StorageEvents.FILE_UPLOADED, (payload: UploadFilePayload) => {
this.serverIoProvider.io.emit(SocketActionTypes.FILE_UPLOADED, payload)
})
this.storageService.on(StorageEvents.DOWNLOAD_PROGRESS, (payload: DownloadStatus) => {
this.storageService.channels.on(StorageEvents.DOWNLOAD_PROGRESS, (payload: DownloadStatus) => {
this.serverIoProvider.io.emit(SocketActionTypes.DOWNLOAD_PROGRESS, payload)
})
this.storageService.on(StorageEvents.MESSAGE_MEDIA_UPDATED, (payload: FileMetadata) => {
this.storageService.channels.on(StorageEvents.MESSAGE_MEDIA_UPDATED, (payload: FileMetadata) => {
this.serverIoProvider.io.emit(SocketActionTypes.MESSAGE_MEDIA_UPDATED, payload)
})
this.storageService.channels.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => {
this.serverIoProvider.io.emit(SocketActionTypes.PUSH_NOTIFICATION, payload)
})
// Other Events
this.storageService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
})
this.storageService.on(StorageEvents.CERTIFICATES_STORED, (payload: SendCertificatesResponse) => {
this.logger.info(`Storage - ${StorageEvents.CERTIFICATES_STORED}`)
this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, payload)
})
this.storageService.on(StorageEvents.COMMUNITY_UPDATED, (payload: Community) => {
this.serverIoProvider.io.emit(SocketActionTypes.COMMUNITY_UPDATED, payload)
})
this.storageService.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => {
this.serverIoProvider.io.emit(SocketActionTypes.PUSH_NOTIFICATION, payload)
})
this.storageService.on(StorageEvents.CSRS_STORED, async (payload: { csrs: string[] }) => {
this.logger.info(`Storage - ${StorageEvents.CSRS_STORED}`)
const users = await getUsersFromCsrs(payload.csrs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe('IpfsFileManagerService', () => {
tmpDir = createTmpDir()
filePath = new URL('./testUtils/large-file.txt', import.meta.url).pathname
// Generate 2.1GB file
createArbitraryFile(filePath, BIG_FILE_SIZE)
await createArbitraryFile(filePath, BIG_FILE_SIZE)
module = await Test.createTestingModule({
imports: [TestModule, IpfsFileManagerModule, IpfsModule, SocketModule, Libp2pModule],
}).compile()
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/nest/storage/base.store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ abstract class StoreBase<V, S extends KeyValueType<V> | EventsType<V>> extends E
logger.info('Closed', this.getAddress())
}

abstract init(): Promise<void>
abstract init(...args: any[]): Promise<void> | Promise<StoreBase<V, S>>
abstract clean(): void
}

Expand Down
Loading

0 comments on commit 3c33f9d

Please sign in to comment.