From c996d08259007cdaa58a3754ab9cc92baec094dc Mon Sep 17 00:00:00 2001 From: nigiri <168690269+0xnigir1@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:22:19 -0300 Subject: [PATCH] feat: update processing service to use events registry and add tests --- .../src/services/processing.service.ts | 17 +-- .../services/sharedDependencies.service.ts | 2 +- .../test/unit/processing.service.spec.ts | 48 +++++++- .../unit/sharedDependencies.service.spec.ts | 16 ++- packages/data-flow/src/external.ts | 1 - packages/data-flow/src/internal.ts | 1 - packages/data-flow/src/orchestrator.ts | 5 +- .../src/registries/event/eventsRegistry.ts | 30 ----- .../data-flow/src/registries/event/index.ts | 1 - .../strategy/cachedStrategyRegistry.ts | 10 +- .../registries/cachedEventRegistry.spec.ts | 107 ++++++++++++++++++ .../test/registries/dbEventRegistry.spec.ts | 66 +++++++++++ .../test/unit/eventsRegistry.spec.ts | 105 ----------------- .../data-flow/test/unit/orchestrator.spec.ts | 19 +--- .../kysely/eventRegistry.repository.ts | 13 ++- packages/repository/src/types/event.types.ts | 4 +- 16 files changed, 273 insertions(+), 172 deletions(-) delete mode 100644 packages/data-flow/src/registries/event/eventsRegistry.ts create mode 100644 packages/data-flow/test/registries/cachedEventRegistry.spec.ts create mode 100644 packages/data-flow/test/registries/dbEventRegistry.spec.ts delete mode 100644 packages/data-flow/test/unit/eventsRegistry.spec.ts diff --git a/apps/processing/src/services/processing.service.ts b/apps/processing/src/services/processing.service.ts index 5725a95..0cbe94a 100644 --- a/apps/processing/src/services/processing.service.ts +++ b/apps/processing/src/services/processing.service.ts @@ -16,8 +16,10 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js"; /** * Processor service application * - Initializes core dependencies (repositories, providers) via SharedDependenciesService + * - Initializes a StrategyRegistry and loads it with strategies from the database * For each chain: * - Sets up EVM provider with configured RPC endpoints + * - Instantiates an EventsRegistry and loads it with the last processed event for the chain * - Creates an Orchestrator instance to coordinate an specific chain: * - Fetching on-chain events via indexer client * - Processing events through registered handlers @@ -51,19 +53,20 @@ export class ProcessingService { strategyRegistryRepository, ), ); + const eventsRegistry = new DatabaseEventRegistry( + new Logger({ className: "DatabaseEventRegistry" }), + eventRegistryRepository, + ); for (const chain of chains) { const chainLogger = new Logger({ chainId: chain.id as ChainId }); // Initialize EVM provider const evmProvider = new EvmProvider(chain.rpcUrls, optimism, chainLogger); - // Initialize events registry - const eventsRegistry = await InMemoryCachedEventRegistry.initialize( + // Initialize events registry for the chain + const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize( new Logger({ className: "InMemoryCachedEventRegistry" }), - new DatabaseEventRegistry( - new Logger({ className: "DatabaseEventRegistry" }), - eventRegistryRepository, - ), + eventsRegistry, [chain.id as ChainId], ); @@ -74,7 +77,7 @@ export class ProcessingService { { ...core, evmProvider }, indexerClient, { - eventsRegistry, + eventsRegistry: cachedEventsRegistry, strategyRegistry, }, chain.fetchLimit, diff --git a/apps/processing/src/services/sharedDependencies.service.ts b/apps/processing/src/services/sharedDependencies.service.ts index 033c386..3fb29fe 100644 --- a/apps/processing/src/services/sharedDependencies.service.ts +++ b/apps/processing/src/services/sharedDependencies.service.ts @@ -31,7 +31,7 @@ export type SharedDependencies = { /** * Shared dependencies service * - Initializes core dependencies (repositories, providers) - * - Initializes registries + * - Initializes registries repositories * - Initializes indexer client */ export class SharedDependenciesService { diff --git a/apps/processing/test/unit/processing.service.spec.ts b/apps/processing/test/unit/processing.service.spec.ts index 6623fde..15b59ff 100644 --- a/apps/processing/test/unit/processing.service.spec.ts +++ b/apps/processing/test/unit/processing.service.spec.ts @@ -1,7 +1,13 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { EvmProvider } from "@grants-stack-indexer/chain-providers"; -import { Orchestrator } from "@grants-stack-indexer/data-flow"; +import { + DatabaseEventRegistry, + DatabaseStrategyRegistry, + InMemoryCachedEventRegistry, + InMemoryCachedStrategyRegistry, + Orchestrator, +} from "@grants-stack-indexer/data-flow"; import type { Environment } from "../../src/config/env.js"; import { ProcessingService } from "../../src/services/processing.service.js"; @@ -10,7 +16,7 @@ vi.mock("../../src/services/sharedDependencies.service.js", () => ({ SharedDependenciesService: { initialize: vi.fn(() => ({ core: {}, - registries: {}, + registriesRepositories: {}, indexerClient: {}, kyselyDatabase: { destroy: vi.fn(), @@ -23,6 +29,39 @@ vi.mock("@grants-stack-indexer/chain-providers", () => ({ EvmProvider: vi.fn(), })); +vi.mock("@grants-stack-indexer/data-flow", async (importOriginal) => { + const actual = await importOriginal(); + const mockStrategyRegistry = { + getStrategies: vi.fn(), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + }; + + const mockEventRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + return { + ...actual, + InMemoryCachedStrategyRegistry: { + initialize: vi.fn().mockResolvedValue(mockStrategyRegistry), + }, + DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({ + getStrategies: vi.fn(), + getStrategyId: vi.fn(), + saveStrategyId: vi.fn(), + })), + DatabaseEventRegistry: vi.fn().mockImplementation(() => ({ + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + })), + InMemoryCachedEventRegistry: { + initialize: vi.fn().mockResolvedValue(mockEventRegistry), + }, + }; +}); + vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signal: AbortSignal) { while (!signal.aborted) { await new Promise((resolve) => setTimeout(resolve, 100)); @@ -62,7 +101,12 @@ describe("ProcessingService", () => { }); it("initializes multiple orchestrators correctly", () => { + expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1); + expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1); + expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1); expect(EvmProvider).toHaveBeenCalledTimes(2); + expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2); + // Verify orchestrators were created with correct parameters expect(processingService["orchestrators"].size).toBe(2); diff --git a/apps/processing/test/unit/sharedDependencies.service.spec.ts b/apps/processing/test/unit/sharedDependencies.service.spec.ts index e1b2ab0..bff277e 100644 --- a/apps/processing/test/unit/sharedDependencies.service.spec.ts +++ b/apps/processing/test/unit/sharedDependencies.service.spec.ts @@ -22,6 +22,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({ getStrategyId: vi.fn(), saveStrategyId: vi.fn(), })), + KyselyEventRegistryRepository: vi.fn(), })); vi.mock("@grants-stack-indexer/pricing", () => ({ @@ -45,8 +46,12 @@ vi.mock("@grants-stack-indexer/data-flow", () => { saveStrategyId: vi.fn(), }; + const mockEventRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + return { - InMemoryEventsRegistry: vi.fn(), InMemoryCachedStrategyRegistry: { initialize: vi.fn().mockResolvedValue(mockStrategyRegistry), }, @@ -55,6 +60,13 @@ vi.mock("@grants-stack-indexer/data-flow", () => { getStrategyId: vi.fn(), saveStrategyId: vi.fn(), })), + DatabaseEventRegistry: vi.fn().mockImplementation(() => ({ + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + })), + InMemoryCachedEventRegistry: { + initialize: vi.fn().mockResolvedValue(mockEventRegistry), + }, }; }); @@ -98,7 +110,7 @@ describe("SharedDependenciesService", () => { // Verify structure of returned dependencies expect(dependencies).toHaveProperty("core"); - expect(dependencies).toHaveProperty("registries"); + expect(dependencies).toHaveProperty("registriesRepositories"); expect(dependencies).toHaveProperty("indexerClient"); expect(dependencies).toHaveProperty("kyselyDatabase"); diff --git a/packages/data-flow/src/external.ts b/packages/data-flow/src/external.ts index 1d69f49..954c0b7 100644 --- a/packages/data-flow/src/external.ts +++ b/packages/data-flow/src/external.ts @@ -1,6 +1,5 @@ export { DataLoader, - InMemoryEventsRegistry, InMemoryCachedStrategyRegistry, InMemoryCachedEventRegistry, DatabaseEventRegistry, diff --git a/packages/data-flow/src/internal.ts b/packages/data-flow/src/internal.ts index ad35306..9224340 100644 --- a/packages/data-flow/src/internal.ts +++ b/packages/data-flow/src/internal.ts @@ -6,6 +6,5 @@ export * from "./utils/index.js"; export * from "./data-loader/index.js"; export * from "./eventsFetcher.js"; export * from "./registries/index.js"; -export * from "./registries/event/eventsRegistry.js"; export * from "./eventsProcessor.js"; export * from "./orchestrator.js"; diff --git a/packages/data-flow/src/orchestrator.ts b/packages/data-flow/src/orchestrator.ts index 2eb2266..0a2f53e 100644 --- a/packages/data-flow/src/orchestrator.ts +++ b/packages/data-flow/src/orchestrator.ts @@ -110,7 +110,10 @@ export class Orchestrator { await delay(this.fetchDelayInMs); continue; } - await this.eventsRegistry.saveLastProcessedEvent(this.chainId, event); + await this.eventsRegistry.saveLastProcessedEvent(this.chainId, { + ...event, + rawEvent: event, + }); event = await this.enhanceStrategyId(event); if (this.isPoolCreated(event)) { diff --git a/packages/data-flow/src/registries/event/eventsRegistry.ts b/packages/data-flow/src/registries/event/eventsRegistry.ts deleted file mode 100644 index 0bce55e..0000000 --- a/packages/data-flow/src/registries/event/eventsRegistry.ts +++ /dev/null @@ -1,30 +0,0 @@ -import type { ChainId, ILogger } from "@grants-stack-indexer/shared"; -import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository"; -import { stringify } from "@grants-stack-indexer/shared"; - -import type { IEventsRegistry } from "../../internal.js"; - -/** - * Class to store the last processed event in memory - */ -//TODO: Implement storage version to persist the last processed event. we need to store it by chainId -export class InMemoryEventsRegistry implements IEventsRegistry { - private lastProcessedEvent: Map = new Map(); - - constructor(private logger: ILogger) {} - - /** - * @inheritdoc - */ - async getLastProcessedEvent(chainId: ChainId): Promise { - return this.lastProcessedEvent.get(chainId); - } - - /** - * @inheritdoc - */ - async saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise { - this.logger.debug(`Saving last processed event: ${stringify(event, undefined, 4)}`); - this.lastProcessedEvent.set(chainId, { ...event, chainId }); - } -} diff --git a/packages/data-flow/src/registries/event/index.ts b/packages/data-flow/src/registries/event/index.ts index 47653b0..547781d 100644 --- a/packages/data-flow/src/registries/event/index.ts +++ b/packages/data-flow/src/registries/event/index.ts @@ -1,3 +1,2 @@ -export * from "./eventsRegistry.js"; export * from "./cachedEventRegistry.js"; export * from "./dbEventRegistry.js"; diff --git a/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts index 16899cc..05cfec8 100644 --- a/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts +++ b/packages/data-flow/src/registries/strategy/cachedStrategyRegistry.ts @@ -58,7 +58,11 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { const strategy = await this.strategyRegistry.getStrategyId(chainId, strategyAddress); if (strategy) { - this.cache.get(chainId)?.set(strategyAddress, strategy); + if (!this.cache.has(strategy.chainId)) { + this.cache.set(strategy.chainId, new Map()); + } + + this.cache.get(strategy.chainId)?.set(strategyAddress, strategy); } return strategy; } @@ -79,6 +83,10 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry { ); await this.strategyRegistry.saveStrategyId(chainId, strategyAddress, strategyId, handled); + if (!this.cache.has(chainId)) { + this.cache.set(chainId, new Map()); + } + this.cache.get(chainId)?.set(strategyAddress, { address: strategyAddress, id: strategyId, diff --git a/packages/data-flow/test/registries/cachedEventRegistry.spec.ts b/packages/data-flow/test/registries/cachedEventRegistry.spec.ts new file mode 100644 index 0000000..b06de51 --- /dev/null +++ b/packages/data-flow/test/registries/cachedEventRegistry.spec.ts @@ -0,0 +1,107 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { IEventsRegistry } from "../../src/internal.js"; +import { InMemoryCachedEventRegistry } from "../../src/registries/event/cachedEventRegistry.js"; + +describe("InMemoryCachedEventRegistry", () => { + const logger: ILogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + const mockEventRegistry: IEventsRegistry = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + const chainId = 1 as ChainId; + const mockEvent: ProcessedEvent = { + chainId, + blockNumber: 100, + blockTimestamp: 1234567890, + logIndex: 1, + }; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("initialize with existing events", async () => { + vi.mocked(mockEventRegistry.getLastProcessedEvent).mockResolvedValue(mockEvent); + + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + ]); + + const cached = await registry.getLastProcessedEvent(chainId); + expect(cached).toEqual(mockEvent); + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(1); + }); + + it("fetch from underlying registry when not in cache", async () => { + vi.mocked(mockEventRegistry.getLastProcessedEvent) + .mockResolvedValueOnce(undefined) // For initialization + .mockResolvedValueOnce(mockEvent); // For actual fetch + + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + ]); + + const result = await registry.getLastProcessedEvent(chainId); + expect(result).toEqual(mockEvent); + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(2); + }); + + it("save event and update cache", async () => { + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + ]); + + const newEvent: NewProcessedEvent = { + blockNumber: 200, + blockTimestamp: 1234577890, + logIndex: 2, + }; + + await registry.saveLastProcessedEvent(chainId, newEvent); + + // Verify the event was saved to underlying registry + expect(mockEventRegistry.saveLastProcessedEvent).toHaveBeenCalledWith(chainId, newEvent); + + // Verify the cache was updated + const cached = await registry.getLastProcessedEvent(chainId); + expect(cached).toEqual({ + ...newEvent, + chainId, + }); + + // Verify no additional calls to underlying registry + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(1); + }); + + it("initialize with multiple chain ids", async () => { + const chainId2 = 5 as ChainId; + const mockEvent2: ProcessedEvent = { ...mockEvent, chainId: chainId2 }; + + vi.mocked(mockEventRegistry.getLastProcessedEvent).mockImplementation(async (chain) => + chain === chainId ? mockEvent : mockEvent2, + ); + + const registry = await InMemoryCachedEventRegistry.initialize(logger, mockEventRegistry, [ + chainId, + chainId2, + ]); + + const cached1 = await registry.getLastProcessedEvent(chainId); + const cached2 = await registry.getLastProcessedEvent(chainId2); + + expect(cached1).toEqual(mockEvent); + expect(cached2).toEqual(mockEvent2); + expect(mockEventRegistry.getLastProcessedEvent).toHaveBeenCalledTimes(2); + }); +}); diff --git a/packages/data-flow/test/registries/dbEventRegistry.spec.ts b/packages/data-flow/test/registries/dbEventRegistry.spec.ts new file mode 100644 index 0000000..679c000 --- /dev/null +++ b/packages/data-flow/test/registries/dbEventRegistry.spec.ts @@ -0,0 +1,66 @@ +import { describe, expect, it, vi } from "vitest"; + +import { + IEventRegistryRepository, + NewProcessedEvent, + ProcessedEvent, +} from "@grants-stack-indexer/repository"; +import { ChainId, ILogger } from "@grants-stack-indexer/shared"; + +import { DatabaseEventRegistry } from "../../src/registries/event/dbEventRegistry.js"; + +describe("DatabaseEventRegistry", () => { + const logger: ILogger = { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }; + + const mockEventRepository: IEventRegistryRepository = { + getLastProcessedEvent: vi.fn(), + saveLastProcessedEvent: vi.fn(), + }; + + const chainId = 1 as ChainId; + + it("return undefined for non-existent last processed event", async () => { + const registry = new DatabaseEventRegistry(logger, mockEventRepository); + + vi.mocked(mockEventRepository.getLastProcessedEvent).mockResolvedValue(undefined); + + const event = await registry.getLastProcessedEvent(chainId); + expect(event).toBeUndefined(); + expect(mockEventRepository.getLastProcessedEvent).toHaveBeenCalledWith(chainId); + }); + + it("return last processed event when it exists", async () => { + const registry = new DatabaseEventRegistry(logger, mockEventRepository); + const mockEvent: ProcessedEvent = { + chainId, + blockNumber: 100, + blockTimestamp: 1234567890, + logIndex: 1, + }; + + vi.mocked(mockEventRepository.getLastProcessedEvent).mockResolvedValue(mockEvent); + + const event = await registry.getLastProcessedEvent(chainId); + expect(event).toEqual(mockEvent); + expect(mockEventRepository.getLastProcessedEvent).toHaveBeenCalledWith(chainId); + }); + + it("save last processed event", async () => { + const registry = new DatabaseEventRegistry(logger, mockEventRepository); + const newEvent: NewProcessedEvent = { + blockNumber: 100, + blockTimestamp: 1234567890, + logIndex: 1, + }; + + vi.mocked(mockEventRepository.saveLastProcessedEvent).mockResolvedValue(); + + await registry.saveLastProcessedEvent(chainId, newEvent); + expect(mockEventRepository.saveLastProcessedEvent).toHaveBeenCalledWith(chainId, newEvent); + }); +}); diff --git a/packages/data-flow/test/unit/eventsRegistry.spec.ts b/packages/data-flow/test/unit/eventsRegistry.spec.ts deleted file mode 100644 index be14f7a..0000000 --- a/packages/data-flow/test/unit/eventsRegistry.spec.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; - -import { ChainId, ILogger, ProcessorEvent } from "@grants-stack-indexer/shared"; - -import { InMemoryEventsRegistry } from "../../src/registries/event/eventsRegistry.js"; - -describe("InMemoryEventsRegistry", () => { - const logger: ILogger = { - debug: vi.fn(), - error: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - }; - const chainId = 1 as ChainId; - it("return null when no event has been saved", async () => { - const registry = new InMemoryEventsRegistry(logger); - const lastEvent = await registry.getLastProcessedEvent(chainId); - expect(lastEvent).toBeUndefined(); - }); - - it("save and retrieve the last processed event", async () => { - const registry = new InMemoryEventsRegistry(logger); - const mockEvent: ProcessorEvent<"Allo", "PoolCreated"> = { - contractName: "Allo", - eventName: "PoolCreated", - blockNumber: 1, - blockTimestamp: 1234567890, - chainId: 1 as ChainId, - logIndex: 0, - srcAddress: "0x123", - strategyId: "0xstrategy", - params: { - poolId: "1", - profileId: "0x456", - strategy: "0x789", - token: "0xtoken", - amount: "0", - metadata: ["1", "0xmetadata"], - }, - transactionFields: { - hash: "0xabc", - transactionIndex: 0, - }, - }; - - await registry.saveLastProcessedEvent(chainId, mockEvent); - const retrievedEvent = await registry.getLastProcessedEvent(chainId); - - expect(retrievedEvent).toEqual(mockEvent); - }); - - it("updates the last processed event when saving multiple times", async () => { - const registry = new InMemoryEventsRegistry(logger); - - const firstEvent: ProcessorEvent<"Allo", "PoolCreated"> = { - contractName: "Allo", - eventName: "PoolCreated", - blockNumber: 1, - blockTimestamp: 1234567890, - chainId: 1 as ChainId, - logIndex: 0, - srcAddress: "0x123", - strategyId: "0xstrategy", - params: { - poolId: "1", - profileId: "0x456", - strategy: "0x789", - token: "0xtoken", - amount: "0", - metadata: ["1", "0xmetadata"], - }, - transactionFields: { - hash: "0xabc", - transactionIndex: 0, - }, - }; - - const secondEvent: ProcessorEvent<"Strategy", "RegisteredWithSender"> = { - contractName: "Strategy", - eventName: "RegisteredWithSender", - blockNumber: 1, - blockTimestamp: 1234567890, - chainId: 1 as ChainId, - logIndex: 0, - srcAddress: "0x123", - strategyId: "0xstrategy", - params: { - recipientId: "0xrecipient", - data: "0xdata", - sender: "0xsender", - }, - transactionFields: { - hash: "0xabc", - transactionIndex: 0, - }, - }; - - await registry.saveLastProcessedEvent(chainId, firstEvent); - await registry.saveLastProcessedEvent(chainId, secondEvent); - - const lastEvent = await registry.getLastProcessedEvent(chainId); - expect(lastEvent).toEqual(secondEvent); - expect(lastEvent).not.toEqual(firstEvent); - }); -}); diff --git a/packages/data-flow/test/unit/orchestrator.spec.ts b/packages/data-flow/test/unit/orchestrator.spec.ts index 7b492c2..7b90c04 100644 --- a/packages/data-flow/test/unit/orchestrator.spec.ts +++ b/packages/data-flow/test/unit/orchestrator.spec.ts @@ -178,14 +178,7 @@ describe("Orchestrator", { sequential: true }, () => { expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[0]); expect(eventsProcessorSpy).toHaveBeenCalledWith(mockEvents[1]); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvents[0], - ); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvents[1], - ); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledTimes(2); }); it("wait and keep polling on empty queue", async () => { @@ -274,10 +267,7 @@ describe("Orchestrator", { sequential: true }, () => { }); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvent, - ); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalled(); }); it("save strategyId to registry on PoolCreated event", async () => { @@ -409,10 +399,7 @@ describe("Orchestrator", { sequential: true }, () => { ); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledTimes(1); expect(orchestrator["dataLoader"].applyChanges).toHaveBeenCalledWith(changesets); - expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalledWith( - chainId, - mockEvent, - ); + expect(mockEventsRegistry.saveLastProcessedEvent).toHaveBeenCalled(); }); } diff --git a/packages/repository/src/repositories/kysely/eventRegistry.repository.ts b/packages/repository/src/repositories/kysely/eventRegistry.repository.ts index 2b77858..5a93502 100644 --- a/packages/repository/src/repositories/kysely/eventRegistry.repository.ts +++ b/packages/repository/src/repositories/kysely/eventRegistry.repository.ts @@ -23,11 +23,20 @@ export class KyselyEventRegistryRepository implements IEventRegistryRepository { /** @inheritdoc */ async saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise { + const { blockNumber, blockTimestamp, logIndex, rawEvent } = event; // Extract only the fields from NewProcessedEvent await this.db .withSchema(this.schemaName) .insertInto("events") - .values({ ...event, chainId }) - .onConflict((oc) => oc.columns(["chainId"]).doUpdateSet({ ...event, chainId })) + .values({ blockNumber, blockTimestamp, logIndex, chainId, rawEvent }) + .onConflict((oc) => + oc.columns(["chainId"]).doUpdateSet({ + blockNumber, + blockTimestamp, + logIndex, + rawEvent, + chainId, + }), + ) .execute(); } } diff --git a/packages/repository/src/types/event.types.ts b/packages/repository/src/types/event.types.ts index 3f1eb50..1e0c5ca 100644 --- a/packages/repository/src/types/event.types.ts +++ b/packages/repository/src/types/event.types.ts @@ -1,11 +1,11 @@ -import { AnyEvent, ChainId } from "@grants-stack-indexer/shared"; +import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared"; export type ProcessedEvent = { chainId: ChainId; blockNumber: number; blockTimestamp: number; logIndex: number; - rawEvent?: AnyEvent; + rawEvent?: Partial>; }; export type NewProcessedEvent = Omit;