Skip to content

Commit

Permalink
fix(backend): upgrade various old dependencies (kubeflow#11448)
Browse files Browse the repository at this point in the history
* fix(backend): update various old dependencies

Signed-off-by: Humair Khan <[email protected]>

update k8s deps

Signed-off-by: Humair Khan <[email protected]>

chore: upgrade x/net

Signed-off-by: Humair Khan <[email protected]>

* fix(backend): patch go-git to v5 drop in replacement for v4

Signed-off-by: Humair Khan <[email protected]>

* fix(backend): update outdated dependencies

Signed-off-by: Humair Khan <[email protected]>

---------

Signed-off-by: Humair Khan <[email protected]>
  • Loading branch information
HumairAK authored Dec 20, 2024
1 parent 97acacb commit 803d7a8
Show file tree
Hide file tree
Showing 23 changed files with 625 additions and 838 deletions.
2 changes: 1 addition & 1 deletion backend/src/agent/persistence/worker/persistence_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Saver interface {
}

type EventHandler interface {
AddEventHandler(handler cache.ResourceEventHandler)
AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}

// PersistenceWorker is a generic worker to persist objects from a queue.
Expand Down
23 changes: 17 additions & 6 deletions backend/src/agent/persistence/worker/persistence_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ import (
"k8s.io/client-go/tools/cache"
)

type FakeResourceEventHandlerRegistration struct{}

func (h *FakeResourceEventHandlerRegistration) HasSynced() bool {
return true
}

func NewFakeResourceEventHandlerRegistration() *FakeResourceEventHandlerRegistration {
return &FakeResourceEventHandlerRegistration{}
}

type FakeEventHandler struct {
handler cache.ResourceEventHandler
}
Expand All @@ -35,8 +45,9 @@ func NewFakeEventHandler() *FakeEventHandler {
return &FakeEventHandler{}
}

func (h *FakeEventHandler) AddEventHandler(handler cache.ResourceEventHandler) {
func (h *FakeEventHandler) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
h.handler = handler
return NewFakeResourceEventHandlerRegistration(), nil
}

func TestPersistenceWorker_Success(t *testing.T) {
Expand Down Expand Up @@ -65,7 +76,7 @@ func TestPersistenceWorker_Success(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Equal(t, workflow, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand Down Expand Up @@ -95,7 +106,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand Down Expand Up @@ -126,7 +137,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 1, worker.Len())
Expand Down Expand Up @@ -160,7 +171,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 1, worker.Len())
Expand Down Expand Up @@ -193,7 +204,7 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand Down
4 changes: 2 additions & 2 deletions backend/src/apiserver/storage/job_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
corev1 "k8s.io/api/core/v1"
"testing"
"time"

Expand All @@ -27,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

const (
Expand Down Expand Up @@ -773,7 +773,7 @@ func TestUpdateJob_Success(t *testing.T) {
Conditions: []swfapi.ScheduledWorkflowCondition{
{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/common/util/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ExecutionClient interface {
// ExecutionInformerEventHandler only has AddEventHandler function
// ExecutionInformer has all functions we need in current code base
type ExecutionInformerEventHandler interface {
AddEventHandler(funcs cache.ResourceEventHandler)
AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}
type ExecutionInformer interface {
ExecutionInformerEventHandler
Expand Down
4 changes: 2 additions & 2 deletions backend/src/common/util/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,8 @@ type PipelineRunInformer struct {
factory prsinformers.SharedInformerFactory
}

func (pri *PipelineRunInformer) AddEventHandler(funcs cache.ResourceEventHandler) {
pri.informer.Informer().AddEventHandler(funcs)
func (pri *PipelineRunInformer) AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
return pri.informer.Informer().AddEventHandler(funcs)
}

func (pri *PipelineRunInformer) HasSynced() func() bool {
Expand Down
8 changes: 4 additions & 4 deletions backend/src/common/util/scheduled_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package util

import (
"encoding/json"
corev1 "k8s.io/api/core/v1"
"testing"
"time"

workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

func TestScheduledWorkflow_Getters(t *testing.T) {
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestScheduledWorkflow_ConditionSummary(t *testing.T) {
Conditions: []swfapi.ScheduledWorkflowCondition{
{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand All @@ -89,14 +89,14 @@ func TestScheduledWorkflow_ConditionSummary(t *testing.T) {
Conditions: []swfapi.ScheduledWorkflowCondition{
{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Message: "The schedule is enabled.",
}, {
Type: swfapi.ScheduledWorkflowDisabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down
4 changes: 2 additions & 2 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,8 @@ type WorkflowInformer struct {
factory argoinformer.SharedInformerFactory
}

func (wfi *WorkflowInformer) AddEventHandler(funcs cache.ResourceEventHandler) {
wfi.informer.Informer().AddEventHandler(funcs)
func (wfi *WorkflowInformer) AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
return wfi.informer.Informer().AddEventHandler(funcs)
}

func (wfi *WorkflowInformer) HasSynced() func() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package util
import (
"fmt"
"hash/fnv"
corev1 "k8s.io/api/core/v1"
"math"
"sort"
"strconv"
Expand All @@ -25,7 +26,6 @@ import (
commonutil "github.com/kubeflow/pipelines/backend/src/common/util"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

const (
Expand Down Expand Up @@ -338,7 +338,7 @@ func (s *ScheduledWorkflow) updateNextTriggeredTime(epoch int64) {

func (s *ScheduledWorkflow) getStatusAndMessage(activeCount int) (
conditionType swfapi.ScheduledWorkflowConditionType,
status core.ConditionStatus, message string) {
status corev1.ConditionStatus, message string) {
// Schedule messages
const (
ScheduleEnabledMessage = "The schedule is enabled."
Expand All @@ -349,15 +349,15 @@ func (s *ScheduledWorkflow) getStatusAndMessage(activeCount int) (

if s.isOneOffRun() {
if s.hasRunAtLeastOnce() && activeCount == 0 {
return swfapi.ScheduledWorkflowSucceeded, core.ConditionTrue, ScheduleSucceededMessage
return swfapi.ScheduledWorkflowSucceeded, corev1.ConditionTrue, ScheduleSucceededMessage
} else {
return swfapi.ScheduledWorkflowRunning, core.ConditionTrue, ScheduleRunningMessage
return swfapi.ScheduledWorkflowRunning, corev1.ConditionTrue, ScheduleRunningMessage
}
} else {
if s.enabled() {
return swfapi.ScheduledWorkflowEnabled, core.ConditionTrue, ScheduleEnabledMessage
return swfapi.ScheduledWorkflowEnabled, corev1.ConditionTrue, ScheduleEnabledMessage
} else {
return swfapi.ScheduledWorkflowDisabled, core.ConditionTrue, ScheduleDisabledMessage
return swfapi.ScheduledWorkflowDisabled, corev1.ConditionTrue, ScheduleDisabledMessage
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package util

import (
"encoding/json"
corev1 "k8s.io/api/core/v1"
"math"
"strconv"
"testing"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

func TestScheduledWorkflow_maxConcurrency(t *testing.T) {
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestScheduledWorkflow_GetNextScheduledEpoch_UpdateStatus_NoWorkflow(t *test
Status: swfapi.ScheduledWorkflowStatus{
Conditions: []swfapi.ScheduledWorkflowCondition{{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down Expand Up @@ -641,7 +641,7 @@ func TestScheduledWorkflow_GetNextScheduledEpoch_UpdateStatus_WithWorkflow(t *te
Status: swfapi.ScheduledWorkflowStatus{
Conditions: []swfapi.ScheduledWorkflowCondition{{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down
7 changes: 6 additions & 1 deletion backend/src/crd/controller/viewer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"flag"
"log"
"sigs.k8s.io/controller-runtime/pkg/cache"

"github.com/golang/glog"

Expand Down Expand Up @@ -77,7 +78,11 @@ func main() {

// Create a controller that is in charge of Viewer types, and also responds to
// changes to any deployment and services that is owned by any Viewer instance.
mgr, err := manager.New(cfg, manager.Options{Namespace: *namespace})
cacheOpts := cache.Options{
DefaultNamespaces: map[string]cache.Config{
*namespace: {},
}}
mgr, err := manager.New(cfg, manager.Options{Cache: cacheOpts})
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package v1beta1

import (
"github.com/kubeflow/pipelines/backend/src/common"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/apis/core"
)

// +genclient
Expand Down Expand Up @@ -198,7 +198,7 @@ type ScheduledWorkflowCondition struct {
// Type of job condition.
Type ScheduledWorkflowConditionType `json:"type,omitempty"`
// Status of the condition, one of True, False, Unknown.
Status core.ConditionStatus `json:"status,omitempty"`
Status corev1.ConditionStatus `json:"status,omitempty"`
// Last time the condition was checked.
// +optional
LastProbeTime metav1.Time `json:"lastHeartbeatTime,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func extendPodSpecPatch(
},
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: accessModes,
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse(ephemeralVolumeSpec.GetSize()),
},
Expand Down Expand Up @@ -1779,7 +1779,7 @@ func createPVC(
},
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: accessModes,
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse(volumeSizeInput.GetStringValue()),
},
Expand Down
6 changes: 3 additions & 3 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
VolumeClaimTemplate: &k8score.PersistentVolumeClaimTemplate{
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: []k8score.PersistentVolumeAccessMode{k8score.ReadWriteOnce},
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse("5Gi"),
},
Expand Down Expand Up @@ -1573,7 +1573,7 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
VolumeClaimTemplate: &k8score.PersistentVolumeClaimTemplate{
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: []k8score.PersistentVolumeAccessMode{k8score.ReadWriteOnce},
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse("5Gi"),
},
Expand All @@ -1598,7 +1598,7 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
},
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: []k8score.PersistentVolumeAccessMode{k8score.ReadWriteOnce},
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse("10Gi"),
},
Expand Down
Loading

0 comments on commit 803d7a8

Please sign in to comment.