Skip to content

Commit

Permalink
Merge pull request #5 from czeslavo/transactional-publisher
Browse files Browse the repository at this point in the history
Add transactional publisher
  • Loading branch information
czeslavo authored Mar 20, 2020
2 parents 3059fcb + 2d9ad14 commit 6b4ba83
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 3 deletions.
5 changes: 2 additions & 3 deletions pkg/firestore/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package firestore

import (
"context"
"github.com/pkg/errors"
"sync"
"time"

"github.com/pkg/errors"

"cloud.google.com/go/firestore"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
Expand Down Expand Up @@ -164,7 +165,6 @@ func (p *Publisher) PublishInTransaction(topic string, t *firestore.Transaction,
})

logger.Trace("Added message to transaction", nil)
continue
}
}

Expand Down Expand Up @@ -277,7 +277,6 @@ func (p *Publisher) publishInBatches(
logger.Error("Failed to commit messages batch", err, nil)
return err
}
logger.Trace("Published message", nil)
}

return nil
Expand Down
25 changes: 25 additions & 0 deletions pkg/firestore/transactional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package firestore

import (
"cloud.google.com/go/firestore"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
)

type TransactionalPublisher struct {
publisher *Publisher
tx *firestore.Transaction
}

func NewTransactionalPublisher(config PublisherConfig, tx *firestore.Transaction, logger watermill.LoggerAdapter) (*TransactionalPublisher, error) {
publisher, err := NewPublisher(config, logger)
if err != nil {
return nil, err
}

return &TransactionalPublisher{publisher, tx}, nil
}

func (p *TransactionalPublisher) Publish(topic string, messages ...*message.Message) error {
return p.publisher.PublishInTransaction(topic, p.tx, messages...)
}
56 changes: 56 additions & 0 deletions pkg/firestore/transactional_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package firestore_test

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

stdFirestore "cloud.google.com/go/firestore"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/czeslavo/watermill-firestore/pkg/firestore"
"github.com/stretchr/testify/require"
)

func TestTransactionalPublisher(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

projectID := os.Getenv("FIRESTORE_PROJECT_ID")
client, err := stdFirestore.NewClient(ctx, projectID)
require.NoError(t, err)

logger := watermill.NewStdLogger(true, true)
topic := "transactional_publisher_test_" + watermill.NewShortUUID()

subscriber, err := firestore.NewSubscriber(firestore.SubscriberConfig{
ProjectID: projectID,
}, logger)
msgs, err := subscriber.Subscribe(ctx, topic)
require.NoError(t, err)

msg := message.NewMessage(watermill.NewUUID(), message.Payload("payload"))
msg.Metadata = message.Metadata{"key": "value"}

err = client.RunTransaction(ctx, func(ctx context.Context, tx *stdFirestore.Transaction) error {
publisher, err := firestore.NewTransactionalPublisher(firestore.PublisherConfig{
ProjectID: projectID,
}, tx, logger)
require.NoError(t, err)

err = publisher.Publish(topic, msg)
require.NoError(t, err)

return nil
})
require.NoError(t, err)

receivedMsg := <-msgs
require.NotNil(t, receivedMsg)
assert.Equal(t, msg.UUID, receivedMsg.UUID)
assert.Equal(t, msg.Payload, receivedMsg.Payload)
assert.Equal(t, msg.Metadata, receivedMsg.Metadata)
}

0 comments on commit 6b4ba83

Please sign in to comment.