Skip to content

Commit

Permalink
Merge pull request #14 from vimeo/InitialStateComplete_event
Browse files Browse the repository at this point in the history
pod watcher: Add InitialListComplete event
  • Loading branch information
dfinkel authored Dec 7, 2021
2 parents 2a08b8a + 14646ee commit 5ba38e2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 15 deletions.
31 changes: 29 additions & 2 deletions k8s_pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,28 @@ func (d *DeletePod) ResourceVersion() ResourceVersion { return d.rv }
// ResourceVersion because this event is part of the same resync.
func (d *DeletePod) Continues() bool { return d.continues }

// InitialListComplete is a synthetic event indicating that the initial list of
// pods matching the specified label matcher is complete (all previous
// `CreatePod` events' callbacks have completed).
type InitialListComplete struct {
rv ResourceVersion
}

// PodName returns an empty string because it's not really a single pod event.
func (i *InitialListComplete) PodName() string { return "" }

// ResourceVersion returns the ResourceVersion of the initial list
func (i *InitialListComplete) ResourceVersion() ResourceVersion { return i.rv }

// Continues indicates that the next event will use the same
// ResourceVersion because this event is part of either the same
// initial-state-dump or resync.
// This is always the last event of the initial state-dump, so it always has
// Continues set to false.
func (i *InitialListComplete) Continues() bool {
return false
}

// PodWatcher uses the k8s API to watch for new/deleted k8s pods
type PodWatcher struct {
cs kubernetes.Interface
Expand Down Expand Up @@ -278,14 +300,19 @@ func (p *PodWatcher) initialPods(ctx context.Context) (int, string, error) {
IP: &net.IPAddr{IP: ipaddr},
// be sure NOT to use the loop variable here :-)
Def: &initPods.Items[i],
// only the last event sets continues to false
continues: i < len(initPods.Items)-1,
// only the last event sets continues to false (and
// that'll be the InitialListComplete event)
continues: true,
}
p.tracker.recordEvent(&event)
for _, cb := range p.cbs {
cb(ctx, &event)
}
}
// Inform all the callbacks that the full-state dump is complete.
for _, cb := range p.cbs {
cb(ctx, &InitialListComplete{rv: ResourceVersion(initPods.ResourceVersion)})
}
return len(initPods.Items), initPods.ResourceVersion, nil
}

Expand Down
67 changes: 54 additions & 13 deletions k8s_pod_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ func TestListWatchPods(t *testing.T) {
}
case *DeletePod:
isDel = true
case *InitialListComplete:
// this event carries no state, so just return
return
default:
t.Fatalf("unhandled event type: %T", ev)
}

// close the firstEventSeen channel if it's not
Expand Down Expand Up @@ -685,12 +690,18 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
watchRets []watchRet
expectedEvents []PodEvent
}{
{
name: "none_exist",
listRets: []listRetPI{},
expectedEvents: []PodEvent{&InitialListComplete{}},
watchRets: []watchRet{{watch: []watchEvent{}}},
},
{
name: "one_ready",
listRets: []listRetPI{{pi: []podInfo{{name: "foobar", ip: "10.42.42.42", labels: map[string]string{"app": "fimbat"},
ready: true, phase: k8score.PodRunning}}}},
expectedEvents: []PodEvent{&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning)}},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true}, &InitialListComplete{}},
watchRets: []watchRet{{watch: []watchEvent{}}},
},
{
Expand All @@ -706,7 +717,7 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&CreatePod{name: "foobar2", IP: &net.IPAddr{IP: net.IPv4(10, 42, 43, 41)},
Def: genPod("foobar2", "10.42.43.41", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar2", "10.42.43.41", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true}, &InitialListComplete{},
},
},
{
Expand All @@ -722,7 +733,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: true},
&CreatePod{name: "foobar2", IP: &net.IPAddr{IP: net.IPv4(10, 42, 43, 41)},
Def: genPod("foobar2", "10.42.43.41", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: false},
Def: genPod("foobar2", "10.42.43.41", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: true},
&InitialListComplete{},
},
},
{
Expand All @@ -738,7 +750,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: true},
&CreatePod{name: "foobar2", IP: &net.IPAddr{IP: net.IPv4(10, 42, 43, 41)},
Def: genPod("foobar2", "10.42.43.41", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: false},
Def: genPod("foobar2", "10.42.43.41", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: true},
&InitialListComplete{},
},
},
{
Expand All @@ -751,7 +764,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
ready: false, phase: k8score.PodFailed}, eventType: watch.Deleted}}}},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar", continues: false},
},
},
Expand All @@ -766,7 +780,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar"},
},
},
Expand All @@ -782,7 +797,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar"},
},
},
Expand All @@ -796,7 +812,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar"},
},
},
Expand All @@ -811,7 +828,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar"},
},
},
Expand All @@ -825,7 +843,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar"},
},
},
Expand All @@ -843,7 +862,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar"},
},
},
Expand All @@ -860,7 +880,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&DeletePod{name: "foobar"},
},
},
Expand All @@ -879,6 +900,25 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
},
},
{
name: "none_exist_and_later_created_becomes_ready",
listRets: []listRetPI{},
watchRets: []watchRet{
{watch: []watchEvent{
{pi: podInfo{name: "foobar", ip: "10.42.42.42", labels: map[string]string{"app": "fimbat"},
ready: false, phase: k8score.PodRunning}, eventType: watch.Added},
{pi: podInfo{name: "foobar", ip: "10.42.42.42", labels: map[string]string{"app": "fimbat"},
ready: true, phase: k8score.PodRunning}, eventType: watch.Modified},
}}},
expectedEvents: []PodEvent{
&InitialListComplete{},
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: false},
&ModPod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
},
},
Expand All @@ -905,7 +945,8 @@ func TestPodWatcherErrorRecovery(t *testing.T) {
},
expectedEvents: []PodEvent{
&CreatePod{name: "foobar", IP: &net.IPAddr{IP: net.IPv4(10, 42, 42, 42)},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: false},
Def: genPod("foobar", "10.42.42.42", map[string]string{"app": "fimbat"}, true, k8score.PodRunning), continues: true},
&InitialListComplete{},
&CreatePod{name: "foobar2", IP: &net.IPAddr{IP: net.IPv4(10, 42, 43, 41)},
Def: genPod("foobar2", "10.42.43.41", map[string]string{"app": "fimbat"}, false, k8score.PodRunning), continues: false},
&ModPod{name: "foobar2", IP: &net.IPAddr{IP: net.IPv4(10, 42, 43, 41)},
Expand Down

0 comments on commit 5ba38e2

Please sign in to comment.