From 1c40f6255fea6fa5fa504c916ce1d3b5d8ba6083 Mon Sep 17 00:00:00 2001 From: Sherif Akoush Date: Wed, 11 Dec 2024 14:32:40 +0000 Subject: [PATCH] fix: Add timeout to contexts in client calls (#6125) * add timeout context from infer call for modelgateway * add timeout context to pipeline gateway * set timeout context on process request * add a test for grpc call timeout * add agent k8s api call timeout * add context timeout for shutting down services * add timeout for controller k8s api calls * add timeout for control plane context * add timeout context to reconcile logic * pr comments --- .../mlops/experiment_controller.go | 4 +- .../controllers/mlops/model_controller.go | 4 +- .../controllers/mlops/pipeline_controller.go | 4 +- .../mlops/seldonruntime_controller.go | 10 +- .../controllers/mlops/server_controller.go | 9 +- operator/main.go | 5 + operator/pkg/constants/constants.go | 15 +- operator/scheduler/control_plane.go | 18 +- operator/scheduler/control_plane_test.go | 2 +- operator/scheduler/experiment.go | 116 +++++----- operator/scheduler/model.go | 137 ++++++------ operator/scheduler/pipeline.go | 209 ++++++++---------- operator/scheduler/server.go | 12 +- scheduler/go.mod | 2 +- scheduler/pkg/agent/client.go | 2 +- scheduler/pkg/agent/drainservice/server.go | 6 +- scheduler/pkg/agent/k8s/secrets.go | 7 +- scheduler/pkg/agent/k8s/secrets_test.go | 3 +- scheduler/pkg/agent/rclone/rclone_config.go | 3 +- scheduler/pkg/agent/rproxy.go | 4 +- scheduler/pkg/kafka/gateway/worker.go | 40 ++-- scheduler/pkg/kafka/gateway/worker_test.go | 67 ++++-- scheduler/pkg/kafka/pipeline/httpserver.go | 2 +- .../kafka/pipeline/multi_topic_consumer.go | 12 +- scheduler/pkg/metrics/agent.go | 6 +- scheduler/pkg/util/constants.go | 15 ++ 26 files changed, 413 insertions(+), 301 deletions(-) diff --git a/operator/controllers/mlops/experiment_controller.go b/operator/controllers/mlops/experiment_controller.go index 62a2c6c71a..99ef595679 100644 --- a/operator/controllers/mlops/experiment_controller.go +++ b/operator/controllers/mlops/experiment_controller.go @@ -44,7 +44,7 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr. // Add our finalizer if !utils.ContainsStr(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) { experiment.ObjectMeta.Finalizers = append(experiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) - if err := r.Update(context.Background(), experiment); err != nil { + if err := r.Update(ctx, experiment); err != nil { return true, err } } @@ -84,6 +84,8 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr. 
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithName("Reconcile") + ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) + defer cancel() experiment := &mlopsv1alpha1.Experiment{} if err := r.Get(ctx, req.NamespacedName, experiment); err != nil { diff --git a/operator/controllers/mlops/model_controller.go b/operator/controllers/mlops/model_controller.go index 4f39838425..1b3245ef0c 100644 --- a/operator/controllers/mlops/model_controller.go +++ b/operator/controllers/mlops/model_controller.go @@ -46,7 +46,7 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logge // Add our finalizer if !utils.ContainsStr(model.ObjectMeta.Finalizers, constants.ModelFinalizerName) { model.ObjectMeta.Finalizers = append(model.ObjectMeta.Finalizers, constants.ModelFinalizerName) - if err := r.Update(context.Background(), model); err != nil { + if err := r.Update(ctx, model); err != nil { return true, err } } @@ -78,6 +78,8 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logge func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithName("Reconcile") + ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) + defer cancel() model := &mlopsv1alpha1.Model{} if err := r.Get(ctx, req.NamespacedName, model); err != nil { diff --git a/operator/controllers/mlops/pipeline_controller.go b/operator/controllers/mlops/pipeline_controller.go index 7e3bcc477d..0fc2fb20d2 100644 --- a/operator/controllers/mlops/pipeline_controller.go +++ b/operator/controllers/mlops/pipeline_controller.go @@ -49,7 +49,7 @@ func (r *PipelineReconciler) handleFinalizer( // Add our finalizer if !utils.ContainsStr(pipeline.ObjectMeta.Finalizers, constants.PipelineFinalizerName) { pipeline.ObjectMeta.Finalizers = append(pipeline.ObjectMeta.Finalizers, constants.PipelineFinalizerName) - if err := r.Update(context.Background(), pipeline); err != nil { + if err := r.Update(ctx, pipeline); err != nil { return true, err } } @@ -94,6 +94,8 @@ func (r *PipelineReconciler) handleFinalizer( // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithName("Reconcile") + ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) + defer cancel() pipeline := &mlopsv1alpha1.Pipeline{} if err := r.Get(ctx, req.NamespacedName, pipeline); err != nil { diff --git a/operator/controllers/mlops/seldonruntime_controller.go b/operator/controllers/mlops/seldonruntime_controller.go index 837c55af60..3e39c0874c 100644 --- a/operator/controllers/mlops/seldonruntime_controller.go +++ b/operator/controllers/mlops/seldonruntime_controller.go @@ -65,7 +65,7 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo // Add our finalizer if !utils.ContainsStr(runtime.ObjectMeta.Finalizers, constants.RuntimeFinalizerName) { runtime.ObjectMeta.Finalizers = append(runtime.ObjectMeta.Finalizers, constants.RuntimeFinalizerName) - if err := r.Update(context.Background(), runtime); err != nil { + if err := r.Update(ctx, runtime); err != nil { return true, err } } @@ -120,6 +120,8 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo // - 
https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.4/pkg/reconcile func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithName("Reconcile") + ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) + defer cancel() seldonRuntime := &mlopsv1alpha1.SeldonRuntime{} if err := r.Get(ctx, req.NamespacedName, seldonRuntime); err != nil { @@ -214,9 +216,11 @@ func (r *SeldonRuntimeReconciler) updateStatus(seldonRuntime *mlopsv1alpha1.Seld // Find SeldonRuntimes that reference the changes SeldonConfig // TODO: pass an actual context from the caller to be used here func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(_ context.Context, obj client.Object) []reconcile.Request { - logger := log.FromContext(context.Background()).WithName("mapSeldonRuntimesFromSeldonConfig") + ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout) + defer cancel() + logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromSeldonConfig") var seldonRuntimes mlopsv1alpha1.SeldonRuntimeList - if err := r.Client.List(context.Background(), &seldonRuntimes); err != nil { + if err := r.Client.List(ctx, &seldonRuntimes); err != nil { logger.Error(err, "error listing seldonRuntimes") return nil } diff --git a/operator/controllers/mlops/server_controller.go b/operator/controllers/mlops/server_controller.go index e7b3d95741..f6b6353bd1 100644 --- a/operator/controllers/mlops/server_controller.go +++ b/operator/controllers/mlops/server_controller.go @@ -33,6 +33,7 @@ import ( mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" "github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/common" serverreconcile "github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/server" + "github.com/seldonio/seldon-core/operator/v2/pkg/constants" scheduler "github.com/seldonio/seldon-core/operator/v2/scheduler" ) @@ -65,6 +66,8 @@ type ServerReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx).WithName("Reconcile") + ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) + defer cancel() logger.Info("Received reconcile for Server", "name", req.Name, "namespace", req.NamespacedName.Namespace) @@ -186,9 +189,11 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error { // Find Servers that need reconcilliation from a change to a given ServerConfig // TODO: pass an actual context from the caller to be used here func (r *ServerReconciler) mapServerFromServerConfig(_ context.Context, obj client.Object) []reconcile.Request { - logger := log.FromContext(context.Background()).WithName("mapServerFromServerConfig") + ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout) + defer cancel() + logger := log.FromContext(ctx).WithName("mapServerFromServerConfig") var servers mlopsv1alpha1.ServerList - if err := r.Client.List(context.Background(), &servers); err != nil { + if err := r.Client.List(ctx, &servers); err != nil { logger.Error(err, "error listing servers") return nil } diff --git a/operator/main.go b/operator/main.go index 6e4586b1b8..82963484bd 100644 --- a/operator/main.go +++ b/operator/main.go @@ -12,6 +12,7 @@ package main import ( "flag" "os" + "time" //+kubebuilder:scaffold:imports 
"go.uber.org/zap/zapcore" @@ -43,6 +44,10 @@ func init() { //+kubebuilder:scaffold:scheme } +const ( + defaultReconcileTimeout = 2 * time.Minute +) + func main() { var metricsAddr string var enableLeaderElection bool diff --git a/operator/pkg/constants/constants.go b/operator/pkg/constants/constants.go index 1c52c078c7..6372b394a1 100644 --- a/operator/pkg/constants/constants.go +++ b/operator/pkg/constants/constants.go @@ -9,7 +9,10 @@ the Change License after the Change Date as each is defined in accordance with t package constants -import "os" +import ( + "os" + "time" +) const ( // note: we do not have a finalizer for servers as we rely on the draining logic to reschedule models @@ -50,3 +53,13 @@ const ( ReconcileUpdateNeeded ReconcileCreateNeeded ) + +// k8s api call timeout +const ( + // this is a constant that can be used to set the timeout for k8s api calls + // currently it can be used for a series of calls in a single logical operation + // which is expected to be completed in this amount of time (as opposed to a single call) + K8sAPICallsTxTimeout = 2 * time.Minute + ControlPlaneExecTimeOut = 5 * time.Minute + ReconcileTimeout = 5 * time.Minute +) diff --git a/operator/scheduler/control_plane.go b/operator/scheduler/control_plane.go index 9f0d1dfd8d..65466c580f 100644 --- a/operator/scheduler/control_plane.go +++ b/operator/scheduler/control_plane.go @@ -19,10 +19,8 @@ import ( "google.golang.org/grpc/status" "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" -) -const ( - execTimeOut = 5 * time.Minute + "github.com/seldonio/seldon-core/operator/v2/pkg/constants" ) func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error { @@ -49,10 +47,12 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC } logger.Info("Received event to handle state", "event", event) - fn := func() error { + fn := func(ctx context.Context) error { return s.handleStateOnReconnect(ctx, grpcClient, namespace, event.GetEvent()) } - _, err = execWithTimeout(fn, execTimeOut) + // in general we could have also handled timeout via a context with timeout + // but we want to handle the timeout in a more controlled way and not depending on the other side + _, err = execWithTimeout(ctx, fn, constants.ControlPlaneExecTimeOut) if err != nil { logger.Error(err, "Failed to handle state on reconnect") return err @@ -64,10 +64,14 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC return nil } -func execWithTimeout(f func() error, d time.Duration) (bool, error) { +func execWithTimeout(baseContext context.Context, f func(ctx context.Context) error, d time.Duration) (bool, error) { + // cancel the context after the timeout + ctxWithCancel, cancel := context.WithCancel(baseContext) + defer cancel() + errChan := make(chan error, 1) go func() { - errChan <- f() + errChan <- f(ctxWithCancel) close(errChan) }() t := time.NewTimer(d) diff --git a/operator/scheduler/control_plane_test.go b/operator/scheduler/control_plane_test.go index 3bdd535f55..7792ff6997 100644 --- a/operator/scheduler/control_plane_test.go +++ b/operator/scheduler/control_plane_test.go @@ -67,7 +67,7 @@ func TestSendWithTimeout(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - hasExpired, err := execWithTimeout(func() error { + hasExpired, err := execWithTimeout(context.Background(), func(_ context.Context) error { return fn(test.err) }, test.sleepTime) 
g.Expect(hasExpired).To(Equal(test.isExpired)) diff --git a/operator/scheduler/experiment.go b/operator/scheduler/experiment.go index 50be2e6cf8..727794a28d 100644 --- a/operator/scheduler/experiment.go +++ b/operator/scheduler/experiment.go @@ -102,79 +102,79 @@ func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcCli logger.Info("Received experiment event with no k8s metadata so ignoring", "Experiment", event.ExperimentName) continue } - experiment := &v1alpha1.Experiment{} - err = s.Get(ctx, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, experiment) - if err != nil { - logger.Error(err, "Failed to get experiment", "name", event.ExperimentName, "namespace", event.KubernetesMeta.Namespace) - continue - } - if !experiment.ObjectMeta.DeletionTimestamp.IsZero() { - logger.Info("Experiment is pending deletion", "experiment", experiment.Name) - if !event.Active { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latestExperiment := &v1alpha1.Experiment{} - err = s.Get(ctx, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, latestExperiment) - if err != nil { + // An experiment is not active if it is being deleted or some models are not ready + if !event.Active { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) + defer cancel() + + latestExperiment := &v1alpha1.Experiment{} + err = s.Get(ctxWithTimeout, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, latestExperiment) + if err != nil { + return err + } + if !latestExperiment.ObjectMeta.DeletionTimestamp.IsZero() { // Experiment is being deleted + // remove finalizer now we have completed successfully + latestExperiment.ObjectMeta.Finalizers = utils.RemoveStr(latestExperiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) + if err := s.Update(ctxWithTimeout, latestExperiment); err != nil { + logger.Error(err, "Failed to remove finalizer", "experiment", latestExperiment.GetName()) return err } - if !latestExperiment.ObjectMeta.DeletionTimestamp.IsZero() { // Experiment is being deleted - // remove finalizer now we have completed successfully - latestExperiment.ObjectMeta.Finalizers = utils.RemoveStr(latestExperiment.ObjectMeta.Finalizers, constants.ExperimentFinalizerName) - if err := s.Update(ctx, latestExperiment); err != nil { - logger.Error(err, "Failed to remove finalizer", "experiment", latestExperiment.GetName()) - return err - } - } - return nil - }) - if retryErr != nil { - logger.Error(err, "Failed to remove finalizer after retries") } + return nil + }) + if retryErr != nil { + logger.Error(err, "Failed to remove finalizer after retries") } } // Try to update status - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - experiment := &v1alpha1.Experiment{} - err = s.Get(ctx, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, experiment) - if err != nil { - return err - } - if event.KubernetesMeta.Generation != experiment.Generation { - logger.Info("Ignoring event for old generation", "currentGeneration", experiment.Generation, "eventGeneration", event.KubernetesMeta.Generation, "server", event.ExperimentName) - return nil - } - // Handle status update - if event.Active { - logger.Info("Setting experiment to ready", "experiment", event.ExperimentName) - 
experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, true, event.StatusDescription) - } else { - logger.Info("Setting experiment to not ready", "experiment", event.ExperimentName) - experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, false, event.StatusDescription) - } - if event.CandidatesReady { - experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, true, "Candidates ready") - } else { - experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, false, "Candidates not ready") - } - if event.MirrorReady { - experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, true, "Mirror ready") - } else { - experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, false, "Mirror not ready") + { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) + defer cancel() + + experiment := &v1alpha1.Experiment{} + err = s.Get(ctxWithTimeout, client.ObjectKey{Name: event.ExperimentName, Namespace: event.KubernetesMeta.Namespace}, experiment) + if err != nil { + return err + } + if event.KubernetesMeta.Generation != experiment.Generation { + logger.Info("Ignoring event for old generation", "currentGeneration", experiment.Generation, "eventGeneration", event.KubernetesMeta.Generation, "server", event.ExperimentName) + return nil + } + // Handle status update + if event.Active { + logger.Info("Setting experiment to ready", "experiment", event.ExperimentName) + experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, true, event.StatusDescription) + } else { + logger.Info("Setting experiment to not ready", "experiment", event.ExperimentName) + experiment.Status.CreateAndSetCondition(v1alpha1.ExperimentReady, false, event.StatusDescription) + } + if event.CandidatesReady { + experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, true, "Candidates ready") + } else { + experiment.Status.CreateAndSetCondition(v1alpha1.CandidatesReady, false, "Candidates not ready") + } + if event.MirrorReady { + experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, true, "Mirror ready") + } else { + experiment.Status.CreateAndSetCondition(v1alpha1.MirrorReady, false, "Mirror not ready") + } + return s.updateExperimentStatus(ctxWithTimeout, experiment) + }) + if retryErr != nil { + logger.Error(err, "Failed to update status", "experiment", event.ExperimentName) } - return s.updateExperimentStatus(experiment) - }) - if retryErr != nil { - logger.Error(err, "Failed to update status", "experiment", event.ExperimentName) } } return nil } -func (s *SchedulerClient) updateExperimentStatus(experiment *v1alpha1.Experiment) error { - if err := s.Status().Update(context.TODO(), experiment); err != nil { +func (s *SchedulerClient) updateExperimentStatus(ctx context.Context, experiment *v1alpha1.Experiment) error { + if err := s.Status().Update(ctx, experiment); err != nil { s.recorder.Eventf(experiment, v1.EventTypeWarning, "UpdateFailed", "Failed to update status for experiment %q: %v", experiment.Name, err) return err diff --git a/operator/scheduler/model.go b/operator/scheduler/model.go index f49217be38..9c5a18aad5 100644 --- a/operator/scheduler/model.go +++ b/operator/scheduler/model.go @@ -161,10 +161,12 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient s // Handle terminated event to remove finalizer if canRemoveFinalizer(latestVersionStatus.State.State) { retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error 
{ - latestModel := &v1alpha1.Model{} + ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) + defer cancel() + latestModel := &v1alpha1.Model{} err = s.Get( - ctx, + ctxWithTimeout, client.ObjectKey{ Name: event.ModelName, Namespace: latestVersionStatus.GetKubernetesMeta().Namespace, @@ -181,7 +183,7 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient s latestModel.ObjectMeta.Finalizers, constants.ModelFinalizerName, ) - if err := s.Update(ctx, latestModel); err != nil { + if err := s.Update(ctxWithTimeout, latestModel); err != nil { logger.Error(err, "Failed to remove finalizer", "model", latestModel.GetName()) return err } @@ -195,70 +197,75 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient s } // Try to update status - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latestModel := &v1alpha1.Model{} - - err = s.Get( - ctx, - client.ObjectKey{ - Name: event.ModelName, - Namespace: latestVersionStatus.GetKubernetesMeta().Namespace, - }, - latestModel, - ) - if err != nil { - return err - } + { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) + defer cancel() - if latestVersionStatus.GetKubernetesMeta().Generation != latestModel.Generation { - logger.Info( - "Ignoring event for old generation", - "currentGeneration", latestModel.Generation, - "eventGeneration", latestVersionStatus.GetKubernetesMeta().Generation, - "model", event.ModelName, - ) - return nil - } + latestModel := &v1alpha1.Model{} - // Handle status update - modelStatus := latestVersionStatus.GetState() - switch modelStatus.GetState() { - case scheduler.ModelStatus_ModelAvailable: - logger.Info( - "Setting model to ready", - "name", event.ModelName, - "state", modelStatus.GetState().String(), - ) - latestModel.Status.CreateAndSetCondition( - v1alpha1.ModelReady, - true, - modelStatus.GetState().String(), - modelStatus.GetReason(), - ) - default: - logger.Info( - "Setting model to not ready", - "name", event.ModelName, - "state", modelStatus.GetState().String(), + err = s.Get( + ctxWithTimeout, + client.ObjectKey{ + Name: event.ModelName, + Namespace: latestVersionStatus.GetKubernetesMeta().Namespace, + }, + latestModel, ) - latestModel.Status.CreateAndSetCondition( - v1alpha1.ModelReady, - false, - modelStatus.GetState().String(), - modelStatus.GetReason(), + if err != nil { + return err + } + + if latestVersionStatus.GetKubernetesMeta().Generation != latestModel.Generation { + logger.Info( + "Ignoring event for old generation", + "currentGeneration", latestModel.Generation, + "eventGeneration", latestVersionStatus.GetKubernetesMeta().Generation, + "model", event.ModelName, + ) + return nil + } + + // Handle status update + modelStatus := latestVersionStatus.GetState() + switch modelStatus.GetState() { + case scheduler.ModelStatus_ModelAvailable: + logger.Info( + "Setting model to ready", + "name", event.ModelName, + "state", modelStatus.GetState().String(), + ) + latestModel.Status.CreateAndSetCondition( + v1alpha1.ModelReady, + true, + modelStatus.GetState().String(), + modelStatus.GetReason(), + ) + default: + logger.Info( + "Setting model to not ready", + "name", event.ModelName, + "state", modelStatus.GetState().String(), + ) + latestModel.Status.CreateAndSetCondition( + v1alpha1.ModelReady, + false, + modelStatus.GetState().String(), + modelStatus.GetReason(), + ) + } + + // Set the total number of 
replicas targeted by this model + latestModel.Status.Replicas = int32( + modelStatus.GetAvailableReplicas() + + modelStatus.GetUnavailableReplicas(), ) + latestModel.Status.Selector = "server=" + latestVersionStatus.ServerName + return s.updateModelStatus(ctxWithTimeout, latestModel) + }) + if retryErr != nil { + logger.Error(err, "Failed to update status", "model", event.ModelName) } - - // Set the total number of replicas targeted by this model - latestModel.Status.Replicas = int32( - modelStatus.GetAvailableReplicas() + - modelStatus.GetUnavailableReplicas(), - ) - latestModel.Status.Selector = "server=" + latestVersionStatus.ServerName - return s.updateModelStatus(latestModel) - }) - if retryErr != nil { - logger.Error(err, "Failed to update status", "model", event.ModelName) } } @@ -284,11 +291,11 @@ func modelReady(status v1alpha1.ModelStatus) bool { status.GetCondition(apis.ConditionReady).Status == v1.ConditionTrue } -func (s *SchedulerClient) updateModelStatus(model *v1alpha1.Model) error { +func (s *SchedulerClient) updateModelStatus(ctx context.Context, model *v1alpha1.Model) error { existingModel := &v1alpha1.Model{} namespacedName := types.NamespacedName{Name: model.Name, Namespace: model.Namespace} - if err := s.Get(context.TODO(), namespacedName, existingModel); err != nil { + if err := s.Get(ctx, namespacedName, existingModel); err != nil { if errors.IsNotFound(err) { //Ignore NotFound errors return nil } @@ -299,7 +306,7 @@ func (s *SchedulerClient) updateModelStatus(model *v1alpha1.Model) error { if equality.Semantic.DeepEqual(existingModel.Status, model.Status) { // Not updating as no difference } else { - if err := s.Status().Update(context.TODO(), model); err != nil { + if err := s.Status().Update(ctx, model); err != nil { s.recorder.Eventf( model, v1.EventTypeWarning, diff --git a/operator/scheduler/pipeline.go b/operator/scheduler/pipeline.go index fce4af998c..6c8efdbb06 100644 --- a/operator/scheduler/pipeline.go +++ b/operator/scheduler/pipeline.go @@ -74,7 +74,7 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1 scheduler.PipelineVersionState_PipelineTerminating.String(), "Pipeline unload requested", ) - _ = s.updatePipelineStatusImpl(pipeline) + _ = s.updatePipelineStatusImpl(ctx, pipeline) return nil, false } @@ -125,135 +125,116 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, grpcClien "State", pv.GetState().String(), ) - pipeline := &v1alpha1.Pipeline{} - err = s.Get( - ctx, - client.ObjectKey{ - Name: event.PipelineName, - Namespace: pv.GetPipeline().GetKubernetesMeta().GetNamespace(), - }, - pipeline, - ) - if err != nil { - logger.Error( - err, - "Failed to get pipeline", - "name", event.PipelineName, - "namespace", pv.GetPipeline().GetKubernetesMeta().GetNamespace(), - ) - continue - } - - if !pipeline.ObjectMeta.DeletionTimestamp.IsZero() { - logger.Info( - "Pipeline is pending deletion", - "pipeline", pipeline.Name, - "state", pv.State.Status.String(), - ) - if canRemovePipelineFinalizer(pv.State.Status) { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latestPipeline := &v1alpha1.Pipeline{} - err = s.Get( - ctx, - client.ObjectKey{ - Name: event.PipelineName, - Namespace: pv.GetPipeline().GetKubernetesMeta().GetNamespace(), - }, - latestPipeline, + if canRemovePipelineFinalizer(pv.State.Status) { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) + defer cancel() + + 
latestPipeline := &v1alpha1.Pipeline{} + err = s.Get( + ctxWithTimeout, + client.ObjectKey{ + Name: event.PipelineName, + Namespace: pv.GetPipeline().GetKubernetesMeta().GetNamespace(), + }, + latestPipeline, + ) + if err != nil { + return err + } + if !latestPipeline.ObjectMeta.DeletionTimestamp.IsZero() { // Pipeline is being deleted + // remove finalizer now we have completed successfully + latestPipeline.ObjectMeta.Finalizers = utils.RemoveStr( + latestPipeline.ObjectMeta.Finalizers, + constants.PipelineFinalizerName, ) - if err != nil { + if err := s.Update(ctxWithTimeout, latestPipeline); err != nil { + logger.Error(err, "Failed to remove finalizer", "pipeline", latestPipeline.GetName()) return err } - if !latestPipeline.ObjectMeta.DeletionTimestamp.IsZero() { // Pipeline is being deleted - // remove finalizer now we have completed successfully - latestPipeline.ObjectMeta.Finalizers = utils.RemoveStr( - latestPipeline.ObjectMeta.Finalizers, - constants.PipelineFinalizerName, - ) - if err := s.Update(ctx, latestPipeline); err != nil { - logger.Error(err, "Failed to remove finalizer", "pipeline", latestPipeline.GetName()) - return err - } - } - return nil - }) - if retryErr != nil { - logger.Error(err, "Failed to remove finalizer after retries") } + return nil + }) + if retryErr != nil { + logger.Error(err, "Failed to remove finalizer after retries") } } // Try to update status - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - pipeline := &v1alpha1.Pipeline{} - err = s.Get( - ctx, - client.ObjectKey{ - Name: event.PipelineName, - Namespace: pv.GetPipeline().GetKubernetesMeta().GetNamespace(), - }, - pipeline, - ) - if err != nil { - return err - } - - if pv.GetPipeline().GetKubernetesMeta().GetGeneration() != pipeline.Generation { - logger.Info( - "Ignoring event for old generation", - "currentGeneration", pipeline.Generation, - "eventGeneration", pv.GetPipeline().GetKubernetesMeta().GetGeneration(), - "server", event.PipelineName, + { + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + ctxWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) + defer cancel() + + pipeline := &v1alpha1.Pipeline{} + err = s.Get( + ctxWithTimeout, + client.ObjectKey{ + Name: event.PipelineName, + Namespace: pv.GetPipeline().GetKubernetesMeta().GetNamespace(), + }, + pipeline, ) - return nil - } + if err != nil { + return err + } - // Handle status update - switch pv.State.Status { - case scheduler.PipelineVersionState_PipelineReady: - logger.Info( - "Setting pipeline to ready", - "pipeline", pipeline.Name, - "generation", pipeline.Generation, - ) - pipeline.Status.CreateAndSetCondition( - v1alpha1.PipelineReady, - true, - pv.State.Reason, - pv.State.Status.String(), - ) - default: - logger.Info( - "Setting pipeline to not ready", - "pipeline", pipeline.Name, - "generation", pipeline.Generation, - ) - pipeline.Status.CreateAndSetCondition( - v1alpha1.PipelineReady, - false, - pv.State.Reason, - pv.State.Status.String(), - ) - } - // Set models ready - if pv.State.ModelsReady { - pipeline.Status.CreateAndSetCondition(v1alpha1.ModelsReady, true, "Models all available", "") - } else { - pipeline.Status.CreateAndSetCondition(v1alpha1.ModelsReady, false, "Some models are not available", "") - } + if pv.GetPipeline().GetKubernetesMeta().GetGeneration() != pipeline.Generation { + logger.Info( + "Ignoring event for old generation", + "currentGeneration", pipeline.Generation, + "eventGeneration", 
pv.GetPipeline().GetKubernetesMeta().GetGeneration(), + "server", event.PipelineName, + ) + return nil + } - return s.updatePipelineStatusImpl(pipeline) - }) - if retryErr != nil { - logger.Error(retryErr, "Failed to update status", "pipeline", event.PipelineName) - } + // Handle status update + switch pv.State.Status { + case scheduler.PipelineVersionState_PipelineReady: + logger.Info( + "Setting pipeline to ready", + "pipeline", pipeline.Name, + "generation", pipeline.Generation, + ) + pipeline.Status.CreateAndSetCondition( + v1alpha1.PipelineReady, + true, + pv.State.Reason, + pv.State.Status.String(), + ) + default: + logger.Info( + "Setting pipeline to not ready", + "pipeline", pipeline.Name, + "generation", pipeline.Generation, + ) + pipeline.Status.CreateAndSetCondition( + v1alpha1.PipelineReady, + false, + pv.State.Reason, + pv.State.Status.String(), + ) + } + // Set models ready + if pv.State.ModelsReady { + pipeline.Status.CreateAndSetCondition(v1alpha1.ModelsReady, true, "Models all available", "") + } else { + pipeline.Status.CreateAndSetCondition(v1alpha1.ModelsReady, false, "Some models are not available", "") + } + return s.updatePipelineStatusImpl(ctxWithTimeout, pipeline) + }) + if retryErr != nil { + logger.Error(retryErr, "Failed to update status", "pipeline", event.PipelineName) + } + } } return nil } -func (s *SchedulerClient) updatePipelineStatusImpl(pipeline *v1alpha1.Pipeline) error { - if err := s.Status().Update(context.TODO(), pipeline); err != nil { +func (s *SchedulerClient) updatePipelineStatusImpl(ctx context.Context, pipeline *v1alpha1.Pipeline) error { + if err := s.Status().Update(ctx, pipeline); err != nil { s.recorder.Eventf(pipeline, v1.EventTypeWarning, "UpdateFailed", "Failed to update status for pipeline %q: %v", pipeline.Name, err) return err diff --git a/operator/scheduler/server.go b/operator/scheduler/server.go index 8ecf39bf17..de2ab90756 100644 --- a/operator/scheduler/server.go +++ b/operator/scheduler/server.go @@ -21,6 +21,7 @@ import ( "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1" + "github.com/seldonio/seldon-core/operator/v2/pkg/constants" ) func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler.SchedulerClient, servers []v1alpha1.Server, isFirstSync bool) error { @@ -114,8 +115,11 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient // Try to update status retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + contextWithTimeout, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) + defer cancel() + server := &v1alpha1.Server{} - err = s.Get(ctx, client.ObjectKey{Name: event.ServerName, Namespace: event.GetKubernetesMeta().GetNamespace()}, server) + err = s.Get(contextWithTimeout, client.ObjectKey{Name: event.ServerName, Namespace: event.GetKubernetesMeta().GetNamespace()}, server) if err != nil { return err } @@ -125,7 +129,7 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient } // Handle status update server.Status.LoadedModelReplicas = event.NumLoadedModelReplicas - return s.updateServerStatus(server) + return s.updateServerStatus(contextWithTimeout, server) }) if retryErr != nil { logger.Error(err, "Failed to update status", "model", event.ServerName) @@ -135,8 +139,8 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient return nil } -func (s *SchedulerClient) updateServerStatus(server *v1alpha1.Server) error 
{ - if err := s.Status().Update(context.TODO(), server); err != nil { +func (s *SchedulerClient) updateServerStatus(ctx context.Context, server *v1alpha1.Server) error { + if err := s.Status().Update(ctx, server); err != nil { s.recorder.Eventf(server, v1.EventTypeWarning, "UpdateFailed", "Failed to update status for Server %q: %v", server.Name, err) return err diff --git a/scheduler/go.mod b/scheduler/go.mod index 1b9ddc1347..a25f454159 100644 --- a/scheduler/go.mod +++ b/scheduler/go.mod @@ -26,8 +26,8 @@ require ( github.com/prometheus/client_golang v1.19.1 github.com/rs/xid v1.6.0 github.com/seldonio/seldon-core/apis/go/v2 v2.0.0-00010101000000-000000000000 - github.com/seldonio/seldon-core/components/tls/v2 v2.0.0-00010101000000-000000000000 github.com/seldonio/seldon-core/components/kafka/v2 v2.0.0-00010101000000-000000000000 + github.com/seldonio/seldon-core/components/tls/v2 v2.0.0-00010101000000-000000000000 github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/v2/kafka/splunkkafka v1.19.0 github.com/sirupsen/logrus v1.9.3 diff --git a/scheduler/pkg/agent/client.go b/scheduler/pkg/agent/client.go index f0cfb53630..b86d49ffb0 100644 --- a/scheduler/pkg/agent/client.go +++ b/scheduler/pkg/agent/client.go @@ -559,7 +559,7 @@ func (c *Client) getArtifactConfig(request *agent.ModelOperationMessage) ([]byte } - config, err := c.secretsHandler.GetSecretConfig(x.StorageSecretName) + config, err := c.secretsHandler.GetSecretConfig(x.StorageSecretName, util.K8sTimeoutDefault) if err != nil { return nil, err } diff --git a/scheduler/pkg/agent/drainservice/server.go b/scheduler/pkg/agent/drainservice/server.go index 1315dc9658..36e6910cf7 100644 --- a/scheduler/pkg/agent/drainservice/server.go +++ b/scheduler/pkg/agent/drainservice/server.go @@ -19,6 +19,8 @@ import ( "github.com/gorilla/mux" log "github.com/sirupsen/logrus" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) const ( @@ -102,7 +104,9 @@ func (drainer *DrainerService) Stop() error { defer drainer.muServerReady.Unlock() var err error if drainer.server != nil { - err = drainer.server.Shutdown(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), util.ServerControlPlaneTimeout) + defer cancel() + err = drainer.server.Shutdown(ctx) } drainer.serverReady = false drainer.logger.Info("Finished graceful shutdown") diff --git a/scheduler/pkg/agent/k8s/secrets.go b/scheduler/pkg/agent/k8s/secrets.go index c8d50d4522..66dd78f47b 100644 --- a/scheduler/pkg/agent/k8s/secrets.go +++ b/scheduler/pkg/agent/k8s/secrets.go @@ -12,6 +12,7 @@ package k8s import ( "context" "fmt" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -29,8 +30,10 @@ func NewSecretsHandler(clientset kubernetes.Interface, namespace string) *Secret } } -func (s *SecretHandler) GetSecretConfig(secretName string) ([]byte, error) { - secret, err := s.clientset.CoreV1().Secrets(s.namespace).Get(context.Background(), secretName, metav1.GetOptions{}) +func (s *SecretHandler) GetSecretConfig(secretName string, timeout time.Duration) ([]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + secret, err := s.clientset.CoreV1().Secrets(s.namespace).Get(ctx, secretName, metav1.GetOptions{}) if err != nil { return nil, err } diff --git a/scheduler/pkg/agent/k8s/secrets_test.go b/scheduler/pkg/agent/k8s/secrets_test.go index 6e4ef2a375..9069414593 100644 --- 
a/scheduler/pkg/agent/k8s/secrets_test.go +++ b/scheduler/pkg/agent/k8s/secrets_test.go @@ -11,6 +11,7 @@ package k8s import ( "testing" + "time" . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" @@ -67,7 +68,7 @@ parameters: t.Run(test.name, func(t *testing.T) { fakeClientset := fake.NewSimpleClientset(test.secret) s := NewSecretsHandler(fakeClientset, test.secret.Namespace) - data, err := s.GetSecretConfig(test.secretName) + data, err := s.GetSecretConfig(test.secretName, 1*time.Millisecond) if test.err { g.Expect(err).ToNot(BeNil()) } else { diff --git a/scheduler/pkg/agent/rclone/rclone_config.go b/scheduler/pkg/agent/rclone/rclone_config.go index 4900d90926..1c45ac071f 100644 --- a/scheduler/pkg/agent/rclone/rclone_config.go +++ b/scheduler/pkg/agent/rclone/rclone_config.go @@ -12,6 +12,7 @@ package rclone import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/config" "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/k8s" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) func (r *RCloneClient) loadRcloneConfiguration(config *config.AgentConfiguration) error { @@ -119,7 +120,7 @@ func (r *RCloneClient) loadRcloneSecretsConfiguration(config *config.AgentConfig for _, secret := range config.Rclone.ConfigSecrets { logger.WithField("secret_name", secret).Infof("retrieving Rclone secret") - config, err := secretsHandler.GetSecretConfig(secret) + config, err := secretsHandler.GetSecretConfig(secret, util.K8sTimeoutDefault) if err != nil { return nil, err } diff --git a/scheduler/pkg/agent/rproxy.go b/scheduler/pkg/agent/rproxy.go index db7a067590..1d44858e7f 100644 --- a/scheduler/pkg/agent/rproxy.go +++ b/scheduler/pkg/agent/rproxy.go @@ -242,7 +242,9 @@ func (rp *reverseHTTPProxy) Stop() error { defer rp.mu.Unlock() var err error if rp.server != nil { - err = rp.server.Shutdown(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), util.ServerControlPlaneTimeout) + defer cancel() + err = rp.server.Shutdown(ctx) } rp.serverReady = false rp.logger.Info("Finished graceful shutdown") diff --git a/scheduler/pkg/kafka/gateway/worker.go b/scheduler/pkg/kafka/gateway/worker.go index d126eff71f..728db9ed24 100644 --- a/scheduler/pkg/kafka/gateway/worker.go +++ b/scheduler/pkg/kafka/gateway/worker.go @@ -20,6 +20,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" @@ -158,14 +159,6 @@ func getProtoInferRequest(job *InferWork) (*v2.ModelInferRequest, error) { return &ireq, nil } -// Extract tracing context from Kafka message -func createContextFromKafkaMsg(job *InferWork) context.Context { - ctx := context.Background() - carrierIn := splunkkafka.NewMessageCarrier(job.msg) - ctx = otel.GetTextMapPropagator().Extract(ctx, carrierIn) - return ctx -} - func (iw *InferWorker) Start(jobChan <-chan *InferWork, cancelChan <-chan struct{}) { for { select { @@ -173,8 +166,8 @@ func (iw *InferWorker) Start(jobChan <-chan *InferWork, cancelChan <-chan struct return case job := <-jobChan: - ctx := createContextFromKafkaMsg(job) - err := iw.processRequest(ctx, job) + ctx := createBaseContextFromKafkaMsg(job.msg) + err := iw.processRequest(ctx, job, util.InferTimeoutDefault) if err != nil { iw.logger.WithError(err).Errorf("Failed to process request for model %s", job.modelName) } @@ -182,35 +175,38 @@ func (iw *InferWorker) Start(jobChan <-chan *InferWork, cancelChan <-chan struct } } -func (iw *InferWorker) processRequest(ctx context.Context, 
job *InferWork) error {
+func (iw *InferWorker) processRequest(ctx context.Context, job *InferWork, timeout time.Duration) error {
+	ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
 	// Has Type Header
 	if typeValue, ok := job.headers[HeaderKeyType]; ok {
 		switch typeValue {
 		case HeaderValueJsonReq:
-			return iw.restRequest(ctx, job, false)
+			return iw.restRequest(ctxWithTimeout, job, false)
 		case HeaderValueJsonRes:
-			return iw.restRequest(ctx, job, true)
+			return iw.restRequest(ctxWithTimeout, job, true)
 		case HeaderValueProtoReq:
 			protoRequest, err := getProtoInferRequest(job)
 			if err != nil {
 				return err
 			}
-			return iw.grpcRequest(ctx, job, protoRequest)
+			return iw.grpcRequest(ctxWithTimeout, job, protoRequest)
 		case HeaderValueProtoRes:
 			protoRequest, err := getProtoRequestAssumingResponse(job.msg.Value)
 			if err != nil {
 				return err
 			}
-			return iw.grpcRequest(ctx, job, protoRequest)
+			return iw.grpcRequest(ctxWithTimeout, job, protoRequest)
 		default:
 			return fmt.Errorf("Header %s with unknown type %s", HeaderKeyType, typeValue)
 		}
 	} else { // Does not have type header - this is the general case to allow easy use
 		protoRequest, err := getProtoInferRequest(job)
 		if err != nil {
-			return iw.restRequest(ctx, job, true)
+			return iw.restRequest(ctxWithTimeout, job, true)
 		} else {
-			return iw.grpcRequest(ctx, job, protoRequest)
+			return iw.grpcRequest(ctxWithTimeout, job, protoRequest)
 		}
 	}
 }
@@ -404,3 +400,13 @@ func (iw *InferWorker) grpcRequest(ctx context.Context, job *InferWork, req *v2.
 	}
 	return nil
 }
+
+// this is redundant code but is kept here to avoid circular dependencies
+// TODO: refactor the tracing pkg in general and remove this
+func createBaseContextFromKafkaMsg(msg *kafka.Message) context.Context {
+	// this is just a base context for a new span;
+	// callers should add a timeout etc. to this context as they see fit.
+ ctx := context.Background() + carrierIn := splunkkafka.NewMessageCarrier(msg) + return otel.GetTextMapPropagator().Extract(ctx, carrierIn) +} diff --git a/scheduler/pkg/kafka/gateway/worker_test.go b/scheduler/pkg/kafka/gateway/worker_test.go index e4008e1a7c..1cfb6089fd 100644 --- a/scheduler/pkg/kafka/gateway/worker_test.go +++ b/scheduler/pkg/kafka/gateway/worker_test.go @@ -15,6 +15,7 @@ import ( "net" "net/http" "testing" + "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/jarcoal/httpmock" @@ -31,6 +32,7 @@ import ( v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane" kafka_config "github.com/seldonio/seldon-core/components/kafka/v2/pkg/config" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils" kafka2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka" seldontracer "github.com/seldonio/seldon-core/scheduler/v2/pkg/tracing" "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" @@ -66,10 +68,14 @@ func TestRestRequest(t *testing.T) { t.Run(test.name, func(t *testing.T) { httpmock.Activate() defer httpmock.DeactivateAndReset() + + httpPort, _ := testing_utils.GetFreePortForTest() + grpcPort, _ := testing_utils.GetFreePortForTest() + kafkaServerConfig := InferenceServerConfig{ Host: "0.0.0.0", - HttpPort: 1234, - GrpcPort: 1235, + HttpPort: httpPort, + GrpcPort: grpcPort, } kafkaModelConfig := KafkaModelConfig{ ModelName: "foo", @@ -111,10 +117,13 @@ func TestProcessRequestRest(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + httpPort, _ := testing_utils.GetFreePortForTest() + grpcPort, _ := testing_utils.GetFreePortForTest() + kafkaServerConfig := InferenceServerConfig{ Host: "0.0.0.0", - HttpPort: 1234, - GrpcPort: 1235, + HttpPort: httpPort, + GrpcPort: grpcPort, } kafkaModelConfig := KafkaModelConfig{ ModelName: "foo", @@ -134,7 +143,7 @@ func TestProcessRequestRest(t *testing.T) { g.Expect(err).To(BeNil()) iw, err := NewInferWorker(ic, logger, tp, tn) g.Expect(err).To(BeNil()) - err = iw.processRequest(context.Background(), &InferWork{modelName: "foo", msg: &kafka.Message{Value: test.data}}) + err = iw.processRequest(context.Background(), &InferWork{modelName: "foo", msg: &kafka.Message{Value: test.data}}, util.InferTimeoutDefault) g.Expect(err).To(BeNil()) ic.Stop() g.Eventually(httpmock.GetTotalCallCount).Should(Equal(1)) @@ -244,10 +253,14 @@ func TestProcessRequestGrpc(t *testing.T) { t.Run(test.name, func(t *testing.T) { logger := log.New() t.Log("Start test", test.name) + + httpPort, _ := testing_utils.GetFreePortForTest() + grpcPort, _ := testing_utils.GetFreePortForTest() + kafkaServerConfig := InferenceServerConfig{ Host: "0.0.0.0", - HttpPort: 1234, - GrpcPort: 1235, + HttpPort: httpPort, + GrpcPort: grpcPort, } kafkaModelConfig := KafkaModelConfig{ ModelName: "foo", @@ -262,7 +275,7 @@ func TestProcessRequestGrpc(t *testing.T) { g.Eventually(check).Should(BeTrue()) b, err := proto.Marshal(test.req) g.Expect(err).To(BeNil()) - err = iw.processRequest(context.Background(), &InferWork{modelName: "foo", msg: &kafka.Message{Value: b}}) + err = iw.processRequest(context.Background(), &InferWork{modelName: "foo", msg: &kafka.Message{Value: b}}, util.InferTimeoutDefault) g.Expect(err).To(BeNil()) g.Eventually(func() int { return mockMLGrpcServer.recv }).Should(Equal(1)) g.Eventually(ic.producer.Len).Should(Equal(1)) @@ -280,6 +293,7 @@ func TestProcessRequest(t *testing.T) { restCalls int grpcCalls int error bool + timeout time.Duration } getProtoBytes := func(res 
proto.Message) []byte { b, _ := proto.Marshal(res) @@ -319,6 +333,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: []byte{}, Key: []byte{}}, }, grpcCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "empty json request", @@ -328,6 +343,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: []byte("{}"), Key: []byte{}}, }, restCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "json request", @@ -337,6 +353,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: []byte(`{"inputs": [{"name": "predict", "shape": [1, 4], "datatype": "FP32", "data": [[1, 2, 3, 4]]}]}`), Key: []byte{}}, }, restCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "chain json request", @@ -346,6 +363,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: []byte(`{"model_name":"iris_1","model_version":"1","id":"903964e4-2419-41ce-b5d1-3ca0c8df9e0c","parameters":null,"outputs":[{"name":"predict","shape":[1],"datatype":"INT64","parameters":null,"data":[2]}]}`), Key: []byte{}}, }, restCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "json request with header", @@ -355,6 +373,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: []byte(`{"inputs": [{"name": "predict", "shape": [1, 4], "datatype": "FP32", "data": [[1, 2, 3, 4]]}]}`), Key: []byte{}}, }, restCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "chain json request with header", @@ -364,6 +383,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: []byte(`{"model_name":"iris_1","model_version":"1","id":"903964e4-2419-41ce-b5d1-3ca0c8df9e0c","parameters":null,"outputs":[{"name":"predict","shape":[1],"datatype":"INT64","parameters":null,"data":[2]}]}`), Key: []byte{}}, }, restCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "grpc request without header", @@ -373,6 +393,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: getProtoBytes(testRequest), Key: []byte{}}, }, grpcCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "grpc request with header", @@ -382,6 +403,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: getProtoBytes(testRequest), Key: []byte{}}, }, grpcCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "chained grpc request without header", @@ -391,6 +413,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: getProtoBytes(testResponse), Key: []byte{}}, }, grpcCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "chained grpc request with header", @@ -400,6 +423,7 @@ func TestProcessRequest(t *testing.T) { msg: &kafka.Message{Value: getProtoBytes(testResponse), Key: []byte{}}, }, grpcCalls: 1, + timeout: util.InferTimeoutDefault, }, { name: "json request with proto request header", @@ -408,7 +432,8 @@ func TestProcessRequest(t *testing.T) { headers: map[string]string{HeaderKeyType: HeaderValueProtoReq}, msg: &kafka.Message{Value: []byte(`{"inputs": [{"name": "predict", "shape": [1, 4], "datatype": "FP32", "data": [[1, 2, 3, 4]]}]}`), Key: []byte{}}, }, - error: true, + error: true, + timeout: util.InferTimeoutDefault, }, { name: "json request with proto response header", @@ -417,7 +442,8 @@ func TestProcessRequest(t *testing.T) { headers: map[string]string{HeaderKeyType: HeaderValueProtoRes}, msg: &kafka.Message{Value: []byte(`{"inputs": [{"name": "predict", "shape": [1, 4], "datatype": "FP32", "data": [[1, 2, 3, 4]]}]}`), Key: []byte{}}, }, - error: true, + error: true, + timeout: util.InferTimeoutDefault, }, { name: "grpc 
request with json header treated as json", //TODO maybe fail in this case as it will fail at server
@@ -427,6 +453,17 @@ func TestProcessRequest(t *testing.T) {
 				msg:       &kafka.Message{Value: getProtoBytes(testRequest), Key: []byte{}},
 			},
 			restCalls: 1,
+			timeout:   util.InferTimeoutDefault,
+		},
+		{
+			name: "grpc request with header - timeout",
+			job: &InferWork{
+				modelName: "foo",
+				headers:   map[string]string{HeaderKeyType: HeaderValueProtoReq},
+				msg:       &kafka.Message{Value: getProtoBytes(testRequest), Key: []byte{}},
+			},
+			grpcCalls: 0, // grpc call will not be made as it will time out
+			timeout:   time.Nanosecond * 1,
 		},
 	}
 	for _, test := range tests {
@@ -434,10 +471,14 @@ func TestProcessRequest(t *testing.T) {
 			logger := log.New()
 			logger.Infof("Start test %s", test.name)
 			t.Log("Start test", test.name)
+
+			httpPort, _ := testing_utils.GetFreePortForTest()
+			grpcPort, _ := testing_utils.GetFreePortForTest()
+
 			kafkaServerConfig := InferenceServerConfig{
 				Host:     "0.0.0.0",
-				HttpPort: 1234,
-				GrpcPort: 1235,
+				HttpPort: httpPort,
+				GrpcPort: grpcPort,
 			}
 			kafkaModelConfig := KafkaModelConfig{
 				ModelName: "foo",
@@ -453,7 +494,7 @@ func TestProcessRequest(t *testing.T) {
 			defer ic.Stop()
 			check := creatMockServerHealthFunc(mockMLGrpcServer)
 			g.Eventually(check).Should(BeTrue())
-			err := iw.processRequest(context.Background(), test.job)
+			err := iw.processRequest(context.Background(), test.job, test.timeout)
 			if test.error {
 				g.Expect(err).ToNot(BeNil())
 			} else {
diff --git a/scheduler/pkg/kafka/pipeline/httpserver.go b/scheduler/pkg/kafka/pipeline/httpserver.go
index b5ba40eae2..c4da136f22 100644
--- a/scheduler/pkg/kafka/pipeline/httpserver.go
+++ b/scheduler/pkg/kafka/pipeline/httpserver.go
@@ -68,7 +68,7 @@ func NewGatewayHttpServer(port int, logger log.FieldLogger,
 }
 
 func (g *GatewayHttpServer) Stop() error {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second*5))
+	ctx, cancel := context.WithTimeout(context.Background(), util.ServerControlPlaneTimeout)
 	defer cancel()
 	return g.server.Shutdown(ctx)
 }
diff --git a/scheduler/pkg/kafka/pipeline/multi_topic_consumer.go b/scheduler/pkg/kafka/pipeline/multi_topic_consumer.go
index 30af55586f..05c1b0a9e1 100644
--- a/scheduler/pkg/kafka/pipeline/multi_topic_consumer.go
+++ b/scheduler/pkg/kafka/pipeline/multi_topic_consumer.go
@@ -158,9 +158,7 @@ func (c *MultiTopicsKafkaConsumer) pollAndMatch() error {
 				Debugf("received message")
 
 			if val, ok := c.requests.Get(string(e.Key)); ok {
-				ctx := context.Background()
-				carrierIn := splunkkafka.NewMessageCarrier(e)
-				ctx = otel.GetTextMapPropagator().Extract(ctx, carrierIn)
+				ctx := createBaseContextFromKafkaMsg(e)
 
 				// Add tracing span
 				_, span := c.tracer.Start(ctx, "Consume")
@@ -196,3 +194,11 @@ func (c *MultiTopicsKafkaConsumer) pollAndMatch() error {
 	logger.Warning("Ending kafka consumer poll")
 	return nil // assumption here is that the connection has already terminated
 }
+
+func createBaseContextFromKafkaMsg(msg *kafka.Message) context.Context {
+	// this is just a base context for a new span;
+	// callers should add a timeout etc. to this context as they see fit.
+ ctx := context.Background() + carrierIn := splunkkafka.NewMessageCarrier(msg) + return otel.GetTextMapPropagator().Extract(ctx, carrierIn) +} diff --git a/scheduler/pkg/metrics/agent.go b/scheduler/pkg/metrics/agent.go index a3f42b801c..a05768c53b 100644 --- a/scheduler/pkg/metrics/agent.go +++ b/scheduler/pkg/metrics/agent.go @@ -21,6 +21,8 @@ import ( log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/status" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) // Keep next line as used in docs @@ -521,7 +523,9 @@ func (pm *PrometheusMetrics) Start(port int) error { func (pm *PrometheusMetrics) Stop() error { pm.logger.Info("Graceful shutdown") if pm.server != nil { - return pm.server.Shutdown(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), util.ServerControlPlaneTimeout) + defer cancel() + return pm.server.Shutdown(ctx) } return nil } diff --git a/scheduler/pkg/util/constants.go b/scheduler/pkg/util/constants.go index 3f61402f06..3e0755cf3f 100644 --- a/scheduler/pkg/util/constants.go +++ b/scheduler/pkg/util/constants.go @@ -46,6 +46,21 @@ const ( GRPCControlPlaneTimeout = 1 * time.Minute // For control plane operations except load/unload ) +// K8s API +const ( + K8sTimeoutDefault = 2 * time.Minute +) + +// Servers control plane +const ( + ServerControlPlaneTimeout = time.Second * 5 +) + +// inference +const ( + InferTimeoutDefault = 10 * time.Minute // TODO: expose this as a config (map)? +) + const ( EnvoyUpdateDefaultBatchWait = 250 * time.Millisecond // note that we keep client and server keepalive times the same