Skip to content

Commit

Permalink
Feature/#341 분봉 리팩토링 및 장 시간대에 분봉 받아오기, alarm 한번만 작동 (#343)
Browse files Browse the repository at this point in the history
* ♻️ refactor: 분봉 데이터 callback 형태로 바꿔 우선순위 큐 적용, 리팩토링

* ♻️ refactor: 분단위 데이터 수집 stock limit 200, 콜백함수로 리팩토링

* 🐛 fix: afterUpdate 적용 위한 upsert구문으로 변경

* 💄 style: 분단위 테스트 이후 테스트 코드 삭제 및 조건 원복

* 💄 style: dto 안 쓰이는 속성 삭제

* 💄 style: console.log 삭제

* 📝 docs: liveData에 unsubscribe, subscribe 메시지 info로 출력 추가

* 🐛 fix: 알람을 한번만 보내고 삭제처리하게 만듦
  • Loading branch information
swkim12345 authored Dec 3, 2024
1 parent 981f190 commit bde9d72
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 109 deletions.
2 changes: 2 additions & 0 deletions packages/backend/src/alarm/alarm.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ export class AlarmService {

for (const subscription of subscriptions) {
await this.pushService.sendPushNotification(subscription, payload);
//한번만 보내고 삭제하게 처리.
this.alarmRepository.delete(alarm.id);
}
}
}
7 changes: 3 additions & 4 deletions packages/backend/src/alarm/alarm.subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ export class AlarmSubscriber
}

