Skip to content

Commit

Permalink
Apply scalafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed May 5, 2024
1 parent f4028a6 commit 9479cd1
Show file tree
Hide file tree
Showing 39 changed files with 244 additions and 359 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,16 @@ final class AmqpDetailsConnectionProvider private (
factory.setPassword(credentials.password)
}
virtualHost.foreach(factory.setVirtualHost)
sslConfiguration.foreach(sslConfiguration => {
sslConfiguration.foreach { sslConfiguration =>
if (sslConfiguration.protocol.isDefined) {
if (sslConfiguration.trustManager.isDefined)
factory.useSslProtocol(sslConfiguration.protocol.get, sslConfiguration.trustManager.get)
else factory.useSslProtocol(sslConfiguration.protocol.get)
} else if (sslConfiguration.context.isDefined) {
} else if (sslConfiguration.context.isDefined)
factory.useSslProtocol(sslConfiguration.context.get)
} else {
else
factory.useSslProtocol()
}
})
}
requestedHeartbeat.foreach(factory.setRequestedHeartbeat)
connectionTimeout.foreach(factory.setConnectionTimeout)
handshakeTimeout.foreach(factory.setHandshakeTimeout)
Expand Down Expand Up @@ -244,9 +243,8 @@ object AmqpCredentials {
final class AmqpSSLConfiguration private (val protocol: Option[String] = None,
val trustManager: Option[TrustManager] = None,
val context: Option[SSLContext] = None) {
if (protocol.isDefined && context.isDefined) {
if (protocol.isDefined && context.isDefined)
throw new IllegalArgumentException("Protocol and context can't be defined in the same AmqpSSLConfiguration.")
}

def withProtocol(protocol: String): AmqpSSLConfiguration =
copy(protocol = Some(protocol))
Expand Down Expand Up @@ -419,10 +417,8 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
throw new ConcurrentModificationException(
"Unexpected concurrent modification while closing the connection.")
}
} else {
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(amqpConnectionProvider, connection)
}
} else if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(amqpConnectionProvider, connection)
case Closing => releaseRecursive(amqpConnectionProvider, connection)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,8 @@ import scala.concurrent.Promise
if (noAwaitingMessages && exitQueue.isEmpty) {
streamCompletion.success(Done)
super.onUpstreamFinish()
} else {
} else
log.debug("Received upstream finish signal - stage will be closed when all buffered messages are processed")
}

private def publish(message: WriteMessage): DeliveryTag = {
val tag: DeliveryTag = channel.getNextPublishSeqNo
Expand Down Expand Up @@ -191,10 +190,9 @@ import scala.concurrent.Promise

override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case tag: DeliveryTag => {
case tag: DeliveryTag =>
log.debug("Received timeout for deliveryTag {}.", tag)
onRejection(tag, multiple = false)
}
case _ => ()
}

Expand All @@ -209,4 +207,4 @@ import scala.concurrent.Promise
}

private def isFinished: Boolean = isClosed(in) && noAwaitingMessages && exitQueue.isEmpty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ import scala.concurrent.{ Future, Promise }
buffer += (tag -> AwaitingMessage(tag, passThrough))

override def dequeueAwaitingMessages(tag: DeliveryTag, multiple: Boolean): Iterable[AwaitingMessage[T]] =
if (multiple) {
if (multiple)
dequeueWhile((t, _) => t <= tag)
} else {
else {
setReady(tag)
if (isAtHead(tag)) {
if (isAtHead(tag))
dequeueWhile((_, message) => message.ready)
} else {
else
Seq.empty
}
}

private def dequeueWhile(
Expand All @@ -88,4 +87,4 @@ import scala.concurrent.{ Future, Promise }

}, streamCompletion.future)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
elem.immediate,
elem.properties.orNull,
elem.bytes.toArray)
} else if (settings.failIfReplyToMissing) {
} else if (settings.failIfReplyToMissing)
onFailure(new RuntimeException("Reply-to header was not set"))
}

tryPull(in)
}
Expand All @@ -94,4 +93,4 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
}

override def toString: String = "AmqpReplyToSink"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
val consumerCallback = getAsyncCallback(handleDelivery)

val commitCallback = getAsyncCallback[AckArguments] {
case AckArguments(deliveryTag, multiple, promise) => {
case AckArguments(deliveryTag, multiple, promise) =>
try {
channel.basicAck(deliveryTag, multiple)
unackedMessages -= 1
Expand All @@ -81,10 +81,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
} catch {
case e: Throwable => promise.failure(e)
}
}
}
val nackCallback = getAsyncCallback[NackArguments] {
case NackArguments(deliveryTag, multiple, requeue, promise) => {
case NackArguments(deliveryTag, multiple, requeue, promise) =>
try {
channel.basicNack(deliveryTag, multiple, requeue)
unackedMessages -= 1
Expand All @@ -95,7 +94,6 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
} catch {
case e: Throwable => promise.failure(e)
}
}
}

val amqpSourceConsumer = new DefaultConsumer(channel) {
Expand All @@ -105,7 +103,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
body: Array[Byte]): Unit =
consumerCallback.invoke(
new CommittableReadResult {
override val message = ReadResult(ByteString(body), envelope, properties)
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)

override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
Expand Down Expand Up @@ -148,21 +146,19 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
}

def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
if (isAvailable(out))
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
else if (queue.size + 1 > bufferSize)
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
else
queue.enqueue(message)
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
if (queue.nonEmpty)
pushMessage(queue.dequeue())
}

