Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major fix: Send timeouts #124

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/ZeroMQ.AcceptanceTests/SocketFixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,17 @@ public void Initialize()

_senderThread = new Thread(() =>
{
SenderInit(Sender);
Sender.SendHighWatermark = 1;
SenderInit(Sender);
_receiverReady.WaitOne();
Sender.Connect("inproc://spec_context");
SenderAction(Sender);
});

_receiverThread = new Thread(() =>
{
ReceiverInit(Receiver);
Receiver.SendHighWatermark = 1;
ReceiverInit(Receiver);
Receiver.Bind("inproc://spec_context");
_receiverReady.Set();
ReceiverAction(Receiver);
Expand Down Expand Up @@ -170,4 +170,9 @@ public class UsingThreadedPubSub : UsingThreadedSocketPair
{
public UsingThreadedPubSub() : base(SocketType.PUB, SocketType.SUB) { }
}

public class UsingThreadedPushPull : UsingThreadedSocketPair
{
public UsingThreadedPushPull() : base(SocketType.PUSH, SocketType.PULL) { }
}
}
79 changes: 79 additions & 0 deletions src/ZeroMQ.AcceptanceTests/ZmqSocketTests/Send_Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,84 @@ public WhenTransferringWithAPreallocatedReceiveBuffer()
};
}
}

public class WhenTransferringWithAnAmpleSendTimeout : UsingThreadedPushPull
{
private const int SendMessageCount = 5;
private int _receivedMessageCount;

public WhenTransferringWithAnAmpleSendTimeout()
{
SenderInit = push => push.SendHighWatermark = 1;
SenderAction = push =>
{
for (int i = 0; i < SendMessageCount; i++)
{
push.SendFrame(Messages.SingleMessage, TimeSpan.FromMilliseconds(500));
}
};

// slow receiver with small HighWatermark
ReceiverInit = pull => pull.ReceiveHighWatermark = 1;
ReceiverAction = pull =>
{
for (int i = 0; i < SendMessageCount; i++)
{
Thread.Sleep(50);

int size;
pull.Receive(null, SocketFlags.DontWait, out size);
if (pull.ReceiveStatus == ReceiveStatus.Received)
_receivedMessageCount++;
}
};
}

[Test]
public void ShouldReceiveAllMessages()
{
Assert.AreEqual(SendMessageCount, _receivedMessageCount);
}
}

public class WhenTransferringWithAnInsufficientSendTimeout : UsingThreadedPushPull
{
private const int SendMessageCount = 5;
private const int SendHighWatermark = 1;
private const int ReceiveHighWatermark = 1;
private int _receivedMessageCount;

public WhenTransferringWithAnInsufficientSendTimeout()
{
SenderInit = push => push.SendHighWatermark = SendHighWatermark;
SenderAction = push =>
{
for (int i = 0; i < SendMessageCount; i++)
{
push.SendFrame(Messages.SingleMessage, TimeSpan.FromMilliseconds(5));
}
};

// slow receiver with small HighWatermark
ReceiverInit = pull => pull.ReceiveHighWatermark = ReceiveHighWatermark;
ReceiverAction = pull =>
{
Thread.Sleep(50);

for (int i = 0; i < SendMessageCount; i++)
{
var frame = pull.ReceiveFrame(TimeSpan.FromMilliseconds(10));
if (frame.ReceiveStatus == ReceiveStatus.Received)
_receivedMessageCount++;
}
};
}

[Test]
public void ShouldReceiveHighWatermarkMessages()
{
Assert.AreEqual(SendHighWatermark + ReceiveHighWatermark, _receivedMessageCount);
}
}
}
}
8 changes: 4 additions & 4 deletions src/ZeroMQ/ExecuteExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal static class ExecuteExtensions
{
public delegate TResult ThirdParamOut<in T1, in T2, T3, out TResult>(T1 arg1, T2 arg2, out T3 arg3);

public static TResult WithTimeout<T1, T2, TResult>(this ZmqSocket socket, Func<T1, T2, TResult> method, T1 arg1, T2 arg2, TimeSpan timeout)
public static TResult WithReceiveTimeout<T1, T2, TResult>(this ZmqSocket socket, Func<T1, T2, TResult> method, T1 arg1, T2 arg2, TimeSpan timeout)
{
if ((int)timeout.TotalMilliseconds < 1)
{
Expand Down Expand Up @@ -37,7 +37,7 @@ public static TResult WithTimeout<T1, T2, TResult>(this ZmqSocket socket, Func<T
return receiveResult;
}

public static TResult WithTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, Func<T1, T2, T3, TResult> method, T1 arg1, T2 arg2, T3 arg3, TimeSpan timeout)
public static TResult WithSendTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, Func<T1, T2, T3, TResult> method, T1 arg1, T2 arg2, T3 arg3, TimeSpan timeout)
{
if ((int)timeout.TotalMilliseconds < 1)
{
Expand All @@ -53,7 +53,7 @@ public static TResult WithTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, Fu
{
receiveResult = method(arg1, arg2, arg3);

if (socket.ReceiveStatus != ReceiveStatus.TryAgain)
if (socket.SendStatus != SendStatus.TryAgain)
{
break;
}
Expand All @@ -65,7 +65,7 @@ public static TResult WithTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, Fu
return receiveResult;
}

public static TResult WithTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, ThirdParamOut<T1, T2, T3, TResult> method, T1 arg1, T2 arg2, out T3 arg3, TimeSpan timeout)
public static TResult WithReceiveTimeout<T1, T2, T3, TResult>(this ZmqSocket socket, ThirdParamOut<T1, T2, T3, TResult> method, T1 arg1, T2 arg2, out T3 arg3, TimeSpan timeout)
{
if ((int)timeout.TotalMilliseconds < 1)
{
Expand Down
6 changes: 3 additions & 3 deletions src/ZeroMQ/ZmqSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public int Receive(byte[] buffer, TimeSpan timeout)
{
return timeout == TimeSpan.MaxValue
? Receive(buffer)
: this.WithTimeout(Receive, buffer, SocketFlags.DontWait, timeout);
: this.WithReceiveTimeout(Receive, buffer, SocketFlags.DontWait, timeout);
}

/// <summary>
Expand Down Expand Up @@ -648,7 +648,7 @@ public byte[] Receive(byte[] buffer, TimeSpan timeout, out int size)
}

int receivedBytes;
byte[] message = this.WithTimeout(Receive, buffer, SocketFlags.DontWait, out receivedBytes, timeout);
byte[] message = this.WithReceiveTimeout(Receive, buffer, SocketFlags.DontWait, out receivedBytes, timeout);

size = receivedBytes;

Expand Down Expand Up @@ -782,7 +782,7 @@ public int Send(byte[] buffer, int size, SocketFlags flags, TimeSpan timeout)
{
return timeout == TimeSpan.MaxValue
? Send(buffer, size, flags & ~SocketFlags.DontWait)
: this.WithTimeout(Send, buffer, size, flags | SocketFlags.DontWait, timeout);
: this.WithSendTimeout(Send, buffer, size, flags | SocketFlags.DontWait, timeout);
}

/// <summary>
Expand Down