diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index b6756fb5cdf361..fb0803bed1702a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -185,9 +185,22 @@ private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> expec
                 } catch (UserException e) {
                     LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e);
                 }
-                be.setDecommissioned(true);
+                be.setDecommissioning(true);
             }
         }
+
+        if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED) {
+            // When the node status synced from the meta service is NODE_STATUS_DECOMMISSIONED,
+            // the preconditions for decommissioning have already been verified in
+            // CloudTabletRebalancer.java: the tablets have been migrated away and
+            // no WAL remains on the backend (BE).
+            if (!be.isDecommissioning()) {
+                LOG.warn("impossible status, somewhere has bug, backend: {} status: {}", be, status);
+            }
+            be.setDecommissioned(true);
+            // persist the final state in the edit log
+            Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
+        }
     }
 }
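
The core of this change is splitting the old single `isDecommissioned` flag into a two-phase handshake: the checker only marks a BE as *decommissioning* when the meta service reports `NODE_STATUS_DECOMMISSIONING`, and the terminal *decommissioned* state (plus its edit-log write) is reached only after the meta service confirms `NODE_STATUS_DECOMMISSIONED`. A minimal sketch of that state machine, with hypothetical names rather than the Doris classes:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the two-phase decommission handshake introduced by this patch.
class DecommissionHandshake {
    private final AtomicBoolean decommissioning = new AtomicBoolean(false); // transient
    private final AtomicBoolean decommissioned = new AtomicBoolean(false);  // persisted

    // Phase 1: meta service reports NODE_STATUS_DECOMMISSIONING; the BE starts
    // draining but may still hold tablets and WALs, so nothing is persisted yet.
    void onDecommissioning() {
        decommissioning.set(true);
    }

    // Phase 2: meta service reports NODE_STATUS_DECOMMISSIONED, which it only
    // does after the rebalancer has verified the BE is fully drained.
    void onDecommissioned() {
        if (!decommissioning.get()) {
            // mirrors the "impossible status" warning in the hunk above
            System.err.println("decommissioned without a decommissioning phase");
        }
        decommissioned.set(true);
        // the real code persists this transition via logBackendStateChange()
    }
}
```
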
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 8e5033470b0e90..2bd6d8e02567c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -483,46 +483,53 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
                 LOG.info("backend {} not found", beId);
                 continue;
             }
-            if ((backend.isDecommissioned() && tabletNum == 0 && !backend.isActive())
-                    || (backend.isDecommissioned() && beList.size() == 1)) {
-                LOG.info("check decommission be {} state {} tabletNum {} isActive {} beList {}",
-                        backend.getId(), backend.isDecommissioned(), tabletNum, backend.isActive(), beList);
-                if (!beToDecommissionedTime.containsKey(beId)) {
-                    LOG.info("prepare to notify meta service be {} decommissioned", backend.getId());
-                    Cloud.AlterClusterRequest.Builder builder =
-                            Cloud.AlterClusterRequest.newBuilder();
-                    builder.setCloudUniqueId(Config.cloud_unique_id);
-                    builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);
-
-                    Cloud.ClusterPB.Builder clusterBuilder =
-                            Cloud.ClusterPB.newBuilder();
-                    clusterBuilder.setClusterName(backend.getCloudClusterName());
-                    clusterBuilder.setClusterId(backend.getCloudClusterId());
-                    clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);
-
-                    Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
-                    nodeBuilder.setIp(backend.getHost());
-                    nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
-                    nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
-                    nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);
-
-                    clusterBuilder.addNodes(nodeBuilder);
-                    builder.setCluster(clusterBuilder);
-
-                    Cloud.AlterClusterResponse response;
-                    try {
-                        response = MetaServiceProxy.getInstance().alterCluster(builder.build());
-                        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
-                            LOG.warn("notify decommission response: {}", response);
-                        }
-                        LOG.info("notify decommission response: {} ", response);
-                    } catch (RpcException e) {
-                        LOG.info("failed to notify decommission", e);
-                        return;
-                    }
-                    beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
+            if (!backend.isDecommissioning()) {
+                continue;
+            }
+            // check whether any WAL is still queued on this BE
+            long walNum = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
+            LOG.info("check decommissioning be {} state {} tabletNum {} isActive {} beList {}, wal num {}",
+                    backend.getId(), backend.isDecommissioning(), tabletNum, backend.isActive(), beList, walNum);
+            if ((tabletNum != 0 || backend.isActive() || walNum != 0) && beList.size() != 1) {
+                continue;
+            }
+            if (beToDecommissionedTime.containsKey(beId)) {
+                continue;
+            }
+            LOG.info("prepare to notify meta service be {} decommissioned", backend.getAddress());
+            Cloud.AlterClusterRequest.Builder builder =
+                    Cloud.AlterClusterRequest.newBuilder();
+            builder.setCloudUniqueId(Config.cloud_unique_id);
+            builder.setOp(Cloud.AlterClusterRequest.Operation.NOTIFY_DECOMMISSIONED);
+
+            Cloud.ClusterPB.Builder clusterBuilder =
+                    Cloud.ClusterPB.newBuilder();
+            clusterBuilder.setClusterName(backend.getCloudClusterName());
+            clusterBuilder.setClusterId(backend.getCloudClusterId());
+            clusterBuilder.setType(Cloud.ClusterPB.Type.COMPUTE);
+
+            Cloud.NodeInfoPB.Builder nodeBuilder = Cloud.NodeInfoPB.newBuilder();
+            nodeBuilder.setIp(backend.getHost());
+            nodeBuilder.setHeartbeatPort(backend.getHeartbeatPort());
+            nodeBuilder.setCloudUniqueId(backend.getCloudUniqueId());
+            nodeBuilder.setStatus(Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONED);
+
+            clusterBuilder.addNodes(nodeBuilder);
+            builder.setCluster(clusterBuilder);
+
+            Cloud.AlterClusterResponse response;
+            try {
+                response = MetaServiceProxy.getInstance().alterCluster(builder.build());
+                if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                    LOG.warn("notify decommission response: {}", response);
+                    continue;
                 }
+                LOG.info("notify decommission response: {} ", response);
+            } catch (RpcException e) {
+                LOG.warn("failed to notify decommission", e);
+                continue;
             }
+            beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
         }
     }
 }
@@ -884,7 +891,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
                 LOG.info("backend {} not found", be);
                 continue;
             }
-            if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioned()
+            if (tabletNum < minTabletsNum && backend.isAlive() && !backend.isDecommissioning()
                     && !backend.isSmoothUpgradeSrc()) {
                 destBe = be;
                 minTabletsNum = tabletNum;
@@ -898,7 +905,7 @@ private boolean getTransferPair(List<Long> bes, Map<Long, List<Tablet>> beToTabl
                 LOG.info("backend {} not found", be);
                 continue;
             }
-            if (backend.isDecommissioned() && tabletNum > 0) {
+            if (backend.isDecommissioning() && tabletNum > 0) {
                 srcBe = be;
                 srcDecommissioned = true;
                 break;
@@ -967,7 +974,7 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
 aliveBeIds) {
     }
 
     public long getAllWalQueueSize(Backend backend) {
+        long getAllWalQueueSizeDP = DebugPointUtil.getDebugParamOrDefault("FE.GET_ALL_WAL_QUEUE_SIZE", -1L);
+        if (getAllWalQueueSizeDP > 0) {
+            LOG.info("backend id:" + backend.getHost() + ",use dp all wal size:" + getAllWalQueueSizeDP);
+            return getAllWalQueueSizeDP;
+        }
         PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
                 .setTableId(-1)
                 .build();
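
`checkDecommissionState` now gates the `NOTIFY_DECOMMISSIONED` RPC on the BE being fully drained: zero tablets, inactive, and zero queued group-commit WALs (`getAllWalQueueSize` lives in `GroupCommitManager`, reached through `Env.getCurrentEnv().getGroupCommitManager()`), with an escape hatch for the last BE in a cluster, whose tablets have nowhere left to migrate. The new `FE.GET_ALL_WAL_QUEUE_SIZE` debug point lets the regression test fake a non-empty WAL queue. The gate, rewritten as a hypothetical standalone predicate:

```java
final class DecommissionGate {
    // Equivalent form of the patch's skip-condition "(tabletNum != 0
    // || backend.isActive() || walNum != 0) && beList.size() != 1" with the
    // negation pushed through: notify the meta service only when the BE is
    // drained, or when it is the sole BE left in the cluster.
    static boolean readyToNotifyDecommissioned(long tabletNum, boolean active,
            long walNum, int besInCluster) {
        if (besInCluster == 1) {
            return true;
        }
        return tabletNum == 0 && !active && walNum == 0;
    }
}
```

Note also that a failed or rejected `alterCluster` RPC now `continue`s to the next BE instead of returning out of the whole loop, so one misbehaving BE no longer blocks decommission progress for the rest of the cluster.
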
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index c864e1ba2ae0ba..0bd8377bddbea9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -97,6 +97,8 @@ public class Backend implements Writable {
     @SerializedName("isDecommissioned")
     private AtomicBoolean isDecommissioned;
 
+    private AtomicBoolean isDecommissioning = new AtomicBoolean(false);
+
     // rootPath -> DiskInfo
     @SerializedName("disksRef")
     private volatile ImmutableMap<String, DiskInfo> disksRef;
@@ -404,6 +406,14 @@ public boolean setDecommissioned(boolean isDecommissioned) {
         return false;
     }
 
+    public boolean setDecommissioning(boolean isDecommissioning) {
+        if (this.isDecommissioning.compareAndSet(!isDecommissioning, isDecommissioning)) {
+            LOG.warn("{} set decommissioning: {}", this.toString(), isDecommissioning);
+            return true;
+        }
+        return false;
+    }
+
     public void setHost(String host) {
         this.host = host;
     }
@@ -490,6 +500,10 @@ public boolean isDecommissioned() {
         return this.isDecommissioned.get();
     }
 
+    public boolean isDecommissioning() {
+        return this.isDecommissioning.get();
+    }
+
     public boolean isQueryAvailable() {
         return isAlive() && !isQueryDisabled() && !isShutDown.get();
     }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 9077223288f0a9..c21db5eacbbbea 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -2492,7 +2492,7 @@
         }
     }
 
-    def get_cluster = { be_unique_id ->
+    def get_cluster = { be_unique_id, MetaService ms = null ->
         def jsonOutput = new JsonOutput()
         def map = [instance_id: "${instance_id}", cloud_unique_id: "${be_unique_id}" ]
         def js = jsonOutput.toJson(map)
@@ -2500,7 +2500,11 @@
         def add_cluster_api = { request_body, check_func ->
             httpTest {
-                endpoint context.config.metaServiceHttpAddress
+                if (ms) {
+                    endpoint ms.host + ':' + ms.httpPort
+                } else {
+                    endpoint context.config.metaServiceHttpAddress
+                }
                 uri "/MetaService/http/get_cluster?token=${token}"
                 body request_body
                 check check_func
@@ -2673,7 +2677,7 @@
         }
     }
 
-    def d_node = { be_unique_id, ip, port, cluster_name, cluster_id ->
+    def d_node = { be_unique_id, ip, port, cluster_name, cluster_id, MetaService ms = null ->
         def jsonOutput = new JsonOutput()
         def clusterInfo = [
             type: "COMPUTE",
@@ -2693,7 +2697,11 @@
         def d_cluster_api = { request_body, check_func ->
             httpTest {
-                endpoint context.config.metaServiceHttpAddress
+                if (ms) {
+                    endpoint ms.host + ':' + ms.httpPort
+                } else {
+                    endpoint context.config.metaServiceHttpAddress
+                }
                 uri "/MetaService/http/decommission_node?token=${token}"
                 body request_body
                 check check_func
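
On the test-framework side, `get_cluster` and `d_node` take an optional `MetaService` argument so a suite can target one specific meta service from `cluster.getAllMetaservices()` instead of the configured default endpoint. Both helpers are thin HTTP wrappers; a rough Java equivalent of the `decommission_node` call (hypothetical helper, with the JSON body left to the caller) would be:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Rough equivalent of d_node's HTTP call; the real body is the cluster/node
// JSON assembled in Suite.groovy (type COMPUTE, cluster id/name, node status).
class DecommissionNodeCall {
    static String decommissionNode(String msHostPort, String token, String bodyJson)
            throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://" + msHostPort
                        + "/MetaService/http/decommission_node?token=" + token))
                .POST(HttpRequest.BodyPublishers.ofString(bodyJson))
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```
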
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_cloud_decommission.groovy b/regression-test/suites/cloud_p0/node_mgr/test_cloud_decommission.groovy
new file mode 100644
index 00000000000000..a8173108f0b951
--- /dev/null
+++ b/regression-test/suites/cloud_p0/node_mgr/test_cloud_decommission.groovy
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonSlurper
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("cloud_decommission", 'p0, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def checkStatus = { ms, decommissionBeUniqueId, decommissionBe ->
+        boolean found = false
+        dockerAwaitUntil(100) {
+            found = false
+            def resp = get_cluster.call(decommissionBeUniqueId, ms)
+            resp.each { cluster ->
+                cluster.nodes.each { node ->
+                    if (node.ip as String == decommissionBe.Host as String
+                            && node.heartbeat_port as Integer == decommissionBe.HeartbeatPort as Integer
+                            && node.status as String == "NODE_STATUS_DECOMMISSIONED") {
+                        found = true
+                    }
+                }
+            }
+            found
+        }
+
+        assertTrue(found)
+    }
+
+    def dropAndCheckBe = { host, heartbeatPort ->
+        sql """ ALTER SYSTEM DROPP BACKEND "${host}:${heartbeatPort}" """
+        dockerAwaitUntil(100) {
+            def result = sql_return_maparray """ SHOW BACKENDS """
+            log.info("show backends result {}", result)
+            def ret = result.find { it.Host == host && it.HeartbeatPort == heartbeatPort }
+            ret == null
+        }
+    }
+
+    def check = { Closure beforeDecommissionActionSupplier, Closure afterDecommissionActionSupplier, int beNum ->
+        def begin = System.currentTimeMillis()
+        setFeConfig("cloud_balance_tablet_percent_per_run", 0.5)
+
+        // in docker, the test compute group is named compute_cluster
+        sql """ use @compute_cluster """
+
+        def result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
+        assertEquals(result.size(), beNum)
+        dockerAwaitUntil(100) {
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
+            if (beNum == 3) {
+                result.every { Integer.valueOf((String) it.ReplicaNum) >= 15 && Integer.valueOf((String) it.ReplicaNum) <= 17 }
+            } else {
+                // beNum == 2
+                result.every { Integer.valueOf((String) it.ReplicaNum) >= 23 && Integer.valueOf((String) it.ReplicaNum) <= 25 }
+            }
+        }
+
+        if (beforeDecommissionActionSupplier) {
+            beforeDecommissionActionSupplier()
+        }
+        // idx = 1: decommission the first BE via the meta service HTTP API
+        def decommissionBeFirstIdx = 1
+        def decommissionBeFirst = cluster.getBeByIndex(decommissionBeFirstIdx)
+        log.info(" decommissionBeFirst: {} ", decommissionBeFirst.host)
+        def showBes = sql_return_maparray """SHOW BACKENDS"""
+        log.info(" showBes: {} ", showBes)
+        def firstDecommissionBe = showBes.find {
+            it.Host == decommissionBeFirst.host
+        }
+        assertNotNull(firstDecommissionBe)
+        log.info("first decommission be {}", firstDecommissionBe)
+        def jsonSlurper = new JsonSlurper()
+        def jsonObject = jsonSlurper.parseText(firstDecommissionBe.Tag)
+        String firstDecommissionBeCloudClusterId = jsonObject.compute_group_id
+        String firstDecommissionBeUniqueId = jsonObject.cloud_unique_id
+        String firstDecommissionBeClusterName = jsonObject.compute_group_name
+
+        def ms = cluster.getAllMetaservices().get(0)
+        logger.info("ms addr={}, port={}", ms.host, ms.httpPort)
+        d_node.call(firstDecommissionBeUniqueId, firstDecommissionBe.Host, firstDecommissionBe.HeartbeatPort,
+                firstDecommissionBeClusterName, firstDecommissionBeCloudClusterId, ms)
+
+        dockerAwaitUntil(100) {
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
+            result.any { Integer.valueOf((String) it.ReplicaNum) == 0 }
+        }
+
+        checkStatus(ms, firstDecommissionBeUniqueId, firstDecommissionBe)
+
+        result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
+        assertEquals(result.size(), beNum)
+        for (def row : result) {
+            log.info("replica distribution: ${row} ".toString())
+        }
+        if (afterDecommissionActionSupplier) {
+            afterDecommissionActionSupplier()
+        }
+
+        // Drop the selected backend
+        dropAndCheckBe(firstDecommissionBe.Host, firstDecommissionBe.HeartbeatPort)
+
+        if (beforeDecommissionActionSupplier) {
+            beforeDecommissionActionSupplier()
+        }
+        // idx = 2: decommission the second BE via SQL
+        def decommissionBeSecondIdx = 2
+        def secondDecommissionBe = cluster.getBeByIndex(decommissionBeSecondIdx)
+
+        // Decommission the selected backend
+        sql """ ALTER SYSTEM DECOMMISSION BACKEND "$secondDecommissionBe.Host:$secondDecommissionBe.HeartbeatPort" """
+
+        result = sql """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
+        assertEquals(result.size(), beNum - 1)
+
+        dockerAwaitUntil(100) {
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM decommission_table """
+            log.info("show replica result {}", result)
+            def ret = result.findAll { Integer.valueOf((String) it.ReplicaNum) == 0 }
+            log.info("ret {}", ret)
+            // the first BE has already been dropped
+            ret.size() == beNum - 2
+        }
+        def secondDecommissionBeUniqueId = firstDecommissionBeUniqueId
+        checkStatus(ms, secondDecommissionBeUniqueId, secondDecommissionBe)
+
+        for (def row : result) {
+            log.info("replica distribution: ${row} ".toString())
+        }
+        if (afterDecommissionActionSupplier) {
+            afterDecommissionActionSupplier()
+        }
+        // Drop the selected backend
+        dropAndCheckBe(secondDecommissionBe.Host, secondDecommissionBe.HeartbeatPort)
+
+        def showComputeGroup = sql_return_maparray """ SHOW COMPUTE GROUPS """
+        log.info("show compute group {}", showComputeGroup)
+        // once both BEs are decommissioned and dropped, the compute group is deleted
+        def bes = sql_return_maparray """ SHOW BACKENDS """
+        if (bes.size() == 0) {
+            assertEquals(0, showComputeGroup.size())
+        }
+
+        System.currentTimeMillis() - begin
+    }
+
+    def checkDifferentAction = { Closure beforeDecommissionActionSupplier, Closure afterDecommissionActionSupplier, int atLeastCost, int waitTime, int beNum ->
+        def begin = System.currentTimeMillis()
+        def cost = check.call(beforeDecommissionActionSupplier, afterDecommissionActionSupplier, beNum)
+        log.info("in check, inner cost {}", cost)
+        cost = System.currentTimeMillis() - begin
+        log.info("in check, outer cost {}", cost)
+        assertTrue(waitTime > atLeastCost)
+        // two BEs are decommissioned, each gated behind waitTime
+        assertTrue(cost >= 2 * waitTime)
+        cost
+    }
+
+    def checkWal = { int atLeastCost, int beNum ->
+        def future = null
+        // 25s
+        def waitTime = 25 * 1000
+
+        Closure beforeClosure = {
+            log.info("before wal closure")
+            GetDebugPoint().enableDebugPointForAllFEs("FE.GET_ALL_WAL_QUEUE_SIZE", [value:5])
+            future = thread {
+                Thread.sleep(waitTime)
+                cluster.clearFrontendDebugPoints()
+            }
+        } as Closure
+
+        Closure afterClosure = {
+            log.info("after wal closure")
+            assertNotNull(future)
+            future.get()
+        } as Closure
+
+        checkDifferentAction(beforeClosure, afterClosure, atLeastCost as int, waitTime as int, beNum)
+    }
+
+    def checkTxnNotFinish = { int atLeastCost, int beNum ->
+        def future = null
+        def waitTime = 30 * 1000
+        // check that unfinished txns delay the decommission
+        Closure beforeClosure = {
+            log.info("before insert closure")
+            // the inserts finish after waitTime
+            future = thread {
+                // keep inserting for waitTime seconds
+                for (int i = 1; i <= waitTime / 1000; i++) {
+                    Thread.sleep(1 * 1000)
+                    sql """insert into decommission_table values ($i + 1, $i * 2, $i + 3)"""
+                }
+            }
+        }
+        Closure afterClosure = { ->
+            log.info("after insert closure")
+            assertNotNull(future)
+            future.get()
+        }
+        checkDifferentAction(beforeClosure, afterClosure, atLeastCost as int, waitTime as int, beNum)
+    }
+
+    def createTestTable = { ->
+        sql """
+            CREATE TABLE decommission_table (
+                class INT,
+                id INT,
+                score INT SUM
+            )
+            AGGREGATE KEY(class, id)
+            DISTRIBUTED BY HASH(class) BUCKETS 48
+        """
+    }
+
+    def checkFENormalAfterRestart = {
+        cluster.restartFrontends()
+        def reconnectFe = {
+            sleep(5000)
+            logger.info("Reconnecting to a new frontend...")
+            def newFe = cluster.getMasterFe()
+            if (newFe) {
+                logger.info("New frontend found: ${newFe.host}:${newFe.httpPort}")
+                def url = String.format(
+                        "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
+                        newFe.host, newFe.queryPort)
+                url = context.config.buildUrlWithDb(url, context.dbName)
+                context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword)
+                logger.info("Successfully reconnected to the new frontend")
+            } else {
+                logger.error("No new frontend found to reconnect")
+            }
+        }
+        reconnectFe()
+        def ret = sql """show frontends;"""
+        assertEquals(2, ret.size())
+    }
+
+    def clusterOptions = [
+        new ClusterOptions(),
+        new ClusterOptions(),
+    ]
+
+    for (int i = 0; i < clusterOptions.size(); i++) {
+        log.info("begin step {}", i + 1)
+        clusterOptions[i].feConfigs += [
+            'sys_log_verbose_modules=org',
+            'heartbeat_interval_second=1',
+            'cloud_tablet_rebalancer_interval_second=1',
+            'cloud_cluster_check_interval_second=1'
+        ]
+        clusterOptions[i].setFeNum(2)
+        // round 1 runs with 3 BEs, round 2 with 2 BEs; once both BEs of a round
+        // are decommissioned and dropped, the compute group name is deleted from the FE
+        int beNum = i == 0 ? 3 : 2
+        clusterOptions[i].setBeNum(beNum)
+        clusterOptions[i].cloudMode = true
+        // clusterOptions[i].connectToFollower = true
+        clusterOptions[i].enableDebugPoints()
+
+        def noWalCheckCost = 0
+        docker(clusterOptions[i]) {
+            createTestTable.call()
+            noWalCheckCost = check(null, null, beNum)
+            checkFENormalAfterRestart.call()
+        }
+        log.info("no wal check cost {}", noWalCheckCost)
+
+        def walCheckCost = 0
+        docker(clusterOptions[i]) {
+            createTestTable.call()
+            walCheckCost = checkWal(noWalCheckCost as int, beNum as int)
+            checkFENormalAfterRestart.call()
+        }
+        log.info("wal check cost {}", walCheckCost)
+
+        def txnCheckCost = 0
+        docker(clusterOptions[i]) {
+            createTestTable.call()
+            txnCheckCost = checkTxnNotFinish(noWalCheckCost as int, beNum as int)
+            checkFENormalAfterRestart.call()
+        }
+        log.info("txn check cost {}", txnCheckCost)
+        log.info("finish step {}", i + 1)
+    }
+}
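
One property of the patch worth calling out: only `isDecommissioned` carries `@SerializedName` and an edit-log write, while `isDecommissioning` is in-memory only, so after an FE restart (which `checkFENormalAfterRestart` exercises above) the draining flag is rebuilt from the meta service's `NODE_STATUS_DECOMMISSIONING` sync rather than from persisted state. A sketch of that asymmetry, inferred from the `Backend.java` hunks rather than quoted from them:

```java
import com.google.gson.annotations.SerializedName;
import java.util.concurrent.atomic.AtomicBoolean;

class BackendFlagsSketch {
    // persisted: survives an FE restart through the image / edit log
    @SerializedName("isDecommissioned")
    private final AtomicBoolean isDecommissioned = new AtomicBoolean(false);

    // transient by design: re-derived from the meta service status sync
    private final AtomicBoolean isDecommissioning = new AtomicBoolean(false);

    boolean setDecommissioning(boolean value) {
        // compareAndSet means "return true only on a real transition",
        // matching the existing setDecommissioned contract
        return isDecommissioning.compareAndSet(!value, value);
    }
}
```
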