diff --git a/CHANGELOG.md b/CHANGELOG.md index b6888216b..636ebdcaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Chores * Add `trace` level logs to `@quiet/logger` ([#2716](https://github.com/TryQuiet/quiet/issues/2716)) +* Refactor the `StorageService` and create `ChannelService`, `MessageService` and `ChannelStore` for handling channel-related persistence ([#2631](https://github.com/TryQuiet/quiet/issues/2631)) ## [3.0.0] diff --git a/packages/backend/package.json b/packages/backend/package.json index 0e223593c..c792b3f71 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -82,7 +82,6 @@ "mock-fs": "^5.1.2", "pvutils": "^1.1.3", "tmp": "^0.2.1", - "pvutils": "^1.1.3", "ts-jest": "^29.0.3", "ts-loader": "9.4.2", "ts-node": "10.9.1", diff --git a/packages/backend/src/nest/common/types.ts b/packages/backend/src/nest/common/types.ts index 43a3d4189..e5a3c3285 100644 --- a/packages/backend/src/nest/common/types.ts +++ b/packages/backend/src/nest/common/types.ts @@ -1,8 +1,8 @@ -import { type EventsType } from '@orbitdb/core' -import { type ChannelMessage, type PublicChannel } from '@quiet/types' +import { type PublicChannel } from '@quiet/types' +import { ChannelStore } from '../storage/channels/channel.store' export interface PublicChannelsRepo { - db: EventsType + store: ChannelStore eventsAttached: boolean } diff --git a/packages/backend/src/nest/common/utils.ts b/packages/backend/src/nest/common/utils.ts index a0fc69082..30f527e4e 100644 --- a/packages/backend/src/nest/common/utils.ts +++ b/packages/backend/src/nest/common/utils.ts @@ -1,4 +1,5 @@ import fs from 'fs' +import fsAsync from 'fs/promises' import getPort from 'get-port' import path from 'path' import { Server } from 'socket.io' @@ -298,19 +299,16 @@ export async function createPeerId(): Promise { } } -export const createArbitraryFile = (filePath: string, sizeBytes: number) => { - const stream = fs.createWriteStream(filePath) +export const createArbitraryFile = async (filePath: string, sizeBytes: number) => { const maxChunkSize = 1048576 // 1MB let remainingSize = sizeBytes while (remainingSize > 0) { const chunkSize = Math.min(maxChunkSize, remainingSize) - stream.write(crypto.randomBytes(chunkSize)) + await fsAsync.appendFile(filePath, crypto.randomBytes(chunkSize)) remainingSize -= chunkSize } - - stream.end() } export async function* asyncGeneratorFromIterator(asyncIterator: AsyncIterable): AsyncGenerator { diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index d6a2d7c82..6f8b1d8d2 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -896,7 +896,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, { certificates: await this.storageService?.loadAllCertificates(), }) - await this.storageService?.loadAllChannels() + await this.storageService?.channels.loadAllChannels() } }) this.socketService.on( @@ -994,7 +994,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.socketService.on( SocketActionTypes.CREATE_CHANNEL, async (args: CreateChannelPayload, callback: (response?: CreateChannelResponse) => void) => { - callback(await this.storageService?.subscribeToChannel(args.channel)) + callback(await this.storageService?.channels.subscribeToChannel(args.channel)) } ) this.socketService.on( @@ -1003,39 +1003,39 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI payload: { channelId: string; ownerPeerId: string }, callback: (response: DeleteChannelResponse) => void ) => { - callback(await this.storageService?.deleteChannel(payload)) + callback(await this.storageService?.channels.deleteChannel(payload)) } ) this.socketService.on( SocketActionTypes.DELETE_FILES_FROM_CHANNEL, async (payload: DeleteFilesFromChannelSocketPayload) => { this.logger.info(`socketService - ${SocketActionTypes.DELETE_FILES_FROM_CHANNEL}`) - await this.storageService?.deleteFilesFromChannel(payload) + await this.storageService?.channels.deleteFilesFromChannel(payload) // await this.deleteFilesFromTemporaryDir() //crashes on mobile, will be fixes in next versions } ) this.socketService.on(SocketActionTypes.SEND_MESSAGE, async (args: SendMessagePayload) => { - await this.storageService?.sendMessage(args.message) + await this.storageService?.channels.sendMessage(args.message) }) this.socketService.on( SocketActionTypes.GET_MESSAGES, async (payload: GetMessagesPayload, callback: (response?: MessagesLoadedPayload) => void) => { - callback(await this.storageService?.getMessages(payload.channelId, payload.ids)) + callback(await this.storageService?.channels.getMessages(payload.channelId, payload.ids)) } ) // Files this.socketService.on(SocketActionTypes.DOWNLOAD_FILE, async (metadata: FileMetadata) => { - await this.storageService?.downloadFile(metadata) + await this.storageService?.channels.downloadFile(metadata) }) this.socketService.on(SocketActionTypes.UPLOAD_FILE, async (metadata: FileMetadata) => { - await this.storageService?.uploadFile(metadata) + await this.storageService?.channels.uploadFile(metadata) }) this.socketService.on(SocketActionTypes.FILE_UPLOADED, async (args: FileMetadata) => { - await this.storageService?.uploadFile(args) + await this.storageService?.channels.uploadFile(args) }) this.socketService.on(SocketActionTypes.CANCEL_DOWNLOAD, mid => { - this.storageService?.cancelDownload(mid) + this.storageService?.channels.cancelDownload(mid) }) // System @@ -1051,46 +1051,48 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI private attachStorageListeners() { if (!this.storageService) return - this.storageService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => { - this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data) - }) - this.storageService.on(StorageEvents.CERTIFICATES_STORED, (payload: SendCertificatesResponse) => { - this.logger.info(`Storage - ${StorageEvents.CERTIFICATES_STORED}`) - this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, payload) - }) - this.storageService.on(StorageEvents.CHANNELS_STORED, (payload: ChannelsReplicatedPayload) => { + // Channel and Message Events + this.storageService.channels.on(StorageEvents.CHANNELS_STORED, (payload: ChannelsReplicatedPayload) => { this.serverIoProvider.io.emit(SocketActionTypes.CHANNELS_STORED, payload) }) - this.storageService.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => { + this.storageService.channels.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => { this.serverIoProvider.io.emit(SocketActionTypes.MESSAGES_STORED, payload) }) - this.storageService.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => { + this.storageService.channels.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => { if (payload.ids.length === 0) { return } this.serverIoProvider.io.emit(SocketActionTypes.MESSAGE_IDS_STORED, payload) }) - this.storageService.on(StorageEvents.CHANNEL_SUBSCRIBED, (payload: ChannelSubscribedPayload) => { + this.storageService.channels.on(StorageEvents.CHANNEL_SUBSCRIBED, (payload: ChannelSubscribedPayload) => { this.serverIoProvider.io.emit(SocketActionTypes.CHANNEL_SUBSCRIBED, payload) }) - this.storageService.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, (payload: RemoveDownloadStatus) => { + this.storageService.channels.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, (payload: RemoveDownloadStatus) => { this.serverIoProvider.io.emit(SocketActionTypes.REMOVE_DOWNLOAD_STATUS, payload) }) - this.storageService.on(StorageEvents.FILE_UPLOADED, (payload: UploadFilePayload) => { + this.storageService.channels.on(StorageEvents.FILE_UPLOADED, (payload: UploadFilePayload) => { this.serverIoProvider.io.emit(SocketActionTypes.FILE_UPLOADED, payload) }) - this.storageService.on(StorageEvents.DOWNLOAD_PROGRESS, (payload: DownloadStatus) => { + this.storageService.channels.on(StorageEvents.DOWNLOAD_PROGRESS, (payload: DownloadStatus) => { this.serverIoProvider.io.emit(SocketActionTypes.DOWNLOAD_PROGRESS, payload) }) - this.storageService.on(StorageEvents.MESSAGE_MEDIA_UPDATED, (payload: FileMetadata) => { + this.storageService.channels.on(StorageEvents.MESSAGE_MEDIA_UPDATED, (payload: FileMetadata) => { this.serverIoProvider.io.emit(SocketActionTypes.MESSAGE_MEDIA_UPDATED, payload) }) + this.storageService.channels.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => { + this.serverIoProvider.io.emit(SocketActionTypes.PUSH_NOTIFICATION, payload) + }) + // Other Events + this.storageService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => { + this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data) + }) + this.storageService.on(StorageEvents.CERTIFICATES_STORED, (payload: SendCertificatesResponse) => { + this.logger.info(`Storage - ${StorageEvents.CERTIFICATES_STORED}`) + this.serverIoProvider.io.emit(SocketActionTypes.CERTIFICATES_STORED, payload) + }) this.storageService.on(StorageEvents.COMMUNITY_UPDATED, (payload: Community) => { this.serverIoProvider.io.emit(SocketActionTypes.COMMUNITY_UPDATED, payload) }) - this.storageService.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => { - this.serverIoProvider.io.emit(SocketActionTypes.PUSH_NOTIFICATION, payload) - }) this.storageService.on(StorageEvents.CSRS_STORED, async (payload: { csrs: string[] }) => { this.logger.info(`Storage - ${StorageEvents.CSRS_STORED}`) const users = await getUsersFromCsrs(payload.csrs) diff --git a/packages/backend/src/nest/ipfs-file-manager/big-files.long.spec.ts b/packages/backend/src/nest/ipfs-file-manager/big-files.long.spec.ts index 3ad536b46..947101e95 100644 --- a/packages/backend/src/nest/ipfs-file-manager/big-files.long.spec.ts +++ b/packages/backend/src/nest/ipfs-file-manager/big-files.long.spec.ts @@ -33,7 +33,7 @@ describe('IpfsFileManagerService', () => { tmpDir = createTmpDir() filePath = new URL('./testUtils/large-file.txt', import.meta.url).pathname // Generate 2.1GB file - createArbitraryFile(filePath, BIG_FILE_SIZE) + await createArbitraryFile(filePath, BIG_FILE_SIZE) module = await Test.createTestingModule({ imports: [TestModule, IpfsFileManagerModule, IpfsModule, SocketModule, Libp2pModule], }).compile() diff --git a/packages/backend/src/nest/storage/base.store.ts b/packages/backend/src/nest/storage/base.store.ts index 290ed42df..d4b97f926 100644 --- a/packages/backend/src/nest/storage/base.store.ts +++ b/packages/backend/src/nest/storage/base.store.ts @@ -24,7 +24,7 @@ abstract class StoreBase | EventsType> extends E logger.info('Closed', this.getAddress()) } - abstract init(): Promise + abstract init(...args: any[]): Promise | Promise> abstract clean(): void } diff --git a/packages/backend/src/nest/storage/channels/channel.store.ts b/packages/backend/src/nest/storage/channels/channel.store.ts new file mode 100644 index 000000000..0ab02260c --- /dev/null +++ b/packages/backend/src/nest/storage/channels/channel.store.ts @@ -0,0 +1,288 @@ +import { Injectable } from '@nestjs/common' + +import { EventsType, LogEntry } from '@orbitdb/core' + +import { QuietLogger } from '@quiet/logger' +import { + ChannelMessage, + CompoundError, + MessagesLoadedPayload, + PublicChannel, + PushNotificationPayload, +} from '@quiet/types' + +import { createLogger } from '../../common/logger' +import { EventStoreBase } from '../base.store' +import { EventsWithStorage } from '../orbitDb/eventsWithStorage' +import { MessagesAccessController } from '../orbitDb/MessagesAccessController' +import { OrbitDbService } from '../orbitDb/orbitDb.service' +import validate from '../../validation/validators' +import { MessagesService } from './messages/messages.service' +import { DBOptions, StorageEvents } from '../storage.types' +import { LocalDbService } from '../../local-db/local-db.service' +import { CertificatesStore } from '../certificates/certificates.store' + +/** + * Manages storage-level logic for a given channel in Quiet + */ +@Injectable() +export class ChannelStore extends EventStoreBase { + private channelData: PublicChannel + private _subscribing: boolean = false + + private logger: QuietLogger + + constructor( + private readonly orbitDbService: OrbitDbService, + private readonly localDbService: LocalDbService, + private readonly messagesService: MessagesService, + private readonly certificatesStore: CertificatesStore + ) { + super() + } + + // Initialization + + /** + * Initialize this instance of ChannelStore by opening an OrbitDB database + * + * @param channelData Channel configuration metadata + * @param options Database options for OrbitDB + * @returns Initialized ChannelStore instance + */ + public async init(channelData: PublicChannel, options: DBOptions): Promise { + if (this.store != null) { + this.logger.warn(`Channel ${this.channelData.name} has already been initialized!`) + return this + } + + this.channelData = channelData + this.logger = createLogger(`storage:channels:channelStore:${this.channelData.name}`) + this.logger.info(`Initializing channel store for channel ${this.channelData.name}`) + + this.store = await this.orbitDbService.orbitDb.open>(`channels.${this.channelData.id}`, { + type: 'events', + Database: EventsWithStorage(), + AccessController: MessagesAccessController({ write: ['*'] }), + sync: options.sync, + }) + + this.logger.info('Initialized') + return this + } + + /** + * Start syncing the OrbitDB database + */ + public async startSync(): Promise { + await this.getStore().sync.start() + } + + // Accessors + + public get isSubscribing(): boolean { + return this._subscribing + } + + /** + * Subscribe to new messages on this channel + * + * @emits StorageEvents.MESSAGE_IDS_STORED + * @emits StorageEvents.MESSAGES_STORED + * @emits StorageEvents.SEND_PUSH_NOTIFICATION + */ + public async subscribe(): Promise { + this.logger.info('Subscribing to channel ', this.channelData.id) + this._subscribing = true + + this.getStore().events.on('update', async (entry: LogEntry) => { + this.logger.info(`${this.channelData.id} database updated`, entry.hash, entry.payload.value?.channelId) + + const message = await this.messagesService.onConsume(entry.payload.value!) + + this.emit(StorageEvents.MESSAGES_STORED, { + messages: [message], + isVerified: message.verified, + }) + + await this.refreshMessageIds() + + // FIXME: the 'update' event runs if we replicate entries and if we add + // entries ourselves. So we may want to check if the message is written + // by us. + // + // Display push notifications on mobile + if (process.env.BACKEND === 'mobile') { + if (!message.verified) return + + // Do not notify about old messages + if (message.createdAt < parseInt(process.env.CONNECTION_TIME || '')) return + + const username = await this.certificatesStore.getCertificateUsername(message.pubKey) + if (!username) { + this.logger.error(`Can't send push notification, no username found for public key '${message.pubKey}'`) + return + } + + const payload: PushNotificationPayload = { + message: JSON.stringify(message), + username: username, + } + + this.emit(StorageEvents.SEND_PUSH_NOTIFICATION, payload) + } + }) + + await this.startSync() + await this.refreshMessageIds() + this._subscribing = false + + this.logger.info(`Subscribed to channel ${this.channelData.id}`) + } + + // Messages + + /** + * Validate and append a new message to this channel's OrbitDB database + * + * @param message Message to add to the OrbitDB database + */ + public async sendMessage(message: ChannelMessage): Promise { + this.logger.info(`Sending message with ID ${message.id} on channel ${this.channelData.id}`) + if (!validate.isMessage(message)) { + this.logger.error('Public channel message is invalid') + return + } + + if (message.channelId != this.channelData.id) { + this.logger.error( + `Could not send message. Message is for channel ID ${message.channelId} which does not match channel ID ${this.channelData.id}` + ) + return + } + + try { + await this.addEntry(message) + } catch (e) { + this.logger.error(`Could not append message (entry not allowed to write to the log). Details: ${e.message}`) + } + } + + /** + * Read messages from OrbitDB, optionally filtered by message ID + * + * @param ids Message IDs to read from this channel's OrbitDB database + * @returns Messages read from OrbitDB + */ + public async getMessages(ids: string[] | undefined = undefined): Promise { + const messages = await this.getEntries(ids) + return { + messages, + isVerified: true, + } + } + + /** + * Get the latest state of messages in OrbitDB and emit an event to trigger redux updates + * + * @emits StorageEvents.MESSAGE_IDS_STORED + */ + private async refreshMessageIds(): Promise { + const ids = (await this.getEntries()).map(msg => msg.id) + const community = await this.localDbService.getCurrentCommunity() + + if (community) { + this.emit(StorageEvents.MESSAGE_IDS_STORED, { + ids, + channelId: this.channelData.id, + communityId: community.id, + }) + } + } + + // Base Store Logic + + /** + * Add a new event to the OrbitDB event store + * + * @param message Message to add to the OrbitDB database + * @returns Hash of the new database entry + */ + public async addEntry(message: ChannelMessage): Promise { + this.logger.info('Adding message to database') + const encryptedMessage = await this.messagesService.onSend(message) + try { + return await this.getStore().add(encryptedMessage) + } catch (e) { + throw new CompoundError(`Could not append message (entry not allowed to write to the log)`, e) + } + } + + /** + * Read a list of entries on the OrbitDB event store + * + * @param ids Optional list of message IDs to filter by + * @returns All matching entries on the event store + */ + public async getEntries(): Promise + public async getEntries(ids: string[] | undefined): Promise + public async getEntries(ids?: string[] | undefined): Promise { + this.logger.info(`Getting all messages for channel`, this.channelData.id, this.channelData.name) + const messages: ChannelMessage[] = [] + + for await (const x of this.getStore().iterator()) { + if (ids == null || ids?.includes(x.value.id)) { + // NOTE: we skipped the verification process when reading many messages in the previous version + // so I'm skipping it here - is that really the correct behavior? + const processedMessage = await this.messagesService.onConsume(x.value, false) + messages.push(processedMessage) + } + } + + return messages + } + + // Close Logic + + /** + * Stop syncing the OrbitDB database + */ + public async stopSync(): Promise { + await this.getStore().sync.stop() + } + + /** + * Close the OrbitDB database + */ + public async close(): Promise { + this.logger.info(`Closing channel store`) + await this.stopSync() + await this.getStore().close() + } + + /** + * Delete the channel from OrbitDB + */ + public async deleteChannel(): Promise { + this.logger.info(`Deleting channel`) + try { + await this.stopSync() + await this.getStore().drop() + } catch (e) { + // we expect an error if the database isn't synced + } + + this.clean() + } + + /** + * Clean this ChannelStore + * + * NOTE: Does NOT affect data stored in IPFS + */ + public clean(): void { + this.logger.info(`Cleaning channel store`, this.channelData.id, this.channelData.name) + this.store = undefined + this._subscribing = false + } +} diff --git a/packages/backend/src/nest/storage/channels/channels.service.spec.ts b/packages/backend/src/nest/storage/channels/channels.service.spec.ts new file mode 100644 index 000000000..f92b720da --- /dev/null +++ b/packages/backend/src/nest/storage/channels/channels.service.spec.ts @@ -0,0 +1,278 @@ +import { jest } from '@jest/globals' + +import { Test, TestingModule } from '@nestjs/testing' +import { keyFromCertificate, parseCertificate } from '@quiet/identity' +import { + prepareStore, + getFactory, + publicChannels, + generateMessageFactoryContentWithId, + Store, +} from '@quiet/state-manager' +import { + ChannelMessage, + Community, + FileMetadata, + Identity, + MessageType, + PublicChannel, + TestMessage, +} from '@quiet/types' + +import path from 'path' +import { type PeerId } from '@libp2p/interface' +import waitForExpect from 'wait-for-expect' +import { TestModule } from '../../common/test.module' +import { createArbitraryFile, libp2pInstanceParams } from '../../common/utils' +import { IpfsModule } from '../../ipfs/ipfs.module' +import { IpfsService } from '../../ipfs/ipfs.service' +import { Libp2pModule } from '../../libp2p/libp2p.module' +import { Libp2pService } from '../../libp2p/libp2p.service' +import { SocketModule } from '../../socket/socket.module' +import { StorageModule } from '../storage.module' +import { StorageService } from '../storage.service' +import fs from 'fs' +import { type FactoryGirl } from 'factory-girl' +import { fileURLToPath } from 'url' +import { LocalDbModule } from '../../local-db/local-db.module' +import { LocalDbService } from '../../local-db/local-db.service' +import { createLogger } from '../../common/logger' +import { ChannelsService } from './channels.service' + +const logger = createLogger('channelsService:test') + +const filename = fileURLToPath(import.meta.url) +const dirname = path.dirname(filename) + +describe('ChannelsService', () => { + let module: TestingModule + let storageService: StorageService + let ipfsService: IpfsService + let libp2pService: Libp2pService + let localDbService: LocalDbService + let channelsService: ChannelsService + let peerId: PeerId + + let store: Store + let factory: FactoryGirl + let community: Community + let channel: PublicChannel + let alice: Identity + let john: Identity + let message: ChannelMessage + let channelio: PublicChannel + let filePath: string + + jest.setTimeout(50000) + + beforeAll(async () => { + store = prepareStore().store + factory = await getFactory(store) + + community = await factory.create('Community') + + channel = publicChannels.selectors.publicChannels(store.getState())[0] + + channelio = { + name: channel.name, + description: channel.description, + owner: channel.owner, + timestamp: channel.timestamp, + id: channel.id, + } + + alice = await factory.create('Identity', { id: community.id, nickname: 'alice' }) + + john = await factory.create('Identity', { id: community.id, nickname: 'john' }) + + message = ( + await factory.create('Message', { + identity: alice, + message: generateMessageFactoryContentWithId(channel.id), + }) + ).message + }) + + beforeEach(async () => { + jest.clearAllMocks() + filePath = path.join(dirname, '/500kB-file.txt') + + module = await Test.createTestingModule({ + imports: [TestModule, StorageModule, IpfsModule, SocketModule, Libp2pModule, LocalDbModule], + }).compile() + + storageService = await module.resolve(StorageService) + channelsService = await module.resolve(ChannelsService) + localDbService = await module.resolve(LocalDbService) + libp2pService = await module.resolve(Libp2pService) + ipfsService = await module.resolve(IpfsService) + + const params = await libp2pInstanceParams() + peerId = params.peerId.peerId + + await libp2pService.createInstance(params) + expect(libp2pService.libp2pInstance).not.toBeNull() + + await localDbService.open() + expect(localDbService.getStatus()).toEqual('open') + + await localDbService.setCommunity(community) + await localDbService.setCurrentCommunityId(community.id) + + await storageService.init(peerId) + }) + + afterEach(async () => { + await libp2pService.libp2pInstance?.stop() + await ipfsService.ipfsInstance?.stop() + await storageService.stop() + if (fs.existsSync(filePath)) { + fs.rmSync(filePath) + } + await module.close() + }) + + describe('Channels', () => { + it('deletes channel as owner', async () => { + await channelsService.subscribeToChannel(channelio) + + const result = await channelsService.deleteChannel({ channelId: channelio.id, ownerPeerId: peerId.toString() }) + expect(result).toEqual({ channelId: channelio.id }) + + const channelFromKeyValueStore = (await channelsService.getChannels()).filter(x => x.id === channelio.id) + expect(channelFromKeyValueStore).toEqual([]) + }) + + it('delete channel as standard user', async () => { + await channelsService.subscribeToChannel(channelio) + + const result = await channelsService.deleteChannel({ channelId: channelio.id, ownerPeerId: 'random peer id' }) + expect(result).toEqual({ channelId: channelio.id }) + + const channelFromKeyValueStore = (await channelsService.getChannels()).filter(x => x.id === channelio.id) + expect(channelFromKeyValueStore).toEqual([channelio]) + }) + }) + + describe('Message access controller', () => { + it('is saved to db if passed signature verification', async () => { + await channelsService.subscribeToChannel(channelio) + + const publicChannelRepo = channelsService.publicChannelsRepos.get(message.channelId) + expect(publicChannelRepo).not.toBeUndefined() + const store = publicChannelRepo!.store + const eventSpy = jest.spyOn(store, 'addEntry') + + const messageCopy = { + ...message, + } + delete messageCopy.media + + await channelsService.sendMessage(messageCopy) + + // Confirm message has passed orbitdb validator (check signature verification only) + expect(eventSpy).toHaveBeenCalled() + const savedMessages = await channelsService.getMessages(channelio.id) + expect(savedMessages?.messages.length).toBe(1) + expect(savedMessages?.messages[0]).toEqual({ ...messageCopy, verified: true }) + }) + + it('is not saved to db if did not pass signature verification', async () => { + const aliceMessage = await factory.create['payload']>( + 'Message', + { + identity: alice, + message: generateMessageFactoryContentWithId(channel.id), + } + ) + // @ts-expect-error userCertificate can be undefined + const johnCertificate: string = john.userCertificate + const johnPublicKey = keyFromCertificate(parseCertificate(johnCertificate)) + + const spoofedMessage = { + ...aliceMessage.message, + channelId: channelio.id, + pubKey: johnPublicKey, + } + delete spoofedMessage.media // Media 'undefined' is not accepted by db.add + + await channelsService.subscribeToChannel(channelio) + + const publicChannelRepo = channelsService.publicChannelsRepos.get(message.channelId) + expect(publicChannelRepo).not.toBeUndefined() + const store = publicChannelRepo!.store + const eventSpy = jest.spyOn(store, 'addEntry') + + await channelsService.sendMessage(spoofedMessage) + + // Confirm message has passed orbitdb validator (check signature verification only) + expect(eventSpy).toHaveBeenCalled() + expect((await channelsService.getMessages(channelio.id))?.messages.length).toBe(0) + }) + }) + + describe('Files deletion', () => { + let realFilePath: string + let messages: { + messages: Record + } + + beforeEach(async () => { + realFilePath = path.join(dirname, '/real-file.txt') + await createArbitraryFile(realFilePath, 2147483) + + const metadata: FileMetadata = { + path: realFilePath, + name: 'test-large-file', + ext: '.txt', + cid: 'uploading_id', + message: { + id: 'id', + channelId: channel.id, + }, + } + + const aliceMessage = await factory.create['payload']>( + 'Message', + { + identity: alice, + message: generateMessageFactoryContentWithId(channel.id, MessageType.File, metadata), + } + ) + + messages = { + messages: { + [aliceMessage.message.id]: aliceMessage.message, + }, + } + }) + + afterEach(() => { + if (fs.existsSync(realFilePath)) { + fs.rmSync(realFilePath) + } + }) + + it('delete file correctly', async () => { + console.warn(fs.existsSync(realFilePath)) + const isFileExist = await channelsService.checkIfFileExist(realFilePath) + expect(isFileExist).toBeTruthy() + + await expect(channelsService.deleteFilesFromChannel(messages)).resolves.not.toThrowError() + + await waitForExpect(async () => { + expect(await channelsService.checkIfFileExist(realFilePath)).toBeFalsy() + }, 2000) + }) + + it('file dont exist - not throw error', async () => { + fs.rmSync(realFilePath) + + await waitForExpect(async () => { + expect(await channelsService.checkIfFileExist(realFilePath)).toBeFalsy() + }, 2000) + + await expect(channelsService.deleteFilesFromChannel(messages)).resolves.not.toThrowError() + }) + }) +}) diff --git a/packages/backend/src/nest/storage/channels/channels.service.ts b/packages/backend/src/nest/storage/channels/channels.service.ts new file mode 100644 index 000000000..824742f6f --- /dev/null +++ b/packages/backend/src/nest/storage/channels/channels.service.ts @@ -0,0 +1,545 @@ +import { Inject, Injectable } from '@nestjs/common' +import { type KeyValueType, IPFSAccessController, type LogEntry } from '@orbitdb/core' +import { EventEmitter } from 'events' +import { type PeerId } from '@libp2p/interface' +import { + ChannelMessage, + ConnectionProcessInfo, + type CreateChannelResponse, + DeleteFilesFromChannelSocketPayload, + FileMetadata, + type MessagesLoadedPayload, + PublicChannel, + PushNotificationPayload, + SocketActionTypes, + ChannelMessageIdsResponse, + DeleteChannelResponse, +} from '@quiet/types' +import fs from 'fs' +import { IpfsFileManagerService } from '../../ipfs-file-manager/ipfs-file-manager.service' +import { IPFS_REPO_PATCH, ORBIT_DB_DIR, QUIET_DIR } from '../../const' +import { IpfsFilesManagerEvents } from '../../ipfs-file-manager/ipfs-file-manager.types' +import { createLogger } from '../../common/logger' +import { PublicChannelsRepo } from '../../common/types' +import { StorageEvents } from '../storage.types' +import { OrbitDbService } from '../orbitDb/orbitDb.service' +import { KeyValueIndexedValidated } from '../orbitDb/keyValueIndexedValidated' +import { ChannelStore } from './channel.store' +import { createContextId, ModuleRef } from '@nestjs/core' + +/** + * Manages storage-level logic for all channels in Quiet + */ +@Injectable() +export class ChannelsService extends EventEmitter { + private peerId: PeerId | null = null + public publicChannelsRepos: Map = new Map() + + private channels: KeyValueType | null + + private readonly logger = createLogger(`storage:channels`) + + constructor( + @Inject(QUIET_DIR) public readonly quietDir: string, + @Inject(ORBIT_DB_DIR) public readonly orbitDbDir: string, + @Inject(IPFS_REPO_PATCH) public readonly ipfsRepoPath: string, + private readonly filesManager: IpfsFileManagerService, + private readonly orbitDbService: OrbitDbService, + private readonly moduleRef: ModuleRef + ) { + super() + } + + // Initialization + + /** + * Initialize the ChannelsService by starting event handles, the file manager, and initializing databases in OrbitDB + * + * @param peerId Peer ID of the current user + */ + public async init(peerId: PeerId): Promise { + this.logger.info(`Initializing ${ChannelsService.name}`) + this.peerId = peerId + + this.logger.info(`Starting file manager`) + this.attachFileManagerEvents() + await this.filesManager.init() + + this.logger.info(`Initializing Databases`) + await this.initChannels() + + this.logger.info(`Initialized ${ChannelsService.name}`) + } + + /** + * Initialize the channels management database and individual channel stores in OrbitDB + */ + public async initChannels(): Promise { + this.logger.time(`Initializing channel databases`) + + await this.createChannelsDb() + await this.loadAllChannels() + + this.logger.timeEnd('Initializing channel databases') + this.logger.info('Initialized databases') + } + + /** + * Start syncing the channels management database in OrbitDB + */ + public async startSync(): Promise { + await this.channels?.sync.start() + } + + // Channels Database Management + + /** + * Create the channels management database in OrbitDB + * + * NOTE: This also subscribes to all known channel stores and handles update events on the channels management database for + * subscribing to newly created channel stores. + */ + private async createChannelsDb(): Promise { + this.logger.info('Creating public-channels database') + this.channels = await this.orbitDbService.orbitDb.open>('public-channels', { + sync: false, + Database: KeyValueIndexedValidated(), + AccessController: IPFSAccessController({ write: ['*'] }), + }) + + this.channels.events.on('update', async (entry: LogEntry) => { + const channelId = entry.payload.key + const operation = entry.payload.op + this.logger.info('public-channels database updated', channelId, operation) + + this.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.CHANNELS_STORED) + + const channels = await this.getChannels() + + this.emit(StorageEvents.CHANNELS_STORED, { channels }) + + if (operation === 'PUT') { + const channel = entry.payload.value as PublicChannel + await this.subscribeToChannel(channel) + } + }) + + const channels = await this.getChannels() + this.logger.info('Channels count:', channels.length) + this.logger.info( + 'Channels names:', + channels.map(x => x.name) + ) + for (const channel of channels.values()) { + await this.subscribeToChannel(channel) + } + } + + /** + * Add a channel to the channels management database + * + * @param id ID of channel to add to the channels database + * @param channel Channel configuration metadata + * @throws Error + */ + public async setChannel(id: string, channel: PublicChannel): Promise { + if (!this.channels) { + throw new Error('Channels have not been initialized!') + } + await this.channels.put(id, channel) + } + + /** + * Read channel metadata by ID from the channels management database + * + * @param id ID of channel to fetch + * @returns Channel metadata, if it exists + * @throws Error + */ + public async getChannel(id: string): Promise { + if (!this.channels) { + throw new Error('Channels have not been initialized!') + } + return await this.channels.get(id) + } + + /** + * Read entries for all keys in the channels management database + * + * @returns All channel metadata in the channels management database + * @throws Error + */ + public async getChannels(): Promise { + if (!this.channels) { + throw new Error('Channels have not been initialized!') + } + return (await this.channels.all()).map(x => x.value) + } + + /** + * Get all known channels and emit event with metadata + * + * @emits StorageEvents.CHANNELS_STORED + */ + public async loadAllChannels(): Promise { + this.logger.info('Getting all channels') + this.emit(StorageEvents.CHANNELS_STORED, { + channels: await this.getChannels(), + }) + } + + // Channel Management + + /** + * Create a new ChannelStore and, optionally, add the metadata to the channels management database + * + * @param channelData Channel metadata for new channel + * @returns Newly created ChannelStore + */ + private async createChannel(channelData: PublicChannel): Promise { + this.logger.info(`Creating channel`, channelData.id, channelData.name) + + const channelId = channelData.id + const store = await this.createChannelStore(channelData) + + const channel = await this.getChannel(channelId) + if (channel == undefined) { + await this.setChannel(channelId, channelData) + } else { + this.logger.info(`Channel ${channelId} already exists`) + } + + this.publicChannelsRepos.set(channelId, { store, eventsAttached: false }) + this.logger.info(`Set ${channelId} to local channels`) + this.logger.info(`Created channel ${channelId}`) + + return store + } + + /** + * Helper method for creating and initializing ChannelStore + * + * @param channelData Channel metadata for new channel + * @returns Newly created ChannelStore + */ + private async createChannelStore(channelData: PublicChannel): Promise { + const store = await this.moduleRef.create(ChannelStore, createContextId()) + return await store.init(channelData, { sync: false }) + } + + /** + * Creates a new channel store with the supplied metadata, if it doesn't exist, and subscribes + * to new events on the store, if it didn't already exist. + * + * NOTE: Storage events like MESSAGE_IDS_STORED are consumed up the chain on this service but are + * emitted on the ChannelStore instances so we consume and re-emit them on this service's event + * emitter. + * + * @param channelData Channel metadata for channel we are subscribing to + * @returns CreateChannelResponse + * @emits StorageEvents.CHANNEL_SUBSCRIBED + */ + public async subscribeToChannel(channelData: PublicChannel): Promise { + let store: ChannelStore + // @ts-ignore + if (channelData.address) { + // @ts-ignore + channelData.id = channelData.address + } + let repo = this.publicChannelsRepos.get(channelData.id) + + if (repo) { + store = repo.store + } else { + try { + store = await this.createChannel(channelData) + } catch (e) { + this.logger.error(`Can't subscribe to channel ${channelData.id}`, e) + return + } + if (!store) { + this.logger.error(`Can't subscribe to channel ${channelData.id}, the DB isn't initialized!`) + return + } + repo = this.publicChannelsRepos.get(channelData.id) + } + + if (repo && !repo.eventsAttached && !repo.store.isSubscribing) { + this.handleMessageEventsOnChannelStore(channelData.id, repo) + await repo.store.subscribe() + repo.eventsAttached = true + } + + this.logger.info(`Subscribed to channel ${channelData.id}`) + this.emit(StorageEvents.CHANNEL_SUBSCRIBED, { + channelId: channelData.id, + }) + return { channel: channelData } + } + + /** + * Capture events emitted by individual channel stores and re-emit on the channels service + * + * @param channelId ID of channel to re-emit events from + * @param repo Repo containing the store we are re-emitting events from + * @emits StorageEvents.MESSAGE_IDS_STORED + * @emits StorageEvents.MESSAGES_STORED + * @emits StorageEvents.SEND_PUSH_NOTIFICATION + */ + private handleMessageEventsOnChannelStore(channelId: string, repo: PublicChannelsRepo): void { + this.logger.info(`Subscribing to channel updates`, channelId) + repo.store.on(StorageEvents.MESSAGE_IDS_STORED, (payload: ChannelMessageIdsResponse) => { + this.emit(StorageEvents.MESSAGE_IDS_STORED, payload) + }) + + repo.store.on(StorageEvents.MESSAGES_STORED, (payload: MessagesLoadedPayload) => { + this.emit(StorageEvents.MESSAGES_STORED, payload) + }) + + repo.store.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => { + this.emit(StorageEvents.SEND_PUSH_NOTIFICATION, payload) + }) + } + + /** + * Get the store for a given channel ID and, optionally, create a temporary store if it doesn't exist then drop + * the database from OrbitDB + * + * @param payload Metadata on the channel to be deleted + * @returns Response containing metadata on the channel that was deleted + * @throws Error + */ + public async deleteChannel(payload: { channelId: string; ownerPeerId: string }): Promise { + this.logger.info('Deleting channel', payload) + const { channelId, ownerPeerId } = payload + const channel = await this.getChannel(channelId) + if (!this.peerId) { + this.logger.error('deleteChannel - peerId is null') + throw new Error('deleteChannel - peerId is null') + } + const isOwner = ownerPeerId === this.peerId.toString() + if (channel && isOwner) { + if (!this.channels) { + throw new Error('Channels have not been initialized!') + } + await this.channels.del(channelId) + } + const repo = this.publicChannelsRepos.get(channelId) + let store = repo?.store + if (store == null) { + const channelData: PublicChannel = channel ?? { + id: channelId, + name: 'undefined', + owner: ownerPeerId, + description: 'undefined', + timestamp: 0, + } + store = await this.createChannelStore(channelData) + } + await store.deleteChannel() + this.publicChannelsRepos.delete(channelId) + return { channelId } + } + + // Messages + + /** + * Sends a message on a given channel if that channel is known + * + * @param message Message to send + */ + public async sendMessage(message: ChannelMessage): Promise { + const repo = this.publicChannelsRepos.get(message.channelId) + if (repo == null) { + this.logger.error(`Could not send message. No '${message.channelId}' channel in saved public channels`) + return + } + + await repo.store.sendMessage(message) + } + + /** + * Read messages for a list of message IDs from a given channel if that channel is known + * + * @param channelId ID of channel to read messages from + * @param ids IDS of messages to read + * @returns Payload containing messages read + */ + public async getMessages( + channelId: string, + messageIds: string[] | undefined = undefined + ): Promise { + const repo = this.publicChannelsRepos.get(channelId) + if (repo == null) { + this.logger.error(`Could not read messages. No '${channelId}' channel in saved public channels`) + return + } + + return await repo.store.getMessages(messageIds) + } + + // Files + + /** + * Delete multiple files from the file manager + * + * @param files List of file metadata to be deleted + */ + public async deleteChannelFiles(files: FileMetadata[]): Promise { + for (const file of files) { + await this.deleteFile(file) + } + } + + /** + * Deleted a single file from the file manager + * + * @param fileMetadata Metadata of file to be deleted + */ + public async deleteFile(fileMetadata: FileMetadata): Promise { + await this.filesManager.deleteBlocks(fileMetadata) + } + + /** + * Consume file manager events and emit storage events on the channels service + * + * @emits StorageEvents.DOWNLOAD_PROGRESS + * @emits StorageEvents.MESSAGE_MEDIA_UPDATED + * @emits StorageEvents.REMOVE_DOWNLOAD_STATUS + * @emits StorageEvents.FILE_UPLOADED + * @emits StorageEvents.DOWNLOAD_PROGRESS + */ + private attachFileManagerEvents(): void { + this.filesManager.on(IpfsFilesManagerEvents.DOWNLOAD_PROGRESS, status => { + this.emit(StorageEvents.DOWNLOAD_PROGRESS, status) + }) + this.filesManager.on(IpfsFilesManagerEvents.MESSAGE_MEDIA_UPDATED, messageMedia => { + this.emit(StorageEvents.MESSAGE_MEDIA_UPDATED, messageMedia) + }) + this.filesManager.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, payload => { + this.emit(StorageEvents.REMOVE_DOWNLOAD_STATUS, payload) + }) + this.filesManager.on(StorageEvents.FILE_UPLOADED, payload => { + this.emit(StorageEvents.FILE_UPLOADED, payload) + }) + this.filesManager.on(StorageEvents.DOWNLOAD_PROGRESS, payload => { + this.emit(StorageEvents.DOWNLOAD_PROGRESS, payload) + }) + this.filesManager.on(StorageEvents.MESSAGE_MEDIA_UPDATED, payload => { + this.emit(StorageEvents.MESSAGE_MEDIA_UPDATED, payload) + }) + } + + /** + * Emit event to trigger file upload on file manager + * + * @param metadata Metadata of file to be uploaded + * @emits IpfsFilesManagerEvents.UPLOAD_FILE + */ + public async uploadFile(metadata: FileMetadata): Promise { + this.filesManager.emit(IpfsFilesManagerEvents.UPLOAD_FILE, metadata) + } + + /** + * Emit event to trigger file download on file manager + * + * @param metadata Metadata of file to be downloaded + * @emits IpfsFilesManagerEvents.DOWNLOAD_FILE + */ + public async downloadFile(metadata: FileMetadata): Promise { + this.filesManager.emit(IpfsFilesManagerEvents.DOWNLOAD_FILE, metadata) + } + + /** + * Emit event to trigger file download cancellation on file manager + * + * @param metadata Metadata of file to be cancelled + * @emits IpfsFilesManagerEvents.CANCEL_DOWNLOAD + */ + public cancelDownload(mid: string): void { + this.filesManager.emit(IpfsFilesManagerEvents.CANCEL_DOWNLOAD, mid) + } + + /** + * Delete files for a list of messages + * + * @param payload Payload containing file messages whose files should be deleted + */ + public async deleteFilesFromChannel(payload: DeleteFilesFromChannelSocketPayload): Promise { + const { messages } = payload + Object.keys(messages).map(async key => { + const message = messages[key] + if (message?.media?.path) { + const mediaPath = message.media.path + this.logger.info('deleteFilesFromChannel : mediaPath', mediaPath) + const isFileExist = await this.checkIfFileExist(mediaPath) + this.logger.info(`deleteFilesFromChannel : isFileExist- ${isFileExist}`) + if (isFileExist) { + fs.unlink(mediaPath, unlinkError => { + if (unlinkError) { + this.logger.error(`deleteFilesFromChannel : unlink error`, unlinkError) + } + }) + } else { + this.logger.error(`deleteFilesFromChannel : file does not exist`, mediaPath) + } + } + }) + } + + /** + * Check if the file with the supplied path exists on the file system + * + * @param filePath Path to file + * @returns True if file exists at the path + */ + public async checkIfFileExist(filePath: string): Promise { + this.logger.info(`Checking if ${filePath} exists`) + return fs.existsSync(filePath) + } + + // Close Logic + + /** + * Close the channels management database on OrbitDB + */ + public async closeChannels(): Promise { + try { + this.logger.info('Closing channels DB') + await this.channels?.close() + this.logger.info('Closed channels DB') + } catch (e) { + this.logger.error('Error closing channels db', e) + } + } + + /** + * Stop the file manager + */ + public async closeFileManager(): Promise { + try { + this.logger.info('Stopping IPFS files manager') + await this.filesManager.stop() + } catch (e) { + this.logger.error('Error stopping IPFS files manager', e) + } + } + + /** + * Clean the ChannelsService + * + * NOTE: Does NOT affect data stored in IPFS + */ + public async clean(): Promise { + this.peerId = null + + // @ts-ignore + this.channels = undefined + // @ts-ignore + this.messageThreads = undefined + // @ts-ignore + this.publicChannelsRepos = new Map() + + this.channels = null + } +} diff --git a/packages/backend/src/nest/storage/channels/messages/messages.service.spec.ts b/packages/backend/src/nest/storage/channels/messages/messages.service.spec.ts new file mode 100644 index 000000000..e6feb6d3b --- /dev/null +++ b/packages/backend/src/nest/storage/channels/messages/messages.service.spec.ts @@ -0,0 +1,110 @@ +import { jest } from '@jest/globals' + +import { Test, TestingModule } from '@nestjs/testing' +import { keyFromCertificate, parseCertificate } from '@quiet/identity' +import { + generateMessageFactoryContentWithId, + getFactory, + prepareStore, + publicChannels, + Store, +} from '@quiet/state-manager' +import { ChannelMessage, Community, Identity, PublicChannel, TestMessage } from '@quiet/types' +import { FactoryGirl } from 'factory-girl' +import { SigChainService } from '../../../auth/sigchain.service' +import { createLogger } from '../../../common/logger' +import { TestModule } from '../../../common/test.module' +import { StorageModule } from '../../storage.module' +import { MessagesService } from './messages.service' + +const logger = createLogger('messagesService:test') + +describe('MessagesService', () => { + let module: TestingModule + let messagesService: MessagesService + let sigChainService: SigChainService + + let store: Store + let factory: FactoryGirl + let alice: Identity + let john: Identity + let community: Community + let channel: PublicChannel + let message: ChannelMessage + + beforeAll(async () => { + store = prepareStore().store + factory = await getFactory(store) + + community = await factory.create('Community') + channel = publicChannels.selectors.publicChannels(store.getState())[0] + alice = await factory.create('Identity', { id: community.id, nickname: 'alice' }) + john = await factory.create('Identity', { id: community.id, nickname: 'john' }) + message = ( + await factory.create('Message', { + identity: alice, + message: generateMessageFactoryContentWithId(channel.id), + }) + ).message + }) + + beforeEach(async () => { + jest.clearAllMocks() + + module = await Test.createTestingModule({ + imports: [TestModule, StorageModule], + }).compile() + + sigChainService = await module.resolve(SigChainService) + messagesService = await module.resolve(MessagesService) + }) + + describe('verifyMessage', () => { + it('message with valid signature is verified', async () => { + expect(await messagesService.verifyMessage(message)).toBeTruthy() + }) + + it('message with invalid signature is not verified', async () => { + expect( + await messagesService.verifyMessage({ + ...message, + pubKey: keyFromCertificate(parseCertificate(john.userCertificate!)), + }) + ).toBeFalsy() + }) + }) + + // TODO: https://github.com/TryQuiet/quiet/issues/2631 + describe('onSend', () => { + it('does nothing but return the message as-is', async () => { + expect(await messagesService.onSend(message)).toEqual(message) + }) + }) + + // TODO: https://github.com/TryQuiet/quiet/issues/2632 + describe('onConsume', () => { + it('runs verifyMessage when verify === true', async () => { + expect(await messagesService.onConsume(message, true)).toEqual({ + ...message, + verified: true, + }) + }) + + it('skips verifyMessage when verify === false', async () => { + const fakePubKey = keyFromCertificate(parseCertificate(john.userCertificate!)) + expect( + await messagesService.onConsume( + { + ...message, + pubKey: fakePubKey, + }, + false + ) + ).toEqual({ + ...message, + pubKey: fakePubKey, + verified: true, + }) + }) + }) +}) diff --git a/packages/backend/src/nest/storage/channels/messages/messages.service.ts b/packages/backend/src/nest/storage/channels/messages/messages.service.ts new file mode 100644 index 000000000..0bb06b095 --- /dev/null +++ b/packages/backend/src/nest/storage/channels/messages/messages.service.ts @@ -0,0 +1,111 @@ +import { Injectable } from '@nestjs/common' +import { stringToArrayBuffer } from 'pvutils' +import EventEmitter from 'events' +import { getCrypto, ICryptoEngine } from 'pkijs' + +import { keyObjectFromString, verifySignature } from '@quiet/identity' +import { ChannelMessage, ConsumedChannelMessage, NoCryptoEngineError } from '@quiet/types' + +import { createLogger } from '../../../common/logger' +import { EncryptedAndSignedPayload, EncryptedPayload } from '../../../auth/services/crypto/types' +import { SignedEnvelope } from '3rd-party/auth/packages/auth/dist' +import { SigChainService } from '../../../auth/sigchain.service' + +@Injectable() +export class MessagesService extends EventEmitter { + /** + * Map of signing keys used on messages + * + * Maps public key string -> CryptoKey + */ + private publicKeysMap: Map = new Map() + + private readonly logger = createLogger(`storage:channels:messagesService`) + + constructor(private readonly sigChainService: SigChainService) { + super() + } + + /** + * Handle processing of message to be added to OrbitDB and sent to peers + * + * NOTE: This will call the encryption method below (https://github.com/TryQuiet/quiet/issues/2631) + * + * @param message Message to send + * @returns Processed message + */ + public async onSend(message: ChannelMessage): Promise { + return message + } + + /** + * Handle processing of message consumed from OrbitDB + * + * NOTE: This will call the decryption method below (https://github.com/TryQuiet/quiet/issues/2632) + * + * @param message Message consumed from OrbitDB + * @returns Processed message + */ + public async onConsume(message: ChannelMessage, verify: boolean = true): Promise { + const verified = verify ? await this.verifyMessage(message) : true + return { + ...message, + verified, + } + } + + /** + * Verify signature on message + * + * @param message Message to verify + * @returns True if message is valid + */ + public async verifyMessage(message: ChannelMessage): Promise { + const crypto = this.getCrypto() + const signature = stringToArrayBuffer(message.signature) + let cryptoKey = this.publicKeysMap.get(message.pubKey) + + if (!cryptoKey) { + cryptoKey = await keyObjectFromString(message.pubKey, crypto) + this.publicKeysMap.set(message.pubKey, cryptoKey) + } + + return await verifySignature(signature, message.message, cryptoKey) + } + + // TODO: https://github.com/TryQuiet/quiet/issues/2631 + // NOTE: the signature here may not be correct + private async encryptMessage(message: ChannelMessage): Promise { + throw new Error(`MessagesService.encryptMessage is not implemented!`) + } + + // TODO: https://github.com/TryQuiet/quiet/issues/2632 + // NOTE: the signature here may not be correct + private async decryptMessage(encrypted: EncryptedPayload, signature: SignedEnvelope): Promise { + throw new Error(`MessagesService.decryptMessage is not implemented!`) + } + + /** + * Get crypto engine that was initialized previously + * + * @returns Crypto engine + * @throws NoCryptoEngineError + */ + private getCrypto(): ICryptoEngine { + const crypto = getCrypto() + if (crypto == null) { + throw new NoCryptoEngineError() + } + + return crypto + } + + /** + * Clean service + * + * NOTE: Does NOT affect data stored in IPFS + */ + public async clean(): Promise { + this.publicKeysMap = new Map() + } +} diff --git a/packages/backend/src/nest/storage/storage.module.ts b/packages/backend/src/nest/storage/storage.module.ts index 78eb7cb09..21e1f7b37 100644 --- a/packages/backend/src/nest/storage/storage.module.ts +++ b/packages/backend/src/nest/storage/storage.module.ts @@ -8,9 +8,12 @@ import { CertificatesStore } from './certificates/certificates.store' import { CommunityMetadataStore } from './communityMetadata/communityMetadata.store' import { UserProfileStore } from './userProfile/userProfile.store' import { IpfsModule } from '../ipfs/ipfs.module' +import { ChannelsService } from './channels/channels.service' +import { MessagesService } from './channels/messages/messages.service' +import { SigChainModule } from '../auth/sigchain.service.module' @Module({ - imports: [LocalDbModule, IpfsModule, IpfsFileManagerModule], + imports: [LocalDbModule, IpfsModule, IpfsFileManagerModule, SigChainModule], providers: [ StorageService, OrbitDbService, @@ -18,6 +21,8 @@ import { IpfsModule } from '../ipfs/ipfs.module' CommunityMetadataStore, CertificatesRequestsStore, UserProfileStore, + ChannelsService, + MessagesService, ], exports: [StorageService], }) diff --git a/packages/backend/src/nest/storage/storage.service.spec.ts b/packages/backend/src/nest/storage/storage.service.spec.ts index 91622ea72..522a075c0 100644 --- a/packages/backend/src/nest/storage/storage.service.spec.ts +++ b/packages/backend/src/nest/storage/storage.service.spec.ts @@ -1,7 +1,6 @@ import { jest } from '@jest/globals' import { Test, TestingModule } from '@nestjs/testing' -import { keyFromCertificate, parseCertificate } from '@quiet/identity' import { prepareStore, getFactory, @@ -9,21 +8,12 @@ import { generateMessageFactoryContentWithId, Store, } from '@quiet/state-manager' -import { - ChannelMessage, - Community, - FileMetadata, - Identity, - MessageType, - PublicChannel, - TestMessage, -} from '@quiet/types' +import { ChannelMessage, Community, Identity, PublicChannel, TestMessage } from '@quiet/types' import path from 'path' import { type PeerId } from '@libp2p/interface' -import waitForExpect from 'wait-for-expect' import { TestModule } from '../common/test.module' -import { createArbitraryFile, libp2pInstanceParams } from '../common/utils' +import { libp2pInstanceParams } from '../common/utils' import { IpfsModule } from '../ipfs/ipfs.module' import { IpfsService } from '../ipfs/ipfs.service' import { Libp2pModule } from '../libp2p/libp2p.module' @@ -165,95 +155,6 @@ describe('StorageService', () => { }) }) - describe('Channels', () => { - it('deletes channel as owner', async () => { - await storageService.init(peerId) - await storageService.subscribeToChannel(channelio) - - const result = await storageService.deleteChannel({ channelId: channelio.id, ownerPeerId: peerId.toString() }) - expect(result).toEqual({ channelId: channelio.id }) - - const channelFromKeyValueStore = (await storageService.getChannels()).filter(x => x.id === channelio.id) - expect(channelFromKeyValueStore).toEqual([]) - }) - - it('delete channel as standard user', async () => { - await storageService.init(peerId) - await storageService.subscribeToChannel(channelio) - - const result = await storageService.deleteChannel({ channelId: channelio.id, ownerPeerId: 'random peer id' }) - expect(result).toEqual({ channelId: channelio.id }) - - const channelFromKeyValueStore = (await storageService.getChannels()).filter(x => x.id === channelio.id) - expect(channelFromKeyValueStore).toEqual([channelio]) - }) - }) - - describe('Message access controller', () => { - it('is saved to db if passed signature verification', async () => { - await storageService.init(peerId) - - await storageService.subscribeToChannel(channelio) - - const publicChannelRepo = storageService.publicChannelsRepos.get(message.channelId) - expect(publicChannelRepo).not.toBeUndefined() - // @ts-expect-error - const db = publicChannelRepo.db - const eventSpy = jest.spyOn(db, 'add') - - const messageCopy = { - ...message, - } - delete messageCopy.media - - await storageService.sendMessage(messageCopy) - - // Confirm message has passed orbitdb validator (check signature verification only) - expect(eventSpy).toHaveBeenCalled() - // @ts-expect-error - const savedMessages = await storageService.getAllEventLogEntries(db) - expect(savedMessages.length).toBe(1) - expect(savedMessages[0]).toEqual(messageCopy) - }) - - it('is not saved to db if did not pass signature verification', async () => { - const aliceMessage = await factory.create['payload']>( - 'Message', - { - identity: alice, - message: generateMessageFactoryContentWithId(channel.id), - } - ) - // @ts-expect-error userCertificate can be undefined - const johnCertificate: string = john.userCertificate - const johnPublicKey = keyFromCertificate(parseCertificate(johnCertificate)) - - const spoofedMessage = { - ...aliceMessage.message, - channelId: channelio.id, - pubKey: johnPublicKey, - } - delete spoofedMessage.media // Media 'undefined' is not accepted by db.add - - await storageService.init(peerId) - - await storageService.subscribeToChannel(channelio) - - const publicChannelRepo = storageService.publicChannelsRepos.get(message.channelId) - expect(publicChannelRepo).not.toBeUndefined() - // @ts-expect-error - const db = publicChannelRepo.db - const eventSpy = jest.spyOn(db, 'add') - - await storageService.sendMessage(spoofedMessage) - - // Confirm message has passed orbitdb validator (check signature verification only) - expect(eventSpy).toHaveBeenCalled() - // @ts-expect-error getAllEventLogEntries is protected - expect((await storageService.getAllEventLogEntries(db)).length).toBe(0) - }) - }) - describe('Users', () => { it('gets all users from db', async () => { const expected = [ @@ -333,69 +234,4 @@ describe('StorageService', () => { expect(allUsers).toStrictEqual(expected) }) }) - - describe('Files deletion', () => { - let realFilePath: string - let messages: { - messages: Record - } - - beforeEach(async () => { - realFilePath = path.join(dirname, '/real-file.txt') - createArbitraryFile(realFilePath, 2147483) - await storageService.init(peerId) - - const metadata: FileMetadata = { - path: realFilePath, - name: 'test-large-file', - ext: '.txt', - cid: 'uploading_id', - message: { - id: 'id', - channelId: channel.id, - }, - } - - const aliceMessage = await factory.create['payload']>( - 'Message', - { - identity: alice, - message: generateMessageFactoryContentWithId(channel.id, MessageType.File, metadata), - } - ) - - messages = { - messages: { - [aliceMessage.message.id]: aliceMessage.message, - }, - } - }) - - afterEach(() => { - if (fs.existsSync(realFilePath)) { - fs.rmSync(realFilePath) - } - }) - - it('delete file correctly', async () => { - const isFileExist = await storageService.checkIfFileExist(realFilePath) - expect(isFileExist).toBeTruthy() - - await expect(storageService.deleteFilesFromChannel(messages)).resolves.not.toThrowError() - - await waitForExpect(async () => { - expect(await storageService.checkIfFileExist(realFilePath)).toBeFalsy() - }, 2000) - }) - - it('file dont exist - not throw error', async () => { - fs.rmSync(realFilePath) - - await waitForExpect(async () => { - expect(await storageService.checkIfFileExist(realFilePath)).toBeFalsy() - }, 2000) - - await expect(storageService.deleteFilesFromChannel(messages)).resolves.not.toThrowError() - }) - }) }) diff --git a/packages/backend/src/nest/storage/storage.service.ts b/packages/backend/src/nest/storage/storage.service.ts index 36d95ff2c..a4ac532a6 100644 --- a/packages/backend/src/nest/storage/storage.service.ts +++ b/packages/backend/src/nest/storage/storage.service.ts @@ -1,31 +1,17 @@ import { Inject, Injectable } from '@nestjs/common' import { CertFieldsTypes, - keyObjectFromString, - verifySignature, parseCertificate, parseCertificationRequest, getCertFieldValue, getReqFieldValue, keyFromCertificate, } from '@quiet/identity' -import { type KeyValueType, type EventsType, IPFSAccessController, type LogEntry } from '@orbitdb/core' import { EventEmitter } from 'events' import { type PeerId } from '@libp2p/interface' -import { getCrypto } from 'pkijs' -import { stringToArrayBuffer } from 'pvutils' -import validate from '../validation/validators' import { - ChannelMessage, CommunityMetadata, ConnectionProcessInfo, - type CreateChannelResponse, - DeleteFilesFromChannelSocketPayload, - FileMetadata, - type MessagesLoadedPayload, - NoCryptoEngineError, - PublicChannel, - PushNotificationPayload, SaveCSRPayload, SaveCertificatePayload, SocketActionTypes, @@ -36,33 +22,23 @@ import { } from '@quiet/types' import { createLibp2pAddress } from '@quiet/common' import fs from 'fs' -import { IpfsFileManagerService } from '../ipfs-file-manager/ipfs-file-manager.service' import { IPFS_REPO_PATCH, ORBIT_DB_DIR, QUIET_DIR } from '../const' -import { IpfsFilesManagerEvents } from '../ipfs-file-manager/ipfs-file-manager.types' import { LocalDbService } from '../local-db/local-db.service' import { createLogger } from '../common/logger' -import { PublicChannelsRepo } from '../common/types' import { removeFiles, removeDirs, createPaths } from '../common/utils' -import { DBOptions, StorageEvents } from './storage.types' +import { StorageEvents } from './storage.types' import { CertificatesStore } from './certificates/certificates.store' import { CertificatesRequestsStore } from './certifacteRequests/certificatesRequestsStore' import { IpfsService } from '../ipfs/ipfs.service' import { OrbitDbService } from './orbitDb/orbitDb.service' import { CommunityMetadataStore } from './communityMetadata/communityMetadata.store' import { UserProfileStore } from './userProfile/userProfile.store' -import { KeyValueIndexedValidated } from './orbitDb/keyValueIndexedValidated' -import { MessagesAccessController } from './orbitDb/MessagesAccessController' -import { EventsWithStorage } from './orbitDb/eventsWithStorage' import { LocalDBKeys } from '../local-db/local-db.types' +import { ChannelsService } from './channels/channels.service' @Injectable() export class StorageService extends EventEmitter { private peerId: PeerId | null = null - public publicChannelsRepos: Map = new Map() - private publicKeysMap: Map = new Map() - - private certificates: EventsType | null - private channels: KeyValueType | null private readonly logger = createLogger(StorageService.name) @@ -72,12 +48,12 @@ export class StorageService extends EventEmitter { @Inject(IPFS_REPO_PATCH) public readonly ipfsRepoPath: string, private readonly localDbService: LocalDbService, private readonly ipfsService: IpfsService, - private readonly filesManager: IpfsFileManagerService, private readonly orbitDbService: OrbitDbService, private readonly certificatesRequestsStore: CertificatesRequestsStore, private readonly certificatesStore: CertificatesStore, private readonly communityMetadataStore: CommunityMetadataStore, - private readonly userProfileStore: UserProfileStore + private readonly userProfileStore: UserProfileStore, + private readonly channelsService: ChannelsService ) { super() } @@ -105,10 +81,6 @@ export class StorageService extends EventEmitter { this.logger.info(`Creating OrbitDB service`) await this.orbitDbService.create(peerId, this.ipfsService.ipfsInstance!) - this.logger.info(`Starting file manager`) - this.attachFileManagerEvents() - await this.filesManager.init() - this.logger.info(`Initializing Databases`) await this.initDatabases() @@ -125,13 +97,10 @@ export class StorageService extends EventEmitter { } await this.communityMetadataStore.startSync() - await this.channels?.sync.start() await this.certificatesStore.startSync() await this.certificatesRequestsStore.startSync() await this.userProfileStore.startSync() - for (const channel of this.publicChannelsRepos.values()) { - await channel.db.sync.start() - } + await this.channelsService.startSync() } static dbAddress = (db: { root: string; path: string }) => { @@ -139,6 +108,13 @@ export class StorageService extends EventEmitter { return `/orbitdb/${db.root}/${db.path}` } + /** + * Get the ChannelsService for managing channels and messages + */ + public get channels() { + return this.channelsService + } + public async initDatabases() { this.logger.time('Storage.initDatabases') @@ -161,8 +137,7 @@ export class StorageService extends EventEmitter { await this.userProfileStore.init() this.logger.info('3/3') - await this.createDbForChannels() - await this.initAllChannels() + await this.channelsService.init(this.peerId!) this.logger.timeEnd('Storage.initDatabases') this.logger.info('Initialized DBs') @@ -171,13 +146,7 @@ export class StorageService extends EventEmitter { } public async stop() { - try { - this.logger.info('Closing channels DB') - await this.channels?.close() - this.logger.info('Closed channels DB') - } catch (e) { - this.logger.error('Error closing channels db', e) - } + await this.channelsService.closeChannels() try { await this.certificatesStore?.close() @@ -204,13 +173,7 @@ export class StorageService extends EventEmitter { } await this.orbitDbService.stop() - - this.logger.info('Stopping IPFS files manager') - try { - await this.filesManager.stop() - } catch (e) { - this.logger.error('Error stopping IPFS files manager', e) - } + await this.channelsService.closeFileManager() try { await this.ipfsService.stop() @@ -285,337 +248,6 @@ export class StorageService extends EventEmitter { return await this.certificatesStore.getEntries() } - public async setChannel(id: string, channel: PublicChannel) { - if (!this.channels) { - throw new Error('Channels have not been initialized!') - } - await this.channels.put(id, channel) - } - - public async getChannel(id: string) { - if (!this.channels) { - throw new Error('Channels have not been initialized!') - } - return await this.channels.get(id) - } - - public async getChannels(): Promise { - if (!this.channels) { - throw new Error('Channels have not been initialized!') - } - return (await this.channels.all()).map(x => x.value) - } - - public async loadAllChannels() { - this.logger.info('Getting all channels') - this.emit(StorageEvents.CHANNELS_STORED, { - channels: await this.getChannels(), - }) - } - - private async createDbForChannels() { - this.logger.info('Creating public-channels database') - this.channels = await this.orbitDbService.orbitDb.open>('public-channels', { - sync: false, - Database: KeyValueIndexedValidated(), - AccessController: IPFSAccessController({ write: ['*'] }), - }) - - this.channels.events.on('update', async (entry: LogEntry) => { - this.logger.info('public-channels database updated') - - this.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.CHANNELS_STORED) - - const channels = await this.getChannels() - - this.emit(StorageEvents.CHANNELS_STORED, { channels }) - - channels.forEach(channel => this.subscribeToChannel(channel, { replicate: true })) - }) - - const channels = await this.getChannels() - this.logger.info('Channels count:', channels.length) - this.logger.info( - 'Channels names:', - channels.map(x => x.name) - ) - channels.forEach(channel => this.subscribeToChannel(channel)) - } - - async initAllChannels() { - this.emit(StorageEvents.CHANNELS_STORED, { - channels: await this.getChannels(), - }) - } - - async verifyMessage(message: ChannelMessage): Promise { - const crypto = getCrypto() - if (!crypto) throw new NoCryptoEngineError() - - const signature = stringToArrayBuffer(message.signature) - let cryptoKey = this.publicKeysMap.get(message.pubKey) - - if (!cryptoKey) { - cryptoKey = await keyObjectFromString(message.pubKey, crypto) - this.publicKeysMap.set(message.pubKey, cryptoKey) - } - - return await verifySignature(signature, message.message, cryptoKey) - } - - protected async getAllEventLogEntries(db: EventsType): Promise { - const res: T[] = [] - - for await (const x of db.iterator()) { - res.push(x.value) - } - - return res - } - - public async subscribeToChannel( - channelData: PublicChannel, - options = { replicate: false } - ): Promise { - let db: EventsType - // @ts-ignore - if (channelData.address) { - // @ts-ignore - channelData.id = channelData.address - } - let repo = this.publicChannelsRepos.get(channelData.id) - - if (repo) { - db = repo.db - } else { - try { - db = await this.createChannel(channelData, options) - } catch (e) { - this.logger.error(`Can't subscribe to channel ${channelData.id}`, e) - return - } - if (!db) { - this.logger.error(`Can't subscribe to channel ${channelData.id}, the DB isn't initialized!`) - return - } - repo = this.publicChannelsRepos.get(channelData.id) - } - - if (repo && !repo.eventsAttached) { - this.logger.info('Subscribing to channel ', channelData.id) - - db.events.on('update', async (entry: LogEntry) => { - this.logger.info(`${channelData.id} database updated`, entry.hash, entry.payload.value?.channelId) - - const message = entry.payload.value! - const verified = await this.verifyMessage(message) - - this.emit(StorageEvents.MESSAGES_STORED, { - messages: [message], - isVerified: verified, - }) - - const ids = (await this.getAllEventLogEntries(db)).map(msg => msg.id) - const community = await this.localDbService.getCurrentCommunity() - - if (community) { - this.emit(StorageEvents.MESSAGE_IDS_STORED, { - ids, - channelId: channelData.id, - communityId: community.id, - }) - } - - // FIXME: the 'update' event runs if we replicate entries and if we add - // entries ourselves. So we may want to check if the message is written - // by us. - // - // Display push notifications on mobile - if (process.env.BACKEND === 'mobile') { - if (!verified) return - - // Do not notify about old messages - if (message.createdAt < parseInt(process.env.CONNECTION_TIME || '')) return - - const username = await this.certificatesStore.getCertificateUsername(message.pubKey) - if (!username) { - this.logger.error(`Can't send push notification, no username found for public key '${message.pubKey}'`) - return - } - - const payload: PushNotificationPayload = { - message: JSON.stringify(message), - username: username, - } - - this.emit(StorageEvents.SEND_PUSH_NOTIFICATION, payload) - } - }) - - const ids = (await this.getAllEventLogEntries(db)).map(msg => msg.id) - const community = await this.localDbService.getCurrentCommunity() - - if (community) { - this.emit(StorageEvents.MESSAGE_IDS_STORED, { - ids, - channelId: channelData.id, - communityId: community.id, - }) - } - - repo.eventsAttached = true - } - - this.logger.info(`Subscribed to channel ${channelData.id}`) - this.emit(StorageEvents.CHANNEL_SUBSCRIBED, { - channelId: channelData.id, - }) - return { channel: channelData } - } - - public async getMessages(channelId: string, ids: string[]): Promise { - const repo = this.publicChannelsRepos.get(channelId) - if (!repo) return - - const messages = await this.getAllEventLogEntries(repo.db) - const filteredMessages: ChannelMessage[] = [] - - for (const id of ids) { - filteredMessages.push(...messages.filter(i => i.id === id)) - } - - return { - messages: filteredMessages, - isVerified: true, - } - } - - private async createChannel(channelData: PublicChannel, options: DBOptions): Promise> { - if (!validate.isChannel(channelData)) { - this.logger.error('Invalid channel format') - throw new Error('Create channel validation error') - } - - this.logger.info(`Creating channel ${channelData.id}`) - - const channelId = channelData.id - const db = await this.orbitDbService.orbitDb.open>(`channels.${channelId}`, { - type: 'events', - Database: EventsWithStorage(), - AccessController: MessagesAccessController({ write: ['*'] }), - }) - const channel = await this.getChannel(channelId) - - if (channel === undefined) { - await this.setChannel(channelId, channelData) - } else { - this.logger.info(`Channel ${channelId} already exists`) - } - - this.publicChannelsRepos.set(channelId, { db, eventsAttached: false }) - this.logger.info(`Set ${channelId} to local channels`) - this.logger.info(`Created channel ${channelId}`) - - return db - } - - public async deleteChannel(payload: { channelId: string; ownerPeerId: string }) { - this.logger.info('deleting channel storage', payload) - const { channelId, ownerPeerId } = payload - const channel = await this.getChannel(channelId) - if (!this.peerId) { - this.logger.error('deleteChannel - peerId is null') - throw new Error('deleteChannel - peerId is null') - } - const isOwner = ownerPeerId === this.peerId.toString() - if (channel && isOwner) { - if (!this.channels) { - throw new Error('Channels have not been initialized!') - } - await this.channels.del(channelId) - } - let repo = this.publicChannelsRepos.get(channelId) - if (!repo) { - const db = await this.orbitDbService.orbitDb.open>(`channels.${channelId}`, { - sync: false, - type: 'events', - Database: EventsWithStorage(), - AccessController: MessagesAccessController({ write: ['*'] }), - }) - repo = { - db, - eventsAttached: false, - } - } - await repo.db.sync.stop() - await repo.db.drop() - this.publicChannelsRepos.delete(channelId) - return { channelId: payload.channelId } - } - - public async deleteChannelFiles(files: FileMetadata[]) { - for (const file of files) { - await this.deleteFile(file) - } - } - - public async deleteFile(fileMetadata: FileMetadata) { - await this.filesManager.deleteBlocks(fileMetadata) - } - - public async sendMessage(message: ChannelMessage) { - if (!validate.isMessage(message)) { - this.logger.error('STORAGE: public channel message is invalid') - return - } - const repo = this.publicChannelsRepos.get(message.channelId) - if (!repo) { - this.logger.error(`Could not send message. No '${message.channelId}' channel in saved public channels`) - return - } - try { - this.logger.info('Sending message:', message.id) - await repo.db.add(message) - } catch (e) { - this.logger.error( - `STORAGE: Could not append message (entry not allowed to write to the log). Details: ${e.message}` - ) - } - } - - private attachFileManagerEvents = () => { - this.filesManager.on(IpfsFilesManagerEvents.DOWNLOAD_PROGRESS, status => { - this.emit(StorageEvents.DOWNLOAD_PROGRESS, status) - }) - this.filesManager.on(IpfsFilesManagerEvents.MESSAGE_MEDIA_UPDATED, messageMedia => { - this.emit(StorageEvents.MESSAGE_MEDIA_UPDATED, messageMedia) - }) - this.filesManager.on(StorageEvents.REMOVE_DOWNLOAD_STATUS, payload => { - this.emit(StorageEvents.REMOVE_DOWNLOAD_STATUS, payload) - }) - this.filesManager.on(StorageEvents.FILE_UPLOADED, payload => { - this.emit(StorageEvents.FILE_UPLOADED, payload) - }) - this.filesManager.on(StorageEvents.DOWNLOAD_PROGRESS, payload => { - this.emit(StorageEvents.DOWNLOAD_PROGRESS, payload) - }) - this.filesManager.on(StorageEvents.MESSAGE_MEDIA_UPDATED, payload => { - this.emit(StorageEvents.MESSAGE_MEDIA_UPDATED, payload) - }) - } - - public async uploadFile(metadata: FileMetadata) { - this.filesManager.emit(IpfsFilesManagerEvents.UPLOAD_FILE, metadata) - } - - public async downloadFile(metadata: FileMetadata) { - this.filesManager.emit(IpfsFilesManagerEvents.DOWNLOAD_FILE, metadata) - } - - public cancelDownload(mid: string) { - this.filesManager.emit(IpfsFilesManagerEvents.CANCEL_DOWNLOAD, mid) - } - public async saveCertificate(payload: SaveCertificatePayload): Promise { this.logger.info('About to save certificate...') if (!payload.certificate) { @@ -685,40 +317,10 @@ export class StorageService extends EventEmitter { return allUsers } - public async deleteFilesFromChannel(payload: DeleteFilesFromChannelSocketPayload) { - const { messages } = payload - Object.keys(messages).map(async key => { - const message = messages[key] - if (message?.media?.path) { - const mediaPath = message.media.path - this.logger.info('deleteFilesFromChannel : mediaPath', mediaPath) - const isFileExist = await this.checkIfFileExist(mediaPath) - this.logger.info(`deleteFilesFromChannel : isFileExist- ${isFileExist}`) - if (isFileExist) { - fs.unlink(mediaPath, unlinkError => { - if (unlinkError) { - this.logger.error(`deleteFilesFromChannel : unlink error`, unlinkError) - } - }) - } else { - this.logger.error(`deleteFilesFromChannel : file does not exist`, mediaPath) - } - } - }) - } - public async addUserProfile(profile: UserProfile) { await this.userProfileStore.setEntry(profile.pubKey, profile) } - public async checkIfFileExist(filepath: string): Promise { - return await new Promise(resolve => { - fs.access(filepath, fs.constants.F_OK, error => { - resolve(!error) - }) - }) - } - public async setIdentity(identity: Identity) { await this.localDbService.setIdentity(identity) this.emit(SocketActionTypes.IDENTITY_STORED, identity) @@ -731,16 +333,7 @@ export class StorageService extends EventEmitter { public async clean() { this.peerId = null - // @ts-ignore - this.channels = undefined - // @ts-ignore - this.messageThreads = undefined - // @ts-ignore - this.publicChannelsRepos = new Map() - this.publicKeysMap = new Map() - - this.certificates = null - this.channels = null + await this.channelsService.clean() this.certificatesRequestsStore.clean() this.certificatesStore.clean() diff --git a/packages/backend/src/nest/storage/storage.types.ts b/packages/backend/src/nest/storage/storage.types.ts index 20bbf4039..6ed848fcf 100644 --- a/packages/backend/src/nest/storage/storage.types.ts +++ b/packages/backend/src/nest/storage/storage.types.ts @@ -40,5 +40,5 @@ export interface CsrReplicatedPromiseValues { } export interface DBOptions { - replicate: boolean + sync: boolean } diff --git a/packages/types/src/channel.ts b/packages/types/src/channel.ts index e4c0eb782..05c12c41e 100644 --- a/packages/types/src/channel.ts +++ b/packages/types/src/channel.ts @@ -42,6 +42,10 @@ export interface ChannelMessage { media?: FileMetadata } +export interface ConsumedChannelMessage extends ChannelMessage { + verified?: boolean +} + export interface DisplayableMessage { id: string type: number diff --git a/packages/types/src/errors.ts b/packages/types/src/errors.ts index 4f70fcec8..90cda5ee8 100644 --- a/packages/types/src/errors.ts +++ b/packages/types/src/errors.ts @@ -53,3 +53,12 @@ export enum ErrorMessages { // Storage Server STORAGE_SERVER_CONNECTION_FAILED = 'Connecting to storage server failed', } + +export class CompoundError extends Error { + constructor( + message: string, + public readonly originalError: T + ) { + super(message) + } +}