Skip to content

Commit

Permalink
Add transactional publishing, queue length querying
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Nov 23, 2019
1 parent e8b1ee6 commit 842aa75
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 1 deletion.
138 changes: 138 additions & 0 deletions cmd/transaction/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// This example shows how to use transactional publisher in order to
// store data and publish events in one transaction.
package main

import (
"context"
"encoding/json"
"errors"
"math/rand"
"time"

"github.com/Pallinder/go-randomdata"
"cloud.google.com/go/firestore"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
watermillFirestore "github.com/czeslavo/watermill-firestore/pkg/firestore"
)

const projectID = "test"

type User struct {
Name string `firestore:"name"`
}

type UserAdded struct {
When time.Time `json:"when"`
Name string `json:"name"`
}

type UserStore struct {
client *firestore.Client
}

func NewUserStore() *UserStore {
client, err := firestore.NewClient(context.Background(), projectID)
if err != nil {
panic(err)
}

return &UserStore{
client: client,
}
}

func (r *UserStore) Add(u User, t *firestore.Transaction) error {
if err := t.Create(r.client.Collection("users").NewDoc(), u); err != nil {
return err
}

// 3/4 for success
if rand.Intn(4) == 1 {
return errors.New("random error")
}
return nil
}

func main() {
client, err := firestore.NewClient(context.Background(), projectID)
if err != nil {
panic(err)
}

logger := watermill.NewStdLogger(true, false)

go addUsers(client, logger)
subscriber, err := watermillFirestore.NewSubscriber(watermillFirestore.SubscriberConfig{
ProjectID: projectID,
}, logger)
if err != nil {
panic(err)
}
go monitorQueueLength(subscriber, "user_added", logger)

userAddedCh, err := subscriber.Subscribe(context.Background(), "user_added")
if err != nil {
panic(err)
}
consume(userAddedCh, logger)
}

func addUsers(client *firestore.Client, logger watermill.LoggerAdapter) {
publisher, err := watermillFirestore.NewPublisher(watermillFirestore.PublisherConfig{
ProjectID: projectID,
}, logger)
if err != nil {
panic(err)
}

userStore := NewUserStore()
for {
if err := client.RunTransaction(context.Background(), func(ctx context.Context, t *firestore.Transaction) error {
user := User{Name: randomdata.FirstName(randomdata.RandomGender)}
if err := userStore.Add(user, t); err != nil {
return err
}

payload, err := json.Marshal(&UserAdded{When: time.Now(), Name: user.Name})
if err != nil {
return err
}

msg := message.NewMessage(watermill.NewShortUUID(), payload)
if err := publisher.PublishInTransaction("user_added", t, msg); err != nil {
return err
}

return nil
}); err != nil {
logger.Debug("Transaction failed", nil)
}
<-time.After(time.Second * 5)
}
}

func consume(ch <-chan *message.Message, logger watermill.LoggerAdapter) {
for msg := range ch {
var event UserAdded
if err := json.Unmarshal(msg.Payload, &event); err != nil {
panic(err)
}

logger.Debug("Received userAdded", watermill.LogFields{"event": event})
<-time.After(time.Second)
msg.Ack()
}
}

func monitorQueueLength(sub *watermillFirestore.Subscriber, topic string, logger watermill.LoggerAdapter) {
for {
length, err := sub.QueueLength(topic)
if err != nil {
panic(err)
}
logger.Debug("Read queue length", watermill.LogFields{"queue_length": length})
<-time.After(time.Second)
}
}

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.12
require (
cloud.google.com/go v0.45.1
cloud.google.com/go/pubsub v1.0.1 // indirect
github.com/Pallinder/go-randomdata v1.2.0
github.com/ThreeDotsLabs/watermill v1.0.2
github.com/ThreeDotsLabs/watermill-googlecloud v1.0.2 // indirect
github.com/google/uuid v1.1.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ cloud.google.com/go/pubsub v1.0.1 h1:W9tAK3E57P75u0XLLR82LZyw8VpAnhmyTOxW9qzmyj8
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Pallinder/go-randomdata v1.2.0 h1:DZ41wBchNRb/0GfsePLiSwb0PHZmT67XY00lCDlaYPg=
github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y=
github.com/ThreeDotsLabs/watermill v1.0.0/go.mod h1:gjVFKc8aN+vmEHw3pA0kh4mmuwHe02nyfghb6IWWiKE=
github.com/ThreeDotsLabs/watermill v1.0.1 h1:LITkp5Rnb6H/1faA7QH/BSBq8REIEtaNPLVeuFcha24=
github.com/ThreeDotsLabs/watermill v1.0.1/go.mod h1:gjVFKc8aN+vmEHw3pA0kh4mmuwHe02nyfghb6IWWiKE=
Expand Down
34 changes: 34 additions & 0 deletions pkg/firestore/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,40 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
return nil
}

