From 8111b6d13da6796451d97aa61bff67a584c65727 Mon Sep 17 00:00:00 2001 From: tedison <76473430+edisontim@users.noreply.github.com> Date: Sun, 15 Dec 2024 22:56:00 -0500 Subject: [PATCH] feat: introduce updated at field in top level torii query (#2807) * introduce updated at field in top level query * fmt * fmt + clippy * clippy * fix timestamp issue * fix query count * remove query count * docs: add comments to explain fields --------- Co-authored-by: glihm --- crates/torii/core/src/error.rs | 2 ++ crates/torii/core/src/model.rs | 20 +++++++++++ crates/torii/grpc/proto/types.proto | 1 + crates/torii/grpc/src/server/mod.rs | 33 ++++++++++++++++++- .../grpc/src/server/tests/entities_test.rs | 1 + crates/torii/grpc/src/types/mod.rs | 6 ++++ 6 files changed, 62 insertions(+), 1 deletion(-) diff --git a/crates/torii/core/src/error.rs b/crates/torii/core/src/error.rs index d2e595c874..0a8c497a76 100644 --- a/crates/torii/core/src/error.rs +++ b/crates/torii/core/src/error.rs @@ -51,4 +51,6 @@ pub enum QueryError { SqliteJoinLimit, #[error("Invalid namespaced model: {0}")] InvalidNamespacedModel(String), + #[error("Invalid timestamp: {0}. Expected valid number of seconds since unix epoch.")] + InvalidTimestamp(u64), } diff --git a/crates/torii/core/src/model.rs b/crates/torii/core/src/model.rs index 90313a7536..cee6ed29ac 100644 --- a/crates/torii/core/src/model.rs +++ b/crates/torii/core/src/model.rs @@ -116,6 +116,7 @@ impl ModelReader for ModelSQLReader { } /// Creates a query that fetches all models and their nested data. +#[allow(clippy::too_many_arguments)] pub fn build_sql_query( schemas: &Vec, table_name: &str, @@ -124,6 +125,7 @@ pub fn build_sql_query( order_by: Option<&str>, limit: Option, offset: Option, + internal_updated_at: u64, ) -> Result<(String, String), Error> { fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec) { match ty { @@ -172,6 +174,7 @@ pub fn build_sql_query( selections.push(format!("{}.id", table_name)); selections.push(format!("{}.keys", table_name)); + let mut internal_updated_at_clause = Vec::with_capacity(schemas.len()); // Process each model schema for model in schemas { let model_table = model.name(); @@ -180,6 +183,10 @@ pub fn build_sql_query( [{model_table}].{entity_relation_column}", )); + if internal_updated_at > 0 { + internal_updated_at_clause.push(format!("[{model_table}].internal_updated_at >= ?")); + } + // Collect columns with table prefix collect_columns(&model_table, "", model, &mut selections); } @@ -197,6 +204,18 @@ pub fn build_sql_query( count_query += &format!(" WHERE {}", where_clause); } + if !internal_updated_at_clause.is_empty() { + if where_clause.is_none() { + query += " WHERE "; + count_query += " WHERE "; + } else { + query += " AND "; + count_query += " AND "; + } + query += &format!(" {}", internal_updated_at_clause.join(" AND ")); + count_query += &format!(" {}", internal_updated_at_clause.join(" AND ")); + } + // Use custom order by if provided, otherwise default to event_id DESC if let Some(order_clause) = order_by { query += &format!(" ORDER BY {}", order_clause); @@ -494,6 +513,7 @@ mod tests { None, None, None, + 0, ) .unwrap(); diff --git a/crates/torii/grpc/proto/types.proto b/crates/torii/grpc/proto/types.proto index 70680462ad..1568128ca2 100644 --- a/crates/torii/grpc/proto/types.proto +++ b/crates/torii/grpc/proto/types.proto @@ -76,6 +76,7 @@ message Query { bool dont_include_hashed_keys = 4; repeated OrderBy order_by = 5; repeated string entity_models = 6; + uint64 internal_updated_at = 7; } message EventQuery { diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 62c16a304b..8e36402d3c 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -27,6 +27,7 @@ use proto::world::{ use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use sqlx::prelude::FromRow; use sqlx::sqlite::SqliteRow; +use sqlx::types::chrono::{DateTime, Utc}; use sqlx::{Pool, Row, Sqlite}; use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; @@ -229,6 +230,7 @@ impl DojoWorld { dont_include_hashed_keys: bool, order_by: Option<&str>, entity_models: Vec, + internal_updated_at: u64, ) -> Result<(Vec, u32), Error> { self.query_by_hashed_keys( table, @@ -240,6 +242,7 @@ impl DojoWorld { dont_include_hashed_keys, order_by, entity_models, + internal_updated_at, ) .await } @@ -258,6 +261,7 @@ impl DojoWorld { row_events.iter().map(map_row_to_event).collect() } + #[allow(clippy::too_many_arguments)] async fn fetch_entities( &self, table: &str, @@ -266,6 +270,7 @@ impl DojoWorld { dont_include_hashed_keys: bool, order_by: Option<&str>, entity_models: Vec, + internal_updated_at: u64, ) -> Result, Error> { let entity_models = entity_models.iter().map(|tag| compute_selector_from_tag(tag)).collect::>(); @@ -354,9 +359,22 @@ impl DojoWorld { order_by, None, None, + internal_updated_at, )?; - let rows = sqlx::query(&entity_query).bind(models_str).fetch_all(&mut *tx).await?; + let mut query = sqlx::query(&entity_query).bind(models_str); + if internal_updated_at > 0 { + for _ in 0..schemas.len() { + let time = DateTime::::from_timestamp(internal_updated_at as i64, 0) + .ok_or_else(|| { + Error::from(QueryError::InvalidTimestamp(internal_updated_at)) + })? + .to_rfc3339(); + query = query.bind(time.clone()); + } + } + let rows = query.fetch_all(&mut *tx).await?; + let schemas = Arc::new(schemas); let group_entities: Result, Error> = rows @@ -430,6 +448,7 @@ impl DojoWorld { dont_include_hashed_keys: bool, order_by: Option<&str>, entity_models: Vec, + internal_updated_at: u64, ) -> Result<(Vec, u32), Error> { // TODO: use prepared statement for where clause let filter_ids = match hashed_keys { @@ -510,6 +529,7 @@ impl DojoWorld { dont_include_hashed_keys, order_by, entity_models, + internal_updated_at, ) .await?; Ok((entities, total_count)) @@ -527,6 +547,7 @@ impl DojoWorld { dont_include_hashed_keys: bool, order_by: Option<&str>, entity_models: Vec, + internal_updated_at: u64, ) -> Result<(Vec, u32), Error> { let keys_pattern = build_keys_pattern(keys_clause)?; @@ -655,6 +676,7 @@ impl DojoWorld { dont_include_hashed_keys, order_by, entity_models, + internal_updated_at, ) .await?; Ok((entities, total_count)) @@ -699,6 +721,7 @@ impl DojoWorld { dont_include_hashed_keys: bool, order_by: Option<&str>, entity_models: Vec, + internal_updated_at: u64, ) -> Result<(Vec, u32), Error> { let entity_models = entity_models.iter().map(|model| compute_selector_from_tag(model)).collect::>(); @@ -765,6 +788,7 @@ impl DojoWorld { order_by, limit, offset, + internal_updated_at, )?; let total_count = sqlx::query_scalar(&count_query) @@ -798,6 +822,7 @@ impl DojoWorld { dont_include_hashed_keys: bool, order_by: Option<&str>, entity_models: Vec, + internal_updated_at: u64, ) -> Result<(Vec, u32), Error> { let (where_clause, having_clause, join_clause, bind_values) = build_composite_clause(table, model_relation_table, &composite)?; @@ -868,6 +893,7 @@ impl DojoWorld { dont_include_hashed_keys, order_by, entity_models, + internal_updated_at, ) .await?; Ok((entities, total_count)) @@ -1036,6 +1062,7 @@ impl DojoWorld { query.dont_include_hashed_keys, order_by, query.entity_models, + query.internal_updated_at, ) .await? } @@ -1059,6 +1086,7 @@ impl DojoWorld { query.dont_include_hashed_keys, order_by, query.entity_models, + query.internal_updated_at, ) .await? } @@ -1073,6 +1101,7 @@ impl DojoWorld { query.dont_include_hashed_keys, order_by, query.entity_models, + query.internal_updated_at, ) .await? } @@ -1087,6 +1116,7 @@ impl DojoWorld { query.dont_include_hashed_keys, order_by, query.entity_models, + query.internal_updated_at, ) .await? } @@ -1101,6 +1131,7 @@ impl DojoWorld { query.dont_include_hashed_keys, order_by, query.entity_models, + query.internal_updated_at, ) .await? } diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index 8de8a92f55..58a795421f 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -143,6 +143,7 @@ async fn test_entities_queries(sequencer: &RunnerCtx) { false, None, vec![], + 0, ) .await .unwrap() diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index 13ff998900..35fe1897f7 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -105,7 +105,12 @@ pub struct Query { pub offset: u32, pub dont_include_hashed_keys: bool, pub order_by: Vec, + /// If the array is not empty, only the given models are retrieved. + /// All entities that don't have a model in the array are excluded. pub entity_models: Vec, + /// The internal updated at timestamp in seconds (unix timestamp) from which entities are + /// retrieved (inclusive). Use 0 to retrieve all entities. + pub internal_updated_at: u64, } #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] @@ -271,6 +276,7 @@ impl From for proto::types::Query { dont_include_hashed_keys: value.dont_include_hashed_keys, order_by: value.order_by.into_iter().map(|o| o.into()).collect(), entity_models: value.entity_models, + internal_updated_at: value.internal_updated_at, } } }