Skip to content

Commit

Permalink
Bug/#341 유저가 다른 주식에 접근할 때 예전 주식의 실시간 데이터가 보이는 버그 수정 (#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
xjfcnfw3 authored Dec 4, 2024
2 parents d8b4a8d + 629b47d commit c4d1d9e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 33 deletions.
6 changes: 5 additions & 1 deletion packages/backend/src/scraper/openapi/liveData.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type TR_IDS = '1' | '2';

@Injectable()
export class LiveData {
private readonly startTime: Date = new Date(2024, 0, 1, 9, 0, 0, 0);
private readonly startTime: Date = new Date(2024, 0, 1, 2, 0, 0, 0);
private readonly endTime: Date = new Date(2024, 0, 1, 15, 30, 0, 0);

private readonly reconnectInterval = 60 * 1000;
Expand All @@ -37,6 +37,10 @@ export class LiveData {
}
this.connect();
});
this.subscribe('005930');
this.subscribe('000660');
this.subscribe('000150');
this.subscribe('000020');
}

private async openapiSubscribe(stockId: string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@ import { RawData, WebSocket } from 'ws';
export class WebsocketClient {
static url = process.env.WS_URL ?? 'ws://ops.koreainvestment.com:21000';
private client: WebSocket;
private messageQueue: string[] = [];
//현재 factory 패턴을 이용해 할당하면 socket이 열리기 전에 message가 가는 문제가 있음.
// 소켓이 할당되기 전에(client에 소켓이 없을 때) message를 보내려 시도함.

constructor(@Inject('winston') private readonly logger: Logger) {
this.client = new WebSocket(WebsocketClient.url);
this.initOpen(() => this.flushQueue());
this.initError((error) => this.logger.error('WebSocket error', error));
}

static websocketFactory(logger: Logger) {
return new WebsocketClient(logger);
const websocket = new WebsocketClient(logger);
return websocket;
}

subscribe(message: string) {
Expand Down Expand Up @@ -48,28 +47,22 @@ export class WebsocketClient {
initCloseCallback: () => void,
initErrorCallback: (error: unknown) => void,
) {
this.initOpen(initOpenCallback(this.sendMessage.bind(this)));
this.initOpen(initOpenCallback(this.sendMessage));
this.initMessage(initMessageCallback(this.client));
this.initDisconnect(initCloseCallback);
this.initError(initErrorCallback);
}

private sendMessage(message: string) {
if (!this.client || !this.client.readyState) {
this.logger.warn('WebSocket is not open. Message not sent. ');
return;
}
if (this.client.readyState === WebSocket.OPEN) {
this.client.send(message);
this.logger.info(`Sent message: ${message}`);
} else {
this.logger.warn('WebSocket not open. Queueing message.');
this.messageQueue.push(message); // 큐에 메시지를 추가
}
}

private flushQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
if (message) {
this.sendMessage(message);
}
this.logger.warn('WebSocket is not open. Message not sent. ');
}
}
}
46 changes: 31 additions & 15 deletions packages/backend/src/stock/stock.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,29 @@ export class StockGateway implements OnGatewayDisconnect {
@Inject('winston') private readonly logger: Logger,
) {}

private async handleJoinToRoom(stockId: string) {
const connectedSockets = await this.server.to(stockId).fetchSockets();

if (connectedSockets.length > 0 && !this.liveData.isSubscribe(stockId)) {
await this.liveData.subscribe(stockId);
this.logger.info(`${stockId} is subscribed`);
}
}

@SubscribeMessage('connectStock')
async handleConnectStock(
@MessageBody() stockId: string,
@ConnectedSocket() client: Socket,
) {
try {
client.join(stockId);
this.users.set(client.id, stockId);

await this.mutex.runExclusive(async () => {
const connectedSockets = await this.server.to(stockId).fetchSockets();
const beforeStockId = this.users.get(client.id);
await this.handleClientStockEvent(beforeStockId, client);

if (
connectedSockets.length > 0 &&
!this.liveData.isSubscribe(stockId)
) {
await this.liveData.subscribe(stockId);
this.logger.info(`${stockId} is subscribed`);
}
this.users.set(client.id, stockId);
this.handleJoinToRoom(stockId);
});

client.emit('connectionSuccess', {
Expand All @@ -63,16 +67,28 @@ export class StockGateway implements OnGatewayDisconnect {
}
}

async handleDisconnect(client: Socket) {
const stockId = this.users.get(client.id);
if (stockId) {
await this.mutex.runExclusive(async () => {
private async handleClientStockEvent(
stockId: string | undefined,
client: Socket,
) {
if (stockId !== undefined) {
client.leave(stockId);
this.users.delete(client.id);
const values = Object.values(this.users);
const isStockIdExists = values.some((value) => stockId === value);
if (!isStockIdExists) {
await this.liveData.unsubscribe(stockId);
this.users.delete(client.id);
});
}
}
}

async handleDisconnect(client: Socket) {
const stockId = this.users.get(client.id);
await this.mutex.runExclusive(async () => {
await this.handleClientStockEvent(stockId, client);
});
}

onUpdateStock(
stockId: string,
price: number,
Expand Down

0 comments on commit c4d1d9e

Please sign in to comment.