Skip to content

Commit

Permalink
chore(deps): Update client-go and controller-runtime
Browse files Browse the repository at this point in the history
This updates k8s.io/client-go from v0.25.9 to v0.30.1 and
sigs.k8s.io/controller-runtime from v0.11.1 to v0.18.6.

Some other dependencies were updated for dependency resolution to
succeed.

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl committed Dec 18, 2024
1 parent cb07619 commit d3a9b46
Show file tree
Hide file tree
Showing 27 changed files with 362 additions and 3,165 deletions.
2 changes: 1 addition & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# 1. Build api server application
FROM golang:1.21.7-bookworm as builder
FROM golang:1.22.10-bookworm as builder
RUN apt-get update && apt-get install -y cmake clang musl-dev openssl
WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.cacheserver
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Dockerfile for building the source code of cache_server
FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.conformance
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Dockerfile for building the source code of conformance tests
FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.driver
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.launcher
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.persistenceagent
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.scheduledworkflow
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.viewercontroller
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

RUN apk update && apk upgrade
RUN apk add --no-cache git gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion backend/api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Generate client code (go & json) from API protocol buffers
FROM golang:1.21 as generator
FROM golang:1.22 as generator
ENV GRPC_GATEWAY_VERSION v1.9.6
ENV GO_SWAGGER_VERSION v0.18.0
ENV GOLANG_PROTOBUF_VERSION v1.5.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func NewScheduledWorkflowClient(informer v1beta1.ScheduledWorkflowInformer) *Sch
}

