From 1b1d13953d2d3dd85d5d54a8709070fb39d41046 Mon Sep 17 00:00:00 2001 From: Daneyon Hansen Date: Fri, 10 Jan 2025 14:26:31 -0800 Subject: [PATCH] Adds Health gRPC Server and Refactors Main() (#148) * Add health gRPC server and refactors main() - Introduced a health gRPC server to handle liveness and readiness probes. - Refactored main() to manage server goroutines. - Added graceful shutdown for servers and controller manager. - Improved logging consistency and ensured. - Validates CLI flags. Signed-off-by: Daneyon Hansen * Refactors health server to use data store Signed-off-by: Daneyon Hansen --------- Signed-off-by: Daneyon Hansen --- pkg/ext-proc/backend/datastore.go | 11 +- pkg/ext-proc/backend/datastore_test.go | 39 ++++++ pkg/ext-proc/health.go | 28 ++++ pkg/ext-proc/main.go | 178 +++++++++++++++---------- pkg/manifests/ext_proc.yaml | 18 ++- 5 files changed, 199 insertions(+), 75 deletions(-) create mode 100644 pkg/ext-proc/health.go diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index f1e6379d..70f000b8 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -52,8 +52,8 @@ func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) { func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) { ds.poolMu.RLock() defer ds.poolMu.RUnlock() - if ds.inferencePool == nil { - return nil, errors.New("InferencePool hasn't been initialized yet") + if !ds.HasSynced() { + return nil, errors.New("InferencePool is not initialized in data store") } return ds.inferencePool, nil } @@ -75,6 +75,13 @@ func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.I return } +// HasSynced returns true if InferencePool is set in the data store. +func (ds *K8sDatastore) HasSynced() bool { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + return ds.inferencePool != nil +} + func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string { var weights int32 diff --git a/pkg/ext-proc/backend/datastore_test.go b/pkg/ext-proc/backend/datastore_test.go index 57204eb0..d4ad48e1 100644 --- a/pkg/ext-proc/backend/datastore_test.go +++ b/pkg/ext-proc/backend/datastore_test.go @@ -4,8 +4,47 @@ import ( "testing" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func TestHasSynced(t *testing.T) { + tests := []struct { + name string + inferencePool *v1alpha1.InferencePool + hasSynced bool + }{ + { + name: "Ready when InferencePool exists in data store", + inferencePool: &v1alpha1.InferencePool{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + }, + hasSynced: true, + }, + { + name: "Not ready when InferencePool is nil in data store", + inferencePool: nil, + hasSynced: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + datastore := NewK8sDataStore() + // Set the inference pool + if tt.inferencePool != nil { + datastore.setInferencePool(tt.inferencePool) + } + // Check if the data store has been initialized + hasSynced := datastore.HasSynced() + if hasSynced != tt.hasSynced { + t.Errorf("IsInitialized() = %v, want %v", hasSynced, tt.hasSynced) + } + }) + } +} + func TestRandomWeightedDraw(t *testing.T) { tests := []struct { name string diff --git a/pkg/ext-proc/health.go b/pkg/ext-proc/health.go new file mode 100644 index 00000000..488851eb --- /dev/null +++ b/pkg/ext-proc/health.go @@ -0,0 +1,28 @@ +package main + +import ( + "context" + + "google.golang.org/grpc/codes" + healthPb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" + klog "k8s.io/klog/v2" +) + +type healthServer struct { + datastore *backend.K8sDatastore +} + +func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { + if !s.datastore.HasSynced() { + klog.Infof("gRPC health check not serving: %s", in.String()) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil + } + klog.Infof("gRPC health check serving: %s", in.String()) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil +} + +func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error { + return status.Error(codes.Unimplemented, "Watch is not implemented") +} diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index e8a41667..e42d8e4f 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -1,20 +1,14 @@ package main import ( - "context" "flag" "fmt" "net" - "os" - "os/signal" - "syscall" "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "google.golang.org/grpc" - "google.golang.org/grpc/codes" healthPb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm" @@ -29,10 +23,14 @@ import ( ) var ( - port = flag.Int( - "port", + grpcPort = flag.Int( + "grpcPort", 9002, - "gRPC port") + "The gRPC port used for communicating with Envoy proxy") + grpcHealthPort = flag.Int( + "grpcHealthPort", + 9003, + "The port used for gRPC liveness and readiness probes") targetPodHeader = flag.String( "targetPodHeader", "target-pod", @@ -65,32 +63,22 @@ var ( scheme = runtime.NewScheme() ) -type healthServer struct{} - -func (s *healthServer) Check( - ctx context.Context, - in *healthPb.HealthCheckRequest, -) (*healthPb.HealthCheckResponse, error) { - klog.Infof("Handling grpc Check request + %s", in.String()) - return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil -} - -func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error { - return status.Error(codes.Unimplemented, "Watch is not implemented") -} - func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(v1alpha1.AddToScheme(scheme)) } func main() { - klog.InitFlags(nil) flag.Parse() ctrl.SetLogger(klog.TODO()) + // Validate flags + if err := validateFlags(); err != nil { + klog.Fatalf("Failed to validate flags: %v", err) + } + // Print all flag values flags := "Flags: " flag.VisitAll(func(f *flag.Flag) { @@ -98,22 +86,16 @@ func main() { }) klog.Info(flags) - klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port)) - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) + // Create a new manager to manage controllers + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme}) if err != nil { - klog.Fatalf("failed to listen: %v", err) + klog.Fatalf("Failed to create controller manager: %v", err) } + // Create the data store used to cache watched resources datastore := backend.NewK8sDataStore() - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - }) - if err != nil { - klog.Error(err, "unable to start manager") - os.Exit(1) - } - + // Create the controllers and register them with the manager if err := (&backend.InferencePoolReconciler{ Datastore: datastore, Scheme: mgr.GetScheme(), @@ -124,7 +106,7 @@ func main() { }, Record: mgr.GetEventRecorderFor("InferencePool"), }).SetupWithManager(mgr); err != nil { - klog.Error(err, "Error setting up InferencePoolReconciler") + klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err) } if err := (&backend.InferenceModelReconciler{ @@ -137,7 +119,7 @@ func main() { }, Record: mgr.GetEventRecorderFor("InferenceModel"), }).SetupWithManager(mgr); err != nil { - klog.Error(err, "Error setting up InferenceModelReconciler") + klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err) } if err := (&backend.EndpointSliceReconciler{ @@ -148,53 +130,105 @@ func main() { ServiceName: *serviceName, Zone: *zone, }).SetupWithManager(mgr); err != nil { - klog.Error(err, "Error setting up EndpointSliceReconciler") + klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err) + } + + // Start health and ext-proc servers in goroutines + healthSvr := startHealthServer(datastore, *grpcHealthPort) + extProcSvr := startExternalProcessorServer( + datastore, + *grpcPort, + *refreshPodsInterval, + *refreshMetricsInterval, + *targetPodHeader, + ) + + // Start the controller manager. Blocking and will return when shutdown is complete. + klog.Infof("Starting controller manager") + if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + klog.Fatalf("Error starting controller manager: %v", err) + } + klog.Info("Controller manager shutting down") + + // Gracefully shutdown servers + if healthSvr != nil { + klog.Info("Health server shutting down") + healthSvr.GracefulStop() } + if extProcSvr != nil { + klog.Info("Ext-proc server shutting down") + extProcSvr.GracefulStop() + } + + klog.Info("All components shutdown") +} + +// startHealthServer starts the gRPC health probe server in a goroutine. +func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server { + svr := grpc.NewServer() + healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds}) - errChan := make(chan error) go func() { - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - klog.Error(err, "Error running manager") - errChan <- err + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + klog.Fatalf("Health server failed to listen: %v", err) } + klog.Infof("Health server listening on port: %d", port) + + // Blocking and will return when shutdown is complete. + if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped { + klog.Fatalf("Health server failed: %v", err) + } + klog.Info("Health server shutting down") }() + return svr +} - s := grpc.NewServer() +// startExternalProcessorServer starts the Envoy external processor server in a goroutine. +func startExternalProcessorServer( + datastore *backend.K8sDatastore, + port int, + refreshPodsInterval, refreshMetricsInterval time.Duration, + targetPodHeader string, +) *grpc.Server { + svr := grpc.NewServer() - pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore) - if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil { - klog.Fatalf("failed to initialize: %v", err) - } - extProcPb.RegisterExternalProcessorServer( - s, - handlers.NewServer( - pp, - scheduling.NewScheduler(pp), - *targetPodHeader, - datastore)) - healthPb.RegisterHealthServer(s, &healthServer{}) - - klog.Infof("Starting gRPC server on port :%v", *port) - - // shutdown - var gracefulStop = make(chan os.Signal, 1) - signal.Notify(gracefulStop, syscall.SIGTERM) - signal.Notify(gracefulStop, syscall.SIGINT) go func() { - select { - case sig := <-gracefulStop: - klog.Infof("caught sig: %+v", sig) - os.Exit(0) - case err := <-errChan: - klog.Infof("caught error in controller: %+v", err) - os.Exit(0) + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + klog.Fatalf("Ext-proc server failed to listen: %v", err) } + klog.Infof("Ext-proc server listening on port: %d", port) + // Initialize backend provider + pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore) + if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil { + klog.Fatalf("Failed to initialize backend provider: %v", err) + } + + // Register ext_proc handlers + extProcPb.RegisterExternalProcessorServer( + svr, + handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore), + ) + + // Blocking and will return when shutdown is complete. + if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped { + klog.Fatalf("Ext-proc server failed: %v", err) + } + klog.Info("Ext-proc server shutting down") }() + return svr +} - err = s.Serve(lis) - if err != nil { - klog.Fatalf("Ext-proc failed with the err: %v", err) +func validateFlags() error { + if *poolName == "" { + return fmt.Errorf("required %q flag not set", "poolName") + } + + if *serviceName == "" { + return fmt.Errorf("required %q flag not set", "serviceName") } + return nil } diff --git a/pkg/manifests/ext_proc.yaml b/pkg/manifests/ext_proc.yaml index 7ef31825..a9141071 100644 --- a/pkg/manifests/ext_proc.yaml +++ b/pkg/manifests/ext_proc.yaml @@ -28,7 +28,6 @@ roleRef: kind: ClusterRole name: pod-read --- - apiVersion: apps/v1 kind: Deployment metadata: @@ -57,8 +56,25 @@ spec: - "3" - -serviceName - "vllm-llama2-7b-pool" + - -grpcPort + - "9002" + - -grpcHealthPort + - "9003" ports: - containerPort: 9002 + - containerPort: 9003 + livenessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 --- apiVersion: v1 kind: Service