diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index 1c2d6f313..7b1b1615f 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -30,14 +30,31 @@ type Client struct { // Wait for all sub go routine to finish w sync.WaitGroup fatal bool + logLevel int } +// Syslog level for error +const logLevelError int = 3 +const logLevelDebug int = 7 +const logLevelMax int = logLevelDebug + // NewClient returns a new initialized client. func NewClient(addr net.Addr) *Client { pq := queue.NewPriorityQueue(1, false) return &Client{ addr: addr, q: pq, + logLevel: logLevelError, + } +} + +func (c *Client) setLogLevel(lvl int) { + if (lvl >= 0) { + if lvl < logLevelMax { + c.logLevel = lvl + } else { + c.logLevel = logLevelMax + } } } @@ -121,8 +138,12 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { } var dc sdc.Client + mode := c.subscribe.GetMode() + if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) + } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { + dc, err = sdc.NewEventClient(paths, prefix, c.logLevel) } else if _, ok, _, _ := sdc.IsTargetDb(target); ok { dc, err = sdc.NewDbClient(paths, prefix) } else { @@ -134,7 +155,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { return grpc.Errorf(codes.NotFound, "%v", err) } - switch mode := c.subscribe.GetMode(); mode { + switch mode { case gnmipb.SubscriptionList_STREAM: c.stop = make(chan struct{}, 1) c.w.Add(1) diff --git a/gnmi_server/server.go b/gnmi_server/server.go index f21f8d68a..5ef22b5df 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -47,6 +47,7 @@ type Config struct { // Port for the Server to listen on. If 0 or unset the Server will pick a port // for this Server. Port int64 + LogLevel int UserAuth AuthTypes } @@ -233,6 +234,8 @@ func (s *Server) Subscribe(stream gnmipb.GNMI_SubscribeServer) error { c := NewClient(pr.Addr) + c.setLogLevel(s.config.LogLevel) + s.cMu.Lock() if oc, ok := s.clients[c.String()]; ok { log.V(2).Infof("Delete duplicate client %s", oc) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go new file mode 100644 index 000000000..a2c7f870b --- /dev/null +++ b/sonic_data_client/events_client.go @@ -0,0 +1,195 @@ +package client + +/* +#cgo CFLAGS: -g -Wall -I/sonic/src/sonic-swss-common/common -Wformat -Werror=format-security -fPIE +#cgo LDFLAGS: -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lswsscommon +#include +#include "events_wrap.h" +*/ +import "C" + +import ( + "encoding/json" + "fmt" + "sync" + "time" + "unsafe" + + spb "github.com/Azure/sonic-telemetry/proto" + "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" +) + +type EventClient struct { + + prefix *gnmipb.Path + path *gnmipb.Path + + q *queue.PriorityQueue + channel chan struct{} + + wg *sync.WaitGroup // wait for all sub go routines to finish + + subs_handle unsafe.Pointer + + stopped int +} + +const SUBSCRIBER_TIMEOUT = (2 * 1000) // 2 seconds +const HEARTBEAT_TIMEOUT = 2 +const EVENT_BUFFSZ = 4096 +const MISSED_BUFFSZ = 16 + +func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Client, error) { + var evtc EventClient + evtc.prefix = prefix + for _, path := range paths { + // Only one path is expected. Take the last if many + evtc.path = path + } + C.swssSetLogPriority(C.int(logLevel)) + + /* Init subscriber with 2 seconds time out */ + subs_data := make(map[string]interface{}) + subs_data["recv_timeout"] = SUBSCRIBER_TIMEOUT + j, err := json.Marshal(subs_data) + if err != nil { + log.V(3).Infof("events_init_subscriber: Failed to marshal") + return nil, err + } + js := string(j) + evtc.subs_handle = C.events_init_subscriber_wrap(C.CString(js)) + evtc.stopped = 0 + + log.V(7).Infof("NewEventClient constructed. logLevel=%d", logLevel) + + return &evtc, nil +} + +// String returns the target the client is querying. +func (evtc *EventClient) String() string { + return fmt.Sprintf("EventClient Prefix %v", evtc.prefix.GetTarget()) +} + + +func get_events(evtc *EventClient, updateChannel chan string) { + + evt_ptr := C.malloc(C.sizeof_char * EVENT_BUFFSZ) + missed_ptr := C.malloc(C.sizeof_char * MISSED_BUFFSZ) + + defer C.free(unsafe.Pointer(evt_ptr)) + defer C.free(unsafe.Pointer(missed_ptr)) + + c_eptr := (*C.char)(unsafe.Pointer(evt_ptr)) + c_mptr := (*C.char)(unsafe.Pointer(missed_ptr)) + + for { + rc := C.event_receive_wrap(evtc.subs_handle, c_eptr, EVENT_BUFFSZ, c_mptr, MISSED_BUFFSZ) + log.V(7).Infof("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(evt_ptr)) + + if rc != 0 { + updateChannel <- C.GoString((*C.char)(evt_ptr)) + } + if evtc.stopped == 1 { + log.V(1).Infof("%v stop channel closed, exiting get_events routine", evtc) + C.events_deinit_subscriber_wrap(evtc.subs_handle) + evtc.subs_handle = nil + return + } + // TODO: Record missed count in stats table. + // intVar, err := strconv.Atoi(C.GoString((*C.char)(c_mptr))) + } +} + + +func send_event(evtc *EventClient, tv *gnmipb.TypedValue) error { + spbv := &spb.Value{ + Prefix: evtc.prefix, + Path: evtc.path, + Timestamp: time.Now().UnixNano(), + Val: tv, + } + + log.V(7).Infof("Sending spbv") + if err := evtc.q.Put(Value{spbv}); err != nil { + log.V(3).Infof("Queue error: %v", err) + return err + } + log.V(7).Infof("send_event done") + return nil +} + +func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + hbData := make(map[string]interface{}) + hbData["heart"] = "beat" + hbVal, _ := json.Marshal(hbData) + + hbTv := &gnmipb.TypedValue { + Value: &gnmipb.TypedValue_JsonIetfVal{ + JsonIetfVal: hbVal, + }} + + + evtc.wg = wg + defer evtc.wg.Done() + + evtc.q = q + evtc.channel = stop + + updateChannel := make(chan string) + go get_events(evtc, updateChannel) + + for { + select { + case nextEvent := <-updateChannel: + log.V(7).Infof("update received: %v", nextEvent) + evtTv := &gnmipb.TypedValue { + Value: &gnmipb.TypedValue_StringVal { + StringVal: nextEvent, + }} + if err := send_event(evtc, evtTv); err != nil { + return + } + + case <-IntervalTicker(time.Second * HEARTBEAT_TIMEOUT): + log.V(7).Infof("Ticker received") + if err := send_event(evtc, hbTv); err != nil { + return + } + case <-evtc.channel: + evtc.stopped = 1 + log.V(3).Infof("Channel closed by client") + return + } + } + log.V(3).Infof("Event exiting streamrun") +} + + +func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) { + return nil, nil +} + +func (evtc *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + return +} + +func (evtc *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + return +} + + +func (evtc *EventClient) Close() error { + return nil +} + +func (evtc *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { + return nil +} +func (evtc *EventClient) Capabilities() []gnmipb.ModelData { + return nil +} + +// cgo LDFLAGS: -L/sonic/target/files/bullseye -lxswsscommon -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lxxeventxx -Wl,-rpath,/sonic/target/files/bullseye + diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 4ec580c6b..0d403b6da 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "flag" "io/ioutil" + "strconv" "time" log "github.com/golang/glog" @@ -16,7 +17,7 @@ import ( ) var ( - userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} + userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} port = flag.Int("port", -1, "port to listen on") // Certificate files. caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.") @@ -41,12 +42,12 @@ func main() { defUserAuth = gnmi.AuthTypes{"jwt": false, "password": false, "cert": false} } - if isFlagPassed("client_auth") { - log.V(1).Infof("client_auth provided") - }else { - log.V(1).Infof("client_auth not provided, using defaults.") - userAuth = defUserAuth - } + if isFlagPassed("client_auth") { + log.V(1).Infof("client_auth provided") + }else { + log.V(1).Infof("client_auth not provided, using defaults.") + userAuth = defUserAuth + } switch { case *port <= 0: @@ -58,8 +59,14 @@ func main() { cfg := &gnmi.Config{} cfg.Port = int64(*port) + cfg.LogLevel = 3 var opts []grpc.ServerOption + if val, err := strconv.Atoi(getflag("v")); err == nil { + cfg.LogLevel = val + log.Errorf("flag: log level %v", cfg.LogLevel) + } + if !*noTLS { var certificate tls.Certificate var err error @@ -69,13 +76,13 @@ func main() { log.Exitf("could not load server key pair: %s", err) } } else { - switch { - case *serverCert == "": - log.Errorf("serverCert must be set.") - return - case *serverKey == "": - log.Errorf("serverKey must be set.") - return + switch { + case *serverCert == "": + log.Errorf("serverCert must be set.") + return + case *serverKey == "": + log.Errorf("serverKey must be set.") + return } certificate, err = tls.LoadX509KeyPair(*serverCert, *serverKey) if err != nil { @@ -144,11 +151,21 @@ func main() { } func isFlagPassed(name string) bool { - found := false - flag.Visit(func(f *flag.Flag) { - if f.Name == name { - found = true - } - }) - return found + found := false + flag.Visit(func(f *flag.Flag) { + if f.Name == name { + found = true + } + }) + return found +} + +func getflag(name string) string { + val := "" + flag.VisitAll(func(f *flag.Flag) { + if f.Name == name { + val = f.Value.String() + } + }) + return val }