KAFKA-8371: Remove dependence on ReplicaManager from Partition (#6705)
This patch simplifies the interaction between `Partition` and the various components of `ReplicaManager`, primarily to make unit testing easier. It also eliminates the `OfflinePartition` sentinel, which has always been unsafe.

Reviewers: Boyang Chen <[email protected]>, David Arthur <[email protected]>
hachikuji authored May 23, 2019
1 parent 5351efe commit 3696b98
Showing 29 changed files with 696 additions and 563 deletions.
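
The delayed-produce and delete-records diffs below stop comparing partitions against the `ReplicaManager.OfflinePartition` sentinel and instead pattern-match on the hosted state of the partition: an online `Partition`, `HostedPartition.Offline`, or `HostedPartition.None`. A minimal Scala sketch of what that state hierarchy might look like, inferred from the match arms in those diffs rather than copied from the patch (the actual definition lives alongside `ReplicaManager` and may differ in detail):

// Sketch only: reconstructed from the match arms in the diffs below.
sealed trait HostedPartition

object HostedPartition {
  // This broker does not host the partition at all.
  case object None extends HostedPartition

  // The partition exists locally, but its log directory is offline.
  case object Offline extends HostedPartition
}

// Assumed here: Partition itself extends the trait, so an online partition is
// matched directly with `case partition: Partition => ...`, as the diffs do.
class Partition(/* constructor elided */) extends HostedPartition

Compared with the old `partition eq ReplicaManager.OfflinePartition` identity check, matching on a sealed hierarchy lets the compiler flag any unhandled state, which is what makes removing the sentinel safe.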
341 changes: 217 additions & 124 deletions core/src/main/scala/kafka/cluster/Partition.scala

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/AdminManager.scala
@@ -60,7 +60,7 @@ class AdminManager(val config: KafkaConfig,
   private val alterConfigPolicy =
     Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
 
-  def hasDelayedTopicOperations = topicPurgatory.delayed != 0
+  def hasDelayedTopicOperations = topicPurgatory.numDelayed != 0
 
   /**
    * Try to complete delayed topic operations with the request key
25 changes: 13 additions & 12 deletions core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,6 +20,7 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
+import kafka.cluster.Partition
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
@@ -73,19 +74,19 @@ class DelayedDeleteRecords(delayMs: Long,
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
         val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
-          case Some(partition) =>
-            if (partition eq ReplicaManager.OfflinePartition) {
-              (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-            } else {
-              partition.leaderReplicaIfLocal match {
-                case Some(_) =>
-                  val leaderLW = partition.lowWatermarkIfLeader
-                  (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
-                case None =>
-                  (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-              }
-            }
-          case None =>
+          case partition: Partition =>
+            partition.leaderReplicaIfLocal match {
+              case Some(_) =>
+                val leaderLW = partition.lowWatermarkIfLeader
+                (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+              case None =>
+                (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+            }
+
+          case HostedPartition.Offline =>
+            (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+
+          case HostedPartition.None =>
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
         }
         if (error != Errors.NONE || lowWatermarkReached) {
15 changes: 9 additions & 6 deletions core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -44,7 +44,8 @@ import scala.collection.mutable.ListBuffer
  * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
  */
 abstract class DelayedOperation(override val delayMs: Long,
-                                lockOpt: Option[Lock] = None) extends TimerTask with Logging {
+                                lockOpt: Option[Lock] = None)
+  extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
   private val tryCompletePending = new AtomicBoolean(false)
@@ -209,7 +210,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   newGauge(
     "NumDelayedOperations",
     new Gauge[Int] {
-      def value: Int = delayed
+      def value: Int = numDelayed
     },
     metricsTags
   )
@@ -288,10 +289,12 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   def checkAndComplete(key: Any): Int = {
     val wl = watcherList(key)
     val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
-    if(watchers == null)
+    val numCompleted = if (watchers == null)
       0
     else
       watchers.tryCompleteWatched()
+    debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
+    numCompleted
   }
 
   /**
@@ -306,7 +309,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
   /**
    * Return the number of delayed operations in the expiry queue
    */
-  def delayed: Int = timeoutTimer.size
+  def numDelayed: Int = timeoutTimer.size
 
   /**
    * Cancel watching on any delayed operations for the given key. Note the operation will not be completed
@@ -435,11 +438,11 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
     // Trigger a purge if the number of completed but still being watched operations is larger than
     // the purge threshold. That number is computed by the difference btw the estimated total number of
     // operations and the number of pending delayed operations.
-    if (estimatedTotalOperations.get - delayed > purgeInterval) {
+    if (estimatedTotalOperations.get - numDelayed > purgeInterval) {
       // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
       // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
      // a little overestimated total number of operations.
-      estimatedTotalOperations.getAndSet(delayed)
+      estimatedTotalOperations.getAndSet(numDelayed)
       debug("Begin purging watch lists")
       val purged = watcherLists.foldLeft(0) {
         case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
15 changes: 8 additions & 7 deletions core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
+import kafka.cluster.Partition
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
-
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -88,12 +88,13 @@ class DelayedProduce(delayMs: Long,
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
         val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
-          case Some(partition) =>
-            if (partition eq ReplicaManager.OfflinePartition)
-              (false, Errors.KAFKA_STORAGE_ERROR)
-            else
-              partition.checkEnoughReplicasReachOffset(status.requiredOffset)
-          case None =>
+          case partition: Partition =>
+            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
+
+          case HostedPartition.Offline =>
+            (false, Errors.KAFKA_STORAGE_ERROR)
+
+          case HostedPartition.None =>
             // Case A
             (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
         }
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -263,8 +263,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
       if (replicaManager.hasDelayedElectionOperations) {
-        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) =>
-          replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp.topic(), tp.partition()))
+        updateMetadataRequest.partitionStates.asScala.foreach { case (tp, _) =>
+          replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp))
         }
       }
       sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
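
The only functional change here is dropping the unused partition-state binding and building the purgatory key directly from the `TopicPartition`. One plausible shape for that convenience constructor, sketched as an assumption since `TopicPartitionOperationKey` itself is not shown in this view:

// Sketch only: the real TopicPartitionOperationKey lives in DelayedOperationKey.scala
// and may provide this via an auxiliary constructor or a companion apply instead.
case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
  def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
  override def keyLabel: String = s"$topic-$partition"
}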
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
-    val futureReplica = replicaMgr.futureLocalReplicaOrException(topicPartition)
-    val partition = replicaMgr.getPartition(topicPartition).get
+    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+    val futureReplica = partition.futureLocalReplicaOrException
     val records = toMemoryRecords(partitionData.records)
 
     if (fetchOffset != futureReplica.logEndOffset)
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -143,8 +143,8 @@ class ReplicaFetcherThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: FetchData): Option[LogAppendInfo] = {
-    val replica = replicaMgr.localReplicaOrException(topicPartition)
-    val partition = replicaMgr.getPartition(topicPartition).get
+    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+    val replica = partition.localReplicaOrException
     val records = toMemoryRecords(partitionData.records)
 
     maybeWarnIfOversizedRecords(records, topicPartition)
@@ -277,8 +277,9 @@ class ReplicaFetcherThread(name: String,
    * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState
    */
   override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
-    val replica = replicaMgr.localReplicaOrException(tp)
-    val partition = replicaMgr.getPartition(tp).get
+    val partition = replicaMgr.nonOfflinePartition(tp).get
+    val replica = partition.localReplicaOrException
+
     partition.truncateTo(offsetTruncationState.offset, isFuture = false)
 
     if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
@@ -292,7 +293,7 @@ class ReplicaFetcherThread(name: String,
   }
 
   override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
-    val partition = replicaMgr.getPartition(topicPartition).get
+    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
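
Both fetcher threads now resolve the `Partition` once via `replicaMgr.nonOfflinePartition(...)` and take the local (or future) replica from the partition itself, rather than asking `ReplicaManager` for a replica directly. A condensed sketch of that shared pattern, reusing identifiers from the diffs above; the helper method, its name, and its error handling are illustrative only:

// Sketch only, inside a fetcher thread that already holds `replicaMgr`.
// nonOfflinePartition, localReplicaOrException and futureLocalReplicaOrException
// are the accessors used in the diffs above; everything else is hypothetical.
private def resolvePartitionAndReplica(tp: TopicPartition): (Partition, Replica) = {
  val partition = replicaMgr.nonOfflinePartition(tp).getOrElse(
    throw new IllegalStateException(s"Partition $tp is offline or not hosted on this broker"))
  // ReplicaAlterLogDirsThread would use partition.futureLocalReplicaOrException here.
  (partition, partition.localReplicaOrException)
}

Funneling every replica lookup through the partition keeps `ReplicaManager` acting as a directory of partitions rather than a second source of replica state, in line with the goal of reducing `Partition`'s coupling to `ReplicaManager`.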