Skip to content

Commit

Permalink
feat(digestwriter): add Payload Tracker kafka producer
Browse files Browse the repository at this point in the history
  • Loading branch information
michalslomczynski authored and jdobes committed Jan 16, 2023
1 parent 18b4cc5 commit 4f039e3
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
57 changes: 54 additions & 3 deletions digestwriter/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const (
rowIDKey = "row_id"
)

const (
	// errNoDigests is logged and reported to the Payload Tracker when an
	// incoming message contains no image digests (nil Images/Namespaces).
	errNoDigests = "no digests were retrieved from incoming message"
	// errClusterData is logged and reported to the Payload Tracker when
	// persisting the cluster data to storage fails.
	errClusterData = "error updating cluster data"
)

// OrgID data type represents organization ID.
type OrgID uint32

Expand Down Expand Up @@ -83,20 +88,43 @@ type IncomingMessage struct {
type DigestConsumer struct {
	// storage persists the cluster data extracted from consumed messages.
	storage Storage
	// numberOfMessagesWithEmptyDigests counts messages that carried no digests.
	numberOfMessagesWithEmptyDigests uint64
	// PayloadTracker publishes processing-status events to the
	// Payload Tracker Kafka topic.
	PayloadTracker *utils.KafkaProducer
}

// startPayloadTracker creates the Kafka producer used to publish
// Payload Tracker status events, using the broker address and topic
// from the application configuration.
func startPayloadTracker() (*utils.KafkaProducer, error) {
	producer, err := utils.NewKafkaProducer(nil, utils.Cfg.KafkaBrokerAddress, utils.Cfg.KafkaPayloadTrackerTopic)
	if err != nil {
		return nil, err
	}

	return producer, nil
}

// NewConsumer constructs a new instance of Consumer interface
// specialized in consuming from SHA extractor's result topic.
// It also starts the Payload Tracker producer; the producer's resources
// are released through the consumer's Shutdown hook, or immediately if
// consumer construction fails.
func NewConsumer(storage Storage) (*utils.KafkaConsumer, error) {
	SetupLogger()

	payloadTrackerProducer, err := startPayloadTracker()
	if err != nil {
		return nil, err
	}

	processor := DigestConsumer{
		storage,
		0,
		payloadTrackerProducer,
	}
	consumer, err := utils.NewKafkaConsumer(nil, &processor)
	if err != nil {
		// Do not leak the already-started producer when consumer setup fails.
		payloadTrackerProducer.Close()
		return nil, err
	}

	// Release Payload Tracker producer resources during DigestConsumer Close
	consumer.Shutdown = payloadTrackerProducer.Close

	// err is necessarily nil here; return the literal for clarity.
	return consumer, nil
}

Expand All @@ -113,18 +141,31 @@ func (d *DigestConsumer) IncrementNumberOfMessagesWithEmptyDigests() {

// ProcessMessage processes an incoming message
func (d *DigestConsumer) ProcessMessage(msg *sarama.ConsumerMessage) error {
logger.Debugf("processing incoming message with a key=%s", msg.Key)

// Step #1: parse the incoming message
message, err := parseMessage(msg.Value)
if err != nil {
parseIncomingMessageError.Inc()
return err
}

parsedIncomingMessage.Inc()

// Set up payload tracker event
ptEvent := utils.NewPayloadTrackerEvent(string(message.RequestID))
ptEvent.SetOrgIDFromUint(uint32(message.Organization))

// Send Payload Tracker message with status received
ptEvent.UpdateStatusReceived()
d.sendPayloadTrackerMessage(&ptEvent)

// Defer sending another Payload Tracker message with status success or error set later on
defer d.sendPayloadTrackerMessage(&ptEvent)

if message.Workload.Images == nil || message.Workload.Namespaces == nil {
logger.Debugln("no digests were retrieved from incoming message")
logger.Debugln(errNoDigests)
d.IncrementNumberOfMessagesWithEmptyDigests()
ptEvent.UpdateStatusError(errNoDigests)
return nil
}

Expand All @@ -146,15 +187,25 @@ func (d *DigestConsumer) ProcessMessage(msg *sarama.ConsumerMessage) error {
orgKey: message.Organization,
clusterKey: message.ClusterName,
errorKey: err.Error(),
}).Errorln("error updating cluster data")
}).Errorln(errClusterData)
storedMessagesError.Inc()
ptEvent.UpdateStatusError(errClusterData)
return err
}

ptEvent.UpdateStatusSuccess()
storedMessagesOk.Inc()
return nil
}

// sendPayloadTrackerMessage publishes a single Payload Tracker event.
// Send failures are logged but intentionally not propagated, so that
// tracker outages never fail message processing.
func (d *DigestConsumer) sendPayloadTrackerMessage(event *utils.PayloadTrackerEvent) {
	logger.Debugf("sending Payload Tracker message with status %s", event.Status)

	err := event.SendKafkaMessage(d.PayloadTracker)
	if err != nil {
		logger.Errorf("failed to send Payload Tracker message: %s", err.Error())
	}
}

func extractDigestsFromMessage(workload Workload) (digests []string) {
digestSet := map[string]struct{}{}
for imageID := range *workload.Images {
Expand Down
5 changes: 4 additions & 1 deletion digestwriter/export_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package digestwriter

import "app/base/utils"
import (
"app/base/utils"
)

// Export for testing
//
Expand All @@ -26,6 +28,7 @@ func NewDummyConsumerWithProcessor(storage Storage) (*utils.KafkaConsumer, *Dige
processor := DigestConsumer{
storage,
0,
nil,
}
consumer := utils.KafkaConsumer{
Processor: &processor,
Expand Down

0 comments on commit 4f039e3

Please sign in to comment.