Skip to content

Commit

Permalink
Merge branch 'feat_8122' of http://git.woa.com/bkdevops/bk-ci into fe…
Browse files Browse the repository at this point in the history
…at_8122

# Conflicts:
#	support-files/sql/2003_v2.x/2020_ci_process-update_v2.0_mysql.sql
  • Loading branch information
hejieehe committed Oct 23, 2023
2 parents 9d3d682 + 11db4d9 commit 1fb43a5
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ data class PipelineWebhook(
val repositoryType: ScmType,
@ApiModelProperty("代码库标识类型, ID 代码库HashId / NAME 别名", required = false)
val repoType: RepositoryType?,
@ApiModelProperty("代码库HashId,repoHashId 与 repoName 不能同时为空,如果两个都不为空就用repoName", required = false)
@ApiModelProperty("插件配置的代码库HashId,repoHashId与repoName 不能同时为空,如果两个都不为空就用repoName", required = false)
var repoHashId: String?, // repoHashId 与 repoName 不能同时为空,如果两个都不为空就用repoName
@ApiModelProperty("代码库别名", required = false)
val repoName: String?,
Expand All @@ -54,8 +54,8 @@ data class PipelineWebhook(
val taskId: String? = null,
@ApiModelProperty("事件类型", required = false)
var eventType: String? = null,
@ApiModelProperty("事件源,代码库hashId", required = false)
var eventSource: String? = null,
@ApiModelProperty("代码库hashId,插件配置解析后的代码库ID", required = false)
var repositoryHashId: String? = null,
@ApiModelProperty("事件源外联Id", required = false)
var externalId: String? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class PipelineWebhookDao {
PROJECT_NAME,
TASK_ID,
EVENT_TYPE,
EVENT_SOURCE,
REPOSITORY_HASH_ID,
EXTERNAL_ID
)
.values(
Expand All @@ -70,7 +70,7 @@ class PipelineWebhookDao {
projectName,
taskId,
eventType,
eventSource,
repositoryHashId,
externalId
)
.onDuplicateKeyUpdate()
Expand All @@ -79,7 +79,7 @@ class PipelineWebhookDao {
.set(REPO_NAME, repoName)
.set(PROJECT_NAME, projectName)
.set(EVENT_TYPE, eventType)
.set(EVENT_SOURCE, eventSource)
.set(REPOSITORY_HASH_ID, repositoryHashId)
.set(EXTERNAL_ID, externalId)
.execute()
}
Expand Down Expand Up @@ -234,30 +234,27 @@ class PipelineWebhookDao {
}?.map { convert(it) }
}

fun listWebhook(
fun listPipelines(
dslContext: DSLContext,
repositoryType: String?,
offset: Int,
limit: Int
): Result<TPipelineWebhookRecord> {
): List<PipelineWebhookSubscriber> {
return with(T_PIPELINE_WEBHOOK) {
val conditions = mutableListOf(
DELETE.eq(false)
)
if (!repositoryType.isNullOrBlank()) {
conditions.add(REPOSITORY_TYPE.eq(repositoryType))
}
dslContext.selectFrom(this)
.where(conditions)
.orderBy(PROJECT_ID.desc())
dslContext.select(PROJECT_ID, PIPELINE_ID).from(this)
.where(DELETE.eq(false))
.limit(offset, limit)
.fetch()
.fetch().map {
PipelineWebhookSubscriber(
projectId = it.value1(),
pipelineId = it.value2()
)
}
}
}

fun updateWebhookEventInfo(
dslContext: DSLContext,
eventSource: String?,
repositoryHashId: String?,
eventType: String,
externalId: String,
pipelineId: String,
Expand All @@ -266,7 +263,7 @@ class PipelineWebhookDao {
) {
return with(T_PIPELINE_WEBHOOK) {
dslContext.update(this)
.set(EVENT_SOURCE, eventSource)
.set(REPOSITORY_HASH_ID, repositoryHashId)
.set(EVENT_TYPE, eventType)
.set(EXTERNAL_ID, externalId)
.where(PROJECT_ID.eq(projectId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class PipelineWebhookService @Autowired constructor(
version = version
)
if (repository != null) {
pipelineWebhook.eventSource = repository.repoHashId
pipelineWebhook.repositoryHashId = repository.repoHashId
pipelineWebhook.eventType = codeEventType?.name ?: ""
pipelineWebhook.externalId = getExternalId(repository)
pipelineWebhook.projectName = getProjectName(repository.projectName)
Expand Down Expand Up @@ -671,64 +671,87 @@ class PipelineWebhookService @Autowired constructor(
val repoCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.build<String, Repository>()
// 上一个更新的项目ID
var preProjectId: String? = null
do {
val webhookList = pipelineWebhookDao.listWebhook(
val pipelines = pipelineWebhookDao.listPipelines(
dslContext = dslContext,
repositoryType = null,
limit = limit,
offset = offset
)
val repoSize = webhookList.size
webhookList.forEach {
with(it) {
val repositoryConfig = RepositoryConfig(
repositoryHashId = it.repoHashId,
repositoryName = it.repoName,
repositoryType = RepositoryType.valueOf(it.repoType)
)
val repository = repoCache.get("${it.projectId}_${repositoryConfig.getRepositoryId()}") {
try {
scmProxyService.getRepo(projectId = it.projectId, repositoryConfig = repositoryConfig)
} catch (ignored: Exception) {
logger.warn("fail to get repository info", ignored)
null
pipelines.forEach { (projectId, pipelineId) ->
// 更改项目,清空代码库缓存
if (preProjectId != null && preProjectId != projectId) {
repoCache.cleanUp()
}
preProjectId = projectId
val model = getModel(projectId, pipelineId)
if (model == null) {
logger.info("$projectId|$pipelineId|model is null")
return@forEach
}
val triggerContainer = model.stages[0].containers[0] as TriggerContainer
val params = triggerContainer.params.associate { param ->
param.id to param.defaultValue.toString()
}
val elementMap =
triggerContainer.elements.filterIsInstance<WebHookTriggerElement>().associateBy { it.id }
val pipelineWebhooks = pipelineWebhookDao.listWebhook(
dslContext = dslContext,
projectId = projectId,
pipelineId = pipelineId,
limit = limit,
offset = 0
)
pipelineWebhooks?.forEach webhook@{ webhook ->
try {
if (webhook.taskId.isNullOrBlank()) return@webhook
val element = elementMap[webhook.taskId] ?: return@webhook
val webhookElementParams = getElementRepositoryConfig(element, variable = params)
?: return@webhook
val elementRepositoryConfig = webhookElementParams.repositoryConfig
val webhookRepositoryConfig = getRepositoryConfig(webhook, params)
if (elementRepositoryConfig.getRepositoryId() != webhookRepositoryConfig.getRepositoryId()) {
logger.info(
"webhook repository config different from element repository config|" +
"webhook:${webhookRepositoryConfig}|element:${elementRepositoryConfig}"
)
return@webhook
}
}
if (repository != null && it.projectName != repository.projectName) {
logger.info(
"webhook projectName different from repo projectName|" +
"webhook:${it.projectName}|repo:${repository.projectName}"
)
}
val model = getModel(it.projectId, it.pipelineId)
if (model == null) {
logger.info("$projectId|$pipelineId|model is null")
return@forEach
}
val triggerContainer = model.stages[0].containers[0] as TriggerContainer
val params = triggerContainer.params.associate { param ->
param.id to param.defaultValue.toString()
}
val elements = triggerContainer.elements.filterIsInstance<WebHookTriggerElement>()
elements.forEach elements@{ element ->
run {
val webhookElementParams = getElementRepositoryConfig(element, variable = params)
?: return@elements
pipelineWebhookDao.updateWebhookEventInfo(
dslContext = dslContext,
eventType = webhookElementParams.eventType?.name ?: "",
externalId = getExternalId(repository),
projectId = it.projectId,
pipelineId = it.pipelineId,
taskId = it.taskId,
eventSource = repository?.repoHashId
val repository =
repoCache.get("${projectId}_${elementRepositoryConfig.getRepositoryId()}") {
try {
scmProxyService.getRepo(
projectId = projectId,
repositoryConfig = elementRepositoryConfig
)
} catch (ignored: Exception) {
logger.warn("fail to get repository info", ignored)
null
}
}
if (repository != null && webhook.projectName != repository.projectName) {
logger.info(
"webhook projectName different from repo projectName|" +
"webhook:${webhook.projectName}|repo:${repository.projectName}"
)
}
pipelineWebhookDao.updateWebhookEventInfo(
dslContext = dslContext,
eventType = webhookElementParams.eventType?.name ?: "",
externalId = getExternalId(repository),
projectId = projectId,
pipelineId = pipelineId,
taskId = webhook.taskId!!,
repositoryHashId = repository?.repoHashId
)
} catch (ignored: Exception) {
logger.info("update webhook event info error|${webhook}", ignored)
}
}
}
offset += limit
} while (repoSize == 1000)
} while (pipelines.size == 1000)
}

private fun getExternalId(repository: Repository?) = when (repository) {
Expand Down
1 change: 1 addition & 0 deletions support-files/sql/1001_ci_process_ddl_mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ CREATE TABLE IF NOT EXISTS `T_PIPELINE_WEBHOOK` (
`DELETE` BIT(1) DEFAULT 0 COMMENT '是否删除',
`EVENT_TYPE` varchar(32) DEFAULT NULL COMMENT '事件类型',
`EXTERNAL_ID` varchar(32) DEFAULT NULL COMMENT 'webhook事件生产者ID,工蜂-工蜂ID,github-github id,svn-svn path,p4-p4port',
`REPOSITORY_HASH_ID` varchar(64) null comment '代码库hashId';
PRIMARY KEY (`ID`),
UNIQUE KEY `UNI_INX_TPW_PROJECT_PIPELINE_TASK` (`PROJECT_ID`, `PIPELINE_ID`,`TASK_ID`),
KEY `IDX_PROJECT_NAME_REPOSITORY_TYPE` (`PROJECT_NAME`, `REPOSITORY_TYPE`)
Expand Down
19 changes: 0 additions & 19 deletions support-files/sql/2002_v1.x/2010_ci_process-update_v1.10_mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,6 @@ IF NOT EXISTS(SELECT 1
IDX_SEARCH_ID (`RESOURCE_TYPE`, `PROJECT_ID`, `RESOURCE_ID`);
END IF;


IF NOT EXISTS(SELECT 1
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = db
AND TABLE_NAME = 'T_PIPELINE_WEBHOOK'
AND COLUMN_NAME = 'EVENT_TYPE') THEN
ALTER TABLE `T_PIPELINE_WEBHOOK`
ADD COLUMN `EVENT_TYPE` varchar(32) DEFAULT null COMMENT '事件类型';
END IF;

IF NOT EXISTS(SELECT 1
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = db
AND TABLE_NAME = 'T_PIPELINE_WEBHOOK'
AND COLUMN_NAME = 'EXTERNAL_ID') THEN
ALTER TABLE `T_PIPELINE_WEBHOOK`
ADD COLUMN `EXTERNAL_ID` varchar(32) DEFAULT null COMMENT 'webhook事件生产者ID,工蜂-工蜂ID,github-github id,svn-svn path,p4-p4port';
END IF;

COMMIT;
END <CI_UBF>
DELIMITER ;
Expand Down
28 changes: 28 additions & 0 deletions support-files/sql/2003_v2.x/2020_ci_process-update_v2.0_mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,34 @@ BEGIN
ADD INDEX `IDX_TPVUS_PROJECT_ID`(`PROJECT_ID`);
END IF;

IF NOT EXISTS(SELECT 1
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = db
AND TABLE_NAME = 'T_PIPELINE_WEBHOOK'
AND COLUMN_NAME = 'EVENT_TYPE') THEN
ALTER TABLE `T_PIPELINE_WEBHOOK`
ADD COLUMN `EVENT_TYPE` varchar(32) DEFAULT null COMMENT '事件类型';
END IF;

IF NOT EXISTS(SELECT 1
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = db
AND TABLE_NAME = 'T_PIPELINE_WEBHOOK'
AND COLUMN_NAME = 'EXTERNAL_ID') THEN
ALTER TABLE `T_PIPELINE_WEBHOOK`
ADD COLUMN `EXTERNAL_ID` varchar(32) DEFAULT null COMMENT 'webhook事件生产者ID,工蜂-工蜂ID,github-github id,svn-svn path,p4-p4port';
END IF;

IF NOT EXISTS(SELECT 1
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = db
AND TABLE_NAME = 'T_PIPELINE_WEBHOOK'
AND COLUMN_NAME = 'REPOSITORY_HASH_ID') THEN
ALTER TABLE `T_PIPELINE_WEBHOOK`
ADD COLUMN `REPOSITORY_HASH_ID` varchar(64) null comment '代码库hashId';
END IF;


COMMIT;
END <CI_UBF>
DELIMITER ;
Expand Down

0 comments on commit 1fb43a5

Please sign in to comment.