Skip to content

Commit

Permalink
Refactor RoundRobinAssignor
Browse files Browse the repository at this point in the history
  • Loading branch information
Yurunsoft committed Dec 20, 2020
1 parent 56b98f7 commit e097dd0
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 20 deletions.
65 changes: 45 additions & 20 deletions src/Consumer/Assignor/RoundRobinAssignor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace longlang\phpkafka\Consumer\Assignor;

use longlang\phpkafka\Consumer\Assignor\Struct\TopicAndPartition;
use longlang\phpkafka\Consumer\Struct\ConsumerGroupMemberMetadata;
use longlang\phpkafka\Group\Struct\ConsumerGroupMemberAssignment;
use longlang\phpkafka\Group\Struct\ConsumerGroupTopic;
use longlang\phpkafka\Protocol\JoinGroup\JoinGroupResponseMember;
Expand All @@ -13,47 +15,70 @@
class RoundRobinAssignor extends AbstractPartitionAssignor
{
/**
* @param JoinGroupResponseMember[] $members
* @param MetadataResponseTopic[] $topicMetadatas
* @param JoinGroupResponseMember[] $groupMembers
*
* @return SyncGroupRequestAssignment[]
*/
public function assign(MetadataResponseTopic $metadata, array $members): array
public function assign(array $topicMetadatas, array $groupMembers): array
{
$partitions = [];
foreach ($metadata->getPartitions() as $item) {
$partitions[spl_object_hash($item)] = $item->getPartitionIndex();
$memberIds = [];
/** @var TopicAndPartition[] $topicAndPartitions */
$topicAndPartitions = [];
foreach ($groupMembers as $groupMember) {
$memberId = $groupMember->getMemberId();
$consumerGroupMemberMetadata = new ConsumerGroupMemberMetadata();
$consumerGroupMemberMetadata->unpack($groupMember->getMetadata());
$memberIds[] = $memberId;
}
ksort($partitions);
foreach ($topicMetadatas as $topicMetadata) {
$topic = $topicMetadata->getName();
foreach ($this->getTopicPartitions($topic, $topicMetadatas) as $partition) {
$topicAndPartition = new TopicAndPartition($topic, $partition);
$topicAndPartitions[spl_object_hash($topicAndPartition)] = $topicAndPartition;
}
}
ksort($topicAndPartitions);

$assignments = [];
$consumersForTopic = [];
$memberPartitions = [];
foreach ($members as $member) {
$memberId = $member->getMemberId();
$consumersForTopic[$memberId] = $member;
foreach ($memberIds as $memberId) {
$assignments[] = $assignment = new SyncGroupRequestAssignment();
$assignment->setMemberId($memberId);
$memberPartitions[$memberId] = [];
}
ksort($consumersForTopic);

$memberCount = \count($members);
$memberCount = \count($memberIds);

$i = 0;
foreach ($partitions as $partition) {
$memberPartitions[$members[$i]->getMemberId()][] = $partition;
foreach ($topicAndPartitions as $topicAndPartition) {
$memberPartitions[$memberIds[$i]][] = $topicAndPartition;
++$i;
if ($i >= $memberCount) {
$i = 0;
}
}

for ($i = 0; $i < $memberCount; ++$i) {
$member = $members[$i];
$i = 0;
foreach ($memberPartitions as $memberId => $topicAndPartitions) {
$consumerGroupMemberAssignment = new ConsumerGroupMemberAssignment();
$consumerGroupTopic = new ConsumerGroupTopic();
$consumerGroupTopic->setTopicName($metadata->getName());
$consumerGroupTopic->setPartitions($memberPartitions[$member->getMemberId()]);
$consumerGroupMemberAssignment->setTopics([$consumerGroupTopic]);
$consumerGroupTopics = [];
/** @var TopicAndPartition[] $topicAndPartitions */
foreach ($topicAndPartitions as $topicAndPartition) {
$topic = $topicAndPartition->getTopic();
if (isset($consumerGroupTopics[$topic])) {
$consumerGroupTopic = $consumerGroupTopics[$topic];
} else {
$consumerGroupTopic = $consumerGroupTopics[$topic] = new ConsumerGroupTopic();
$consumerGroupTopic->setTopicName($topic);
}
$partitions = $consumerGroupTopic->getPartitions();
$partitions[] = $topicAndPartition->getPartition();
$consumerGroupTopic->setPartitions($partitions);
}
$consumerGroupMemberAssignment->setTopics($consumerGroupTopics);
$assignments[$i]->setAssignment($consumerGroupMemberAssignment->pack());
++$i;
}

return $assignments;
Expand Down
48 changes: 48 additions & 0 deletions src/Consumer/Assignor/Struct/TopicAndPartition.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

namespace longlang\phpkafka\Consumer\Assignor\Struct;

class TopicAndPartition
{
/**
* @var string
*/
protected $topic;

/**
* @var int
*/
protected $partition;

public function __construct(string $topic, int $partition)
{
$this->topic = $topic;
$this->partition = $partition;
}

public function getTopic(): string
{
return $this->topic;
}

public function setTopic(string $topic): self
{
$this->topic = $topic;

return $this;
}

public function getPartition(): int
{
return $this->partition;
}

public function setPartition(int $partition): self
{
$this->partition = $partition;

return $this;
}
}
3 changes: 3 additions & 0 deletions src/Group/Struct/ConsumerGroupMemberAssignment.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public function getTopics(): array
return $this->topics;
}

/**
* @param ConsumerGroupTopic[] $topics
*/
public function setTopics(array $topics): self
{
$this->topics = $topics;
Expand Down

0 comments on commit e097dd0

Please sign in to comment.