Skip to content

Commit

Permalink
adapter: Remove legacy builtin item migrations
Browse files Browse the repository at this point in the history
Previously, the adapter maintained two different implementations of the
builtin item migration framework. One that was used before 0dt and one
that was used with 0dt. The 0dt implementation still works when 0dt is
turned off, but we wanted to keep the legacy implementation around
while the 0dt implementation was still new. The 0dt implementation has
been around long enough in production to prove that it works. On the
other hand, the legacy implementation has not been tested in production
in a long time.

This commit removes the legacy implementation and always uses the new
implementation. This should reduce the maintenance burden of builtin
item migrations.

In the future, the builtin item migration framework should be
completely removed and replaced with `ALTER TABLE`.
  • Loading branch information
jkosh44 committed Jan 28, 2025
1 parent e91678f commit 9465baa
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 1,312 deletions.
135 changes: 9 additions & 126 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
pub use crate::catalog::builtin_table_updates::BuiltinTableUpdate;
pub use crate::catalog::open::{
BuiltinMigrationMetadata, InitializeStateResult, OpenCatalogResult,
};
pub use crate::catalog::open::{InitializeStateResult, OpenCatalogResult};
pub use crate::catalog::state::CatalogState;
pub use crate::catalog::transact::{
DropObjectInfo, Op, ReplicaCreateDropReason, TransactionResult,
Expand Down Expand Up @@ -671,9 +669,9 @@ impl Catalog {
// debugging/testing.
let previous_ts = now().into();
let replica_size = &bootstrap_args.default_cluster_replica_size;
let read_only = false;
let OpenCatalogResult {
catalog,
storage_collections_to_drop: _,
migrated_storage_collections_0dt: _,
new_builtin_collections: _,
builtin_table_updates: _,
Expand All @@ -687,7 +685,7 @@ impl Catalog {
all_features: false,
build_info: &DUMMY_BUILD_INFO,
environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()),
read_only: false,
read_only,
now,
boot_ts: previous_ts,
skip_migrations: true,
Expand All @@ -705,7 +703,10 @@ impl Catalog {
aws_privatelink_availability_zones: None,
http_host_name: None,
connection_context: ConnectionContext::for_tests(secrets_reader),
builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
builtin_item_migration_config: BuiltinItemMigrationConfig {
persist_client: persist_client.clone(),
read_only,
},
persist_client,
enable_expression_cache_override,
enable_0dt_deployment: true,
Expand Down Expand Up @@ -2267,10 +2268,7 @@ mod tests {
use tokio_postgres::NoTls;
use uuid::Uuid;

use mz_catalog::builtin::{
Builtin, BuiltinType, UnsafeBuiltinTableFingerprintWhitespace, BUILTINS,
UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE,
};
use mz_catalog::builtin::{Builtin, BuiltinType, BUILTINS};
use mz_catalog::durable::{test_bootstrap_args, CatalogError, DurableCatalogError, FenceError};
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller_types::{ClusterId, ReplicaId};
Expand All @@ -2285,9 +2283,7 @@ mod tests {
CatalogItemId, Datum, GlobalId, RelationType, RelationVersionSelector, RowArena,
ScalarType, Timestamp,
};
use mz_sql::catalog::{
BuiltinsConfig, CatalogDatabase, CatalogSchema, CatalogType, SessionCatalog,
};
use mz_sql::catalog::{BuiltinsConfig, CatalogSchema, CatalogType, SessionCatalog};
use mz_sql::func::{Func, FuncImpl, Operation, OP_IMPLS};
use mz_sql::names::{
self, DatabaseId, ItemQualifiers, ObjectId, PartialItemName, QualifiedItemName,
Expand Down Expand Up @@ -3507,119 +3503,6 @@ mod tests {
.await
}

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_builtin_migrations() {
let persist_client = PersistClient::new_for_tests().await;
let bootstrap_args = test_bootstrap_args();
let organization_id = Uuid::new_v4();
let mv_name = "mv";
let (mz_tables_id, mv_id) = {
let mut catalog = Catalog::open_debug_catalog(
persist_client.clone(),
organization_id.clone(),
&bootstrap_args,
)
.await
.expect("unable to open debug catalog");

// Create a materialized view over `mz_tables`.
let database_id = DatabaseId::User(1);
let database = catalog.get_database(&database_id);
let database_name = database.name();
let schemas = database.schemas();
let schema = schemas.first().expect("must have at least one schema");
let schema_spec = schema.id().clone();
let schema_name = &schema.name().schema;
let database_spec = ResolvedDatabaseSpecifier::Id(database_id);
let id_ts = catalog.storage().await.current_upper().await;
let (mv_id, mv_gid) = catalog
.allocate_user_id(id_ts)
.await
.expect("unable to allocate id");
let mv = catalog
.state()
.deserialize_item(
mv_gid,
&format!("CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"),
&BTreeMap::new(),
&mut LocalExpressionCache::Closed,
None,
)
.expect("unable to deserialize item");
let commit_ts = catalog.current_upper().await;
catalog
.transact(
None,
commit_ts,
None,
vec![Op::CreateItem {
id: mv_id,
name: QualifiedItemName {
qualifiers: ItemQualifiers {
database_spec,
schema_spec,
},
item: mv_name.to_string(),
},
item: mv,
owner_id: MZ_SYSTEM_ROLE_ID,
}],
)
.await
.expect("unable to transact");

let mz_tables_id = catalog
.entries()
.find(|entry| &entry.name.item == "mz_tables" && entry.is_table())
.expect("mz_tables doesn't exist")
.id();
let check_mv_id = catalog
.entries()
.find(|entry| &entry.name.item == mv_name && entry.is_materialized_view())
.unwrap_or_else(|| panic!("{mv_name} doesn't exist"))
.id();
assert_eq!(check_mv_id, mv_id);
catalog.expire().await;
(mz_tables_id, mv_id)
};
// Forcibly migrate all tables.
{
let mut guard =
UNSAFE_DO_NOT_CALL_THIS_IN_PRODUCTION_BUILTIN_TABLE_FINGERPRINT_WHITESPACE
.lock()
.expect("lock poisoned");
*guard = Some((
UnsafeBuiltinTableFingerprintWhitespace::All,
"\n".to_string(),
));
}
{
let catalog =
Catalog::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
.await
.expect("unable to open debug catalog");

let new_mz_tables_id = catalog
.entries()
.find(|entry| &entry.name.item == "mz_tables" && entry.is_table())
.expect("mz_tables doesn't exist")
.id();
// Assert that the table was migrated and got a new ID.
assert_ne!(new_mz_tables_id, mz_tables_id);

let new_mv_id = catalog
.entries()
.find(|entry| &entry.name.item == mv_name && entry.is_materialized_view())
.unwrap_or_else(|| panic!("{mv_name} doesn't exist"))
.id();
// Assert that the materialized view was migrated and got a new ID.
assert_ne!(new_mv_id, mv_id);

catalog.expire().await;
}
}

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
async fn test_multi_subscriber_catalog() {
Expand Down
Loading

0 comments on commit 9465baa

Please sign in to comment.