Skip to content

Commit

Permalink
Added simple flow control test for protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
taras-doba-ua committed Oct 4, 2015
1 parent 74dcdbe commit 0934937
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions test/protocol_tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest import mock
import contexts
import asynqp
import asyncio
import socket
from asynqp import spec
from asynqp import protocol
from asynqp.exceptions import ConnectionLostError
Expand Down Expand Up @@ -155,3 +157,49 @@ def it_should_raise_a_connection_lost_error(self):

def cleanup(self):
self.loop.set_exception_handler(testing_exception_handler)


class WhenWritingAboveLimit:

DATA_LEN = 10 * 1024 * 1024 # 1Mb should be enough I think

def given_I_have_a_connection_with_low_water(self):
self.loop = asyncio.get_event_loop()

# Bind any free port
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_sock.bind(("127.0.0.1", 0))
self.port = self.server_sock.getsockname()[1]

# Listen on bound socket. Note: we set read limit se we hit write limit
# on the other side
self.loop.run_until_complete(
asyncio.start_server(
self._connected, sock=self.server_sock, loop=self.loop,
limit=100))

self.transport, self.protocol = self.loop.run_until_complete(
self.loop.create_connection(
lambda: protocol.AMQP(mock.Mock(), self.loop),
host="127.0.0.1", port=self.port))
self.transport.set_write_buffer_limits(high=0)

def _connected(self, r, w):
self.reader = r
self.writer = w

def when_we_many_bytes(self):
data = b'x' * self.DATA_LEN
self.transport.write(data)

def it_should_pause_writing_correctly(self):
assert self.protocol._paused
# Launch reader
fut = asyncio.async(
self.reader.readexactly(self.DATA_LEN), loop=self.loop)
# Wait for client transport to drain
self.loop.run_until_complete(self.protocol._drain_helper())
assert not self.protocol._paused
# Destroy reader task
fut.cancel()
del fut

0 comments on commit 0934937

Please sign in to comment.