diff --git a/.gitignore b/.gitignore index f72595b98..18241f970 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ indexer.toml # IDE files .vscode/ migrations/ +.sqlx/ \ No newline at end of file diff --git a/README.md b/README.md index 84ff2c6da..ae6a0d4c5 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ Experimental rust impl for The Graph [indexer service](https://github.com/graphp - switching from actix-web to `axum` for the service server - App profiling should utilize `perf`, flamegraphs or cpu profilers, and benches to track and collect performance data. The typescript implementation uses `gcloud-profile` - Consider replacing and adding parts from TAP manager +- Requires a `postgres` connection to the indexer management server database, which is shared with the indexer agent +- The indexer service runs no migrations, as they might introduce conflicts in the database; the indexer agent is solely responsible for database management. > Don't know if receipt validation is actually correct, need testing @@ -16,8 +18,8 @@ Experimental rust impl for The Graph [indexer service](https://github.com/graphp - [x] basic structure - [x] CORS - [x] timeouts - - [ ] Rate limiting levels - - [ ] Logger stream + - [x] Rate limiting levels + - [x] Logger stream - [ ] Query processor - [x] graph node query endpoint at specific subgraph path - [x] wrap request to and response from graph node @@ -53,9 +55,10 @@ Experimental rust impl for The Graph [indexer service](https://github.com/graphp - [ ] Status server - [x] indexing status resolver - to query indexingStatuses - [ ] Filter for unsupported queries -- [ ] Cost server - - [ ] Cost graphQL schema - - [ ] query indexer management client for Cost model +- [x] Cost server + - [x] Simple indexer management client to track the postgres connection and network subgraph endpoint. + - [x] Serve queries with the defined graphQL schema and psql resolvers to the database: `costModel(deployment)` and `costModels(deployments)`. If deployments is empty, all cost models are returned. + - [x] Global cost model fallback used when specific deployments are queried - [x] Constant service paths - [x] health - [x] ready to roll @@ -74,6 +77,11 @@ Experimental rust impl for The Graph [indexer service](https://github.com/graphp ### Indexer common components Temporarily live inside the indexer-service package under `src/common`. +- Simple indexer management client to track the NetworkSubgraph and postgres connection. +- .... +- Keeps the cost model schema and resolvers with postgres and graphQL types: `costModel(deployment)` and `costModels(deployments)`. If deployments is empty, all cost models are returned (see the example query below). + - Global cost model fallback used when specific deployments are queried + - The indexer service runs no migrations, as they might introduce conflicts in the database; the indexer agent is solely responsible for database management.
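For a concrete sense of the cost server checklist above, here is a minimal client-side sketch (not part of this diff) that posts the `costModels` query to the service. It assumes `tokio`, `reqwest` (with the `json` feature), and `serde_json` as dependencies, and a hypothetical `http://localhost:7300/cost` endpoint; only the query shape and field names come from the schema in this change.

```rust
// Rough sketch: query the cost server's GraphQL schema over HTTP.
// The host, port, and `/cost` path are assumptions for illustration only.
use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = json!({
        "query": "{ costModels(deployments: []) { deployment model variables } }"
    });

    let response: Value = reqwest::Client::new()
        .post("http://localhost:7300/cost") // hypothetical address
        .json(&body)
        .send()
        .await?
        .json()
        .await?;

    // An empty `deployments` list returns every stored cost model as-is;
    // querying specific deployments fills missing models from the global
    // cost model when one is defined.
    println!("{response:#}");
    Ok(())
}
```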
### Indexer native dependency diff --git a/service/src/common/database.rs b/service/src/common/database.rs index 30bf0bb1f..2a7595169 100644 --- a/service/src/common/database.rs +++ b/service/src/common/database.rs @@ -20,7 +20,12 @@ pub async fn connect(config: &config::Postgres) -> PgPool { std::env::set_var("DATABASE_URL", &url); - debug!("Connecting to database"); + debug!( + postgres_host = tracing::field::debug(&config.postgres_host), + postgres_port = tracing::field::debug(&config.postgres_port), + postgres_database = tracing::field::debug(&config.postgres_database), + "Connecting to database" + ); PgPoolOptions::new() .max_connections(50) diff --git a/service/src/common/indexer_management_client.rs b/service/src/common/indexer_management_client/mod.rs similarity index 86% rename from service/src/common/indexer_management_client.rs rename to service/src/common/indexer_management_client/mod.rs index 7b05c3b7d..09d1e171d 100644 --- a/service/src/common/indexer_management_client.rs +++ b/service/src/common/indexer_management_client/mod.rs @@ -1,10 +1,14 @@ -use sqlx::{PgPool, Pool, Postgres}; +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use sqlx::PgPool; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::Value; -use super::database::{self}; +pub mod resolver; +pub mod schema; /// Indexer management client /// diff --git a/service/src/common/indexer_management_client/resolver.rs b/service/src/common/indexer_management_client/resolver.rs new file mode 100644 index 000000000..ac38fcd00 --- /dev/null +++ b/service/src/common/indexer_management_client/resolver.rs @@ -0,0 +1,333 @@ +use sqlx::PgPool; + +use super::schema::CostModel; +use crate::common::types::SubgraphDeploymentID; + +/// Query postgres indexer management server's cost models +/// Filter on deployments if it is not empty, otherwise return all cost models +pub async fn cost_models( + pool: &PgPool, + deployments: &[String], +) -> Result, anyhow::Error> { + let deployment_ids = deployments + .iter() + .map(|d| SubgraphDeploymentID::new(d).unwrap().to_string()) + .collect::>(); + let models = if deployment_ids.is_empty() { + sqlx::query_as!( + CostModel, + r#" + SELECT deployment, model, variables + FROM "CostModels" + ORDER BY deployment ASC + "# + ) + .fetch_all(pool) + .await? + } else { + sqlx::query_as!( + CostModel, + r#" + SELECT deployment, model, variables + FROM "CostModels" + WHERE deployment = ANY($1) + ORDER BY deployment ASC + "#, + &deployment_ids + ) + .fetch_all(pool) + .await? + }; + + // Merge deployment cost models with global cost model + let models = match (deployment_ids.is_empty(), global_cost_model(pool).await?) 
{ + (false, Some(global)) => { + let m = deployment_ids + .iter() + .map(|d| { + let m = models.iter().find(|&m| &m.deployment == d).map_or_else( + || { + merge_global( + CostModel { + deployment: d.clone(), + model: None, + variables: None, + }, + &global, + ) + }, + |m| merge_global(m.clone(), &global), + ); + m + }) + .collect::>(); + + m + } + _ => models, + }; + + Ok(models) +} + +/// Make database query for a cost model indexed by deployment id +pub async fn cost_model( + pool: &PgPool, + deployment: &str, +) -> Result, anyhow::Error> { + let deployment_id = SubgraphDeploymentID::new(deployment).unwrap().to_string(); + let model = sqlx::query_as!( + CostModel, + r#" + SELECT deployment, model, variables + FROM "CostModels" + WHERE deployment = $1 + "#, + deployment_id + ) + .fetch_optional(pool) + .await?; + + // Fallback with global cost model + let model = if let Some(global) = global_cost_model(pool).await? { + model.map_or_else( + || { + Some(merge_global( + CostModel { + deployment: deployment.to_string(), + model: None, + variables: None, + }, + &global, + )) + }, + |m| Some(merge_global(m, &global)), + ) + } else { + model + }; + + Ok(model) +} + +/// Query global cost model +pub async fn global_cost_model(pool: &PgPool) -> Result, anyhow::Error> { + let model = sqlx::query_as!( + CostModel, + r#" + SELECT deployment, model, variables + FROM "CostModels" + WHERE deployment = $1 + "#, + "global" + ) + .fetch_optional(pool) + .await?; + Ok(model) +} + +fn merge_global(model: CostModel, global_model: &CostModel) -> CostModel { + CostModel { + deployment: model.deployment, + model: model.model.clone().or(global_model.model.clone()), + variables: model.variables.clone().or(global_model.variables.clone()), + } +} + +#[cfg(test)] +mod test { + + use sqlx::PgPool; + + use super::*; + + async fn setup_cost_models_table(pool: &PgPool) { + sqlx::query!( + r#" + CREATE TABLE "CostModels"( + id INT, + deployment VARCHAR NOT NULL, + model TEXT, + variables JSONB, + PRIMARY KEY( deployment ) + ); + "#, + ) + .execute(pool) + .await + .expect("Create test instance in db"); + } + + async fn add_cost_models(pool: &PgPool, models: Vec) { + for model in models { + sqlx::query!( + r#" + INSERT INTO "CostModels" (deployment, model) + VALUES ($1, $2); + "#, + model.deployment, + model.model, + ) + .execute(pool) + .await + .expect("Create test instance in db"); + } + } + + fn global_cost_model() -> CostModel { + CostModel { + deployment: "global".to_string(), + model: Some("default => 0.00001;".to_string()), + variables: None, + } + } + + fn simple_cost_models() -> Vec { + vec![ + CostModel { + deployment: "0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea" + .to_string(), + model: Some("default => 0.00025;".to_string()), + variables: None, + }, + CostModel { + deployment: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + .to_string(), + model: None, + variables: None, + }, + ] + } + + #[sqlx::test] + #[ignore] + async fn success_cost_models(pool: PgPool) { + _ = setup_cost_models_table(&pool).await; + let expected_models = simple_cost_models(); + _ = add_cost_models(&pool, expected_models.clone()).await; + let res = cost_models( + &pool, + &["Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss".to_string()], + ) + .await + .expect("Cost models query"); + + assert_eq!(res.len(), 1); + assert_eq!( + &res.first().unwrap().deployment, + &expected_models.first().unwrap().deployment + ); + assert_eq!( + &res.first().unwrap().model, + 
&expected_models.first().unwrap().model + ); + + let res = cost_models( + &pool, + &["0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea".to_string()], + ) + .await + .expect("Cost models query"); + + assert_eq!(res.len(), 1); + assert_eq!( + &res.first().unwrap().deployment, + &expected_models.first().unwrap().deployment + ); + assert_eq!( + &res.first().unwrap().model, + &expected_models.first().unwrap().model + ); + } + + #[sqlx::test] + #[ignore] + async fn global_fallback_cost_models(pool: PgPool) { + let deployment_id = + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(); + _ = setup_cost_models_table(&pool).await; + _ = add_cost_models(&pool, simple_cost_models()).await; + let global = global_cost_model(); + _ = add_cost_models(&pool, vec![global.clone()]).await; + let res = cost_models(&pool, &[]) + .await + .expect("Cost models query without deployments filter"); + + assert_eq!(res.len(), 3); + let incomplete_model = res.iter().find(|m| m.deployment == deployment_id.clone()); + assert!(incomplete_model.is_some()); + assert_ne!(incomplete_model.unwrap().model, global.model); + + let res = cost_models( + &pool, + &[ + deployment_id.clone(), + "0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea".to_string(), + ], + ) + .await + .expect("Cost models query without deployments filter"); + + assert_eq!(res.len(), 2); + let incomplete_model = res.iter().find(|m| m.deployment == deployment_id.clone()); + assert!(incomplete_model.is_some()); + assert_eq!(incomplete_model.unwrap().model, global.model); + + let complete_model = res.iter().find(|m| { + m.deployment == *"0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea" + }); + assert!(complete_model.is_some()); + assert_ne!(complete_model.unwrap().model, global.model); + + let missing_deployment = "Qmaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + let res = cost_models(&pool, &[missing_deployment.to_string()]) + .await + .expect("Cost models query without deployments filter"); + + let missing_model = res.iter().find(|m| { + m.deployment == *"0xb5ddb473e202a7abba81803ad153fd93a9b18d07ab38a711f7c2bd79435e50d7" + }); + assert!(missing_model.is_some()); + assert_eq!(missing_model.unwrap().model, global.model); + } + + #[sqlx::test] + #[ignore] + async fn success_cost_model(pool: PgPool) { + let deployment_id = "0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea"; + let deployment_hash = "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss".to_string(); + _ = setup_cost_models_table(&pool).await; + _ = add_cost_models(&pool, simple_cost_models()).await; + let res = cost_model(&pool, &deployment_hash) + .await + .expect("Cost model query") + .expect("Cost model match deployment"); + + assert_eq!(res.deployment, deployment_id.to_string()); + } + + #[sqlx::test] + #[ignore] + async fn global_fallback_cost_model(pool: PgPool) { + let deployment_hash = "Qmaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + _ = setup_cost_models_table(&pool).await; + _ = add_cost_models(&pool, simple_cost_models()).await; + + let res = cost_model(&pool, deployment_hash) + .await + .expect("Cost model query"); + + assert!(res.is_none()); + + let global = global_cost_model(); + _ = add_cost_models(&pool, vec![global.clone()]).await; + + let res = cost_model(&pool, deployment_hash) + .await + .expect("Cost model query") + .expect("Global cost model fallback"); + + assert_eq!(res.model, global.model); + assert_eq!(&res.deployment, deployment_hash); + } +} diff --git 
a/service/src/model/schema.rs b/service/src/common/indexer_management_client/schema.rs similarity index 67% rename from service/src/model/schema.rs rename to service/src/common/indexer_management_client/schema.rs index 250acd158..a3f24913d 100644 --- a/service/src/model/schema.rs +++ b/service/src/common/indexer_management_client/schema.rs @@ -2,11 +2,11 @@ use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, S use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::FromRow; -use sqlx::{Pool, Postgres}; -use std::sync::Arc; -use thiserror::Error; -use crate::server::ServerOptions; +use crate::{ + common::indexer_error::{IndexerError, IndexerErrorCause, IndexerErrorCode}, + server::ServerOptions, +}; use super::resolver; @@ -17,6 +17,8 @@ pub struct CostModel { pub variables: Option, } +unsafe impl Send for CostModel {} + pub type CostSchema = Schema; // Unified query object for resolvers @@ -31,13 +33,18 @@ impl QueryRoot { &self, ctx: &Context<'_>, deployments: Vec, - ) -> Result, HttpServiceError> { + ) -> Result, IndexerError> { let pool = ctx .data_unchecked::() .indexer_management_client .database(); - let models: Vec = resolver::cost_models(pool, deployments).await?; + let models: Vec = + resolver::cost_models(pool, &deployments) + .await + .map_err(|e| { + IndexerError::new(IndexerErrorCode::IE025, Some(IndexerErrorCause::new(e))) + })?; Ok(models) } @@ -45,21 +52,15 @@ impl QueryRoot { &self, ctx: &Context<'_>, deployment: String, - ) -> Result, HttpServiceError> { + ) -> Result, IndexerError> { let pool = ctx .data_unchecked::() .indexer_management_client .database(); - let model = resolver::cost_model(pool, &deployment).await?; + let model = resolver::cost_model(pool, &deployment).await.map_err(|e| { + IndexerError::new(IndexerErrorCode::IE025, Some(IndexerErrorCause::new(e))) + })?; Ok(model) } } - -#[derive(Error, Debug)] -pub enum HttpServiceError { - #[error("HTTP client error: {0}")] - HttpClientError(#[from] reqwest::Error), - #[error("{0}")] - Others(#[from] anyhow::Error), -} diff --git a/service/src/main.rs b/service/src/main.rs index 7f78840b9..c6a6c15e5 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -12,7 +12,7 @@ use axum::{routing::post, Extension, Router, Server}; use dotenvy::dotenv; use ethereum_types::{Address, U256}; -use std::{net::SocketAddr, str::FromStr, time::Duration, env}; +use std::{net::SocketAddr, str::FromStr, time::Duration}; use tower::{BoxError, ServiceBuilder}; use tower_http::{ add_extension::AddExtensionLayer, @@ -25,10 +25,12 @@ use util::{package_version, shutdown_signal}; use crate::{ common::network_subgraph::NetworkSubgraph, - common::{database, indexer_management_client::IndexerManagementClient}, + common::{ + database, + indexer_management_client::{schema::QueryRoot, IndexerManagementClient}, + }, config::Cli, metrics::handle_serve_metrics, - model::schema::QueryRoot, query_processor::QueryProcessor, server::routes::{network_ratelimiter, slow_ratelimiter}, util::public_key, @@ -41,7 +43,6 @@ mod common; mod config; mod graph_node; mod metrics; -mod model; mod query_processor; mod server; mod util; @@ -109,6 +110,11 @@ async fn main() -> Result<(), std::io::Error> { // Establish Database connection necessary for serving indexer management // requests with defined schema + // Note: Typically, you'd call `sqlx::migrate!();` here to sync the models + // which defaults to files in "./migrations" to sync the database; + // however, this can cause conflicts with the migrations run by indexer 
+ // agent. Hence we leave syncing and migrating entirely to the agent and + // assume the models are up to date in the service. let database = database::connect(&config.postgres).await; let indexer_management_client = IndexerManagementClient::new(database).await; let service_options = ServerOptions::new( diff --git a/service/src/model/mod.rs b/service/src/model/mod.rs deleted file mode 100644 index 979087e69..000000000 --- a/service/src/model/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -pub mod resolver; -pub mod schema; diff --git a/service/src/model/resolver.rs b/service/src/model/resolver.rs deleted file mode 100644 index 9cadd4b7c..000000000 --- a/service/src/model/resolver.rs +++ /dev/null @@ -1,115 +0,0 @@ -use sqlx::PgPool; -use tracing::info; - -use super::schema::CostModel; -use crate::common::types::SubgraphDeploymentID; - -/// Query postgres indexer management server's cost models -/// Filter on deployments if it is not empty, otherwise return all cost models -//TODO: If there is global cost model, merge with all the specific cost models -pub async fn cost_models( - pool: &PgPool, - deployments: Vec, -) -> Result, anyhow::Error> { - let deployment_ids = deployments - .iter() - .map(|d| SubgraphDeploymentID::new(d).unwrap().to_string()) - .collect::>(); - let models = if deployment_ids.is_empty() { - sqlx::query_as!( - CostModel, - r#" - SELECT deployment, model, variables - FROM "CostModels" - ORDER BY deployment ASC - "# - ) - .fetch_all(pool) - .await? - } else { - sqlx::query_as!( - CostModel, - r#" - SELECT deployment, model, variables - FROM "CostModels" - WHERE deployment = ANY($1) - ORDER BY deployment ASC - "#, - &deployment_ids - ) - .fetch_all(pool) - .await? 
- }; - Ok(models) -} - -/// Make database query for a cost model indexed by deployment id -//TODO: Add global fallback if cost model doesn't exist -pub async fn cost_model( - pool: &PgPool, - deployment: &str, -) -> Result, anyhow::Error> { - let deployment_id = SubgraphDeploymentID::new(deployment).unwrap().to_string(); - let model = sqlx::query_as!( - CostModel, - r#" - SELECT deployment, model, variables - FROM "CostModels" - WHERE deployment = $1 - "#, - deployment_id - ) - .fetch_optional(pool) - .await?; - Ok(model) -} - -#[cfg(test)] -mod test { - use std::str::FromStr; - - use sqlx::{PgPool, types::time::OffsetDateTime}; - - use super::*; - - - async fn mock_cost_models_table(pool: &PgPool) { - let test_models = test_cost_models(); - let datetime = OffsetDateTime::now_utc(); - for model in test_models { - sqlx::query!( - r#" - INSERT INTO "CostModels" (deployment, model, "createdAt", "updatedAt") - VALUES ($1, $2, $3, $4); - "#, - model.deployment, - model.model, - datetime, - datetime - ) - .execute(pool) - .await.expect("Create test instance in db"); - - } - } - - fn test_cost_models() -> Vec { - vec![CostModel { deployment: "0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea".to_string(), - model: Some("default => 0.00025;".to_string()), variables: None }] - } - - #[sqlx::test] - async fn cost_model_handler(pool: PgPool) { - let out_dir = env::var("DATABASE_URL").unwrap(); - let deployment_id = "0xbd499f7673ca32ef4a642207a8bebdd0fb03888cf2678b298438e3a1ae5206ea"; - let deployment_hash = "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss".to_string(); - let timestamp_ns = u64::MAX - 10; - - let res = cost_models( - &pool, - vec![deployment_hash], - ).await.expect("Cost models query"); - - assert_eq!(res.len(), 1); - } -} \ No newline at end of file diff --git a/service/src/server/routes/cost.rs b/service/src/server/routes/cost.rs index dd5ddc418..f0201044b 100644 --- a/service/src/server/routes/cost.rs +++ b/service/src/server/routes/cost.rs @@ -1,17 +1,10 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use async_graphql::http::{playground_source, GraphQLPlaygroundConfig}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; -use axum::{ - extract::Extension, - response::{Html, IntoResponse}, -}; +use axum::extract::Extension; -use std::sync::Arc; -use tracing::trace; - -use crate::{model::schema::CostSchema, server::ServerOptions}; +use crate::{common::indexer_management_client::schema::CostSchema, server::ServerOptions}; pub(crate) async fn graphql_handler( req: GraphQLRequest, diff --git a/service/src/util.rs b/service/src/util.rs index c6c4a8eb7..07ff40066 100644 --- a/service/src/util.rs +++ b/service/src/util.rs @@ -7,11 +7,11 @@ use ethers::signers::{ coins_bip39::English, LocalWallet, MnemonicBuilder, Signer, Wallet, WalletError, }; use ethers_core::k256::ecdsa::SigningKey; -use lazy_static::lazy_static; + use native::attestation::AttestationSigner; use serde::Serialize; use std::collections::HashMap; -use std::env; + use std::fs; use tokio::signal; @@ -24,11 +24,6 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber}; use crate::common::indexer_error::{indexer_error, IndexerError}; -lazy_static! { - pub static ref DATABASE_URL: String = - env::var("DATABASE_URL").expect("DATABASE_URL is not set"); -} - /// Struct for version control #[derive(Serialize, Debug, Clone)] pub struct PackageVersion {
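To round out the `cost.rs` changes above, here is a minimal sketch of the usual `async-graphql` + `axum` wiring such a handler relies on. The stand-in `QueryRoot`, the `/cost` path, and the `0.0.0.0:7300` bind address are illustrative assumptions rather than the exact code in this repository; only the extractor pattern (`Extension` carrying the schema plus `GraphQLRequest`) mirrors the diff.

```rust
use async_graphql::{EmptyMutation, EmptySubscription, Object, Schema};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{extract::Extension, routing::post, Router, Server};

// Stand-in for the QueryRoot exported from
// `common::indexer_management_client::schema` in this diff.
struct QueryRoot;

#[Object]
impl QueryRoot {
    /// Trivial field so the sketch compiles; the real schema exposes
    /// `costModel` and `costModels`.
    async fn ping(&self) -> String {
        "pong".to_string()
    }
}

type CostSchema = Schema<QueryRoot, EmptyMutation, EmptySubscription>;

// Extractor order matters: `GraphQLRequest` consumes the request body,
// so it must be the last argument.
async fn graphql_handler(
    Extension(schema): Extension<CostSchema>,
    req: GraphQLRequest,
) -> GraphQLResponse {
    schema.execute(req.into_inner()).await.into()
}

fn cost_router() -> Router {
    let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription).finish();
    Router::new()
        .route("/cost", post(graphql_handler)) // path is an assumption
        .layer(Extension(schema))
}

#[tokio::main]
async fn main() {
    let app = cost_router();
    Server::bind(&"0.0.0.0:7300".parse().unwrap()) // port is an assumption
        .serve(app.into_make_service())
        .await
        .unwrap();
}
```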