diff --git a/README.md b/README.md index f72632e..9a7b37d 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ mess with most of the low-level details. * [Browser](#browser) * [Methods](#methods) * [Promises](#promises) + * [Cancellation](#cancellation) * [Blocking](#blocking) * [Streaming](#streaming) * [submit()](#submit) @@ -140,7 +141,22 @@ You may also want to look into the [streaming API](#streaming): * If you're dealing with lots of concurrent requests (100+) or * If you want to process individual data chunks as they happen (without having to wait for the full response body) or * If you're expecting a big response body size (1 MiB or more, for example when downloading binary files) or -* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance). +* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance). + +#### Cancellation + +The returned Promise is implemented in such a way that it can be cancelled +when it is still pending. +Cancelling a pending promise will reject its value with an Exception and +clean up any underlying resources. + +```php +$promise = $browser->get($url); + +$loop->addTimer(2.0, function () use ($promise) { + $promise->cancel(); +}); +``` #### Blocking diff --git a/composer.json b/composer.json index f1e0b03..adf5926 100644 --- a/composer.json +++ b/composer.json @@ -17,7 +17,8 @@ "php": ">=5.4", "react/event-loop": "^0.4 || ^0.3", "react/http-client": "^0.5", - "react/promise": "^2 || ^1.1", + "react/promise": "^2.2.1", + "react/promise-stream": "^0.1.1", "react/socket": "^0.8", "react/socket-client": "^0.7 || ^0.6", "react/stream": "^0.6 || ^0.5 || ^0.4.6", @@ -28,7 +29,6 @@ "clue/block-react": "^1.0", "clue/socks-react": "^0.8 || ^0.7", "phpunit/phpunit": "^4.5", - "react/http": "^0.7.2", - "react/promise-stream": "^0.1.1" + "react/http": "^0.7.2" } } diff --git a/src/Io/Sender.php b/src/Io/Sender.php index bee0cc5..15e10c5 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -142,10 +142,14 @@ public function send(RequestInterface $request, MessageFactory $messageFactory) $headers[$name] = implode(', ', $values); } - $deferred = new Deferred(); - $requestStream = $this->http->request($request->getMethod(), (string)$uri, $headers, $request->getProtocolVersion()); + $deferred = new Deferred(function ($_, $reject) use ($requestStream) { + // close request stream if request is canceled + $reject(new \RuntimeException('Request canceled')); + $requestStream->close(); + }); + $requestStream->on('error', function($error) use ($deferred) { $deferred->reject($error); }); diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index 1759579..4ec3b1e 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -2,18 +2,16 @@ namespace Clue\React\Buzz\Io; -use Psr\Http\Message\RequestInterface; -use Psr\Http\Message\ResponseInterface; -use Exception; use Clue\React\Buzz\Browser; -use React\HttpClient\Client as HttpClient; use Clue\React\Buzz\Io\Sender; use Clue\React\Buzz\Message\ResponseException; use Clue\React\Buzz\Message\MessageFactory; -use Clue\React\Buzz\Message\BufferedResponse; -use React\Stream\BufferedSink; -use React\Stream\ReadableStreamInterface; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; use React\Promise; +use React\Promise\Stream; +use React\Stream\ReadableStreamInterface; +use Exception; /** * @internal @@ -71,9 +69,6 @@ protected function next(RequestInterface $request) return $promise->then( function (ResponseInterface $response) use ($request, $that) { return $that->onResponse($response, $request); - }, - function ($error) use ($request, $that) { - return $that->onError($error, $request); } ); } @@ -94,9 +89,17 @@ public function bufferResponse(ResponseInterface $response) // buffer stream and resolve with buffered body $messageFactory = $this->messageFactory; - return BufferedSink::createPromise($stream)->then(function ($body) use ($response, $messageFactory) { - return $response->withBody($messageFactory->body($body)); - }); + return Stream\buffer($stream)->then( + function ($body) use ($response, $messageFactory) { + return $response->withBody($messageFactory->body($body)); + }, + function ($e) use ($stream) { + // try to close stream if buffering fails (or is cancelled) + $stream->close(); + + throw $e; + } + ); } /** @@ -123,19 +126,6 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques return $response; } - /** - * @internal - * @param Exception $error - * @param RequestInterface $request - * @throws Exception - */ - public function onError(Exception $error, RequestInterface $request) - { - $this->progress('error', array($error, $request)); - - throw $error; - } - private function onResponseRedirect(ResponseInterface $response, RequestInterface $request) { // resolve location relative to last request URI diff --git a/tests/FunctionalBrowserTest.php b/tests/FunctionalBrowserTest.php index 841d1fb..bbe352a 100644 --- a/tests/FunctionalBrowserTest.php +++ b/tests/FunctionalBrowserTest.php @@ -34,6 +34,18 @@ public function testSimpleRequest() Block\await($this->browser->get($this->base . 'get'), $this->loop); } + /** + * @expectedException RuntimeException + * @group online + */ + public function testCancelPromiseWillRejectRequest() + { + $promise = $this->browser->get($this->base . 'get'); + $promise->cancel(); + + Block\await($promise, $this->loop); + } + /** @group online */ public function testRedirectRequestRelative() { diff --git a/tests/Io/SenderTest.php b/tests/Io/SenderTest.php index 96d0bc6..d2f18af 100644 --- a/tests/Io/SenderTest.php +++ b/tests/Io/SenderTest.php @@ -69,6 +69,45 @@ public function testSenderLegacyConnectorRejection() Block\await($promise, $this->loop); } + public function testCancelRequestWillCancelConnector() + { + $promise = new \React\Promise\Promise(function () { }, function () { + throw new \RuntimeException(); + }); + + $connector = $this->getMock('React\Socket\ConnectorInterface'); + $connector->expects($this->once())->method('connect')->willReturn($promise); + + $sender = new Sender(new HttpClient($this->loop, $connector)); + + $request = new Request('GET', 'http://www.google.com/'); + + $promise = $sender->send($request, $this->getMock('Clue\React\Buzz\Message\MessageFactory')); + $promise->cancel(); + + $this->setExpectedException('RuntimeException'); + Block\await($promise, $this->loop); + } + + public function testCancelRequestWillCloseConnection() + { + $connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $connection->expects($this->once())->method('close'); + + $connector = $this->getMock('React\Socket\ConnectorInterface'); + $connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($connection)); + + $sender = new Sender(new HttpClient($this->loop, $connector)); + + $request = new Request('GET', 'http://www.google.com/'); + + $promise = $sender->send($request, $this->getMock('Clue\React\Buzz\Message\MessageFactory')); + $promise->cancel(); + + $this->setExpectedException('RuntimeException'); + Block\await($promise, $this->loop); + } + public function provideRequestProtocolVersion() { return array( diff --git a/tests/Io/TransactionTest.php b/tests/Io/TransactionTest.php index b497636..4e7f72d 100644 --- a/tests/Io/TransactionTest.php +++ b/tests/Io/TransactionTest.php @@ -59,6 +59,32 @@ public function testReceivingStreamingBodyWillResolveWithBufferedResponseByDefau $this->assertEquals('hello world', (string)$response->getBody()); } + /** + * @expectedException RuntimeException + */ + public function testCancelBufferingResponseWillCloseStreamAndReject() + { + $messageFactory = new MessageFactory(); + $loop = Factory::create(); + + $stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); + $stream->expects($this->any())->method('isReadable')->willReturn(true); + $stream->expects($this->once())->method('close'); + + $request = $this->getMock('Psr\Http\Message\RequestInterface'); + $response = $messageFactory->response(1.0, 200, 'OK', array(), $stream); + + // mock sender to resolve promise with the given $response in response to the given $request + $sender = $this->getMockBuilder('Clue\React\Buzz\Io\Sender')->disableOriginalConstructor()->getMock(); + $sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response)); + + $transaction = new Transaction($request, $sender, array(), $messageFactory); + $promise = $transaction->send(); + $promise->cancel(); + + Block\await($promise, $loop); + } + public function testReceivingStreamingBodyWillResolveWithStreamingResponseIfStreamingIsEnabled() { $messageFactory = new MessageFactory();