diff --git a/src/modules/analytics-indexer/analytics.indexer.controller.ts b/src/modules/analytics-indexer/analytics.indexer.controller.ts new file mode 100644 index 000000000..4cddda58c --- /dev/null +++ b/src/modules/analytics-indexer/analytics.indexer.controller.ts @@ -0,0 +1,85 @@ +import { + Body, + Controller, + Get, + HttpException, + HttpStatus, + Param, + Post, + UseGuards, + ValidationPipe, +} from '@nestjs/common'; +import mongoose from 'mongoose'; +import { CacheService } from '@multiversx/sdk-nestjs-cache'; +import { Constants } from '@multiversx/sdk-nestjs-common'; +import { IndexerSessionRepositoryService } from './services/indexer.session.repository.service'; +import { IndexerSession } from './schemas/indexer.session.schema'; +import { CreateSessionDto } from './entities/create.session.dto'; +import { IndexerPersistenceService } from './services/indexer.persistence.service'; +import { JwtOrNativeAdminGuard } from '../auth/jwt.or.native.admin.guard'; + +@Controller('analytics-indexer') +export class AnalyticsIndexerController { + constructor( + private readonly indexerSessionRepository: IndexerSessionRepositoryService, + private readonly indexerPersistenceService: IndexerPersistenceService, + private readonly cachingService: CacheService, + ) {} + + @UseGuards(JwtOrNativeAdminGuard) + @Get('/sessions') + async getSessions(): Promise { + return await this.indexerSessionRepository.find({}); + } + + @UseGuards(JwtOrNativeAdminGuard) + @Post('/sessions') + async addSession( + @Body( + new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true }), + ) + createSessionDto: CreateSessionDto, + ): Promise { + try { + return await this.indexerPersistenceService.createIndexerSession( + createSessionDto, + ); + } catch (error) { + throw new HttpException(error.message, HttpStatus.BAD_REQUEST); + } + } + + @UseGuards(JwtOrNativeAdminGuard) + @Get('/sessions/:nameOrID') + async getSession( + @Param('nameOrID') nameOrID: string, + ): Promise { + return await this.indexerSessionRepository.findOne( + mongoose.Types.ObjectId.isValid(nameOrID) + ? { _id: nameOrID } + : { name: nameOrID }, + ); + } + + @UseGuards(JwtOrNativeAdminGuard) + @Post('/sessions/:nameOrID/abort') + async abortSession(@Param('nameOrID') nameOrID: string): Promise { + const session = await this.indexerSessionRepository.findOne( + mongoose.Types.ObjectId.isValid(nameOrID) + ? { _id: nameOrID } + : { name: nameOrID }, + ); + + if (!session) { + throw new HttpException('Session not found', HttpStatus.NOT_FOUND); + } + + this.cachingService.set( + `indexer.abortSession.${session.name}`, + true, + Constants.oneHour(), + ); + + return true; + } +} diff --git a/src/modules/analytics-indexer/analytics.indexer.module.ts b/src/modules/analytics-indexer/analytics.indexer.module.ts index 10bf1d0fc..913fbd48b 100644 --- a/src/modules/analytics-indexer/analytics.indexer.module.ts +++ b/src/modules/analytics-indexer/analytics.indexer.module.ts @@ -13,14 +13,29 @@ import { IndexerPriceDiscoveryService } from './services/indexer.price.discovery import { IndexerSwapHandlerService } from './services/event-handlers/indexer.swap.handler.service'; import { IndexerLiquidityHandlerService } from './services/event-handlers/indexer.liquidity.handler.service'; import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/indexer.price.discovery.handler.service'; +import { IndexerSessionRepositoryService } from './services/indexer.session.repository.service'; +import { IndexerPersistenceService } from './services/indexer.persistence.service'; +import { AnalyticsIndexerController } from './analytics.indexer.controller'; +import { DatabaseModule } from 'src/services/database/database.module'; +import { MongooseModule } from '@nestjs/mongoose'; +import { + IndexerSession, + IndexerSessionSchema, +} from './schemas/indexer.session.schema'; +import { MXCommunicationModule } from 'src/services/multiversx-communication/mx.communication.module'; @Module({ imports: [ + MXCommunicationModule, PairModule, RouterModule, TokenModule, PriceDiscoveryModule, ElasticSearchModule, + DatabaseModule, + MongooseModule.forFeature([ + { name: IndexerSession.name, schema: IndexerSessionSchema }, + ]), ], providers: [ IndexerService, @@ -32,7 +47,10 @@ import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/i IndexerSwapHandlerService, IndexerLiquidityHandlerService, IndexerPriceDiscoveryHandlerService, + IndexerSessionRepositoryService, + IndexerPersistenceService, ], exports: [IndexerService], + controllers: [AnalyticsIndexerController], }) export class AnalyticsIndexerModule {} diff --git a/src/modules/analytics-indexer/entities/create.session.dto.ts b/src/modules/analytics-indexer/entities/create.session.dto.ts new file mode 100644 index 000000000..3b18e7813 --- /dev/null +++ b/src/modules/analytics-indexer/entities/create.session.dto.ts @@ -0,0 +1,25 @@ +import { + ArrayMinSize, + ArrayUnique, + IsArray, + IsEnum, + IsInt, + IsOptional, + IsPositive, +} from 'class-validator'; +import { IndexerEventTypes } from './indexer.event.types'; + +export class CreateSessionDto { + @IsInt() + @IsPositive() + start: number; + @IsOptional() + @IsInt() + @IsPositive() + end?: number; + @IsArray() + @ArrayMinSize(1) + @ArrayUnique() + @IsEnum(IndexerEventTypes, { each: true }) + eventTypes: IndexerEventTypes[]; +} diff --git a/src/modules/analytics-indexer/schemas/indexer.session.schema.ts b/src/modules/analytics-indexer/schemas/indexer.session.schema.ts new file mode 100644 index 000000000..6d03a72e4 --- /dev/null +++ b/src/modules/analytics-indexer/schemas/indexer.session.schema.ts @@ -0,0 +1,52 @@ +import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose'; +import { Document } from 'mongoose'; +import { IndexerEventTypes } from '../entities/indexer.event.types'; + +export enum IndexerStatus { + PENDING = 'PENDING', + IN_PROGRESS = 'IN_PROGRESS', + COMPLETED = 'COMPLETED', + FAILED = 'FAILED', + ABORTED = 'ABORTED', +} + +export class IndexerJob { + startTimestamp: number; + endTimestamp: number; + order: number; + status: IndexerStatus; + runAttempts = 0; + errorCount = 0; + durationMs = 0; + + constructor(init?: Partial) { + Object.assign(this, init); + } +} + +export type IndexerSessionDocument = IndexerSession & Document; + +@Schema({ + collection: 'indexer_sessions', +}) +export class IndexerSession { + @Prop({ required: true, unique: true }) + name: string; + @Prop({ required: true }) + startTimestamp: number; + @Prop({ required: true }) + endTimestamp: number; + @Prop(() => [IndexerEventTypes]) + eventTypes: IndexerEventTypes[]; + @Prop(() => [IndexerJob]) + jobs: IndexerJob[]; + @Prop(() => IndexerStatus) + status: IndexerStatus; + + constructor(init?: Partial) { + Object.assign(this, init); + } +} + +export const IndexerSessionSchema = + SchemaFactory.createForClass(IndexerSession); diff --git a/src/modules/analytics-indexer/services/indexer.persistence.service.ts b/src/modules/analytics-indexer/services/indexer.persistence.service.ts new file mode 100644 index 000000000..9accd5784 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.persistence.service.ts @@ -0,0 +1,89 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; +import { Logger } from 'winston'; +import { + IndexerJob, + IndexerSession, + IndexerStatus, +} from '../schemas/indexer.session.schema'; +import moment from 'moment'; +import { Constants } from '@multiversx/sdk-nestjs-common'; +import { IndexerSessionRepositoryService } from './indexer.session.repository.service'; +import { CreateSessionDto } from '../entities/create.session.dto'; + +@Injectable() +export class IndexerPersistenceService { + constructor( + private readonly indexerSessionRepository: IndexerSessionRepositoryService, + @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, + ) {} + + public async createIndexerSession( + createSessionDto: CreateSessionDto, + ): Promise { + const { start } = createSessionDto; + + const end = !createSessionDto.end + ? moment().unix() + : createSessionDto.end; + + if (start >= end) { + throw new Error('End timestamp should be after start'); + } + + const activeSession = await this.getActiveSession(); + + if (activeSession) { + throw new Error( + 'Cannot start a session while another one is in progress.', + ); + } + + return await this.indexerSessionRepository.create({ + name: `Session_${moment().unix()}`, + startTimestamp: start, + endTimestamp: end, + eventTypes: createSessionDto.eventTypes, + jobs: this.createSessionJobs(start, end), + status: IndexerStatus.PENDING, + }); + } + + public async getActiveSession(): Promise { + return await this.indexerSessionRepository.findOne({ + status: { + $in: [IndexerStatus.IN_PROGRESS, IndexerStatus.PENDING], + }, + }); + } + + private createSessionJobs(start: number, end: number): IndexerJob[] { + const jobs: IndexerJob[] = []; + const oneWeek = Constants.oneWeek(); + + let currentStart = start; + let order = 0; + + while (currentStart <= end) { + let currentEnd = Math.min(currentStart + oneWeek, end); + + // avoid edge case where remaining interval is a single second + if (currentEnd + 1 === end) { + currentEnd -= 1; + } + + jobs.push( + new IndexerJob({ + startTimestamp: currentStart, + endTimestamp: currentEnd, + order: order, + status: IndexerStatus.PENDING, + }), + ); + currentStart = currentEnd + 1; + order += 1; + } + + return jobs; + } +} diff --git a/src/modules/analytics-indexer/services/indexer.session.repository.service.ts b/src/modules/analytics-indexer/services/indexer.session.repository.service.ts new file mode 100644 index 000000000..9da7af4c6 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.session.repository.service.ts @@ -0,0 +1,18 @@ +import { Injectable } from '@nestjs/common'; +import { InjectModel } from '@nestjs/mongoose'; +import { Model } from 'mongoose'; +import { EntityRepository } from 'src/services/database/repositories/entity.repository'; +import { + IndexerSession, + IndexerSessionDocument, +} from '../schemas/indexer.session.schema'; + +@Injectable() +export class IndexerSessionRepositoryService extends EntityRepository { + constructor( + @InjectModel(IndexerSession.name) + private readonly indexerSessionModel: Model, + ) { + super(indexerSessionModel); + } +} diff --git a/src/private.app.module.ts b/src/private.app.module.ts index 39eff5a97..fae2896b8 100644 --- a/src/private.app.module.ts +++ b/src/private.app.module.ts @@ -8,6 +8,7 @@ import { TokenController } from './modules/tokens/token.controller'; import { TokenModule } from './modules/tokens/token.module'; import { DynamicModuleUtils } from './utils/dynamic.module.utils'; import { ESTransactionsService } from './services/elastic-search/services/es.transactions.service'; +import { AnalyticsIndexerModule } from './modules/analytics-indexer/analytics.indexer.module'; @Module({ imports: [ @@ -16,6 +17,7 @@ import { ESTransactionsService } from './services/elastic-search/services/es.tra TokenModule, RemoteConfigModule, DynamicModuleUtils.getCacheModule(), + AnalyticsIndexerModule, ], controllers: [MetricsController, TokenController, RemoteConfigController], providers: [ESTransactionsService],