diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 04ada4bcf..7472d7ae6 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -427,7 +427,7 @@ message ExecutionGraphStage {
   repeated GraphStageInput inputs = 4;
   bytes plan = 5;
   repeated TaskStatus task_statuses = 6;
-  uint32 output_link = 7;
+  repeated uint32 output_links = 7;
   bool resolved = 8;
   repeated OperatorMetricsSet stage_metrics = 9;
 }
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index 970a5878b..42a0c67a6 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -28,7 +28,10 @@ use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
 use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
+use crate::serde::scheduler::{
+    Action, ExecutorData, ExecutorMetadata, ExecutorSpecification, ExecutorState,
+    PartitionId, PartitionLocation, PartitionStats,
+};
 
 impl TryInto<Action> for protobuf::Action {
     type Error = BallistaError;
@@ -202,3 +205,79 @@ impl TryInto<MetricsSet> for protobuf::OperatorMetricsSet {
         Ok(ms)
     }
 }
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorMetadata> for protobuf::ExecutorMetadata {
+    fn into(self) -> ExecutorMetadata {
+        ExecutorMetadata {
+            id: self.id,
+            host: self.host,
+            port: self.port as u16,
+            grpc_port: self.grpc_port as u16,
+            specification: self.specification.unwrap().into(),
+        }
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorSpecification> for protobuf::ExecutorSpecification {
+    fn into(self) -> ExecutorSpecification {
+        let mut ret = ExecutorSpecification { task_slots: 0 };
+        for resource in self.resources {
+            if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
+                resource.resource
+            {
+                ret.task_slots = task_slots
+            }
+        }
+        ret
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorData> for protobuf::ExecutorData {
+    fn into(self) -> ExecutorData {
+        let mut ret = ExecutorData {
+            executor_id: self.executor_id,
+            total_task_slots: 0,
+            available_task_slots: 0,
+        };
+        for resource in self.resources {
+            if let Some(task_slots) = resource.total {
+                if let Some(protobuf::executor_resource::Resource::TaskSlots(
+                    task_slots,
+                )) = task_slots.resource
+                {
+                    ret.total_task_slots = task_slots
+                }
+            };
+            if let Some(task_slots) = resource.available {
+                if let Some(protobuf::executor_resource::Resource::TaskSlots(
+                    task_slots,
+                )) = task_slots.resource
+                {
+                    ret.available_task_slots = task_slots
+                }
+            };
+        }
+        ret
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<ExecutorState> for protobuf::ExecutorState {
+    fn into(self) -> ExecutorState {
+        let mut ret = ExecutorState {
+            available_memory_size: u64::MAX,
+        };
+        for metric in self.metrics {
+            if let Some(protobuf::executor_metric::Metric::AvailableMemory(
+                available_memory_size,
+            )) = metric.metric
+            {
+                ret.available_memory_size = available_memory_size
+            }
+        }
+        ret
+    }
+}
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index 369a87d22..38b350d64 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -26,7 +26,6 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::Partitioning;
 use serde::Serialize;
 
-use super::protobuf;
 use crate::error::BallistaError;
 
 pub mod from_proto;
@@ -80,65 +79,12 @@ pub struct ExecutorMetadata {
     pub specification: ExecutorSpecification,
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
-    fn into(self) -> protobuf::ExecutorMetadata {
-        protobuf::ExecutorMetadata {
-            id: self.id,
-            host: self.host,
-            port: self.port as u32,
-            grpc_port: self.grpc_port as u32,
-            specification: Some(self.specification.into()),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorMetadata> for ExecutorMetadata {
-    fn from(meta: protobuf::ExecutorMetadata) -> Self {
-        Self {
-            id: meta.id,
-            host: meta.host,
-            port: meta.port as u16,
-            grpc_port: meta.grpc_port as u16,
-            specification: meta.specification.unwrap().into(),
-        }
-    }
-}
-
 /// Specification of an executor, indicting executor resources, like total task slots
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
 pub struct ExecutorSpecification {
     pub task_slots: u32,
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
-    fn into(self) -> protobuf::ExecutorSpecification {
-        protobuf::ExecutorSpecification {
-            resources: vec![protobuf::executor_resource::Resource::TaskSlots(
-                self.task_slots,
-            )]
-            .into_iter()
-            .map(|r| protobuf::ExecutorResource { resource: Some(r) })
-            .collect(),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
-    fn from(input: protobuf::ExecutorSpecification) -> Self {
-        let mut ret = Self { task_slots: 0 };
-        for resource in input.resources {
-            if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
-                resource.resource
-            {
-                ret.task_slots = task_slots
-            }
-        }
-        ret
-    }
-}
-
 /// From Spark, available resources for an executor, like available task slots
 #[derive(Debug, Clone, Serialize)]
 pub struct ExecutorData {
@@ -152,67 +98,6 @@ pub struct ExecutorDataChange {
     pub task_slots: i32,
 }
 
-struct ExecutorResourcePair {
-    total: protobuf::executor_resource::Resource,
-    available: protobuf::executor_resource::Resource,
-}
-
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorData> for ExecutorData {
-    fn into(self) -> protobuf::ExecutorData {
-        protobuf::ExecutorData {
-            executor_id: self.executor_id,
-            resources: vec![ExecutorResourcePair {
-                total: protobuf::executor_resource::Resource::TaskSlots(
-                    self.total_task_slots,
-                ),
-                available: protobuf::executor_resource::Resource::TaskSlots(
-                    self.available_task_slots,
-                ),
-            }]
-            .into_iter()
-            .map(|r| protobuf::ExecutorResourcePair {
-                total: Some(protobuf::ExecutorResource {
-                    resource: Some(r.total),
-                }),
-                available: Some(protobuf::ExecutorResource {
-                    resource: Some(r.available),
-                }),
-            })
-            .collect(),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorData> for ExecutorData {
-    fn from(input: protobuf::ExecutorData) -> Self {
-        let mut ret = Self {
-            executor_id: input.executor_id,
-            total_task_slots: 0,
-            available_task_slots: 0,
-        };
-        for resource in input.resources {
-            if let Some(task_slots) = resource.total {
-                if let Some(protobuf::executor_resource::Resource::TaskSlots(
-                    task_slots,
-                )) = task_slots.resource
-                {
-                    ret.total_task_slots = task_slots
-                }
-            };
-            if let Some(task_slots) = resource.available {
-                if let Some(protobuf::executor_resource::Resource::TaskSlots(
-                    task_slots,
-                )) = task_slots.resource
-                {
-                    ret.available_task_slots = task_slots
-                }
-            };
-        }
-        ret
-    }
-}
-
 /// The internal state of an executor, like cpu usage, memory usage, etc
 #[derive(Debug, Clone, Copy, Serialize)]
 pub struct ExecutorState {
@@ -220,37 +105,6 @@ pub struct ExecutorState {
     pub available_memory_size: u64,
 }
 
-#[allow(clippy::from_over_into)]
-impl Into<protobuf::ExecutorState> for ExecutorState {
-    fn into(self) -> protobuf::ExecutorState {
-        protobuf::ExecutorState {
-            metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
-                self.available_memory_size,
-            )]
-            .into_iter()
-            .map(|m| protobuf::ExecutorMetric { metric: Some(m) })
-            .collect(),
-        }
-    }
-}
-
-impl From<protobuf::ExecutorState> for ExecutorState {
-    fn from(input: protobuf::ExecutorState) -> Self {
-        let mut ret = Self {
-            available_memory_size: u64::MAX,
-        };
-        for metric in input.metrics {
-            if let Some(protobuf::executor_metric::Metric::AvailableMemory(
-                available_memory_size,
-            )) = metric.metric
-            {
-                ret.available_memory_size = available_memory_size
-            }
-        }
-        ret
-    }
-}
-
 /// Summary of executed partition
 #[derive(Debug, Copy, Clone, Default)]
 pub struct PartitionStats {
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 815bc96dd..7517408b0 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -21,8 +21,12 @@ use std::convert::TryInto;
 use crate::error::BallistaError;
 use crate::serde::protobuf;
 use crate::serde::protobuf::action::ActionType;
+
 use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
+use crate::serde::scheduler::{
+    Action, ExecutorData, ExecutorMetadata, ExecutorSpecification, ExecutorState,
+    PartitionId, PartitionLocation, PartitionStats,
+};
 use datafusion::physical_plan::Partitioning;
 
 impl TryInto<protobuf::Action> for Action {
@@ -171,3 +175,76 @@ impl TryInto<protobuf::OperatorMetricsSet> for MetricsSet {
         Ok(protobuf::OperatorMetricsSet { metrics })
     }
 }
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
+    fn into(self) -> protobuf::ExecutorMetadata {
+        protobuf::ExecutorMetadata {
+            id: self.id,
+            host: self.host,
+            port: self.port as u32,
+            grpc_port: self.grpc_port as u32,
+            specification: Some(self.specification.into()),
+        }
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
+    fn into(self) -> protobuf::ExecutorSpecification {
+        protobuf::ExecutorSpecification {
+            resources: vec![protobuf::executor_resource::Resource::TaskSlots(
+                self.task_slots,
+            )]
+            .into_iter()
+            .map(|r| protobuf::ExecutorResource { resource: Some(r) })
+            .collect(),
+        }
+    }
+}
+
+struct ExecutorResourcePair {
+    total: protobuf::executor_resource::Resource,
+    available: protobuf::executor_resource::Resource,
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorData> for ExecutorData {
+    fn into(self) -> protobuf::ExecutorData {
+        protobuf::ExecutorData {
+            executor_id: self.executor_id,
+            resources: vec![ExecutorResourcePair {
+                total: protobuf::executor_resource::Resource::TaskSlots(
+                    self.total_task_slots,
+                ),
+                available: protobuf::executor_resource::Resource::TaskSlots(
+                    self.available_task_slots,
+                ),
+            }]
+            .into_iter()
+            .map(|r| protobuf::ExecutorResourcePair {
+                total: Some(protobuf::ExecutorResource {
+                    resource: Some(r.total),
+                }),
+                available: Some(protobuf::ExecutorResource {
+                    resource: Some(r.available),
+                }),
+            })
+            .collect(),
+        }
+    }
+}
+
+#[allow(clippy::from_over_into)]
+impl Into<protobuf::ExecutorState> for ExecutorState {
+    fn into(self) -> protobuf::ExecutorState {
+        protobuf::ExecutorState {
+            metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
+                self.available_memory_size,
+            )]
+            .into_iter()
+            .map(|m| protobuf::ExecutorMetric { metric: Some(m) })
+            .collect(),
+        }
+    }
+}
diff --git a/ballista/rust/scheduler/src/state/execution_graph.rs b/ballista/rust/scheduler/src/state/execution_graph.rs
index b13f7c58c..d928f9690 100644
--- a/ballista/rust/scheduler/src/state/execution_graph.rs
+++ b/ballista/rust/scheduler/src/state/execution_graph.rs
@@ -101,8 +101,8 @@ pub struct ExecutionStage {
     /// Status of each already scheduled task. If status is None, the partition has not yet been scheduled
     pub(crate) task_statuses: Vec<Option<task_status::Status>>,
     /// Stage ID of the stage that will take this stages outputs as inputs.
-    /// If `output_link` is `None` then this the final stage in the `ExecutionGraph`
-    pub(crate) output_link: Option<usize>,
+    /// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
+    pub(crate) output_links: Vec<usize>,
     /// Flag indicating whether all input partitions have been resolved and the plan
     /// has UnresovledShuffleExec operators resolved to ShuffleReadExec operators.
     pub(crate) resolved: bool,
@@ -136,7 +136,7 @@ impl ExecutionStage {
         stage_id: usize,
         plan: Arc<dyn ExecutionPlan>,
         output_partitioning: Option<Partitioning>,
-        output_link: Option<usize>,
+        output_links: Vec<usize>,
         child_stages: Vec<usize>,
     ) -> Self {
         let num_tasks = plan.output_partitioning().partition_count();
@@ -156,7 +156,7 @@ impl ExecutionStage {
             inputs,
             plan,
             task_statuses: vec![None; num_tasks],
-            output_link,
+            output_links,
             resolved,
             stage_metrics: None,
         }
@@ -320,7 +320,7 @@ struct ExecutionStageBuilder {
     /// Map from stage ID -> List of child stage IDs
    stage_dependencies: HashMap<usize, Vec<usize>>,
     /// Map from Stage ID -> output link
-    output_links: HashMap<usize, usize>,
+    output_links: HashMap<usize, Vec<usize>>,
 }
 
 impl ExecutionStageBuilder {
@@ -346,7 +346,7 @@ impl ExecutionStageBuilder {
         for stage in stages {
             let partitioning = stage.shuffle_output_partitioning().cloned();
             let stage_id = stage.stage_id();
-            let output_link = self.output_links.remove(&stage_id);
+            let output_links = self.output_links.remove(&stage_id).unwrap_or_default();
 
             let child_stages = self
                 .stage_dependencies
@@ -359,7 +359,7 @@ impl ExecutionStageBuilder {
                     stage_id,
                     stage,
                     partitioning,
-                    output_link,
+                    output_links,
                     child_stages,
                 ),
             );
@@ -381,11 +381,21 @@ impl ExecutionPlanVisitor for ExecutionStageBuilder {
         } else if let Some(unresolved_shuffle) =
             plan.as_any().downcast_ref::<UnresolvedShuffleExec>()
         {
-            self.output_links
-                .insert(unresolved_shuffle.stage_id, self.current_stage_id);
+            if let Some(output_links) =
+                self.output_links.get_mut(&unresolved_shuffle.stage_id)
+            {
+                if !output_links.contains(&self.current_stage_id) {
+                    output_links.push(self.current_stage_id);
+                }
+            } else {
+                self.output_links
+                    .insert(unresolved_shuffle.stage_id, vec![self.current_stage_id]);
+            }
             if let Some(deps) = self.stage_dependencies.get_mut(&self.current_stage_id)
             {
-                deps.push(unresolved_shuffle.stage_id)
+                if !deps.contains(&unresolved_shuffle.stage_id) {
+                    deps.push(unresolved_shuffle.stage_id);
+                }
             } else {
                 self.stage_dependencies
                     .insert(self.current_stage_id, vec![unresolved_shuffle.stage_id]);
@@ -462,10 +472,10 @@ impl Debug for Task {
 ///
 ///
 /// The DAG structure of this `ExecutionGraph` is encoded in the stages. Each stage's `input` field
-/// will indicate which stages it depends on, and each stage's `output_link` will indicate which
+/// will indicate which stages it depends on, and each stage's `output_links` will indicate which
 /// stage it needs to publish its output to.
 ///
-/// If a stage has `output_link == None` then it is the final stage in this query, and it should
+/// If a stage's `output_links` is empty then it is the final stage in this query, and it should
 /// publish its outputs to the `ExecutionGraph`s `output_locations` representing the final query results.
 #[derive(Clone)]
 pub struct ExecutionGraph {
@@ -616,28 +626,33 @@ impl ExecutionGraph {
                         completed_task.partitions,
                     );
-                    if let Some(link) = stage.output_link {
-                        // If this is an intermediate stage, we need to push its `PartitionLocation`s to the parent stage
-                        if let Some(linked_stage) = self.stages.get_mut(&link) {
-                            linked_stage.add_input_partitions(
-                                stage_id, partition, locations,
-                            )?;
-
-                            // If all tasks for this stage are complete, mark the input complete in the parent stage
-                            if stage_complete {
-                                linked_stage.complete_input(stage_id);
-                            }
-
-                            // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan
-                            if linked_stage.resolvable() {
-                                linked_stage.resolve_shuffles()?;
+                    let output_links = stage.output_links.clone();
+                    if output_links.is_empty() {
+                        // If `output_links` is empty, then this is a final stage
+                        self.output_locations.extend(locations);
+                    } else {
+                        for link in output_links.into_iter() {
+                            // If this is an intermediate stage, we need to push its `PartitionLocation`s to the parent stage
+                            if let Some(linked_stage) = self.stages.get_mut(&link) {
+                                linked_stage.add_input_partitions(
+                                    stage_id,
+                                    partition,
+                                    locations.clone(),
+                                )?;
+
+                                // If all tasks for this stage are complete, mark the input complete in the parent stage
+                                if stage_complete {
+                                    linked_stage.complete_input(stage_id);
+                                }
+
+                                // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan
+                                if linked_stage.resolvable() {
+                                    linked_stage.resolve_shuffles()?;
+                                }
+                            } else {
+                                return Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link {} for stage {}", job_id, stage_id, link)));
                             }
-                        } else {
-                            return Err(BallistaError::Internal(format!("Error updating job {}: Invalid output link {} for stage {}", job_id, stage_id, link)));
                         }
-                    } else {
-                        // If `output_link` is `None`, then this is a final stage
-                        self.output_locations.extend(locations);
                     }
                 }
             } else {
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 1083665f9..12546a8ec 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -52,7 +52,7 @@ pub fn decode_protobuf<T: Message + Default>(bytes: &[u8]) -> Result<T> {
     })
 }
 
-pub fn decode_into<T: Message + Default + Into<U>>(bytes: &[u8]) -> Result<U> {
+pub fn decode_into<T: Message + Default + Into<U>, U>(bytes: &[u8]) -> Result<U> {
     T::decode(bytes)
         .map_err(|e| {
             BallistaError::Internal(format!(
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index cc3292637..dcee37225 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -233,98 +233,56 @@ impl TaskManager
         &self,
         reservations: &[ExecutorReservation],
     ) -> Result<(Vec<(String, Task)>, Vec<ExecutorReservation>, usize)> {
-        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
+        // Reinitialize the free reservations.
+        let free_reservations: Vec<ExecutorReservation> = reservations
+            .iter()
+            .map(|reservation| {
+                ExecutorReservation::new_free(reservation.executor_id.clone())
+            })
+            .collect();
 
+        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
         with_lock(lock, async {
-            let mut assignments: Vec<(String, Task)> = vec![];
-            let mut free_reservations: Vec<ExecutorReservation> = vec![];
-            // let _txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
-
-            // Need to collect graphs we update so we can update them in storage when we are done
-            let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
-
-            // First try and fill reservations for particular jobs. If the job has no more tasks
-            // free the reservation.
-            for reservation in reservations {
-                debug!(
-                    "Filling reservation for executor {} from job {:?}",
-                    reservation.executor_id, reservation.job_id
-                );
-                let executor_id = &reservation.executor_id;
-                if let Some(job_id) = &reservation.job_id {
-                    if let Some(graph) = graphs.get_mut(job_id) {
-                        if let Ok(Some(next_task)) = graph.pop_next_task(executor_id) {
-                            debug!(
-                                "Filled reservation for executor {} with task {:?}",
-                                executor_id, next_task
-                            );
-                            assignments.push((executor_id.clone(), next_task));
-                        } else {
-                            debug!("Cannot fill reservation for executor {} from job {}, freeing reservation", executor_id, job_id);
-                            free_reservations
-                                .push(ExecutorReservation::new_free(executor_id.clone()));
-                        }
-                    } else {
-                        // let lock = self.state.lock(Keyspace::ActiveJobs, job_id).await?;
-                        let mut graph = self.get_execution_graph(job_id).await?;
-
-                        if let Ok(Some(next_task)) = graph.pop_next_task(executor_id) {
-                            debug!(
-                                "Filled reservation for executor {} with task {:?}",
-                                executor_id, next_task
-                            );
-                            assignments.push((executor_id.clone(), next_task));
-                            graphs.insert(job_id.clone(), graph);
-                            // locks.push(lock);
-                        } else {
-                            debug!("Cannot fill reservation for executor {} from job {}, freeing reservation", executor_id, job_id);
-                            free_reservations
-                                .push(ExecutorReservation::new_free(executor_id.clone()));
-                        }
-                    }
-                } else {
-                    free_reservations.push(reservation.clone());
-                }
-            }
-
-            let mut other_jobs: Vec<String> =
+            let mut jobs: Vec<String> =
                 self.get_active_jobs().await?.into_iter().collect();
+            let mut assignments: Vec<(String, Task)> = vec![];
             let mut unassigned: Vec<ExecutorReservation> = vec![];
+            // Need to collect graphs we update so we can update them in storage when we are done
+            let mut graphs: HashMap<String, ExecutionGraph> = HashMap::new();
 
             // Now try and find tasks for free reservations from current set of graphs
             for reservation in free_reservations {
                 debug!(
-                        "Filling free reservation for executor {}",
-                        reservation.executor_id
-                    );
+                    "Filling free reservation for executor {}",
+                    reservation.executor_id
+                );
                 let mut assigned = false;
                 let executor_id = reservation.executor_id.clone();
 
                 // Try and find a task in the graphs we already have locks on
                 if let Ok(Some(assignment)) = find_next_task(&executor_id, &mut graphs) {
                     debug!(
-                            "Filled free reservation for executor {} with task {:?}",
-                            reservation.executor_id, assignment.1
-                        );
+                        "Filled free reservation for executor {} with task {:?}",
+                        reservation.executor_id, assignment.1
+                    );
                     // First check if we can find another task
                     assignments.push(assignment);
                    assigned = true;
                 } else {
                     // Otherwise start searching through other active jobs.
                     debug!(
-                            "Filling free reservation for executor {} from active jobs {:?}",
-                            reservation.executor_id, other_jobs
-                        );
+                        "Filling free reservation for executor {} from active jobs {:?}",
+                        reservation.executor_id, jobs
+                    );
-                    while let Some(job_id) = other_jobs.pop() {
+                    while let Some(job_id) = jobs.pop() {
                         if graphs.get(&job_id).is_none() {
-                            // let lock = self.state.lock(Keyspace::ActiveJobs, &job_id).await?;
                             let mut graph = self.get_execution_graph(&job_id).await?;
 
                             if let Ok(Some(task)) = graph.pop_next_task(&executor_id) {
                                 debug!(
-                                        "Filled free reservation for executor {} with task {:?}",
-                                        reservation.executor_id, task
-                                    );
+                                    "Filled free reservation for executor {} with task {:?}",
+                                    reservation.executor_id, task
+                                );
                                 assignments.push((executor_id.clone(), task));
                                 // locks.push(lock);
                                 graphs.insert(job_id, graph);
@@ -339,9 +297,9 @@ impl TaskManager
 
                 if !assigned {
                     debug!(
-                            "Unable to fill reservation for executor {}, no tasks available",
-                            executor_id
-                        );
+                        "Unable to fill reservation for executor {}, no tasks available",
+                        executor_id
+                    );
                     unassigned.push(reservation);
                 }
             }
@@ -547,15 +505,6 @@ impl TaskManager
             }
         }
 
-        // This is a little hacky but since we can't make an optional
-        // primitive field in protobuf, we just use 0 to encode None.
-        // Should work since stage IDs are 1-indexed.
-        let output_link = if stage.output_link == 0 {
-            None
-        } else {
-            Some(stage.output_link as usize)
-        };
-
         let output_partitioning: Option<Partitioning> =
             parse_protobuf_hash_partitioning(
                 stage.output_partitioning.as_ref(),
@@ -608,7 +557,11 @@ impl TaskManager
             inputs,
             plan,
             task_statuses,
-            output_link,
+            output_links: stage
+                .output_links
+                .into_iter()
+                .map(|l| l as usize)
+                .collect(),
             resolved: stage.resolved,
             stage_metrics,
         };
@@ -642,15 +595,6 @@ impl TaskManager
             .stages
             .into_iter()
             .map(|(stage_id, stage)| {
-                // This is a little hacky but since we can't make an optional
-                // primitive field in protobuf, we just use 0 to encode None.
-                // Should work since stage IDs are 1-indexed.
-                let output_link = if let Some(link) = stage.output_link {
-                    link as u32
-                } else {
-                    0
-                };
-
                 let mut plan: Vec<u8> = vec![];
 
                 U::try_from_physical_plan(
@@ -715,7 +659,11 @@ impl TaskManager
                     inputs,
                     plan,
                     task_statuses,
-                    output_link,
+                    output_links: stage
+                        .output_links
+                        .into_iter()
+                        .map(|l| l as u32)
+                        .collect(),
                     resolved: stage.resolved,
                     stage_metrics,
                 })
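
For reference, here is a minimal usage sketch of the executor conversions that this patch relocates into `to_proto.rs` / `from_proto.rs`. It is not part of the patch: it assumes the core crate is imported as `ballista_core` and that the module paths mirror the files touched above (`serde::scheduler` for the native types, `serde::protobuf` for the prost-generated ones); everything else comes straight from the impls shown in the diff.

```rust
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::{ExecutorData, ExecutorSpecification};

fn main() {
    // Native scheduler-side value, as defined in serde/scheduler/mod.rs.
    let data = ExecutorData {
        executor_id: "executor-1".to_string(),
        total_task_slots: 4,
        available_task_slots: 2,
    };

    // Encode with the Into impl that now lives in serde/scheduler/to_proto.rs ...
    let proto_data: protobuf::ExecutorData = data.into();
    // ... and decode it back with the mirror impl from serde/scheduler/from_proto.rs.
    let decoded: ExecutorData = proto_data.into();
    assert_eq!(decoded.total_task_slots, 4);
    assert_eq!(decoded.available_task_slots, 2);

    // The same pattern applies to ExecutorSpecification, ExecutorMetadata and ExecutorState.
    let spec = ExecutorSpecification { task_slots: 8 };
    let proto_spec: protobuf::ExecutorSpecification = spec.into();
    let roundtrip: ExecutorSpecification = proto_spec.into();
    assert_eq!(roundtrip, spec);
}
```

Both directions are plain `Into` impls, so call sites only need a type annotation on the target; the `From` impls previously defined in `scheduler/mod.rs` are no longer required.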