Skip to content

Commit

Permalink
Make Full ListOptions Available
Browse files Browse the repository at this point in the history
Create an alternate constructor that allows us to set all the fields of
the ListOptions so we can filter on Fields as well as Labels.  Also,
delegate to the new, more flexible, constructor from the old one.
  • Loading branch information
sergiosalvatore committed May 9, 2024
1 parent 5ba38e2 commit 79bab27
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions k8s_pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (i *InitialListComplete) Continues() bool {
type PodWatcher struct {
cs kubernetes.Interface
k8sNamespace string
selector string
listOpts k8smeta.ListOptions

cbs []EventCallback

Expand All @@ -189,13 +189,14 @@ type Logger interface {
Printf(string, ...interface{})
}

// NewPodWatcher constructs a new PodWatcher. Each callback in the variadic
// callback list is called in its own goroutine.
func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher {
// NewPodWatcherOptions constructs a new PodWatcher with a potentially more
// complicated selector. Each callback in the variadic callback list is called
// in its own goroutine.
func NewPodWatcherOptions(cs kubernetes.Interface, k8sNamespace string, options k8smeta.ListOptions, cbs ...EventCallback) *PodWatcher {
return &PodWatcher{
cs: cs,
k8sNamespace: k8sNamespace,
selector: selector,
listOpts: options,
cbs: cbs,
tracker: podTracker{
lastStatus: map[string]*k8score.Pod{},
Expand All @@ -204,17 +205,20 @@ func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs .
}
}

// NewPodWatcher constructs a new PodWatcher. The `selector` selects by labels
// only. Each callback in the variadic callback list is called in its own
// goroutine.
func NewPodWatcher(cs kubernetes.Interface, k8sNamespace, selector string, cbs ...EventCallback) *PodWatcher {
return NewPodWatcherOptions(cs, k8sNamespace, k8smeta.ListOptions{LabelSelector: selector}, cbs...)
}

func (p *PodWatcher) logf(format string, args ...interface{}) {
if p.Logger == nil {
return
}
p.Logger.Printf(format, args...)
}

func (p *PodWatcher) listOpts() k8smeta.ListOptions {
return k8smeta.ListOptions{LabelSelector: p.selector}
}

func setContinues(ev PodEvent, continues bool) {
switch pe := ev.(type) {
case *CreatePod:
Expand All @@ -231,7 +235,7 @@ func setContinues(ev PodEvent, continues bool) {
// returns the new version ID (or an error)
func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (string, error) {
initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List(
ctx, p.listOpts())
ctx, p.listOpts)
if initListerr != nil {
return "", fmt.Errorf("failed pod listing: %w", initListerr)
}
Expand Down Expand Up @@ -278,7 +282,7 @@ func (p *PodWatcher) resync(ctx context.Context, cbChans []chan<- PodEvent) (str
// returns the number of pods, resource version and (optionally) an error
func (p *PodWatcher) initialPods(ctx context.Context) (int, string, error) {
initPods, initListerr := p.cs.CoreV1().Pods(p.k8sNamespace).List(
ctx, p.listOpts())
ctx, p.listOpts)
if initListerr != nil {
return -1, "", fmt.Errorf("failed initial pod listing: %w", initListerr)
}
Expand Down Expand Up @@ -391,7 +395,7 @@ func (p *PodWatcher) Run(ctx context.Context) error {

lastwatchStart := time.Now()
for {
watchOpt := p.listOpts()
watchOpt := p.listOpts
watchOpt.ResourceVersion = version
podWatch, watchStartErr := p.cs.CoreV1().Pods(p.k8sNamespace).Watch(ctx, watchOpt)
if watchStartErr != nil {
Expand Down

0 comments on commit 79bab27

Please sign in to comment.