Skip to content

Commit

Permalink
Update controller and agent to kube-rs 0.91.0
Browse files Browse the repository at this point in the history
Signed-off-by: Kate Goldenring <[email protected]>
  • Loading branch information
kate-goldenring committed Sep 25, 2024
1 parent 3050186 commit e10c0ca
Show file tree
Hide file tree
Showing 22 changed files with 2,277 additions and 4,546 deletions.
525 changes: 390 additions & 135 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ env_logger = "0.10.0"
futures = { version = "0.3.1", package = "futures" }
hyper = "0.14.2"
itertools = "0.12.0"
k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.87.1", features = ["derive"] }
kube-runtime = { version = "0.87.1", features = ["unstable-runtime-reconcile-on"] }
k8s-openapi = { version = "0.22.0", default-features = false, features = ["schemars", "v1_25"] }
kube = { version = "0.91.0", features = [ "derive", "runtime"] }
kube-runtime = { version = "0.91.0", features = ["unstable-runtime-reconcile-on"] }
lazy_static = "1.4"
log = "0.4"
mockall_double = "0.3.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::future::try_join_all;
use futures::FutureExt;
use itertools::Itertools;
use kube::core::ObjectMeta;
use kube_runtime::reflector::ObjectRef;
use kube::runtime::reflector::ObjectRef;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::watch;
Expand Down
2 changes: 1 addition & 1 deletion agent/src/discovery_handler_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{collections::HashMap, sync::Arc};
use akri_shared::{akri::configuration::Configuration, k8s::api::IntoApi};
use k8s_openapi::api::core::v1::{ConfigMap, Secret};

use kube_runtime::reflector::ObjectRef;
use kube::runtime::reflector::ObjectRef;
use thiserror::Error;
use tokio::sync::{mpsc, watch};

Expand Down
2 changes: 1 addition & 1 deletion agent/src/util/discovery_configuration_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::discovery_handler_manager::{
};

