Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] 공용 메시지 브로커 구현 및 적용 #16

Open
wants to merge 27 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
cb9e499
feat: 메시지큐 구현
codemario318 Jan 21, 2025
316d744
feat: message-queue 모듈 등록
codemario318 Jan 22, 2025
5128b99
fix: 메시지큐 구현 제거
codemario318 Jan 22, 2025
8d058b4
feat: pub-sub 기능 구현
codemario318 Jan 22, 2025
dab1606
refactor: pub-sub 적용
codemario318 Jan 22, 2025
22284cc
fix: pub-sub 인터페이스 수정
codemario318 Jan 22, 2025
bb3efe4
feat: 메시지 브로커 수정
codemario318 Jan 22, 2025
69d6b1c
fix: 브로커 초기화 오류 수정
codemario318 Jan 22, 2025
36385ac
feat: 메시지 브로커 구현체 추가
codemario318 Jan 22, 2025
95262f6
feat: ReactiveMessageBroker 적용
codemario318 Jan 22, 2025
e2b760d
fix: 채팅 핸들러 수정
codemario318 Jan 22, 2025
01d3048
refactor: play 영역 pub-sub 모듈 적용
codemario318 Jan 22, 2025
aab69ef
refactor: pub-sub 이름 변경
codemario318 Jan 23, 2025
fe899b2
refactor: Broker 구조 변경
codemario318 Jan 23, 2025
6ff1c2d
refactor: broker 변경 사항 반영
codemario318 Jan 23, 2025
d7306a0
refactor: chat 서비스 broker 리펙토링 반영
codemario318 Jan 23, 2025
1d1971c
refactor: broker 모듈 위치 변경
codemario318 Jan 23, 2025
75bf70c
refactor: 공용 broker 수정
codemario318 Jan 23, 2025
3244f1a
chore: shared 의존성 추가
codemario318 Jan 23, 2025
5153f07
refactor: broker 모듈 변경 반영
codemario318 Jan 23, 2025
b09a1f7
refactor: reactive-message-broker 수정
codemario318 Jan 23, 2025
68df88e
chore: shared 패키지 의존성 추가
codemario318 Jan 23, 2025
f07a735
refactor: play clients 의존성 제거
codemario318 Jan 23, 2025
6856587
fix: play gateway 기능 복구
codemario318 Jan 23, 2025
3dcca09
refactor: broker 인터페이스 변경
codemario318 Jan 24, 2025
4d609af
refactor: broker 인터페이스 변경
codemario318 Jan 24, 2025
144c424
refactor: pub/sub 모듈 수정
codemario318 Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: play 영역 pub-sub 모듈 적용
- play 웹소켓 응답 처리 pub/sub 적용
- 불필요한 로직 삭제
- 일부 코드 개선
  • Loading branch information
codemario318 committed Jan 22, 2025
commit 01d3048e43f454316f8fa114010b2143eaa3c7a8
6 changes: 5 additions & 1 deletion apps/backend/src/core/SessionWsAdapter.ts
Original file line number Diff line number Diff line change
@@ -3,8 +3,12 @@ import { RequestHandler } from 'express';
import { NestApplication } from '@nestjs/core';
import { Session } from 'express-session';

export interface SessionWithQuizZone extends Session {
quizZoneId?: string;
}

export interface WebSocketWithSession extends WebSocket {
session: Session;
session: SessionWithQuizZone;
}

