Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 流水线级回调与流水线编排解耦 #11283 #11367

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ data class ProjectPipelineCallBack(
@get:Schema(title = "回调是否启用", required = false)
val failureTime: LocalDateTime? = null,
@get:Schema(title = "凭证参数", required = false)
val secretParam: ISecretParam? = null
val secretParam: ISecretParam? = null,
@get:Schema(title = "回调名称", required = false)
val name: String? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,22 @@ interface ServiceCallBackResource {
pipelineId: String,
callbackInfo: PipelineCallbackEvent
): Result<Boolean>

@Operation(summary = "流水线级别callback")
@GET
@Path("/projects/{projectId}/pipelines/{pipelineId}")
fun getPipelineCallBack(
@Parameter(description = "用户ID", required = true, example = AUTH_HEADER_USER_ID_DEFAULT_VALUE)
@HeaderParam(AUTH_HEADER_USER_ID)
userId: String,
@Parameter(description = "projectId", required = true)
@PathParam("projectId")
projectId: String,
@Parameter(description = "pipelineId", required = true)
@PathParam("pipelineId")
pipelineId: String,
@Parameter(description = "事件类型", required = false)
@QueryParam("event")
event: CallBackEvent?
): Result<List<ProjectPipelineCallBack>>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.process.dao

import com.tencent.devops.common.pipeline.event.CallBackNetWorkRegionType
import com.tencent.devops.common.pipeline.event.PipelineCallbackEvent
import com.tencent.devops.model.process.tables.TPipelineCallback
import com.tencent.devops.model.process.tables.records.TPipelineCallbackRecord
import org.jooq.DSLContext
import org.jooq.Result
import org.springframework.stereotype.Repository
import java.time.LocalDateTime

@Suppress("ALL")
@Repository
class PipelineCallbackDao {

/**
* 可直接更新或插入
*/
fun save(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
userId: String,
list: List<PipelineCallbackEvent>
) {
if (list.isEmpty()) return
with(TPipelineCallback.T_PIPELINE_CALLBACK) {
val now = LocalDateTime.now()
list.forEach {
dslContext.insertInto(
this,
PROJECT_ID,
PIPELINE_ID,
NAME,
EVENT_TYPE,
USER_ID,
URL,
SECRET_TOKEN,
REGION,
CREATE_TIME,
UPDATE_TIME
).values(
projectId,
pipelineId,
it.callbackName,
it.callbackEvent.name,
userId,
it.callbackUrl,
it.secretToken,
(it.region ?: CallBackNetWorkRegionType.IDC).name,
now,
now
).onDuplicateKeyUpdate()
.set(UPDATE_TIME, now)
.set(URL, it.callbackUrl)
.set(SECRET_TOKEN, it.secretToken)
.execute()
}
}
}

fun list(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
event: String? = null
): Result<TPipelineCallbackRecord> {
with(TPipelineCallback.T_PIPELINE_CALLBACK) {
return dslContext.selectFrom(this)
.where(PROJECT_ID.eq(projectId))
.let {
if (!event.isNullOrBlank()) {
it.and(EVENT_TYPE.eq(event))
} else it
}
.fetch()
}
}

fun delete(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
names: Set<String>
) {
with(TPipelineCallback.T_PIPELINE_CALLBACK) {
dslContext.deleteFrom(this)
.where(PROJECT_ID.eq(projectId))
.and(PIPELINE_ID.eq(pipelineId))
.and(NAME.`in`(names))
.execute()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.tencent.devops.common.api.constant.CommonMessageCode
import com.tencent.devops.common.api.exception.DependNotFoundException
import com.tencent.devops.common.api.exception.ErrorCodeException
import com.tencent.devops.common.api.exception.InvalidParamException
import com.tencent.devops.common.api.util.AESUtil
import com.tencent.devops.common.api.util.JsonUtil
import com.tencent.devops.common.api.util.MessageUtil
import com.tencent.devops.common.api.util.Watcher
Expand All @@ -52,6 +53,9 @@ import com.tencent.devops.common.pipeline.enums.BranchVersionAction
import com.tencent.devops.common.pipeline.enums.ChannelCode
import com.tencent.devops.common.pipeline.enums.PipelineInstanceTypeEnum
import com.tencent.devops.common.pipeline.enums.VersionStatus
import com.tencent.devops.common.pipeline.event.CallBackEvent
import com.tencent.devops.common.pipeline.event.CallBackNetWorkRegionType
import com.tencent.devops.common.pipeline.event.PipelineCallbackEvent
import com.tencent.devops.common.pipeline.extend.ModelCheckPlugin
import com.tencent.devops.common.pipeline.option.MatrixControlOption
import com.tencent.devops.common.pipeline.pojo.BuildNo
Expand All @@ -72,6 +76,7 @@ import com.tencent.devops.common.service.utils.LogUtils
import com.tencent.devops.common.web.utils.I18nUtil
import com.tencent.devops.process.constant.ProcessMessageCode
import com.tencent.devops.process.constant.ProcessMessageCode.BK_FIRST_STAGE_ENV_NOT_EMPTY
import com.tencent.devops.process.dao.PipelineCallbackDao
import com.tencent.devops.process.dao.PipelineSettingDao
import com.tencent.devops.process.dao.PipelineSettingVersionDao
import com.tencent.devops.process.dao.label.PipelineViewGroupDao
Expand Down Expand Up @@ -124,6 +129,7 @@ import com.tencent.devops.project.api.service.ServiceAllocIdResource
import org.jooq.DSLContext
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -166,7 +172,8 @@ class PipelineRepositoryService constructor(
private val redisOperation: RedisOperation,
private val pipelineYamlInfoDao: PipelineYamlInfoDao,
private val pipelineGroupService: PipelineGroupService,
private val pipelineAsCodeService: PipelineAsCodeService
private val pipelineAsCodeService: PipelineAsCodeService,
private val pipelineCallbackDao: PipelineCallbackDao
) {

companion object {
Expand Down Expand Up @@ -217,6 +224,9 @@ class PipelineRepositoryService constructor(
}
}

@Value("\${project.callback.secretParam.aes-key:project_callback_aes_key}")
private val aesKey = ""

fun deployPipeline(
model: Model,
projectId: String,
Expand Down Expand Up @@ -834,6 +844,14 @@ class PipelineRepositoryService constructor(
// 初始化流水线构建统计表
pipelineBuildSummaryDao.create(dslContext, projectId, pipelineId, buildNo)
pipelineModelTaskDao.batchSave(transactionContext, modelTasks)
// 初始化流水线单体回调
savePipelineCallback(
events = model.events,
pipelineId = pipelineId,
userId = userId,
projectId = projectId,
dslContext = transactionContext
)
}
} finally {
lock.unlock()
Expand Down Expand Up @@ -1130,6 +1148,14 @@ class PipelineRepositoryService constructor(
pipelineId = pipelineId
)
pipelineModelTaskDao.batchSave(transactionContext, modelTasks)
// 保存流水线单体回调记录
savePipelineCallback(
dslContext = transactionContext,
projectId = projectId,
pipelineId = pipelineId,
userId = userId,
events = model.events
)
}
}

Expand Down Expand Up @@ -1353,6 +1379,25 @@ class PipelineRepositoryService constructor(
}
}
}
pipelineCallbackDao.list(
dslContext = dslContext,
projectId = projectId,
pipelineId = pipelineId,
event = null
).let { records ->
if (records.isNotEmpty) {
// 填充流水线级别回调
resource?.model?.events = records.associate {
it.name to PipelineCallbackEvent(
callbackEvent = CallBackEvent.valueOf(it.eventType),
callbackUrl = it.url,
secretToken = it.secretToken?.let { AESUtil.decrypt(aesKey, it) },
region = CallBackNetWorkRegionType.valueOf(it.region),
callbackName = it.name
)
}
}
}
return resource
}

Expand Down Expand Up @@ -2166,4 +2211,41 @@ class PipelineRepositoryService constructor(
null
}?.handoverFrom
}

