From 9c3d9e2c91e20f0fc5e9cf8d94869cefc24b11d1 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Tue, 28 Jan 2025 08:46:13 -0500 Subject: [PATCH] catalog: Fix introspection source index migration (#31204) In a previous version of Materialize, the legacy builtin item migration framework would allocate new IDs for introspection source indexes when they needed to be migrated. As of 14cc787db3b2cefddc4b356a76d238135271ec94, introspection source index IDs are deterministic, so even when they're migrated using the legacy builtin item migration framework, they don't receive new IDs. This commit updates the legacy implementation of builtin item migrations for introspection sources indexes to account for the deterministic IDs. Fixes #MaterializeInc/database-issues/issues/8921 --- src/adapter/src/catalog/apply.rs | 104 ++++++++++++++++++------------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index 9db55bc11d605..745cf082bfe81 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -472,25 +472,31 @@ impl CatalogState { match diff { StateDiff::Addition => { - if let Some(entry) = retractions + if let Some(mut entry) = retractions .introspection_source_indexes .remove(&introspection_source_index.item_id) { - // Introspection source indexes can only be updated through the builtin - // migration process, which allocates new IDs for each index. - panic!( - "cannot update introspection source indexes in place, entry: {:?}, durable: {:?}", - entry, introspection_source_index - ) + // This should only happen during startup as a result of builtin migrations. We + // create a new index item and replace the old one with it. + let (index_name, index) = self.create_introspection_source_index( + introspection_source_index.cluster_id, + log, + introspection_source_index.index_id, + ); + assert_eq!(entry.id, introspection_source_index.item_id); + assert_eq!(entry.oid, introspection_source_index.oid); + assert_eq!(entry.name, index_name); + entry.item = index; + self.insert_entry(entry); + } else { + self.insert_introspection_source_index( + introspection_source_index.cluster_id, + log, + introspection_source_index.item_id, + introspection_source_index.index_id, + introspection_source_index.oid, + ); } - - self.insert_introspection_source_index( - introspection_source_index.cluster_id, - log, - introspection_source_index.item_id, - introspection_source_index.index_id, - introspection_source_index.oid, - ); } StateDiff::Retraction => { let entry = self.drop_item(introspection_source_index.item_id); @@ -1684,6 +1690,24 @@ impl CatalogState { global_id: GlobalId, oid: u32, ) { + let (index_name, index) = + self.create_introspection_source_index(cluster_id, log, global_id); + self.insert_item( + item_id, + oid, + index_name, + index, + MZ_SYSTEM_ROLE_ID, + PrivilegeMap::default(), + ); + } + + fn create_introspection_source_index( + &self, + cluster_id: ClusterId, + log: &'static BuiltinLog, + global_id: GlobalId, + ) -> (QualifiedItemName, CatalogItem) { let source_name = FullItemName { database: RawDatabaseSpecifier::Ambient, schema: log.schema.into(), @@ -1700,35 +1724,29 @@ impl CatalogState { index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID); let index_item_name = index_name.item.clone(); let (log_item_id, log_global_id) = self.resolve_builtin_log(log); - self.insert_item( - item_id, - oid, - index_name, - CatalogItem::Index(Index { - global_id, - on: log_global_id, - keys: log - .variant - .index_by() - .into_iter() - .map(MirScalarExpr::Column) - .collect(), - create_sql: index_sql( - index_item_name, - cluster_id, - source_name, - &log.variant.desc(), - &log.variant.index_by(), - ), - conn_id: None, - resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(), + let index = CatalogItem::Index(Index { + global_id, + on: log_global_id, + keys: log + .variant + .index_by() + .into_iter() + .map(MirScalarExpr::Column) + .collect(), + create_sql: index_sql( + index_item_name, cluster_id, - is_retained_metrics_object: false, - custom_logical_compaction_window: None, - }), - MZ_SYSTEM_ROLE_ID, - PrivilegeMap::default(), - ); + source_name, + &log.variant.desc(), + &log.variant.index_by(), + ), + conn_id: None, + resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(), + cluster_id, + is_retained_metrics_object: false, + custom_logical_compaction_window: None, + }); + (index_name, index) } /// Insert system configuration `name` with `value`.