Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: singleton pattern for logging #47

Merged
merged 7 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
Orchestrator,
RetroactiveProcessor,
} from "@grants-stack-indexer/data-flow";
import { ChainId, Logger } from "@grants-stack-indexer/shared";
import { ChainId, ILogger } from "@grants-stack-indexer/shared";

import { Environment } from "../config/env.js";
import { SharedDependencies, SharedDependenciesService } from "./index.js";
Expand All @@ -29,50 +29,46 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js";
*/
export class ProcessingService {
private readonly orchestrators: Map<ChainId, [Orchestrator, RetroactiveProcessor]> = new Map();
private readonly logger = new Logger({ className: "ProcessingService" });
private readonly logger: ILogger;
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"];

private constructor(
orchestrators: Map<ChainId, [Orchestrator, RetroactiveProcessor]>,
kyselyDatabase: SharedDependencies["kyselyDatabase"],
logger: ILogger,
) {
this.orchestrators = orchestrators;
this.kyselyDatabase = kyselyDatabase;
this.logger = logger;
}

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
const { CHAINS: chains } = env;
const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies;
const { core, registriesRepositories, indexerClient, kyselyDatabase, logger } =
sharedDependencies;
const {
eventRegistryRepository,
strategyRegistryRepository,
strategyProcessingCheckpointRepository,
} = registriesRepositories;
const orchestrators: Map<ChainId, [Orchestrator, RetroactiveProcessor]> = new Map();

const strategyRegistry = new DatabaseStrategyRegistry(
new Logger({ className: "DatabaseStrategyRegistry" }),
strategyRegistryRepository,
);
const eventsRegistry = new DatabaseEventRegistry(
new Logger({ className: "DatabaseEventRegistry" }),
eventRegistryRepository,
);
const strategyRegistry = new DatabaseStrategyRegistry(logger, strategyRegistryRepository);
const eventsRegistry = new DatabaseEventRegistry(logger, eventRegistryRepository);

for (const chain of chains) {
const chainLogger = new Logger({ chainId: chain.id as ChainId });
// Initialize EVM provider
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, chainLogger);
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, logger);

// Initialize events registry for the chain
const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize(
new Logger({ className: "InMemoryCachedEventRegistry" }),
logger,
eventsRegistry,
[chain.id as ChainId],
);
const cachedStrategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
logger,
strategyRegistry,
chain.id as ChainId,
);
Expand All @@ -87,7 +83,7 @@ export class ProcessingService {
},
chain.fetchLimit,
chain.fetchDelayMs,
chainLogger,
logger,
);
const retroactiveProcessor = new RetroactiveProcessor(
chain.id as ChainId,
Expand All @@ -99,13 +95,13 @@ export class ProcessingService {
checkpointRepository: strategyProcessingCheckpointRepository,
},
chain.fetchLimit,
chainLogger,
logger,
);

orchestrators.set(chain.id as ChainId, [orchestrator, retroactiveProcessor]);
}

return new ProcessingService(orchestrators, kyselyDatabase);
return new ProcessingService(orchestrators, kyselyDatabase, logger);
}

/**
Expand Down
22 changes: 13 additions & 9 deletions apps/processing/src/services/sharedDependencies.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
KyselyStrategyProcessingCheckpointRepository,
KyselyStrategyRegistryRepository,
} from "@grants-stack-indexer/repository";
import { Logger } from "@grants-stack-indexer/shared";
import { ILogger, Logger } from "@grants-stack-indexer/shared";

import { Environment } from "../config/index.js";

Expand All @@ -29,6 +29,7 @@ export type SharedDependencies = {
};
indexerClient: EnvioIndexerClient;
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
logger: ILogger;
};

/**
Expand All @@ -39,10 +40,15 @@ export type SharedDependencies = {
*/
export class SharedDependenciesService {
static async initialize(env: Environment): Promise<SharedDependencies> {
const logger = Logger.getInstance();

// Initialize repositories
const kyselyDatabase = createKyselyDatabase({
connectionString: env.DATABASE_URL,
});
const kyselyDatabase = createKyselyDatabase(
{
connectionString: env.DATABASE_URL,
},
logger,
);

const projectRepository = new KyselyProjectRepository(kyselyDatabase, env.DATABASE_SCHEMA);
const roundRepository = new KyselyRoundRepository(kyselyDatabase, env.DATABASE_SCHEMA);
Expand All @@ -59,13 +65,10 @@ export class SharedDependenciesService {
env.DATABASE_SCHEMA,
);
const pricingProvider = PricingProviderFactory.create(env, {
logger: new Logger({ className: "PricingProvider" }),
logger,
});

const metadataProvider = new IpfsProvider(
env.IPFS_GATEWAYS_URL,
new Logger({ className: "IpfsProvider" }),
);
const metadataProvider = new IpfsProvider(env.IPFS_GATEWAYS_URL, logger);

const eventRegistryRepository = new KyselyEventRegistryRepository(
kyselyDatabase,
Expand Down Expand Up @@ -102,6 +105,7 @@ export class SharedDependenciesService {
},
indexerClient,
kyselyDatabase,
logger,
};
}
}
6 changes: 6 additions & 0 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ vi.mock("../../src/services/sharedDependencies.service.js", () => ({
kyselyDatabase: {
destroy: vi.fn(),
},
logger: {
info: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
warn: vi.fn(),
},
})),
},
}));
Expand Down
34 changes: 28 additions & 6 deletions apps/processing/test/unit/sharedDependencies.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,29 @@ import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client";
import { IpfsProvider } from "@grants-stack-indexer/metadata";
import { PricingProviderFactory } from "@grants-stack-indexer/pricing";
import { createKyselyDatabase } from "@grants-stack-indexer/repository";
import { Logger } from "@grants-stack-indexer/shared";

