Skip to content

Commit

Permalink
introduce new factory methods to support logging keys' data when dele…
Browse files Browse the repository at this point in the history
…ting; deprecate old methods
  • Loading branch information
Z1kkurat committed Jan 21, 2025
1 parent 3e77660 commit 1445a02
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ trait KeyDatabase[F[_], K] {

def all(applicationId: String, groupId: String, topicPartition: TopicPartition): Stream[F, K]

def keysOf(implicit F: Monad[F], logOf: LogOf[F], logPrefix: LogPrefix[K]): F[KeysOf[F, K]] =
@deprecated("Use `toKeysOf` instead", "5.0.6")
def keysOf(implicit F: Monad[F], logOf: LogOf[F]): F[KeysOf[F, K]] =
logOf(KeyDatabase.getClass) map { implicit log => KeysOf(this) }

def toKeysOf(implicit F: Monad[F], logOf: LogOf[F], logPrefix: LogPrefix[K]): F[KeysOf[F, K]] =
logOf(KeyDatabase.getClass) map { implicit log => KeysOf.of(this) }
}
object KeyDatabase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,24 @@ trait KeyWriter[F[_]] {
object Keys {

/** Creates a buffer for a given writer */
private[key] def apply[F[_]: Monad: Log, K: LogPrefix](
@deprecated("Use `of` instead", "5.0.6")
private[key] def apply[F[_]: Monad: Log, K](
key: K,
database: KeyDatabase[F, K]
): Keys[F] = new Keys[F] {

def flush: F[Unit] = database.persist(key)

def delete(persist: Boolean): F[Unit] =
if (persist) {
database.delete(key) *> Log[F].info("deleted key")
} else {
().pure[F]
}

}

private[key] def of[F[_]: Monad: Log, K: LogPrefix](
key: K,
database: KeyDatabase[F, K]
): Keys[F] = new Keys[F] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,30 @@ trait KeysOf[F[_], K] {
}
object KeysOf {

def memory[F[_]: Sync: Log, K: LogPrefix]: F[KeysOf[F, K]] =
KeyDatabase.memory[F, K] map { database =>
KeysOf(database)
}
@deprecated("Use another `memory1` instead", "5.0.6")
def memory[F[_]: Sync: Log, K]: F[KeysOf[F, K]] =
KeyDatabase.memory[F, K].map(database => KeysOf.apply(database))

def memory1[F[_]: Sync: Log, K: LogPrefix]: F[KeysOf[F, K]] =
KeyDatabase.memory[F, K].map(database => KeysOf.of(database))

/** Creates `KeysOf` with a passed logger */
def apply[F[_]: Monad: Log, K: LogPrefix](
@deprecated("Use `of` instead", "5.0.6")
def apply[F[_]: Monad: Log, K](
database: KeyDatabase[F, K]
): KeysOf[F, K] = new KeysOf[F, K] {
def apply(key: K) = Keys(key, database)
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
database.all(applicationId, groupId, topicPartition)
}

/** Creates `KeysOf` with a passed logger */
def of[F[_]: Monad: Log, K: LogPrefix](
database: KeyDatabase[F, K]
): KeysOf[F, K] = new KeysOf[F, K] {
def apply(key: K) = Keys.of(key, database)
def all(applicationId: String, groupId: String, topicPartition: TopicPartition) =
database.all(applicationId, groupId, topicPartition)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait PersistenceModule[F[_], S] {
implicit F: Sync[F],
logOf: LogOf[F]
): Resource[F, PersistenceOf[F, KafkaKey, KafkaSnapshot[S], ConsumerRecord[String, ByteVector]]] = for {
keysOf <- Resource.eval(keys.keysOf)
keysOf <- Resource.eval(keys.toKeysOf)
journalsOf <- Resource.eval(journals.journalsOf)
snapshotsOf <- Resource.eval(snapshots.snapshotsOf)
persistenceOf <- PersistenceOf.restoreEvents(keysOf, journalsOf, snapshotsOf)
Expand All @@ -34,7 +34,7 @@ trait PersistenceModule[F[_], S] {
implicit F: Sync[F],
logOf: LogOf[F]
): F[SnapshotPersistenceOf[F, KafkaKey, KafkaSnapshot[S], ConsumerRecord[String, ByteVector]]] = for {
keysOf <- keys.keysOf
keysOf <- keys.toKeysOf
journalsOf <- journals.journalsOf
snapshotsOf <- snapshots.snapshotsOf
} yield PersistenceOf.restoreSnapshots(keysOf, journalsOf, snapshotsOf)
Expand All @@ -44,7 +44,7 @@ trait PersistenceModule[F[_], S] {
implicit F: Sync[F],
logOf: LogOf[F]
): F[SnapshotPersistenceOf[F, KafkaKey, KafkaSnapshot[S], ConsumerRecord[String, ByteVector]]] = for {
keysOf <- keys.keysOf
keysOf <- keys.toKeysOf
snapshotsOf <- snapshots.snapshotsOf
} yield PersistenceOf.snapshotsOnly(keysOf, snapshotsOf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ object AdditionalPersistSpec {

def partitionFlow: Resource[IO, PartitionFlow[IO]] =
for {
keysOf <- Resource.eval(KeysOf.memory[IO, KafkaKey])
keysOf <- Resource.eval(KeysOf.memory1[IO, KafkaKey])
timersOf <- Resource.eval(TimersOf.memory[IO, KafkaKey])
partitionFlow <- PartitionFlow.resource(
topicPartition = TopicPartition.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class PartitionFlowSpec extends FunSuite {
logOf.apply(classOf[PartitionFlowSpec]).toResource.flatMap { implicit log =>
val committedOffset = Ref.unsafe[IO, Offset](Offset.min)
val keyStorage = Ref.unsafe[IO, Set[KafkaKey]](initialData.keySet)
val keysOf = KeysOf.apply[IO, KafkaKey](KeyDatabase.memory[IO, KafkaKey](keyStorage.stateInstance))
val keysOf = KeysOf.of[IO, KafkaKey](KeyDatabase.memory[IO, KafkaKey](keyStorage.stateInstance))
val snapshotsStorage = Ref.unsafe[IO, Map[KafkaKey, String]](initialData)
val persistenceOf =
PersistenceOf
Expand Down Expand Up @@ -466,7 +466,7 @@ object PartitionFlowSpec {

type State = (Offset, Int)

val keysOf = KeysOf.memory[IO, String].unsafeRunSync()(IORuntime.global)
val keysOf = KeysOf.memory1[IO, String].unsafeRunSync()(IORuntime.global)
val journalsOf = JournalsOf.memory[IO, String, ConsumerRecord[String, ByteVector]].unsafeRunSync()(IORuntime.global)
val snapshotsOf = SnapshotsOf.memory[IO, String, State].unsafeRunSync()(IORuntime.global)
val (persistenceOf, _) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class KeysSpec extends FunSuite {

// Given("empty database")
val database = KeyDatabase.memory(f.database)
val keys = Keys("key1", database)
val keys = Keys.of("key1", database)

// When("Keys is flushed")
val program = keys.flush
Expand All @@ -32,7 +32,7 @@ class KeysSpec extends FunSuite {

// Given("database with contents")
val database = KeyDatabase.memory(f.database)
val snapshots = Keys("key1", database)
val snapshots = Keys.of("key1", database)
val context = Set("key1")

// When("delete is requested")
Expand All @@ -50,7 +50,7 @@ class KeysSpec extends FunSuite {

// Given("database with contents")
val database = KeyDatabase.memory(f.database)
val snapshots = Keys("key1", database)
val snapshots = Keys.of("key1", database)
val context = Set("key1")

// When("delete is requested")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class EntityRegistryTest extends FunSuite {
)

val resource = for {
keysOf <- KeysOf.memory[IO, KafkaKey].toResource
keysOf <- KeysOf.memory1[IO, KafkaKey].toResource
timersOf <- TimersOf.memory[IO, KafkaKey].toResource
registry <- EntityRegistry.memory[IO, KafkaKey, Int].toResource
partitionFlowOf = PartitionFlowOf.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class FlowSpec extends CassandraSpec {
)
)
timersOf <- Resource.eval(TimersOf.memory[IO, KafkaKey])
keysOf <- Resource.eval(storage.keys.keysOf)
keysOf <- Resource.eval(storage.keys.toKeysOf)
persistenceOf <- storage.restoreEvents
keyStateOf = KeyStateOf.eagerRecovery[IO, KafkaSnapshot[String]](
applicationId = "FlowSpec",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class StatefulProcessingWithKafkaSpec extends ForAllKafkaSuite {
val snapshotPersistenceOf: SnapshotPersistenceOf[IO, KafkaKey, State, ConsumerRecord[String, ByteVector]] =
PersistenceOf.snapshotsOnly(keysOf, snapshotsOf)

override def keysOf: KeysOf[IO, KafkaKey] = KeysOf.memory[IO, KafkaKey].unsafeRunSync()
override def keysOf: KeysOf[IO, KafkaKey] = KeysOf.memory1[IO, KafkaKey].unsafeRunSync()
override def persistenceOf: SnapshotPersistenceOf[IO, KafkaKey, State, ConsumerRecord[String, ByteVector]] =
snapshotPersistenceOf
}
Expand Down

0 comments on commit 1445a02

Please sign in to comment.