Skip to content

Commit

Permalink
feat: TypeMeta as primary object Kind value source (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
pijusn-cast authored Nov 4, 2024
1 parent 8d3fd4d commit ab92482
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 123 deletions.
4 changes: 2 additions & 2 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func CollectSingleSnapshot(ctx context.Context,
return nil, err
}

log.Debugf("synced %d items", len(d.CacheLegacy))
log.Debugf("synced %d items", len(d.Cache))

return d.ToCASTAIRequest(), nil
}
Expand Down Expand Up @@ -516,7 +516,7 @@ func (c *Controller) send(ctx context.Context) {
nodesByName := map[string]*corev1.Node{}
var nodes []*corev1.Node

for _, item := range c.delta.CacheLegacy {
for _, item := range c.delta.Cache {
n, ok := item.Obj.(*corev1.Node)
if !ok {
continue
Expand Down
126 changes: 5 additions & 121 deletions internal/services/controller/delta/delta.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package delta

import (
"crypto/sha256"
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -25,8 +23,7 @@ func New(log logrus.FieldLogger, clusterID, clusterVersion, agentVersion string)
clusterVersion: clusterVersion,
agentVersion: agentVersion,
FullSnapshot: true,
CacheLegacy: map[string]*Item{},
CacheModern: map[string]*Item{},
Cache: map[string]*Item{},
}
}

Expand All @@ -38,18 +35,12 @@ type Delta struct {
clusterVersion string
agentVersion string
FullSnapshot bool
CacheLegacy map[string]*Item
CacheModern map[string]*Item
Cache map[string]*Item
}

// Add will add an Item to the Delta Cache. It will debounce the objects.
func (d *Delta) Add(i *Item) {
d.addToLegacy(i)
d.addToModern(i)
}

func (d *Delta) addToModern(i *Item) {
cache := d.CacheModern
cache := d.Cache

if len(i.kind) == 0 {
gvk, err := determineObjectGVK(i.Obj)
Expand All @@ -73,26 +64,6 @@ func (d *Delta) addToModern(i *Item) {
}
}

func (d *Delta) addToLegacy(i *Item) {
cache := d.CacheLegacy

key := keyObject(i.Obj)

if other, ok := cache[key]; ok && other.event == castai.EventAdd && i.event == castai.EventUpdate {
i.event = castai.EventAdd
cache[key] = i
} else if ok && other.event == castai.EventDelete && (i.event == castai.EventAdd || i.event == castai.EventUpdate) {
i.event = castai.EventUpdate
cache[key] = i
} else {
cache[key] = i
}
}

func keyObject(obj Object) string {
return fmt.Sprintf("%s::%s/%s", reflect.TypeOf(obj).String(), obj.GetNamespace(), obj.GetName())
}

func itemCacheKey(i *Item) string {
return fmt.Sprintf("%s::%s/%s", i.kind, i.Obj.GetNamespace(), i.Obj.GetName())
}
Expand All @@ -101,22 +72,14 @@ func itemCacheKey(i *Item) string {
// delivered.
func (d *Delta) Clear() {
d.FullSnapshot = false
d.CacheLegacy = map[string]*Item{}
d.CacheModern = map[string]*Item{}
d.Cache = map[string]*Item{}
}

// ToCASTAIRequest maps the collected Delta Cache to the castai.Delta type.
func (d *Delta) ToCASTAIRequest() *castai.Delta {
resultLegacy := d.toCASTAIRequestLegacy()
resultModern := d.toCASTAIRequestModern()
logMismatches(d.log, resultLegacy, resultModern)
return resultLegacy
}

func (d *Delta) toCASTAIRequestModern() *castai.Delta {
var items []*castai.DeltaItem

for _, i := range d.CacheModern {
for _, i := range d.Cache {
data, err := Encode(i.Obj)
if err != nil {
d.log.Errorf("failed to encode %T: %v", i.Obj, err)
Expand All @@ -139,43 +102,6 @@ func (d *Delta) toCASTAIRequestModern() *castai.Delta {
}
}

func (d *Delta) toCASTAIRequestLegacy() *castai.Delta {
var items []*castai.DeltaItem

for _, i := range d.CacheLegacy {
data, err := Encode(i.Obj)
if err != nil {
d.log.Errorf("failed to encode %T: %v", i.Obj, err)
continue
}

kinds, _, err := scheme.Scheme.ObjectKinds(i.Obj)
if err != nil {
d.log.Errorf("failed to find Object %T kind: %v", i.Obj, err)
continue
}
if len(kinds) == 0 || kinds[0].Kind == "" {
d.log.Errorf("unknown Object kind for Object %T", i.Obj)
continue
}

items = append(items, &castai.DeltaItem{
Event: i.event,
Kind: kinds[0].Kind,
Data: data,
CreatedAt: time.Now().UTC(),
})
}

return &castai.Delta{
ClusterID: d.clusterID,
ClusterVersion: d.clusterVersion,
AgentVersion: d.agentVersion,
FullSnapshot: d.FullSnapshot,
Items: items,
}
}

func Encode(obj interface{}) (*json.RawMessage, error) {
b, err := json.Marshal(obj)
if err != nil {
Expand Down Expand Up @@ -206,48 +132,6 @@ func determineObjectGVK(obj runtime.Object) (schema.GroupVersionKind, error) {
return kinds[0], nil
}

func logMismatches(log logrus.FieldLogger, legacy *castai.Delta, modern *castai.Delta) {
if len(legacy.Items) != len(modern.Items) {
log.Warnf("delta_modern number of items mismatch: %d legacy.Items vs %d modern.Items", len(legacy.Items), len(modern.Items))
}

checkLegacy := toChecksumMap(legacy)
checkModern := toChecksumMap(modern)

for key, _ := range checkLegacy {
if checkModern[key] == nil {
log.Warnf("delta_modern item mismatch: legacy list has item that is missing from modern list: %s", key)
}
delete(checkLegacy, key)
delete(checkModern, key)
}

for key, _ := range checkModern {
if checkLegacy[key] == nil {
log.Warnf("delta_modern item mismatch: modern list has item that is missing from legacy list: %s", key)
}
delete(checkLegacy, key)
delete(checkModern, key)
}
}

func toChecksumMap(delta *castai.Delta) map[string]*castai.DeltaItem {
out := map[string]*castai.DeltaItem{}
for _, i := range delta.Items {
var hash string
if i.Data != nil {
hash = sha256hash(*i.Data)
}
key := fmt.Sprintf("%s-%s-%s", i.Event, i.Kind, hash)
out[key] = i
}
return out
}

func sha256hash(data []byte) string {
return fmt.Sprintf("%x", sha256.Sum256(data))
}

type Object interface {
runtime.Object
metav1.Object
Expand Down

0 comments on commit ab92482

Please sign in to comment.