Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use tokio::watch for subgraph monitor #444

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions common/src/allocations/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,22 @@ mod test {

use super::*;

fn network_subgraph_client() -> &'static SubgraphClient {
Box::leak(Box::new(SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
)))
async fn network_subgraph_client() -> &'static SubgraphClient {
Box::leak(Box::new(
SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
)
.await,
))
}

#[tokio::test]
#[ignore = "depends on the defunct hosted-service"]
async fn test_network_query() {
let result = get_allocations(
network_subgraph_client(),
network_subgraph_client().await,
Address::from_str("0x326c584e0f0eab1f1f83c93cc6ae1acc0feba0bc").unwrap(),
Duration::from_secs(1712448507),
)
Expand All @@ -158,7 +161,7 @@ mod test {
#[ignore = "depends on the defunct hosted-service"]
async fn test_network_query_empty_response() {
let result = get_allocations(
network_subgraph_client(),
network_subgraph_client().await,
Address::from_str("0xdeadbeefcafebabedeadbeefcafebabedeadbeef").unwrap(),
Duration::from_secs(1712448507),
)
Expand Down
3 changes: 2 additions & 1 deletion common/src/attestations/dispute_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ mod test {
*test_vectors::NETWORK_SUBGRAPH_DEPLOYMENT
))
.unwrap(),
);
)
.await;

// Mock result for current epoch requests
mock_server
Expand Down
23 changes: 13 additions & 10 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,19 @@ mod tests {
async fn test_current_accounts() {
// Set up a mock escrow subgraph
let mock_server = MockServer::start().await;
let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(&format!(
"{}/subgraphs/id/{}",
&mock_server.uri(),
*test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT
))
.unwrap(),
)));
let escrow_subgraph = Box::leak(Box::new(
SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(&format!(
"{}/subgraphs/id/{}",
&mock_server.uri(),
*test_vectors::ESCROW_SUBGRAPH_DEPLOYMENT
))
.unwrap(),
)
.await,
));

let mock = Mock::given(method("POST"))
.and(path(format!(
Expand Down
104 changes: 56 additions & 48 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,36 +205,39 @@ impl IndexerService {
.build()
.expect("Failed to init HTTP client");

let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
options
.config
.subgraphs
.network
.config
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect(
"Failed to parse graph node query endpoint and network subgraph deployment",
),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.network.config.query_url.as_ref(),
let network_subgraph: &'static SubgraphClient = Box::leak(Box::new(
SubgraphClient::new(
http_client.clone(),
options
.config
.subgraphs
.network
.config
.query_auth_token
.clone(),
)?,
)));
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect(
"Failed to parse graph node query endpoint and network subgraph deployment",
),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.network.config.query_url.as_ref(),
options
.config
.subgraphs
.network
.config
.query_auth_token
.clone(),
)?,
)
.await,
));

// Identify the dispute manager for the configured network
let dispute_manager = dispute_manager(network_subgraph, Duration::from_secs(3600))
Expand Down Expand Up @@ -269,34 +272,39 @@ impl IndexerService {
dispute_manager,
);

let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(SubgraphClient::new(
http_client,
options
.config
.subgraphs
.escrow
.config
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and escrow subgraph deployment"),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.escrow.config.query_url.as_ref(),
let escrow_subgraph: &'static SubgraphClient = Box::leak(Box::new(
SubgraphClient::new(
http_client,
options
.config
.subgraphs
.escrow
.config
.query_auth_token
.clone(),
)?,
)));
.deployment_id
.map(|deployment| {
DeploymentDetails::for_graph_node_url(
options.config.graph_node.status_url.clone(),
options.config.graph_node.query_url.clone(),
deployment,
)
})
.transpose()
.expect(
"Failed to parse graph node query endpoint and escrow subgraph deployment",
),
DeploymentDetails::for_query_url_with_token(
options.config.subgraphs.escrow.config.query_url.as_ref(),
options
.config
.subgraphs
.escrow
.config
.query_auth_token
.clone(),
)?,
)
.await,
));