isValidAlarm(alarm: Alarm, entity: StockMinutely) {
if (alarm.alarmDate && alarm.alarmDate > entity.createdAt) {
if (alarm.alarmDate && alarm.alarmDate >= entity.createdAt) {
return false;
} else {
if (alarm.targetPrice && alarm.targetPrice >= entity.open) {
if (alarm.targetPrice && alarm.targetPrice <= entity.open) {
return true;
}
if (alarm.targetVolume && alarm.targetVolume >= entity.volume) {
if (alarm.targetVolume && alarm.targetVolume <= entity.volume) {
return true;
}
return false;
Expand All @@ -48,7 +48,6 @@ export class AlarmSubscriber
where: { stock: { id: stockMinutely.stock.id } },
relations: ['user', 'stock'],
});

const alarms = rawAlarms.filter((val) =>
this.isValidAlarm(val, stockMinutely),
);
Expand Down
4 changes: 0 additions & 4 deletions packages/backend/src/alarm/dto/subscribe.request.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { ApiProperty } from '@nestjs/swagger';
//import { User } from '@/user/domain/user.entity';

export class SubscriptionData {
//@ApiProperty({ type: () => User, description: '유저 아이디' })
//user: User;

@ApiProperty({
type: 'string',
description: '엔드 포인트 설정',
Expand Down
169 changes: 81 additions & 88 deletions packages/backend/src/scraper/openapi/api/openapiMinuteData.api.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,104 @@
import { Inject, Injectable } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { DataSource } from 'typeorm';
import { Logger } from 'winston';
import { openApiConfig } from '../config/openapi.config';

import {
isMinuteData,
Json,
OpenapiQueue,
OpenapiQueueNodeValue,
} from '../queue/openapi.queue';
import {
isMinuteDataOutput1,
isMinuteDataOutput2,
MinuteData,
MinuteDataOutput1,
MinuteDataOutput2,
UpdateStockQuery,
} from '../type/openapiMinuteData.type';
import { TR_IDS } from '../type/openapiUtil.type';
import { getCurrentTime, getOpenApi } from '../util/openapiUtil.api';
import { OpenapiTokenApi } from './openapiToken.api';
import { getCurrentTime } from '../util/openapiUtil.api';
import { Alarm } from '@/alarm/domain/alarm.entity';
import { Stock } from '@/stock/domain/stock.entity';
import { StockData, StockMinutely } from '@/stock/domain/stockData.entity';

const STOCK_CUT = 4;

@Injectable()
export class OpenapiMinuteData {
private stock: Stock[][] = [];
private readonly entity = StockMinutely;
private readonly url: string =
'/uapi/domestic-stock/v1/quotations/inquire-time-itemchartprice';
private readonly intervals: number = 130;
private flip: number = 0;
private readonly STOCK_LIMITS: number = 200;
constructor(
private readonly datasource: DataSource,
private readonly openApiToken: OpenapiTokenApi,
private readonly openapiQueue: OpenapiQueue,
@Inject('winston') private readonly logger: Logger,
) {
//this.getStockData();
}
) {}

async getStockData() {
@Cron(`* 9-15 * * 1-5`)
async getStockMinuteData() {
if (process.env.NODE_ENV !== 'production') return;
const stock = await this.datasource.manager.findBy(Stock, {
isTrading: true,
});
const stockSize = Math.ceil(stock.length / STOCK_CUT);
let i = 0;
this.stock = [];
while (i < STOCK_CUT) {
this.stock.push(stock.slice(i * stockSize, (i + 1) * stockSize));
i++;
const alarms = await this.datasource.manager
.getRepository(Alarm)
.createQueryBuilder('alarm')
.leftJoin('alarm.stock', 'stock')
.select('stock.id', 'stockId')
.addSelect('COUNT(alarm.id)', 'alarmCount')
.groupBy('stock.id')
.orderBy('alarmCount', 'DESC')
.limit(this.STOCK_LIMITS)
.execute();
for (const alarm of alarms) {
const time = getCurrentTime();
const query = this.getUpdateStockQuery(alarm.stockId, time);
const node: OpenapiQueueNodeValue = {
url: this.url,
query,
trId: TR_IDS.MINUTE_DATA,
callback: this.getStockMinuteDataCallback(alarm.stockId, time),
};
this.openapiQueue.enqueue(node);
}
}

getStockMinuteDataCallback(stockId: string, time: string) {
return async (data: Json) => {
let output1: MinuteDataOutput1, output2: MinuteDataOutput2[];
if (data.output1 && isMinuteDataOutput1(data.output1)) {
output1 = data.output1;
} else {
this.logger.info(`${stockId} has invalid minute data`);
return;
}
if (
data.output2 &&
data.output2[0] &&
isMinuteDataOutput2(data.output2[0])
) {
output2 = data.output2 as MinuteDataOutput2[];
} else {
this.logger.info(`${stockId} has invalid minute data`);
return;
}
const minuteDatas: MinuteData[] = output2.map((val): MinuteData => {
return { acml_vol: output1.acml_vol, ...val };
});
await this.saveMinuteData(stockId, minuteDatas, time);
};
}

private async saveMinuteData(
stockId: string,
item: MinuteData[],
time: string,
) {
if (!this.isMarketOpenTime(time)) return;
const stockPeriod = item.map((val) =>
this.convertResToMinuteData(stockId, val, time),
);
if (stockPeriod[0]) {
this.datasource.manager.upsert(this.entity, stockPeriod[0], [
'stock.id',
'startTime',
]);
}
}

Expand All @@ -64,7 +120,7 @@ export class OpenapiMinuteData {
stockPeriod.open = parseInt(item.stck_oprc);
stockPeriod.high = parseInt(item.stck_hgpr);
stockPeriod.low = parseInt(item.stck_lwpr);
stockPeriod.volume = parseInt(item.cntg_vol);
stockPeriod.volume = parseInt(item.acml_vol);
stockPeriod.createdAt = new Date();
return stockPeriod;
}
Expand All @@ -74,69 +130,6 @@ export class OpenapiMinuteData {
return numberTime >= 90000 && numberTime <= 153000;
}

private async saveMinuteData(
stockId: string,
item: MinuteData[],
time: string,
) {
const manager = this.datasource.manager;
if (!this.isMarketOpenTime(time)) return;
const stockPeriod = item.map((val) =>
this.convertResToMinuteData(stockId, val, time),
);
manager.save(this.entity, stockPeriod);
}

private async getMinuteDataInterval(
stockId: string,
time: string,
config: typeof openApiConfig,
) {
const query = this.getUpdateStockQuery(stockId, time);
try {
const response = await getOpenApi(
this.url,
config,
query,
TR_IDS.MINUTE_DATA,
);
let output;
if (response.output2) output = response.output2;
if (output && output[0] && isMinuteData(output[0])) {
this.saveMinuteData(stockId, output, time);
}
} catch (error) {
this.logger.warn(error);
}
}

private async getMinuteDataChunk(
chunk: Stock[],
config: typeof openApiConfig,
) {
const time = getCurrentTime();
let interval = 0;
for await (const stock of chunk) {
setTimeout(
() => this.getMinuteDataInterval(stock.id!, time, config),
interval,
);
interval += this.intervals;
}
}

async getMinuteData() {
if (process.env.NODE_ENV !== 'production') return;
const configCount = (await this.openApiToken.configs()).length;
const stock = this.stock[this.flip % STOCK_CUT];
this.flip++;
const chunkSize = Math.ceil(stock.length / configCount);
for (let i = 0; i < configCount; i++) {
const chunk = stock.slice(i * chunkSize, (i + 1) * chunkSize);
this.getMinuteDataChunk(chunk, (await this.openApiToken.configs())[i]);
}
}

private getUpdateStockQuery(
stockId: string,
time: string,
Expand Down
5 changes: 3 additions & 2 deletions packages/backend/src/scraper/openapi/liveData.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export class LiveData {
stockId,
'1',
);
this.logger.info(`${idx} : ${message}`);
this.websocketClient[idx].subscribe(message);
return;
}
Expand All @@ -99,7 +100,7 @@ export class LiveData {
stockId,
'2',
);

this.logger.info(`${idx} : ${message}`);
this.websocketClient[idx].unsubscribe(message);
}
}
Expand Down Expand Up @@ -130,7 +131,7 @@ export class LiveData {
return;
}
const liveData = this.openapiLiveData.convertLiveData(message);
await this.openapiLiveData.saveLiveData(liveData[0])
await this.openapiLiveData.saveLiveData(liveData[0]);
} catch (error) {
this.logger.warn(error);
}
Expand Down
60 changes: 49 additions & 11 deletions packages/backend/src/scraper/openapi/type/openapiMinuteData.type.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,73 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

export type MinuteData = {
export type MinuteDataOutput1 = {
prdy_vrss: string;
prdy_vrss_sign: string;
prdy_ctrt: string;
stck_prdy_clpr: string;
acml_vol: string;
acml_tr_pbmn: string;
hts_kor_isnm: string;
stck_prpr: string;
};

export type MinuteDataOutput2 = {
stck_bsop_date: string;
stck_cntg_hour: string;
acml_tr_pbmn: string;
stck_prpr: string;
stck_oprc: string;
stck_hgpr: string;
stck_lwpr: string;
cntg_vol: string;
};

export type MinuteData = {
stck_bsop_date: string;
stck_cntg_hour: string;
acml_tr_pbmn: string;
acml_vol: string;
stck_prpr: string;
stck_oprc: string;
stck_hgpr: string;
stck_lwpr: string;
cntg_vol: string;
};

export type UpdateStockQuery = {
fid_etc_cls_code: string;
fid_cond_mrkt_div_code: 'J' | 'W';
fid_input_iscd: string;
fid_input_hour_1: string;
fid_pw_data_incu_yn: 'Y' | 'N';
export const isMinuteDataOutput1 = (data: any): data is MinuteDataOutput1 => {
return (
data !== null &&
typeof data === 'object' &&
typeof data.prdy_vrss === 'string' &&
typeof data.prdy_vrss_sign === 'string' &&
typeof data.prdy_ctrt === 'string' &&
typeof data.stck_prdy_clpr === 'string' &&
typeof data.acml_vol === 'string' &&
typeof data.acml_tr_pbmn === 'string' &&
typeof data.hts_kor_isnm === 'string' &&
typeof data.stck_prpr === 'string'
);
};

export const isMinuteData = (data: any) => {
export const isMinuteDataOutput2 = (data: any): data is MinuteDataOutput2 => {
return (
data &&
data !== null &&
typeof data === 'object' &&
typeof data.stck_bsop_date === 'string' &&
typeof data.stck_cntg_hour === 'string' &&
typeof data.acml_tr_pbmn === 'string' &&
typeof data.stck_prpr === 'string' &&
typeof data.stck_oprc === 'string' &&
typeof data.stck_hgpr === 'string' &&
typeof data.stck_lwpr === 'string' &&
typeof data.cntg_vol === 'string' &&
typeof data.acml_tr_pbmn === 'string'
typeof data.cntg_vol === 'string'
);
};

export type UpdateStockQuery = {
fid_etc_cls_code: string;
fid_cond_mrkt_div_code: 'J' | 'W';
fid_input_iscd: string;
fid_input_hour_1: string;
fid_pw_data_incu_yn: 'Y' | 'N';
};

0 comments on commit bde9d72

Please sign in to comment.