/**
* 保存流水线单体回调记录
*/
private fun savePipelineCallback(
events: Map<String, PipelineCallbackEvent>?,
pipelineId: String,
projectId: String,
dslContext: DSLContext,
userId: String
) {
if (events.isNullOrEmpty()) return
val existEventNames = pipelineCallbackDao.list(
dslContext = dslContext,
projectId = projectId,
pipelineId = pipelineId
).map { it.name }.toSet()
if (existEventNames.isNotEmpty()) {
val needDelNames = existEventNames.subtract(events.keys).toSet()
pipelineCallbackDao.delete(
dslContext = dslContext,
projectId = projectId,
pipelineId = pipelineId,
names = needDelNames
)
}
// 保存回调事件
pipelineCallbackDao.save(
dslContext = dslContext,
projectId = projectId,
pipelineId = pipelineId,
userId = userId,
list = events.map { (key, value) ->
value.copy(secretToken = value.secretToken?.let { AESUtil.encrypt(aesKey, it) })
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,19 @@ class ServiceCallBackResourceImpl @Autowired constructor(
)
return Result(true)
}

override fun getPipelineCallBack(
userId: String,
projectId: String,
pipelineId: String,
event: CallBackEvent?
): Result<List<ProjectPipelineCallBack>> {
return Result(
projectPipelineCallBackService.getPipelineCallback(
projectId = projectId,
pipelineId = pipelineId,
event = event?.name
)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,24 @@ class CallBackControl @Autowired constructor(
events = callBackEvent.name
)
)
val pipelineCallback = pipelineRepositoryService.getPipelineResourceVersion(projectId, pipelineId)
?.model
?.getPipelineCallBack(projectId, callBackEvent) ?: emptyList()
// 流水线级别回调,旧数据存在model中,新数据存在数据库中
val pipelineCallback = projectPipelineCallBackService.getPipelineCallback(
projectId = projectId,
pipelineId = pipelineId,
event = callBackEvent.name
).let {
if (it.isEmpty()) {
pipelineRepositoryService.getPipelineResourceVersion(
projectId = projectId,
pipelineId = pipelineId
)?.model?.getPipelineCallBack(
projectId = projectId,
callbackEvent = callBackEvent
) ?: emptyList()
} else {
it
}
}
if (pipelineCallback.isNotEmpty()) {
list.addAll(pipelineCallback)
}
Expand Down
Loading
Loading