Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
xy720 committed Jan 15, 2025
1 parent 63d45ee commit 4d48c00
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ public class Config extends ConfigBase {
"Queue size to store heartbeat task in heartbeat_mgr"})
public static int heartbeat_mgr_blocking_queue_size = 1024;

@ConfField(masterOnly = true, description = {"TabletStatMgr线程数",
"Num of thread to update tablet stat"})
public static int tablet_stat_mgr_threads_num = 16;

@ConfField(masterOnly = true, description = {"Agent任务线程池的线程数",
"Num of thread to handle agent task in agent task thread-pool"})
public static int max_agent_task_threads_num = 4096;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
Expand All @@ -34,7 +37,9 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/*
* TabletStatMgr is for collecting tablet(replica) statistics from backends.
Expand All @@ -43,24 +48,15 @@
public class TabletStatMgr extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class);

private ForkJoinPool taskPool = new ForkJoinPool(Math.max(8, Runtime.getRuntime().availableProcessors()));
private final ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(
Config.tablet_stat_mgr_threads_num, 256, "tablet-stat-mgr", true);

private MarkedCountDownLatch<Long, Backend> updateTabletStatsLatch = null;

public TabletStatMgr() {
super("tablet stat mgr", Config.tablet_stat_update_interval_second * 1000);
}

private ForkJoinPool adjustThreadPool(int backendSize) {
int minimunParallelism = Math.max(8, Runtime.getRuntime().availableProcessors());
int maximunParallelism = 64;
int newParallelism = Math.min(backendSize, maximunParallelism);
newParallelism = Math.max(newParallelism, minimunParallelism);
newParallelism = (newParallelism + 7) / 8 * 8; // Round up to the multiple of 8
if (taskPool == null || taskPool.getParallelism() != newParallelism) {
return new ForkJoinPool(newParallelism);
}
return taskPool;
}

@Override
protected void runAfterCatalogReady() {
ImmutableMap<Long, Backend> backends;
Expand All @@ -71,10 +67,13 @@ protected void runAfterCatalogReady() {
return;
}
long start = System.currentTimeMillis();
taskPool = adjustThreadPool(backends.size());
taskPool.submit(() -> {
// no need to get tablet stat if backend is not alive
backends.values().parallelStream().filter(Backend::isAlive).forEach(backend -> {
// no need to get tablet stat if backend is not alive
List<Backend> aliveBackends = backends.values().stream().filter(Backend::isAlive)
.collect(Collectors.toList());
updateTabletStatsLatch = new MarkedCountDownLatch<>(aliveBackends.size());
aliveBackends.forEach(backend -> {
updateTabletStatsLatch.addMark(backend.getId(), backend);
executor.submit(() -> {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
Expand All @@ -87,8 +86,10 @@ protected void runAfterCatalogReady() {
result.getTabletsStatsSize());
}
updateTabletStat(backend.getId(), result);
updateTabletStatsLatch.markedCountDown(backend.getId(), backend);
ok = true;
} catch (Throwable e) {
updateTabletStatsLatch.markedCountDownWithStatus(backend.getId(), backend, Status.CANCELLED);
LOG.warn("task exec error. backend[{}]", backend.getId(), e);
}

Expand All @@ -102,7 +103,9 @@ protected void runAfterCatalogReady() {
LOG.warn("client pool recyle error. backend[{}]", backend.getId(), e);
}
});
}).join();
});
waitForTabletStatUpdate();

if (LOG.isDebugEnabled()) {
LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
(System.currentTimeMillis() - start));
Expand Down Expand Up @@ -235,6 +238,29 @@ protected void runAfterCatalogReady() {
(System.currentTimeMillis() - start));
}

public void waitForTabletStatUpdate() {
boolean ok = false;
try {
if (!updateTabletStatsLatch.await(60, TimeUnit.SECONDS)) {
LOG.info("waiting {} update tablet stats tasks finish. {}",
updateTabletStatsLatch.getCount(), this);
}
ok = true;
} catch (InterruptedException e) {
LOG.warn("InterruptedException, {}", this, e);
}
if (!ok || !updateTabletStatsLatch.getStatus().ok()) {
List<Long> unfinishedBackendIds = updateTabletStatsLatch.getLeftMarks().stream()
.map(Map.Entry::getKey).collect(Collectors.toList());
Status status = Status.TIMEOUT;
if (!updateTabletStatsLatch.getStatus().ok()) {
status = updateTabletStatsLatch.getStatus();
}
LOG.warn("Failed to update tablet stats reason: {}, unfinished backends: {}",
status.getErrorMsg(), unfinishedBackendIds);
}
}

private void updateTabletStat(Long beId, TTabletStatResult result) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
if (result.isSetTabletStatList()) {
Expand Down

0 comments on commit 4d48c00

Please sign in to comment.