Skip to content

Commit

Permalink
support scala3 with slick (#167)
Browse files Browse the repository at this point in the history
* try to support scala3 with slick

* further build changes

* Update Dependencies.scala

* Scala 3 better enforces package private checks and there is a test in a non-pekko package

* Update SlickOffsetStore.scala

* Update SlickOffsetStore.scala

* make access to db instance lazy
  • Loading branch information
pjfanning authored Jun 16, 2024
1 parent 7b23782 commit 0afbf60
Show file tree
Hide file tree
Showing 12 changed files with 28 additions and 33 deletions.
9 changes: 0 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ lazy val core =
Project(id = "core", base = file("core"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.core)
Expand All @@ -31,7 +30,6 @@ lazy val core =
lazy val coreTest =
Project(id = "core-test", base = file("core-test"))
.configs(IntegrationTest)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.disablePlugins(MimaPlugin)
.settings(Defaults.itSettings)
Expand All @@ -46,7 +44,6 @@ lazy val testkit =
Project(id = "testkit", base = file("testkit"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.testKit)
Expand All @@ -59,7 +56,6 @@ lazy val jdbc =
Project(id = "jdbc", base = file("jdbc"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.jdbc)
Expand All @@ -74,7 +70,6 @@ lazy val slick =
Project(id = "slick", base = file("slick"))
.configs(IntegrationTest.extend(Test))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.slick)
Expand All @@ -92,7 +87,6 @@ lazy val cassandra =
Project(id = "cassandra", base = file("cassandra"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(headerSettings(IntegrationTest))
.settings(Defaults.itSettings)
.settings(Dependencies.cassandra)
Expand All @@ -109,7 +103,6 @@ lazy val cassandra =
lazy val eventsourced =
Project(id = "eventsourced", base = file("eventsourced"))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.eventsourced)
.settings(AutomaticModuleName.settings("pekko.projection.eventsourced"))
.settings(name := "pekko-projection-eventsourced")
Expand All @@ -120,7 +113,6 @@ lazy val eventsourced =
lazy val kafka =
Project(id = "kafka", base = file("kafka"))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.kafka)
.settings(AutomaticModuleName.settings("pekko.projection.kafka"))
.settings(name := "pekko-projection-kafka")
Expand All @@ -146,7 +138,6 @@ lazy val `durable-state` =
Project(id = "durable-state", base = file("durable-state"))
.configs(IntegrationTest)
.enablePlugins(ReproducibleBuildsPlugin)
.settings(crossScalaVersions := Dependencies.Scala2And3Versions)
.settings(Dependencies.state)
.settings(AutomaticModuleName.settings("pekko.projection.durable-state"))
.settings(name := "pekko-projection-durable-state")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory
* INTERNAL API
*/
@InternalApi
private[projection] class JdbcOffsetStore[S <: JdbcSession](
class JdbcOffsetStore[S <: JdbcSession](
system: ActorSystem[_],
settings: JdbcSettings,
jdbcSessionFactory: () => S,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.typesafe.config.ConfigValueType
* INTERNAL API
*/
@InternalApi
private[projection] case class JdbcSettings(config: Config, executionContext: ExecutionContext) {
case class JdbcSettings(config: Config, executionContext: ExecutionContext) {

val schema: Option[String] =
Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class KafkaToSlickIntegrationSpec extends KafkaSpecBase(ConfigFactory.load().wit
PatienceConfig(timeout = Span(30, Seconds), interval = Span(500, Milliseconds))

val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config)
val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig.db, dbConfig.profile, SlickSettings(system.toTyped))
val offsetStore = new SlickOffsetStore(system.toTyped, dbConfig, SlickSettings(system.toTyped))
val repository = new EventTypeCountRepository(dbConfig)

override protected def beforeAll(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object Common extends AutoPlugin {
override lazy val projectSettings = Seq(
projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value),
crossVersion := CrossVersion.binary,
crossScalaVersions := Dependencies.Scala2Versions,
crossScalaVersions := Dependencies.ScalaVersions,
scalaVersion := Dependencies.Scala213,
javacOptions ++= List("-Xlint:unchecked", "-Xlint:deprecation"),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
Expand Down
7 changes: 3 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ object Dependencies {
val Scala213 = "2.13.14"
val Scala212 = "2.12.19"
val Scala3 = "3.3.3"
val Scala2Versions = Seq(Scala213, Scala212)
val Scala2And3Versions = Scala2Versions.+:(Scala3)
val ScalaVersions = Seq(Scala213, Scala212, Scala3)

val PekkoVersionInDocs = "1.0"
val PekkoVersionInDocs = "1.1"
val ConnectorsVersionInDocs = "1.0"
val ConnectorsKafkaVersionInDocs = "1.0"

object Versions {
val pekko = PekkoCoreDependency.version
val pekkoPersistenceJdbc = "1.0.0"
val pekkoPersistenceJdbc = "1.1.0-M1"
val pekkoPersistenceCassandra = "1.0.0"
val connectors = PekkoConnectorsDependency.version
val connectorsKafka = PekkoConnectorsKafkaDependency.version
Expand Down
2 changes: 1 addition & 1 deletion project/PekkoCoreDependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
object PekkoCoreDependency extends PekkoDependency {
override val checkProject: String = "pekko-cluster-sharding-typed"
override val module: Option[String] = None
override val currentVersion: String = "1.0.2"
override val currentVersion: String = "1.1.0-M1"
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ object SlickProjection {

private def createOffsetStore[P <: JdbcProfile: ClassTag](databaseConfig: DatabaseConfig[P])(
implicit system: ActorSystem[_]) =
new SlickOffsetStore(system, databaseConfig.db, databaseConfig.profile, SlickSettings(system))
new SlickOffsetStore(system, databaseConfig, SlickSettings(system))
}

@ApiMayChange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,26 @@ import pekko.projection.jdbc.internal.MySQLDialect
import pekko.projection.jdbc.internal.OracleDialect
import pekko.projection.jdbc.internal.PostgresDialect
import pekko.util.Helpers.toRootLowerCase
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile

/**
* INTERNAL API
*/
@InternalApi private[projection] class SlickOffsetStore[P <: JdbcProfile](
system: ActorSystem[_],
val db: P#Backend#Database,
val profile: P,
databaseConfig: DatabaseConfig[P],
slickSettings: SlickSettings,
clock: Clock) {

def this(system: ActorSystem[_], databaseConfig: DatabaseConfig[P], slickSettings: SlickSettings) =
this(system, databaseConfig, slickSettings, Clock.systemUTC())

private[projection] val profile: P = databaseConfig.profile

import profile.api._
import OffsetSerialization.MultipleOffsets
import OffsetSerialization.SingleOffset
import profile.api._

def this(system: ActorSystem[_], db: P#Backend#Database, profile: P, slickSettings: SlickSettings) =
this(system, db, profile, slickSettings, Clock.systemUTC())

val (dialect, useLowerCase): (Dialect, Boolean) = {

Expand Down Expand Up @@ -88,7 +91,7 @@ import slick.jdbc.JdbcProfile
SingleOffset(ProjectionId(projectionId.name, row.projectionKey), row.manifest, row.offsetStr, row.mergeable))
}

val results = db.run(action)
val results = databaseConfig.db.run(action)

results.map {
case Nil => None
Expand Down Expand Up @@ -177,7 +180,8 @@ import slick.jdbc.JdbcProfile
stmt.execute(sql)
})
}
db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic)
databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO))
.map(_ => Done)(ExecutionContexts.parasitic)
}

def dropIfExists(): Future[Done] = {
Expand All @@ -193,7 +197,8 @@ import slick.jdbc.JdbcProfile
stmt.execute(dialect.dropManagementTableStatement)
}
}
db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO)).map(_ => Done)(ExecutionContexts.parasitic)
databaseConfig.db.run(DBIO.seq(prepareSchemaDBIO, prepareManagementSchemaDBIO))
.map(_ => Done)(ExecutionContexts.parasitic)
}

def readManagementState(projectionId: ProjectionId)(
Expand All @@ -206,14 +211,14 @@ import slick.jdbc.JdbcProfile
maybeRow.map(row => ManagementState(row.paused))
}

db.run(action)
databaseConfig.db.run(action)
}

def savePaused(projectionId: ProjectionId, paused: Boolean): Future[Done] = {
val millisSinceEpoch = clock.instant().toEpochMilli
val action =
managementTable.insertOrUpdate(ManagementStateRow(projectionId.name, projectionId.key, paused, millisSinceEpoch))

db.run(action).map(_ => Done)(ExecutionContexts.parasitic)
databaseConfig.db.run(action).map(_ => Done)(ExecutionContexts.parasitic)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private[projection] class SlickProjectionImpl[Offset, Envelope, P <: JdbcProfile
settings) {

implicit val executionContext: ExecutionContext = system.executionContext
override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)
override val logger: LoggingAdapter = Logging(system.classicSystem, classOf[SlickInternalProjectionState])

override def readPaused(): Future[Boolean] =
offsetStore.readManagementState(projectionId).map(_.exists(_.paused))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ abstract class SlickOffsetStoreSpec(specConfig: SlickSpecConfig)
private val clock = new TestClock

private val offsetStore =
new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(slickConfig), clock)
new SlickOffsetStore(system, dbConfig, SlickSettings(slickConfig), clock)

override protected def beforeAll(): Unit = {
// create offset table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class SlickProjectionSpec

val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig(SlickSettings.configPath, config)

val offsetStore = new SlickOffsetStore(system, dbConfig.db, dbConfig.profile, SlickSettings(system))
val offsetStore = new SlickOffsetStore(system, dbConfig, SlickSettings(system))

val projectionTestKit = ProjectionTestKit(system)

Expand Down

0 comments on commit 0afbf60

Please sign in to comment.