Skip to content

Commit

Permalink
add predefined network policy and system var
Browse files Browse the repository at this point in the history
- adds a new predefined network policy
- removes the allow list based policy system var
  in favor of a new `network_policy` system var
- swaps policy enforcement to use the default policy.
  • Loading branch information
jubrad committed Nov 1, 2024
1 parent 289410e commit 874c250
Show file tree
Hide file tree
Showing 22 changed files with 1,732 additions and 164 deletions.
4 changes: 4 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,10 @@ impl Catalog {
self.state.get_network_policy(&network_policy_id)
}

pub fn get_network_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
self.state.try_get_network_policy_by_name(name)
}

pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
self.state.clusters_by_id.values()
}
Expand Down
31 changes: 11 additions & 20 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, Cluste
use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, TableDataSource,
DataSourceDesc, NetworkPolicy, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
Expand Down Expand Up @@ -4097,24 +4097,15 @@ pub enum NetworkPolicyError {
MissingIp,
}

// TODO @jubrad this will be moved to a catalog resource in v1
// of network policies.
/// Represents a basic network policy.
#[derive(Debug, Clone)]
pub struct NetworkPolicy {
allow_list: Vec<IpNet>,
}

impl NetworkPolicy {
pub fn new(allow_list: Vec<IpNet>) -> Self {
NetworkPolicy { allow_list }
}

/// Validate the provided IP is allowed by the network policy.
pub fn validate(&self, ip: &IpAddr) -> Result<(), NetworkPolicyError> {
match self.allow_list.iter().any(|net| net.contains(ip)) {
true => Ok(()),
false => Err(NetworkPolicyError::AddressDenied(ip.clone())),
}
pub(crate) fn validate_network_with_policy(
ip: &IpAddr,
policy: &NetworkPolicy,
) -> Result<(), NetworkPolicyError> {
// At the moment we're not handling action or direction
// as those are only able to be "allow" and "ingress" respectively
if policy.rules.iter().any(|r| r.address.0.contains(ip)) {
Ok(())
} else {
Err(NetworkPolicyError::AddressDenied(ip.clone()))
}
}
25 changes: 21 additions & 4 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ use crate::command::{
};
use crate::coord::appends::{Deferred, PendingWriteTxn};
use crate::coord::{
ConnMeta, Coordinator, DeferredPlanStatement, Message, NetworkPolicy, PendingTxn,
PlanStatement, PlanValidity, PurifiedStatementReady,
validate_network_with_policy, ConnMeta, Coordinator, DeferredPlanStatement, Message,
PendingTxn, PlanStatement, PlanValidity, PurifiedStatementReady,
};
use crate::error::AdapterError;
use crate::notice::AdapterNotice;
Expand Down Expand Up @@ -355,9 +355,26 @@ impl Coordinator {
// ensure these internal interfaces are well secured.
let system_config = self.catalog().state().system_config();
if !user.is_internal() {
let default_policy = NetworkPolicy::new(system_config.default_network_policy());
let Some(network_policy) = self
.catalog()
.get_network_by_name(&system_config.default_network_policy_name())
else {
tracing::error!("Network_policy system var is not pointing to a existing network policy. All user traffic will be blocked");
match client_ip {
Some(ip) => {
return Err(AdapterError::NetworkPolicyDenied(
super::NetworkPolicyError::AddressDenied(ip.clone()),
));
}
None => {
return Err(AdapterError::NetworkPolicyDenied(
super::NetworkPolicyError::MissingIp,
));
}
}
};
if let Some(ip) = client_ip {
match default_policy.validate(ip) {
match validate_network_with_policy(ip, network_policy) {
Ok(_) => {}
Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
}
Expand Down
23 changes: 21 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ use mz_sql::plan::{
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
self, IsolationLevel, OwnedVarInput, SessionVars, Var, VarInput, SCHEMA_ALIAS,
TRANSACTION_ISOLATION_VAR_NAME,
self, IsolationLevel, OwnedVarInput, SessionVars, Var, VarError, VarInput, NETWORK_POLICY,
SCHEMA_ALIAS, TRANSACTION_ISOLATION_VAR_NAME,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
Expand Down Expand Up @@ -4001,6 +4001,25 @@ impl Coordinator {
plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
) -> Result<ExecuteResponse, AdapterError> {
self.is_user_allowed_to_alter_system(session, Some(&name))?;
if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
let values = match value {
plan::VariableValue::Default => None,
plan::VariableValue::Values(ref values) => Some(values),
};
if let Some(_policy) = values
.map(|v| self.catalog.get_network_by_name(&v[0]))
.flatten()
{
} else {
return Err(AdapterError::PlanError(plan::PlanError::VarError(
VarError::InvalidParameterValue {
name: NETWORK_POLICY.name(),
invalid_values: values.expect("Must have provided value.").to_owned(),
reason: "No network policy with such name exists!".to_string(),
},
)));
}
}
let op = match value {
plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
name: name.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v69.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v70.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/protos/hashes.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,9 @@
{
"name": "objects_v69.proto",
"md5": "638e206754da134b10a0712d63bdd8dc"
},
{
"name": "objects_v70.proto",
"md5": "2f8c95db7075f523a7e28af319279bcd"
}
]
Loading

0 comments on commit 874c250

Please sign in to comment.