diff --git a/pkg/k8s/statefulsets.go b/pkg/k8s/statefulsets.go index ec5ce3c..9defb87 100644 --- a/pkg/k8s/statefulsets.go +++ b/pkg/k8s/statefulsets.go @@ -13,7 +13,10 @@ import ( "github.com/celestiaorg/torch/pkg/db/redis" ) -const queueK8SNodes = "k8s" +const ( + queueK8SNodes = "k8s" // queueK8SNodes name of the queue. + daNodePrefix = "da" // daNodePrefix name prefix that Torch will use to filter the StatefulSets. +) // WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly func WatchStatefulSets() error { @@ -40,28 +43,17 @@ func WatchStatefulSets() error { return err } - // Variable to keep track of the last processed name - lastProcessedName := "" - // Watch for events on the watcher channel for event := range watcher.ResultChan() { + // Check if the event object is of type *v1.StatefulSet if statefulSet, ok := event.Object.(*v1.StatefulSet); ok { - // Check if the name has the "da" prefix - if strings.HasPrefix(statefulSet.Name, "da") { - // Check if the name is different from the last processed one - if statefulSet.Name != lastProcessedName { - // Process the name and perform necessary actions - err := redis.Producer(statefulSet.Name, queueK8SNodes) - if err != nil { - log.Error("ERROR adding the node to the queue: ", err) - return err - } - - // Update the last processed name to don't process it more than once. - lastProcessedName = statefulSet.Name - } else { - // cleanup the previous processed to add it in future iterations. - lastProcessedName = "" + // Check if the StatefulSet is valid based on the conditions + if isStatefulSetValid(statefulSet) { + // Perform necessary actions, such as adding the node to the Redis queue + err := redis.Producer(statefulSet.Name, queueK8SNodes) + if err != nil { + log.Error("ERROR adding the node to the queue: ", err) + return err } } } @@ -69,3 +61,11 @@ func WatchStatefulSets() error { return nil } + +// isStatefulSetValid validates the StatefulSet received. +// checks if the StatefulSet name contains the daNodePrefix, and if the StatefulSet is in the "Running" state. +func isStatefulSetValid(statefulSet *v1.StatefulSet) bool { + return strings.HasPrefix(statefulSet.Name, daNodePrefix) && + statefulSet.Status.CurrentReplicas > 0 && + statefulSet.Status.Replicas == statefulSet.Status.ReadyReplicas +}