func (p *Publisher) PublishInTransaction(topic string, t *firestore.Transaction, messages ...*message.Message) error {
ctx, cancel := context.WithTimeout(context.Background(), p.config.MessagePublishTimeout)
defer cancel()

logger := p.logger.With(watermill.LogFields{"topic": topic})

subscriptions, err := p.getSubscriptions(ctx, topic)
if err != nil {
logger.Error("Failed to get subscriptions for publishing", err, nil)
return err
}
logger = logger.With(watermill.LogFields{"subscriptions_count": len(subscriptions)})

msgsToPublish := p.prepareFirestoreMessages(messages)

logger.Trace("Publishing to topic", nil)

for _, subscription := range subscriptions {
logger := logger.With(watermill.LogFields{"subscription": subscription})
logger.Trace("Publishing to subscription", nil)

for _, message := range msgsToPublish {
if err := t.Create(p.client.Collection(p.config.PubSubRootCollection).Doc(topic).Collection(subscription).NewDoc(), message); err != nil {
logger.Error("Failed to add message to transaction", err, nil)
return err
}
logger.Trace("Added message to transaction", nil)
continue
}
}

return nil
}

func (p *Publisher) getSubscriptions(ctx context.Context, topic string) ([]string, error) {
logger := p.logger.With(watermill.LogFields{"topic": topic})

Expand Down
17 changes: 16 additions & 1 deletion pkg/firestore/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *messa
return sub.output, nil
}

func (s *Subscriber) QueueLength(topic string) (int, error) {
ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
defer cancel()
docs, err := s.subscriptionCollection(topic).Documents(ctx).GetAll()
if err != nil {
s.logger.Error("Failed to get queue length", err, watermill.LogFields{"topic": topic})
return 0, err
}

return len(docs), nil
}

func (s *Subscriber) subscriptionCollection(topic string) *firestore.CollectionRef {
return s.client.Collection(s.config.PubSubRootCollection).Doc(topic).Collection(s.config.GenerateSubscriptionName(topic))
}

func (s *Subscriber) Close() error {
if s.closed {
return nil
Expand Down Expand Up @@ -171,5 +187,4 @@ func createFirestoreSubscriptionIfNotExists(client *firestore.Client, topic, sub

logger.Debug("Created subscription", nil)
return nil

}
99 changes: 99 additions & 0 deletions pkg/firestore/transactional_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package firestore

import (
"context"

"cloud.google.com/go/firestore"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type firestoreDocCreator interface {
Create(doc *firestore.DocumentRef, data interface{}) (*firestore.WriteResult, error)
}

type transactionalDocCreator struct {
*firestore.Transaction
}

type docCreator struct {
}

func (c *docCreator) Create(doc *firestore.DocumentRef, data interface{}) (*firestore.WriteResult, error) {
return doc.Create(context.Background(), data)
}

type TransactionalPublisher struct {
client *firestore.Client
creator firestoreDocCreator
logger watermill.LoggerAdapter
}

func NewTransactionalPublisher(config PublisherConfig, creator firestoreDocCreator, logger watermill.LoggerAdapter) (*TransactionalPublisher, error) {
client, err := firestore.NewClient(context.Background(), config.ProjectID)
if err != nil {
return nil, err
}

return &TransactionalPublisher{
client: client,
creator: creator,
logger: logger,
}, nil
}

func (p *TransactionalPublisher) Publish(topic string, messages ...*message.Message) error {
subscriptions, err := p.client.Collection("pubsub").Doc(topic).Collection("subscriptions").Documents(context.Background()).GetAll()
if err != nil {
return err
}

logger := p.logger.With(watermill.LogFields{"subscriptions_count": len(subscriptions), "topic": topic})

logger.Debug("Publishing", nil)

for _, msg := range messages {
firestoreMsg := Message{
UUID: msg.UUID,
Payload: msg.Payload,
Metadata: make(map[string]interface{}),
}
for k, v := range msg.Metadata {
firestoreMsg.Metadata[k] = v
}

logger := logger.With(watermill.LogFields{"message_uuid": msg.UUID})

for _, sub := range subscriptions {
logger := logger.With(watermill.LogFields{"collection": sub.Ref.ID})

docRef := p.client.Collection("pubsub").Doc(topic).Collection(sub.Ref.ID).NewDoc()

_, err = p.creator.Create(docRef, firestoreMsg)
if err != nil {
p.logger.Error("Failed to send msg", err, watermill.LogFields{})
return err
}

logger.Debug("Published message", nil)
}
}

return nil
}

func (p *TransactionalPublisher) Close() error {
if err := p.client.Close(); err != nil {
if status.Code(err) == codes.Canceled {
// client is already closed
return nil
}

p.logger.Error("closing client failed", err, watermill.LogFields{})
return err
}

return nil
}

0 comments on commit 842aa75

Please sign in to comment.