KAFKA-9102; Increase default zk session timeout and replica max lag [KIP-537] (apache#7596)

This patch increases the default value of `zookeeper.session.timeout.ms` from 6s to 18s and `replica.lag.time.max.ms` from 10s to 30s. The change is documented in KIP-537: https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout.
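For anyone verifying the new behaviour, a minimal sketch (the object and value names below are hypothetical; only the property key, `KafkaConfig.fromProps`, and the two accessors are Kafka's own) of checking that a broker config which sets nothing beyond the required `zookeeper.connect` now picks up the KIP-537 defaults:

```scala
import java.util.Properties
import kafka.server.KafkaConfig

object Kip537DefaultsCheck extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181") // the only property without a default

  val config = KafkaConfig.fromProps(props)
  println(config.zkSessionTimeoutMs)   // expected to print 18000 after this patch
  println(config.replicaLagTimeMaxMs)  // expected to print 30000 after this patch

  // Operators who prefer the previous behaviour can still pin the old values
  // explicitly in server.properties:
  //   zookeeper.session.timeout.ms=6000
  //   replica.lag.time.max.ms=10000
}
```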

Reviewers: Ismael Juma <[email protected]>
hachikuji authored Oct 26, 2019
1 parent c5df208 commit 4bde9bb
Showing 6 changed files with 21 additions and 14 deletions.
config/server.properties (1 addition, 1 deletion)
@@ -123,7 +123,7 @@ log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
+zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################
core/src/main/scala/kafka/cluster/Partition.scala (3 additions, 3 deletions)
@@ -176,7 +176,7 @@ case class SimpleAssignmentState(replicas: Seq[Int]) extends AssignmentState
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
*/
class Partition(val topicPartition: TopicPartition,
-replicaLagTimeMaxMs: Long,
+val replicaLagTimeMaxMs: Long,
interBrokerProtocolVersion: ApiVersion,
localBrokerId: Int,
time: Time,
@@ -864,11 +864,11 @@ class Partition(val topicPartition: TopicPartition,
*/
private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll()

-def maybeShrinkIsr(replicaMaxLagTimeMs: Long): Unit = {
+def maybeShrinkIsr(): Unit = {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
-val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaMaxLagTimeMs)
+val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
if (outOfSyncReplicaIds.nonEmpty) {
val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
assert(newInSyncReplicaIds.nonEmpty)
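The Partition.scala hunks above turn the lag bound into a constructor field (`val replicaLagTimeMaxMs`) and drop the per-call parameter from `maybeShrinkIsr`. A minimal sketch of that pattern, using a hypothetical `IsrTracker` class rather than the real `Partition`:

```scala
// Sketch only: the lag bound is captured once at construction, so callers
// no longer thread config.replicaLagTimeMaxMs through every shrink check.
class IsrTracker(val replicaLagTimeMaxMs: Long, nowMs: () => Long) {
  // replica id -> last time the replica was fully caught up with the leader
  private var lastCaughtUpTimeMs = Map.empty[Int, Long]

  def recordCaughtUp(replicaId: Int): Unit =
    lastCaughtUpTimeMs += replicaId -> nowMs()

  // A replica is out of sync once it has not caught up within the bound.
  def outOfSyncReplicas(): Set[Int] =
    lastCaughtUpTimeMs.collect {
      case (id, ts) if nowMs() - ts > replicaLagTimeMaxMs => id
    }.toSet
}
```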
core/src/main/scala/kafka/server/KafkaConfig.scala (2 additions, 2 deletions)
@@ -46,7 +46,7 @@ import scala.collection.{Map, Seq}

object Defaults {
/** ********* Zookeeper Configuration ***********/
-val ZkSessionTimeoutMs = 6000
+val ZkSessionTimeoutMs = 18000
val ZkSyncTimeMs = 2000
val ZkEnableSecureAcls = false
val ZkMaxInFlightRequests = 10
@@ -128,7 +128,7 @@ object Defaults {
val ControllerSocketTimeoutMs = RequestTimeoutMs
val ControllerMessageQueueSize = Int.MaxValue
val DefaultReplicationFactor = 1
-val ReplicaLagTimeMaxMs = 10000L
+val ReplicaLagTimeMaxMs = 30000L
val ReplicaSocketTimeoutMs = 30 * 1000
val ReplicaSocketReceiveBufferBytes = 64 * 1024
val ReplicaFetchMaxBytes = 1024 * 1024
core/src/main/scala/kafka/server/ReplicaManager.scala (1 addition, 1 deletion)
@@ -1550,7 +1550,7 @@ class ReplicaManager(val config: KafkaConfig,

// Shrink ISRs for non offline partitions
allPartitions.keys.foreach { topicPartition =>
-nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr())
}
}

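The ReplicaManager hunk above only stops threading the config value into `maybeShrinkIsr`; the shrink check itself runs on a periodic schedule. A sketch of how such a task could be wired with `kafka.utils.KafkaScheduler` (the task name, period, and `checkIsr` body are illustrative assumptions, not the verbatim ReplicaManager code):

```scala
import kafka.utils.KafkaScheduler

object IsrExpirationSketch {
  // Assumed placeholder for the loop shown in the hunk above: walk the
  // non-offline partitions and let each apply its own replicaLagTimeMaxMs.
  private def checkIsr(): Unit = ()

  def main(args: Array[String]): Unit = {
    val scheduler = new KafkaScheduler(threads = 1)
    scheduler.startup()
    // Run the check at half the (new) 30s lag bound; the exact period here
    // is an assumption for illustration.
    scheduler.schedule("isr-expiration", () => checkIsr(), period = 15000L)
  }
}
```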
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala (7 additions, 7 deletions)
@@ -946,7 +946,7 @@ class PartitionTest extends AbstractPartitionTest {
// Invoke some operation that acquires leaderIsrUpdate write lock on one thread
executor.submit(CoreUtils.runnable {
while (!done.get) {
-partitions.foreach(_.maybeShrinkIsr(10000))
+partitions.foreach(_.maybeShrinkIsr())
}
})
// Append records to partitions, one partition-per-thread
@@ -1240,19 +1240,19 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)

// On initialization, the replica is considered caught up and should not be removed
-partition.maybeShrinkIsr(10000)
+partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)

// If enough time passes without a fetch update, the ISR should shrink
-time.sleep(10001)
+time.sleep(partition.replicaLagTimeMaxMs + 1)
val updatedLeaderAndIsr = LeaderAndIsr(
leader = brokerId,
leaderEpoch = leaderEpoch,
isr = List(brokerId),
zkVersion = 1)
when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))

-partition.maybeShrinkIsr(10000)
+partition.maybeShrinkIsr()
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(10L, partition.localLogOrException.highWatermark)
}
@@ -1325,7 +1325,7 @@ class PartitionTest extends AbstractPartitionTest {

// The ISR should not be shrunk because the follower has caught up with the leader at the
// time of the first fetch.
-partition.maybeShrinkIsr(10000)
+partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
}

@@ -1383,7 +1383,7 @@ class PartitionTest extends AbstractPartitionTest {
time.sleep(10001)

// The ISR should not be shrunk because the follower is caught up to the leader's log end
-partition.maybeShrinkIsr(10000)
+partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
}

@@ -1434,7 +1434,7 @@ class PartitionTest extends AbstractPartitionTest {
zkVersion = 1)
when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(None)

-partition.maybeShrinkIsr(10000)
+partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
assertEquals(0L, partition.localLogOrException.highWatermark)
}
docs/upgrade.html (7 additions, 0 deletions)
@@ -19,6 +19,13 @@

<script id="upgrade-template" type="text/x-handlebars-template">

+<h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2.5.0</a></h5>
+<ul>
+<li>For improved resiliency in typical network environments, the default value of
+<code>zookeeper.session.timeout.ms</code> has been increased from 6s to 18s and
+<code>replica.lag.time.max.ms</code> from 10s to 30s.</li>
+</ul>

<h4><a id="upgrade_2_4_0" href="#upgrade_2_4_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x or 2.3.x to 2.4.0</a></h4>

<p><b>If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
