alter_table: Durable Catalog Migration (#30163)
This PR re-keys everything in the durable Catalog on `CatalogItemId`
instead of `GlobalId`, and shims everything above the durable Catalog
objects using two new methods: `GlobalId::to_item_id()` and
`CatalogItemId::to_global_id()`.

Now every item in the Catalog has the following fields:

* `id: CatalogItemId`, a stable _external_ identifier (i.e. the one
exposed in Catalog tables like `mz_objects`) that stays the same for the
entire lifetime of the object.
* `global_id: GlobalId`, a stable _internal_ identifier for this object
that can be used by storage and compute.
* `extra_versions: BTreeMap<Version, GlobalId>`, a mapping from each
version of an object to the `GlobalId` that compute and storage use to
refer to that specific version.
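
In Rust terms, the new shape looks roughly like the following sketch. The field names come from the list above, but the enum variants, the `Version` alias, and the struct layout are assumptions for illustration, not the real definitions in the Catalog crates:

```rust
use std::collections::BTreeMap;

type Version = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CatalogItemId {
    User(u64),
    System(u64),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GlobalId {
    User(u64),
    System(u64),
}

pub struct Item {
    /// Stable external identifier; constant for the object's lifetime.
    pub id: CatalogItemId,
    /// Stable internal identifier for the current version of the object,
    /// used by storage and compute.
    pub global_id: GlobalId,
    /// Maps each version of the object to the `GlobalId` that compute and
    /// storage use to refer to that specific version.
    pub extra_versions: BTreeMap<Version, GlobalId>,
}
```

The key property is that `id` never changes, while `global_id` and `extra_versions` evolve as the object is altered.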

This de-coupling of `CatalogItemId` and `GlobalId` achieves two things:

1. Externally, objects have a stable identifier even as they are
`ALTER`ed. This is required for external tools like dbt and Terraform
that track objects by ID.
2. Internally, a `GlobalId` always refers to the same `RelationDesc` +
Persist Shard, which maintains the concept from the formalism that a
`GlobalId` is never re-assigned to a new pTVC.

The implementation of `ALTER TABLE ... ADD COLUMN ...` will thus
allocate a new `GlobalId` which will immutably refer to that specific
version of the table.
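
As a hedged sketch of that step, assuming a simplified `Table` shape and a caller that supplies the freshly allocated `GlobalId` (in reality ID allocation goes through the Catalog, and this is not the actual implementation):

```rust
use std::collections::BTreeMap;

type Version = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GlobalId {
    User(u64),
}

struct Table {
    /// `GlobalId` for the current (latest) version of the table.
    global_id: GlobalId,
    current_version: Version,
    /// Older versions, each pinned forever to the `GlobalId` it was
    /// created with.
    extra_versions: BTreeMap<Version, GlobalId>,
}

/// Sketch of `ALTER TABLE ... ADD COLUMN ...`: the table keeps its
/// `CatalogItemId`, but the new version gets a fresh `GlobalId`; the old
/// `GlobalId` is retained in `extra_versions` and never re-assigned.
fn add_column(table: &mut Table, fresh_gid: GlobalId) {
    table
        .extra_versions
        .insert(table.current_version, table.global_id);
    table.current_version += 1;
    table.global_id = fresh_gid;
}
```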

#### Other Changes

Along with `ItemKey` and `ItemValue`, I updated the following Catalog
types:

* `GidMappingValue`: replaced the `id` field with `catalog_id` and
`global_id`, used to identify builtin catalog objects.
* `ClusterIntrospectionSourceIndexValue`: replaced the `index_id` field
with `catalog_id` and `global_id`, used to identify builtin
introspection source indexes.
* `CommentKey`: replaced `GlobalId` with `CatalogItemId`, used to
identify comments on objects.
* `SourceReferencesKey`: replaced `GlobalId` with `CatalogItemId`, used
to track references between a Source and the subsources/tables that read
from it.
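
A rough sketch of the resulting shapes, keeping only the fields named above and using stand-in newtypes for the real ID definitions (the surrounding struct layouts are assumptions, not the actual durable Catalog types):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CatalogItemId(u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GlobalId(u64);

/// Identifies a builtin catalog object; previously a single `id` field.
struct GidMappingValue {
    catalog_id: CatalogItemId,
    global_id: GlobalId,
}

/// Identifies a builtin introspection source index; previously `index_id`.
struct ClusterIntrospectionSourceIndexValue {
    catalog_id: CatalogItemId,
    global_id: GlobalId,
}

/// Keys comments on objects by the stable external identifier.
struct CommentKey {
    object_id: CatalogItemId,
}

/// Tracks references between a Source and the subsources/tables that
/// read from it, keyed by the stable external identifier.
struct SourceReferencesKey {
    source_id: CatalogItemId,
}
```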

#### Partial Progress

Today `CatalogItemId` is 1:1 with `GlobalId`, which is what allows us to
implement the `to_item_id` and `to_global_id` shim methods. Until we
support `ALTER TABLE ... ADD COLUMN ...` we can freely convert between
the two, which lets us break this change up across multiple PRs instead
of landing a single massive change.
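
Under that 1:1 assumption, the shims reduce to re-tagging the raw value. A minimal sketch (the exact variant set is an assumption; only `User` appears in this description):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GlobalId {
    System(u64),
    User(u64),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CatalogItemId {
    System(u64),
    User(u64),
}

impl GlobalId {
    /// Shim: only valid while `CatalogItemId` and `GlobalId` are 1:1.
    fn to_item_id(self) -> CatalogItemId {
        match self {
            GlobalId::System(x) => CatalogItemId::System(x),
            GlobalId::User(x) => CatalogItemId::User(x),
        }
    }
}

impl CatalogItemId {
    /// Shim: the inverse mapping, relying on the same 1:1 invariant.
    fn to_global_id(self) -> GlobalId {
        match self {
            CatalogItemId::System(x) => GlobalId::System(x),
            CatalogItemId::User(x) => GlobalId::User(x),
        }
    }
}
```

The same re-tagging is what the initial migration below performs on the durable values, e.g. `GlobalId::User(42)` becomes `CatalogItemId::User(42)`.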

#### Initial Migration

Because `CatalogItemId` and `GlobalId` are currently 1:1, we can migrate
the raw values of the IDs directly, e.g. `GlobalId::User(42)` becomes
`CatalogItemId::User(42)`, which is exactly what this PR does.

### Motivation

Progress towards
MaterializeInc/database-issues#8233
Implements changes described in
#30019

### Tips for reviewer

This PR is split into two commits:

1. The durable catalog migration, and updates to durable Catalog
objects. I would appreciate the most thorough reviews on this commit.
2. Shimming all calling code to convert between `CatalogItemId` and
`GlobalId`.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.
ParkMyCar authored Oct 25, 2024
1 parent 4d848ec commit 1945b14
Showing 24 changed files with 1,940 additions and 201 deletions.
58 changes: 37 additions & 21 deletions src/adapter/src/catalog/apply.rs
@@ -516,7 +516,7 @@ impl CatalogState {
diff: StateDiff,
retractions: &mut InProgressRetractions,
) {
let id = system_object_mapping.unique_identifier.id;
let id = system_object_mapping.unique_identifier.global_id;

if system_object_mapping.unique_identifier.runtime_alterable() {
// Runtime-alterable system objects have real entries in the items
@@ -871,13 +871,15 @@ impl CatalogState {
StateDiff::Addition => {
let key = item.key();
let mz_catalog::durable::Item {
id,
id: _,
oid,
schema_id,
name,
create_sql,
owner_id,
privileges,
global_id,
extra_versions: _,
} = item;
let schema = self.find_non_temp_schema(&schema_id);
let name = QualifiedItemName {
@@ -889,18 +891,23 @@ impl CatalogState {
};
let entry = match retractions.items.remove(&key) {
Some(mut retraction) => {
assert_eq!(retraction.id, item.id);
// TODO(alter_table): Switch this to CatalogItemId.
assert_eq!(retraction.id, item.global_id);
// We only reparse the SQL if it's changed. Otherwise, we use the existing
// item. This is a performance optimization and not needed for correctness.
// This makes it difficult to use the `UpdateFrom` trait, but the structure
// is still the same as the trait.
//
// TODO(alter_table): Switch this to CatalogItemId.
if retraction.create_sql() != create_sql {
let item = self.deserialize_item(id, &create_sql).unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
let item = self
.deserialize_item(global_id, &create_sql)
.unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
retraction.item = item;
}
retraction.id = id;
retraction.id = global_id;
retraction.oid = oid;
retraction.name = name;
retraction.owner_id = owner_id;
@@ -909,15 +916,17 @@ impl CatalogState {
retraction
}
None => {
let catalog_item =
self.deserialize_item(id, &create_sql).unwrap_or_else(|e| {
// TODO(alter_table): Switch this to CatalogItemId.
let catalog_item = self
.deserialize_item(global_id, &create_sql)
.unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
CatalogEntry {
item: catalog_item,
referenced_by: Vec::new(),
used_by: Vec::new(),
id,
id: global_id,
oid,
name,
owner_id,
Expand All @@ -939,7 +948,8 @@ impl CatalogState {
self.insert_entry(entry);
}
StateDiff::Retraction => {
let entry = self.drop_item(item.id);
// TODO(alter_table): Switch this to CatalogItemId.
let entry = self.drop_item(item.global_id);
let key = item.into_key_value().0;
retractions.items.insert(key, entry);
}
@@ -990,16 +1000,19 @@ impl CatalogState {
) {
match diff {
StateDiff::Addition => {
let prev = self
.source_references
.insert(source_references.source_id, source_references.into());
let prev = self.source_references.insert(
source_references.source_id.to_global_id(),
source_references.into(),
);
assert!(
prev.is_none(),
"values must be explicitly retracted before inserting a new value: {prev:?}"
);
}
StateDiff::Retraction => {
let prev = self.source_references.remove(&source_references.source_id);
let prev = self
.source_references
.remove(&source_references.source_id.to_global_id());
assert!(
prev.is_some(),
"retraction for a non-existent existing value: {source_references:?}"
@@ -1121,13 +1134,13 @@ impl CatalogState {
// items collection and so get handled through the normal
// `StateUpdateKind::Item`.`
if !system_object_mapping.unique_identifier.runtime_alterable() {
self.pack_item_update(system_object_mapping.unique_identifier.id, diff)
self.pack_item_update(system_object_mapping.unique_identifier.global_id, diff)
} else {
vec![]
}
}
StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
StateUpdateKind::Item(item) => self.pack_item_update(item.global_id, diff),
StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
comment.object_id,
comment.sub_component,
@@ -1720,7 +1733,8 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
if item.create_sql.starts_with("CREATE SINK") {
GlobalId::User(u64::MAX)
} else {
item.id
// TODO(alter_table): Switch this to CatalogItemId.
item.global_id
}
})
.collect()
@@ -1757,14 +1771,16 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
while let (Some((item, _, _)), Some((temp_item, _, _))) =
(item_updates.front(), temp_item_updates.front())
{
if item.id < temp_item.id {
// TODO(alter_table): Switch this to CatalogItemId.
if item.global_id < temp_item.id {
let (item, ts, diff) = item_updates.pop_front().expect("non-empty");
state_updates.push(StateUpdate {
kind: StateUpdateKind::Item(item),
ts,
diff,
});
} else if item.id > temp_item.id {
// TODO(alter_table): Switch this to CatalogItemId.
} else if item.global_id > temp_item.id {
let (temp_item, ts, diff) = temp_item_updates.pop_front().expect("non-empty");
state_updates.push(StateUpdate {
kind: StateUpdateKind::TemporaryItem(temp_item),
@@ -2009,5 +2025,5 @@ fn lookup_builtin_view_addition(
let (_, builtin) = BUILTIN_LOOKUP
.get(&system_object_mapping.description)
.expect("missing builtin view");
(*builtin, system_object_mapping.unique_identifier.id)
(*builtin, system_object_mapping.unique_identifier.global_id)
}
24 changes: 12 additions & 12 deletions src/adapter/src/catalog/consistency.rs
@@ -234,18 +234,18 @@ impl CatalogState {
let mut comment_inconsistencies = Vec::new();
for (comment_object_id, col_pos, _comment) in self.comments.iter() {
match comment_object_id {
CommentObjectId::Table(global_id)
| CommentObjectId::View(global_id)
| CommentObjectId::MaterializedView(global_id)
| CommentObjectId::Source(global_id)
| CommentObjectId::Sink(global_id)
| CommentObjectId::Index(global_id)
| CommentObjectId::Func(global_id)
| CommentObjectId::Connection(global_id)
| CommentObjectId::Type(global_id)
| CommentObjectId::Secret(global_id)
| CommentObjectId::ContinualTask(global_id) => {
let entry = self.entry_by_id.get(&global_id);
CommentObjectId::Table(item_id)
| CommentObjectId::View(item_id)
| CommentObjectId::MaterializedView(item_id)
| CommentObjectId::Source(item_id)
| CommentObjectId::Sink(item_id)
| CommentObjectId::Index(item_id)
| CommentObjectId::Func(item_id)
| CommentObjectId::Connection(item_id)
| CommentObjectId::Type(item_id)
| CommentObjectId::Secret(item_id)
| CommentObjectId::ContinualTask(item_id) => {
let entry = self.entry_by_id.get(&item_id.to_global_id());
match entry {
None => comment_inconsistencies
.push(CommentInconsistency::Dangling(comment_object_id)),
11 changes: 6 additions & 5 deletions src/adapter/src/catalog/migrate.rs
@@ -36,11 +36,12 @@

for mut item in tx.get_items() {
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
f(tx, item.id, &mut stmt).await?;
// TODO(alter_table): Switch this to CatalogItemId.
f(tx, item.global_id, &mut stmt).await?;

item.create_sql = stmt.to_ast_string_stable();

updated_items.insert(item.id, item);
updated_items.insert(item.global_id, item);
}
tx.update_items(updated_items)?;
Ok(())
@@ -63,12 +64,12 @@
let items = tx.get_items();
for mut item in items {
let mut stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;

f(tx, &cat, item.id, &mut stmt).await?;
// TODO(alter_table): Switch this to CatalogItemId.
f(tx, &cat, item.global_id, &mut stmt).await?;

item.create_sql = stmt.to_ast_string_stable();

updated_items.insert(item.id, item);
updated_items.insert(item.global_id, item);
}
tx.update_items(updated_items)?;
Ok(())
13 changes: 9 additions & 4 deletions src/adapter/src/catalog/open.rs
@@ -664,7 +664,8 @@ impl Catalog {
object_name: entry.name().item.clone(),
},
unique_identifier: SystemObjectUniqueIdentifier {
id: new_id,
catalog_id: new_id.to_item_id(),
global_id: new_id,
fingerprint: fingerprint.clone(),
},
},
@@ -949,7 +950,7 @@ fn add_new_remove_old_builtin_items_migration(
!builtin.runtime_alterable(),
"setting the runtime alterable flag on an existing object is not permitted"
);
migrated_builtin_ids.push(system_object_mapping.unique_identifier.id);
migrated_builtin_ids.push(system_object_mapping.unique_identifier.global_id);
}
}

@@ -961,7 +962,11 @@
object_type: builtin.catalog_item_type(),
object_name: builtin.name().to_string(),
},
unique_identifier: SystemObjectUniqueIdentifier { id, fingerprint },
unique_identifier: SystemObjectUniqueIdentifier {
catalog_id: id.to_item_id(),
global_id: id,
fingerprint,
},
});

// Runtime-alterable system objects are durably recorded to the
@@ -1004,7 +1009,7 @@ fn add_new_remove_old_builtin_items_migration(
for (_, mapping) in system_object_mappings {
deleted_system_objects.insert(mapping.description);
if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.id);
deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.global_id);
}
}
// If you are 100% positive that it is safe to delete a system object outside any of the
3 changes: 2 additions & 1 deletion src/adapter/src/catalog/open/builtin_item_migration.rs
@@ -211,7 +211,8 @@ async fn migrate_builtin_items_0dt(
object_name: entry.name().item.clone(),
},
unique_identifier: SystemObjectUniqueIdentifier {
id: *id,
catalog_id: id.to_item_id(),
global_id: *id,
fingerprint: fingerprint.clone(),
},
},
29 changes: 14 additions & 15 deletions src/adapter/src/catalog/state.rs
@@ -1742,20 +1742,19 @@ impl CatalogState {
match object_id {
ObjectId::Item(global_id) => {
let entry = self.get_entry(&global_id);
let item_id = global_id.to_item_id();
match entry.item_type() {
CatalogItemType::Table => CommentObjectId::Table(global_id),
CatalogItemType::Source => CommentObjectId::Source(global_id),
CatalogItemType::Sink => CommentObjectId::Sink(global_id),
CatalogItemType::View => CommentObjectId::View(global_id),
CatalogItemType::MaterializedView => {
CommentObjectId::MaterializedView(global_id)
}
CatalogItemType::Index => CommentObjectId::Index(global_id),
CatalogItemType::Func => CommentObjectId::Func(global_id),
CatalogItemType::Connection => CommentObjectId::Connection(global_id),
CatalogItemType::Type => CommentObjectId::Type(global_id),
CatalogItemType::Secret => CommentObjectId::Secret(global_id),
CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(global_id),
CatalogItemType::Table => CommentObjectId::Table(item_id),
CatalogItemType::Source => CommentObjectId::Source(item_id),
CatalogItemType::Sink => CommentObjectId::Sink(item_id),
CatalogItemType::View => CommentObjectId::View(item_id),
CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(item_id),
CatalogItemType::Index => CommentObjectId::Index(item_id),
CatalogItemType::Func => CommentObjectId::Func(item_id),
CatalogItemType::Connection => CommentObjectId::Connection(item_id),
CatalogItemType::Type => CommentObjectId::Type(item_id),
CatalogItemType::Secret => CommentObjectId::Secret(item_id),
CatalogItemType::ContinualTask => CommentObjectId::ContinualTask(item_id),
}
}
ObjectId::Role(role_id) => CommentObjectId::Role(role_id),
@@ -2135,7 +2134,7 @@ impl CatalogState {
| CommentObjectId::Connection(id)
| CommentObjectId::Type(id)
| CommentObjectId::Secret(id)
| CommentObjectId::ContinualTask(id) => Some(*id),
| CommentObjectId::ContinualTask(id) => Some(id.to_global_id()),
CommentObjectId::Role(_)
| CommentObjectId::Database(_)
| CommentObjectId::Schema(_)
@@ -2165,7 +2164,7 @@
| CommentObjectId::Type(id)
| CommentObjectId::Secret(id)
| CommentObjectId::ContinualTask(id) => {
let item = self.get_entry(&id);
let item = self.get_entry(&id.to_global_id());
let name = self.resolve_full_name(item.name(), Some(conn_id));
name.to_string()
}
2 changes: 2 additions & 0 deletions src/buf.yaml
@@ -34,6 +34,8 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v67.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v68.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
1 change: 1 addition & 0 deletions src/catalog/build.rs
@@ -148,6 +148,7 @@ fn main() -> anyhow::Result<()> {
.enum_attribute("CatalogItem.value", ATTR)
.enum_attribute("ClusterConfig.variant", ATTR)
.enum_attribute("GlobalId.value", ATTR)
.enum_attribute("CatalogItemId.value", ATTR)
.enum_attribute("ClusterId.value", ATTR)
.enum_attribute("DatabaseId.value", ATTR)
.enum_attribute("SchemaId.value", ATTR)
6 changes: 5 additions & 1 deletion src/catalog/protos/hashes.json
@@ -1,7 +1,7 @@
[
{
"name": "objects.proto",
"md5": "64474ac4b2cf2f9aca7a0d772c4548e6"
"md5": "b023c4e7ca71ae263d80a609dd586c72"
},
{
"name": "objects_v60.proto",
@@ -34,5 +34,9 @@
{
"name": "objects_v67.proto",
"md5": "ce8acf8bc724dc3121e3014555f00250"
},
{
"name": "objects_v68.proto",
"md5": "f9ae9b93103620bce86b07c0a35b9c6d"
}
]