diff --git a/src/Queue.php b/src/Queue.php new file mode 100644 index 000000000..9dc786c30 --- /dev/null +++ b/src/Queue.php @@ -0,0 +1,70 @@ +connectionStrings = $servers; + } + + private function declareQueueWithDLX($channel, $queueName) + { + // Existing implementation for declaring a queue with DLX + } + + private function declareQueueWithoutDLX($channel, $queueName) + { + // Implement declaration without DLX + } + + public function publish($queueName, $message, $useDLX = true) + { + $connection = $this->getRandomConnection(); + $channel = $connection->channel(); + + if ($useDLX) { + $this->declareQueueWithDLX($channel, $queueName); + } else { + $this->declareQueueWithoutDLX($channel, $queueName); + } + + // Code to publish the message + } + + public function consume($timeout, $queueName, $callback, $resetTimeoutOnReceive = false, $prefetchCount = 10, $useDLX = true) + { + foreach ($this->connectionStrings as $server) { + $connection = $this->getConnection($server); + $channel = $connection->channel(); + + if ($useDLX) { + $this->declareQueueWithDLX($channel, $queueName); + } else { + $this->declareQueueWithoutDLX($channel, $queueName); + } + + $channel->basic_qos(null, $prefetchCount, null); + + // Code to consume messages + } + } + + private function getRandomConnection() + { + $randomServer = $this->connectionStrings[array_rand($this->connectionStrings)]; + return $this->getConnection($randomServer); + } + + private function getConnection($server) + { + $options = ['connection_timeout' => 10.0, 'read_write_timeout' => 10.0]; + return AMQPStreamConnection::create_connection([$server], $options); + } + + // Additional methods and logic for the Queue class + +} diff --git a/tests/QueueTest.php b/tests/QueueTest.php new file mode 100644 index 000000000..b0a609e7e --- /dev/null +++ b/tests/QueueTest.php @@ -0,0 +1,60 @@ + 'server1', 'port' => 5672, 'user' => 'guest', 'password' => 'guest'], + ['host' => 'server2', 'port' => 5672, 'user' => 'guest', 'password' => 'guest'] + ]; + $this->queue = new Queue($servers); + } + + public function testPublishWithDLX() + { + $this->queue->publish('testQueue', 'testMessage', true); + // Assertions to verify message was published with DLX + } + + public function testPublishWithoutDLX() + { + $this->queue->publish('testQueue', 'testMessage', false); + // Assertions to verify message was published without DLX + } + + public function testConsumeWithDLX() + { + $callback = function ($msg) { + // Process message + }; + $this->queue->consume(30, 'testQueue', $callback, false, 10, true); + // Assertions to verify messages are consumed with DLX + } + + public function testConsumeWithoutDLX() + { + $callback = function ($msg) { + // Process message + }; + $this->queue->consume(30, 'testQueue', $callback, false, 10, false); + // Assertions to verify messages are consumed without DLX + } + + public function testConsumeWithDifferentQoS() + { + $callback = function ($msg) { + // Process message + }; + $this->queue->consume(30, 'testQueue', $callback, false, 5, true); + // Assertions to verify messages are consumed with different QoS settings + } + + // Additional test cases + +}