From cec6f16b427c978d80868cfac548e3e522f848f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?yongyiduan=28=E6=AE=B5=E6=B0=B8=E5=84=84=29?= Date: Wed, 1 Nov 2023 16:33:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9E=84=E5=BB=BA=E5=88=86=E7=BB=84?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E6=97=B6=E4=BC=98=E5=8C=96=20#9618?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../engine/interceptor/QueueInterceptor.kt | 64 +----------------- .../engine/service/PipelineRuntimeService.kt | 67 ++++++++++++++++++- .../engine/control/BuildStartControl.kt | 22 +++++- 3 files changed, 86 insertions(+), 67 deletions(-) diff --git a/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/interceptor/QueueInterceptor.kt b/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/interceptor/QueueInterceptor.kt index a0bd90f0e0b..5edd4f613ec 100644 --- a/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/interceptor/QueueInterceptor.kt +++ b/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/interceptor/QueueInterceptor.kt @@ -37,8 +37,6 @@ import com.tencent.devops.process.bean.PipelineUrlBean import com.tencent.devops.process.constant.ProcessMessageCode.BK_MAX_PARALLEL import com.tencent.devops.process.constant.ProcessMessageCode.ERROR_PIPELINE_QUEUE_FULL import com.tencent.devops.process.constant.ProcessMessageCode.ERROR_PIPELINE_SUMMARY_NOT_FOUND -import com.tencent.devops.process.engine.common.Timeout -import com.tencent.devops.process.engine.control.lock.BuildIdLock import com.tencent.devops.process.engine.control.lock.PipelineNextQueueLock import com.tencent.devops.process.engine.pojo.Response import com.tencent.devops.process.engine.pojo.event.PipelineBuildCancelEvent @@ -47,8 +45,6 @@ import com.tencent.devops.process.engine.service.PipelineRuntimeExtService import com.tencent.devops.process.engine.service.PipelineRuntimeService import com.tencent.devops.process.engine.service.PipelineTaskService import com.tencent.devops.process.pojo.setting.PipelineRunLockType -import com.tencent.devops.process.util.TaskUtils -import java.util.concurrent.TimeUnit import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Component @@ -285,7 +281,7 @@ class QueueInterceptor @Autowired constructor( ) } builds.forEach { (pipelineId, buildId) -> - cancelBuildPipeline( + pipelineRuntimeService.concurrencyCancelBuildPipeline( projectId = projectId, pipelineId = pipelineId, buildId = buildId, @@ -317,62 +313,4 @@ class QueueInterceptor @Autowired constructor( Response(data = BuildStatus.RUNNING) } } - - private fun cancelBuildPipeline( - projectId: String, - pipelineId: String, - buildId: String, - userId: String, - groupName: String, - detailUrl: String - ) { - val redisLock = BuildIdLock(redisOperation = redisOperation, buildId = buildId) - try { - redisLock.lock() - val buildInfo = pipelineRuntimeService.getBuildInfo(projectId, pipelineId, buildId) - val tasks = pipelineTaskService.getRunningTask(projectId, buildId) - tasks.forEach { task -> - val taskId = task["taskId"]?.toString() ?: "" - logger.info("build($buildId) shutdown by $userId, taskId: $taskId, status: ${task["status"] ?: ""}") - val containerId = task["containerId"]?.toString() ?: "" - // #7599 兼容短时间取消状态异常优化 - val cancelTaskSetKey = TaskUtils.getCancelTaskIdRedisKey(buildId, containerId, false) - redisOperation.addSetValue(cancelTaskSetKey, taskId) - redisOperation.expire(cancelTaskSetKey, TimeUnit.DAYS.toSeconds(Timeout.MAX_JOB_RUN_DAYS)) - buildLogPrinter.addYellowLine( - buildId = buildId, - message = "[concurrency] Canceling since " + - "a higher priority waiting request for group($groupName) exists", - tag = taskId, - jobId = task["containerId"]?.toString() ?: "", - executeCount = task["executeCount"] as? Int ?: 1 - ) - } - if (tasks.isEmpty()) { - buildLogPrinter.addRedLine( - buildId = buildId, - message = "[concurrency] Canceling all since " + - "a higher priority waiting request for group($groupName) exists", - tag = "QueueInterceptor", - jobId = "", - executeCount = 1 - ) - } - try { - pipelineRuntimeService.cancelBuild( - projectId = projectId, - pipelineId = pipelineId, - buildId = buildId, - userId = userId, - executeCount = buildInfo?.executeCount ?: 1, - buildStatus = BuildStatus.CANCELED - ) - logger.info("Cancel the pipeline($pipelineId) of instance($buildId) by the user($userId)") - } catch (t: Throwable) { - logger.warn("Fail to shutdown the build($buildId) of pipeline($pipelineId)", t) - } - } finally { - redisLock.unlock() - } - } } diff --git a/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/service/PipelineRuntimeService.kt b/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/service/PipelineRuntimeService.kt index e0a09dee461..cfb9a5904d2 100644 --- a/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/service/PipelineRuntimeService.kt +++ b/src/backend/ci/core/process/biz-base/src/main/kotlin/com/tencent/devops/process/engine/service/PipelineRuntimeService.kt @@ -80,6 +80,7 @@ import com.tencent.devops.process.engine.common.BS_MANUAL_ACTION_PARAMS import com.tencent.devops.process.engine.common.BS_MANUAL_ACTION_SUGGEST import com.tencent.devops.process.engine.common.BS_MANUAL_ACTION_USERID import com.tencent.devops.process.engine.common.Timeout +import com.tencent.devops.process.engine.control.lock.BuildIdLock import com.tencent.devops.process.engine.control.lock.PipelineBuildNumAliasLock import com.tencent.devops.process.engine.dao.PipelineBuildDao import com.tencent.devops.process.engine.dao.PipelineBuildSummaryDao @@ -127,6 +128,7 @@ import com.tencent.devops.process.pojo.pipeline.record.BuildRecordTask.Companion import com.tencent.devops.process.service.BuildVariableService import com.tencent.devops.process.service.StageTagService import com.tencent.devops.process.util.BuildMsgUtils +import com.tencent.devops.process.util.TaskUtils import com.tencent.devops.process.utils.DependOnUtils import com.tencent.devops.process.utils.PIPELINE_BUILD_NUM import com.tencent.devops.process.utils.PIPELINE_BUILD_NUM_ALIAS @@ -135,15 +137,16 @@ import com.tencent.devops.process.utils.PIPELINE_NAME import com.tencent.devops.process.utils.PIPELINE_RETRY_COUNT import com.tencent.devops.process.utils.PIPELINE_START_TASK_ID import com.tencent.devops.process.utils.PipelineVarUtil +import java.time.Duration +import java.time.LocalDateTime +import java.util.Date +import java.util.concurrent.TimeUnit import org.jooq.DSLContext import org.jooq.Result import org.jooq.impl.DSL import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import java.time.Duration -import java.time.LocalDateTime -import java.util.Date /** * 流水线运行时相关的服务 @@ -1854,4 +1857,62 @@ class PipelineRuntimeService @Autowired constructor( buildParameters = buildParameters ) } + + fun concurrencyCancelBuildPipeline( + projectId: String, + pipelineId: String, + buildId: String, + userId: String, + groupName: String, + detailUrl: String + ) { + val redisLock = BuildIdLock(redisOperation = redisOperation, buildId = buildId) + try { + redisLock.lock() + val buildInfo = getBuildInfo(projectId, pipelineId, buildId) + val tasks = pipelineTaskService.getRunningTask(projectId, buildId) + tasks.forEach { task -> + val taskId = task["taskId"]?.toString() ?: "" + logger.info("build($buildId) shutdown by $userId, taskId: $taskId, status: ${task["status"] ?: ""}") + val containerId = task["containerId"]?.toString() ?: "" + // #7599 兼容短时间取消状态异常优化 + val cancelTaskSetKey = TaskUtils.getCancelTaskIdRedisKey(buildId, containerId, false) + redisOperation.addSetValue(cancelTaskSetKey, taskId) + redisOperation.expire(cancelTaskSetKey, TimeUnit.DAYS.toSeconds(Timeout.MAX_JOB_RUN_DAYS)) + buildLogPrinter.addYellowLine( + buildId = buildId, + message = "[concurrency] Canceling since " + + "a higher priority waiting request for group($groupName) exists", + tag = taskId, + jobId = task["containerId"]?.toString() ?: "", + executeCount = task["executeCount"] as? Int ?: 1 + ) + } + if (tasks.isEmpty()) { + buildLogPrinter.addRedLine( + buildId = buildId, + message = "[concurrency] Canceling all since " + + "a higher priority waiting request for group($groupName) exists", + tag = "QueueInterceptor", + jobId = "", + executeCount = 1 + ) + } + try { + cancelBuild( + projectId = projectId, + pipelineId = pipelineId, + buildId = buildId, + userId = userId, + executeCount = buildInfo?.executeCount ?: 1, + buildStatus = BuildStatus.CANCELED + ) + logger.info("Cancel the pipeline($pipelineId) of instance($buildId) by the user($userId)") + } catch (t: Throwable) { + logger.warn("Fail to shutdown the build($buildId) of pipeline($pipelineId)", t) + } + } finally { + redisLock.unlock() + } + } } diff --git a/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/control/BuildStartControl.kt b/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/control/BuildStartControl.kt index 297488e244f..39194597601 100644 --- a/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/control/BuildStartControl.kt +++ b/src/backend/ci/core/process/biz-engine/src/main/kotlin/com/tencent/devops/process/engine/control/BuildStartControl.kt @@ -93,10 +93,10 @@ import com.tencent.devops.process.utils.PipelineVarUtil import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.MeterRegistry import java.time.LocalDateTime +import kotlin.math.max import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service -import kotlin.math.max /** * 构建控制器 @@ -321,6 +321,26 @@ class BuildStartControl @Autowired constructor( params = arrayOf(concurrencyGroup) ) ) + if (setting.concurrencyCancelInProgress) { + val detailUrl = pipelineUrlBean.genBuildDetailUrl( + projectCode = projectId, + pipelineId = buildInfo.pipelineId, + buildId = buildInfo.buildId, + position = null, + stageId = null, + needShortUrl = false + ) + concurrencyGroupRunning.forEach { (pipelineId, buildId) -> + pipelineRuntimeService.concurrencyCancelBuildPipeline( + projectId = projectId, + pipelineId = pipelineId, + buildId = buildId, + userId = buildInfo.startUser, + groupName = concurrencyGroup, + detailUrl = detailUrl + ) + } + } val detailUrl = pipelineUrlBean.genBuildDetailUrl( projectCode = projectId, pipelineId = concurrencyGroupRunning.first().first,