Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: 构建日志的服务调用端增加请求熔断机制 #9602 #9612

Merged
merged 9 commits into from
Oct 31, 2023
1 change: 1 addition & 0 deletions src/backend/ci/core/log/api-log/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
dependencies {
api(project(":core:common:common-api"))
api(project(":core:common:common-client"))
api("io.github.resilience4j:resilience4j-circuitbreaker")
}

plugins {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ import com.tencent.devops.common.log.pojo.enums.LogType
import com.tencent.devops.common.log.pojo.message.LogMessage
import com.tencent.devops.log.api.print.ServiceLogPrintResource
import com.tencent.devops.log.meta.Ansi
import io.github.resilience4j.circuitbreaker.CallNotPermittedException
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry
import org.slf4j.LoggerFactory

@Suppress("LongParameterList", "TooManyFunctions")
class BuildLogPrinter(
private val client: Client
private val client: Client,
private val circuitBreakerRegistry: CircuitBreakerRegistry
) {

fun addLine(
Expand All @@ -48,28 +51,36 @@ class BuildLogPrinter(
subTag: String? = null
) {
try {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = message,
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.LOG,
executeCount = executeCount
doWithCircuitBreaker {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = message,
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.LOG,
executeCount = executeCount
)
)
)
}
} catch (e: CallNotPermittedException) {
logger.warn("[LOG]|LOG_SERVER_ERROR|causingCircuitBreakerName=${e.causingCircuitBreakerName}|$buildId|")
} catch (ignore: Exception) {
logger.error("[$buildId]|addLine error|message=$message", ignore)
}
}

