Skip to content

Commit

Permalink
Merge pull request #742 from damemi/ctx-cancel-backport
Browse files Browse the repository at this point in the history
[release-1.23] Backport context cancel panic + Go version update
  • Loading branch information
k8s-ci-robot authored Feb 28, 2022
2 parents 6cdfab1 + 1712879 commit 9ed7f93
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:1.17.3
FROM golang:1.17.7

WORKDIR /go/src/sigs.k8s.io/descheduler
COPY . .
Expand Down
4 changes: 2 additions & 2 deletions charts/descheduler/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: v1
name: descheduler
version: 0.23.1
appVersion: 0.23.0
version: 0.23.2
appVersion: 0.23.1
description: Descheduler for Kubernetes is used to rebalance clusters by evicting pods that can potentially be scheduled on better nodes. In the current implementation, descheduler does not schedule replacement of evicted pods but relies on the default scheduler for that.
keywords:
- kubernetes
Expand Down
11 changes: 8 additions & 3 deletions cmd/descheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,28 @@ func NewDeschedulerCommand(out io.Writer) *cobra.Command {
}

ctx, done := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer done()

pathRecorderMux := mux.NewPathRecorderMux("descheduler")
if !s.DisableMetrics {
pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
}

healthz.InstallHandler(pathRecorderMux, healthz.NamedCheck("Descheduler", healthz.PingHealthz.Check))

if _, err := SecureServing.Serve(pathRecorderMux, 0, ctx.Done()); err != nil {
stoppedCh, err := SecureServing.Serve(pathRecorderMux, 0, ctx.Done())
if err != nil {
klog.Fatalf("failed to start secure server: %v", err)
return
}

err := Run(ctx, s)
err = Run(ctx, s)
if err != nil {
klog.ErrorS(err, "descheduler server")
}

done()
// wait for metrics server to close
<-stoppedCh
},
}
cmd.SetOut(out)
Expand Down
25 changes: 11 additions & 14 deletions pkg/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,7 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
return err
}

// tie in root ctx with our wait stopChannel
stopChannel := make(chan struct{})
go func() {
<-ctx.Done()
close(stopChannel)
}()
return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel)
return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
}

type strategyFunction func(ctx context.Context, client clientset.Interface, strategy api.DeschedulerStrategy, nodes []*v1.Node, podEvictor *evictions.PodEvictor, getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc)
Expand Down Expand Up @@ -156,13 +150,16 @@ func cachedClient(
return fakeClient, nil
}

func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, stopChannel chan struct{}) error {
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
nodeInformer := sharedInformerFactory.Core().V1().Nodes()
podInformer := sharedInformerFactory.Core().V1().Pods()
namespaceInformer := sharedInformerFactory.Core().V1().Namespaces()
priorityClassInformer := sharedInformerFactory.Scheduling().V1().PriorityClasses()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

// create the informers
namespaceInformer.Informer()
priorityClassInformer.Informer()
Expand All @@ -172,8 +169,8 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
return fmt.Errorf("build get pods assigned to node function error: %v", err)
}

sharedInformerFactory.Start(stopChannel)
sharedInformerFactory.WaitForCacheSync(stopChannel)
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())

strategyFuncs := map[api.StrategyName]strategyFunction{
"RemoveDuplicates": strategies.RemoveDuplicatePods,
Expand Down Expand Up @@ -223,13 +220,13 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeInformer, nodeSelector)
if err != nil {
klog.V(1).InfoS("Unable to get ready nodes", "err", err)
close(stopChannel)
cancel()
return
}

if len(nodes) <= 1 {
klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
close(stopChannel)
cancel()
return
}

Expand Down Expand Up @@ -292,9 +289,9 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer

// If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration
if rs.DeschedulingInterval.Seconds() == 0 {
close(stopChannel)
cancel()
}
}, rs.DeschedulingInterval, stopChannel)
}, rs.DeschedulingInterval, ctx.Done())

return nil
}
71 changes: 67 additions & 4 deletions pkg/descheduler/descheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ func TestTaintsUpdated(t *testing.T) {
},
}

stopChannel := make(chan struct{})
defer close(stopChannel)

rs, err := options.NewDeschedulerServer()
if err != nil {
t.Fatalf("Unable to initialize server: %v", err)
Expand All @@ -47,7 +44,7 @@ func TestTaintsUpdated(t *testing.T) {
errChan := make(chan error, 1)
defer close(errChan)
go func() {
err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1", stopChannel)
err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
errChan <- err
}()
select {
Expand Down Expand Up @@ -101,3 +98,69 @@ func TestTaintsUpdated(t *testing.T) {
t.Fatalf("Unable to evict pod, node taint did not get propagated to descheduler strategies")
}
}

func TestRootCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}

rs, err := options.NewDeschedulerServer()
if err != nil {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.DeschedulingInterval = 100 * time.Millisecond
errChan := make(chan error, 1)
defer close(errChan)

go func() {
err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
errChan <- err
}()
cancel()
select {
case err := <-errChan:
if err != nil {
t.Fatalf("Unable to run descheduler strategies: %v", err)
}
case <-time.After(1 * time.Second):
t.Fatal("Root ctx should have canceled immediately")
}
}

func TestRootCancelWithNoInterval(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
client := fakeclientset.NewSimpleClientset(n1, n2)
dp := &api.DeschedulerPolicy{
Strategies: api.StrategyList{}, // no strategies needed for this test
}

rs, err := options.NewDeschedulerServer()
if err != nil {
t.Fatalf("Unable to initialize server: %v", err)
}
rs.Client = client
rs.DeschedulingInterval = 0
errChan := make(chan error, 1)
defer close(errChan)

go func() {
err := RunDeschedulerStrategies(ctx, rs, dp, "v1beta1")
errChan <- err
}()
cancel()
select {
case err := <-errChan:
if err != nil {
t.Fatalf("Unable to run descheduler strategies: %v", err)
}
case <-time.After(1 * time.Second):
t.Fatal("Root ctx should have canceled immediately")
}
}
4 changes: 1 addition & 3 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,7 @@ func TestDeschedulingInterval(t *testing.T) {
if err != nil || len(evictionPolicyGroupVersion) == 0 {
t.Errorf("Error when checking support for eviction: %v", err)
}

stopChannel := make(chan struct{})
if err := descheduler.RunDeschedulerStrategies(ctx, s, deschedulerPolicy, evictionPolicyGroupVersion, stopChannel); err != nil {
if err := descheduler.RunDeschedulerStrategies(ctx, s, deschedulerPolicy, evictionPolicyGroupVersion); err != nil {
t.Errorf("Error running descheduler strategies: %+v", err)
}
c <- true
Expand Down

0 comments on commit 9ed7f93

Please sign in to comment.