Skip to content

Commit

Permalink
Bug/#315 unsubscribe, subscribe에 queue 적용 (#337)
Browse files Browse the repository at this point in the history
* 🐛 fix: 한번만 callback 함수를 부르는 방식으로 변경, 안 쓰이는 함수 삭제

* 💄 style: processStockData 이름 변경

* ✨ feat: new Date 추가

* 🐛 fix: new date에서 date없이도 가능

* 📝 docs: korea.mst

* ✨ feat: newDate 타입가드, isSameMonth, week 추가

* 🐛 fix: 데이터 받아오는 단위 수정

* 🐛 fix: dequeue 수정, 테스트 코드 제외

* 🐛 fix: newDate 오류 수정

* 🐛 fix: period 데이터 받지 못하는 문제 해결

* 💄 style: 테스트 코드 삭제

* 🐛 fix: stock.gateway 수정

* 🐛 fix: unsubscribe 기능 추가

* 🐛 fix: livedata service 수정

* 🐛 fix: null 일때 에러 리턴

* 📝 docs: 불필요한 info 삭제

* ♻️ refactor: 안쓰이는 service 삭제
  • Loading branch information
swkim12345 authored Dec 3, 2024
1 parent e1bb6b8 commit 5344b26
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 65 deletions.
4 changes: 2 additions & 2 deletions packages/backend/src/scraper/openapi/api/openapiToken.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ export class OpenapiTokenApi {
private isTokenExpired(startDate?: Date) {
if (!startDate) return true;
const now = new Date();
//실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 12시간으로 설정
const baseTimeToMilliSec = 12 * 60 * 60 * 1000;
//실제 만료 시간은 24시간이지만, 문제가 발생할 여지를 줄이기 위해 6시간으로 설정
const baseTimeToMilliSec = 6 * 60 * 60 * 1000;
const timeDiff = now.getTime() - startDate.getTime();

return timeDiff >= baseTimeToMilliSec;
Expand Down
13 changes: 9 additions & 4 deletions packages/backend/src/scraper/openapi/liveData.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class LiveData {
stockId,
);
if (stockLiveData) {
this.openapiLiveData.saveLiveData(stockLiveData);
await this.openapiLiveData.saveLiveData(stockLiveData);
}
} catch (error) {
this.logger.warn(`Subscribe error in open api : ${error}`);
Expand All @@ -60,6 +60,9 @@ export class LiveData {
}

async subscribe(stockId: string) {
if (stockId === null || stockId === undefined) {
return;
}
await this.openapiSubscribe(stockId);

if (!this.isCloseTime(new Date(), this.startTime, this.endTime)) {
Expand All @@ -85,7 +88,7 @@ export class LiveData {
const idx = this.subscribeStocks.get(stockId);
this.subscribeStocks.delete(stockId);

if (idx) {
if (idx !== undefined) {
this.configSubscribeSize[idx]--;
} else {
this.logger.warn(`Websocket error : ${stockId} has invalid idx`);
Expand All @@ -98,7 +101,7 @@ export class LiveData {
'2',
);

this.websocketClient[idx].discribe(message);
this.websocketClient[idx].unsubscribe(message);
}
}

Expand All @@ -122,11 +125,13 @@ export class LiveData {
if (message.header) {
if (message.header.tr_id === 'PINGPONG') {
client.pong(data);
} else {
this.logger.info(JSON.stringify(message));
}
return;
}
const liveData = this.openapiLiveData.convertLiveData(message);
await this.openapiLiveData.saveLiveData(liveData[0]);
await this.openapiLiveData.saveLiveData(liveData[0])
} catch (error) {
this.logger.warn(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { OpenapiMinuteData } from './api/openapiMinuteData.api';
import { OpenapiPeriodData } from './api/openapiPeriodData.api';
import { OpenapiTokenApi } from './api/openapiToken.api';
import { LiveData } from './liveData.service';
import { OpenapiScraperService } from './openapi-scraper.service';
import { WebsocketClient } from './websocket/websocketClient.websocket';
import { OpenapiFluctuationData } from '@/scraper/openapi/api/openapiFluctuationData.api';
import { OpenapiRankViewApi } from '@/scraper/openapi/api/openapiRankView.api';
Expand Down Expand Up @@ -49,7 +48,6 @@ import { StockLiveData } from '@/stock/domain/stockLiveData.entity';
OpenapiPeriodData,
OpenapiMinuteData,
OpenapiDetailData,
OpenapiScraperService,
OpenapiFluctuationData,
OpenapiIndex,
OpenapiRankViewApi,
Expand Down
15 changes: 0 additions & 15 deletions packages/backend/src/scraper/openapi/openapi-scraper.service.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Inject, Injectable } from '@nestjs/common';
import { Logger } from 'winston';
import { RawData, WebSocket } from 'ws';
Expand All @@ -7,24 +6,23 @@ import { RawData, WebSocket } from 'ws';
export class WebsocketClient {
static url = process.env.WS_URL ?? 'ws://ops.koreainvestment.com:21000';
private client: WebSocket;
//현재 factory 패턴을 이용해 할당하면 socket이 열리기 전에 message가 가는 문제가 있음.
// 소켓이 할당되기 전에(client에 소켓이 없을 때) message를 보내려 시도함.
private messageQueue: string[] = [];

constructor(@Inject('winston') private readonly logger: Logger) {
this.client = new WebSocket(WebsocketClient.url);
this.initOpen(() => this.flushQueue());
this.initError((error) => this.logger.error('WebSocket error', error));
}

static websocketFactory(logger: Logger) {
const websocket = new WebsocketClient(logger);

return websocket;
return new WebsocketClient(logger);
}

subscribe(message: string) {
this.sendMessage(message);
}

discribe(message: string) {
unsubscribe(message: string) {
this.sendMessage(message);
}

Expand All @@ -50,22 +48,28 @@ export class WebsocketClient {
initCloseCallback: () => void,
initErrorCallback: (error: unknown) => void,
) {
this.initOpen(initOpenCallback(this.sendMessage));
this.initOpen(initOpenCallback(this.sendMessage.bind(this)));
this.initMessage(initMessageCallback(this.client));
this.initDisconnect(initCloseCallback);
this.initError(initErrorCallback);
}

private sendMessage(message: string) {
if (!this.client || !this.client.readyState) {
this.logger.warn('WebSocket is not open. Message not sent. ');
return;
}
if (this.client.readyState === WebSocket.OPEN) {
this.client.send(message);
this.logger.info(`Sent message: ${message}`);
} else {
this.logger.warn('WebSocket is not open. Message not sent. ');
this.logger.warn('WebSocket not open. Queueing message.');
this.messageQueue.push(message); // 큐에 메시지를 추가
}
}

private flushQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
if (message) {
this.sendMessage(message);
}
}
}
}
63 changes: 34 additions & 29 deletions packages/backend/src/stock/stock.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Inject, Injectable } from '@nestjs/common';
import {
ConnectedSocket,
MessageBody,
OnGatewayDisconnect,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
Expand All @@ -17,11 +18,13 @@ import { LiveData } from '@/scraper/openapi/liveData.service';
pingTimeout: 5000,
})
@Injectable()
export class StockGateway {
export class StockGateway implements OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private readonly mutex = new Mutex();

private readonly users: Map<string, string> = new Map();

constructor(
private readonly liveData: LiveData,
@Inject('winston') private readonly logger: Logger,
Expand All @@ -32,40 +35,42 @@ export class StockGateway {
@MessageBody() stockId: string,
@ConnectedSocket() client: Socket,
) {
client.join(stockId);

await this.mutex.runExclusive(async () => {
const connectedSockets = await this.server.to(stockId).fetchSockets();
try {
client.join(stockId);
this.users.set(client.id, stockId);

if (connectedSockets.length > 0 && !this.liveData.isSubscribe(stockId)) {
await this.liveData.subscribe(stockId);
this.logger.info(`${stockId} is subscribed`);
}
});
await this.mutex.runExclusive(async () => {
const connectedSockets = await this.server.to(stockId).fetchSockets();

client.on('disconnecting', () => {
client.rooms.delete(client.id);
const stocks = Array.from(client.rooms.values());
for (const stock of stocks) {
this.handleDisconnectStock(stock);
}
});
if (
connectedSockets.length > 0 &&
!this.liveData.isSubscribe(stockId)
) {
await this.liveData.subscribe(stockId);
this.logger.info(`${stockId} is subscribed`);
}
});

client.emit('connectionSuccess', {
message: `Successfully connected to stock room: ${stockId}`,
stockId,
});
client.emit('connectionSuccess', {
message: `Successfully connected to stock room: ${stockId}`,
stockId,
});
} catch (e) {
const error = e as Error;
this.logger.warn(error.message);
client.emit('error', error.message);
client.disconnect();
}
}

async handleDisconnectStock(stockId: string) {
await this.mutex.runExclusive(async () => {
const connectedSockets = await this.server.in(stockId).fetchSockets();

if (connectedSockets.length === 0) {
async handleDisconnect(client: Socket) {
const stockId = this.users.get(client.id);
if (stockId) {
await this.mutex.runExclusive(async () => {
await this.liveData.unsubscribe(stockId);
this.logger.info(`${stockId} is unsubscribed`);
}
});
this.users.delete(client.id);
});
}
}

onUpdateStock(
Expand Down

0 comments on commit 5344b26

Please sign in to comment.