Skip to content

Commit

Permalink
use RetryContext to eliminate busy-waiting
Browse files Browse the repository at this point in the history
also add a couple of tests to cover the other timeout cases. Fixes #2614
  • Loading branch information
alyssaruth committed Nov 4, 2024
1 parent a443c94 commit 2fd6436
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 66 deletions.
102 changes: 36 additions & 66 deletions manifest/provider/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/terraform-plugin-go/tftypes"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-provider-kubernetes/manifest/payload"
"github.com/zclconf/go-cty/cty"

Expand All @@ -23,8 +24,6 @@ import (
"k8s.io/kubectl/pkg/polymorphichelpers"
)

const waiterSleepTime = 1 * time.Second

func (s *RawProviderServer) waitForCompletion(ctx context.Context, waitForBlock tftypes.Value, rs dynamic.ResourceInterface, rname string, rtype tftypes.Type, th map[string]string) error {
if waitForBlock.IsNull() || !waitForBlock.IsKnown() {
return nil
Expand All @@ -34,12 +33,15 @@ func (s *RawProviderServer) waitForCompletion(ctx context.Context, waitForBlock
if err != nil {
return err
}
return waiter.Wait(ctx)

deadline, _ := ctx.Deadline()
return retry.RetryContext(ctx, deadline.Sub(time.Now()),
waiter.Wait(ctx))
}

// Waiter is a simple interface to implement a blocking wait operation
type Waiter interface {
Wait(context.Context) error
Wait(context.Context) retry.RetryFunc
}

type WaiterError struct {
Expand Down Expand Up @@ -148,24 +150,17 @@ type FieldWaiter struct {
}

// Wait blocks until all of the FieldMatchers configured evaluate to true
func (w *FieldWaiter) Wait(ctx context.Context) error {
w.logger.Info("[ApplyResourceChange][Wait] Waiting until ready...\n")
for {
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return WaiterError{Reason: "field matchers"}
}
}

func (w *FieldWaiter) Wait(ctx context.Context) retry.RetryFunc {
return func() *retry.RetryError {
// NOTE The typed API resource is actually returned in the
// event object but I haven't yet figured out how to convert it
// to a cty.Value.
res, err := w.resource.Get(ctx, w.resourceName, v1.GetOptions{})
if err != nil {
return err
return retry.NonRetryableError(err)
}
if errors.IsGone(err) {
return fmt.Errorf("resource was deleted")
return retry.NonRetryableError(fmt.Errorf("resource was deleted"))
}
resObj := res.Object
meta := resObj["metadata"].(map[string]interface{})
Expand All @@ -175,17 +170,17 @@ func (w *FieldWaiter) Wait(ctx context.Context) error {

obj, err := payload.ToTFValue(resObj, w.resourceType, w.typeHints, tftypes.NewAttributePath())
if err != nil {
return err
return retry.NonRetryableError(err)
}

done, err := func(obj tftypes.Value) (bool, error) {
result := func(obj tftypes.Value) *retry.RetryError {
for _, m := range w.fieldMatchers {
vi, rp, err := tftypes.WalkAttributePath(obj, m.path)
if err != nil {
return false, err
return retry.RetryableError(err)
}
if len(rp.Steps()) > 0 {
return false, fmt.Errorf("attribute not present at path '%s'", m.path.String())
return retry.RetryableError(fmt.Errorf("attribute not present at path '%s'", m.path.String()))
}

var s string
Expand All @@ -208,33 +203,29 @@ func (w *FieldWaiter) Wait(ctx context.Context) error {
s = fmt.Sprintf("%f", i)
}
default:
return true, fmt.Errorf("wait_for: cannot match on type %q", v.Type().String())
return retry.NonRetryableError(fmt.Errorf("wait_for: cannot match on type %q", v.Type().String()))
}

if !m.valueMatcher.Match([]byte(s)) {
return false, nil
return retry.RetryableError(WaiterError{Reason: "field matchers"})
}
}

return true, nil
return nil
}(obj)

if done {
w.logger.Info("[ApplyResourceChange][Wait] Done waiting.\n")
return err
}

// TODO: implement with exponential back-off.
time.Sleep(waiterSleepTime) // lintignore:R018
return result
}
}

// NoopWaiter is a placeholder for when there is nothing to wait on
type NoopWaiter struct{}

// Wait returns immediately
func (w *NoopWaiter) Wait(_ context.Context) error {
return nil
func (w *NoopWaiter) Wait(_ context.Context) retry.RetryFunc {
return func() *retry.RetryError {
return nil
}
}

// FieldPathToTftypesPath takes a string representation of
Expand Down Expand Up @@ -286,43 +277,33 @@ type RolloutWaiter struct {
}

// Wait uses StatusViewer to determine if the rollout is done
func (w *RolloutWaiter) Wait(ctx context.Context) error {
w.logger.Info("[ApplyResourceChange][Wait] Waiting until rollout complete...\n")
for {
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return WaiterError{Reason: "rollout to complete"}
}
}

func (w *RolloutWaiter) Wait(ctx context.Context) retry.RetryFunc {
return func() *retry.RetryError {
res, err := w.resource.Get(ctx, w.resourceName, v1.GetOptions{})
if err != nil {
return err
return retry.NonRetryableError(err)
}
if errors.IsGone(err) {
return fmt.Errorf("resource was deleted")
return retry.NonRetryableError(fmt.Errorf("resource was deleted"))
}

gk := res.GetObjectKind().GroupVersionKind().GroupKind()
statusViewer, err := polymorphichelpers.StatusViewerFor(gk)
if err != nil {
return fmt.Errorf("error getting resource status: %v", err)
return retry.NonRetryableError(fmt.Errorf("error getting resource status: %v", err))
}

_, done, err := statusViewer.Status(res, 0)
if err != nil {
return fmt.Errorf("error getting resource status: %v", err)
return retry.NonRetryableError(fmt.Errorf("error getting resource status: %v", err))
}

if done {
break
return nil
}

time.Sleep(waiterSleepTime) // lintignore:R018
return retry.RetryableError(WaiterError{Reason: "rollout to complete"})
}

w.logger.Info("[ApplyResourceChange][Wait] Rollout complete\n")
return nil
}

// ConditionsWaiter will wait for the specified conditions on
Expand All @@ -335,22 +316,14 @@ type ConditionsWaiter struct {
}

// Wait checks all the configured conditions have been met
func (w *ConditionsWaiter) Wait(ctx context.Context) error {
w.logger.Info("[ApplyResourceChange][Wait] Waiting for conditions...\n")

for {
if deadline, ok := ctx.Deadline(); ok {
if time.Now().After(deadline) {
return WaiterError{Reason: "conditions"}
}
}

func (w *ConditionsWaiter) Wait(ctx context.Context) retry.RetryFunc {
return func() *retry.RetryError {
res, err := w.resource.Get(ctx, w.resourceName, v1.GetOptions{})
if err != nil {
return err
return retry.NonRetryableError(err)
}
if errors.IsGone(err) {
return fmt.Errorf("resource was deleted")
return retry.NonRetryableError(fmt.Errorf("resource was deleted"))
}

if status, ok := res.Object["status"].(map[string]interface{}); ok {
Expand All @@ -373,13 +346,10 @@ func (w *ConditionsWaiter) Wait(ctx context.Context) error {
conditionsMet = conditionsMet && conditionMet
}
if conditionsMet {
break
return nil
}
}
}
time.Sleep(waiterSleepTime) // lintignore:R018
return retry.RetryableError(WaiterError{Reason: "conditions"})
}

w.logger.Info("[ApplyResourceChange][Wait] All conditions met.\n")
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

resource "kubernetes_manifest" "test" {

manifest = {
apiVersion = "v1"
kind = "Pod"

metadata = {
name = var.name
namespace = var.namespace

annotations = {
"test.terraform.io" = "test"
}

labels = {
app = "nginx"
}
}

spec = {
containers = [
{
name = "nginx"
image = "nginx:1.19"

readinessProbe = {
initialDelaySeconds = 10

httpGet = {
path = "/"
port = 80
}
}
}
]
}
}

wait {
fields = {
"status.phase" = "Invalid",
}
}

timeouts {
create = "5s"
}
}
52 changes: 52 additions & 0 deletions manifest/test/acceptance/testdata/Wait/wait_for_rollout_invalid.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: MPL-2.0

resource kubernetes_manifest wait_for_rollout {
manifest = {
apiVersion = "apps/v1"
kind = "Deployment"
metadata = {
name = var.name
namespace = var.namespace
}
spec = {
replicas = 2
selector = {
matchLabels = {
app = "tf-acc-test"
}
}
template = {
metadata = {
labels = {
app = "tf-acc-test"
}
}
spec = {
containers = [
{
image = "nginx:invalid-does-not-exist"
imagePullPolicy = "IfNotPresent"
name = "tf-acc-test"
readinessProbe = {
httpGet = {
port = 80
path = "/"
}
initialDelaySeconds = 10
}
},
]
}
}
}
}

wait {
rollout = true
}

timeouts {
create = "5s"
}
}
Loading

0 comments on commit 2fd6436

Please sign in to comment.