Skip to content

Commit

Permalink
Merge pull request #12 from usherlabs/feature/parallel_to_sequential
Browse files Browse the repository at this point in the history
Feature/parallel to sequential
  • Loading branch information
rsoury authored Nov 29, 2024
2 parents f34f8ae + c2f17fa commit 3b51a91
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 74 deletions.
12 changes: 4 additions & 8 deletions orchestrator/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ const baseConfig = {
ecdsaPrivateKey: process.env.SENTRY_DSN ?? "",
batchSize: process.env.BATCH_SIZE ?? 1000,
// Integrations
xApiSecret: process.env.X_API_SECRET ?? "",
xApiKey: process.env.X_API_KEY ?? "",
xBearerToken: process.env.X_BEARER_TOKEN ?? "",
};

interface IEnvVars {
Expand All @@ -41,8 +40,7 @@ interface IEnvVars {
sentryDSN?: string;
ecdsaPrivateKey?: string;
batchSize: number;
xApiKey: string;
xApiSecret: string;
xBearerToken: string;
}

const envVarsSchema = Joi.object({
Expand Down Expand Up @@ -91,8 +89,7 @@ const envVarsSchema = Joi.object({
aptosIndexerCron: Joi.string().default("*/5 * * * * *"),

// Integrations
xApiSecret: Joi.string().allow("").required(),
xApiKey: Joi.string().allow("").required(),
xBearerToken: Joi.string().allow("").required(),

// Common
sentryDSN: Joi.string().allow("", null),
Expand All @@ -115,8 +112,7 @@ export default {
ecdsaPrivateKey: envVars.ecdsaPrivateKey,
sentryDSN: envVars.sentryDSN,
integrations: {
xApiSecret: envVars.xApiSecret,
xApiKey: envVars.xApiKey,
xBearerToken: envVars.xBearerToken,
},
rooch: {
chainId: envVars.roochChainId,
Expand Down
6 changes: 1 addition & 5 deletions orchestrator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@ import "dotenv/config";
import env from "./env";
import AptosIndexer from "./indexer/aptos";
import RoochIndexer from "./indexer/rooch";
import { instance as xInstance } from "./integrations/xtwitter";
import { log } from "./logger";

(async () => {
// Check env variables to determine which chains to subscribe to for events.
// Start cron job to check for new events from Rooch Oracles
if (xInstance.isAvailable()) {
await xInstance.requestAccessToken();
}

if (env.rooch.privateKey && env.rooch.chainId.length > 0 && env.rooch.oracleAddress && env.chains.includes("ROOCH")) {
// https://www.npmjs.com/package/cron#cronjob-class

env.rooch.chainId.map((chain) => {
const rooch = new RoochIndexer(env.rooch.privateKey, chain, env.rooch.oracleAddress);
new CronJob(
"0 * * * *",
"*/15 * * * *",
() => {
rooch.sendUnfulfilledRequests();
},
Expand Down
32 changes: 26 additions & 6 deletions orchestrator/src/indexer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export abstract class Indexer {
}

applyAuthorizationHeader(hostname: string): string | undefined {
if (ALLOWED_HOST.includes(hostname) && xTwitterInstance.isInitialized()) {
if (ALLOWED_HOST.includes(hostname)) {
const token = xTwitterInstance.getAccessToken();
return `Bearer ${token}`;
}
Expand Down Expand Up @@ -164,21 +164,41 @@ export abstract class Indexer {

// Fetch the latest events from the Aptos Oracles Contract
const newRequestsEvents = await this.fetchRequestAddedEvents(Number(latestCommit?.eventSeq ?? 0) ?? 0);
for (let i = 0; i < newRequestsEvents.length; i++) {
try {
if (i > 0) await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate));

await Promise.all(
newRequestsEvents.map(async (event) => {
const event = newRequestsEvents[i];
const data = await this.processRequestAddedEvent(event);

if (data) {
try {
await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
// TODO: Use the notify parameter to send transaction to the contract and function to marked in the request event
await this.save(event, data, RequestStatus.SUCCESS);
} catch (err: any) {
log.error({ err: err.message });
await this.save(event, data, RequestStatus.FAILED);
}
}
}),
);
} catch (error) {
console.error(`Error processing event ${i}:`, error);
}
}

// await Promise.all(
// newRequestsEvents.map(async (event) => {
// const data = await this.processRequestAddedEvent(event);
// if (data) {
// try {
// await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
// // TODO: Use the notify parameter to send transaction to the contract and function to marked in the request event
// await this.save(event, data, RequestStatus.SUCCESS);
// } catch (err: any) {
// log.error({ err: err.message });
// await this.save(event, data, RequestStatus.FAILED);
// }
// }
// }),
// );
}
}
30 changes: 25 additions & 5 deletions orchestrator/src/indexer/rooch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import env from "@/env";
import { instance as xTwitterInstance } from "@/integrations/xtwitter";
import { log } from "@/logger";
import type { IEvent, IRequestAdded, JsonRpcResponse, ProcessedRequestAdded, RoochNetwork } from "@/types";
import { decodeNotifyValue } from "@/util";
Expand Down Expand Up @@ -95,9 +96,26 @@ export default class RoochIndexer extends Indexer {
}
}

// Process all skipped requests concurrently
await Promise.all(
skippedRequests.map(async (event) => {
// // Process all skipped requests concurrently
// await Promise.all(
// skippedRequests.map(async (event) => {
// const data = await this.processRequestAddedEvent(event);
// if (data) {
// try {
// // Send fulfillment response
// const response = await this.sendFulfillment(event, data.status, JSON.stringify(data.message));
// log.debug({ response }); // Log the response
// } catch (err) {
// log.error({ err }); // Log any errors during fulfillment
// }
// }
// }),
// );

for (let i = 0; i < skippedRequests.length; i++) {
try {
await new Promise((resolve) => setTimeout(resolve, xTwitterInstance.getRequestRate));
const event = skippedRequests[i];
const data = await this.processRequestAddedEvent(event);
if (data) {
try {
Expand All @@ -108,8 +126,10 @@ export default class RoochIndexer extends Indexer {
log.error({ err }); // Log any errors during fulfillment
}
}
}),
);
} catch (err) {
log.error({ err }); // Log any errors during fulfillment
}
}

return skippedRequests; // Return the list of processed requests
}
Expand Down
54 changes: 4 additions & 50 deletions orchestrator/src/integrations/xtwitter.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,20 @@
import env from "@/env";
import axios from "axios";

class XfkaTwitter {
private initialized = false;
private accessToken: string | null = null;
private SERVER_DOMAIN = "api.twitter.com";

constructor(
private apiKey: string,
private apiKeySecret: string,
) {}
constructor(private accessToken: string) {}

get hosts() {
return ["x.com", "api.x.com", "twitter.com", "api.twitter.com"];
}

isAvailable(): boolean {
if (this.apiKey && this.apiKeySecret) {
return true;
}
return false;
get getRequestRate() {
return 60 * 1000; //
}

isInitialized(): boolean {
return this.initialized;
}

async requestAccessToken() {
try {
// Fetch bearer token
const response = await axios.post(
`https://${this.SERVER_DOMAIN}/oauth2/token`,
new URLSearchParams({
grant_type: "client_credentials",
}).toString(),
{
headers: {
"Content-Type": "application/x-www-form-urlencoded",
},
auth: {
username: this.apiKey,
password: this.apiKeySecret,
},
},
);
const accessToken = response.data.access_token;

this.accessToken = accessToken;
this.initialized = true;
return accessToken;
} catch (error: any) {
console.error("Error fetching bearer token:", typeof error, error.message);
throw error;
}
}

getAccessToken(): string | null {
if (!this.initialized) {
throw new Error("Class not initialized");
}
return this.accessToken;
}
}

export const instance = new XfkaTwitter(env.integrations.xApiKey, env.integrations.xApiSecret);
export const instance = new XfkaTwitter(env.integrations.xBearerToken);

0 comments on commit 3b51a91

Please sign in to comment.