Skip to content

Commit

Permalink
fix(torii-sqlite): executor broker messages & ipfs logging (#2923)
Browse files Browse the repository at this point in the history
* refactor(torii) ipfs fetch

* fmt

* improve logging for ipfs fetch

* remove print

* fix broker messages broadcast timing
  • Loading branch information
Larkooo authored Jan 21, 2025
1 parent fbb972b commit ad3e8fa
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
5 changes: 2 additions & 3 deletions crates/torii/indexer/src/processors/metadata_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use dojo_world::contracts::world::WorldContractReader;
use dojo_world::uri::Uri;
use starknet::core::types::{Event, Felt};
use starknet::providers::Provider;
use torii_sqlite::constants::IPFS_CLIENT_MAX_RETRY;
use torii_sqlite::utils::fetch_content_from_ipfs;
use torii_sqlite::Sql;
use tracing::{error, info};
Expand Down Expand Up @@ -107,7 +106,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option<String>, Opt
let uri = Uri::Ipfs(uri_str);
let cid = uri.cid().ok_or("Uri is malformed").map_err(Error::msg)?;

let bytes = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY).await?;
let bytes = fetch_content_from_ipfs(cid).await?;
let metadata: WorldMetadata = serde_json::from_str(std::str::from_utf8(&bytes)?)?;

let icon_img = fetch_image(&metadata.icon_uri).await;
Expand All @@ -118,7 +117,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option<String>, Opt

async fn fetch_image(image_uri: &Option<Uri>) -> Option<String> {
if let Some(uri) = image_uri {
let data = fetch_content_from_ipfs(uri.cid()?, IPFS_CLIENT_MAX_RETRY).await.ok()?;
let data = fetch_content_from_ipfs(uri.cid()?).await.ok()?;
let encoded = general_purpose::STANDARD.encode(data);
return Some(encoded);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/torii/server/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::fs;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::broadcast::Receiver;
use torii_sqlite::constants::{IPFS_CLIENT_MAX_RETRY, TOKENS_TABLE};
use torii_sqlite::constants::TOKENS_TABLE;
use torii_sqlite::utils::fetch_content_from_ipfs;
use tracing::{debug, error, trace};
use warp::http::Response;
Expand Down Expand Up @@ -204,7 +204,7 @@ async fn fetch_and_process_image(
uri if uri.starts_with("ipfs") => {
debug!(image_uri = %uri, "Fetching image from IPFS");
let cid = uri.strip_prefix("ipfs://").unwrap();
let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY)
let response = fetch_content_from_ipfs(cid)
.await
.context("Failed to read image bytes from IPFS response")?;

Expand Down
4 changes: 2 additions & 2 deletions crates/torii/sqlite/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use starknet_crypto::Felt;
use tracing::{debug, trace, warn};

use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::constants::{SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::types::{ContractType, TokenBalance};
Expand Down Expand Up @@ -289,7 +289,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
uri if uri.starts_with("ipfs") => {
let cid = uri.strip_prefix("ipfs://").unwrap();
debug!(cid = %cid, "Fetching metadata from IPFS");
let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY)
let response = fetch_content_from_ipfs(cid)
.await
.context("Failed to fetch metadata from IPFS")?;

Expand Down
6 changes: 3 additions & 3 deletions crates/torii/sqlite/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,10 +775,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
if new_transaction {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.commit().await?;
}

for message in self.publish_queue.drain(..) {
send_broker_message(message);
for message in self.publish_queue.drain(..) {
send_broker_message(message);
}
}

while let Some(result) = self.register_tasks.join_next().await {
Expand Down
20 changes: 11 additions & 9 deletions crates/torii/sqlite/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri};
use starknet::core::types::U256;
use starknet_crypto::Felt;
use tokio_util::bytes::Bytes;
use tracing::info;
use tracing::warn;

use crate::constants::{
IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME,
Expand Down Expand Up @@ -53,22 +53,24 @@ pub fn sql_string_to_felts(sql_string: &str) -> Vec<Felt> {
sql_string.split(SQL_FELT_DELIMITER).map(|felt| Felt::from_str(felt).unwrap()).collect()
}

pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result<Bytes> {
pub async fn fetch_content_from_ipfs(cid: &str) -> Result<Bytes> {
let mut retries = IPFS_CLIENT_MAX_RETRY;
let client = IpfsClient::from_str(IPFS_CLIENT_URL)?
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD);

while retries > 0 {
let response = client.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await;
match response {
Ok(stream) => return Ok(Bytes::from(stream)),
Err(e) => {
retries -= 1;
if retries > 0 {
info!(
error = %e,
"Fetch uri."
);
tokio::time::sleep(Duration::from_secs(3)).await;
}
warn!(
error = %e,
remaining_attempts = retries,
cid = cid,
"Failed to fetch content from IPFS, retrying after delay"
);
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
}
Expand Down

0 comments on commit ad3e8fa

Please sign in to comment.