Skip to content

Commit

Permalink
Create a copy of message before manipulation
Browse files Browse the repository at this point in the history
  • Loading branch information
TomTheBear committed Jul 12, 2024
1 parent 6c9c14e commit 98ba5ef
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 57 deletions.
96 changes: 49 additions & 47 deletions pkg/messageProcessor/messageProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ type MessageProcessor interface {
// Read in a JSON configuration
FromConfigJSON(config json.RawMessage) error
// Processing functions for legacy CCMetric and current CCMessage
ProcessMetric(m lp.CCMetric) (bool, error)
ProcessMessage(m lp2.CCMessage) (bool, error)
ProcessMetric(m lp.CCMetric) (lp2.CCMessage, error)
ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error)
//EvalToBool(condition string, parameters map[string]interface{}) (bool, error)
//EvalToFloat64(condition string, parameters map[string]interface{}) (float64, error)
//EvalToString(condition string, parameters map[string]interface{}) (string, error)
Expand Down Expand Up @@ -759,7 +759,7 @@ func (mp *messageProcessor) FromConfigJSON(config json.RawMessage) error {
return nil
}

func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (bool, error) {
func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (lp2.CCMessage, error) {
m, err := lp2.NewMessage(
metric.Name(),
metric.Tags(),
Expand All @@ -768,16 +768,17 @@ func (mp *messageProcessor) ProcessMetric(metric lp.CCMetric) (bool, error) {
metric.Time(),
)
if err != nil {
return true, fmt.Errorf("failed to parse metric to message: %v", err.Error())
return m, fmt.Errorf("failed to parse metric to message: %v", err.Error())
}
return mp.ProcessMessage(m)

}

func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) {
var drop bool = false
func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (lp2.CCMessage, error) {
var err error = nil
name := m.Name()
var out lp2.CCMessage = lp2.FromMessage(m)

name := out.Name()

if len(mp.stages) == 0 {
mp.SetStages(mp.DefaultStages())
Expand All @@ -786,7 +787,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) {
mp.mutex.RLock()
defer mp.mutex.RUnlock()

params := getParamMap(m)
params := getParamMap(out)

defer func() {
params["field"] = nil
Expand All @@ -804,149 +805,150 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) {
cclog.ComponentDebug("MessageProcessor", "Dropping by message name ", name)
if _, ok := mp.dropMessages[name]; ok {
cclog.ComponentDebug("MessageProcessor", "Drop")
return true, nil
return nil, nil
}
}
case STAGENAME_DROP_BY_TYPE:
if len(mp.dropTypes) > 0 {
cclog.ComponentDebug("MessageProcessor", "Dropping by message type")
if _, ok := mp.dropTypes[params["messagetype"].(string)]; ok {
cclog.ComponentDebug("MessageProcessor", "Drop")
return true, nil
return nil, nil
}
}
case STAGENAME_DROP_IF:
if len(mp.dropMessagesIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Dropping by condition")
drop, err = dropMessagesIf(&params, &mp.dropMessagesIf)
drop, err := dropMessagesIf(&params, &mp.dropMessagesIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
if drop {
cclog.ComponentDebug("MessageProcessor", "Drop")
return nil, nil
}
}
case STAGENAME_RENAME_BY_NAME:
if len(mp.renameMessages) > 0 {
cclog.ComponentDebug("MessageProcessor", "Renaming by name match")
if newname, ok := mp.renameMessages[name]; ok {
cclog.ComponentDebug("MessageProcessor", "Rename to", newname)
m.SetName(newname)
out.SetName(newname)
cclog.ComponentDebug("MessageProcessor", "Add old name as 'oldname' to meta", name)
m.AddMeta("oldname", name)
out.AddMeta("oldname", name)
}
}
case STAGENAME_RENAME_IF:
if len(mp.renameMessagesIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Renaming by condition")
_, err := renameMessagesIf(m, &params, &mp.renameMessagesIf)
_, err := renameMessagesIf(out, &params, &mp.renameMessagesIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_ADD_TAG:
if len(mp.addTagsIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Adding tags")
_, err = addTagIf(m, &params, &mp.addTagsIf)
_, err = addTagIf(out, &params, &mp.addTagsIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_DELETE_TAG:
if len(mp.deleteTagsIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Delete tags")
_, err = deleteTagIf(m, &params, &mp.deleteTagsIf)
_, err = deleteTagIf(out, &params, &mp.deleteTagsIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_ADD_META:
if len(mp.addMetaIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Adding meta information")
_, err = addMetaIf(m, &params, &mp.addMetaIf)
_, err = addMetaIf(out, &params, &mp.addMetaIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_DELETE_META:
if len(mp.deleteMetaIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Delete meta information")
_, err = deleteMetaIf(m, &params, &mp.deleteMetaIf)
_, err = deleteMetaIf(out, &params, &mp.deleteMetaIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_ADD_FIELD:
if len(mp.addFieldIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Adding fields")
_, err = addFieldIf(m, &params, &mp.addFieldIf)
_, err = addFieldIf(out, &params, &mp.addFieldIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_DELETE_FIELD:
if len(mp.deleteFieldIf) > 0 {
cclog.ComponentDebug("MessageProcessor", "Delete fields")
_, err = deleteFieldIf(m, &params, &mp.deleteFieldIf)
_, err = deleteFieldIf(out, &params, &mp.deleteFieldIf)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_MOVE_TAG_META:
if len(mp.moveTagToMeta) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move tag to meta")
_, err := moveTagToMeta(m, &params, &mp.moveTagToMeta)
_, err := moveTagToMeta(out, &params, &mp.moveTagToMeta)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_MOVE_TAG_FIELD:
if len(mp.moveTagToField) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move tag to fields")
_, err := moveTagToField(m, &params, &mp.moveTagToField)
_, err := moveTagToField(out, &params, &mp.moveTagToField)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_MOVE_META_TAG:
if len(mp.moveMetaToTag) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move meta to tags")
_, err := moveMetaToTag(m, &params, &mp.moveMetaToTag)
_, err := moveMetaToTag(out, &params, &mp.moveMetaToTag)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_MOVE_META_FIELD:
if len(mp.moveMetaToField) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move meta to fields")
_, err := moveMetaToField(m, &params, &mp.moveMetaToField)
_, err := moveMetaToField(out, &params, &mp.moveMetaToField)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_MOVE_FIELD_META:
if len(mp.moveFieldToMeta) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move field to meta")
_, err := moveFieldToMeta(m, &params, &mp.moveFieldToMeta)
_, err := moveFieldToMeta(out, &params, &mp.moveFieldToMeta)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_MOVE_FIELD_TAG:
if len(mp.moveFieldToTag) > 0 {
cclog.ComponentDebug("MessageProcessor", "Move field to tags")
_, err := moveFieldToTag(m, &params, &mp.moveFieldToTag)
_, err := moveFieldToTag(out, &params, &mp.moveFieldToTag)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
}
case STAGENAME_NORMALIZE_UNIT:
if mp.normalizeUnits {
cclog.ComponentDebug("MessageProcessor", "Normalize units")
if lp2.IsMetric(m) {
_, err := normalizeUnits(m)
if lp2.IsMetric(out) {
_, err := normalizeUnits(out)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
} else {
cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
Expand All @@ -956,10 +958,10 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) {
case STAGENAME_CHANGE_UNIT_PREFIX:
if len(mp.changeUnitPrefix) > 0 {
cclog.ComponentDebug("MessageProcessor", "Change unit prefix")
if lp2.IsMetric(m) {
_, err := changeUnitPrefix(m, &params, &mp.changeUnitPrefix)
if lp2.IsMetric(out) {
_, err := changeUnitPrefix(out, &params, &mp.changeUnitPrefix)
if err != nil {
return drop, fmt.Errorf("failed to evaluate: %v", err.Error())
return out, fmt.Errorf("failed to evaluate: %v", err.Error())
}
} else {
cclog.ComponentDebug("MessageProcessor", "skipped, no metric")
Expand All @@ -969,7 +971,7 @@ func (mp *messageProcessor) ProcessMessage(m lp2.CCMessage) (bool, error) {

}

return drop, nil
return out, nil
}

// Get a new instace of a message processor.
Expand Down
29 changes: 19 additions & 10 deletions pkg/messageProcessor/messageProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,28 +326,37 @@ func TestConfigList(t *testing.T) {
return
}
//t.Log(m.ToLineProtocol(nil))
drop, err := mp.ProcessMessage(m)
out, err := mp.ProcessMessage(m)
if err != nil && !c.errors {
cclog.SetDebug()
mp.ProcessMessage(m)
t.Error(err.Error())
return
}
if drop != c.drop {
if c.drop {
t.Error("fail, message should be dropped but processor signalled NO dropping")
} else {
t.Error("fail, message should NOT be dropped but processor signalled dropping")
}
cclog.SetDebug()
mp.ProcessMessage(m)
if out == nil && !c.drop {
t.Error("fail, message should NOT be dropped but processor signalled dropping")
return
} else if out != nil && c.drop {
t.Error("fail, message should be dropped but processor signalled NO dropping")
return
}
// {
// if c.drop {
// t.Error("fail, message should be dropped but processor signalled NO dropping")
// } else {
// t.Error("fail, message should NOT be dropped but processor signalled dropping")
// }
// cclog.SetDebug()
// mp.ProcessMessage(m)
// return
// }
if c.check != nil {
if err := c.check(m); err != nil {
if err := c.check(out); err != nil {
t.Errorf("check failed with %v", err.Error())
t.Log("Rerun with debugging")
cclog.SetDebug()
mp.ProcessMessage(m)
return
}
}
})
Expand Down

0 comments on commit 98ba5ef

Please sign in to comment.