// AddEventHandler adds an event handler.
func (c *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
c.informer.Informer().AddEventHandler(funcs)
func (c *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return c.informer.Informer().AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
4 changes: 2 additions & 2 deletions backend/src/agent/persistence/client/workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewWorkflowClient(informer util.ExecutionInformer) *WorkflowClient {
}

// AddEventHandler adds an event handler.
func (c *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
c.informer.AddEventHandler(funcs)
func (c *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return c.informer.AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
5 changes: 4 additions & 1 deletion backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ func main() {
log.Fatalf("Error creating ML pipeline API Server client: %v", err)
}

controller := NewPersistenceAgent(
controller, err := NewPersistenceAgent(
swfInformerFactory,
execInformer,
pipelineClient,
util.NewRealTime())
if err != nil {
log.Fatalf("Failed to instantiate the controller: %v", err)
}

go swfInformerFactory.Start(stopCh)
go execInformer.InformerFactoryStart(stopCh)
Expand Down
15 changes: 11 additions & 4 deletions backend/src/agent/persistence/persistence_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func NewPersistenceAgent(
swfInformerFactory swfinformers.SharedInformerFactory,
execInformer util.ExecutionInformer,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
time util.TimeInterface,
) (*PersistenceAgent, error) {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()

Expand All @@ -57,12 +58,18 @@ func NewPersistenceAgent(
swfClient := client.NewScheduledWorkflowClient(swfInformer)
workflowClient := client.NewWorkflowClient(execInformer)

swfWorker := worker.NewPersistenceWorker(time, swfregister.Kind, swfInformer.Informer(), true,
swfWorker, err := worker.NewPersistenceWorker(time, swfregister.Kind, swfInformer.Informer(), true,
worker.NewScheduledWorkflowSaver(swfClient, pipelineClient))
if err != nil {
return nil, err
}

workflowWorker := worker.NewPersistenceWorker(time, workflowregister.WorkflowKind,
workflowWorker, err := worker.NewPersistenceWorker(time, workflowregister.WorkflowKind,
execInformer, true,
worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish))
if err != nil {
return nil, err
}

agent := &PersistenceAgent{
swfClient: swfClient,
Expand All @@ -73,7 +80,7 @@ func NewPersistenceAgent(

log.Info("Setting up event handlers")

return agent
return agent, nil
}

// Run will set up the event handlers for types we are interested in, as well
Expand Down
12 changes: 8 additions & 4 deletions backend/src/agent/persistence/worker/persistence_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Saver interface {
}

type EventHandler interface {
AddEventHandler(handler cache.ResourceEventHandler)
AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}

// PersistenceWorker is a generic worker to persist objects from a queue.
Expand All @@ -62,7 +62,8 @@ func NewPersistenceWorker(
name string,
eventHandler util.ExecutionInformerEventHandler,
enforceRequeueDelays bool,
saver Saver) *PersistenceWorker {
saver Saver,
) (*PersistenceWorker, error) {
worker := &PersistenceWorker{
workqueue: workqueue.NewNamedRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), name),
Expand All @@ -74,15 +75,18 @@ func NewPersistenceWorker(
log.Info("Setting up event handlers")

// Set up an event handler for when the Scheduled Workflow changes
eventHandler.AddEventHandler(&cache.ResourceEventHandlerFuncs{
_, err := eventHandler.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: worker.enqueue,
UpdateFunc: func(old, new interface{}) {
worker.enqueue(new)
},
DeleteFunc: worker.enqueueForDelete,
})
if err != nil {
return nil, err
}

return worker
return worker, nil
}

func (p *PersistenceWorker) Shutdown() {
Expand Down
29 changes: 18 additions & 11 deletions backend/src/agent/persistence/worker/persistence_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func NewFakeEventHandler() *FakeEventHandler {
return &FakeEventHandler{}
}

func (h *FakeEventHandler) AddEventHandler(handler cache.ResourceEventHandler) {
func (h *FakeEventHandler) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
h.handler = handler

return nil, nil
}

func TestPersistenceWorker_Success(t *testing.T) {
Expand All @@ -57,15 +59,16 @@ func TestPersistenceWorker_Success(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
assert.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Equal(t, workflow, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand All @@ -87,15 +90,16 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
assert.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand All @@ -118,15 +122,16 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
assert.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 1, worker.Len())
Expand All @@ -152,15 +157,16 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
assert.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 1, worker.Len())
Expand All @@ -185,15 +191,16 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
assert.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand Down
2 changes: 1 addition & 1 deletion backend/src/common/util/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ExecutionClient interface {
// ExecutionInformerEventHandler only has AddEventHandler function
// ExecutionInformer has all functions we need in current code base
type ExecutionInformerEventHandler interface {
AddEventHandler(funcs cache.ResourceEventHandler)
AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}
type ExecutionInformer interface {
ExecutionInformerEventHandler
Expand Down
4 changes: 2 additions & 2 deletions backend/src/common/util/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,8 @@ type PipelineRunInformer struct {
factory prsinformers.SharedInformerFactory
}

func (pri *PipelineRunInformer) AddEventHandler(funcs cache.ResourceEventHandler) {
pri.informer.Informer().AddEventHandler(funcs)
func (pri *PipelineRunInformer) AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
return pri.informer.Informer().AddEventHandler(funcs)
}

func (pri *PipelineRunInformer) HasSynced() func() bool {
Expand Down
4 changes: 2 additions & 2 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,8 @@ type WorkflowInformer struct {
factory argoinformer.SharedInformerFactory
}

func (wfi *WorkflowInformer) AddEventHandler(funcs cache.ResourceEventHandler) {
wfi.informer.Informer().AddEventHandler(funcs)
func (wfi *WorkflowInformer) AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
return wfi.informer.Informer().AddEventHandler(funcs)
}

func (wfi *WorkflowInformer) HasSynced() func() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewScheduledWorkflowClient(clientSet swfclientset.Interface,
}

// AddEventHandler adds an event handler.
func (p *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
p.informer.Informer().AddEventHandler(funcs)
func (p *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return p.informer.Informer().AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func NewWorkflowClient(clientSet commonutil.ExecutionClient,
}

// AddEventHandler adds an event handler.
func (p *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
p.informer.AddEventHandler(funcs)
func (p *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return p.informer.AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
Loading

0 comments on commit d3a9b46

Please sign in to comment.