Skip to content

Commit

Permalink
fix: add jsdocs and fix imports
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 committed Dec 26, 2024
1 parent 0bb15fa commit 7721c0a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 7 deletions.
5 changes: 5 additions & 0 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ export class ProcessingService {
}
}

/**
* Process retroactive events for all chains
* - This is a blocking operation that will run until all retroactive events are processed
*/
async processRetroactiveEvents(): Promise<void> {
this.logger.info("Processing retroactive events...");
for (const [_, retroactiveProcessor] of this.orchestrators.values()) {
await retroactiveProcessor.processRetroactiveStrategies();
}
Expand Down
2 changes: 0 additions & 2 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ describe("ProcessingService", () => {
// Wait for orchestrators to start
await new Promise((resolve) => setTimeout(resolve, 100));

// Verify both orchestrators are running
// const orchestratorInstances = vi.mocked(Orchestrator).mock.results;
// Verify both orchestrators are running
expect(runSpy).toHaveBeenCalledTimes(2);
expect(runSpy.mock.calls.map((call) => call[0])).toEqual([
Expand Down
23 changes: 21 additions & 2 deletions packages/data-flow/src/retroactiveProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import {
stringify,
} from "@grants-stack-indexer/shared";

import { EventsProcessor } from "./eventsProcessor.js";
import {
CoreDependencies,
DataLoader,
EventsFetcher,
EventsProcessor,
IEventsFetcher,
IEventsRegistry,
InvalidEvent,
Expand Down Expand Up @@ -69,7 +69,7 @@ export class RetroactiveProcessor {
* @param dependencies - Core system dependencies for data access and processing
* @param indexerClient - Client for fetching blockchain events
* @param registries - Event and strategy registries for tracking processing state
* @param fetchLimit - Maximum number of events to fetch in a single batch
* @param fetchLimit - Maximum number of events to fetch in a single batch (default: 1000)
* @param logger - Logger instance for debugging and monitoring
*/
constructor(
Expand Down Expand Up @@ -109,6 +109,8 @@ export class RetroactiveProcessor {
* @returns Promise that resolves when all retroactive processing is complete
*/
async processRetroactiveStrategies(): Promise<void> {
this.logger.info(`Processing retroactive strategies for chain ${this.chainId}`);

const newHandleableStrategies = await this.findNewHandleableStrategies();

if (newHandleableStrategies.size === 0) {
Expand Down Expand Up @@ -217,6 +219,11 @@ export class RetroactiveProcessor {
await this.checkpointRepository.deleteCheckpoint(this.chainId, strategyId);
}

/**
* Update the checkpoint for a strategy
* @param strategyId - The strategy ID
* @param currentPointer - The current event pointer
*/
private async updateCheckpoint(strategyId: Hex, currentPointer: EventPointer): Promise<void> {
const checkpointData = {
chainId: this.chainId,
Expand All @@ -228,6 +235,13 @@ export class RetroactiveProcessor {
await this.checkpointRepository.upsertCheckpoint(checkpointData);
}

/**
* Enqueue events if the queue is empty
* @param queue - The queue to enqueue events into
* @param strategyAddresses - The set of strategy addresses
* @param currentPointer - The current event pointer
* @param lastEventPointer - The last event pointer
*/
private async enqueueEventsIfEmpty(
queue: Queue<ProcessorEvent<ContractName, AnyEvent> & { strategyId?: Hex }>,
strategyAddresses: Set<Address>,
Expand Down Expand Up @@ -276,6 +290,11 @@ export class RetroactiveProcessor {
);
}

/**
* Mark a strategy as handled for all addresses covered by the strategy
* @param strategyId - The strategy ID
* @param addresses - The set of strategy addresses
*/
private async markStrategyAsHandled(strategyId: Hex, addresses: Set<Address>): Promise<void> {
this.logger.info(`Processed retroactively strategy ${strategyId}`);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import { Kysely } from "kysely";

import { ChainId, Hex } from "@grants-stack-indexer/shared";

import { Database } from "../../db/connection.js";
import { IStrategyProcessingCheckpointRepository } from "../../interfaces/strategyProcessingCheckpointRepository.interface.js";
import { NewStrategyProcessingCheckpoint, StrategyProcessingCheckpoint } from "../../internal.js";
import {
Database,
IStrategyProcessingCheckpointRepository,
NewStrategyProcessingCheckpoint,
StrategyProcessingCheckpoint,
} from "../../internal.js";

export class KyselyStrategyProcessingCheckpointRepository
implements IStrategyProcessingCheckpointRepository
Expand Down

0 comments on commit 7721c0a

Please sign in to comment.