Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

branch-3.0: [fix](cloud) Fix cloud decommission and check wal #47187 #47483

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -183,9 +183,22 @@ private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> expec
} catch (UserException e) {
LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e);
}
be.setDecommissioned(true);
be.setDecommissioning(true);
}
}

if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED) {
// When the synchronization status of the node is "NODE_STATUS_DECOMMISSIONED",
// it indicates that the conditions for decommissioning have
// already been checked in CloudTabletRebalancer.java,
// such as the tablets having been successfully migrated and no remnants of WAL on the backend (BE).
if (!be.isDecommissioned()) {
LOG.warn("impossible status, somewhere has bug, backend: {} status: {}", be, status);
}
be.setDecommissioned(true);
// edit log
Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
}
}
}

Original file line number Diff line number Diff line change
@@ -483,46 +483,53 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
LOG.info("backend {} not found", beId);
continue;
}
if ((backend.isDecommissioned() && tabletNum == 0 && !backend.isActive())
|| (backend.isDecommissioned() && beList.size() == 1)) {
LOG.info("check decommission be {} state {} tabletNum {} isActive {} beList {}",
backend.getId(), backend.isDecommissioned(), tabletNum, backend.isActive(), beList);
if (!beToDecommissionedTime.containsKey(beId)) {
LOG.info("prepare to notify meta service be {} decommissioned", backend.getId());
Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);

Cloud.ClusterPB.Builder clusterBuilder =
Cloud.ClusterPB.newBuilder();
clusterBuilder.setClusterName(backend.getCloudClusterName());
clusterBuilder.setClusterId(backend.getCloudClusterId());
clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);

Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
nodeBuilder.setIp(backend.getHost());
nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);

clusterBuilder.addNodes(nodeBuilder);
builder.setCluster(clusterBuilder);

Cloud.AlterClusterResponse response;
try {
response = MetaServiceProxy.getInstance().alterCluster(builder.build());
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("notify decommission response: {}", response);
}
LOG.info("notify decommission response: {} ", response);
} catch (RpcException e) {
LOG.info("failed to notify decommission", e);
return;
}
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
if (!backend.isDecommissioning()) {
continue;
}
// here check wal
long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
LOG.info("check decommissioning be {} state {} tabletNum {} isActive {} beList {}, wal num {}",
backend.getId(), backend.isDecommissioning(), tabletNum, backend.isActive(), beList, walNum);
if ((tabletNum != 0 || backend.isActive() || walNum != 0) && beList.size() != 1) {
continue;
}
if (beToDecommissionedTime.containsKey(beId)) {
continue;
}
LOG.info("prepare to notify meta service be {} decommissioned", backend.getAddress());
Cloud.AlterClusterRequest.Builder builder =
Cloud.AlterClusterRequest.newBuilder();
builder.setCloudUniqueId(Config.cloud_unique_id);
builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);

Cloud.ClusterPB.Builder clusterBuilder =
Cloud.ClusterPB.newBuilder();
clusterBuilder.setClusterName(backend.getCloudClusterName());
clusterBuilder.setClusterId(backend.getCloudClusterId());
clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);

Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
nodeBuilder.setIp(backend.getHost());
nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);

clusterBuilder.addNodes(nodeBuilder);
builder.setCluster(clusterBuilder);

