diff --git a/Cargo.lock b/Cargo.lock index c47a23f92..bfa1577de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -609,6 +609,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba" + [[package]] name = "atty" version = "0.2.14" @@ -2253,7 +2259,7 @@ dependencies = [ "tokio", "tracing", "walkdir", - "yansi", + "yansi 0.5.1", ] [[package]] @@ -2352,6 +2358,20 @@ dependencies = [ "subtle", ] +[[package]] +name = "figment" +version = "0.10.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4547e226f4c9ab860571e070a9034192b3175580ecea38da34fcdb53a018c9a5" +dependencies = [ + "atomic", + "pear", + "serde", + "toml 0.7.4", + "uncased", + "version_check", +] + [[package]] name = "finl_unicode" version = "1.2.0" @@ -3174,6 +3194,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" +[[package]] +name = "inlinable_string" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" + [[package]] name = "inout" version = "0.1.3" @@ -4153,6 +4179,29 @@ dependencies = [ "hmac", ] +[[package]] +name = "pear" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a386cd715229d399604b50d1361683fe687066f42d56f54be995bc6868f71c" +dependencies = [ + "inlinable_string", + "pear_codegen", + "yansi 1.0.0-rc.1", +] + +[[package]] +name = "pear_codegen" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da9f0f13dac8069c139e8300a6510e3f4143ecf5259c60b116a9b271b4ca0d54" +dependencies = [ + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.28", +] + [[package]] name = "pem" version = "1.1.1" @@ -4385,7 +4434,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" dependencies = [ "diff", - "yansi", + "yansi 0.5.1", ] [[package]] @@ -4464,6 +4513,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", + "version_check", + "yansi 1.0.0-rc.1", +] + [[package]] name = "prometheus" version = "0.13.3" @@ -5391,9 +5453,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.2" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93107647184f6027e3b7dcb2e11034cf95ffa1e3a682c67951963ac69c1c007d" +checksum = "12022b835073e5b11e90a14f86838ceb1c8fb0325b72416845c487ac0fa95e80" dependencies = [ "serde", ] @@ -5449,7 +5511,7 @@ dependencies = [ "async-graphql 4.0.16", "async-graphql-axum", "autometrics 0.3.3", - "axum 0.5.17", + "axum 0.6.20", "build-info", "build-info-build", "cargo-husky", @@ -5461,6 +5523,7 @@ dependencies = [ "ethers-core", "eventuals", "faux", + "figment", "hex", "hex-literal", "hyper", @@ -5472,13 +5535,13 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serde_spanned", "sha3", "sqlx", "tap_core", "test-log", "thiserror", "tokio", - "toml 0.7.4", "toolshed", "tower", 
"tower-http 0.4.4", @@ -6632,6 +6695,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "uncased" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b9bc53168a4be7402ab86c3aad243a84dd7381d09be0eddc81280c1da95ca68" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -7171,6 +7243,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" +[[package]] +name = "yansi" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1367295b8f788d371ce2dbc842c7b709c73ee1364d30351dd300ec2203b12377" + [[package]] name = "zeroize" version = "1.6.0" diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index c095725d9..d3eb5e0a9 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -180,8 +180,14 @@ impl IndexerService { { let metrics = IndexerServiceMetrics::new(options.metrics_prefix); + let http_client = reqwest::Client::builder() + .tcp_nodelay(true) + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to init HTTP client"); + let network_subgraph = Box::leak(Box::new(SubgraphClient::new( - reqwest::Client::new(), + http_client.clone(), options .config .graph_node @@ -223,7 +229,7 @@ impl IndexerService { ); let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( - reqwest::Client::new(), + http_client, options .config .graph_node diff --git a/service/Cargo.toml b/service/Cargo.toml index ae0cb4915..d5df369e5 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -22,7 +22,7 @@ tracing = "0.1.34" thiserror = "1.0.49" serde = { version = "1.0", features = ["rc", "derive"] } serde_json = "1" -axum = "0.5" +axum = "0.6.20" hyper = "0.14.27" tower = { version = "0.4", features = ["util", "timeout", "limit"] } tower-http = { version = "0.4.0", features = [ @@ -30,7 +30,6 @@ tower-http = { version = "0.4.0", features = [ "trace", "cors", ] } -toml = "0.7.4" once_cell = "1.17" async-graphql = "4.0.16" async-graphql-axum = "4.0.16" @@ -62,6 +61,11 @@ toolshed = { git = "https://github.com/edgeandnode/toolshed", branch = "main", f "graphql", ] } build-info = "0.0.34" +figment = { version = "0.10", features = ["toml", "env"] } + +# FIXME: Needed due to a serde_spanned version conflict between +# `ethers` and `figment`. +serde_spanned = "=0.6.4" [dev-dependencies] faux = "0.1.10" diff --git a/service/src/cli.rs b/service/src/cli.rs new file mode 100644 index 000000000..e576e744a --- /dev/null +++ b/service/src/cli.rs @@ -0,0 +1,9 @@ +use std::path::PathBuf; + +use clap::Parser; + +#[derive(Parser)] +pub struct Cli { + #[arg(long, value_name = "FILE")] + pub config: PathBuf, +} diff --git a/service/src/common/address.rs b/service/src/common/address.rs deleted file mode 100644 index 4e2232c78..000000000 --- a/service/src/common/address.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. 
-// SPDX-License-Identifier: Apache-2.0 - -use ethers::signers::{ - coins_bip39::English, LocalWallet, MnemonicBuilder, Signer, Wallet, WalletError, -}; -use ethers_core::{k256::ecdsa::SigningKey, utils::hex}; -use sha3::{Digest, Keccak256}; - -/// A normalized address in checksum format. -pub type Address = String; - -/// Converts an address to checksum format and returns a typed instance. -pub fn to_address(s: impl AsRef<str>) -> Address { - let address = s.as_ref().to_ascii_lowercase(); - let hash = &Keccak256::digest(address); - hex::encode(hash) -} - -/// Build Wallet from Private key or Mnemonic -pub fn build_wallet(value: &str) -> Result<Wallet<SigningKey>, WalletError> { - value - .parse::<LocalWallet>() - .or(MnemonicBuilder::<English>::default().phrase(value).build()) -} - -/// Get wallet public address to String -pub fn wallet_address(wallet: &Wallet<SigningKey>) -> String { - format!("{:?}", wallet.address()) -} diff --git a/service/src/common/database.rs b/service/src/common/database.rs deleted file mode 100644 index 5ba4707e8..000000000 --- a/service/src/common/database.rs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use sqlx::{postgres::PgPoolOptions, PgPool}; - -use std::time::Duration; -use tracing::debug; - -use crate::config; - -pub async fn connect(config: &config::Postgres) -> PgPool { - let url = format!( - "postgresql://{}:{}@{}:{}/{}", - config.postgres_username, - config.postgres_password, - config.postgres_host, - config.postgres_port, - config.postgres_database - ); - - debug!( - postgres_host = tracing::field::debug(&config.postgres_host), - postgres_port = tracing::field::debug(&config.postgres_port), - postgres_database = tracing::field::debug(&config.postgres_database), - "Connecting to database" - ); - - PgPoolOptions::new() - .max_connections(50) - .acquire_timeout(Duration::from_secs(3)) - .connect(&url) - .await - .expect("Could not connect to DATABASE_URL") -} diff --git a/service/src/common/indexer_management/mod.rs b/service/src/common/indexer_management/mod.rs deleted file mode 100644 index 32a19e12a..000000000 --- a/service/src/common/indexer_management/mod.rs +++ /dev/null @@ -1,492 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::{collections::HashSet, str::FromStr}; - -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use sqlx::PgPool; -use toolshed::thegraph::{DeploymentId, DeploymentIdError}; - -/// Internal cost model representation as stored in the database. -/// -/// These can have "global" as the deployment ID. -#[derive(Debug, Clone)] -struct DbCostModel { - pub deployment: String, - pub model: Option<String>, - pub variables: Option<Value>, -} - -/// External representation of cost models. -/// -/// Here, any notion of "global" is gone and deployment IDs are valid deployment IDs.
-#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CostModel { - pub deployment: DeploymentId, - pub model: Option<String>, - pub variables: Option<Value>, -} - -impl TryFrom<DbCostModel> for CostModel { - type Error = DeploymentIdError; - - fn try_from(db_model: DbCostModel) -> Result<Self, Self::Error> { - Ok(Self { - deployment: DeploymentId::from_str(&db_model.deployment)?, - model: db_model.model, - variables: db_model.variables, - }) - } -} - -impl From<CostModel> for DbCostModel { - fn from(model: CostModel) -> Self { - let deployment = model.deployment; - DbCostModel { - deployment: format!("{deployment:#x}"), - model: model.model, - variables: model.variables, - } - } -} - -/// Query cost models from the database, merging the global cost model in -/// whenever there is no cost model defined for a deployment. -pub async fn cost_models( - pool: &PgPool, - deployments: &[DeploymentId], -) -> Result<Vec<CostModel>, anyhow::Error> { - let hex_ids = deployments - .iter() - .map(|d| format!("{d:#x}")) - .collect::<Vec<_>>(); - - let mut models = if deployments.is_empty() { - sqlx::query_as!( - DbCostModel, - r#" - SELECT deployment, model, variables - FROM "CostModels" - WHERE deployment != 'global' - ORDER BY deployment ASC - "# - ) - .fetch_all(pool) - .await? - } else { - sqlx::query_as!( - DbCostModel, - r#" - SELECT deployment, model, variables - FROM "CostModels" - WHERE deployment = ANY($1) - AND deployment != 'global' - ORDER BY deployment ASC - "#, - &hex_ids - ) - .fetch_all(pool) - .await? - } - .into_iter() - .map(CostModel::try_from) - .collect::<Result<Vec<_>, _>>()?; - - let deployments_with_models = models - .iter() - .map(|model| &model.deployment) - .collect::<HashSet<_>>(); - - let deployments_without_models = deployments - .iter() - .filter(|deployment| !deployments_with_models.contains(deployment)) - .collect::<Vec<_>>(); - - // Query the global cost model - if let Some(global_model) = global_cost_model(pool).await? { - // For all deployments that have a cost model, merge it with the global one - models = models - .into_iter() - .map(|model| merge_global(model, &global_model)) - .collect(); - - // Inject a cost model for all deployments that don't have one - models = models - .into_iter() - .chain( - deployments_without_models - .into_iter() - .map(|deployment| CostModel { - deployment: deployment.to_owned(), - model: global_model.model.clone(), - variables: global_model.variables.clone(), - }), - ) - .collect(); - } - - Ok(models) -} - -/// Make database query for a cost model indexed by deployment id -pub async fn cost_model( - pool: &PgPool, - deployment: &DeploymentId, -) -> Result<Option<CostModel>, anyhow::Error> { - let model = sqlx::query_as!( - DbCostModel, - r#" - SELECT deployment, model, variables - FROM "CostModels" - WHERE deployment = $1 - AND deployment != 'global' - "#, - format!("{:#x}", deployment), - ) - .fetch_optional(pool) - .await?
- .map(CostModel::try_from) - .transpose()?; - - let global_model = global_cost_model(pool).await?; - - Ok(match (model, global_model) { - // If we have no global model, return whatever we can find for the deployment - (None, None) => None, - (Some(model), None) => Some(model), - - // If we have a cost model and a global cost model, merge them - (Some(model), Some(global_model)) => Some(merge_global(model, &global_model)), - - // If we have only a global model, use that - (None, Some(global_model)) => Some(CostModel { - deployment: deployment.to_owned(), - model: global_model.model, - variables: global_model.variables, - }), - }) -} - -/// Query global cost model -async fn global_cost_model(pool: &PgPool) -> Result<Option<DbCostModel>, anyhow::Error> { - sqlx::query_as!( - DbCostModel, - r#" - SELECT deployment, model, variables - FROM "CostModels" - WHERE deployment = $1 - "#, - "global" - ) - .fetch_optional(pool) - .await - .map_err(Into::into) -} - -fn merge_global(model: CostModel, global_model: &DbCostModel) -> CostModel { - CostModel { - deployment: model.deployment, - model: model.model.clone().or(global_model.model.clone()), - variables: model.variables.clone().or(global_model.variables.clone()), - } -} - -#[cfg(test)] -mod test { - - use std::str::FromStr; - - use sqlx::PgPool; - - use super::*; - - async fn setup_cost_models_table(pool: &PgPool) { - sqlx::query!( - r#" - CREATE TABLE "CostModels"( - id INT, - deployment VARCHAR NOT NULL, - model TEXT, - variables JSONB, - PRIMARY KEY( deployment ) - ); - "#, - ) - .execute(pool) - .await - .expect("Create test instance in db"); - } - - async fn add_cost_models(pool: &PgPool, models: Vec<DbCostModel>) { - for model in models { - sqlx::query!( - r#" - INSERT INTO "CostModels" (deployment, model) - VALUES ($1, $2); - "#, - model.deployment, - model.model, - ) - .execute(pool) - .await - .expect("Create test instance in db"); - } - } - - fn to_db_models(models: Vec<CostModel>) -> Vec<DbCostModel> { - models.into_iter().map(DbCostModel::from).collect() - } - - fn global_cost_model() -> DbCostModel { - DbCostModel { - deployment: "global".to_string(), - model: Some("default => 0.00001;".to_string()), - variables: None, - } - } - - fn test_data() -> Vec<CostModel> { - vec![ - CostModel { - deployment: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - .parse() - .unwrap(), - model: None, - variables: None, - }, - CostModel { - deployment: "0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea" - .parse() - .unwrap(), - model: Some("default => 0.00025;".to_string()), - variables: None, - }, - CostModel { - deployment: "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" - .parse() - .unwrap(), - model: Some("default => 0.00012;".to_string()), - variables: None, - }, - ] - } - - #[sqlx::test] - async fn success_cost_models(pool: PgPool) { - let test_models = test_data(); - let test_deployments = test_models - .iter() - .map(|model| model.deployment) - .collect::<Vec<_>>(); - - setup_cost_models_table(&pool).await; - add_cost_models(&pool, to_db_models(test_models.clone())).await; - - // First test: query without deployment filter - let models = cost_models(&pool, &[]) - .await - .expect("cost models query without deployment filter"); - - // We expect as many models as we have in the test data - assert_eq!(models.len(), test_models.len()); - - // We expect models for all test deployments to be present and - // identical to the test data - for test_deployment in test_deployments.iter() { - let test_model = test_models - .iter() - .find(|model| &model.deployment ==
test_deployment) - .expect("finding cost model for test deployment in test data"); - - let model = models - .iter() - .find(|model| &model.deployment == test_deployment) - .expect("finding cost model for test deployment in query result"); - - assert_eq!(test_model.model, model.model); - } - - // Second test: query with a deployment filter - let sample_deployments = vec![ - test_models.get(0).unwrap().deployment, - test_models.get(1).unwrap().deployment, - ]; - let models = cost_models(&pool, &sample_deployments) - .await - .expect("cost models query with deployment filter"); - - // Expect two cost models to be returned - assert_eq!(models.len(), sample_deployments.len()); - - // Expect both returned deployments to be identical to the test data - for test_deployment in sample_deployments.iter() { - let test_model = test_models - .iter() - .find(|model| &model.deployment == test_deployment) - .expect("finding cost model for test deployment in test data"); - - let model = models - .iter() - .find(|model| &model.deployment == test_deployment) - .expect("finding cost model for test deployment in query result"); - - assert_eq!(test_model.model, model.model); - } - } - - #[sqlx::test] - async fn global_fallback_cost_models(pool: PgPool) { - let test_models = test_data(); - let test_deployments = test_models - .iter() - .map(|model| model.deployment) - .collect::<Vec<_>>(); - let global_model = global_cost_model(); - - setup_cost_models_table(&pool).await; - add_cost_models(&pool, to_db_models(test_models.clone())).await; - add_cost_models(&pool, vec![global_model.clone()]).await; - - // First test: fetch cost models without filtering by deployment - let models = cost_models(&pool, &[]) - .await - .expect("cost models query without deployments filter"); - - // Since we've defined 3 cost models and we did not provide a filter, we - // expect all of them to be returned except for the global cost model - assert_eq!(models.len(), test_models.len()); - - // Expect all test deployments to be present in the query result - for test_deployment in test_deployments.iter() { - let test_model = test_models - .iter() - .find(|model| &model.deployment == test_deployment) - .expect("finding cost model for deployment in test data"); - - let model = models - .iter() - .find(|model| &model.deployment == test_deployment) - .expect("finding cost model for deployment in query result"); - - if test_model.model.is_some() { - // If the test model has a model definition, we expect that to be returned - assert_eq!(model.model, test_model.model); - } else { - // If the test model has no model definition, we expect the global - // model definition to be returned - assert_eq!(model.model, global_model.model); - } - } - - // Second test: fetch cost models, filtering by the first two deployment IDs - let sample_deployments = vec![ - test_models.get(0).unwrap().deployment, - test_models.get(1).unwrap().deployment, - ]; - let models = dbg!(cost_models(&pool, &sample_deployments).await) - .expect("cost models query with deployments filter"); - - // We've filtered by two deployment IDs and are expecting two cost models to be returned - assert_eq!(models.len(), sample_deployments.len()); - - for test_deployment in sample_deployments { - let test_model = test_models - .iter() - .find(|model| model.deployment == test_deployment) - .expect("finding cost model for deployment in test data"); - - let model = models - .iter() - .find(|model| model.deployment == test_deployment) - .expect("finding cost model for deployment in query result"); - - if
test_model.model.is_some() { - // If the test model has a model definition, we expect that to be returned - assert_eq!(model.model, test_model.model); - } else { - // If the test model has no model definition, we expect the global - // model definition to be returned - assert_eq!(model.model, global_model.model); - } - } - - // Third test: query for missing cost model - let missing_deployment = - DeploymentId::from_str("Qmaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap(); - let models = cost_models(&pool, &[missing_deployment]) - .await - .expect("cost models query for missing deployment"); - - // The deployment may be missing in the database but we have a global model - // and expect that to be returned, with the missing deployment ID - let missing_model = models - .iter() - .find(|m| m.deployment == missing_deployment) - .expect("finding missing deployment"); - assert_eq!(missing_model.model, global_model.model); - } - - #[sqlx::test] - async fn success_cost_model(pool: PgPool) { - setup_cost_models_table(&pool).await; - add_cost_models(&pool, to_db_models(test_data())).await; - - let deployment_id_from_bytes = DeploymentId( - "0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea" - .parse() - .unwrap(), - ); - let deployment_id_from_hash = - DeploymentId::from_str("Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss").unwrap(); - - assert_eq!(deployment_id_from_bytes, deployment_id_from_hash); - - let model = cost_model(&pool, &deployment_id_from_bytes) - .await - .expect("cost model query") - .expect("cost model for deployment"); - - assert_eq!(model.deployment, deployment_id_from_hash); - assert_eq!(model.model, Some("default => 0.00025;".to_string())); - } - - #[sqlx::test] - async fn global_fallback_cost_model(pool: PgPool) { - let test_models = test_data(); - let global_model = global_cost_model(); - - setup_cost_models_table(&pool).await; - add_cost_models(&pool, to_db_models(test_models.clone())).await; - add_cost_models(&pool, vec![global_model.clone()]).await; - - // Test that the behavior is correct for existing deployments - for test_model in test_models { - let model = cost_model(&pool, &test_model.deployment) - .await - .expect("cost model query") - .expect("global cost model fallback"); - - assert_eq!(model.deployment, test_model.deployment); - - if test_model.model.is_some() { - // If the test model has a model definition, we expect that to be returned - assert_eq!(model.model, test_model.model); - } else { - // If the test model has no model definition, we expect the global - // model definition to be returned - assert_eq!(model.model, global_model.model); - } - } - - // Test that querying a non-existing deployment returns the default cost model - let missing_deployment = - DeploymentId::from_str("Qmnononononononononononononononononononononono").unwrap(); - let model = cost_model(&pool, &missing_deployment) - .await - .expect("cost model query") - .expect("global cost model fallback"); - assert_eq!(model.deployment, missing_deployment); - assert_eq!(model.model, global_model.model); - } -} diff --git a/service/src/common/mod.rs b/service/src/common/mod.rs deleted file mode 100644 index a7c9b560b..000000000 --- a/service/src/common/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. 
-// SPDX-License-Identifier: Apache-2.0 - -pub mod address; -pub mod database; -pub mod indexer_management; diff --git a/service/src/config.rs b/service/src/config.rs index c131a4f65..1f7203e4f 100644 --- a/service/src/config.rs +++ b/service/src/config.rs @@ -1,267 +1,246 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use clap::{command, Args, Parser, ValueEnum}; +use std::path::PathBuf; use alloy_primitives::Address; +use figment::{ + providers::{Format, Toml}, + Figment, +}; +use indexer_common::indexer_service::http::IndexerServiceConfig; use serde::{Deserialize, Serialize}; use toolshed::thegraph::DeploymentId; -use crate::util::init_tracing; - -#[derive(Clone, Debug, Parser, Serialize, Deserialize, Default)] -#[clap( - name = "indexer-service", - about = "Indexer service on top of graph node", - author = "hopeyen" -)] -#[command(author, version, about, long_about = None, arg_required_else_help = true)] -pub struct Cli { - #[command(flatten)] +#[derive(Clone, Deserialize)] +pub struct Config { pub ethereum: Ethereum, - #[command(flatten)] pub receipts: Receipts, - #[command(flatten)] pub indexer_infrastructure: IndexerInfrastructure, - #[command(flatten)] pub postgres: Postgres, - #[command(flatten)] pub network_subgraph: NetworkSubgraph, - #[command(flatten)] pub escrow_subgraph: EscrowSubgraph, - #[arg( - short, - value_name = "config", - env = "CONFIG", - help = "Indexer service configuration file (YAML format)" - )] - config: Option<String>, + pub common: IndexerServiceConfig, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Ethereum { - #[clap( - long, - value_name = "ethereum-node-provider", - env = "ETH_NODE", - help = "Ethereum node or provider URL" - )] + // #[clap( + // long, + // value_name = "ethereum-node-provider", + // env = "ETH_NODE", + // help = "Ethereum node or provider URL" + // )] pub ethereum: String, - #[clap( - long, - value_name = "ethereum-polling-interval", - env = "ETHEREUM_POLLING_INTERVAL", - default_value_t = 4000, - help = "Polling interval for the Ethereum provider (ms)" - )] + // #[clap( + // long, + // value_name = "ethereum-polling-interval", + // env = "ETHEREUM_POLLING_INTERVAL", + // default_value_t = 4000, + // help = "Polling interval for the Ethereum provider (ms)" + // )] pub ethereum_polling_interval: usize, - #[clap( - long, - value_name = "mnemonic", - env = "MNEMONIC", - help = "Mnemonic for the operator wallet" - )] + // #[clap( + // long, + // value_name = "mnemonic", + // env = "MNEMONIC", + // help = "Mnemonic for the operator wallet" + // )] pub mnemonic: String, - #[clap( - long, - value_name = "indexer-address", - env = "INDEXER_ADDRESS", - help = "Ethereum address of the indexer" - )] + // #[clap( + // long, + // value_name = "indexer-address", + // env = "INDEXER_ADDRESS", + // help = "Ethereum address of the indexer" + // )] pub indexer_address: Address, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Receipts { - #[clap( - long, - value_name = "receipts-verifier-chain-id", - env = "RECEIPTS_VERIFIER_CHAIN_ID", - help = "Scalar TAP verifier chain ID" - )] + // #[clap( + // long, + // value_name = "receipts-verifier-chain-id", + // env = "RECEIPTS_VERIFIER_CHAIN_ID", + // help = "Scalar TAP verifier chain ID" + // )] pub
receipts_verifier_chain_id: u64, - #[clap( - long, - value_name = "receipts-verifier-address", - env = "RECEIPTS_VERIFIER_ADDRESS", - help = "Scalar TAP verifier contract address" - )] + // #[clap( + // long, + // value_name = "receipts-verifier-address", + // env = "RECEIPTS_VERIFIER_ADDRESS", + // help = "Scalar TAP verifier contract address" + // )] pub receipts_verifier_address: Address, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct IndexerInfrastructure { - #[clap( - long, - value_name = "port", - env = "PORT", - default_value_t = 7600, - help = "Port to serve queries at" - )] + // #[clap( + // long, + // value_name = "port", + // env = "PORT", + // default_value_t = 7600, + // help = "Port to serve queries at" + // )] pub port: u32, - #[clap( - long, - value_name = "metrics-port", - env = "METRICS_PORT", - default_value_t = 7300, - help = "Port to serve Prometheus metrics at" - )] + // #[clap( + // long, + // value_name = "metrics-port", + // env = "METRICS_PORT", + // default_value_t = 7300, + // help = "Port to serve Prometheus metrics at" + // )] pub metrics_port: u16, - #[clap( - long, - value_name = "graph-node-query-endpoint", - env = "GRAPH_NODE_QUERY_ENDPOINT", - default_value_t = String::from("http://0.0.0.0:8000"), - help = "Graph node GraphQL HTTP service endpoint" - )] + // #[clap( + // long, + // value_name = "graph-node-query-endpoint", + // env = "GRAPH_NODE_QUERY_ENDPOINT", + // default_value_t = String::from("http://0.0.0.0:8000"), + // help = "Graph node GraphQL HTTP service endpoint" + // )] pub graph_node_query_endpoint: String, - #[clap( - long, - value_name = "graph-node-status-endpoint", - env = "GRAPH_NODE_STATUS_ENDPOINT", - default_value_t = String::from("http://0.0.0.0:8030"), - help = "Graph node endpoint for the index node server" - )] + // #[clap( + // long, + // value_name = "graph-node-status-endpoint", + // env = "GRAPH_NODE_STATUS_ENDPOINT", + // default_value_t = String::from("http://0.0.0.0:8030"), + // help = "Graph node endpoint for the index node server" + // )] pub graph_node_status_endpoint: String, - #[clap( - long, - value_name = "log-level", - env = "LOG_LEVEL", - value_enum, - help = "Log level in RUST_LOG format" - )] + // #[clap( + // long, + // value_name = "log-level", + // env = "LOG_LEVEL", + // value_enum, + // help = "Log level in RUST_LOG format" + // )] pub log_level: Option<String>, - #[clap( - long, - value_name = "gcloud-profiling", - env = "GCLOUD_PROFILING", - default_value_t = false, - help = "Whether to enable Google Cloud profiling" - )] + // #[clap( + // long, + // value_name = "gcloud-profiling", + // env = "GCLOUD_PROFILING", + // default_value_t = false, + // help = "Whether to enable Google Cloud profiling" + // )] pub gcloud_profiling: bool, - #[clap( - long, - value_name = "free-query-auth-token", - env = "FREE_QUERY_AUTH_TOKEN", - help = "Auth token that clients can use to query for free" - )] + // #[clap( + // long, + // value_name = "free-query-auth-token", + // env = "FREE_QUERY_AUTH_TOKEN", + // help = "Auth token that clients can use to query for free" + // )] pub free_query_auth_token: Option<String>, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Postgres { - #[clap( - long, - value_name = "postgres-host", - env = "POSTGRES_HOST", - default_value_t =
String::from("http://0.0.0.0/"), - help = "Postgres host" - )] + // #[clap( + // long, + // value_name = "postgres-host", + // env = "POSTGRES_HOST", + // default_value_t = String::from("http://0.0.0.0/"), + // help = "Postgres host" + // )] pub postgres_host: String, - #[clap( - long, - value_name = "postgres-port", - env = "POSTGRES_PORT", - default_value_t = 5432, - help = "Postgres port" - )] + // #[clap( + // long, + // value_name = "postgres-port", + // env = "POSTGRES_PORT", + // default_value_t = 5432, + // help = "Postgres port" + // )] pub postgres_port: usize, - #[clap( - long, - value_name = "postgres-database", - env = "POSTGRES_DATABASE", - help = "Postgres database name" - )] + // #[clap( + // long, + // value_name = "postgres-database", + // env = "POSTGRES_DATABASE", + // help = "Postgres database name" + // )] pub postgres_database: String, - #[clap( - long, - value_name = "postgres-username", - env = "POSTGRES_USERNAME", - default_value_t = String::from("postgres"), - help = "Postgres username" - )] + // #[clap( + // long, + // value_name = "postgres-username", + // env = "POSTGRES_USERNAME", + // default_value_t = String::from("postgres"), + // help = "Postgres username" + // )] pub postgres_username: String, - #[clap( - long, - value_name = "postgres-password", - env = "POSTGRES_PASSWORD", - default_value_t = String::from(""), - help = "Postgres password" - )] + // #[clap( + // long, + // value_name = "postgres-password", + // env = "POSTGRES_PASSWORD", + // default_value_t = String::from(""), + // help = "Postgres password" + // )] pub postgres_password: String, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct NetworkSubgraph { - #[clap( - long, - value_name = "network-subgraph-deployment", - env = "NETWORK_SUBGRAPH_DEPLOYMENT", - help = "Network subgraph deployment" - )] + // #[clap( + // long, + // value_name = "network-subgraph-deployment", + // env = "NETWORK_SUBGRAPH_DEPLOYMENT", + // help = "Network subgraph deployment" + // )] pub network_subgraph_deployment: Option, - #[clap( - long, - value_name = "network-subgraph-endpoint", - env = "NETWORK_SUBGRAPH_ENDPOINT", - default_value_t = String::from("https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"), - help = "Endpoint to query the network subgraph from" - )] + // #[clap( + // long, + // value_name = "network-subgraph-endpoint", + // env = "NETWORK_SUBGRAPH_ENDPOINT", + // default_value_t = String::from("https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-goerli"), + // help = "Endpoint to query the network subgraph from" + // )] pub network_subgraph_endpoint: String, - #[clap( - long, - value_name = "network-subgraph-auth-token", - env = "NETWORK_SUBGRAPH_AUTH_TOKEN", - help = "Bearer token to require for /network queries" - )] + // #[clap( + // long, + // value_name = "network-subgraph-auth-token", + // env = "NETWORK_SUBGRAPH_AUTH_TOKEN", + // help = "Bearer token to require for /network queries" + // )] pub network_subgraph_auth_token: Option, - #[clap( - long, - value_name = "serve-network-subgraph", - env = "SERVE_NETWORK_SUBGRAPH", - default_value_t = false, - help = "Whether to serve the network subgraph at /network" - )] + // #[clap( + // long, + // value_name = "serve-network-subgraph", + // env = "SERVE_NETWORK_SUBGRAPH", + // default_value_t = false, + // help = "Whether to serve the network subgraph at /network" + // )] pub 
serve_network_subgraph: bool, - #[clap( - long, - value_name = "allocation-syncing-interval", - env = "ALLOCATION_SYNCING_INTERVAL", - default_value_t = 120_000, - help = "Interval (in ms) for syncing indexer allocations from the network" - )] + // #[clap( + // long, + // value_name = "allocation-syncing-interval", + // env = "ALLOCATION_SYNCING_INTERVAL", + // default_value_t = 120_000, + // help = "Interval (in ms) for syncing indexer allocations from the network" + // )] pub allocation_syncing_interval: u64, - #[clap( - long, - value_name = "client-signer-address", - env = "CLIENT_SIGNER_ADDRESS", - help = "Address that signs query fee receipts from a known client" - )] + // #[clap( + // long, + // value_name = "client-signer-address", + // env = "CLIENT_SIGNER_ADDRESS", + // help = "Address that signs query fee receipts from a known client" + // )] pub client_signer_address: Option<Address>, } -#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] -#[group(required = true, multiple = true)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct EscrowSubgraph { - #[clap( - long, - value_name = "escrow-subgraph-deployment", - env = "ESCROW_SUBGRAPH_DEPLOYMENT", - help = "Escrow subgraph deployment" - )] + // #[clap( + // long, + // value_name = "escrow-subgraph-deployment", + // env = "ESCROW_SUBGRAPH_DEPLOYMENT", + // help = "Escrow subgraph deployment" + // )] pub escrow_subgraph_deployment: Option<DeploymentId>, - #[clap( - long, - value_name = "escrow-subgraph-endpoint", - env = "ESCROW_SUBGRAPH_ENDPOINT", - help = "Endpoint to query the network subgraph from" - )] + // #[clap( + // long, + // value_name = "escrow-subgraph-endpoint", + // env = "ESCROW_SUBGRAPH_ENDPOINT", + // help = "Endpoint to query the network subgraph from" + // )] pub escrow_subgraph_endpoint: String, // #[clap( // long, @@ -288,39 +267,8 @@ pub struct EscrowSubgraph { pub escrow_syncing_interval: u64, } -impl Cli { - /// Parse config arguments - /// If environmental variable for config is set to a valid config file path, then parse from config - /// Otherwise parse from command line arguments - pub fn args() -> Self { - let cli = if let Ok(file_path) = std::env::var("config") { - confy::load_path::<Cli>(file_path.clone()) - .unwrap_or_else(|_| panic!("Parse config file at {}", file_path.clone())) - } else { - Cli::parse() - // Potentially store it for the user - // let _ = confy::store_path("./args.toml", cli.clone()); - }; - - // Enables tracing under RUST_LOG variable - if let Some(log_setting) = &cli.indexer_infrastructure.log_level { - std::env::set_var("RUST_LOG", log_setting); - }; - // add a LogFormat to config - init_tracing("pretty".to_string()).expect("Could not set up global default subscriber for logger, check environmental variable `RUST_LOG` or the CLI input `log-level`"); - cli +impl Config { + pub fn load(filename: &PathBuf) -> Result<Config, figment::Error> { + Figment::new().merge(Toml::file(filename)).extract() } } - -#[derive( - Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Serialize, Deserialize, Default, -)] -pub enum LogLevel { - Trace, - #[default] - Debug, - Info, - Warn, - Error, - Fatal, -} diff --git a/service/src/graph_node.rs b/service/src/graph_node.rs deleted file mode 100644 index 7c875764f..000000000 --- a/service/src/graph_node.rs +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs.
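A note on the new `Config::load` above: Figment providers merge in order, with later providers taking precedence. This change only merges the TOML file, but the `env` feature is enabled in `service/Cargo.toml`, so an environment layer could sit on top. The sketch below shows that layering; the `INDEXER_SERVICE_` prefix is an illustrative assumption, not something this diff configures.

```rust
use std::path::Path;

use figment::{
    providers::{Env, Format, Toml},
    Figment,
};

// Hypothetical layered loader: values from the TOML file form the base,
// and any `INDEXER_SERVICE_*` environment variables override them.
fn load_layered(path: &Path) -> Result<Config, figment::Error> {
    Figment::new()
        .merge(Toml::file(path))
        // Later merges win: e.g. `INDEXER_SERVICE_PORT=7601` would
        // override a `port` key parsed from the TOML file.
        .merge(Env::prefixed("INDEXER_SERVICE_"))
        .extract()
}
```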
-// SPDX-License-Identifier: Apache-2.0 - -use anyhow::anyhow; -use reqwest::{header, Client, Url}; -use std::sync::Arc; -use toolshed::thegraph::DeploymentId; - -use crate::query_processor::{QueryError, UnattestedQueryResult}; - -/// Graph node query wrapper. -/// -/// This is Arc internally, so it can be cloned and shared between threads. -#[derive(Debug, Clone)] -pub struct GraphNodeInstance { - client: Client, // it is Arc - subgraphs_base_url: Arc<Url>, -} - -impl GraphNodeInstance { - pub fn new(endpoint: &str) -> GraphNodeInstance { - let subgraphs_base_url = Url::parse(endpoint) - .and_then(|u| u.join("/subgraphs/id/")) - .expect("Could not parse graph node endpoint"); - let client = reqwest::Client::builder() - .user_agent("indexer-service") - .build() - .expect("Could not build a client to graph node query endpoint"); - GraphNodeInstance { - client, - subgraphs_base_url: Arc::new(subgraphs_base_url), - } - } - - pub async fn subgraph_query_raw( - &self, - subgraph_id: &DeploymentId, - data: String, - ) -> Result<UnattestedQueryResult, QueryError> { - let request = self - .client - .post( - self.subgraphs_base_url - .join(&subgraph_id.to_string()) - .map_err(|e| { - QueryError::Other(anyhow!( - "Could not build subgraph query URL: {}", - e.to_string() - )) - })?, - ) - .body(data) - .header(header::CONTENT_TYPE, "application/json"); - - let response = request.send().await?; - let attestable = response - .headers() - .get("graph-attestable") - .map_or(false, |v| v == "true"); - - Ok(UnattestedQueryResult { - graphql_response: response.text().await?, - attestable, - }) - } -} - -#[cfg(test)] -mod test { - use std::str::FromStr; - - use lazy_static::lazy_static; - use serde_json::json; - use wiremock::matchers::{method, path}; - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use super::*; - - lazy_static!
{ - static ref NETWORK_SUBGRAPH_ID: DeploymentId = - DeploymentId::from_str("QmV614UpBCpuusv5MsismmPYu4KqLtdeNMKpiNrX56kw6u").unwrap(); - } - - async fn mock_graph_node_server() -> MockServer { - let mock_server = MockServer::start().await; - let mock = Mock::given(method("POST")) - .and(path( - "/subgraphs/id/".to_string() + &NETWORK_SUBGRAPH_ID.to_string(), - )) - .respond_with(ResponseTemplate::new(200).set_body_raw( - r#" - { - "data": { - "graphNetwork": { - "currentEpoch": 960 - } - } - } - "#, - "application/json", - )); - mock_server.register(mock).await; - - mock_server - } - - async fn local_graph_node() -> GraphNodeInstance { - let graph_node_endpoint = std::env::var("GRAPH_NODE_ENDPOINT") - .expect("GRAPH_NODE_ENDPOINT env variable is not set"); - - GraphNodeInstance::new(&graph_node_endpoint) - } - - /// Also tests against the network subgraph, but using the `subgraph_query_raw` method - #[tokio::test] - #[ignore] // Run only if explicitly specified - async fn test_subgraph_query_local() { - let network_subgraph_id = DeploymentId::from_str( - &std::env::var("NETWORK_SUBGRAPH_ID") - .expect("NETWORK_SUBGRAPH_ID env variable is not set"), - ) - .unwrap(); - - let graph_node = local_graph_node().await; - - let query = r#" - query { - graphNetwork(id: 1) { - currentEpoch - } - } - "#; - - let query_json = json!({ - "query": query, - "variables": {} - }); - - let response = graph_node - .subgraph_query_raw(&network_subgraph_id, query_json.to_string()) - .await - .unwrap(); - - // Check that the response is valid JSON - let _json: serde_json::Value = serde_json::from_str(&response.graphql_response).unwrap(); - } - - /// Also tests against the network subgraph, but using the `subgraph_query_raw` method - #[tokio::test] - async fn test_subgraph_query() { - let mock_server = mock_graph_node_server().await; - - let graph_node = GraphNodeInstance::new(&mock_server.uri()); - - let query = r#" - query { - graphNetwork(id: 1) { - currentEpoch - } - } - "#; - - let query_json = json!({ - "query": query, - "variables": {} - }); - - let response = graph_node - .subgraph_query_raw(&NETWORK_SUBGRAPH_ID, query_json.to_string()) - .await - .unwrap(); - - // Check that the response is valid JSON - let _json: serde_json::Value = serde_json::from_str(&response.graphql_response).unwrap(); - } -} diff --git a/service/src/main.rs b/service/src/main.rs index 3b94213a5..e928e1dd2 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,40 +1,90 @@ // Copyright 2023-, GraphOps and Semiotic Labs. 
// SPDX-License-Identifier: Apache-2.0 -use alloy_sol_types::eip712_domain; -use axum::Server; -use dotenvy::dotenv; -use ethereum_types::U256; -use std::{net::SocketAddr, str::FromStr, time::Duration}; -use tracing::info; - -use indexer_common::{ - indexer_service::http::IndexerServiceRelease, - prelude::{ - attestation_signers, dispute_manager, escrow_accounts, indexer_allocations, - DeploymentDetails, SubgraphClient, TapManager, - }, +use std::sync::Arc; + +use anyhow::Error; +use axum::{ + async_trait, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; +use clap::Parser; +use indexer_common::indexer_service::http::{ + IndexerService, IndexerServiceImpl, IndexerServiceOptions, IndexerServiceRelease, IsAttestable, }; +use serde::Serialize; +use serde_json::{json, Value}; +use thiserror::Error; +use toolshed::thegraph::DeploymentId; +use tracing::error; -use util::shutdown_signal; +mod cli; +mod config; +mod routes; -use crate::{ - common::database, config::Cli, metrics::handle_serve_metrics, query_processor::QueryProcessor, - server::create_server, util::public_key, -}; +use cli::Cli; +use config::Config; -use server::ServerOptions; +#[derive(Debug, Error)] +pub enum SubgraphServiceError {} -mod common; -mod config; -mod graph_node; -mod metrics; -mod query_processor; -mod server; -mod util; +#[derive(Serialize)] +#[serde(transparent)] +struct SubgraphResponse { + inner: Value, + #[serde(skip)] + attestable: bool, +} + +impl SubgraphResponse { + fn new(inner: Value, attestable: bool) -> Self { + Self { inner, attestable } + } +} + +impl IntoResponse for SubgraphResponse { + fn into_response(self) -> Response { + Json(self.inner).into_response() + } +} + +impl IsAttestable for SubgraphResponse { + fn is_attestable(&self) -> bool { + self.attestable + } +} + +pub struct SubgraphServiceState { + config: Config, +} + +struct SubgraphService { + config: Config, +} + +impl SubgraphService { + fn new(config: Config) -> Self { + Self { config } + } +} + +#[async_trait] +impl IndexerServiceImpl for SubgraphService { + type Error = SubgraphServiceError; + type Request = serde_json::Value; + type Response = SubgraphResponse; + type State = SubgraphServiceState; -#[cfg(test)] -mod test_vectors; + async fn process_request( + &self, + _manifest_id: DeploymentId, + request: Self::Request, + ) -> Result<(Self::Request, Self::Response), Self::Error> { + Ok((request, SubgraphResponse::new(json!("hello"), false))) + } +} /// Create Indexer service App /// @@ -49,151 +99,185 @@ mod test_vectors; /// /// Return response from Query Processor #[tokio::main] -async fn main() -> Result<(), std::io::Error> { - dotenv().ok(); +async fn main() -> Result<(), Error> { + // Parse command line and environment arguments + let cli = Cli::parse(); + + // Load the subgraph service configuration, which is a combination of the + // general configuration options for any indexer service and specific + // options added for the subgraph service + let config = match Config::load(&cli.config) { + Ok(config) => config, + Err(e) => { + error!( + "Invalid configuration file `{}`: {}", + cli.config.display(), + e + ); + std::process::exit(1); + } + }; // Parse basic configurations - let config = Cli::args(); build_info::build_info!(fn build_info); let release = IndexerServiceRelease::from(build_info()); - // Initialize graph-node client - let graph_node = graph_node::GraphNodeInstance::new( - &config.indexer_infrastructure.graph_node_query_endpoint, - ); - - let http_client = reqwest::Client::builder() - .tcp_nodelay(true)
- .timeout(Duration::from_secs(30)) - .build() - .expect("Failed to init HTTP client"); - - // Make an instance of network subgraph at either - // graph_node_query_endpoint/subgraphs/id/network_subgraph_deployment - // or network_subgraph_endpoint - // - // We're leaking the network subgraph here to obtain a reference with - // a static lifetime, which avoids having to pass around and clone `Arc` - // objects everywhere. Since the network subgraph is read-only, this is - // no problem. - let network_subgraph = Box::leak(Box::new(SubgraphClient::new( - http_client.clone(), - config - .network_subgraph - .network_subgraph_deployment - .map(|deployment| { - DeploymentDetails::for_graph_node( - &config.indexer_infrastructure.graph_node_status_endpoint, - &config.indexer_infrastructure.graph_node_query_endpoint, - deployment, - ) - }) - .transpose() - .expect("Failed to parse graph node query endpoint and network subgraph deployment"), - DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint) - .expect("Failed to parse network subgraph endpoint"), - ))); - - let indexer_allocations = indexer_allocations( - network_subgraph, - config.ethereum.indexer_address, - 1, - Duration::from_millis(config.network_subgraph.allocation_syncing_interval), - ); - - // TODO: Chain ID should be a config - let graph_network_id = 1; - - let dispute_manager = - dispute_manager(network_subgraph, graph_network_id, Duration::from_secs(60)); - - let attestation_signers = attestation_signers( - indexer_allocations.clone(), - config.ethereum.mnemonic.clone(), - U256::from(graph_network_id), - dispute_manager, - ); - - // Establish Database connection necessary for serving indexer management - // requests with defined schema - // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models - // which defaults to files in "./migrations" to sync the database; - // however, this can cause conflicts with the migrations run by indexer - // agent. Hence we leave syncing and migrating entirely to the agent and - // assume the models are up to date in the service. - let indexer_management_db = database::connect(&config.postgres).await; - - let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( - http_client, - config - .escrow_subgraph - .escrow_subgraph_deployment - .map(|deployment| { - DeploymentDetails::for_graph_node( - &config.indexer_infrastructure.graph_node_status_endpoint, - &config.indexer_infrastructure.graph_node_query_endpoint, - deployment, - ) - }) - .transpose() - .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), - DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint) - .expect("Failed to parse escrow subgraph endpoint"), - ))); - - let escrow_accounts = escrow_accounts( - escrow_subgraph, - config.ethereum.indexer_address, - Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval), - ); - - let tap_manager = TapManager::new( - indexer_management_db.clone(), - indexer_allocations, - escrow_accounts, - // TODO: arguments for eip712_domain should be a config - eip712_domain! 
{ - name: "Scalar TAP", - version: "1", - chain_id: config.receipts.receipts_verifier_chain_id, - verifying_contract: config.receipts.receipts_verifier_address, - }, - ); - // Proper initiation of server, query processor - // server health check, graph-node instance connection check - let query_processor = - QueryProcessor::new(graph_node.clone(), attestation_signers.clone(), tap_manager); - - // Start indexer service basic metrics - tokio::spawn(handle_serve_metrics( - String::from("0.0.0.0"), - config.indexer_infrastructure.metrics_port, - )); - - let service_options = ServerOptions::new( - Some(config.indexer_infrastructure.port), + // Some of the subgrpah service configuration goes into the so-called + // "state", which will be passed to any request handler, middleware etc. + // that is involved in serving requests + let state = Arc::new(SubgraphServiceState { + config: config.clone(), + }); + + IndexerService::run(IndexerServiceOptions { release, - query_processor, - config.indexer_infrastructure.free_query_auth_token, - config.indexer_infrastructure.graph_node_status_endpoint, - indexer_management_db, - public_key(&config.ethereum.mnemonic).expect("Failed to initiate with operator wallet"), - network_subgraph, - config.network_subgraph.network_subgraph_auth_token, - config.network_subgraph.serve_network_subgraph, - ); - - info!("Initialized server options"); - let app = create_server(service_options).await; - - let addr = SocketAddr::from_str(&format!("0.0.0.0:{}", config.indexer_infrastructure.port)) - .expect("Start server port"); - info!("Initialized server app at {}", addr); - Server::bind(&addr) - .serve(app.into_make_service()) - .with_graceful_shutdown(shutdown_signal()) - .await - .unwrap(); - - Ok(()) + config: config.common.clone(), + url_namespace: "subgraphs", + metrics_prefix: "subgraph", + service_impl: SubgraphService::new(config), + extra_routes: Router::new() + .route("/operator", get(routes::operator)) + .with_state(state), + }) + .await + + // // Initialize graph-node client + // let graph_node = graph_node::GraphNodeInstance::new( + // &config.indexer_infrastructure.graph_node_query_endpoint, + // ); + + // let http_client = reqwest::Client::builder() + // .tcp_nodelay(true) + // .timeout(Duration::from_secs(30)) + // .build() + // .expect("Failed to init HTTP client"); + + // // Make an instance of network subgraph at either + // // graph_node_query_endpoint/subgraphs/id/network_subgraph_deployment + // // or network_subgraph_endpoint + // // + // // We're leaking the network subgraph here to obtain a reference with + // // a static lifetime, which avoids having to pass around and clone `Arc` + // // objects everywhere. Since the network subgraph is read-only, this is + // // no problem. 
+ // let network_subgraph = Box::leak(Box::new(SubgraphClient::new( + // http_client.clone(), + // config + // .network_subgraph + // .network_subgraph_deployment + // .map(|deployment| { + // DeploymentDetails::for_graph_node( + // &config.indexer_infrastructure.graph_node_status_endpoint, + // &config.indexer_infrastructure.graph_node_query_endpoint, + // deployment, + // ) + // }) + // .transpose() + // .expect("Failed to parse graph node query endpoint and network subgraph deployment"), + // DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint) + // .expect("Failed to parse network subgraph endpoint"), + // ))); + + // let indexer_allocations = indexer_allocations( + // network_subgraph, + // config.ethereum.indexer_address, + // 1, + // Duration::from_millis(config.network_subgraph.allocation_syncing_interval), + // ); + + // // TODO: Chain ID should be a config + // let graph_network_id = 1; + + // let dispute_manager = + // dispute_manager(network_subgraph, graph_network_id, Duration::from_secs(60)); + + // let attestation_signers = attestation_signers( + // indexer_allocations.clone(), + // config.ethereum.mnemonic.clone(), + // U256::from(graph_network_id), + // dispute_manager, + // ); + + // // Establish Database connection necessary for serving indexer management + // // requests with defined schema + // // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models + // // which defaults to files in "./migrations" to sync the database; + // // however, this can cause conflicts with the migrations run by indexer + // // agent. Hence we leave syncing and migrating entirely to the agent and + // // assume the models are up to date in the service. + // let indexer_management_db = database::connect(&config.postgres).await; + + // let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + // http_client, + // config + // .escrow_subgraph + // .escrow_subgraph_deployment + // .map(|deployment| { + // DeploymentDetails::for_graph_node( + // &config.indexer_infrastructure.graph_node_status_endpoint, + // &config.indexer_infrastructure.graph_node_query_endpoint, + // deployment, + // ) + // }) + // .transpose() + // .expect("Failed to parse graph node query endpoint and escrow subgraph deployment"), + // DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint) + // .expect("Failed to parse escrow subgraph endpoint"), + // ))); + + // let escrow_accounts = escrow_accounts( + // escrow_subgraph, + // config.ethereum.indexer_address, + // Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval), + // ); + + // let tap_manager = TapManager::new( + // indexer_management_db.clone(), + // indexer_allocations, + // escrow_accounts, + // // TODO: arguments for eip712_domain should be a config + // eip712_domain! 
{ + // name: "Scalar TAP", + // version: "1", + // chain_id: config.receipts.receipts_verifier_chain_id, + // verifying_contract: config.receipts.receipts_verifier_address, + // }, + // ); + // // Proper initiation of server, query processor + // // server health check, graph-node instance connection check + // let query_processor = + // QueryProcessor::new(graph_node.clone(), attestation_signers.clone(), tap_manager); + + // // Start indexer service basic metrics + // tokio::spawn(handle_serve_metrics( + // String::from("0.0.0.0"), + // config.indexer_infrastructure.metrics_port, + // )); + + // let service_options = ServerOptions::new( + // Some(config.indexer_infrastructure.port), + // release, + // query_processor, + // config.indexer_infrastructure.free_query_auth_token, + // config.indexer_infrastructure.graph_node_status_endpoint, + // indexer_management_db, + // public_key(&config.ethereum.mnemonic).expect("Failed to initiate with operator wallet"), + // network_subgraph, + // config.network_subgraph.network_subgraph_auth_token, + // config.network_subgraph.serve_network_subgraph, + // ); + + // info!("Initialized server options"); + // let app = create_server(service_options).await; + + // let addr = SocketAddr::from_str(&format!("0.0.0.0:{}", config.indexer_infrastructure.port)) + // .expect("Start server port"); + // info!("Initialized server app at {}", addr); + // Server::bind(&addr) + // .serve(app.into_make_service()) + // .with_graceful_shutdown(shutdown_signal()) + // .await + // .unwrap(); + + // Ok(()) } diff --git a/service/src/metrics/mod.rs b/service/src/metrics/mod.rs deleted file mode 100644 index dfc8d5628..000000000 --- a/service/src/metrics/mod.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use autometrics::{encode_global_metrics, global_metrics_exporter}; -use axum::http::StatusCode; -use axum::routing::get; -use axum::Router; -use lazy_static::lazy_static; -use prometheus::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec}; -use std::{net::SocketAddr, str::FromStr}; -use tracing::info; - -// Record Queries related metrics -lazy_static! 
{ - pub static ref QUERIES: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_total", - "Incoming queries", - &["deployment"], - ) - .expect("Failed to create queries counters"); - pub static ref SUCCESSFUL_QUERIES: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_ok", - "Successfully executed queries", - &["deployment"], - ) - .expect("Failed to create successfulQueries counters"); - pub static ref FAILED_QUERIES: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_failed", - "Queries that failed to execute", - &["deployment"], - ) - .expect("Failed to create failedQueries counters"); - pub static ref QUERIES_WITH_INVALID_RECEIPT_HEADER: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_with_invalid_receipt_header", - "Queries that failed executing because they came with an invalid receipt header", - &["deployment"], - ) - .expect("Failed to create queriesWithInvalidReceiptHeader counters"); - pub static ref QUERIES_WITHOUT_RECEIPT: IntCounterVec = register_int_counter_vec!( - "indexer_service_queries_without_receipt", - "Queries that failed executing because they came without a receipt", - &["deployment"], - ) - .expect("Failed to create queriesWithoutReceipt counters"); - pub static ref QUERY_DURATION: HistogramVec = register_histogram_vec!( - "indexer_service_query_duration", - "Duration of processing a query from start to end", - &["deployment"], - ) - .unwrap(); -} - -/// This handler serializes the metrics into a string for Prometheus to scrape -pub async fn get_metrics() -> (StatusCode, String) { - match encode_global_metrics() { - Ok(metrics) => (StatusCode::OK, metrics), - Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, format!("{err:?}")), - } -} - -/// Metrics server router -pub async fn handle_serve_metrics(host: String, port: u16) { - // Set up the exporter to collect metrics - let _exporter = global_metrics_exporter(); - - let app = Router::new().route("/metrics", get(get_metrics)); - let addr = - SocketAddr::from_str(&format!("{}:{}", host, port)).expect("Start Prometheus metrics"); - let server = axum::Server::bind(&addr); - info!( - address = addr.to_string(), - "Prometheus Metrics port exposed" - ); - - server - .serve(app.into_make_service()) - .await - .expect("Error starting Prometheus metrics port"); -} diff --git a/service/src/query_processor.rs b/service/src/query_processor.rs deleted file mode 100644 index b4e99120c..000000000 --- a/service/src/query_processor.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. 
-// SPDX-License-Identifier: Apache-2.0 - -use std::collections::HashMap; - -use alloy_primitives::Address; -use eventuals::Eventual; -use indexer_common::tap_manager::TapManager; -use log::error; -use serde::{Deserialize, Serialize}; -use tap_core::tap_manager::SignedReceipt; -use toolshed::thegraph::attestation::Attestation; -use toolshed::thegraph::DeploymentId; - -use indexer_common::indexer_errors::{IndexerError, IndexerErrorCause, IndexerErrorCode}; -use indexer_common::prelude::AttestationSigner; - -use crate::graph_node::GraphNodeInstance; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct QueryResult { - #[serde(rename = "graphQLResponse")] - pub graphql_response: String, - pub attestation: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UnattestedQueryResult { - #[serde(rename = "graphQLResponse")] - pub graphql_response: String, - pub attestable: bool, -} - -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub struct Response { - pub result: T, - pub status: i64, -} - -/// Free query do not need signature, receipt, signers -/// Also ignore metrics for now -/// Later add along with PaidQuery -#[derive(Debug)] -pub struct FreeQuery { - pub subgraph_deployment_id: DeploymentId, - pub query: String, -} - -/// Paid query needs subgraph_deployment_id, query, receipt -pub struct PaidQuery { - pub subgraph_deployment_id: DeploymentId, - pub query: String, - pub receipt: String, -} - -#[derive(Debug, thiserror::Error)] -pub enum QueryError { - #[error(transparent)] - Transport(#[from] reqwest::Error), - #[error("The subgraph is in a failed state")] - IndexingError, - #[error("Bad or invalid entity data found in the subgraph: {}", .0.to_string())] - BadData(anyhow::Error), - #[error("Unknown error: {0}")] - Other(anyhow::Error), -} - -#[derive(Clone)] -pub struct QueryProcessor { - graph_node: GraphNodeInstance, - attestation_signers: Eventual>, - tap_manager: TapManager, -} - -impl QueryProcessor { - pub fn new( - graph_node: GraphNodeInstance, - attestation_signers: Eventual>, - tap_manager: TapManager, - ) -> QueryProcessor { - QueryProcessor { - graph_node, - attestation_signers, - tap_manager, - } - } - - pub async fn execute_free_query( - &self, - query: FreeQuery, - ) -> Result, QueryError> { - let response = self - .graph_node - .subgraph_query_raw(&query.subgraph_deployment_id, query.query) - .await?; - - Ok(Response { - result: response, - status: 200, - }) - } - - pub async fn execute_paid_query( - &self, - query: PaidQuery, - ) -> Result, QueryError> { - let PaidQuery { - subgraph_deployment_id, - query, - receipt, - } = query; - - let parsed_receipt: SignedReceipt = match serde_json::from_str(&receipt) - .map_err(|e| QueryError::Other(anyhow::Error::from(e))) - { - Ok(r) => r, - Err(e) => { - IndexerError::new( - IndexerErrorCode::IE031, - Some(IndexerErrorCause::new( - "Failed to parse receipt for a paid query", - )), - ); - - return Err(e); - } - }; - - let allocation_id = parsed_receipt.message.allocation_id; - - self.tap_manager - .verify_and_store_receipt(parsed_receipt) - .await - .map_err(|e| { - IndexerError::new( - IndexerErrorCode::IE053, - Some(IndexerErrorCause::new( - "Failed to verify and store a parsed receipt", - )), - ); - - QueryError::Other(e) - })?; - - let signers = self - .attestation_signers - .value_immediate() - .ok_or_else(|| QueryError::Other(anyhow::anyhow!("System is not ready yet")))?; - let signer = signers.get(&allocation_id).ok_or_else(|| { - let err_msg = format!("No signer found for allocation id 
{}", allocation_id); - IndexerError::new( - IndexerErrorCode::IE022, - Some(IndexerErrorCause::new(err_msg.clone())), - ); - - QueryError::Other(anyhow::anyhow!(err_msg)) - })?; - - let response = self - .graph_node - .subgraph_query_raw(&subgraph_deployment_id, query.clone()) - .await?; - - let attestation = response - .attestable - .then(|| Self::create_attestation(signer, &query, &response)); - - Ok(Response { - result: QueryResult { - graphql_response: response.graphql_response, - attestation, - }, - status: 200, - }) - } - - fn create_attestation( - signer: &AttestationSigner, - query: &str, - response: &UnattestedQueryResult, - ) -> Attestation { - signer.create_attestation(query, &response.graphql_response) - } -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use alloy_primitives::Address; - use ethers_core::types::U256; - use indexer_common::prelude::{ - Allocation, AllocationStatus, AttestationSigner, SubgraphDeployment, - }; - use lazy_static::lazy_static; - - use super::*; - - const INDEXER_OPERATOR_MNEMONIC: &str = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"; - const INDEXER_ADDRESS: &str = "0x1234567890123456789012345678901234567890"; - - lazy_static! { - static ref DEPLOYMENT_ID: DeploymentId = DeploymentId( - "0xc064c354bc21dd958b1d41b67b8ef161b75d2246b425f68ed4c74964ae705cbd" - .parse() - .unwrap(), - ); - } - - #[test] - fn paid_query_attestation() { - let subgraph_deployment = SubgraphDeployment { - id: *DEPLOYMENT_ID, - denied_at: None, - }; - - let indexer = Address::from_str(INDEXER_ADDRESS).unwrap(); - let allocation = &Allocation { - id: Address::from_str("0x4CAF2827961262ADEF3D0Ad15C341e40c21389a4").unwrap(), - status: AllocationStatus::Null, - subgraph_deployment, - indexer, - allocated_tokens: U256::from(100), - created_at_epoch: 940, - created_at_block_hash: String::from(""), - closed_at_epoch: None, - closed_at_epoch_start_block_hash: None, - previous_epoch_start_block_hash: None, - poi: None, - query_fee_rebates: None, - query_fees_collected: None, - }; - - let attestation_signer = AttestationSigner::new( - INDEXER_OPERATOR_MNEMONIC, - allocation, - U256::from(1), - Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(), - ) - .unwrap(); - - let request = "test input"; - let response = "test output"; - let attestation = QueryProcessor::create_attestation( - &attestation_signer, - request, - &UnattestedQueryResult { - graphql_response: response.to_owned(), - attestable: true, - }, - ); - - attestation_signer - .verify(&attestation, request, response, &allocation.id) - .unwrap(); - } -} diff --git a/service/src/routes/mod.rs b/service/src/routes/mod.rs new file mode 100644 index 000000000..27f5dbaf8 --- /dev/null +++ b/service/src/routes/mod.rs @@ -0,0 +1,3 @@ +mod operator; + +pub use operator::operator; diff --git a/service/src/routes/operator.rs b/service/src/routes/operator.rs new file mode 100644 index 000000000..9b8be16fb --- /dev/null +++ b/service/src/routes/operator.rs @@ -0,0 +1,19 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Json}; +use ethers::signers::{coins_bip39::English, MnemonicBuilder, Signer}; +use serde_json::{json, Value}; + +use crate::SubgraphServiceState; + +// Define a handler function for the `/info` route +pub async fn operator(State(state): State>) -> Json { + let mnemonic = &state.config.common.indexer.operator_mnemonic; + let address = MnemonicBuilder::::default() + .phrase(mnemonic.as_str()) + .build() + .expect("operator mnemonic 
must be valid") + .address() + .to_string(); + Json(json!({ "publicKey": address })) +} diff --git a/service/src/server/mod.rs b/service/src/server/mod.rs deleted file mode 100644 index 329f2cc34..000000000 --- a/service/src/server/mod.rs +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -pub(crate) use axum::{ - error_handling::HandleErrorLayer, - handler::Handler, - http::{Method, StatusCode}, - routing::get, -}; - -use axum::{routing::post, Extension, Router}; -use sqlx::PgPool; -use std::time::Duration; -use tower::{BoxError, ServiceBuilder}; -use tower_http::{ - add_extension::AddExtensionLayer, - cors::CorsLayer, - trace::{self, TraceLayer}, -}; -use tracing::Level; - -use indexer_common::{indexer_service::http::IndexerServiceRelease, prelude::SubgraphClient}; - -use crate::{ - query_processor::QueryProcessor, - server::routes::{network_ratelimiter, slow_ratelimiter}, -}; - -pub mod routes; - -#[derive(Clone)] -pub struct ServerOptions { - pub port: Option, - pub release: IndexerServiceRelease, - pub query_processor: QueryProcessor, - pub free_query_auth_token: Option, - pub graph_node_status_endpoint: String, - pub indexer_management_db: PgPool, - pub operator_public_key: String, - pub network_subgraph: &'static SubgraphClient, - pub network_subgraph_auth_token: Option, - pub serve_network_subgraph: bool, -} - -impl ServerOptions { - #[allow(clippy::too_many_arguments)] - pub fn new( - port: Option, - release: IndexerServiceRelease, - query_processor: QueryProcessor, - free_query_auth_token: Option, - graph_node_status_endpoint: String, - indexer_management_db: PgPool, - operator_public_key: String, - network_subgraph: &'static SubgraphClient, - network_subgraph_auth_token: Option, - serve_network_subgraph: bool, - ) -> Self { - let free_query_auth_token = free_query_auth_token.map(|token| format!("Bearer {}", token)); - - ServerOptions { - port, - release, - query_processor, - free_query_auth_token, - graph_node_status_endpoint, - indexer_management_db, - operator_public_key, - network_subgraph, - network_subgraph_auth_token, - serve_network_subgraph, - } - } -} - -pub async fn create_server(options: ServerOptions) -> Router { - Router::new() - .route("/", get(routes::basic::index)) - .route("/health", get(routes::basic::health)) - .route("/version", get(routes::basic::version)) - .route( - "/status", - post(routes::status::status_queries) - .layer(AddExtensionLayer::new(network_ratelimiter())), - ) - .route( - "/subgraphs/health/:deployment", - get(routes::deployment::deployment_health - .layer(AddExtensionLayer::new(slow_ratelimiter()))), - ) - .route( - "/cost", - post(routes::cost::graphql_handler) - .get(routes::cost::graphql_handler) - .layer(AddExtensionLayer::new(slow_ratelimiter())), - ) - .nest( - "/operator", - routes::basic::create_operator_server(options.clone()) - .layer(AddExtensionLayer::new(slow_ratelimiter())), - ) - .route( - "/network", - post(routes::network::network_queries) - .layer(AddExtensionLayer::new(network_ratelimiter())), - ) - .route( - "/subgraphs/id/:id", - post(routes::subgraphs::subgraph_queries), - ) - .layer(Extension(options.clone())) - .layer(CorsLayer::new().allow_methods([Method::GET, Method::POST])) - .layer( - // Handle error for timeout, ratelimit, or a general internal server error - ServiceBuilder::new() - .layer(HandleErrorLayer::new(|error: BoxError| async move { - if error.is::() { - Ok(StatusCode::REQUEST_TIMEOUT) - } else { - Err(( - 
StatusCode::INTERNAL_SERVER_ERROR, - format!("Unhandled internal error: {}", error), - )) - } - })) - .layer( - TraceLayer::new_for_http() - .make_span_with(trace::DefaultMakeSpan::new().level(Level::DEBUG)) - .on_response(trace::DefaultOnResponse::new().level(Level::DEBUG)), - ) - .timeout(Duration::from_secs(10)) - .into_inner(), - ) -} diff --git a/service/src/server/routes/basic.rs b/service/src/server/routes/basic.rs deleted file mode 100644 index 1c0f2f8aa..000000000 --- a/service/src/server/routes/basic.rs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use axum::{extract::Extension, routing::get, Router}; -use axum::{http::StatusCode, response::IntoResponse, Json}; -use serde::Serialize; -use serde_json::json; - -use crate::server::ServerOptions; - -#[derive(Serialize)] -struct Health { - healthy: bool, -} - -/// Endpoint for server health -pub async fn health() -> impl IntoResponse { - let health = Health { healthy: true }; - (StatusCode::OK, Json(health)) -} - -/// Index endpoint for status checks -pub async fn index() -> impl IntoResponse { - let responder = "Ready to roll!".to_string(); - responder.into_response() -} - -/// Endpoint for package version -pub async fn version(server: axum::extract::Extension) -> impl IntoResponse { - let version = server.release.clone(); - (StatusCode::OK, Json(version)) -} - -// Define a handler function for the `/info` route -async fn operator_info(Extension(options): Extension) -> Json { - let public_key = &options.operator_public_key; - Json(json!({ "publicKey": public_key })) -} - -// Create a function to build the operator server router -pub fn create_operator_server(_options: ServerOptions) -> Router { - Router::new().route("/info", get(operator_info)) -} diff --git a/service/src/server/routes/cost.rs b/service/src/server/routes/cost.rs deleted file mode 100644 index caeb39ad3..000000000 --- a/service/src/server/routes/cost.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. 
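The `cost.rs` module deleted below exposed cost models over an async-graphql schema at `/cost`, with `costModels(deployments: [...])` and `costModel(deployment: ...)` as root fields. A sketch of a client query against it; the endpoint URL is an assumption, and the deployment ID is the one used in this repo's test vectors:

```rust
// Sketch: querying the /cost GraphQL API that cost.rs defined.
// Assumes a service on localhost:7600 (placeholder) and requires
// reqwest with the "json" feature plus tokio.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = json!({
        "query": r#"query ($deployments: [String!]!) {
            costModels(deployments: $deployments) {
                deployment
                model
                variables
            }
        }"#,
        "variables": {
            // Deployment ID borrowed from the test vectors in this repo.
            "deployments": ["0xc064c354bc21dd958b1d41b67b8ef161b75d2246b425f68ed4c74964ae705cbd"]
        },
    });

    let resp = reqwest::Client::new()
        .post("http://localhost:7600/cost")
        .json(&body)
        .send()
        .await?;
    println!("{}", resp.text().await?);
    Ok(())
}
```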
-// SPDX-License-Identifier: Apache-2.0 - -use std::str::FromStr; - -use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject}; -use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; -use axum::extract::Extension; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use toolshed::thegraph::DeploymentId; - -use crate::{ - common::indexer_management::{self, CostModel}, - server::ServerOptions, -}; - -#[derive(Clone, Debug, Serialize, Deserialize, SimpleObject)] -pub struct GraphQlCostModel { - pub deployment: String, - pub model: Option, - pub variables: Option, -} - -impl From for GraphQlCostModel { - fn from(model: CostModel) -> Self { - Self { - deployment: model.deployment.to_string(), - model: model.model, - variables: model.variables, - } - } -} - -pub type CostSchema = Schema; - -#[derive(Default)] -pub struct QueryRoot; - -#[Object] -impl QueryRoot { - async fn cost_models( - &self, - ctx: &Context<'_>, - deployments: Vec, - ) -> Result, anyhow::Error> { - let deployment_ids = deployments - .into_iter() - .map(|s| DeploymentId::from_str(&s)) - .collect::, _>>()?; - let pool = &ctx.data_unchecked::().indexer_management_db; - let cost_models = indexer_management::cost_models(pool, &deployment_ids).await?; - Ok(cost_models.into_iter().map(|m| m.into()).collect()) - } - - async fn cost_model( - &self, - ctx: &Context<'_>, - deployment: String, - ) -> Result, anyhow::Error> { - let deployment_id = DeploymentId::from_str(&deployment)?; - let pool = &ctx.data_unchecked::().indexer_management_db; - indexer_management::cost_model(pool, &deployment_id) - .await - .map(|model_opt| model_opt.map(GraphQlCostModel::from)) - } -} - -pub(crate) async fn graphql_handler( - req: GraphQLRequest, - Extension(schema): Extension, - Extension(server_options): Extension, -) -> GraphQLResponse { - schema - .execute(req.into_inner().data(server_options)) - .await - .into() -} diff --git a/service/src/server/routes/deployment.rs b/service/src/server/routes/deployment.rs deleted file mode 100644 index 855190890..000000000 --- a/service/src/server/routes/deployment.rs +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. 
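`deployment.rs`, removed below, answered `/subgraphs/health/:deployment` by querying graph-node's indexing statuses and declaring a deployment healthy when its latest block was within five blocks of the chain head. Worth noting: the original compared `latest > head - 5` on `u64`, which underflows when the chain head is below 5; a saturating subtraction avoids that edge. A self-contained sketch of the rule:

```rust
// The five-block lag rule from the deleted deployment_health handler.
fn is_up_to_date(latest: u64, head: u64) -> bool {
    // `head - 5` in the original can underflow (panic in debug builds)
    // when head < 5; saturating_sub pins the threshold at zero instead.
    latest > head.saturating_sub(5)
}

fn main() {
    assert!(is_up_to_date(100, 103)); // within 5 blocks of head
    assert!(!is_up_to_date(90, 103)); // lagging behind
    assert!(is_up_to_date(1, 3)); // young chain: no underflow
}
```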
-// SPDX-License-Identifier: Apache-2.0 - -use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; - -use serde::{Deserialize, Serialize}; -use serde_json::json; - -use crate::server::{routes::internal_server_error_response, ServerOptions}; -use indexer_common::indexer_errors::{IndexerError, IndexerErrorCause}; - -/// Parse an incoming query request and route queries with authenticated -/// free query token to graph node -/// Later add receipt manager functions for paid queries -pub async fn deployment_health( - Extension(server): Extension, - deployment: axum::extract::Path, -) -> impl IntoResponse { - // Create the GraphQL query - let query = status_query(deployment.to_string()); - - // Send the GraphQL request - let response = reqwest::Client::new() - .post(server.graph_node_status_endpoint) - .header("Content-Type", "application/json") - .json(&query) - .send() - .await; - - match response { - Ok(response) => { - if response.status().is_success() { - // Deserialize the JSON response - //TODO: match with error - let data: serde_json::Value = if let Ok(data) = response.json().await { - data - } else { - return internal_server_error_response("Invalid json response"); - }; - - // Process the response and return the appropriate HTTP status - let status = if let Some(status) = - data["data"]["indexingStatuses"].get(0).and_then(|s| { - let parse = serde_json::from_value::(s.clone()); - parse.ok() - }) { - status - } else { - return internal_server_error_response("Missing indexing status"); - }; - - // Build health response based on the returned status - if status.health == SubgraphHealth::failed { - return internal_server_error_response("Subgraph deployment has failed"); - } - - if let Ok((latest, head)) = block_numbers(status) { - if latest > head - 5 { - (StatusCode::OK, Json("Subgraph deployment is up to date")).into_response() - } else { - internal_server_error_response("Subgraph deployment is lagging behind") - } - } else { - internal_server_error_response( - "Invalid indexing status (missing block numbers)", - ) - } - } else { - internal_server_error_response("Unknown error") - } - } - Err(e) => internal_server_error_response(&e.to_string()), - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct IndexingStatus { - health: SubgraphHealth, - chains: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[allow(non_camel_case_types)] // Need exact field names to match with GQL response -enum SubgraphHealth { - healthy, - unhealthy, - failed, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct ChainStatus { - network: String, - latest_block: Block, - chain_head_block: Block, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct Block { - number: String, - hash: String, -} - -fn status_query(deployment: String) -> serde_json::Value { - json!({ - "query": r#"query indexingStatus($subgraphs: [String!]!) { - indexingStatuses(subgraphs: $subgraphs) { - subgraph - health - chains { - network - ... 
on EthereumIndexingStatus { - latestBlock { number hash } - chainHeadBlock { number hash } - } - } - } - }"#, - "variables": { - "subgraphs": [deployment], - }, - }) -} - -fn block_numbers(status: IndexingStatus) -> Result<(u64, u64), IndexerError> { - let latest_block_number = status - .chains - .get(0) - .map(|chain| chain.latest_block.number.clone()) - .map(|number| number.parse::()); - - let head_block_number = status - .chains - .get(0) - .map(|chain| chain.chain_head_block.number.clone()) - .map(|number| number.parse::()); - - if let (Some(Ok(latest)), Some(Ok(head))) = (latest_block_number, head_block_number) { - Ok((latest, head)) - } else { - Err(IndexerError::new( - indexer_common::indexer_errors::IndexerErrorCode::IE018, - Some(IndexerErrorCause::new( - "Ill formatted block numbers from indexing status", - )), - )) - } -} diff --git a/service/src/server/routes/mod.rs b/service/src/server/routes/mod.rs deleted file mode 100644 index f0aae9888..000000000 --- a/service/src/server/routes/mod.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use axum::{ - http::StatusCode, - response::{IntoResponse, Response}, - Json, -}; -use hyper::http::HeaderName; -use indexer_common::indexer_errors::{IndexerError, IndexerErrorCause}; -use tower::limit::RateLimitLayer; - -pub mod basic; -pub mod cost; -pub mod deployment; -pub mod network; -pub mod status; -pub mod subgraphs; - -/// Helper function to convert response body to query string -pub async fn response_body_to_query_string( - body: hyper::body::Body, -) -> Result { - let query_bytes = hyper::body::to_bytes(body).await.map_err(|e| { - IndexerError::new( - indexer_common::indexer_errors::IndexerErrorCode::IE075, - Some(IndexerErrorCause::new(e)), - ) - })?; - let query_string = String::from_utf8(query_bytes.to_vec()).map_err(|e| { - IndexerError::new( - indexer_common::indexer_errors::IndexerErrorCode::IE075, - Some(IndexerErrorCause::new(e)), - ) - })?; - Ok(query_string) -} - -/// Create response for a bad request -pub fn bad_request_response(error_body: &str) -> Response { - ( - StatusCode::BAD_REQUEST, - axum::response::AppendHeaders([(HeaderName::from_static("graph-attestable"), "false")]), - Json(error_body.to_string()), - ) - .into_response() -} - -/// Create response for an internal server error -pub fn internal_server_error_response(error_body: &str) -> Response { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(error_body.to_string()), - ) - .into_response() -} - -/// Limit status requests to 9000/30min (5/s) -pub fn slow_ratelimiter() -> RateLimitLayer { - RateLimitLayer::new(9000, std::time::Duration::from_millis(30 * 60 * 1000)) -} - -/// Limit network requests to 90000/30min (50/s) -pub fn network_ratelimiter() -> RateLimitLayer { - RateLimitLayer::new(90000, std::time::Duration::from_millis(30 * 60 * 1000)) -} diff --git a/service/src/server/routes/network.rs b/service/src/server/routes/network.rs deleted file mode 100644 index ce330c932..000000000 --- a/service/src/server/routes/network.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. 
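`network.rs`, deleted below, proxied `/network` queries to the network subgraph only when the indexer both enabled serving it and the caller presented the configured auth token; the original expressed this as a chain of `is_some()` checks followed by `unwrap()`s. The same gate reads more directly as a single `match` (a sketch of the predicate only; header parsing and any Bearer-prefix convention are out of scope):

```rust
// The admission check from the deleted network_queries handler,
// restated without unwrap(): serve only if enabled and tokens match.
fn may_serve_network(
    serve_enabled: bool,
    expected_token: Option<&str>,
    presented_token: Option<&str>,
) -> bool {
    match (serve_enabled, expected_token, presented_token) {
        (true, Some(expected), Some(presented)) => expected == presented,
        _ => false,
    }
}

fn main() {
    assert!(may_serve_network(true, Some("secret"), Some("secret")));
    assert!(!may_serve_network(true, Some("secret"), None));
    assert!(!may_serve_network(false, Some("secret"), Some("secret")));
}
```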
-// SPDX-License-Identifier: Apache-2.0 - -use axum::{ - extract::Extension, - http::{self, Request}, - response::IntoResponse, - Json, -}; -use serde_json::{json, Value}; - -use crate::server::ServerOptions; - -use super::bad_request_response; - -pub async fn network_queries( - Extension(server): Extension, - req: Request, - axum::extract::Json(body): axum::extract::Json, -) -> impl IntoResponse { - // Extract free query auth token - let auth_token = req - .headers() - .get(http::header::AUTHORIZATION) - .and_then(|t| t.to_str().ok()); - - // Serve only if enabled by indexer and request auth token matches - if !(server.serve_network_subgraph - && auth_token.is_some() - && server.network_subgraph_auth_token.is_some() - && auth_token.unwrap() == server.network_subgraph_auth_token.as_deref().unwrap()) - { - return bad_request_response("Not enabled or authorized query"); - } - - match server.network_subgraph.query::(&body).await { - Ok(result) => Json(json!({ - "data": result.data, - "errors": result.errors.map(|errors| { - errors - .into_iter() - .map(|e| json!({ "message": e.message })) - .collect::>() - }), - })) - .into_response(), - Err(e) => bad_request_response(&e.to_string()), - } -} diff --git a/service/src/server/routes/status.rs b/service/src/server/routes/status.rs deleted file mode 100644 index 630411eb3..000000000 --- a/service/src/server/routes/status.rs +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::collections::HashSet; - -use axum::{ - http::{Request, StatusCode}, - response::IntoResponse, - Extension, Json, -}; - -use hyper::body::Bytes; - -use reqwest::{header, Client}; - -use crate::server::ServerOptions; -use indexer_common::{graphql::filter_supported_fields, indexer_errors::*}; - -use super::bad_request_response; - -lazy_static::lazy_static! 
{ - static ref SUPPORTED_ROOT_FIELDS: HashSet<&'static str> = - vec![ - "indexingStatuses", - "chains", - "latestBlock", - "earliestBlock", - "publicProofsOfIndexing", - "entityChangesInBlock", - "blockData", - "cachedEthereumCalls", - "subgraphFeatures", - "apiVersions", - ].into_iter().collect(); -} - -// Custom middleware function to process the request before reaching the main handler -pub async fn status_queries( - Extension(server): Extension, - req: Request, -) -> impl IntoResponse { - let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); - // Read the requested query string - let query_string = match String::from_utf8(body_bytes.to_vec()) { - Ok(s) => s, - Err(e) => return bad_request_response(&e.to_string()), - }; - - // filter supported root fields - let query_string = match filter_supported_fields(&query_string, &SUPPORTED_ROOT_FIELDS) { - Ok(query) => query, - Err(unsupported_fields) => { - return ( - StatusCode::BAD_REQUEST, - format!("Cannot query field: {:#?}", unsupported_fields), - ) - .into_response(); - } - }; - - // Pass the modified operation to the actual endpoint - let request = Client::new() - .post(&server.graph_node_status_endpoint) - .body(Bytes::from(query_string)) - .header(header::CONTENT_TYPE, "application/json"); - - match request.send().await { - Ok(r) => match r.json::>().await { - Ok(r) => (StatusCode::OK, Json(r)).into_response(), - Err(e) => { - IndexerError::new( - IndexerErrorCode::IE018, - Some(IndexerErrorCause::new( - "Failed to parse the indexing status API response", - )), - ); - bad_request_response(&e.to_string()) - } - }, - Err(e) => { - IndexerError::new( - IndexerErrorCode::IE018, - Some(IndexerErrorCause::new( - "Failed to query indexing status API from the graph node status endpoint", - )), - ); - bad_request_response(&e.to_string()) - } - } -} diff --git a/service/src/server/routes/subgraphs.rs b/service/src/server/routes/subgraphs.rs deleted file mode 100644 index ec471964c..000000000 --- a/service/src/server/routes/subgraphs.rs +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. 
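`subgraphs.rs`, deleted below, classified each incoming query before execution: a matching free-query auth token made it a free query, otherwise a `scalar-receipt` header made it a paid (TAP) query, and anything else was rejected with `IE030`. The decision tree, as a sketch (the Bearer-prefix handling of the original is folded into plain string equality here):

```rust
// Free/paid/rejected classification from the deleted subgraph_queries
// handler, extracted into a pure function for clarity.
enum QueryKind {
    Free,
    Paid { receipt: String },
    Rejected,
}

fn classify(
    auth_header: Option<&str>,
    free_query_token: Option<&str>,
    receipt_header: Option<&str>,
) -> QueryKind {
    if let (Some(auth), Some(token)) = (auth_header, free_query_token) {
        if auth == token {
            return QueryKind::Free;
        }
    }
    match receipt_header {
        Some(r) => QueryKind::Paid { receipt: r.to_string() },
        None => QueryKind::Rejected,
    }
}

fn main() {
    assert!(matches!(
        classify(Some("Bearer t0ken"), Some("Bearer t0ken"), None),
        QueryKind::Free
    ));
    assert!(matches!(
        classify(None, None, Some("{}")),
        QueryKind::Paid { receipt } if receipt == "{}"
    ));
    assert!(matches!(classify(None, None, None), QueryKind::Rejected));
}
```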
-// SPDX-License-Identifier: Apache-2.0 - -use axum::{ - extract::Extension, - http::{self, Request, StatusCode}, - response::IntoResponse, - Json, -}; -use std::str::FromStr; -use toolshed::thegraph::DeploymentId; -use tracing::trace; - -use crate::{ - metrics, - query_processor::FreeQuery, - server::{ - routes::{bad_request_response, response_body_to_query_string}, - ServerOptions, - }, -}; -use indexer_common::indexer_errors::*; - -/// Parse an incoming query request and route queries with authenticated -/// free query token to graph node -/// Later add receipt manager functions for paid queries -pub async fn subgraph_queries( - Extension(server): Extension, - id: axum::extract::Path, - req: Request, -) -> impl IntoResponse { - let (parts, body) = req.into_parts(); - - // Initialize id into a subgraph deployment ID - let subgraph_deployment_id = match DeploymentId::from_str(id.as_str()) { - Ok(id) => id, - Err(e) => return bad_request_response(&e.to_string()), - }; - let deployment_label = subgraph_deployment_id.to_string(); - - let query_duration_timer = metrics::QUERY_DURATION - .with_label_values(&[&deployment_label]) - .start_timer(); - metrics::QUERIES - .with_label_values(&[&deployment_label]) - .inc(); - // Extract scalar receipt from header and free query auth token for paid or free query - let receipt = if let Some(receipt) = parts.headers.get("scalar-receipt") { - match receipt.to_str() { - Ok(r) => Some(r), - Err(_) => { - query_duration_timer.observe_duration(); - metrics::QUERIES_WITH_INVALID_RECEIPT_HEADER - .with_label_values(&[&deployment_label]) - .inc(); - let err_msg = "Bad scalar receipt for subgraph query"; - IndexerError::new( - IndexerErrorCode::IE029, - Some(IndexerErrorCause::new(err_msg)), - ); - return bad_request_response(err_msg); - } - } - } else { - None - }; - trace!( - "receipt attached by the query, can pass it to TAP: {:?}", - receipt - ); - - // Extract free query auth token - let auth_token = parts - .headers - .get(http::header::AUTHORIZATION) - .and_then(|t| t.to_str().ok()); - // determine if the query is paid or authenticated to be free - let free = auth_token.is_some() - && server.free_query_auth_token.is_some() - && auth_token.unwrap() == server.free_query_auth_token.as_deref().unwrap(); - - let query_string = match response_body_to_query_string(body).await { - Ok(q) => q, - Err(e) => { - query_duration_timer.observe_duration(); - return bad_request_response(&e.to_string()); - } - }; - - if free { - let free_query = FreeQuery { - subgraph_deployment_id, - query: query_string, - }; - - match server.query_processor.execute_free_query(free_query).await { - Ok(res) if res.status == 200 => { - query_duration_timer.observe_duration(); - (StatusCode::OK, Json(res.result)).into_response() - } - _ => { - IndexerError::new( - IndexerErrorCode::IE033, - Some(IndexerErrorCause::new( - "Failed to execute a free subgraph query to graph node", - )), - ); - bad_request_response("Failed to execute free query") - } - } - } else if let Some(receipt) = receipt { - let paid_query = crate::query_processor::PaidQuery { - subgraph_deployment_id, - query: query_string, - receipt: receipt.to_string(), - }; - - match server.query_processor.execute_paid_query(paid_query).await { - Ok(res) => { - query_duration_timer.observe_duration(); - metrics::SUCCESSFUL_QUERIES - .with_label_values(&[&deployment_label]) - .inc(); - (StatusCode::OK, Json(res.result)).into_response() - } - Err(e) => { - metrics::FAILED_QUERIES - .with_label_values(&[&deployment_label]) - .inc(); - let 
err_msg = format!( - "Failed to execute a paid subgraph query to graph node: {}", - e - ); - IndexerError::new(IndexerErrorCode::IE032, Some(IndexerErrorCause::new(e))); - return bad_request_response(&err_msg); - } - } - } else { - let error_body = "Query request header missing scalar-receipts or incorrect auth token"; - metrics::QUERIES_WITHOUT_RECEIPT - .with_label_values(&[&deployment_label]) - .inc(); - IndexerError::new( - IndexerErrorCode::IE030, - Some(IndexerErrorCause::new(error_body)), - ); - query_duration_timer.observe_duration(); - bad_request_response(error_body) - } -} diff --git a/service/src/test_vectors.rs b/service/src/test_vectors.rs deleted file mode 100644 index b21c78e71..000000000 --- a/service/src/test_vectors.rs +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::str::FromStr; - -use alloy_primitives::Address; -use lazy_static::lazy_static; - -lazy_static! { - pub static ref INDEXER_ADDRESS: Address = - Address::from_str("0x1234567890123456789012345678901234567890").unwrap(); -} diff --git a/service/src/util.rs b/service/src/util.rs deleted file mode 100644 index 0c64abc68..000000000 --- a/service/src/util.rs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use ethers::signers::WalletError; -use tokio::signal; -use tracing::{ - info, - subscriber::{set_global_default, SetGlobalDefaultError}, -}; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; - -use crate::common::address::{build_wallet, wallet_address}; - -/// Validate that private key as an Eth wallet -pub fn public_key(value: &str) -> Result { - // The wallet can be stored instead of the original private key - let wallet = build_wallet(value)?; - let addr = wallet_address(&wallet); - info!(address = addr, "Resolved Graphcast id"); - Ok(addr) -} - -/// Sets up tracing, allows log level to be set from the environment variables -pub fn init_tracing(format: String) -> Result<(), SetGlobalDefaultError> { - let filter = EnvFilter::from_default_env(); - - let subscriber_builder: tracing_subscriber::fmt::SubscriberBuilder< - tracing_subscriber::fmt::format::DefaultFields, - tracing_subscriber::fmt::format::Format, - EnvFilter, - > = FmtSubscriber::builder().with_env_filter(filter); - - match format.as_str() { - "json" => set_global_default(subscriber_builder.json().finish()), - "full" => set_global_default(subscriber_builder.finish()), - "compact" => set_global_default(subscriber_builder.compact().finish()), - _ => set_global_default(subscriber_builder.with_ansi(true).pretty().finish()), - } -} - -pub async fn shutdown_signal() { - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - }; - - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - - tokio::select! { - _ = ctrl_c => {}, - _ = terminate => {}, - } - - info!("signal received, starting graceful shutdown"); -}
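The deleted `util.rs` also took `shutdown_signal` with it, which the commented-out server startup earlier in this diff still references. Under axum 0.6.20, which this PR moves to, the same graceful-shutdown wiring still applies; a minimal end-to-end sketch (the port and route are placeholders, and the Unix signal handling matches the original, so this is Unix-only):

```rust
// Graceful shutdown as in the removed util.rs, wired into an
// axum 0.6 server. Needs tokio with "signal", "macros", and
// "rt-multi-thread" features.
use std::net::SocketAddr;

use axum::{routing::get, Router};
use tokio::signal;

async fn shutdown_signal() {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    // Resolve on whichever signal arrives first, then let the server drain.
    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/", get(|| async { "Ready to roll!" }));
    let addr: SocketAddr = "0.0.0.0:7600".parse().unwrap();
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}
```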