Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of Guzzle Pool to improve efficiency #401

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions src/WebPush.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Minishlink\WebPush;

use GuzzleHttp\Client;
use GuzzleHttp\Pool;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Psr7\Request;
use ParagonIE\ConstantTime\Base64UrlSafe;
Expand All @@ -30,7 +31,7 @@ class WebPush
protected ?array $notifications = null;

/**
* @var array Default options: TTL, urgency, topic, batchSize
* @var array Default options: TTL, urgency, topic, batchSize, concurrency
*/
protected array $defaultOptions;

Expand All @@ -53,7 +54,7 @@ class WebPush
* WebPush constructor.
*
* @param array $auth Some servers need authentication
* @param array $defaultOptions TTL, urgency, topic, batchSize
* @param array $defaultOptions TTL, urgency, topic, batchSize, concurrency
* @param int|null $timeout Timeout of POST request
*
* @throws \ErrorException
Expand Down Expand Up @@ -175,6 +176,58 @@ public function flush(?int $batchSize = null): \Generator
}
}

/**
* Flush notifications. Triggers concurrent requests.
*
* @param callable(MessageSentReport): void $callback Callback for each notification
* @param null|int $batchSize Defaults the value defined in defaultOptions during instantiation (which defaults to 1000).
* @param null|int $concurrency Defaults the value defined in defaultOptions during instantiation (which defaults to 100).
Gugu7264 marked this conversation as resolved.
Show resolved Hide resolved
*/
public function flushPooled($callback, ?int $batchSize = null, ?int $concurrency = null): void
{
if (empty($this->notifications)) {
return;
}

if (null === $batchSize) {
$batchSize = $this->defaultOptions['batchSize'];
}

if (null === $concurrency) {
$concurrency = $this->defaultOptions['concurrency'];
}

$batches = array_chunk($this->notifications, $batchSize);
$this->notifications = [];

foreach ($batches as $batch) {
$batch = $this->prepare($batch);
$pool = new Pool($this->client, $batch, [
'concurrency' => $concurrency,
'fulfilled' => function (ResponseInterface $response, int $index) use ($callback, $batch) {
/** @var \Psr\Http\Message\RequestInterface $request **/
$request = $batch[$index];
$callback(new MessageSentReport($request, $response));
},
'rejected' => function (RequestException $reason) use ($callback) {
if (method_exists($reason, 'getResponse')) {
$response = $reason->getResponse();
} else {
$response = null;
}
$callback(new MessageSentReport($reason->getRequest(), $response, false, $reason->getMessage()));
},
]);

$promise = $pool->promise();
$promise->wait();
}

if ($this->reuseVAPIDHeaders) {
$this->vapidHeaders = [];
}
}

/**
* @throws \ErrorException|\Random\RandomException
*/
Expand Down Expand Up @@ -315,14 +368,16 @@ public function getDefaultOptions(): array
}

/**
* @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize'
* @param array $defaultOptions Keys 'TTL' (Time To Live, defaults 4 weeks), 'urgency', 'topic', 'batchSize', 'concurrency'
*/
public function setDefaultOptions(array $defaultOptions): WebPush
{
$this->defaultOptions['TTL'] = $defaultOptions['TTL'] ?? 2419200;
$this->defaultOptions['urgency'] = $defaultOptions['urgency'] ?? null;
$this->defaultOptions['topic'] = $defaultOptions['topic'] ?? null;
$this->defaultOptions['batchSize'] = $defaultOptions['batchSize'] ?? 1000;
$this->defaultOptions['concurrency'] = $defaultOptions['concurrency'] ?? 100;


return $this;
}
Expand Down
40 changes: 40 additions & 0 deletions tests/WebPushTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,46 @@ public function testFlush(): void
}
}

/**
* @throws \ErrorException
* @throws \JsonException
*/
public function testFlushPooled(): void
{
$subscription = new Subscription(self::$endpoints['standard']);

$report = $this->webPush->sendOneNotification($subscription);
$this->assertFalse($report->isSuccess()); // it doesn't have VAPID

// queue has been reset
$this->assertEmpty(iterator_to_array($this->webPush->flush()));

$report = $this->webPush->sendOneNotification($subscription);
$this->assertFalse($report->isSuccess()); // it doesn't have VAPID

$nonExistentSubscription = Subscription::create([
'endpoint' => 'https://fcm.googleapis.com/fcm/send/fCd2-8nXJhU:APA91bGi2uaqFXGft4qdolwyRUcUPCL1XV_jWy1tpCRqnu4sk7ojUpC5gnq1PTncbCdMq9RCVQIIFIU9BjzScvjrDqpsI7J-K_3xYW8xo1xSNCfge1RvJ6Xs8RGL_Sw7JtbCyG1_EVgWDc22on1r_jozD8vsFbB0Fg',
'publicKey' => 'BME-1ZSAv2AyGjENQTzrXDj6vSnhAIdKso4n3NDY0lsd1DUgEzBw7ARMKjrYAm7JmJBPsilV5CWNH0mVPyJEt0Q',
'authToken' => 'hUIGbmiypj9_EQea8AnCKA',
'contentEncoding' => 'aes128gcm',
]);

// test multiple requests
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 1], JSON_THROW_ON_ERROR));
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 2], JSON_THROW_ON_ERROR));
$this->webPush->queueNotification($nonExistentSubscription, json_encode(['test' => 3], JSON_THROW_ON_ERROR));

$callback = function ($report) {
$this->assertFalse($report->isSuccess());
$this->assertTrue($report->isSubscriptionExpired());
$this->assertEquals(410, $report->getResponse()->getStatusCode());
$this->assertNotEmpty($report->getReason());
$this->assertNotFalse(filter_var($report->getEndpoint(), FILTER_VALIDATE_URL));
};

$this->webPush->flushPooled($callback);
}

public function testFlushEmpty(): void
{
$this->assertEmpty(iterator_to_array($this->webPush->flush(300)));
Expand Down