Skip to content

Commit

Permalink
Merge pull request #208 from ably/feature/rtl6c-queue-messages
Browse files Browse the repository at this point in the history
RTL6c: Publish while not connected
  • Loading branch information
tcard authored Nov 9, 2020
2 parents 48ff15f + bdb4f30 commit 96bc562
Show file tree
Hide file tree
Showing 18 changed files with 1,344 additions and 266 deletions.
12 changes: 12 additions & 0 deletions ably/ably_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func safeclose(t *testing.T, closers ...io.Closer) {
}
}

type closeFunc func() error

func (f closeFunc) Close() error {
return f()
}

func checkError(code ably.ErrorCode, err error) error {
switch e, ok := err.(*ably.ErrorInfo); {
case !ok:
Expand All @@ -75,3 +81,9 @@ func init() {
return ably.ApplyOptionsWithDefaults(o...).HTTPClient
}
}

type messages chan *ably.Message

func (ms messages) Receive(m *ably.Message) {
ms <- m
}
70 changes: 70 additions & 0 deletions ably/ablytest/ablytest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"reflect"
"strconv"
"sync"
"time"

"github.com/ably/ably-go/ably"
Expand Down Expand Up @@ -125,3 +126,72 @@ func ReceivePresenceMessages(channel *ably.RealtimeChannel, action *ably.Presenc
}
return ch, unsubscribe, err
}

type AfterCall struct {
Ctx context.Context
D time.Duration
Deadline time.Time
Time chan<- time.Time
}

func (c AfterCall) Fire() {
c.Time <- c.Deadline
}

// TimeFuncs returns time functions to be passed as options.
//
// Now returns a stable time that is only updated with the times that the
// returned After produces.
//
// After forwards calls to the given channel. The receiver is in charge of
// sending the resulting time to the AfterCall.Time channel.
func TimeFuncs(afterCalls chan<- AfterCall) (
now func() time.Time,
after func(context.Context, time.Duration) <-chan time.Time,
) {
var mtx sync.Mutex
currentTime := time.Now()
now = func() time.Time {
mtx.Lock()
defer mtx.Unlock()
return currentTime
}

after = func(ctx context.Context, d time.Duration) <-chan time.Time {
ch := make(chan time.Time, 1)

timeUpdate := make(chan time.Time, 1)
go func() {
mtx.Lock()
t := currentTime
mtx.Unlock()

select {
case afterCalls <- AfterCall{Ctx: ctx, D: d, Deadline: t.Add(d), Time: timeUpdate}:
case <-ctx.Done():
// This allows tests to ignore a call if they expect the timer to
// be cancelled.
return
}

select {
case <-ctx.Done():
close(ch)

case t, ok := <-timeUpdate:
if !ok {
close(ch)
return
}
mtx.Lock()
currentTime = t
mtx.Unlock()
ch <- t
}
}()

return ch
}

return now, after
}
95 changes: 85 additions & 10 deletions ably/ablytest/recorders.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,22 @@ func MessagePipeWithNowFunc(now func() time.Time) MessagePipeOption {
}
}

// MessagePipeWithAfterFunc sets a function to get a timer. This timer
// will be used to determine whether a Receive times out.
//
// If not set, receives won't timeout.
func MessagePipeWithAfterFunc(after func(context.Context, time.Duration) <-chan time.Time) MessagePipeOption {
return func(pc *pipeConn) {
pc.after = after
}
}

