Skip to content

Commit

Permalink
More CS Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
JimTools committed Jan 17, 2025
1 parent 4164e1e commit 9236342
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
8 changes: 4 additions & 4 deletions RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ private function doReceive(int $timeout): ?RdKafkaMessage
}

switch ($kafkaMessage->err) {
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
case RD_KAFKA_RESP_ERR__TIMED_OUT:
case RD_KAFKA_RESP_ERR__TRANSPORT:
case \RD_KAFKA_RESP_ERR__PARTITION_EOF:
case \RD_KAFKA_RESP_ERR__TIMED_OUT:
case \RD_KAFKA_RESP_ERR__TRANSPORT:
return null;
case RD_KAFKA_RESP_ERR_NO_ERROR:
case \RD_KAFKA_RESP_ERR_NO_ERROR:
$message = $this->serializer->toMessage($kafkaMessage->payload);
$message->setKey($kafkaMessage->key);
$message->setPartition($kafkaMessage->partition);
Expand Down
6 changes: 3 additions & 3 deletions RdKafkaContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ public static function getLibrdKafkaVersion(): string
if (!defined('RD_KAFKA_VERSION')) {
throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed');
}
$major = (RD_KAFKA_VERSION & 0xFF000000) >> 24;
$minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16;
$patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8;
$major = (\RD_KAFKA_VERSION & 0xFF000000) >> 24;
$minor = (\RD_KAFKA_VERSION & 0x00FF0000) >> 16;
$patch = (\RD_KAFKA_VERSION & 0x0000FF00) >> 8;

return "$major.$minor.$patch";
}
Expand Down
2 changes: 1 addition & 1 deletion RdKafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function send(Destination $destination, Message $message): void
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class);

$partition = $message->getPartition() ?? $destination->getPartition() ?? RD_KAFKA_PARTITION_UA;
$partition = $message->getPartition() ?? $destination->getPartition() ?? \RD_KAFKA_PARTITION_UA;
$payload = $this->serializer->toString($message);
$key = $message->getKey() ?? $destination->getKey() ?? null;

Expand Down
12 changes: 6 additions & 6 deletions Tests/RdKafkaConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -61,7 +61,7 @@ public function testShouldPassProperlyConfiguredTopicPartitionOnAssign()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -91,7 +91,7 @@ public function testShouldSubscribeOnFirstReceiveOnly()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -122,7 +122,7 @@ public function testShouldAssignWhenOffsetIsSet()
$destination->setPartition(1);

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -154,7 +154,7 @@ public function testThrowOnOffsetChangeAfterSubscribing()
$destination = new RdKafkaTopic('dest');

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;

$kafkaConsumer = $this->createKafkaConsumerMock();
$kafkaConsumer
Expand Down Expand Up @@ -188,7 +188,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
$expectedMessage = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']);

$kafkaMessage = new Message();
$kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR;
$kafkaMessage->err = \RD_KAFKA_RESP_ERR_NO_ERROR;
$kafkaMessage->payload = 'theSerializedMessage';

$kafkaConsumer = $this->createKafkaConsumerMock();
Expand Down
8 changes: 4 additions & 4 deletions Tests/RdKafkaProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube()
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'theSerializedMessage',
'key',
Expand Down Expand Up @@ -183,7 +183,7 @@ public function testShouldAllowSerializersToSerializeKeys()
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'theSerializedMessage',
'theSerializedKey'
Expand Down Expand Up @@ -324,7 +324,7 @@ public function testShouldAllowFalsyKeyFromMessage(): void
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'',
$key
Expand Down Expand Up @@ -354,7 +354,7 @@ public function testShouldAllowFalsyKeyFromDestination(): void
->expects($this->once())
->method('producev')
->with(
RD_KAFKA_PARTITION_UA,
\RD_KAFKA_PARTITION_UA,
0,
'',
$key
Expand Down

0 comments on commit 9236342

Please sign in to comment.