Skip to content

Commit

Permalink
catalog: Fix introspection source index migration
Browse files Browse the repository at this point in the history
In a previous version of Materialize, the legacy builtin item migration
framework would allocate new IDs for introspection source indexes when
they needed to be migrated. As of
14cc787, introspection source index
IDs are deterministic, so even when they're migrated using the legacy
builtin item migration framework, they don't receive new IDs.

This commit updates the legacy implementation of builtin item
migrations for introspection sources indexes to account for the
deterministic IDs.

Fixes #MaterializeInc/database-issues/issues/8921
  • Loading branch information
jkosh44 committed Jan 27, 2025
1 parent 4242bb3 commit aacef7b
Showing 1 changed file with 61 additions and 43 deletions.
104 changes: 61 additions & 43 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,25 +472,31 @@ impl CatalogState {

match diff {
StateDiff::Addition => {
if let Some(entry) = retractions
if let Some(mut entry) = retractions
.introspection_source_indexes
.remove(&introspection_source_index.item_id)
{
// Introspection source indexes can only be updated through the builtin
// migration process, which allocates new IDs for each index.
panic!(
"cannot update introspection source indexes in place, entry: {:?}, durable: {:?}",
entry, introspection_source_index
)
// This should only happen during startup as a result of builtin migrations. We
// create a new index item and replace the old one with it.
let (index_name, index) = self.create_introspection_source_index(
introspection_source_index.cluster_id,
log,
introspection_source_index.index_id,
);
assert_eq!(entry.id, introspection_source_index.item_id);
assert_eq!(entry.oid, introspection_source_index.oid);
assert_eq!(entry.name, index_name);
entry.item = index;
self.insert_entry(entry);
} else {
self.insert_introspection_source_index(
introspection_source_index.cluster_id,
log,
introspection_source_index.item_id,
introspection_source_index.index_id,
introspection_source_index.oid,
);
}

self.insert_introspection_source_index(
introspection_source_index.cluster_id,
log,
introspection_source_index.item_id,
introspection_source_index.index_id,
introspection_source_index.oid,
);
}
StateDiff::Retraction => {
let entry = self.drop_item(introspection_source_index.item_id);
Expand Down Expand Up @@ -1684,6 +1690,24 @@ impl CatalogState {
global_id: GlobalId,
oid: u32,
) {
let (index_name, index) =
self.create_introspection_source_index(cluster_id, log, global_id);
self.insert_item(
item_id,
oid,
index_name,
index,
MZ_SYSTEM_ROLE_ID,
PrivilegeMap::default(),
);
}

fn create_introspection_source_index(
&self,
cluster_id: ClusterId,
log: &'static BuiltinLog,
global_id: GlobalId,
) -> (QualifiedItemName, CatalogItem) {
let source_name = FullItemName {
database: RawDatabaseSpecifier::Ambient,
schema: log.schema.into(),
Expand All @@ -1700,35 +1724,29 @@ impl CatalogState {
index_name = self.find_available_name(index_name, &SYSTEM_CONN_ID);
let index_item_name = index_name.item.clone();
let (log_item_id, log_global_id) = self.resolve_builtin_log(log);
self.insert_item(
item_id,
oid,
index_name,
CatalogItem::Index(Index {
global_id,
on: log_global_id,
keys: log
.variant
.index_by()
.into_iter()
.map(MirScalarExpr::Column)
.collect(),
create_sql: index_sql(
index_item_name,
cluster_id,
source_name,
&log.variant.desc(),
&log.variant.index_by(),
),
conn_id: None,
resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
let index = CatalogItem::Index(Index {
global_id,
on: log_global_id,
keys: log
.variant
.index_by()
.into_iter()
.map(MirScalarExpr::Column)
.collect(),
create_sql: index_sql(
index_item_name,
cluster_id,
is_retained_metrics_object: false,
custom_logical_compaction_window: None,
}),
MZ_SYSTEM_ROLE_ID,
PrivilegeMap::default(),
);
source_name,
&log.variant.desc(),
&log.variant.index_by(),
),
conn_id: None,
resolved_ids: [(log_item_id, log_global_id)].into_iter().collect(),
cluster_id,
is_retained_metrics_object: false,
custom_logical_compaction_window: None,
});
(index_name, index)
}

/// Insert system configuration `name` with `value`.
Expand Down

0 comments on commit aacef7b

Please sign in to comment.