diff --git a/examples/add-ephemeral.js b/examples/add-ephemeral.js index b4abb8a..a2fe03a 100644 --- a/examples/add-ephemeral.js +++ b/examples/add-ephemeral.js @@ -18,9 +18,9 @@ export default function () { namespace: namespace, name: podName, image: image, - command: command + command: command, + wait: "5s" }) - sleep(1) kubernetes.pods.addEphemeralContainer( podName, @@ -30,9 +30,9 @@ export default function () { image: containerImage, command: containerCommand, capabilities: containerCapabilities, + wait: "5s" } ) - sleep(1) let pod = kubernetes.pods.get(podName, namespace) if (pod.spec.ephemeral_containers[0].name == containerName) { diff --git a/internal/testutils/kube_resources.go b/internal/testutils/kube_resources.go index 7bf2e84..39372ea 100644 --- a/internal/testutils/kube_resources.go +++ b/internal/testutils/kube_resources.go @@ -255,6 +255,29 @@ func NewPodWithStatus(name string, namespace string, phase string) *coreV1.Pod { return pod } +// PodWithEphemeralContainerStatus is a helper for building Pods with an ephemeral container having the given Status +func PodWithEphemeralContainerStatus(name string, namespace string, container string, state string) *coreV1.Pod { + pod := NewPod(name, namespace) + var containerState coreV1.ContainerState + switch state { + case "Running": + containerState = coreV1.ContainerState{ + Running: &coreV1.ContainerStateRunning{}, + } + case "Waiting": + containerState = coreV1.ContainerState{ + Waiting: &coreV1.ContainerStateWaiting{}, + } + } + pod.Status.EphemeralContainerStatuses = []coreV1.ContainerStatus{ + { + Name: container, + State: containerState, + }, + } + return pod +} + // NewSecret is a helper to build a new Secret instance func NewSecret(name string, namespace string) *coreV1.Secret { return &coreV1.Secret{ diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 4d321b6..31ddad9 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -202,7 +202,7 @@ func TestJobs_Create(t *testing.T) { t.Parallel() // TODO Figure out the rest.Config client := fake.NewSimpleClientset() - watcher := watch.NewFake() + watcher := watch.NewRaceFreeFake() client.PrependWatchReactor("jobs", k8stest.DefaultWatchReactor(watcher, nil)) fixture := New(context.Background(), client, metav1.ListOptions{}) go func(tc TestCase) { @@ -431,7 +431,7 @@ func TestJobs_Wait(t *testing.T) { t.Run(tc.test, func(t *testing.T) { t.Parallel() client := fake.NewSimpleClientset() - watcher := watch.NewFake() + watcher := watch.NewRaceFreeFake() client.PrependWatchReactor("jobs", k8stest.DefaultWatchReactor(watcher, nil)) fixture := New(context.Background(), client, metav1.ListOptions{}) go func(tc TestCase) { diff --git a/pkg/pods/pods.go b/pkg/pods/pods.go index b6623ae..202f524 100644 --- a/pkg/pods/pods.go +++ b/pkg/pods/pods.go @@ -61,6 +61,12 @@ type ContainerOptions struct { Capabilities []string // capabilities to be added to the container's security context } +// EphemeralContainerOptions describes the options for creating an ephemeral container in a pod +type EphemeralContainerOptions struct { + ContainerOptions + Wait string +} + // Pods provides API for manipulating Pod resources within a Kubernetes cluster type Pods struct { client kubernetes.Interface @@ -80,6 +86,9 @@ type PodOptions struct { Wait string // timeout for waiting until the pod is running } +// podConditionChecker defines a function that checks if a pod satisfies a condition +type podConditionChecker func(*k8sTypes.Pod) (bool, error) + // List returns a collection of Pods available within the namespace func (obj *Pods) List(namespace string) ([]k8sTypes.Pod, error) { pods, err := obj.client.CoreV1().Pods(namespace).List(obj.ctx, obj.metaOptions) @@ -194,10 +203,35 @@ func (obj *Pods) Wait(options WaitOptions) (bool, error) { if err != nil { return false, err } + + return obj.waitForCondition( + options.Namespace, + options.Name, + timeout, + func(pod *k8sTypes.Pod) (bool, error) { + if pod.Status.Phase == k8sTypes.PodFailed { + return false, errors.New("pod has failed") + } + if string(pod.Status.Phase) == options.Status { + return true, nil + } + return false, nil + }, + ) +} + +// waitForCondition watches a Pod in a namespace until a podConditionChecker is satisfied or a timeout expires +func (obj *Pods) waitForCondition( + namespace string, + name string, + timeout time.Duration, + checker podConditionChecker, +) (bool, error) { selector := fields.Set{ - "metadata.name": options.Name, + "metadata.name": name, }.AsSelector() - watcher, err := obj.client.CoreV1().Pods(options.Namespace).Watch( + + watcher, err := obj.client.CoreV1().Pods(namespace).Watch( obj.ctx, metav1.ListOptions{ FieldSelector: selector.String(), @@ -221,11 +255,9 @@ func (obj *Pods) Wait(options WaitOptions) (bool, error) { if !isPod { return false, errors.New("received unknown object while watching for pods") } - if pod.Status.Phase == k8sTypes.PodFailed { - return false, errors.New("pod has failed") - } - if string(pod.Status.Phase) == options.Status { - return true, nil + condition, err := checker(pod) + if condition || err != nil { + return condition, err } } } @@ -278,7 +310,7 @@ func (obj *Pods) Exec(options ExecOptions) (*ExecResult, error) { // AddEphemeralContainer adds an ephemeral container to a running pod. The Pod is identified by name and namespace. // The container is described by options -func (obj *Pods) AddEphemeralContainer(name, namespace string, options ContainerOptions) error { +func (obj *Pods) AddEphemeralContainer(name, namespace string, options EphemeralContainerOptions) error { pod, err := obj.Get(name, namespace) if err != nil { return err @@ -287,7 +319,7 @@ func (obj *Pods) AddEphemeralContainer(name, namespace string, options Container if err != nil { return err } - container := generateEphemeralContainer(options) + container := generateEphemeralContainer(options.ContainerOptions) updatedPod := pod.DeepCopy() updatedPod.Spec.EphemeralContainers = append(updatedPod.Spec.EphemeralContainers, *container) @@ -304,7 +336,28 @@ func (obj *Pods) AddEphemeralContainer(name, namespace string, options Container _, err = obj.client.CoreV1().Pods(namespace).Patch( obj.ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "ephemeralcontainers") - return err + if options.Wait == "" { + return err + } + + timeout, err := time.ParseDuration(options.Wait) + if err != nil { + return err + } + + running, err := obj.waitForCondition( + namespace, + name, + timeout, + checkEphemeralContainerState, + ) + if err != nil { + return err + } + if !running { + return errors.New("Ephemeral container has not started after " + options.Wait) + } + return nil } func generateEphemeralContainer(o ContainerOptions) *k8sTypes.EphemeralContainer { @@ -329,3 +382,15 @@ func generateEphemeralContainer(o ContainerOptions) *k8sTypes.EphemeralContainer }, } } + +func checkEphemeralContainerState(pod *k8sTypes.Pod) (bool, error) { + if pod.Status.EphemeralContainerStatuses != nil { + for _, cs := range pod.Status.EphemeralContainerStatuses { + if cs.State.Running != nil { + return true, nil + } + } + } + + return false, nil +} diff --git a/pkg/pods/pods_test.go b/pkg/pods/pods_test.go index 88ddc5f..fe2d4eb 100644 --- a/pkg/pods/pods_test.go +++ b/pkg/pods/pods_test.go @@ -84,7 +84,7 @@ func TestPods_Create(t *testing.T) { t.Parallel() // TODO Figure out the rest.Config client := fake.NewSimpleClientset() - watcher := watch.NewFake() + watcher := watch.NewRaceFreeFake() client.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(watcher, nil)) fixture := New(context.Background(), client, nil, metav1.ListOptions{}) go func(tc TestCase) { @@ -183,7 +183,7 @@ func TestPods_Wait(t *testing.T) { t.Parallel() // TODO Figure out the rest.Config client := fake.NewSimpleClientset() - watcher := watch.NewFake() + watcher := watch.NewRaceFreeFake() client.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(watcher, nil)) fixture := New(context.Background(), client, nil, metav1.ListOptions{}) go func(tc TestCase) { @@ -342,3 +342,88 @@ func TestPods_Get(t *testing.T) { }) } } + +func TestPods_AddEphemeralContainer(t *testing.T) { + t.Parallel() + type TestCase struct { + test string + podName string + namespace string + delay time.Duration + expectError bool + container string + image string + state string + wait string + } + + testCases := []TestCase{ + { + test: "Create ephemeral container not waiting", + podName: "test-pod", + namespace: testNamespace, + delay: 1 * time.Second, + expectError: false, + container: "ephemeral", + image: "busybox", + state: "Running", + wait: "", + }, + { + test: "Create ephemeral container waiting", + podName: "test-pod", + namespace: testNamespace, + delay: 3 * time.Second, + expectError: false, + container: "ephemeral", + image: "busybox", + state: "Running", + wait: "5s", + }, + { + test: "Fail waiting for container", + podName: "test-pod", + namespace: testNamespace, + delay: 3 * time.Second, + expectError: true, + container: "ephemeral", + image: "busybox", + state: "Waiting", + wait: "5s", + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.test, func(t *testing.T) { + t.Parallel() + client := fake.NewSimpleClientset( + testutils.NewPod(tc.podName, testNamespace), + ) + watcher := watch.NewRaceFreeFake() + client.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(watcher, nil)) + fixture := New(context.Background(), client, nil, metav1.ListOptions{}) + + // add watcher to update ephemarel container's status + go func(tc TestCase) { + time.Sleep(tc.delay) + watcher.Modify(testutils.PodWithEphemeralContainerStatus(tc.podName, tc.namespace, tc.container, tc.state)) + }(tc) + + err := fixture.AddEphemeralContainer( + tc.podName, + tc.namespace, + EphemeralContainerOptions{ + ContainerOptions: ContainerOptions{ + Name: tc.container, + Image: tc.image, + }, + Wait: tc.wait, + }, + ) + if !tc.expectError && err != nil { + t.Errorf("unexpected error: %v", err) + return + } + }) + } +}