diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 30ce7567ce867..256f1a046cea7 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -16,7 +16,7 @@ */ package kafka.cluster -import java.util.Optional +import java.util.{Optional, Properties} import java.util.concurrent.locks.ReentrantReadWriteLock import com.yammer.metrics.core.Gauge @@ -26,6 +26,7 @@ import kafka.controller.KafkaController import kafka.log._ import kafka.metrics.KafkaMetricsGroup import kafka.server._ +import kafka.server.checkpoints.OffsetCheckpoints import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} @@ -42,19 +43,114 @@ import org.apache.kafka.common.utils.Time import scala.collection.JavaConverters._ import scala.collection.Map -object Partition { +trait PartitionStateStore { + def fetchTopicConfig(): Properties + def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] + def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] +} + +class ZkPartitionStateStore(topicPartition: TopicPartition, + zkClient: KafkaZkClient, + replicaManager: ReplicaManager) extends PartitionStateStore { + + override def fetchTopicConfig(): Properties = { + val adminZkClient = new AdminZkClient(zkClient) + adminZkClient.fetchEntityConfig(ConfigType.Topic, topicPartition.topic) + } + + override def shrinkIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = { + val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr) + if (newVersionOpt.isDefined) + replicaManager.isrShrinkRate.mark() + newVersionOpt + } + + override def expandIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = { + val newVersionOpt = updateIsr(controllerEpoch, leaderAndIsr) + if (newVersionOpt.isDefined) + replicaManager.isrExpandRate.mark() + newVersionOpt + } + + private def updateIsr(controllerEpoch: Int, leaderAndIsr: LeaderAndIsr): Option[Int] = { + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition, + leaderAndIsr, controllerEpoch) + + if (updateSucceeded) { + replicaManager.recordIsrChange(topicPartition) + Some(newVersion) + } else { + replicaManager.failedIsrUpdatesRate.mark() + None + } + } +} + +class DelayedOperations(topicPartition: TopicPartition, + produce: DelayedOperationPurgatory[DelayedProduce], + fetch: DelayedOperationPurgatory[DelayedFetch], + deleteRecords: DelayedOperationPurgatory[DelayedDeleteRecords]) { + + def checkAndCompleteAll(): Unit = { + val requestKey = new TopicPartitionOperationKey(topicPartition) + fetch.checkAndComplete(requestKey) + produce.checkAndComplete(requestKey) + deleteRecords.checkAndComplete(requestKey) + } + + def checkAndCompleteFetch(): Unit = { + fetch.checkAndComplete(new TopicPartitionOperationKey(topicPartition)) + } + + def checkAndCompleteProduce(): Unit = { + produce.checkAndComplete(new TopicPartitionOperationKey(topicPartition)) + } + + def checkAndCompleteDeleteRecords(): Unit = { + deleteRecords.checkAndComplete(new TopicPartitionOperationKey(topicPartition)) + } + + def numDelayedDelete: Int = deleteRecords.numDelayed + + def numDelayedFetch: Int = fetch.numDelayed + + def numDelayedProduce: Int = produce.numDelayed +} + +object Partition extends KafkaMetricsGroup { def apply(topicPartition: TopicPartition, time: Time, replicaManager: ReplicaManager): Partition = { + val 
zkIsrBackingStore = new ZkPartitionStateStore( + topicPartition, + replicaManager.zkClient, + replicaManager) + + val delayedOperations = new DelayedOperations( + topicPartition, + replicaManager.delayedProducePurgatory, + replicaManager.delayedFetchPurgatory, + replicaManager.delayedDeleteRecordsPurgatory) + new Partition(topicPartition, - isOffline = false, replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs, interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion, localBrokerId = replicaManager.config.brokerId, time = time, - replicaManager = replicaManager, - logManager = replicaManager.logManager, - zkClient = replicaManager.zkClient) + stateStore = zkIsrBackingStore, + delayedOperations = delayedOperations, + metadataCache = replicaManager.metadataCache, + logManager = replicaManager.logManager) + } + + def removeMetrics(topicPartition: TopicPartition): Unit = { + val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) + removeMetric("UnderReplicated", tags) + removeMetric("UnderMinIsr", tags) + removeMetric("InSyncReplicasCount", tags) + removeMetric("ReplicasCount", tags) + removeMetric("LastStableOffsetLag", tags) + removeMetric("AtMinIsr", tags) } } @@ -62,14 +158,14 @@ object Partition { * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR */ class Partition(val topicPartition: TopicPartition, - val isOffline: Boolean, private val replicaLagTimeMaxMs: Long, private val interBrokerProtocolVersion: ApiVersion, private val localBrokerId: Int, private val time: Time, - private val replicaManager: ReplicaManager, - private val logManager: LogManager, - private val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup { + private val stateStore: PartitionStateStore, + private val delayedOperations: DelayedOperations, + private val metadataCache: MetadataCache, + private val logManager: LogManager) extends HostedPartition with Logging with KafkaMetricsGroup { def topic: String = topicPartition.topic def partitionId: Int = topicPartition.partition @@ -94,68 +190,65 @@ class Partition(val topicPartition: TopicPartition, private var controllerEpoch: Int = KafkaController.InitialControllerEpoch this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] " - private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId + private def isReplicaLocal(replicaId: Int): Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId private val tags = Map("topic" -> topic, "partition" -> partitionId.toString) - // Do not create metrics if this partition is ReplicaManager.OfflinePartition - if (!isOffline) { - newGauge("UnderReplicated", - new Gauge[Int] { - def value = { - if (isUnderReplicated) 1 else 0 - } - }, - tags - ) - - newGauge("InSyncReplicasCount", - new Gauge[Int] { - def value = { - if (isLeaderReplicaLocal) inSyncReplicas.size else 0 - } - }, - tags - ) - - newGauge("UnderMinIsr", - new Gauge[Int] { - def value = { - if (isUnderMinIsr) 1 else 0 - } - }, - tags - ) - - newGauge("AtMinIsr", - new Gauge[Int] { - def value = { - if (isAtMinIsr) 1 else 0 - } - }, - tags - ) - - newGauge("ReplicasCount", - new Gauge[Int] { - def value = { - if (isLeaderReplicaLocal) assignedReplicas.size else 0 - } - }, - tags - ) - - newGauge("LastStableOffsetLag", - new Gauge[Long] { - def value = { - leaderReplicaIfLocal.map { replica => - replica.highWatermark.messageOffset - 
replica.lastStableOffset.messageOffset - }.getOrElse(0) - } - }, - tags - ) - } + newGauge("UnderReplicated", + new Gauge[Int] { + def value = { + if (isUnderReplicated) 1 else 0 + } + }, + tags + ) + + newGauge("InSyncReplicasCount", + new Gauge[Int] { + def value = { + if (isLeaderReplicaLocal) inSyncReplicas.size else 0 + } + }, + tags + ) + + newGauge("UnderMinIsr", + new Gauge[Int] { + def value = { + if (isUnderMinIsr) 1 else 0 + } + }, + tags + ) + + newGauge("AtMinIsr", + new Gauge[Int] { + def value = { + if (isAtMinIsr) 1 else 0 + } + }, + tags + ) + + newGauge("ReplicasCount", + new Gauge[Int] { + def value = { + if (isLeaderReplicaLocal) assignedReplicas.size else 0 + } + }, + tags + ) + + newGauge("LastStableOffsetLag", + new Gauge[Long] { + def value = { + leaderReplicaIfLocal.map { replica => + replica.highWatermark.messageOffset - replica.lastStableOffset.messageOffset + }.getOrElse(0) + } + }, + tags + ) private def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined @@ -185,15 +278,17 @@ class Partition(val topicPartition: TopicPartition, * does not exist. This method assumes that the current replica has already been created. * * @param logDir log directory + * @param highWatermarkCheckpoints Checkpoint to load initial high watermark from * @return true iff the future replica is created */ - def maybeCreateFutureReplica(logDir: String): Boolean = { + def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { // The writeLock is needed to make sure that while the caller checks the log directory of the // current replica and the existence of the future replica, no other thread can update the log directory of the // current replica or remove the future replica. inWriteLock(leaderIsrUpdateLock) { val currentReplica = localReplicaOrException - if (currentReplica.log.get.dir.getParent == logDir) + val currentLog = currentReplica.log.get + if (currentLog.dir.getParent == logDir) false else { futureLocalReplica match { @@ -204,26 +299,25 @@ class Partition(val topicPartition: TopicPartition, s"different from the requested log dir $logDir") false case None => - getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false) + getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints) true } } } } - def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = { + def getOrCreateReplica(replicaId: Int, isNew: Boolean, offsetCheckpoints: OffsetCheckpoints): Replica = { allReplicasMap.getAndMaybePut(replicaId, { if (isReplicaLocal(replicaId)) { - val adminZkClient = new AdminZkClient(zkClient) - val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic) + val props = stateStore.fetchTopicConfig() val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId) - val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) - val offsetMap = checkpoint.read() - if (!offsetMap.contains(topicPartition)) + val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse { info(s"No checkpointed highwatermark is found for partition $topicPartition") - val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset) - new Replica(replicaId, topicPartition, time, offset, Some(log)) + 0L + } + val initialHighWatermark = math.min(checkpointHighWatermark, log.logEndOffset) + new Replica(replicaId, 
topicPartition, time, initialHighWatermark, Some(log)) } else new Replica(replicaId, topicPartition, time) }) } @@ -300,6 +394,7 @@ class Partition(val topicPartition: TopicPartition, } } + // Visible for testing def addReplicaIfNotExists(replica: Replica): Replica = allReplicasMap.putIfNotExists(replica.brokerId, replica) @@ -370,7 +465,7 @@ class Partition(val topicPartition: TopicPartition, inSyncReplicas = Set.empty[Replica] leaderReplicaIdOpt = None leaderEpochStartOffsetOpt = None - removePartitionMetrics() + Partition.removeMetrics(topicPartition) logManager.asyncDelete(topicPartition) if (logManager.getLog(topicPartition, isFuture = true).isDefined) logManager.asyncDelete(topicPartition, isFuture = true) @@ -384,18 +479,23 @@ class Partition(val topicPartition: TopicPartition, * from the time when this broker was the leader last time) and setting the new leader and ISR. * If the leader replica id does not change, return false to indicate the replica manager. */ - def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = { + def makeLeader(controllerId: Int, + partitionStateInfo: LeaderAndIsrRequest.PartitionState, + correlationId: Int, + highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt) // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch // add replicas that are new - val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet + val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map { + id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints) + }.toSet // remove assigned replicas that have been removed by the controller (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica) inSyncReplicas = newInSyncReplicas - newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew)) + newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints)) val leaderReplica = localReplicaOrException val leaderEpochStartOffset = leaderReplica.logEndOffset @@ -448,7 +548,10 @@ class Partition(val topicPartition: TopicPartition, * greater (that is, no updates have been missed), return false to indicate to the * replica manager that state is already correct and the become-follower steps can be skipped */ - def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = { + def makeFollower(controllerId: Int, + partitionStateInfo: LeaderAndIsrRequest.PartitionState, + correlationId: Int, + highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { inWriteLock(leaderIsrUpdateLock) { val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt) val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader @@ -457,7 +560,7 @@ class Partition(val topicPartition: TopicPartition, // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch // add replicas that are new - 
newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew)) + newAssignedReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew, highWatermarkCheckpoints)) // remove assigned replicas that have been removed by the controller (assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica) inSyncReplicas = Set.empty[Replica] @@ -483,9 +586,9 @@ class Partition(val topicPartition: TopicPartition, def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = { val replicaId = replica.brokerId // No need to calculate low watermark if there is no delayed DeleteRecordsRequest - val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L + val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L replica.updateLogReadResult(logReadResult) - val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L + val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L // check if the LW of the partition has incremented // since the replica's logStartOffset may have incremented val leaderLWIncremented = newLeaderLW > oldLeaderLW @@ -533,9 +636,9 @@ class Partition(val topicPartition: TopicPartition, val newInSyncReplicas = inSyncReplicas + replica info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " + s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}") + // update ISR in ZK and cache - updateIsr(newInSyncReplicas) - replicaManager.isrExpandRate.mark() + expandIsr(newInSyncReplicas) } // check if the HW of the partition can now be incremented // since the replica may already be in the ISR and its LEO has just incremented @@ -634,7 +737,7 @@ class Partition(val topicPartition: TopicPartition, if (!isLeaderReplicaLocal) throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId") val logStartOffsets = allReplicas.collect { - case replica if replicaManager.metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset + case replica if metadataCache.isBrokerAlive(replica.brokerId) || replica.brokerId == Request.FutureLocalReplicaId => replica.logStartOffset } CoreUtils.min(logStartOffsets, 0L) } @@ -642,12 +745,7 @@ class Partition(val topicPartition: TopicPartition, /** * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock. 
*/ - private def tryCompleteDelayedRequests() { - val requestKey = new TopicPartitionOperationKey(topicPartition) - replicaManager.tryCompleteDelayedFetch(requestKey) - replicaManager.tryCompleteDelayedProduce(requestKey) - replicaManager.tryCompleteDelayedDeleteRecords(requestKey) - } + private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll() def maybeShrinkIsr(replicaMaxLagTimeMs: Long) { val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) { @@ -669,8 +767,7 @@ class Partition(val topicPartition: TopicPartition, ) // update ISR in zk and in cache - updateIsr(newInSyncReplicas) - replicaManager.isrShrinkRate.mark() + shrinkIsr(newInSyncReplicas) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) @@ -785,7 +882,7 @@ class Partition(val topicPartition: TopicPartition, tryCompleteDelayedRequests() else { // probably unblock some follower fetch requests since log end offset has been updated - replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition)) + delayedOperations.checkAndCompleteFetch() } info @@ -1015,41 +1112,37 @@ class Partition(val topicPartition: TopicPartition, } } - private def updateIsr(newIsr: Set[Replica]) { + private def expandIsr(newIsr: Set[Replica]): Unit = { val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion) - val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topicPartition, newLeaderAndIsr, - controllerEpoch) + val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr) + maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) + } - if (updateSucceeded) { - replicaManager.recordIsrChange(topicPartition) - inSyncReplicas = newIsr - zkVersion = newVersion - trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion)) - } else { - replicaManager.failedIsrUpdatesRate.mark() - info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) - } + private def shrinkIsr(newIsr: Set[Replica]): Unit = { + val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion) + val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr) + maybeUpdateIsrAndVersion(newIsr, zkVersionOpt) } - /** - * remove deleted log metrics - */ - def removePartitionMetrics() { - removeMetric("UnderReplicated", tags) - removeMetric("UnderMinIsr", tags) - removeMetric("InSyncReplicasCount", tags) - removeMetric("ReplicasCount", tags) - removeMetric("LastStableOffsetLag", tags) - removeMetric("AtMinIsr", tags) + private def maybeUpdateIsrAndVersion(isr: Set[Replica], zkVersionOpt: Option[Int]): Unit = { + zkVersionOpt match { + case Some(newVersion) => + inSyncReplicas = isr + zkVersion = newVersion + info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion)) + + case None => + info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating ISR") + } } override def equals(that: Any): Boolean = that match { - case other: Partition => partitionId == other.partitionId && topic == other.topic && isOffline == other.isOffline + case other: Partition => partitionId == other.partitionId && topic == other.topic case _ => false } override def hashCode: Int = - 31 + topic.hashCode + 17 * partitionId + (if (isOffline) 1 else 0) + 31 + topic.hashCode + 17 * partitionId override def toString(): String = { val 
partitionString = new StringBuilder diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index d4247001cafd5..85d272c4e593d 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -60,7 +60,7 @@ class AdminManager(val config: KafkaConfig, private val alterConfigPolicy = Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy])) - def hasDelayedTopicOperations = topicPurgatory.delayed != 0 + def hasDelayedTopicOperations = topicPurgatory.numDelayed != 0 /** * Try to complete delayed topic operations with the request key diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala index a977d9a371d01..dac9f7927ec77 100644 --- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala +++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala @@ -20,6 +20,7 @@ package kafka.server import java.util.concurrent.TimeUnit +import kafka.cluster.Partition import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.TopicPartition @@ -73,19 +74,19 @@ class DelayedDeleteRecords(delayMs: Long, // skip those partitions that have already been satisfied if (status.acksPending) { val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match { - case Some(partition) => - if (partition eq ReplicaManager.OfflinePartition) { - (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK) - } else { - partition.leaderReplicaIfLocal match { - case Some(_) => - val leaderLW = partition.lowWatermarkIfLeader - (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW) - case None => - (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK) - } + case partition: Partition => + partition.leaderReplicaIfLocal match { + case Some(_) => + val leaderLW = partition.lowWatermarkIfLeader + (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW) + case None => + (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK) } - case None => + + case HostedPartition.Offline => + (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK) + + case HostedPartition.None => (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK) } if (error != Errors.NONE || lowWatermarkReached) { diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index eb20e6d847da6..33187bb86dc6f 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -44,7 +44,8 @@ import scala.collection.mutable.ListBuffer * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete(). 
*/ abstract class DelayedOperation(override val delayMs: Long, - lockOpt: Option[Lock] = None) extends TimerTask with Logging { + lockOpt: Option[Lock] = None) + extends TimerTask with Logging { private val completed = new AtomicBoolean(false) private val tryCompletePending = new AtomicBoolean(false) @@ -209,7 +210,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri newGauge( "NumDelayedOperations", new Gauge[Int] { - def value: Int = delayed + def value: Int = numDelayed }, metricsTags ) @@ -288,10 +289,12 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri def checkAndComplete(key: Any): Int = { val wl = watcherList(key) val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) } - if(watchers == null) + val numCompleted = if (watchers == null) 0 else watchers.tryCompleteWatched() + debug(s"Request key $key unblocked $numCompleted $purgatoryName operations") + numCompleted } /** @@ -306,7 +309,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri /** * Return the number of delayed operations in the expiry queue */ - def delayed: Int = timeoutTimer.size + def numDelayed: Int = timeoutTimer.size /** * Cancel watching on any delayed operations for the given key. Note the operation will not be completed @@ -435,11 +438,11 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri // Trigger a purge if the number of completed but still being watched operations is larger than // the purge threshold. That number is computed by the difference btw the estimated total number of // operations and the number of pending delayed operations. - if (estimatedTotalOperations.get - delayed > purgeInterval) { + if (estimatedTotalOperations.get - numDelayed > purgeInterval) { // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with // a little overestimated total number of operations. 
- estimatedTotalOperations.getAndSet(delayed) + estimatedTotalOperations.getAndSet(numDelayed) debug("Begin purging watch lists") val purged = watcherLists.foldLeft(0) { case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index dbecba4e018cf..1570d4baf2827 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Lock import com.yammer.metrics.core.Meter +import kafka.cluster.Partition import kafka.metrics.KafkaMetricsGroup import kafka.utils.Pool - import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -88,12 +88,13 @@ class DelayedProduce(delayMs: Long, // skip those partitions that have already been satisfied if (status.acksPending) { val (hasEnough, error) = replicaManager.getPartition(topicPartition) match { - case Some(partition) => - if (partition eq ReplicaManager.OfflinePartition) - (false, Errors.KAFKA_STORAGE_ERROR) - else - partition.checkEnoughReplicasReachOffset(status.requiredOffset) - case None => + case partition: Partition => + partition.checkEnoughReplicasReachOffset(status.requiredOffset) + + case HostedPartition.Offline => + (false, Errors.KAFKA_STORAGE_ERROR) + + case HostedPartition.None => // Case A (false, Errors.UNKNOWN_TOPIC_OR_PARTITION) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d362d64cca2f1..9cedb01702ca0 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -263,8 +263,8 @@ class KafkaApis(val requestChannel: RequestChannel, } } if (replicaManager.hasDelayedElectionOperations) { - updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) => - replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp.topic(), tp.partition())) + updateMetadataRequest.partitionStates.asScala.foreach { case (tp, _) => + replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp)) } } sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE)) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 4312a92cd3f52..0622b303498d2 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThread(name: String, override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData[Records]): Option[LogAppendInfo] = { - val futureReplica = replicaMgr.futureLocalReplicaOrException(topicPartition) - val partition = replicaMgr.getPartition(topicPartition).get + val partition = replicaMgr.nonOfflinePartition(topicPartition).get + val futureReplica = partition.futureLocalReplicaOrException val records = toMemoryRecords(partitionData.records) if (fetchOffset != futureReplica.logEndOffset) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index ab5be6ea63dde..b1b5dd0551dc2 100644 --- 
a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -143,8 +143,8 @@ class ReplicaFetcherThread(name: String, override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = { - val replica = replicaMgr.localReplicaOrException(topicPartition) - val partition = replicaMgr.getPartition(topicPartition).get + val partition = replicaMgr.nonOfflinePartition(topicPartition).get + val replica = partition.localReplicaOrException val records = toMemoryRecords(partitionData.records) maybeWarnIfOversizedRecords(records, topicPartition) @@ -277,8 +277,9 @@ class ReplicaFetcherThread(name: String, * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState */ override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = { - val replica = replicaMgr.localReplicaOrException(tp) - val partition = replicaMgr.getPartition(tp).get + val partition = replicaMgr.nonOfflinePartition(tp).get + val replica = partition.localReplicaOrException + partition.truncateTo(offsetTruncationState.offset, isFuture = false) if (offsetTruncationState.offset < replica.highWatermark.messageOffset) @@ -292,7 +293,7 @@ class ReplicaFetcherThread(name: String, } override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = { - val partition = replicaMgr.getPartition(topicPartition).get + val partition = replicaMgr.nonOfflinePartition(topicPartition).get partition.truncateFullyAndStartAt(offset, isFuture = false) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 2023a972108c4..54d35ef04b83f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -22,14 +22,14 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.locks.Lock -import com.yammer.metrics.core.Gauge +import com.yammer.metrics.core.{Gauge, Meter} import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} -import kafka.server.checkpoints.OffsetCheckpointFile +import kafka.server.checkpoints.{OffsetCheckpointFile, OffsetCheckpoints, SimpleOffsetCheckpoints} import kafka.utils._ import kafka.zk.KafkaZkClient import org.apache.kafka.common.errors._ @@ -118,19 +118,30 @@ object LogReadResult { lastStableOffset = None) } +/** + * Trait to represent the state of hosted partitions. We create a concrete (active) Partition + * instance when the broker receives a LeaderAndIsr request from the controller indicating + * that it should be either a leader or follower of a partition. + */ +trait HostedPartition + +object HostedPartition { + /** + * This broker does not have any state for this partition locally. + */ + object None extends HostedPartition + + /** + * This broker hosts the partition, but it is in an offline log directory. 
+ */ + object Offline extends HostedPartition +} + + object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" val IsrChangePropagationBlackOut = 5000L val IsrChangePropagationInterval = 60000L - val OfflinePartition: Partition = new Partition(new TopicPartition("", -1), - isOffline = true, - replicaLagTimeMaxMs = 0L, - interBrokerProtocolVersion = ApiVersion.latestVersion, - localBrokerId = -1, - time = null, - replicaManager = null, - logManager = null, - zkClient = null) } class ReplicaManager(val config: KafkaConfig, @@ -181,7 +192,7 @@ class ReplicaManager(val config: KafkaConfig, /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch private val localBrokerId = config.brokerId - private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp => + private val allPartitions = new Pool[TopicPartition, HostedPartition](valueFactory = Some(tp => Partition(tp, time, this))) private val replicaStateChangeLock = new Object val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower) @@ -226,7 +237,7 @@ class ReplicaManager(val config: KafkaConfig, val offlineReplicaCount = newGauge( "OfflineReplicaCount", new Gauge[Int] { - def value = offlinePartitionsIterator.size + def value = offlinePartitionCount } ) val underReplicatedPartitions = newGauge( @@ -248,9 +259,9 @@ class ReplicaManager(val config: KafkaConfig, } ) - val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) - val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) - val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS) + val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) + val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) + val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS) def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated) @@ -295,40 +306,7 @@ class ReplicaManager(val config: KafkaConfig, def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition) - /** - * Try to complete some delayed produce requests with the request key; - * this can be triggered when: - * - * 1. The partition HW has changed (for acks = -1) - * 2. A follower replica's fetch operation is received (for acks > 1) - */ - def tryCompleteDelayedProduce(key: DelayedOperationKey) { - val completed = delayedProducePurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed)) - } - - /** - * Try to complete some delayed fetch requests with the request key; - * this can be triggered when: - * - * 1. The partition HW has changed (for regular fetch) - * 2. 
A new message set is appended to the local log (for follower fetch) - */ - def tryCompleteDelayedFetch(key: DelayedOperationKey) { - val completed = delayedFetchPurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed)) - } - - /** - * Try to complete some delayed DeleteRecordsRequest with the request key; - * this needs to be triggered when the partition low watermark has changed - */ - def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) { - val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed)) - } - - def hasDelayedElectionOperations = delayedElectPreferredLeaderPurgatory.delayed != 0 + def hasDelayedElectionOperations: Boolean = delayedElectPreferredLeaderPurgatory.numDelayed != 0 def tryCompleteElection(key: DelayedOperationKey): Unit = { val completed = delayedElectPreferredLeaderPurgatory.checkAndComplete(key) @@ -350,24 +328,33 @@ class ReplicaManager(val config: KafkaConfig, logDirFailureHandler.start() } + private def maybeRemoveTopicMetrics(topic: String): Unit = { + val topicHasOnlinePartition = allPartitions.values.exists { + case partition: Partition => topic == partition.topic + case _ => false + } + if (!topicHasOnlinePartition) + brokerTopicStats.removeMetrics(topic) + } + def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean) = { stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition") if (deletePartition) { - val removedPartition = allPartitions.remove(topicPartition) - if (removedPartition eq ReplicaManager.OfflinePartition) { - allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) - throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk") - } + getPartition(topicPartition) match { + case HostedPartition.Offline => + throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk") + + case removedPartition: Partition => + if (allPartitions.remove(topicPartition, removedPartition)) { + maybeRemoveTopicMetrics(topicPartition.topic) + // this will delete the local log. This call may throw exception if the log is on offline directory + removedPartition.delete() + } - if (removedPartition != null) { - val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic) - if (!topicHasPartitions) - brokerTopicStats.removeMetrics(topicPartition.topic) - // this will delete the local log. This call may throw exception if the log is on offline directory - removedPartition.delete() - } else { - stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker") + case HostedPartition.None => + stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition " + + s"$topicPartition as replica doesn't exist on broker") } // Delete log and corresponding folders in case replica manager doesn't hold them anymore. 
@@ -409,35 +396,46 @@ class ReplicaManager(val config: KafkaConfig, } } - def getOrCreatePartition(topicPartition: TopicPartition): Partition = - allPartitions.getAndMaybePut(topicPartition) + def getPartition(topicPartition: TopicPartition): HostedPartition = { + Option(allPartitions.get(topicPartition)).getOrElse(HostedPartition.None) + } - def getPartition(topicPartition: TopicPartition): Option[Partition] = - Option(allPartitions.get(topicPartition)) + // Visible for testing + def createPartition(topicPartition: TopicPartition): Partition = { + val partition = Partition(topicPartition, time, this) + allPartitions.put(topicPartition, partition) + partition + } - def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = - getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition) + def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = { + getPartition(topicPartition) match { + case partition: Partition => Some(partition) + case _ => None + } + } // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after // the iterator has been constructed could still be returned by this iterator. - private def nonOfflinePartitionsIterator: Iterator[Partition] = - allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition) - - // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the - // iterator has been constructed may not be visible. - private def offlinePartitionsIterator: Iterator[Partition] = - allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition) + private def nonOfflinePartitionsIterator: Iterator[Partition] = { + allPartitions.values.iterator.flatMap { + case p: Partition => Some(p) + case _ => None + } + } + private def offlinePartitionCount: Int = { + allPartitions.values.iterator.count(_ == HostedPartition.Offline) + } def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = { getPartition(topicPartition) match { - case Some(partition) => - if (partition eq ReplicaManager.OfflinePartition) - throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory") - else - partition + case partition: Partition => + partition - case None if metadataCache.contains(topicPartition) => + case HostedPartition.Offline => + throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory") + + case HostedPartition.None if metadataCache.contains(topicPartition) => if (expectLeader) { // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which // forces clients to refresh metadata to find the new location. 
This can happen, for example, @@ -448,7 +446,7 @@ class ReplicaManager(val config: KafkaConfig, throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available") } - case None => + case HostedPartition.None => throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist") } } @@ -583,15 +581,17 @@ class ReplicaManager(val config: KafkaConfig, if (!logManager.isLogDirOnline(destinationDir)) throw new KafkaStorageException(s"Log directory $destinationDir is offline") - getPartition(topicPartition).foreach { partition => - if (partition eq ReplicaManager.OfflinePartition) + getPartition(topicPartition) match { + case partition: Partition => + // Stop current replica movement if the destinationDir is different from the existing destination log directory + if (partition.futureReplicaDirChanged(destinationDir)) { + replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition)) + partition.removeFutureLocalReplica() + } + case HostedPartition.Offline => throw new KafkaStorageException(s"Partition $topicPartition is offline") - // Stop current replica movement if the destinationDir is different from the existing destination log directory - if (partition.futureReplicaDirChanged(destinationDir)) { - replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition)) - partition.removeFutureLocalReplica() - } + case _ => // Do nothing } // If the log for this partition has not been created yet: @@ -609,7 +609,8 @@ class ReplicaManager(val config: KafkaConfig, // start ReplicaAlterDirThread to move data of this partition from the current log to the future log // - Otherwise, return KafkaStorageException. We do not create the future log while there is offline log directory // so that we can avoid creating future log for the same partition in multiple log directories. 
- if (partition.maybeCreateFutureReplica(destinationDir)) { + val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints) + if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) { val futureReplica = futureLocalReplicaOrException(topicPartition) logManager.abortAndPauseCleaning(topicPartition) @@ -779,10 +780,8 @@ class ReplicaManager(val config: KafkaConfig, (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case t: Throwable => val logStartOffset = getPartition(topicPartition) match { - case Some(partition) => - partition.logStartOffset - case _ => - -1 + case partition: Partition => partition.logStartOffset + case _ => -1L } brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() @@ -1060,43 +1059,52 @@ class ReplicaManager(val config: KafkaConfig, val newPartitions = new mutable.HashSet[Partition] leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => - val partition = getPartition(topicPartition).getOrElse { - val createdPartition = getOrCreatePartition(topicPartition) - newPartitions.add(createdPartition) - createdPartition + val partitionOpt = getPartition(topicPartition) match { + case HostedPartition.Offline => + stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + + s"controller $controllerId with correlation id $correlationId " + + s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " + + "partition is in an offline log directory") + responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) + None + + case partition: Partition => Some(partition) + + case HostedPartition.None => + val partition = Partition(topicPartition, time, this) + allPartitions.putIfNotExists(topicPartition, partition) + newPartitions.add(partition) + Some(partition) } - val currentLeaderEpoch = partition.getLeaderEpoch - val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch - if (partition eq ReplicaManager.OfflinePartition) { - stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + - s"controller $controllerId with correlation id $correlationId " + - s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " + - "partition is in an offline log directory") - responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) - } else if (requestLeaderEpoch > currentLeaderEpoch) { - // If the leader epoch is valid record the epoch of the controller that made the leadership decision. - // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path - if(stateInfo.basePartitionState.replicas.contains(localBrokerId)) - partitionState.put(partition, stateInfo) - else { - stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " + - s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " + - s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}") - responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + + partitionOpt.foreach { partition => + val currentLeaderEpoch = partition.getLeaderEpoch + val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch + if (requestLeaderEpoch > currentLeaderEpoch) { + // If the leader epoch is valid record the epoch of the controller that made the leadership decision. 
+ // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path + if (stateInfo.basePartitionState.replicas.contains(localBrokerId)) + partitionState.put(partition, stateInfo) + else { + stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " + + s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " + + s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}") + responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) + } + } else if (requestLeaderEpoch < currentLeaderEpoch) { + stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + + s"controller $controllerId with correlation id $correlationId " + + s"epoch $controllerEpoch for partition $topicPartition since its associated " + + s"leader epoch $requestLeaderEpoch is smaller than the current " + + s"leader epoch $currentLeaderEpoch") + responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) + } else { + stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " + + s"controller $controllerId with correlation id $correlationId " + + s"epoch $controllerEpoch for partition $topicPartition since its associated " + + s"leader epoch $requestLeaderEpoch matches the current leader epoch") + responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) } - } else if (requestLeaderEpoch < currentLeaderEpoch) { - stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + - s"controller $controllerId with correlation id $correlationId " + - s"epoch $controllerEpoch for partition $topicPartition since its associated " + - s"leader epoch $requestLeaderEpoch is smaller than the current " + - s"leader epoch $currentLeaderEpoch") - responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) - } else { - stateChangeLogger.debug(s"Ignoring LeaderAndIsr request from " + - s"controller $controllerId with correlation id $correlationId " + - s"epoch $controllerEpoch for partition $topicPartition since its associated " + - s"leader epoch $requestLeaderEpoch matches the current leader epoch") - responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) } } @@ -1105,12 +1113,15 @@ class ReplicaManager(val config: KafkaConfig, } val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys + val highWatermarkCheckpoints = new SimpleOffsetCheckpoints(this.highWatermarkCheckpoints) val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty) - makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap) + makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap, + highWatermarkCheckpoints) else Set.empty[Partition] val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty) - makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap) + makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, + highWatermarkCheckpoints) else Set.empty[Partition] @@ -1121,10 +1132,11 @@ class ReplicaManager(val config: KafkaConfig, * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object. * we need to map this topic-partition to OfflinePartition instead. 
*/ - if (localReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition)) - allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) + if (localReplica(topicPartition).isEmpty) + markPartitionOffline(topicPartition) } + // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions if (!hwThreadInitialized) { @@ -1140,7 +1152,7 @@ class ReplicaManager(val config: KafkaConfig, val leader = BrokerEndPoint(config.brokerId, "localhost", -1) // Add future replica to partition's map - partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false) + partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false, highWatermarkCheckpoints) // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move // replica from source dir to destination dir @@ -1175,13 +1187,14 @@ class ReplicaManager(val config: KafkaConfig, * TODO: the above may need to be fixed later */ private def makeLeaders(controllerId: Int, - epoch: Int, + controllerEpoch: Int, partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState], correlationId: Int, - responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = { + responseMap: mutable.Map[TopicPartition, Errors], + highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = { partitionState.keys.foreach { partition => stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " + - s"controller $controllerId epoch $epoch starting the become-leader transition for " + + s"controller $controllerId epoch $controllerEpoch starting the become-leader transition for " + s"partition ${partition.topicPartition}") } @@ -1196,20 +1209,20 @@ class ReplicaManager(val config: KafkaConfig, // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => try { - if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) { + if (partition.makeLeader(controllerId, partitionStateInfo, correlationId, highWatermarkCheckpoints)) { partitionsToMakeLeaders += partition stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " + - s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " + + s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " + s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})") } else stateChangeLogger.info(s"Skipped the become-leader state change after marking its " + - s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " + + s"partition as leader with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " + s"since it is already the leader for the partition.") } catch { case e: KafkaStorageException => stateChangeLogger.error(s"Skipped the become-leader state change with " + - s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " + + s"correlation id $correlationId from controller $controllerId epoch $controllerEpoch for partition 
${partition.topicPartition} " + s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " + s"the replica for the partition is offline due to disk error $e") val dirOpt = getLogDir(partition.topicPartition) @@ -1222,7 +1235,7 @@ class ReplicaManager(val config: KafkaConfig, case e: Throwable => partitionState.keys.foreach { partition => stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " + - s"from controller $controllerId epoch $epoch for partition ${partition.topicPartition}", e) + s"from controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition}", e) } // Re-throw the exception for it to be caught in KafkaApis throw e @@ -1230,7 +1243,7 @@ class ReplicaManager(val config: KafkaConfig, partitionState.keys.foreach { partition => stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " + - s"epoch $epoch for the become-leader transition for partition ${partition.topicPartition}") + s"epoch $controllerEpoch for the become-leader transition for partition ${partition.topicPartition}") } partitionsToMakeLeaders @@ -1255,13 +1268,14 @@ class ReplicaManager(val config: KafkaConfig, * return the set of partitions that are made follower due to this method */ private def makeFollowers(controllerId: Int, - epoch: Int, + controllerEpoch: Int, partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState], correlationId: Int, - responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = { + responseMap: mutable.Map[TopicPartition, Errors], + highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = { partitionStates.foreach { case (partition, partitionState) => stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + - s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + s"${partitionState.basePartitionState.leader}") } @@ -1269,7 +1283,6 @@ class ReplicaManager(val config: KafkaConfig, responseMap.put(partition.topicPartition, Errors.NONE) val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() - try { // TODO: Delete leaders from LeaderAndIsrRequest partitionStates.foreach { case (partition, partitionStateInfo) => @@ -1278,11 +1291,11 @@ class ReplicaManager(val config: KafkaConfig, metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { // Only change partition state when the leader is available case Some(_) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, highWatermarkCheckpoints)) partitionsToMakeFollower += partition else stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " + - s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " + + s"follower with correlation id $correlationId from controller $controllerId epoch $controllerEpoch " + s"for partition ${partition.topicPartition} (last update " + s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " + s"since the new leader $newLeaderBrokerId is the same as the old leader") @@ -1290,17 +1303,17 @@ class ReplicaManager(val config: KafkaConfig, // The leader 
broker should always be present in the metadata cache. // If not, we should record the error message and abort the transition process for this partition stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + - s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " + s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") // Create the local replica even if the leader is unavailable. This is required to ensure that we include // the partition's high watermark in the checkpoint file (see KAFKA-1647) - partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew) + partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew, highWatermarkCheckpoints) } } catch { case e: KafkaStorageException => stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " + - s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " + + s"controller $controllerId epoch $controllerEpoch for partition ${partition.topicPartition} " + s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " + s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e") val dirOpt = getLogDir(partition.topicPartition) @@ -1313,57 +1326,56 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition)) partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " + - s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " + + s"epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " + s"${partitionStates(partition).basePartitionState.leader}") } partitionsToMakeFollower.foreach { partition => val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition) - tryCompleteDelayedProduce(topicPartitionOperationKey) - tryCompleteDelayedFetch(topicPartitionOperationKey) + delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey) + delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey) } partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " + s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " + - s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}") + s"controller $controllerId epoch $controllerEpoch with leader ${partitionStates(partition).basePartitionState.leader}") } if (isShuttingDown.get()) { partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " + - s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " + + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " + 
"since it is shutting down") } - } - else { + } else { // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition => val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get .brokerEndPoint(config.interBrokerListenerName) val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset) - }.toMap - replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + }.toMap - partitionsToMakeFollower.foreach { partition => + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) + partitionsToMakeFollowerWithLeaderAndOffset.foreach { case (partition, initialFetchState) => stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " + - s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " + - s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}") + s"request from controller $controllerId epoch $controllerEpoch with correlation id $correlationId for " + + s"partition $partition with leader ${initialFetchState.leader}") } } } catch { case e: Throwable => stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " + - s"received from controller $controllerId epoch $epoch", e) + s"received from controller $controllerId epoch $controllerEpoch", e) // Re-throw the exception for it to be caught in KafkaApis throw e } partitionStates.keys.foreach { partition => stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " + - s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " + + s"epoch $controllerEpoch for the become-follower transition for partition ${partition.topicPartition} with leader " + s"${partitionStates(partition).basePartitionState.leader}") } @@ -1439,8 +1451,9 @@ class ReplicaManager(val config: KafkaConfig, } // Used only by test - def markPartitionOffline(tp: TopicPartition) { - allPartitions.put(tp, ReplicaManager.OfflinePartition) + def markPartitionOffline(tp: TopicPartition): Unit = replicaStateChangeLock synchronized { + allPartitions.put(tp, HostedPartition.Offline) + Partition.removeMetrics(tp) } // logDir should be an absolute path @@ -1467,13 +1480,10 @@ class ReplicaManager(val config: KafkaConfig, partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false)) newOfflinePartitions.foreach { topicPartition => - val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) - partition.removePartitionMetrics() + markPartitionOffline(topicPartition) } newOfflinePartitions.map(_.topic).foreach { topic: String => - val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic) - if (!topicHasPartitions) - brokerTopicStats.removeMetrics(topic) + maybeRemoveTopicMetrics(topic) } highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir) @@ -1524,17 +1534,17 @@ class ReplicaManager(val config: KafkaConfig, def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, 
EpochEndOffset] = { requestedEpochInfo.map { case (tp, partitionData) => val epochEndOffset = getPartition(tp) match { - case Some(partition) => - if (partition eq ReplicaManager.OfflinePartition) - new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) - else - partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch, - fetchOnlyFromLeader = true) + case partition: Partition => + partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch, + fetchOnlyFromLeader = true) + + case HostedPartition.Offline => + new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) - case None if metadataCache.contains(tp) => + case HostedPartition.None if metadataCache.contains(tp) => new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) - case None => + case HostedPartition.None => new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) } tp -> epochEndOffset @@ -1552,7 +1562,7 @@ class ReplicaManager(val config: KafkaConfig, results: Map[TopicPartition, ApiError]): Unit = { if (expectedLeaders.nonEmpty) { val watchKeys = expectedLeaders.map{ - case (tp, leader) => new TopicPartitionOperationKey(tp.topic, tp.partition) + case (tp, _) => new TopicPartitionOperationKey(tp) }.toSeq delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch( new DelayedElectPreferredLeader(deadline - time.milliseconds(), expectedLeaders, results, diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index 2769cb4240a52..715f42fa30f41 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -61,3 +61,18 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh def read(): Map[TopicPartition, Long] = checkpoint.read().toMap } + +trait OffsetCheckpoints { + def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] +} + +class SimpleOffsetCheckpoints(checkpointFilesByLogDir: Map[String, OffsetCheckpointFile]) + extends OffsetCheckpoints { + + override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] = { + val checkpoint = checkpointFilesByLogDir(logDir) + val offsetMap = checkpoint.read() + offsetMap.get(topicPartition) + } + +} diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 33de22ba82521..e2733b8936fd9 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -39,13 +39,13 @@ object ReplicationUtils extends Logging { try { val (writtenLeaderOpt, writtenStat) = zkClient.getDataAndStat(path) val expectedLeaderOpt = TopicPartitionStateZNode.decode(expectedLeaderAndIsrInfo, writtenStat) - val succeeded = writtenLeaderOpt.map { writtenData => + val succeeded = writtenLeaderOpt.exists { writtenData => val writtenLeaderOpt = TopicPartitionStateZNode.decode(writtenData, writtenStat) (expectedLeaderOpt, writtenLeaderOpt) match { case (Some(expectedLeader), Some(writtenLeader)) if expectedLeader == writtenLeader => true case _ => false } - }.getOrElse(false) + } if (succeeded) (true, writtenStat.getVersion) else (false, -1) } catch { diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 654a92ee0ac41..c5763adf59ad9 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -99,7 +99,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { val newLeaderServer = servers.find(_.config.brokerId == 101).get TestUtils.waitUntilTrue ( - () => newLeaderServer.replicaManager.getPartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined, + () => newLeaderServer.replicaManager.nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal).isDefined, "broker 101 should be the new leader", pause = 1L ) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index e3f7b4db3d946..b3e5ade009a55 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -22,16 +22,16 @@ import java.util.{Optional, Properties} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException} import java.util.concurrent.atomic.AtomicBoolean -import kafka.api.{ApiVersion, Request} +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.Metric +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.common.UnexpectedAppendOffsetException import kafka.log.{Defaults => _, _} -import kafka.server.QuotaFactory.QuotaManagers import kafka.server._ +import kafka.server.checkpoints.OffsetCheckpoints import kafka.utils._ -import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException} -import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.utils.Utils @@ -39,26 +39,30 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, ListOffsetRequest} import org.junit.{After, Before, Test} import org.junit.Assert._ +import org.mockito.Mockito.{doNothing, mock, when} import org.scalatest.Assertions.assertThrows -import org.easymock.{Capture, EasyMock, IAnswer} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.mockito.ArgumentMatchers import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer class PartitionTest { val brokerId = 101 val topicPartition = new TopicPartition("test-topic", 0) val time = new MockTime() - val brokerTopicStats = new BrokerTopicStats - val metrics = new Metrics - var tmpDir: File = _ var logDir1: File = _ var logDir2: File = _ - var replicaManager: ReplicaManager = _ var logManager: LogManager = _ var logConfig: LogConfig = _ - var quotaManagers: QuotaManagers = _ + val stateStore: PartitionStateStore = mock(classOf[PartitionStateStore]) + val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations]) + val metadataCache: MetadataCache = mock(classOf[MetadataCache]) + val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) + var partition: Partition = _ @Before def setup(): Unit = { @@ -72,20 +76,19 @@ class PartitionTest { logDirs = Seq(logDir1, logDir2), defaultConfig = 
logConfig, CleanerConfig(enableCleaner = false), time) logManager.startup() - val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) - brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(",")) - val brokerConfig = KafkaConfig.fromProps(brokerProps) - val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient]) - quotaManagers = QuotaFactory.instantiate(brokerConfig, metrics, time, "") - replicaManager = new ReplicaManager( - config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time), - logManager, new AtomicBoolean(false), quotaManagers, - brokerTopicStats, new MetadataCache(brokerId), new LogDirFailureChannel(brokerConfig.logDirs.size)) - - EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(logProps).anyTimes() - EasyMock.expect(kafkaZkClient.conditionalUpdatePath(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) - .andReturn((true, 0)).anyTimes() - EasyMock.replay(kafkaZkClient) + partition = new Partition(topicPartition, + replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, + interBrokerProtocolVersion = ApiVersion.latestVersion, + localBrokerId = brokerId, + time, + stateStore, + delayedOperations, + metadataCache, + logManager) + + when(stateStore.fetchTopicConfig()).thenReturn(createLogProperties(Map.empty)) + when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition))) + .thenReturn(None) } private def createLogProperties(overrides: Map[String, String]): Properties = { @@ -99,14 +102,8 @@ class PartitionTest { @After def tearDown(): Unit = { - brokerTopicStats.close() - metrics.close() - logManager.shutdown() Utils.delete(tmpDir) - logManager.liveLogDirs.foreach(Utils.delete) - replicaManager.shutdown(checkpointHW = false) - quotaManagers.shutdown() } @Test @@ -168,17 +165,9 @@ class PartitionTest { val latch = new CountDownLatch(1) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath) - val log1 = logManager.getOrCreateLog(topicPartition, logConfig) + partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath) - val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true) - val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1)) - val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2)) - val partition = Partition(topicPartition, time, replicaManager) - - partition.addReplicaIfNotExists(futureReplica) - partition.addReplicaIfNotExists(currentReplica) - assertEquals(Some(currentReplica), partition.localReplica) - assertEquals(Some(futureReplica), partition.futureLocalReplica) + partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints) val thread1 = new Thread { override def run(): Unit = { @@ -207,10 +196,15 @@ class PartitionTest { // Verify that replacement works when the replicas have the same log end offset but different base offsets in the // active segment def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = { - // Write records with duplicate keys to current replica and roll at offset 6 logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath) - val log1 = logManager.getOrCreateLog(topicPartition, logConfig) - log1.appendAsLeader(MemoryRecords.withRecords(0L, 
CompressionType.NONE, 0, + val currentReplica = partition.getOrCreateReplica(brokerId, isNew = true, offsetCheckpoints) + logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath) + partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints) + val futureReplica = partition.futureLocalReplicaOrException + + // Write records with duplicate keys to current replica and roll at offset 6 + val currentLog = currentReplica.log.get + currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0, new SimpleRecord("k1".getBytes, "v1".getBytes), new SimpleRecord("k1".getBytes, "v2".getBytes), new SimpleRecord("k1".getBytes, "v3".getBytes), @@ -218,15 +212,14 @@ class PartitionTest { new SimpleRecord("k2".getBytes, "v5".getBytes), new SimpleRecord("k2".getBytes, "v6".getBytes) ), leaderEpoch = 0) - log1.roll() - log1.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0, + currentLog.roll() + currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0, new SimpleRecord("k3".getBytes, "v7".getBytes), new SimpleRecord("k4".getBytes, "v8".getBytes) ), leaderEpoch = 0) // Write to the future replica as if the log had been compacted, and do not roll the segment - logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath) - val log2 = logManager.getOrCreateLog(topicPartition, logConfig, isFuture = true) + val buffer = ByteBuffer.allocate(1024) val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0) @@ -235,16 +228,7 @@ class PartitionTest { builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes)) builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes)) - log2.appendAsFollower(builder.build()) - - val currentReplica = new Replica(brokerId, topicPartition, time, log = Some(log1)) - val futureReplica = new Replica(Request.FutureLocalReplicaId, topicPartition, time, log = Some(log2)) - val partition = Partition(topicPartition, time, replicaManager) - - partition.addReplicaIfNotExists(futureReplica) - partition.addReplicaIfNotExists(currentReplica) - assertEquals(Some(currentReplica), partition.localReplica) - assertEquals(Some(futureReplica), partition.futureLocalReplica) + futureReplica.log.get.appendAsFollower(builder.build()) assertTrue(partition.maybeReplaceCurrentWithFutureReplica()) } @@ -491,9 +475,10 @@ class PartitionTest { new SimpleRecord(20,"k4".getBytes, "v2".getBytes), new SimpleRecord(21,"k5".getBytes, "v3".getBytes))) - val partition = Partition(topicPartition, time, replicaManager) + val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true) + assertTrue("Expected first makeLeader() to return 'leader changed'", - partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)) + partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)) assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch) assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId)) @@ -532,13 +517,16 @@ class PartitionTest { } } + when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch, + List(leader, follower2, follower1), 1))) + .thenReturn(Some(2)) + // Update follower 1 partition.updateReplicaLogReadResult( follower1Replica, 
readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica)) partition.updateReplicaLogReadResult( follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(2), batch2), leaderReplica)) - // Update follower 2 partition.updateReplicaLogReadResult( follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica)) partition.updateReplicaLogReadResult( @@ -565,12 +553,14 @@ class PartitionTest { assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED))) // Make into a follower - assertTrue(partition.makeFollower(controllerId, - new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1)) + val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, + leaderEpoch + 1, isr, 4, replicas, false) + assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints)) // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition - assertTrue(partition.makeLeader(controllerId, - new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2)) + val newLeaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 5, + replicas, false) + assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints)) // Try to get offsets as a client fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match { @@ -611,6 +601,9 @@ class PartitionTest { case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e") } + when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2, + List(leader, follower2, follower1), 5))) + .thenReturn(Some(2)) // Next fetch from replicas, HW is moved up to 5 (ahead of the LEO) partition.updateReplicaLogReadResult( @@ -629,27 +622,10 @@ class PartitionTest { assertEquals(Right(None), fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED))) } - private def setupPartitionWithMocks(leaderEpoch: Int, isLeader: Boolean, log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = { - val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) - val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager]) - val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient]) - - val partition = new Partition(topicPartition, - isOffline = false, - replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = ApiVersion.latestVersion, - localBrokerId = brokerId, - time, - replicaManager, - logManager, - zkClient) - - EasyMock.replay(replicaManager, zkClient) - - partition.addReplicaIfNotExists(replica) + val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints) val controllerId = 0 val controllerEpoch = 0 @@ -659,13 +635,13 @@ class PartitionTest { if (isLeader) { assertTrue("Expected become leader transition to succeed", partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId, - leaderEpoch, isr, 1, replicas, true), 0)) + leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints)) assertEquals(leaderEpoch, partition.getLeaderEpoch) assertEquals(Some(replica), partition.leaderReplicaIfLocal) } else { assertTrue("Expected become follower transition to succeed", partition.makeFollower(controllerId, new 
LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId + 1, - leaderEpoch, isr, 1, replicas, true), 0)) + leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints)) assertEquals(leaderEpoch, partition.getLeaderEpoch) assertEquals(None, partition.leaderReplicaIfLocal) } @@ -675,12 +651,7 @@ class PartitionTest { @Test def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = { - val log = logManager.getOrCreateLog(topicPartition, logConfig) - val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) - val partition = Partition(topicPartition, time, replicaManager) - partition.addReplicaIfNotExists(replica) - assertEquals(Some(replica), partition.localReplica) - + val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints) val initialLogStartOffset = 5L partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false) assertEquals(s"Log end offset after truncate fully and start at $initialLogStartOffset:", @@ -728,37 +699,19 @@ class PartitionTest { @Test def testListOffsetIsolationLevels(): Unit = { - val log = logManager.getOrCreateLog(topicPartition, logConfig) - val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) - val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager]) - val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient]) - - val partition = new Partition(topicPartition, - isOffline = false, - replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, - interBrokerProtocolVersion = ApiVersion.latestVersion, - localBrokerId = brokerId, - time, - replicaManager, - logManager, - zkClient) - val controllerId = 0 val controllerEpoch = 0 val leaderEpoch = 5 val replicas = List[Integer](brokerId, brokerId + 1).asJava val isr = replicas - EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.anyObject[TopicPartitionOperationKey])) - .andVoid() - - EasyMock.replay(replicaManager, zkClient) + doNothing().when(delayedOperations).checkAndCompleteFetch() - partition.addReplicaIfNotExists(replica) + val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints) assertTrue("Expected become leader transition to succeed", partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId, - leaderEpoch, isr, 1, replicas, true), 0)) + leaderEpoch, isr, 1, replicas, true), 0, offsetCheckpoints)) assertEquals(leaderEpoch, partition.getLeaderEpoch) assertEquals(Some(replica), partition.leaderReplicaIfLocal) @@ -804,23 +757,18 @@ class PartitionTest { @Test def testGetReplica(): Unit = { - val log = logManager.getOrCreateLog(topicPartition, logConfig) - val replica = new Replica(brokerId, topicPartition, time, log = Some(log)) - val partition = Partition(topicPartition, time, replicaManager) - assertEquals(None, partition.localReplica) assertThrows[ReplicaNotAvailableException] { partition.localReplicaOrException } - partition.addReplicaIfNotExists(replica) + val replica = partition.getOrCreateReplica(brokerId, isNew = false, offsetCheckpoints) assertEquals(Some(replica), partition.localReplica) assertEquals(replica, partition.localReplicaOrException) } @Test def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = { - val partition = Partition(topicPartition, time, replicaManager) assertThrows[ReplicaNotAvailableException] { partition.appendRecordsToFollowerOrFutureReplica( createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false) @@ -829,22 +777,20 @@ class 
PartitionTest { @Test def testMakeFollowerWithNoLeaderIdChange(): Unit = { - val partition = Partition(topicPartition, time, replicaManager) - // Start off as follower var partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 1, List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false) - partition.makeFollower(0, partitionStateInfo, 0) + partition.makeFollower(0, partitionStateInfo, 0, offsetCheckpoints) // Request with same leader and epoch increases by only 1, do become-follower steps partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4, List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false) - assertTrue(partition.makeFollower(0, partitionStateInfo, 2)) + assertTrue(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints)) // Request with same leader and same epoch, skip become-follower steps partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4, List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false) - assertFalse(partition.makeFollower(0, partitionStateInfo, 2)) + assertFalse(partition.makeFollower(0, partitionStateInfo, 2, offsetCheckpoints)) } @Test @@ -865,9 +811,9 @@ class PartitionTest { val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes), new SimpleRecord("k7".getBytes, "v2".getBytes))) - val partition = Partition(topicPartition, time, replicaManager) + val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true) assertTrue("Expected first makeLeader() to return 'leader changed'", - partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)) + partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)) assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch) assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId)) @@ -899,11 +845,14 @@ class PartitionTest { assertEquals("Expected leader's HW", lastOffsetOfFirstBatch, leaderReplica.highWatermark.messageOffset) // current leader becomes follower and then leader again (without any new records appended) - partition.makeFollower( - controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1) + val followerState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, + replicas, false) + partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints) + + val newLeaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 1, + replicas, false) assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()", - partition.makeLeader(controllerEpoch, new LeaderAndIsrRequest.PartitionState( - controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2)) + partition.makeLeader(controllerEpoch, newLeaderState, 2, offsetCheckpoints)) val currentLeaderEpochStartOffset = leaderReplica.logEndOffset // append records with the latest leader epoch @@ -918,8 +867,10 @@ class PartitionTest { // fetch from the follower not in ISR from start offset of the current leader epoch should // add this follower to ISR + when(stateStore.expandIsr(controllerEpoch, new LeaderAndIsr(leader, leaderEpoch + 2, + List(leader, follower2, follower1), 1))).thenReturn(Some(2)) partition.updateReplicaLogReadResult(follower1Replica, - 
readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica)) + readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica)) assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId)) } @@ -932,8 +883,6 @@ class PartitionTest { */ @Test def testDelayedFetchAfterAppendRecords(): Unit = { - val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager]) - val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient]) val controllerId = 0 val controllerEpoch = 0 val leaderEpoch = 5 @@ -944,35 +893,38 @@ class PartitionTest { val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) } val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) } val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) } - val partitions = replicas.map { replica => + val partitions = ListBuffer.empty[Partition] + + replicas.foreach { replica => val tp = replica.topicPartition + val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations]) val partition = new Partition(tp, - isOffline = false, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, interBrokerProtocolVersion = ApiVersion.latestVersion, localBrokerId = brokerId, time, - replicaManager, - logManager, - zkClient) + stateStore, + delayedOperations, + metadataCache, + logManager) + + when(delayedOperations.checkAndCompleteFetch()) + .thenAnswer(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch + val anotherPartition = (tp.partition + 1) % topicPartitions.size + val partition = partitions(anotherPartition) + partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true) + } + }) + partition.addReplicaIfNotExists(replica) - partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId, - leaderEpoch, isr, 1, replicaIds, true), 0) - partition + val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId, + leaderEpoch, isr, 1, replicaIds, true) + partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) + partitions += partition } - // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch - val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture() - EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey))) - .andAnswer(new IAnswer[Unit] { - override def answer(): Unit = { - val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size - val partition = partitions(anotherPartition) - partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true) - } - }).anyTimes() - EasyMock.replay(replicaManager, zkClient) - def createRecords(baseOffset: Long): MemoryRecords = { val records = List( new SimpleRecord("k1".getBytes, "v1".getBytes), @@ -1050,10 +1002,60 @@ class PartitionTest { val isr = List[Integer](leader).asJava val leaderEpoch = 8 - val partition = Partition(topicPartition, time, replicaManager) assertFalse(partition.isAtMinIsr) // Make isr set to only have leader to trigger AtMinIsr (default min isr config is 1) - partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0) + val leaderState = new 
LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true) + partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) assertTrue(partition.isAtMinIsr) } + + @Test + def testUseCheckpointToInitializeHighWatermark(): Unit = { + val log = logManager.getOrCreateLog(topicPartition, logConfig) + log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0, + new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, "v2".getBytes), + new SimpleRecord("k3".getBytes, "v3".getBytes), + new SimpleRecord("k4".getBytes, "v4".getBytes) + ), leaderEpoch = 0) + log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5, + new SimpleRecord("k5".getBytes, "v5".getBytes), + new SimpleRecord("k5".getBytes, "v5".getBytes) + ), leaderEpoch = 5) + + when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition)) + .thenReturn(Some(4L)) + + val controllerId = 0 + val controllerEpoch = 3 + val replicas = List[Integer](brokerId, brokerId + 1).asJava + val leaderState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId, + 6, replicas, 1, replicas, false) + partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) + assertEquals(4, partition.localReplicaOrException.highWatermark.messageOffset) + } + + @Test + def testAddAndRemoveMetrics(): Unit = { + val metricsToCheck = List( + "UnderReplicated", + "UnderMinIsr", + "InSyncReplicasCount", + "ReplicasCount", + "LastStableOffsetLag", + "AtMinIsr") + + def getMetric(metric: String): Option[Metric] = { + Metrics.defaultRegistry().allMetrics().asScala.filterKeys { metricName => + metricName.getName == metric && metricName.getType == "Partition" + }.headOption.map(_._2) + } + + assertTrue(metricsToCheck.forall(getMetric(_).isDefined)) + + Partition.removeMetrics(topicPartition) + + assertEquals(Set(), Metrics.defaultRegistry().allMetrics().asScala.keySet.filter(_.getType == "Partition")) + } + } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 280fc8eb0a558..770868c9a6700 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -20,7 +20,7 @@ package kafka.coordinator.group import java.util.Optional import kafka.common.OffsetAndMetadata -import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager} +import kafka.server.{DelayedOperationPurgatory, KafkaConfig, HostedPartition, ReplicaManager} import kafka.utils._ import kafka.utils.timer.MockTimer import org.apache.kafka.common.TopicPartition @@ -1092,7 +1092,8 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupError) EasyMock.reset(replicaManager) - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))) + .andReturn(HostedPartition.None) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) @@ -1837,7 +1838,7 @@ class GroupCoordinatorTest { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - 
EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) @@ -2334,7 +2335,7 @@ class GroupCoordinatorTest { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) @@ -2375,7 +2376,7 @@ class GroupCoordinatorTest { EasyMock.reset(replicaManager) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) @@ -2687,7 +2688,8 @@ class GroupCoordinatorTest { private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = { val (responseFuture, responseCallback) = setupHeartbeatCallback - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))) + .andReturn(HostedPartition.None) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index dab2d723f6dcc..0487178c474bd 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -2025,7 +2025,7 @@ class GroupMetadataManagerTest { } private def mockGetPartition(): Unit = { - EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(partition) EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) } diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala index 23ac2dcf6a164..6c74ce36546f2 100755 --- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala @@ -191,10 +191,9 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness { if (isEpochInRequestStale) { sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder) - } - else { + } else { sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder) - assertTrue(broker2.replicaManager.getPartition(tp).isEmpty) + 
assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp)) } } } finally { diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 3b077a0b438dc..c1e0b9f7f2aa3 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -86,17 +86,17 @@ class DelayedOperationTest { purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3")) - assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed) + assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.numDelayed) assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched) // complete the operations, it should immediately be purged from the delayed operation r2.completable = true r2.tryComplete() - assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed, 2, purgatory.delayed) + assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.numDelayed, 2, purgatory.numDelayed) r3.completable = true r3.tryComplete() - assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed, 1, purgatory.delayed) + assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.numDelayed, 1, purgatory.numDelayed) // checking a watch should purge the watch list purgatory.checkAndComplete("test1") @@ -117,7 +117,7 @@ class DelayedOperationTest { val cancelledOperations = purgatory.cancelForKey("key") assertEquals(2, cancelledOperations.size) - assertEquals(1, purgatory.delayed) + assertEquals(1, purgatory.numDelayed) assertEquals(1, purgatory.watched) } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 61cbd2c94f488..3da22bb871409 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -72,7 +72,7 @@ class HighwatermarkPersistenceTest { var fooPartition0Hw = hwmFor(replicaManager, topic, 0) assertEquals(0L, fooPartition0Hw) val tp0 = new TopicPartition(topic, 0) - val partition0 = replicaManager.getOrCreatePartition(tp0) + val partition0 = replicaManager.createPartition(tp0) // create leader and follower replicas val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), LogConfig()) val leaderReplicaPartition0 = new Replica(configs.head.brokerId, tp0, time, 0, Some(log0)) @@ -117,7 +117,7 @@ class HighwatermarkPersistenceTest { var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) assertEquals(0L, topic1Partition0Hw) val t1p0 = new TopicPartition(topic1, 0) - val topic1Partition0 = replicaManager.getOrCreatePartition(t1p0) + val topic1Partition0 = replicaManager.createPartition(t1p0) // create leader log val topic1Log0 = logManagers.head.getOrCreateLog(t1p0, LogConfig()) // create a local replica for topic1 @@ -134,7 +134,7 @@ class HighwatermarkPersistenceTest { assertEquals(5L, topic1Partition0Hw) // add another partition and set highwatermark val t2p0 = new TopicPartition(topic2, 0) - val topic2Partition0 = replicaManager.getOrCreatePartition(t2p0) + val topic2Partition0 = replicaManager.createPartition(t2p0) // create leader log val topic2Log0 = 
logManagers.head.getOrCreateLog(t2p0, LogConfig()) // create a local replica for topic2 diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 006067e4b35b2..1dd4b24c49994 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -227,7 +227,7 @@ class IsrExpirationTest { localLog: Log): Partition = { val leaderId = config.brokerId val tp = new TopicPartition(topic, partitionId) - val partition = replicaManager.getOrCreatePartition(tp) + val partition = replicaManager.createPartition(tp) val leaderReplica = new Replica(leaderId, tp, time, 0, Some(localLog)) val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala index 0fd289c1cf21c..6b69c41fed4f4 100644 --- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{ExecutionException, TimeUnit} import kafka.server.LogDirFailureTest._ import kafka.api.IntegrationTestHarness +import kafka.cluster.Partition import kafka.controller.{OfflineReplica, PartitionAndReplica} import kafka.utils.{CoreUtils, Exit, TestUtils} import org.apache.kafka.clients.consumer.KafkaConsumer @@ -112,14 +113,16 @@ class LogDirFailureTest extends IntegrationTestHarness { // Send a message to another partition whose leader is the same as partition 0 // so that ReplicaFetcherThread on the follower will get response from leader immediately val anotherPartitionWithTheSameLeader = (1 until partitionNum).find { i => - leaderServer.replicaManager.getPartition(new TopicPartition(topic, i)).flatMap(_.leaderReplicaIfLocal).isDefined + leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, i)) + .flatMap(_.leaderReplicaIfLocal).isDefined }.get val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, anotherPartitionWithTheSameLeader, topic.getBytes, "message".getBytes) // When producer.send(...).get returns, it is guaranteed that ReplicaFetcherThread on the follower // has fetched from the leader and attempts to append to the offline replica. 
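// ------------------------------------------------------------------------------------------------
// Aside, not part of this patch: the test changes in this area replace getPartition(tp): Option[Partition]
// with getPartition(tp): HostedPartition plus a nonOfflinePartition(tp): Option[Partition] helper.
// Judging from the match in the lastOffsetForLeaderEpoch hunk earlier in this diff (cases for a live
// Partition, HostedPartition.Offline and HostedPartition.None) and from Partition now extending
// HostedPartition, the new return type behaves roughly like the small ADT sketched below. The names
// HostedPartitionSketch, PartitionLike and this nonOfflinePartition signature are illustrative
// stand-ins for code in ReplicaManager.scala that is not shown in this excerpt.
object HostedPartitionSketch {
  sealed trait HostedPartition
  object HostedPartition {
    case object None extends HostedPartition     // the broker hosts no replica of this partition
    case object Offline extends HostedPartition  // a replica exists, but its log directory is offline
  }
  // In the patch, Partition itself extends HostedPartition, so an online replica is just the instance.
  class PartitionLike extends HostedPartition

  // nonOfflinePartition is then a filtered view over getPartition: only online replicas come back.
  def nonOfflinePartition(hosted: HostedPartition): Option[PartitionLike] = hosted match {
    case p: PartitionLike => Some(p) // hosted and online
    case _                => None    // HostedPartition.None or HostedPartition.Offline
  }
}
// ------------------------------------------------------------------------------------------------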
producer.send(record).get - assertEquals(brokerCount, leaderServer.replicaManager.getPartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)).get.inSyncReplicas.size) + assertEquals(brokerCount, leaderServer.replicaManager.nonOfflinePartition(new TopicPartition(topic, anotherPartitionWithTheSameLeader)) + .get.inSyncReplicas.size) followerServer.replicaManager.replicaFetcherManager.fetcherThreadMap.values.foreach { thread => assertFalse("ReplicaFetcherThread should still be working if its partition count > 0", thread.isShutdownComplete) } diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 15f9a9b1d57ef..780d18917b99d 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -23,6 +23,7 @@ import TestUtils._ import kafka.zk.ZooKeeperTestHarness import java.io.File +import kafka.cluster.Partition import kafka.server.checkpoints.OffsetCheckpointFile import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition @@ -147,7 +148,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { * is that server1 has caught up on the topicPartition, and has joined the ISR. * In the line below, we wait until the condition is met before shutting down server2 */ - waitUntilTrue(() => server2.replicaManager.getPartition(topicPartition).get.inSyncReplicas.size == 2, + waitUntilTrue(() => server2.replicaManager.nonOfflinePartition(topicPartition).get.inSyncReplicas.size == 2, "Server 1 is not able to join the ISR after restart") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 4049504c4e77f..d149b8aea599f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -614,12 +614,12 @@ class ReplicaAlterLogDirsThreadTest { expect(replicaManager.futureLocalReplica(t1p0)).andReturn(Some(futureReplica)).anyTimes() expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replicaT1p0).anyTimes() expect(replicaManager.futureLocalReplicaOrException(t1p0)).andReturn(futureReplica).anyTimes() - expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes() + expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes() expect(replicaManager.localReplica(t1p1)).andReturn(Some(replicaT1p1)).anyTimes() expect(replicaManager.futureLocalReplica(t1p1)).andReturn(Some(futureReplica)).anyTimes() expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replicaT1p1).anyTimes() expect(replicaManager.futureLocalReplicaOrException(t1p1)).andReturn(futureReplica).anyTimes() - expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes() + expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes() } def stubWithFetchMessages(replicaT1p0: Replica, replicaT1p1: Replica, futureReplica: Replica, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index d6ebdd6c57258..a51641adf5c57 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -98,9 +98,9 @@ class 
ReplicaFetcherThreadTest { stub(replica, partition, replicaManager) //Expectations - expect(partition.truncateTo(anyLong(), anyBoolean())).once + expect(partition.truncateTo(anyLong(), anyBoolean())).times(3) - replay(replicaManager, logManager, quota, replica) + replay(replicaManager, logManager, quota, replica, partition) //Define the offsets for the OffsetsForLeaderEpochResponse val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), @@ -227,9 +227,9 @@ class ReplicaFetcherThreadTest { stub(replica, partition, replicaManager) //Expectations - expect(partition.truncateTo(anyLong(), anyBoolean())).once + expect(partition.truncateTo(anyLong(), anyBoolean())).times(2) - replay(replicaManager, logManager, replica) + replay(replicaManager, logManager, replica, partition) //Define the offsets for the OffsetsForLeaderEpochResponse val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava @@ -609,6 +609,7 @@ class ReplicaFetcherThreadTest { val leaderEpoch = 4 //Stub return values + expect(partition.truncateTo(0L, false)).times(2) expect(replica.logEndOffset).andReturn(0).anyTimes() expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes() expect(replica.latestEpoch).andReturn(Some(leaderEpoch)).anyTimes() @@ -618,7 +619,7 @@ class ReplicaFetcherThreadTest { expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() stub(replica, partition, replicaManager) - replay(replicaManager, logManager, quota, replica) + replay(replicaManager, logManager, quota, replica, partition) //Define the offsets for the OffsetsForLeaderEpochResponse val offsetsReply = Map( @@ -727,15 +728,13 @@ class ReplicaFetcherThreadTest { verify(mockBlockingSend) } - def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager) = { - expect(replicaManager.localReplica(t1p0)).andReturn(Some(replica)).anyTimes() + def stub(replica: Replica, partition: Partition, replicaManager: ReplicaManager): Unit = { expect(replicaManager.localReplicaOrException(t1p0)).andReturn(replica).anyTimes() - expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes() - expect(replicaManager.localReplica(t1p1)).andReturn(Some(replica)).anyTimes() + expect(replicaManager.nonOfflinePartition(t1p0)).andReturn(Some(partition)).anyTimes() expect(replicaManager.localReplicaOrException(t1p1)).andReturn(replica).anyTimes() - expect(replicaManager.getPartition(t1p1)).andReturn(Some(partition)).anyTimes() - expect(replicaManager.localReplica(t2p1)).andReturn(Some(replica)).anyTimes() + expect(replicaManager.nonOfflinePartition(t1p1)).andReturn(Some(partition)).anyTimes() expect(replicaManager.localReplicaOrException(t2p1)).andReturn(replica).anyTimes() - expect(replicaManager.getPartition(t2p1)).andReturn(Some(partition)).anyTimes() + expect(replicaManager.nonOfflinePartition(t2p1)).andReturn(Some(partition)).anyTimes() + expect(partition.localReplicaOrException).andReturn(replica).anyTimes() } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 5b2f2aed77bef..c2d92df458145 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -237,7 +237,7 @@ class ReplicaManagerQuotasTest { //create the two replicas for ((p, _) <- fetchInfo) { - val partition = replicaManager.getOrCreatePartition(p) + val 
partition = replicaManager.createPartition(p) val leaderReplica = new Replica(configs.head.brokerId, p, time, 0, Some(log)) leaderReplica.highWatermark = new LogOffsetMetadata(5) partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1c1cbd6ca90d3..b7239e095395d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -26,6 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager} import kafka.utils.{MockScheduler, MockTime, TestUtils} import TestUtils.createBroker import kafka.cluster.BrokerEndPoint +import kafka.server.checkpoints.SimpleOffsetCheckpoints import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.timer.MockTimer import kafka.zk.KafkaZkClient @@ -85,8 +86,8 @@ class ReplicaManagerTest { new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) try { - val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1)) - partition.getOrCreateReplica(1) + val partition = rm.createPartition(new TopicPartition(topic, 1)) + partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints)) rm.checkpointHighWatermarks() } finally { // shutdown the replica manager upon test completion @@ -104,8 +105,8 @@ class ReplicaManagerTest { new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats, new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) try { - val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1)) - partition.getOrCreateReplica(1) + val partition = rm.createPartition(new TopicPartition(topic, 1)) + partition.getOrCreateReplica(1, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints)) rm.checkpointHighWatermarks() } finally { // shutdown the replica manager upon test completion @@ -158,8 +159,8 @@ class ReplicaManagerTest { try { val brokerList = Seq[Integer](0, 1).asJava - val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0)) - partition.getOrCreateReplica(0) + val partition = rm.createPartition(new TopicPartition(topic, 0)) + partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints)) // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, collection.immutable.Map(new TopicPartition(topic, 0) -> @@ -202,8 +203,8 @@ class ReplicaManagerTest { try { val brokerList = Seq[Integer](0, 1).asJava - val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0)) - partition.getOrCreateReplica(0) + val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) + partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) // Make this replica the leader. 
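// ------------------------------------------------------------------------------------------------
// Aside, not part of this patch: these tests now thread an OffsetCheckpoints value through
// createPartition / getOrCreateReplica instead of letting Partition reach back into ReplicaManager
// for the high-watermark checkpoint files. Since OffsetCheckpoints (added in OffsetCheckpointFile.scala
// earlier in this diff) is a single-method trait, a test that wants a fixed starting high watermark
// could also pass a tiny in-memory stub like the sketch below instead of SimpleOffsetCheckpoints or a
// Mockito mock. InMemoryOffsetCheckpoints is a hypothetical helper name, not something this patch adds.
import kafka.server.checkpoints.OffsetCheckpoints
import org.apache.kafka.common.TopicPartition

class InMemoryOffsetCheckpoints(offsets: Map[TopicPartition, Long]) extends OffsetCheckpoints {
  // The log directory argument is ignored here; real checkpoints are kept per log dir.
  override def fetch(logDir: String, topicPartition: TopicPartition): Option[Long] =
    offsets.get(topicPartition)
}

// Usage sketch: seed the replica's high watermark at offset 4 without touching checkpoint files.
// val checkpoints = new InMemoryOffsetCheckpoints(Map(new TopicPartition(topic, 0) -> 4L))
// partition.getOrCreateReplica(brokerId, isNew = false, checkpoints)
// ------------------------------------------------------------------------------------------------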
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -253,8 +254,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
 
-      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -349,8 +350,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
 
-      val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -415,8 +416,8 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
-      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
-      partition.getOrCreateReplica(0)
+      val partition = rm.createPartition(new TopicPartition(topic, 0))
+      partition.getOrCreateReplica(0, isNew = false, new SimpleOffsetCheckpoints(rm.highWatermarkCheckpoints))
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -465,8 +466,9 @@ class ReplicaManagerTest {
     // Create 2 partitions, assign replica 0 as the leader for both a different follower (1 and 2) for each
     val tp0 = new TopicPartition(topic, 0)
     val tp1 = new TopicPartition(topic, 1)
-    replicaManager.getOrCreatePartition(tp0).getOrCreateReplica(0)
-    replicaManager.getOrCreatePartition(tp1).getOrCreateReplica(0)
+    val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
+    replicaManager.createPartition(tp1).getOrCreateReplica(0, isNew = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
     val partition1Replicas = Seq[Integer](0, 2).asJava
     val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -556,11 +558,12 @@ class ReplicaManagerTest {
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
-    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
-    partition.getOrCreateReplica(followerBrokerId)
+    val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
+    val offsetCheckpoints = new SimpleOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    partition.getOrCreateReplica(followerBrokerId, isNew = false, offsetCheckpoints)
     partition.makeFollower(controllerId,
       leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
-      correlationId)
+      correlationId, offsetCheckpoints)
 
     // Make local partition a follower - because epoch increased by more than 1, truncation should
     // trigger even though leader does not change
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 94f9a16341323..41c6b3e2ae257 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -117,7 +117,7 @@ class SimpleFetchTest {
       new MetadataCache(configs.head.brokerId), new LogDirFailureChannel(configs.head.logDirs.size))
 
     // add the partition with two replicas, both in ISR
-    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
+    val partition = replicaManager.createPartition(new TopicPartition(topic, partitionId))
 
     // create the leader replica with the local log
     val leaderReplica = new Replica(configs.head.brokerId, partition.topicPartition, time, 0, Some(log))
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index ac6dedcd3324c..834954144cd3b 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -444,7 +444,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
 
   private def awaitISR(tp: TopicPartition): Unit = {
     TestUtils.waitUntilTrue(() => {
-      leader.replicaManager.getPartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
+      leader.replicaManager.nonOfflinePartition(tp).get.inSyncReplicas.map(_.brokerId).size == 2
     }, "Timed out waiting for replicas to join ISR")
   }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 3d3b3421546db..eba4167edf1f1 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -57,7 +57,7 @@ class OffsetsForLeaderEpochTest {
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
       QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
-    val partition = replicaManager.getOrCreatePartition(tp)
+    val partition = replicaManager.createPartition(tp)
     val leaderReplica = new Replica(config.brokerId, partition.topicPartition, time, 0, Some(mockLog))
     partition.addReplicaIfNotExists(leaderReplica)
     partition.leaderReplicaIdOpt = Some(config.brokerId)
@@ -79,7 +79,7 @@ class OffsetsForLeaderEpochTest {
     val replicaManager = new ReplicaManager(config, metrics, time, null, null, logManager, new AtomicBoolean(false),
       QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
-    replicaManager.getOrCreatePartition(tp)
+    replicaManager.createPartition(tp)
 
     //Given
     val epochRequested: Integer = 5
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 59ee426a4c54f..c7f5c24f14c7d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -822,14 +822,14 @@ object TestUtils extends Logging {
   }
 
   def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
-    server.replicaManager.getPartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
+    server.replicaManager.nonOfflinePartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
   }
 
   def findLeaderEpoch(brokerId: Int,
                       topicPartition: TopicPartition,
                       servers: Iterable[KafkaServer]): Int = {
     val leaderServer = servers.find(_.config.brokerId == brokerId)
-    val leaderPartition = leaderServer.flatMap(_.replicaManager.getPartition(topicPartition))
+    val leaderPartition = leaderServer.flatMap(_.replicaManager.nonOfflinePartition(topicPartition))
       .getOrElse(fail(s"Failed to find expected replica on broker $brokerId"))
     leaderPartition.getLeaderEpoch
   }
@@ -837,7 +837,7 @@ object TestUtils extends Logging {
   def findFollowerId(topicPartition: TopicPartition,
                      servers: Iterable[KafkaServer]): Int = {
     val followerOpt = servers.find { server =>
-      server.replicaManager.getPartition(topicPartition) match {
+      server.replicaManager.nonOfflinePartition(topicPartition) match {
        case Some(partition) => !partition.leaderReplicaIdOpt.contains(server.config.brokerId)
        case None => false
      }
@@ -903,7 +903,7 @@ object TestUtils extends Logging {
     def newLeaderExists: Option[Int] = {
       servers.find { server =>
         server.config.brokerId != oldLeader &&
-          server.replicaManager.getPartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+          server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
       }.map(_.config.brokerId)
     }
 
@@ -918,7 +918,7 @@ object TestUtils extends Logging {
                             timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
     def leaderIfExists: Option[Int] = {
       servers.find { server =>
-        server.replicaManager.getPartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+        server.replicaManager.nonOfflinePartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
       }.map(_.config.brokerId)
     }
 
@@ -1056,7 +1056,7 @@ object TestUtils extends Logging {
       "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     TestUtils.waitUntilTrue(() =>
-      servers.forall(server => topicPartitions.forall(tp => server.replicaManager.getPartition(tp).isEmpty)),
+      servers.forall(server => topicPartitions.forall(tp => server.replicaManager.nonOfflinePartition(tp).isEmpty)),
       "Replica manager's should have deleted all of this topic's partitions")
     // ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper
     assertTrue("Replica logs not deleted after delete topic is complete",