Skip to content

Commit

Permalink
Merge pull request #1 from JuanSeZ/mvc-redis-stream
Browse files Browse the repository at this point in the history
Add MVC Support
  • Loading branch information
tomsfernandez authored Jun 8, 2024
2 parents 676c22d + 628d3b3 commit f237dcf
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 5 deletions.
2 changes: 1 addition & 1 deletion demo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'

implementation project(':lib')
implementation project(':flux-lib')
}

tasks.withType(KotlinCompile) {
Expand Down
74 changes: 74 additions & 0 deletions flux-lib/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id 'org.jetbrains.kotlin.jvm' version '1.8.21'
id 'org.jetbrains.kotlin.plugin.spring' version '1.8.21'
id 'maven-publish'
}

group = 'org.austral.ingsis'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

configurations {
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive:+'
implementation 'io.projectreactor.kotlin:reactor-kotlin-extensions:+'
implementation 'org.jetbrains.kotlin:kotlin-reflect:+'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-reactor:+'
}

tasks.withType(KotlinCompile) {
kotlinOptions {
freeCompilerArgs = ['-Xjsr305=strict']
jvmTarget = '17'
}
}

tasks.named('test') {
useJUnitPlatform()
}

java {
withSourcesJar()
}

task sourceJar(type: Jar) {
classifier 'sources'
from sourceSets.main.allSource
}


publishing {
repositories {
maven {
name = "GitHubPackages"
url = "https://maven.pkg.github.com/austral-ingsis/class-redis-streams"
credentials {
username = System.getenv("USERNAME")
password = System.getenv("PASSWORD")
}
}
}

publications {
gpr(MavenPublication) {

group = 'org.austral.ingsis'
artifactId = "redis-streams-flux"
version = "0.1.${System.getenv("BUILD_NUMBER")}"

from(components.kotlin)
artifact tasks.sourceJar
}
}
}
4 changes: 2 additions & 2 deletions lib/build.gradle → mvc-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ publishing {
gpr(MavenPublication) {

group = 'org.austral.ingsis'
artifactId = "redis-streams"
artifactId = "redis-streams-mvc"
version = "0.1.${System.getenv("BUILD_NUMBER")}"

from(components.kotlin)
artifact tasks.sourceJar
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.austral.ingsis.`class`.redis

import jakarta.annotation.PostConstruct
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.stream.Consumer
import org.springframework.data.redis.connection.stream.ObjectRecord
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.core.RedisTemplate
import org.springframework.data.redis.stream.StreamReceiver
import reactor.core.publisher.Flux
import java.net.InetAddress


abstract class RedisStreamConsumer<Value>(
protected val streamKey: String,
protected val groupId: String,
private val redis: RedisTemplate<String, String>
) {

protected abstract fun onMessage(record: ObjectRecord<String, Value>)
private lateinit var flow: Flux<ObjectRecord<String, Value>>

protected abstract fun options(): StreamReceiver.StreamReceiverOptions<String, ObjectRecord<String, Value>>

@PostConstruct
fun subscription() {
val options = options()

try {
val consumerGroupExists = consumerGroupExists(streamKey, groupId)
if (!consumerGroupExists) {
println("Consumer group ${groupId} for stream ${streamKey} doesn't exist. Creating...")
createConsumerGroup(streamKey, groupId)
} else {
println("Consumer group ${groupId} for stream ${streamKey} exists!")
}
} catch (e: Exception) {
println("Exception: $e")
println("Stream ${streamKey} doesn't exist. Creating stream ${streamKey} and group ${groupId}")
redis.opsForStream<Any, Any>().createGroup(streamKey, groupId)
}
val factory = redis.connectionFactory as ReactiveRedisConnectionFactory
val container = StreamReceiver.create(factory, options)
flow = container.receiveAutoAck(
Consumer.from(groupId, InetAddress.getLocalHost().hostName),
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
)
flow.subscribe(this::onMessage)
}


private fun createConsumerGroup(streamKey: String, groupId: String): String {
return redis.opsForStream<Any, Any>().createGroup(streamKey, groupId)
}

private fun consumerGroupExists(stream: String, group: String): Boolean {
val groups = redis.opsForStream<Any, Any>().groups(stream)
for(it in groups) {
if(it.groupName() == group) return true
}
return false
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.austral.ingsis.`class`.redis

import org.springframework.data.redis.connection.stream.RecordId
import org.springframework.data.redis.connection.stream.StreamRecords
import org.springframework.data.redis.core.RedisTemplate

abstract class RedisStreamProducer(
val streamKey: String,
val redis: RedisTemplate<String, String>
) {

// We use Any as the upper bound of Value to make it non-nullable
inline fun <reified Value : Any> emit(value: Value): RecordId? {
val record = StreamRecords.newRecord()
.ofObject(value)
.withStreamKey(streamKey)

return redis
.opsForStream<String, Value>()
.add(record)
}
}
5 changes: 3 additions & 2 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
rootProject.name = 'class-redis-streams'

include 'lib'
include 'demo'
include 'flux-lib'
include 'mvc-lib'
include 'demo'

0 comments on commit f237dcf

Please sign in to comment.