export class SessionWsAdapter extends WsAdapter {
10 changes: 0 additions & 10 deletions apps/backend/src/play/entities/send-event.entity.ts

This file was deleted.

182 changes: 84 additions & 98 deletions apps/backend/src/play/play.gateway.ts
Original file line number Diff line number Diff line change
@@ -10,14 +10,13 @@ import { PlayService } from './play.service';
import { Server } from 'ws';
import { QuizSubmitDto } from './dto/quiz-submit.dto';
import { QuizJoinDto } from './dto/quiz-join.dto';
import { BadRequestException, Inject } from '@nestjs/common';
import { SendEventMessage } from './entities/send-event.entity';
import { ClientInfo } from './entities/client-info.entity';
import { Inject, NotFoundException } from '@nestjs/common';
import { WebSocketWithSession } from '../core/SessionWsAdapter';
import { RuntimeException } from '@nestjs/core/errors/exceptions';
import { SubmitResponseDto } from './dto/submit-response.dto';
import { ChatService } from '../chat/chat.service';
import {ChatMessage, CLOSE_CODE} from "@web08-booquiz/shared";
import { ChatMessage, SendEventMessage } from '@web08-booquiz/shared';
import { PubSub } from '../core/pub-sub/interfaces/pub-sub.interface';

/**
* 퀴즈 게임에 대한 WebSocket 연결을 관리하는 Gateway입니다.
@@ -30,9 +29,11 @@ export class PlayGateway implements OnGatewayInit {

constructor(
@Inject('ClientInfoStorage')
private readonly clients: Map<String, ClientInfo>,
private readonly clients: Map<String, WebSocketWithSession>,
private readonly playService: PlayService,
private readonly chatService: ChatService,
@Inject('PubSub')
private readonly pubSub: PubSub<string, SendEventMessage<any>>,
) {}

/**
@@ -46,35 +47,13 @@ export class PlayGateway implements OnGatewayInit {
}

private sendToClient(clientId: string, event: string, data?: any) {
const { socket } = this.getClientInfo(clientId);
socket.send(JSON.stringify({ event, data }));
}

private broadcast(clientIds: string[], event: string, data?: any) {
clientIds.forEach((clientId) => {
this.sendToClient(clientId, event, data);
});
}

/**
* 클라이언트의 세션 ID를 이용하여 클라이언트 정보를 조회합니다.
*
* @param clientId - 클라이언트의 세션 ID
*/
private getClientInfo(clientId: string): ClientInfo {
const clientInfo = this.clients.get(clientId);
const socket = this.clients.get(clientId);

if (!clientInfo) {
throw new BadRequestException('사용자의 접속 정보를 찾을 수 없습니다.');
if (socket === undefined) {
throw new NotFoundException('사용자의 접속 정보를 찾을 수 없습니다.')
}

return clientInfo;
}

private clearClient(clientId: string, reason?: string) {
const { socket } = this.getClientInfo(clientId);
this.clients.delete(clientId);
socket.close(CLOSE_CODE.NORMAL, reason);
socket.send(JSON.stringify({ event, data }));
}

/**
@@ -97,30 +76,21 @@ export class PlayGateway implements OnGatewayInit {
);

const { id, nickname } = currentPlayer;
const playerIds = players.map((player) => player.id);
const data = players.map(({ id, nickname }) => ({
id,
nickname,
}));

if (this.clients.has(sessionId) && this.clients.get(sessionId).quizZoneId === quizZoneId) {
this.clients.set(sessionId, { quizZoneId, socket: client });
return {
event: 'join',
data,
};
}
this.clients.set(sessionId, { quizZoneId, socket: client });

this.broadcast(playerIds, 'someone_join', { id, nickname });
await this.subscribePlay(quizZoneId, client);

await this.chatService.join(quizZoneId, currentPlayer, (message: ChatMessage) => {
client.send(JSON.stringify({event: 'chat', data: message}));
});

await this.pubSub.publish(quizZoneId, { topic: id, data: {
event: 'someone_join',
data: { id, nickname }
}});

return {
event: 'join',
data,
data: players.map(({ id, nickname }) => ({ id, nickname }))
};
}

@@ -129,16 +99,14 @@ export class PlayGateway implements OnGatewayInit {
@ConnectedSocket() client: WebSocketWithSession,
@MessageBody() changedNickname: string,
): Promise<SendEventMessage<string>> {
const clientId = client.session.id;
const { quizZoneId } = this.getClientInfo(clientId);
const { id, quizZoneId } = client.session;

const { playerIds } = await this.playService.changeNickname(
quizZoneId,
clientId,
changedNickname,
);
await this.playService.changeNickname(quizZoneId, id, changedNickname);

this.broadcast(playerIds, 'updateNickname', { clientId, changedNickname });
await this.pubSub.publish(quizZoneId, {topic: id, data: {
event: 'changeNickname',
data: {id, changedNickname}
}});

return {
event: 'changeNickname',
@@ -153,12 +121,14 @@ export class PlayGateway implements OnGatewayInit {
*/
@SubscribeMessage('start')
async start(@ConnectedSocket() client: WebSocketWithSession) {
const clientId = client.session.id;
const { quizZoneId } = this.getClientInfo(clientId);
const { id, quizZoneId } = client.session;

const playerIds = await this.playService.startQuizZone(quizZoneId, clientId);
await this.playService.startQuizZone(quizZoneId, id);

this.broadcast(playerIds, 'start', 'OK');
await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {
event: 'start',
data: 'OK'
}});

this.server.emit('nextQuiz', quizZoneId);
}
@@ -170,15 +140,21 @@ export class PlayGateway implements OnGatewayInit {
*/
private async playNextQuiz(quizZoneId: string) {
try {
const { nextQuiz, playerIds, currentQuizResult } = await this.playService.playNextQuiz(
const { nextQuiz, currentQuizResult } = await this.playService.playNextQuiz(
quizZoneId,
() => {
this.broadcast(playerIds, 'quizTimeOut');
async () => {
await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {
event: 'quizTimeOut',
data: undefined,
}});
this.server.emit('nextQuiz', quizZoneId);
},
);

this.broadcast(playerIds, 'nextQuiz', { nextQuiz, currentQuizResult });
await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {
event: 'nextQuiz',
data: { nextQuiz, currentQuizResult }
}});
} catch (error) {
if (error instanceof RuntimeException) {
await this.finishQuizZone(quizZoneId);
@@ -189,10 +165,7 @@ export class PlayGateway implements OnGatewayInit {
}

private async finishQuizZone(quizZoneId: string) {
const playerIds = await this.playService.finishQuizZone(quizZoneId);

this.broadcast(playerIds, 'finish');

await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {event: 'finish', data: undefined}});
this.server.emit('summary', quizZoneId);
}

