From 4f039e3b9d009d37b9839a0a2bf43763c5131047 Mon Sep 17 00:00:00 2001
From: michalslomczynski
Date: Tue, 3 Jan 2023 22:55:47 +0100
Subject: [PATCH] feat(digestwriter): add Payload Tracker Kafka producer

---
 digestwriter/consumer.go    | 57 +++++++++++++++++++++++++++++++++++--
 digestwriter/export_test.go |  5 +++-
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/digestwriter/consumer.go b/digestwriter/consumer.go
index 42dac81e..e9977bc2 100644
--- a/digestwriter/consumer.go
+++ b/digestwriter/consumer.go
@@ -29,6 +29,11 @@ const (
     rowIDKey = "row_id"
 )
 
+const (
+    errNoDigests   = "no digests were retrieved from incoming message"
+    errClusterData = "error updating cluster data"
+)
+
 // OrgID data type represents organization ID.
 type OrgID uint32
 
@@ -83,20 +88,43 @@ type IncomingMessage struct {
 type DigestConsumer struct {
     storage                          Storage
     numberOfMessagesWithEmptyDigests uint64
+    PayloadTracker                   *utils.KafkaProducer
+}
+
+// startPayloadTracker starts the Payload Tracker Kafka producer.
+func startPayloadTracker() (*utils.KafkaProducer, error) {
+    ptWriter, err := utils.NewKafkaProducer(nil, utils.Cfg.KafkaBrokerAddress, utils.Cfg.KafkaPayloadTrackerTopic)
+    if err != nil {
+        return nil, err
+    }
+
+    return ptWriter, nil
 }
 
 // NewConsumer constructs a new instance of Consumer interface
 // specialized in consuming from SHA extractor's result topic
 func NewConsumer(storage Storage) (*utils.KafkaConsumer, error) {
     SetupLogger()
+
+    payloadTrackerProducer, err := startPayloadTracker()
+    if err != nil {
+        return nil, err
+    }
+
     processor := DigestConsumer{
         storage,
         0,
+        payloadTrackerProducer,
     }
     consumer, err := utils.NewKafkaConsumer(nil, &processor)
     if err != nil {
+        payloadTrackerProducer.Close()
         return nil, err
     }
+
+    // Release Payload Tracker producer resources during DigestConsumer Close
+    consumer.Shutdown = payloadTrackerProducer.Close
+
     return consumer, err
 }
 
@@ -113,18 +141,31 @@ func (d *DigestConsumer) IncrementNumberOfMessagesWithEmptyDigests() {
 
 // ProcessMessage processes an incoming message
 func (d *DigestConsumer) ProcessMessage(msg *sarama.ConsumerMessage) error {
+    logger.Debugf("processing incoming message with key=%s", msg.Key)
+
     // Step #1: parse the incoming message
     message, err := parseMessage(msg.Value)
     if err != nil {
         parseIncomingMessageError.Inc()
         return err
     }
-
     parsedIncomingMessage.Inc()
 
+    // Set up the Payload Tracker event
+    ptEvent := utils.NewPayloadTrackerEvent(string(message.RequestID))
+    ptEvent.SetOrgIDFromUint(uint32(message.Organization))
+
+    // Send a Payload Tracker message with status "received"
+    ptEvent.UpdateStatusReceived()
+    d.sendPayloadTrackerMessage(&ptEvent)
+
+    // Defer sending a second Payload Tracker message whose "success" or "error" status is set later on
+    defer d.sendPayloadTrackerMessage(&ptEvent)
+
     if message.Workload.Images == nil || message.Workload.Namespaces == nil {
-        logger.Debugln("no digests were retrieved from incoming message")
+        logger.Debugln(errNoDigests)
         d.IncrementNumberOfMessagesWithEmptyDigests()
+        ptEvent.UpdateStatusError(errNoDigests)
         return nil
     }
 
@@ -146,15 +187,25 @@ func (d *DigestConsumer) ProcessMessage(msg *sarama.ConsumerMessage) error {
             orgKey:     message.Organization,
             clusterKey: message.ClusterName,
             errorKey:   err.Error(),
-        }).Errorln("error updating cluster data")
+        }).Errorln(errClusterData)
         storedMessagesError.Inc()
+        ptEvent.UpdateStatusError(errClusterData)
         return err
     }
 
+    ptEvent.UpdateStatusSuccess()
     storedMessagesOk.Inc()
     return nil
 }
 
+// sendPayloadTrackerMessage sends a Kafka message to the Payload Tracker and logs any errors
+func (d *DigestConsumer) sendPayloadTrackerMessage(event *utils.PayloadTrackerEvent) {
+    logger.Debugf("sending Payload Tracker message with status %s", event.Status)
+    if err := event.SendKafkaMessage(d.PayloadTracker); err != nil {
+        logger.Errorf("failed to send Payload Tracker message: %s", err.Error())
+    }
+}
+
 func extractDigestsFromMessage(workload Workload) (digests []string) {
     digestSet := map[string]struct{}{}
     for imageID := range *workload.Images {
diff --git a/digestwriter/export_test.go b/digestwriter/export_test.go
index 1a09ac66..6043f65e 100644
--- a/digestwriter/export_test.go
+++ b/digestwriter/export_test.go
@@ -1,6 +1,8 @@
 package digestwriter
 
-import "app/base/utils"
+import (
+    "app/base/utils"
+)
 
 // Export for testing
 //
@@ -26,6 +28,7 @@ func NewDummyConsumerWithProcessor(storage Storage) (*utils.KafkaConsumer, *Dige
     processor := DigestConsumer{
         storage,
         0,
+        nil,
     }
     consumer := utils.KafkaConsumer{
         Processor: &processor,
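
Note for reviewers: the patch gives every consumed message a "received" -> "success"/"error"
Payload Tracker lifecycle. Below is a minimal sketch of that flow, using only the utils
helpers visible in this diff; handleDigest and doWork are illustrative stand-ins, not
names from the codebase.

package digestwriter

import "app/base/utils"

// doWork stands in for the real digest extraction and storage steps.
func doWork() error { return nil }

// handleDigest condenses the lifecycle added to ProcessMessage: exactly one
// "received" message is sent up front, then exactly one deferred "success"
// or "error" message once processing finishes.
func (d *DigestConsumer) handleDigest(requestID string, orgID uint32) error {
    ptEvent := utils.NewPayloadTrackerEvent(requestID)
    ptEvent.SetOrgIDFromUint(orgID)

    // First send: status "received".
    ptEvent.UpdateStatusReceived()
    d.sendPayloadTrackerMessage(&ptEvent)

    // The deferred send runs after the function body and reports whatever
    // status was recorded last, because the event is shared by pointer.
    defer d.sendPayloadTrackerMessage(&ptEvent)

    if err := doWork(); err != nil {
        ptEvent.UpdateStatusError(err.Error())
        return err
    }

    ptEvent.UpdateStatusSuccess()
    return nil
}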