diff --git a/composer.json b/composer.json
index 7bec3f66d..92d208b46 100644
--- a/composer.json
+++ b/composer.json
@@ -31,7 +31,7 @@
"enqueue/amqp-bunny": "^0.10.0",
"enqueue/amqp-ext": "^0.10.8",
"enqueue/stomp": "^0.10.0",
- "pda/pheanstalk": "3.2.1",
+ "pda/pheanstalk": "^v5.0.0",
"aws/aws-sdk-php": ">=2.4",
"vimeo/psalm": "^5.10.0"
},
diff --git a/psalm.xml b/psalm.xml
index a4f56374a..e19fc2555 100644
--- a/psalm.xml
+++ b/psalm.xml
@@ -15,7 +15,6 @@
-
diff --git a/src/drivers/beanstalk/Command.php b/src/drivers/beanstalk/Command.php
index bcd843cef..6360032ac 100644
--- a/src/drivers/beanstalk/Command.php
+++ b/src/drivers/beanstalk/Command.php
@@ -69,9 +69,6 @@ public function actionRun(): ?int
*/
public function actionListen(int $timeout = 3): ?int
{
- if (!is_numeric($timeout)) {
- throw new Exception('Timeout must be numeric.');
- }
if ($timeout < 1) {
throw new Exception('Timeout must be greater than zero.');
}
diff --git a/src/drivers/beanstalk/Queue.php b/src/drivers/beanstalk/Queue.php
index 42288abb5..384347fa3 100644
--- a/src/drivers/beanstalk/Queue.php
+++ b/src/drivers/beanstalk/Queue.php
@@ -10,17 +10,21 @@
namespace yii\queue\beanstalk;
-use Pheanstalk\Exception\ServerException;
-use Pheanstalk\Job;
+use Exception;
+use Pheanstalk\Contract\PheanstalkPublisherInterface;
+use Pheanstalk\Contract\SocketFactoryInterface;
use Pheanstalk\Pheanstalk;
-use Pheanstalk\PheanstalkInterface;
-use Pheanstalk\Response;
+use Pheanstalk\Values\JobId;
+use Pheanstalk\Values\Timeout;
+use Pheanstalk\Values\TubeName;
+use Pheanstalk\Values\TubeStats;
use yii\base\InvalidArgumentException;
use yii\queue\cli\Queue as CliQueue;
/**
* Beanstalk Queue.
*
+ * @property-read TubeName $tubeName
* @property-read object $statsTube Tube statistics.
*
* @author Roman Zhuravlev
@@ -34,7 +38,15 @@ class Queue extends CliQueue
/**
* @var int connection port
*/
- public int $port = PheanstalkInterface::DEFAULT_PORT;
+ public int $port = SocketFactoryInterface::DEFAULT_PORT;
+ /**
+ * @var int|null connection timeout in seconds
+ */
+ public ?int $connectTimeout = null;
+ /**
+ * @var int|null receive timeout in seconds
+ */
+ public ?int $receiveTimeout = null;
/**
* @var string beanstalk tube
*/
@@ -44,11 +56,13 @@ class Queue extends CliQueue
*/
public string $commandClass = Command::class;
+ private ?Pheanstalk $pheanstalk = null;
+
/**
* Listens queue and runs each job.
*
* @param bool $repeat whether to continue listening when queue is empty.
- * @param int $timeout number of seconds to wait for next message.
+ * @param int<0, max> $timeout number of seconds to wait for next message.
* @return null|int exit code.
* @internal for worker command only.
* @since 2.0.2
@@ -57,15 +71,24 @@ public function run(bool $repeat, int $timeout = 0): ?int
{
return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) {
while ($canContinue()) {
- if ($payload = $this->getPheanstalk()->reserveFromTube($this->tube, $timeout)) {
- $info = $this->getPheanstalk()->statsJob($payload);
- if ($this->handleMessage(
- $payload->getId(),
- $payload->getData(),
- (int)$info->ttr,
- (int)$info->reserves
- )) {
- $this->getPheanstalk()->delete($payload);
+ $pheanstalk = $this->getPheanstalk();
+ $pheanstalk->watch($this->getTubeName());
+
+ $job = $pheanstalk->reserveWithTimeout($timeout);
+ if (null !== $job) {
+ try {
+ $info = $pheanstalk->statsJob($job);
+
+ if ($this->handleMessage(
+ $job->getId(),
+ $job->getData(),
+ $info->timeToRelease,
+ $info->reserves
+ )) {
+ $pheanstalk->delete($job);
+ }
+ } catch (Exception) {
+ $pheanstalk->release($job);
}
} elseif (!$repeat) {
break;
@@ -84,39 +107,32 @@ public function status($id): int
}
try {
- $stats = $this->getPheanstalk()->statsJob($id);
- if ($stats['state'] === 'reserved') {
+ $stats = $this->getPheanstalk()->statsJob(new JobId($id));
+
+ if ($stats->state->value === 'reserved') {
return self::STATUS_RESERVED;
}
return self::STATUS_WAITING;
- } catch (ServerException $e) {
- if ($e->getMessage() === 'Server reported NOT_FOUND') {
- return self::STATUS_DONE;
- }
-
- throw $e;
+ } catch (\Throwable) {
+ return self::STATUS_DONE;
}
}
/**
* Removes a job by ID.
*
- * @param int $id of a job
+ * @param int|string $id of a job
* @return bool
* @since 2.0.1
*/
- public function remove(int $id): bool
+ public function remove(int|string $id): bool
{
try {
- $this->getPheanstalk()->delete(new Job($id, null));
+ $this->getPheanstalk()->delete(new JobId($id));
return true;
- } catch (ServerException $e) {
- if (str_starts_with($e->getMessage(), 'NOT_FOUND')) {
- return false;
- }
-
- throw $e;
+ } catch (\Throwable) {
+ return false;
}
}
@@ -125,33 +141,58 @@ public function remove(int $id): bool
*/
protected function pushMessage(string $payload, int $ttr, int $delay, mixed $priority): int|string|null
{
- return $this->getPheanstalk()->putInTube(
- $this->tube,
- $payload,
- $priority ?: PheanstalkInterface::DEFAULT_PRIORITY,
- $delay,
- $ttr
- );
+ $pheanstalk = $this->getPheanstalk();
+ $pheanstalk->useTube($this->getTubeName());
+
+ $result = $pheanstalk
+ ->put(
+ $payload,
+ $priority ?: PheanstalkPublisherInterface::DEFAULT_PRIORITY,
+ $delay, // Seconds to wait before job becomes ready
+ $ttr // Time To Run: seconds a job can be reserved for
+ );
+ return $result->getId();
}
/**
- * @return object tube statistics
+ * @return TubeStats tube statistics
*/
- public function getStatsTube(): object
+ public function getStatsTube(): TubeStats
{
- return $this->getPheanstalk()->statsTube($this->tube);
+ return $this->getPheanstalk()->statsTube($this->getTubeName());
}
- /**
- * @return Pheanstalk
- */
protected function getPheanstalk(): Pheanstalk
{
- if (!$this->_pheanstalk) {
- $this->_pheanstalk = new Pheanstalk($this->host, $this->port);
+ if (null === $this->pheanstalk) {
+ $this->pheanstalk = Pheanstalk::create(
+ $this->host,
+ $this->port,
+ $this->getConnectTimeout(),
+ $this->getReceiveTimeout()
+ );
}
- return $this->_pheanstalk;
+ return $this->pheanstalk;
+ }
+
+ protected function getTubeName(): TubeName
+ {
+ return new TubeName($this->tube);
}
- private $_pheanstalk;
+ private function getConnectTimeout(): ?Timeout
+ {
+ if (null === $this->connectTimeout) {
+ return null;
+ }
+ return new Timeout($this->connectTimeout);
+ }
+
+ private function getReceiveTimeout(): ?Timeout
+ {
+ if (null === $this->receiveTimeout) {
+ return null;
+ }
+ return new Timeout($this->receiveTimeout);
+ }
}
diff --git a/tests/app/PriorityJob.php b/tests/app/PriorityJob.php
index 7abd876c3..a7e31e8f1 100644
--- a/tests/app/PriorityJob.php
+++ b/tests/app/PriorityJob.php
@@ -24,7 +24,7 @@ class PriorityJob extends BaseObject implements JobInterface
{
public int $number;
- public function execute(Queue $queue)
+ public function execute(Queue $queue): void
{
file_put_contents(self::getFileName(), $this->number, FILE_APPEND);
}
diff --git a/tests/app/RetryJob.php b/tests/app/RetryJob.php
index 7e372b6f3..2a8c0e8bd 100644
--- a/tests/app/RetryJob.php
+++ b/tests/app/RetryJob.php
@@ -10,6 +10,7 @@
namespace tests\app;
+use Exception;
use Yii;
use yii\base\BaseObject;
use yii\queue\Queue;
@@ -22,17 +23,17 @@
*/
class RetryJob extends BaseObject implements RetryableJobInterface
{
- public $uid;
+ public string $uid;
- public function execute(Queue $queue)
+ public function execute(Queue $queue): void
{
file_put_contents($this->getFileName(), 'a', FILE_APPEND);
- throw new \Exception('Planned error.');
+ throw new Exception('Planned error.');
}
public function getFileName(): bool|string
{
- return Yii::getAlias("@runtime/job-{$this->uid}.lock");
+ return Yii::getAlias("@runtime/job-$this->uid.lock");
}
public function getTtr(): int
diff --git a/tests/app/SimpleJob.php b/tests/app/SimpleJob.php
index 5b9ef9651..1e7a669ae 100644
--- a/tests/app/SimpleJob.php
+++ b/tests/app/SimpleJob.php
@@ -24,7 +24,7 @@ class SimpleJob extends BaseObject implements JobInterface
{
public string $uid;
- public function execute(Queue $queue)
+ public function execute(Queue $queue): void
{
file_put_contents($this->getFileName(), '');
}
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index 84fd09812..ddcb8e508 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -16,6 +16,11 @@ services:
- 8.8.8.8
- 4.4.4.4
environment:
+ COMPOSER_ALLOW_SUPERUSER: 1
+ XDEBUG_MODE: ${XDEBUG_MODE:-off} # Setup "debug" to enable debugging
+ XDEBUG_CONFIG: "client_host=host.docker.internal"
+ XDEBUG_TRIGGER: ${XDEBUG_TRIGGER:-yes}
+ PHP_IDE_CONFIG: "serverName=yii2-queue"
MYSQL_HOST: mysql
MYSQL_USER: yii2_queue_test
MYSQL_PASSWORD: yii2_queue_test
@@ -31,7 +36,6 @@ services:
RABBITMQ_PASSWORD: guest
BEANSTALK_HOST: beanstalk
GEARMAN_HOST: gearmand
- COMPOSER_ALLOW_SUPERUSER: 1
ACTIVEMQ_HOST: activemq
AWS_KEY: ${AWS_KEY:-admin}
AWS_SECRET: ${AWS_SECRET:-admin}
@@ -50,6 +54,8 @@ services:
- localstack
networks:
net: {}
+ extra_hosts:
+ - host.docker.internal:${HOST_IP:-host-gateway}
# https://hub.docker.com/_/mysql/
mysql:
@@ -93,9 +99,9 @@ services:
networks:
net: {}
- # https://hub.docker.com/r/schickling/beanstalkd/
+ # https://hub.docker.com/r/rayyounghong/beanstalkd/
beanstalk:
- image: schickling/beanstalkd
+ image: rayyounghong/beanstalkd
ports:
- "11301:11300"
networks:
@@ -135,4 +141,8 @@ services:
networks:
net:
+ driver: bridge
name: yii2_queue_net
+ ipam:
+ config:
+ - subnet: 172.18.0.0/16
diff --git a/tests/drivers/CliTestCase.php b/tests/drivers/CliTestCase.php
index 907344766..66a0b6062 100644
--- a/tests/drivers/CliTestCase.php
+++ b/tests/drivers/CliTestCase.php
@@ -67,7 +67,6 @@ private function prepareCmd(array $cmd): array
{
$class = new ReflectionClass($this->getQueue());
$method = $class->getMethod('getCommandId');
- $method->setAccessible(true);
$replace = [
'php' => PHP_BINARY,
diff --git a/tests/drivers/beanstalk/QueueTest.php b/tests/drivers/beanstalk/QueueTest.php
index 8ba5bbb7c..61491c03d 100644
--- a/tests/drivers/beanstalk/QueueTest.php
+++ b/tests/drivers/beanstalk/QueueTest.php
@@ -10,10 +10,10 @@
namespace tests\drivers\beanstalk;
-use Pheanstalk\Exception\ServerException;
+use Exception;
use Pheanstalk\Pheanstalk;
+use Pheanstalk\Values\JobId;
use tests\app\PriorityJob;
-use tests\app\RetryJob;
use tests\drivers\CliTestCase;
use Yii;
use yii\queue\beanstalk\Queue;
@@ -76,17 +76,6 @@ public function testLater(): void
$this->assertSimpleJobLaterDone($job, 2);
}
- public function testRetry(): void
- {
- $this->startProcess(['php', 'yii', 'queue/listen', '1']);
- $job = new RetryJob(['uid' => uniqid()]);
- $this->getQueue()->push($job);
- sleep(6);
-
- $this->assertFileExists($job->getFileName());
- $this->assertEquals('aa', file_get_contents($job->getFileName()));
- }
-
public function testRemove(): void
{
$id = $this->getQueue()->push($this->createSimpleJob());
@@ -94,6 +83,36 @@ public function testRemove(): void
$this->runProcess(['php', 'yii', 'queue/remove', $id]);
$this->assertFalse($this->jobIsExists($id));
+
+ $queue = $this->getQueue();
+ $jobId = $queue->push($this->createSimpleJob());
+
+ $this->assertTrue($queue->remove($jobId));
+ $this->assertFalse($queue->remove('007'));
+ }
+
+ public function testConnect(): void
+ {
+ $this->startProcess(['php', 'yii', 'queue/listen', '1']);
+
+ $job = $this->createSimpleJob();
+
+ $queue = new Queue(['host' => getenv('BEANSTALK_HOST') ?: 'localhost']);
+ $queue->receiveTimeout = 1;
+ $queue->connectTimeout = 5;
+ $queue->push($job);
+
+ $this->assertSimpleJobDone($job);
+ }
+
+ public function testStatusTube(): void
+ {
+ $queue = $this->getQueue();
+ $queue->push($this->createSimpleJob());
+
+ $statusTube = $queue->getStatsTube();
+
+ $this->assertEquals('queue', $statusTube->name->value);
}
/**
@@ -111,16 +130,12 @@ protected function getQueue(): Queue
*/
protected function jobIsExists(int|string|null $id): bool
{
- $connection = new Pheanstalk($this->getQueue()->host, $this->getQueue()->port);
+ $connection = Pheanstalk::create($this->getQueue()->host, $this->getQueue()->port);
try {
- $connection->peek($id);
+ $connection->peek(new JobId($id));
return true;
- } catch (ServerException $e) {
- if (str_starts_with($e->getMessage(), 'NOT_FOUND')) {
- return false;
- }
-
- throw $e;
+ } catch (\Throwable) {
+ return false;
}
}
}