Skip to content

Commit

Permalink
refactor: use tokio::watch for subgraph monitor (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiyasmohd authored Nov 1, 2024
1 parent 4d6ee1c commit fe6219b
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 195 deletions.
19 changes: 11 additions & 8 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,22 @@ mod test {

use super::*;

fn network_subgraph_client() -> &'static SubgraphClient {
Box::leak(Box::new(SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
)))
async fn network_subgraph_client() -> &'static SubgraphClient {
Box::leak(Box::new(
SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
)
.await,
))
}

#[tokio::test]
#[ignore = "depends on the defunct hosted-service"]
async fn test_network_query() {
let result = get_allocations(
network_subgraph_client(),
network_subgraph_client().await,
Address::from_str("0x326c584e0f0eab1f1f83c93cc6ae1acc0feba0bc").unwrap(),
Duration::from_secs(1712448507),
)
Expand All @@ -158,7 +161,7 @@ mod test {
#[ignore = "depends on the defunct hosted-service"]
async fn test_network_query_empty_response() {
let result = get_allocations(
network_subgraph_client(),
network_subgraph_client().await,
Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(),
Duration::from_secs(1712448507),
)
Expand Down
3 changes: 2 additions & 1 deletion common/src/attestations/dispute_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ mod test {
*test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT
))
.unwrap(),
);
)
.await;

// Mock result for current epoch requests
mock_server
Expand Down
23 changes: 13 additions & 10 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,19 @@ mod tests {
async fn test_current_accounts() {
// Set up a mock escrow subgraph
let mock_server = MockServer::start().await;
let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(&format!(
"{}/subgraphs/id/{}",
&mock_server.uri(),
*test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT
))
.unwrap(),
)));
let escrow_subgraph = Box::leak(Box::new(
SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(&format!(
"{}/subgraphs/id/{}",
&mock_server.uri(),
*test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT
))
.unwrap(),
)
.await,
));

let mock = Mock::given(method("POST"))
.and(path(format!(
Expand Down
104 changes: 56 additions & 48 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,36 +205,39 @@ impl IndexerService {
.build()
.expect("Failed to init HTTP client");

let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
options
.config
.subgraphs
.network
.config
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect(
"Failed to parse graph node query endpoint and network subgraph deployment",
),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.network.config.query_url.as_ref(),
let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(
SubgraphClient::new(
http_client.clone(),
options
.config
.subgraphs
.network
.config
.query_auth_token
.clone(),
)?,
)));
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect(
"Failed to parse graph node query endpoint and network subgraph deployment",
),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.network.config.query_url.as_ref(),
options
.config
.subgraphs
.network
.config
.query_auth_token
.clone(),
)?,
)
.await,
));

// Identify the dispute manager for the configured network
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(3600))
Expand Down Expand Up @@ -269,34 +272,39 @@ impl IndexerService {
dispute_manager,
);

let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
http_client,
options
.config
.subgraphs
.escrow
.config
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and escrow subgraph deployment"),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.escrow.config.query_url.as_ref(),
let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(
SubgraphClient::new(
http_client,
options
.config
.subgraphs
.escrow
.config
.query_auth_token
.clone(),
)?,
)));
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect(
"Failed to parse graph node query endpoint and escrow subgraph deployment",
),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.escrow.config.query_url.as_ref(),
options
.config
.subgraphs
.escrow
.config
.query_auth_token
.clone(),
)?,
)
.await,
));