import type { Environment } from "../../src/config/env.js";
import { SharedDependenciesService } from "../../src/services/sharedDependencies.service.js";

const mocks = vi.hoisted(() => {
return {
logger: {
info: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
warn: vi.fn(),
},
};
});

vi.mock("@grants-stack-indexer/shared", () => {
return {
Logger: {
getInstance: vi.fn().mockReturnValue(mocks.logger),
},
};
});

// Mock dependencies
vi.mock("@grants-stack-indexer/repository", () => ({
createKyselyDatabase: vi.fn(),
Expand Down Expand Up @@ -93,15 +111,18 @@ describe("SharedDependenciesService", () => {
const dependencies = await SharedDependenciesService.initialize(mockEnv as Environment);

// Verify database initialization
expect(createKyselyDatabase).toHaveBeenCalledWith({
connectionString: mockEnv.DATABASE_URL,
});
expect(createKyselyDatabase).toHaveBeenCalledWith(
{
connectionString: mockEnv.DATABASE_URL,
},
mocks.logger,
);

// Verify providers initialization
expect(PricingProviderFactory.create).toHaveBeenCalledWith(mockEnv, {
logger: expect.any(Logger) as Logger,
logger: mocks.logger,
});
expect(IpfsProvider).toHaveBeenCalledWith(mockEnv.IPFS_GATEWAYS_URL, expect.any(Logger));
expect(IpfsProvider).toHaveBeenCalledWith(mockEnv.IPFS_GATEWAYS_URL, mocks.logger);

// Verify indexer client initialization
expect(EnvioIndexerClient).toHaveBeenCalledWith(
Expand All @@ -114,6 +135,7 @@ describe("SharedDependenciesService", () => {
expect(dependencies).toHaveProperty("registriesRepositories");
expect(dependencies).toHaveProperty("indexerClient");
expect(dependencies).toHaveProperty("kyselyDatabase");
expect(dependencies).toHaveProperty("logger");

// Verify core dependencies
expect(dependencies.core).toHaveProperty("projectRepository");
Expand Down
39 changes: 37 additions & 2 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { isNativeError } from "util/types";

import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import {
existsHandler,
Expand Down Expand Up @@ -106,6 +108,13 @@ export class Orchestrator {
event = this.eventsQueue.pop();

if (!event) {
this.logger.debug(
`No event to process, sleeping for ${this.fetchDelayInMs}ms`,
{
className: Orchestrator.name,
chainId: this.chainId,
},
);
await delay(this.fetchDelayInMs);
continue;
}
Expand All @@ -125,6 +134,11 @@ export class Orchestrator {
);
} else if (event.contractName === "Strategy" && "strategyId" in event) {
if (!existsHandler(event.strategyId)) {
this.logger.debug("Skipping event", {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
// we skip the event if the strategy id is not handled yet
continue;
}
Expand All @@ -139,6 +153,10 @@ export class Orchestrator {
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(
event,
)}`,
{
className: Orchestrator.name,
chainId: this.chainId,
},
);
}
} catch (error: unknown) {
Expand All @@ -152,12 +170,29 @@ export class Orchestrator {
// `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`,
// );
} else {
this.logger.error(`Error processing event: ${stringify(event)} ${error}`);
if (error instanceof Error || isNativeError(error)) {
this.logger.error(error, {
event,
className: Orchestrator.name,
chainId: this.chainId,
});
} else {
this.logger.error(
new Error(`Error processing event: ${stringify(event)} ${error}`),
{
className: Orchestrator.name,
chainId: this.chainId,
},
);
}
}
}
}

this.logger.info("Shutdown signal received. Exiting...");
this.logger.info("Shutdown signal received. Exiting...", {
className: Orchestrator.name,
chainId: this.chainId,
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ export class InMemoryCachedEventRegistry implements IEventsRegistry {

/** @inheritdoc */
async saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise<void> {
this.logger.debug(`Saving last processed event: ${stringify(event, undefined, 4)}`);
this.logger.debug(`Saving last processed event: ${stringify(event, undefined, 4)}`, {
className: InMemoryCachedEventRegistry.name,
chainId,
});
await this.eventRegistry.saveLastProcessedEvent(chainId, event);
this.cache.set(chainId, { ...event, chainId });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ export class InMemoryCachedStrategyRegistry implements IStrategyRegistry {

this.logger.debug(
`Saving strategy id ${strategyId} for address ${strategyAddress} and chainId ${chainId}`,
{
className: InMemoryCachedStrategyRegistry.name,
chainId,
},
);
await this.strategyRegistry.saveStrategyId(chainId, strategyAddress, strategyId, handled);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ export class DatabaseStrategyRegistry implements IStrategyRegistry {
): Promise<void> {
this.logger.debug(
`Saving strategy id ${strategyId} for address ${strategyAddress} and chainId ${chainId} in Database`,
{
className: DatabaseStrategyRegistry.name,
chainId,
},
);
await this.strategyRepository.saveStrategy({
chainId,
Expand Down
Loading
Loading