Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close inactive connections #423

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ multiple concurrent HTTP requests without blocking.
* [ServerRequest](#serverrequest)
* [ResponseException](#responseexception)
* [React\Http\Middleware](#reacthttpmiddleware)
* [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware)
* [StreamingRequestMiddleware](#streamingrequestmiddleware)
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
Expand Down Expand Up @@ -2498,6 +2499,22 @@ access its underlying response object.

### React\Http\Middleware

#### InactiveConnectionTimeoutMiddleware

The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the
`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly open.

The following example configures the `HttpServer` to close any inactive connections after one and a half second:

```php
$http = new React\Http\HttpServer(
new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5),
$handler
);
```
> Internally, this class is used as a "value object" to override the default timeout of one minute.
As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".

#### StreamingRequestMiddleware

The `React\Http\Middleware\StreamingRequestMiddleware` can be used to
Expand Down
10 changes: 7 additions & 3 deletions src/HttpServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use React\Http\Io\IniUtil;
use React\Http\Io\MiddlewareRunner;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
Expand Down Expand Up @@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop)
}

$streaming = false;
$idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT;
foreach ((array) $requestHandlers as $handler) {
if ($handler instanceof StreamingRequestMiddleware) {
$streaming = true;
break;
}
if ($handler instanceof InactiveConnectionTimeoutMiddleware) {
$idleConnectTimeout = $handler->getTimeout();
}
}

Expand Down Expand Up @@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop)
* doing anything with the request.
*/
$middleware = \array_filter($middleware, function ($handler) {
return !($handler instanceof StreamingRequestMiddleware);
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
});

$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware));
$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectTimeout);

$that = $this;
$this->streamingServer->on('error', function ($error) use ($that) {
Expand Down
36 changes: 30 additions & 6 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use React\EventLoop\LoopInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Promise;
use React\Promise\CancellablePromiseInterface;
use React\Promise\PromiseInterface;
Expand Down Expand Up @@ -85,6 +86,7 @@ final class StreamingServer extends EventEmitter
private $callback;
private $parser;
private $loop;
private $idleConnectionTimeout;

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
Expand All @@ -96,15 +98,17 @@ final class StreamingServer extends EventEmitter
*
* @param LoopInterface $loop
* @param callable $requestHandler
* @param float $idleConnectTimeout
* @see self::listen()
*/
public function __construct(LoopInterface $loop, $requestHandler)
public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT)
{
if (!\is_callable($requestHandler)) {
throw new \InvalidArgumentException('Invalid request handler given');
}

$this->loop = $loop;
$this->idleConnectionTimeout = $idleConnectTimeout;

$this->callback = $requestHandler;
$this->parser = new RequestHeaderParser();
Expand Down Expand Up @@ -134,7 +138,27 @@ public function __construct(LoopInterface $loop, $requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this->parser, 'handle'));
$socket->on('connection', array($this, 'handle'));
}

/** @internal */
public function handle(ConnectionInterface $conn)
{
$timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) {
$conn->close();
});
$loop = $this->loop;
$conn->once('data', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a 60 second timeout for all headers instead

});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WyriHaximus This looks like a good starting point 👍

However, the way it's currently designed, this will only wait for some data and then wait infinitely for a complete HTTP request header. I would suggest moving this logic to the RequestHeaderParser and make the timeout apply to the entire HTTP request header. For instance, if I send GET / HTTP/1.1\r\n but nothing else, this should still run into a timeout.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clue Correct, that is intentional. My initial intent was to first create this PR and do it on a connection level, and then do a follow up to do it on both the connection and request level. But if you think it would be better to do both I can fold that into this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WyriHaximus Without knowing the complete scope of this feature I would lean towards including the timeout on the request level, but I'll leave this up to you to decide 👍

Either way I think the above snippet should be updated without the if statement as the underlying socket and stream components should never emit an empty data event.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clue updated the PR with that change as per your suggestion. As discussed this morning I'll also file the follow-up PR taking care of the idle request closing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clue As promised: #425

$conn->on('end', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});
Comment on lines +154 to +156
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be unneeded given the end event will always be followed by the close event which is handled the same way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will drop this one 👍

$conn->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});

$this->parser->handle($conn);
}

/** @internal */
Expand Down Expand Up @@ -350,7 +374,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->parser->handle($connection);
$this->handle($connection);
} else {
$connection->end();
}
Expand All @@ -371,10 +395,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$that = $this;
$body->on('end', function () use ($connection, $that, $body) {
$connection->removeListener('close', array($body, 'close'));
$parser->handle($connection);
$that->handle($connection);
});
} else {
$body->pipe($connection);
Expand Down
58 changes: 58 additions & 0 deletions src/Middleware/InactiveConnectionTimeoutMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

namespace React\Http\Middleware;

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Io\HttpBodyStream;
use React\Http\Io\PauseBufferStream;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Stream\ReadableStreamInterface;

/**
* Closes any inactive connection after the specified amount of seconds since last activity.
*
* This allows you to set an alternative timeout to the default one minute (60 seconds). For example
* thirteen and a half seconds:
*
* ```php
* $http = new React\Http\HttpServer(
* new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(13.5),
* $handler
* );
*
* > Internally, this class is used as a "value object" to override the default timeout of one minute.
* As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
*/
final class InactiveConnectionTimeoutMiddleware
{
const DEFAULT_TIMEOUT = 60;

/**
* @var float
*/
private $timeout;

/**
* @param float $timeout
*/
public function __construct($timeout = self::DEFAULT_TIMEOUT)
{
$this->timeout = $timeout;
}

public function __invoke(ServerRequestInterface $request, $next)
{
return $next($request);
}

/**
* @return float
*/
public function getTimeout()
{
return $this->timeout;
}
}
17 changes: 16 additions & 1 deletion tests/HttpServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
use React\EventLoop\Factory;
use React\Http\HttpServer;
use React\Http\Io\IniUtil;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Promise;
use React\Promise\Deferred;
Expand Down Expand Up @@ -254,6 +256,19 @@ function (ServerRequestInterface $request) use (&$streaming) {
$this->assertEquals(true, $streaming);
}

public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
{
$this->connection->expects($this->once())->method('close');

$loop = Factory::create();
$http = new HttpServer($loop, new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever());

$http->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$loop->run();
}

public function testForwardErrors()
{
$exception = new \Exception();
Expand Down Expand Up @@ -434,7 +449,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency()

public function testConstructFiltersOutConfigurationMiddlewareBefore()
{
$http = new HttpServer(new StreamingRequestMiddleware(), function () { });
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { });

$ref = new \ReflectionProperty($http, 'streamingServer');
$ref->setAccessible(true);
Expand Down
17 changes: 15 additions & 2 deletions tests/Io/StreamingServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ public function testRequestEventWillNotBeEmittedForIncompleteHeaders()
$this->connection->emit('data', array($data));
}

public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
{
$this->connection->expects($this->once())->method('close');

$loop = Factory::create();
$server = new StreamingServer($loop, $this->expectCallableNever(), 0.1);

$server->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$loop->run();
}

public function testRequestEventIsEmitted()
{
$server = new StreamingServer(Factory::create(), $this->expectCallableOnce());
Expand Down Expand Up @@ -3196,9 +3209,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle
// pretend parser just finished parsing
$server->handleRequest($this->connection, $request);

$this->assertCount(2, $this->connection->listeners('close'));
$this->assertCount(3, $this->connection->listeners('close'));
$body->end();
$this->assertCount(1, $this->connection->listeners('close'));
$this->assertCount(3, $this->connection->listeners('close'));
}

private function createGetRequest()
Expand Down