Abort if sidecar startup fails. (#12)
cjyar authored Jul 10, 2023
1 parent cc0222c commit 15b2d71
Showing 5 changed files with 167 additions and 42 deletions.
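Editor's note: the heart of the change is what happens when a sidecar container terminates while proa is still waiting for the Pod to become ready. If Kubernetes won't restart the container (restartPolicy: Never), waiting is pointless, so startup now aborts instead of hanging. A self-contained sketch of that decision table; the Decision enum and decide function here are illustrative stand-ins, not code from this commit (the real logic is in the src/k8s.rs diff below):

// Illustrative model of the new readiness decision in src/k8s.rs.
#[derive(Debug, PartialEq)]
enum Decision {
    KeepWaiting,
    RunMainProgram,
    Abort,
}

fn decide(sidecar_terminated: bool, sidecars_ready: bool, restart_policy_never: bool) -> Decision {
    match (sidecar_terminated, sidecars_ready) {
        // New in this commit: a dead sidecar that k8s won't restart is fatal.
        (true, _) if restart_policy_never => Decision::Abort,
        // Under any other restartPolicy, k8s restarts the sidecar; keep waiting.
        (true, _) => Decision::KeepWaiting,
        (false, true) => Decision::RunMainProgram,
        (false, false) => Decision::KeepWaiting,
    }
}

fn main() {
    assert_eq!(decide(true, false, true), Decision::Abort);
    assert_eq!(decide(true, false, false), Decision::KeepWaiting);
    assert_eq!(decide(false, true, false), Decision::RunMainProgram);
    println!("decision table ok");
}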
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "proa"
-version = "0.1.1"
+version = "0.1.2"
 authors = ["IronCore Labs <[email protected]>"]
 edition = "2021"
 description = "Manage Kubernetes sidecar container lifecycle."
182 changes: 151 additions & 31 deletions src/k8s.rs
@@ -25,8 +25,8 @@ pub async fn wait_for_ready() -> Result<Pod, Error> {
         .next()
         .await
         .ok_or(anyhow!("Pod was never ready"))?;
-    info!("Pod is ready");
-    Ok(ready_pod)
+    info!(err = ?ready_pod.as_ref().err(), "Done waiting for Pod.");
+    ready_pod
 }
 
 /// Return a stream providing Pod events about the pod we're running in.
@@ -47,10 +47,9 @@ pub async fn watch_my_pod() -> Result<impl Stream<Item = Result<Option<Pod>, Err
     Ok(pod)
 }
 
-/// If error, log it.
-/// If all the pod's containers but this one are ready, return the pod.
-/// Else return None.
-async fn filter_ready(pod: Result<Option<Pod>, Error>) -> Option<Pod> {
+/// If we're done waiting for readiness, return something: either the ready Pod or an error.
+/// If we're not done waiting, return None.
+async fn filter_ready(pod: Result<Option<Pod>, Error>) -> Option<Result<Pod, Error>> {
     match pod {
         Err(e) => {
             info!("Watch error: {}", e);
@@ -62,46 +61,112 @@ async fn filter_ready(pod: Result<Option<Pod>, Error>) -> Option<Pod> {
         }
         Ok(Some(p)) => {
             debug!("Saw Pod {}...", p.name_any());
-            is_ready(&p).map_or_else(
-                |e| {
+            match is_ready(&p) {
+                // Keep waiting for readiness.
+                WatchResult::NotReady => None,
+                // If we see a k8s API error, log it and keep waiting.
+                WatchResult::ApiError(e) => {
                     info!("Unsure if ready: {}", e);
                     None
-                },
-                |ready| if ready { Some(p) } else { None },
-            )
+                }
+                // If all the sidecars are ready, return the Pod.
+                WatchResult::Ready => Some(Ok(p)),
+                // One of the sidecars terminated.
+                WatchResult::PodError(e) => {
+                    if p.spec
+                        .map(|s| s.restart_policy == Some("Never".to_string()))
+                        .unwrap_or(true)
+                    {
+                        // If restartPolicy == Never, then return an error because there's no point in waiting.
+                        Some(Err(e))
+                    } else {
+                        // Any other restartPolicy means k8s will restart the sidecar; we should keep waiting for readiness.
+                        None
+                    }
+                }
+            }
         }
     }
 }
 
+/// The result of watching a Pod.
+enum WatchResult {
+    /// The Pod isn't ready yet.
+    NotReady,
+    /// The Pod is ready to execute the main program.
+    Ready,
+    /// Encountered a k8s API error while watching the Pod.
+    ApiError(Error),
+    /// The Pod (probably one of its containers) experienced an error.
+    PodError(Error),
+}
+
 /// Return true if this Pod is ready for the main process to start. That means all the containers except the main one are signaling
 /// ready status.
-fn is_ready(pod: &Pod) -> Result<bool, Error> {
+fn is_ready(pod: &Pod) -> WatchResult {
     let span = debug_span!("is_ready");
     let _enter = span.enter();
 
     // The name of the main container in the Pod. For now we pick containers[0].
-    let main_cont_name = &pod
-        .spec
-        .as_ref()
-        .ok_or(anyhow!("No pod.spec"))?
-        .containers
-        .get(0)
-        .ok_or(anyhow!("No pod.spec.containers[0]"))?
-        .name;
+    let main_cont_name = match main_cont_name(pod) {
+        Ok(name) => name,
+        Err(e) => return WatchResult::ApiError(e),
+    };
 
-    let status = &pod
+    // Are all of the sidecar containers ready?
+    let ready = &pod
         .status
         .as_ref()
         .and_then(|s| {
             s.container_statuses.as_ref().map(|s| {
                 s.iter()
-                    .filter(|s| &s.name != main_cont_name)
+                    .filter(|s| s.name != main_cont_name)
                     .all(|s| s.ready)
             })
         })
         .unwrap_or(false);
-    debug!(status);
-    Ok(*status)
+    // Are any of the sidecar containers terminated?
+    let error = &pod.status.as_ref().and_then(|pod_stat| {
+        pod_stat.container_statuses.as_ref().map(|cont_stats| {
+            cont_stats
+                .iter()
+                .filter(|cont_stat| cont_stat.name != main_cont_name)
+                .any(|cont_stat| {
+                    cont_stat
+                        .state
+                        .as_ref()
+                        .map(|state| {
+                            if state.terminated.is_some() {
+                                debug!(container = cont_stat.name, "Sidecar container terminated");
+                                true
+                            } else {
+                                false
+                            }
+                        })
+                        .unwrap_or(false)
+                })
+        })
+    });
+    debug!(ready, error);
+    match (error, ready) {
+        (Some(true), _) => {
+            WatchResult::PodError(anyhow!("A sidecar container terminated prematurely"))
+        }
+        (_, false) => WatchResult::NotReady,
+        (_, true) => WatchResult::Ready,
+    }
 }
 
+fn main_cont_name(pod: &Pod) -> Result<String, Error> {
+    Ok(pod
+        .spec
+        .as_ref()
+        .ok_or(anyhow!("No pod.spec"))?
+        .containers
+        .get(0)
+        .ok_or(anyhow!("No pod.spec.containers[0]"))?
+        .name
+        .clone())
+}
+
 #[cfg(test)]
@@ -113,7 +178,7 @@ mod tests {
     #[tokio::test]
     async fn check_ready() -> Result<(), Error> {
         // Pass in an error, it's not ready.
-        assert_eq!(filter_ready(Err(anyhow!["foo"])).await, None);
+        assert!(filter_ready(Err(anyhow!["foo"])).await.is_none());
 
         // A pod where only the main container is ready.
         let pod = object! {
@@ -128,7 +193,10 @@
             }
         };
         let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
-        assert_eq!(filter_ready(Ok(Some(pod.clone()))).await, Some(pod));
+        assert_eq!(
+            filter_ready(Ok(Some(pod.clone()))).await.unwrap().unwrap(),
+            pod
+        );
 
         // A pod with only the main container, which isn't ready.
         let pod = object! {
@@ -143,7 +211,10 @@
             }
         };
         let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
-        assert_eq!(filter_ready(Ok(Some(pod.clone()))).await, Some(pod));
+        assert_eq!(
+            filter_ready(Ok(Some(pod.clone()))).await.unwrap().unwrap(),
+            pod
+        );
 
         // A pod with one ready sidecar.
         let pod = object! {
@@ -164,7 +235,10 @@
             }
         };
         let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
-        assert_eq!(filter_ready(Ok(Some(pod.clone()))).await, Some(pod));
+        assert_eq!(
+            filter_ready(Ok(Some(pod.clone()))).await.unwrap().unwrap(),
+            pod
+        );
 
         // A pod with one not-ready sidecar.
         let pod = object! {
@@ -185,7 +259,7 @@
             }
         };
         let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
-        assert_eq!(filter_ready(Ok(Some(pod.clone()))).await, None);
+        assert!(filter_ready(Ok(Some(pod.clone()))).await.is_none());
 
         // A pod with one ready sidecar, one not-ready.
         let pod = object! {
@@ -208,7 +282,7 @@
             }
         };
         let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
-        assert_eq!(filter_ready(Ok(Some(pod.clone()))).await, None);
+        assert!(filter_ready(Ok(Some(pod.clone()))).await.is_none());
 
         // A pod with two ready sidecars.
         let pod = object! {
@@ -231,7 +305,53 @@
             }
         };
         let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
-        assert_eq!(filter_ready(Ok(Some(pod.clone()))).await, Some(pod));
+        assert_eq!(
+            filter_ready(Ok(Some(pod.clone()))).await.unwrap().unwrap(),
+            pod
+        );
+
+        // A pod with a sidecar that failed and won't be restarted.
+        let pod = object! {
+            apiVersion: "v1",
+            kind: "Pod",
+            metadata: { name: "pod1" },
+            spec: {
+                containers: [
+                    { name: "cont1" },
+                    { name: "cont2" },
+                ],
+                restartPolicy: "Never"
+            },
+            status: {
+                containerStatuses: [
+                    { name: "cont1", ready: true },
+                    { name: "cont2", state: { terminated: { exitCode: 1 } } },
+                ]
+            }
+        };
+        let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
+        assert!(filter_ready(Ok(Some(pod.clone()))).await.unwrap().is_err());
+
+        // A pod with a sidecar that failed and will be restarted.
+        let pod = object! {
+            apiVersion: "v1",
+            kind: "Pod",
+            metadata: { name: "pod1" },
+            spec: {
+                containers: [
+                    { name: "cont1" },
+                    { name: "cont2" },
+                ],
+            },
+            status: {
+                containerStatuses: [
+                    { name: "cont1", ready: true },
+                    { name: "cont2", state: { terminated: { exitCode: 1 } } },
+                ]
+            }
+        };
+        let pod: Pod = serde_json::from_str(pod.dump().as_str())?;
+        assert!(filter_ready(Ok(Some(pod.clone()))).await.is_none());
 
         Ok(())
     }
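Editor's note on the new filter_ready contract: each watch event now maps to None ("keep waiting") or Some(Ok(pod)) / Some(Err(e)) ("done"), and wait_for_ready takes the first Some as the outcome. A runnable toy version of that stream shape, assuming futures = "0.3", anyhow = "1", and tokio with the rt and macros features as dependencies; a &str stands in for the real Pod:

// Toy model: the first Some(...) emitted by the filter decides the outcome.
use anyhow::{anyhow, Error};
use futures::{stream, StreamExt};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Stand-in for the filtered Pod watch: two "keep waiting" events,
    // then a terminal decision (here, a failed sidecar).
    let events: Vec<Option<Result<&str, Error>>> = vec![
        None,
        None,
        Some(Err(anyhow!("A sidecar container terminated prematurely"))),
    ];
    let outcome: Result<&str, Error> = stream::iter(events)
        .filter_map(|e| async move { e }) // drop the "keep waiting" events
        .boxed()
        .next()
        .await
        .unwrap_or_else(|| Err(anyhow!("Pod was never ready"))); // stream ended
    println!("{outcome:?}");
}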
10 changes: 7 additions & 3 deletions src/main.rs
@@ -18,11 +18,15 @@ async fn main() -> Result<ExitCode, Error> {
     tracing_subscriber::fmt().json().init();
     info!("Starting up.");
 
-    let pod = k8s::wait_for_ready().await?;
+    let wait_result = k8s::wait_for_ready().await;
 
-    let status = exec::run(&cli.command, &cli.args);
+    // If sidecar startup was successful, then keep a copy of our Pod for later, and also run the wrapped program.
+    let (maybe_pod, status) = match wait_result {
+        Ok(_) => (wait_result.ok(), exec::run(&cli.command, &cli.args)),
+        Err(e) => (None, Err(e)),
+    };
 
-    if let Err(err) = shutdown::shutdown(cli, pod).await {
+    if let Err(err) = shutdown::shutdown(cli, maybe_pod).await {
         warn!(err = err.to_string(), "Shutdown problem");
     }
 
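The design point in this main.rs hunk: the old code used `?` on wait_for_ready, which returned early and skipped shutdown entirely; the new match carries the startup error through as the final status while still letting shutdown run. A self-contained demo of the same pattern, where setup, run_wrapped, and cleanup are hypothetical stand-ins for the crate's real functions (anyhow = "1" assumed):

use anyhow::{anyhow, Error};

// Stand-in for k8s::wait_for_ready().
fn setup(ok: bool) -> Result<String, Error> {
    if ok {
        Ok("pod".to_string())
    } else {
        Err(anyhow!("sidecar failed to start"))
    }
}

// Stand-in for exec::run(...): pretend the wrapped program exited cleanly.
fn run_wrapped() -> Result<i32, Error> {
    Ok(0)
}

// Stand-in for shutdown::shutdown(...): runs with or without a Pod.
fn cleanup(maybe_pod: Option<String>) {
    println!("cleanup, pod = {maybe_pod:?}");
}

fn main() {
    for ok in [true, false] {
        let wait_result = setup(ok);
        // Same shape as the diff: run the wrapped program only on success,
        // but keep the startup error as the final status.
        let (maybe_pod, status) = match wait_result {
            Ok(_) => (wait_result.ok(), run_wrapped()),
            Err(e) => (None, Err(e)),
        };
        cleanup(maybe_pod);
        println!("status = {status:?}");
    }
}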
13 changes: 7 additions & 6 deletions src/shutdown.rs
@@ -13,14 +13,14 @@ use crate::k8s;
 use crate::stream::holistic_stream_ext::HolisticStreamExt;
 
 /// Shut down the sidecars and wait for them to terminate.
-pub async fn shutdown(cli: Cli, pod: Pod) -> Result<(), Error> {
+pub async fn shutdown(cli: Cli, maybe_pod: Option<Pod>) -> Result<(), Error> {
     let span = debug_span!("shutdown");
     let _enter = span.enter();
 
     info!("Sending shutdown requests.");
 
     send_shutdown_reqs(cli).await;
-    wait_for_shutdown(pod).await?;
+    wait_for_shutdown(maybe_pod).await?;
 
     Ok(())
 }
@@ -149,10 +149,11 @@ mod kill {
 
 /// Log messages as the containers shut down.
 /// If the timeout expires, give up and log a message.
-async fn wait_for_shutdown(pod: Pod) -> Result<(), Error> {
-    let timeout: Option<i64> = pod
-        .spec
-        .and_then(|spec| spec.termination_grace_period_seconds);
+async fn wait_for_shutdown(maybe_pod: Option<Pod>) -> Result<(), Error> {
+    let timeout: Option<i64> = maybe_pod.and_then(|pod| {
+        pod.spec
+            .and_then(|spec| spec.termination_grace_period_seconds)
+    });
     let timeout: u64 = match timeout {
         Some(x @ 0..) => x.try_into().unwrap(),
         _ => {
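A note on the Option<Pod> threading above: when startup fails before a Pod was ever observed, the grace-period lookup yields None and the timeout falls through to the `_` default arm (whose value is elided in this diff). A minimal sketch with simplified stand-in types in place of the k8s-openapi Pod:

// Simplified stand-ins for k8s_openapi's Pod and PodSpec.
struct PodSpec {
    termination_grace_period_seconds: Option<i64>,
}
struct Pod {
    spec: Option<PodSpec>,
}

// Mirrors the new lookup in wait_for_shutdown.
fn grace_period(maybe_pod: Option<Pod>) -> Option<i64> {
    maybe_pod.and_then(|pod| {
        pod.spec
            .and_then(|spec| spec.termination_grace_period_seconds)
    })
}

fn main() {
    let pod = Pod {
        spec: Some(PodSpec {
            termination_grace_period_seconds: Some(30),
        }),
    };
    assert_eq!(grace_period(Some(pod)), Some(30));
    assert_eq!(grace_period(None), None); // no Pod: fall back to the default timeout
    println!("grace-period lookup ok");
}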