@@ -208,30 +181,33 @@ export class PlayGateway implements OnGatewayInit {
@ConnectedSocket() client: WebSocketWithSession,
@MessageBody() quizSubmit: QuizSubmitDto,
): Promise<SendEventMessage<SubmitResponseDto>> {
const clientId = client.session.id;
const { quizZoneId } = this.getClientInfo(clientId);
const chatMessages = await this.chatService.get(quizZoneId);
const { id, quizZoneId } = client.session;

const {
isLastSubmit,
fastestPlayerIds,
submittedCount,
totalPlayerCount,
otherSubmittedPlayerIds,
} = await this.playService.submit(quizZoneId, clientId, {
} = await this.playService.submit(quizZoneId, id, {
...quizSubmit,
receivedAt: Date.now(),
});

if (isLastSubmit) {
this.server.emit('nextQuiz', quizZoneId);
} else {
await this.pubSub.publish(quizZoneId, {topic: id, data: {
event: 'someone_submit',
data: { id, submittedCount }
}});
}

this.broadcast(otherSubmittedPlayerIds, 'someone_submit', { clientId, submittedCount });

return {
event: 'submit',
data: { fastestPlayerIds, submittedCount, totalPlayerCount, chatMessages },
data: {
fastestPlayerIds, submittedCount, totalPlayerCount,
chatMessages: await this.chatService.get(quizZoneId)
},
};
}

@@ -248,27 +224,22 @@ export class PlayGateway implements OnGatewayInit {
this.sendToClient(id, 'summary', { score, submits, quizzes, ranks, endSocketTime });
});

const clientsIds = summaries.map(({ id }) => id);

this.clearQuizZone(clientsIds, quizZoneId, endSocketTime - Date.now());
this.clearQuizZone(quizZoneId, endSocketTime - Date.now());
}

