Skip to content

Commit

Permalink
Add queue arguments (#1)
Browse files Browse the repository at this point in the history
* Add supporting for queue arguments
  • Loading branch information
mundiir authored Jul 25, 2024
1 parent 7ab2080 commit 849c5e6
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 135 deletions.
60 changes: 56 additions & 4 deletions AmqpContextManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,23 @@
use Interop\Amqp\Impl\AmqpBind;
use Interop\Queue\Context;

use InvalidArgumentException;

use function array_key_exists;

class AmqpContextManager implements ContextManager
{
private $context;
private const ARGUMENTS_AS_INTEGER = [
'x-delay',
'x-expires',
'x-max-length',
'x-max-length-bytes',
'x-max-priority',
'x-message-ttl',
'x-consumer-timeout',
];

private Context $context;

public function __construct(Context $context)
{
Expand Down Expand Up @@ -59,19 +73,57 @@ public function ensureExists(array $destination): bool

$topic = $this->context->createTopic($destination['topic']);
$topic->setType($destination['topicOptions']['type'] ?? AmqpTopic::TYPE_FANOUT);
$topicFlags = $destination['topicOptions']['flags'] ?? ((int) $topic->getFlags() | AmqpTopic::FLAG_DURABLE);
$topicFlags = $destination['topicOptions']['flags'] ?? ((int)$topic->getFlags() | AmqpTopic::FLAG_DURABLE);
$topic->setFlags($topicFlags);
$this->context->declareTopic($topic);

$queue = $this->context->createQueue($destination['queue']);
$queueFlags = $destination['queueOptions']['flags'] ?? ((int) $queue->getFlags() | AmqpQueue::FLAG_DURABLE);
$queueFlags = $destination['queueOptions']['flags'] ?? ((int)$queue->getFlags() | AmqpQueue::FLAG_DURABLE);
$queue->setFlags($queueFlags);
$queueArguments = $destination['queueOptions']['arguments'] ?? [];
$queue->setArguments($this->normalizeQueueArguments($queueArguments));

$this->context->declareQueue($queue);

$this->context->bind(
new AmqpBind($queue, $topic, $destination['queueOptions']['bindingKey'] ?? null)
new AmqpBind($queue, $topic, $destination['queueOptions']['bindingKey'] ?? null),
);

return true;
}

/**
* Normalizes queue arguments to ensure they are integers.
*
* This method iterates over a predefined list of argument keys that are expected to be integers.
* If an argument is found in the input array but is not numeric, an InvalidArgumentException is thrown.
* Numeric arguments are cast to integers to ensure type consistency.
*
* @param array $arguments Associative array of queue arguments where the key is the argument
* name and the value is the argument value.
* @return array The modified arguments array with all specified keys having integer values.
* @throws InvalidArgumentException If an expected integer argument is not numeric.
*/
private function normalizeQueueArguments(array $arguments): array
{
foreach (self::ARGUMENTS_AS_INTEGER as $key) {
if (!array_key_exists($key, $arguments)) {
continue;
}

if (!is_numeric($arguments[$key])) {
throw new InvalidArgumentException(
sprintf(
'Integer expected for queue argument "%s", "%s" given.',
$key,
get_debug_type($arguments[$key]),
),
);
}

$arguments[$key] = (int)$arguments[$key];
}

return $arguments;
}
}
4 changes: 2 additions & 2 deletions Bundle/DependencyInjection/EnqueueAdapterExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class EnqueueAdapterExtension extends Extension
/**
* {@inheritdoc}
*/
public function load(array $configs, ContainerBuilder $container)
public function load(array $configs, ContainerBuilder $container): void
{
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
$loader->load('services.yml');
}
}
2 changes: 1 addition & 1 deletion EnvelopeItem/InteropMessageStamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
final class InteropMessageStamp implements NonSendableStampInterface
{
/** @var Message */
private $message;
private Message $message;

public function __construct(Message $message)
{
Expand Down
6 changes: 3 additions & 3 deletions EnvelopeItem/TransportConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
*/
final class TransportConfiguration implements StampInterface
{
private $topic;
private mixed $topic;

private array $metadata;

public function __construct(array $configuration = array())
public function __construct(array $configuration = [])
{
$this->topic = $configuration['topic'] ?? null;
$this->metadata = $configuration['metadata'] ?? array();
$this->metadata = $configuration['metadata'] ?? [];
}

/**
Expand Down
17 changes: 10 additions & 7 deletions Exception/MissingMessageMetadataSetterException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@

namespace Enqueue\MessengerAdapter\Exception;

use LogicException;
use Symfony\Component\Messenger\Exception\ExceptionInterface;

class MissingMessageMetadataSetterException extends \LogicException implements ExceptionInterface
class MissingMessageMetadataSetterException extends LogicException implements ExceptionInterface
{
public function __construct(string $metadata, string $setter, string $class)
{
parent::__construct(sprintf(
'Missing "%s" setter for "%s" metadata key in "%s" class',
$setter,
$metadata,
$class
));
parent::__construct(
sprintf(
'Missing "%s" setter for "%s" metadata key in "%s" class',
$setter,
$metadata,
$class,
),
);
}
}
3 changes: 2 additions & 1 deletion Exception/RejectMessageException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@

namespace Enqueue\MessengerAdapter\Exception;

use LogicException;
use Symfony\Component\Messenger\Exception\ExceptionInterface;

class RejectMessageException extends \LogicException implements ExceptionInterface
class RejectMessageException extends LogicException implements ExceptionInterface
{
}
3 changes: 2 additions & 1 deletion Exception/RequeueMessageException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@

namespace Enqueue\MessengerAdapter\Exception;

use LogicException;
use Symfony\Component\Messenger\Exception\ExceptionInterface;

class RequeueMessageException extends \LogicException implements ExceptionInterface
class RequeueMessageException extends LogicException implements ExceptionInterface
{
}
3 changes: 2 additions & 1 deletion Exception/SendingMessageFailedException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@

namespace Enqueue\MessengerAdapter\Exception;

use LogicException;
use Symfony\Component\Messenger\Exception\ExceptionInterface;

class SendingMessageFailedException extends \LogicException implements ExceptionInterface
class SendingMessageFailedException extends LogicException implements ExceptionInterface
{
}
28 changes: 15 additions & 13 deletions MessageBusProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,47 @@
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Throwable;

/**
* The processor could be used with any queue interop compatible consumer, for example Enqueue's QueueConsumer.
*
* @author Max Kotliar <[email protected]>
* @author Samuel Roze <[email protected]>
*/
class MessageBusProcessor implements Processor
readonly class MessageBusProcessor implements Processor
{
private $bus;
private $messageDecoder;

public function __construct(MessageBusInterface $bus, SerializerInterface $messageDecoder)
{
$this->bus = $bus;
$this->messageDecoder = $messageDecoder;
public function __construct(
private MessageBusInterface $bus,
private SerializerInterface $messageDecoder,
) {
}

/**
* {@inheritDoc}
*/
public function process(Message $message, Context $context)
{
try {
$busMessage = $this->messageDecoder->decode(array(
$busMessage = $this->messageDecoder->decode([
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
'properties' => $message->getProperties(),
));
} catch (MessageDecodingFailedException $e) {
]);
} catch (MessageDecodingFailedException) {
return Processor::REJECT;
}

try {
$this->bus->dispatch($busMessage);

return Processor::ACK;
} catch (RejectMessageException $e) {
} catch (RejectMessageException) {
return Processor::REJECT;
} catch (RequeueMessageException $e) {
} catch (RequeueMessageException) {
return Processor::REQUEUE;
} catch (\Throwable $e) {
} catch (Throwable) {
return Processor::REJECT;
}
}
Expand Down
Loading

0 comments on commit 849c5e6

Please sign in to comment.