Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 构建分组并发时优化 #9618 #9635

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import com.tencent.devops.process.bean.PipelineUrlBean
import com.tencent.devops.process.constant.ProcessMessageCode.BK_MAX_PARALLEL
import com.tencent.devops.process.constant.ProcessMessageCode.ERROR_PIPELINE_QUEUE_FULL
import com.tencent.devops.process.constant.ProcessMessageCode.ERROR_PIPELINE_SUMMARY_NOT_FOUND
import com.tencent.devops.process.engine.common.Timeout
import com.tencent.devops.process.engine.control.lock.BuildIdLock
import com.tencent.devops.process.engine.control.lock.PipelineNextQueueLock
import com.tencent.devops.process.engine.pojo.Response
import com.tencent.devops.process.engine.pojo.event.PipelineBuildCancelEvent
Expand All @@ -47,8 +45,6 @@ import com.tencent.devops.process.engine.service.PipelineRuntimeExtService
import com.tencent.devops.process.engine.service.PipelineRuntimeService
import com.tencent.devops.process.engine.service.PipelineTaskService
import com.tencent.devops.process.pojo.setting.PipelineRunLockType
import com.tencent.devops.process.util.TaskUtils
import java.util.concurrent.TimeUnit
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
Expand Down Expand Up @@ -285,7 +281,7 @@ class QueueInterceptor @Autowired constructor(
)
}
builds.forEach { (pipelineId, buildId) ->
cancelBuildPipeline(
pipelineRuntimeService.concurrencyCancelBuildPipeline(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
Expand Down Expand Up @@ -317,62 +313,4 @@ class QueueInterceptor @Autowired constructor(
Response(data = BuildStatus.RUNNING)
}
}

private fun cancelBuildPipeline(
projectId: String,
pipelineId: String,
buildId: String,
userId: String,
groupName: String,
detailUrl: String
) {
val redisLock = BuildIdLock(redisOperation = redisOperation, buildId = buildId)
try {
redisLock.lock()
val buildInfo = pipelineRuntimeService.getBuildInfo(projectId, pipelineId, buildId)
val tasks = pipelineTaskService.getRunningTask(projectId, buildId)
tasks.forEach { task ->
val taskId = task["taskId"]?.toString() ?: ""
logger.info("build($buildId) shutdown by $userId, taskId: $taskId, status: ${task["status"] ?: ""}")
val containerId = task["containerId"]?.toString() ?: ""
// #7599 兼容短时间取消状态异常优化
val cancelTaskSetKey = TaskUtils.getCancelTaskIdRedisKey(buildId, containerId, false)
redisOperation.addSetValue(cancelTaskSetKey, taskId)
redisOperation.expire(cancelTaskSetKey, TimeUnit.DAYS.toSeconds(Timeout.MAX_JOB_RUN_DAYS))
buildLogPrinter.addYellowLine(
buildId = buildId,
message = "[concurrency] Canceling since <a target='_blank' href='$detailUrl'>" +
"a higher priority waiting request</a> for group($groupName) exists",
tag = taskId,
jobId = task["containerId"]?.toString() ?: "",
executeCount = task["executeCount"] as? Int ?: 1
)
}
if (tasks.isEmpty()) {
buildLogPrinter.addRedLine(
buildId = buildId,
message = "[concurrency] Canceling all since <a target='_blank' href='$detailUrl'>" +
"a higher priority waiting request</a> for group($groupName) exists",
tag = "QueueInterceptor",
jobId = "",
executeCount = 1
)
}
try {
pipelineRuntimeService.cancelBuild(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
userId = userId,
executeCount = buildInfo?.executeCount ?: 1,
buildStatus = BuildStatus.CANCELED
)
logger.info("Cancel the pipeline($pipelineId) of instance($buildId) by the user($userId)")
} catch (t: Throwable) {
logger.warn("Fail to shutdown the build($buildId) of pipeline($pipelineId)", t)
}
} finally {
redisLock.unlock()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import com.tencent.devops.process.engine.common.BS_MANUAL_ACTION_PARAMS
import com.tencent.devops.process.engine.common.BS_MANUAL_ACTION_SUGGEST
import com.tencent.devops.process.engine.common.BS_MANUAL_ACTION_USERID
import com.tencent.devops.process.engine.common.Timeout
import com.tencent.devops.process.engine.control.lock.BuildIdLock
import com.tencent.devops.process.engine.control.lock.PipelineBuildNumAliasLock
import com.tencent.devops.process.engine.dao.PipelineBuildDao
import com.tencent.devops.process.engine.dao.PipelineBuildSummaryDao
Expand Down Expand Up @@ -127,6 +128,7 @@ import com.tencent.devops.process.pojo.pipeline.record.BuildRecordTask.Companion
import com.tencent.devops.process.service.BuildVariableService
import com.tencent.devops.process.service.StageTagService
import com.tencent.devops.process.util.BuildMsgUtils
import com.tencent.devops.process.util.TaskUtils
import com.tencent.devops.process.utils.DependOnUtils
import com.tencent.devops.process.utils.PIPELINE_BUILD_NUM
import com.tencent.devops.process.utils.PIPELINE_BUILD_NUM_ALIAS
Expand All @@ -135,15 +137,16 @@ import com.tencent.devops.process.utils.PIPELINE_NAME
import com.tencent.devops.process.utils.PIPELINE_RETRY_COUNT
import com.tencent.devops.process.utils.PIPELINE_START_TASK_ID
import com.tencent.devops.process.utils.PipelineVarUtil
import java.time.Duration
import java.time.LocalDateTime
import java.util.Date
import java.util.concurrent.TimeUnit
import org.jooq.DSLContext
import org.jooq.Result
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.time.Duration
import java.time.LocalDateTime
import java.util.Date

/**
* 流水线运行时相关的服务
Expand Down Expand Up @@ -1854,4 +1857,62 @@ class PipelineRuntimeService @Autowired constructor(
buildParameters = buildParameters
)
}

fun concurrencyCancelBuildPipeline(
projectId: String,
pipelineId: String,
buildId: String,
userId: String,
groupName: String,
detailUrl: String
) {
val redisLock = BuildIdLock(redisOperation = redisOperation, buildId = buildId)
try {
redisLock.lock()
val buildInfo = getBuildInfo(projectId, pipelineId, buildId)
val tasks = pipelineTaskService.getRunningTask(projectId, buildId)
tasks.forEach { task ->
val taskId = task["taskId"]?.toString() ?: ""
logger.info("build($buildId) shutdown by $userId, taskId: $taskId, status: ${task["status"] ?: ""}")
val containerId = task["containerId"]?.toString() ?: ""
// #7599 兼容短时间取消状态异常优化
val cancelTaskSetKey = TaskUtils.getCancelTaskIdRedisKey(buildId, containerId, false)
redisOperation.addSetValue(cancelTaskSetKey, taskId)
redisOperation.expire(cancelTaskSetKey, TimeUnit.DAYS.toSeconds(Timeout.MAX_JOB_RUN_DAYS))
buildLogPrinter.addYellowLine(
buildId = buildId,
message = "[concurrency] Canceling since <a target='_blank' href='$detailUrl'>" +
"a higher priority waiting request</a> for group($groupName) exists",
tag = taskId,
jobId = task["containerId"]?.toString() ?: "",
executeCount = task["executeCount"] as? Int ?: 1
)
}
if (tasks.isEmpty()) {
buildLogPrinter.addRedLine(
buildId = buildId,
message = "[concurrency] Canceling all since <a target='_blank' href='$detailUrl'>" +
"a higher priority waiting request</a> for group($groupName) exists",
tag = "QueueInterceptor",
jobId = "",
executeCount = 1
)
}
try {
cancelBuild(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
userId = userId,
executeCount = buildInfo?.executeCount ?: 1,
buildStatus = BuildStatus.CANCELED
)
logger.info("Cancel the pipeline($pipelineId) of instance($buildId) by the user($userId)")
} catch (t: Throwable) {
logger.warn("Fail to shutdown the build($buildId) of pipeline($pipelineId)", t)
}
} finally {
redisLock.unlock()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ import com.tencent.devops.process.utils.PipelineVarUtil
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.MeterRegistry
import java.time.LocalDateTime
import kotlin.math.max
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import kotlin.math.max

/**
* 构建控制器
Expand Down Expand Up @@ -321,6 +321,26 @@ class BuildStartControl @Autowired constructor(
params = arrayOf(concurrencyGroup)
)
)
if (setting.concurrencyCancelInProgress) {
val detailUrl = pipelineUrlBean.genBuildDetailUrl(
projectCode = projectId,
pipelineId = buildInfo.pipelineId,
buildId = buildInfo.buildId,
position = null,
stageId = null,
needShortUrl = false
)
concurrencyGroupRunning.forEach { (pipelineId, buildId) ->
pipelineRuntimeService.concurrencyCancelBuildPipeline(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
userId = buildInfo.startUser,
groupName = concurrencyGroup,
detailUrl = detailUrl
)
}
}
val detailUrl = pipelineUrlBean.genBuildDetailUrl(
projectCode = projectId,
pipelineId = concurrencyGroupRunning.first().first,
Expand Down