fun addLines(buildId: String, logMessages: List<LogMessage>) {
try {
genLogPrintPrintResource().addLogMultiLine(
buildId = buildId,
logMessages = logMessages
)
doWithCircuitBreaker {
genLogPrintPrintResource().addLogMultiLine(
buildId = buildId,
logMessages = logMessages
)
}
} catch (e: CallNotPermittedException) {
logger.warn("[LOG]|LOG_SERVER_ERROR|causingCircuitBreakerName=${e.causingCircuitBreakerName}|$buildId|")
} catch (ignore: Exception) {
logger.error("[$buildId]|addLines error|logMessages=$logMessages", ignore)
}
Expand Down Expand Up @@ -116,17 +127,21 @@ class BuildLogPrinter(
subTag: String? = null
) {
try {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = "$LOG_ERROR_FLAG${message.replace("\n", "\n$LOG_ERROR_FLAG")}",
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.ERROR,
executeCount = executeCount
doWithCircuitBreaker {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = "$LOG_ERROR_FLAG${message.replace("\n", "\n$LOG_ERROR_FLAG")}",
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.ERROR,
executeCount = executeCount
)
)
)
}
} catch (e: CallNotPermittedException) {
logger.warn("[LOG]|LOG_SERVER_ERROR|causingCircuitBreakerName=${e.causingCircuitBreakerName}|$buildId|")
} catch (ignore: Exception) {
logger.error("[$buildId]|addErrorLine error|message=$message", ignore)
}
Expand All @@ -141,17 +156,21 @@ class BuildLogPrinter(
subTag: String? = null
) {
try {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = "$LOG_DEBUG_FLAG${message.replace("\n", "\n$LOG_DEBUG_FLAG")}",
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.DEBUG,
executeCount = executeCount
doWithCircuitBreaker {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = "$LOG_DEBUG_FLAG${message.replace("\n", "\n$LOG_DEBUG_FLAG")}",
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.DEBUG,
executeCount = executeCount
)
)
)
}
} catch (e: CallNotPermittedException) {
logger.warn("[LOG]|LOG_SERVER_ERROR|causingCircuitBreakerName=${e.causingCircuitBreakerName}|$buildId|")
} catch (ignore: Exception) {
logger.error("[$buildId]|addDebugLine error|message=$message", ignore)
}
Expand All @@ -166,17 +185,21 @@ class BuildLogPrinter(
subTag: String? = null
) {
try {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = "$LOG_WARN_FLAG${message.replace("\n", "\n$LOG_WARN_FLAG")}",
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.DEBUG,
executeCount = executeCount
doWithCircuitBreaker {
genLogPrintPrintResource().addLogLine(
buildId = buildId,
logMessage = genLogMessage(
message = "$LOG_WARN_FLAG${message.replace("\n", "\n$LOG_WARN_FLAG")}",
tag = tag,
subTag = subTag,
jobId = jobId,
logType = LogType.DEBUG,
executeCount = executeCount
)
)
)
}
} catch (e: CallNotPermittedException) {
logger.warn("[LOG]|LOG_SERVER_ERROR|causingCircuitBreakerName=${e.causingCircuitBreakerName}|$buildId|")
} catch (ignore: Exception) {
logger.error("[$buildId]|addWarnLine error|message=$message", ignore)
}
Expand Down Expand Up @@ -239,14 +262,16 @@ class BuildLogPrinter(
subTag: String? = null
) {
try {
genLogPrintPrintResource().updateLogStatus(
buildId = buildId,
finished = true,
tag = tag,
subTag = subTag,
jobId = jobId,
executeCount = executeCount
)
doWithCircuitBreaker {
genLogPrintPrintResource().updateLogStatus(
buildId = buildId,
finished = true,
tag = tag,
subTag = subTag,
jobId = jobId,
executeCount = executeCount
)
}
} catch (ignore: Exception) {
logger.error("[$buildId]|stopLog fail", ignore)
}
Expand All @@ -260,13 +285,15 @@ class BuildLogPrinter(
subTag: String? = null
) {
try {
genLogPrintPrintResource().addLogStatus(
buildId = buildId,
tag = tag,
subTag = subTag,
jobId = jobId,
executeCount = executeCount
)
doWithCircuitBreaker {
genLogPrintPrintResource().addLogStatus(
buildId = buildId,
tag = tag,
subTag = subTag,
jobId = jobId,
executeCount = executeCount
)
}
} catch (ignore: Exception) {
logger.error("[$buildId]|stopLog fail", ignore)
}
Expand All @@ -293,6 +320,17 @@ class BuildLogPrinter(
return client.get(ServiceLogPrintResource::class)
}

private fun <T> doWithCircuitBreaker(
action: () -> T
): T {
return circuitBreakerRegistry.let {
val breaker = it.circuitBreaker(this.javaClass.name)
breaker.executeCallable {
action()
}
}
}

companion object {
private val logger = LoggerFactory.getLogger(BuildLogPrinter::class.java)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,37 @@ package com.tencent.devops.log.configuration

import com.tencent.devops.common.client.Client
import com.tencent.devops.common.log.utils.BuildLogPrinter
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry
import org.springframework.boot.autoconfigure.AutoConfigureOrder
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.Ordered
import java.time.Duration

@Configuration
@ConditionalOnWebApplication
@AutoConfigureOrder(Ordered.LOWEST_PRECEDENCE)
class LogPrinterConfiguration {

@Bean
fun buildLogPrinter(client: Client) = BuildLogPrinter(client)
fun buildLogPrinter(client: Client): BuildLogPrinter {
val builder = CircuitBreakerConfig.custom()
builder.enableAutomaticTransitionFromOpenToHalfOpen()
builder.writableStackTraceEnabled(false)
// 当熔断后等待 300s 放开熔断
builder.waitDurationInOpenState(Duration.ofSeconds(300))
// 熔断放开后,运行通过的请求数,如果达到熔断条件,继续熔断
builder.permittedNumberOfCallsInHalfOpenState(100)
// 当错误率达到 10% 开启熔断
builder.failureRateThreshold(10.0F)
// 慢请求超过 10% 开启熔断
builder.slowCallRateThreshold(10.0F)
// 请求超过 1s 就是慢请求
builder.slowCallDurationThreshold(Duration.ofSeconds(1))
// 滑动窗口大小为 100,默认值
builder.slidingWindowSize(100)
return BuildLogPrinter(client, CircuitBreakerRegistry.of(builder.build()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,14 @@ class IndexCleanJobESImpl @Autowired constructor(
@Scheduled(cron = "0 0 2 * * ?")
override fun cleanIndex() {
logger.info("Start to clean index")
val redisLock = RedisLock(redisOperation, ES_INDEX_CLOSE_JOB_KEY, 20)
try {
if (!redisLock.tryLock()) {
logger.info("The other process is processing clean job, ignore")
return
// #9602 每个实例轮流获得锁进行幂等清理操作
RedisLock(redisOperation, ES_INDEX_CLOSE_JOB_KEY, 20).use {
try {
makeColdESIndexes()
deleteESIndexes()
} catch (ignore: Throwable) {
logger.warn("Fail to clean the index", ignore)
}
makeColdESIndexes()
deleteESIndexes()
} catch (ignore: Throwable) {
logger.warn("Fail to clean the index", ignore)
} finally {
redisLock.unlock()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import com.tencent.devops.common.redis.RedisLock
import com.tencent.devops.common.redis.RedisOperation
import com.tencent.devops.common.web.utils.I18nUtil
import com.tencent.devops.log.client.LogClient
import com.tencent.devops.log.es.ESClient
import com.tencent.devops.log.event.LogOriginEvent
import com.tencent.devops.log.event.LogStatusEvent
import com.tencent.devops.log.event.LogStorageEvent
Expand All @@ -55,6 +56,7 @@ import com.tencent.devops.log.service.LogStatusService
import com.tencent.devops.log.service.LogTagService
import com.tencent.devops.log.util.Constants
import com.tencent.devops.log.util.ESIndexUtils
import com.tencent.devops.log.util.IndexNameUtils
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest
import org.elasticsearch.action.bulk.BulkRequest
Expand Down Expand Up @@ -1093,7 +1095,7 @@ class LogServiceESImpl constructor(
indexCache.put(index, true)
true
} else {
true
false
}
}

Expand Down Expand Up @@ -1121,10 +1123,16 @@ class LogServiceESImpl constructor(
}

private fun createIndex(buildId: String, index: String): Boolean {
val createClient = logClient.hashClient(buildId)
// 提前创建第二天的索引备用
createESIndex(createClient, IndexNameUtils.getNextIndexName())
return createESIndex(createClient, index)
}

private fun createESIndex(createClient: ESClient, index: String): Boolean {
logger.info("[$index] Create index")
var success = false
val startEpoch = System.currentTimeMillis()
val createClient = logClient.hashClient(buildId)
return try {
logger.info(
"[${createClient.clusterName}][$index]|createIndex|: shards[${createClient.shards}]" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.junit.jupiter.api.Test
class DependOnControlTest {
private val pipelineContainerService: PipelineContainerService = mockk()
private val client: Client = mockk()
private val buildLogPrinter: BuildLogPrinter = BuildLogPrinter(client)
private val buildLogPrinter: BuildLogPrinter = BuildLogPrinter(client, mockk())
private val dependOnControl = DependOnControl(
pipelineContainerService = pipelineContainerService,
buildLogPrinter = buildLogPrinter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.junit.jupiter.api.Test
@Suppress("ALL", "UNUSED")
class MutexControlTest {

private val buildLogPrinter: BuildLogPrinter = BuildLogPrinter(mockk())
private val buildLogPrinter: BuildLogPrinter = BuildLogPrinter(mockk(), mockk())
private val redisOperation: RedisOperation = RedisOperation(mockk())
private val variables: Map<String, String> = mapOf(Pair("var1", "Test"))
private val buildId: String = "b-12345678901234567890123456789012"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
api("ch.qos.logback:logback-core")
api("ch.qos.logback:logback-classic")
api("com.github.ben-manes.caffeine:caffeine")
api("io.github.resilience4j:resilience4j-circuitbreaker")
api(fileTree(mapOf("dir" to "lib", "includes" to listOf("*.jar"))))
}

Expand Down
Loading