From 9479cd1ffa0f28b45397b200768b08252ff72020 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Sun, 14 Apr 2024 10:25:03 +0200 Subject: [PATCH] Apply scalafmt --- .../amqp/AmqpConnectionProvider.scala | 18 ++-- .../AbstractAmqpAsyncFlowStageLogic.scala | 8 +- .../amqp/impl/AmqpAsyncFlowStage.scala | 11 +-- .../amqp/impl/AmqpReplyToSinkStage.scala | 5 +- .../amqp/impl/AmqpRpcFlowStage.scala | 29 +++--- .../amqp/impl/AmqpSourceStage.scala | 11 +-- .../scaladsl/AbstractAvroParquetBase.scala | 4 +- .../impl/AzureQueueSourceStage.scala | 8 +- .../cassandra/CqlSessionProvider.scala | 7 +- .../DriverConfigLoaderFromConfig.scala | 4 +- .../PekkoDiscoverySessionProvider.scala | 6 +- .../cassandra/javadsl/CassandraFlow.scala | 6 +- .../cassandra/scaladsl/CassandraFlow.scala | 9 +- .../cassandra/scaladsl/CassandraSession.scala | 40 +++----- .../scaladsl/CassandraSessionRegistry.scala | 3 +- .../docs/scaladsl/CassandraSourceSpec.scala | 3 +- .../javadsl/CassandraSessionSpec.scala | 3 +- .../scaladsl/CassandraLifecycle.scala | 10 +- .../CassandraSessionPerformanceSpec.scala | 3 +- .../couchbase/CouchbaseSessionRegistry.scala | 3 +- .../impl/CouchbaseClusterRegistry.scala | 3 +- .../couchbase/impl/CouchbaseSessionImpl.scala | 3 +- .../couchbase/scaladsl/CouchbaseFlow.scala | 16 ++-- .../connectors/csv/impl/CsvFormatter.scala | 22 ++--- .../connectors/csv/impl/CsvParser.scala | 6 +- .../csv/impl/CsvToMapJavaStage.scala | 35 +++---- .../connectors/csv/impl/CsvToMapStage.scala | 12 +-- .../scala/docs/scaladsl/CsvParsingSpec.scala | 91 +++++++++--------- .../eip/scaladsl/PassThroughExamples.scala | 10 +- .../elasticsearch/SourceSettingsBase.scala | 2 +- .../elasticsearch/WriteSettingsBase.scala | 2 +- .../elasticsearch/impl/ElasticsearchApi.scala | 8 +- .../impl/ElasticsearchSimpleFlowStage.scala | 6 +- .../impl/ElasticsearchSourceStage.scala | 61 +++++------- .../elasticsearch/impl/RestBulkApi.scala | 7 +- .../elasticsearch/impl/RestBulkApiV5.scala | 5 +- .../elasticsearch/impl/RestBulkApiV7.scala | 5 +- .../javadsl/ElasticsearchSource.scala | 94 +++++++++---------- .../scaladsl/ElasticsearchFlow.scala | 24 ++--- 39 files changed, 244 insertions(+), 359 deletions(-) diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala index 36ac490fc..685abe32c 100644 --- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala +++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/AmqpConnectionProvider.scala @@ -138,17 +138,16 @@ final class AmqpDetailsConnectionProvider private ( factory.setPassword(credentials.password) } virtualHost.foreach(factory.setVirtualHost) - sslConfiguration.foreach(sslConfiguration => { + sslConfiguration.foreach { sslConfiguration => if (sslConfiguration.protocol.isDefined) { if (sslConfiguration.trustManager.isDefined) factory.useSslProtocol(sslConfiguration.protocol.get, sslConfiguration.trustManager.get) else factory.useSslProtocol(sslConfiguration.protocol.get) - } else if (sslConfiguration.context.isDefined) { + } else if (sslConfiguration.context.isDefined) factory.useSslProtocol(sslConfiguration.context.get) - } else { + else factory.useSslProtocol() - } - }) + } requestedHeartbeat.foreach(factory.setRequestedHeartbeat) connectionTimeout.foreach(factory.setConnectionTimeout) handshakeTimeout.foreach(factory.setHandshakeTimeout) @@ -244,9 +243,8 @@ object AmqpCredentials { final class 
AmqpSSLConfiguration private (val protocol: Option[String] = None, val trustManager: Option[TrustManager] = None, val context: Option[SSLContext] = None) { - if (protocol.isDefined && context.isDefined) { + if (protocol.isDefined && context.isDefined) throw new IllegalArgumentException("Protocol and context can't be defined in the same AmqpSSLConfiguration.") - } def withProtocol(protocol: String): AmqpSSLConfiguration = copy(protocol = Some(protocol)) @@ -419,10 +417,8 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr throw new ConcurrentModificationException( "Unexpected concurrent modification while closing the connection.") } - } else { - if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1))) - releaseRecursive(amqpConnectionProvider, connection) - } + } else if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1))) + releaseRecursive(amqpConnectionProvider, connection) case Closing => releaseRecursive(amqpConnectionProvider, connection) } diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala index cd9d80b54..ffc53dabe 100644 --- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala +++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AbstractAmqpAsyncFlowStageLogic.scala @@ -153,9 +153,8 @@ import scala.concurrent.Promise if (noAwaitingMessages && exitQueue.isEmpty) { streamCompletion.success(Done) super.onUpstreamFinish() - } else { + } else log.debug("Received upstream finish signal - stage will be closed when all buffered messages are processed") - } private def publish(message: WriteMessage): DeliveryTag = { val tag: DeliveryTag = channel.getNextPublishSeqNo @@ -191,10 +190,9 @@ import scala.concurrent.Promise override protected def onTimer(timerKey: Any): Unit = timerKey match { - case tag: DeliveryTag => { + case tag: DeliveryTag => log.debug("Received timeout for deliveryTag {}.", tag) onRejection(tag, multiple = false) - } case _ => () } @@ -209,4 +207,4 @@ import scala.concurrent.Promise } private def isFinished: Boolean = isClosed(in) && noAwaitingMessages && exitQueue.isEmpty -} +} \ No newline at end of file diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpAsyncFlowStage.scala b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpAsyncFlowStage.scala index 3e4c41888..13ba8a45f 100644 --- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpAsyncFlowStage.scala +++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpAsyncFlowStage.scala @@ -58,15 +58,14 @@ import scala.concurrent.{ Future, Promise } buffer += (tag -> AwaitingMessage(tag, passThrough)) override def dequeueAwaitingMessages(tag: DeliveryTag, multiple: Boolean): Iterable[AwaitingMessage[T]] = - if (multiple) { + if (multiple) dequeueWhile((t, _) => t <= tag) - } else { + else { setReady(tag) - if (isAtHead(tag)) { + if (isAtHead(tag)) dequeueWhile((_, message) => message.ready) - } else { + else Seq.empty - } } private def dequeueWhile( @@ -88,4 +87,4 @@ import scala.concurrent.{ Future, Promise } }, streamCompletion.future) } -} +} \ No newline at end of file diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala 
b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala index a8f70724b..dc5887646 100644 --- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala +++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpReplyToSinkStage.scala @@ -82,9 +82,8 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS elem.immediate, elem.properties.orNull, elem.bytes.toArray) - } else if (settings.failIfReplyToMissing) { + } else if (settings.failIfReplyToMissing) onFailure(new RuntimeException("Reply-to header was not set")) - } tryPull(in) } @@ -94,4 +93,4 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS } override def toString: String = "AmqpReplyToSink" -} +} \ No newline at end of file diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala index 1b18aaea8..2bc79c487 100644 --- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala +++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage.scala @@ -70,7 +70,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf val consumerCallback = getAsyncCallback(handleDelivery) val commitCallback = getAsyncCallback[AckArguments] { - case AckArguments(deliveryTag, multiple, promise) => { + case AckArguments(deliveryTag, multiple, promise) => try { channel.basicAck(deliveryTag, multiple) unackedMessages -= 1 @@ -81,10 +81,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf } catch { case e: Throwable => promise.failure(e) } - } } val nackCallback = getAsyncCallback[NackArguments] { - case NackArguments(deliveryTag, multiple, requeue, promise) => { + case NackArguments(deliveryTag, multiple, requeue, promise) => try { channel.basicNack(deliveryTag, multiple, requeue) unackedMessages -= 1 @@ -95,7 +94,6 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf } catch { case e: Throwable => promise.failure(e) } - } } val amqpSourceConsumer = new DefaultConsumer(channel) { @@ -105,7 +103,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf body: Array[Byte]): Unit = consumerCallback.invoke( new CommittableReadResult { - override val message = ReadResult(ByteString(body), envelope, properties) + override val message: ReadResult = ReadResult(ByteString(body), envelope, properties) override def ack(multiple: Boolean): Future[Done] = { val promise = Promise[Done]() @@ -148,21 +146,19 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf } def handleDelivery(message: CommittableReadResult): Unit = - if (isAvailable(out)) { + if (isAvailable(out)) pushMessage(message) - } else if (queue.size + 1 > bufferSize) { + else if (queue.size + 1 > bufferSize) onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize")) - } else { + else queue.enqueue(message) - } setHandler( out, new OutHandler { override def onPull(): Unit = - if (queue.nonEmpty) { + if (queue.nonEmpty) pushMessage(queue.dequeue()) - } override def onDownstreamFinish(cause: Throwable): Unit = { setKeepGoing(true) @@ -207,15 +203,14 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf val expectedResponses: Int = { val headers = props.getHeaders - if (headers == null) { 
+ if (headers == null) responsesPerMessage - } else { + else { val r = headers.get("expectedReplies") - if (r != null) { + if (r != null) r.asInstanceOf[Int] - } else { + else responsesPerMessage - } } } @@ -237,4 +232,4 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf override def toString: String = "AmqpRpcFlow" -} +} \ No newline at end of file diff --git a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala index d09909b1b..c33ec75f0 100644 --- a/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala +++ b/amqp/src/main/scala/org/apache/pekko/stream/connectors/amqp/impl/AmqpSourceStage.scala @@ -94,7 +94,6 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi properties: BasicProperties, body: Array[Byte]): Unit = { val message = if (ackRequired) { - new CommittableReadResult { override val message: ReadResult = ReadResult(ByteString(body), envelope, properties) @@ -155,21 +154,19 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi } def handleDelivery(message: CommittableReadResult): Unit = - if (isAvailable(out)) { + if (isAvailable(out)) pushMessage(message) - } else if (queue.size + 1 > bufferSize) { + else if (queue.size + 1 > bufferSize) onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize")) - } else { + else queue.enqueue(message) - } setHandler( out, new OutHandler { override def onPull(): Unit = - if (queue.nonEmpty) { + if (queue.nonEmpty) pushMessage(queue.dequeue()) - } override def onDownstreamFinish(cause: Throwable): Unit = if (unackedMessages == 0) super.onDownstreamFinish(cause) diff --git a/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala b/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala index 580b271a4..f8bbd3256 100644 --- a/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala +++ b/avroparquet/src/test/scala/docs/scaladsl/AbstractAvroParquetBase.scala @@ -39,9 +39,7 @@ trait AbstractAvroParquetBase { val genFinalFile: Gen[String] = for { fileName <- Gen.alphaLowerStr - } yield { - folder + "/" + fileName + ".parquet" - } + } yield folder + "/" + fileName + ".parquet" val genFile: Gen[String] = Gen.oneOf(Seq(Gen.alphaLowerStr.sample.get + ".parquet")) diff --git a/azure-storage-queue/src/main/scala/org/apache/pekko/stream/connectors/azure/storagequeue/impl/AzureQueueSourceStage.scala b/azure-storage-queue/src/main/scala/org/apache/pekko/stream/connectors/azure/storagequeue/impl/AzureQueueSourceStage.scala index 665581176..9b86e6063 100644 --- a/azure-storage-queue/src/main/scala/org/apache/pekko/stream/connectors/azure/storagequeue/impl/AzureQueueSourceStage.scala +++ b/azure-storage-queue/src/main/scala/org/apache/pekko/stream/connectors/azure/storagequeue/impl/AzureQueueSourceStage.scala @@ -54,9 +54,8 @@ import scala.collection.mutable.Queue if (res.isEmpty) { settings.retrieveRetryTimeout match { case Some(timeout) => - if (isAvailable(out)) { + if (isAvailable(out)) scheduleOnce(NotUsed, timeout) - } case None => complete(out) } } else { @@ -69,11 +68,10 @@ import scala.collection.mutable.Queue out, new OutHandler { override def onPull(): Unit = - if (buffer.nonEmpty) { + if (buffer.nonEmpty) push(out, buffer.dequeue()) - } else { + else retrieveMessages() - } }) } } diff --git 
a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala index 0adceed52..7520e7659 100644 --- a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala +++ b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/CqlSessionProvider.scala @@ -54,15 +54,14 @@ class DefaultSessionProvider(system: ActorSystem, config: Config) extends CqlSes */ private def usePekkoDiscovery(config: Config): Boolean = config.getString("service-discovery.name").nonEmpty - override def connect()(implicit ec: ExecutionContext): Future[CqlSession] = { - if (usePekkoDiscovery(config)) { + override def connect()(implicit ec: ExecutionContext): Future[CqlSession] = + if (usePekkoDiscovery(config)) PekkoDiscoverySessionProvider.connect(system, config) - } else { + else { val driverConfig = CqlSessionProvider.driverConfig(system, config) val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfig) CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala } - } } object CqlSessionProvider { diff --git a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/DriverConfigLoaderFromConfig.scala b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/DriverConfigLoaderFromConfig.scala index 657b4ed15..f9deda4b9 100644 --- a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/DriverConfigLoaderFromConfig.scala +++ b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/DriverConfigLoaderFromConfig.scala @@ -41,9 +41,7 @@ class DriverConfigLoaderFromConfig(config: Config) extends DriverConfigLoader { private val driverConfig: DriverConfig = new TypesafeDriverConfig(config) - override def getInitialConfig: DriverConfig = { - driverConfig - } + override def getInitialConfig: DriverConfig = driverConfig override def onDriverInit(context: DriverContext): Unit = () diff --git a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/PekkoDiscoverySessionProvider.scala b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/PekkoDiscoverySessionProvider.scala index 80b41b127..4898370e7 100644 --- a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/PekkoDiscoverySessionProvider.scala +++ b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/PekkoDiscoverySessionProvider.scala @@ -66,7 +66,7 @@ import scala.concurrent.{ ExecutionContext, Future } */ private[cassandra] object PekkoDiscoverySessionProvider { - def connect(system: ActorSystem, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] = { + def connect(system: ActorSystem, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] = readNodes(config)(system, ec).flatMap { contactPoints => val driverConfigWithContactPoints = ConfigFactory.parseString(s""" basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}] @@ -74,7 +74,6 @@ private[cassandra] object PekkoDiscoverySessionProvider { val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfigWithContactPoints) CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala } - } def connect(system: ClassicActorSystemProvider, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] = connect(system.classicSystem, config) @@ -96,7 +95,7 @@ private[cassandra] object 
PekkoDiscoverySessionProvider { private def readNodes( serviceName: String, lookupTimeout: FiniteDuration)( - implicit system: ActorSystem, ec: ExecutionContext): Future[immutable.Seq[String]] = { + implicit system: ActorSystem, ec: ExecutionContext): Future[immutable.Seq[String]] = Discovery(system).discovery.lookup(serviceName, lookupTimeout).map { resolved => resolved.addresses.map { target => target.host + ":" + target.port.getOrElse { @@ -105,6 +104,5 @@ private[cassandra] object PekkoDiscoverySessionProvider { } } } - } } diff --git a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala index e9062138f..c43413861 100644 --- a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala +++ b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraFlow.scala @@ -60,12 +60,11 @@ object CassandraFlow { writeSettings: CassandraWriteSettings, cqlStatement: String, statementBinder: pekko.japi.Function2[T, PreparedStatement, BoundStatement]) - : FlowWithContext[T, Ctx, T, Ctx, NotUsed] = { + : FlowWithContext[T, Ctx, T, Ctx, NotUsed] = scaladsl.CassandraFlow .withContext(writeSettings, cqlStatement, (t, preparedStatement) => statementBinder.apply(t, preparedStatement))( session.delegate) .asJava - } /** * Creates a flow that uses [[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the @@ -92,13 +91,12 @@ object CassandraFlow { writeSettings: CassandraWriteSettings, cqlStatement: String, statementBinder: (T, PreparedStatement) => BoundStatement, - groupingKey: pekko.japi.Function[T, K]): Flow[T, T, NotUsed] = { + groupingKey: pekko.japi.Function[T, K]): Flow[T, T, NotUsed] = scaladsl.CassandraFlow .createBatch(writeSettings, cqlStatement, (t, preparedStatement) => statementBinder.apply(t, preparedStatement), t => groupingKey.apply(t))(session.delegate) .asJava - } } diff --git a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraFlow.scala b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraFlow.scala index 2281f614a..e239ec19a 100644 --- a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraFlow.scala +++ b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraFlow.scala @@ -42,7 +42,7 @@ object CassandraFlow { writeSettings: CassandraWriteSettings, cqlStatement: String, statementBinder: (T, PreparedStatement) => BoundStatement)( - implicit session: CassandraSession): Flow[T, T, NotUsed] = { + implicit session: CassandraSession): Flow[T, T, NotUsed] = Flow .lazyFutureFlow { () => val prepare = session.prepare(cqlStatement) @@ -55,7 +55,6 @@ object CassandraFlow { }(session.ec) } .mapMaterializedValue(_ => NotUsed) - } /** * A flow writing to Cassandra for every stream element, passing context along. 
@@ -72,7 +71,7 @@ object CassandraFlow { writeSettings: CassandraWriteSettings, cqlStatement: String, statementBinder: (T, PreparedStatement) => BoundStatement)( - implicit session: CassandraSession): FlowWithContext[T, Ctx, T, Ctx, NotUsed] = { + implicit session: CassandraSession): FlowWithContext[T, Ctx, T, Ctx, NotUsed] = FlowWithContext.fromTuples { Flow .lazyFutureFlow { () => @@ -88,7 +87,6 @@ object CassandraFlow { } .mapMaterializedValue(_ => NotUsed) } - } /** * Creates a flow that uses [[com.datastax.oss.driver.api.core.cql.BatchStatement]] and groups the @@ -114,7 +112,7 @@ object CassandraFlow { def createBatch[T, K](writeSettings: CassandraWriteSettings, cqlStatement: String, statementBinder: (T, PreparedStatement) => BoundStatement, - groupingKey: T => K)(implicit session: CassandraSession): Flow[T, T, NotUsed] = { + groupingKey: T => K)(implicit session: CassandraSession): Flow[T, T, NotUsed] = Flow .lazyFutureFlow { () => val prepareStatement: Future[PreparedStatement] = session.prepare(cqlStatement) @@ -132,5 +130,4 @@ object CassandraFlow { }(session.ec) } .mapMaterializedValue(_ => NotUsed) - } } diff --git a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala index 5d77b0113..4ef5c0df1 100644 --- a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala +++ b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSession.scala @@ -62,9 +62,9 @@ final class CassandraSession(system: pekko.actor.ActorSystem, private val _underlyingSession: Future[CqlSession] = sessionProvider .connect() .flatMap { cqlSession => - cqlSession.getMetrics.ifPresent(metrics => { + cqlSession.getMetrics.ifPresent { metrics => CassandraMetricsRegistry(system).addMetrics(metricsCategory, metrics.getRegistry) - }) + } init(cqlSession).map(_ => cqlSession) } .recover { @@ -94,7 +94,7 @@ final class CassandraSession(system: pekko.actor.ActorSystem, /** * Meta data about the Cassandra server, such as its version. */ - def serverMetaData: Future[CassandraServerMetaData] = { + def serverMetaData: Future[CassandraServerMetaData] = cachedServerMetaData match { case OptionVal.Some(cached) => cached @@ -122,7 +122,6 @@ final class CassandraSession(system: pekko.actor.ActorSystem, result case other => throw new MatchError(other) } - } /** * Execute CQL commands @@ -171,11 +170,10 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * The returned `Future` is completed when the statement has been * successfully executed, or if it fails. */ - def executeWrite(stmt: Statement[_]): Future[Done] = { + def executeWrite(stmt: Statement[_]): Future[Done] = underlying().flatMap { cqlSession => cqlSession.executeAsync(stmt).asScala.map(_ => Done) } - } /** * Prepare, bind and execute one statement in one go. @@ -187,18 +185,16 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * The returned `Future` is completed when the statement has been * successfully executed, or if it fails. 
*/ - def executeWrite(stmt: String, bindValues: AnyRef*): Future[Done] = { + def executeWrite(stmt: String, bindValues: AnyRef*): Future[Done] = bind(stmt, bindValues).flatMap(b => executeWrite(b)) - } /** * INTERNAL API */ - @InternalApi private[pekko] def selectResultSet(stmt: Statement[_]): Future[AsyncResultSet] = { + @InternalApi private[pekko] def selectResultSet(stmt: Statement[_]): Future[AsyncResultSet] = underlying().flatMap { s => s.executeAsync(stmt).asScala } - } /** * Execute a select statement. First you must `prepare` the @@ -212,7 +208,7 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * Note that you have to connect a `Sink` that consumes the messages from * this `Source` and then `run` the stream. */ - def select(stmt: Statement[_]): Source[Row, NotUsed] = { + def select(stmt: Statement[_]): Source[Row, NotUsed] = Source .futureSource { underlying().map { cqlSession => @@ -220,7 +216,6 @@ final class CassandraSession(system: pekko.actor.ActorSystem, } } .mapMaterializedValue(_ => NotUsed) - } /** * Execute a select statement created by `prepare`. @@ -233,7 +228,7 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * Note that you have to connect a `Sink` that consumes the messages from * this `Source` and then `run` the stream. */ - def select(stmt: Future[Statement[_]]): Source[Row, NotUsed] = { + def select(stmt: Future[Statement[_]]): Source[Row, NotUsed] = Source .futureSource { underlying().flatMap(cqlSession => stmt.map(cqlSession -> _)).map { @@ -242,7 +237,6 @@ final class CassandraSession(system: pekko.actor.ActorSystem, } } .mapMaterializedValue(_ => NotUsed) - } /** * Prepare, bind and execute a select statement in one go. @@ -254,9 +248,8 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * Note that you have to connect a `Sink` that consumes the messages from * this `Source` and then `run` the stream. */ - def select(stmt: String, bindValues: AnyRef*): Source[Row, NotUsed] = { + def select(stmt: String, bindValues: AnyRef*): Source[Row, NotUsed] = select(bind(stmt, bindValues)) - } /** * Execute a select statement. First you must `prepare` the statement and @@ -269,11 +262,10 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * * The returned `Future` is completed with the found rows. */ - def selectAll(stmt: Statement[_]): Future[immutable.Seq[Row]] = { + def selectAll(stmt: Statement[_]): Future[immutable.Seq[Row]] = select(stmt) .runWith(Sink.seq) .map(_.toVector) // Sink.seq returns Seq, not immutable.Seq (compilation issue in Eclipse) - } /** * Prepare, bind and execute a select statement in one go. Only use this method @@ -284,9 +276,8 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * * The returned `Future` is completed with the found rows. */ - def selectAll(stmt: String, bindValues: AnyRef*): Future[immutable.Seq[Row]] = { + def selectAll(stmt: String, bindValues: AnyRef*): Future[immutable.Seq[Row]] = bind(stmt, bindValues).flatMap(bs => selectAll(bs)) - } /** * Execute a select statement that returns one row. First you must `prepare` the @@ -298,11 +289,10 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * The returned `Future` is completed with the first row, * if any. */ - def selectOne(stmt: Statement[_]): Future[Option[Row]] = { + def selectOne(stmt: Statement[_]): Future[Option[Row]] = selectResultSet(stmt).map { rs => Option(rs.one()) // rs.one returns null if exhausted } - } /** * Prepare, bind and execute a select statement that returns one row. 
@@ -312,15 +302,13 @@ final class CassandraSession(system: pekko.actor.ActorSystem, * The returned `Future` is completed with the first row, * if any. */ - def selectOne(stmt: String, bindValues: AnyRef*): Future[Option[Row]] = { + def selectOne(stmt: String, bindValues: AnyRef*): Future[Option[Row]] = bind(stmt, bindValues).flatMap(bs => selectOne(bs)) - } - private def bind(stmt: String, bindValues: Seq[AnyRef]): Future[BoundStatement] = { + private def bind(stmt: String, bindValues: Seq[AnyRef]): Future[BoundStatement] = prepare(stmt).map { ps => if (bindValues.isEmpty) ps.bind() else ps.bind(bindValues: _*) } - } } diff --git a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionRegistry.scala b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionRegistry.scala index 617529686..63d209e73 100644 --- a/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionRegistry.scala +++ b/cassandra/src/main/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionRegistry.scala @@ -83,9 +83,8 @@ final class CassandraSessionRegistry(system: ExtendedActorSystem) extends Extens * Note that the session must not be stopped manually, it is shut down when the actor system is shutdown, * if you need a more fine grained life cycle control, create the CassandraSession manually instead. */ - def sessionFor(settings: CassandraSessionSettings): CassandraSession = { + def sessionFor(settings: CassandraSessionSettings): CassandraSession = sessionFor(settings, system.settings.config.getConfig(settings.configPath)) - } /** * INTERNAL API: Possibility to initialize the `SessionProvider` with a custom `Config` diff --git a/cassandra/src/test/scala/docs/scaladsl/CassandraSourceSpec.scala b/cassandra/src/test/scala/docs/scaladsl/CassandraSourceSpec.scala index 5b5c1c39d..fef0379f8 100644 --- a/cassandra/src/test/scala/docs/scaladsl/CassandraSourceSpec.scala +++ b/cassandra/src/test/scala/docs/scaladsl/CassandraSourceSpec.scala @@ -110,7 +110,7 @@ class CassandraSourceSpec extends CassandraSpecBase(ActorSystem("CassandraSource } - private def prepareIntTable(table: String) = { + private def prepareIntTable(table: String) = withSchemaMetadataDisabled { for { _ <- lifecycleSession.executeDDL(s""" @@ -120,5 +120,4 @@ class CassandraSourceSpec extends CassandraSpecBase(ActorSystem("CassandraSource _ <- executeCql(data.map(i => s"INSERT INTO $table(id) VALUES ($i)")) } yield Done }.futureValue mustBe Done - } } diff --git a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala index 0aeec5fe1..89138511c 100644 --- a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala +++ b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/javadsl/CassandraSessionSpec.scala @@ -46,7 +46,7 @@ final class CassandraSessionSpec extends CassandraSpecBase(ActorSystem("Cassandr private val dataTableName = "testcounts" lazy val dataTable = s"$keyspaceName.$dataTableName" - def insertDataTable() = { + def insertDataTable() = withSchemaMetadataDisabled { for { _ <- lifecycleSession.executeDDL(s"""CREATE TABLE IF NOT EXISTS $dataTable ( @@ -66,7 +66,6 @@ final class CassandraSessionSpec extends CassandraSpecBase(ActorSystem("Cassandr s"INSERT INTO $dataTable (partition, key, count) 
VALUES ('B', 'f', 6);")) } yield Done }.futureValue mustBe Done - } override def beforeAll(): Unit = { super.beforeAll() diff --git a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala index 01d67fd96..9ff47fd89 100644 --- a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala +++ b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraLifecycle.scala @@ -40,19 +40,17 @@ trait CassandraLifecycleBase { session.executeWriteBatch(batch.build()) } - def executeCql(session: CassandraSession, statements: immutable.Seq[String]): Future[Done] = { + def executeCql(session: CassandraSession, statements: immutable.Seq[String]): Future[Done] = execute(session, statements.map(stmt => SimpleStatement.newInstance(stmt))) - } private val keyspaceTimeout = java.time.Duration.ofSeconds(15) - def createKeyspace(session: CassandraSession, name: String): Future[Done] = { + def createKeyspace(session: CassandraSession, name: String): Future[Done] = session.executeWrite( new SimpleStatementBuilder( s"""CREATE KEYSPACE $name WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};""").setTimeout( keyspaceTimeout) .build()) - } def dropKeyspace(session: CassandraSession, name: String): Future[Done] = session.executeWrite( @@ -112,9 +110,9 @@ trait CassandraLifecycle extends BeforeAndAfterAll with TestKitBase with Cassand // so needs to run before the actor system is shut down dropKeyspace(keyspaceName).futureValue(PatienceConfiguration.Timeout(15.seconds)) shutdown(system, verifySystemShutdown = true) - try { + try Await.result(lifecycleSession.close(scala.concurrent.ExecutionContext.global), 20.seconds) - } catch { + catch { case NonFatal(e) => e.printStackTrace(System.err) } diff --git a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionPerformanceSpec.scala b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionPerformanceSpec.scala index 00fb120c6..87bb6756f 100644 --- a/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionPerformanceSpec.scala +++ b/cassandra/src/test/scala/org/apache/pekko/stream/connectors/cassandra/scaladsl/CassandraSessionPerformanceSpec.scala @@ -45,7 +45,7 @@ final class CassandraSessionPerformanceSpec extends CassandraSpecBase(ActorSyste // only using one primary key in this test private val id = "1" - def insertDataTable() = { + def insertDataTable() = lifecycleSession .executeDDL(s"""CREATE TABLE IF NOT EXISTS $dataTable ( | partition_id bigint, @@ -66,7 +66,6 @@ final class CassandraSessionPerformanceSpec extends CassandraSpecBase(ActorSyste .runWith(Sink.ignore) } .futureValue - } override def beforeAll(): Unit = { super.beforeAll() diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/CouchbaseSessionRegistry.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/CouchbaseSessionRegistry.scala index cbac4820e..e51176b36 100644 --- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/CouchbaseSessionRegistry.scala +++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/CouchbaseSessionRegistry.scala @@ -103,10 +103,9 @@ final class CouchbaseSessionRegistry(system: ExtendedActorSystem) extends Extens 
ExecutionContexts.parasitic) promise.completeWith(session) promise.future - } else { + } else // we lost cas (could be concurrent call for some other key though), retry startSession(key) - } } } diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseClusterRegistry.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseClusterRegistry.scala index 919ff5459..29aa60daf 100644 --- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseClusterRegistry.scala +++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseClusterRegistry.scala @@ -64,10 +64,9 @@ final private[couchbase] class CouchbaseClusterRegistry(system: ActorSystem) { }(system.dispatcher) } future - } else { + } else // we lost cas (could be concurrent call for some other settings though), retry createClusterClient(settings) - } } } diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala index a4584b1f0..294e07258 100644 --- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala +++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/impl/CouchbaseSessionImpl.scala @@ -159,9 +159,8 @@ final private[couchbase] class CouchbaseSessionImpl(asyncBucket: AsyncBucket, cl case None => Future.successful(Done) } }(ExecutionContexts.global()) - } else { + } else Future.successful(Done) - } override def toString: String = s"CouchbaseSession(${asyncBucket.name()})" diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala index ad35b1cee..d5f424fea 100644 --- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala +++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala @@ -105,7 +105,7 @@ object CouchbaseFlow { .fromMaterializer { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[T] - .mapAsync(writeSettings.parallelism)(doc => { + .mapAsync(writeSettings.parallelism) { doc => implicit val executor: ExecutionContext = materializer.system.dispatcher session .flatMap(_.upsertDoc(doc, writeSettings)) @@ -113,7 +113,7 @@ object CouchbaseFlow { .recover { case exception => CouchbaseWriteFailure(doc, exception) } - }) + } } flow.mapMaterializedValue(_ => NotUsed) } @@ -159,7 +159,7 @@ object CouchbaseFlow { .fromMaterializer { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[T] - .mapAsync(writeSettings.parallelism)(doc => { + .mapAsync(writeSettings.parallelism) { doc => implicit val executor: ExecutionContext = materializer.system.dispatcher session .flatMap(_.replaceDoc(doc, writeSettings)) @@ -167,7 +167,7 @@ object CouchbaseFlow { .recover { case exception => CouchbaseWriteFailure(doc, exception) } - }) + } } flow.mapMaterializedValue(_ => NotUsed) } @@ -182,12 +182,12 @@ object CouchbaseFlow { .fromMaterializer { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[String] - 
.mapAsync(writeSettings.parallelism)(id => { + .mapAsync(writeSettings.parallelism) { id => implicit val executor: ExecutionContext = materializer.system.dispatcher session .flatMap(_.remove(id, writeSettings)) .map(_ => id) - }) + } } .mapMaterializedValue(_ => NotUsed) @@ -201,7 +201,7 @@ object CouchbaseFlow { .fromMaterializer { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[String] - .mapAsync(writeSettings.parallelism)(id => { + .mapAsync(writeSettings.parallelism) { id => implicit val executor: ExecutionContext = materializer.system.dispatcher session .flatMap(_.remove(id, writeSettings)) @@ -209,7 +209,7 @@ object CouchbaseFlow { .recover { case exception => CouchbaseDeleteFailure(id, exception) } - }) + } } .mapMaterializedValue(_ => NotUsed) } diff --git a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvFormatter.scala b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvFormatter.scala index 36f248a74..222ce37c4 100644 --- a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvFormatter.scala +++ b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvFormatter.scala @@ -65,45 +65,39 @@ import scala.collection.immutable while (index > -1) { builder ++= ByteString.apply(field.substring(lastIndex, index), charsetName) val char = field.charAt(index) - if (char == quoteChar) { + if (char == quoteChar) builder ++= duplicatedQuote - } else { + else builder ++= duplicatedEscape - } lastIndex = index + 1 index = indexOfQuoteOrEscape(lastIndex) } - if (lastIndex < field.length) { + if (lastIndex < field.length) builder ++= ByteString(field.substring(lastIndex), charsetName) - } } def append(field: String) = { val (quoteIt, splitAt) = requiresQuotesOrSplit(field) if (quoteIt || quotingStyle == CsvQuotingStyle.Always) { builder ++= quoteBs - if (splitAt != -1) { + if (splitAt != -1) splitAndDuplicateQuotesAndEscapes(field, splitAt) - } else { + else builder ++= ByteString(field, charsetName) - } builder ++= quoteBs - } else { + } else builder ++= ByteString(field, charsetName) - } } val iterator = fields.iterator var hasNext = iterator.hasNext while (hasNext) { val next = iterator.next() - if (next != null) { + if (next != null) append(next.toString) - } hasNext = iterator.hasNext - if (hasNext) { + if (hasNext) builder ++= delimiterBs - } } builder ++= endOfLineBs builder.result() diff --git a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvParser.scala b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvParser.scala index 4f5e8c43c..1a837e11d 100644 --- a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvParser.scala +++ b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvParser.scala @@ -128,9 +128,8 @@ import scala.collection.mutable val line = maybeExtractLine(requireLineEnd) if (line.nonEmpty) { currentLineNo += 1 - if (state == LineEnd || !requireLineEnd) { + if (state == LineEnd || !requireLineEnd) state = LineStart - } resetLine() columns.clear() } @@ -192,7 +191,7 @@ import scala.collection.mutable s"wrong escaping at $currentLineNo:$lineLength, no character after escape") private[this] def checkForByteOrderMark(): Unit = - if (buffer.length >= 2) { + if (buffer.length >= 2) if (buffer.startsWith(ByteOrderMark.UTF_8)) { advance(3) fieldStart = 3 @@ -207,7 +206,6 @@ import scala.collection.mutable throw new UnsupportedCharsetException("UTF-32 BE") } } - } private[this] def 
parseLine(): Unit = { if (firstData) { diff --git a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapJavaStage.scala b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapJavaStage.scala index adefbcf61..0b8f7cf52 100644 --- a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapJavaStage.scala +++ b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapJavaStage.scala @@ -43,7 +43,7 @@ import pekko.util.ByteString private val in = Inlet[ju.Collection[ByteString]]("CsvToMap.in") private val out = Outlet[ju.Map[String, V]]("CsvToMap.out") - override val shape = FlowShape.of(in, out) + override val shape: FlowShape[ju.Collection[ByteString], ju.Map[String, V]] = FlowShape.of(in, out) val fieldValuePlaceholder: V @@ -65,15 +65,14 @@ import pekko.util.ByteString new InHandler { override def onPush(): Unit = { val elem = grab(in) - if (combineAll) { + if (combineAll) process(elem, zipAllWithHeaders) - } else { + else process(elem, zipWithHeaders) - } } }) - private def process(elem: ju.Collection[ByteString], combine: ju.Collection[V] => ju.Map[String, V]): Unit = { + private def process(elem: ju.Collection[ByteString], combine: ju.Collection[V] => ju.Map[String, V]): Unit = if (headers.isPresent) { val map = combine(transformElements(elem)) push(out, map) @@ -81,7 +80,6 @@ import pekko.util.ByteString headers = ju.Optional.of(decode(elem)) pull(in) } - } setHandler(out, new OutHandler { @@ -92,9 +90,8 @@ import pekko.util.ByteString val map = new ju.HashMap[String, V]() val hIter = headers.get.iterator() val colIter = elem.iterator() - while (hIter.hasNext && colIter.hasNext) { + while (hIter.hasNext && colIter.hasNext) map.put(hIter.next(), colIter.next()) - } map } @@ -102,29 +99,25 @@ import pekko.util.ByteString val map = new ju.HashMap[String, V]() val hIter = headers.get.iterator() val colIter = elem.iterator() - if (headers.get.size() > elem.size()) { - while (hIter.hasNext) { - if (colIter.hasNext) { + if (headers.get.size() > elem.size()) + while (hIter.hasNext) + if (colIter.hasNext) map.put(hIter.next(), colIter.next()) - } else { + else map.put(hIter.next(), customFieldValuePlaceholder.orElse(fieldValuePlaceholder)) - } - } - } else if (elem.size() > headers.get.size()) { + else if (elem.size() > headers.get.size()) { var index = 0 - while (colIter.hasNext) { - if (hIter.hasNext) { + while (colIter.hasNext) + if (hIter.hasNext) map.put(hIter.next(), colIter.next()) - } else { + else { map.put(headerPlaceholder.orElse("MissingHeader") + index, colIter.next()) index = index + 1 } - } } else { - while (hIter.hasNext && colIter.hasNext) { + while (hIter.hasNext && colIter.hasNext) map.put(hIter.next(), colIter.next()) - } } map } diff --git a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapStage.scala b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapStage.scala index 30447a779..e1a16d218 100644 --- a/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapStage.scala +++ b/csv/src/main/scala/org/apache/pekko/stream/connectors/csv/impl/CsvToMapStage.scala @@ -59,21 +59,19 @@ import scala.collection.immutable override def onPush(): Unit = { val elem = grab(in) - if (combineAll) { + if (combineAll) process(elem, combineUsingPlaceholder(elem)) - } else { + else process(elem, headers => headers.get.zip(transformElements(elem)).toMap) - } } - private def process(elem: immutable.Seq[ByteString], combine: => Headers => Map[String, V]): Unit = { - if 
(headers.isDefined) { + private def process(elem: immutable.Seq[ByteString], combine: => Headers => Map[String, V]): Unit = + if (headers.isDefined) push(out, combine(headers)) - } else { + else { headers = Some(elem.map(_.decodeString(charset))) pull(in) } - } override def onPull(): Unit = pull(in) } diff --git a/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala b/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala index 71a7d797f..4117e175e 100644 --- a/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala +++ b/csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala @@ -193,56 +193,49 @@ class CsvParsingSpec extends CsvSpec { .map(_.view.mapValues(_.utf8String).toIndexedSeq) .runWith(Sink.seq) val res = fut.futureValue - res(0) should contain allElementsOf ( - Map( - "Year" -> "1997", - "Make" -> "Ford", - "Model" -> "E350", - "Description" -> "ac, abs, moon", - "Price" -> "3000.00")) - res(1) should contain allElementsOf ( - Map( - "Year" -> "1999", - "Make" -> "Chevy", - "Model" -> "Venture \"Extended Edition\"", - "Description" -> "", - "Price" -> "4900.00")) - res(2) should contain allElementsOf ( - Map( - "Year" -> "1996", - "Make" -> "Jeep", - "Model" -> "Grand Cherokee", - "Description" -> """MUST SELL! + res(0) should contain allElementsOf Map( + "Year" -> "1997", + "Make" -> "Ford", + "Model" -> "E350", + "Description" -> "ac, abs, moon", + "Price" -> "3000.00") + res(1) should contain allElementsOf Map( + "Year" -> "1999", + "Make" -> "Chevy", + "Model" -> "Venture \"Extended Edition\"", + "Description" -> "", + "Price" -> "4900.00") + res(2) should contain allElementsOf Map( + "Year" -> "1996", + "Make" -> "Jeep", + "Model" -> "Grand Cherokee", + "Description" -> """MUST SELL! |air, moon roof, loaded""".stripMargin, - "Price" -> "4799.00")) - res(3) should contain allElementsOf ( - Map( - "Year" -> "1999", - "Make" -> "Chevy", - "Model" -> "Venture \"Extended Edition, Very Large\"", - "Description" -> "", - "Price" -> "5000.00")) - res(4) should contain allElementsOf ( - Map( - "Year" -> "", - "Make" -> "", - "Model" -> "Venture \"Extended Edition\"", - "Description" -> "", - "Price" -> "4900.00")) - res(5) should contain allElementsOf ( - Map( - "Year" -> "1995", - "Make" -> "VW", - "Model" -> "Golf \"GTE\"", - "Description" -> "", - "Price" -> "5000.00")) - res(6) should contain allElementsOf ( - Map( - "Year" -> "1996", - "Make" -> "VW", - "Model" -> "Golf GTE", - "Description" -> "", - "Price" -> "5000.00")) + "Price" -> "4799.00") + res(3) should contain allElementsOf Map( + "Year" -> "1999", + "Make" -> "Chevy", + "Model" -> "Venture \"Extended Edition, Very Large\"", + "Description" -> "", + "Price" -> "5000.00") + res(4) should contain allElementsOf Map( + "Year" -> "", + "Make" -> "", + "Model" -> "Venture \"Extended Edition\"", + "Description" -> "", + "Price" -> "4900.00") + res(5) should contain allElementsOf Map( + "Year" -> "1995", + "Make" -> "VW", + "Model" -> "Golf \"GTE\"", + "Description" -> "", + "Price" -> "5000.00") + res(6) should contain allElementsOf Map( + "Year" -> "1996", + "Make" -> "VW", + "Model" -> "Golf GTE", + "Description" -> "", + "Price" -> "5000.00") } } } diff --git a/doc-examples/src/test/scala/org/apache/pekko/stream/connectors/eip/scaladsl/PassThroughExamples.scala b/doc-examples/src/test/scala/org/apache/pekko/stream/connectors/eip/scaladsl/PassThroughExamples.scala index 50bdd127b..1bb5c8843 100644 --- a/doc-examples/src/test/scala/org/apache/pekko/stream/connectors/eip/scaladsl/PassThroughExamples.scala +++ 
b/doc-examples/src/test/scala/org/apache/pekko/stream/connectors/eip/scaladsl/PassThroughExamples.scala @@ -85,19 +85,17 @@ object PassThroughFlow { def apply[A, T, O](processingFlow: Flow[A, T, NotUsed], output: (T, A) => O): Graph[FlowShape[A, O], NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder => - { - import GraphDSL.Implicits._ + import GraphDSL.Implicits._ - val broadcast = builder.add(Broadcast[A](2)) - val zip = builder.add(ZipWith[T, A, O]((left, right) => output(left, right))) + val broadcast = builder.add(Broadcast[A](2)) + val zip = builder.add(ZipWith[T, A, O]((left, right) => output(left, right))) // format: off broadcast.out(0) ~> processingFlow ~> zip.in0 broadcast.out(1) ~> zip.in1 // format: on - FlowShape(broadcast.in, zip.out) - } + FlowShape(broadcast.in, zip.out) }) } //#PassThrough diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/SourceSettingsBase.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/SourceSettingsBase.scala index d74ccb06a..2714eef32 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/SourceSettingsBase.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/SourceSettingsBase.scala @@ -65,6 +65,6 @@ abstract class SourceSettingsBase[Version <: ApiVersionBase, S <: SourceSettings bufferSize: Int = bufferSize, includeDocumentVersion: Boolean = includeDocumentVersion, scrollDuration: FiniteDuration = scrollDuration, - apiVersion: Version = apiVersion): S; + apiVersion: Version = apiVersion): S } diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteSettingsBase.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteSettingsBase.scala index 7acf6fb17..a82c423eb 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteSettingsBase.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/WriteSettingsBase.scala @@ -43,5 +43,5 @@ abstract class WriteSettingsBase[Version <: ApiVersionBase, W <: WriteSettingsBa retryLogic: RetryLogic = retryLogic, versionType: Option[String] = versionType, apiVersion: Version = apiVersion, - allowExplicitIndex: Boolean = allowExplicitIndex): W; + allowExplicitIndex: Boolean = allowExplicitIndex): W } diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchApi.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchApi.scala index 5f49006d5..358138bce 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchApi.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchApi.scala @@ -25,14 +25,12 @@ import scala.concurrent.Future @InternalApi private[impl] object ElasticsearchApi { def executeRequest( request: HttpRequest, - connectionSettings: ElasticsearchConnectionSettings)(implicit http: HttpExt): Future[HttpResponse] = { - if (connectionSettings.hasCredentialsDefined) { + connectionSettings: ElasticsearchConnectionSettings)(implicit http: HttpExt): Future[HttpResponse] = + if (connectionSettings.hasCredentialsDefined) http.singleRequest( request.addCredentials(BasicHttpCredentials(connectionSettings.username.get, connectionSettings.password.get))) - } else { + else http.singleRequest(request, connectionContext = 
connectionSettings.connectionContext.getOrElse(http.defaultClientHttpsContext)) - } - } } diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala index 1bd92cc3f..b7ddb7426 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala @@ -145,14 +145,13 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( } val messageResults = restApi.toWriteResults(messages, response) - if (log.isErrorEnabled) { + if (log.isErrorEnabled) messageResults.filterNot(_.success).foreach { failure => if (failure.getError.isPresent) { log.error(s"Received error from elastic when attempting to index documents. Error: {}", failure.getError.get) } } - } emit(out, messageResults ++ resultsPassthrough) if (isClosed(in)) completeStage() @@ -160,9 +159,8 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( } private def tryPull(): Unit = - if (!isClosed(in) && !hasBeenPulled(in)) { + if (!isClosed(in) && !hasBeenPulled(in)) pull(in) - } override def onUpstreamFinish(): Unit = if (!inflight) completeStage() diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala index 021a8f5b1..9d40b658f 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceStage.scala @@ -74,9 +74,8 @@ private[elasticsearch] final class ElasticsearchSourceStage[T]( } object ElasticsearchSourceStage { - def validate(indexName: String): Unit = { + def validate(indexName: String): Unit = require(indexName != null, "You must define an index name") - } } /** @@ -102,43 +101,39 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( private var pullIsWaitingForData = false private var dataReady: Option[ScrollResponse[T]] = None - def prepareUri(path: Path): Uri = { + def prepareUri(path: Path): Uri = Uri(settings.connection.baseUrl) .withPath(path) - } def sendScrollScanRequest(): Unit = try { waitingForElasticData = true scrollId match { - case None => { + case None => log.debug("Doing initial search") // Add extra params to search val extraParams = Seq( - if (!searchParams.contains("size")) { + if (!searchParams.contains("size")) Some("size" -> settings.bufferSize.toString) - } else { - None - }, + else + None, // Tell elastic to return the documents '_version'-property with the search-results // http://nocf-www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html // https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html - if (!searchParams.contains("version") && settings.includeDocumentVersion) { + if (!searchParams.contains("version") && settings.includeDocumentVersion) Some("version" -> "true") - } else { - None - }) + else + None) val baseMap = Map("scroll" -> settings.scroll) // only force sorting by _doc (meaning order is not known) if not specified in search params - val sortQueryParam = 
if (searchParams.contains("sort")) { + val sortQueryParam = if (searchParams.contains("sort")) None - } else { + else Some(("sort", "_doc")) - } val routingQueryParam = searchParams.get("routing").map(r => ("routing", r)) @@ -187,9 +182,8 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( .recover { case cause: Throwable => failureHandler.invoke(cause) } - } - case Some(actualScrollId) => { + case Some(actualScrollId) => log.debug("Fetching next scroll") val uri = prepareUri(Path("/_search/scroll")) @@ -221,7 +215,6 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( .recover { case cause: Throwable => failureHandler.invoke(cause) } - } } } catch { case ex: Exception => failureHandler.invoke(ex) @@ -240,10 +233,9 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( if (pullIsWaitingForData) { log.debug("Received data from elastic. Downstream has already called pull and is waiting for data") pullIsWaitingForData = false - if (handleScrollResponse(scrollResponse)) { + if (handleScrollResponse(scrollResponse)) // we should go and get more data sendScrollScanRequest() - } } else { log.debug("Received data from elastic. Downstream have not yet asked for it") // This is a prefetch of data which we received before downstream has asked for it @@ -284,10 +276,8 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( dataReady = None - if (!waitingForElasticData) { + if (!waitingForElasticData) sendScrollScanRequest() - } - } case None => if (pullIsWaitingForData) throw new Exception("This should not happen: Downstream is pulling more than once") @@ -296,9 +286,8 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( if (!waitingForElasticData) { log.debug("Downstream is pulling data. We must go and get it") sendScrollScanRequest() - } else { + } else log.debug("Downstream is pulling data. Already waiting for data") - } } /** @@ -317,12 +306,12 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( * Complete the stage successfully, whether or not the clear call succeeds. * If the clear call fails, the scroll will eventually timeout. */ - def clearScrollAsync(): Unit = { + def clearScrollAsync(): Unit = scrollId match { case None => log.debug("Scroll Id is empty. Completing stage eagerly.") completeStage() - case Some(actualScrollId) => { + case Some(actualScrollId) => // Clear the scroll val uri = prepareUri(Path(s"/_search/scroll/$actualScrollId")) @@ -336,9 +325,9 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( case HttpResponse(StatusCodes.OK, _, responseEntity, _) => Unmarshal(responseEntity) .to[String] - .map(json => { + .map { json => clearScrollAsyncHandler.invoke(Success(json)) - }) + } case response: HttpResponse => Unmarshal(response.entity).to[String].map { body => clearScrollAsyncHandler @@ -350,16 +339,12 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( .recover { case cause: Throwable => failureHandler.invoke(cause) } - } } - } private val clearScrollAsyncHandler = getAsyncCallback[Try[String]] { result => - { - // Note: the scroll will expire, so there is no reason to consider a failed - // clear as a reason to fail the stream. - log.debug("Result of clearing the scroll: {}", result) - completeStage() - } + // Note: the scroll will expire, so there is no reason to consider a failed + // clear as a reason to fail the stream. 
+ log.debug("Result of clearing the scroll: {}", result) + completeStage() } } diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala index e17e9a118..c7ef22f32 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApi.scala @@ -60,20 +60,19 @@ private[impl] abstract class RestBulkApi[T, C] { val ret = new immutable.VectorBuilder[WriteResult[T, C]] val itemsIter = items.elements.iterator messages.foreach { message => - if (message.operation == Nop) { + if (message.operation == Nop) // client just wants to pass-through: ret += new WriteResult(message, None) - } else { + else { if (itemsIter.hasNext) { // good message val command = message.operation.command val res = itemsIter.next().asJsObject.fields(command).asJsObject val error: Option[String] = res.fields.get("error").map(_.compactPrint) ret += new WriteResult(message, error) - } else { + } else // error? ret += new WriteResult(message, None) - } } } ret.result() diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala index e4e5f2069..11763084d 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV5.scala @@ -74,11 +74,10 @@ private[impl] final class RestBulkApiV5[T, C](indexName: String, } override def constructSharedFields(message: WriteMessage[T, C]): Seq[(String, JsString)] = { - val operationFields = if (allowExplicitIndex) { + val operationFields = if (allowExplicitIndex) Seq("_index" -> JsString(message.indexName.getOrElse(indexName)), typeNameTuple) - } else { + else Seq(typeNameTuple) - } operationFields ++ message.customMetadata.map { case (field, value) => field -> JsString(value) } } diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala index 5e0c896c2..66171bb20 100644 --- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala +++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/impl/RestBulkApiV7.scala @@ -66,11 +66,10 @@ private[impl] final class RestBulkApiV7[T, C](indexName: String, } override def constructSharedFields(message: WriteMessage[T, C]): Seq[(String, JsString)] = { - val operationFields = if (allowExplicitIndex) { + val operationFields = if (allowExplicitIndex) Seq("_index" -> JsString(message.indexName.getOrElse(indexName))) - } else { + else Seq.empty - } operationFields ++ message.customMetadata.map { case (field, value) => field -> JsString(value) } } diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala index d3f2b7802..f9bdaa6e9 100644 --- 
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
index d3f2b7802..f9bdaa6e9 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/javadsl/ElasticsearchSource.scala
@@ -50,19 +50,17 @@ object ElasticsearchSource {
       objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
     Source
       .fromMaterializer { (mat: Materializer, _: Attributes) =>
-        {
-          implicit val system: ActorSystem = mat.system
-          implicit val http: HttpExt = Http()
-          implicit val ec: ExecutionContext = mat.executionContext
-
-          Source
-            .fromGraph(
-              new impl.ElasticsearchSourceStage(
-                elasticsearchParams,
-                Map("query" -> query),
-                settings,
-                new JacksonReader[java.util.Map[String, Object]](objectMapper, classOf[java.util.Map[String, Object]])))
-        }
+        implicit val system: ActorSystem = mat.system
+        implicit val http: HttpExt = Http()
+        implicit val ec: ExecutionContext = mat.executionContext
+
+        Source
+          .fromGraph(
+            new impl.ElasticsearchSourceStage(
+              elasticsearchParams,
+              Map("query" -> query),
+              settings,
+              new JacksonReader[java.util.Map[String, Object]](objectMapper, classOf[java.util.Map[String, Object]])))
       }
       .mapMaterializedValue(_ => NotUsed)
@@ -82,18 +80,16 @@ object ElasticsearchSource {
       objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
     Source
       .fromMaterializer { (mat: Materializer, _: Attributes) =>
-        {
-          implicit val system: ActorSystem = mat.system
-          implicit val http: HttpExt = Http()
-          implicit val ec: ExecutionContext = mat.executionContext
-
-          Source.fromGraph(
-            new impl.ElasticsearchSourceStage(
-              elasticsearchParams,
-              searchParams.asScala.toMap,
-              settings,
-              new JacksonReader[java.util.Map[String, Object]](objectMapper, classOf[java.util.Map[String, Object]])))
-        }
+        implicit val system: ActorSystem = mat.system
+        implicit val http: HttpExt = Http()
+        implicit val ec: ExecutionContext = mat.executionContext
+
+        Source.fromGraph(
+          new impl.ElasticsearchSourceStage(
+            elasticsearchParams,
+            searchParams.asScala.toMap,
+            settings,
+            new JacksonReader[java.util.Map[String, Object]](objectMapper, classOf[java.util.Map[String, Object]])))
       }
       .mapMaterializedValue(_ => NotUsed)
@@ -118,18 +114,16 @@ object ElasticsearchSource {
       objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
     Source
       .fromMaterializer { (mat: Materializer, _: Attributes) =>
-        {
-          implicit val system: ActorSystem = mat.system
-          implicit val http: HttpExt = Http()
-          implicit val ec: ExecutionContext = mat.executionContext
-
-          Source.fromGraph(
-            new impl.ElasticsearchSourceStage(
-              elasticsearchParams,
-              Map("query" -> query),
-              settings,
-              new JacksonReader[T](objectMapper, clazz)))
-        }
+        implicit val system: ActorSystem = mat.system
+        implicit val http: HttpExt = Http()
+        implicit val ec: ExecutionContext = mat.executionContext
+
+        Source.fromGraph(
+          new impl.ElasticsearchSourceStage(
+            elasticsearchParams,
+            Map("query" -> query),
+            settings,
+            new JacksonReader[T](objectMapper, clazz)))
       }
       .mapMaterializedValue(_ => NotUsed)
@@ -150,18 +144,16 @@ object ElasticsearchSource {
       objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
     Source
       .fromMaterializer { (mat: Materializer, _: Attributes) =>
-        {
-          implicit val system: ActorSystem = mat.system
-          implicit val http: HttpExt = Http()
-          implicit val ec: ExecutionContext = mat.executionContext
-
-          Source.fromGraph(
-            new impl.ElasticsearchSourceStage(
-              elasticsearchParams,
-              searchParams.asScala.toMap,
-              settings,
-              new JacksonReader[T](objectMapper, clazz)))
-        }
+        implicit val system: ActorSystem = mat.system
+        implicit val http: HttpExt = Http()
+        implicit val ec: ExecutionContext = mat.executionContext
+
+        Source.fromGraph(
+          new impl.ElasticsearchSourceStage(
+            elasticsearchParams,
+            searchParams.asScala.toMap,
+            settings,
+            new JacksonReader[T](objectMapper, clazz)))
       }
       .mapMaterializedValue(_ => NotUsed)
@@ -171,9 +163,9 @@ object ElasticsearchSource {
 
     val jsonTree = mapper.readTree(json)
 
-    if (jsonTree.has("error")) {
+    if (jsonTree.has("error"))
       impl.ScrollResponse(Some(jsonTree.get("error").asText()), None)
-    } else {
+    else {
       val scrollId = Option(jsonTree.get("_scroll_id")).map(_.asText())
       val hits = jsonTree.get("hits").get("hits").asInstanceOf[ArrayNode]
       val messages = hits.elements().asScala.toList.map { element =>
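The four nearly identical hunks above delete a redundant `{ ... }` wrapper that sat directly inside an already-braced lambda; the lambda's own braces delimit the body. A minimal sketch of the same cleanup on invented data:

  object RedundantBlockExample {
    // Before the cleanup the body carried its own `{ ... }` block inside the lambda
    // braces; afterwards the statements sit directly in the lambda.
    val summary: String = List("a", "bb", "ccc").foldLeft("") { (acc, word) =>
      val sep = if (acc.isEmpty) "" else ","
      acc + sep + word
    }
  }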
diff --git a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
index 3c80a959e..fef7ff1d1 100644
--- a/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
+++ b/elasticsearch/src/main/scala/org/apache/pekko/stream/connectors/elasticsearch/scaladsl/ElasticsearchFlow.scala
@@ -48,12 +48,11 @@ object ElasticsearchFlow {
    */
   def create[T](elasticsearchParams: ElasticsearchParams,
       settings: WriteSettingsBase[_, _],
-      writer: MessageWriter[T]): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] = {
+      writer: MessageWriter[T]): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
     Flow[WriteMessage[T, NotUsed]]
       .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
       .via(stageFlow(elasticsearchParams, settings, writer))
       .mapConcat(identity)
-  }
 
   /**
    * Create a flow to update Elasticsearch with [[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
@@ -75,12 +74,11 @@ object ElasticsearchFlow {
    */
   def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams,
       settings: WriteSettingsBase[_, _],
-      writer: MessageWriter[T]): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] = {
+      writer: MessageWriter[T]): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
     Flow[WriteMessage[T, C]]
       .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
       .via(stageFlow(elasticsearchParams, settings, writer))
       .mapConcat(identity)
-  }
 
   /**
    * Create a flow to update Elasticsearch with
@@ -106,9 +104,8 @@ object ElasticsearchFlow {
   def createBulk[T, C](
       elasticsearchParams: ElasticsearchParams,
       settings: WriteSettingsBase[_, _],
-      writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] = {
+      writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] =
     stageFlow(elasticsearchParams, settings, writer)
-  }
 
   /**
    * Create a flow to update Elasticsearch with [[pekko.stream.connectors.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
@@ -134,19 +131,18 @@ object ElasticsearchFlow {
   def createWithContext[T, C](
       elasticsearchParams: ElasticsearchParams,
       settings: WriteSettingsBase[_, _],
-      writer: MessageWriter[T]): FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] = {
+      writer: MessageWriter[T]): FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
     Flow[WriteMessage[T, C]]
       .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
       .via(stageFlow(elasticsearchParams, settings, writer))
       .mapConcat(identity)
       .asFlowWithContext[WriteMessage[T, NotUsed], C, C]((res, c) => res.withPassThrough(c))(p => p.message.passThrough)
-  }
 
   @InternalApi
   private def stageFlow[T, C](
       elasticsearchParams: ElasticsearchParams,
       settings: WriteSettingsBase[_, _],
-      writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] = {
+      writer: MessageWriter[T]): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] =
     if (settings.retryLogic == RetryNever) {
       val basicFlow = basicStageFlow[T, C](elasticsearchParams, settings, writer)
       Flow[immutable.Seq[WriteMessage[T, C]]]
@@ -177,11 +173,10 @@ object ElasticsearchFlow {
         .via(retryFlow)
         .via(applyOrderingFlow[T, C])
     }
-  }
 
   @InternalApi
   private def amendWithIndexFlow[T, C]: Flow[immutable.Seq[WriteMessage[T, C]], (immutable.Seq[WriteMessage[T, (Int,
-      C)]], immutable.Seq[WriteResult[T, (Int, C)]]), NotUsed] = {
+      C)]], immutable.Seq[WriteResult[T, (Int, C)]]), NotUsed] =
     Flow[immutable.Seq[WriteMessage[T, C]]].map { messages =>
       val indexedMessages = messages.zipWithIndex.map {
         case (m, idx) =>
@@ -189,11 +184,10 @@ object ElasticsearchFlow {
       }
       indexedMessages -> Nil
     }
-  }
 
   @InternalApi
   private def applyOrderingFlow[T, C]
-      : Flow[immutable.Seq[WriteResult[T, (Int, C)]], immutable.Seq[WriteResult[T, C]], NotUsed] = {
+      : Flow[immutable.Seq[WriteResult[T, (Int, C)]], immutable.Seq[WriteResult[T, C]], NotUsed] =
     Flow[immutable.Seq[WriteResult[T, (Int, C)]]].map { results =>
       val orderedResults = results.sortBy(_.message.passThrough._1)
       val finalResults = orderedResults.map { r =>
@@ -201,12 +195,11 @@ object ElasticsearchFlow {
       }
       finalResults
     }
-  }
 
   @InternalApi
   private def basicStageFlow[T, C](elasticsearchParams: ElasticsearchParams,
       settings: WriteSettingsBase[_, _],
-      writer: MessageWriter[T]) = {
+      writer: MessageWriter[T]) =
     Flow
       .fromMaterializer { (mat, _) =>
         implicit val system: ActorSystem = mat.system
@@ -218,7 +211,6 @@ object ElasticsearchFlow {
         }
       }
       .mapMaterializedValue(_ => NotUsed)
-  }
 
   private final class SprayJsonWriter[T](implicit writer: JsonWriter[T]) extends MessageWriter[T] {
     override def convert(message: T): String = message.toJson.compactPrint
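Throughout the ElasticsearchFlow hunks, methods whose body is a single expression lose the surrounding `= { ... }` braces. A hedged sketch of the rule on an invented helper:

  object ExpressionBodiedDefExample {
    // A single-expression method body needs no `{ ... }`.
    def batchLabel(size: Int): String =
      s"batch of $size element(s)"

    // Multi-statement bodies keep their braces.
    def describeBatches(sizes: Seq[Int]): String = {
      val total = sizes.sum
      s"${sizes.length} batches, $total elements"
    }
  }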