Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into dev-version-2
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed Aug 1, 2024
2 parents 08c05cc + 6bb866c commit d54b151
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 4 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Zephyr data processor (1.0.0)
# Zephyr data processor (1.1.0)

Zephyr data processor synchronizes the test in th2 with Zephyr Squad and Zephyr Scale.
It searches for events that match format in the configuration and updates test executions.
Expand Down Expand Up @@ -210,6 +210,12 @@ Contains parameters for synchronization with Zephyr
type: jira
extract: VERSION
```
+ **cachesConfiguration** - configuration that will be applied to the internal caches inside the zephyr processors (e.g. cycles cache)
+ **cycles** - configuration for cycle caching
+ **size** - cache size. Default value is 100.
+ **expireAfterSeconds** - element expiration time in seconds. Default value is 86400 (1 day).
+ **invalidateAt** - time in UTC (e.g. 00:00:00) when all values in cache should be invalidated. Repeats every day.
By default, `null` meaning no scheduled invalidation is configured

##### Strategies (only for Zephyr Squad)

Expand Down Expand Up @@ -280,6 +286,12 @@ Contains parameters to set up the Logging for inner HTTP clients that are used t

# Changes

## v1.1.0

### Added

+ Parameters to configure when cycle cache for Zephyr Scale processor is invalidated. Please refer to [configuration block](#configuration).

## v1.0.0

### Added
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ dependencies {

implementation("org.apache.commons:commons-collections4")

implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")

// Idiomatic logging for Kotlin. Wraps slf4j
implementation 'io.github.microutils:kotlin-logging:3.0.5'

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
kotlin.code.style=official
kotlin_version=1.8.22
release_version=1.0.0
release_version=1.1.0

description = 'th2 data processor for Zephyr synchronization'
vcs_url=https://github.com/th2-net/th2-data-processor-zephyr.git
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.dataprocessor.zephyr.cfg

import java.time.LocalTime
import java.util.concurrent.TimeUnit

class CachesConfiguration(
val cycles: CacheConfiguration = CacheConfiguration(
expireAfterSeconds = TimeUnit.DAYS.toSeconds(1),
size = 100,
)
)

class CacheConfiguration(
val expireAfterSeconds: Long,
val size: Int = 100,
val invalidateAt: LocalTime? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class EventProcessorCfg(
* Mapping between custom field to set in execution and the value it should have
*/
val customFields: Map<String, CustomFieldExtraction> = emptyMap(),

/**
* Configuration that will be applied to the internal caches inside the zephyr processors (e.g. cycles cache)
*/
val cachesConfiguration: CachesConfiguration = CachesConfiguration(),
) {
val issueRegexp: Regex = issueFormat.toPattern().toRegex()
init {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.client.plugins.logging.LogLevel

Expand All @@ -37,6 +38,7 @@ class ZephyrSynchronizationCfg(
}
companion object {
val MAPPER: ObjectMapper = jacksonObjectMapper()
.registerModules(JavaTimeModule())
.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.dataprocessor.zephyr.impl.cache

import mu.KotlinLogging
import org.apache.commons.collections4.map.LRUMap
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.LocalDate
import java.time.LocalTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit

internal class LRUCache<K, V>(
size: Int,
private val expireAfterMillis: Long,
private val timeSource: Clock = Clock.systemUTC(),
invalidateAt: LocalTime? = null,
) {
@Volatile
private var nextInvalidateAt: Instant
init {
require(size > 0) { "size must be positive but was $size" }
require(expireAfterMillis > 0) { "expireAfterMillis must be positive but was $expireAfterMillis" }
nextInvalidateAt = if (invalidateAt == null) {
Instant.MAX
} else {
val now = timeSource.instant()
val next = invalidateAt.atDate(LocalDate.ofInstant(now, ZoneOffset.UTC))
.toInstant(ZoneOffset.UTC)
if (next <= now) {
next.plus(1, ChronoUnit.DAYS)
} else {
next
}
}
}
private class TimestampedValue<T>(val value: T, val createdAt: Instant)

private val cache = LRUMap<K, TimestampedValue<V>>(size)

operator fun get(key: K): V? {
val now = timeSource.instant()
if (now >= nextInvalidateAt) {
invalidateAll()
}
return cache[key]?.takeUnless {
Duration.between(it.createdAt, now).toMillis() > expireAfterMillis
}?.value
}

operator fun set(key: K, value: V) {
cache[key] = TimestampedValue(value, timeSource.instant())
}

private fun invalidateAll() {
cache.clear()
val currentInvalidateAt = nextInvalidateAt
var next: Instant = nextInvalidateAt
val now = timeSource.instant()
do {
next = next.plus(1, ChronoUnit.DAYS)
} while (next < now)
nextInvalidateAt = next
LOGGER.info { "Cache invalidated. Now: $now, Invalidate at: $currentInvalidateAt, Next invalidate at: $nextInvalidateAt" }
}

private companion object {
private val LOGGER = KotlinLogging.logger { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.exactpro.th2.dataprocessor.zephyr.GrpcEvent
import com.exactpro.th2.dataprocessor.zephyr.cfg.EventProcessorCfg
import com.exactpro.th2.dataprocessor.zephyr.cfg.TestExecutionMode
import com.exactpro.th2.dataprocessor.zephyr.impl.AbstractZephyrProcessor
import com.exactpro.th2.dataprocessor.zephyr.impl.cache.LRUCache
import com.exactpro.th2.dataprocessor.zephyr.impl.scale.extractors.CustomValueExtractor
import com.exactpro.th2.dataprocessor.zephyr.impl.scale.extractors.ExtractionContext
import com.exactpro.th2.dataprocessor.zephyr.impl.scale.extractors.createCustomValueExtractors
Expand All @@ -39,7 +40,6 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import mu.KotlinLogging
import org.apache.commons.collections4.map.LRUMap
import javax.annotation.concurrent.GuardedBy

class ZephyrScaleEventProcessorImpl(
Expand Down Expand Up @@ -78,7 +78,11 @@ class ZephyrScaleEventProcessorImpl(
private val lock = Mutex()

@GuardedBy("lock")
private val cycleCache = LRUMap<CycleCacheKey, Cycle>(100)
private val cycleCaches: Map<String, LRUCache<CycleCacheKey, Cycle>> = configurations.associate {
it.destination to it.cachesConfiguration.cycles.run {
LRUCache(size, expireAfterSeconds * 1_000, invalidateAt = invalidateAt)
}
}

override suspend fun EventProcessorContext<ZephyrScaleApiService>.processEvent(
eventName: String,
Expand Down Expand Up @@ -178,6 +182,7 @@ class ZephyrScaleEventProcessorImpl(
// We cache the result because the search for cycle by name takes a lot of time
val cacheKey = CycleCacheKey(project.id, version.name, cycleName)
return lock.withLock {
val cycleCache = cycleCaches.getValue(configuration.destination)
val cachedCycle = cycleCache[cacheKey]
cachedCycle?.apply { LOGGER.trace { "Cycle cache hit. Key: $cacheKey, Value: $key ($name)" } }
?: zephyr.getCycle(project, version, folder = null, cycleName)?.also { cycleCache[cacheKey] = it }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.fasterxml.jackson.module.kotlin.readValue
import io.ktor.client.plugins.logging.LogLevel
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import java.time.LocalTime

class TestZephyrSynchronizationCfg {
private val mapper: ObjectMapper = ZephyrSynchronizationCfg.MAPPER
Expand Down Expand Up @@ -251,6 +252,65 @@ class TestZephyrSynchronizationCfg {
assertEquals(LogLevel.ALL, level)
}
}

@Test
fun `deserialize cache configuration`() {
val data = """
{
"connection": {
"baseUrl": "https://your.jira.address.com",
"jira": {
"username": "jira-user",
"key": "your password"
}
},
"dataService": {
"name": "ZephyrService",
"versionMarker": "0.0.1"
},
"syncParameters": {
"issueFormat": "QAP_\\d+",
"delimiter": "|",
"statusMapping": {
"SUCCESS": "PASS",
"FAILED": "WIP"
},
"jobAwaitTimeout": 1000,
"cachesConfiguration": {
"cycles": {
"expireAfterSeconds": 86400,
"size": 200,
"invalidateAt": "00:00:00"
}
}
},
"httpLogging": {
"level": "ALL"
}
}
""".trimIndent()
val cfg = mapper.readValue<ZephyrSynchronizationCfg>(data)
assertEquals(1, cfg.syncParameters.size)
with(cfg.syncParameters.first()) {
assertEquals("QAP_\\d+", issueRegexp.pattern)
assertEquals('|', delimiter)
assertEquals("PASS", statusMapping[EventStatus.SUCCESS])
assertEquals("WIP", statusMapping[EventStatus.FAILED])
assertEquals(1000, jobAwaitTimeout)
assertEquals(
86400,
cachesConfiguration.cycles.expireAfterSeconds,
)
assertEquals(
200,
cachesConfiguration.cycles.size,
)
assertEquals(
LocalTime.MIDNIGHT,
cachesConfiguration.cycles.invalidateAt,
)
}
}
}

private fun <K, V> Map<K, V>.assertKey(key: K): V {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.dataprocessor.zephyr.impl.cache

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Test
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.doReturnConsecutively
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever
import java.time.Clock
import java.time.Instant
import java.time.LocalDate
import java.time.LocalTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit

class LRUCacheTest {
@Test
fun `returns null if not value`() {
val cache = createCache()
assertNull(cache["key"], "unexpected value")
}

@Test
fun `returns value if not expired yet`() {
val cache = createCache()
cache["key"] = 42
assertEquals(42, cache["key"], "unexpected value")
}

@Test
fun `invalidates all keys after specified time`() {
val invalidateAt = LocalTime.of(12, 0,0)
val time = invalidateAt.atDate(LocalDate.now()).toInstant(ZoneOffset.UTC).plusSeconds(42)
val timeSource = mock<Clock>()
whenever(timeSource.instant()) doReturn time
val cache = LRUCache<String, Int>(size = 10, expireAfterMillis = 1000, timeSource, invalidateAt)
cache["key"] = 42
assertEquals(42, cache["key"], "unexpected value")
// next day
whenever(timeSource.instant()) doReturnConsecutively listOf(time.plus(1, ChronoUnit.DAYS))

assertNull(cache["key"], "unexpected value")
}

@Test
fun `does not return value if it is expired`() {
val expireAfterMillis: Long = 1000
val time = Instant.now()
val timeSource = mock<Clock> {
on { instant() } doReturn time doReturn time.plusMillis(expireAfterMillis + 1)
}
val cache = LRUCache<String, Int>(size = 10, expireAfterMillis = expireAfterMillis, timeSource)
cache["key"] = 42
assertNull(cache["key"], "value should be expired")
}

private fun createCache(size: Int = 10, expireAfterMillis: Long = 1000): LRUCache<String, Int> {
val cache = LRUCache<String, Int>(size = size, expireAfterMillis = expireAfterMillis)
return cache
}
}

0 comments on commit d54b151

Please sign in to comment.