Skip to content

Commit

Permalink
catalog: Fix introspection source index migration (#31204)
Browse files Browse the repository at this point in the history
In a previous version of Materialize, the legacy builtin item migration
framework would allocate new IDs for introspection source indexes when
they needed to be migrated. As of
14cc787, introspection source index IDs
are deterministic, so even when they're migrated using the legacy
builtin item migration framework, they don't receive new IDs.

This commit updates the legacy implementation of builtin item migrations
for introspection sources indexes to account for the deterministic IDs.

Fixes #MaterializeInc/database-issues/issues/8921
  • Loading branch information
jkosh44 authored Jan 28, 2025
1 parent ac7fbd2 commit e362cc9
Showing 1 changed file with 61 additions and 43 deletions.
104 changes: 61 additions & 43 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,25 +472,31 @@ impl CatalogState {

match diff {
StateDiff::Addition => {
if let Some(entry) = retractions
if let Some(mut entry) = retractions
.introspection_source_indexes
.remove(&introspection_source_index.item_id)
{
// Introspection source indexes can only be updated through the builtin
// migration process, which allocates new IDs for each index.
panic!(
"cannot update introspection source indexes in place, entry: {:?}, durable: {:?}",
entry, introspection_source_index
)
// This should only happen during startup as a result of builtin migrations. We
// create a new index item and replace the old one with it.
let (index_name, index) = self.create_introspection_source_index(
introspection_source_index.cluster_id,
log,
introspection_source_index.index_id,
);
assert_eq!(entry.id, introspection_source_index.item_id);
assert_eq!(entry.oid, introspection_source_index.oid);
assert_eq!(entry.name, index_name);
entry.item = index;
self.insert_entry(entry);
} else {
self.insert_introspection_source_index(
introspection_source_index.cluster_id,
log,
introspection_source_index.item_id,
introspection_source_index.index_id,
introspection_source_index.oid,
);
}

self.insert_introspection_source_index(
introspection_source_index.cluster_id,
log,
introspection_source_index.item_id,
introspection_source_index.index_id,
introspection_source_index.oid,
);
}
StateDiff::Retraction => {
let entry = self.drop_item(introspection_source_index.item_id);
Expand Down Expand Up @@ -1684,6 +1690,24 @@ impl CatalogState {
global_id: GlobalId,
oid: u32,
) {
let (index_name, index) =
self.create_introspection_source_index(cluster_id, log, global_id);
self.insert_item(
item_id,
oid,
index_name,
index,
MZ_SYSTEM_ROLE_ID,
PrivilegeMap::default(),
);
}

fn create_introspection_source_index(
&self,
cluster_id: ClusterId,
log: &'static BuiltinLog,
global_id: GlobalId,
) -> (QualifiedItemName, CatalogItem) {
let source_name = FullItemName {
database: RawDatabaseSpecifier::Ambient,
schema: log.schema.into(),
Expand All @@ -1700,35 +1724,29 @@ impl CatalogState {
index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
let index_item_name = index_name.item.clone();
let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
self.insert_item(
item_id,
oid,
index_name,
CatalogItem::Index(Index {
global_id,
on: log_global_id,
keys: log
.variant
.index_by()
.into_iter()
.map(MirScalarExpr::Column)
.collect(),
create_sql: index_sql(
index_item_name,
cluster_id,
source_name,
&log.variant.desc(),
&log.variant.index_by(),
),
conn_id: None,
resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
let index = CatalogItem::Index(Index {
global_id,
on: log_global_id,
keys: log
.variant
.index_by()
.into_iter()
.map(MirScalarExpr::Column)
.collect(),
create_sql: index_sql(
index_item_name,
cluster_id,
is_retained_metrics_object: false,
custom_logical_compaction_window: None,
}),
MZ_SYSTEM_ROLE_ID,
PrivilegeMap::default(),
);
source_name,
&log.variant.desc(),
&log.variant.index_by(),
),
conn_id: None,
resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
cluster_id,
is_retained_metrics_object: false,
custom_logical_compaction_window: None,
});
(index_name, index)
}

/// Insert system configuration `name` with `value`.
Expand Down

0 comments on commit e362cc9

Please sign in to comment.