From 62f1c98acb6e510f2c2e0f1c2fd8ba4d45997f02 Mon Sep 17 00:00:00 2001 From: Karim Radhouani Date: Thu, 14 Dec 2023 21:36:11 -0800 Subject: [PATCH] add outputs metrics with listen command --- pkg/cmd/listener/listener.go | 54 ++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/pkg/cmd/listener/listener.go b/pkg/cmd/listener/listener.go index 2089e53b..543afaa8 100644 --- a/pkg/cmd/listener/listener.go +++ b/pkg/cmd/listener/listener.go @@ -15,6 +15,7 @@ import ( "io" "net" "net/http" + "os" "github.com/fullstorydev/grpcurl" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -22,15 +23,17 @@ import ( "github.com/jhump/protoreflect/dynamic" nokiasros "github.com/karimra/sros-dialout" "github.com/openconfig/gnmi/proto/gnmi" - "github.com/openconfig/gnmic/pkg/app" - "github.com/openconfig/gnmic/pkg/outputs" - "github.com/openconfig/gnmic/pkg/utils" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + + "github.com/openconfig/gnmic/pkg/app" + "github.com/openconfig/gnmic/pkg/outputs" + "github.com/openconfig/gnmic/pkg/utils" ) // New returns the listen command tree. @@ -38,20 +41,38 @@ func New(gApp *app.App) *cobra.Command { cmd := &cobra.Command{ Use: "listen", Short: "listens for telemetry dialout updates from the node", - PreRun: func(cmd *cobra.Command, _ []string) { + PreRunE: func(cmd *cobra.Command, _ []string) error { gApp.Config.SetLocalFlagsFromFile(cmd) + if len(gApp.Config.Address) == 0 { + return fmt.Errorf("no address specified") + } + if len(gApp.Config.Address) > 1 { + fmt.Fprintf(os.Stderr, "multiple addresses specified, listening only on %s\n", gApp.Config.Address[0]) + } + return nil }, - RunE: func(_ *cobra.Command, _ []string) error { - ctx, cancel := context.WithCancel(context.Background()) + RunE: func(cmd *cobra.Command, _ []string) error { + ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() server := new(dialoutTelemetryServer) server.ctx = ctx - if len(gApp.Config.Address) == 0 { - return fmt.Errorf("no address specified") + + opts := []grpc.ServerOption{ + grpc.MaxConcurrentStreams(gApp.Config.LocalFlags.ListenMaxConcurrentStreams), } - if len(gApp.Config.Address) > 1 { - fmt.Printf("multiple addresses specified, listening only on %s\n", gApp.Config.Address[0]) + if gApp.Config.MaxMsgSize > 0 { + opts = append(opts, grpc.MaxRecvMsgSize(gApp.Config.MaxMsgSize)) + } + + if gApp.Config.LocalFlags.ListenPrometheusAddress != "" { + server.reg = prometheus.NewRegistry() + grpcMetrics := grpc_prometheus.NewServerMetrics() + opts = append(opts, + grpc.StreamInterceptor(grpcMetrics.StreamServerInterceptor()), + ) + server.reg.MustRegister(grpcMetrics) } + if len(gApp.Config.ProtoFile) > 0 { gApp.Logger.Printf("loading proto files...") descSource, err := grpcurl.DescriptorSourceFromProtoFiles(gApp.Config.ProtoDir, gApp.Config.ProtoFile...) @@ -81,6 +102,7 @@ func New(gApp *app.App) *cobra.Command { if err != nil { return err } + for name, outConf := range outCfgs { if outType, ok := outConf["type"]; ok { if initializer, ok := outputs.Outputs[outType.(string)]; ok { @@ -90,6 +112,7 @@ func New(gApp *app.App) *cobra.Command { outputs.WithEventProcessors(procCfg, gApp.Logger, nil, actCfg), outputs.WithName(gApp.Config.InstanceName), outputs.WithClusterName(gApp.Config.ClusterName), + outputs.WithRegistry(server.reg), ) server.Outputs[name] = out } @@ -101,18 +124,12 @@ func New(gApp *app.App) *cobra.Command { o.Close() } }() + server.listener, err = net.Listen("tcp", gApp.Config.Address[0]) if err != nil { return err } gApp.Logger.Printf("waiting for connections on %s", gApp.Config.Address[0]) - var opts []grpc.ServerOption - if gApp.Config.MaxMsgSize > 0 { - opts = append(opts, grpc.MaxRecvMsgSize(gApp.Config.MaxMsgSize)) - } - opts = append(opts, - grpc.MaxConcurrentStreams(gApp.Config.LocalFlags.ListenMaxConcurrentStreams), - grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor)) if gApp.Config.TLSKey != "" && gApp.Config.TLSCert != "" { tlsConfig, err := utils.NewTLSConfig( @@ -136,7 +153,7 @@ func New(gApp *app.App) *cobra.Command { grpc_prometheus.Register(server.grpcServer) httpServer := &http.Server{ - Handler: promhttp.Handler(), + Handler: promhttp.HandlerFor(server.reg, promhttp.HandlerOpts{}), Addr: gApp.Config.LocalFlags.ListenPrometheusAddress, } go func() { @@ -170,6 +187,7 @@ type dialoutTelemetryServer struct { ctx context.Context gApp *app.App + reg *prometheus.Registry } func (s *dialoutTelemetryServer) Publish(stream nokiasros.DialoutTelemetry_PublishServer) error {