Skip to content

Commit

Permalink
Merge pull request #75 from clue-labs/cancellation
Browse files Browse the repository at this point in the history
Support request cancellation
  • Loading branch information
clue authored Sep 8, 2017
2 parents 7150c51 + 7d2a295 commit 7ff7fd1
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 32 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mess with most of the low-level details.
* [Browser](#browser)
* [Methods](#methods)
* [Promises](#promises)
* [Cancellation](#cancellation)
* [Blocking](#blocking)
* [Streaming](#streaming)
* [submit()](#submit)
Expand Down Expand Up @@ -140,7 +141,22 @@ You may also want to look into the [streaming API](#streaming):
* If you're dealing with lots of concurrent requests (100+) or
* If you want to process individual data chunks as they happen (without having to wait for the full response body) or
* If you're expecting a big response body size (1 MiB or more, for example when downloading binary files) or
* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance).
* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance).

#### Cancellation

The returned Promise is implemented in such a way that it can be cancelled
when it is still pending.
Cancelling a pending promise will reject its value with an Exception and
clean up any underlying resources.

```php
$promise = $browser->get($url);

$loop->addTimer(2.0, function () use ($promise) {
$promise->cancel();
});
```

#### Blocking

Expand Down
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"php": ">=5.4",
"react/event-loop": "^0.4 || ^0.3",
"react/http-client": "^0.5",
"react/promise": "^2 || ^1.1",
"react/promise": "^2.2.1",
"react/promise-stream": "^0.1.1",
"react/socket": "^0.8",
"react/socket-client": "^0.7 || ^0.6",
"react/stream": "^0.6 || ^0.5 || ^0.4.6",
Expand All @@ -28,7 +29,6 @@
"clue/block-react": "^1.0",
"clue/socks-react": "^0.8 || ^0.7",
"phpunit/phpunit": "^4.5",
"react/http": "^0.7.2",
"react/promise-stream": "^0.1.1"
"react/http": "^0.7.2"
}
}
8 changes: 6 additions & 2 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,14 @@ public function send(RequestInterface $request, MessageFactory $messageFactory)
$headers[$name] = implode(', ', $values);
}

$deferred = new Deferred();

$requestStream = $this->http->request($request->getMethod(), (string)$uri, $headers, $request->getProtocolVersion());

$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
// close request stream if request is canceled
$reject(new \RuntimeException('Request canceled'));
$requestStream->close();
});

$requestStream->on('error', function($error) use ($deferred) {
$deferred->reject($error);
});
Expand Down
42 changes: 16 additions & 26 deletions src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@

namespace Clue\React\Buzz\Io;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Exception;
use Clue\React\Buzz\Browser;
use React\HttpClient\Client as HttpClient;
use Clue\React\Buzz\Io\Sender;
use Clue\React\Buzz\Message\ResponseException;
use Clue\React\Buzz\Message\MessageFactory;
use Clue\React\Buzz\Message\BufferedResponse;
use React\Stream\BufferedSink;
use React\Stream\ReadableStreamInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Promise;
use React\Promise\Stream;
use React\Stream\ReadableStreamInterface;
use Exception;

/**
* @internal
Expand Down Expand Up @@ -71,9 +69,6 @@ protected function next(RequestInterface $request)
return $promise->then(
function (ResponseInterface $response) use ($request, $that) {
return $that->onResponse($response, $request);
},
function ($error) use ($request, $that) {
return $that->onError($error, $request);
}
);
}
Expand All @@ -94,9 +89,17 @@ public function bufferResponse(ResponseInterface $response)

// buffer stream and resolve with buffered body
$messageFactory = $this->messageFactory;
return BufferedSink::createPromise($stream)->then(function ($body) use ($response, $messageFactory) {
return $response->withBody($messageFactory->body($body));
});
return Stream\buffer($stream)->then(
function ($body) use ($response, $messageFactory) {
return $response->withBody($messageFactory->body($body));
},
function ($e) use ($stream) {
// try to close stream if buffering fails (or is cancelled)
$stream->close();

throw $e;
}
);
}

/**
Expand All @@ -123,19 +126,6 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques
return $response;
}

/**
* @internal
* @param Exception $error
* @param RequestInterface $request
* @throws Exception
*/
public function onError(Exception $error, RequestInterface $request)
{
$this->progress('error', array($error, $request));

throw $error;
}

private function onResponseRedirect(ResponseInterface $response, RequestInterface $request)
{
// resolve location relative to last request URI
Expand Down
12 changes: 12 additions & 0 deletions tests/FunctionalBrowserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ public function testSimpleRequest()
Block\await($this->browser->get($this->base . 'get'), $this->loop);
}

/**
* @expectedException RuntimeException
* @group online
*/
public function testCancelPromiseWillRejectRequest()
{
$promise = $this->browser->get($this->base . 'get');
$promise->cancel();

Block\await($promise, $this->loop);
}

/** @group online */
public function testRedirectRequestRelative()
{
Expand Down
39 changes: 39 additions & 0 deletions tests/Io/SenderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,45 @@ public function testSenderLegacyConnectorRejection()
Block\await($promise, $this->loop);
}

public function testCancelRequestWillCancelConnector()
{
$promise = new \React\Promise\Promise(function () { }, function () {
throw new \RuntimeException();
});

$connector = $this->getMock('React\Socket\ConnectorInterface');
$connector->expects($this->once())->method('connect')->willReturn($promise);

$sender = new Sender(new HttpClient($this->loop, $connector));

$request = new Request('GET', 'http://www.google.com/');

$promise = $sender->send($request, $this->getMock('Clue\React\Buzz\Message\MessageFactory'));
$promise->cancel();

$this->setExpectedException('RuntimeException');
Block\await($promise, $this->loop);
}

public function testCancelRequestWillCloseConnection()
{
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$connection->expects($this->once())->method('close');

$connector = $this->getMock('React\Socket\ConnectorInterface');
$connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($connection));

$sender = new Sender(new HttpClient($this->loop, $connector));

$request = new Request('GET', 'http://www.google.com/');

$promise = $sender->send($request, $this->getMock('Clue\React\Buzz\Message\MessageFactory'));
$promise->cancel();

$this->setExpectedException('RuntimeException');
Block\await($promise, $this->loop);
}

public function provideRequestProtocolVersion()
{
return array(
Expand Down
26 changes: 26 additions & 0 deletions tests/Io/TransactionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,32 @@ public function testReceivingStreamingBodyWillResolveWithBufferedResponseByDefau
$this->assertEquals('hello world', (string)$response->getBody());
}

/**
* @expectedException RuntimeException
*/
public function testCancelBufferingResponseWillCloseStreamAndReject()
{
$messageFactory = new MessageFactory();
$loop = Factory::create();

$stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$stream->expects($this->any())->method('isReadable')->willReturn(true);
$stream->expects($this->once())->method('close');

$request = $this->getMock('Psr\Http\Message\RequestInterface');
$response = $messageFactory->response(1.0, 200, 'OK', array(), $stream);

// mock sender to resolve promise with the given $response in response to the given $request
$sender = $this->getMockBuilder('Clue\React\Buzz\Io\Sender')->disableOriginalConstructor()->getMock();
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));

$transaction = new Transaction($request, $sender, array(), $messageFactory);
$promise = $transaction->send();
$promise->cancel();

Block\await($promise, $loop);
}

public function testReceivingStreamingBodyWillResolveWithStreamingResponseIfStreamingIsEnabled()
{
$messageFactory = new MessageFactory();
Expand Down

0 comments on commit 7ff7fd1

Please sign in to comment.