Adds Health gRPC Server and Refactors Main() (kubernetes-sigs#148)
* Add health gRPC server and refactor main()

- Introduced a health gRPC server to handle liveness and readiness probes.
- Refactored main() to manage server goroutines.
- Added graceful shutdown for servers and the controller manager (the startup/shutdown sequence is sketched just after this list).
- Improved logging consistency.
- Added CLI flag validation.
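
The shutdown sequence follows the standard gRPC pattern: serve in a goroutine, block until termination, then drain in-flight RPCs. A minimal, self-contained sketch of that pattern (simplified and assumed here; the actual main() in the diff below blocks on controller-runtime's SetupSignalHandler rather than on signal.Notify):

package main

import (
	"net"
	"os"
	"os/signal"
	"syscall"

	"google.golang.org/grpc"
)

func main() {
	svr := grpc.NewServer()

	lis, err := net.Listen("tcp", ":9003")
	if err != nil {
		panic(err)
	}

	// Serve in a goroutine so main can wait for a signal.
	go func() {
		// Serve returns grpc.ErrServerStopped after GracefulStop.
		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
			panic(err)
		}
	}()

	// Block until SIGTERM or SIGINT.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
	<-sig

	// GracefulStop stops accepting new RPCs and waits for pending ones to finish.
	svr.GracefulStop()
}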

Signed-off-by: Daneyon Hansen <[email protected]>

* Refactors health server to use data store

Signed-off-by: Daneyon Hansen <[email protected]>

---------

Signed-off-by: Daneyon Hansen <[email protected]>
danehans authored and kfswain committed Jan 15, 2025
1 parent 8feca7d commit 8dd0dcb
Showing 5 changed files with 199 additions and 75 deletions.
11 changes: 9 additions & 2 deletions pkg/ext-proc/backend/datastore.go
@@ -52,8 +52,8 @@ func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
 func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
 	ds.poolMu.RLock()
 	defer ds.poolMu.RUnlock()
-	if ds.inferencePool == nil {
-		return nil, errors.New("InferencePool hasn't been initialized yet")
+	if !ds.HasSynced() {
+		return nil, errors.New("InferencePool is not initialized in data store")
 	}
 	return ds.inferencePool, nil
 }
@@ -75,6 +75,13 @@ func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.I
 	return
 }
 
+// HasSynced returns true if InferencePool is set in the data store.
+func (ds *K8sDatastore) HasSynced() bool {
+	ds.poolMu.RLock()
+	defer ds.poolMu.RUnlock()
+	return ds.inferencePool != nil
+}
+
 func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
 	var weights int32
 
39 changes: 39 additions & 0 deletions pkg/ext-proc/backend/datastore_test.go
@@ -4,8 +4,47 @@ import (
 	"testing"
 
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

+func TestHasSynced(t *testing.T) {
+	tests := []struct {
+		name          string
+		inferencePool *v1alpha1.InferencePool
+		hasSynced     bool
+	}{
+		{
+			name: "Ready when InferencePool exists in data store",
+			inferencePool: &v1alpha1.InferencePool{
+				ObjectMeta: v1.ObjectMeta{
+					Name:      "test-pool",
+					Namespace: "default",
+				},
+			},
+			hasSynced: true,
+		},
+		{
+			name:          "Not ready when InferencePool is nil in data store",
+			inferencePool: nil,
+			hasSynced:     false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			datastore := NewK8sDataStore()
+			// Set the inference pool
+			if tt.inferencePool != nil {
+				datastore.setInferencePool(tt.inferencePool)
+			}
+			// Check if the data store has been initialized
+			hasSynced := datastore.HasSynced()
+			if hasSynced != tt.hasSynced {
+				t.Errorf("HasSynced() = %v, want %v", hasSynced, tt.hasSynced)
+			}
+		})
+	}
+}
+
 func TestRandomWeightedDraw(t *testing.T) {
 	tests := []struct {
 		name string
28 changes: 28 additions & 0 deletions pkg/ext-proc/health.go
@@ -0,0 +1,28 @@
+package main
+
+import (
+	"context"
+
+	"google.golang.org/grpc/codes"
+	healthPb "google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/status"
+	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	klog "k8s.io/klog/v2"
+)
+
+type healthServer struct {
+	datastore *backend.K8sDatastore
+}
+
+func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
+	if !s.datastore.HasSynced() {
+		klog.Infof("gRPC health check not serving: %s", in.String())
+		return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
+	}
+	klog.Infof("gRPC health check serving: %s", in.String())
+	return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
+}
+
+func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
+	return status.Error(codes.Unimplemented, "Watch is not implemented")
+}
178 changes: 106 additions & 72 deletions pkg/ext-proc/main.go
@@ -1,20 +1,14 @@
 package main
 
 import (
-	"context"
 	"flag"
 	"fmt"
 	"net"
-	"os"
-	"os/signal"
-	"syscall"
 	"time"
 
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
 	healthPb "google.golang.org/grpc/health/grpc_health_v1"
-	"google.golang.org/grpc/status"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
@@ -29,10 +23,14 @@ import (
 )
 
 var (
-	port = flag.Int(
-		"port",
+	grpcPort = flag.Int(
+		"grpcPort",
 		9002,
-		"gRPC port")
+		"The gRPC port used for communicating with Envoy proxy")
+	grpcHealthPort = flag.Int(
+		"grpcHealthPort",
+		9003,
+		"The port used for gRPC liveness and readiness probes")
 	targetPodHeader = flag.String(
 		"targetPodHeader",
 		"target-pod",
@@ -65,55 +63,39 @@
 	scheme = runtime.NewScheme()
 )
 
-type healthServer struct{}
-
-func (s *healthServer) Check(
-	ctx context.Context,
-	in *healthPb.HealthCheckRequest,
-) (*healthPb.HealthCheckResponse, error) {
-	klog.Infof("Handling grpc Check request + %s", in.String())
-	return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
-}
-
-func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
-	return status.Error(codes.Unimplemented, "Watch is not implemented")
-}
-
 func init() {
 	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
 	utilruntime.Must(v1alpha1.AddToScheme(scheme))
 }
 
 func main() {
-
 	klog.InitFlags(nil)
 	flag.Parse()
 
 	ctrl.SetLogger(klog.TODO())
 
+	// Validate flags
+	if err := validateFlags(); err != nil {
+		klog.Fatalf("Failed to validate flags: %v", err)
+	}
+
 	// Print all flag values
 	flags := "Flags: "
 	flag.VisitAll(func(f *flag.Flag) {
 		flags += fmt.Sprintf("%s=%v; ", f.Name, f.Value)
 	})
 	klog.Info(flags)
 
-	klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port))
-	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
+	// Create a new manager to manage controllers
+	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
 	if err != nil {
-		klog.Fatalf("failed to listen: %v", err)
+		klog.Fatalf("Failed to create controller manager: %v", err)
 	}
 
+	// Create the data store used to cache watched resources
 	datastore := backend.NewK8sDataStore()
 
-	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
-		Scheme: scheme,
-	})
-	if err != nil {
-		klog.Error(err, "unable to start manager")
-		os.Exit(1)
-	}
-
+	// Create the controllers and register them with the manager
 	if err := (&backend.InferencePoolReconciler{
 		Datastore: datastore,
 		Scheme:    mgr.GetScheme(),
@@ -124,7 +106,7 @@ func main() {
 		},
 		Record: mgr.GetEventRecorderFor("InferencePool"),
 	}).SetupWithManager(mgr); err != nil {
-		klog.Error(err, "Error setting up InferencePoolReconciler")
+		klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
 	}
 
 	if err := (&backend.InferenceModelReconciler{
@@ -137,7 +119,7 @@ func main() {
 		},
 		Record: mgr.GetEventRecorderFor("InferenceModel"),
 	}).SetupWithManager(mgr); err != nil {
-		klog.Error(err, "Error setting up InferenceModelReconciler")
+		klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
 	}
 
 	if err := (&backend.EndpointSliceReconciler{
@@ -148,53 +130,105 @@ func main() {
 		ServiceName: *serviceName,
 		Zone:        *zone,
 	}).SetupWithManager(mgr); err != nil {
-		klog.Error(err, "Error setting up EndpointSliceReconciler")
+		klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
 	}
 
+	// Start health and ext-proc servers in goroutines
+	healthSvr := startHealthServer(datastore, *grpcHealthPort)
+	extProcSvr := startExternalProcessorServer(
+		datastore,
+		*grpcPort,
+		*refreshPodsInterval,
+		*refreshMetricsInterval,
+		*targetPodHeader,
+	)
+
+	// Start the controller manager. Blocking and will return when shutdown is complete.
+	klog.Infof("Starting controller manager")
+	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+		klog.Fatalf("Error starting controller manager: %v", err)
+	}
+	klog.Info("Controller manager shutting down")
+
+	// Gracefully shutdown servers
+	if healthSvr != nil {
+		klog.Info("Health server shutting down")
+		healthSvr.GracefulStop()
+	}
+	if extProcSvr != nil {
+		klog.Info("Ext-proc server shutting down")
+		extProcSvr.GracefulStop()
+	}
+
+	klog.Info("All components shutdown")
+}
+
+// startHealthServer starts the gRPC health probe server in a goroutine.
+func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
+	svr := grpc.NewServer()
+	healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
 
-	errChan := make(chan error)
 	go func() {
-		if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
-			klog.Error(err, "Error running manager")
-			errChan <- err
+		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+		if err != nil {
+			klog.Fatalf("Health server failed to listen: %v", err)
 		}
+		klog.Infof("Health server listening on port: %d", port)
+
+		// Blocking and will return when shutdown is complete.
+		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
+			klog.Fatalf("Health server failed: %v", err)
+		}
+		klog.Info("Health server shutting down")
 	}()
+	return svr
+}
 
-	s := grpc.NewServer()
+// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
+func startExternalProcessorServer(
+	datastore *backend.K8sDatastore,
+	port int,
+	refreshPodsInterval, refreshMetricsInterval time.Duration,
+	targetPodHeader string,
+) *grpc.Server {
+	svr := grpc.NewServer()
 
-	pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
-	if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil {
-		klog.Fatalf("failed to initialize: %v", err)
-	}
-	extProcPb.RegisterExternalProcessorServer(
-		s,
-		handlers.NewServer(
-			pp,
-			scheduling.NewScheduler(pp),
-			*targetPodHeader,
-			datastore))
-	healthPb.RegisterHealthServer(s, &healthServer{})
-
-	klog.Infof("Starting gRPC server on port :%v", *port)
-
-	// shutdown
-	var gracefulStop = make(chan os.Signal, 1)
-	signal.Notify(gracefulStop, syscall.SIGTERM)
-	signal.Notify(gracefulStop, syscall.SIGINT)
 	go func() {
-		select {
-		case sig := <-gracefulStop:
-			klog.Infof("caught sig: %+v", sig)
-			os.Exit(0)
-		case err := <-errChan:
-			klog.Infof("caught error in controller: %+v", err)
-			os.Exit(0)
+		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+		if err != nil {
+			klog.Fatalf("Ext-proc server failed to listen: %v", err)
 		}
+		klog.Infof("Ext-proc server listening on port: %d", port)
+
+		// Initialize backend provider
+		pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
+		if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
+			klog.Fatalf("Failed to initialize backend provider: %v", err)
+		}
+
+		// Register ext_proc handlers
+		extProcPb.RegisterExternalProcessorServer(
+			svr,
+			handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
+		)
+
+		// Blocking and will return when shutdown is complete.
+		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
+			klog.Fatalf("Ext-proc server failed: %v", err)
+		}
+		klog.Info("Ext-proc server shutting down")
 	}()
+	return svr
+}
 
-	err = s.Serve(lis)
-	if err != nil {
-		klog.Fatalf("Ext-proc failed with the err: %v", err)
+func validateFlags() error {
+	if *poolName == "" {
+		return fmt.Errorf("required %q flag not set", "poolName")
 	}
+
+	if *serviceName == "" {
+		return fmt.Errorf("required %q flag not set", "serviceName")
+	}
+
+	return nil
 }
18 changes: 17 additions & 1 deletion pkg/manifests/ext_proc.yaml
@@ -28,7 +28,6 @@ roleRef:
   kind: ClusterRole
   name: pod-read
 ---
-
 apiVersion: apps/v1
 kind: Deployment
 metadata:
@@ -57,8 +56,25 @@ spec:
         - "3"
         - -serviceName
        - "vllm-llama2-7b-pool"
+        - -grpcPort
+        - "9002"
+        - -grpcHealthPort
+        - "9003"
         ports:
         - containerPort: 9002
+        - containerPort: 9003
+        livenessProbe:
+          grpc:
+            port: 9003
+            service: inference-extension
+          initialDelaySeconds: 5
+          periodSeconds: 10
+        readinessProbe:
+          grpc:
+            port: 9003
+            service: inference-extension
+          initialDelaySeconds: 5
+          periodSeconds: 10
 ---
 apiVersion: v1
 kind: Service
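
Kubelet's native gRPC probes (available since Kubernetes v1.24, GA in v1.27) call grpc.health.v1.Health/Check on the configured port, passing the manifest's service value. The Check implementation in this commit ignores the requested service name and answers from data store state, so readiness flips to SERVING once the InferencePool is cached. The same probe can be reproduced by hand; a minimal client sketch, assuming the health port has been forwarded to localhost:9003 (the address is illustrative):

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthPb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Dial the health port exposed by the ext-proc container (9003 by default).
	conn, err := grpc.Dial("localhost:9003", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// The same request shape kubelet sends for the probes above.
	resp, err := healthPb.NewHealthClient(conn).Check(ctx,
		&healthPb.HealthCheckRequest{Service: "inference-extension"})
	if err != nil {
		log.Fatalf("health check failed: %v", err)
	}
	log.Printf("status: %s", resp.GetStatus()) // SERVING once the InferencePool has synced
}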
