From 5bc7907f63bd265bad450a0eb632042f8c76bfc3 Mon Sep 17 00:00:00 2001 From: xlassix Date: Fri, 29 Nov 2024 16:16:10 +0100 Subject: [PATCH 1/2] Update xTwitter integration to use bearer token --- orchestrator/src/env.ts | 12 ++--- orchestrator/src/index.ts | 6 +-- orchestrator/src/indexer/base.ts | 32 +++++++++++--- orchestrator/src/indexer/rooch.ts | 30 ++++++++++--- orchestrator/src/integrations/xtwitter.ts | 54 ++--------------------- 5 files changed, 60 insertions(+), 74 deletions(-) diff --git a/orchestrator/src/env.ts b/orchestrator/src/env.ts index d954072..ca57f5b 100644 --- a/orchestrator/src/env.ts +++ b/orchestrator/src/env.ts @@ -23,8 +23,7 @@ const baseConfig = { ecdsaPrivateKey: process.env.SENTRY_DSN ?? "", batchSize: process.env.BATCH_SIZE ?? 1000, // Integrations - xApiSecret: process.env.X_API_SECRET ?? "", - xApiKey: process.env.X_API_KEY ?? "", + xBearerToken: process.env.X_BEARER_TOKEN ?? "", }; interface IEnvVars { @@ -41,8 +40,7 @@ interface IEnvVars { sentryDSN?: string; ecdsaPrivateKey?: string; batchSize: number; - xApiKey: string; - xApiSecret: string; + xBearerToken: string; } const envVarsSchema = Joi.object({ @@ -91,8 +89,7 @@ const envVarsSchema = Joi.object({ aptosIndexerCron: Joi.string().default("*/5 * * * * *"), // Integrations - xApiSecret: Joi.string().allow("").required(), - xApiKey: Joi.string().allow("").required(), + xBearerToken: Joi.string().allow("").required(), // Common sentryDSN: Joi.string().allow("", null), @@ -115,8 +112,7 @@ export default { ecdsaPrivateKey: envVars.ecdsaPrivateKey, sentryDSN: envVars.sentryDSN, integrations: { - xApiSecret: envVars.xApiSecret, - xApiKey: envVars.xApiKey, + xBearerToken: envVars.xBearerToken, }, rooch: { chainId: envVars.roochChainId, diff --git a/orchestrator/src/index.ts b/orchestrator/src/index.ts index 93da502..9744ea4 100644 --- a/orchestrator/src/index.ts +++ b/orchestrator/src/index.ts @@ -3,15 +3,11 @@ import "dotenv/config"; import env from "./env"; import AptosIndexer from "./indexer/aptos"; import RoochIndexer from "./indexer/rooch"; -import { instance as xInstance } from "./integrations/xtwitter"; import { log } from "./logger"; (async () => { // Check env variables to determine which chains to subscribe to for events. // Start cron job to check for new events from Rooch Oracles - if (xInstance.isAvailable()) { - await xInstance.requestAccessToken(); - } if (env.rooch.privateKey && env.rooch.chainId.length > 0 && env.rooch.oracleAddress && env.chains.includes("ROOCH")) { // https://www.npmjs.com/package/cron#cronjob-class @@ -19,7 +15,7 @@ import { log } from "./logger"; env.rooch.chainId.map((chain) => { const rooch = new RoochIndexer(env.rooch.privateKey, chain, env.rooch.oracleAddress); new CronJob( - "0 * * * *", + "*/15 * * * *", () => { rooch.sendUnfulfilledRequests(); }, diff --git a/orchestrator/src/indexer/base.ts b/orchestrator/src/indexer/base.ts index 5e92acf..1660604 100644 --- a/orchestrator/src/indexer/base.ts +++ b/orchestrator/src/indexer/base.ts @@ -44,7 +44,7 @@ export abstract class Indexer { } applyAuthorizationHeader(hostname: string): string | undefined { - if (ALLOWED_HOST.includes(hostname) && xTwitterInstance.isInitialized()) { + if (ALLOWED_HOST.includes(hostname)) { const token = xTwitterInstance.getAccessToken(); return `Bearer ${token}`; } @@ -164,21 +164,41 @@ export abstract class Indexer { // Fetch the latest events from the Aptos Oracles Contract const newRequestsEvents = await this.fetchRequestAddedEvents(Number(latestCommit?.eventSeq ?? 0) ?? 0); + for (let i = 0; i < newRequestsEvents.length; i++) { + try { + await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate)); - await Promise.all( - newRequestsEvents.map(async (event) => { + const event = newRequestsEvents[i]; const data = await this.processRequestAddedEvent(event); + if (data) { try { await this.sendFulfillment(event, data.status, JSON.stringify(data.message)); - // TODO: Use the notify parameter to send transaction to the contract and function to marked in the request event await this.save(event, data, RequestStatus.SUCCESS); } catch (err: any) { log.error({ err: err.message }); await this.save(event, data, RequestStatus.FAILED); } } - }), - ); + } catch (error) { + console.error(`Error processing event ${i}:`, error); + } + } + + // await Promise.all( + // newRequestsEvents.map(async (event) => { + // const data = await this.processRequestAddedEvent(event); + // if (data) { + // try { + // await this.sendFulfillment(event, data.status, JSON.stringify(data.message)); + // // TODO: Use the notify parameter to send transaction to the contract and function to marked in the request event + // await this.save(event, data, RequestStatus.SUCCESS); + // } catch (err: any) { + // log.error({ err: err.message }); + // await this.save(event, data, RequestStatus.FAILED); + // } + // } + // }), + // ); } } diff --git a/orchestrator/src/indexer/rooch.ts b/orchestrator/src/indexer/rooch.ts index 32e6eba..fc50663 100644 --- a/orchestrator/src/indexer/rooch.ts +++ b/orchestrator/src/indexer/rooch.ts @@ -1,4 +1,5 @@ import env from "@/env"; +import { instance as xTwitterInstance } from "@/integrations/xtwitter"; import { log } from "@/logger"; import type { IEvent, IRequestAdded, JsonRpcResponse, ProcessedRequestAdded, RoochNetwork } from "@/types"; import { decodeNotifyValue } from "@/util"; @@ -95,9 +96,26 @@ export default class RoochIndexer extends Indexer { } } - // Process all skipped requests concurrently - await Promise.all( - skippedRequests.map(async (event) => { + // // Process all skipped requests concurrently + // await Promise.all( + // skippedRequests.map(async (event) => { + // const data = await this.processRequestAddedEvent(event); + // if (data) { + // try { + // // Send fulfillment response + // const response = await this.sendFulfillment(event, data.status, JSON.stringify(data.message)); + // log.debug({ response }); // Log the response + // } catch (err) { + // log.error({ err }); // Log any errors during fulfillment + // } + // } + // }), + // ); + + for (let i = 0; i < skippedRequests.length; i++) { + try { + await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate)); + const event = skippedRequests[i]; const data = await this.processRequestAddedEvent(event); if (data) { try { @@ -108,8 +126,10 @@ export default class RoochIndexer extends Indexer { log.error({ err }); // Log any errors during fulfillment } } - }), - ); + } catch (err) { + log.error({ err }); // Log any errors during fulfillment + } + } return skippedRequests; // Return the list of processed requests } diff --git a/orchestrator/src/integrations/xtwitter.ts b/orchestrator/src/integrations/xtwitter.ts index 0fc5a61..42175ae 100644 --- a/orchestrator/src/integrations/xtwitter.ts +++ b/orchestrator/src/integrations/xtwitter.ts @@ -1,66 +1,20 @@ import env from "@/env"; -import axios from "axios"; class XfkaTwitter { - private initialized = false; - private accessToken: string | null = null; private SERVER_DOMAIN = "api.twitter.com"; - constructor( - private apiKey: string, - private apiKeySecret: string, - ) {} + constructor(private accessToken: string) {} get hosts() { return ["x.com", "api.x.com", "twitter.com", "api.twitter.com"]; } - isAvailable(): boolean { - if (this.apiKey && this.apiKeySecret) { - return true; - } - return false; + get getRequestRate() { + return 60 * 1000; // } - - isInitialized(): boolean { - return this.initialized; - } - - async requestAccessToken() { - try { - // Fetch bearer token - const response = await axios.post( - `https://${this.SERVER_DOMAIN}/oauth2/token`, - new URLSearchParams({ - grant_type: "client_credentials", - }).toString(), - { - headers: { - "Content-Type": "application/x-www-form-urlencoded", - }, - auth: { - username: this.apiKey, - password: this.apiKeySecret, - }, - }, - ); - const accessToken = response.data.access_token; - - this.accessToken = accessToken; - this.initialized = true; - return accessToken; - } catch (error: any) { - console.error("Error fetching bearer token:", typeof error, error.message); - throw error; - } - } - getAccessToken(): string | null { - if (!this.initialized) { - throw new Error("Class not initialized"); - } return this.accessToken; } } -export const instance = new XfkaTwitter(env.integrations.xApiKey, env.integrations.xApiSecret); +export const instance = new XfkaTwitter(env.integrations.xBearerToken); From c2f17faa3be51c214976019fe65af1a4044106db Mon Sep 17 00:00:00 2001 From: xlassix Date: Fri, 29 Nov 2024 16:27:37 +0100 Subject: [PATCH 2/2] Update indexer base to conditionally wait on the first loop --- orchestrator/src/indexer/base.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestrator/src/indexer/base.ts b/orchestrator/src/indexer/base.ts index 1660604..b550602 100644 --- a/orchestrator/src/indexer/base.ts +++ b/orchestrator/src/indexer/base.ts @@ -166,7 +166,7 @@ export abstract class Indexer { const newRequestsEvents = await this.fetchRequestAddedEvents(Number(latestCommit?.eventSeq ?? 0) ?? 0); for (let i = 0; i < newRequestsEvents.length; i++) { try { - await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate)); + if (i > 0) await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate)); const event = newRequestsEvents[i]; const data = await this.processRequestAddedEvent(event);