Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: peeks bypass coordinator #27439

Closed
652 changes: 627 additions & 25 deletions src/adapter/src/client.rs

Large diffs are not rendered by default.

20 changes: 19 additions & 1 deletion src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ use derivative::Derivative;
use enum_kinds::EnumKind;
use futures::future::BoxFuture;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_compute_client::controller::PeekClient;
use mz_compute_types::ComputeInstanceId;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_no_log;
use mz_ore::tracing::OpenTelemetryContext;
use mz_pgcopy::CopyFormatParams;
use mz_repr::role_id::RoleId;
use mz_repr::{GlobalId, Row};
use mz_repr::{GlobalId, Row, Timestamp};
use mz_sql::ast::{FetchDirection, Raw, Statement};
use mz_sql::catalog::ObjectType;
use mz_sql::plan::{ExecuteTimeout, Plan, PlanKind};
use mz_sql::session::user::User;
use mz_sql::session::vars::{OwnedVarInput, Var};
use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement};
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::TimestampOracle;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

Expand All @@ -53,6 +57,16 @@ pub enum Command {
tx: oneshot::Sender<CatalogSnapshot>,
},

TimestampOracle {
timeline: Timeline,
tx: oneshot::Sender<Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
},

GetPeekClient {
instance_id: ComputeInstanceId,
tx: oneshot::Sender<PeekClient<Timestamp>>,
},

