Skip to content

Commit

Permalink
feat: update processing service to use events registry and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnigir1 committed Dec 17, 2024
1 parent 0f4214f commit c996d08
Show file tree
Hide file tree
Showing 16 changed files with 273 additions and 172 deletions.
17 changes: 10 additions & 7 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js";
/**
* Processor service application
* - Initializes core dependencies (repositories, providers) via SharedDependenciesService
* - Initializes a StrategyRegistry and loads it with strategies from the database
* For each chain:
* - Sets up EVM provider with configured RPC endpoints
* - Instantiates an EventsRegistry and loads it with the last processed event for the chain
* - Creates an Orchestrator instance to coordinate an specific chain:
* - Fetching on-chain events via indexer client
* - Processing events through registered handlers
Expand Down Expand Up @@ -51,19 +53,20 @@ export class ProcessingService {
strategyRegistryRepository,
),
);
const eventsRegistry = new DatabaseEventRegistry(
new Logger({ className: "DatabaseEventRegistry" }),
eventRegistryRepository,
);

for (const chain of chains) {
const chainLogger = new Logger({ chainId: chain.id as ChainId });
// Initialize EVM provider
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, chainLogger);

// Initialize events registry
const eventsRegistry = await InMemoryCachedEventRegistry.initialize(
// Initialize events registry for the chain
const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize(
new Logger({ className: "InMemoryCachedEventRegistry" }),
new DatabaseEventRegistry(
new Logger({ className: "DatabaseEventRegistry" }),
eventRegistryRepository,
),
eventsRegistry,
[chain.id as ChainId],
);

Expand All @@ -74,7 +77,7 @@ export class ProcessingService {
{ ...core, evmProvider },
indexerClient,
{
eventsRegistry,
eventsRegistry: cachedEventsRegistry,
strategyRegistry,
},
chain.fetchLimit,
Expand Down
2 changes: 1 addition & 1 deletion apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export type SharedDependencies = {
/**
* Shared dependencies service
* - Initializes core dependencies (repositories, providers)
* - Initializes registries
* - Initializes registries repositories
* - Initializes indexer client
*/
export class SharedDependenciesService {
Expand Down
48 changes: 46 additions & 2 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import { Orchestrator } from "@grants-stack-indexer/data-flow";
import {
DatabaseEventRegistry,
DatabaseStrategyRegistry,
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
} from "@grants-stack-indexer/data-flow";

import type { Environment } from "../../src/config/env.js";
import { ProcessingService } from "../../src/services/processing.service.js";
Expand All @@ -10,7 +16,7 @@ vi.mock("../../src/services/sharedDependencies.service.js", () => ({
SharedDependenciesService: {
initialize: vi.fn(() => ({
core: {},
registries: {},
registriesRepositories: {},
indexerClient: {},
kyselyDatabase: {
destroy: vi.fn(),
Expand All @@ -23,6 +29,39 @@ vi.mock("@grants-stack-indexer/chain-providers", () => ({
EvmProvider: vi.fn(),
}));

vi.mock("@grants-stack-indexer/data-flow", async (importOriginal) => {
const actual = await importOriginal<typeof import("@grants-stack-indexer/data-flow")>();
const mockStrategyRegistry = {
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
};

const mockEventRegistry = {
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
};

return {
...actual,
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({
getStrategies: vi.fn(),
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
})),
InMemoryCachedEventRegistry: {
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
},
};
});

vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signal: AbortSignal) {
while (!signal.aborted) {
await new Promise((resolve) => setTimeout(resolve, 100));
Expand Down Expand Up @@ -62,7 +101,12 @@ describe("ProcessingService", () => {
});

it("initializes multiple orchestrators correctly", () => {
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1);
expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1);
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
expect(EvmProvider).toHaveBeenCalledTimes(2);
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);

// Verify orchestrators were created with correct parameters
expect(processingService["orchestrators"].size).toBe(2);

Expand Down
16 changes: 14 additions & 2 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
KyselyEventRegistryRepository: vi.fn(),
}));

vi.mock("@grants-stack-indexer/pricing", () => ({
Expand All @@ -45,8 +46,12 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
saveStrategyId: vi.fn(),
};

const mockEventRegistry = {
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
};

return {
InMemoryEventsRegistry: vi.fn(),
InMemoryCachedStrategyRegistry: {
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
},
Expand All @@ -55,6 +60,13 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
getStrategyId: vi.fn(),
saveStrategyId: vi.fn(),
})),
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
})),
InMemoryCachedEventRegistry: {
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
},
};
});

