Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): upgrade various old dependencies #11448

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/src/agent/persistence/worker/persistence_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Saver interface {
}

type EventHandler interface {
AddEventHandler(handler cache.ResourceEventHandler)
AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}

// PersistenceWorker is a generic worker to persist objects from a queue.
Expand Down
23 changes: 17 additions & 6 deletions backend/src/agent/persistence/worker/persistence_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ import (
"k8s.io/client-go/tools/cache"
)

type FakeResourceEventHandlerRegistration struct{}

func (h *FakeResourceEventHandlerRegistration) HasSynced() bool {
return true
}

func NewFakeResourceEventHandlerRegistration() *FakeResourceEventHandlerRegistration {
return &FakeResourceEventHandlerRegistration{}
}

type FakeEventHandler struct {
handler cache.ResourceEventHandler
}
Expand All @@ -35,8 +45,9 @@ func NewFakeEventHandler() *FakeEventHandler {
return &FakeEventHandler{}
}

func (h *FakeEventHandler) AddEventHandler(handler cache.ResourceEventHandler) {
func (h *FakeEventHandler) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
h.handler = handler
return NewFakeResourceEventHandlerRegistration(), nil
}

func TestPersistenceWorker_Success(t *testing.T) {
Expand Down Expand Up @@ -65,7 +76,7 @@ func TestPersistenceWorker_Success(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Equal(t, workflow, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand Down Expand Up @@ -95,7 +106,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand Down Expand Up @@ -126,7 +137,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 1, worker.Len())
Expand Down Expand Up @@ -160,7 +171,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 1, worker.Len())
Expand Down Expand Up @@ -193,7 +204,7 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) {
saver)

// Test
eventHandler.handler.OnAdd(workflow)
eventHandler.handler.OnAdd(workflow, true)
worker.processNextWorkItem()
assert.Nil(t, pipelineClient.GetWorkflow("MY_NAMESPACE", "MY_NAME"))
assert.Equal(t, 0, worker.Len())
Expand Down
4 changes: 2 additions & 2 deletions backend/src/apiserver/storage/job_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
corev1 "k8s.io/api/core/v1"
"testing"
"time"

Expand All @@ -27,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

