Ignore the previous job_id inside fill_reservations() (#141)
* Normalize the serialization and deserialization places of protobuf structs

* Use a vector to indicate the output links for ExecutionStage

* Ignore the previous job_id inside fill_reservations()

Co-authored-by: yangzhong <[email protected]>
yahoNanJing and kyotoYaho authored Aug 17, 2022
1 parent e8bc8bb commit b72fdd3
Showing 7 changed files with 245 additions and 272 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/proto/ballista.proto
@@ -427,7 +427,7 @@ message ExecutionGraphStage {
repeated GraphStageInput inputs = 4;
bytes plan = 5;
repeated TaskStatus task_statuses = 6;
-  uint32 output_link = 7;
+  repeated uint32 output_links = 7;
bool resolved = 8;
repeated OperatorMetricsSet stage_metrics = 9;
}
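
Replacing the scalar output_link with the repeated output_links lets an ExecutionGraphStage feed more than one downstream stage. With prost, a repeated uint32 field comes out as a Vec<u32>; the snippet below is only a hand-written mirror of that shape (not the actual generated code) to show the effect:

// Hand-written mirror of the shape prost would generate for the updated
// message; illustrative only, not the actual generated code.
#[derive(Debug, Default)]
struct ExecutionGraphStage {
    output_links: Vec<u32>, // was a single `output_link: u32` before this commit
}

fn main() {
    let mut stage = ExecutionGraphStage::default();
    // A stage's output can now be wired to several downstream stages.
    stage.output_links.push(3);
    stage.output_links.push(5);
    assert_eq!(stage.output_links, vec![3, 5]);
}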
81 changes: 80 additions & 1 deletion ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -28,7 +28,10 @@ use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;
use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
+use crate::serde::scheduler::{
+    Action, ExecutorData, ExecutorMetadata, ExecutorSpecification, ExecutorState,
+    PartitionId, PartitionLocation, PartitionStats,
+};

impl TryInto<Action> for protobuf::Action {
type Error = BallistaError;
@@ -202,3 +205,79 @@ impl TryInto<MetricsSet> for protobuf::OperatorMetricsSet {
Ok(ms)
}
}

#[allow(clippy::from_over_into)]
impl Into<ExecutorMetadata> for protobuf::ExecutorMetadata {
fn into(self) -> ExecutorMetadata {
ExecutorMetadata {
id: self.id,
host: self.host,
port: self.port as u16,
grpc_port: self.grpc_port as u16,
specification: self.specification.unwrap().into(),
}
}
}

#[allow(clippy::from_over_into)]
impl Into<ExecutorSpecification> for protobuf::ExecutorSpecification {
fn into(self) -> ExecutorSpecification {
let mut ret = ExecutorSpecification { task_slots: 0 };
for resource in self.resources {
if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
resource.resource
{
ret.task_slots = task_slots
}
}
ret
}
}

#[allow(clippy::from_over_into)]
impl Into<ExecutorData> for protobuf::ExecutorData {
fn into(self) -> ExecutorData {
let mut ret = ExecutorData {
executor_id: self.executor_id,
total_task_slots: 0,
available_task_slots: 0,
};
for resource in self.resources {
if let Some(task_slots) = resource.total {
if let Some(protobuf::executor_resource::Resource::TaskSlots(
task_slots,
)) = task_slots.resource
{
ret.total_task_slots = task_slots
}
};
if let Some(task_slots) = resource.available {
if let Some(protobuf::executor_resource::Resource::TaskSlots(
task_slots,
)) = task_slots.resource
{
ret.available_task_slots = task_slots
}
};
}
ret
}
}

#[allow(clippy::from_over_into)]
impl Into<ExecutorState> for protobuf::ExecutorState {
fn into(self) -> ExecutorState {
let mut ret = ExecutorState {
available_memory_size: u64::MAX,
};
for metric in self.metrics {
if let Some(protobuf::executor_metric::Metric::AvailableMemory(
available_memory_size,
)) = metric.metric
{
ret.available_memory_size = available_memory_size
}
}
ret
}
}
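
These Into impls move the protobuf-to-native conversions for the executor types into from_proto.rs; the equivalent From impls are deleted from scheduler/mod.rs below. A minimal usage sketch, assuming the crate is referenced as ballista_core (the crate path is an assumption; the fields are exactly those visible in this diff):

// Sketch only: ballista_core as the external crate name is an assumption.
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::ExecutorSpecification;

fn main() {
    // Build the protobuf form by hand, then convert via the new Into impl.
    let proto_spec = protobuf::ExecutorSpecification {
        resources: vec![protobuf::ExecutorResource {
            resource: Some(protobuf::executor_resource::Resource::TaskSlots(4)),
        }],
    };
    let spec: ExecutorSpecification = proto_spec.into();
    assert_eq!(spec.task_slots, 4);
}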
146 changes: 0 additions & 146 deletions ballista/rust/core/src/serde/scheduler/mod.rs
@@ -26,7 +26,6 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use serde::Serialize;

use super::protobuf;
use crate::error::BallistaError;

pub mod from_proto;
@@ -80,65 +79,12 @@ pub struct ExecutorMetadata {
pub specification: ExecutorSpecification,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
fn into(self) -> protobuf::ExecutorMetadata {
protobuf::ExecutorMetadata {
id: self.id,
host: self.host,
port: self.port as u32,
grpc_port: self.grpc_port as u32,
specification: Some(self.specification.into()),
}
}
}

impl From<protobuf::ExecutorMetadata> for ExecutorMetadata {
fn from(meta: protobuf::ExecutorMetadata) -> Self {
Self {
id: meta.id,
host: meta.host,
port: meta.port as u16,
grpc_port: meta.grpc_port as u16,
specification: meta.specification.unwrap().into(),
}
}
}

/// Specification of an executor, indicating executor resources, like total task slots
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorSpecification {
pub task_slots: u32,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
fn into(self) -> protobuf::ExecutorSpecification {
protobuf::ExecutorSpecification {
resources: vec![protobuf::executor_resource::Resource::TaskSlots(
self.task_slots,
)]
.into_iter()
.map(|r| protobuf::ExecutorResource { resource: Some(r) })
.collect(),
}
}
}

impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
fn from(input: protobuf::ExecutorSpecification) -> Self {
let mut ret = Self { task_slots: 0 };
for resource in input.resources {
if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) =
resource.resource
{
ret.task_slots = task_slots
}
}
ret
}
}

/// From Spark, available resources for an executor, like available task slots
#[derive(Debug, Clone, Serialize)]
pub struct ExecutorData {
@@ -152,105 +98,13 @@ pub struct ExecutorDataChange {
pub task_slots: i32,
}

struct ExecutorResourcePair {
total: protobuf::executor_resource::Resource,
available: protobuf::executor_resource::Resource,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorData> for ExecutorData {
fn into(self) -> protobuf::ExecutorData {
protobuf::ExecutorData {
executor_id: self.executor_id,
resources: vec![ExecutorResourcePair {
total: protobuf::executor_resource::Resource::TaskSlots(
self.total_task_slots,
),
available: protobuf::executor_resource::Resource::TaskSlots(
self.available_task_slots,
),
}]
.into_iter()
.map(|r| protobuf::ExecutorResourcePair {
total: Some(protobuf::ExecutorResource {
resource: Some(r.total),
}),
available: Some(protobuf::ExecutorResource {
resource: Some(r.available),
}),
})
.collect(),
}
}
}

impl From<protobuf::ExecutorData> for ExecutorData {
fn from(input: protobuf::ExecutorData) -> Self {
let mut ret = Self {
executor_id: input.executor_id,
total_task_slots: 0,
available_task_slots: 0,
};
for resource in input.resources {
if let Some(task_slots) = resource.total {
if let Some(protobuf::executor_resource::Resource::TaskSlots(
task_slots,
)) = task_slots.resource
{
ret.total_task_slots = task_slots
}
};
if let Some(task_slots) = resource.available {
if let Some(protobuf::executor_resource::Resource::TaskSlots(
task_slots,
)) = task_slots.resource
{
ret.available_task_slots = task_slots
}
};
}
ret
}
}

/// The internal state of an executor, like cpu usage, memory usage, etc
#[derive(Debug, Clone, Copy, Serialize)]
pub struct ExecutorState {
// in bytes
pub available_memory_size: u64,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorState> for ExecutorState {
fn into(self) -> protobuf::ExecutorState {
protobuf::ExecutorState {
metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
self.available_memory_size,
)]
.into_iter()
.map(|m| protobuf::ExecutorMetric { metric: Some(m) })
.collect(),
}
}
}

impl From<protobuf::ExecutorState> for ExecutorState {
fn from(input: protobuf::ExecutorState) -> Self {
let mut ret = Self {
available_memory_size: u64::MAX,
};
for metric in input.metrics {
if let Some(protobuf::executor_metric::Metric::AvailableMemory(
available_memory_size,
)) = metric.metric
{
ret.available_memory_size = available_memory_size
}
}
ret
}
}

/// Summary of executed partition
#[derive(Debug, Copy, Clone, Default)]
pub struct PartitionStats {
79 changes: 78 additions & 1 deletion ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -21,8 +21,12 @@ use std::convert::TryInto;
use crate::error::BallistaError;
use crate::serde::protobuf;
use crate::serde::protobuf::action::ActionType;

use crate::serde::protobuf::{operator_metric, NamedCount, NamedGauge, NamedTime};
-use crate::serde::scheduler::{Action, PartitionId, PartitionLocation, PartitionStats};
+use crate::serde::scheduler::{
+    Action, ExecutorData, ExecutorMetadata, ExecutorSpecification, ExecutorState,
+    PartitionId, PartitionLocation, PartitionStats,
+};
use datafusion::physical_plan::Partitioning;

impl TryInto<protobuf::Action> for Action {
@@ -171,3 +175,76 @@ impl TryInto<protobuf::OperatorMetricsSet> for MetricsSet {
Ok(protobuf::OperatorMetricsSet { metrics })
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
fn into(self) -> protobuf::ExecutorMetadata {
protobuf::ExecutorMetadata {
id: self.id,
host: self.host,
port: self.port as u32,
grpc_port: self.grpc_port as u32,
specification: Some(self.specification.into()),
}
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorSpecification> for ExecutorSpecification {
fn into(self) -> protobuf::ExecutorSpecification {
protobuf::ExecutorSpecification {
resources: vec![protobuf::executor_resource::Resource::TaskSlots(
self.task_slots,
)]
.into_iter()
.map(|r| protobuf::ExecutorResource { resource: Some(r) })
.collect(),
}
}
}

struct ExecutorResourcePair {
total: protobuf::executor_resource::Resource,
available: protobuf::executor_resource::Resource,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorData> for ExecutorData {
fn into(self) -> protobuf::ExecutorData {
protobuf::ExecutorData {
executor_id: self.executor_id,
resources: vec![ExecutorResourcePair {
total: protobuf::executor_resource::Resource::TaskSlots(
self.total_task_slots,
),
available: protobuf::executor_resource::Resource::TaskSlots(
self.available_task_slots,
),
}]
.into_iter()
.map(|r| protobuf::ExecutorResourcePair {
total: Some(protobuf::ExecutorResource {
resource: Some(r.total),
}),
available: Some(protobuf::ExecutorResource {
resource: Some(r.available),
}),
})
.collect(),
}
}
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorState> for ExecutorState {
fn into(self) -> protobuf::ExecutorState {
protobuf::ExecutorState {
metrics: vec![protobuf::executor_metric::Metric::AvailableMemory(
self.available_memory_size,
)]
.into_iter()
.map(|m| protobuf::ExecutorMetric { metric: Some(m) })
.collect(),
}
}
}
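
Together with the from_proto.rs impls above, this gives a full round trip for the executor types. A hedged sketch (again assuming the ballista_core crate path; ExecutorData's fields are the ones used in this diff):

// Sketch only: native -> protobuf (to_proto.rs) -> native (from_proto.rs).
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::ExecutorData;

fn main() {
    let data = ExecutorData {
        executor_id: "executor-1".to_string(),
        total_task_slots: 8,
        available_task_slots: 6,
    };
    let proto: protobuf::ExecutorData = data.into();
    let back: ExecutorData = proto.into();
    assert_eq!(back.total_task_slots, 8);
    assert_eq!(back.available_task_slots, 6);
}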
(Diffs for the remaining 3 changed files were not loaded.)
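
The change the commit is named for, ignoring the previous job_id inside fill_reservations(), sits in one of those unloaded files and is not visible above. The sketch below is purely hypothetical (none of these names come from the scheduler code): a freed executor slot may carry the job_id of the job that released it, and the fix stops restricting that slot to tasks from the same job.

// Hypothetical sketch only; not the real fill_reservations() from the
// scheduler crate, whose diff is not shown here.
struct Reservation {
    executor_id: String,
    // job_id hint left by the job that previously released this slot;
    // after this change it is ignored.
    previous_job_id: Option<String>,
}

struct Task {
    job_id: String,
}

fn fill_reservations(
    reservations: &[Reservation],
    pending: &mut Vec<Task>,
) -> Vec<(String, Task)> {
    let mut assignments = Vec::new();
    for slot in reservations {
        // Before: only a task whose job_id matched slot.previous_job_id could
        // fill this slot. Now any pending task is eligible.
        if let Some(task) = pending.pop() {
            assignments.push((slot.executor_id.clone(), task));
        }
    }
    assignments
}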
