diff --git a/internal/services/controller/controller_exclude_race_test.go b/internal/services/controller/controller_exclude_race_test.go new file mode 100644 index 00000000..33d92223 --- /dev/null +++ b/internal/services/controller/controller_exclude_race_test.go @@ -0,0 +1,143 @@ +//go:build !race +// +build !race + +package controller + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "castai-agent/internal/castai" + mock_castai "castai-agent/internal/castai/mock" + "castai-agent/internal/config" + mock_types "castai-agent/internal/services/providers/types/mock" + mock_version "castai-agent/internal/services/version/mock" + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" +) + +func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) { + mockctrl := gomock.NewController(t) + castaiclient := mock_castai.NewMockClient(mockctrl) + version := mock_version.NewMockInterface(mockctrl) + provider := mock_types.NewMockProvider(mockctrl) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: v1.NamespaceDefault, Name: "pod1"}} + podData, err := encode(pod) + require.NoError(t, err) + + clientset := fake.NewSimpleClientset() + f := informers.NewSharedInformerFactory(clientset, 0) + + version.EXPECT().MinorInt().Return(19) + version.EXPECT().Full().Return("1.19+") + + clusterID := uuid.New() + + var invocations int64 + + // initial full snapshot + castaiclient.EXPECT(). + SendDelta(gomock.Any(), clusterID.String(), gomock.Any()). + DoAndReturn(func(_ context.Context, clusterID string, d *castai.Delta) error { + defer atomic.AddInt64(&invocations, 1) + + require.Equal(t, clusterID, d.ClusterID) + require.Equal(t, "1.19+", d.ClusterVersion) + require.True(t, d.FullSnapshot) + require.Len(t, d.Items, 0) + + clientset.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}) + + return nil + }) + + // first delta add pod - fail and trigger pod delete + castaiclient.EXPECT(). + SendDelta(gomock.Any(), clusterID.String(), gomock.Any()). + DoAndReturn(func(_ context.Context, clusterID string, d *castai.Delta) error { + defer atomic.AddInt64(&invocations, 1) + + require.Equal(t, clusterID, d.ClusterID) + require.Equal(t, "1.19+", d.ClusterVersion) + require.False(t, d.FullSnapshot) + require.Len(t, d.Items, 1) + + var actualValues []string + for _, item := range d.Items { + actualValues = append(actualValues, fmt.Sprintf("%s-%s-%s", item.Event, item.Kind, item.Data)) + } + + require.Contains(t, actualValues, fmt.Sprintf("%s-%s-%s", castai.EventAdd, "Pod", podData)) + + clientset.CoreV1().Pods("default").Delete(ctx, pod.Name, metav1.DeleteOptions{}) + + return fmt.Errorf("testError") + }) + + // second attempt to send data when pod delete is received + castaiclient.EXPECT(). + SendDelta(gomock.Any(), clusterID.String(), gomock.Any()). + DoAndReturn(func(_ context.Context, clusterID string, d *castai.Delta) error { + defer atomic.AddInt64(&invocations, 1) + + require.Equal(t, clusterID, d.ClusterID) + require.Equal(t, "1.19+", d.ClusterVersion) + require.False(t, d.FullSnapshot) + require.Len(t, d.Items, 1) + + var actualValues []string + for _, item := range d.Items { + actualValues = append(actualValues, fmt.Sprintf("%s-%s-%s", item.Event, item.Kind, item.Data)) + } + + require.Contains(t, actualValues, fmt.Sprintf("%s-%s-%s", castai.EventDelete, "Pod", podData)) + + return nil + }) + + agentVersion := &config.AgentVersion{Version: "1.2.3"} + castaiclient.EXPECT().ExchangeAgentTelemetry(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes(). + Return(&castai.AgentTelemetryResponse{}, nil). + Do(func(ctx context.Context, clusterID string, req *castai.AgentTelemetryRequest) { + require.Equalf(t, "1.2.3", req.AgentVersion, "got request: %+v", req) + }) + + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + ctrl := New( + log, + f, + castaiclient, + provider, + clusterID.String(), + 2*time.Second, + 2*time.Second, + 10*time.Millisecond, + version, + agentVersion, + ) + + f.Start(ctx.Done()) + + go ctrl.Run(ctx) + + wait.Until(func() { + if atomic.LoadInt64(&invocations) >= 3 { + cancel() + } + }, 10*time.Millisecond, ctx.Done()) +} diff --git a/internal/services/controller/controller_test.go b/internal/services/controller/controller_test.go index ed1dbdb5..c1b3fcae 100644 --- a/internal/services/controller/controller_test.go +++ b/internal/services/controller/controller_test.go @@ -107,7 +107,7 @@ func TestController_HappyPath(t *testing.T) { clusterID.String(), 15*time.Second, 2*time.Second, - 10 * time.Millisecond, + 10*time.Millisecond, version, agentVersion, ) diff --git a/internal/services/controller/delta.go b/internal/services/controller/delta.go index 799edc27..baaffc5c 100644 --- a/internal/services/controller/delta.go +++ b/internal/services/controller/delta.go @@ -40,9 +40,7 @@ type delta struct { func (d *delta) add(i *item) { key := mustKeyObject(i.obj) - if other, ok := d.cache[key]; ok && other.event == eventAdd && i.event == eventDelete { - delete(d.cache, key) - } else if ok && other.event == eventAdd && i.event == eventUpdate { + if other, ok := d.cache[key]; ok && other.event == eventAdd && i.event == eventUpdate { i.event = eventAdd d.cache[key] = i } else if ok && other.event == eventDelete && (i.event == eventAdd || i.event == eventUpdate) { diff --git a/internal/services/controller/delta_test.go b/internal/services/controller/delta_test.go index b88ce812..df13f41c 100644 --- a/internal/services/controller/delta_test.go +++ b/internal/services/controller/delta_test.go @@ -91,7 +91,7 @@ func TestDelta(t *testing.T) { }, }, { - name: "debounce: entirely remove added item when it is deleted", + name: "debounce: keep only delete event when an added item is deleted", items: []*item{ { obj: pod1, @@ -106,6 +106,13 @@ func TestDelta(t *testing.T) { ClusterID: clusterID, ClusterVersion: version, FullSnapshot: true, + Items: []*castai.DeltaItem{ + { + Event: castai.EventDelete, + Kind: "Pod", + Data: mustEncode(t, pod1), + }, + }, }, }, { @@ -198,7 +205,7 @@ func TestDelta(t *testing.T) { require.Equal(t, clusterID, got.ClusterID) require.Equal(t, version, got.ClusterVersion) require.True(t, got.FullSnapshot) - require.Equal(t, len(got.Items), len(test.expected.Items)) + require.Equal(t, len(test.expected.Items), len(got.Items)) for _, expectedItem := range test.expected.Items { requireContains(t, got.Items, expectedItem) }