Skip to content

Commit

Permalink
Merge pull request #14 from usherlabs/feature/open_ai
Browse files Browse the repository at this point in the history
Feature/open ai
  • Loading branch information
rsoury authored Jan 13, 2025
2 parents 04363b4 + 90548b4 commit f847449
Show file tree
Hide file tree
Showing 9 changed files with 952 additions and 135 deletions.
4 changes: 4 additions & 0 deletions orchestrator/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const baseConfig = {
batchSize: process.env.BATCH_SIZE ?? 1000,
// Integrations
xBearerToken: process.env.X_BEARER_TOKEN ?? "",
openAIToken: process.env.OPEN_AI_TOKEN ?? "",
};

interface IEnvVars {
Expand All @@ -41,6 +42,7 @@ interface IEnvVars {
ecdsaPrivateKey?: string;
batchSize: number;
xBearerToken: string;
openAIToken: string;
}

const envVarsSchema = Joi.object({
Expand Down Expand Up @@ -90,6 +92,7 @@ const envVarsSchema = Joi.object({

// Integrations
xBearerToken: Joi.string().allow("").required(),
openAIToken: Joi.string().allow("").required(),

// Common
sentryDSN: Joi.string().allow("", null),
Expand All @@ -113,6 +116,7 @@ export default {
sentryDSN: envVars.sentryDSN,
integrations: {
xBearerToken: envVars.xBearerToken,
openAIToken: envVars.openAIToken,
},
rooch: {
chainId: envVars.roochChainId,
Expand Down
93 changes: 21 additions & 72 deletions orchestrator/src/indexer/base.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { log } from "@/logger";
import { type ProcessedRequestAdded, RequestStatus } from "@/types";
import { run as jqRun } from "node-jq";

import { instance as openAIInstance } from "@/integrations/openAI";
import { instance as xTwitterInstance } from "@/integrations/xtwitter";
import { isValidJson } from "@/util";
import axios, { type AxiosResponse } from "axios";
import prismaClient from "../../prisma";

const ALLOWED_HOST = [...xTwitterInstance.hosts];
import type { BasicBearerAPIHandler } from "@/integrations/base";
import prismaClient from "../../prisma";

// Abstract base class
export abstract class Indexer {
Expand Down Expand Up @@ -43,12 +41,14 @@ export abstract class Indexer {
return this.oracleAddress;
}

applyAuthorizationHeader(hostname: string): string | undefined {
if (ALLOWED_HOST.includes(hostname)) {
const token = xTwitterInstance.getAccessToken();
return `Bearer ${token}`;
requestHandlerSelector(url: URL): BasicBearerAPIHandler | null {
if (xTwitterInstance.isApprovedPath(url)) {
return xTwitterInstance;
}
return undefined;
if (openAIInstance.isApprovedPath(url)) {
return openAIInstance;
}
return null;
}

/**
Expand All @@ -70,9 +70,10 @@ export abstract class Indexer {
* @param {IRequestAdded} data - The request data that needs to be processed.
* @returns {Promise<{status: number, message: string} | null>} - The status and message of the processed request, or null if the request is not valid.
*/
async processRequestAddedEvent<T>(data: ProcessedRequestAdded<T>) {
async processRequestAddedEvent<T>(
data: ProcessedRequestAdded<T>,
): Promise<{ status: number; message: string } | null> {
log.debug("processing request:", data.request_id);
const token = xTwitterInstance.getAccessToken();

if (data.oracle.toLowerCase() !== this.getOrchestratorAddress().toLowerCase()) {
log.debug(
Expand All @@ -83,69 +84,17 @@ export abstract class Indexer {
);
return null;
}
const url = data.params.url?.includes("http") ? data.params.url : `https://${data.params.url}`;
try {
const _url = new URL(url);

if (!ALLOWED_HOST.includes(_url.hostname.toLowerCase())) {
return { status: 406, message: `${_url.hostname} is supposed by this orchestrator` };
}
} catch (err) {
return { status: 406, message: `Invalid Domain Name` };
}
const url = data.params.url?.includes("http") ? data.params.url : `https://${data.params.url}`;
const url_object = new URL(url);

try {
let request: AxiosResponse<any, any>;
if (isValidJson(data.params.headers)) {
// TODO: Replace direct requests via axios with requests via VerityClient TS module
request = await axios({
method: data.params.method,
data: data.params.body,
url: url,
headers: {
...JSON.parse(data.params.headers),
Authorization: `Bearer ${token}`,
},
});
// return { status: request.status, message: request.data };
} else {
request = await axios({
method: data.params.method,
data: data.params.body,
url: url,
headers: {
Authorization: `Bearer ${token}`,
},
});
}

try {
const result = await jqRun(data.pick, JSON.stringify(request.data), { input: "string" });
return { status: request.status, message: result };
} catch {
return { status: 409, message: "`Pick` value provided could not be resolved on the returned response" };
}
// return { status: request.status, message: result };
} catch (error: any) {
log.debug(
JSON.stringify({
error: error.message,
}),
);

if (axios.isAxiosError(error)) {
// Handle Axios-specific errors
if (error.response) {
// Server responded with a status other than 2xx
return { status: error.response.status, message: error.response.data };
} else if (error.request) {
// No response received
return { status: 504, message: "No response received" };
}
} else {
// Handle non-Axios errors
return { status: 500, message: "Unexpected error" };
const handler = this.requestHandlerSelector(url_object);
if (handler) {
return handler.submitRequest(data);
}
return { status: 406, message: "URL Not supported" };
} catch {
return { status: 406, message: "Invalid URL" };
}
}

Expand Down
110 changes: 110 additions & 0 deletions orchestrator/src/integrations/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { log } from "@/logger";
import type { ProcessedRequestAdded } from "@/types";
import { isValidJson } from "@/util";
import axios, { type AxiosResponse } from "axios";
import { run as jqRun } from "node-jq";

export abstract class BasicBearerAPIHandler {
constructor(
protected accessToken: string,
protected supported_host: string[],
protected supported_paths: string[],
protected rate: number,
) {}

get hosts() {
return this.supported_host;
}

get paths() {
return this.supported_paths;
}

get getRequestRate() {
return this.rate;
}

isApprovedPath(url: URL): boolean {
return (
this.hosts.includes(url.hostname.toLowerCase()) &&
this.supported_paths.filter((path) => url.pathname.toLowerCase().startsWith(path)).length > 0
);
}

getAccessToken(): string | null {
return this.accessToken;
}

abstract validatePayload(path: string, payload: string | null): boolean;

async submitRequest(data: ProcessedRequestAdded<any>): Promise<{ status: number; message: string }> {
try {
const url = data.params.url?.includes("http") ? data.params.url : `https://${data.params.url}`;
try {
const url_object = new URL(url);
if (!this.isApprovedPath(url_object)) {
return { status: 406, message: `${url_object} is supposed by this orchestrator` };
}
if (this.validatePayload(url_object.pathname, data.params.body)) {
return { status: 406, message: `Invalid Payload` };
}
} catch (err) {
return { status: 406, message: `Invalid Domain Name` };
}

const token = this.getAccessToken();
let request: AxiosResponse<any, any>;
if (isValidJson(data.params.headers)) {
// TODO: Replace direct requests via axios with requests via VerityClient TS module
request = await axios({
method: data.params.method,
data: data.params.body,
url: url,
headers: {
...JSON.parse(data.params.headers),
Authorization: `Bearer ${token}`,
},
});
// return { status: request.status, message: request.data };
} else {
request = await axios({
method: data.params.method,
data: data.params.body,
url: url,
headers: {
Authorization: `Bearer ${token}`,
},
});
}

try {
const result = (await jqRun(data.pick, JSON.stringify(request.data), { input: "string" })) as string;
return { status: request.status, message: result };
} catch {
return { status: 409, message: "`Pick` value provided could not be resolved on the returned response" };
}
// return { status: request.status, message: result };
} catch (error: any) {
log.debug(
JSON.stringify({
error: error.message,
}),
);

if (axios.isAxiosError(error)) {
// Handle Axios-specific errors
if (error.response) {
// Server responded with a status other than 2xx
return { status: error.response.status, message: error.response.data };
} else if (error.request) {
// No response received
return { status: 504, message: "No response received" };
} else {
// Handle non-Axios errors
return { status: 500, message: "Unexpected error" };
}
}
}
return { status: 500, message: "Something unexpected Happened" };
}
}
41 changes: 41 additions & 0 deletions orchestrator/src/integrations/openAI.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import env from "@/env";
import Joi from "joi";
import { BasicBearerAPIHandler } from "./base";

const chatSchema = Joi.object({
model: Joi.string().required(),
messages: Joi.array().items(
Joi.object({
role: Joi.string().valid("developer", "user").required(),
content: Joi.string().required(),
}).required(),
),
});
export default class TwitterIntegration extends BasicBearerAPIHandler {
validatePayload(path: string, payload: string): boolean {
try {
if (this.supported_paths.includes(path)) {
if (path === "/v1/chat/completions") {
const { error, value } = chatSchema.validate(JSON.parse(payload));
if (error) {
return false;
} else {
if (value.model === "gpt-4o") {
return true;
}
}
}
}
return false;
} catch {
return false;
}
}
}

export const instance = new TwitterIntegration(
env.integrations.xBearerToken,
["api.openai.com"],
["/v1/chat/completions"],
60 * 1000,
);
25 changes: 10 additions & 15 deletions orchestrator/src/integrations/xtwitter.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import env from "@/env";
import { BasicBearerAPIHandler } from "./base";

class XfkaTwitter {
private SERVER_DOMAIN = "api.twitter.com";

constructor(private accessToken: string) {}

get hosts() {
return ["x.com", "api.x.com", "twitter.com", "api.twitter.com"];
}

get getRequestRate() {
return 60 * 1000; //
}
getAccessToken(): string | null {
return this.accessToken;
export default class TwitterIntegration extends BasicBearerAPIHandler {
validatePayload(path: string): boolean {
return true;
}
}

export const instance = new XfkaTwitter(env.integrations.xBearerToken);
export const instance = new TwitterIntegration(
env.integrations.openAIToken,
["api.x.com", "api.twitter.com"],
["/2/tweets"],
60 * 1000,
);
6 changes: 3 additions & 3 deletions rooch/Move.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
[package]
name = "verity-move-oracles"
name = "verity-move-@orchestrators"
version = "0.0.1"

[dependencies]
MoveStdlib = { git = "https://github.com/rooch-network/rooch.git", subdir = "frameworks/move-stdlib", rev = "main" }
MoveosStdlib = { git = "https://github.com/rooch-network/rooch.git", subdir = "frameworks/moveos-stdlib", rev = "main" }
# RoochFramework = { git = "https://github.com/rooch-network/rooch.git", subdir = "frameworks/rooch-framework", rev = "main" }
RoochFramework = { git = "https://github.com/rooch-network/rooch.git", subdir = "frameworks/rooch-framework", rev = "main" }

[addresses]
verity = "_"
verity_test_foreign_module = "_"
std = "0x1"
moveos_std = "0x2"
# rooch_framework = "0x3"
rooch_framework = "0x3"

[dev-addresses]
verity = "0x42"
Expand Down
Loading

0 comments on commit f847449

Please sign in to comment.