Expand Down Expand Up @@ -98,7 +110,7 @@ describe("SharedDependenciesService", () => {

// Verify structure of returned dependencies
expect(dependencies).toHaveProperty("core");
expect(dependencies).toHaveProperty("registries");
expect(dependencies).toHaveProperty("registriesRepositories");
expect(dependencies).toHaveProperty("indexerClient");
expect(dependencies).toHaveProperty("kyselyDatabase");

Expand Down
1 change: 0 additions & 1 deletion packages/data-flow/src/external.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export {
DataLoader,
InMemoryEventsRegistry,
InMemoryCachedStrategyRegistry,
InMemoryCachedEventRegistry,
DatabaseEventRegistry,
Expand Down
1 change: 0 additions & 1 deletion packages/data-flow/src/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ export * from "./utils/index.js";
export * from "./data-loader/index.js";
export * from "./eventsFetcher.js";
export * from "./registries/index.js";
export * from "./registries/event/eventsRegistry.js";
export * from "./eventsProcessor.js";
export * from "./orchestrator.js";
5 changes: 4 additions & 1 deletion packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ export class Orchestrator {
await delay(this.fetchDelayInMs);
continue;
}
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, event);
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
...event,
rawEvent: event,
});

event = await this.enhanceStrategyId(event);
if (this.isPoolCreated(event)) {
Expand Down
30 changes: 0 additions & 30 deletions packages/data-flow/src/registries/event/eventsRegistry.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/data-flow/src/registries/event/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
export * from "./eventsRegistry.js";
export * from "./cachedEventRegistry.js";
export * from "./dbEventRegistry.js";
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry {

const strategy = await this.strategyRegistry.getStrategyId(chainId, strategyAddress);
if (strategy) {
this.cache.get(chainId)?.set(strategyAddress, strategy);
if (!this.cache.has(strategy.chainId)) {
this.cache.set(strategy.chainId, new Map());
}

this.cache.get(strategy.chainId)?.set(strategyAddress, strategy);
}
return strategy;
}
Expand All @@ -79,6 +83,10 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry {
);
await this.strategyRegistry.saveStrategyId(chainId, strategyAddress, strategyId, handled);

if (!this.cache.has(chainId)) {
this.cache.set(chainId, new Map());
}

this.cache.get(chainId)?.set(strategyAddress, {
address: strategyAddress,
id: strategyId,
Expand Down
107 changes: 107 additions & 0 deletions packages/data-flow/test/registries/cachedEventRegistry.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { beforeEach, describe, expect, it, vi } from "vitest";

import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository";
import { ChainId, ILogger } from "@grants-stack-indexer/shared";

import { IEventsRegistry } from "../../src/internal.js";
import { InMemoryCachedEventRegistry } from "../../src/registries/event/cachedEventRegistry.js";

describe("InMemoryCachedEventRegistry", () => {
const logger: ILogger = {
debug: vi.fn(),
error: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
};

const mockEventRegistry: IEventsRegistry = {
getLastProcessedEvent: vi.fn(),
saveLastProcessedEvent: vi.fn(),
};

const chainId = 1 as ChainId;
const mockEvent: ProcessedEvent = {
chainId,
blockNumber: 100,
blockTimestamp: 1234567890,
logIndex: 1,
};

beforeEach(() => {
vi.clearAllMocks();
});

it("initialize with existing events", async () => {
vi.mocked(mockEventRegistry.getLastProcessedEvent).mockResolvedValue(mockEvent);

const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [
chainId,
]);

const cached = await registry.getLastProcessedEvent(chainId);
expect(cached).toEqual(mockEvent);
expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(1);
});

it("fetch from underlying registry when not in cache", async () => {
vi.mocked(mockEventRegistry.getLastProcessedEvent)
.mockResolvedValueOnce(undefined) // For initialization
.mockResolvedValueOnce(mockEvent); // For actual fetch

const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [
chainId,
]);

const result = await registry.getLastProcessedEvent(chainId);
expect(result).toEqual(mockEvent);
expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(2);
});

it("save event and update cache", async () => {
const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [
chainId,
]);

const newEvent: NewProcessedEvent = {
blockNumber: 200,
blockTimestamp: 1234577890,
logIndex: 2,
};

await registry.saveLastProcessedEvent(chainId, newEvent);

// Verify the event was saved to underlying registry
expect(mockEventRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(chainId, newEvent);

// Verify the cache was updated
const cached = await registry.getLastProcessedEvent(chainId);
expect(cached).toEqual({
...newEvent,
chainId,
});

// Verify no additional calls to underlying registry
expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(1);
});

it("initialize with multiple chain ids", async () => {
const chainId2 = 5 as ChainId;
const mockEvent2: ProcessedEvent = { ...mockEvent, chainId: chainId2 };

vi.mocked(mockEventRegistry.getLastProcessedEvent).mockImplementation(async (chain) =>
chain === chainId ? mockEvent : mockEvent2,
);

const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [
chainId,
chainId2,
]);

const cached1 = await registry.getLastProcessedEvent(chainId);
const cached2 = await registry.getLastProcessedEvent(chainId2);

expect(cached1).toEqual(mockEvent);
expect(cached2).toEqual(mockEvent2);
expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(2);
});
});
Loading

0 comments on commit c996d08

Please sign in to comment.