override def onDownstreamFinish(cause: Throwable): Unit = {
setKeepGoing(true)
Expand Down Expand Up @@ -207,15 +203,14 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf

val expectedResponses: Int = {
val headers = props.getHeaders
if (headers == null) {
if (headers == null)
responsesPerMessage
} else {
else {
val r = headers.get("expectedReplies")
if (r != null) {
if (r != null)
r.asInstanceOf[Int]
} else {
else
responsesPerMessage
}
}
}

Expand All @@ -237,4 +232,4 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf

override def toString: String = "AmqpRpcFlow"

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
properties: BasicProperties,
body: Array[Byte]): Unit = {
val message = if (ackRequired) {

new CommittableReadResult {
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)

Expand Down Expand Up @@ -155,21 +154,19 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
}

def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
if (isAvailable(out))
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
else if (queue.size + 1 > bufferSize)
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
else
queue.enqueue(message)
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
if (queue.nonEmpty)
pushMessage(queue.dequeue())
}

override def onDownstreamFinish(cause: Throwable): Unit =
if (unackedMessages == 0) super.onDownstreamFinish(cause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ trait AbstractAvroParquetBase {

val genFinalFile: Gen[String] = for {
fileName <- Gen.alphaLowerStr
} yield {
folder + "/" + fileName + ".parquet"
}
} yield folder + "/" + fileName + ".parquet"

val genFile: Gen[String] = Gen.oneOf(Seq(Gen.alphaLowerStr.sample.get + ".parquet"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ import scala.collection.mutable.Queue
if (res.isEmpty) {
settings.retrieveRetryTimeout match {
case Some(timeout) =>
if (isAvailable(out)) {
if (isAvailable(out))
scheduleOnce(NotUsed, timeout)
}
case None => complete(out)
}
} else {
Expand All @@ -69,11 +68,10 @@ import scala.collection.mutable.Queue
out,
new OutHandler {
override def onPull(): Unit =
if (buffer.nonEmpty) {
if (buffer.nonEmpty)
push(out, buffer.dequeue())
} else {
else
retrieveMessages()
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ class DefaultSessionProvider(system: ActorSystem, config: Config) extends CqlSes
*/
private def usePekkoDiscovery(config: Config): Boolean = config.getString("service-discovery.name").nonEmpty

override def connect()(implicit ec: ExecutionContext): Future[CqlSession] = {
if (usePekkoDiscovery(config)) {
override def connect()(implicit ec: ExecutionContext): Future[CqlSession] =
if (usePekkoDiscovery(config))
PekkoDiscoverySessionProvider.connect(system, config)
} else {
else {
val driverConfig = CqlSessionProvider.driverConfig(system, config)
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfig)
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala
}
}
}

object CqlSessionProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class DriverConfigLoaderFromConfig(config: Config) extends DriverConfigLoader {

private val driverConfig: DriverConfig = new TypesafeDriverConfig(config)

override def getInitialConfig: DriverConfig = {
driverConfig
}
override def getInitialConfig: DriverConfig = driverConfig

override def onDriverInit(context: DriverContext): Unit = ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,14 @@ import scala.concurrent.{ ExecutionContext, Future }
*/
private[cassandra] object PekkoDiscoverySessionProvider {

def connect(system: ActorSystem, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] = {
def connect(system: ActorSystem, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] =
readNodes(config)(system, ec).flatMap { contactPoints =>
val driverConfigWithContactPoints = ConfigFactory.parseString(s"""
basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}]
""").withFallback(CqlSessionProvider.driverConfig(system, config))
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfigWithContactPoints)
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala
}
}

def connect(system: ClassicActorSystemProvider, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] =
connect(system.classicSystem, config)
Expand All @@ -96,7 +95,7 @@ private[cassandra] object PekkoDiscoverySessionProvider {
private def readNodes(
serviceName: String,
lookupTimeout: FiniteDuration)(
implicit system: ActorSystem, ec: ExecutionContext): Future[immutable.Seq[String]] = {
implicit system: ActorSystem, ec: ExecutionContext): Future[immutable.Seq[String]] =
Discovery(system).discovery.lookup(serviceName, lookupTimeout).map { resolved =>
resolved.addresses.map { target =>
target.host + ":" + target.port.getOrElse {
Expand All @@ -105,6 +104,5 @@ private[cassandra] object PekkoDiscoverySessionProvider {
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ object CassandraFlow {
writeSettings: CassandraWriteSettings,
cqlStatement: String,
statementBinder: pekko.japi.Function2[T, PreparedStatement, BoundStatement])
: FlowWithContext[T, Ctx, T, Ctx, NotUsed] = {
: FlowWithContext[T, Ctx, T, Ctx, NotUsed] =
scaladsl.CassandraFlow
.withContext(writeSettings, cqlStatement, (t, preparedStatement) => statementBinder.apply(t, preparedStatement))(
session.delegate)
.asJava
}

/**
* Creates a flow that uses [[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the
Expand All @@ -92,13 +91,12 @@ object CassandraFlow {
writeSettings: CassandraWriteSettings,
cqlStatement: String,
statementBinder: (T, PreparedStatement) => BoundStatement,
groupingKey: pekko.japi.Function[T, K]): Flow[T, T, NotUsed] = {
groupingKey: pekko.japi.Function[T, K]): Flow[T, T, NotUsed] =
scaladsl.CassandraFlow
.createBatch(writeSettings,
cqlStatement,
(t, preparedStatement) => statementBinder.apply(t, preparedStatement),
t => groupingKey.apply(t))(session.delegate)
.asJava
}

}
Loading

0 comments on commit 9479cd1

Please sign in to comment.