Skip to content

Commit

Permalink
Use fake time in RTN23 test.
Browse files Browse the repository at this point in the history
Previous way was flaky due to the artificially short timeouts:

--- FAIL: TestRealtimeConn_RTN23 (0.00s)
    realtime_conn_spec_test.go:1764: [ErrorInfo :timeout code=80003 disconnected statusCode=0] See https://help.ably.io/error/80003
  • Loading branch information
tcard committed Nov 9, 2020
1 parent 4c5bd3d commit eba16f1
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
18 changes: 12 additions & 6 deletions ably/ablytest/ablytest.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ func TimeFuncs(afterCalls chan<- AfterCall) (

timeUpdate := make(chan time.Time, 1)
go func() {
mtx.Lock()
t := currentTime
mtx.Unlock()

select {
case afterCalls <- AfterCall{Ctx: ctx, D: d, Deadline: t.Add(d), Time: timeUpdate}:
case <-ctx.Done():
// This allows tests to ignore a call if they expect the timer to
// be cancelled.
return
}

select {
case <-ctx.Done():
close(ch)
Expand All @@ -178,12 +190,6 @@ func TimeFuncs(afterCalls chan<- AfterCall) (
}
}()

mtx.Lock()
t := currentTime
mtx.Unlock()

afterCalls <- AfterCall{Ctx: ctx, D: d, Deadline: t.Add(d), Time: timeUpdate}

return ch
}

Expand Down
28 changes: 22 additions & 6 deletions ably/ablytest/recorders.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,22 @@ func MessagePipeWithNowFunc(now func() time.Time) MessagePipeOption {
}
}

// MessagePipeWithAfterFunc sets a function to get a timer. This timer
// will be used to determine whether a Receive times out.
//
// If not set, receives won't timeout.
func MessagePipeWithAfterFunc(after func(context.Context, time.Duration) <-chan time.Time) MessagePipeOption {
return func(pc *pipeConn) {
pc.after = after
}
}

func MessagePipe(in <-chan *proto.ProtocolMessage, out chan<- *proto.ProtocolMessage, opts ...MessagePipeOption) func(string, *url.URL, time.Duration) (proto.Conn, error) {
return func(proto string, u *url.URL, timeout time.Duration) (proto.Conn, error) {
pc := pipeConn{
in: in,
out: out,
in: in,
out: out,
after: ablyutil.After,
}
for _, opt := range opts {
opt(&pc)
Expand All @@ -223,9 +234,10 @@ func MessagePipe(in <-chan *proto.ProtocolMessage, out chan<- *proto.ProtocolMes
}

type pipeConn struct {
in <-chan *proto.ProtocolMessage
out chan<- *proto.ProtocolMessage
now func() time.Time
in <-chan *proto.ProtocolMessage
out chan<- *proto.ProtocolMessage
now func() time.Time
after func(context.Context, time.Duration) <-chan time.Time
}

func (pc pipeConn) Send(msg *proto.ProtocolMessage) error {
Expand All @@ -234,10 +246,14 @@ func (pc pipeConn) Send(msg *proto.ProtocolMessage) error {
}

func (pc pipeConn) Receive(deadline time.Time) (*proto.ProtocolMessage, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var timeout <-chan time.Time
if pc.now != nil {
timeout = time.After(deadline.Sub(pc.now()))
timeout = pc.after(ctx, deadline.Sub(pc.now()))
}

select {
case m, ok := <-pc.in:
if !ok || m == nil {
Expand Down
37 changes: 31 additions & 6 deletions ably/realtime_conn_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1732,17 +1732,22 @@ func TestRealtimeConn_RTN23(t *testing.T) {

connDetails := proto.ConnectionDetails{
ConnectionKey: "foo",
ConnectionStateTTL: proto.DurationFromMsecs(time.Millisecond * 20),
MaxIdleInterval: proto.DurationFromMsecs(time.Millisecond * 5),
ConnectionStateTTL: proto.DurationFromMsecs(time.Minute * 20),
MaxIdleInterval: proto.DurationFromMsecs(time.Minute * 5),
}

afterCalls := make(chan ablytest.AfterCall)
now, after := ablytest.TimeFuncs(afterCalls)

dials := make(chan *url.URL, 1)
var in chan *proto.ProtocolMessage
realtimeRequestTimeout := time.Millisecond
realtimeRequestTimeout := time.Minute
c, _ := ably.NewRealtime(
ably.WithAutoConnect(false),
ably.WithToken("fake:token"),
ably.WithRealtimeRequestTimeout(realtimeRequestTimeout),
ably.WithNow(now),
ably.WithAfter(after),
ably.WithDial(func(p string, u *url.URL, timeout time.Duration) (proto.Conn, error) {
in = make(chan *proto.ProtocolMessage, 1)
in <- &proto.ProtocolMessage{
Expand All @@ -1752,7 +1757,8 @@ func TestRealtimeConn_RTN23(t *testing.T) {
}
dials <- u
return ablytest.MessagePipe(in, nil,
ablytest.MessagePipeWithNowFunc(time.Now),
ablytest.MessagePipeWithNowFunc(now),
ablytest.MessagePipeWithAfterFunc(after),
)(p, u, timeout)
}))
disconnected := make(chan *ably.ErrorInfo, 1)
Expand All @@ -1763,6 +1769,7 @@ func TestRealtimeConn_RTN23(t *testing.T) {
if err != nil {
t.Fatal(err)
}

var dialed *url.URL
ablytest.Instantly.Recv(t, &dialed, dials, t.Fatalf)
// RTN23b
Expand All @@ -1774,11 +1781,29 @@ func TestRealtimeConn_RTN23(t *testing.T) {
maxIdleInterval := time.Duration(connDetails.MaxIdleInterval)
receiveTimeout := realtimeRequestTimeout + maxIdleInterval

// Expect a timer for a message receive.
var timer ablytest.AfterCall
ablytest.Instantly.Recv(t, &timer, afterCalls, t.Fatalf)
if expected, got := receiveTimeout, timer.D; expected != got {
t.Fatalf("expected %v, got %v", expected, got)
}

in <- &proto.ProtocolMessage{
Action: proto.ActionHeartbeat,
}
// The connection should not disconnect as we received the heartbeat message
ablytest.Before(receiveTimeout).NoRecv(t, nil, disconnected, t.Fatalf)

// An incoming message should cancel the timer and prevent a disconnection.
ablytest.Instantly.Recv(t, nil, timer.Ctx.Done(), t.Fatalf)
ablytest.Instantly.NoRecv(t, nil, disconnected, t.Fatalf)

// Expect another timer for a message receive.
ablytest.Instantly.Recv(t, &timer, afterCalls, t.Fatalf)
if expected, got := receiveTimeout, timer.D; expected != got {
t.Fatalf("expected %v, got %v", expected, got)
}

// Let the deadline pass without a message; expect a disconnection.
timer.Fire()

// RTN23a The connection should be disconnected due to lack of activity past
// receiveTimeout
Expand Down

0 comments on commit eba16f1

Please sign in to comment.