Skip to content

Commit

Permalink
Feature/network policy predefined (#30261)
Browse files Browse the repository at this point in the history
<!--
Describe the contents of the PR briefly but completely.

If you write detailed commit messages, it is acceptable to copy/paste
them
here, or write "see commit messages for details." If there is only one
commit
in the PR, GitHub will have already added its commit message above.
-->

### Motivation
- adds a new predefined network policy
- removes the allow list based policy system var
  in favor of a new `network_policy` system var
- swaps policy enforcement to use the default policy.

stacked on #30172
part of MaterializeInc/database-issues#4637

<!--
Which of the following best describes the motivation behind this PR?

  * This PR fixes a recognized bug.

    [Ensure issue is linked somewhere.]

  * This PR adds a known-desirable feature.

    [Ensure issue is linked somewhere.]

  * This PR fixes a previously unreported bug.

    [Describe the bug in detail, as if you were filing a bug report.]

  * This PR adds a feature that has not yet been specified.

[Write a brief specification for the feature, including justification
for its inclusion in Materialize, as if you were writing the original
     feature specification.]

   * This PR refactors existing code.

[Describe what was wrong with the existing code, if it is not obvious.]
-->

### Tips for reviewer

If you are reviewing this prior to #30172 going in, you should only look
at the `add predefined network policy and system var` commit.

<!--
Leave some tips for your reviewer, like:

    * The diff is much smaller if viewed with whitespace hidden.
    * [Some function/module/file] deserves extra attention.
* [Some function/module/file] is pure code movement and only needs a
skim.

Delete this section if no tips.
-->

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Co-authored-by: Parker Timmerman <[email protected]>
  • Loading branch information
jubrad and ParkMyCar authored Nov 11, 2024
1 parent f4194e7 commit c10003d
Show file tree
Hide file tree
Showing 22 changed files with 1,813 additions and 216 deletions.
4 changes: 4 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,10 @@ impl Catalog {
self.state.get_network_policy(&network_policy_id)
}

pub fn get_network_policy_by_name(&self, name: &str) -> Option<&NetworkPolicy> {
self.state.try_get_network_policy_by_name(name)
}

pub fn clusters(&self) -> impl Iterator<Item = &Cluster> {
self.state.clusters_by_id.values()
}
Expand Down
31 changes: 11 additions & 20 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, Cluste
use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, TableDataSource,
DataSourceDesc, NetworkPolicy, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
Expand Down Expand Up @@ -4114,24 +4114,15 @@ pub enum NetworkPolicyError {
MissingIp,
}

// TODO @jubrad this will be moved to a catalog resource in v1
// of network policies.
/// Represents a basic network policy.
#[derive(Debug, Clone)]
pub struct NetworkPolicy {
allow_list: Vec<IpNet>,
}

impl NetworkPolicy {
pub fn new(allow_list: Vec<IpNet>) -> Self {
NetworkPolicy { allow_list }
}

/// Validate the provided IP is allowed by the network policy.
pub fn validate(&self, ip: &IpAddr) -> Result<(), NetworkPolicyError> {
match self.allow_list.iter().any(|net| net.contains(ip)) {
true => Ok(()),
false => Err(NetworkPolicyError::AddressDenied(ip.clone())),
}
pub(crate) fn validate_network_with_policy(
ip: &IpAddr,
policy: &NetworkPolicy,
) -> Result<(), NetworkPolicyError> {
// At the moment we're not handling action or direction
// as those are only able to be "allow" and "ingress" respectively
if policy.rules.iter().any(|r| r.address.0.contains(ip)) {
Ok(())
} else {
Err(NetworkPolicyError::AddressDenied(ip.clone()))
}
}
139 changes: 84 additions & 55 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Value, Var, STATEMENT_LOGGING_SAMPLE_RATE,
EndTransactionAction, OwnedVarInput, Value, Var, NETWORK_POLICY, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
Expand All @@ -62,8 +62,8 @@ use crate::command::{
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::{
ConnMeta, Coordinator, DeferredPlanStatement, Message, NetworkPolicy, PendingTxn,
PlanStatement, PlanValidity, PurifiedStatementReady,
validate_network_with_policy, ConnMeta, Coordinator, DeferredPlanStatement, Message,
PendingTxn, PlanStatement, PlanValidity, PurifiedStatementReady,
};
use crate::error::AdapterError;
use crate::notice::AdapterNotice;
Expand Down Expand Up @@ -257,34 +257,7 @@ impl Coordinator {
) {
// Early return if successful, otherwise cleanup any possible state.
match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
Ok(role_id) => {
let system_config = self.catalog().state().system_config();
let mut session_defaults = BTreeMap::new();

// Override the session with any system defaults.
session_defaults.extend(
system_config
.iter_session()
.map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
);

// Special case.
let statement_logging_default = system_config
.statement_logging_default_sample_rate()
.format();
session_defaults.insert(
STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
OwnedVarInput::Flat(statement_logging_default),
);

// Override system defaults with role defaults.
session_defaults.extend(
self.catalog()
.get_role(&role_id)
.vars()
.map(|(name, val)| (name.to_string(), val.clone())),
);

Ok((role_id, session_defaults)) => {
let session_type = metrics::session_type_label_value(&user);
self.metrics
.active_sessions
Expand Down Expand Up @@ -348,29 +321,7 @@ impl Coordinator {
user: &User,
conn_id: &ConnectionId,
client_ip: &Option<IpAddr>,
) -> Result<RoleId, AdapterError> {
// Validate network policies for external users. Internal users
// can only connect on the internal interfaces (internal HTTP/
// pgwire). It is up to the person deploying the system to
// ensure these internal interfaces are well secured.
let system_config = self.catalog().state().system_config();
if !user.is_internal() {
let default_policy = NetworkPolicy::new(system_config.default_network_policy());
if let Some(ip) = client_ip {
match default_policy.validate(ip) {
Ok(_) => {}
Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
}
} else {
// Only temporary and internal representation of a session
// should be missing a client_ip. These sessions should not be
// making requests or going through handle_startup.
return Err(AdapterError::NetworkPolicyDenied(
super::NetworkPolicyError::MissingIp,
));
}
}

) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
if self.catalog().try_get_role_by_name(&user.name).is_none() {
// If the user has made it to this point, that means they have been fully authenticated.
// This includes preventing any user, except a pre-defined set of system users, from
Expand All @@ -393,9 +344,87 @@ impl Coordinator {
return Err(AdapterError::UserSessionsDisallowed);
}

// Initialize the default session variables for this role.
let mut session_defaults = BTreeMap::new();
let system_config = self.catalog().state().system_config();

// Override the session with any system defaults.
session_defaults.extend(
system_config
.iter_session()
.map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
);
// Special case.
let statement_logging_default = system_config
.statement_logging_default_sample_rate()
.format();
session_defaults.insert(
STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
OwnedVarInput::Flat(statement_logging_default),
);
// Override system defaults with role defaults.
session_defaults.extend(
self.catalog()
.get_role(&role_id)
.vars()
.map(|(name, val)| (name.to_string(), val.clone())),
);

// Validate network policies for external users. Internal users can only connect on the
// internal interfaces (internal HTTP/ pgwire). It is up to the person deploying the system
// to ensure these internal interfaces are well secured.
//
// HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
// the default network policy for this role, so we read directly out of what the session
// will get initialized with.
if !user.is_internal() {
let network_policy_name = session_defaults
.get(NETWORK_POLICY.name())
.and_then(|value| match value {
OwnedVarInput::Flat(name) => Some(name.clone()),
OwnedVarInput::SqlSet(names) => {
tracing::error!(?names, "found multiple network policies");
None
}
})
.unwrap_or(system_config.default_network_policy_name());
let maybe_network_policy = self
.catalog()
.get_network_policy_by_name(&network_policy_name);

let Some(network_policy) = maybe_network_policy else {
// We should prevent dropping the default network policy, or setting the policy
// to something that doesn't exist, so complain loudly if this occurs.
tracing::error!(
network_policy_name,
"default network policy does not exist. All user traffic will be blocked"
);
let reason = match client_ip {
Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
None => super::NetworkPolicyError::MissingIp,
};
return Err(AdapterError::NetworkPolicyDenied(reason));
};

if let Some(ip) = client_ip {
match validate_network_with_policy(ip, network_policy) {
Ok(_) => {}
Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
}
} else {
// Only temporary and internal representation of a session
// should be missing a client_ip. These sessions should not be
// making requests or going through handle_startup.
return Err(AdapterError::NetworkPolicyDenied(
super::NetworkPolicyError::MissingIp,
));
}
}

self.catalog_mut()
.create_temporary_schema(conn_id, role_id)?;
Ok(role_id)

Ok((role_id, session_defaults))
}

/// Handles an execute command.
Expand Down
36 changes: 34 additions & 2 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ use mz_sql::plan::{
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::user::UserKind;
use mz_sql::session::vars::{
self, IsolationLevel, OwnedVarInput, SessionVars, Var, VarInput, SCHEMA_ALIAS,
TRANSACTION_ISOLATION_VAR_NAME,
self, IsolationLevel, OwnedVarInput, SessionVars, Var, VarError, VarInput, NETWORK_POLICY,
SCHEMA_ALIAS, TRANSACTION_ISOLATION_VAR_NAME,
};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::display::AstDisplay;
Expand Down Expand Up @@ -4060,6 +4060,38 @@ impl Coordinator {
plan::AlterSystemSetPlan { name, value }: plan::AlterSystemSetPlan,
) -> Result<ExecuteResponse, AdapterError> {
self.is_user_allowed_to_alter_system(session, Some(&name))?;

// Make sure the network policy we're trying to set actually exists.
//
// TODO(parkmycar): It would be great if we could impose this as a constraint on
// `VarDefinition`, but the current API doesn't support that.
if NETWORK_POLICY.name.to_string().to_lowercase() == name.clone().to_lowercase() {
let policy_name = match &value {
// Make sure the compiled in default still exists.
plan::VariableValue::Default => Some(NETWORK_POLICY.default_value().format()),
plan::VariableValue::Values(values) if values.len() == 1 => {
values.iter().next().cloned()
}
plan::VariableValue::Values(values) => {
tracing::warn!(?values, "can't set multiple network policies at once");
None
}
};
let network_policy_exists = policy_name
.as_ref()
.and_then(|name| self.catalog.get_network_policy_by_name(name))
.is_some();
if !network_policy_exists {
return Err(AdapterError::PlanError(plan::PlanError::VarError(
VarError::InvalidParameterValue {
name: NETWORK_POLICY.name(),
invalid_values: vec![policy_name.unwrap_or("<none>".to_string())],
reason: "no network policy with such name exists".to_string(),
},
)));
}
}

let op = match value {
plan::VariableValue::Values(values) => catalog::Op::UpdateSystemConfiguration {
name: name.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v70.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v71.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/protos/hashes.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@
{
"name": "objects_v70.proto",
"md5": "a43660c9160c900f00d62d3031e2fad0"
},
{
"name": "objects_v71.proto",
"md5": "26b6c8620c7d2cdcdddcad7b75d7afb2"
}
]
Loading

0 comments on commit c10003d

Please sign in to comment.