Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: gengjun-git <[email protected]>
  • Loading branch information
gengjun-git committed Mar 6, 2024
1 parent ac2892d commit 67c14d6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,16 @@ private void finishCreateReplica(AgentTask task, TFinishTaskRequest request) {
"finish recover create replica task. set replica to good. tablet {}, replica {}, backend {}",
tabletId, task.getBackendId(), replica.getId());
}
} else {
LOG.warn("tablet_info is not set in finishTaskRequest");
}

// this should be called before 'countDownLatch()'
Catalog.getCurrentSystemInfo()
.updateBackendReportVersion(task.getBackendId(), request.getReport_version(), task.getDbId());

createReplicaTask.countDownLatch(task.getBackendId(), task.getSignature());
LOG.debug("finish create replica. tablet id: {}, be: {}, report version: {}",
LOG.info("finish create replica. tablet id: {}, be: {}, report version: {}",
tabletId, task.getBackendId(), request.getReport_version());
}
} finally {
Expand Down
23 changes: 21 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/master/ReportHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.ColocateTableIndex;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.DiskInfo;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.LocalTablet.TabletStatus;
import com.starrocks.catalog.MaterializedIndex;
Expand Down Expand Up @@ -601,6 +602,10 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
Catalog catalog = Catalog.getCurrentCatalog();
Map<Long, DiskInfo> hashToDiskInfo = new HashMap<>();
for (DiskInfo diskInfo : Catalog.getCurrentSystemInfo().getBackend(backendId).getDisks().values()) {
hashToDiskInfo.put(diskInfo.getPathHash(), diskInfo);
}
final long MAX_DB_WLOCK_HOLDING_TIME_MS = 1000L;
DB_TRAVERSE:
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Expand Down Expand Up @@ -675,8 +680,22 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta

long currentBackendReportVersion =
Catalog.getCurrentSystemInfo().getBackendReportVersion(backendId);
if (backendReportVersion < currentBackendReportVersion) {
continue;
DiskInfo diskInfo = hashToDiskInfo.get(replica.getPathHash());

// Only check reportVersion when the disk is online,
// as there will be no tablet changes on an unavailable disk
if (diskInfo != null
&& diskInfo.getState() == DiskInfo.DiskState.ONLINE
&& backendReportVersion < currentBackendReportVersion) {
LOG.warn("report Version from be: {} is outdated, report version in request: {}, " +
"latest report version: {}", backendId, backendReportVersion, currentBackendReportVersion);
break DB_TRAVERSE;
} else if (diskInfo == null) {
LOG.warn("disk of path hash {} dose not exist, delete tablet {} on backend {} from meta",
tableId, backendId, replica.getPathHash());
} else if (diskInfo.getState() != DiskInfo.DiskState.ONLINE) {
LOG.warn("disk of path hash {} not available, delete tablet {} on backend {} from meta",
tableId, backendId, replica.getPathHash());
}

ReplicaState state = replica.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ public void updateBackendReportVersion(long backendId, long newReportVersion, lo
if (db != null) {
db.readLock();
try {
atomicLong.set(newReportVersion);
updateReportVersionIncrementally(atomicLong, newReportVersion);
LOG.debug("update backend {} report version: {}, db: {}", backendId, newReportVersion, dbId);
} finally {
db.readUnlock();
Expand All @@ -948,6 +948,12 @@ public void updateBackendReportVersion(long backendId, long newReportVersion, lo
}
}

protected synchronized void updateReportVersionIncrementally(AtomicLong currentVersion, long newVersion) {
if (currentVersion.get() < newVersion) {
currentVersion.set(newVersion);
}
}

public long saveBackends(DataOutputStream dos, long checksum) throws IOException {
ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
int backendCount = idToBackend.size();
Expand Down

0 comments on commit 67c14d6

Please sign in to comment.