use kube::{Resource, ResourceExt};
use kube_runtime::{
use kube::runtime::{
controller::Action,
reflector::{ObjectRef, Store},
Controller,
Expand Down
13 changes: 8 additions & 5 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ akri-shared = { path = "../shared" }
anyhow = "1.0.38"
async-std = "1.5.0"
chrono = "0.4.10"
env_logger = "0.10.0"
either = "1.13"
env_logger = "0.11.5"
futures = "0.3.1"
k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.87.1", features = ["derive"] }
kube-runtime = "0.87.1"
k8s-openapi = { version = "0.22.0", default-features = false, features = ["schemars", "v1_25"] }
kube = { version = "0.91.0", features = ["runtime", "client", "derive" ] }
lazy_static = "1.4"
log = "0.4"
prometheus = { version = "0.12.0", features = ["process"] }
prometheus = { version = "0.13.4", features = ["process"] }
# Used for patch API
serde_json = "1.0.45"
thiserror = "1"
tokio = { version = "1.0.2", features = ["full"] }

[dev-dependencies]
Expand Down
49 changes: 19 additions & 30 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ extern crate lazy_static;
mod util;

use akri_shared::akri::{metrics::run_metrics_server, API_NAMESPACE};
use async_std::sync::Mutex;
use prometheus::IntGaugeVec;
use std::sync::Arc;
use util::{instance_action, node_watcher, pod_watcher};
use util::{
controller_ctx::{ControllerContext, CONTROLLER_FIELD_MANAGER_ID},
instance_action, node_watcher, pod_watcher,
};

/// Length of time to sleep between controller system validation checks
pub const SYSTEM_CHECK_DELAY_SECS: u64 = 30;
Expand All @@ -32,43 +34,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
);

log::info!("{} Controller logging started", API_NAMESPACE);

let synchronization = Arc::new(Mutex::new(()));
let instance_watch_synchronization = synchronization.clone();
let mut tasks = Vec::new();

// Start server for prometheus metrics
tasks.push(tokio::spawn(async move {
run_metrics_server().await.unwrap();
}));
tokio::spawn(run_metrics_server());

let controller_ctx = Arc::new(ControllerContext::new(
Arc::new(kube::Client::try_default().await?),
CONTROLLER_FIELD_MANAGER_ID,
));
let instance_water_ctx = controller_ctx.clone();
let node_watcher_ctx = controller_ctx.clone();
let pod_watcher_ctx = controller_ctx.clone();

// Handle existing instances
tasks.push(tokio::spawn({
async move {
instance_action::handle_existing_instances().await.unwrap();
}
}));
// Handle instance changes
tasks.push(tokio::spawn({
async move {
instance_action::do_instance_watch(instance_watch_synchronization)
.await
.unwrap();
}
tasks.push(tokio::spawn(async {
instance_action::run(instance_water_ctx).await;
}));
// Watch for node disappearance
tasks.push(tokio::spawn({
async move {
let mut node_watcher = node_watcher::NodeWatcher::new();
node_watcher.watch().await.unwrap();
}
tasks.push(tokio::spawn(async {
node_watcher::run(node_watcher_ctx).await;
}));
// Watch for broker Pod state changes
tasks.push(tokio::spawn({
async move {
let mut broker_pod_watcher = pod_watcher::BrokerPodWatcher::new();
broker_pod_watcher.watch().await.unwrap();
}
tasks.push(tokio::spawn(async {
pod_watcher::run(pod_watcher_ctx).await;
}));

futures::future::try_join_all(tasks).await?;
Expand Down
106 changes: 106 additions & 0 deletions controller/src/util/controller_ctx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::collections::HashMap;
use std::sync::Arc;

use akri_shared::akri::configuration::Configuration;
use akri_shared::akri::instance::Instance;
use akri_shared::k8s::api::IntoApi;

use k8s_openapi::api::batch::v1::Job;
use k8s_openapi::api::core::v1::{Node, Pod, Service};

use tokio::sync::RwLock;

/// Identifier for the controller to be set as the field manager for
/// Kubernetes server-side apply, so the API server tracks which fields
/// this controller owns on the objects it manages.
pub const CONTROLLER_FIELD_MANAGER_ID: &str = "akri.sh/controller";

/// Pod states that BrokerPodWatcher is interested in
///
/// PodState describes the various states that the controller can
/// react to for Pods.
// Fieldless enum: derive Copy and Eq eagerly so callers can compare and
// pass states by value without cloning (PartialEq alone prevents use in
// contexts requiring full equivalence, e.g. HashSet membership).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PodState {
    /// Pod is in Pending state and no action is needed.
    Pending,
    /// Pod is in Running state and needs to ensure that
    /// instance and configuration services are running
    Running,
    /// Pod is in Failed/Completed/Succeeded state and
    /// needs to remove any instance and configuration
    /// services that are not supported by other Running
    /// Pods. Also, at this point, if an Instance still
    /// exists, instance_action::handle_instance_change
    /// needs to be called to ensure that Pods are
    /// restarted
    Ended,
    /// Pod is in Deleted state and needs to remove any
    /// instance and configuration services that are not
    /// supported by other Running Pods. Also, at this
    /// point, if an Instance still exists, and the Pod is
    /// owned by the Instance,
    /// instance_action::handle_instance_change needs to be
    /// called to ensure that Pods are restarted. Akri
    /// places an Instance OwnerReference on all the Pods it
    /// deploys. This declares that the Instance owns that
    /// Pod and Akri's Controller explicitly manages its
    /// deployment. However, if the Pod is not owned by the
    /// Instance, Akri should not assume retry logic and
    /// should cease action. The owning object (ie Job) will
    /// handle retries as necessary.
    Deleted,
}

/// Node states that NodeWatcher is interested in
///
/// NodeState describes the various states that the controller can
/// react to for Nodes.
// Fieldless enum: derive Copy and Eq eagerly so callers can compare and
// pass states by value without cloning, consistent with PodState.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Node has been seen, but not Running yet
    Known,
    /// Node has been seen Running
    Running,
    /// A previously Running Node has been seen as not Running
    /// and the Instances have been cleaned of references to that
    /// vanished Node
    InstancesCleaned,
}

/// Umbrella trait for a Kubernetes client that can produce typed API
/// handles for every resource kind the controller works with
/// (Instances, Configurations, Pods, Jobs, Services, and Nodes).
///
/// The blanket impl below makes any type satisfying all of the
/// `IntoApi` bounds a `ControllerKubeClient` automatically; no type
/// ever implements this trait by hand.
pub trait ControllerKubeClient:
    IntoApi<Instance>
    + IntoApi<Configuration>
    + IntoApi<Pod>
    + IntoApi<Job>
    + IntoApi<Service>
    + IntoApi<Node>
{
}

// Blanket implementation: anything with the full set of IntoApi
// capabilities is usable as a ControllerKubeClient.
impl<T> ControllerKubeClient for T where
    T: IntoApi<Instance>
        + IntoApi<Configuration>
        + IntoApi<Pod>
        + IntoApi<Job>
        + IntoApi<Service>
        + IntoApi<Node>
{
}

/// Shared state handed to each of the controller's watchers/reconcilers
/// (instance, node, and pod) so they operate against the same client
/// and caches.
pub struct ControllerContext {
    /// Kubernetes client
    pub client: Arc<dyn ControllerKubeClient>,
    // Cache of Pod name -> last observed PodState, shared across tasks.
    pub known_pods: Arc<RwLock<HashMap<String, PodState>>>,
    // Cache of Node name -> last observed NodeState, shared across tasks.
    pub known_nodes: Arc<RwLock<HashMap<String, NodeState>>>,
    // Field manager identifier used for server-side apply
    // (see CONTROLLER_FIELD_MANAGER_ID).
    pub identifier: String,
}

impl ControllerContext {
    /// Builds a fresh context around the given client, with empty
    /// pod/node state caches.
    ///
    /// `identifier` is the field-manager string used for server-side
    /// apply; it is stored as an owned `String`.
    pub fn new(client: Arc<dyn ControllerKubeClient>, identifier: &str) -> Self {
        Self {
            client,
            known_pods: Default::default(),
            known_nodes: Default::default(),
            identifier: identifier.to_owned(),
        }
    }
}
Loading

0 comments on commit e10c0ca

Please sign in to comment.