From 842aa75601dcec7f7832c289717fe25f0a002cd6 Mon Sep 17 00:00:00 2001 From: Grzegorz Burzynski Date: Sat, 23 Nov 2019 21:22:20 +0100 Subject: [PATCH] Add transactional publishing, queue length querying --- cmd/transaction/main.go | 138 +++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + pkg/firestore/publisher.go | 34 ++++++ pkg/firestore/subscriber.go | 17 ++- pkg/firestore/transactional_publisher.go | 99 ++++++++++++++++ 6 files changed, 290 insertions(+), 1 deletion(-) create mode 100644 cmd/transaction/main.go create mode 100644 pkg/firestore/transactional_publisher.go diff --git a/cmd/transaction/main.go b/cmd/transaction/main.go new file mode 100644 index 0000000..e68b7e1 --- /dev/null +++ b/cmd/transaction/main.go @@ -0,0 +1,138 @@ +// This example shows how to use transactional publisher in order to +// store data and publish events in one transaction. +package main + +import ( + "context" + "encoding/json" + "errors" + "math/rand" + "time" + + "github.com/Pallinder/go-randomdata" + "cloud.google.com/go/firestore" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + watermillFirestore "github.com/czeslavo/watermill-firestore/pkg/firestore" +) + +const projectID = "test" + +type User struct { + Name string `firestore:"name"` +} + +type UserAdded struct { + When time.Time `json:"when"` + Name string `json:"name"` +} + +type UserStore struct { + client *firestore.Client +} + +func NewUserStore() *UserStore { + client, err := firestore.NewClient(context.Background(), projectID) + if err != nil { + panic(err) + } + + return &UserStore{ + client: client, + } +} + +func (r *UserStore) Add(u User, t *firestore.Transaction) error { + if err := t.Create(r.client.Collection("users").NewDoc(), u); err != nil { + return err + } + + // 3/4 for success + if rand.Intn(4) == 1 { + return errors.New("random error") + } + return nil +} + +func main() { + client, err := firestore.NewClient(context.Background(), projectID) + if err != nil { + panic(err) + } + + logger := watermill.NewStdLogger(true, false) + + go addUsers(client, logger) + subscriber, err := watermillFirestore.NewSubscriber(watermillFirestore.SubscriberConfig{ + ProjectID: projectID, + }, logger) + if err != nil { + panic(err) + } + go monitorQueueLength(subscriber, "user_added", logger) + + userAddedCh, err := subscriber.Subscribe(context.Background(), "user_added") + if err != nil { + panic(err) + } + consume(userAddedCh, logger) +} + +func addUsers(client *firestore.Client, logger watermill.LoggerAdapter) { + publisher, err := watermillFirestore.NewPublisher(watermillFirestore.PublisherConfig{ + ProjectID: projectID, + }, logger) + if err != nil { + panic(err) + } + + userStore := NewUserStore() + for { + if err := client.RunTransaction(context.Background(), func(ctx context.Context, t *firestore.Transaction) error { + user := User{Name: randomdata.FirstName(randomdata.RandomGender)} + if err := userStore.Add(user, t); err != nil { + return err + } + + payload, err := json.Marshal(&UserAdded{When: time.Now(), Name: user.Name}) + if err != nil { + return err + } + + msg := message.NewMessage(watermill.NewShortUUID(), payload) + if err := publisher.PublishInTransaction("user_added", t, msg); err != nil { + return err + } + + return nil + }); err != nil { + logger.Debug("Transaction failed", nil) + } + <-time.After(time.Second * 5) + } +} + +func consume(ch <-chan *message.Message, logger watermill.LoggerAdapter) { + for msg := range ch { + var event UserAdded + if err := json.Unmarshal(msg.Payload, &event); err != nil { + panic(err) + } + + logger.Debug("Received userAdded", watermill.LogFields{"event": event}) + <-time.After(time.Second) + msg.Ack() + } +} + +func monitorQueueLength(sub *watermillFirestore.Subscriber, topic string, logger watermill.LoggerAdapter) { + for { + length, err := sub.QueueLength(topic) + if err != nil { + panic(err) + } + logger.Debug("Read queue length", watermill.LogFields{"queue_length": length}) + <-time.After(time.Second) + } +} + diff --git a/go.mod b/go.mod index 1431afb..b2e7343 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( cloud.google.com/go v0.45.1 cloud.google.com/go/pubsub v1.0.1 // indirect + github.com/Pallinder/go-randomdata v1.2.0 github.com/ThreeDotsLabs/watermill v1.0.2 github.com/ThreeDotsLabs/watermill-googlecloud v1.0.2 // indirect github.com/google/uuid v1.1.1 diff --git a/go.sum b/go.sum index d049cde..1e8d7d4 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ cloud.google.com/go/pubsub v1.0.1 h1:W9tAK3E57P75u0XLLR82LZyw8VpAnhmyTOxW9qzmyj8 cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/Pallinder/go-randomdata v1.2.0 h1:DZ41wBchNRb/0GfsePLiSwb0PHZmT67XY00lCDlaYPg= +github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y= github.com/ThreeDotsLabs/watermill v1.0.0/go.mod h1:gjVFKc8aN+vmEHw3pA0kh4mmuwHe02nyfghb6IWWiKE= github.com/ThreeDotsLabs/watermill v1.0.1 h1:LITkp5Rnb6H/1faA7QH/BSBq8REIEtaNPLVeuFcha24= github.com/ThreeDotsLabs/watermill v1.0.1/go.mod h1:gjVFKc8aN+vmEHw3pA0kh4mmuwHe02nyfghb6IWWiKE= diff --git a/pkg/firestore/publisher.go b/pkg/firestore/publisher.go index ab8ece1..ceaab1e 100644 --- a/pkg/firestore/publisher.go +++ b/pkg/firestore/publisher.go @@ -128,6 +128,40 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error { return nil } +func (p *Publisher) PublishInTransaction(topic string, t *firestore.Transaction, messages ...*message.Message) error { + ctx, cancel := context.WithTimeout(context.Background(), p.config.MessagePublishTimeout) + defer cancel() + + logger := p.logger.With(watermill.LogFields{"topic": topic}) + + subscriptions, err := p.getSubscriptions(ctx, topic) + if err != nil { + logger.Error("Failed to get subscriptions for publishing", err, nil) + return err + } + logger = logger.With(watermill.LogFields{"subscriptions_count": len(subscriptions)}) + + msgsToPublish := p.prepareFirestoreMessages(messages) + + logger.Trace("Publishing to topic", nil) + + for _, subscription := range subscriptions { + logger := logger.With(watermill.LogFields{"subscription": subscription}) + logger.Trace("Publishing to subscription", nil) + + for _, message := range msgsToPublish { + if err := t.Create(p.client.Collection(p.config.PubSubRootCollection).Doc(topic).Collection(subscription).NewDoc(), message); err != nil { + logger.Error("Failed to add message to transaction", err, nil) + return err + } + logger.Trace("Added message to transaction", nil) + continue + } + } + + return nil +} + func (p *Publisher) getSubscriptions(ctx context.Context, topic string) ([]string, error) { logger := p.logger.With(watermill.LogFields{"topic": topic}) diff --git a/pkg/firestore/subscriber.go b/pkg/firestore/subscriber.go index 1551660..bd41f5c 100644 --- a/pkg/firestore/subscriber.go +++ b/pkg/firestore/subscriber.go @@ -132,6 +132,22 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa return sub.output, nil } +func (s *Subscriber) QueueLength(topic string) (int, error) { + ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout) + defer cancel() + docs, err := s.subscriptionCollection(topic).Documents(ctx).GetAll() + if err != nil { + s.logger.Error("Failed to get queue length", err, watermill.LogFields{"topic": topic}) + return 0, err + } + + return len(docs), nil +} + +func (s *Subscriber) subscriptionCollection(topic string) *firestore.CollectionRef { + return s.client.Collection(s.config.PubSubRootCollection).Doc(topic).Collection(s.config.GenerateSubscriptionName(topic)) +} + func (s *Subscriber) Close() error { if s.closed { return nil @@ -171,5 +187,4 @@ func createFirestoreSubscriptionIfNotExists(client *firestore.Client, topic, sub logger.Debug("Created subscription", nil) return nil - } diff --git a/pkg/firestore/transactional_publisher.go b/pkg/firestore/transactional_publisher.go new file mode 100644 index 0000000..41798c8 --- /dev/null +++ b/pkg/firestore/transactional_publisher.go @@ -0,0 +1,99 @@ +package firestore + +import ( + "context" + + "cloud.google.com/go/firestore" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type firestoreDocCreator interface { + Create(doc *firestore.DocumentRef, data interface{}) (*firestore.WriteResult, error) +} + +type transactionalDocCreator struct { + *firestore.Transaction +} + +type docCreator struct { +} + +func (c *docCreator) Create(doc *firestore.DocumentRef, data interface{}) (*firestore.WriteResult, error) { + return doc.Create(context.Background(), data) +} + +type TransactionalPublisher struct { + client *firestore.Client + creator firestoreDocCreator + logger watermill.LoggerAdapter +} + +func NewTransactionalPublisher(config PublisherConfig, creator firestoreDocCreator, logger watermill.LoggerAdapter) (*TransactionalPublisher, error) { + client, err := firestore.NewClient(context.Background(), config.ProjectID) + if err != nil { + return nil, err + } + + return &TransactionalPublisher{ + client: client, + creator: creator, + logger: logger, + }, nil +} + +func (p *TransactionalPublisher) Publish(topic string, messages ...*message.Message) error { + subscriptions, err := p.client.Collection("pubsub").Doc(topic).Collection("subscriptions").Documents(context.Background()).GetAll() + if err != nil { + return err + } + + logger := p.logger.With(watermill.LogFields{"subscriptions_count": len(subscriptions), "topic": topic}) + + logger.Debug("Publishing", nil) + + for _, msg := range messages { + firestoreMsg := Message{ + UUID: msg.UUID, + Payload: msg.Payload, + Metadata: make(map[string]interface{}), + } + for k, v := range msg.Metadata { + firestoreMsg.Metadata[k] = v + } + + logger := logger.With(watermill.LogFields{"message_uuid": msg.UUID}) + + for _, sub := range subscriptions { + logger := logger.With(watermill.LogFields{"collection": sub.Ref.ID}) + + docRef := p.client.Collection("pubsub").Doc(topic).Collection(sub.Ref.ID).NewDoc() + + _, err = p.creator.Create(docRef, firestoreMsg) + if err != nil { + p.logger.Error("Failed to send msg", err, watermill.LogFields{}) + return err + } + + logger.Debug("Published message", nil) + } + } + + return nil +} + +func (p *TransactionalPublisher) Close() error { + if err := p.client.Close(); err != nil { + if status.Code(err) == codes.Canceled { + // client is already closed + return nil + } + + p.logger.Error("closing client failed", err, watermill.LogFields{}) + return err + } + + return nil +}