From 9ff538c9709abb7925319d20da2c51b5a091d77f Mon Sep 17 00:00:00 2001 From: Taras Date: Sun, 4 Oct 2015 21:10:12 +0300 Subject: [PATCH] Another test for flow control --- test/protocol_tests.py | 60 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/test/protocol_tests.py b/test/protocol_tests.py index 3b2167d..b2a408d 100644 --- a/test/protocol_tests.py +++ b/test/protocol_tests.py @@ -161,7 +161,7 @@ def cleanup(self): class WhenWritingAboveLimit: - DATA_LEN = 10 * 1024 * 1024 # 1Mb should be enough I think + DATA_LEN = 10 * 1024 * 1024 # 10Mb should be enough I think def given_I_have_a_connection_with_low_water(self): self.loop = asyncio.get_event_loop() @@ -203,3 +203,61 @@ def it_should_pause_writing_correctly(self): # Destroy reader task fut.cancel() del fut + + +class WhenDisconnectedWritingAboveLimit: + + DATA_LEN = 10 * 1024 * 1024 # 10Mb should be enough I think + + def given_I_have_a_connection_with_low_water(self): + self.loop = asyncio.get_event_loop() + + # Bind any free port + self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_sock.bind(("127.0.0.1", 0)) + self.port = self.server_sock.getsockname()[1] + + # Listen on bound socket. Note: we set read limit se we hit write limit + # on the other side + self.loop.run_until_complete( + asyncio.start_server( + self._connected, sock=self.server_sock, loop=self.loop, + limit=100)) + + self.transport, self.protocol = self.loop.run_until_complete( + self.loop.create_connection( + lambda: protocol.AMQP(mock.Mock(), self.loop), + host="127.0.0.1", port=self.port)) + self.transport.set_write_buffer_limits(high=0) + + def _connected(self, r, w): + self.reader = r + self.writer = w + + def when_we_many_bytes_and_disconnect(self): + data = b'x' * self.DATA_LEN + self.transport.write(data) + # Set up a waiter + self.waiter = asyncio.async( + self.protocol._drain_helper(), loop=self.loop) + self.writer.transport.close() + try: + self.loop.run_until_complete(self.waiter) + except ConnectionResetError: + pass + + def it_should_raise_an_exception_on_new_drain(self): + raised = False + try: + self.loop.run_until_complete(self.protocol._drain_helper()) + except ConnectionResetError: + raised = True + assert raised + + def it_should_raise_an_exception_on_old_drain(self): + raised = False + try: + self.waiter.result() + except ConnectionResetError: + raised = True + assert raised