Startup {
tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
user: User,
Expand Down Expand Up @@ -135,6 +149,8 @@ impl Command {
Command::CancelRequest { .. }
| Command::Startup { .. }
| Command::CatalogSnapshot { .. }
| Command::TimestampOracle { .. }
| Command::GetPeekClient { .. }
| Command::PrivilegedCancelRequest { .. }
| Command::GetWebhook { .. }
| Command::Terminate { .. }
Expand All @@ -152,6 +168,8 @@ impl Command {
Command::CancelRequest { .. }
| Command::Startup { .. }
| Command::CatalogSnapshot { .. }
| Command::TimestampOracle { .. }
| Command::GetPeekClient { .. }
| Command::PrivilegedCancelRequest { .. }
| Command::GetWebhook { .. }
| Command::Terminate { .. }
Expand Down
33 changes: 18 additions & 15 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ use self::statement_logging::{StatementLogging, StatementLoggingId};
pub(crate) mod id_bundle;
pub(crate) mod in_memory_oracle;
pub(crate) mod peek;
pub(crate) mod peek_exec;
pub(crate) mod statement_logging;
pub(crate) mod timeline;
pub(crate) mod timestamp_selection;
Expand All @@ -188,7 +189,7 @@ mod command_handler;
pub mod consistency;
mod ddl;
mod indexes;
mod introspection;
pub mod introspection;
mod message_handler;
mod privatelink_status;
pub mod read_policy;
Expand Down Expand Up @@ -287,6 +288,8 @@ impl Message {
match self {
Message::Command(_, msg) => match msg {
Command::CatalogSnapshot { .. } => "command-catalog_snapshot",
Command::TimestampOracle { .. } => "command-timestamp_oracle",
Command::GetPeekClient { .. } => "command-peek_client",
Command::Startup { .. } => "command-startup",
Command::Execute { .. } => "command-execute",
Command::Commit { .. } => "command-commit",
Expand Down Expand Up @@ -456,17 +459,17 @@ pub struct CopyToContext {

#[derive(Debug)]
pub struct PeekStageValidate {
plan: mz_sql::plan::SelectPlan,
target_cluster: TargetCluster,
pub plan: mz_sql::plan::SelectPlan,
pub target_cluster: TargetCluster,
/// An optional context set iff the state machine is initiated from
/// sequencing a COPY TO statement.
///
/// Will result in creating and using [`optimize::copy_to::Optimizer`] in
/// the `optimizer` field of all subsequent stages.
copy_to_ctx: Option<CopyToContext>,
pub copy_to_ctx: Option<CopyToContext>,
/// An optional context set iff the state machine is initiated from
/// sequencing an EXPLAIN for this statement.
explain_ctx: ExplainContext,
pub explain_ctx: ExplainContext,
}

#[derive(Debug)]
Expand Down Expand Up @@ -527,17 +530,17 @@ pub struct PeekStageOptimize {

#[derive(Debug)]
pub struct PeekStageFinish {
validity: PlanValidity,
plan: mz_sql::plan::SelectPlan,
id_bundle: CollectionIdBundle,
target_replica: Option<ReplicaId>,
source_ids: BTreeSet<GlobalId>,
determination: TimestampDetermination<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
pub validity: PlanValidity,
pub plan: mz_sql::plan::SelectPlan,
pub id_bundle: CollectionIdBundle,
pub target_replica: Option<ReplicaId>,
pub source_ids: BTreeSet<GlobalId>,
pub determination: TimestampDetermination<mz_repr::Timestamp>,
pub optimizer: optimize::peek::Optimizer,
/// When present, an optimizer trace to be used for emitting a plan insights
/// notice.
plan_insights_optimizer_trace: Option<OptimizerTrace>,
global_lir_plan: optimize::peek::GlobalLirPlan,
pub plan_insights_optimizer_trace: Option<OptimizerTrace>,
pub global_lir_plan: optimize::peek::GlobalLirPlan,
}

#[derive(Debug)]
Expand Down Expand Up @@ -767,7 +770,7 @@ pub struct PlanValidity {

impl PlanValidity {
/// Returns an error if the current catalog no longer has all dependencies.
fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
pub fn check(&mut self, catalog: &Catalog) -> Result<(), AdapterError> {
if self.transient_revision == catalog.transient_revision() {
return Ok(());
}
Expand Down
18 changes: 17 additions & 1 deletion src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,21 @@ impl Coordinator {
});
}

Command::TimestampOracle { timeline, tx } => {
let oracle = self.get_timestamp_oracle(&timeline);

let _ = tx.send(oracle);
}

Command::GetPeekClient { instance_id, tx } => {
let peek_client = self
.controller
.active_compute()
.get_peek_client(instance_id)
.expect("missing instance");
let _ = tx.send(peek_client);
}

Command::CheckConsistency { tx } => {
let _ = tx.send(self.check_consistency());
}
Expand Down Expand Up @@ -878,7 +893,8 @@ impl Coordinator {
.iter()
.any(materialized_view_option_contains_temporal)
{
let timeline_context = self.validate_timeline_context(resolved_ids.0.clone())?;
let timeline_context =
Self::validate_timeline_context(self.catalog(), resolved_ids.0.clone())?;

// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
Expand Down
9 changes: 4 additions & 5 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,18 +526,17 @@ impl Coordinator {
if !peeks_to_drop.is_empty() {
for (dropped_name, uuid) in peeks_to_drop {
if let Some(pending_peek) = self.remove_pending_peek(&uuid) {
let cancel_reason = PeekResponse::Error(format!(
"query could not complete because {dropped_name} was dropped"
));
self.controller
.active_compute()
.cancel_peek(pending_peek.cluster_id, uuid)
.cancel_peek(pending_peek.cluster_id, uuid, cancel_reason)
.unwrap_or_terminate("unable to cancel peek");
self.retire_execution(
StatementEndedExecutionReason::Canceled,
pending_peek.ctx_extra,
);
// Client may have left.
let _ = pending_peek.sender.send(PeekResponse::Error(format!(
"query could not complete because {dropped_name} was dropped"
)));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ impl Coordinator {
async fn message_controller(&mut self, message: ControllerResponse) {
event!(Level::TRACE, message = format!("{:?}", message));
match message {
ControllerResponse::PeekResponse(uuid, response, otel_ctx) => {
self.send_peek_response(uuid, response, otel_ctx);
ControllerResponse::PeekNotification(uuid, notification, otel_ctx) => {
self.handle_peek_notification(uuid, notification, otel_ctx);
}
ControllerResponse::SubscribeResponse(sink_id, response) => {
match self.active_compute_sinks.get_mut(&sink_id) {
Expand Down
Loading
Loading