From cb9e499d34b6ce8ab843019c8ffd868f03aaee7d Mon Sep 17 00:00:00 2001 From: mario Date: Tue, 21 Jan 2025 19:12:07 +0900 Subject: [PATCH 01/26] =?UTF-8?q?feat:=20=EB=A9=94=EC=8B=9C=EC=A7=80?= =?UTF-8?q?=ED=81=90=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pub/Sub 패턴 적용을 위한 메시지 큐 구현 --- .../src/common/pub-sub/interfaces/Message.ts | 7 + .../interfaces/subscription.interface.ts | 10 + .../src/common/pub-sub/message-queue.ts | 202 ++++++++++++++++++ apps/backend/src/common/pub-sub/publisher.ts | 22 ++ 4 files changed, 241 insertions(+) create mode 100644 apps/backend/src/common/pub-sub/interfaces/Message.ts create mode 100644 apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts create mode 100644 apps/backend/src/common/pub-sub/message-queue.ts create mode 100644 apps/backend/src/common/pub-sub/publisher.ts diff --git a/apps/backend/src/common/pub-sub/interfaces/Message.ts b/apps/backend/src/common/pub-sub/interfaces/Message.ts new file mode 100644 index 0000000..1851e6c --- /dev/null +++ b/apps/backend/src/common/pub-sub/interfaces/Message.ts @@ -0,0 +1,7 @@ +export interface Message { + id: string; + topic: string; + data: T; + publisherId: string; + timestamp: number; +} \ No newline at end of file diff --git a/apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts b/apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts new file mode 100644 index 0000000..e66aca9 --- /dev/null +++ b/apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts @@ -0,0 +1,10 @@ +import { Message } from './Message'; + +export type SubscriptionHandler = (message: Message) => void | Promise; + +export interface Subscription { + id: string; + topic: string; + handler: SubscriptionHandler; +} + diff --git a/apps/backend/src/common/pub-sub/message-queue.ts b/apps/backend/src/common/pub-sub/message-queue.ts new file mode 100644 index 0000000..b4823e8 --- /dev/null +++ b/apps/backend/src/common/pub-sub/message-queue.ts @@ -0,0 +1,202 @@ +import { Message } from "./interfaces/Message"; +import { Publisher } from "./publisher"; +import { Subscription, SubscriptionHandler } from "./interfaces/subscription.interface"; +import { randomUUID } from 'node:crypto'; + +interface DeliveryStatus { + success: boolean; + retries: number; +} + +export class MessageQueue { + private queues: Map[]> = new Map(); + private subscriptions: Map[]> = new Map(); + private publishers: Map> = new Map(); + private processing: Map = new Map(); + + constructor( + private readonly maxRetries: number = 3, + private readonly retryDelay: number = 1000 + ) {} + + registerPublisher(publisherId: string): Publisher { + if (this.publishers.has(publisherId)) { + throw new Error(`Publisher with ID ${publisherId} already exists`); + } + + const publisher = new Publisher(publisherId, this); + + this.publishers.set(publisherId, publisher); + + return publisher; + } + + removePublisher(publisherId: string): void { + this.publishers.delete(publisherId); + } + + async publish(message: Message): Promise { + if (!this.queues.has(message.topic)) { + this.queues.set(message.topic, []); + } + + this.queues.get(message.topic).push(message); + + await this.processQueue(message.topic); + } + + subscribe(topic: string, handler: SubscriptionHandler): () => void { + const subscriptions = this.getSubscriptions(topic); + const subscription: Subscription = { + id: randomUUID(), + topic, + handler, + }; + + return () => { + if (subscriptions) { + const index = subscriptions.findIndex(s => s.id === subscription.id); + if (index !== -1) { + subscriptions.splice(index, 1); + } + if (subscriptions.length === 0) { + this.subscriptions.delete(topic); + } + } + }; + } + + private getSubscriptions(topic: string) { + if (!this.subscriptions.has(topic)) { + this.subscriptions.set(topic, []); + } + + return this.subscriptions.get(topic); + } + + private async processQueue(topic: string): Promise { + if (!this.shouldProcessQueue(topic)) { + return; + } + + this.processing.set(topic, true); + + try { + const queue = this.queues.get(topic)!; + while (queue.length > 0) { + await this.processMessage(queue[0], topic); + queue.shift(); + } + } finally { + this.processing.set(topic, false); + } + } + + private shouldProcessQueue(topic: string) { + return !this.processing.get(topic) + && (this.queues.get(topic)?.length ?? 0) > 0; + } + + private async processMessage(message: Message, topic: string) { + const subscribers = this.subscriptions.get(topic) ?? []; + const deliveryStatus = this.initializeDeliveryStatus(subscribers); + + await this.deliverToAllSubscribers(message, subscribers, deliveryStatus, topic); + this.logFailedDeliveries(deliveryStatus, topic); + } + + private initializeDeliveryStatus(subscribers: Subscription[]) { + const deliveryStatus = new Map(); + + subscribers.forEach(sub => { + deliveryStatus.set(sub.id, { success: false, retries: 0 }); + }); + + return deliveryStatus; + } + + private async deliverToAllSubscribers( + message: Message, + subscribers: Subscription[], + deliveryStatus: Map, + topic: string + ) { + let hasMoreRetries = true; + + while (hasMoreRetries) { + const pendingSubscribers = this.getPendingSubscribers(subscribers, deliveryStatus); + + if (pendingSubscribers.length === 0) { + hasMoreRetries = false; + continue; + } + + await this.attemptDelivery(message, pendingSubscribers, deliveryStatus, topic); + } + } + + private getPendingSubscribers( + subscribers: Subscription[], + deliveryStatus: Map + ): Subscription[] { + return subscribers.filter(sub => { + const status = deliveryStatus.get(sub.id)!; + return !status.success && status.retries < this.maxRetries; + }); + } + + private async attemptDelivery( + message: Message, + subscribers: Subscription[], + deliveryStatus: Map, + topic: string + ): Promise { + await Promise.all( + subscribers.map(sub => this.deliverToSubscriber(message, sub, deliveryStatus, topic)) + ); + } + + private async deliverToSubscriber( + message: Message, + subscriber: Subscription, + deliveryStatus: Map, + topic: string + ): Promise { + const status = deliveryStatus.get(subscriber.id)!; + + try { + await subscriber.handler(message); + status.success = true; + } catch (error) { + status.retries++; + + if (status.retries >= this.maxRetries) { + console.error( + `Failed to deliver message to subscriber ${subscriber.id} in topic ${topic} after ${this.maxRetries} retries:`, + error + ); + } else { + await this.delay(this.retryDelay); + } + } + deliveryStatus.set(subscriber.id, status); + } + + private logFailedDeliveries(deliveryStatus: Map, topic: string): void { + deliveryStatus.forEach((status, subId) => { + if (!status.success) { + console.warn(`Message delivery failed for subscriber ${subId} in topic ${topic}`); + } + }); + } + + private delay(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + + clearTopic(topic: string): void { + this.queues.delete(topic); + this.subscriptions.delete(topic); + } +} diff --git a/apps/backend/src/common/pub-sub/publisher.ts b/apps/backend/src/common/pub-sub/publisher.ts new file mode 100644 index 0000000..10720af --- /dev/null +++ b/apps/backend/src/common/pub-sub/publisher.ts @@ -0,0 +1,22 @@ +import { randomUUID } from 'node:crypto'; +import { MessageQueue } from './message-queue'; + +export class Publisher { + constructor( + public readonly id: string, + private readonly messageQueue: MessageQueue + ) {} + + async publish(topic: string, data: T): Promise { + const now = Date.now(); + const message = { + id: `${this.id}-${now}-${randomUUID()}`, + topic, + data, + publisherId: this.id, + timestamp: now, + }; + + await this.messageQueue.publish(message); + } +} \ No newline at end of file From 316d7441f6adb2590f8bc62441ac348591148077 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 16:14:41 +0900 Subject: [PATCH 02/26] =?UTF-8?q?feat:=20message-queue=20=EB=AA=A8?= =?UTF-8?q?=EB=93=88=20=EB=93=B1=EB=A1=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - MessageQueue 구현체 Nest 전역 모듈 등록 --- apps/backend/src/app.module.ts | 4 ++++ .../src/message-queue/message-queue.module.ts | 13 +++++++++++++ .../message-queue.service.spec.ts | 18 ++++++++++++++++++ .../src/message-queue/message-queue.service.ts | 9 +++++++++ 4 files changed, 44 insertions(+) create mode 100644 apps/backend/src/message-queue/message-queue.module.ts create mode 100644 apps/backend/src/message-queue/message-queue.service.spec.ts create mode 100644 apps/backend/src/message-queue/message-queue.service.ts diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 89a91b9..4dd8d6f 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -16,6 +16,8 @@ import { DataSource } from 'typeorm'; import { addTransactionalDataSource } from 'typeorm-transactional'; import { QuizModule } from './quiz/quiz.module'; import { ChatModule } from './chat/chat.module'; +import { MessageQueueService } from './message-queue/message-queue.service'; +import { MessageQueueModule } from './message-queue/message-queue.module'; @Module({ imports: [ @@ -38,6 +40,7 @@ import { ChatModule } from './chat/chat.module'; }), QuizModule, ChatModule, + MessageQueueModule, ], controllers: [AppController], providers: [ @@ -46,6 +49,7 @@ import { ChatModule } from './chat/chat.module'; provide: APP_PIPE, useClass: ValidationPipe, }, + MessageQueueService, ], }) export class AppModule implements NestModule { diff --git a/apps/backend/src/message-queue/message-queue.module.ts b/apps/backend/src/message-queue/message-queue.module.ts new file mode 100644 index 0000000..0c1307b --- /dev/null +++ b/apps/backend/src/message-queue/message-queue.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { MessageQueueService } from './message-queue.service'; + +@Module({ + providers: [{ + provide: MessageQueueService, + useFactory: () => { + return new MessageQueueService(3, 1000); + } + }], + exports: [MessageQueueService], +}) +export class MessageQueueModule {} diff --git a/apps/backend/src/message-queue/message-queue.service.spec.ts b/apps/backend/src/message-queue/message-queue.service.spec.ts new file mode 100644 index 0000000..5afcae5 --- /dev/null +++ b/apps/backend/src/message-queue/message-queue.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { MessageQueueService } from './message-queue.service'; + +describe('MessageQueueService', () => { + let service: MessageQueueService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [MessageQueueService], + }).compile(); + + service = module.get(MessageQueueService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/apps/backend/src/message-queue/message-queue.service.ts b/apps/backend/src/message-queue/message-queue.service.ts new file mode 100644 index 0000000..4021f1e --- /dev/null +++ b/apps/backend/src/message-queue/message-queue.service.ts @@ -0,0 +1,9 @@ +import { Injectable } from '@nestjs/common'; +import { MessageQueue } from '../common/pub-sub/message-queue'; + +@Injectable() +export class MessageQueueService extends MessageQueue{ + constructor(maxRetries: number = 3, retryDelay: number = 1000) { + super(maxRetries, retryDelay); + } +} From 5128b99ee376492c6a7d7b79d6981460396e5bd3 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 19:20:40 +0900 Subject: [PATCH 03/26] =?UTF-8?q?fix:=20=EB=A9=94=EC=8B=9C=EC=A7=80?= =?UTF-8?q?=ED=81=90=20=EA=B5=AC=ED=98=84=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 구현 변경으로 인한 메시지큐 관련 클래스 제거 --- apps/backend/src/app.module.ts | 4 - .../src/common/pub-sub/interfaces/Message.ts | 7 - .../interfaces/subscription.interface.ts | 10 - .../src/common/pub-sub/message-queue.ts | 202 ------------------ apps/backend/src/common/pub-sub/publisher.ts | 22 -- .../src/message-queue/message-queue.module.ts | 13 -- .../message-queue.service.spec.ts | 18 -- .../message-queue/message-queue.service.ts | 9 - 8 files changed, 285 deletions(-) delete mode 100644 apps/backend/src/common/pub-sub/interfaces/Message.ts delete mode 100644 apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts delete mode 100644 apps/backend/src/common/pub-sub/message-queue.ts delete mode 100644 apps/backend/src/common/pub-sub/publisher.ts delete mode 100644 apps/backend/src/message-queue/message-queue.module.ts delete mode 100644 apps/backend/src/message-queue/message-queue.service.spec.ts delete mode 100644 apps/backend/src/message-queue/message-queue.service.ts diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 4dd8d6f..89a91b9 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -16,8 +16,6 @@ import { DataSource } from 'typeorm'; import { addTransactionalDataSource } from 'typeorm-transactional'; import { QuizModule } from './quiz/quiz.module'; import { ChatModule } from './chat/chat.module'; -import { MessageQueueService } from './message-queue/message-queue.service'; -import { MessageQueueModule } from './message-queue/message-queue.module'; @Module({ imports: [ @@ -40,7 +38,6 @@ import { MessageQueueModule } from './message-queue/message-queue.module'; }), QuizModule, ChatModule, - MessageQueueModule, ], controllers: [AppController], providers: [ @@ -49,7 +46,6 @@ import { MessageQueueModule } from './message-queue/message-queue.module'; provide: APP_PIPE, useClass: ValidationPipe, }, - MessageQueueService, ], }) export class AppModule implements NestModule { diff --git a/apps/backend/src/common/pub-sub/interfaces/Message.ts b/apps/backend/src/common/pub-sub/interfaces/Message.ts deleted file mode 100644 index 1851e6c..0000000 --- a/apps/backend/src/common/pub-sub/interfaces/Message.ts +++ /dev/null @@ -1,7 +0,0 @@ -export interface Message { - id: string; - topic: string; - data: T; - publisherId: string; - timestamp: number; -} \ No newline at end of file diff --git a/apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts b/apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts deleted file mode 100644 index e66aca9..0000000 --- a/apps/backend/src/common/pub-sub/interfaces/subscription.interface.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { Message } from './Message'; - -export type SubscriptionHandler = (message: Message) => void | Promise; - -export interface Subscription { - id: string; - topic: string; - handler: SubscriptionHandler; -} - diff --git a/apps/backend/src/common/pub-sub/message-queue.ts b/apps/backend/src/common/pub-sub/message-queue.ts deleted file mode 100644 index b4823e8..0000000 --- a/apps/backend/src/common/pub-sub/message-queue.ts +++ /dev/null @@ -1,202 +0,0 @@ -import { Message } from "./interfaces/Message"; -import { Publisher } from "./publisher"; -import { Subscription, SubscriptionHandler } from "./interfaces/subscription.interface"; -import { randomUUID } from 'node:crypto'; - -interface DeliveryStatus { - success: boolean; - retries: number; -} - -export class MessageQueue { - private queues: Map[]> = new Map(); - private subscriptions: Map[]> = new Map(); - private publishers: Map> = new Map(); - private processing: Map = new Map(); - - constructor( - private readonly maxRetries: number = 3, - private readonly retryDelay: number = 1000 - ) {} - - registerPublisher(publisherId: string): Publisher { - if (this.publishers.has(publisherId)) { - throw new Error(`Publisher with ID ${publisherId} already exists`); - } - - const publisher = new Publisher(publisherId, this); - - this.publishers.set(publisherId, publisher); - - return publisher; - } - - removePublisher(publisherId: string): void { - this.publishers.delete(publisherId); - } - - async publish(message: Message): Promise { - if (!this.queues.has(message.topic)) { - this.queues.set(message.topic, []); - } - - this.queues.get(message.topic).push(message); - - await this.processQueue(message.topic); - } - - subscribe(topic: string, handler: SubscriptionHandler): () => void { - const subscriptions = this.getSubscriptions(topic); - const subscription: Subscription = { - id: randomUUID(), - topic, - handler, - }; - - return () => { - if (subscriptions) { - const index = subscriptions.findIndex(s => s.id === subscription.id); - if (index !== -1) { - subscriptions.splice(index, 1); - } - if (subscriptions.length === 0) { - this.subscriptions.delete(topic); - } - } - }; - } - - private getSubscriptions(topic: string) { - if (!this.subscriptions.has(topic)) { - this.subscriptions.set(topic, []); - } - - return this.subscriptions.get(topic); - } - - private async processQueue(topic: string): Promise { - if (!this.shouldProcessQueue(topic)) { - return; - } - - this.processing.set(topic, true); - - try { - const queue = this.queues.get(topic)!; - while (queue.length > 0) { - await this.processMessage(queue[0], topic); - queue.shift(); - } - } finally { - this.processing.set(topic, false); - } - } - - private shouldProcessQueue(topic: string) { - return !this.processing.get(topic) - && (this.queues.get(topic)?.length ?? 0) > 0; - } - - private async processMessage(message: Message, topic: string) { - const subscribers = this.subscriptions.get(topic) ?? []; - const deliveryStatus = this.initializeDeliveryStatus(subscribers); - - await this.deliverToAllSubscribers(message, subscribers, deliveryStatus, topic); - this.logFailedDeliveries(deliveryStatus, topic); - } - - private initializeDeliveryStatus(subscribers: Subscription[]) { - const deliveryStatus = new Map(); - - subscribers.forEach(sub => { - deliveryStatus.set(sub.id, { success: false, retries: 0 }); - }); - - return deliveryStatus; - } - - private async deliverToAllSubscribers( - message: Message, - subscribers: Subscription[], - deliveryStatus: Map, - topic: string - ) { - let hasMoreRetries = true; - - while (hasMoreRetries) { - const pendingSubscribers = this.getPendingSubscribers(subscribers, deliveryStatus); - - if (pendingSubscribers.length === 0) { - hasMoreRetries = false; - continue; - } - - await this.attemptDelivery(message, pendingSubscribers, deliveryStatus, topic); - } - } - - private getPendingSubscribers( - subscribers: Subscription[], - deliveryStatus: Map - ): Subscription[] { - return subscribers.filter(sub => { - const status = deliveryStatus.get(sub.id)!; - return !status.success && status.retries < this.maxRetries; - }); - } - - private async attemptDelivery( - message: Message, - subscribers: Subscription[], - deliveryStatus: Map, - topic: string - ): Promise { - await Promise.all( - subscribers.map(sub => this.deliverToSubscriber(message, sub, deliveryStatus, topic)) - ); - } - - private async deliverToSubscriber( - message: Message, - subscriber: Subscription, - deliveryStatus: Map, - topic: string - ): Promise { - const status = deliveryStatus.get(subscriber.id)!; - - try { - await subscriber.handler(message); - status.success = true; - } catch (error) { - status.retries++; - - if (status.retries >= this.maxRetries) { - console.error( - `Failed to deliver message to subscriber ${subscriber.id} in topic ${topic} after ${this.maxRetries} retries:`, - error - ); - } else { - await this.delay(this.retryDelay); - } - } - deliveryStatus.set(subscriber.id, status); - } - - private logFailedDeliveries(deliveryStatus: Map, topic: string): void { - deliveryStatus.forEach((status, subId) => { - if (!status.success) { - console.warn(`Message delivery failed for subscriber ${subId} in topic ${topic}`); - } - }); - } - - private delay(ms: number): Promise { - return new Promise(resolve => setTimeout(resolve, ms)); - } - - - clearTopic(topic: string): void { - this.queues.delete(topic); - this.subscriptions.delete(topic); - } -} diff --git a/apps/backend/src/common/pub-sub/publisher.ts b/apps/backend/src/common/pub-sub/publisher.ts deleted file mode 100644 index 10720af..0000000 --- a/apps/backend/src/common/pub-sub/publisher.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { randomUUID } from 'node:crypto'; -import { MessageQueue } from './message-queue'; - -export class Publisher { - constructor( - public readonly id: string, - private readonly messageQueue: MessageQueue - ) {} - - async publish(topic: string, data: T): Promise { - const now = Date.now(); - const message = { - id: `${this.id}-${now}-${randomUUID()}`, - topic, - data, - publisherId: this.id, - timestamp: now, - }; - - await this.messageQueue.publish(message); - } -} \ No newline at end of file diff --git a/apps/backend/src/message-queue/message-queue.module.ts b/apps/backend/src/message-queue/message-queue.module.ts deleted file mode 100644 index 0c1307b..0000000 --- a/apps/backend/src/message-queue/message-queue.module.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Module } from '@nestjs/common'; -import { MessageQueueService } from './message-queue.service'; - -@Module({ - providers: [{ - provide: MessageQueueService, - useFactory: () => { - return new MessageQueueService(3, 1000); - } - }], - exports: [MessageQueueService], -}) -export class MessageQueueModule {} diff --git a/apps/backend/src/message-queue/message-queue.service.spec.ts b/apps/backend/src/message-queue/message-queue.service.spec.ts deleted file mode 100644 index 5afcae5..0000000 --- a/apps/backend/src/message-queue/message-queue.service.spec.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Test, TestingModule } from '@nestjs/testing'; -import { MessageQueueService } from './message-queue.service'; - -describe('MessageQueueService', () => { - let service: MessageQueueService; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - providers: [MessageQueueService], - }).compile(); - - service = module.get(MessageQueueService); - }); - - it('should be defined', () => { - expect(service).toBeDefined(); - }); -}); diff --git a/apps/backend/src/message-queue/message-queue.service.ts b/apps/backend/src/message-queue/message-queue.service.ts deleted file mode 100644 index 4021f1e..0000000 --- a/apps/backend/src/message-queue/message-queue.service.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { MessageQueue } from '../common/pub-sub/message-queue'; - -@Injectable() -export class MessageQueueService extends MessageQueue{ - constructor(maxRetries: number = 3, retryDelay: number = 1000) { - super(maxRetries, retryDelay); - } -} From 8d058b414e086cc7f1a6d06fd02dc6cf9a3fcef1 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 19:21:37 +0900 Subject: [PATCH 04/26] =?UTF-8?q?feat:=20pub-sub=20=EA=B8=B0=EB=8A=A5=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 간단한 pub-sub 구현 --- .../pub-sub/interfaces/message.interface.ts | 4 ++ .../pub-sub/interfaces/pub-sub.interface.ts | 14 +++++ .../src/core/pub-sub/message-broker.ts | 52 +++++++++++++++++++ apps/backend/src/core/pub-sub/types.ts | 3 ++ 4 files changed, 73 insertions(+) create mode 100644 apps/backend/src/core/pub-sub/interfaces/message.interface.ts create mode 100644 apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts create mode 100644 apps/backend/src/core/pub-sub/message-broker.ts create mode 100644 apps/backend/src/core/pub-sub/types.ts diff --git a/apps/backend/src/core/pub-sub/interfaces/message.interface.ts b/apps/backend/src/core/pub-sub/interfaces/message.interface.ts new file mode 100644 index 0000000..000dda2 --- /dev/null +++ b/apps/backend/src/core/pub-sub/interfaces/message.interface.ts @@ -0,0 +1,4 @@ +export interface Message { + topic: string; + data: T; +} \ No newline at end of file diff --git a/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts b/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts new file mode 100644 index 0000000..97f13a8 --- /dev/null +++ b/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts @@ -0,0 +1,14 @@ +import { Message } from './message.interface' +import { MessageHandler } from '../types'; + +export interface PubSub { + subscribe(publisherId: string, subscriberId: string, handler: MessageHandler): Promise; + + unsubscribe(publisherId: string, subscriberId: string): Promise; + + addPublisher(publisherId: string): Promise; + + removePublisher(publisherId: string): Promise; + + publish(publisherId: string, message: Message): Promise; +} \ No newline at end of file diff --git a/apps/backend/src/core/pub-sub/message-broker.ts b/apps/backend/src/core/pub-sub/message-broker.ts new file mode 100644 index 0000000..3106645 --- /dev/null +++ b/apps/backend/src/core/pub-sub/message-broker.ts @@ -0,0 +1,52 @@ +import { PubSub } from './interfaces/pub-sub.interface'; +import { Message } from './interfaces/message.interface'; +import { MessageHandler } from './types'; + +export class MessageBroker implements PubSub { + constructor( + private readonly publishers: Map>>, + ) {} + + public async addPublisher(id: string) { + if (this.publishers.has(id)) { + throw new Error(`Publisher with ID ${id} already exists`); + } + + this.publishers.set(id, new Map()); + } + + public async removePublisher(id: string) { + if (!this.publishers.has(id)) { + throw new Error(`Publisher with ID ${id} does not exist`); + } + + this.publishers.delete(id); + } + + public async publish(id: string, message: Message) { + const subscribers = this.getSubscribers(id); + + if (subscribers === undefined) { + throw new Error(`Publisher with ID ${id} does not exist`); + } + + return Promise.all([...subscribers.values()].map(handler => handler(message))); + } + + public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { + const subscribers = this.getSubscribers(publisherId); + subscribers.set(subscriberId, handler); + } + + public async unsubscribe(publisherId: string, subscriberId: string) { + const subscribers = this.getSubscribers(publisherId); + subscribers.delete(subscriberId); + } + + private getSubscribers(id: string) { + if (!this.publishers.has(id)) { + throw new Error(`Publisher with ID ${id} does not exist`); + } + return this.publishers.get(id); + } +} \ No newline at end of file diff --git a/apps/backend/src/core/pub-sub/types.ts b/apps/backend/src/core/pub-sub/types.ts new file mode 100644 index 0000000..9470384 --- /dev/null +++ b/apps/backend/src/core/pub-sub/types.ts @@ -0,0 +1,3 @@ +import { Message } from './interfaces/message.interface'; + +export type MessageHandler = (message: Message) => void | Promise; \ No newline at end of file From dab16066ba8067506573b68425b050edc8cd7649 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 19:22:26 +0900 Subject: [PATCH 05/26] =?UTF-8?q?refactor:=20pub-sub=20=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 간단한 pub-sub 클래스 chat에 적용 --- apps/backend/src/chat/chat.module.ts | 5 +++ apps/backend/src/chat/chat.service.ts | 47 ++++++++++++++++++++------- apps/backend/src/play/play.gateway.ts | 8 ++++- 3 files changed, 47 insertions(+), 13 deletions(-) diff --git a/apps/backend/src/chat/chat.module.ts b/apps/backend/src/chat/chat.module.ts index 6ef4ae2..f46b963 100644 --- a/apps/backend/src/chat/chat.module.ts +++ b/apps/backend/src/chat/chat.module.ts @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common'; import { ChatController } from './chat.controller'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; import { ChatService } from './chat.service'; +import { MessageBroker } from '../core/pub-sub/message-broker'; @Module({ controllers: [ChatController], @@ -15,6 +16,10 @@ import { ChatService } from './chat.service'; provide: 'ChatRepository', useClass: ChatRepositoryMemory, }, + { + provide: 'PubSub', + useClass: MessageBroker, + } ], exports: [ChatService], }) diff --git a/apps/backend/src/chat/chat.service.ts b/apps/backend/src/chat/chat.service.ts index 7f90366..637bb25 100644 --- a/apps/backend/src/chat/chat.service.ts +++ b/apps/backend/src/chat/chat.service.ts @@ -1,16 +1,21 @@ import { Injectable, Inject, NotFoundException } from '@nestjs/common'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; -import {ChatMessage} from "@web08-booquiz/shared"; +import { ChatMessage, Player } from '@web08-booquiz/shared'; +import { PubSub } from '../core/pub-sub/interfaces/pub-sub.interface'; @Injectable() export class ChatService { constructor( @Inject('ChatRepository') private readonly chatRepository: ChatRepositoryMemory, - ) {} + @Inject('PubSub') + private readonly pubSub: PubSub + ) { + } async set(id: string) { - this.chatRepository.set(id); + await this.chatRepository.set(id); + await this.pubSub.addPublisher(id); } async get(id: string) { @@ -20,18 +25,36 @@ export class ChatService { return this.chatRepository.get(id); } - async add(id: string, chatMessage: ChatMessage) { - if (!(await this.chatRepository.has(id))) { - throw new NotFoundException('퀴즈 존에 대한 채팅이 존재하지 않습니다.'); - } - return this.chatRepository.add(id, chatMessage); + async delete(id: string) { + await this.chatRepository.delete(id); + await this.pubSub.removePublisher(id); + } + + async join(chatId: string, player: Player, handleSendMessage: (data: ChatMessage) => void) { + await this.pubSub.subscribe(chatId, player.id, (message) => { + const { data } = message; + const { clientId } = data; + + if (clientId !== player.id) { + handleSendMessage(data); + } + + this.add(chatId, data); + }); } - async has(id: string) { - return await this.chatRepository.has(id); + async send(chatId: string, chatMessage: ChatMessage) { + await this.pubSub.publish(chatId, { + topic: 'chat', + data: chatMessage, + }); } - async delete(id: string) { - return this.chatRepository.delete(id); + private async add(id: string, chatMessage: ChatMessage) { + if (!(await this.chatRepository.has(id))) { + throw new NotFoundException('퀴즈 존에 대한 채팅이 존재하지 않습니다.'); + } + + await this.chatRepository.add(id, chatMessage); } } diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index 999738a..72513d8 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -114,6 +114,12 @@ export class PlayGateway implements OnGatewayInit { this.broadcast(playerIds, 'someone_join', { id, nickname }); + await this.chatService.join( + quizZoneId, + currentPlayer, + (message: ChatMessage) => client.send(JSON.stringify(message)) + ); + return { event: 'join', data, @@ -303,6 +309,6 @@ export class PlayGateway implements OnGatewayInit { const clientIds = await this.playService.chatQuizZone(clientId, quizZoneId); this.broadcast(clientIds, 'chat', message); - this.chatService.add(quizZoneId, message); + await this.chatService.send(quizZoneId, message); } } From 22284cc03f58797b0c622bad1122e94ff856219b Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 19:34:46 +0900 Subject: [PATCH 06/26] =?UTF-8?q?fix:=20pub-sub=20=EC=9D=B8=ED=84=B0?= =?UTF-8?q?=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - publish 반환 타입 수정 --- apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts | 2 +- apps/backend/src/core/pub-sub/message-broker.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts b/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts index 97f13a8..d33c617 100644 --- a/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts +++ b/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts @@ -10,5 +10,5 @@ export interface PubSub { removePublisher(publisherId: string): Promise; - publish(publisherId: string, message: Message): Promise; + publish(publisherId: string, message: Message): Promise; } \ No newline at end of file diff --git a/apps/backend/src/core/pub-sub/message-broker.ts b/apps/backend/src/core/pub-sub/message-broker.ts index 3106645..395d5e3 100644 --- a/apps/backend/src/core/pub-sub/message-broker.ts +++ b/apps/backend/src/core/pub-sub/message-broker.ts @@ -30,7 +30,7 @@ export class MessageBroker implements PubSub { throw new Error(`Publisher with ID ${id} does not exist`); } - return Promise.all([...subscribers.values()].map(handler => handler(message))); + await Promise.all([...subscribers.values()].map(handler => handler(message))); } public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { From bb3efe4a5f8032f9bd5748d59751148e5f31d616 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 20:17:16 +0900 Subject: [PATCH 07/26] =?UTF-8?q?feat:=20=EB=A9=94=EC=8B=9C=EC=A7=80=20?= =?UTF-8?q?=EB=B8=8C=EB=A1=9C=EC=BB=A4=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 메시지 브로커 클래스에 토픽에 대한 제네릭 타입 추가 - subscribe 메소드 반환값을 unsubscribe 로 변경 - 퀴즈를 떠났을 때 채팅에서도 unsubscribe 되도록 수정 --- apps/backend/src/chat/chat.service.ts | 31 ++++++++++++++----- .../pub-sub/interfaces/message.interface.ts | 6 ++-- .../pub-sub/interfaces/pub-sub.interface.ts | 8 ++--- .../src/core/pub-sub/message-broker.ts | 11 ++++--- apps/backend/src/core/pub-sub/types.ts | 2 +- apps/backend/src/play/play.gateway.ts | 1 + 6 files changed, 39 insertions(+), 20 deletions(-) diff --git a/apps/backend/src/chat/chat.service.ts b/apps/backend/src/chat/chat.service.ts index 637bb25..229d97c 100644 --- a/apps/backend/src/chat/chat.service.ts +++ b/apps/backend/src/chat/chat.service.ts @@ -9,7 +9,7 @@ export class ChatService { @Inject('ChatRepository') private readonly chatRepository: ChatRepositoryMemory, @Inject('PubSub') - private readonly pubSub: PubSub + private readonly pubSub: PubSub<'chat' | 'leave', ChatMessage> ) { } @@ -31,15 +31,21 @@ export class ChatService { } async join(chatId: string, player: Player, handleSendMessage: (data: ChatMessage) => void) { - await this.pubSub.subscribe(chatId, player.id, (message) => { - const { data } = message; + const unsubscribe = await this.pubSub.subscribe(chatId, player.id, async (message) => { + const { topic, data } = message; const { clientId } = data; - if (clientId !== player.id) { - handleSendMessage(data); + switch (topic) { + case 'chat': + if (clientId !== player.id) { + handleSendMessage(data); + } + return this.add(chatId, data); + case 'leave': + if (clientId === player.id) { + return unsubscribe(); + } } - - this.add(chatId, data); }); } @@ -50,6 +56,17 @@ export class ChatService { }); } + async leave(chatId: string, clientId: string) { + await this.pubSub.publish(chatId, { + topic: 'leave', + data: { + clientId, + nickname: '', + message: '', + }, + }) + } + private async add(id: string, chatMessage: ChatMessage) { if (!(await this.chatRepository.has(id))) { throw new NotFoundException('퀴즈 존에 대한 채팅이 존재하지 않습니다.'); diff --git a/apps/backend/src/core/pub-sub/interfaces/message.interface.ts b/apps/backend/src/core/pub-sub/interfaces/message.interface.ts index 000dda2..40c8fac 100644 --- a/apps/backend/src/core/pub-sub/interfaces/message.interface.ts +++ b/apps/backend/src/core/pub-sub/interfaces/message.interface.ts @@ -1,4 +1,4 @@ -export interface Message { - topic: string; - data: T; +export interface Message { + topic: TTopic; + data: TData; } \ No newline at end of file diff --git a/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts b/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts index d33c617..e85462d 100644 --- a/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts +++ b/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts @@ -1,14 +1,14 @@ import { Message } from './message.interface' import { MessageHandler } from '../types'; -export interface PubSub { - subscribe(publisherId: string, subscriberId: string, handler: MessageHandler): Promise; +type Unsubscribe = () => Promise; - unsubscribe(publisherId: string, subscriberId: string): Promise; +export interface PubSub { + subscribe(publisherId: string, subscriberId: string, handler: MessageHandler): Promise; addPublisher(publisherId: string): Promise; removePublisher(publisherId: string): Promise; - publish(publisherId: string, message: Message): Promise; + publish(publisherId: string, message: Message): Promise; } \ No newline at end of file diff --git a/apps/backend/src/core/pub-sub/message-broker.ts b/apps/backend/src/core/pub-sub/message-broker.ts index 395d5e3..e072baa 100644 --- a/apps/backend/src/core/pub-sub/message-broker.ts +++ b/apps/backend/src/core/pub-sub/message-broker.ts @@ -2,9 +2,9 @@ import { PubSub } from './interfaces/pub-sub.interface'; import { Message } from './interfaces/message.interface'; import { MessageHandler } from './types'; -export class MessageBroker implements PubSub { +export class MessageBroker implements PubSub { constructor( - private readonly publishers: Map>>, + private readonly publishers: Map>>, ) {} public async addPublisher(id: string) { @@ -23,7 +23,7 @@ export class MessageBroker implements PubSub { this.publishers.delete(id); } - public async publish(id: string, message: Message) { + public async publish(id: string, message: Message) { const subscribers = this.getSubscribers(id); if (subscribers === undefined) { @@ -33,12 +33,13 @@ export class MessageBroker implements PubSub { await Promise.all([...subscribers.values()].map(handler => handler(message))); } - public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { + public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { const subscribers = this.getSubscribers(publisherId); subscribers.set(subscriberId, handler); + return () => this.unsubscribe(publisherId, subscriberId); } - public async unsubscribe(publisherId: string, subscriberId: string) { + private async unsubscribe(publisherId: string, subscriberId: string) { const subscribers = this.getSubscribers(publisherId); subscribers.delete(subscriberId); } diff --git a/apps/backend/src/core/pub-sub/types.ts b/apps/backend/src/core/pub-sub/types.ts index 9470384..1021e38 100644 --- a/apps/backend/src/core/pub-sub/types.ts +++ b/apps/backend/src/core/pub-sub/types.ts @@ -1,3 +1,3 @@ import { Message } from './interfaces/message.interface'; -export type MessageHandler = (message: Message) => void | Promise; \ No newline at end of file +export type MessageHandler = (message: Message) => void | Promise; \ No newline at end of file diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index 72513d8..c63f2b6 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -293,6 +293,7 @@ export class PlayGateway implements OnGatewayInit { this.clearQuizZone(playerIds, quizZoneId, 0); } else { this.broadcast(playerIds, 'someone_leave', clientId); + await this.chatService.leave(quizZoneId, clientId); this.clearClient(clientId, 'Client leave'); } From 69d6b1cdf6ec660d55ec87445c4e1ab4588e5e85 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 20:36:40 +0900 Subject: [PATCH 08/26] =?UTF-8?q?fix:=20=EB=B8=8C=EB=A1=9C=EC=BB=A4=20?= =?UTF-8?q?=EC=B4=88=EA=B8=B0=ED=99=94=20=EC=98=A4=EB=A5=98=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 브로커 의존성 주입 과정에서 publisher 주입 안되는 문제 수정 --- apps/backend/src/core/pub-sub/message-broker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/backend/src/core/pub-sub/message-broker.ts b/apps/backend/src/core/pub-sub/message-broker.ts index e072baa..f58c614 100644 --- a/apps/backend/src/core/pub-sub/message-broker.ts +++ b/apps/backend/src/core/pub-sub/message-broker.ts @@ -4,7 +4,7 @@ import { MessageHandler } from './types'; export class MessageBroker implements PubSub { constructor( - private readonly publishers: Map>>, + private readonly publishers: Map>> = new Map(), ) {} public async addPublisher(id: string) { From 36385aca9d052a07f10cfe083c5d2a1cd85785a1 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 20:38:47 +0900 Subject: [PATCH 09/26] =?UTF-8?q?feat:=20=EB=A9=94=EC=8B=9C=EC=A7=80=20?= =?UTF-8?q?=EB=B8=8C=EB=A1=9C=EC=BB=A4=20=EA=B5=AC=ED=98=84=EC=B2=B4=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - RxJS를 활용한 메시지 브로커 구현 --- .../core/pub-sub/reactive-message-broker.ts | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 apps/backend/src/core/pub-sub/reactive-message-broker.ts diff --git a/apps/backend/src/core/pub-sub/reactive-message-broker.ts b/apps/backend/src/core/pub-sub/reactive-message-broker.ts new file mode 100644 index 0000000..f988dea --- /dev/null +++ b/apps/backend/src/core/pub-sub/reactive-message-broker.ts @@ -0,0 +1,56 @@ +import { Subject } from 'rxjs'; +import { Message } from './interfaces/message.interface'; +import { PubSub } from './interfaces/pub-sub.interface'; + +export class ReactiveMessageBroker implements PubSub { + constructor( + private publishers: Map>> = new Map(), + ) {} + + public async addPublisher(id: string) { + if (this.publishers.has(id)) { + throw new Error(`Publisher with ID ${id} already exists`); + } + + this.publishers.set(id, new Subject>()); + } + + public async removePublisher(id: string) { + const publisher = this.publishers.get(id); + if (!publisher) { + throw new Error(`Publisher with ID ${id} does not exist`); + } + + publisher.complete(); + this.publishers.delete(id); + } + + public async publish(id: string, message: Message) { + const publisher = this.publishers.get(id); + + if (!publisher) { + throw new Error(`Publisher with ID ${id} does not exist`); + } + + publisher.next(message); + } + + public async subscribe( + publisherId: string, + subscriberId: string, + handler: (message: Message) => void + ) { + const publisher = this.publishers.get(publisherId); + + if (!publisher) { + throw new Error(`Publisher with ID ${publisherId} does not exist`); + } + + const subscription = publisher.subscribe({ + next: handler, + error: (error) => console.error(`Error in subscription ${publisherId}:${subscriberId} :`, error) + }); + + return async () => subscription.unsubscribe(); + } +} \ No newline at end of file From 95262f67bfdff96067ea0a8ce1097a70187bfc22 Mon Sep 17 00:00:00 2001 From: mario Date: Wed, 22 Jan 2025 22:19:41 +0900 Subject: [PATCH 10/26] =?UTF-8?q?feat:=20ReactiveMessageBroker=20=EC=A0=81?= =?UTF-8?q?=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ReactiveMessageBroker 적용 및 테스트 완료 --- apps/backend/src/chat/chat.module.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/chat/chat.module.ts b/apps/backend/src/chat/chat.module.ts index f46b963..fe93fef 100644 --- a/apps/backend/src/chat/chat.module.ts +++ b/apps/backend/src/chat/chat.module.ts @@ -2,7 +2,7 @@ import { Module } from '@nestjs/common'; import { ChatController } from './chat.controller'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; import { ChatService } from './chat.service'; -import { MessageBroker } from '../core/pub-sub/message-broker'; +import { ReactiveMessageBroker } from '../core/pub-sub/reactive-message-broker'; @Module({ controllers: [ChatController], @@ -18,7 +18,7 @@ import { MessageBroker } from '../core/pub-sub/message-broker'; }, { provide: 'PubSub', - useClass: MessageBroker, + useClass: ReactiveMessageBroker, } ], exports: [ChatService], From e2b760dcfcc344e37d259a9ef2b92ff76b694924 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 00:31:57 +0900 Subject: [PATCH 11/26] =?UTF-8?q?fix:=20=EC=B1=84=ED=8C=85=20=ED=95=B8?= =?UTF-8?q?=EB=93=A4=EB=9F=AC=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 채팅 기능 메시지 보내기 핸들러 형식 수정 --- apps/backend/src/chat/chat.service.ts | 4 +--- apps/backend/src/play/play.gateway.ts | 8 +++----- apps/backend/src/quiz-zone/quiz-zone.controller.ts | 10 ++++++++-- apps/backend/src/quiz-zone/quiz-zone.module.ts | 6 +++--- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/apps/backend/src/chat/chat.service.ts b/apps/backend/src/chat/chat.service.ts index 229d97c..f46e5a9 100644 --- a/apps/backend/src/chat/chat.service.ts +++ b/apps/backend/src/chat/chat.service.ts @@ -37,9 +37,7 @@ export class ChatService { switch (topic) { case 'chat': - if (clientId !== player.id) { - handleSendMessage(data); - } + handleSendMessage(data); return this.add(chatId, data); case 'leave': if (clientId === player.id) { diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index c63f2b6..1766fc3 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -113,12 +113,10 @@ export class PlayGateway implements OnGatewayInit { this.clients.set(sessionId, { quizZoneId, socket: client }); this.broadcast(playerIds, 'someone_join', { id, nickname }); + await this.chatService.join(quizZoneId, currentPlayer, (message: ChatMessage) => { + client.send(JSON.stringify({event: 'chat', data: message})); + }); - await this.chatService.join( - quizZoneId, - currentPlayer, - (message: ChatMessage) => client.send(JSON.stringify(message)) - ); return { event: 'join', diff --git a/apps/backend/src/quiz-zone/quiz-zone.controller.ts b/apps/backend/src/quiz-zone/quiz-zone.controller.ts index ab8d6da..181cb48 100644 --- a/apps/backend/src/quiz-zone/quiz-zone.controller.ts +++ b/apps/backend/src/quiz-zone/quiz-zone.controller.ts @@ -12,6 +12,7 @@ import { ApiOperation, ApiParam, ApiResponse, ApiTags } from '@nestjs/swagger'; import { QuizZoneService } from './quiz-zone.service'; import { ChatService } from '../chat/chat.service'; import { CreateQuizZoneDto } from './dto/create-quiz-zone.dto'; +import { PlayService } from '../play/play.service'; @ApiTags('Quiz Zone') @Controller('quiz-zone') @@ -19,6 +20,7 @@ export class QuizZoneController { constructor( private readonly quizZoneService: QuizZoneService, private readonly chatService: ChatService, + private readonly playService: PlayService, ) {} @Post() @@ -33,9 +35,11 @@ export class QuizZoneController { if (!session || !session.id) { throw new BadRequestException('세션 정보가 없습니다.'); } + const { quizZoneId } = createQuizZoneDto; const hostId = session.id; + await this.quizZoneService.create(createQuizZoneDto, hostId); - await this.chatService.set(createQuizZoneDto.quizZoneId); + await this.chatService.set(quizZoneId); } @Get('check/:quizZoneId') @@ -74,7 +78,9 @@ export class QuizZoneController { quizZoneId, session.quizZoneId, ); - session['quizZoneId'] = quizZoneId; + + session.quizZoneId = quizZoneId; + return { ...quizZoneInfo, serverTime, diff --git a/apps/backend/src/quiz-zone/quiz-zone.module.ts b/apps/backend/src/quiz-zone/quiz-zone.module.ts index 6394170..802a0c0 100644 --- a/apps/backend/src/quiz-zone/quiz-zone.module.ts +++ b/apps/backend/src/quiz-zone/quiz-zone.module.ts @@ -1,14 +1,14 @@ -import { Module } from '@nestjs/common'; +import { forwardRef, Module } from '@nestjs/common'; import { QuizZoneService } from './quiz-zone.service'; import { QuizZoneController } from './quiz-zone.controller'; import { QuizZoneRepositoryMemory } from './repository/quiz-zone.memory.repository'; -import { QuizService } from '../quiz/quiz.service'; import { QuizModule } from '../quiz/quiz.module'; import { ChatModule } from 'src/chat/chat.module'; +import { PlayModule } from '../play/play.module'; @Module({ controllers: [QuizZoneController], - imports: [QuizModule, ChatModule], + imports: [QuizModule, ChatModule, forwardRef(() => PlayModule)], providers: [ QuizZoneService, { From 01d3048e43f454316f8fa114010b2143eaa3c7a8 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 00:46:35 +0900 Subject: [PATCH 12/26] =?UTF-8?q?refactor:=20play=20=EC=98=81=EC=97=AD=20p?= =?UTF-8?q?ub-sub=20=EB=AA=A8=EB=93=88=20=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - play 웹소켓 응답 처리 pub/sub 적용 - 불필요한 로직 삭제 - 일부 코드 개선 --- apps/backend/src/core/SessionWsAdapter.ts | 6 +- .../src/play/entities/send-event.entity.ts | 10 - apps/backend/src/play/play.gateway.ts | 182 ++++++++---------- apps/backend/src/play/play.module.ts | 6 + apps/backend/src/play/play.service.ts | 14 +- .../src/quiz-zone/quiz-zone.controller.ts | 9 +- .../backend/src/quiz-zone/quiz-zone.module.ts | 5 +- 7 files changed, 107 insertions(+), 125 deletions(-) delete mode 100644 apps/backend/src/play/entities/send-event.entity.ts diff --git a/apps/backend/src/core/SessionWsAdapter.ts b/apps/backend/src/core/SessionWsAdapter.ts index dfc9a97..a1a9277 100644 --- a/apps/backend/src/core/SessionWsAdapter.ts +++ b/apps/backend/src/core/SessionWsAdapter.ts @@ -3,8 +3,12 @@ import { RequestHandler } from 'express'; import { NestApplication } from '@nestjs/core'; import { Session } from 'express-session'; +export interface SessionWithQuizZone extends Session { + quizZoneId?: string; +} + export interface WebSocketWithSession extends WebSocket { - session: Session; + session: SessionWithQuizZone; } export class SessionWsAdapter extends WsAdapter { diff --git a/apps/backend/src/play/entities/send-event.entity.ts b/apps/backend/src/play/entities/send-event.entity.ts deleted file mode 100644 index 4361049..0000000 --- a/apps/backend/src/play/entities/send-event.entity.ts +++ /dev/null @@ -1,10 +0,0 @@ -/** - * 웹소켓 서버가 사용자에게 응답할 메시지 형식을 정의합니다. - * - * @property event - 클라이언트에게 전송할 이벤트 이름 - * @property data - 클라이언트에게 전송할 데이터 - */ -export interface SendEventMessage { - event: string; - data: T; -} \ No newline at end of file diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index 1766fc3..c5699c8 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -10,14 +10,13 @@ import { PlayService } from './play.service'; import { Server } from 'ws'; import { QuizSubmitDto } from './dto/quiz-submit.dto'; import { QuizJoinDto } from './dto/quiz-join.dto'; -import { BadRequestException, Inject } from '@nestjs/common'; -import { SendEventMessage } from './entities/send-event.entity'; -import { ClientInfo } from './entities/client-info.entity'; +import { Inject, NotFoundException } from '@nestjs/common'; import { WebSocketWithSession } from '../core/SessionWsAdapter'; import { RuntimeException } from '@nestjs/core/errors/exceptions'; import { SubmitResponseDto } from './dto/submit-response.dto'; import { ChatService } from '../chat/chat.service'; -import {ChatMessage, CLOSE_CODE} from "@web08-booquiz/shared"; +import { ChatMessage, SendEventMessage } from '@web08-booquiz/shared'; +import { PubSub } from '../core/pub-sub/interfaces/pub-sub.interface'; /** * 퀴즈 게임에 대한 WebSocket 연결을 관리하는 Gateway입니다. @@ -30,9 +29,11 @@ export class PlayGateway implements OnGatewayInit { constructor( @Inject('ClientInfoStorage') - private readonly clients: Map, + private readonly clients: Map, private readonly playService: PlayService, private readonly chatService: ChatService, + @Inject('PubSub') + private readonly pubSub: PubSub>, ) {} /** @@ -46,35 +47,13 @@ export class PlayGateway implements OnGatewayInit { } private sendToClient(clientId: string, event: string, data?: any) { - const { socket } = this.getClientInfo(clientId); - socket.send(JSON.stringify({ event, data })); - } - - private broadcast(clientIds: string[], event: string, data?: any) { - clientIds.forEach((clientId) => { - this.sendToClient(clientId, event, data); - }); - } - - /** - * 클라이언트의 세션 ID를 이용하여 클라이언트 정보를 조회합니다. - * - * @param clientId - 클라이언트의 세션 ID - */ - private getClientInfo(clientId: string): ClientInfo { - const clientInfo = this.clients.get(clientId); + const socket = this.clients.get(clientId); - if (!clientInfo) { - throw new BadRequestException('사용자의 접속 정보를 찾을 수 없습니다.'); + if (socket === undefined) { + throw new NotFoundException('사용자의 접속 정보를 찾을 수 없습니다.') } - return clientInfo; - } - - private clearClient(clientId: string, reason?: string) { - const { socket } = this.getClientInfo(clientId); - this.clients.delete(clientId); - socket.close(CLOSE_CODE.NORMAL, reason); + socket.send(JSON.stringify({ event, data })); } /** @@ -97,30 +76,21 @@ export class PlayGateway implements OnGatewayInit { ); const { id, nickname } = currentPlayer; - const playerIds = players.map((player) => player.id); - const data = players.map(({ id, nickname }) => ({ - id, - nickname, - })); - - if (this.clients.has(sessionId) && this.clients.get(sessionId).quizZoneId === quizZoneId) { - this.clients.set(sessionId, { quizZoneId, socket: client }); - return { - event: 'join', - data, - }; - } - this.clients.set(sessionId, { quizZoneId, socket: client }); - this.broadcast(playerIds, 'someone_join', { id, nickname }); + await this.subscribePlay(quizZoneId, client); + await this.chatService.join(quizZoneId, currentPlayer, (message: ChatMessage) => { client.send(JSON.stringify({event: 'chat', data: message})); }); + await this.pubSub.publish(quizZoneId, { topic: id, data: { + event: 'someone_join', + data: { id, nickname } + }}); return { event: 'join', - data, + data: players.map(({ id, nickname }) => ({ id, nickname })) }; } @@ -129,16 +99,14 @@ export class PlayGateway implements OnGatewayInit { @ConnectedSocket() client: WebSocketWithSession, @MessageBody() changedNickname: string, ): Promise> { - const clientId = client.session.id; - const { quizZoneId } = this.getClientInfo(clientId); + const { id, quizZoneId } = client.session; - const { playerIds } = await this.playService.changeNickname( - quizZoneId, - clientId, - changedNickname, - ); + await this.playService.changeNickname(quizZoneId, id, changedNickname); - this.broadcast(playerIds, 'updateNickname', { clientId, changedNickname }); + await this.pubSub.publish(quizZoneId, {topic: id, data: { + event: 'changeNickname', + data: {id, changedNickname} + }}); return { event: 'changeNickname', @@ -153,12 +121,14 @@ export class PlayGateway implements OnGatewayInit { */ @SubscribeMessage('start') async start(@ConnectedSocket() client: WebSocketWithSession) { - const clientId = client.session.id; - const { quizZoneId } = this.getClientInfo(clientId); + const { id, quizZoneId } = client.session; - const playerIds = await this.playService.startQuizZone(quizZoneId, clientId); + await this.playService.startQuizZone(quizZoneId, id); - this.broadcast(playerIds, 'start', 'OK'); + await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: { + event: 'start', + data: 'OK' + }}); this.server.emit('nextQuiz', quizZoneId); } @@ -170,15 +140,21 @@ export class PlayGateway implements OnGatewayInit { */ private async playNextQuiz(quizZoneId: string) { try { - const { nextQuiz, playerIds, currentQuizResult } = await this.playService.playNextQuiz( + const { nextQuiz, currentQuizResult } = await this.playService.playNextQuiz( quizZoneId, - () => { - this.broadcast(playerIds, 'quizTimeOut'); + async () => { + await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: { + event: 'quizTimeOut', + data: undefined, + }}); this.server.emit('nextQuiz', quizZoneId); }, ); - this.broadcast(playerIds, 'nextQuiz', { nextQuiz, currentQuizResult }); + await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: { + event: 'nextQuiz', + data: { nextQuiz, currentQuizResult } + }}); } catch (error) { if (error instanceof RuntimeException) { await this.finishQuizZone(quizZoneId); @@ -189,10 +165,7 @@ export class PlayGateway implements OnGatewayInit { } private async finishQuizZone(quizZoneId: string) { - const playerIds = await this.playService.finishQuizZone(quizZoneId); - - this.broadcast(playerIds, 'finish'); - + await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {event: 'finish', data: undefined}}); this.server.emit('summary', quizZoneId); } @@ -208,30 +181,33 @@ export class PlayGateway implements OnGatewayInit { @ConnectedSocket() client: WebSocketWithSession, @MessageBody() quizSubmit: QuizSubmitDto, ): Promise> { - const clientId = client.session.id; - const { quizZoneId } = this.getClientInfo(clientId); - const chatMessages = await this.chatService.get(quizZoneId); + const { id, quizZoneId } = client.session; const { isLastSubmit, fastestPlayerIds, submittedCount, totalPlayerCount, - otherSubmittedPlayerIds, - } = await this.playService.submit(quizZoneId, clientId, { + } = await this.playService.submit(quizZoneId, id, { ...quizSubmit, receivedAt: Date.now(), }); if (isLastSubmit) { this.server.emit('nextQuiz', quizZoneId); + } else { + await this.pubSub.publish(quizZoneId, {topic: id, data: { + event: 'someone_submit', + data: { id, submittedCount } + }}); } - this.broadcast(otherSubmittedPlayerIds, 'someone_submit', { clientId, submittedCount }); - return { event: 'submit', - data: { fastestPlayerIds, submittedCount, totalPlayerCount, chatMessages }, + data: { + fastestPlayerIds, submittedCount, totalPlayerCount, + chatMessages: await this.chatService.get(quizZoneId) + }, }; } @@ -248,9 +224,7 @@ export class PlayGateway implements OnGatewayInit { this.sendToClient(id, 'summary', { score, submits, quizzes, ranks, endSocketTime }); }); - const clientsIds = summaries.map(({ id }) => id); - - this.clearQuizZone(clientsIds, quizZoneId, endSocketTime - Date.now()); + this.clearQuizZone(quizZoneId, endSocketTime - Date.now()); } /** @@ -258,17 +232,14 @@ export class PlayGateway implements OnGatewayInit { * * - 방장이 나가면 퀴즈 존을 삭제하고 모든 플레이어에게 방장이 나갔다고 알립니다. * - 일반 플레이어가 나가면 퀴즈 존에서 나가고 다른 플레이어에게 나갔다고 알립니다. - * @param clientIds - 퀴즈존에 참여하고 있는 클라이언트 id 리스트 * @param quizZoneId - 퀴즈가 끝난 퀴즈존 id * @param time - 소켓 연결 종료 시간 종료 시간 */ - private clearQuizZone(clientIds: string[], quizZoneId: string, time: number) { - setTimeout(() => { - clientIds.forEach((id) => { - this.clearClient(id, 'finish'); - }); - this.playService.clearQuizZone(quizZoneId); - this.chatService.delete(quizZoneId); + private clearQuizZone(quizZoneId: string, time: number) { + setTimeout(async () => { + await this.playService.clearQuizZone(quizZoneId); + await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {event: 'close', data: undefined}}); + await this.chatService.delete(quizZoneId); }, time); } @@ -281,18 +252,16 @@ export class PlayGateway implements OnGatewayInit { */ @SubscribeMessage('leave') async leave(@ConnectedSocket() client: WebSocketWithSession) { - const clientId = client.session.id; - const { quizZoneId } = this.getClientInfo(clientId); + const { id, quizZoneId } = client.session; - const { isHost, playerIds } = await this.playService.leaveQuizZone(quizZoneId, clientId); + const { isHost } = await this.playService.leaveQuizZone(quizZoneId, id); if (isHost) { - this.broadcast(playerIds, 'close'); - this.clearQuizZone(playerIds, quizZoneId, 0); + await this.pubSub.publish(quizZoneId, {topic: id, data: {event: 'close', data: undefined}}); + this.clearQuizZone(quizZoneId, 0); } else { - this.broadcast(playerIds, 'someone_leave', clientId); - await this.chatService.leave(quizZoneId, clientId); - this.clearClient(clientId, 'Client leave'); + await this.pubSub.publish(quizZoneId, {topic: id, data: {event: 'someone_leave', data: undefined}}); + await this.chatService.leave(quizZoneId, id); } return { event: 'leave', data: 'OK' }; @@ -303,11 +272,28 @@ export class PlayGateway implements OnGatewayInit { @ConnectedSocket() client: WebSocketWithSession, @MessageBody() message: ChatMessage, ) { + await this.chatService.send(client.session.quizZoneId, message); + } + + private async subscribePlay(quizZoneId: string, client: WebSocketWithSession) { const clientId = client.session.id; - const { quizZoneId } = this.getClientInfo(clientId); - const clientIds = await this.playService.chatQuizZone(clientId, quizZoneId); - this.broadcast(clientIds, 'chat', message); - await this.chatService.send(quizZoneId, message); + this.clients.set(clientId, client); + + try { + await this.pubSub.addPublisher(quizZoneId); + } catch (error) {} + const unsubscribe = await this.pubSub.subscribe( + quizZoneId, clientId, async (message) => { + const {topic, data} = message; + + if (topic !== clientId) { + client.send(JSON.stringify(data)); + } else if (data.event === 'someone_leave') { + await unsubscribe(); + this.clients.delete(clientId); + client.close(); + } + }); } } diff --git a/apps/backend/src/play/play.module.ts b/apps/backend/src/play/play.module.ts index 3ffeab8..4ede6a8 100644 --- a/apps/backend/src/play/play.module.ts +++ b/apps/backend/src/play/play.module.ts @@ -3,6 +3,7 @@ import { PlayService } from './play.service'; import { PlayGateway } from './play.gateway'; import { QuizZoneModule } from '../quiz-zone/quiz-zone.module'; import { ChatModule } from 'src/chat/chat.module'; +import { ReactiveMessageBroker } from '../core/pub-sub/reactive-message-broker'; @Module({ imports: [QuizZoneModule, ChatModule], @@ -16,7 +17,12 @@ import { ChatModule } from 'src/chat/chat.module'; provide: 'ClientInfoStorage', useValue: new Map(), }, + { + provide: 'PubSub', + useValue: new ReactiveMessageBroker, + }, PlayService, ], + exports: [PlayService] }) export class PlayModule {} diff --git a/apps/backend/src/play/play.service.ts b/apps/backend/src/play/play.service.ts index eb9574b..5a6324e 100644 --- a/apps/backend/src/play/play.service.ts +++ b/apps/backend/src/play/play.service.ts @@ -25,22 +25,22 @@ export class PlayService { @Inject('PlayInfoStorage') private readonly plays: Map, ) {} - async joinQuizZone(quizZoneId: string, sessionId: string) { + async joinQuizZone(quizZoneId: string, playerId: string) { const { players } = await this.quizZoneService.findOne(quizZoneId); - if (!players.has(sessionId)) { + if (!players.has(playerId)) { throw new NotFoundException('참여하지 않은 사용자입니다.'); } return { - currentPlayer: players.get(sessionId), + currentPlayer: players.get(playerId), players: [...players.values()], }; } async startQuizZone(quizZoneId: string, clientId: string) { const quizZone = await this.quizZoneService.findOne(quizZoneId); - const { hostId, stage, players } = quizZone; + const { hostId, stage } = quizZone; if (hostId !== clientId) { throw new UnauthorizedException('방장만 퀴즈를 시작할 수 있습니다.'); @@ -54,8 +54,6 @@ export class PlayService { ...quizZone, stage: QUIZ_ZONE_STAGE.IN_PROGRESS, }); - - return [...players.values()].map((player) => player.id); } /** @@ -318,8 +316,8 @@ export class PlayService { return summaries; } - public clearQuizZone(quizZoneId: string) { - this.quizZoneService.clearQuizZone(quizZoneId); + public async clearQuizZone(quizZoneId: string) { + return this.quizZoneService.clearQuizZone(quizZoneId); } private calculateQuizRanks( diff --git a/apps/backend/src/quiz-zone/quiz-zone.controller.ts b/apps/backend/src/quiz-zone/quiz-zone.controller.ts index 181cb48..4b6463d 100644 --- a/apps/backend/src/quiz-zone/quiz-zone.controller.ts +++ b/apps/backend/src/quiz-zone/quiz-zone.controller.ts @@ -12,7 +12,7 @@ import { ApiOperation, ApiParam, ApiResponse, ApiTags } from '@nestjs/swagger'; import { QuizZoneService } from './quiz-zone.service'; import { ChatService } from '../chat/chat.service'; import { CreateQuizZoneDto } from './dto/create-quiz-zone.dto'; -import { PlayService } from '../play/play.service'; +import { SessionWithQuizZone } from '../core/SessionWsAdapter'; @ApiTags('Quiz Zone') @Controller('quiz-zone') @@ -20,7 +20,6 @@ export class QuizZoneController { constructor( private readonly quizZoneService: QuizZoneService, private readonly chatService: ChatService, - private readonly playService: PlayService, ) {} @Post() @@ -30,7 +29,7 @@ export class QuizZoneController { @ApiResponse({ status: 400, description: '세션 정보가 없습니다.' }) async create( @Body() createQuizZoneDto: CreateQuizZoneDto, - @Session() session: Record, + @Session() session: SessionWithQuizZone, ): Promise { if (!session || !session.id) { throw new BadRequestException('세션 정보가 없습니다.'); @@ -52,7 +51,7 @@ export class QuizZoneController { }) @ApiResponse({ status: 400, description: '세션 정보가 없습니다.' }) async checkExistingQuizZoneParticipation( - @Session() session: Record, + @Session() session: SessionWithQuizZone, @Param('quizZoneId') quizZoneId: string, ) { const sessionQuizZoneId = session.quizZoneId; @@ -69,7 +68,7 @@ export class QuizZoneController { }) @ApiResponse({ status: 400, description: '세션 정보가 없습니다.' }) async findQuizZoneInfo( - @Session() session: Record, + @Session() session: SessionWithQuizZone, @Param('quizZoneId') quizZoneId: string, ) { const serverTime = Date.now(); diff --git a/apps/backend/src/quiz-zone/quiz-zone.module.ts b/apps/backend/src/quiz-zone/quiz-zone.module.ts index 802a0c0..6efedfa 100644 --- a/apps/backend/src/quiz-zone/quiz-zone.module.ts +++ b/apps/backend/src/quiz-zone/quiz-zone.module.ts @@ -1,14 +1,13 @@ -import { forwardRef, Module } from '@nestjs/common'; +import { Module } from '@nestjs/common'; import { QuizZoneService } from './quiz-zone.service'; import { QuizZoneController } from './quiz-zone.controller'; import { QuizZoneRepositoryMemory } from './repository/quiz-zone.memory.repository'; import { QuizModule } from '../quiz/quiz.module'; import { ChatModule } from 'src/chat/chat.module'; -import { PlayModule } from '../play/play.module'; @Module({ controllers: [QuizZoneController], - imports: [QuizModule, ChatModule, forwardRef(() => PlayModule)], + imports: [QuizModule, ChatModule], providers: [ QuizZoneService, { From aab69ef1024f01092bef6cc23abc431d252e6766 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 09:40:26 +0900 Subject: [PATCH 13/26] =?UTF-8?q?refactor:=20pub-sub=20=EC=9D=B4=EB=A6=84?= =?UTF-8?q?=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pub-sub 이름을 broker로 변경 - play, chat 모듈에 변경사항 반영 - 디렉터리 위치 변경 --- apps/backend/src/chat/chat.module.ts | 4 +-- apps/backend/src/chat/chat.service.ts | 16 +++++----- .../interfaces/broker.interface.ts} | 2 +- .../interfaces/message.interface.ts | 0 .../{pub-sub => broker}/message-broker.ts | 4 +-- .../reactive-message-broker.ts | 4 +-- .../src/core/{pub-sub => broker}/types.ts | 0 apps/backend/src/play/play.gateway.ts | 30 +++++++++---------- apps/backend/src/play/play.module.ts | 4 +-- 9 files changed, 32 insertions(+), 32 deletions(-) rename apps/backend/src/core/{pub-sub/interfaces/pub-sub.interface.ts => broker/interfaces/broker.interface.ts} (91%) rename apps/backend/src/core/{pub-sub => broker}/interfaces/message.interface.ts (100%) rename apps/backend/src/core/{pub-sub => broker}/message-broker.ts (93%) rename apps/backend/src/core/{pub-sub => broker}/reactive-message-broker.ts (91%) rename apps/backend/src/core/{pub-sub => broker}/types.ts (100%) diff --git a/apps/backend/src/chat/chat.module.ts b/apps/backend/src/chat/chat.module.ts index fe93fef..3f16571 100644 --- a/apps/backend/src/chat/chat.module.ts +++ b/apps/backend/src/chat/chat.module.ts @@ -2,7 +2,7 @@ import { Module } from '@nestjs/common'; import { ChatController } from './chat.controller'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; import { ChatService } from './chat.service'; -import { ReactiveMessageBroker } from '../core/pub-sub/reactive-message-broker'; +import { ReactiveMessageBroker } from '../core/broker/reactive-message-broker'; @Module({ controllers: [ChatController], @@ -17,7 +17,7 @@ import { ReactiveMessageBroker } from '../core/pub-sub/reactive-message-broker'; useClass: ChatRepositoryMemory, }, { - provide: 'PubSub', + provide: 'Broker', useClass: ReactiveMessageBroker, } ], diff --git a/apps/backend/src/chat/chat.service.ts b/apps/backend/src/chat/chat.service.ts index f46e5a9..d6887d4 100644 --- a/apps/backend/src/chat/chat.service.ts +++ b/apps/backend/src/chat/chat.service.ts @@ -1,21 +1,21 @@ import { Injectable, Inject, NotFoundException } from '@nestjs/common'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; import { ChatMessage, Player } from '@web08-booquiz/shared'; -import { PubSub } from '../core/pub-sub/interfaces/pub-sub.interface'; +import { Broker } from '../core/broker/interfaces/broker.interface'; @Injectable() export class ChatService { constructor( @Inject('ChatRepository') private readonly chatRepository: ChatRepositoryMemory, - @Inject('PubSub') - private readonly pubSub: PubSub<'chat' | 'leave', ChatMessage> + @Inject('Broker') + private readonly broker: Broker<'chat' | 'leave', ChatMessage> ) { } async set(id: string) { await this.chatRepository.set(id); - await this.pubSub.addPublisher(id); + await this.broker.addPublisher(id); } async get(id: string) { @@ -27,11 +27,11 @@ export class ChatService { async delete(id: string) { await this.chatRepository.delete(id); - await this.pubSub.removePublisher(id); + await this.broker.removePublisher(id); } async join(chatId: string, player: Player, handleSendMessage: (data: ChatMessage) => void) { - const unsubscribe = await this.pubSub.subscribe(chatId, player.id, async (message) => { + const unsubscribe = await this.broker.subscribe(chatId, player.id, async (message) => { const { topic, data } = message; const { clientId } = data; @@ -48,14 +48,14 @@ export class ChatService { } async send(chatId: string, chatMessage: ChatMessage) { - await this.pubSub.publish(chatId, { + await this.broker.publish(chatId, { topic: 'chat', data: chatMessage, }); } async leave(chatId: string, clientId: string) { - await this.pubSub.publish(chatId, { + await this.broker.publish(chatId, { topic: 'leave', data: { clientId, diff --git a/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts b/apps/backend/src/core/broker/interfaces/broker.interface.ts similarity index 91% rename from apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts rename to apps/backend/src/core/broker/interfaces/broker.interface.ts index e85462d..21f5077 100644 --- a/apps/backend/src/core/pub-sub/interfaces/pub-sub.interface.ts +++ b/apps/backend/src/core/broker/interfaces/broker.interface.ts @@ -3,7 +3,7 @@ import { MessageHandler } from '../types'; type Unsubscribe = () => Promise; -export interface PubSub { +export interface Broker { subscribe(publisherId: string, subscriberId: string, handler: MessageHandler): Promise; addPublisher(publisherId: string): Promise; diff --git a/apps/backend/src/core/pub-sub/interfaces/message.interface.ts b/apps/backend/src/core/broker/interfaces/message.interface.ts similarity index 100% rename from apps/backend/src/core/pub-sub/interfaces/message.interface.ts rename to apps/backend/src/core/broker/interfaces/message.interface.ts diff --git a/apps/backend/src/core/pub-sub/message-broker.ts b/apps/backend/src/core/broker/message-broker.ts similarity index 93% rename from apps/backend/src/core/pub-sub/message-broker.ts rename to apps/backend/src/core/broker/message-broker.ts index f58c614..b780562 100644 --- a/apps/backend/src/core/pub-sub/message-broker.ts +++ b/apps/backend/src/core/broker/message-broker.ts @@ -1,8 +1,8 @@ -import { PubSub } from './interfaces/pub-sub.interface'; +import { Broker } from './interfaces/broker.interface'; import { Message } from './interfaces/message.interface'; import { MessageHandler } from './types'; -export class MessageBroker implements PubSub { +export class MessageBroker implements Broker { constructor( private readonly publishers: Map>> = new Map(), ) {} diff --git a/apps/backend/src/core/pub-sub/reactive-message-broker.ts b/apps/backend/src/core/broker/reactive-message-broker.ts similarity index 91% rename from apps/backend/src/core/pub-sub/reactive-message-broker.ts rename to apps/backend/src/core/broker/reactive-message-broker.ts index f988dea..bfd0ff2 100644 --- a/apps/backend/src/core/pub-sub/reactive-message-broker.ts +++ b/apps/backend/src/core/broker/reactive-message-broker.ts @@ -1,8 +1,8 @@ import { Subject } from 'rxjs'; import { Message } from './interfaces/message.interface'; -import { PubSub } from './interfaces/pub-sub.interface'; +import { Broker } from './interfaces/broker.interface'; -export class ReactiveMessageBroker implements PubSub { +export class ReactiveMessageBroker implements Broker { constructor( private publishers: Map>> = new Map(), ) {} diff --git a/apps/backend/src/core/pub-sub/types.ts b/apps/backend/src/core/broker/types.ts similarity index 100% rename from apps/backend/src/core/pub-sub/types.ts rename to apps/backend/src/core/broker/types.ts diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index c5699c8..a0bff1a 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -16,7 +16,7 @@ import { RuntimeException } from '@nestjs/core/errors/exceptions'; import { SubmitResponseDto } from './dto/submit-response.dto'; import { ChatService } from '../chat/chat.service'; import { ChatMessage, SendEventMessage } from '@web08-booquiz/shared'; -import { PubSub } from '../core/pub-sub/interfaces/pub-sub.interface'; +import { Broker } from '../core/broker/interfaces/broker.interface'; /** * 퀴즈 게임에 대한 WebSocket 연결을 관리하는 Gateway입니다. @@ -32,8 +32,8 @@ export class PlayGateway implements OnGatewayInit { private readonly clients: Map, private readonly playService: PlayService, private readonly chatService: ChatService, - @Inject('PubSub') - private readonly pubSub: PubSub>, + @Inject('Broker') + private readonly broker: Broker>, ) {} /** @@ -83,7 +83,7 @@ export class PlayGateway implements OnGatewayInit { client.send(JSON.stringify({event: 'chat', data: message})); }); - await this.pubSub.publish(quizZoneId, { topic: id, data: { + await this.broker.publish(quizZoneId, { topic: id, data: { event: 'someone_join', data: { id, nickname } }}); @@ -103,7 +103,7 @@ export class PlayGateway implements OnGatewayInit { await this.playService.changeNickname(quizZoneId, id, changedNickname); - await this.pubSub.publish(quizZoneId, {topic: id, data: { + await this.broker.publish(quizZoneId, {topic: id, data: { event: 'changeNickname', data: {id, changedNickname} }}); @@ -125,7 +125,7 @@ export class PlayGateway implements OnGatewayInit { await this.playService.startQuizZone(quizZoneId, id); - await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: { + await this.broker.publish(quizZoneId, {topic: quizZoneId, data: { event: 'start', data: 'OK' }}); @@ -143,7 +143,7 @@ export class PlayGateway implements OnGatewayInit { const { nextQuiz, currentQuizResult } = await this.playService.playNextQuiz( quizZoneId, async () => { - await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: { + await this.broker.publish(quizZoneId, {topic: quizZoneId, data: { event: 'quizTimeOut', data: undefined, }}); @@ -151,7 +151,7 @@ export class PlayGateway implements OnGatewayInit { }, ); - await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: { + await this.broker.publish(quizZoneId, {topic: quizZoneId, data: { event: 'nextQuiz', data: { nextQuiz, currentQuizResult } }}); @@ -165,7 +165,7 @@ export class PlayGateway implements OnGatewayInit { } private async finishQuizZone(quizZoneId: string) { - await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {event: 'finish', data: undefined}}); + await this.broker.publish(quizZoneId, {topic: quizZoneId, data: {event: 'finish', data: undefined}}); this.server.emit('summary', quizZoneId); } @@ -196,7 +196,7 @@ export class PlayGateway implements OnGatewayInit { if (isLastSubmit) { this.server.emit('nextQuiz', quizZoneId); } else { - await this.pubSub.publish(quizZoneId, {topic: id, data: { + await this.broker.publish(quizZoneId, {topic: id, data: { event: 'someone_submit', data: { id, submittedCount } }}); @@ -238,7 +238,7 @@ export class PlayGateway implements OnGatewayInit { private clearQuizZone(quizZoneId: string, time: number) { setTimeout(async () => { await this.playService.clearQuizZone(quizZoneId); - await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {event: 'close', data: undefined}}); + await this.broker.publish(quizZoneId, {topic: quizZoneId, data: {event: 'close', data: undefined}}); await this.chatService.delete(quizZoneId); }, time); } @@ -257,10 +257,10 @@ export class PlayGateway implements OnGatewayInit { const { isHost } = await this.playService.leaveQuizZone(quizZoneId, id); if (isHost) { - await this.pubSub.publish(quizZoneId, {topic: id, data: {event: 'close', data: undefined}}); + await this.broker.publish(quizZoneId, {topic: id, data: {event: 'close', data: undefined}}); this.clearQuizZone(quizZoneId, 0); } else { - await this.pubSub.publish(quizZoneId, {topic: id, data: {event: 'someone_leave', data: undefined}}); + await this.broker.publish(quizZoneId, {topic: id, data: {event: 'someone_leave', data: undefined}}); await this.chatService.leave(quizZoneId, id); } @@ -281,9 +281,9 @@ export class PlayGateway implements OnGatewayInit { this.clients.set(clientId, client); try { - await this.pubSub.addPublisher(quizZoneId); + await this.broker.addPublisher(quizZoneId); } catch (error) {} - const unsubscribe = await this.pubSub.subscribe( + const unsubscribe = await this.broker.subscribe( quizZoneId, clientId, async (message) => { const {topic, data} = message; diff --git a/apps/backend/src/play/play.module.ts b/apps/backend/src/play/play.module.ts index 4ede6a8..0b902ee 100644 --- a/apps/backend/src/play/play.module.ts +++ b/apps/backend/src/play/play.module.ts @@ -3,7 +3,7 @@ import { PlayService } from './play.service'; import { PlayGateway } from './play.gateway'; import { QuizZoneModule } from '../quiz-zone/quiz-zone.module'; import { ChatModule } from 'src/chat/chat.module'; -import { ReactiveMessageBroker } from '../core/pub-sub/reactive-message-broker'; +import { ReactiveMessageBroker } from '../core/broker/reactive-message-broker'; @Module({ imports: [QuizZoneModule, ChatModule], @@ -18,7 +18,7 @@ import { ReactiveMessageBroker } from '../core/pub-sub/reactive-message-broker'; useValue: new Map(), }, { - provide: 'PubSub', + provide: 'Broker', useValue: new ReactiveMessageBroker, }, PlayService, From fe899b2e49a03cde0b7900e7e1ea7c706844e2f8 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 09:49:15 +0900 Subject: [PATCH 14/26] =?UTF-8?q?refactor:=20Broker=20=EA=B5=AC=EC=A1=B0?= =?UTF-8?q?=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 유연한 메시지 처리를 위한 Message를 제네릭으로 변경 - Message 제네릭 변경 반영 --- .../broker/interfaces/broker.interface.ts | 7 +++---- .../message-with-topic.interface.ts | 4 ++++ .../broker/interfaces/message.interface.ts | 4 ---- .../backend/src/core/broker/message-broker.ts | 21 +++++++++---------- .../core/broker/reactive-message-broker.ts | 15 ++++++------- apps/backend/src/core/broker/types.ts | 4 +--- 6 files changed, 24 insertions(+), 31 deletions(-) create mode 100644 apps/backend/src/core/broker/interfaces/message-with-topic.interface.ts delete mode 100644 apps/backend/src/core/broker/interfaces/message.interface.ts diff --git a/apps/backend/src/core/broker/interfaces/broker.interface.ts b/apps/backend/src/core/broker/interfaces/broker.interface.ts index 21f5077..8db5565 100644 --- a/apps/backend/src/core/broker/interfaces/broker.interface.ts +++ b/apps/backend/src/core/broker/interfaces/broker.interface.ts @@ -1,14 +1,13 @@ -import { Message } from './message.interface' import { MessageHandler } from '../types'; type Unsubscribe = () => Promise; -export interface Broker { - subscribe(publisherId: string, subscriberId: string, handler: MessageHandler): Promise; +export interface Broker { + subscribe(publisherId: string, subscriberId: string, handler: MessageHandler): Promise; addPublisher(publisherId: string): Promise; removePublisher(publisherId: string): Promise; - publish(publisherId: string, message: Message): Promise; + publish(publisherId: string, message: TMessage): Promise; } \ No newline at end of file diff --git a/apps/backend/src/core/broker/interfaces/message-with-topic.interface.ts b/apps/backend/src/core/broker/interfaces/message-with-topic.interface.ts new file mode 100644 index 0000000..0ca261d --- /dev/null +++ b/apps/backend/src/core/broker/interfaces/message-with-topic.interface.ts @@ -0,0 +1,4 @@ +export interface MessageWithTopic { + topic: TTopic; + data: TData; +} \ No newline at end of file diff --git a/apps/backend/src/core/broker/interfaces/message.interface.ts b/apps/backend/src/core/broker/interfaces/message.interface.ts deleted file mode 100644 index 40c8fac..0000000 --- a/apps/backend/src/core/broker/interfaces/message.interface.ts +++ /dev/null @@ -1,4 +0,0 @@ -export interface Message { - topic: TTopic; - data: TData; -} \ No newline at end of file diff --git a/apps/backend/src/core/broker/message-broker.ts b/apps/backend/src/core/broker/message-broker.ts index b780562..9641454 100644 --- a/apps/backend/src/core/broker/message-broker.ts +++ b/apps/backend/src/core/broker/message-broker.ts @@ -1,29 +1,28 @@ import { Broker } from './interfaces/broker.interface'; -import { Message } from './interfaces/message.interface'; import { MessageHandler } from './types'; -export class MessageBroker implements Broker { +export class MessageBroker implements Broker { constructor( - private readonly publishers: Map>> = new Map(), + private readonly subscribers: Map>> = new Map(), ) {} public async addPublisher(id: string) { - if (this.publishers.has(id)) { + if (this.subscribers.has(id)) { throw new Error(`Publisher with ID ${id} already exists`); } - this.publishers.set(id, new Map()); + this.subscribers.set(id, new Map()); } public async removePublisher(id: string) { - if (!this.publishers.has(id)) { + if (!this.subscribers.has(id)) { throw new Error(`Publisher with ID ${id} does not exist`); } - this.publishers.delete(id); + this.subscribers.delete(id); } - public async publish(id: string, message: Message) { + public async publish(id: string, message: TMessage) { const subscribers = this.getSubscribers(id); if (subscribers === undefined) { @@ -33,7 +32,7 @@ export class MessageBroker implements Broker { await Promise.all([...subscribers.values()].map(handler => handler(message))); } - public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { + public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { const subscribers = this.getSubscribers(publisherId); subscribers.set(subscriberId, handler); return () => this.unsubscribe(publisherId, subscriberId); @@ -45,9 +44,9 @@ export class MessageBroker implements Broker { } private getSubscribers(id: string) { - if (!this.publishers.has(id)) { + if (!this.subscribers.has(id)) { throw new Error(`Publisher with ID ${id} does not exist`); } - return this.publishers.get(id); + return this.subscribers.get(id); } } \ No newline at end of file diff --git a/apps/backend/src/core/broker/reactive-message-broker.ts b/apps/backend/src/core/broker/reactive-message-broker.ts index bfd0ff2..b51fd96 100644 --- a/apps/backend/src/core/broker/reactive-message-broker.ts +++ b/apps/backend/src/core/broker/reactive-message-broker.ts @@ -1,10 +1,10 @@ import { Subject } from 'rxjs'; -import { Message } from './interfaces/message.interface'; import { Broker } from './interfaces/broker.interface'; +import { MessageHandler } from './types'; -export class ReactiveMessageBroker implements Broker { +export class ReactiveMessageBroker implements Broker { constructor( - private publishers: Map>> = new Map(), + private publishers: Map> = new Map(), ) {} public async addPublisher(id: string) { @@ -12,7 +12,7 @@ export class ReactiveMessageBroker implements Broker>()); + this.publishers.set(id, new Subject()); } public async removePublisher(id: string) { @@ -25,7 +25,7 @@ export class ReactiveMessageBroker implements Broker) { + public async publish(id: string, message: TMessage) { const publisher = this.publishers.get(id); if (!publisher) { @@ -35,10 +35,7 @@ export class ReactiveMessageBroker implements Broker) => void + public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler ) { const publisher = this.publishers.get(publisherId); diff --git a/apps/backend/src/core/broker/types.ts b/apps/backend/src/core/broker/types.ts index 1021e38..88b2a11 100644 --- a/apps/backend/src/core/broker/types.ts +++ b/apps/backend/src/core/broker/types.ts @@ -1,3 +1 @@ -import { Message } from './interfaces/message.interface'; - -export type MessageHandler = (message: Message) => void | Promise; \ No newline at end of file +export type MessageHandler = (message: TMessage) => void | Promise; \ No newline at end of file From 6ff1c2d6504d1876bdfe4841d6a1275dfa36a2d8 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:05:19 +0900 Subject: [PATCH 15/26] =?UTF-8?q?refactor:=20broker=20=EB=B3=80=EA=B2=BD?= =?UTF-8?q?=20=EC=82=AC=ED=95=AD=20=EB=B0=98=EC=98=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 브로커 제네릭 변경으로 인한 메시지 브로드캐스팅 로직 변경 - SendEvent 인터페이스 sender 속성 추가 - SendEvent 인터페이스 변경으로 인한 핸들러 처리 로직 변경 --- apps/backend/src/play/play.gateway.ts | 53 +++++++------------ .../src/interfaces/send-event.interface.ts | 1 + 2 files changed, 21 insertions(+), 33 deletions(-) diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index a0bff1a..1cf5c3e 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -33,7 +33,7 @@ export class PlayGateway implements OnGatewayInit { private readonly playService: PlayService, private readonly chatService: ChatService, @Inject('Broker') - private readonly broker: Broker>, + private readonly broker: Broker>, ) {} /** @@ -83,13 +83,11 @@ export class PlayGateway implements OnGatewayInit { client.send(JSON.stringify({event: 'chat', data: message})); }); - await this.broker.publish(quizZoneId, { topic: id, data: { - event: 'someone_join', - data: { id, nickname } - }}); + await this.broker.publish(quizZoneId, { event: 'someone_join', sender: id, data: {id, nickname} }); return { event: 'join', + sender: id, data: players.map(({ id, nickname }) => ({ id, nickname })) }; } @@ -103,13 +101,11 @@ export class PlayGateway implements OnGatewayInit { await this.playService.changeNickname(quizZoneId, id, changedNickname); - await this.broker.publish(quizZoneId, {topic: id, data: { - event: 'changeNickname', - data: {id, changedNickname} - }}); + await this.broker.publish(quizZoneId, {event: 'changeNickname', sender: id, data: {id, changedNickname}}); return { event: 'changeNickname', + sender: id, data: 'OK', }; } @@ -125,10 +121,7 @@ export class PlayGateway implements OnGatewayInit { await this.playService.startQuizZone(quizZoneId, id); - await this.broker.publish(quizZoneId, {topic: quizZoneId, data: { - event: 'start', - data: 'OK' - }}); + await this.broker.publish(quizZoneId, {event: 'start', sender: quizZoneId, data: 'OK'}); this.server.emit('nextQuiz', quizZoneId); } @@ -143,18 +136,14 @@ export class PlayGateway implements OnGatewayInit { const { nextQuiz, currentQuizResult } = await this.playService.playNextQuiz( quizZoneId, async () => { - await this.broker.publish(quizZoneId, {topic: quizZoneId, data: { - event: 'quizTimeOut', - data: undefined, - }}); + await this.broker.publish(quizZoneId, {event: 'quizTimeOut', sender: quizZoneId, data: undefined}); this.server.emit('nextQuiz', quizZoneId); }, ); - await this.broker.publish(quizZoneId, {topic: quizZoneId, data: { - event: 'nextQuiz', - data: { nextQuiz, currentQuizResult } - }}); + await this.broker.publish(quizZoneId, { + event: 'nextQuiz', sender: quizZoneId, data: { nextQuiz, currentQuizResult } + }); } catch (error) { if (error instanceof RuntimeException) { await this.finishQuizZone(quizZoneId); @@ -165,7 +154,7 @@ export class PlayGateway implements OnGatewayInit { } private async finishQuizZone(quizZoneId: string) { - await this.broker.publish(quizZoneId, {topic: quizZoneId, data: {event: 'finish', data: undefined}}); + await this.broker.publish(quizZoneId, {event: 'finish', sender: quizZoneId, data: undefined}); this.server.emit('summary', quizZoneId); } @@ -196,14 +185,12 @@ export class PlayGateway implements OnGatewayInit { if (isLastSubmit) { this.server.emit('nextQuiz', quizZoneId); } else { - await this.broker.publish(quizZoneId, {topic: id, data: { - event: 'someone_submit', - data: { id, submittedCount } - }}); + await this.broker.publish(quizZoneId, {event: 'someone_submit', sender: id, data: { id, submittedCount }}); } return { event: 'submit', + sender: id, data: { fastestPlayerIds, submittedCount, totalPlayerCount, chatMessages: await this.chatService.get(quizZoneId) @@ -238,7 +225,7 @@ export class PlayGateway implements OnGatewayInit { private clearQuizZone(quizZoneId: string, time: number) { setTimeout(async () => { await this.playService.clearQuizZone(quizZoneId); - await this.broker.publish(quizZoneId, {topic: quizZoneId, data: {event: 'close', data: undefined}}); + await this.broker.publish(quizZoneId, {event: 'close', sender: quizZoneId, data: undefined}); await this.chatService.delete(quizZoneId); }, time); } @@ -257,10 +244,10 @@ export class PlayGateway implements OnGatewayInit { const { isHost } = await this.playService.leaveQuizZone(quizZoneId, id); if (isHost) { - await this.broker.publish(quizZoneId, {topic: id, data: {event: 'close', data: undefined}}); + await this.broker.publish(quizZoneId, {event: 'close', sender: quizZoneId, data: undefined}); this.clearQuizZone(quizZoneId, 0); } else { - await this.broker.publish(quizZoneId, {topic: id, data: {event: 'someone_leave', data: undefined}}); + await this.broker.publish(quizZoneId, {event: 'someone_leave', sender: id, data: undefined}); await this.chatService.leave(quizZoneId, id); } @@ -285,11 +272,11 @@ export class PlayGateway implements OnGatewayInit { } catch (error) {} const unsubscribe = await this.broker.subscribe( quizZoneId, clientId, async (message) => { - const {topic, data} = message; + const {event, sender} = message; - if (topic !== clientId) { - client.send(JSON.stringify(data)); - } else if (data.event === 'someone_leave') { + if (sender !== clientId) { + client.send(JSON.stringify(message)); + } else if (event === 'someone_leave') { await unsubscribe(); this.clients.delete(clientId); client.close(); diff --git a/packages/shared/src/interfaces/send-event.interface.ts b/packages/shared/src/interfaces/send-event.interface.ts index 4361049..d6c1563 100644 --- a/packages/shared/src/interfaces/send-event.interface.ts +++ b/packages/shared/src/interfaces/send-event.interface.ts @@ -6,5 +6,6 @@ */ export interface SendEventMessage { event: string; + sender: string; data: T; } \ No newline at end of file From d7306a04b16663470c0dfd29bba40856804a35e5 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:10:30 +0900 Subject: [PATCH 16/26] =?UTF-8?q?refactor:=20chat=20=EC=84=9C=EB=B9=84?= =?UTF-8?q?=EC=8A=A4=20broker=20=EB=A6=AC=ED=8E=99=ED=86=A0=EB=A7=81=20?= =?UTF-8?q?=EB=B0=98=EC=98=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - chat 서비스 브로커 타입 선언 변경 --- apps/backend/src/chat/chat.service.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/backend/src/chat/chat.service.ts b/apps/backend/src/chat/chat.service.ts index d6887d4..12ee138 100644 --- a/apps/backend/src/chat/chat.service.ts +++ b/apps/backend/src/chat/chat.service.ts @@ -2,6 +2,7 @@ import { Injectable, Inject, NotFoundException } from '@nestjs/common'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; import { ChatMessage, Player } from '@web08-booquiz/shared'; import { Broker } from '../core/broker/interfaces/broker.interface'; +import { MessageWithTopic } from '../core/broker/interfaces/message-with-topic.interface'; @Injectable() export class ChatService { @@ -9,7 +10,7 @@ export class ChatService { @Inject('ChatRepository') private readonly chatRepository: ChatRepositoryMemory, @Inject('Broker') - private readonly broker: Broker<'chat' | 'leave', ChatMessage> + private readonly broker: Broker> ) { } From 1d1971c613876ef9f0bad0c6da380c484dba0be0 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:24:00 +0900 Subject: [PATCH 17/26] =?UTF-8?q?refactor:=20broker=20=EB=AA=A8=EB=93=88?= =?UTF-8?q?=20=EC=9C=84=EC=B9=98=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - broker 모듈 공용 모듈로 위치 변경 - message-broker undefined narrowing 추가 - reactive-message-broker.ts error 타입 추가 --- .../broker/interfaces/broker.interface.ts | 0 .../message-with-topic.interface.ts | 0 .../shared}/src/core/broker/message-broker.ts | 32 +++++++++++-------- .../shared}/src/core/broker/types.ts | 0 4 files changed, 18 insertions(+), 14 deletions(-) rename {apps/backend => packages/shared}/src/core/broker/interfaces/broker.interface.ts (100%) rename {apps/backend => packages/shared}/src/core/broker/interfaces/message-with-topic.interface.ts (100%) rename {apps/backend => packages/shared}/src/core/broker/message-broker.ts (61%) rename {apps/backend => packages/shared}/src/core/broker/types.ts (100%) diff --git a/apps/backend/src/core/broker/interfaces/broker.interface.ts b/packages/shared/src/core/broker/interfaces/broker.interface.ts similarity index 100% rename from apps/backend/src/core/broker/interfaces/broker.interface.ts rename to packages/shared/src/core/broker/interfaces/broker.interface.ts diff --git a/apps/backend/src/core/broker/interfaces/message-with-topic.interface.ts b/packages/shared/src/core/broker/interfaces/message-with-topic.interface.ts similarity index 100% rename from apps/backend/src/core/broker/interfaces/message-with-topic.interface.ts rename to packages/shared/src/core/broker/interfaces/message-with-topic.interface.ts diff --git a/apps/backend/src/core/broker/message-broker.ts b/packages/shared/src/core/broker/message-broker.ts similarity index 61% rename from apps/backend/src/core/broker/message-broker.ts rename to packages/shared/src/core/broker/message-broker.ts index 9641454..efc167b 100644 --- a/apps/backend/src/core/broker/message-broker.ts +++ b/packages/shared/src/core/broker/message-broker.ts @@ -3,27 +3,27 @@ import { MessageHandler } from './types'; export class MessageBroker implements Broker { constructor( - private readonly subscribers: Map>> = new Map(), + private readonly publishers: Map>> = new Map(), ) {} public async addPublisher(id: string) { - if (this.subscribers.has(id)) { + if (this.publishers.has(id)) { throw new Error(`Publisher with ID ${id} already exists`); } - this.subscribers.set(id, new Map()); + this.publishers.set(id, new Map()); } public async removePublisher(id: string) { - if (!this.subscribers.has(id)) { + if (!this.publishers.has(id)) { throw new Error(`Publisher with ID ${id} does not exist`); } - this.subscribers.delete(id); + this.publishers.delete(id); } public async publish(id: string, message: TMessage) { - const subscribers = this.getSubscribers(id); + const subscribers = this.publishers.get(id); if (subscribers === undefined) { throw new Error(`Publisher with ID ${id} does not exist`); @@ -33,20 +33,24 @@ export class MessageBroker implements Broker { } public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { - const subscribers = this.getSubscribers(publisherId); + const subscribers = this.publishers.get(publisherId); + + if (subscribers === undefined) { + throw new Error(`Publisher with ID ${publisherId} does not exist`); + } + subscribers.set(subscriberId, handler); + return () => this.unsubscribe(publisherId, subscriberId); } private async unsubscribe(publisherId: string, subscriberId: string) { - const subscribers = this.getSubscribers(publisherId); - subscribers.delete(subscriberId); - } + const subscribers = this.publishers.get(publisherId); - private getSubscribers(id: string) { - if (!this.subscribers.has(id)) { - throw new Error(`Publisher with ID ${id} does not exist`); + if (subscribers === undefined) { + throw new Error(`Publisher with ID ${publisherId} does not exist`); } - return this.subscribers.get(id); + + subscribers.delete(subscriberId); } } \ No newline at end of file diff --git a/apps/backend/src/core/broker/types.ts b/packages/shared/src/core/broker/types.ts similarity index 100% rename from apps/backend/src/core/broker/types.ts rename to packages/shared/src/core/broker/types.ts From 75bf70c9d66ae61c4fac70501b396aa9bb82aafc Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:31:25 +0900 Subject: [PATCH 18/26] =?UTF-8?q?refactor:=20=EA=B3=B5=EC=9A=A9=20broker?= =?UTF-8?q?=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 공용 브로커 모듈 내보내기 추가 --- packages/shared/src/core/broker/index.ts | 5 +++++ .../shared}/src/core/broker/reactive-message-broker.ts | 2 +- packages/shared/src/index.ts | 3 ++- 3 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 packages/shared/src/core/broker/index.ts rename {apps/backend => packages/shared}/src/core/broker/reactive-message-broker.ts (93%) diff --git a/packages/shared/src/core/broker/index.ts b/packages/shared/src/core/broker/index.ts new file mode 100644 index 0000000..79c5a00 --- /dev/null +++ b/packages/shared/src/core/broker/index.ts @@ -0,0 +1,5 @@ +export * from './message-broker'; +export * from './reactive-message-broker'; +export * from './types'; +export * from './interfaces/broker.interface'; +export * from './interfaces/message-with-topic.interface'; \ No newline at end of file diff --git a/apps/backend/src/core/broker/reactive-message-broker.ts b/packages/shared/src/core/broker/reactive-message-broker.ts similarity index 93% rename from apps/backend/src/core/broker/reactive-message-broker.ts rename to packages/shared/src/core/broker/reactive-message-broker.ts index b51fd96..a3074ac 100644 --- a/apps/backend/src/core/broker/reactive-message-broker.ts +++ b/packages/shared/src/core/broker/reactive-message-broker.ts @@ -45,7 +45,7 @@ export class ReactiveMessageBroker implements Broker { const subscription = publisher.subscribe({ next: handler, - error: (error) => console.error(`Error in subscription ${publisherId}:${subscriberId} :`, error) + error: (error: any) => console.error(`Error in subscription ${publisherId}:${subscriberId} :`, error) }); return async () => subscription.unsubscribe(); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index a3c2af7..7ddc6fd 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -25,4 +25,5 @@ export * from './entities/quiz-summary.entity'; export * from './entities/quiz-zone.entity'; export * from './entities/rank.entity'; export * from './entities/submitted-quiz.entity'; -export * from './interfaces/send-event.interface'; \ No newline at end of file +export * from './interfaces/send-event.interface'; +export * from './core/broker/index'; \ No newline at end of file From 3244f1ae9f33e9f48fd3b3e222a8e50fd5c188b2 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:32:22 +0900 Subject: [PATCH 19/26] =?UTF-8?q?chore:=20shared=20=EC=9D=98=EC=A1=B4?= =?UTF-8?q?=EC=84=B1=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - broker를 위한 rxjs 의존성 추가 --- packages/shared/package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/shared/package.json b/packages/shared/package.json index bb3c25f..848465f 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -18,5 +18,8 @@ "eslint-config-prettier": "^9.0.0", "prettier": "^3.0.0", "typescript": "^5.x.x" + }, + "dependencies": { + "rxjs": "^7.8.1" } } From 5153f074f8119fea75a83607ddb3622378dd8ff9 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:33:04 +0900 Subject: [PATCH 20/26] =?UTF-8?q?refactor:=20broker=20=EB=AA=A8=EB=93=88?= =?UTF-8?q?=20=EB=B3=80=EA=B2=BD=20=EB=B0=98=EC=98=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - broker 관련 참조를 shared로 변경 --- apps/backend/src/chat/chat.module.ts | 2 +- apps/backend/src/chat/chat.service.ts | 4 +--- apps/backend/src/play/play.gateway.ts | 3 +-- apps/backend/src/play/play.module.ts | 2 +- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/apps/backend/src/chat/chat.module.ts b/apps/backend/src/chat/chat.module.ts index 3f16571..9bbd707 100644 --- a/apps/backend/src/chat/chat.module.ts +++ b/apps/backend/src/chat/chat.module.ts @@ -2,7 +2,7 @@ import { Module } from '@nestjs/common'; import { ChatController } from './chat.controller'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; import { ChatService } from './chat.service'; -import { ReactiveMessageBroker } from '../core/broker/reactive-message-broker'; +import { ReactiveMessageBroker } from '@web08-booquiz/shared'; @Module({ controllers: [ChatController], diff --git a/apps/backend/src/chat/chat.service.ts b/apps/backend/src/chat/chat.service.ts index 12ee138..25463c5 100644 --- a/apps/backend/src/chat/chat.service.ts +++ b/apps/backend/src/chat/chat.service.ts @@ -1,8 +1,6 @@ import { Injectable, Inject, NotFoundException } from '@nestjs/common'; import { ChatRepositoryMemory } from './repository/chat.memory.repository'; -import { ChatMessage, Player } from '@web08-booquiz/shared'; -import { Broker } from '../core/broker/interfaces/broker.interface'; -import { MessageWithTopic } from '../core/broker/interfaces/message-with-topic.interface'; +import { Broker, ChatMessage, MessageWithTopic, Player } from '@web08-booquiz/shared'; @Injectable() export class ChatService { diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index 1cf5c3e..93f6de2 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -15,8 +15,7 @@ import { WebSocketWithSession } from '../core/SessionWsAdapter'; import { RuntimeException } from '@nestjs/core/errors/exceptions'; import { SubmitResponseDto } from './dto/submit-response.dto'; import { ChatService } from '../chat/chat.service'; -import { ChatMessage, SendEventMessage } from '@web08-booquiz/shared'; -import { Broker } from '../core/broker/interfaces/broker.interface'; +import { Broker, ChatMessage, SendEventMessage } from '@web08-booquiz/shared'; /** * 퀴즈 게임에 대한 WebSocket 연결을 관리하는 Gateway입니다. diff --git a/apps/backend/src/play/play.module.ts b/apps/backend/src/play/play.module.ts index 0b902ee..a89cbaa 100644 --- a/apps/backend/src/play/play.module.ts +++ b/apps/backend/src/play/play.module.ts @@ -3,7 +3,7 @@ import { PlayService } from './play.service'; import { PlayGateway } from './play.gateway'; import { QuizZoneModule } from '../quiz-zone/quiz-zone.module'; import { ChatModule } from 'src/chat/chat.module'; -import { ReactiveMessageBroker } from '../core/broker/reactive-message-broker'; +import { ReactiveMessageBroker } from '@web08-booquiz/shared'; @Module({ imports: [QuizZoneModule, ChatModule], From b09a1f7a82d60cf2ac219112a839d33d6efbf902 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:48:05 +0900 Subject: [PATCH 21/26] =?UTF-8?q?refactor:=20reactive-message-broker=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 생성자 인자를 readonly로 변경 --- packages/shared/src/core/broker/reactive-message-broker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/shared/src/core/broker/reactive-message-broker.ts b/packages/shared/src/core/broker/reactive-message-broker.ts index a3074ac..a3b29d2 100644 --- a/packages/shared/src/core/broker/reactive-message-broker.ts +++ b/packages/shared/src/core/broker/reactive-message-broker.ts @@ -4,7 +4,7 @@ import { MessageHandler } from './types'; export class ReactiveMessageBroker implements Broker { constructor( - private publishers: Map> = new Map(), + private readonly publishers: Map> = new Map(), ) {} public async addPublisher(id: string) { From 68df88e71765e83107554ab525d9dd0e9119442a Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 10:48:55 +0900 Subject: [PATCH 22/26] =?UTF-8?q?chore:=20shared=20=ED=8C=A8=ED=82=A4?= =?UTF-8?q?=EC=A7=80=20=EC=9D=98=EC=A1=B4=EC=84=B1=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 모노레포 shared 패키지 rxjs 의존성 추가 --- pnpm-lock.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b838638..14a36c9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -337,6 +337,10 @@ importers: version: 2.1.5(@types/node@22.9.0)(jsdom@25.0.1)(terser@5.36.0) packages/shared: + dependencies: + rxjs: + specifier: ^7.8.1 + version: 7.8.1 devDependencies: '@typescript-eslint/eslint-plugin': specifier: ^8.0.0 From f07a7351e43cb69c0019a0d23f06b73ecada8b54 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 13:14:47 +0900 Subject: [PATCH 23/26] =?UTF-8?q?refactor:=20play=20clients=20=EC=9D=98?= =?UTF-8?q?=EC=A1=B4=EC=84=B1=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - broker 도입을 통한 사용자 정보 의존성 분리 --- apps/backend/src/play/play.gateway.ts | 118 ++++++++++-------- apps/backend/src/play/play.service.ts | 15 +-- .../src/interfaces/send-event.interface.ts | 15 ++- 3 files changed, 81 insertions(+), 67 deletions(-) diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index 93f6de2..470da34 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -10,12 +10,12 @@ import { PlayService } from './play.service'; import { Server } from 'ws'; import { QuizSubmitDto } from './dto/quiz-submit.dto'; import { QuizJoinDto } from './dto/quiz-join.dto'; -import { Inject, NotFoundException } from '@nestjs/common'; +import { Inject } from '@nestjs/common'; import { WebSocketWithSession } from '../core/SessionWsAdapter'; import { RuntimeException } from '@nestjs/core/errors/exceptions'; import { SubmitResponseDto } from './dto/submit-response.dto'; import { ChatService } from '../chat/chat.service'; -import { Broker, ChatMessage, SendEventMessage } from '@web08-booquiz/shared'; +import { BroadcastPlayEvent, Broker, ChatMessage, PlayEvent, SendEventMessage } from '@web08-booquiz/shared'; /** * 퀴즈 게임에 대한 WebSocket 연결을 관리하는 Gateway입니다. @@ -27,12 +27,9 @@ export class PlayGateway implements OnGatewayInit { server: Server; constructor( - @Inject('ClientInfoStorage') - private readonly clients: Map, private readonly playService: PlayService, private readonly chatService: ChatService, - @Inject('Broker') - private readonly broker: Broker>, + @Inject('Broker') private readonly broker: Broker>, ) {} /** @@ -45,16 +42,6 @@ export class PlayGateway implements OnGatewayInit { server.on('summary', (quizZoneId: string) => this.summary(quizZoneId)); } - private sendToClient(clientId: string, event: string, data?: any) { - const socket = this.clients.get(clientId); - - if (socket === undefined) { - throw new NotFoundException('사용자의 접속 정보를 찾을 수 없습니다.') - } - - socket.send(JSON.stringify({ event, data })); - } - /** * 클라이언트가 퀴즈 방에 참여했다는 메세지를 방의 다른 참여자들에게 전송합니다. * @@ -65,7 +52,7 @@ export class PlayGateway implements OnGatewayInit { async join( @ConnectedSocket() client: WebSocketWithSession, @MessageBody() quizJoinDto: QuizJoinDto, - ): Promise> { + ): Promise> { const sessionId = client.session.id; const { quizZoneId } = quizJoinDto; @@ -86,7 +73,7 @@ export class PlayGateway implements OnGatewayInit { return { event: 'join', - sender: id, + sender: quizZoneId, data: players.map(({ id, nickname }) => ({ id, nickname })) }; } @@ -95,7 +82,7 @@ export class PlayGateway implements OnGatewayInit { async changeNickname( @ConnectedSocket() client: WebSocketWithSession, @MessageBody() changedNickname: string, - ): Promise> { + ): Promise> { const { id, quizZoneId } = client.session; await this.playService.changeNickname(quizZoneId, id, changedNickname); @@ -104,7 +91,7 @@ export class PlayGateway implements OnGatewayInit { return { event: 'changeNickname', - sender: id, + sender: quizZoneId, data: 'OK', }; } @@ -119,7 +106,6 @@ export class PlayGateway implements OnGatewayInit { const { id, quizZoneId } = client.session; await this.playService.startQuizZone(quizZoneId, id); - await this.broker.publish(quizZoneId, {event: 'start', sender: quizZoneId, data: 'OK'}); this.server.emit('nextQuiz', quizZoneId); @@ -134,10 +120,7 @@ export class PlayGateway implements OnGatewayInit { try { const { nextQuiz, currentQuizResult } = await this.playService.playNextQuiz( quizZoneId, - async () => { - await this.broker.publish(quizZoneId, {event: 'quizTimeOut', sender: quizZoneId, data: undefined}); - this.server.emit('nextQuiz', quizZoneId); - }, + this.quizTimeOut ); await this.broker.publish(quizZoneId, { @@ -157,6 +140,11 @@ export class PlayGateway implements OnGatewayInit { this.server.emit('summary', quizZoneId); } + private async quizTimeOut(quizZoneId: string) { + await this.broker.publish(quizZoneId, {event: 'quizTimeOut', sender: quizZoneId, data: undefined}); + this.server.emit('nextQuiz', quizZoneId); + } + /** * 클라이언트가 퀴즈 답안을 제출한 경우 호출됩니다. * @@ -168,7 +156,7 @@ export class PlayGateway implements OnGatewayInit { async submit( @ConnectedSocket() client: WebSocketWithSession, @MessageBody() quizSubmit: QuizSubmitDto, - ): Promise> { + ): Promise> { const { id, quizZoneId } = client.session; const { @@ -189,7 +177,7 @@ export class PlayGateway implements OnGatewayInit { return { event: 'submit', - sender: id, + sender: quizZoneId, data: { fastestPlayerIds, submittedCount, totalPlayerCount, chatMessages: await this.chatService.get(quizZoneId) @@ -203,11 +191,12 @@ export class PlayGateway implements OnGatewayInit { * @param quizZoneId - WebSocket 클라이언트 */ private async summary(quizZoneId: string) { - const summaries = await this.playService.summaryQuizZone(quizZoneId); - const endSocketTime = summaries[0].endSocketTime; + const summary = await this.playService.summaryQuizZone(quizZoneId); + const {endSocketTime} = summary; - summaries.map(async ({ id, score, submits, quizzes, ranks, endSocketTime }) => { - this.sendToClient(id, 'summary', { score, submits, quizzes, ranks, endSocketTime }); + await this.broker.publish(quizZoneId, { + event: 'summary', sender: quizZoneId, + data: summary }); this.clearQuizZone(quizZoneId, endSocketTime - Date.now()); @@ -237,7 +226,7 @@ export class PlayGateway implements OnGatewayInit { * @param client - WebSocket 클라이언트 */ @SubscribeMessage('leave') - async leave(@ConnectedSocket() client: WebSocketWithSession) { + async leave(@ConnectedSocket() client: WebSocketWithSession): Promise> { const { id, quizZoneId } = client.session; const { isHost } = await this.playService.leaveQuizZone(quizZoneId, id); @@ -250,36 +239,59 @@ export class PlayGateway implements OnGatewayInit { await this.chatService.leave(quizZoneId, id); } - return { event: 'leave', data: 'OK' }; - } - - @SubscribeMessage('chat') - async chat( - @ConnectedSocket() client: WebSocketWithSession, - @MessageBody() message: ChatMessage, - ) { - await this.chatService.send(client.session.quizZoneId, message); + return { event: 'leave', sender: quizZoneId, data: 'OK' }; } private async subscribePlay(quizZoneId: string, client: WebSocketWithSession) { const clientId = client.session.id; - this.clients.set(clientId, client); - try { await this.broker.addPublisher(quizZoneId); } catch (error) {} + const unsubscribe = await this.broker.subscribe( - quizZoneId, clientId, async (message) => { - const {event, sender} = message; - - if (sender !== clientId) { - client.send(JSON.stringify(message)); - } else if (event === 'someone_leave') { - await unsubscribe(); - this.clients.delete(clientId); - client.close(); + quizZoneId, + clientId, + async (message) => { + const { event, sender, data } = message; + + switch (event) { + case "someone_join": + case "someone_submit": + case "changeNickname": + if (sender === clientId) break; + client.send(JSON.stringify(message)); + break; + case "someone_leave": + if (sender === clientId) { + await unsubscribe(); + client.close(); + } else { + client.send(JSON.stringify(message)); + } + break; + case "summary": + const { players, quizzes, ranks, endSocketTime } = data; + const player = players.get(clientId); + + if (player === undefined) break; + + const {score, submits } = player; + + client.send(JSON.stringify({event, data: { + score, submits, quizzes, ranks, endSocketTime + }})); + + break; + case "start": + case "nextQuiz": + case "quizTimeOut": + case "finish": + case "close": + client.send(JSON.stringify(message)); + break; + } } - }); + ); } } diff --git a/apps/backend/src/play/play.service.ts b/apps/backend/src/play/play.service.ts index 5a6324e..76a0263 100644 --- a/apps/backend/src/play/play.service.ts +++ b/apps/backend/src/play/play.service.ts @@ -302,18 +302,11 @@ export class PlayService { const now = Date.now(); const endSocketTime = now + socketConnectTime; - - const summaries = [...players.values()].map(({ id, score, submits }) => ({ - id, - score, - submits, - quizzes, - ranks, - endSocketTime - })); - quizZone.summaries = {ranks, endSocketTime}; - return summaries; + + return { + players, quizzes, ranks, endSocketTime, + } } public async clearQuizZone(quizZoneId: string) { diff --git a/packages/shared/src/interfaces/send-event.interface.ts b/packages/shared/src/interfaces/send-event.interface.ts index d6c1563..8080488 100644 --- a/packages/shared/src/interfaces/send-event.interface.ts +++ b/packages/shared/src/interfaces/send-event.interface.ts @@ -1,11 +1,20 @@ +export type PlayEvent = UnicastPlayEvent | BroadcastPlayEvent; + +export type UnicastPlayEvent = 'join' | 'changeNickname' | 'submit' | 'leave'; + +export type BroadcastPlayEvent = + 'someone_join' | 'someone_leave' | 'changeNickname' | + 'start' | 'nextQuiz' | + 'someone_submit' | 'quizTimeOut' | + 'finish' | 'summary' | 'close'; /** * 웹소켓 서버가 사용자에게 응답할 메시지 형식을 정의합니다. * * @property event - 클라이언트에게 전송할 이벤트 이름 * @property data - 클라이언트에게 전송할 데이터 */ -export interface SendEventMessage { - event: string; +export interface SendEventMessage { + event: TEvent; sender: string; - data: T; + data: TMessage; } \ No newline at end of file From 6856587c6e35a80b6b2ff4a515daf6c51b22df92 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 23 Jan 2025 23:31:11 +0900 Subject: [PATCH 24/26] =?UTF-8?q?fix:=20play=20gateway=20=EA=B8=B0?= =?UTF-8?q?=EB=8A=A5=20=EB=B3=B5=EA=B5=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 누락된 chat 핸들러 복구 --- apps/backend/src/play/play.gateway.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index 470da34..a8520e8 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -242,6 +242,14 @@ export class PlayGateway implements OnGatewayInit { return { event: 'leave', sender: quizZoneId, data: 'OK' }; } + @SubscribeMessage('chat') + async chat( + @ConnectedSocket() client: WebSocketWithSession, + @MessageBody() message: ChatMessage, + ) { + await this.chatService.send(client.session.quizZoneId, message); + } + private async subscribePlay(quizZoneId: string, client: WebSocketWithSession) { const clientId = client.session.id; From 3dcca09c90a34e8c66c7679fe1e9a2ada7d73b33 Mon Sep 17 00:00:00 2001 From: mario Date: Fri, 24 Jan 2025 10:57:09 +0900 Subject: [PATCH 25/26] =?UTF-8?q?refactor:=20broker=20=EC=9D=B8=ED=84=B0?= =?UTF-8?q?=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - subscribe의 불필요한 subscriberId 인자 제거 - MessageBroker를 위한 Subscriber 인터페이스 선언 - MessageBroker subscriber 저장을 배열로 변경 - subscriber의 Id를 생성하도록 변경(uuid) - ReactiveMessageBroker의 subscribe 변경 반영 --- packages/shared/package.json | 3 ++- .../broker/interfaces/broker.interface.ts | 2 +- .../shared/src/core/broker/message-broker.ts | 19 +++++++++++++------ .../core/broker/reactive-message-broker.ts | 4 ++-- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/packages/shared/package.json b/packages/shared/package.json index 848465f..a04b074 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -20,6 +20,7 @@ "typescript": "^5.x.x" }, "dependencies": { - "rxjs": "^7.8.1" + "rxjs": "^7.8.1", + "uuid": "^11.0.5" } } diff --git a/packages/shared/src/core/broker/interfaces/broker.interface.ts b/packages/shared/src/core/broker/interfaces/broker.interface.ts index 8db5565..8bfc5ee 100644 --- a/packages/shared/src/core/broker/interfaces/broker.interface.ts +++ b/packages/shared/src/core/broker/interfaces/broker.interface.ts @@ -3,7 +3,7 @@ import { MessageHandler } from '../types'; type Unsubscribe = () => Promise; export interface Broker { - subscribe(publisherId: string, subscriberId: string, handler: MessageHandler): Promise; + subscribe(publisherId: string, handler: MessageHandler): Promise; addPublisher(publisherId: string): Promise; diff --git a/packages/shared/src/core/broker/message-broker.ts b/packages/shared/src/core/broker/message-broker.ts index efc167b..a7e7b41 100644 --- a/packages/shared/src/core/broker/message-broker.ts +++ b/packages/shared/src/core/broker/message-broker.ts @@ -1,9 +1,15 @@ import { Broker } from './interfaces/broker.interface'; import { MessageHandler } from './types'; +import { v4 as uuidv4 } from 'uuid'; + +export interface Subscriber { + id: string; + handler: MessageHandler; +} export class MessageBroker implements Broker { constructor( - private readonly publishers: Map>> = new Map(), + private readonly publishers: Map[]> = new Map(), ) {} public async addPublisher(id: string) { @@ -11,7 +17,7 @@ export class MessageBroker implements Broker { throw new Error(`Publisher with ID ${id} already exists`); } - this.publishers.set(id, new Map()); + this.publishers.set(id, []); } public async removePublisher(id: string) { @@ -29,17 +35,18 @@ export class MessageBroker implements Broker { throw new Error(`Publisher with ID ${id} does not exist`); } - await Promise.all([...subscribers.values()].map(handler => handler(message))); + await Promise.all(subscribers.map((subscriber) => subscriber.handler(message))); } - public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler) { + public async subscribe(publisherId: string, handler: MessageHandler) { const subscribers = this.publishers.get(publisherId); if (subscribers === undefined) { throw new Error(`Publisher with ID ${publisherId} does not exist`); } - subscribers.set(subscriberId, handler); + const subscriberId = uuidv4(); + this.publishers.set(publisherId, [...subscribers, {id: subscriberId, handler}]); return () => this.unsubscribe(publisherId, subscriberId); } @@ -51,6 +58,6 @@ export class MessageBroker implements Broker { throw new Error(`Publisher with ID ${publisherId} does not exist`); } - subscribers.delete(subscriberId); + this.publishers.set(publisherId, subscribers.filter((subscriber) => subscriber.id !== subscriberId)); } } \ No newline at end of file diff --git a/packages/shared/src/core/broker/reactive-message-broker.ts b/packages/shared/src/core/broker/reactive-message-broker.ts index a3b29d2..5bdbd92 100644 --- a/packages/shared/src/core/broker/reactive-message-broker.ts +++ b/packages/shared/src/core/broker/reactive-message-broker.ts @@ -35,7 +35,7 @@ export class ReactiveMessageBroker implements Broker { publisher.next(message); } - public async subscribe(publisherId: string, subscriberId: string, handler: MessageHandler + public async subscribe(publisherId: string, handler: MessageHandler ) { const publisher = this.publishers.get(publisherId); @@ -45,7 +45,7 @@ export class ReactiveMessageBroker implements Broker { const subscription = publisher.subscribe({ next: handler, - error: (error: any) => console.error(`Error in subscription ${publisherId}:${subscriberId} :`, error) + error: (error: any) => console.error(`Error in subscription ${publisherId} :`, error) }); return async () => subscription.unsubscribe(); From 4d609af519d6f2f0178e3cd0e60d9c4e90418d97 Mon Sep 17 00:00:00 2001 From: mario Date: Fri, 24 Jan 2025 11:00:33 +0900 Subject: [PATCH 26/26] =?UTF-8?q?refactor:=20broker=20=EC=9D=B8=ED=84=B0?= =?UTF-8?q?=ED=8E=98=EC=9D=B4=EC=8A=A4=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - playGateway의 broker 변경 사항 반영 - chatService의 broker 변경 사항 반영 --- apps/backend/src/chat/chat.service.ts | 2 +- apps/backend/src/play/play.gateway.ts | 2 +- pnpm-lock.yaml | 9 +++++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/chat/chat.service.ts b/apps/backend/src/chat/chat.service.ts index 25463c5..be23d31 100644 --- a/apps/backend/src/chat/chat.service.ts +++ b/apps/backend/src/chat/chat.service.ts @@ -30,7 +30,7 @@ export class ChatService { } async join(chatId: string, player: Player, handleSendMessage: (data: ChatMessage) => void) { - const unsubscribe = await this.broker.subscribe(chatId, player.id, async (message) => { + const unsubscribe = await this.broker.subscribe(chatId, async (message) => { const { topic, data } = message; const { clientId } = data; diff --git a/apps/backend/src/play/play.gateway.ts b/apps/backend/src/play/play.gateway.ts index a8520e8..dd5a1e4 100644 --- a/apps/backend/src/play/play.gateway.ts +++ b/apps/backend/src/play/play.gateway.ts @@ -259,7 +259,6 @@ export class PlayGateway implements OnGatewayInit { const unsubscribe = await this.broker.subscribe( quizZoneId, - clientId, async (message) => { const { event, sender, data } = message; @@ -273,6 +272,7 @@ export class PlayGateway implements OnGatewayInit { case "someone_leave": if (sender === clientId) { await unsubscribe(); + await this.chatService.leave(clientId, clientId); client.close(); } else { client.send(JSON.stringify(message)); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 14a36c9..899e841 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -341,6 +341,9 @@ importers: rxjs: specifier: ^7.8.1 version: 7.8.1 + uuid: + specifier: ^11.0.5 + version: 11.0.5 devDependencies: '@typescript-eslint/eslint-plugin': specifier: ^8.0.0 @@ -6161,6 +6164,10 @@ packages: resolution: {integrity: sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==} engines: {node: '>= 0.4.0'} + uuid@11.0.5: + resolution: {integrity: sha512-508e6IcKLrhxKdBbcA2b4KQZlLVp2+J5UwQ6F7Drckkc5N9ZJwFa4TgWtsww9UG8fGHbm6gbV19TdM5pQ4GaIA==} + hasBin: true + uuid@9.0.1: resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} hasBin: true @@ -12955,6 +12962,8 @@ snapshots: utils-merge@1.0.1: {} + uuid@11.0.5: {} + uuid@9.0.1: {} v8-compile-cache-lib@3.0.1: {}