Remove Pod handling for instance deletion and controller cleanups
Signed-off-by: Kate Goldenring <[email protected]>
kate-goldenring committed Sep 25, 2024
1 parent e10c0ca commit fc437e0
Showing 8 changed files with 309 additions and 535 deletions.
2 changes: 1 addition & 1 deletion agent/src/util/discovery_configuration_controller.rs
@@ -18,12 +18,12 @@ use crate::discovery_handler_manager::{
discovery_handler_registry::DiscoveryHandlerRegistry, DiscoveryError,
};

use kube::{Resource, ResourceExt};
use kube::runtime::{
controller::Action,
reflector::{ObjectRef, Store},
Controller,
};
use kube::{Resource, ResourceExt};
use thiserror::Error;

#[derive(Debug, Error)]
4 changes: 2 additions & 2 deletions controller/src/main.rs
@@ -43,13 +43,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
Arc::new(kube::Client::try_default().await?),
CONTROLLER_FIELD_MANAGER_ID,
));
let instance_water_ctx = controller_ctx.clone();
let instance_watcher_ctx = controller_ctx.clone();
let node_watcher_ctx = controller_ctx.clone();
let pod_watcher_ctx = controller_ctx.clone();

// Handle instance changes
tasks.push(tokio::spawn(async {
instance_action::run(instance_water_ctx).await;
instance_action::run(instance_watcher_ctx).await;
}));
// Watch for node disappearance
tasks.push(tokio::spawn(async {
215 changes: 58 additions & 157 deletions controller/src/util/instance_action.rs
@@ -61,35 +61,18 @@ fn error_policy(
Action::requeue(std::time::Duration::from_secs(5 * 60))
}

/// Instance action types
/// Instance event types
///
/// Instance actions describe the types of actions the Controller can
/// react to for Instances.
///
/// This will determine what broker management actions to take (if any)
///
/// | --> InstanceAction::Add
/// | --> Instance Applied
/// | --> No broker => Do nothing
/// | --> <BrokerSpec::BrokerJobSpec> => Deploy a Job
/// | --> <BrokerSpec::BrokerPodSpec> => Deploy Pod to each Node on Instance's `nodes` list (up to `capacity` total)
/// | --> InstanceAction::Remove
/// | --> No broker => Do nothing
/// | --> <BrokerSpec::BrokerJobSpec> => Delete all Jobs labeled with the Instance name
/// | --> <BrokerSpec::BrokerPodSpec> => Delete all Pods labeled with the Instance name
/// | --> InstanceAction::Update
/// | --> No broker => Do nothing
/// | --> <BrokerSpec::BrokerJobSpec> => No nothing
/// | --> <BrokerSpec::BrokerPodSpec> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod
///
#[derive(Clone, Debug, PartialEq)]
pub enum InstanceAction {
/// An Instance is added
Add,
/// An Instance is removed
Remove,
/// An Instance is updated
Update,
}
/// | --> <BrokerSpec::BrokerJobSpec> => Deploy a Job if one does not exist
/// | --> <BrokerSpec::BrokerPodSpec> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) has a Pod.
/// Deploy Pods as necessary
/// This function is the main Reconcile function for Instance resources
/// This will get called every time an Instance gets added or is changed; it will also be called for every existing Instance on startup.
@@ -108,11 +91,10 @@ pub async fn reconcile(instance: Arc<Instance>, ctx: Arc<ControllerContext>) ->

async fn reconcile_inner(event: Event<Instance>, ctx: Arc<ControllerContext>) -> Result<Action> {
match event {
Event::Apply(instance) => {
handle_instance_change(&instance, &InstanceAction::Add, ctx.clone()).await
}
Event::Cleanup(instance) => {
handle_instance_change(&instance, &InstanceAction::Remove, ctx.clone()).await
Event::Apply(instance) => handle_instance_change(&instance, ctx.clone()).await,
Event::Cleanup(_) => {
// Do nothing. OwnerReferences are attached to Jobs and Pods to automate cleanup
Ok(default_requeue_action())
}
}
}
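
The `Event::Cleanup` arm above is now a no-op because broker Pods and Jobs carry an OwnerReference pointing at their Instance, so Kubernetes garbage collection removes them when the Instance is deleted. Below is a minimal, hypothetical sketch of that mechanism using k8s-openapi types; it is not Akri's actual helper (the real code builds ownership through `OwnershipInfo`, as shown further down in this diff), and the API group/version is assumed for illustration.

use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};

// Hypothetical illustration: attach an OwnerReference for the owning Instance
// so the Kubernetes garbage collector deletes this object (Pod or Job) when
// the Instance is deleted, making explicit controller-side cleanup unnecessary.
fn set_instance_owner(meta: &mut ObjectMeta, instance_name: &str, instance_uid: &str) {
    let owner = OwnerReference {
        api_version: "akri.sh/v0".to_string(), // assumed group/version, for illustration only
        kind: "Instance".to_string(),
        name: instance_name.to_string(),
        uid: instance_uid.to_string(),
        controller: Some(true),
        block_owner_deletion: Some(true),
    };
    meta.owner_references.get_or_insert_with(Vec::new).push(owner);
}
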
@@ -137,11 +119,7 @@ pub(crate) struct PodContext {

pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Result<PodContext> {
let pod_name = k8s_pod.metadata.name.as_ref().unwrap();
let labels = &k8s_pod
.metadata
.labels
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no labels found for Pod {:?}", pod_name))?;
let labels = &k8s_pod.labels();
// Early exits above ensure unwrap will not panic
let node_to_run_pod_on = labels.get(AKRI_TARGET_NODE_LABEL_NAME).ok_or_else(|| {
anyhow::anyhow!(
@@ -163,7 +141,6 @@ pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Re
/// it will update the nodes_to_act_on map with the required action.
fn determine_action_for_pod(
k8s_pod: &Pod,
action: &InstanceAction,
nodes_to_act_on: &mut HashMap<String, PodContext>,
) -> anyhow::Result<()> {
let pod_name = k8s_pod.metadata.name.as_ref().unwrap();
Expand All @@ -185,7 +162,6 @@ fn determine_action_for_pod(
pending_grace_time_in_minutes: PENDING_POD_GRACE_PERIOD_MINUTES,
ended_grace_time_in_minutes: FAILED_POD_GRACE_PERIOD_MINUTES,
phase: pod_phase.to_string(),
instance_action: action.clone(),
status_start_time: pod_start_time,
unknown_node: !nodes_to_act_on.contains_key(node_to_run_pod_on),
trace_node_name: k8s_pod.metadata.name.clone().unwrap(),
@@ -326,40 +302,34 @@ async fn handle_addition_work(
/// 2) calling the appropriate handler depending on the broker type (Pod or Job) if any
pub async fn handle_instance_change(
instance: &Instance,
action: &InstanceAction,
ctx: Arc<ControllerContext>,
) -> Result<Action> {
trace!("handle_instance_change - enter {:?}", action);
trace!("handle_instance_change - enter");
let instance_namespace = instance
.metadata
.namespace
.as_ref()
.context("no namespace")?;
let api: Box<dyn Api<Configuration>> = ctx.client.namespaced(instance_namespace);
let Ok(Some(configuration)) = api.get(&instance.spec.configuration_name).await else {
if action != &InstanceAction::Remove {
// In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances.
// Furthermore, Akri Agent is still modifying the Instances. This should not happen because Agent
// is designed to shut down when its Configuration watcher fails.
error!(
// In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances.
// Furthermore, Akri Agent is still modifying the Instances. This should not happen because Agent
// is designed to shut down when its Configuration watcher fails.
error!(
"handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly",
&instance.spec.configuration_name, &instance.metadata.name
);
}

return Ok(default_requeue_action());
};
if let Some(broker_spec) = &configuration.spec.broker_spec {
let instance_change_result = match broker_spec {
BrokerSpec::BrokerPodSpec(p) => {
handle_instance_change_pod(instance, p, action, ctx).await
}
BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, ctx).await,
BrokerSpec::BrokerJobSpec(j) => {
handle_instance_change_job(
instance,
*configuration.metadata.generation.as_ref().unwrap(),
j,
action,
ctx.client.clone(),
)
.await
Expand All @@ -373,17 +343,24 @@ pub async fn handle_instance_change(
}

/// Called when an Instance has changed that requires a Job broker. Action determined by InstanceAction.
/// InstanceAction::Add => Deploy a Job with JobSpec from Configuration. Label with Instance name.
/// InstanceAction::Remove => Delete all Jobs labeled with the Instance name
/// InstanceAction::Update => No nothing
/// First check if a job with the instance name exists. If it does, do nothing. Otherwise, deploy a Job
/// with JobSpec from Configuration and label with Instance name.
pub async fn handle_instance_change_job(
instance: &Instance,
config_generation: i64,
job_spec: &JobSpec,
action: &InstanceAction,
client: Arc<dyn ControllerKubeClient>,
) -> anyhow::Result<()> {
trace!("handle_instance_change_job - enter {:?}", action);
trace!("handle_instance_change_job - enter");
let api: Box<dyn Api<Job>> = client.namespaced(instance.metadata.namespace.as_ref().unwrap());
if api
.get(instance.metadata.name.as_ref().unwrap())
.await?
.is_some()
{
// Job already exists, do nothing
return Ok(());
}
// Create name for Job. Includes Configuration generation in the suffix
// to track what version of the Configuration the Job is associated with.
let job_name = pod::create_broker_app_name(
@@ -395,53 +372,23 @@ pub async fn handle_instance_change_job(

let instance_name = instance.metadata.name.as_ref().unwrap();
let instance_namespace = instance.metadata.namespace.as_ref().unwrap();
let instance_uid = instance
.metadata
.uid
.as_ref()
.ok_or_else(|| anyhow::anyhow!("UID not found for instance: {}", &instance_name))?;
match action {
InstanceAction::Add => {
trace!("handle_instance_change_job - instance added");
let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name);
let new_job = job::create_new_job_from_spec(
instance,
OwnershipInfo::new(
OwnershipType::Instance,
instance_name.to_string(),
instance_uid.to_string(),
),
&capability_id,
job_spec,
&job_name,
)?;
let api: Box<dyn Api<Job>> = client.namespaced(instance_namespace);
api.create(&new_job).await?;
}
InstanceAction::Remove => {
trace!("handle_instance_change_job - instance removed");
// Find all jobs with the label
let api: Box<dyn Api<Job>> = client.namespaced(instance_namespace);
let lp = ListParams::default()
.labels(&format!("{}={}", AKRI_INSTANCE_LABEL_NAME, instance_name));
match api.delete_collection(&lp).await? {
either::Either::Left(list) => {
let names: Vec<_> = list.iter().map(ResourceExt::name_any).collect();
trace!("handle_instance_change_job - deleting jobs: {:?}", names);
}
either::Either::Right(status) => {
println!(
"handle_instance_change_job - deleted jobs: status={:?}",
status
);
}
}
}
InstanceAction::Update => {
trace!("handle_instance_change_job - instance updated");
// TODO: Broker could have encountered unexpected admission error and need to be removed and added
}
}
let instance_uid = instance.metadata.uid.as_ref().unwrap();
trace!("handle_instance_change_job - instance added");
let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name);
let new_job = job::create_new_job_from_spec(
instance,
OwnershipInfo::new(
OwnershipType::Instance,
instance_name.to_string(),
instance_uid.to_string(),
),
&capability_id,
job_spec,
&job_name,
)?;
let api: Box<dyn Api<Job>> = client.namespaced(instance_namespace);
// TODO: Consider using server side apply instead of create
api.create(&new_job).await?;
Ok(())
}
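
The get-then-create guard above keeps the reconcile idempotent; the TODO notes that server-side apply could replace it. A rough sketch of what that might look like with kube's typed API (rather than Akri's `ControllerKubeClient` wrapper) follows; the field-manager string is an assumption, though the `CONTROLLER_FIELD_MANAGER_ID` already defined in main.rs would be the natural choice.

use k8s_openapi::api::batch::v1::Job;
use kube::api::{Api, Patch, PatchParams};

// Hypothetical sketch of the server-side-apply alternative mentioned in the
// TODO above: a single idempotent call creates the Job if it is missing and
// updates it to match the desired spec otherwise, so no prior `get` is needed.
async fn apply_job(
    client: kube::Client,
    namespace: &str,
    name: &str,
    job: &Job,
) -> kube::Result<Job> {
    let api: Api<Job> = Api::namespaced(client, namespace);
    // The field manager identifies this controller to the apiserver; the value here is illustrative.
    let params = PatchParams::apply("akri-controller").force();
    api.patch(name, &params, &Patch::Apply(job)).await
}
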

@@ -454,19 +401,15 @@ pub async fn handle_instance_change_job(
pub async fn handle_instance_change_pod(
instance: &Instance,
podspec: &PodSpec,
action: &InstanceAction,
ctx: Arc<ControllerContext>,
) -> anyhow::Result<()> {
trace!("handle_instance_change_pod - enter {:?}", action);
trace!("handle_instance_change_pod - enter");

let instance_name = instance.metadata.name.clone().unwrap();

// If InstanceAction::Remove, assume all nodes require PodAction::NoAction (reflect that there is no running Pod unless we find one)
// Otherwise, assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one)
let default_action = match action {
InstanceAction::Remove => PodAction::NoAction,
_ => PodAction::Add,
};
let default_action = PodAction::Add;
let mut nodes_to_act_on: HashMap<String, PodContext> = instance
.spec
.nodes
@@ -505,7 +448,7 @@ pub async fn handle_instance_change_pod(
instance_pods
.items
.iter()
.try_for_each(|x| determine_action_for_pod(x, action, &mut nodes_to_act_on))?;
.try_for_each(|x| determine_action_for_pod(x, &mut nodes_to_act_on))?;

trace!(
"handle_instance_change - nodes tracked after querying existing pods={:?}",
@@ -602,6 +545,16 @@ mod handle_instance_tests {
use chrono::Utc;
use mockall::predicate::*;

#[derive(Clone, Debug, PartialEq)]
pub enum InstanceAction {
/// An Instance is added
Add,
/// An Instance is removed
Remove,
/// An Instance is updated
Update,
}

#[derive(Clone)]
struct HandleInstanceWork {
find_pods_selector: &'static str,
Expand Down Expand Up @@ -828,32 +781,6 @@ mod handle_instance_tests {
.await;
}

#[tokio::test]
async fn test_handle_instance_change_for_remove_running_local_instance() {
let _ = env_logger::builder().is_test(true).try_init();

let mut mock = MockControllerKubeClient::default();
configure_for_handle_instance_change(
&mut mock,
&HandleInstanceWork {
find_pods_selector: "akri.sh/instance=config-a-b494b6",
find_pods_result: "../test/json/running-pod-list-for-config-a-local.json",
find_pods_phase: None,
find_pods_start_time: None,
find_pods_delete_start_time: false,
config_work: get_config_work(),
deletion_work: Some(configure_deletion_work_for_config_a_b494b6()),
addition_work: None,
},
);
run_handle_instance_change_test(
Arc::new(ControllerContext::new(Arc::new(mock), "test")),
"../test/json/local-instance.json",
&InstanceAction::Remove,
)
.await;
}

#[tokio::test]
async fn test_handle_instance_change_for_add_new_shared_instance() {
let _ = env_logger::builder().is_test(true).try_init();
Expand Down Expand Up @@ -882,32 +809,6 @@ mod handle_instance_tests {
.await;
}

#[tokio::test]
async fn test_handle_instance_change_for_remove_running_shared_instance() {
let _ = env_logger::builder().is_test(true).try_init();

let mut mock = MockControllerKubeClient::default();
configure_for_handle_instance_change(
&mut mock,
&HandleInstanceWork {
find_pods_selector: "akri.sh/instance=config-a-359973",
find_pods_result: "../test/json/running-pod-list-for-config-a-shared.json",
find_pods_phase: None,
find_pods_start_time: None,
find_pods_delete_start_time: false,
config_work: get_config_work(),
deletion_work: Some(configure_deletion_work_for_config_a_359973()),
addition_work: None,
},
);
run_handle_instance_change_test(
Arc::new(ControllerContext::new(Arc::new(mock), "test")),
"../test/json/shared-instance.json",
&InstanceAction::Remove,
)
.await;
}

#[tokio::test]
async fn test_handle_instance_change_for_update_active_shared_instance() {
let _ = env_logger::builder().is_test(true).try_init();