/**
* 퀴즈 방을 나갔다는 메시지를 클라이언트로 전송합니다.
*
* - 방장이 나가면 퀴즈 존을 삭제하고 모든 플레이어에게 방장이 나갔다고 알립니다.
* - 일반 플레이어가 나가면 퀴즈 존에서 나가고 다른 플레이어에게 나갔다고 알립니다.
* @param clientIds - 퀴즈존에 참여하고 있는 클라이언트 id 리스트
* @param quizZoneId - 퀴즈가 끝난 퀴즈존 id
* @param time - 소켓 연결 종료 시간 종료 시간
*/
private clearQuizZone(clientIds: string[], quizZoneId: string, time: number) {
setTimeout(() => {
clientIds.forEach((id) => {
this.clearClient(id, 'finish');
});
this.playService.clearQuizZone(quizZoneId);
this.chatService.delete(quizZoneId);
private clearQuizZone(quizZoneId: string, time: number) {
setTimeout(async () => {
await this.playService.clearQuizZone(quizZoneId);
await this.pubSub.publish(quizZoneId, {topic: quizZoneId, data: {event: 'close', data: undefined}});
await this.chatService.delete(quizZoneId);
}, time);
}

@@ -281,18 +252,16 @@ export class PlayGateway implements OnGatewayInit {
*/
@SubscribeMessage('leave')
async leave(@ConnectedSocket() client: WebSocketWithSession) {
const clientId = client.session.id;
const { quizZoneId } = this.getClientInfo(clientId);
const { id, quizZoneId } = client.session;

const { isHost, playerIds } = await this.playService.leaveQuizZone(quizZoneId, clientId);
const { isHost } = await this.playService.leaveQuizZone(quizZoneId, id);

if (isHost) {
this.broadcast(playerIds, 'close');
this.clearQuizZone(playerIds, quizZoneId, 0);
await this.pubSub.publish(quizZoneId, {topic: id, data: {event: 'close', data: undefined}});
this.clearQuizZone(quizZoneId, 0);
} else {
this.broadcast(playerIds, 'someone_leave', clientId);
await this.chatService.leave(quizZoneId, clientId);
this.clearClient(clientId, 'Client leave');
await this.pubSub.publish(quizZoneId, {topic: id, data: {event: 'someone_leave', data: undefined}});
await this.chatService.leave(quizZoneId, id);
}

return { event: 'leave', data: 'OK' };
@@ -303,11 +272,28 @@ export class PlayGateway implements OnGatewayInit {
@ConnectedSocket() client: WebSocketWithSession,
@MessageBody() message: ChatMessage,
) {
await this.chatService.send(client.session.quizZoneId, message);
}

private async subscribePlay(quizZoneId: string, client: WebSocketWithSession) {
const clientId = client.session.id;
const { quizZoneId } = this.getClientInfo(clientId);
const clientIds = await this.playService.chatQuizZone(clientId, quizZoneId);

this.broadcast(clientIds, 'chat', message);
await this.chatService.send(quizZoneId, message);
this.clients.set(clientId, client);

try {
await this.pubSub.addPublisher(quizZoneId);
} catch (error) {}
const unsubscribe = await this.pubSub.subscribe(
quizZoneId, clientId, async (message) => {
const {topic, data} = message;

if (topic !== clientId) {
client.send(JSON.stringify(data));
} else if (data.event === 'someone_leave') {
await unsubscribe();
this.clients.delete(clientId);
client.close();
}
});
}
}
6 changes: 6 additions & 0 deletions apps/backend/src/play/play.module.ts
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ import { PlayService } from './play.service';
import { PlayGateway } from './play.gateway';
import { QuizZoneModule } from '../quiz-zone/quiz-zone.module';
import { ChatModule } from 'src/chat/chat.module';
import { ReactiveMessageBroker } from '../core/pub-sub/reactive-message-broker';

@Module({
imports: [QuizZoneModule, ChatModule],
@@ -16,7 +17,12 @@ import { ChatModule } from 'src/chat/chat.module';
provide: 'ClientInfoStorage',
useValue: new Map(),
},
{
provide: 'PubSub',
useValue: new ReactiveMessageBroker,
},
PlayService,
],
exports: [PlayService]
})
export class PlayModule {}
Loading