Skip to content

Commit

Permalink
Merge pull request #61 from pablochacin/wait-for-ephemeral-container
Browse files Browse the repository at this point in the history
Wait for ephemeral container
  • Loading branch information
javaducky authored Jul 19, 2022
2 parents 568be21 + 031135e commit 3a44c45
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 17 deletions.
6 changes: 3 additions & 3 deletions examples/add-ephemeral.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ export default function () {
namespace: namespace,
name: podName,
image: image,
command: command
command: command,
wait: "5s"
})
sleep(1)

kubernetes.pods.addEphemeralContainer(
podName,
Expand All @@ -30,9 +30,9 @@ export default function () {
image: containerImage,
command: containerCommand,
capabilities: containerCapabilities,
wait: "5s"
}
)
sleep(1)

let pod = kubernetes.pods.get(podName, namespace)
if (pod.spec.ephemeral_containers[0].name == containerName) {
Expand Down
23 changes: 23 additions & 0 deletions internal/testutils/kube_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,29 @@ func NewPodWithStatus(name string, namespace string, phase string) *coreV1.Pod {
return pod
}

// PodWithEphemeralContainerStatus is a helper for building Pods with an ephemeral container having the given Status
func PodWithEphemeralContainerStatus(name string, namespace string, container string, state string) *coreV1.Pod {
pod := NewPod(name, namespace)
var containerState coreV1.ContainerState
switch state {
case "Running":
containerState = coreV1.ContainerState{
Running: &coreV1.ContainerStateRunning{},
}
case "Waiting":
containerState = coreV1.ContainerState{
Waiting: &coreV1.ContainerStateWaiting{},
}
}
pod.Status.EphemeralContainerStatuses = []coreV1.ContainerStatus{
{
Name: container,
State: containerState,
},
}
return pod
}

// NewSecret is a helper to build a new Secret instance
func NewSecret(name string, namespace string) *coreV1.Secret {
return &coreV1.Secret{
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestJobs_Create(t *testing.T) {
t.Parallel()
// TODO Figure out the rest.Config
client := fake.NewSimpleClientset()
watcher := watch.NewFake()
watcher := watch.NewRaceFreeFake()
client.PrependWatchReactor("jobs", k8stest.DefaultWatchReactor(watcher, nil))
fixture := New(context.Background(), client, metav1.ListOptions{})
go func(tc TestCase) {
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestJobs_Wait(t *testing.T) {
t.Run(tc.test, func(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
watcher := watch.NewFake()
watcher := watch.NewRaceFreeFake()
client.PrependWatchReactor("jobs", k8stest.DefaultWatchReactor(watcher, nil))
fixture := New(context.Background(), client, metav1.ListOptions{})
go func(tc TestCase) {
Expand Down
85 changes: 75 additions & 10 deletions pkg/pods/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ type ContainerOptions struct {
Capabilities []string // capabilities to be added to the container's security context
}

// EphemeralContainerOptions describes the options for creating an ephemeral container in a pod
type EphemeralContainerOptions struct {
ContainerOptions
Wait string
}

// Pods provides API for manipulating Pod resources within a Kubernetes cluster
type Pods struct {
client kubernetes.Interface
Expand All @@ -80,6 +86,9 @@ type PodOptions struct {
Wait string // timeout for waiting until the pod is running
}

// podConditionChecker defines a function that checks if a pod satisfies a condition
type podConditionChecker func(*k8sTypes.Pod) (bool, error)

// List returns a collection of Pods available within the namespace
func (obj *Pods) List(namespace string) ([]k8sTypes.Pod, error) {
pods, err := obj.client.CoreV1().Pods(namespace).List(obj.ctx, obj.metaOptions)
Expand Down Expand Up @@ -194,10 +203,35 @@ func (obj *Pods) Wait(options WaitOptions) (bool, error) {
if err != nil {
return false, err
}

return obj.waitForCondition(
options.Namespace,
options.Name,
timeout,
func(pod *k8sTypes.Pod) (bool, error) {
if pod.Status.Phase == k8sTypes.PodFailed {
return false, errors.New("pod has failed")
}
if string(pod.Status.Phase) == options.Status {
return true, nil
}
return false, nil
},
)
}

// waitForCondition watches a Pod in a namespace until a podConditionChecker is satisfied or a timeout expires
func (obj *Pods) waitForCondition(
namespace string,
name string,
timeout time.Duration,
checker podConditionChecker,
) (bool, error) {
selector := fields.Set{
"metadata.name": options.Name,
"metadata.name": name,
}.AsSelector()
watcher, err := obj.client.CoreV1().Pods(options.Namespace).Watch(

watcher, err := obj.client.CoreV1().Pods(namespace).Watch(
obj.ctx,
metav1.ListOptions{
FieldSelector: selector.String(),
Expand All @@ -221,11 +255,9 @@ func (obj *Pods) Wait(options WaitOptions) (bool, error) {
if !isPod {
return false, errors.New("received unknown object while watching for pods")
}
if pod.Status.Phase == k8sTypes.PodFailed {
return false, errors.New("pod has failed")
}
if string(pod.Status.Phase) == options.Status {
return true, nil
condition, err := checker(pod)
if condition || err != nil {
return condition, err
}
}
}
Expand Down Expand Up @@ -278,7 +310,7 @@ func (obj *Pods) Exec(options ExecOptions) (*ExecResult, error) {

// AddEphemeralContainer adds an ephemeral container to a running pod. The Pod is identified by name and namespace.
// The container is described by options
func (obj *Pods) AddEphemeralContainer(name, namespace string, options ContainerOptions) error {
func (obj *Pods) AddEphemeralContainer(name, namespace string, options EphemeralContainerOptions) error {
pod, err := obj.Get(name, namespace)
if err != nil {
return err
Expand All @@ -287,7 +319,7 @@ func (obj *Pods) AddEphemeralContainer(name, namespace string, options Container
if err != nil {
return err
}
container := generateEphemeralContainer(options)
container := generateEphemeralContainer(options.ContainerOptions)

updatedPod := pod.DeepCopy()
updatedPod.Spec.EphemeralContainers = append(updatedPod.Spec.EphemeralContainers, *container)
Expand All @@ -304,7 +336,28 @@ func (obj *Pods) AddEphemeralContainer(name, namespace string, options Container
_, err = obj.client.CoreV1().Pods(namespace).Patch(
obj.ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "ephemeralcontainers")

return err
if options.Wait == "" {
return err
}

timeout, err := time.ParseDuration(options.Wait)
if err != nil {
return err
}

running, err := obj.waitForCondition(
namespace,
name,
timeout,
checkEphemeralContainerState,
)
if err != nil {
return err
}
if !running {
return errors.New("Ephemeral container has not started after " + options.Wait)
}
return nil
}

func generateEphemeralContainer(o ContainerOptions) *k8sTypes.EphemeralContainer {
Expand All @@ -329,3 +382,15 @@ func generateEphemeralContainer(o ContainerOptions) *k8sTypes.EphemeralContainer
},
}
}

func checkEphemeralContainerState(pod *k8sTypes.Pod) (bool, error) {
if pod.Status.EphemeralContainerStatuses != nil {
for _, cs := range pod.Status.EphemeralContainerStatuses {
if cs.State.Running != nil {
return true, nil
}
}
}

return false, nil
}
89 changes: 87 additions & 2 deletions pkg/pods/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestPods_Create(t *testing.T) {
t.Parallel()
// TODO Figure out the rest.Config
client := fake.NewSimpleClientset()
watcher := watch.NewFake()
watcher := watch.NewRaceFreeFake()
client.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(watcher, nil))
fixture := New(context.Background(), client, nil, metav1.ListOptions{})
go func(tc TestCase) {
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestPods_Wait(t *testing.T) {
t.Parallel()
// TODO Figure out the rest.Config
client := fake.NewSimpleClientset()
watcher := watch.NewFake()
watcher := watch.NewRaceFreeFake()
client.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(watcher, nil))
fixture := New(context.Background(), client, nil, metav1.ListOptions{})
go func(tc TestCase) {
Expand Down Expand Up @@ -342,3 +342,88 @@ func TestPods_Get(t *testing.T) {
})
}
}

func TestPods_AddEphemeralContainer(t *testing.T) {
t.Parallel()
type TestCase struct {
test string
podName string
namespace string
delay time.Duration
expectError bool
container string
image string
state string
wait string
}

testCases := []TestCase{
{
test: "Create ephemeral container not waiting",
podName: "test-pod",
namespace: testNamespace,
delay: 1 * time.Second,
expectError: false,
container: "ephemeral",
image: "busybox",
state: "Running",
wait: "",
},
{
test: "Create ephemeral container waiting",
podName: "test-pod",
namespace: testNamespace,
delay: 3 * time.Second,
expectError: false,
container: "ephemeral",
image: "busybox",
state: "Running",
wait: "5s",
},
{
test: "Fail waiting for container",
podName: "test-pod",
namespace: testNamespace,
delay: 3 * time.Second,
expectError: true,
container: "ephemeral",
image: "busybox",
state: "Waiting",
wait: "5s",
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.test, func(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset(
testutils.NewPod(tc.podName, testNamespace),
)
watcher := watch.NewRaceFreeFake()
client.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(watcher, nil))
fixture := New(context.Background(), client, nil, metav1.ListOptions{})

// add watcher to update ephemarel container's status
go func(tc TestCase) {
time.Sleep(tc.delay)
watcher.Modify(testutils.PodWithEphemeralContainerStatus(tc.podName, tc.namespace, tc.container, tc.state))
}(tc)

err := fixture.AddEphemeralContainer(
tc.podName,
tc.namespace,
EphemeralContainerOptions{
ContainerOptions: ContainerOptions{
Name: tc.container,
Image: tc.image,
},
Wait: tc.wait,
},
)
if !tc.expectError && err != nil {
t.Errorf("unexpected error: %v", err)
return
}
})
}
}

0 comments on commit 3a44c45

Please sign in to comment.