func MessagePipe(in <-chan *proto.ProtocolMessage, out chan<- *proto.ProtocolMessage, opts ...MessagePipeOption) func(string, *url.URL, time.Duration) (proto.Conn, error) {
return func(proto string, u *url.URL, timeout time.Duration) (proto.Conn, error) {
pc := pipeConn{
in: in,
out: out,
in: in,
out: out,
after: ablyutil.After,
}
for _, opt := range opts {
opt(&pc)
Expand All @@ -223,9 +234,10 @@ func MessagePipe(in <-chan *proto.ProtocolMessage, out chan<- *proto.ProtocolMes
}

type pipeConn struct {
in <-chan *proto.ProtocolMessage
out chan<- *proto.ProtocolMessage
now func() time.Time
in <-chan *proto.ProtocolMessage
out chan<- *proto.ProtocolMessage
now func() time.Time
after func(context.Context, time.Duration) <-chan time.Time
}

func (pc pipeConn) Send(msg *proto.ProtocolMessage) error {
Expand All @@ -234,10 +246,14 @@ func (pc pipeConn) Send(msg *proto.ProtocolMessage) error {
}

func (pc pipeConn) Receive(deadline time.Time) (*proto.ProtocolMessage, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var timeout <-chan time.Time
if pc.now != nil {
timeout = time.After(deadline.Sub(pc.now()))
timeout = pc.after(ctx, deadline.Sub(pc.now()))
}

select {
case m, ok := <-pc.in:
if !ok || m == nil {
Expand Down Expand Up @@ -498,6 +514,69 @@ func (c connWithFakeDisconnect) Close() error {
return c.conn.Close()
}

// DialIntercept returns a DialFunc and an intercept function that, when called,
// makes the processing of the next received protocol message with any of the given
// actions. The processing remains blocked until the passed context expires. The
// intercepted message is sent to the returned channel.
func DialIntercept(dial DialFunc) (_ DialFunc, intercept func(context.Context, ...proto.Action) <-chan *proto.ProtocolMessage) {
active := &activeIntercept{}

intercept = func(ctx context.Context, actions ...proto.Action) <-chan *proto.ProtocolMessage {
msg := make(chan *proto.ProtocolMessage, 1)
active.Lock()
defer active.Unlock()
active.ctx = ctx
active.actions = actions
active.msg = msg
return msg
}

return func(proto string, url *url.URL, timeout time.Duration) (proto.Conn, error) {
conn, err := dial(proto, url, timeout)
if err != nil {
return nil, err
}
return interceptConn{conn, active}, nil
}, intercept
}

type activeIntercept struct {
sync.Mutex
ctx context.Context
actions []proto.Action
msg chan<- *proto.ProtocolMessage
}

type interceptConn struct {
proto.Conn
active *activeIntercept
}

func (c interceptConn) Receive(deadline time.Time) (*proto.ProtocolMessage, error) {
msg, err := c.Conn.Receive(deadline)
if err != nil {
return nil, err
}

c.active.Lock()
defer c.active.Unlock()

if c.active.msg == nil {
return msg, err
}

for _, a := range c.active.actions {
if msg.Action == a {
c.active.msg <- msg
c.active.msg = nil
<-c.active.ctx.Done()
break
}
}

return msg, err
}

// FullRealtimeCloser returns an io.Closer that, on Close, calls Close on the
// Realtime instance and waits for its effects.
func FullRealtimeCloser(c *ably.Realtime) io.Closer {
Expand All @@ -515,10 +594,6 @@ func (c realtimeIOCloser) Close() error {
ably.ConnectionStateClosed,
ably.ConnectionStateFailed:

err := c.c.Connection.ErrorReason()
if err != nil {
return err
}
return nil
}

Expand Down
29 changes: 19 additions & 10 deletions ably/ablytest/resultgroup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ablytest

import (
"context"
"errors"
"fmt"
"sync"
Expand All @@ -15,7 +16,7 @@ func Wait(res ably.Result, err error) error {
}
errch := make(chan error)
go func() {
errch <- res.Wait()
errch <- res.Wait(context.Background())
}()
select {
case err := <-errch:
Expand Down Expand Up @@ -54,7 +55,7 @@ func (rg *ResultGroup) Add(res ably.Result, err error) {
}
rg.wg.Add(1)
go func() {
err := res.Wait()
err := res.Wait(context.Background())
if err != nil && err != (*ably.ErrorInfo)(nil) {
select {
case rg.errch <- err:
Expand Down Expand Up @@ -100,15 +101,18 @@ func ConnWaiter(client *ably.Realtime, do func(), expectedEvent ...ably.Connecti
if errInfo := client.Connection.ErrorReason(); errInfo != nil {
err = errInfo
}
return ResultFunc(func() error { return err })
return ResultFunc(func(context.Context) error { return err })
}
}
return ResultFunc(func() error {
return ResultFunc(func(ctx context.Context) error {
defer off()
timer := time.After(Timeout)

for {
select {
case <-ctx.Done():
return ctx.Err()

case <-timer:
return fmt.Errorf("timeout waiting for event %v", expectedEvent)

Expand All @@ -132,18 +136,23 @@ func ConnWaiter(client *ably.Realtime, do func(), expectedEvent ...ably.Connecti
})
}

type ResultFunc func() error
type ResultFunc func(context.Context) error

func (f ResultFunc) Wait() error {
return f()
func (f ResultFunc) Wait(ctx context.Context) error {
return f(ctx)
}

func (f ResultFunc) Go() ably.Result {
err := make(chan error, 1)
go func() {
err <- f()
err <- f(context.Background())
}()
return ResultFunc(func() error {
return <-err
return ResultFunc(func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-err:
return err
}
})
}
6 changes: 3 additions & 3 deletions ably/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func TestAuth_RequestToken_PublishClientID(t *testing.T) {
}
client := app.NewRealtime(opts...)
defer safeclose(t, ablytest.FullRealtimeCloser(client))
if err = ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected).Wait(); err != nil {
if err = ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil); err != nil {
t.Fatalf("Connect(): want err == nil got err=%v", err)
}
if id := client.Auth.ClientID(); id != cas.clientID {
Expand Down Expand Up @@ -664,7 +664,7 @@ func TestAuth_ClientID(t *testing.T) {
if id := client.Auth.ClientID(); id != "" {
t.Fatalf("want clientID to be empty; got %q", id)
}
if err := ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected).Wait(); err != nil {
if err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil); err != nil {
t.Fatalf("Connect()=%v", err)
}
if id := client.Auth.ClientID(); id != connected.ConnectionDetails.ClientID {
Expand Down Expand Up @@ -802,7 +802,7 @@ func TestAuth_RealtimeAccessToken(t *testing.T) {
app, client := ablytest.NewRealtime(opts...)
defer safeclose(t, app)

if err := ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected).Wait(); err != nil {
if err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil); err != nil {
t.Fatalf("Connect()=%v", err)
}
if err := client.Channels.Get("test").Publish(context.Background(), "name", "value"); err != nil {
Expand Down
26 changes: 25 additions & 1 deletion ably/internal/ablyutil/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func After(ctx context.Context, d time.Duration) <-chan time.Time {
timer := time.NewTimer(d)

ch := make(chan time.Time)
ch := make(chan time.Time, 1)

go func() {
defer timer.Stop()
Expand All @@ -25,3 +25,27 @@ func After(ctx context.Context, d time.Duration) <-chan time.Time {

return ch
}

type TimerFunc func(context.Context, time.Duration) <-chan time.Time

// NewTicker repeatedly calls the given TimerFunc, which should behave like
// After, until the context it cancelled. It returns a channel to which it sends
// every value produced by the TimerFunc.
func NewTicker(after TimerFunc) TimerFunc {
return func(ctx context.Context, d time.Duration) <-chan time.Time {
ch := make(chan time.Time, 1)

go func() {
for {
t, ok := <-after(ctx, d)
if !ok {
close(ch)
return
}
ch <- t
}
}()

return ch
}
}
Loading

0 comments on commit 96bc562

Please sign in to comment.