diff --git a/pkg/etcdcli/etcdcli.go b/pkg/etcdcli/etcdcli.go
index 5de2c7578..bb2d77703 100644
--- a/pkg/etcdcli/etcdcli.go
+++ b/pkg/etcdcli/etcdcli.go
@@ -341,7 +341,7 @@ func (g *etcdClientGetter) UnhealthyMembers(ctx context.Context) ([]*etcdserverp
 		return nil, fmt.Errorf("could not get member list %v", err)
 	}
 
-	memberHealth := getMemberHealth(etcdCluster.Members)
+	memberHealth := getMemberHealth(ctx, etcdCluster.Members)
 
 	unstartedMemberNames := GetUnstartedMemberNames(memberHealth)
 	if len(unstartedMemberNames) > 0 {
@@ -369,7 +369,7 @@ func (g *etcdClientGetter) HealthyMembers(ctx context.Context) ([]*etcdserverpb.
 		return nil, err
 	}
 
-	healthyMembers := getMemberHealth(etcdCluster.Members).GetHealthyMembers()
+	healthyMembers := getMemberHealth(ctx, etcdCluster.Members).GetHealthyMembers()
 	if len(healthyMembers) == 0 {
 		return nil, fmt.Errorf("no healthy etcd members found")
 	}
@@ -387,14 +387,14 @@ func (g *etcdClientGetter) MemberHealth(ctx context.Context) (memberHealth, erro
 	if err != nil {
 		return nil, err
 	}
-	return getMemberHealth(etcdCluster.Members), nil
+	return getMemberHealth(ctx, etcdCluster.Members), nil
 }
 
-func (g *etcdClientGetter) IsMemberHealthy(member *etcdserverpb.Member) (bool, error) {
+func (g *etcdClientGetter) IsMemberHealthy(ctx context.Context, member *etcdserverpb.Member) (bool, error) {
 	if member == nil {
 		return false, fmt.Errorf("member can not be nil")
 	}
-	memberHealth := getMemberHealth([]*etcdserverpb.Member{member})
+	memberHealth := getMemberHealth(ctx, []*etcdserverpb.Member{member})
 	if len(memberHealth) == 0 {
 		return false, fmt.Errorf("member health check failed")
 	}
diff --git a/pkg/etcdcli/health.go b/pkg/etcdcli/health.go
index d278d021c..354742740 100644
--- a/pkg/etcdcli/health.go
+++ b/pkg/etcdcli/health.go
@@ -39,7 +39,7 @@ type healthCheck struct {
 
 type memberHealth []healthCheck
 
-func getMemberHealth(etcdMembers []*etcdserverpb.Member) memberHealth {
+func getMemberHealth(ctx context.Context, etcdMembers []*etcdserverpb.Member) memberHealth {
 	var wg sync.WaitGroup
 	memberHealth := memberHealth{}
 	hch := make(chan healthCheck, len(etcdMembers))
@@ -58,10 +58,9 @@ func getMemberHealth(etcdMembers []*etcdserverpb.Member) memberHealth {
 			}
 			defer cli.Close()
 			st := time.Now()
-			ctx, cancel := context.WithCancel(context.Background())
+			ctx, cancel := context.WithCancel(ctx)
 			// linearized request to verify health of member
 			resp, err := cli.Get(ctx, "health")
-			cancel()
 			hc := healthCheck{Member: member, Healthy: false, Took: time.Since(st).String()}
 			if err == nil {
 				if resp.Header != nil {
@@ -71,6 +70,7 @@ func getMemberHealth(etcdMembers []*etcdserverpb.Member) memberHealth {
 			} else {
 				hc.Error = fmt.Errorf("health check failed: %w", err)
 			}
+			cancel()
 			hch <- hc
 		}(member)
 	}
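Reviewer note on the health.go hunk above: getMemberHealth previously built each per-member probe context from context.Background(), so cancellation of the caller never reached the in-flight linearized Get; the patch derives it from the caller's ctx instead and only cancels once the result is recorded. Below is a minimal stdlib-only sketch of why that matters; checkEndpoint and the member addresses are invented stand-ins for the real etcd client call, not code from this PR.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// checkEndpoint stands in for the linearized Get against one member; like the
// real client call, it returns early when the context is cancelled.
func checkEndpoint(ctx context.Context, endpoint string) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulated member round trip
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// Deriving every per-member context from the caller, as the patch does with
	// context.WithCancel(ctx), lets one deadline or cancellation stop all
	// in-flight probes instead of orphaning context.Background() children.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	endpoints := []string{"member-0:2379", "member-1:2379", "member-2:2379"} // hypothetical
	var wg sync.WaitGroup
	for _, ep := range endpoints {
		wg.Add(1)
		go func(ep string) {
			defer wg.Done()
			if err := checkEndpoint(ctx, ep); err != nil {
				fmt.Printf("%s: health check aborted: %v\n", ep, err)
				return
			}
			fmt.Printf("%s: healthy\n", ep)
		}(ep)
	}
	wg.Wait()
}
```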
diff --git a/pkg/etcdcli/helpers.go b/pkg/etcdcli/helpers.go
index 13d824182..5fe78d97f 100644
--- a/pkg/etcdcli/helpers.go
+++ b/pkg/etcdcli/helpers.go
@@ -73,7 +73,7 @@ func (f *fakeEtcdClient) MemberHealth(ctx context.Context) (memberHealth, error)
 }
 
 //IsMemberHealthy returns true if the number of etcd members equals the member of healthy members.
-func (f *fakeEtcdClient) IsMemberHealthy(member *etcdserverpb.Member) (bool, error) {
+func (f *fakeEtcdClient) IsMemberHealthy(ctx context.Context, member *etcdserverpb.Member) (bool, error) {
 	return len(f.members) == f.opts.healthyMember, nil
 }
 
diff --git a/pkg/etcdcli/interfaces.go b/pkg/etcdcli/interfaces.go
index e965e0455..4592df419 100644
--- a/pkg/etcdcli/interfaces.go
+++ b/pkg/etcdcli/interfaces.go
@@ -46,7 +46,7 @@ type MemberHealth interface {
 	MemberHealth(ctx context.Context) (memberHealth, error)
 }
 type IsMemberHealthy interface {
-	IsMemberHealthy(member *etcdserverpb.Member) (bool, error)
+	IsMemberHealthy(ctx context.Context, member *etcdserverpb.Member) (bool, error)
 }
 type MemberRemover interface {
 	MemberRemove(ctx context.Context, member string) error
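Reviewer note on the interface change above: adding ctx context.Context to IsMemberHealthy keeps the real client in etcdcli.go and the test fake in helpers.go interchangeable behind the same interface. A minimal sketch of that shape, with Member and fakeClient as invented stand-ins for *etcdserverpb.Member and fakeEtcdClient rather than the actual types:

```go
package main

import (
	"context"
	"fmt"
)

// Member is a stand-in for *etcdserverpb.Member, reduced to what the sketch needs.
type Member struct{ Name string }

// IsMemberHealthy mirrors the widened interface: every implementation now
// receives the caller's context.
type IsMemberHealthy interface {
	IsMemberHealthy(ctx context.Context, member *Member) (bool, error)
}

// fakeClient mimics the test double: health is decided purely by fixture
// data, but the context can still be honored for early exits.
type fakeClient struct {
	members       []Member
	healthyMember int
}

func (f *fakeClient) IsMemberHealthy(ctx context.Context, member *Member) (bool, error) {
	if err := ctx.Err(); err != nil {
		return false, err // respect an already-cancelled caller
	}
	return len(f.members) == f.healthyMember, nil
}

func main() {
	var c IsMemberHealthy = &fakeClient{members: []Member{{Name: "etcd-0"}}, healthyMember: 1}
	healthy, err := c.IsMemberHealthy(context.TODO(), &Member{Name: "etcd-0"})
	fmt.Println(healthy, err)
}
```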
diff --git a/pkg/operator/defragcontroller/defragcontroller.go b/pkg/operator/defragcontroller/defragcontroller.go
index 29b362841..0479119bb 100644
--- a/pkg/operator/defragcontroller/defragcontroller.go
+++ b/pkg/operator/defragcontroller/defragcontroller.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
 	"time"
 
 	configv1 "github.com/openshift/api/config/v1"
@@ -23,12 +24,17 @@ import (
 
 const (
 	minDefragBytes          int64   = 100 * 1024 * 1024 // 100MB
+	minDefragWaitDuration           = 36 * time.Second
 	maxFragmentedPercentage float64 = 45
-	waitDuration    = 2 * time.Second
-	timeoutDuration = 30 * time.Second
+	pollWaitDuration    = 2 * time.Second
+	pollTimeoutDuration = 45 * time.Second
+	compactionInterval  = 10 * time.Minute
+
+	defragDisabledCondition = "DefragControllerDisabled"
 )
 
-// DefragController observes the operand state file for fragmentation
+// DefragController observes etcd state file fragmentation via the Status method of the Maintenance API. Based on
+// these observations the controller performs a rolling defragmentation of each etcd member in the cluster.
 type DefragController struct {
 	operatorClient       v1helpers.OperatorClient
 	etcdClient           etcdcli.EtcdClient
@@ -46,7 +52,7 @@ func NewDefragController(
 		etcdClient:           etcdClient,
 		infrastructureLister: infrastructureLister,
 	}
-	return factory.New().ResyncEvery(9*time.Minute).WithInformers(
+	return factory.New().ResyncEvery(compactionInterval+1*time.Minute).WithInformers( // attempt to sync outside of the etcd compaction interval to ensure maximum gain by defragmentation.
 		operatorClient.Informer(),
 	).WithSync(c.sync).ToController("DefragController", eventRecorder.WithComponentSuffix("defrag-controller"))
 }
@@ -76,38 +82,21 @@ func (c *DefragController) sync(ctx context.Context, syncCtx factory.SyncContext
 }
 
 func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Recorder) error {
-	controlPlaneTopology, err := ceohelpers.GetControlPlaneTopology(c.infrastructureLister)
+	// Check for existing status.
+	_, status, _, err := c.operatorClient.GetOperatorState()
 	if err != nil {
 		return err
 	}
 
-	var updateErr error
-	// Defrag is blocking and can only be safely performed in HighlyAvailableTopologyMode
-	if controlPlaneTopology == configv1.HighlyAvailableTopologyMode {
-		_, _, updateErr = v1helpers.UpdateStatus(c.operatorClient,
-			v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
-				Type:   "DefragControllerDisabled",
-				Status: operatorv1.ConditionFalse,
-				Reason: "AsExpected",
-			}))
-	} else {
-		_, _, updateErr = v1helpers.UpdateStatus(c.operatorClient,
-			v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
-				Type:   "DefragControllerDisabled",
-				Status: operatorv1.ConditionTrue,
-				Reason: "AsExpected",
-			}))
-		return nil
-	}
-	if updateErr != nil {
-		recorder.Warning("DefragControllerUpdatingStatus", updateErr.Error())
-	}
-
-	etcdMembers, err := c.etcdClient.MemberList(ctx)
-	if err != nil {
-		return err
+	controllerDisabledCondition := v1helpers.FindOperatorCondition(status.Conditions, defragDisabledCondition)
+	if controllerDisabledCondition == nil {
+		err := c.ensureControllerDisabledCondition(recorder)
+		if err != nil {
+			return fmt.Errorf("failed to update controller disabled status: %w", err)
+		}
 	}
 
-	// Do not defrag if any of the cluster members are unhealthy
+	// Do not defrag if any of the cluster members are unhealthy.
 	memberHealth, err := c.etcdClient.MemberHealth(ctx)
 	if err != nil {
 		return err
@@ -116,6 +105,11 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
 		return fmt.Errorf("cluster is unhealthy: %s", memberHealth.Status())
 	}
 
+	etcdMembers, err := c.etcdClient.MemberList(ctx)
+	if err != nil {
+		return err
+	}
+
 	var endpointStatus []*clientv3.StatusResponse
 	var leader *clientv3.StatusResponse
 	for _, member := range etcdMembers {
@@ -134,7 +128,7 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
 		endpointStatus = append(endpointStatus, status)
 	}
 
-	// leader last if possible
+	// Leader last if possible.
 	if leader != nil {
 		klog.V(4).Infof("Appending leader last, ID: %d", leader.Header.MemberId)
 		endpointStatus = append(endpointStatus, leader)
@@ -155,7 +149,7 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
 		if isEndpointBackendFragmented(member, status) {
 			recorder.Eventf("DefragControllerDefragmentAttempt", "Attempting defrag on member: %s, memberID: %d, dbSize: %d, dbInUse: %d, leader ID: %d", member.Name, member.ID, status.DbSize, status.DbSizeInUse, status.Leader)
 			if _, err := c.etcdClient.Defragment(ctx, member); err != nil {
-				// defrag can timeout if defragmentation takes longer than etcdcli.DefragDialTimeout
+				// Defrag can time out if defragmentation takes longer than etcdcli.DefragDialTimeout.
 				errors = append(errors, fmt.Errorf("failed to defragment etcd member: %q :%v", member.Name, err))
 				continue
 			}
@@ -164,9 +158,13 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
 
 			// Give cluster time to recover before we move to the next member.
 			if err := wait.Poll(
-				waitDuration,
-				timeoutDuration,
+				pollWaitDuration,
+				pollTimeoutDuration,
 				func() (bool, error) {
+					// Ensure defragmentation attempts have a clear observable signal.
+					klog.V(4).Infof("Sleeping to allow cluster to recover before defragging the next member: %v", minDefragWaitDuration)
+					time.Sleep(minDefragWaitDuration)
+
 					memberHealth, err := c.etcdClient.MemberHealth(ctx)
 					if err != nil {
 						klog.Warningf("failed checking member health: %v", err)
@@ -186,6 +184,39 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
 	return v1helpers.NewMultiLineAggregate(errors)
 }
 
+func (c *DefragController) ensureControllerDisabledCondition(recorder events.Recorder) error {
+	controlPlaneTopology, err := ceohelpers.GetControlPlaneTopology(c.infrastructureLister)
+	if err != nil {
+		return fmt.Errorf("failed to get control-plane topology: %w", err)
+	}
+
+	// Defrag is blocking and can only be safely performed in HighlyAvailableTopologyMode.
+	if controlPlaneTopology == configv1.HighlyAvailableTopologyMode {
+		_, _, updateErr := v1helpers.UpdateStatus(c.operatorClient,
+			v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
+				Type:   defragDisabledCondition,
+				Status: operatorv1.ConditionFalse,
+				Reason: "AsExpected",
+			}))
+		if updateErr != nil {
+			recorder.Warning("DefragControllerUpdatingStatus", updateErr.Error())
+			return updateErr
+		}
+	} else {
+		_, _, updateErr := v1helpers.UpdateStatus(c.operatorClient,
+			v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
+				Type:   defragDisabledCondition,
+				Status: operatorv1.ConditionTrue,
+				Reason: "AsExpected",
+			}))
+		if updateErr != nil {
+			recorder.Warning("DefragControllerUpdatingStatus", updateErr.Error())
+			return updateErr
+		}
+	}
+	return nil
+}
+
 // isEndpointBackendFragmented checks the status of all cluster members to ensure that no members have a fragmented store.
 // This can happen if the operator starts defrag of the cluster but then loses leader status and is rescheduled before
 // the operator can defrag all members.
@@ -195,13 +226,16 @@ func isEndpointBackendFragmented(member *etcdserverpb.Member, endpointStatus *cl
 		return false
 	}
 	fragmentedPercentage := checkFragmentationPercentage(endpointStatus.DbSize, endpointStatus.DbSizeInUse)
-	klog.Infof("etcd member %q backend store fragmented: %.2f %%, dbSize: %d", member.Name, fragmentedPercentage, endpointStatus.DbSize)
+	if fragmentedPercentage > 0.00 {
+		klog.Infof("etcd member %q backend store fragmented: %.2f %%, dbSize: %d", member.Name, fragmentedPercentage, endpointStatus.DbSize)
+	}
 	return fragmentedPercentage >= maxFragmentedPercentage && endpointStatus.DbSize >= minDefragBytes
 }
 
 func checkFragmentationPercentage(ondisk, inuse int64) float64 {
 	diff := float64(ondisk - inuse)
-	return (diff / float64(ondisk)) * 100
+	fragmentedPercentage := (diff / float64(ondisk)) * 100
+	return math.Round(fragmentedPercentage*100) / 100
 }
 
 func getMemberFromStatus(members []*etcdserverpb.Member, endpointStatus *clientv3.StatusResponse) (*etcdserverpb.Member, error) {
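Reviewer note on the defrag math above: checkFragmentationPercentage now rounds to two decimal places, matching the %.2f used in the log line, and isEndpointBackendFragmented requires both the percentage threshold and a minimum on-disk size before defragmenting. The sketch below copies that logic and the constants from this diff; only the sample dbSize/dbSizeInUse figures are invented:

```go
package main

import (
	"fmt"
	"math"
)

const (
	minDefragBytes          int64   = 100 * 1024 * 1024 // 100MB
	maxFragmentedPercentage float64 = 45
)

// checkFragmentationPercentage mirrors the patched helper: fragmentation is
// the share of the on-disk backend no longer in use, rounded to two decimals.
func checkFragmentationPercentage(ondisk, inuse int64) float64 {
	diff := float64(ondisk - inuse)
	fragmentedPercentage := (diff / float64(ondisk)) * 100
	return math.Round(fragmentedPercentage*100) / 100
}

func main() {
	// 200MB on disk with 100MB in use => 50.00% fragmented; the DB also
	// exceeds minDefragBytes, so this member would be defragmented.
	dbSize, dbInUse := int64(200*1024*1024), int64(100*1024*1024)
	p := checkFragmentationPercentage(dbSize, dbInUse)
	fmt.Printf("fragmented: %.2f%%, defrag: %v\n", p, p >= maxFragmentedPercentage && dbSize >= minDefragBytes)
}
```

Both gates matter: a small database can be highly fragmented in percentage terms while the absolute reclaimable space is too small to justify a blocking defrag.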
diff --git a/pkg/operator/upgradebackupcontroller/upgradebackupcontroller.go b/pkg/operator/upgradebackupcontroller/upgradebackupcontroller.go
index f52a9a6a2..6b16684b3 100644
--- a/pkg/operator/upgradebackupcontroller/upgradebackupcontroller.go
+++ b/pkg/operator/upgradebackupcontroller/upgradebackupcontroller.go
@@ -257,7 +257,7 @@ func (c *UpgradeBackupController) getBackupNodeName(ctx context.Context) (string
 			errs = append(errs, err)
 			continue
 		}
-		isHealthy, err := c.etcdClient.IsMemberHealthy(member)
+		isHealthy, err := c.etcdClient.IsMemberHealthy(ctx, member)
 		if err != nil {
 			errs = append(errs, err)
 			continue
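Reviewer note on the call-site change above: with ctx threaded through IsMemberHealthy, getBackupNodeName's per-member probes inherit whatever deadline the controller's sync loop carries. A stdlib-only sketch of that caller-side effect; probe is an invented stand-in for the etcd client call, not this repo's API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// probe stands in for etcdClient.IsMemberHealthy: with the new signature the
// caller's deadline travels down to the underlying linearized read.
func probe(ctx context.Context, memberName string) (bool, error) {
	select {
	case <-time.After(20 * time.Millisecond): // simulated member round trip
		return true, nil
	case <-ctx.Done():
		return false, ctx.Err()
	}
}

func main() {
	// Before the change the probe ran on an unbounded background context;
	// now a slow member surfaces as a deadline error the caller can handle.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
	defer cancel()

	healthy, err := probe(ctx, "etcd-1")
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("member probe timed out, treating as unhealthy")
		return
	}
	fmt.Println("healthy:", healthy, "err:", err)
}
```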