Skip to content

Commit

Permalink
fix(torch): refactor extract to func - check if pods are in Running s…
Browse files Browse the repository at this point in the history
…tate

Signed-off-by: Jose Ramon Mañes <[email protected]>
  • Loading branch information
tty47 committed Nov 9, 2023
1 parent 4398782 commit eddceca
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions pkg/k8s/statefulsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/celestiaorg/torch/pkg/db/redis"
)

const queueK8SNodes = "k8s"
const (
queueK8SNodes = "k8s" // queueK8SNodes name of the queue.
daNodePrefix = "da" // daNodePrefix name prefix that Torch will use to filter the StatefulSets.
)

// WatchStatefulSets watches for changes to the StatefulSets in the specified namespace and updates the metrics accordingly
func WatchStatefulSets() error {
Expand All @@ -40,32 +43,29 @@ func WatchStatefulSets() error {
return err
}

// Variable to keep track of the last processed name
lastProcessedName := ""

// Watch for events on the watcher channel
for event := range watcher.ResultChan() {
// Check if the event object is of type *v1.StatefulSet
if statefulSet, ok := event.Object.(*v1.StatefulSet); ok {
// Check if the name has the "da" prefix
if strings.HasPrefix(statefulSet.Name, "da") {
// Check if the name is different from the last processed one
if statefulSet.Name != lastProcessedName {
// Process the name and perform necessary actions
err := redis.Producer(statefulSet.Name, queueK8SNodes)
if err != nil {
log.Error("ERROR adding the node to the queue: ", err)
return err
}

// Update the last processed name to don't process it more than once.
lastProcessedName = statefulSet.Name
} else {
// cleanup the previous processed to add it in future iterations.
lastProcessedName = ""
// Check if the StatefulSet is valid based on the conditions
if isStatefulSetValid(statefulSet) {
// Perform necessary actions, such as adding the node to the Redis queue
err := redis.Producer(statefulSet.Name, queueK8SNodes)
if err != nil {
log.Error("ERROR adding the node to the queue: ", err)
return err
}
}
}
}

return nil
}

// isStatefulSetValid validates the StatefulSet received.
// checks if the StatefulSet name contains the daNodePrefix, and if the StatefulSet is in the "Running" state.
func isStatefulSetValid(statefulSet *v1.StatefulSet) bool {
return strings.HasPrefix(statefulSet.Name, daNodePrefix) &&
statefulSet.Status.CurrentReplicas > 0 &&
statefulSet.Status.Replicas == statefulSet.Status.ReadyReplicas
}

0 comments on commit eddceca

Please sign in to comment.