diff --git a/optd-persistent/src/cost_model/catalog/mock_catalog.rs b/optd-persistent/src/cost_model/catalog/mock_catalog.rs index 7d9b8aa..997feb7 100644 --- a/optd-persistent/src/cost_model/catalog/mock_catalog.rs +++ b/optd-persistent/src/cost_model/catalog/mock_catalog.rs @@ -1,3 +1,6 @@ +use sea_orm::prelude::Json; +use serde_json::json; + use crate::cost_model::interface::{AttrType, IndexType, StatType}; pub struct MockDatabaseMetadata { @@ -31,7 +34,7 @@ pub struct MockStatistic { pub id: i32, pub stat_type: i32, // TODO(lanlou): what should I use for the value type? - pub stat_value: String, + pub stat_value: Json, pub attr_ids: Vec, pub table_id: Option, pub name: String, @@ -110,7 +113,7 @@ impl MockCatalog { MockStatistic { id: 1, stat_type: StatType::Count as i32, - stat_value: "100".to_string(), + stat_value: json!(100), attr_ids: vec![1], table_id: None, name: "CountAttr1".to_string(), @@ -118,7 +121,7 @@ impl MockCatalog { MockStatistic { id: 2, stat_type: StatType::Count as i32, - stat_value: "200".to_string(), + stat_value: json!(200), attr_ids: vec![2], table_id: None, name: "CountAttr2".to_string(), @@ -126,7 +129,7 @@ impl MockCatalog { MockStatistic { id: 3, stat_type: StatType::Count as i32, - stat_value: "300".to_string(), + stat_value: json!(300), attr_ids: vec![], table_id: Some(1), name: "Table1Count".to_string(), diff --git a/optd-persistent/src/cost_model/interface.rs b/optd-persistent/src/cost_model/interface.rs index d8dfcdf..4f7f953 100644 --- a/optd-persistent/src/cost_model/interface.rs +++ b/optd-persistent/src/cost_model/interface.rs @@ -36,6 +36,14 @@ pub enum ConstraintType { } pub enum StatType { + // TODO(lanlou): I am not sure which way to represent the type is better. + // 1. `Count` means row count, (i.e. record count), and it only applies to + // table statistics. In this way, we should introduce `NotNullCount` for attribute + // statistics to indicate the number of non-null values. + // 2. `Count` means the number of non-null values, and it applies to both table + // and attribute statistics. (Will a table have a record with null values in all + // attributes?) + // For now, we just use the second way for simplicity. Count, Cardinality, Min, @@ -74,11 +82,7 @@ pub trait CostModelStorageLayer { data: String, ) -> StorageResult; - async fn update_stats_from_catalog( - &self, - c: CatalogSource, - epoch_id: Self::EpochId, - ) -> StorageResult<()>; + async fn update_stats_from_catalog(&self, c: CatalogSource) -> StorageResult; async fn update_stats( &mut self, diff --git a/optd-persistent/src/cost_model/orm.rs b/optd-persistent/src/cost_model/orm.rs index 07ddbdb..ee94ed8 100644 --- a/optd-persistent/src/cost_model/orm.rs +++ b/optd-persistent/src/cost_model/orm.rs @@ -57,11 +57,25 @@ impl CostModelStorageLayer for BackendManager { Ok(insert_res.last_insert_id) } - async fn update_stats_from_catalog( - &self, - c: CatalogSource, - epoch_id: Self::EpochId, - ) -> StorageResult<()> { + /// TODO: documentation + async fn update_stats_from_catalog(&self, c: CatalogSource) -> StorageResult { + let transaction = self.db.begin().await?; + let source = match c { + CatalogSource::Mock => "Mock", + CatalogSource::Iceberg() => "Iceberg", + }; + let new_event = event::ActiveModel { + source_variant: sea_orm::ActiveValue::Set(source.to_string()), + timestamp: sea_orm::ActiveValue::Set(Utc::now()), + data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String( + "Update stats from catalog".to_string(), + )), + ..Default::default() + }; + let epoch_id = Event::insert(new_event) + .exec(&transaction) + .await? + .last_insert_id; match c { CatalogSource::Mock => { let mock_catalog = MockCatalog::new(); @@ -72,7 +86,7 @@ impl CostModelStorageLayer for BackendManager { ..Default::default() } })) - .exec(&self.db) + .exec(&transaction) .await?; NamespaceMetadata::insert_many(mock_catalog.namespaces.iter().map(|namespace| { namespace_metadata::ActiveModel { @@ -82,7 +96,7 @@ impl CostModelStorageLayer for BackendManager { ..Default::default() } })) - .exec(&self.db) + .exec(&transaction) .await?; TableMetadata::insert_many(mock_catalog.tables.iter().map(|table| { table_metadata::ActiveModel { @@ -92,7 +106,7 @@ impl CostModelStorageLayer for BackendManager { ..Default::default() } })) - .exec(&self.db) + .exec(&transaction) .await?; Attribute::insert_many(mock_catalog.attributes.iter().map(|attr| { attribute::ActiveModel { @@ -107,7 +121,7 @@ impl CostModelStorageLayer for BackendManager { ..Default::default() } })) - .exec(&self.db) + .exec(&transaction) .await?; Statistic::insert_many(mock_catalog.statistics.iter().map(|stat| { statistic::ActiveModel { @@ -122,7 +136,29 @@ impl CostModelStorageLayer for BackendManager { ..Default::default() } })) - .exec(&self.db) + .exec(&transaction) + .await?; + VersionedStatistic::insert_many(mock_catalog.statistics.iter().map(|stat| { + versioned_statistic::ActiveModel { + epoch_id: sea_orm::ActiveValue::Set(epoch_id), + statistic_id: sea_orm::ActiveValue::Set(stat.id), + statistic_value: sea_orm::ActiveValue::Set(stat.stat_value.clone()), + ..Default::default() + } + })) + .exec(&transaction) + .await?; + StatisticToAttributeJunction::insert_many(mock_catalog.statistics.iter().flat_map( + |stat| { + stat.attr_ids.iter().map(move |attr_id| { + statistic_to_attribute_junction::ActiveModel { + statistic_id: sea_orm::ActiveValue::Set(stat.id), + attribute_id: sea_orm::ActiveValue::Set(*attr_id), + } + }) + }, + )) + .exec(&transaction) .await?; IndexMetadata::insert_many( mock_catalog @@ -146,12 +182,13 @@ impl CostModelStorageLayer for BackendManager { ..Default::default() }), ) - .exec(&self.db) + .exec(&transaction) .await?; - Ok(()) } CatalogSource::Iceberg() => todo!(), } + transaction.commit().await?; + Ok(epoch_id) } /* Update the statistics in the database. @@ -562,15 +599,28 @@ mod tests { let mut binding = super::BackendManager::new(Some(&database_url)).await; let backend_manager = binding.as_mut().unwrap(); let res = backend_manager - .update_stats_from_catalog(super::CatalogSource::Mock, 1) + .update_stats_from_catalog(super::CatalogSource::Mock) .await; println!("{:?}", res); assert!(res.is_ok()); + let epoch_id = res.unwrap(); + assert_eq!(epoch_id, 1); let lookup_res = Statistic::find().all(&backend_manager.db).await.unwrap(); println!("{:?}", lookup_res); assert_eq!(lookup_res.len(), 3); + let stat_res = backend_manager + .get_stats_for_table(1, StatType::Count as i32, Some(epoch_id)) + .await; + assert!(stat_res.is_ok()); + assert_eq!(stat_res.unwrap().unwrap(), json!(300)); + let stat_res = backend_manager + .get_stats_for_attr([2].to_vec(), StatType::Count as i32, None) + .await; + assert!(stat_res.is_ok()); + assert_eq!(stat_res.unwrap().unwrap(), json!(200)); + remove_db_file(DATABASE_FILE); }