-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add gNMI Extension field parsing support #509
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,14 +67,22 @@ func (a *App) StartCollector(ctx context.Context) { | |
select { | ||
case rsp := <-rspChan: | ||
subscribeResponseReceivedCounter.WithLabelValues(t.Config.Name, rsp.SubscriptionConfig.Name).Add(1) | ||
if a.Config.Debug { | ||
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp) | ||
// decode gNMI extensions | ||
if extensions := rsp.Response.Extension; len(extensions) > 0 { | ||
err := t.DecodeExtension(rsp.Response) | ||
if err != nil { | ||
a.Logger.Printf("target %q: failed to decode extension field: %v", t.Config.Name, err) | ||
continue | ||
} | ||
} | ||
err := t.DecodeProtoBytes(rsp.Response) | ||
if err != nil { | ||
a.Logger.Printf("target %q: failed to decode proto bytes: %v", t.Config.Name, err) | ||
continue | ||
} | ||
Comment on lines
+70
to
82
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't drop the subscribe response if we fail to decode the extension. |
||
if a.Config.Debug { | ||
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp) | ||
} | ||
Comment on lines
+83
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you move this back under the case statement to that the debug logging always happens regardless of decoding succeeding or not ? |
||
m := outputs.Meta{ | ||
"source": t.Config.Name, | ||
"format": a.Config.Format, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,9 @@ import ( | |
|
||
"github.com/fullstorydev/grpcurl" | ||
|
||
"github.com/jhump/protoreflect/desc" | ||
"github.com/jhump/protoreflect/desc/protoparse" | ||
|
||
"github.com/openconfig/gnmic/pkg/api/target" | ||
"github.com/openconfig/gnmic/pkg/api/types" | ||
) | ||
|
@@ -39,6 +42,10 @@ func (a *App) initTarget(tc *types.TargetConfig) (*target.Target, error) { | |
if err != nil { | ||
return nil, err | ||
} | ||
err = a.parseExtensionProtos(t) | ||
if err != nil { | ||
return nil, err | ||
} | ||
a.Targets[t.Config.Name] = t | ||
return t, nil | ||
} | ||
|
@@ -155,6 +162,34 @@ func (a *App) parseProtoFiles(t *target.Target) error { | |
return nil | ||
} | ||
|
||
// Dynamically parse (and load) protobuf files defined in config for specific extension IDs | ||
func (a *App) parseExtensionProtos(t *target.Target) error { | ||
parser := protoparse.Parser{} | ||
extensionProtoMap := make(map[int]*desc.MessageDescriptor) | ||
a.Logger.Printf("Target %q loading protofiles for gNMI extensions", t.Config.Name) | ||
if len(t.Config.RegisteredExtensions) == 0 { | ||
return nil | ||
} | ||
Comment on lines
+170
to
+172
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would move this to the beginning of the function |
||
for _, extension := range t.Config.RegisteredExtensions { | ||
descSources, err := parser.ParseFiles(extension.ProtoFile) | ||
if err != nil { | ||
a.Logger.Printf("target %q could not load protofile: %s: %v", t.Config.Name, extension.ProtoFile, err) | ||
return err | ||
} | ||
// Only a single file is ever provided to ParseFiles, so we can just grab offset 0 from the returned slice | ||
// Verify if the provided message exists in the proto and assign | ||
if desc := descSources[0].FindMessage(extension.MessageName); desc != nil { | ||
extensionProtoMap[extension.Id] = desc | ||
} else { | ||
a.Logger.Printf("target %q could not find message %q", t.Config.Name, extension.MessageName) | ||
return fmt.Errorf("target %q could not find message %q", t.Config.Name, extension.MessageName) | ||
} | ||
} | ||
t.ExtensionProtoMap = extensionProtoMap | ||
a.Logger.Printf("target %q loaded proto files for gNMI extensions", t.Config.Name) | ||
return nil | ||
} | ||
|
||
func (a *App) targetConfigExists(name string) bool { | ||
a.configLock.RLock() | ||
_, ok := a.Config.Targets[name] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,10 +12,12 @@ import ( | |
"encoding/json" | ||
"fmt" | ||
"math" | ||
"strconv" | ||
"strings" | ||
|
||
flattener "github.com/karimra/go-map-flattener" | ||
"github.com/openconfig/gnmi/proto/gnmi" | ||
"github.com/openconfig/gnmi/proto/gnmi_ext" | ||
) | ||
|
||
// EventMsg represents a gNMI update message, | ||
|
@@ -40,9 +42,28 @@ func ResponseToEventMsgs(name string, rsp *gnmi.SubscribeResponse, meta map[stri | |
return nil, nil | ||
} | ||
evs := make([]*EventMsg, 0, len(rsp.GetUpdate().GetUpdate())+len(rsp.GetUpdate().GetDelete())) | ||
response := rsp | ||
switch rsp := rsp.Response.(type) { | ||
case *gnmi.SubscribeResponse_Update: | ||
namePrefix, prefixTags := tagsFromGNMIPath(rsp.Update.GetPrefix()) | ||
// Extension message to tags | ||
if prefixTags == nil { | ||
prefixTags = make(map[string]string) | ||
} | ||
for _, ext := range response.Extension { | ||
extensionValues, err := extensionToMap(ext) | ||
if err != nil { | ||
return nil, err | ||
} | ||
for k, v := range extensionValues { | ||
switch v := v.(type) { | ||
case string: | ||
prefixTags[k] = v | ||
case float64: | ||
prefixTags[k] = strconv.FormatFloat(v, 'G', -1, 64) | ||
} | ||
} | ||
} | ||
Comment on lines
+53
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part assumes that any extension data can and will be mapped to a map[string]any, and that the goal is to get the extension data as tags, it will not always be the case for all extensions. |
||
// notification updates | ||
uevs, err := updatesToEvent(name, namePrefix, rsp.Update.GetTimestamp(), rsp.Update.GetUpdate(), prefixTags, meta) | ||
if err != nil { | ||
|
@@ -200,6 +221,20 @@ func tagsFromGNMIPath(p *gnmi.Path) (string, map[string]string) { | |
return sb.String(), tags | ||
} | ||
|
||
func extensionToMap(ext *gnmi_ext.Extension) (map[string]interface{}, error) { | ||
jsondata := ext.GetRegisteredExt().GetMsg() | ||
|
||
var anyJson map[string]interface{} | ||
if len(jsondata) != 0 { | ||
err := json.Unmarshal(jsondata, &anyJson) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return anyJson, nil | ||
} | ||
return nil, fmt.Errorf("0 length JSON decoded") | ||
} | ||
|
||
func getValueFlat(prefix string, updValue *gnmi.TypedValue) (map[string]interface{}, error) { | ||
if updValue == nil { | ||
return nil, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it's a good idea to overwrite the extension Msg with the jsondata here. It makes more sense to return the resulting json and pass it on to the outputs as metadata.