Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
chiacyu authored Apr 9, 2024
2 parents 87faa4f + f895ab5 commit 9b2c8ab
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 6 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ class BrokerServer(

CoreUtils.swallow(lifecycleManager.close(), this)
CoreUtils.swallow(config.dynamicConfig.clear(), this)
CoreUtils.swallow(clientMetricsManager.close(), this)
sharedServer.stopForBroker()
info("shut down completed")
} catch {
Expand Down
7 changes: 5 additions & 2 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ import org.apache.kafka.common.utils.Utils.formatAddress
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.metadata.properties.MetaProperties
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.{ClientMetricsManager, ControllerRequestCompletionHandler}
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.config.{Defaults, ZkConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.util.timer.SystemTimer
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException
Expand Down Expand Up @@ -2490,7 +2491,9 @@ object TestUtils extends Logging {
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
QuorumTestHarness.ZkClientEventThreadSuffix,
QuorumController.CONTROLLER_THREAD_SUFFIX
QuorumController.CONTROLLER_THREAD_SUFFIX,
ClientMetricsManager.CLIENT_METRICS_REAPER_THREAD_NAME,
SystemTimer.SYSTEM_TIMER_THREAD_PREFIX,
)

def unexpectedThreads: Set[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SystemTimer implements Timer {
public static final String SYSTEM_TIMER_THREAD_PREFIX = "executor-";

// timeout timer
private final ExecutorService taskExecutor;
private final DelayQueue<TimerTaskList> delayQueue;
Expand All @@ -49,7 +51,7 @@ public SystemTimer(
long startMs
) {
this.taskExecutor = Executors.newFixedThreadPool(1,
runnable -> KafkaThread.nonDaemon("executor-" + executorName, runnable));
runnable -> KafkaThread.nonDaemon(SYSTEM_TIMER_THREAD_PREFIX + executorName, runnable));
this.delayQueue = new DelayQueue<>();
this.taskCounter = new AtomicInteger(0);
this.timingWheel = new TimingWheel(
Expand Down Expand Up @@ -110,4 +112,9 @@ public int size() {
public void close() {
taskExecutor.shutdown();
}

// visible for testing
boolean isTerminated() {
return taskExecutor.isTerminated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,9 @@ public void run() {}
reaper.awaitShutdown();
timer.close();
}

// visible for testing
boolean isShutdown() {
return reaper.isShutdownComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -59,4 +60,13 @@ public void testReaper() throws Exception {
timer.close();
}
}

@Test
public void testReaperClose() throws Exception {
Timer timer = Mockito.mock(Timer.class);
SystemTimerReaper timerReaper = new SystemTimerReaper("reaper", timer);
timerReaper.close();
Mockito.verify(timer, Mockito.times(1)).close();
TestUtils.waitForCondition(timerReaper::isShutdown, "reaper not shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.server.util.timer;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -66,7 +67,7 @@ public void run() {
}
}

private Timer timer = null;
private SystemTimer timer = null;

@BeforeEach
public void setup() {
Expand All @@ -76,6 +77,7 @@ public void setup() {
@AfterEach
public void teardown() throws Exception {
timer.close();
TestUtils.waitForCondition(timer::isTerminated, "timer executor not terminated");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
* Handles client telemetry metrics requests/responses, subscriptions and instance information.
*/
public class ClientMetricsManager implements AutoCloseable {
public static final String CLIENT_METRICS_REAPER_THREAD_NAME = "client-metrics-reaper";

private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
Expand Down Expand Up @@ -112,7 +113,7 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie
this.subscriptionMap = new ConcurrentHashMap<>();
this.subscriptionUpdateVersion = new AtomicInteger(0);
this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
this.expirationTimer = new SystemTimerReaper("client-metrics-reaper", new SystemTimer("client-metrics"));
this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics"));
this.clientTelemetryMaxBytes = clientTelemetryMaxBytes;
this.time = time;
this.cacheExpiryMs = cacheExpiryMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForTopics(Admin ad
* @param adminClient The AdminClient to use.
* @param partitions The partitions to get information about.
* @return A map from partitions to broker assignments.
* If any topic can't be found, an exception will be thrown.
* If any topic or partition can't be found, an exception will be thrown.
*/
static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admin adminClient,
Set<TopicPartition> partitions
Expand All @@ -654,6 +654,13 @@ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admi
res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList()));
})
);

if (!res.keySet().equals(partitions)) {
Set<TopicPartition> missingPartitions = new HashSet<>(partitions);
missingPartitions.removeAll(res.keySet());
throw new ExecutionException(new UnknownTopicOrPartitionException("Unable to find partition: " +
missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))));
}
return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -300,6 +301,13 @@ public void testGetReplicaAssignments() throws Exception {

assertEquals(assignments,
getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0)))));

UnknownTopicOrPartitionException exception =
assertInstanceOf(UnknownTopicOrPartitionException.class,
assertThrows(ExecutionException.class,
() -> getReplicaAssignmentForPartitions(adminClient,
new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10))))).getCause());
assertEquals("Unable to find partition: foo-10", exception.getMessage());
}
}

Expand Down

0 comments on commit 9b2c8ab

Please sign in to comment.