diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
index d8956bfbe..f8100cde8 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java
@@ -833,7 +833,9 @@ public synchronized void executeProposals(Collection proposal
       startExecution(loadMonitor, null, removedBrokers, replicationThrottle, isTriggeredByUserRequest);
     } catch (Exception e) {
       if (e instanceof OngoingExecutionException) {
-        LOG.info("Broker removal operation with uuid {} aborted due to ongoing execution", uuid);
+        LOG.info("User task {}: Broker removal operation aborted due to ongoing execution", uuid);
+      } else {
+        LOG.error("User task {}: Broker removal operation failed due to exception", uuid, e);
       }
       processExecuteProposalsFailure();
       throw e;
@@ -1141,13 +1143,14 @@ public synchronized void failGeneratingProposalsForExecution(String uuid) {
    */
   public synchronized void userTriggeredStopExecution(boolean stopExternalAgent) {
     if (stopExecution()) {
-      LOG.info("User requested to stop the ongoing proposal execution.");
+      LOG.info("User task {}: User requested to stop the ongoing proposal execution.", _uuid);
       _numExecutionStoppedByUser.incrementAndGet();
       _executionStoppedByUser.set(true);
     }
     if (stopExternalAgent) {
       if (maybeStopExternalAgent()) {
-        LOG.info("The request to stop ongoing external agent partition reassignment is submitted successfully.");
+        LOG.info("User task {}: The request to stop ongoing external agent partition reassignment is submitted successfully.",
+                 _uuid);
       }
     }
   }
@@ -1321,7 +1324,7 @@ private class ProposalExecutionRunnable implements Runnable {
         _noOngoingExecutionSemaphore.release();
         _stopSignal.set(NO_STOP_EXECUTION);
         _executionStoppedByUser.set(false);
-        LOG.error("Failed to initialize proposal execution.");
+        LOG.error("User task {}: Failed to initialize proposal execution.", _uuid);
         throw new IllegalStateException("User task manager cannot be null.");
       }
       if (_demotedBrokers != null) {
@@ -1357,32 +1360,31 @@ private class ProposalExecutionRunnable implements Runnable {
     }
 
     public void run() {
-      LOG.info("Starting executing balancing proposals.");
+      LOG.info("User task {}: Starting executing balancing proposals.", _uuid);
       final long start = System.currentTimeMillis();
       try {
         UserTaskManager.UserTaskInfo userTaskInfo = initExecution();
         execute(userTaskInfo);
       } catch (Exception e) {
-        LOG.error("ProposalExecutionRunnable got exception during run", e);
+        LOG.error("User task {}: ProposalExecutionRunnable got exception during run", _uuid, e);
       } finally {
         final long duration = System.currentTimeMillis() - start;
         _proposalExecutionTimer.update(duration, TimeUnit.MILLISECONDS);
         if (_executionTimerInvolveBrokerRemovalOrDemotion != null) {
           _executionTimerInvolveBrokerRemovalOrDemotion.update(duration, TimeUnit.MILLISECONDS);
         }
-
-        String executionStatusString = String.format("task Id: %s; removed brokers: %s; demoted brokers: %s; total time used: %dms.",
-                                                     _uuid,
+        String executionStatusString = String.format("removed brokers: %s; demoted brokers: %s; total time used: %dms.",
                                                      _removedBrokers,
                                                      _demotedBrokers,
                                                      duration
         );
-
         if (_executionException != null) {
-          LOG.info("Execution failed: {}. Exception: {}", executionStatusString, _executionException.getMessage());
+          LOG.info("User task {}: Execution failed: {}. Exception: {}", _uuid, executionStatusString, _executionException.getMessage());
         } else {
-          LOG.info("Execution succeeded: {}. ", executionStatusString);
+          LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
         }
+        // Clear completed execution.
+        clearCompletedExecution();
       }
     }
 
@@ -1490,12 +1492,10 @@ private void execute(UserTaskManager.UserTaskInfo userTaskInfo) {
           updateOngoingExecutionState();
         }
       } catch (Throwable t) {
-        LOG.error("Executor got exception during execution", t);
+        LOG.error("User task {}: Executor got exception during execution", _uuid, t);
        _executionException = t;
       } finally {
         notifyFinishedTask(userTaskInfo);
-        // Clear completed execution.
-        clearCompletedExecution();
       }
     }
 
@@ -1610,14 +1610,14 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
       int numTotalPartitionMovements = _executionTaskManager.numRemainingInterBrokerPartitionMovements();
       long totalDataToMoveInMB = _executionTaskManager.remainingInterBrokerDataToMoveInMB();
       long startTime = System.currentTimeMillis();
-      LOG.info("Starting {} inter-broker partition movements.", numTotalPartitionMovements);
+      LOG.info("User task {}: Starting {} inter-broker partition movements.", _uuid, numTotalPartitionMovements);
 
       int partitionsToMove = numTotalPartitionMovements;
       // Exhaust all the pending partition movements.
       while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
         // Get tasks to execute.
         List<ExecutionTask> tasksToExecute = _executionTaskManager.getInterBrokerReplicaMovementTasks();
-        LOG.info("Executor will execute {} task(s)", tasksToExecute.size());
+        LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
 
         AlterPartitionReassignmentsResult result = null;
         if (!tasksToExecute.isEmpty()) {
@@ -1632,7 +1632,8 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
         int numFinishedPartitionMovements = _executionTaskManager.numFinishedInterBrokerPartitionMovements();
         long finishedDataMovementInMB = _executionTaskManager.finishedInterBrokerDataMovementInMB();
         updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataMovementInMB, System.currentTimeMillis() - startTime);
-        LOG.info("{}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
+        LOG.info("User task {}: {}/{} ({}%) inter-broker partition movements completed. {}/{} ({}%) MB have been moved.",
+                 _uuid,
                  numFinishedPartitionMovements, numTotalPartitionMovements,
                  String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
                  finishedDataMovementInMB, totalDataToMoveInMB,
@@ -1653,13 +1654,15 @@ private void interBrokerMoveReplicas() throws InterruptedException, ExecutionExc
 
       // At this point it is guaranteed that there are no in execution tasks to wait -- i.e. all tasks are completed or dead.
       if (_stopSignal.get() == NO_STOP_EXECUTION) {
-        LOG.info("Inter-broker partition movements finished.");
+        LOG.info("User task {}: Inter-broker partition movements finished", _uuid);
       } else {
         ExecutionTasksSummary executionTasksSummary = _executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
         Map<ExecutionTaskState, Integer> partitionMovementTasksByState = executionTasksSummary.taskStat().get(INTER_BROKER_REPLICA_ACTION);
-        LOG.info("Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, {} tasks in-progress, "
+        LOG.info("User task {}: Inter-broker partition movements stopped. For inter-broker partition movements {} tasks cancelled, "
+                 + "{} tasks in-progress, "
                  + "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for intra-broker "
                  + "partition movement {} tasks cancelled; for leadership movements {} tasks cancelled.",
+                 _uuid,
                  partitionMovementTasksByState.get(ExecutionTaskState.PENDING),
                  partitionMovementTasksByState.get(ExecutionTaskState.IN_PROGRESS),
                  partitionMovementTasksByState.get(ExecutionTaskState.ABORTING),
@@ -1676,14 +1679,14 @@ private void intraBrokerMoveReplicas() {
       int numTotalPartitionMovements = _executionTaskManager.numRemainingIntraBrokerPartitionMovements();
       long totalDataToMoveInMB = _executionTaskManager.remainingIntraBrokerDataToMoveInMB();
       long startTime = System.currentTimeMillis();
-      LOG.info("Starting {} intra-broker partition movements.", numTotalPartitionMovements);
+      LOG.info("User task {}: Starting {} intra-broker partition movements.", _uuid, numTotalPartitionMovements);
 
       int partitionsToMove = numTotalPartitionMovements;
       // Exhaust all the pending partition movements.
       while ((partitionsToMove > 0 || !inExecutionTasks().isEmpty()) && _stopSignal.get() == NO_STOP_EXECUTION) {
         // Get tasks to execute.
         List<ExecutionTask> tasksToExecute = _executionTaskManager.getIntraBrokerReplicaMovementTasks();
-        LOG.info("Executor will execute {} task(s)", tasksToExecute.size());
+        LOG.info("User task {}: Executor will execute {} task(s)", _uuid, tasksToExecute.size());
 
         if (!tasksToExecute.isEmpty()) {
           // Execute the tasks.
@@ -1696,7 +1699,8 @@ private void intraBrokerMoveReplicas() {
         int numFinishedPartitionMovements = _executionTaskManager.numFinishedIntraBrokerPartitionMovements();
         long finishedDataToMoveInMB = _executionTaskManager.finishedIntraBrokerDataToMoveInMB();
         updatePartitionMovementMetrics(numFinishedPartitionMovements, finishedDataToMoveInMB, System.currentTimeMillis() - startTime);
-        LOG.info("{}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.",
+        LOG.info("User task {}: {}/{} ({}%) intra-broker partition movements completed. {}/{} ({}%) MB have been moved.",
+                 _uuid,
                  numFinishedPartitionMovements, numTotalPartitionMovements,
                  String.format("%.2f", numFinishedPartitionMovements * UNIT_INTERVAL_TO_PERCENTAGE / numTotalPartitionMovements),
                  finishedDataToMoveInMB, totalDataToMoveInMB,
@@ -1705,19 +1709,21 @@ private void intraBrokerMoveReplicas() {
       }
       Set<ExecutionTask> inExecutionTasks = inExecutionTasks();
       while (!inExecutionTasks.isEmpty()) {
-        LOG.info("Waiting for {} tasks moving {} MB to finish", inExecutionTasks.size(),
+        LOG.info("User task {}: Waiting for {} tasks moving {} MB to finish", _uuid, inExecutionTasks.size(),
                  _executionTaskManager.inExecutionIntraBrokerDataMovementInMB());
         waitForIntraBrokerReplicaTasksToFinish();
         inExecutionTasks = inExecutionTasks();
       }
       if (inExecutionTasks().isEmpty()) {
-        LOG.info("Intra-broker partition movements finished.");
+        LOG.info("User task {}: Intra-broker partition movements finished.", _uuid);
       } else if (_stopSignal.get() != NO_STOP_EXECUTION) {
         ExecutionTasksSummary executionTasksSummary = _executionTaskManager.getExecutionTasksSummary(Collections.emptySet());
         Map<ExecutionTaskState, Integer> partitionMovementTasksByState = executionTasksSummary.taskStat().get(INTRA_BROKER_REPLICA_ACTION);
-        LOG.info("Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, {} tasks in-progress, "
+        LOG.info("User task {}: Intra-broker partition movements stopped. For intra-broker partition movements {} tasks cancelled, "
+                 + "{} tasks in-progress, "
                  + "{} tasks aborting, {} tasks aborted, {} tasks dead, {} tasks completed, {} remaining data to move; for leadership "
                  + "movements {} tasks cancelled.",
+                 _uuid,
                  partitionMovementTasksByState.get(ExecutionTaskState.PENDING),
                  partitionMovementTasksByState.get(ExecutionTaskState.IN_PROGRESS),
                  partitionMovementTasksByState.get(ExecutionTaskState.ABORTING),
@@ -1734,12 +1740,12 @@ private void intraBrokerMoveReplicas() {
      */
     private void moveLeaderships() {
       int numTotalLeadershipMovements = _executionTaskManager.numRemainingLeadershipMovements();
-      LOG.info("Starting {} leadership movements.", numTotalLeadershipMovements);
+      LOG.info("User task {}: Starting {} leadership movements.", _uuid, numTotalLeadershipMovements);
       int numFinishedLeadershipMovements = 0;
       while (_executionTaskManager.numRemainingLeadershipMovements() != 0 && _stopSignal.get() == NO_STOP_EXECUTION) {
         updateOngoingExecutionState();
         numFinishedLeadershipMovements += moveLeadershipInBatch();
-        LOG.info("{}/{} ({}%) leadership movements completed.", numFinishedLeadershipMovements,
+        LOG.info("User task {}: {}/{} ({}%) leadership movements completed.", _uuid, numFinishedLeadershipMovements,
                  numTotalLeadershipMovements, numFinishedLeadershipMovements * 100 / numTotalLeadershipMovements);
       }
       if (inExecutionTasks().isEmpty()) {
@@ -1747,8 +1753,9 @@ private void moveLeaderships() {
       } else if (_stopSignal.get() != NO_STOP_EXECUTION) {
         Map<ExecutionTaskState, Integer> leadershipMovementTasksByState =
             _executionTaskManager.getExecutionTasksSummary(Collections.emptySet()).taskStat().get(LEADER_ACTION);
-        LOG.info("Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, "
+        LOG.info("User task {}: Leadership movements stopped. {} tasks cancelled, {} tasks in-progress, {} tasks aborting, {} tasks aborted, "
                  + "{} tasks dead, {} tasks completed.",
+                 _uuid,
                  leadershipMovementTasksByState.get(ExecutionTaskState.PENDING),
                  leadershipMovementTasksByState.get(ExecutionTaskState.IN_PROGRESS),
                  leadershipMovementTasksByState.get(ExecutionTaskState.ABORTING),
@@ -1882,7 +1889,7 @@ private List waitForInterBrokerReplicaTasksToFinish(AlterPartitio
         }
       } while (retry);
 
-      LOG.info("Finished tasks: {}.{}{}{}", finishedTasks,
+      LOG.info("User task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks,
               stoppedTaskIds.isEmpty() ? "" : String.format(". [Stopped: %s]", stoppedTaskIds),
               deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds),
               deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds));
@@ -1957,7 +1964,7 @@ private void waitForLeadershipTasksToFinish(ElectLeadersResult result) {
         }
       } while (retry);
 
-      LOG.info("Finished tasks: {}.{}{}{}", finishedTasks,
+      LOG.info("User task {}: Finished tasks: {}.{}{}{}", _uuid, finishedTasks,
              stoppedTaskIds.isEmpty() ? "" : String.format(". [Stopped: %s]", stoppedTaskIds),
              deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds),
              deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds));
@@ -2000,7 +2007,7 @@ private void waitForIntraBrokerReplicaTasksToFinish() {
         updateOngoingExecutionState();
       } while (!inExecutionTasks().isEmpty() && finishedTasks.isEmpty());
 
-      LOG.info("Finished tasks: {}.{}{}", finishedTasks,
+      LOG.info("User task {}: Finished tasks: {}.{}{}", _uuid, finishedTasks,
              deletedTaskIds.isEmpty() ? "" : String.format(". [Deleted: %s]", deletedTaskIds),
              deadTaskIds.isEmpty() ? "" : String.format(". [Dead: %s]", deadTaskIds));
     }
@@ -2044,7 +2051,7 @@ private void handleDeadInterBrokerReplicaTasks(List deadInterBrok
 
       if (_stopSignal.get() == NO_STOP_EXECUTION) {
         // If there are dead tasks, Cruise Control stops the execution.
-        LOG.info("Stop the execution due to {} dead tasks: {}.", tasksToCancel.size(), tasksToCancel);
+        LOG.info("User task {}: Stop the execution due to {} dead tasks: {}.", _uuid, tasksToCancel.size(), tasksToCancel);
         stopExecution();
       }
 
@@ -2063,7 +2070,8 @@ private void handleDeadInterBrokerReplicaTasks(List deadInterBrok
           break;
         }
         try {
-          LOG.info("Waiting for the rollback of ongoing inter-broker replica reassignments for {}.", intersection);
+          LOG.info("User task {}: Waiting for the rollback of ongoing inter-broker replica reassignments for {}.",
+                   _uuid, intersection);
           Thread.sleep(executionProgressCheckIntervalMs());
         } catch (InterruptedException e) {
           // let it go
@@ -2112,11 +2120,12 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
         case LEADER_ACTION:
           if (cluster.nodeById(task.proposal().newLeader().brokerId()) == null) {
             _executionTaskManager.markTaskDead(task);
-            LOG.warn("Killing execution for task {} because the target leader is down.", task);
+            LOG.warn("User task {}: Killing execution for task {} because the target leader is down.", _uuid, task);
             return true;
           } else if (_time.milliseconds() > task.startTimeMs() + _leaderMovementTimeoutMs) {
             _executionTaskManager.markTaskDead(task);
-            LOG.warn("Killing execution for task {} because it took longer than {} to finish.", task, _leaderMovementTimeoutMs);
+            LOG.warn("User task {}: Killing execution for task {} because it took longer than {} to finish.",
+                     _uuid, task, _leaderMovementTimeoutMs);
             return true;
           }
           break;
@@ -2126,7 +2135,7 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
             if (cluster.nodeById(broker.brokerId()) == null
                 || deadInterBrokerReassignments.contains(task.proposal().topicPartition())) {
               _executionTaskManager.markTaskDead(task);
-              LOG.warn("Killing execution for task {} because the new replica {} is down.", task, broker);
+              LOG.warn("User task {}: Killing execution for task {} because the new replica {} is down.", _uuid, task, broker);
               return true;
             }
           }
@@ -2135,7 +2144,7 @@ private boolean maybeMarkTaskAsDead(Cluster cluster,
         case INTRA_BROKER_REPLICA_ACTION:
           if (!logdirInfoByTask.containsKey(task)) {
             _executionTaskManager.markTaskDead(task);
-            LOG.warn("Killing execution for task {} because the destination disk is down.", task);
+            LOG.warn("User task {}: Killing execution for task {} because the destination disk is down.", _uuid, task);
             return true;
           }
           break;
@@ -2165,7 +2174,8 @@ private void maybeReexecuteInterBrokerReplicaTasks(Set deleted,
                                                                     candidateInterBrokerReplicaTasksToReexecute);
       } catch (TimeoutException | InterruptedException | ExecutionException e) {
         // This may indicate transient (e.g. network) issues.
-        LOG.warn("Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.", e);
+        LOG.warn("User task {}: Failed to retrieve partitions being reassigned. Skipping reexecution check for inter-broker replica actions.",
+                 _uuid, e);
         tasksToReexecute = Collections.emptyList();
       }
       if (!tasksToReexecute.isEmpty()) {
@@ -2195,7 +2205,7 @@ private void maybeReexecuteIntraBrokerReplicaTasks() {
         }
       });
       if (!intraBrokerReplicaTasksToReexecute.isEmpty()) {
-        LOG.info("Reexecuting tasks {}", intraBrokerReplicaTasksToReexecute);
+        LOG.info("User task {}: Reexecuting tasks {}", _uuid, intraBrokerReplicaTasksToReexecute);
         executeIntraBrokerReplicaMovements(intraBrokerReplicaTasksToReexecute, _adminClient, _executionTaskManager, _config);
       }
     }
@@ -2210,7 +2220,7 @@ private void maybeReexecuteIntraBrokerReplicaTasks() {
     private void maybeReexecuteLeadershipTasks(Set<TopicPartition> deleted) {
       List<ExecutionTask> leaderActionsToReexecute = new ArrayList<>(_executionTaskManager.inExecutionTasks(Collections.singleton(LEADER_ACTION)));
       if (!leaderActionsToReexecute.isEmpty()) {
-        LOG.info("Reexecuting tasks {}", leaderActionsToReexecute);
+        LOG.info("User task {}: Reexecuting tasks {}", _uuid, leaderActionsToReexecute);
         ElectLeadersResult electLeadersResult = ExecutionUtils.submitPreferredLeaderElection(_adminClient, leaderActionsToReexecute);
         ExecutionUtils.processElectLeadersResult(electLeadersResult, deleted);
       }
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java
index 1a75063e0..66fd1a727 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/GoalBasedOperationRunnable.java
@@ -212,8 +212,8 @@ protected abstract OptimizerResult workWithClusterModel()
   protected void finish() {
     if (_operationProgress != null) {
       long totalTimeMs = _operationProgress.getCurrentTotalExecutionTimeMs();
-      LOG.info("Operation {} finished with uuid {}; total time: {}ms; steps: {}",
-               _operationProgress.getOperation(), _uuid, totalTimeMs, _operationProgress);
+      LOG.info("User task {}: {} finished with total time: {}ms; steps: {}",
+               _uuid, _operationProgress.getOperation(), totalTimeMs, _operationProgress);
     }
     _operationProgress = null;
     _combinedCompletenessRequirements = null;
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java
index 4be5b2fd0..16e05b96e 100644
--- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java
+++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/servlet/handler/async/runnable/RemoveBrokersRunnable.java
@@ -10,6 +10,7 @@
 import com.linkedin.kafka.cruisecontrol.analyzer.OptimizerResult;
 import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
 import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
+import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
 import com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy;
 import com.linkedin.kafka.cruisecontrol.model.Broker;
 import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
@@ -19,7 +20,10 @@
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.OPERATION_LOGGER;
 import static com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils.SELF_HEALING_DESTINATION_BROKER_IDS;
 import static com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils.SELF_HEALING_REPLICA_MOVEMENT_STRATEGY;
 import static com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils.SELF_HEALING_CONCURRENT_MOVEMENTS;
@@ -33,6 +37,8 @@
  * The async runnable for broker decommission.
  */
 public class RemoveBrokersRunnable extends GoalBasedOperationRunnable {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoveBrokersRunnable.class);
+  private static final Logger OPERATION_LOG = LoggerFactory.getLogger(OPERATION_LOGGER);
   protected final Set<Integer> _removedBrokerIds;
   protected final Set<Integer> _destinationBrokerIds;
   protected final boolean _throttleRemovedBrokers;
@@ -95,6 +101,7 @@ protected OptimizationResult getResult() throws Exception {
 
   @Override
   protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlException, TimeoutException, NotEnoughValidWindowsException {
+    long start = System.currentTimeMillis();
     ClusterModel clusterModel = _kafkaCruiseControl.clusterModel(_combinedCompletenessRequirements, _allowCapacityEstimation, _operationProgress);
     sanityCheckBrokersHavingOfflineReplicasOnBadDisks(_goals, clusterModel);
     _removedBrokerIds.forEach(id -> clusterModel.setBrokerState(id, Broker.State.DEAD));
@@ -104,7 +111,6 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept
     if (!_destinationBrokerIds.isEmpty()) {
       _kafkaCruiseControl.sanityCheckBrokerPresence(_destinationBrokerIds);
     }
-
     OptimizationOptions optimizationOptions = computeOptimizationOptions(clusterModel,
                                                                          false,
                                                                          _kafkaCruiseControl,
@@ -116,10 +122,36 @@ protected OptimizerResult workWithClusterModel() throws KafkaCruiseControlExcept
                                                                          _destinationBrokerIds,
                                                                          false,
                                                                          _fastMode);
-
+    LOG.info("User task {}: Optimization options: {}", _uuid, optimizationOptions);
     OptimizerResult result = _kafkaCruiseControl.optimizations(clusterModel, _goalsByPriority, _operationProgress, null, optimizationOptions);
+    long goalProposalGenerationTimeMs = System.currentTimeMillis() - start;
+    LOG.info("User task {}: Time in proposals generation: {}ms; Optimization result: {}", _uuid,
+             goalProposalGenerationTimeMs, result.getProposalSummary());
+    Set<ExecutionProposal> goalProposals = result.goalProposals();
+    OPERATION_LOG.info("User task {}: Goal proposals: {}", _uuid, goalProposals);
     if (!_dryRun) {
-      _kafkaCruiseControl.executeRemoval(result.goalProposals(), _throttleRemovedBrokers, _removedBrokerIds, isKafkaAssignerMode(_goals),
+      LOG.info("User task {}: Execute broker removal. throttleRemovedBrokers={}, "
+               + "removedBrokerIds={}, "
+               + "concurrentInterBrokerPartitionMovements={}, "
+               + "maxInterBrokerPartitionMovements={}, "
+               + "clusterLeaderMovementConcurrency={}, "
+               + "brokerLeaderMovementConcurrency={}, "
+               + "executionProgressCheckIntervalMs={}, "
+               + "replicaMovementStrategy={}, "
+               + "replicationThrottle={}, "
+               + "isTriggeredByUserRequest={}",
+               _uuid,
+               _throttleRemovedBrokers,
+               _removedBrokerIds,
+               _concurrentInterBrokerPartitionMovements,
+               _maxInterBrokerPartitionMovements,
+               _clusterLeaderMovementConcurrency,
+               _brokerLeaderMovementConcurrency,
+               _executionProgressCheckIntervalMs,
+               _replicaMovementStrategy,
+               _replicationThrottle,
+               _isTriggeredByUserRequest);
+      _kafkaCruiseControl.executeRemoval(goalProposals, _throttleRemovedBrokers, _removedBrokerIds, isKafkaAssignerMode(_goals),
                                          _concurrentInterBrokerPartitionMovements, _maxInterBrokerPartitionMovements,
                                          _clusterLeaderMovementConcurrency, _brokerLeaderMovementConcurrency, _executionProgressCheckIntervalMs,
                                          _replicaMovementStrategy, _replicationThrottle,
diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java
index 324659d74..2106f0f8a 100644
--- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java
+++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTest.java
@@ -218,7 +218,7 @@ public void testRemoveBrokerEvent()
     EasyMock.expect(_optimizerResult.statsByGoalName()).andReturn(new LinkedHashMap<>(0)).times(2);
     EasyMock.expect(_optimizerResult.brokerStatsAfterOptimization()).andReturn(_brokerStats).times(2);
     EasyMock.expect(_brokerStats.getJsonStructure()).andReturn(Collections.emptyMap());
-    EasyMock.expect(_optimizerResult.getProposalSummary()).andReturn(null);
+    EasyMock.expect(_optimizerResult.getProposalSummary()).andReturn(null).times(2);
 
     // Replay mocks.
     EasyMock.replay(_mockKafkaCruiseControl, _optimizerResult, _brokerStats);