Cloud.AlterClusterResponse response;
try {
response = MetaServiceProxy.getInstance().alterCluster(builder.build());
if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
LOG.warn("notify decommission response: {}", response);
continue;
}
LOG.info("notify decommission response: {} ", response);
} catch (RpcException e) {
LOG.warn("failed to notify decommission", e);
continue;
}
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
}
}
}
@@ -884,7 +891,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
LOG.info("backend {} not found", be);
continue;
}
if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioned()
if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioning()
&& !backend.isSmoothUpgradeSrc()) {
destBe = be;
minTabletsNum = tabletNum;
@@ -898,7 +905,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
LOG.info("backend {} not found", be);
continue;
}
if (backend.isDecommissioned() && tabletNum > 0) {
if (backend.isDecommissioning() && tabletNum > 0) {
srcBe = be;
srcDecommissioned = true;
break;
@@ -967,7 +974,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
for (Long be : bes) {
long tabletNum = beToTablets.get(be) == null ? 0 : beToTablets.get(be).size();
Backend backend = cloudSystemInfoService.getBackend(be);
if (backend != null && !backend.isDecommissioned()) {
if (backend != null && !backend.isDecommissioning()) {
beNum++;
}
totalTabletsNum += tabletNum;
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.SlidingWindowCounter;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
@@ -127,6 +128,11 @@ private boolean isPreviousWalFinished(long tableId, List<Long> aliveBeIds) {
}

public long getAllWalQueueSize(Backend backend) {
long getAllWalQueueSizeDP = DebugPointUtil.getDebugParamOrDefault("FE.GET_ALL_WAL_QUEUE_SIZE", -1L);
if (getAllWalQueueSizeDP > 0) {
LOG.info("backend id:" + backend.getHost() + ",use dp all wal size:" + getAllWalQueueSizeDP);
return getAllWalQueueSizeDP;
}
PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
.setTableId(-1)
.build();
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
Original file line number Diff line number Diff line change
@@ -97,6 +97,8 @@ public class Backend implements Writable {
@SerializedName("isDecommissioned")
private AtomicBoolean isDecommissioned;

// Flag read by isDecommissioning() and flipped by setDecommissioning(boolean); starts as false.
// NOTE(review): unlike isDecommissioned above, this field has no @SerializedName annotation,
// so it is presumably not persisted with the Backend and resets on reload — confirm this is intended.
private AtomicBoolean isDecommissioning = new AtomicBoolean(false);

// rootPath -> DiskInfo
@SerializedName("disksRef")
private volatile ImmutableMap<String, DiskInfo> disksRef;
@@ -404,6 +406,14 @@ public boolean setDecommissioned(boolean isDecommissioned) {
return false;
}

/**
 * Atomically moves the decommissioning flag to the requested value.
 *
 * <p>The flag is only updated (and the change logged) when it currently holds the
 * opposite value; otherwise nothing happens.
 *
 * @param isDecommissioning the state to switch to
 * @return {@code true} if the flag was changed by this call, {@code false} if it
 *         already held the requested value
 */
public boolean setDecommissioning(boolean isDecommissioning) {
    boolean expectedCurrent = !isDecommissioning;
    boolean changed = this.isDecommissioning.compareAndSet(expectedCurrent, isDecommissioning);
    if (!changed) {
        // Already in the requested state: no transition, no log line.
        return false;
    }
    LOG.warn("{} set decommissioning: {}", this.toString(), isDecommissioning);
    return true;
}

/**
 * Replaces the host stored on this backend.
 *
 * @param host the new host value
 */
public void setHost(String host) {
this.host = host;
}
@@ -490,6 +500,10 @@ public boolean isDecommissioned() {
return this.isDecommissioned.get();
}

/**
 * Returns the current value of the decommissioning flag.
 *
 * @return {@code true} if the flag is currently set
 */
public boolean isDecommissioning() {
return this.isDecommissioning.get();
}

/**
 * Tells whether this backend may be used for queries.
 *
 * @return {@code true} only when the backend is alive, queries are not disabled on it,
 *         and it has not been shut down
 */
public boolean isQueryAvailable() {
    // Same short-circuit order as a plain && chain: alive, then disabled, then shut down.
    return isAlive() && !(isQueryDisabled() || isShutDown.get());
}
Original file line number Diff line number Diff line change
@@ -2382,15 +2382,19 @@ class Suite implements GroovyInterceptable {
}
}

def get_cluster = { be_unique_id ->
def get_cluster = { be_unique_id , MetaService ms=null->
def jsonOutput = new JsonOutput()
def map = [instance_id: "${instance_id}", cloud_unique_id: "${be_unique_id}" ]
def js = jsonOutput.toJson(map)
log.info("get cluster req: ${js} ".toString())

def add_cluster_api = { request_body, check_func ->
httpTest {
endpoint context.config.metaServiceHttpAddress
if (ms) {
endpoint ms.host+':'+ms.httpPort
} else {
endpoint context.config.metaServiceHttpAddress
}
uri "/MetaService/http/get_cluster?token=${token}"
body request_body
check check_func
@@ -2563,7 +2567,7 @@ class Suite implements GroovyInterceptable {
}
}

def d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
def d_node = { be_unique_id, ip, port, cluster_name, cluster_id, MetaService ms=null ->
def jsonOutput = new JsonOutput()
def clusterInfo = [
type: "COMPUTE",
@@ -2583,7 +2587,11 @@ class Suite implements GroovyInterceptable {

def d_cluster_api = { request_body, check_func ->
httpTest {
endpoint context.config.metaServiceHttpAddress
if (ms) {
endpoint ms.host+':'+ms.httpPort
} else {
endpoint context.config.metaServiceHttpAddress
}
uri "/MetaService/http/decommission_node?token=${token}"
body request_body
check check_func
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.doris.regression.suite.ClusterOptions

suite("cloud_decommission", 'p0, docker') {
if (!isCloudMode()) {
return
}

// Polls get_cluster (through dockerAwaitUntil(100)) until some node matching the given
// backend's host and heartbeat port reports status NODE_STATUS_DECOMMISSIONED, then
// asserts that such a node was seen.
//   ms                     - meta service handle forwarded to get_cluster
//   decommissionBeUniqueId - cloud unique id used for the get_cluster lookup
//   decommissionBe         - object exposing Host and HeartbeatPort of the backend to look for
def checkStatus = { ms, decommissionBeUniqueId, decommissionBe ->
    boolean decommissioned = false
    dockerAwaitUntil(100) {
        def clusters = get_cluster.call(decommissionBeUniqueId, ms)
        // Recomputed from scratch on every poll so a stale "true" can never leak through.
        decommissioned = clusters.any { cluster ->
            cluster.nodes.any { node ->
                (node.ip as String == decommissionBe.Host as String
                        && node.heartbeat_port as Integer == decommissionBe.HeartbeatPort as Integer
                        && node.status as String == "NODE_STATUS_DECOMMISSIONED")
            }
        }
        decommissioned
    }

    assertTrue(decommissioned)
}

// Issues ALTER SYSTEM DROPP BACKEND for host:heartbeatPort, then waits (dockerAwaitUntil(100))
// until SHOW BACKENDS no longer contains a row with that Host and HeartbeatPort.
// NOTE(review): "DROPP" is reproduced exactly as written; it looks like Doris' forced-drop
// keyword rather than a typo — confirm before "correcting" it.
def dropAndCheckBe = { host, heartbeatPort ->
    sql """ ALTER SYSTEM DROPP BACKEND "${host}:${heartbeatPort}" """
    dockerAwaitUntil(100) {
        def backends = sql_return_maparray """ SHOW BACKENDS """
        log.info("show backends result {}", backends)
        // Done once no remaining backend row matches the dropped one.
        !backends.any { row -> row.Host == host && row.HeartbeatPort == heartbeatPort }
    }
}

// Runs the full decommission scenario against decommission_table and returns the elapsed
// time in milliseconds:
//   1. wait for the replicas to be balanced across beNum backends,
//   2. decommission the backend at index 1 through the meta service HTTP API (d_node),
//      wait for it to hold no replicas and to be reported as decommissioned, then drop it,
//   3. decommission the backend at index 2 through SQL, wait likewise, then drop it.
// beforeDecommissionActionSupplier / afterDecommissionActionSupplier are optional hooks
// (may be null); each is invoked once per decommission round, i.e. twice in total.
def check = { Closure beforeDecommissionActionSupplier, Closure afterDecommissionActionSupplier, int beNum ->
def begin = System.currentTimeMillis()
setFeConfig("cloud_balance_tablet_percent_per_run", 0.5)

// "compute_cluster" is the backends' cluster name in the docker environment
sql """ use @compute_cluster """

def result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
assertEquals(result.size(), beNum)
// Wait until every backend holds roughly an equal share of the table's 48 buckets.
dockerAwaitUntil(100) {
result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
if (beNum == 3) {
// 48 / 3 = 16, tolerate +/- 1
result.every { Integer.valueOf((String) it.ReplicaNum) >= 15 && Integer.valueOf((String) it.ReplicaNum) <= 17 }
} else {
// beNum == 2: 48 / 2 = 24, tolerate +/- 1
result.every { Integer.valueOf((String) it.ReplicaNum) >= 23 && Integer.valueOf((String) it.ReplicaNum) <= 25 }
}
}

if (beforeDecommissionActionSupplier) {
beforeDecommissionActionSupplier()
}
// idx = 1: decommission one backend through the meta service HTTP API
def decommissionBeFirstIdx = 1
def decommissionBeFirst = cluster.getBeByIndex(decommissionBeFirstIdx)
log.info(" decommissionBeFirst: {} ", decommissionBeFirst.host)
def showBes = sql_return_maparray """SHOW BACKENDS"""
log.info(" showBes: {} ", showBes)
// Locate the SHOW BACKENDS row for that backend; its Tag column carries the cloud ids.
def firstDecommissionBe = showBes.find {
it.Host == decommissionBeFirst.host
}
assertNotNull(firstDecommissionBe)
log.info("first decommission be {}", firstDecommissionBe)
def jsonSlurper = new JsonSlurper()
def jsonObject = jsonSlurper.parseText(firstDecommissionBe.Tag)
String firstDecommissionBeCloudClusterId = jsonObject.compute_group_id
String firstDecommissionBeUniqueId = jsonObject.cloud_unique_id
String firstDecommissionBeClusterName = jsonObject.compute_group_name

def ms = cluster.getAllMetaservices().get(0)
logger.info("ms addr={}, port={}", ms.host, ms.httpPort)
d_node.call(firstDecommissionBeUniqueId, firstDecommissionBe.Host, firstDecommissionBe.HeartbeatPort,
firstDecommissionBeClusterName, firstDecommissionBeCloudClusterId, ms)

// Wait until at least one backend reports zero replicas of the table.
dockerAwaitUntil(100) {
result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
result.any { Integer.valueOf((String) it.ReplicaNum) == 0 }
}

checkStatus(ms, firstDecommissionBeUniqueId, firstDecommissionBe)

// The decommissioned backend has not been dropped yet, so it is still listed.
result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
assertEquals(result.size(), beNum)
for (row : result) {
log.info("replica distribution: ${row} ".toString())
}
if (afterDecommissionActionSupplier) {
afterDecommissionActionSupplier()
}

// Drop the selected backend
dropAndCheckBe(firstDecommissionBe.Host, firstDecommissionBe.HeartbeatPort)

if (beforeDecommissionActionSupplier) {
beforeDecommissionActionSupplier()
}
// idx = 2: decommission one backend through SQL
def decommissionBeSecondIdx = 2
def secondDecommissionBe = cluster.getBeByIndex(decommissionBeSecondIdx)

// Decommission the selected backend
sql """ ALTER SYSTEM DECOMMISSION BACKEND "$secondDecommissionBe.Host:$secondDecommissionBe.HeartbeatPort" """

// The first backend is gone by now, so one row fewer is expected.
result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
assertEquals(result.size(), beNum - 1)

dockerAwaitUntil(100) {
result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
log.info("show replica result {}", result)
def ret = result.findAll { Integer.valueOf((String) it.ReplicaNum) == 0 }
log.info("ret {}", ret)
// one backend has already been dropped
ret.size() == beNum - 2
}
// NOTE(review): the first backend's cloud unique id is reused for the second lookup;
// presumably all backends in this docker cluster share it — confirm.
def secondDecommissionBeUniqueId = firstDecommissionBeUniqueId
checkStatus(ms, secondDecommissionBeUniqueId, secondDecommissionBe)

for (row : result) {
log.info("replica distribution: ${row} ".toString())
}
if (afterDecommissionActionSupplier) {
afterDecommissionActionSupplier()
}
// Drop the selected backend
dropAndCheckBe(secondDecommissionBe.Host, secondDecommissionBe.HeartbeatPort)

def showComputeGroup = sql_return_maparray """ SHOW COMPUTE GROUPS """
log.info("show compute group {}", showComputeGroup)
// when both backends of a 2-BE cluster are decommissioned, the compute group is deleted
def bes = sql_return_maparray """ SHOW BACKENDS """
if (bes.size() == 0) {
assertEquals(0, showComputeGroup.size())
}

// Closure result: total elapsed time in milliseconds.
System.currentTimeMillis() - begin
}

// Runs `check` with the given hooks and asserts that the run was slowed down as expected:
// waitTime must exceed atLeastCost, and the total elapsed time must be at least
// 2 * waitTime (two backends are decommissioned, each delayed by waitTime).
// Returns the total elapsed time in milliseconds measured around the `check` call.
def checkDifferentAction = { Closure beforeDecommissionActionSupplier, Closure afterDecommissionActionSupplier, int atLeastCost, int waitTime, int beNum ->
    def startedAt = System.currentTimeMillis()

    def innerCost = check.call(beforeDecommissionActionSupplier, afterDecommissionActionSupplier, beNum)
    log.info("in check, inner cost {}", innerCost)

    def totalCost = System.currentTimeMillis() - startedAt
    log.info("in check, outter cost {}", totalCost)

    assertTrue(waitTime > atLeastCost)
    // two backends are decommissioned, each one held back for waitTime
    assertTrue(totalCost >= 2 * waitTime)
    totalCost
}

// Decommission scenario with the FE debug point "FE.GET_ALL_WAL_QUEUE_SIZE" enabled
// (value 5) before each decommission. A background thread clears the frontend debug
// points after waitTime (25s); the "after" hook waits for that thread to finish.
// Returns whatever checkDifferentAction returns.
def checkWal = { int atLeastCost, int beNum ->
    def clearTask = null
    // 25 seconds, in milliseconds
    def waitTime = 25 * 1000

    Closure beforeClosure = {
        log.info("before wal closure")
        GetDebugPoint().enableDebugPointForAllFEs("FE.GET_ALL_WAL_QUEUE_SIZE", [value:5])
        clearTask = thread {
            Thread.sleep(waitTime)
            cluster.clearFrontendDebugPoints()
        }
    }

    Closure afterClosure = {
        log.info("after wal closure")
        assertNotNull(clearTask)
        clearTask.get()
    }

    checkDifferentAction(beforeClosure, afterClosure, atLeastCost as int, waitTime as int, beNum)
}

// Decommission scenario with inserts running in the background: before each decommission
// a thread is started that inserts one row per second into decommission_table for
// waitTime (30s); the "after" hook waits for that thread to finish.
// Returns whatever checkDifferentAction returns.
def checkTxnNotFinish = { int atLeastCost, int beNum ->
    def insertTask = null
    def waitTime = 30 * 1000

    Closure beforeClosure = {
        log.info("before insert closure")
        // the inserts keep going for waitTime milliseconds in total
        insertTask = thread {
            int i = 1
            while (i <= waitTime / 1000) {
                Thread.sleep(1 * 1000)
                sql """insert into decommission_table values ($i + 1, $i * 2, $i + 3)"""
                i++
            }
        }
    }

    Closure afterClosure = { ->
        log.info("after insert closure")
        assertNotNull(insertTask)
        insertTask.get()
    }

    checkDifferentAction(beforeClosure, afterClosure, atLeastCost as int, waitTime as int, beNum)
}

// Creates decommission_table: an AGGREGATE KEY(class, id) table whose `score` column is
// SUM-aggregated, hash-distributed on `class` into 48 buckets. The bucket count is what
// the replica-distribution ranges asserted in `check` are derived from.
def createTestTable = { ->
sql """
CREATE TABLE decommission_table (
class INT,
id INT,
score INT SUM
)
AGGREGATE KEY(class, id)
DISTRIBUTED BY HASH(class) BUCKETS 48
"""
}

// Restarts all frontends, waits 5 seconds, reconnects the JDBC session to the master FE
// returned by the cluster (if any), and asserts that `show frontends` lists 2 frontends.
// When no master FE is returned, only an error is logged and the query is still attempted
// on the existing connection.
def checkFENormalAfterRestart = {
    cluster.restartFrontends()

    sleep(5000)
    logger.info("Reconnecting to a new frontend...")
    def masterFe = cluster.getMasterFe()
    if (!masterFe) {
        logger.error("No new frontend found to reconnect")
    } else {
        logger.info("New frontend found: ${masterFe.host}:${masterFe.httpPort}")
        def jdbcUrl = String.format(
                "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
                masterFe.host, masterFe.queryPort)
        jdbcUrl = context.config.buildUrlWithDb(jdbcUrl, context.dbName)
        context.connectTo(jdbcUrl, context.config.jdbcUser, context.config.jdbcPassword)
        logger.info("Successfully reconnected to the new frontend")
    }

    def frontends = sql """show frontends;"""
    assertEquals(2, frontends.size())
}

// Two cluster configurations are exercised: the first with 3 backends, the second with 2.
def clusterOptions = [
    new ClusterOptions(),
    new ClusterOptions(),
]

// For each configuration, run three independent docker clusters: plain decommission,
// decommission with the WAL debug point, and decommission with concurrent inserts.
// Each run ends with an FE restart sanity check.
clusterOptions.eachWithIndex { options, i ->
    log.info("begin {} step", i + 1)
    options.feConfigs += [
        'sys_log_verbose_modules=org',
        'heartbeat_interval_second=1',
        'cloud_tablet_rebalancer_interval_second=1',
        'cloud_cluster_check_interval_second=1'
    ]
    options.setFeNum(2)
    // step 1: the cluster has 3 backends
    // step 2: the cluster has 2 backends; after both are decommissioned and dropped,
    //         the compute group name is deleted from the FE
    int beNum = i == 0 ? 3 : 2
    options.setBeNum(beNum)
    options.cloudMode = true
    // options.connectToFollower = true
    options.enableDebugPoints()

    // Baseline: cost of the scenario without any artificial delay.
    def noWalCheckCost = 0
    docker(options) {
        createTestTable.call()
        noWalCheckCost = check(null, null, beNum)
        checkFENormalAfterRestart.call()
    }
    log.info("no wal check cost {}", noWalCheckCost)

    def walCheckCost = 0
    docker(options) {
        createTestTable.call()
        walCheckCost = checkWal(noWalCheckCost as int, beNum as int)
        checkFENormalAfterRestart.call()
    }
    log.info("wal check cost {}", walCheckCost)

    def txnCheckCost = 0
    docker(options) {
        createTestTable.call()
        txnCheckCost = checkTxnNotFinish(noWalCheckCost as int, beNum as int)
        checkFENormalAfterRestart.call()
    }
    log.info("txn check cost {}", txnCheckCost)
    log.info("finish {} step", i + 1)
}
}