Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/network policy predefined #30261

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,10 @@ impl Catalog {
self.state.get_network_policy(&network_policy_id)
}

pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
self.state.try_get_network_policy_by_name(name)
}

pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
self.state.clusters_by_id.values()
}
Expand Down
31 changes: 11 additions & 20 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, Cluste
use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, TableDataSource,
DataSourceDesc, NetworkPolicy, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
Expand Down Expand Up @@ -4114,24 +4114,15 @@ pub enum NetworkPolicyError {
MissingIp,
}

// TODO @jubrad this will be moved to a catalog resource in v1
// of network policies.
/// Represents a basic network policy.
#[derive(Debug, Clone)]
pub struct NetworkPolicy {
allow_list: Vec<IpNet>,
}

impl NetworkPolicy {
pub fn new(allow_list: Vec<IpNet>) -> Self {
NetworkPolicy { allow_list }
}

/// Validate the provided IP is allowed by the network policy.
pub fn validate(&self, ip: &IpAddr) -> Result<(), NetworkPolicyError> {
match self.allow_list.iter().any(|net| net.contains(ip)) {
true => Ok(()),
false => Err(NetworkPolicyError::AddressDenied(ip.clone())),
}
pub(crate) fn validate_network_with_policy(
ip: &IpAddr,
policy: &NetworkPolicy,
) -> Result<(), NetworkPolicyError> {
// At the moment we're not handling action or direction
// as those are only able to be "allow" and "ingress" respectively
if policy.rules.iter().any(|r| r.address.0.contains(ip)) {
Ok(())
} else {
Err(NetworkPolicyError::AddressDenied(ip.clone()))
}
}
139 changes: 84 additions & 55 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE,
EndTransactionAction, OwnedVarInput, Value, Var, NETWORK_POLICY, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
Expand All @@ -62,8 +62,8 @@ use crate::command::{
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::{
ConnMeta, Coordinator, DeferredPlanStatement, Message, NetworkPolicy, PendingTxn,
PlanStatement, PlanValidity, PurifiedStatementReady,
validate_network_with_policy, ConnMeta, Coordinator, DeferredPlanStatement, Message,
PendingTxn, PlanStatement, PlanValidity, PurifiedStatementReady,
};
use crate::error::AdapterError;
use crate::notice::AdapterNotice;
Expand Down Expand Up @@ -257,34 +257,7 @@ impl Coordinator {
) {
// Early return if successful, otherwise cleanup any possible state.
match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
Ok(role_id) => {
let system_config = self.catalog().state().system_config();
let mut session_defaults = BTreeMap::new();

// Override the session with any system defaults.
session_defaults.extend(
system_config
.iter_session()
.map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
);

// Special case.
let statement_logging_default = system_config
.statement_logging_default_sample_rate()
.format();
session_defaults.insert(
STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
OwnedVarInput::Flat(statement_logging_default),
);

// Override system defaults with role defaults.
session_defaults.extend(
self.catalog()
.get_role(&role_id)
.vars()
.map(|(name, val)| (name.to_string(), val.clone())),
);

Ok((role_id, session_defaults)) => {
let session_type = metrics::session_type_label_value(&user);
self.metrics
.active_sessions
Expand Down Expand Up @@ -348,29 +321,7 @@ impl Coordinator {
user: &User,
conn_id: &ConnectionId,
client_ip: &Option<IpAddr>,
) -> Result<RoleId, AdapterError> {
// Validate network policies for external users. Internal users
// can only connect on the internal interfaces (internal HTTP/
// pgwire). It is up to the person deploying the system to
// ensure these internal interfaces are well secured.
let system_config = self.catalog().state().system_config();
if !user.is_internal() {
let default_policy = NetworkPolicy::new(system_config.default_network_policy());
if let Some(ip) = client_ip {
match default_policy.validate(ip) {
Ok(_) => {}
Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
}
} else {
// Only temporary and internal representation of a session
// should be missing a client_ip. These sessions should not be
// making requests or going through handle_startup.
return Err(AdapterError::NetworkPolicyDenied(
super::NetworkPolicyError::MissingIp,
));
}
}

) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
if self.catalog().try_get_role_by_name(&user.name).is_none() {
// If the user has made it to this point, that means they have been fully authenticated.
// This includes preventing any user, except a pre-defined set of system users, from
Expand All @@ -393,9 +344,87 @@ impl Coordinator {
return Err(AdapterError::UserSessionsDisallowed);
}

// Initialize the default session variables for this role.
let mut session_defaults = BTreeMap::new();
let system_config = self.catalog().state().system_config();

// Override the session with any system defaults.
session_defaults.extend(
system_config
.iter_session()
.map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
);
// Special case.
let statement_logging_default = system_config
.statement_logging_default_sample_rate()
.format();
session_defaults.insert(
STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
OwnedVarInput::Flat(statement_logging_default),
);
// Override system defaults with role defaults.
session_defaults.extend(
self.catalog()
.get_role(&role_id)
.vars()
.map(|(name, val)| (name.to_string(), val.clone())),
);

// Validate network policies for external users. Internal users can only connect on the
// internal interfaces (internal HTTP/ pgwire). It is up to the person deploying the system
// to ensure these internal interfaces are well secured.
//
// HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
// the default network policy for this role, so we read directly out of what the session
// will get initialized with.
if !user.is_internal() {
let network_policy_name = session_defaults
.get(NETWORK_POLICY.name())
.and_then(|value| match value {
OwnedVarInput::Flat(name) => Some(name.clone()),
OwnedVarInput::SqlSet(names) => {
tracing::error!(?names, "found multiple network policies");
None
}
})
.unwrap_or(system_config.default_network_policy_name());
let maybe_network_policy = self
.catalog()
.get_network_policy_by_name(&network_policy_name);

let Some(network_policy) = maybe_network_policy else {
// We should prevent dropping the default network policy, or setting the policy
// to something that doesn't exist, so complain loudly if this occurs.
tracing::error!(
network_policy_name,
"default network policy does not exist. All user traffic will be blocked"
);
let reason = match client_ip {
Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
None => super::NetworkPolicyError::MissingIp,
};
return Err(AdapterError::NetworkPolicyDenied(reason));
};

if let Some(ip) = client_ip {
match validate_network_with_policy(ip, network_policy) {
Ok(_) => {}
Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
}
} else {
// Only temporary and internal representation of a session
// should be missing a client_ip. These sessions should not be
// making requests or going through handle_startup.
return Err(AdapterError::NetworkPolicyDenied(
super::NetworkPolicyError::MissingIp,
));
}
}

self.catalog_mut()
.create_temporary_schema(conn_id, role_id)?;
Ok(role_id)

Ok((role_id, session_defaults))
}

/// Handles an execute command.
Expand Down
36 changes: 34 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ use mz_sql::plan::{
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
self, IsolationLevel, OwnedVarInput, SessionVars, Var, VarInput, SCHEMA_ALIAS,
TRANSACTION_ISOLATION_VAR_NAME,
self, IsolationLevel, OwnedVarInput, SessionVars, Var, VarError, VarInput, NETWORK_POLICY,
SCHEMA_ALIAS, TRANSACTION_ISOLATION_VAR_NAME,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
Expand Down Expand Up @@ -4060,6 +4060,38 @@ impl Coordinator {
plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
) -> Result<ExecuteResponse, AdapterError> {
self.is_user_allowed_to_alter_system(session, Some(&name))?;

// Make sure the network policy we're trying to set actually exists.
//
// TODO(parkmycar): It would be great if we could impose this as a constraint on
// `VarDefinition`, but the current API doesn't support that.
if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
let policy_name = match &value {
// Make sure the compiled in default still exists.
plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
plan::VariableValue::Values(values) if values.len() == 1 => {
values.iter().next().cloned()
}
plan::VariableValue::Values(values) => {
tracing::warn!(?values, "can't set multiple network policies at once");
None
}
};
let network_policy_exists = policy_name
.as_ref()
.and_then(|name| self.catalog.get_network_policy_by_name(name))
.is_some();
if !network_policy_exists {
return Err(AdapterError::PlanError(plan::PlanError::VarError(
VarError::InvalidParameterValue {
name: NETWORK_POLICY.name(),
invalid_values: vec![policy_name.unwrap_or("<none>".to_string())],
reason: "no network policy with such name exists".to_string(),
},
)));
}
}

let op = match value {
plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
name: name.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v70.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v71.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/protos/hashes.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@
{
"name": "objects_v70.proto",
"md5": "a43660c9160c900f00d62d3031e2fad0"
},
{
"name": "objects_v71.proto",
"md5": "26b6c8620c7d2cdcdddcad7b75d7afb2"
}
]
Loading
Loading