Skip to content

Commit

Permalink
fix: fix update_stats_from_catalog and improve the test
Browse files Browse the repository at this point in the history
  • Loading branch information
lanlou1554 committed Nov 9, 2024
1 parent d15a7e2 commit 02407a5
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 22 deletions.
11 changes: 7 additions & 4 deletions optd-persistent/src/cost_model/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use sea_orm::prelude::Json;
use serde_json::json;

use crate::cost_model::interface::{AttrType, IndexType, StatType};

pub struct MockDatabaseMetadata {
Expand Down Expand Up @@ -31,7 +34,7 @@ pub struct MockStatistic {
pub id: i32,
pub stat_type: i32,
// TODO(lanlou): what should I use for the value type?
pub stat_value: String,
pub stat_value: Json,
pub attr_ids: Vec<i32>,
pub table_id: Option<i32>,
pub name: String,
Expand Down Expand Up @@ -110,23 +113,23 @@ impl MockCatalog {
MockStatistic {
id: 1,
stat_type: StatType::Count as i32,
stat_value: "100".to_string(),
stat_value: json!(100),
attr_ids: vec![1],
table_id: None,
name: "CountAttr1".to_string(),
},
MockStatistic {
id: 2,
stat_type: StatType::Count as i32,
stat_value: "200".to_string(),
stat_value: json!(200),
attr_ids: vec![2],
table_id: None,
name: "CountAttr2".to_string(),
},
MockStatistic {
id: 3,
stat_type: StatType::Count as i32,
stat_value: "300".to_string(),
stat_value: json!(300),
attr_ids: vec![],
table_id: Some(1),
name: "Table1Count".to_string(),
Expand Down
14 changes: 9 additions & 5 deletions optd-persistent/src/cost_model/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ pub enum ConstraintType {
}

pub enum StatType {
// TODO(lanlou): It is not yet decided which meaning of `Count` is better:
// 1. `Count` means the row count (i.e. record count) and applies only to
// table statistics. With this option, we would need to introduce `NotNullCount`
// for attribute statistics to indicate the number of non-null values.
// 2. `Count` means the number of non-null values and applies to both table
// and attribute statistics. (Can a table have a record whose attributes are
// all null?)
// For now, we use the second option for simplicity.
Count,
Cardinality,
Min,
Expand Down Expand Up @@ -74,11 +82,7 @@ pub trait CostModelStorageLayer {
data: String,
) -> StorageResult<Self::EpochId>;

async fn update_stats_from_catalog(
&self,
c: CatalogSource,
epoch_id: Self::EpochId,
) -> StorageResult<()>;
async fn update_stats_from_catalog(&self, c: CatalogSource) -> StorageResult<Self::EpochId>;

async fn update_stats(
&mut self,
Expand Down
76 changes: 63 additions & 13 deletions optd-persistent/src/cost_model/orm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,25 @@ impl CostModelStorageLayer for BackendManager {
Ok(insert_res.last_insert_id)
}

async fn update_stats_from_catalog(
&self,
c: CatalogSource,
epoch_id: Self::EpochId,
) -> StorageResult<()> {
/// TODO: documentation
async fn update_stats_from_catalog(&self, c: CatalogSource) -> StorageResult<Self::EpochId> {
let transaction = self.db.begin().await?;
let source = match c {
CatalogSource::Mock => "Mock",
CatalogSource::Iceberg() => "Iceberg",
};
let new_event = event::ActiveModel {
source_variant: sea_orm::ActiveValue::Set(source.to_string()),
timestamp: sea_orm::ActiveValue::Set(Utc::now()),
data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(
"Update stats from catalog".to_string(),
)),
..Default::default()
};
let epoch_id = Event::insert(new_event)
.exec(&transaction)
.await?
.last_insert_id;
match c {
CatalogSource::Mock => {
let mock_catalog = MockCatalog::new();
Expand All @@ -72,7 +86,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
NamespaceMetadata::insert_many(mock_catalog.namespaces.iter().map(|namespace| {
namespace_metadata::ActiveModel {
Expand All @@ -82,7 +96,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
TableMetadata::insert_many(mock_catalog.tables.iter().map(|table| {
table_metadata::ActiveModel {
Expand All @@ -92,7 +106,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
Attribute::insert_many(mock_catalog.attributes.iter().map(|attr| {
attribute::ActiveModel {
Expand All @@ -107,7 +121,7 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
Statistic::insert_many(mock_catalog.statistics.iter().map(|stat| {
statistic::ActiveModel {
Expand All @@ -122,7 +136,29 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}
}))
.exec(&self.db)
.exec(&transaction)
.await?;
VersionedStatistic::insert_many(mock_catalog.statistics.iter().map(|stat| {
versioned_statistic::ActiveModel {
epoch_id: sea_orm::ActiveValue::Set(epoch_id),
statistic_id: sea_orm::ActiveValue::Set(stat.id),
statistic_value: sea_orm::ActiveValue::Set(stat.stat_value.clone()),
..Default::default()
}
}))
.exec(&transaction)
.await?;
StatisticToAttributeJunction::insert_many(mock_catalog.statistics.iter().flat_map(
|stat| {
stat.attr_ids.iter().map(move |attr_id| {
statistic_to_attribute_junction::ActiveModel {
statistic_id: sea_orm::ActiveValue::Set(stat.id),
attribute_id: sea_orm::ActiveValue::Set(*attr_id),
}
})
},
))
.exec(&transaction)
.await?;
IndexMetadata::insert_many(
mock_catalog
Expand All @@ -146,12 +182,13 @@ impl CostModelStorageLayer for BackendManager {
..Default::default()
}),
)
.exec(&self.db)
.exec(&transaction)
.await?;
Ok(())
}
CatalogSource::Iceberg() => todo!(),
}
transaction.commit().await?;
Ok(epoch_id)
}

/* Update the statistics in the database.
Expand Down Expand Up @@ -562,15 +599,28 @@ mod tests {
let mut binding = super::BackendManager::new(Some(&database_url)).await;
let backend_manager = binding.as_mut().unwrap();
let res = backend_manager
.update_stats_from_catalog(super::CatalogSource::Mock, 1)
.update_stats_from_catalog(super::CatalogSource::Mock)
.await;
println!("{:?}", res);
assert!(res.is_ok());
let epoch_id = res.unwrap();
assert_eq!(epoch_id, 1);

let lookup_res = Statistic::find().all(&backend_manager.db).await.unwrap();
println!("{:?}", lookup_res);
assert_eq!(lookup_res.len(), 3);

let stat_res = backend_manager
.get_stats_for_table(1, StatType::Count as i32, Some(epoch_id))
.await;
assert!(stat_res.is_ok());
assert_eq!(stat_res.unwrap().unwrap(), json!(300));
let stat_res = backend_manager
.get_stats_for_attr([2].to_vec(), StatType::Count as i32, None)
.await;
assert!(stat_res.is_ok());
assert_eq!(stat_res.unwrap().unwrap(), json!(200));

remove_db_file(DATABASE_FILE);
}

Expand Down

0 comments on commit 02407a5

Please sign in to comment.