let escrow_accounts = escrow_accounts(
escrow_subgraph,
Expand Down
43 changes: 30 additions & 13 deletions common/src/subgraph_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use super::monitor::{monitor_deployment_status, DeploymentStatus};
use anyhow::anyhow;
use axum::body::Bytes;
use eventuals::Eventual;
use graphql_client::GraphQLQuery;
use reqwest::{header, Url};
use serde_json::{Map, Value};
Expand All @@ -13,6 +12,7 @@ use thegraph_graphql_http::{
graphql::{Document, IntoDocument},
http::request::{IntoRequestParameters, RequestParameters},
};
use tokio::sync::watch::Receiver;
use tracing::warn;

#[derive(Clone)]
Expand Down Expand Up @@ -140,18 +140,27 @@ impl DeploymentDetails {

struct DeploymentClient {
pub http_client: reqwest::Client,
pub status: Option<Eventual<DeploymentStatus>>,
pub status: Option<Receiver<DeploymentStatus>>,
pub query_url: Url,
}

impl DeploymentClient {
pub fn new(http_client: reqwest::Client, details: DeploymentDetails) -> Self {
pub async fn new(http_client: reqwest::Client, details: DeploymentDetails) -> Self {
Self {
http_client,
status: details
.deployment
.zip(details.status_url)
.map(|(deployment, url)| monitor_deployment_status(deployment, url)),
status: match details.deployment.zip(details.status_url) {
Some((deployment, url)) => Some(
monitor_deployment_status(deployment, url)
.await
.unwrap_or_else(|_| {
panic!(
"Failed to initialize monitoring for deployment `{}`",
deployment
)
}),
),
None => None,
},
query_url: details.query_url,
}
}
Expand All @@ -161,7 +170,7 @@ impl DeploymentClient {
variables: T::Variables,
) -> Result<ResponseResult<T::ResponseData>, anyhow::Error> {
if let Some(ref status) = self.status {
let deployment_status = status.value().await.expect("reading deployment status");
let deployment_status = status.borrow();

if !deployment_status.synced || &deployment_status.health != "healthy" {
return Err(anyhow!(
Expand Down Expand Up @@ -198,7 +207,7 @@ impl DeploymentClient {

pub async fn query_raw(&self, body: Bytes) -> Result<reqwest::Response, anyhow::Error> {
if let Some(ref status) = self.status {
let deployment_status = status.value().await.expect("reading deployment status");
let deployment_status = status.borrow();

if !deployment_status.synced || &deployment_status.health != "healthy" {
return Err(anyhow!(
Expand Down Expand Up @@ -226,14 +235,17 @@ pub struct SubgraphClient {
}

impl SubgraphClient {
pub fn new(
pub async fn new(
http_client: reqwest::Client,
local_deployment: Option<DeploymentDetails>,
remote_deployment: DeploymentDetails,
) -> Self {
Self {
local_client: local_deployment.map(|d| DeploymentClient::new(http_client.clone(), d)),
remote_client: DeploymentClient::new(http_client, remote_deployment),
local_client: match local_deployment {
Some(d) => Some(DeploymentClient::new(http_client.clone(), d).await),
None => None,
},
remote_client: DeploymentClient::new(http_client, remote_deployment).await,
}
}

Expand Down Expand Up @@ -335,12 +347,13 @@ mod test {
mock_server
}

fn network_subgraph_client() -> SubgraphClient {
async fn network_subgraph_client() -> SubgraphClient {
SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(NETWORK_SUBGRAPH_URL).unwrap(),
)
.await
}

#[derive(GraphQLQuery)]
Expand All @@ -359,6 +372,7 @@ mod test {

// Check that the response is valid JSON
let result = network_subgraph_client()
.await
.query::<CurrentEpoch, _>(current_epoch::Variables {})
.await
.unwrap();
Expand Down Expand Up @@ -447,6 +461,7 @@ mod test {

// Query the subgraph
let data = client
.await
.query::<UserQuery, _>(user_query::Variables {})
.await
.expect("Query should succeed")
Expand Down Expand Up @@ -527,6 +542,7 @@ mod test {

// Query the subgraph
let data = client
.await
.query::<UserQuery, _>(user_query::Variables {})
.await
.expect("Query should succeed")
Expand Down Expand Up @@ -607,6 +623,7 @@ mod test {

// Query the subgraph
let data = client
.await
.query::<UserQuery, _>(user_query::Variables {})
.await
.expect("Query should succeed")
Expand Down
Loading
Loading