let escrow_accounts = escrow_accounts(
escrow_subgraph,
Expand Down
43 changes: 30 additions & 13 deletions common/src/subgraph_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use super::monitor::{monitor_deployment_status, DeploymentStatus};
use anyhow::anyhow;
use axum::body::Bytes;
use eventuals::Eventual;
use graphql_client::GraphQLQuery;
use reqwest::{header, Url};
use serde_json::{Map, Value};
Expand All @@ -13,6 +12,7 @@ use thegraph_graphql_http::{
graphql::{Document, IntoDocument},
http::request::{IntoRequestParameters, RequestParameters},
};
use tokio::sync::watch::Receiver;
use tracing::warn;

#[derive(Clone)]
Expand Down Expand Up @@ -140,18 +140,27 @@ impl DeploymentDetails {

struct DeploymentClient {
pub http_client: reqwest::Client,
pub status: Option<Eventual<DeploymentStatus>>,
pub status: Option<Receiver<DeploymentStatus>>,
pub query_url: Url,
}

impl DeploymentClient {
pub fn new(http_client: reqwest::Client, details: DeploymentDetails) -> Self {
pub async fn new(http_client: reqwest::Client, details: DeploymentDetails) -> Self {
Self {
http_client,
status: details
.deployment
.zip(details.status_url)
.map(|(deployment, url)| monitor_deployment_status(deployment, url)),
status: match details.deployment.zip(details.status_url) {
Some((deployment, url)) => Some(
monitor_deployment_status(deployment, url)
.await
.unwrap_or_else(|_| {
panic!(
"Failed to initialize monitoring for deployment `{}`",
deployment
)
}),
),
None => None,
},
query_url: details.query_url,
}
}
Expand All @@ -161,7 +170,7 @@ impl DeploymentClient {
variables: T::Variables,
) -> Result<ResponseResult<T::ResponseData>, anyhow::Error> {
if let Some(ref status) = self.status {
let deployment_status = status.value().await.expect("reading deployment status");
let deployment_status = status.borrow();

if !deployment_status.synced || &deployment_status.health != "healthy" {
return Err(anyhow!(
Expand Down Expand Up @@ -198,7 +207,7 @@ impl DeploymentClient {

pub async fn query_raw(&self, body: Bytes) -> Result<reqwest::Response, anyhow::Error> {
if let Some(ref status) = self.status {
let deployment_status = status.value().await.expect("reading deployment status");
let deployment_status = status.borrow();

if !deployment_status.synced || &deployment_status.health != "healthy" {
return Err(anyhow!(
Expand Down Expand Up @@ -226,14 +235,17 @@ pub struct SubgraphClient {
}

impl SubgraphClient {
pub fn new(
pub async fn new(
http_client: reqwest::Client,
local_deployment: Option<DeploymentDetails>,
remote_deployment: DeploymentDetails,
) -> Self {
Self {
local_client: local_deployment.map(|d| DeploymentClient::new(http_client.clone(), d)),
remote_client: DeploymentClient::new(http_client, remote_deployment),
local_client: match local_deployment {
Some(d) => Some(DeploymentClient::new(http_client.clone(), d).await),
None => None,
},
remote_client: DeploymentClient::new(http_client, remote_deployment).await,
}
}

Expand Down Expand Up @@ -335,12 +347,13 @@ mod test {
mock_server
}

fn network_subgraph_client() -> SubgraphClient {
async fn network_subgraph_client() -> SubgraphClient {
SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
)
.await
}

#[derive(GraphQLQuery)]
Expand All @@ -359,6 +372,7 @@ mod test {

// Check that the response is valid JSON
let result = network_subgraph_client()
.await
.query::<CurrentEpoch, _>(current_epoch::Variables {})
.await
.unwrap();
Expand Down Expand Up @@ -447,6 +461,7 @@ mod test {

// Query the subgraph
let data = client
.await
.query::<UserQuery, _>(user_query::Variables {})
.await
.expect("Query should succeed")
Expand Down Expand Up @@ -527,6 +542,7 @@ mod test {

// Query the subgraph
let data = client
.await
.query::<UserQuery, _>(user_query::Variables {})
.await
.expect("Query should succeed")
Expand Down Expand Up @@ -607,6 +623,7 @@ mod test {

// Query the subgraph
let data = client
.await
.query::<UserQuery, _>(user_query::Variables {})
.await
.expect("Query should succeed")
Expand Down
Loading

0 comments on commit fe6219b

Please sign in to comment.