diff --git a/doc/user/content/sql/system-catalog/mz_internal.md b/doc/user/content/sql/system-catalog/mz_internal.md index 1336eb39342ec..dcd360edeeb76 100644 --- a/doc/user/content/sql/system-catalog/mz_internal.md +++ b/doc/user/content/sql/system-catalog/mz_internal.md @@ -696,6 +696,16 @@ The `mz_network_policy_rules` table contains a row for each network policy rule. | `address` | [`text`] | The address the rule will take action on. | | `direction` | [`text`] | The direction of traffic the rule applies to. `ingress` is the only supported direction. | +## `mz_show_network_policiesjj` + +The `mz_show_show_network_policies` view contains a row for each network policy in the system. + + +| Field | Type | Meaning | +|-------------|----------|-------------------------------------| +| `name` | [`text`] | The name the network policy. | +| `comment` | [`text`] | The comment on the network policy. | + ## `mz_show_all_privileges` The `mz_show_all_privileges` view contains a row for each privilege granted diff --git a/doc/user/content/sql/types/mz_aclitem.md b/doc/user/content/sql/types/mz_aclitem.md index 1c3bf4609ebce..49e21f4240aee 100644 --- a/doc/user/content/sql/types/mz_aclitem.md +++ b/doc/user/content/sql/types/mz_aclitem.md @@ -26,17 +26,18 @@ is `=/`. A list of all privileges and their abbreviations are below: -| Privilege | Description | Abbreviation | Applicable Object Types | -|-----------------|------------------------------------------------------------------------------------------------|-------------------|-----------------------------------------------| -| `SELECT` | Allows reading rows from an object. | r(”read”) | Table, View, Materialized View, Source | -| `INSERT` | Allows inserting into an object. | a(”append”) | Table | -| `UPDATE` | Allows updating an object (requires SELECT if a read is necessary). | w(”write”) | Table | -| `DELETE` | Allows deleting from an object (requires SELECT if a read is necessary). | d | Table | -| `CREATE` | Allows creating a new object within another object. | C | Database, Schema, Cluster | -| `USAGE` | Allows using an object or looking up members of an object. | U | Database, Schema, Connection, Secret, Cluster | -| `CREATEROLE` | Allows creating, altering, deleting roles and the ability to grant and revoke role membership. | R("Role") | System | -| `CREATEDB` | Allows creating databases. | B("dataBase") | System | -| `CREATECLUSTER` | Allows creating clusters. | N("compute Node") | System | +| Privilege | Description | Abbreviation | Applicable Object Types | +|-----------------------|------------------------------------------------------------------------------------------------|---------------------|-----------------------------------------------| +| `SELECT` | Allows reading rows from an object. | r(”read”) | Table, View, Materialized View, Source | +| `INSERT` | Allows inserting into an object. | a(”append”) | Table | +| `UPDATE` | Allows updating an object (requires SELECT if a read is necessary). | w(”write”) | Table | +| `DELETE` | Allows deleting from an object (requires SELECT if a read is necessary). | d | Table | +| `CREATE` | Allows creating a new object within another object. | C | Database, Schema, Cluster | +| `USAGE` | Allows using an object or looking up members of an object. | U | Database, Schema, Connection, Secret, Cluster | +| `CREATEROLE` | Allows creating, altering, deleting roles and the ability to grant and revoke role membership. | R("Role") | System | +| `CREATEDB` | Allows creating databases. | B("dataBase") | System | +| `CREATECLUSTER` | Allows creating clusters. | N("compute Node") | System | +| `CREATENETWORKPOLICY` | Allows creating network policies. | P("network Policy") | System | The `CREATEROLE` privilege is very powerful. It allows roles to grant and revoke membership in other roles, even if it doesn't have explicit membership in those roles. As a consequence, any role diff --git a/doc/user/sql-grammar/sql-grammar.bnf b/doc/user/sql-grammar/sql-grammar.bnf index 243725edb7c90..51b8bf2a124d8 100644 --- a/doc/user/sql-grammar/sql-grammar.bnf +++ b/doc/user/sql-grammar/sql-grammar.bnf @@ -84,7 +84,7 @@ comment_on ::= 'COMMENT ON' ( 'CLUSTER' | 'CLUSTER REPLICA' | 'COLUMN' | 'CONNECTION' | 'DATABASE' | 'FUNCTION' | 'INDEX' | 'MATERIALIZED VIEW' | 'ROLE' | 'SCHEMA' | 'SECRET' | 'SINK' | 'SOURCE' | - 'TABLE' | 'TYPE' | 'VIEW' + 'TABLE' | 'TYPE' | 'VIEW' | 'NETWORK POLICY' ) object_name 'IS' ( string_literal | 'NULL' ) commit ::= 'COMMIT' diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 2ff751b422db1..fb3b8d20db211 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -1673,6 +1673,16 @@ impl SessionCatalog for ConnCatalog<'_> { } } + fn resolve_network_policy( + &self, + policy_name: &str, + ) -> Result<&dyn mz_sql::catalog::CatalogNetworkPolicy, SqlCatalogError> { + match self.state.try_get_network_policy_by_name(policy_name) { + Some(policy) => Ok(policy), + None => Err(SqlCatalogError::UnknownNetworkPolicy(policy_name.into())), + } + } + fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole> { Some(self.state.roles_by_id.get(id)?) } diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 5438a24028f99..7460ec6968574 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -375,13 +375,10 @@ impl CatalogState { seen, )); } - id @ ObjectId::Role(_) => { - let unseen = seen.insert(id.clone()); - if unseen { - dependents.push(id.clone()); - } + ObjectId::NetworkPolicy(id) => { + dependents.extend_from_slice(&self.network_policy_dependents(*id, seen)); } - id @ ObjectId::NetworkPolicy(_) => { + id @ ObjectId::Role(_) => { let unseen = seen.insert(id.clone()); if unseen { dependents.push(id.clone()); @@ -399,7 +396,7 @@ impl CatalogState { /// itself. /// /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear - /// earlier in the list than the roots. This is particularly userful for the order to drop + /// earlier in the list than the roots. This is particularly useful for the order to drop /// objects. fn cluster_dependents( &self, @@ -430,7 +427,7 @@ impl CatalogState { /// itself. /// /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear - /// earlier in the list than the roots. This is particularly userful for the order to drop + /// earlier in the list than the roots. This is particularly useful for the order to drop /// objects. pub(super) fn cluster_replica_dependents( &self, @@ -451,7 +448,7 @@ impl CatalogState { /// itself. /// /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear - /// earlier in the list than the roots. This is particularly userful for the order to drop + /// earlier in the list than the roots. This is particularly useful for the order to drop /// objects. fn database_dependents( &self, @@ -481,7 +478,7 @@ impl CatalogState { /// itself. /// /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear - /// earlier in the list than the roots. This is particularly userful for the order to drop + /// earlier in the list than the roots. This is particularly useful for the order to drop /// objects. fn schema_dependents( &self, @@ -507,7 +504,7 @@ impl CatalogState { /// itself. /// /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear - /// earlier in the list than the roots. This is particularly userful for the order to drop + /// earlier in the list than the roots. This is particularly useful for the order to drop /// objects. pub(super) fn item_dependents( &self, @@ -533,6 +530,24 @@ impl CatalogState { dependents } + /// Returns all the IDs of all objects that depend on `network_policy_id`, including `network_policy_id` + /// itself. + /// + /// The order is guaranteed to be in reverse dependency order, i.e. the leafs will appear + /// earlier in the list than the roots. This is particularly useful for the order to drop + /// objects. + pub(super) fn network_policy_dependents( + &self, + network_policy_id: NetworkPolicyId, + _seen: &mut BTreeSet, + ) -> Vec { + let object_id = ObjectId::NetworkPolicy(network_policy_id); + // Currently network policies have no dependents + // when we add the ability for users or sources/sinks to have policies + // this method will need to be updated. + vec![object_id] + } + /// Indicates whether the indicated item is considered stable or not. /// /// Only stable items can be used as dependencies of other catalog items. @@ -721,6 +736,15 @@ impl CatalogState { .map(|id| &self.roles_by_id[id]) } + pub(super) fn try_get_network_policy_by_name( + &self, + policy_name: &str, + ) -> Option<&NetworkPolicy> { + self.network_policies_by_name + .get(policy_name) + .map(|id| &self.network_policies_by_id[id]) + } + pub(crate) fn collect_role_membership(&self, id: &RoleId) -> BTreeSet { let mut membership = BTreeSet::new(); let mut queue = VecDeque::from(vec![id]); diff --git a/src/adapter/src/catalog/transact.rs b/src/adapter/src/catalog/transact.rs index f4ee41b25db5a..be63cc48d80ac 100644 --- a/src/adapter/src/catalog/transact.rs +++ b/src/adapter/src/catalog/transact.rs @@ -22,7 +22,7 @@ use mz_audit_log::{ ObjectType, SchedulingDecisionsWithReasonsV1, VersionedEvent, VersionedStorageUsage, }; use mz_catalog::builtin::BuiltinLog; -use mz_catalog::durable::Transaction; +use mz_catalog::durable::{NetworkPolicy, Transaction}; use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind}; use mz_catalog::memory::objects::{ CatalogItem, ClusterConfig, DataSourceDesc, SourceReferences, StateDiff, StateUpdate, @@ -81,6 +81,12 @@ pub enum Op { attributes: RoleAttributes, vars: RoleVars, }, + AlterNetworkPolicy { + id: NetworkPolicyId, + rules: Vec, + name: String, + owner_id: RoleId, + }, CreateDatabase { name: String, owner_id: RoleId, @@ -115,7 +121,6 @@ pub enum Op { owner_id: RoleId, }, CreateNetworkPolicy { - id: NetworkPolicyId, rules: Vec, name: String, owner_id: RoleId, @@ -655,6 +660,38 @@ impl Catalog { info!("update role {name} ({id})"); } + Op::AlterNetworkPolicy { + id, + rules, + name, + owner_id: _owner_id, + } => { + let existing_policy = state.get_network_policy(&id).clone(); + let mut policy: NetworkPolicy = existing_policy.into(); + policy.rules = rules; + if is_reserved_name(&name) { + return Err(AdapterError::Catalog(Error::new( + ErrorKind::ReservedNetworkPolicyName(name), + ))); + } + tx.update_network_policy(id, policy.clone())?; + + CatalogState::add_to_audit_log( + &state.system_configuration, + oracle_write_ts, + session, + tx, + audit_events, + EventType::Alter, + ObjectType::NetworkPolicy, + EventDetails::IdNameV1(mz_audit_log::IdNameV1 { + id: id.to_string(), + name: name.clone(), + }), + )?; + + info!("update network policy {name} ({id})"); + } Op::CreateDatabase { name, owner_id } => { let database_owner_privileges = vec![rbac::owner_privilege( mz_sql::catalog::ObjectType::Database, @@ -1123,7 +1160,6 @@ impl Catalog { } } Op::CreateNetworkPolicy { - id, rules, name, owner_id, @@ -1157,14 +1193,29 @@ impl Catalog { .collect(); let temporary_oids: HashSet<_> = state.get_temporary_oids().collect(); - tx.insert_user_network_policy( - id, - name, + let id = tx.insert_user_network_policy( + name.clone(), rules, privileges, owner_id, &temporary_oids, )?; + + CatalogState::add_to_audit_log( + &state.system_configuration, + oracle_write_ts, + session, + tx, + audit_events, + EventType::Create, + ObjectType::NetworkPolicy, + EventDetails::IdNameV1(mz_audit_log::IdNameV1 { + id: id.to_string(), + name: name.clone(), + }), + )?; + + info!("created network policy {name} ({id})"); } Op::Comment { object_id, @@ -1357,6 +1408,31 @@ impl Catalog { info!("drop role {}", role.name()); } + // Drop any roles. + tx.remove_network_policies(&delta.network_policies)?; + + for network_policy_id in delta.network_policies { + let policy = state + .network_policies_by_id + .get(&network_policy_id) + .expect("catalog out of sync"); + + CatalogState::add_to_audit_log( + &state.system_configuration, + oracle_write_ts, + session, + tx, + audit_events, + EventType::Drop, + ObjectType::NetworkPolicy, + EventDetails::IdNameV1(mz_audit_log::IdNameV1 { + id: policy.id.to_string(), + name: policy.name.clone(), + }), + )?; + info!("drop network policy {}", policy.name.clone()); + } + // Drop any replicas. let replicas = delta.replicas.keys().copied().collect(); tx.remove_cluster_replicas(&replicas)?; diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 7a9289bb9c574..52dae87a9959c 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -321,6 +321,8 @@ pub enum ExecuteResponse { CreatedContinualTask, /// The requested type was created. CreatedType, + /// The requested network policy was created. + CreatedNetworkPolicy, /// The requested prepared statement was removed. Deallocate { all: bool }, /// The requested cursor was declared. @@ -486,6 +488,7 @@ impl TryInto for ExecuteResponseKind { ExecuteResponseKind::CreatedMaterializedView => { Ok(ExecuteResponse::CreatedMaterializedView) } + ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy), ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask), ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType), ExecuteResponseKind::Deallocate => Err(()), @@ -550,6 +553,7 @@ impl ExecuteResponse { CreatedMaterializedView { .. } => Some("CREATE MATERIALIZED VIEW".into()), CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()), CreatedType => Some("CREATE TYPE".into()), + CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()), Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })), DeclaredCursor => Some("DECLARE CURSOR".into()), Deleted(n) => Some(format!("DELETE {}", n)), @@ -612,7 +616,8 @@ impl ExecuteResponse { | AlterConnection | AlterSource | AlterSink - | AlterTableAddColumn => &[AlteredObject], + | AlterTableAddColumn + | AlterNetworkPolicy => &[AlteredObject], AlterDefaultPrivileges => &[AlteredDefaultPrivileges], AlterSetCluster => &[AlteredObject], AlterRole => &[AlteredRole], @@ -640,6 +645,7 @@ impl ExecuteResponse { CreateIndex => &[CreatedIndex], CreateType => &[CreatedType], PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate], + CreateNetworkPolicy => &[CreatedNetworkPolicy], Declare => &[DeclaredCursor], DiscardTemp => &[DiscardedTemp], DiscardAll => &[DiscardedAll], diff --git a/src/adapter/src/coord/catalog_serving.rs b/src/adapter/src/coord/catalog_serving.rs index eda8d1e24084a..1758c0445cb29 100644 --- a/src/adapter/src/coord/catalog_serving.rs +++ b/src/adapter/src/coord/catalog_serving.rs @@ -72,6 +72,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>( | Plan::CreateDatabase(_) | Plan::CreateSchema(_) | Plan::CreateRole(_) + | Plan::CreateNetworkPolicy(_) | Plan::CreateCluster(_) | Plan::CreateClusterReplica(_) | Plan::CreateContinualTask(_) @@ -106,6 +107,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>( | Plan::ExplainPushdown(_) | Plan::ExplainSinkSchema(_) | Plan::Insert(_) + | Plan::AlterNetworkPolicy(_) | Plan::AlterNoop(_) | Plan::AlterClusterRename(_) | Plan::AlterClusterSwap(_) diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index d85e5981bf415..7bc580b0d5615 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -659,6 +659,7 @@ impl Coordinator { | Statement::AlterSystemResetAll(_) | Statement::AlterSystemSet(_) | Statement::AlterTableAddColumn(_) + | Statement::AlterNetworkPolicy(_) | Statement::CreateCluster(_) | Statement::CreateClusterReplica(_) | Statement::CreateConnection(_) @@ -677,6 +678,7 @@ impl Coordinator { | Statement::CreateType(_) | Statement::CreateView(_) | Statement::CreateWebhookSource(_) + | Statement::CreateNetworkPolicy(_) | Statement::Delete(_) | Statement::DropObjects(_) | Statement::DropOwned(_) diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index b709bcaba580a..95e688da4752b 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1376,6 +1376,9 @@ impl Coordinator { Op::CreateRole { .. } => { new_roles += 1; } + Op::CreateNetworkPolicy { .. } => { + new_network_policies += 1; + } Op::CreateCluster { .. } => { // TODO(benesch): having deprecated linked clusters, remove // the `max_sources` and `max_sinks` limit, and set a higher @@ -1548,6 +1551,7 @@ impl Coordinator { }, Op::AlterRole { .. } | Op::AlterRetainHistory { .. } + | Op::AlterNetworkPolicy { .. } | Op::UpdatePrivilege { .. } | Op::UpdateDefaultPrivilege { .. } | Op::GrantRole { .. } @@ -1565,8 +1569,7 @@ impl Coordinator { | Op::ResetAllSystemConfiguration { .. } | Op::Comment { .. } | Op::WeirdStorageUsageUpdates { .. } - | Op::TransactionDryRun - | Op::CreateNetworkPolicy { .. } => {} + | Op::TransactionDryRun => {} } } diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 0ff16cf3c3382..911edaefeaef5 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -228,6 +228,12 @@ impl Coordinator { .await; ctx.retire(result); } + Plan::CreateNetworkPolicy(plan) => { + let res = self + .sequence_create_network_policy(ctx.session(), plan) + .await; + ctx.retire(res); + } Plan::Comment(plan) => { let result = self.sequence_comment_on(ctx.session(), plan).await; ctx.retire(result); @@ -470,6 +476,12 @@ impl Coordinator { let result = self.sequence_alter_table(ctx.session(), plan).await; ctx.retire(result); } + Plan::AlterNetworkPolicy(plan) => { + let res = self + .sequence_alter_network_policy(ctx.session(), plan) + .await; + ctx.retire(res); + } Plan::DiscardTemp => { self.drop_temp_items(ctx.session().conn_id()).await; ctx.retire(Ok(ExecuteResponse::DiscardedTemp)); diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 54d1310adde09..d42fc4631d755 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -907,6 +907,39 @@ impl Coordinator { .map(|_| ExecuteResponse::CreatedRole) } + #[instrument] + pub(super) async fn sequence_create_network_policy( + &mut self, + session: &Session, + plan::CreateNetworkPolicyPlan { name, rules }: plan::CreateNetworkPolicyPlan, + ) -> Result { + let op = catalog::Op::CreateNetworkPolicy { + rules, + name, + owner_id: *session.current_role_id(), + }; + self.catalog_transact_conn(Some(session.conn_id()), vec![op]) + .await + .map(|_| ExecuteResponse::CreatedNetworkPolicy) + } + + #[instrument] + pub(super) async fn sequence_alter_network_policy( + &mut self, + session: &Session, + plan::AlterNetworkPolicyPlan { id, name, rules }: plan::AlterNetworkPolicyPlan, + ) -> Result { + let op = catalog::Op::AlterNetworkPolicy { + id, + rules, + name, + owner_id: *session.current_role_id(), + }; + self.catalog_transact_conn(Some(session.conn_id()), vec![op]) + .await + .map(|_| ExecuteResponse::AlteredObject(ObjectType::NetworkPolicy)) + } + #[instrument] pub(super) async fn sequence_create_table( &mut self, diff --git a/src/adapter/src/statement_logging.rs b/src/adapter/src/statement_logging.rs index cac4c3450a043..3ecc68e140293 100644 --- a/src/adapter/src/statement_logging.rs +++ b/src/adapter/src/statement_logging.rs @@ -209,6 +209,7 @@ impl From<&ExecuteResponse> for StatementEndedExecutionReason { | ExecuteResponse::CreatedMaterializedView | ExecuteResponse::CreatedContinualTask | ExecuteResponse::CreatedType + | ExecuteResponse::CreatedNetworkPolicy | ExecuteResponse::Deallocate { .. } | ExecuteResponse::DeclaredCursor | ExecuteResponse::Deleted(_) diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index c8725226ea024..7d0c62b8198dd 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -7362,6 +7362,31 @@ WHERE access: vec![PUBLIC_SELECT], }); +pub static MZ_SHOW_NETWORK_POLICIES: LazyLock = LazyLock::new(|| BuiltinView { + name: "mz_show_network_policies", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::VIEW_MZ_SHOW_NETWORK_POLICIES_OID, + column_defs: None, + sql: " +WITH comments AS ( + SELECT id, comment + FROM mz_internal.mz_comments + WHERE object_type = 'network-policy' AND object_sub_id IS NULL +) +SELECT + name, + COALESCE(comment, '') as comment +FROM + mz_internal.mz_network_policies as policy +LEFT JOIN + comments ON policy.id = comments.id +WHERE + policy.id NOT LIKE 's%' +AND + policy.id NOT LIKE 'g%'", + access: vec![PUBLIC_SELECT], +}); + pub static MZ_CLUSTER_REPLICA_HISTORY: LazyLock = LazyLock::new(|| BuiltinView { name: "mz_cluster_replica_history", schema: MZ_INTERNAL_SCHEMA, @@ -9034,6 +9059,7 @@ pub static BUILTINS_STATIC: LazyLock>> = LazyLock::ne Builtin::View(&MZ_COMPUTE_HYDRATION_STATUSES), Builtin::View(&MZ_HYDRATION_STATUSES), Builtin::View(&MZ_SHOW_CLUSTER_REPLICAS), + Builtin::View(&MZ_SHOW_NETWORK_POLICIES), Builtin::Index(&MZ_SHOW_DATABASES_IND), Builtin::Index(&MZ_SHOW_SCHEMAS_IND), Builtin::Index(&MZ_SHOW_CONNECTIONS_IND), diff --git a/src/catalog/src/durable.rs b/src/catalog/src/durable.rs index 2bca27d910e40..3eacba7e97cd3 100644 --- a/src/catalog/src/durable.rs +++ b/src/catalog/src/durable.rs @@ -63,6 +63,7 @@ pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica"; pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica"; pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog"; pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage"; +pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy"; pub const OID_ALLOC_KEY: &str = "oid"; pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version"; diff --git a/src/catalog/src/durable/initialize.rs b/src/catalog/src/durable/initialize.rs index 1649b25f28db3..155b119e3ae2a 100644 --- a/src/catalog/src/durable/initialize.rs +++ b/src/catalog/src/durable/initialize.rs @@ -42,8 +42,8 @@ use crate::durable::{ DefaultPrivilege, ReplicaConfig, ReplicaLocation, Role, Schema, Transaction, AUDIT_LOG_ID_ALLOC_KEY, CATALOG_CONTENT_VERSION_KEY, DATABASE_ID_ALLOC_KEY, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_CLUSTER_ID_ALLOC_KEY, - SYSTEM_REPLICA_ID_ALLOC_KEY, USER_CLUSTER_ID_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, - USER_ROLE_ID_ALLOC_KEY, + SYSTEM_REPLICA_ID_ALLOC_KEY, USER_CLUSTER_ID_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, + USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY, }; /// The key within the "config" Collection that stores the version of the catalog. @@ -229,6 +229,10 @@ pub(crate) async fn initialize( SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID, ), + ( + USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string(), + DEFAULT_ALLOCATOR_ID, + ), (AUDIT_LOG_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID), (STORAGE_USAGE_ID_ALLOC_KEY.to_string(), DEFAULT_ALLOCATOR_ID), (OID_ALLOC_KEY.to_string(), FIRST_USER_OID.into()), diff --git a/src/catalog/src/durable/objects/state_update.rs b/src/catalog/src/durable/objects/state_update.rs index bb1b89f7c8987..dad24135aea35 100644 --- a/src/catalog/src/durable/objects/state_update.rs +++ b/src/catalog/src/durable/objects/state_update.rs @@ -135,6 +135,7 @@ impl StateUpdate { roles, clusters, cluster_replicas, + network_policies, introspection_sources, id_allocator, configs, @@ -157,6 +158,7 @@ impl StateUpdate { let roles = from_batch(roles, StateUpdateKind::Role); let clusters = from_batch(clusters, StateUpdateKind::Cluster); let cluster_replicas = from_batch(cluster_replicas, StateUpdateKind::ClusterReplica); + let network_policies = from_batch(network_policies, StateUpdateKind::NetworkPolicy); let introspection_sources = from_batch( introspection_sources, StateUpdateKind::IntrospectionSourceIndex, @@ -186,6 +188,7 @@ impl StateUpdate { .chain(roles) .chain(clusters) .chain(cluster_replicas) + .chain(network_policies) .chain(introspection_sources) .chain(id_allocators) .chain(configs) diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index f9a195f0ad3b5..02a3ec0c63889 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -60,8 +60,8 @@ use crate::durable::{ CatalogError, DefaultPrivilege, DurableCatalogError, DurableCatalogState, NetworkPolicy, Snapshot, AUDIT_LOG_ID_ALLOC_KEY, CATALOG_CONTENT_VERSION_KEY, DATABASE_ID_ALLOC_KEY, OID_ALLOC_KEY, SCHEMA_ID_ALLOC_KEY, STORAGE_USAGE_ID_ALLOC_KEY, SYSTEM_ITEM_ALLOC_KEY, - SYSTEM_REPLICA_ID_ALLOC_KEY, USER_ITEM_ALLOC_KEY, USER_REPLICA_ID_ALLOC_KEY, - USER_ROLE_ID_ALLOC_KEY, + SYSTEM_REPLICA_ID_ALLOC_KEY, USER_ITEM_ALLOC_KEY, USER_NETWORK_POLICY_ID_ALLOC_KEY, + USER_REPLICA_ID_ALLOC_KEY, USER_ROLE_ID_ALLOC_KEY, }; use crate::memory::objects::{StateDiff, StateUpdate, StateUpdateKind}; @@ -567,7 +567,6 @@ impl<'a> Transaction<'a> { pub fn insert_user_network_policy( &mut self, - id: NetworkPolicyId, name: String, rules: Vec, privileges: Vec, @@ -575,6 +574,8 @@ impl<'a> Transaction<'a> { temporary_oids: &HashSet, ) -> Result { let oid = self.allocate_oid(temporary_oids)?; + let id = self.get_and_increment_id(USER_NETWORK_POLICY_ID_ALLOC_KEY.to_string())?; + let id = NetworkPolicyId::User(id); self.insert_network_policy(id, name, rules, privileges, owner_id, oid) } @@ -1533,7 +1534,61 @@ impl<'a> Transaction<'a> { Err(SqlCatalogError::UnknownNetworkPolicy(id.to_string()).into()) } } + /// Removes the network policy `name` from the transaction. + /// + /// Returns an error if `name` is not found. + /// + /// Runtime is linear with respect to the total number of network policies in the catalog. + /// DO NOT call this function in a loop, use [`Self::remove_roles`] instead. + pub fn remove_network_policy(&mut self, name: &str) -> Result<(), CatalogError> { + let policies = self + .network_policies + .delete(|_k, v| v.name == name, self.op_id); + assert!( + policies.iter().all(|(k, _)| k.id.is_user()), + "cannot delete non-user network policy" + ); + let n = policies.len(); + assert!(n <= 1); + if n == 1 { + Ok(()) + } else { + Err(SqlCatalogError::UnknownNetworkPolicy(name.to_owned()).into()) + } + } + /// Removes all network policies in `network policies` from the transaction. + /// + /// Returns an error if any id in `network policy` is not found. + /// + /// NOTE: On error, there still may be some roles removed from the transaction. It + /// is up to the caller to either abort the transaction or commit. + pub fn remove_network_policies( + &mut self, + network_policies: &BTreeSet, + ) -> Result<(), CatalogError> { + if network_policies.is_empty() { + return Ok(()); + } + + let to_remove = network_policies + .iter() + .map(|policy_id| (NetworkPolicyKey { id: *policy_id }, None)) + .collect(); + let mut prev = self.network_policies.set_many(to_remove, self.op_id)?; + assert!( + prev.iter().all(|(k, _)| k.id.is_user()), + "cannot delete non-user network policy" + ); + + prev.retain(|_k, v| v.is_none()); + if !prev.is_empty() { + let err = prev.keys().map(|k| k.id.to_string()).join(", "); + return Err(SqlCatalogError::UnknownNetworkPolicy(err).into()); + } + + Ok(()) + } /// Set persisted default privilege. /// /// DO NOT call this function in a loop, use [`Self::set_default_privileges`] instead. @@ -2167,6 +2222,7 @@ impl<'a> Transaction<'a> { roles: self.roles.pending(), clusters: self.clusters.pending(), cluster_replicas: self.cluster_replicas.pending(), + network_policies: self.network_policies.pending(), introspection_sources: self.introspection_sources.pending(), id_allocator: self.id_allocator.pending(), configs: self.configs.pending(), @@ -2204,6 +2260,7 @@ impl<'a> Transaction<'a> { roles, clusters, cluster_replicas, + network_policies, introspection_sources, id_allocator, configs, @@ -2228,6 +2285,7 @@ impl<'a> Transaction<'a> { differential_dataflow::consolidation::consolidate_updates(roles); differential_dataflow::consolidation::consolidate_updates(clusters); differential_dataflow::consolidation::consolidate_updates(cluster_replicas); + differential_dataflow::consolidation::consolidate_updates(network_policies); differential_dataflow::consolidation::consolidate_updates(introspection_sources); differential_dataflow::consolidation::consolidate_updates(id_allocator); differential_dataflow::consolidation::consolidate_updates(configs); @@ -2407,6 +2465,7 @@ pub struct TransactionBatch { pub(crate) roles: Vec<(proto::RoleKey, proto::RoleValue, Diff)>, pub(crate) clusters: Vec<(proto::ClusterKey, proto::ClusterValue, Diff)>, pub(crate) cluster_replicas: Vec<(proto::ClusterReplicaKey, proto::ClusterReplicaValue, Diff)>, + pub(crate) network_policies: Vec<(proto::NetworkPolicyKey, proto::NetworkPolicyValue, Diff)>, pub(crate) introspection_sources: Vec<( proto::ClusterIntrospectionSourceIndexKey, proto::ClusterIntrospectionSourceIndexValue, @@ -2458,6 +2517,7 @@ impl TransactionBatch { roles, clusters, cluster_replicas, + network_policies, introspection_sources, id_allocator, configs, @@ -2480,6 +2540,7 @@ impl TransactionBatch { && roles.is_empty() && clusters.is_empty() && cluster_replicas.is_empty() + && network_policies.is_empty() && introspection_sources.is_empty() && id_allocator.is_empty() && configs.is_empty() diff --git a/src/catalog/tests/snapshots/debug__opened_trace.snap b/src/catalog/tests/snapshots/debug__opened_trace.snap index 09620014e64eb..e99d25fd905f2 100644 --- a/src/catalog/tests/snapshots/debug__opened_trace.snap +++ b/src/catalog/tests/snapshots/debug__opened_trace.snap @@ -1,5 +1,6 @@ --- source: src/catalog/tests/debug.rs +assertion_line: 152 expression: test_trace --- Trace { @@ -387,7 +388,7 @@ Trace { object_id: "SYSTEM", grantee_id: "s1", grantor_id: "s1", - privileges: "RBN", + privileges: "RBNP", }, ), ), @@ -1032,6 +1033,18 @@ Trace { 2, 1, ), + ( + ( + IdAllocKey { + name: "user_network_policy", + }, + IdAllocValue { + next_id: 1, + }, + ), + 2, + 1, + ), ( ( IdAllocKey { @@ -2200,7 +2213,7 @@ Trace { SystemPrivilegesValue { acl_mode: Some( AclMode { - bitflags: 3758096384, + bitflags: 8053063680, }, ), }, diff --git a/src/catalog/tests/snapshots/open__initial_audit_log.snap b/src/catalog/tests/snapshots/open__initial_audit_log.snap index 0107ca3737225..a7ce419f6c8ed 100644 --- a/src/catalog/tests/snapshots/open__initial_audit_log.snap +++ b/src/catalog/tests/snapshots/open__initial_audit_log.snap @@ -193,7 +193,7 @@ expression: audit_log object_id: "SYSTEM", grantee_id: "s1", grantor_id: "s1", - privileges: "RBN", + privileges: "RBNP", }, ), user: None, diff --git a/src/catalog/tests/snapshots/open__initial_snapshot.snap b/src/catalog/tests/snapshots/open__initial_snapshot.snap index 7e7646194353b..8b2facea2d664 100644 --- a/src/catalog/tests/snapshots/open__initial_snapshot.snap +++ b/src/catalog/tests/snapshots/open__initial_snapshot.snap @@ -1,5 +1,6 @@ --- source: src/catalog/tests/open.rs +assertion_line: 461 expression: test_snapshot --- Snapshot { @@ -1356,6 +1357,11 @@ Snapshot { }: IdAllocValue { next_id: 2, }, + IdAllocKey { + name: "user_network_policy", + }: IdAllocValue { + next_id: 1, + }, IdAllocKey { name: "user_role", }: IdAllocValue { @@ -1520,7 +1526,7 @@ Snapshot { }: SystemPrivilegesValue { acl_mode: Some( AclMode { - bitflags: 3758096384, + bitflags: 8053063680, }, ), }, diff --git a/src/environmentd/src/http/sql.rs b/src/environmentd/src/http/sql.rs index 3932810bd9caa..c82a3c84c0866 100644 --- a/src/environmentd/src/http/sql.rs +++ b/src/environmentd/src/http/sql.rs @@ -1224,6 +1224,7 @@ async fn execute_stmt( | ExecuteResponse::CreatedMaterializedView { .. } | ExecuteResponse::CreatedContinualTask { .. } | ExecuteResponse::CreatedType + | ExecuteResponse::CreatedNetworkPolicy | ExecuteResponse::Comment | ExecuteResponse::Deleted(_) | ExecuteResponse::DiscardedTemp diff --git a/src/pgrepr-consts/src/oid.rs b/src/pgrepr-consts/src/oid.rs index 719b9f0c34a5f..151f35729f65e 100644 --- a/src/pgrepr-consts/src/oid.rs +++ b/src/pgrepr-consts/src/oid.rs @@ -761,3 +761,4 @@ pub const CT_MZ_WALLCLOCK_LAG_HISTORY_OID: u32 = 17038; pub const INDEX_MZ_CONTINUAL_TASKS_IND_OID: u32 = 17039; pub const TABLE_MZ_NETWORK_POLICIES_OID: u32 = 17040; pub const TABLE_MZ_NETWORK_POLICY_RULES_OID: u32 = 17041; +pub const VIEW_MZ_SHOW_NETWORK_POLICIES_OID: u32 = 17042; diff --git a/src/pgwire/src/protocol.rs b/src/pgwire/src/protocol.rs index 53b1cf1c90c21..bf7bc98d59fd6 100644 --- a/src/pgwire/src/protocol.rs +++ b/src/pgwire/src/protocol.rs @@ -1746,6 +1746,7 @@ where | ExecuteResponse::CreatedType | ExecuteResponse::CreatedView { .. } | ExecuteResponse::CreatedViews { .. } + | ExecuteResponse::CreatedNetworkPolicy | ExecuteResponse::Comment | ExecuteResponse::Deallocate { .. } | ExecuteResponse::Deleted(..) diff --git a/src/repr/src/adt/mz_acl_item.rs b/src/repr/src/adt/mz_acl_item.rs index c43115ad308f6..460293b09f769 100644 --- a/src/repr/src/adt/mz_acl_item.rs +++ b/src/repr/src/adt/mz_acl_item.rs @@ -51,6 +51,8 @@ const CREATE_ROLE_CHAR: char = 'R'; const CREATE_DB_CHAR: char = 'B'; // compute Node const CREATE_CLUSTER_CHAR: char = 'N'; +// compute Network Policy +const CREATE_NETWORK_POLICY_CHAR: char = 'P'; const INSERT_STR: &str = "INSERT"; const SELECT_STR: &str = "SELECT"; @@ -61,6 +63,7 @@ const CREATE_STR: &str = "CREATE"; const CREATE_ROLE_STR: &str = "CREATEROLE"; const CREATE_DB_STR: &str = "CREATEDB"; const CREATE_CLUSTER_STR: &str = "CREATECLUSTER"; +const CREATE_NETWORK_POLICY_STR: &str = "CREATENETWORKPOLICY"; /// The OID used to represent the PUBLIC role. See: /// @@ -94,6 +97,7 @@ bitflags! { const CREATE_CLUSTER = 1 << 29; const CREATE_DB = 1 << 30; const CREATE_ROLE = 1 << 31; + const CREATE_NETWORK_POLICY = 1 << 32; // No additional privileges should be defined at a bit larger than 1 << 31. Those bits are // reserved for grant options. @@ -112,6 +116,7 @@ impl AclMode { CREATE_ROLE_STR => Ok(AclMode::CREATE_ROLE), CREATE_DB_STR => Ok(AclMode::CREATE_DB), CREATE_CLUSTER_STR => Ok(AclMode::CREATE_CLUSTER), + CREATE_NETWORK_POLICY_STR => Ok(AclMode::CREATE_NETWORK_POLICY), _ => Err(anyhow!("{}", s.quoted())), } } @@ -158,6 +163,9 @@ impl AclMode { if self.contains(AclMode::CREATE_CLUSTER) { privileges.push(CREATE_CLUSTER_STR); } + if self.contains(AclMode::CREATE_NETWORK_POLICY) { + privileges.push(CREATE_NETWORK_POLICY_STR); + } privileges } } @@ -178,6 +186,7 @@ impl FromStr for AclMode { CREATE_ROLE_CHAR => acl_mode.bitor_assign(AclMode::CREATE_ROLE), CREATE_DB_CHAR => acl_mode.bitor_assign(AclMode::CREATE_DB), CREATE_CLUSTER_CHAR => acl_mode.bitor_assign(AclMode::CREATE_CLUSTER), + CREATE_NETWORK_POLICY_CHAR => acl_mode.bitor_assign(AclMode::CREATE_NETWORK_POLICY), _ => return Err(anyhow!("invalid privilege '{c}' in acl mode '{s}'")), } } @@ -216,6 +225,9 @@ impl fmt::Display for AclMode { if self.contains(AclMode::CREATE_CLUSTER) { write!(f, "{CREATE_CLUSTER_CHAR}")?; } + if self.contains(AclMode::CREATE_NETWORK_POLICY) { + write!(f, "{CREATE_NETWORK_POLICY_CHAR}")?; + } Ok(()) } } @@ -845,6 +857,7 @@ fn test_mz_acl_parsing() { assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, mz_acl.to_string()); let s = "=UC/u4"; @@ -860,6 +873,7 @@ fn test_mz_acl_parsing() { assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, mz_acl.to_string()); let s = "s7=/s12"; @@ -875,6 +889,7 @@ fn test_mz_acl_parsing() { assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, mz_acl.to_string()); let s = "=/u100"; @@ -890,9 +905,10 @@ fn test_mz_acl_parsing() { assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!mz_acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, mz_acl.to_string()); - let s = "u1=RBN/u2"; + let s = "u1=RBNP/u2"; let mz_acl: MzAclItem = s.parse().unwrap(); assert_eq!(RoleId::User(1), mz_acl.grantee); assert_eq!(RoleId::User(2), mz_acl.grantor); @@ -905,6 +921,7 @@ fn test_mz_acl_parsing() { assert!(mz_acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(mz_acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(mz_acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(mz_acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, mz_acl.to_string()); mz_ore::assert_err!("u42/rw=u666".parse::()); @@ -985,6 +1002,7 @@ fn test_acl_parsing() { assert!(!acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, acl.to_string()); let s = "=UC/4"; @@ -1000,6 +1018,7 @@ fn test_acl_parsing() { assert!(!acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, acl.to_string()); let s = "7=/12"; @@ -1015,6 +1034,7 @@ fn test_acl_parsing() { assert!(!acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, acl.to_string()); let s = "=/100"; @@ -1030,9 +1050,10 @@ fn test_acl_parsing() { assert!(!acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(!acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(!acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(!acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, acl.to_string()); - let s = "1=RBN/2"; + let s = "1=RBNP/2"; let acl: AclItem = s.parse().unwrap(); assert_eq!(1, acl.grantee.0); assert_eq!(2, acl.grantor.0); @@ -1045,6 +1066,7 @@ fn test_acl_parsing() { assert!(acl.acl_mode.contains(AclMode::CREATE_ROLE)); assert!(acl.acl_mode.contains(AclMode::CREATE_DB)); assert!(acl.acl_mode.contains(AclMode::CREATE_CLUSTER)); + assert!(acl.acl_mode.contains(AclMode::CREATE_NETWORK_POLICY)); assert_eq!(s, acl.to_string()); mz_ore::assert_err!("42/rw=666".parse::()); diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index 11da8a7a40b8d..48c20f647b059 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -24,8 +24,10 @@ # For details on the code that is generated, see keywords.rs. Access +Action Add Added +Address Addresses Aggregate Aggregation @@ -104,6 +106,7 @@ Counter Create Createcluster Createdb +Createnetworkpolicy Createrole Creation Cross @@ -132,6 +135,7 @@ Delimiter Delta Desc Details +Direction Discard Disk Distinct @@ -276,6 +280,7 @@ Name Names Natural Negative +Network New Next No @@ -317,6 +322,8 @@ Path Physical Plan Plans +Policies +Policy Port Position Postgres @@ -376,6 +383,7 @@ Rotate Rounds Row Rows +Rules Sasl Scale Schedule diff --git a/src/sql-parser/src/ast/defs/name.rs b/src/sql-parser/src/ast/defs/name.rs index 72762754042c4..42259b4162948 100644 --- a/src/sql-parser/src/ast/defs/name.rs +++ b/src/sql-parser/src/ast/defs/name.rs @@ -429,6 +429,7 @@ pub enum UnresolvedObjectName { Schema(UnresolvedSchemaName), Role(Ident), Item(UnresolvedItemName), + NetworkPolicy(Ident), } impl AstDisplay for UnresolvedObjectName { @@ -440,6 +441,7 @@ impl AstDisplay for UnresolvedObjectName { UnresolvedObjectName::Schema(n) => f.write_node(n), UnresolvedObjectName::Role(n) => f.write_node(n), UnresolvedObjectName::Item(n) => f.write_node(n), + UnresolvedObjectName::NetworkPolicy(n) => f.write_node(n), } } } diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 832f2e4ae5088..324a83cdfd039 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -64,6 +64,7 @@ pub enum Statement { CreateCluster(CreateClusterStatement), CreateClusterReplica(CreateClusterReplicaStatement), CreateSecret(CreateSecretStatement), + CreateNetworkPolicy(CreateNetworkPolicyStatement), AlterCluster(AlterClusterStatement), AlterOwner(AlterOwnerStatement), AlterObjectRename(AlterObjectRenameStatement), @@ -78,6 +79,7 @@ pub enum Statement { AlterSystemReset(AlterSystemResetStatement), AlterSystemResetAll(AlterSystemResetAllStatement), AlterConnection(AlterConnectionStatement), + AlterNetworkPolicy(AlterNetworkPolicyStatement), AlterRole(AlterRoleStatement), AlterTableAddColumn(AlterTableAddColumnStatement), Discard(DiscardStatement), @@ -138,7 +140,9 @@ impl AstDisplay for Statement { Statement::CreateType(stmt) => f.write_node(stmt), Statement::CreateCluster(stmt) => f.write_node(stmt), Statement::CreateClusterReplica(stmt) => f.write_node(stmt), + Statement::CreateNetworkPolicy(stmt) => f.write_node(stmt), Statement::AlterCluster(stmt) => f.write_node(stmt), + Statement::AlterNetworkPolicy(stmt) => f.write_node(stmt), Statement::AlterOwner(stmt) => f.write_node(stmt), Statement::AlterObjectRename(stmt) => f.write_node(stmt), Statement::AlterRetainHistory(stmt) => f.write_node(stmt), @@ -215,11 +219,13 @@ pub fn statement_kind_label_value(kind: StatementKind) -> &'static str { StatementKind::CreateCluster => "create_cluster", StatementKind::CreateClusterReplica => "create_cluster_replica", StatementKind::CreateSecret => "create_secret", + StatementKind::CreateNetworkPolicy => "create_network_policy", StatementKind::AlterCluster => "alter_cluster", StatementKind::AlterObjectRename => "alter_object_rename", StatementKind::AlterRetainHistory => "alter_retain_history", StatementKind::AlterObjectSwap => "alter_object_swap", StatementKind::AlterIndex => "alter_index", + StatementKind::AlterNetworkPolicy => "alter_network_policy", StatementKind::AlterRole => "alter_role", StatementKind::AlterSecret => "alter_secret", StatementKind::AlterSetCluster => "alter_set_cluster", @@ -1934,6 +1940,136 @@ impl AstDisplay for SetRoleVar { } impl_display!(SetRoleVar); +/// A `CREATE ROLE` statement. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AlterNetworkPolicyStatement { + /// The specified role. + pub name: Ident, + /// Any options that were attached, in the order they were presented. + pub options: Vec>, +} + +impl AstDisplay for AlterNetworkPolicyStatement { + fn fmt(&self, f: &mut AstFormatter) { + f.write_str("ALTER "); + f.write_str("NETWORK POLICY "); + f.write_node(&self.name); + f.write_str(" ("); + f.write_node(&display::comma_separated(&self.options)); + f.write_str(" )"); + } +} +impl_display_t!(AlterNetworkPolicyStatement); + +/// A `CREATE ROLE` statement. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct CreateNetworkPolicyStatement { + /// The specified role. + pub name: Ident, + /// Any options that were attached, in the order they were presented. + pub options: Vec>, +} + +impl AstDisplay for CreateNetworkPolicyStatement { + fn fmt(&self, f: &mut AstFormatter) { + f.write_str("CREATE "); + f.write_str("NETWORK POLICY "); + f.write_node(&self.name); + f.write_str(" ("); + f.write_node(&display::comma_separated(&self.options)); + f.write_str(" )"); + } +} +impl_display_t!(CreateNetworkPolicyStatement); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct NetworkPolicyOption { + pub name: NetworkPolicyOptionName, + pub value: Option>, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum NetworkPolicyOptionName { + Rules, +} + +impl WithOptionName for NetworkPolicyOptionName { + /// # WARNING + /// + /// Whenever implementing this trait consider very carefully whether or not + /// this value could contain sensitive user data. If you're uncertain, err + /// on the conservative side and return `true`. + fn redact_value(&self) -> bool { + match self { + NetworkPolicyOptionName::Rules => false, + } + } +} + +impl AstDisplay for NetworkPolicyOptionName { + fn fmt(&self, f: &mut AstFormatter) { + match self { + NetworkPolicyOptionName::Rules => f.write_str("RULES"), + } + } +} +impl_display_for_with_option!(NetworkPolicyOption); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct NetworkPolicyRuleDefinition { + pub name: Ident, + pub options: Vec>, +} + +impl AstDisplay for NetworkPolicyRuleDefinition { + fn fmt(&self, f: &mut AstFormatter) { + f.write_node(&self.name); + f.write_str(" ("); + f.write_node(&display::comma_separated(&self.options)); + f.write_str(" )"); + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct NetworkPolicyRuleOption { + pub name: NetworkPolicyRuleOptionName, + pub value: Option>, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum NetworkPolicyRuleOptionName { + Direction, + Action, + Address, +} + +impl WithOptionName for NetworkPolicyRuleOptionName { + /// # WARNING + /// + /// Whenever implementing this trait consider very carefully whether or not + /// this value could contain sensitive user data. If you're uncertain, err + /// on the conservative side and return `true`. + fn redact_value(&self) -> bool { + match self { + NetworkPolicyRuleOptionName::Direction + | NetworkPolicyRuleOptionName::Action + | NetworkPolicyRuleOptionName::Address => false, + } + } +} + +impl AstDisplay for NetworkPolicyRuleOptionName { + fn fmt(&self, f: &mut AstFormatter) { + match self { + NetworkPolicyRuleOptionName::Direction => f.write_str("DIRECTION"), + NetworkPolicyRuleOptionName::Action => f.write_str("ACTION"), + NetworkPolicyRuleOptionName::Address => f.write_str("ADDRESS"), + } + } +} + +impl_display_for_with_option!(NetworkPolicyRuleOption); + /// A `CREATE SECRET` statement. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CreateSecretStatement { @@ -3247,6 +3383,7 @@ pub enum ShowObjectType { ContinualTask { in_cluster: Option, }, + NetworkPolicy, } /// `SHOW S` /// @@ -3289,6 +3426,7 @@ impl AstDisplay for ShowObjectsStatement { ShowObjectType::DefaultPrivileges { .. } => "DEFAULT PRIVILEGES", ShowObjectType::RoleMembership { .. } => "ROLE MEMBERSHIP", ShowObjectType::ContinualTask { .. } => "CONTINUAL TASKS", + ShowObjectType::NetworkPolicy => "NETWORK POLICIES", }); if let ShowObjectType::Index { on_object, .. } = &self.object_type { @@ -3889,6 +4027,7 @@ pub enum ObjectType { Func, Subsource, ContinualTask, + NetworkPolicy, } impl ObjectType { @@ -3910,7 +4049,8 @@ impl ObjectType { | ObjectType::Schema | ObjectType::Cluster | ObjectType::ClusterReplica - | ObjectType::Role => false, + | ObjectType::Role + | ObjectType::NetworkPolicy => false, } } } @@ -3935,6 +4075,7 @@ impl AstDisplay for ObjectType { ObjectType::Func => "FUNCTION", ObjectType::Subsource => "SUBSOURCE", ObjectType::ContinualTask => "CONTINUAL TASK", + ObjectType::NetworkPolicy => "NETWORK POLICY", }) } } @@ -3999,6 +4140,7 @@ pub enum WithOptionValue { Refresh(RefreshOptionValue), ClusterScheduleOptionValue(ClusterScheduleOptionValue), ClusterAlterStrategy(ClusterAlterOptionValue), + NetworkPolicyRules(Vec>), } impl AstDisplay for WithOptionValue { @@ -4026,7 +4168,8 @@ impl AstDisplay for WithOptionValue { | WithOptionValue::ConnectionAwsPrivatelink(_) | WithOptionValue::ClusterReplicas(_) | WithOptionValue::ClusterScheduleOptionValue(_) - | WithOptionValue::ClusterAlterStrategy(_) => { + | WithOptionValue::ClusterAlterStrategy(_) + | WithOptionValue::NetworkPolicyRules(_) => { // These do not need redaction. } } @@ -4066,6 +4209,11 @@ impl AstDisplay for WithOptionValue { f.write_node(&display::comma_separated(replicas)); f.write_str(")"); } + WithOptionValue::NetworkPolicyRules(rules) => { + f.write_str("("); + f.write_node(&display::comma_separated(rules)); + f.write_str(")"); + } WithOptionValue::ConnectionAwsPrivatelink(aws_privatelink) => { f.write_node(aws_privatelink); } @@ -4846,6 +4994,7 @@ pub enum Privilege { CREATEROLE, CREATEDB, CREATECLUSTER, + CREATENETWORKPOLICY, } impl AstDisplay for Privilege { @@ -4860,6 +5009,7 @@ impl AstDisplay for Privilege { Privilege::CREATEROLE => "CREATEROLE", Privilege::CREATEDB => "CREATEDB", Privilege::CREATECLUSTER => "CREATECLUSTER", + Privilege::CREATENETWORKPOLICY => "CREATENETWORKPOLICY", }); } } @@ -5227,6 +5377,7 @@ pub enum CommentObjectType { Cluster { name: T::ClusterName }, ClusterReplica { name: QualifiedReplica }, ContinualTask { name: T::ItemName }, + NetworkPolicy { name: T::NetworkPolicyName }, } impl AstDisplay for CommentObjectType { @@ -5302,6 +5453,10 @@ impl AstDisplay for CommentObjectType { f.write_str("CONTINUAL TASK "); f.write_node(name); } + NetworkPolicy { name } => { + f.write_str("NETWORK POLICY "); + f.write_node(name); + } } } } diff --git a/src/sql-parser/src/ast/metadata.rs b/src/sql-parser/src/ast/metadata.rs index 6cb6426630c3a..aa23d871f082e 100644 --- a/src/sql-parser/src/ast/metadata.rs +++ b/src/sql-parser/src/ast/metadata.rs @@ -47,7 +47,7 @@ pub trait AstInfo: Clone { /// The type used to specify a column. /// /// n.b. when implementing visitors, you likely want to build the visitor to - /// vist [`crate::ast::ColumnName`] instead of visiting this struct + /// visit [`crate::ast::ColumnName`] instead of visiting this struct /// directly. The visitor on this should usually just return an error. type ColumnReference: AstDisplay + Clone + Hash + Debug + Eq + Ord; /// The type used for schema names. @@ -62,6 +62,8 @@ pub trait AstInfo: Clone { type CteId: Clone + Hash + Debug + Eq + Ord; /// The type used for role references. type RoleName: AstDisplay + Clone + Hash + Debug + Eq + Ord; + /// The type used for network policy references. + type NetworkPolicyName: AstDisplay + Clone + Hash + Debug + Eq + Ord; /// They type used for any object names. Objects are the superset of all objects in Materialize. type ObjectName: AstDisplay + Clone + Hash + Debug + Eq + Ord; } @@ -79,6 +81,7 @@ impl AstInfo for Raw { type DataType = RawDataType; type CteId = (); type RoleName = Ident; + type NetworkPolicyName = RawNetworkPolicyName; type ObjectName = UnresolvedObjectName; } @@ -168,6 +171,38 @@ where } } +#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone)] +pub enum RawNetworkPolicyName { + Unresolved(Ident), + Resolved(String), +} + +impl AstDisplay for RawNetworkPolicyName { + fn fmt(&self, f: &mut AstFormatter) { + match self { + RawNetworkPolicyName::Unresolved(id) => f.write_node(id), + RawNetworkPolicyName::Resolved(id) => { + f.write_str(format!("[{}]", id)); + } + } + } +} +impl_display!(RawNetworkPolicyName); + +impl FoldNode for RawNetworkPolicyName +where + T: AstInfo, +{ + type Folded = T::NetworkPolicyName; + + fn fold(self, f: &mut F) -> Self::Folded + where + F: Fold, + { + f.fold_network_policy_name(self) + } +} + /// SQL data types #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum RawDataType { diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 8c4d612266948..664e0155ccdde 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -1903,6 +1903,9 @@ impl<'a> Parser<'a> { self.peek_pos(), "CREATE USER is not supported, for more information consult the documentation at https://materialize.com/docs/sql/create-role/#details" ).map_parser_err(StatementKind::CreateRole) + } else if self.peek_keywords(&[NETWORK, POLICY]) { + self.parse_create_network_policy() + .map_parser_err(StatementKind::CreateNetworkPolicy) } else { let index = self.index; @@ -3922,6 +3925,20 @@ impl<'a> Parser<'a> { } } + fn parse_raw_network_policy_name(&mut self) -> Result { + if self.consume_token(&Token::LBracket) { + let id = match self.next_token() { + Some(Token::Ident(id)) => id.into_inner(), + Some(Token::Number(n)) => n, + _ => return parser_err!(self, self.peek_prev_pos(), "expected id"), + }; + self.expect_token(&Token::RBracket)?; + Ok(RawNetworkPolicyName::Resolved(id)) + } else { + Ok(RawNetworkPolicyName::Unresolved(self.parse_identifier()?)) + } + } + fn parse_optional_in_cluster(&mut self) -> Result, ParserError> { if self.parse_keywords(&[IN, CLUSTER]) { Ok(Some(self.parse_raw_ident()?)) @@ -4434,6 +4451,19 @@ impl<'a> Parser<'a> { cascade: false, })) } + ObjectType::NetworkPolicy => { + let names = self.parse_comma_separated(|parser| { + Ok(UnresolvedObjectName::NetworkPolicy( + parser.parse_identifier()?, + )) + })?; + Ok(Statement::DropObjects(DropObjectsStatement { + object_type: ObjectType::NetworkPolicy, + if_exists, + names, + cascade: false, + })) + } ObjectType::Cluster => self.parse_drop_clusters(if_exists), ObjectType::ClusterReplica => self.parse_drop_cluster_replicas(if_exists), ObjectType::Table @@ -4524,6 +4554,82 @@ impl<'a> Parser<'a> { Ok(QualifiedReplica { cluster, replica }) } + fn parse_alter_network_policy(&mut self) -> Result, ParserError> { + let name = self.parse_identifier()?; + self.expect_keyword(SET)?; + self.expect_token(&Token::LParen)?; + let options = self.parse_comma_separated(Parser::parse_network_policy_option)?; + self.expect_token(&Token::RParen)?; + Ok(Statement::AlterNetworkPolicy(AlterNetworkPolicyStatement { + name, + options, + })) + } + + fn parse_create_network_policy(&mut self) -> Result, ParserError> { + self.expect_keywords(&[NETWORK, POLICY])?; + let name = self.parse_identifier()?; + self.expect_token(&Token::LParen)?; + let options = self.parse_comma_separated(Parser::parse_network_policy_option)?; + self.expect_token(&Token::RParen)?; + Ok(Statement::CreateNetworkPolicy( + CreateNetworkPolicyStatement { name, options }, + )) + } + + fn parse_network_policy_option_name(&mut self) -> Result { + let option = self.expect_one_of_keywords(&[RULES])?; + let name = match option { + RULES => NetworkPolicyOptionName::Rules, + _ => unreachable!(), + }; + Ok(name) + } + fn parse_network_policy_option(&mut self) -> Result, ParserError> { + let name = self.parse_network_policy_option_name()?; + match name { + NetworkPolicyOptionName::Rules => self.parse_network_policy_option_rules(), + } + } + + fn parse_network_policy_option_rules( + &mut self, + ) -> Result, ParserError> { + let _ = self.consume_token(&Token::Eq); + self.expect_token(&Token::LParen)?; + let rules = if self.consume_token(&Token::RParen) { + vec![] + } else { + let rules = self.parse_comma_separated(|parser| { + let name = parser.parse_identifier()?; + parser.expect_token(&Token::LParen)?; + let options = + parser.parse_comma_separated(Parser::parse_network_policy_rule_option)?; + parser.expect_token(&Token::RParen)?; + Ok(NetworkPolicyRuleDefinition { name, options }) + })?; + self.expect_token(&Token::RParen)?; + rules + }; + Ok(NetworkPolicyOption { + name: NetworkPolicyOptionName::Rules, + value: Some(WithOptionValue::NetworkPolicyRules(rules)), + }) + } + + fn parse_network_policy_rule_option( + &mut self, + ) -> Result, ParserError> { + let name = match self.expect_one_of_keywords(&[ACTION, ADDRESS, DIRECTION])? { + ACTION => NetworkPolicyRuleOptionName::Action, + ADDRESS => NetworkPolicyRuleOptionName::Address, + DIRECTION => NetworkPolicyRuleOptionName::Direction, + v => panic!("found unreachable keyword {}", v), + }; + let value = self.parse_optional_option_value()?; + Ok(NetworkPolicyRuleOption { name, value }) + } + fn parse_create_table(&mut self) -> Result, ParserError> { let temporary = self.parse_keyword(TEMPORARY) | self.parse_keyword(TEMP); self.expect_keyword(TABLE)?; @@ -5160,6 +5266,9 @@ impl<'a> Parser<'a> { })) } ObjectType::Schema => self.parse_alter_schema(object_type), + ObjectType::NetworkPolicy => self + .parse_alter_network_policy() + .map_parser_err(StatementKind::AlterNetworkPolicy), ObjectType::Func | ObjectType::Subsource => parser_err!( self, self.peek_prev_pos(), @@ -6699,6 +6808,9 @@ impl<'a> Parser<'a> { } ObjectType::Database => UnresolvedObjectName::Database(self.parse_database_name()?), ObjectType::Schema => UnresolvedObjectName::Schema(self.parse_schema_name()?), + ObjectType::NetworkPolicy => { + UnresolvedObjectName::NetworkPolicy(self.parse_identifier()?) + } }) } @@ -7460,6 +7572,7 @@ impl<'a> Parser<'a> { ObjectType::Secret => ShowObjectType::Secret, ObjectType::Connection => ShowObjectType::Connection, ObjectType::Cluster => ShowObjectType::Cluster, + ObjectType::NetworkPolicy => ShowObjectType::NetworkPolicy, ObjectType::MaterializedView => { let in_cluster = self.parse_optional_in_cluster()?; ShowObjectType::MaterializedView { in_cluster } @@ -8824,7 +8937,8 @@ impl<'a> Parser<'a> { | ObjectType::Secret | ObjectType::Connection | ObjectType::Database - | ObjectType::Schema => Ok(object_type), + | ObjectType::Schema + | ObjectType::NetworkPolicy => Ok(object_type), } } @@ -8848,6 +8962,7 @@ impl<'a> Parser<'a> { SCHEMA, FUNCTION, CONTINUAL, + NETWORK, ])? { TABLE => ObjectType::Table, VIEW => ObjectType::View, @@ -8882,6 +8997,13 @@ impl<'a> Parser<'a> { } ObjectType::ContinualTask } + NETWORK => { + if let Err(e) = self.expect_keyword(POLICY) { + self.prev_token(); + return Err(e); + } + ObjectType::NetworkPolicy + } _ => unreachable!(), }, ) @@ -8958,6 +9080,7 @@ impl<'a> Parser<'a> { CONNECTIONS, DATABASES, SCHEMAS, + POLICIES, ])? { TABLES => ObjectType::Table, VIEWS => ObjectType::View, @@ -8985,6 +9108,7 @@ impl<'a> Parser<'a> { CONNECTIONS => ObjectType::Connection, DATABASES => ObjectType::Database, SCHEMAS => ObjectType::Schema, + POLICIES => ObjectType::NetworkPolicy, _ => unreachable!(), }, ) @@ -9011,6 +9135,7 @@ impl<'a> Parser<'a> { SCHEMAS, SUBSOURCES, CONTINUAL, + NETWORK, ])? { TABLES => ObjectType::Table, VIEWS => ObjectType::View, @@ -9049,6 +9174,14 @@ impl<'a> Parser<'a> { return None; } } + NETWORK => { + if self.parse_keyword(POLICIES) { + ObjectType::NetworkPolicy + } else { + self.prev_token(); + return None; + } + } _ => unreachable!(), }, ) @@ -9153,6 +9286,7 @@ impl<'a> Parser<'a> { CREATEROLE, CREATEDB, CREATECLUSTER, + CREATENETWORKPOLICY, ])? { INSERT => Privilege::INSERT, SELECT => Privilege::SELECT, @@ -9163,6 +9297,7 @@ impl<'a> Parser<'a> { CREATEROLE => Privilege::CREATEROLE, CREATEDB => Privilege::CREATEDB, CREATECLUSTER => Privilege::CREATECLUSTER, + CREATENETWORKPOLICY => Privilege::CREATENETWORKPOLICY, _ => unreachable!(), }, ) @@ -9229,6 +9364,7 @@ impl<'a> Parser<'a> { SCHEMA, CLUSTER, CONTINUAL, + NETWORK, ])? { TABLE => { let name = self.parse_raw_name()?; @@ -9301,6 +9437,11 @@ impl<'a> Parser<'a> { let name = self.parse_raw_name()?; CommentObjectType::ContinualTask { name } } + NETWORK => { + self.expect_keyword(POLICY)?; + let name = self.parse_raw_network_policy_name()?; + CommentObjectType::NetworkPolicy { name } + } _ => unreachable!(), }; diff --git a/src/sql/src/catalog.rs b/src/sql/src/catalog.rs index 37860bfa6452a..2b19150f6cb16 100644 --- a/src/sql/src/catalog.rs +++ b/src/sql/src/catalog.rs @@ -161,6 +161,12 @@ pub trait SessionCatalog: fmt::Debug + ExprHumanizer + Send + Sync + ConnectionR /// Resolves the named role. fn resolve_role(&self, role_name: &str) -> Result<&dyn CatalogRole, CatalogError>; + /// Resolves the named network policy. + fn resolve_network_policy( + &self, + network_policy_name: &str, + ) -> Result<&dyn CatalogNetworkPolicy, CatalogError>; + /// Gets a role by its ID. fn try_get_role(&self, id: &RoleId) -> Option<&dyn CatalogRole>; @@ -1412,6 +1418,7 @@ impl From for ObjectType { mz_sql_parser::ast::ObjectType::Schema => ObjectType::Schema, mz_sql_parser::ast::ObjectType::Func => ObjectType::Func, mz_sql_parser::ast::ObjectType::ContinualTask => ObjectType::ContinualTask, + mz_sql_parser::ast::ObjectType::NetworkPolicy => ObjectType::NetworkPolicy, } } } diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index 43faafe9429e9..b648384e9a285 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -24,7 +24,7 @@ use mz_repr::network_policy_id::NetworkPolicyId; use mz_repr::role_id::RoleId; use mz_repr::{CatalogItemId, GlobalId}; use mz_repr::{ColumnName, RelationVersionSelector}; -use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr, Version}; +use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr, RawNetworkPolicyName, Version}; use mz_sql_parser::ident; use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize}; @@ -790,6 +790,18 @@ impl AstDisplay for ResolvedRoleName { } } +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct ResolvedNetworkPolicyName { + pub id: NetworkPolicyId, + pub name: String, +} + +impl AstDisplay for ResolvedNetworkPolicyName { + fn fmt(&self, f: &mut AstFormatter) { + f.write_str(format!("[{} AS {}]", self.id, self.name)); + } +} + #[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub enum ResolvedObjectName { Cluster(ResolvedClusterName), @@ -797,6 +809,7 @@ pub enum ResolvedObjectName { Database(ResolvedDatabaseName), Schema(ResolvedSchemaName), Role(ResolvedRoleName), + NetworkPolicy(ResolvedNetworkPolicyName), Item(ResolvedItemName), } @@ -809,6 +822,7 @@ impl AstDisplay for ResolvedObjectName { ResolvedObjectName::Schema(n) => f.write_node(n), ResolvedObjectName::Role(n) => f.write_node(n), ResolvedObjectName::Item(n) => f.write_node(n), + ResolvedObjectName::NetworkPolicy(n) => f.write_node(n), } } } @@ -824,6 +838,7 @@ impl AstInfo for Aug { type CteId = LocalId; type RoleName = ResolvedRoleName; type ObjectName = ResolvedObjectName; + type NetworkPolicyName = ResolvedNetworkPolicyName; } /// The identifier for a schema. @@ -1051,6 +1066,7 @@ impl TryFrom for ObjectId { } ResolvedItemName::Error => Err(anyhow!("error in name resolution")), }, + ResolvedObjectName::NetworkPolicy(name) => Ok(ObjectId::NetworkPolicy(name.id)), } } } @@ -1910,6 +1926,12 @@ impl<'a> Fold for NameResolver<'a> { ClusterAlterStrategy(value) => { ClusterAlterStrategy(self.fold_cluster_alter_option_value(value)) } + NetworkPolicyRules(rules) => NetworkPolicyRules( + rules + .into_iter() + .map(|r| self.fold_network_policy_rule_definition(r)) + .collect(), + ), } } @@ -1931,6 +1953,29 @@ impl<'a> Fold for NameResolver<'a> { } } } + + fn fold_network_policy_name( + &mut self, + name: ::NetworkPolicyName, + ) -> ::NetworkPolicyName { + match self.catalog.resolve_network_policy(&name.to_string()) { + Ok(policy) => ResolvedNetworkPolicyName { + id: policy.id(), + name: policy.name().to_string(), + }, + Err(e) => { + if self.status.is_ok() { + self.status = Err(e.into()); + } + // garbage value that will be ignored since there's an error. + ResolvedNetworkPolicyName { + id: NetworkPolicyId::User(0), + name: "".to_string(), + } + } + } + } + fn fold_object_name( &mut self, name: ::ObjectName, @@ -1968,6 +2013,9 @@ impl<'a> Fold for NameResolver<'a> { UnresolvedObjectName::Item(name) => { ResolvedObjectName::Item(self.fold_item_name(RawItemName::Name(name))) } + UnresolvedObjectName::NetworkPolicy(name) => ResolvedObjectName::NetworkPolicy( + self.fold_network_policy_name(RawNetworkPolicyName::Unresolved(name)), + ), } } diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index a0220d6d44a9e..4c8ee0c0b9eee 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -42,6 +42,7 @@ use mz_ore::now::{self, NOW_ZERO}; use mz_pgcopy::CopyFormatParams; use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem}; use mz_repr::explain::{ExplainConfig, ExplainFormat}; +use mz_repr::network_policy_id::NetworkPolicyId; use mz_repr::optimize::OptimizerFeatureOverrides; use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::role_id::RoleId; @@ -140,6 +141,7 @@ pub enum Plan { CreateView(CreateViewPlan), CreateMaterializedView(CreateMaterializedViewPlan), CreateContinualTask(CreateContinualTaskPlan), + CreateNetworkPolicy(CreateNetworkPolicyPlan), CreateIndex(CreateIndexPlan), CreateType(CreateTypePlan), Comment(CommentPlan), @@ -187,6 +189,7 @@ pub enum Plan { AlterRole(AlterRolePlan), AlterOwner(AlterOwnerPlan), AlterTableAddColumn(AlterTablePlan), + AlterNetworkPolicy(AlterNetworkPolicyPlan), Declare(DeclarePlan), Fetch(FetchPlan), Close(ClosePlan), @@ -228,6 +231,7 @@ impl Plan { PlanKind::AlterNoop, ], StatementKind::AlterRole => &[PlanKind::AlterRole], + StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy], StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret], StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster], StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink], @@ -259,6 +263,7 @@ impl Plan { StatementKind::CreateConnection => &[PlanKind::CreateConnection], StatementKind::CreateDatabase => &[PlanKind::CreateDatabase], StatementKind::CreateIndex => &[PlanKind::CreateIndex], + StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy], StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView], StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask], StatementKind::CreateRole => &[PlanKind::CreateRole], @@ -332,6 +337,7 @@ impl Plan { Plan::CreateContinualTask(_) => "create continual task", Plan::CreateIndex(_) => "create index", Plan::CreateType(_) => "create type", + Plan::CreateNetworkPolicy(_) => "create network policy", Plan::Comment(_) => "comment", Plan::DiscardTemp => "discard temp", Plan::DiscardAll => "discard all", @@ -411,6 +417,7 @@ impl Plan { Plan::AlterSystemReset(_) => "alter system", Plan::AlterSystemResetAll(_) => "alter system", Plan::AlterRole(_) => "alter role", + Plan::AlterNetworkPolicy(_) => "alter network policy", Plan::AlterOwner(plan) => match plan.object_type { ObjectType::Table => "alter table owner", ObjectType::View => "alter view owner", @@ -742,6 +749,19 @@ pub struct CreateContinualTaskPlan { pub continual_task: MaterializedView, } +#[derive(Debug, Clone)] +pub struct CreateNetworkPolicyPlan { + pub name: String, + pub rules: Vec, +} + +#[derive(Debug, Clone)] +pub struct AlterNetworkPolicyPlan { + pub id: NetworkPolicyId, + pub name: String, + pub rules: Vec, +} + #[derive(Debug, Clone)] pub struct CreateIndexPlan { pub name: QualifiedItemName, @@ -1580,12 +1600,22 @@ impl std::fmt::Display for NetworkPolicyRuleAction { } } } +impl TryFrom<&str> for NetworkPolicyRuleAction { + type Error = PlanError; + fn try_from(value: &str) -> Result { + match value.to_uppercase().as_str() { + "ALLOW" => Ok(Self::Allow), + _ => Err(PlanError::Unstructured( + "Allow is the only valid option".into(), + )), + } + } +} #[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)] pub enum NetworkPolicyRuleDirection { Ingress, } - impl std::fmt::Display for NetworkPolicyRuleDirection { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -1593,17 +1623,36 @@ impl std::fmt::Display for NetworkPolicyRuleDirection { } } } +impl TryFrom<&str> for NetworkPolicyRuleDirection { + type Error = PlanError; + fn try_from(value: &str) -> Result { + match value.to_uppercase().as_str() { + "INGRESS" => Ok(Self::Ingress), + _ => Err(PlanError::Unstructured( + "Ingress is the only valid option".into(), + )), + } + } +} #[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)] pub struct PolicyAddress(pub IpNet); -impl PolicyAddress { - pub fn to_string(&self) -> String { - self.0.to_string() +impl std::fmt::Display for PolicyAddress { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", &self.0.to_string()) } } impl From for PolicyAddress { fn from(value: String) -> Self { - Self(IpNet::from_str(&value).expect("expected")) + Self(IpNet::from_str(&value).expect("expected value to be IpNet")) + } +} +impl TryFrom<&str> for PolicyAddress { + type Error = PlanError; + fn try_from(value: &str) -> Result { + let net = IpNet::from_str(value) + .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?; + Ok(Self(net)) } } @@ -1612,7 +1661,7 @@ impl Serialize for PolicyAddress { where S: serde::Serializer, { - serializer.serialize_str(&self.0.to_string()) + serializer.serialize_str(&format!("{}", &self.0)) } } diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index 08ae5ee209aa9..b4489a566d0a2 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -136,6 +136,7 @@ pub fn describe( Statement::AlterSystemReset(stmt) => ddl::describe_alter_system_reset(&scx, stmt)?, Statement::AlterSystemResetAll(stmt) => ddl::describe_alter_system_reset_all(&scx, stmt)?, Statement::AlterTableAddColumn(stmt) => ddl::describe_alter_table_add_column(&scx, stmt)?, + Statement::AlterNetworkPolicy(stmt) => ddl::describe_alter_network_policy(&scx, stmt)?, Statement::Comment(stmt) => ddl::describe_comment(&scx, stmt)?, Statement::CreateCluster(stmt) => ddl::describe_create_cluster(&scx, stmt)?, Statement::CreateClusterReplica(stmt) => ddl::describe_create_cluster_replica(&scx, stmt)?, @@ -159,6 +160,7 @@ pub fn describe( ddl::describe_create_materialized_view(&scx, stmt)? } Statement::CreateContinualTask(stmt) => ddl::describe_create_continual_task(&scx, stmt)?, + Statement::CreateNetworkPolicy(stmt) => ddl::describe_create_network_policy(&scx, stmt)?, Statement::DropObjects(stmt) => ddl::describe_drop_objects(&scx, stmt)?, Statement::DropOwned(stmt) => ddl::describe_drop_owned(&scx, stmt)?, @@ -321,6 +323,7 @@ pub fn plan( Statement::AlterSystemReset(stmt) => ddl::plan_alter_system_reset(scx, stmt), Statement::AlterSystemResetAll(stmt) => ddl::plan_alter_system_reset_all(scx, stmt), Statement::AlterTableAddColumn(stmt) => ddl::plan_alter_table_add_column(scx, stmt), + Statement::AlterNetworkPolicy(stmt) => ddl::plan_alter_network_policy(scx, stmt), Statement::Comment(stmt) => ddl::plan_comment(scx, stmt), Statement::CreateCluster(stmt) => ddl::plan_create_cluster(scx, stmt), Statement::CreateClusterReplica(stmt) => ddl::plan_create_cluster_replica(scx, stmt), @@ -342,6 +345,7 @@ pub fn plan( ddl::plan_create_materialized_view(scx, stmt, params) } Statement::CreateContinualTask(stmt) => ddl::plan_create_continual_task(scx, stmt, params), + Statement::CreateNetworkPolicy(stmt) => ddl::plan_create_network_policy(scx, stmt), Statement::DropObjects(stmt) => ddl::plan_drop_objects(scx, stmt), Statement::DropOwned(stmt) => ddl::plan_drop_owned(scx, stmt), @@ -1025,6 +1029,7 @@ impl From<&Statement> for StatementClassifica Statement::AlterIndex(_) => DDL, Statement::AlterObjectRename(_) => DDL, Statement::AlterObjectSwap(_) => DDL, + Statement::AlterNetworkPolicy(_) => DDL, Statement::AlterRetainHistory(_) => DDL, Statement::AlterRole(_) => DDL, Statement::AlterSecret(_) => DDL, @@ -1054,6 +1059,7 @@ impl From<&Statement> for StatementClassifica Statement::CreateType(_) => DDL, Statement::CreateView(_) => DDL, Statement::CreateMaterializedView(_) => DDL, + Statement::CreateNetworkPolicy(_) => DDL, Statement::DropObjects(_) => DDL, Statement::DropOwned(_) => DDL, diff --git a/src/sql/src/plan/statement/acl.rs b/src/sql/src/plan/statement/acl.rs index 4f0327c40172b..1886cba0c7359 100644 --- a/src/sql/src/plan/statement/acl.rs +++ b/src/sql/src/plan/statement/acl.rs @@ -27,7 +27,7 @@ use crate::names::{ }; use crate::plan::error::PlanError; use crate::plan::statement::ddl::{ - resolve_cluster, resolve_database, resolve_item_or_type, resolve_schema, + resolve_cluster, resolve_database, resolve_item_or_type, resolve_network_policy, resolve_schema, }; use crate::plan::statement::{StatementContext, StatementDesc}; use crate::plan::{ @@ -76,6 +76,9 @@ pub fn plan_alter_owner( (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => { plan_alter_schema_owner(scx, if_exists, name, new_owner.id) } + (ObjectType::NetworkPolicy, UnresolvedObjectName::NetworkPolicy(name)) => { + plan_alter_network_policy_owner(scx, if_exists, name, new_owner.id) + } (ObjectType::Role, UnresolvedObjectName::Role(_)) => unreachable!("rejected by the parser"), ( object_type @ ObjectType::Cluster @@ -91,6 +94,7 @@ pub fn plan_alter_owner( | name @ UnresolvedObjectName::ClusterReplica(_) | name @ UnresolvedObjectName::Database(_) | name @ UnresolvedObjectName::Schema(_) + | name @ UnresolvedObjectName::NetworkPolicy(_) | name @ UnresolvedObjectName::Role(_), ) => { unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}") @@ -236,6 +240,31 @@ fn plan_alter_item_owner( } } +fn plan_alter_network_policy_owner( + scx: &StatementContext, + if_exists: bool, + name: Ident, + new_owner: RoleId, +) -> Result { + match resolve_network_policy(scx, name.clone(), if_exists)? { + Some(policy_id) => Ok(Plan::AlterOwner(AlterOwnerPlan { + id: ObjectId::NetworkPolicy(policy_id.id), + object_type: ObjectType::Schema, + new_owner, + })), + None => { + scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist { + name: name.to_ast_string(), + object_type: ObjectType::NetworkPolicy, + }); + + Ok(Plan::AlterNoop(AlterNoopPlan { + object_type: ObjectType::NetworkPolicy, + })) + } + } +} + pub fn describe_grant_role( _: &StatementContext, _: GrantRoleStatement, @@ -581,6 +610,7 @@ fn privilege_to_acl_mode(privilege: Privilege) -> AclMode { Privilege::CREATEROLE => AclMode::CREATE_ROLE, Privilege::CREATEDB => AclMode::CREATE_DB, Privilege::CREATECLUSTER => AclMode::CREATE_CLUSTER, + Privilege::CREATENETWORKPOLICY => AclMode::CREATE_NETWORK_POLICY, } } diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index c74ff054e8f35..85d2a48efc6d9 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -34,6 +34,7 @@ use mz_postgres_util::tunnel::PostgresFlavor; use mz_proto::RustType; use mz_repr::adt::interval::Interval; use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap}; +use mz_repr::network_policy_id::NetworkPolicyId; use mz_repr::optimize::OptimizerFeatureOverrides; use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule}; use mz_repr::role_id::RoleId; @@ -44,20 +45,21 @@ use mz_sql_parser::ast::display::comma_separated; use mz_sql_parser::ast::{ self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption, AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement, - AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement, - AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement, - AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption, - AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement, - AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema, - AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName, - ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, - ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName, - ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement, - ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName, - CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption, - CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType, - CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement, - CreateMaterializedViewStatement, CreateRoleStatement, CreateSchemaStatement, + AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement, + AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement, + AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction, + AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement, + AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement, + AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName, + ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue, + ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature, + ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef, + ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName, + ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement, + CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName, + CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement, + CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement, + CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName, @@ -69,13 +71,15 @@ use mz_sql_parser::ast::{ DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, - MySqlConfigOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, - RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, - ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, - SourceIncludeMetadata, Statement, TableConstraint, TableFromSourceColumns, - TableFromSourceOption, TableFromSourceOptionName, TableOption, TableOptionName, - UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, - ViewDefinition, WithOptionValue, + MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName, + NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName, + PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue, + RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption, + ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata, + Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption, + TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName, + UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition, + WithOptionValue, }; use mz_sql_parser::ident; use mz_sql_parser::parser::StatementParseResult; @@ -122,7 +126,7 @@ use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionE use crate::names::{ Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName, ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier, - ResolvedItemName, SchemaSpecifier, SystemObjectId, + ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId, }; use crate::normalize::{self, ident}; use crate::plan::error::PlanError; @@ -138,19 +142,21 @@ use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue}; use crate::plan::{ literal, plan_utils, query, transform_ast, AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, - AlterConnectionPlan, AlterItemRenamePlan, AlterNoopPlan, AlterOptionParameter, - AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, - AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, - AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, - ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, ConnectionDetails, - CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan, + AlterConnectionPlan, AlterItemRenamePlan, AlterNetworkPolicyPlan, AlterNoopPlan, + AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan, + AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan, + AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan, + ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, + ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan, - CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, - CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, DropObjectsPlan, - DropOwnedPlan, Index, Ingestion, MaterializedView, Params, Plan, PlanClusterOption, PlanNotice, - QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, - View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders, WebhookValidation, + CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, + CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, + DropObjectsPlan, DropOwnedPlan, Index, Ingestion, MaterializedView, NetworkPolicyRule, + NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Params, Plan, PlanClusterOption, + PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, + TableDataSource, Type, VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, + WebhookHeaders, WebhookValidation, }; use crate::session::vars::{ self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_KAFKA_SINK_HEADERS, @@ -2479,6 +2485,20 @@ pub fn describe_create_continual_task( Ok(StatementDesc::new(None)) } +pub fn describe_create_network_policy( + _: &StatementContext, + _: CreateNetworkPolicyStatement, +) -> Result { + Ok(StatementDesc::new(None)) +} + +pub fn describe_alter_network_policy( + _: &StatementContext, + _: AlterNetworkPolicyStatement, +) -> Result { + Ok(StatementDesc::new(None)) +} + pub fn plan_create_materialized_view( scx: &StatementContext, mut stmt: CreateMaterializedViewStatement, @@ -4122,6 +4142,115 @@ pub fn plan_create_role( })) } +pub fn plan_create_network_policy( + ctx: &StatementContext, + CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement, +) -> Result { + ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?; + let policy_options: NetworkPolicyOptionExtracted = options.try_into()?; + + let Some(rule_defs) = policy_options.rules else { + sql_bail!("RULES must be specified when creating network policies."); + }; + + let mut rules = vec![]; + for NetworkPolicyRuleDefinition { name, options } in rule_defs { + let NetworkPolicyRuleOptionExtracted { + seen: _, + direction, + action, + address, + } = options.try_into()?; + let (direction, action, address) = match (direction, action, address) { + (Some(direction), Some(action), Some(address)) => ( + NetworkPolicyRuleDirection::try_from(direction.as_str())?, + NetworkPolicyRuleAction::try_from(action.as_str())?, + PolicyAddress::try_from(address.as_str())?, + ), + (_, _, _) => { + sql_bail!("Direction, Address, and Action must specified when creating a rule") + } + }; + rules.push(NetworkPolicyRule { + name: normalize::ident(name), + direction, + action, + address, + }); + } + + if rules.len() + > ctx + .catalog + .system_vars() + .max_rules_per_network_policy() + .try_into()? + { + sql_bail!("RULES count exceeds max_rules_per_network_policy.") + } + + Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan { + name: normalize::ident(name), + rules, + })) +} + +pub fn plan_alter_network_policy( + ctx: &StatementContext, + AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement, +) -> Result { + ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?; + + let policy_options: NetworkPolicyOptionExtracted = options.try_into()?; + let policy = ctx.catalog.resolve_network_policy(&name.to_string())?; + + let Some(rule_defs) = policy_options.rules else { + sql_bail!("RULES must be specified when creating network policies."); + }; + + let mut rules = vec![]; + for NetworkPolicyRuleDefinition { name, options } in rule_defs { + let NetworkPolicyRuleOptionExtracted { + seen: _, + direction, + action, + address, + } = options.try_into()?; + + let (direction, action, address) = match (direction, action, address) { + (Some(direction), Some(action), Some(address)) => ( + NetworkPolicyRuleDirection::try_from(direction.as_str())?, + NetworkPolicyRuleAction::try_from(action.as_str())?, + PolicyAddress::try_from(address.as_str())?, + ), + (_, _, _) => { + sql_bail!("Direction, Address, and Action must specified when creating a rule") + } + }; + rules.push(NetworkPolicyRule { + name: normalize::ident(name), + direction, + action, + address, + }); + } + if rules.len() + > ctx + .catalog + .system_vars() + .max_rules_per_network_policy() + .try_into()? + { + sql_bail!("RULES count exceeds max_rules_per_network_policy.") + } + + Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan { + id: policy.id(), + name: normalize::ident(name), + rules, + })) +} + pub fn describe_create_cluster( _: &StatementContext, _: CreateClusterStatement, @@ -4148,6 +4277,18 @@ generate_extracted_config!( (WorkloadClass, OptionalString) ); +generate_extracted_config!( + NetworkPolicyOption, + (Rules, Vec>) +); + +generate_extracted_config!( + NetworkPolicyRuleOption, + (Direction, String), + (Action, String), + (Address, String) +); + generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue)); generate_extracted_config!( @@ -4956,6 +5097,9 @@ pub fn plan_drop_objects( plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)? .map(ObjectId::Item) } + UnresolvedObjectName::NetworkPolicy(name) => { + plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy) + } }; match id { Some(id) => referenced_ids.push(id), @@ -5054,6 +5198,25 @@ fn plan_drop_cluster( }) } +fn plan_drop_network_policy( + scx: &StatementContext, + if_exists: bool, + name: &Ident, +) -> Result, PlanError> { + match scx.catalog.resolve_network_policy(name.as_str()) { + Ok(policy) => { + // TODO @jubrad don't let this be dropped if it's the default policy or + // is being used by other objects. + // if scx.catalog.system_vars().default_network_policy().id() == policy.id() { + // return Err(PlanError::Unstructured("Cannot drop default network policy.") + // } + Ok(Some(policy.id())) + } + Err(_) if if_exists => Ok(None), + Err(e) => Err(e.into()), + } +} + /// Returns `true` if the cluster has any storage object. Return `false` if the cluster has no /// objects. fn contains_storage_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool { @@ -6971,6 +7134,9 @@ pub fn plan_comment( None, ) } + CommentObjectType::NetworkPolicy { name } => { + (CommentObjectId::NetworkPolicy(name.id), None) + } }; // Note: the `mz_comments` table uses an `Int4` for the column position, but in the catalog storage we @@ -7048,6 +7214,21 @@ pub(crate) fn resolve_schema<'a>( } } +pub(crate) fn resolve_network_policy<'a>( + scx: &'a StatementContext, + name: Ident, + if_exists: bool, +) -> Result, PlanError> { + match scx.catalog.resolve_network_policy(&name.to_string()) { + Ok(policy) => Ok(Some(ResolvedNetworkPolicyName { + id: policy.id(), + name: policy.name().to_string(), + })), + Err(_) if if_exists => Ok(None), + Err(e) => Err(e.into()), + } +} + pub(crate) fn resolve_item_or_type<'a>( scx: &'a StatementContext, object_type: ObjectType, diff --git a/src/sql/src/plan/statement/show.rs b/src/sql/src/plan/statement/show.rs index 32a02e6d949f3..cafcfa3226585 100644 --- a/src/sql/src/plan/statement/show.rs +++ b/src/sql/src/plan/statement/show.rs @@ -280,6 +280,14 @@ pub fn show_roles<'a>( ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"])) } +pub fn show_network_policies<'a>( + scx: &'a StatementContext<'a>, + filter: Option>, +) -> Result, PlanError> { + let query = "SELECT name, comment FROM mz_internal.mz_show_network_policies".to_string(); + ShowSelect::new(scx, query, filter, None, Some(&["name", "comment"])) +} + pub fn show_objects<'a>( scx: &'a StatementContext<'a>, ShowObjectsStatement { @@ -340,6 +348,10 @@ pub fn show_objects<'a>( ShowObjectType::ContinualTask { in_cluster } => { show_continual_tasks(scx, from, in_cluster, filter) } + ShowObjectType::NetworkPolicy => { + assert_none!(from, "parser should reject from"); + show_network_policies(scx, filter) + } } } diff --git a/src/sql/src/plan/with_options.rs b/src/sql/src/plan/with_options.rs index 2840b3e9c879e..bf7a014643f5e 100644 --- a/src/sql/src/plan/with_options.rs +++ b/src/sql/src/plan/with_options.rs @@ -17,7 +17,7 @@ use mz_repr::bytes::ByteSize; use mz_repr::{strconv, GlobalId, RelationVersionSelector}; use mz_sql_parser::ast::{ ClusterAlterOptionValue, ClusterScheduleOptionValue, ConnectionDefaultAwsPrivatelink, Expr, - Ident, KafkaBroker, RefreshOptionValue, ReplicaDefinition, + Ident, KafkaBroker, NetworkPolicyRuleDefinition, RefreshOptionValue, ReplicaDefinition, }; use mz_storage_types::connections::string_or_secret::StringOrSecret; use serde::{Deserialize, Serialize}; @@ -663,7 +663,8 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue sql_bail!( + | WithOptionValue::ClusterScheduleOptionValue(_) + | WithOptionValue::NetworkPolicyRules(_) => sql_bail!( "incompatible value types: cannot convert {} to {}", match v { // The first few are unreachable because they are handled at the top of the outer match. @@ -683,6 +684,7 @@ impl, T: AstInfo + std::fmt::Debug> TryFromValue "connection kafka brokers", WithOptionValue::Refresh(_) => "refresh option values", WithOptionValue::ClusterScheduleOptionValue(_) => "cluster schedule", + WithOptionValue::NetworkPolicyRules(_) => "network policy rules", }, V::name() ), @@ -904,3 +906,26 @@ impl TryFromValue> for ClusterAlterOptionValue { Some(WithOptionValue::ClusterAlterStrategy(self)) } } + +impl TryFromValue> for Vec> { + fn try_from_value(v: WithOptionValue) -> Result { + match v { + WithOptionValue::NetworkPolicyRules(rules) => Ok(rules), + _ => sql_bail!("cannot use value as cluster replicas"), + } + } + + fn try_into_value(self, _catalog: &dyn SessionCatalog) -> Option> { + Some(WithOptionValue::NetworkPolicyRules(self)) + } + + fn name() -> String { + "network policy rules".to_string() + } +} + +impl ImpliedValue for Vec> { + fn implied_value() -> Result { + sql_bail!("must provide a set of network policy rules") + } +} diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 25fb57e8d03e6..d2ba3bc376948 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -438,6 +438,15 @@ fn generate_rbac_requirements( item_usage: &CREATE_ITEM_USAGE, ..Default::default() }, + Plan::CreateNetworkPolicy(plan::CreateNetworkPolicyPlan { .. }) => RbacRequirements { + privileges: vec![( + SystemObjectId::System, + AclMode::CREATE_NETWORK_POLICY, + role_id, + )], + item_usage: &CREATE_ITEM_USAGE, + ..Default::default() + }, Plan::CreateCluster(plan::CreateClusterPlan { name: _, variant: _, @@ -1201,6 +1210,11 @@ fn generate_rbac_requirements( item_usage: &CREATE_ITEM_USAGE, ..Default::default() }, + Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements { + ownership: vec![ObjectId::NetworkPolicy(*id)], + item_usage: &CREATE_ITEM_USAGE, + ..Default::default() + }, Plan::ReadThenWrite(plan::ReadThenWritePlan { id, selection, @@ -1702,7 +1716,9 @@ pub const fn all_object_privileges(object_type: SystemObjectType) -> AclMode { const USAGE_CREATE_ACL_MODE: AclMode = AclMode::USAGE.union(AclMode::CREATE); const ALL_SYSTEM_PRIVILEGES: AclMode = AclMode::CREATE_ROLE .union(AclMode::CREATE_DB) - .union(AclMode::CREATE_CLUSTER); + .union(AclMode::CREATE_CLUSTER) + .union(AclMode::CREATE_NETWORK_POLICY); + const EMPTY_ACL_MODE: AclMode = AclMode::empty(); match object_type { SystemObjectType::Object(ObjectType::Table) => TABLE_ACL_MODE, diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index a96bbfe24d294..d332588232784 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1190,6 +1190,8 @@ impl SystemVars { &MAX_SECRETS, &MAX_ROLES, &MAX_CONTINUAL_TASKS, + &MAX_NETWORK_POLICIES, + &MAX_RULES_PER_NETWORK_POLICY, &MAX_RESULT_SIZE, &MAX_COPY_FROM_SIZE, &ALLOWED_CLUSTER_REPLICA_SIZES, @@ -1712,6 +1714,11 @@ impl SystemVars { *self.expect_value(&MAX_NETWORK_POLICIES) } + /// Returns the value of the `max_network_policies` configuration parameter. + pub fn max_rules_per_network_policy(&self) -> u32 { + *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY) + } + /// Returns the value of the `max_result_size` configuration parameter. pub fn max_result_size(&self) -> u64 { self.expect_value::(&MAX_RESULT_SIZE).as_bytes() diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 418af4653d7dd..a3e2c9f0c2815 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -557,11 +557,18 @@ pub static MAX_NETWORK_POLICIES: VarDefinition = VarDefinition::new( true, ); +pub static MAX_RULES_PER_NETWORK_POLICY: VarDefinition = VarDefinition::new( + "max_rules_per_network_policies", + value!(u32; 25), + "The maximum number of rules per network policies.", + true, +); + // Cloud environmentd is configured with 4 GiB of RAM, so 1 GiB is a good heuristic for a single // query. // // We constrain this parameter to a minimum of 1MB, to avoid accidental usage of values that will -// interfer with queries executed by the system itself. +// interfere with queries executed by the system itself. // // TODO(jkosh44) Eventually we want to be able to return arbitrary sized results. pub static MAX_RESULT_SIZE: VarDefinition = VarDefinition::new( @@ -2163,6 +2170,12 @@ feature_flags!( default: false, enable_for_item_parsing: true, }, + { + name: enable_network_policies, + desc: "ENABLE NETWORK POLICIES", + default: false, + enable_for_item_parsing: true, + }, ); impl From<&super::SystemVars> for OptimizerFeatures { diff --git a/test/sqllogictest/audit_log.slt b/test/sqllogictest/audit_log.slt index e92eb6c851460..dc9bf0e7e201a 100644 --- a/test/sqllogictest/audit_log.slt +++ b/test/sqllogictest/audit_log.slt @@ -151,8 +151,8 @@ SELECT id, event_type, object_type, details, user FROM mz_audit_events ORDER BY 12 grant cluster {"grantee_id":"p","grantor_id":"s1","object_id":"Cu1","privileges":"U"} NULL 13 grant cluster {"grantee_id":"u1","grantor_id":"s1","object_id":"Cu1","privileges":"UC"} NULL 14 create cluster-replica {"billed_as":null,"cluster_id":"u1","cluster_name":"quickstart","disk":false,"internal":false,"logical_size":"2","reason":"system","replica_id":"u1","replica_name":"r1"} NULL -15 grant system {"grantee_id":"s1","grantor_id":"s1","object_id":"SYSTEM","privileges":"RBN"} NULL -16 grant system {"grantee_id":"u1","grantor_id":"s1","object_id":"SYSTEM","privileges":"RBN"} NULL +15 grant system {"grantee_id":"s1","grantor_id":"s1","object_id":"SYSTEM","privileges":"RBNP"} NULL +16 grant system {"grantee_id":"u1","grantor_id":"s1","object_id":"SYSTEM","privileges":"RBNP"} NULL 17 alter system {"name":"enable_reduce_mfp_fusion","value":"on"} mz_system 18 alter system {"name":"enable_unsafe_functions","value":"on"} mz_system 19 create database {"id":"u2","name":"test"} materialize diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index 3a5a6bd95bb95..36649f4915cae 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -395,6 +395,12 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 4 address text 5 direction text +query ITT +SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_show_network_policies' ORDER BY position +---- +1 name text +2 comment text + query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_show_all_privileges' ORDER BY position ---- @@ -718,6 +724,7 @@ mz_show_my_object_privileges mz_show_my_role_members mz_show_my_schema_privileges mz_show_my_system_privileges +mz_show_network_policies mz_show_object_privileges mz_show_role_members mz_show_roles diff --git a/test/sqllogictest/information_schema_tables.slt b/test/sqllogictest/information_schema_tables.slt index 54f0bcde67710..5f2f7a2eb98a5 100644 --- a/test/sqllogictest/information_schema_tables.slt +++ b/test/sqllogictest/information_schema_tables.slt @@ -585,6 +585,10 @@ mz_show_my_system_privileges VIEW materialize mz_internal +mz_show_network_policies +VIEW +materialize +mz_internal mz_show_object_privileges VIEW materialize diff --git a/test/sqllogictest/mz_catalog_server_index_accounting.slt b/test/sqllogictest/mz_catalog_server_index_accounting.slt index ed578069d1916..a98ac526b3f70 100644 --- a/test/sqllogictest/mz_catalog_server_index_accounting.slt +++ b/test/sqllogictest/mz_catalog_server_index_accounting.slt @@ -72,7 +72,7 @@ mz_message_batch_counts_received_raw_s2_primary_idx CREATE␠INDEX␠"mz_messag mz_message_batch_counts_sent_raw_s2_primary_idx CREATE␠INDEX␠"mz_message_batch_counts_sent_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_message_batch_counts_sent_raw"␠("channel_id",␠"from_worker_id",␠"to_worker_id") mz_message_counts_received_raw_s2_primary_idx CREATE␠INDEX␠"mz_message_counts_received_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_message_counts_received_raw"␠("channel_id",␠"from_worker_id",␠"to_worker_id") mz_message_counts_sent_raw_s2_primary_idx CREATE␠INDEX␠"mz_message_counts_sent_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_message_counts_sent_raw"␠("channel_id",␠"from_worker_id",␠"to_worker_id") -mz_notices_ind CREATE␠INDEX␠"mz_notices_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s774␠AS␠"mz_internal"."mz_notices"]␠("id") +mz_notices_ind CREATE␠INDEX␠"mz_notices_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s775␠AS␠"mz_internal"."mz_notices"]␠("id") mz_object_dependencies_ind CREATE␠INDEX␠"mz_object_dependencies_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s453␠AS␠"mz_internal"."mz_object_dependencies"]␠("object_id") mz_object_history_ind CREATE␠INDEX␠"mz_object_history_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s517␠AS␠"mz_internal"."mz_object_history"]␠("id") mz_object_lifetimes_ind CREATE␠INDEX␠"mz_object_lifetimes_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s518␠AS␠"mz_internal"."mz_object_lifetimes"]␠("id") @@ -81,7 +81,7 @@ mz_objects_ind CREATE␠INDEX␠"mz_objects_ind"␠IN␠CLUSTER␠[s2]␠ON␠[ mz_peek_durations_histogram_raw_s2_primary_idx CREATE␠INDEX␠"mz_peek_durations_histogram_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_peek_durations_histogram_raw"␠("worker_id",␠"type",␠"duration_ns") mz_recent_activity_log_thinned_ind CREATE␠INDEX␠"mz_recent_activity_log_thinned_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s676␠AS␠"mz_internal"."mz_recent_activity_log_thinned"]␠("sql_hash") mz_recent_sql_text_ind CREATE␠INDEX␠"mz_recent_sql_text_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s672␠AS␠"mz_internal"."mz_recent_sql_text"]␠("sql_hash") -mz_recent_storage_usage_ind CREATE␠INDEX␠"mz_recent_storage_usage_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s767␠AS␠"mz_catalog"."mz_recent_storage_usage"]␠("object_id") +mz_recent_storage_usage_ind CREATE␠INDEX␠"mz_recent_storage_usage_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s768␠AS␠"mz_catalog"."mz_recent_storage_usage"]␠("object_id") mz_roles_ind CREATE␠INDEX␠"mz_roles_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s476␠AS␠"mz_catalog"."mz_roles"]␠("id") mz_scheduling_elapsed_raw_s2_primary_idx CREATE␠INDEX␠"mz_scheduling_elapsed_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_scheduling_elapsed_raw"␠("id",␠"worker_id") mz_scheduling_parks_histogram_raw_s2_primary_idx CREATE␠INDEX␠"mz_scheduling_parks_histogram_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_scheduling_parks_histogram_raw"␠("worker_id",␠"slept_for_ns",␠"requested_ns") diff --git a/test/sqllogictest/network_policy.slt b/test/sqllogictest/network_policy.slt new file mode 100644 index 0000000000000..5562d19a5e8b1 --- /dev/null +++ b/test/sqllogictest/network_policy.slt @@ -0,0 +1,82 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Test for `NETWORK POLICIES`. + +mode standard + +# Start from a pristine state +reset-server + +simple conn=mz_system,user=mz_system + +query TT rowsort +select * from (SHOW NETWORK POLICIES) +---- +db error: ERROR: Unexpected keyword QUERY at the beginning of a statement + + +simple conn=mz_system,user=mz_system +ALTER SYSTEM SET enable_network_policies=on; +---- +COMPLETE 0 + + +statement ok +CREATE NETWORK POLICY np (RULES ( r1 (address='0.0.0.0/32', action='allow', direction='ingress'), r2 (address='0.0.0.1/32', action='allow', direction='ingress') )); + +query TT rowsort +SHOW NETWORK POLICIES +---- +np +(empty) + +query TTTT +SELECT id, name, owner_id, oid FROM mz_internal.mz_network_policies +---- +u1 +np +u1 +20177 + + +query TTTTT colnames +SELECT * FROM mz_internal.mz_network_policy_rules +---- +name policy_id action address direction +r1 +u1 +allow +0.0.0.0/32 +ingress +r2 +u1 +allow +0.0.0.1/32 +ingress + +statement ok +ALTER NETWORK POLICY np SET (RULES (r1 (address='1.1.1.1/32', action='allow', direction='ingress') )); + +query TTTTT colnames +SELECT * FROM mz_internal.mz_network_policy_rules +---- +name policy_id action address direction +r1 +u1 +allow +1.1.1.1/32 +ingress + +statement ok +DROP NETWORK POLICY np; + +query TT rowsort +select * from (SHOW NETWORK POLICIES) +---- diff --git a/test/sqllogictest/object_ownership.slt b/test/sqllogictest/object_ownership.slt index 2c4c89b5f4e7b..ffa1e08e05756 100644 --- a/test/sqllogictest/object_ownership.slt +++ b/test/sqllogictest/object_ownership.slt @@ -2543,8 +2543,8 @@ v materialize=r/mz_system query T SELECT privileges::text FROM mz_system_privileges ---- -mz_system=RBN/mz_system materialize=BN/mz_system +mz_system=RBNP/mz_system simple conn=mz_system,user=mz_system DROP OWNED BY materialize; @@ -2579,7 +2579,7 @@ v mz_system=r/mz_system query T SELECT privileges::text FROM mz_system_privileges ---- -mz_system=RBN/mz_system +mz_system=RBNP/mz_system simple conn=mz_system,user=mz_system DROP VIEW v; diff --git a/test/sqllogictest/oid.slt b/test/sqllogictest/oid.slt index 5b9753d1d21a7..519f196bec4d7 100644 --- a/test/sqllogictest/oid.slt +++ b/test/sqllogictest/oid.slt @@ -1147,3 +1147,4 @@ SELECT oid, name FROM mz_objects WHERE id LIKE 's%' AND oid < 20000 ORDER BY oid 17039 mz_continual_tasks_ind 17040 mz_network_policies 17041 mz_network_policy_rules +17042 mz_show_network_policies diff --git a/test/sqllogictest/rbac_views.slt b/test/sqllogictest/rbac_views.slt index 475762e9e065b..3191299cabb3f 100644 --- a/test/sqllogictest/rbac_views.slt +++ b/test/sqllogictest/rbac_views.slt @@ -99,6 +99,7 @@ mz_system PUBLIC CREATECLUSTER mz_system materialize CREATEDB mz_system materialize CREATEROLE mz_system materialize CREATECLUSTER +mz_system materialize CREATENETWORKPOLICY mz_system r1 CREATEDB mz_system r1 CREATECLUSTER mz_system r2 CREATEROLE @@ -320,6 +321,7 @@ mz_system PUBLIC NULL NULL NULL system CREATECLUS mz_system materialize NULL NULL NULL system CREATEDB mz_system materialize NULL NULL NULL system CREATEROLE mz_system materialize NULL NULL NULL system CREATECLUSTER +mz_system materialize NULL NULL NULL system CREATENETWORKPOLICY mz_system r1 NULL NULL NULL system CREATEDB mz_system r1 NULL NULL NULL system CREATECLUSTER mz_system r2 NULL NULL NULL system CREATEROLE @@ -402,6 +404,7 @@ mz_system PUBLIC NULL NULL NULL system CREATECLUS mz_system materialize NULL NULL NULL system CREATEDB mz_system materialize NULL NULL NULL system CREATEROLE mz_system materialize NULL NULL NULL system CREATECLUSTER +mz_system materialize NULL NULL NULL system CREATENETWORKPOLICY mz_system r1 NULL NULL NULL system CREATEDB mz_system r1 NULL NULL NULL system CREATECLUSTER mz_system r2 NULL NULL NULL system CREATEROLE diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td index 079d34a01afed..621c2771c0986 100644 --- a/test/testdrive/catalog.td +++ b/test/testdrive/catalog.td @@ -289,7 +289,7 @@ public.bool > DROP DATABASE foo ! DROP OBJECT v1 -contains:Expected one of TABLE or VIEW or MATERIALIZED or SOURCE or SINK or INDEX or TYPE or ROLE or USER or CLUSTER or SECRET or CONNECTION or DATABASE or SCHEMA or FUNCTION or CONTINUAL, found identifier +contains:Expected one of TABLE or VIEW or MATERIALIZED or SOURCE or SINK or INDEX or TYPE or ROLE or USER or CLUSTER or SECRET or CONNECTION or DATABASE or SCHEMA or FUNCTION or CONTINUAL or NETWORK, found identifier > SHOW OBJECTS name type comment @@ -670,6 +670,7 @@ mz_show_my_object_privileges "" mz_show_my_role_members "" mz_show_my_schema_privileges "" mz_show_my_system_privileges "" +mz_show_network_policies "" mz_show_object_privileges "" mz_show_role_members "" mz_show_roles "" diff --git a/test/testdrive/session.td b/test/testdrive/session.td index 9615f8586e5f1..f1023c60bbc3d 100644 --- a/test/testdrive/session.td +++ b/test/testdrive/session.td @@ -50,12 +50,14 @@ max_identifier_length 255 "The maximum length max_kafka_connections 1000 "The maximum number of Kafka connections in the region, across all schemas (Materialize)." max_materialized_views 100 "The maximum number of materialized views in the region, across all schemas (Materialize)." max_mysql_connections 1000 "The maximum number of MySQL connections in the region, across all schemas (Materialize)." +max_network_policies 25 "The maximum number of network policies in the region." max_objects_per_schema 1000 "The maximum number of objects in a schema (Materialize)." max_postgres_connections 1000 "The maximum number of PostgreSQL connections in the region, across all schemas (Materialize)." max_query_result_size "1GB" "The maximum size in bytes for a single query's result (Materialize)." max_replicas_per_cluster 5 "The maximum number of replicas of a single cluster (Materialize)." max_result_size "1GB" "The maximum size in bytes for an internal query result (Materialize)." max_roles 1000 "The maximum number of roles in the region (Materialize)." +max_rules_per_network_policies 25 "The maximum number of rules per network policies." max_schemas_per_database 1000 "The maximum number of schemas in a database (Materialize)." max_secrets 100 "The maximum number of secrets in the region, across all schemas (Materialize)." max_sinks 25 "The maximum number of sinks in the region, across all schemas (Materialize)."