const (
Expand Down Expand Up @@ -773,7 +773,7 @@ func TestUpdateJob_Success(t *testing.T) {
Conditions: []swfapi.ScheduledWorkflowCondition{
{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down
2 changes: 1 addition & 1 deletion backend/src/common/util/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ExecutionClient interface {
// ExecutionInformerEventHandler only has AddEventHandler function
// ExecutionInformer has all functions we need in current code base
type ExecutionInformerEventHandler interface {
AddEventHandler(funcs cache.ResourceEventHandler)
AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
}
type ExecutionInformer interface {
ExecutionInformerEventHandler
Expand Down
4 changes: 2 additions & 2 deletions backend/src/common/util/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,8 @@ type PipelineRunInformer struct {
factory prsinformers.SharedInformerFactory
}

func (pri *PipelineRunInformer) AddEventHandler(funcs cache.ResourceEventHandler) {
pri.informer.Informer().AddEventHandler(funcs)
func (pri *PipelineRunInformer) AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
return pri.informer.Informer().AddEventHandler(funcs)
}

func (pri *PipelineRunInformer) HasSynced() func() bool {
Expand Down
8 changes: 4 additions & 4 deletions backend/src/common/util/scheduled_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package util

import (
"encoding/json"
corev1 "k8s.io/api/core/v1"
"testing"
"time"

workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

func TestScheduledWorkflow_Getters(t *testing.T) {
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestScheduledWorkflow_ConditionSummary(t *testing.T) {
Conditions: []swfapi.ScheduledWorkflowCondition{
{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand All @@ -89,14 +89,14 @@ func TestScheduledWorkflow_ConditionSummary(t *testing.T) {
Conditions: []swfapi.ScheduledWorkflowCondition{
{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Message: "The schedule is enabled.",
}, {
Type: swfapi.ScheduledWorkflowDisabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(10, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(20, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down
4 changes: 2 additions & 2 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,8 @@ type WorkflowInformer struct {
factory argoinformer.SharedInformerFactory
}

func (wfi *WorkflowInformer) AddEventHandler(funcs cache.ResourceEventHandler) {
wfi.informer.Informer().AddEventHandler(funcs)
func (wfi *WorkflowInformer) AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
return wfi.informer.Informer().AddEventHandler(funcs)
}

func (wfi *WorkflowInformer) HasSynced() func() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package util
import (
"fmt"
"hash/fnv"
corev1 "k8s.io/api/core/v1"
"math"
"sort"
"strconv"
Expand All @@ -25,7 +26,6 @@ import (
commonutil "github.com/kubeflow/pipelines/backend/src/common/util"
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

const (
Expand Down Expand Up @@ -338,7 +338,7 @@ func (s *ScheduledWorkflow) updateNextTriggeredTime(epoch int64) {

func (s *ScheduledWorkflow) getStatusAndMessage(activeCount int) (
conditionType swfapi.ScheduledWorkflowConditionType,
status core.ConditionStatus, message string) {
status corev1.ConditionStatus, message string) {
// Schedule messages
const (
ScheduleEnabledMessage = "The schedule is enabled."
Expand All @@ -349,15 +349,15 @@ func (s *ScheduledWorkflow) getStatusAndMessage(activeCount int) (

if s.isOneOffRun() {
if s.hasRunAtLeastOnce() && activeCount == 0 {
return swfapi.ScheduledWorkflowSucceeded, core.ConditionTrue, ScheduleSucceededMessage
return swfapi.ScheduledWorkflowSucceeded, corev1.ConditionTrue, ScheduleSucceededMessage
} else {
return swfapi.ScheduledWorkflowRunning, core.ConditionTrue, ScheduleRunningMessage
return swfapi.ScheduledWorkflowRunning, corev1.ConditionTrue, ScheduleRunningMessage
}
} else {
if s.enabled() {
return swfapi.ScheduledWorkflowEnabled, core.ConditionTrue, ScheduleEnabledMessage
return swfapi.ScheduledWorkflowEnabled, corev1.ConditionTrue, ScheduleEnabledMessage
} else {
return swfapi.ScheduledWorkflowDisabled, core.ConditionTrue, ScheduleDisabledMessage
return swfapi.ScheduledWorkflowDisabled, corev1.ConditionTrue, ScheduleDisabledMessage
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package util

import (
"encoding/json"
corev1 "k8s.io/api/core/v1"
"math"
"strconv"
"testing"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/apis/core"
)

func TestScheduledWorkflow_maxConcurrency(t *testing.T) {
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestScheduledWorkflow_GetNextScheduledEpoch_UpdateStatus_NoWorkflow(t *test
Status: swfapi.ScheduledWorkflowStatus{
Conditions: []swfapi.ScheduledWorkflowCondition{{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down Expand Up @@ -641,7 +641,7 @@ func TestScheduledWorkflow_GetNextScheduledEpoch_UpdateStatus_WithWorkflow(t *te
Status: swfapi.ScheduledWorkflowStatus{
Conditions: []swfapi.ScheduledWorkflowCondition{{
Type: swfapi.ScheduledWorkflowEnabled,
Status: core.ConditionTrue,
Status: corev1.ConditionTrue,
LastProbeTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
LastTransitionTime: metav1.NewTime(time.Unix(updatedEpoch, 0).UTC()),
Reason: string(swfapi.ScheduledWorkflowEnabled),
Expand Down
7 changes: 6 additions & 1 deletion backend/src/crd/controller/viewer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"flag"
"log"
"sigs.k8s.io/controller-runtime/pkg/cache"

"github.com/golang/glog"

Expand Down Expand Up @@ -77,7 +78,11 @@ func main() {

// Create a controller that is in charge of Viewer types, and also responds to
// changes to any deployment and services that is owned by any Viewer instance.
mgr, err := manager.New(cfg, manager.Options{Namespace: *namespace})
cacheOpts := cache.Options{
DefaultNamespaces: map[string]cache.Config{
*namespace: {},
}}
mgr, err := manager.New(cfg, manager.Options{Cache: cacheOpts})
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions backend/src/crd/pkg/apis/scheduledworkflow/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package v1beta1

import (
"github.com/kubeflow/pipelines/backend/src/common"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/apis/core"
)

// +genclient
Expand Down Expand Up @@ -198,7 +198,7 @@ type ScheduledWorkflowCondition struct {
// Type of job condition.
Type ScheduledWorkflowConditionType `json:"type,omitempty"`
// Status of the condition, one of True, False, Unknown.
Status core.ConditionStatus `json:"status,omitempty"`
Status corev1.ConditionStatus `json:"status,omitempty"`
// Last time the condition was checked.
// +optional
LastProbeTime metav1.Time `json:"lastHeartbeatTime,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func extendPodSpecPatch(
},
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: accessModes,
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse(ephemeralVolumeSpec.GetSize()),
},
Expand Down Expand Up @@ -1779,7 +1779,7 @@ func createPVC(
},
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: accessModes,
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse(volumeSizeInput.GetStringValue()),
},
Expand Down
6 changes: 3 additions & 3 deletions backend/src/v2/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
VolumeClaimTemplate: &k8score.PersistentVolumeClaimTemplate{
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: []k8score.PersistentVolumeAccessMode{k8score.ReadWriteOnce},
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse("5Gi"),
},
Expand Down Expand Up @@ -1573,7 +1573,7 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
VolumeClaimTemplate: &k8score.PersistentVolumeClaimTemplate{
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: []k8score.PersistentVolumeAccessMode{k8score.ReadWriteOnce},
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse("5Gi"),
},
Expand All @@ -1598,7 +1598,7 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
},
Spec: k8score.PersistentVolumeClaimSpec{
AccessModes: []k8score.PersistentVolumeAccessMode{k8score.ReadWriteOnce},
Resources: k8score.ResourceRequirements{
Resources: k8score.VolumeResourceRequirements{
Requests: k8score.ResourceList{
k8score.ResourceStorage: k8sres.MustParse("10Gi"),
},
Expand Down
Loading
Loading