Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I want to consume KTable and materialize with optimization #2886

Open
snskshn opened this issue Jan 19, 2024 · 0 comments
Open

I want to consume KTable and materialize with optimization #2886

snskshn opened this issue Jan 19, 2024 · 0 comments

Comments

@snskshn
Copy link

snskshn commented Jan 19, 2024

When I consume KTable, it is materialized as KeyValueStore automatically.

fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    input.join(table) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key: $value") }
}

If I set topology.optimization to all then, changelog topic is not created but reuse consuming topic as changelog topic.
Because Kafka Streams join semantics, I want to change the type of state store to VersionedKeyValueStore.
If I materialize state store manually like this, it creates changelog topic although I set topology.optimization to all.

fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    val storedTable = table
        .toStream()
        .groupByKey()
        .aggregate(
            { byteArrayOf() },
            { _, value, _ -> value.toByteArray() },
            Materialized.`as`(store)
        )
    val store = Stores.persistentVersionedKeyValueStore("kafka-streams-test-store", Duration.ofDays(1))
    val stream = input.join(storedTable) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key, ${value?.first}, ${value?.second?.decodeToString()") }
}

Is there any way to change the type of state store with not creating changelog topic via optimization?

related so: https://stackoverflow.com/questions/77806593/how-to-change-state-store-type-of-ktable-from-keyvaluestore-to-versionedkeyvalue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants