Merge pull request #688 from hexfusion/set-defrag-delay
pkg/operator/defragcontroller: ensure defrag has clear signal.
openshift-merge-robot authored Oct 13, 2021
2 parents 0aac637 + 2c32833 commit b5ee3cc
Showing 6 changed files with 81 additions and 47 deletions.
10 changes: 5 additions & 5 deletions pkg/etcdcli/etcdcli.go
@@ -341,7 +341,7 @@ func (g *etcdClientGetter) UnhealthyMembers(ctx context.Context) ([]*etcdserverp
return nil, fmt.Errorf("could not get member list %v", err)
}

memberHealth := getMemberHealth(etcdCluster.Members)
memberHealth := getMemberHealth(ctx, etcdCluster.Members)

unstartedMemberNames := GetUnstartedMemberNames(memberHealth)
if len(unstartedMemberNames) > 0 {
@@ -369,7 +369,7 @@ func (g *etcdClientGetter) HealthyMembers(ctx context.Context) ([]*etcdserverpb.
return nil, err
}

healthyMembers := getMemberHealth(etcdCluster.Members).GetHealthyMembers()
healthyMembers := getMemberHealth(ctx, etcdCluster.Members).GetHealthyMembers()
if len(healthyMembers) == 0 {
return nil, fmt.Errorf("no healthy etcd members found")
}
@@ -387,14 +387,14 @@ func (g *etcdClientGetter) MemberHealth(ctx context.Context) (memberHealth, erro
if err != nil {
return nil, err
}
return getMemberHealth(etcdCluster.Members), nil
return getMemberHealth(ctx, etcdCluster.Members), nil
}

func (g *etcdClientGetter) IsMemberHealthy(member *etcdserverpb.Member) (bool, error) {
func (g *etcdClientGetter) IsMemberHealthy(ctx context.Context, member *etcdserverpb.Member) (bool, error) {
if member == nil {
return false, fmt.Errorf("member can not be nil")
}
memberHealth := getMemberHealth([]*etcdserverpb.Member{member})
memberHealth := getMemberHealth(ctx, []*etcdserverpb.Member{member})
if len(memberHealth) == 0 {
return false, fmt.Errorf("member health check failed")
}
6 changes: 3 additions & 3 deletions pkg/etcdcli/health.go
@@ -39,7 +39,7 @@ type healthCheck struct {

type memberHealth []healthCheck

func getMemberHealth(etcdMembers []*etcdserverpb.Member) memberHealth {
func getMemberHealth(ctx context.Context, etcdMembers []*etcdserverpb.Member) memberHealth {
var wg sync.WaitGroup
memberHealth := memberHealth{}
hch := make(chan healthCheck, len(etcdMembers))
@@ -58,10 +58,9 @@ func getMemberHealth(etcdMembers []*etcdserverpb.Member) memberHealth {
}
defer cli.Close()
st := time.Now()
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
// linearized request to verify health of member
resp, err := cli.Get(ctx, "health")
cancel()
hc := healthCheck{Member: member, Healthy: false, Took: time.Since(st).String()}
if err == nil {
if resp.Header != nil {
@@ -71,6 +70,7 @@ func getMemberHealth(etcdMembers []*etcdserverpb.Member) memberHealth {
} else {
hc.Error = fmt.Errorf("health check failed: %w", err)
}
cancel()
hch <- hc
}(member)
}
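Note on the change above: getMemberHealth now derives its per-member context from the caller instead of context.Background(), so the linearized read of the "health" key is cancelled together with the sync that requested it, and cancel() now runs after the response has been handled. The sketch below is a simplified, standalone version of that probe; the endpoint argument and the 5s/10s timeouts are illustrative assumptions, not values from this commit.

package example

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// probeMember issues the same kind of linearized read of the "health" key that
// getMemberHealth performs, but with the context derived from the caller so the
// probe shares the caller's cancellation. Endpoint and timeouts are placeholders.
func probeMember(parent context.Context, endpoint string) error {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{endpoint},
		DialTimeout: 5 * time.Second, // assumed value for the sketch
	})
	if err != nil {
		return fmt.Errorf("failed to create etcd client: %w", err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(parent, 10*time.Second) // assumed timeout
	defer cancel()

	start := time.Now()
	// A linearized Get goes through the raft leader, so a successful response is a
	// reasonable health signal for the member behind this endpoint.
	if _, err := cli.Get(ctx, "health"); err != nil {
		return fmt.Errorf("health probe failed after %s: %w", time.Since(start), err)
	}
	return nil
}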
2 changes: 1 addition & 1 deletion pkg/etcdcli/helpers.go
@@ -73,7 +73,7 @@ func (f *fakeEtcdClient) MemberHealth(ctx context.Context) (memberHealth, error)
}

// IsMemberHealthy returns true if the number of etcd members equals the number of healthy members.
func (f *fakeEtcdClient) IsMemberHealthy(member *etcdserverpb.Member) (bool, error) {
func (f *fakeEtcdClient) IsMemberHealthy(ctx context.Context, member *etcdserverpb.Member) (bool, error) {
return len(f.members) == f.opts.healthyMember, nil
}

2 changes: 1 addition & 1 deletion pkg/etcdcli/interfaces.go
@@ -46,7 +46,7 @@ type MemberHealth interface {
MemberHealth(ctx context.Context) (memberHealth, error)
}
type IsMemberHealthy interface {
IsMemberHealthy(member *etcdserverpb.Member) (bool, error)
IsMemberHealthy(ctx context.Context, member *etcdserverpb.Member) (bool, error)
}
type MemberRemover interface {
MemberRemove(ctx context.Context, member string) error
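Note on the interface change above: IsMemberHealthy now takes a context, so callers control the deadline and cancellation of the underlying health check. A minimal sketch of how a caller might use the new signature follows; the 30-second timeout and the wrapper function are assumptions for illustration, not part of this commit.

package example

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
)

// IsMemberHealthy mirrors the updated interface shape from this commit.
type IsMemberHealthy interface {
	IsMemberHealthy(ctx context.Context, member *etcdserverpb.Member) (bool, error)
}

// checkMember passes a bounded context so the health check cannot hang
// indefinitely. The 30-second timeout is an illustrative assumption.
func checkMember(client IsMemberHealthy, member *etcdserverpb.Member) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	healthy, err := client.IsMemberHealthy(ctx, member)
	if err != nil {
		return false, fmt.Errorf("health check for member %q failed: %w", member.Name, err)
	}
	return healthy, nil
}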
106 changes: 70 additions & 36 deletions pkg/operator/defragcontroller/defragcontroller.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

configv1 "github.com/openshift/api/config/v1"
@@ -23,12 +24,17 @@

const (
minDefragBytes int64 = 100 * 1024 * 1024 // 100MB
minDefragWaitDuration = 36 * time.Second
maxFragmentedPercentage float64 = 45
waitDuration = 2 * time.Second
timeoutDuration = 30 * time.Second
pollWaitDuration = 2 * time.Second
pollTimeoutDuration = 45 * time.Second
compactionInterval = 10 * time.Minute

defragDisabledCondition = "DefragControllerDisabled"
)

// DefragController observes the operand state file for fragmentation
// DefragController observes fragmentation of the etcd state file via the Status method of the Maintenance API. Based on these
// observations, the controller performs a rolling defragmentation of each etcd member in the cluster.
type DefragController struct {
operatorClient v1helpers.OperatorClient
etcdClient etcdcli.EtcdClient
@@ -46,7 +52,7 @@ func NewDefragController(
etcdClient: etcdClient,
infrastructureLister: infrastructureLister,
}
return factory.New().ResyncEvery(9*time.Minute).WithInformers(
return factory.New().ResyncEvery(compactionInterval+1*time.Minute).WithInformers( // attempt to sync outside of etcd compaction interval to ensure maximum gain by defragmentation.
operatorClient.Informer(),
).WithSync(c.sync).ToController("DefragController", eventRecorder.WithComponentSuffix("defrag-controller"))
}
@@ -76,38 +82,21 @@ func (c *DefragController) sync(ctx context.Context, syncCtx factory.SyncContext
}

func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Recorder) error {
controlPlaneTopology, err := ceohelpers.GetControlPlaneTopology(c.infrastructureLister)
// Check for existing status.
_, status, _, err := c.operatorClient.GetOperatorState()
if err != nil {
return err
}
var updateErr error
// Defrag is blocking and can only be safely performed in HighlyAvailableTopologyMode
if controlPlaneTopology == configv1.HighlyAvailableTopologyMode {
_, _, updateErr = v1helpers.UpdateStatus(c.operatorClient,
v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "DefragControllerDisabled",
Status: operatorv1.ConditionFalse,
Reason: "AsExpected",
}))
} else {
_, _, updateErr = v1helpers.UpdateStatus(c.operatorClient,
v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "DefragControllerDisabled",
Status: operatorv1.ConditionTrue,
Reason: "AsExpected",
}))
return nil
}
if updateErr != nil {
recorder.Warning("DefragControllerUpdatingStatus", updateErr.Error())
}

etcdMembers, err := c.etcdClient.MemberList(ctx)
if err != nil {
return err
controllerDisabledCondition := v1helpers.FindOperatorCondition(status.Conditions, defragDisabledCondition)
if controllerDisabledCondition == nil {
err := c.ensureControllerDisabledCondition(recorder)
if err != nil {
return fmt.Errorf("failed to update controller disabled status: %w", err)
}
}

// Do not defrag if any of the cluster members are unhealthy
// Do not defrag if any of the cluster members are unhealthy.
memberHealth, err := c.etcdClient.MemberHealth(ctx)
if err != nil {
return err
@@ -116,6 +105,11 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
return fmt.Errorf("cluster is unhealthy: %s", memberHealth.Status())
}

etcdMembers, err := c.etcdClient.MemberList(ctx)
if err != nil {
return err
}

var endpointStatus []*clientv3.StatusResponse
var leader *clientv3.StatusResponse
for _, member := range etcdMembers {
@@ -134,7 +128,7 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
endpointStatus = append(endpointStatus, status)
}

// leader last if possible
// Leader last if possible.
if leader != nil {
klog.V(4).Infof("Appending leader last, ID: %d", leader.Header.MemberId)
endpointStatus = append(endpointStatus, leader)
@@ -155,7 +149,7 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
if isEndpointBackendFragmented(member, status) {
recorder.Eventf("DefragControllerDefragmentAttempt", "Attempting defrag on member: %s, memberID: %d, dbSize: %d, dbInUse: %d, leader ID: %d", member.Name, member.ID, status.DbSize, status.DbSizeInUse, status.Leader)
if _, err := c.etcdClient.Defragment(ctx, member); err != nil {
// defrag can timeout if defragmentation takes longer than etcdcli.DefragDialTimeout
// Defrag can timeout if defragmentation takes longer than etcdcli.DefragDialTimeout.
errors = append(errors, fmt.Errorf("failed to defragment etcd member: %q :%v", member.Name, err))
continue
}
@@ -164,9 +158,13 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco

// Give cluster time to recover before we move to the next member.
if err := wait.Poll(
waitDuration,
timeoutDuration,
pollWaitDuration,
pollTimeoutDuration,
func() (bool, error) {
// Ensure defragmentation attempts have clear observable signal.
klog.V(4).Infof("Sleeping to allow cluster to recover before defrag next member: %v", minDefragWaitDuration)
time.Sleep(minDefragWaitDuration)

memberHealth, err := c.etcdClient.MemberHealth(ctx)
if err != nil {
klog.Warningf("failed checking member health: %v", err)
@@ -186,6 +184,39 @@ func (c *DefragController) checkDefrag(ctx context.Context, recorder events.Reco
return v1helpers.NewMultiLineAggregate(errors)
}

func (c *DefragController) ensureControllerDisabledCondition(recorder events.Recorder) error {
controlPlaneTopology, err := ceohelpers.GetControlPlaneTopology(c.infrastructureLister)
if err != nil {
return fmt.Errorf("failed to get control-plane topology: %w", err)
}

// Defrag is blocking and can only be safely performed in HighlyAvailableTopologyMode
if controlPlaneTopology == configv1.HighlyAvailableTopologyMode {
_, _, updateErr := v1helpers.UpdateStatus(c.operatorClient,
v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: defragDisabledCondition,
Status: operatorv1.ConditionFalse,
Reason: "AsExpected",
}))
if updateErr != nil {
recorder.Warning("DefragControllerUpdatingStatus", updateErr.Error())
return updateErr
}
} else {
_, _, updateErr := v1helpers.UpdateStatus(c.operatorClient,
v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: defragDisabledCondition,
Status: operatorv1.ConditionTrue,
Reason: "AsExpected",
}))
if updateErr != nil {
recorder.Warning("DefragControllerUpdatingStatus", updateErr.Error())
return updateErr
}
}
return nil
}

// isEndpointBackendFragmented checks the status of all cluster members to ensure that no members have a fragmented store.
// This can happen if the operator starts defrag of the cluster but then loses leader status and is rescheduled before
// the operator can defrag all members.
@@ -195,13 +226,16 @@ func isEndpointBackendFragmented(member *etcdserverpb.Member, endpointStatus *cl
return false
}
fragmentedPercentage := checkFragmentationPercentage(endpointStatus.DbSize, endpointStatus.DbSizeInUse)
klog.Infof("etcd member %q backend store fragmented: %.2f %%, dbSize: %d", member.Name, fragmentedPercentage, endpointStatus.DbSize)
if fragmentedPercentage > 0.00 {
klog.Infof("etcd member %q backend store fragmented: %.2f %%, dbSize: %d", member.Name, fragmentedPercentage, endpointStatus.DbSize)
}
return fragmentedPercentage >= maxFragmentedPercentage && endpointStatus.DbSize >= minDefragBytes
}

func checkFragmentationPercentage(ondisk, inuse int64) float64 {
diff := float64(ondisk - inuse)
return (diff / float64(ondisk)) * 100
fragmentedPercentage := (diff / float64(ondisk)) * 100
return math.Round(fragmentedPercentage*100) / 100
}

func getMemberFromStatus(members []*etcdserverpb.Member, endpointStatus *clientv3.StatusResponse) (*etcdserverpb.Member, error) {
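Worked example of the defrag threshold used above: a member is defragmented only when its rounded fragmentation percentage is at least maxFragmentedPercentage (45%) and its on-disk size is at least minDefragBytes (100MB). The standalone sketch below reimplements that check; the byte values are made up for illustration and are not taken from a real cluster.

package main

import (
	"fmt"
	"math"
)

const (
	minDefragBytes          int64   = 100 * 1024 * 1024 // 100MB
	maxFragmentedPercentage float64 = 45
)

// fragmentationPercentage mirrors checkFragmentationPercentage above: the share
// of the on-disk backend that is no longer in use, rounded to two decimals.
func fragmentationPercentage(onDisk, inUse int64) float64 {
	diff := float64(onDisk - inUse)
	pct := (diff / float64(onDisk)) * 100
	return math.Round(pct*100) / 100
}

func main() {
	// Example values: 1 GiB on disk, 512 MiB still in use.
	onDisk := int64(1 << 30)
	inUse := int64(512 << 20)

	pct := fragmentationPercentage(onDisk, inUse) // 50.00
	needsDefrag := pct >= maxFragmentedPercentage && onDisk >= minDefragBytes
	fmt.Printf("fragmented: %.2f%%, defrag needed: %v\n", pct, needsDefrag) // fragmented: 50.00%, defrag needed: true
}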
@@ -257,7 +257,7 @@ func (c *UpgradeBackupController) getBackupNodeName(ctx context.Context) (string
errs = append(errs, err)
continue
}
isHealthy, err := c.etcdClient.IsMemberHealthy(member)
isHealthy, err := c.etcdClient.IsMemberHealthy(ctx, member)
if err != nil {
errs = append(errs, err)
continue
