Merge pull request #157 from AbsaOSS/feature/149-remove-process-all-available

Feature/149 remove process all available
kevinwallimann authored Jul 20, 2020
2 parents af7b529 + cce7667 commit 73e4040
Showing 8 changed files with 37 additions and 120 deletions.
26 changes: 14 additions & 12 deletions README.md
@@ -98,7 +98,6 @@ to identify which configuration options belong to a certain transformer instance
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `ingestor.spark.app.name` | Yes | User-defined name of the Spark application. See Spark property `spark.app.name` |
| `ingestor.spark.termination.method` | No | Either `processAllAvailable` (stop query when no more messages are incoming) or `awaitTermination` (stop query on signal, e.g. Ctrl-C). Default: `processAllAvailable`. See also [Combination of trigger and termination method](#combination-of-trigger-and-termination-method) |
| `ingestor.spark.await.termination.timeout` | No | Timeout in milliseconds. Stops query when timeout is reached. This option is only valid with termination method `awaitTermination` |
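For illustration, a minimal ingestor configuration after this change could look as follows (written as a Scala `Map`, mirroring the docker tests further down in this diff; the application name and timeout value are placeholders):

```scala
// Assumed example values; only the key names come from the table above.
val ingestorConf = Map(
  "ingestor.spark.app.name" -> "my-ingestion-app",       // required
  "ingestor.spark.await.termination.timeout" -> "60000"  // optional: stop the query after 60 s;
                                                          // without it, the query runs until killed
)
```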

#### Settings for built-in components
@@ -183,21 +182,24 @@ Any additional properties for the `DataStreamWriter` can be added with the prefix

#### Common writer properties

| Property Name | Description |
| :--- | :--- |
| Property Name | Required | Description |
| :--- | :---: | :--- |
| `writer.common.checkpoint.location` | Yes | Used for Spark property `checkpointLocation`. The checkpoint location has to be unique among different workflows. |
| `writer.common.trigger.type` | Either `Once` for one-time execution or `ProcessingTime` for micro-batch execution. Default: `Once`. See also [Combination of trigger and termination method](#combination-of-trigger-and-termination-method) |
| `writer.common.trigger.processing.time` | Interval in ms for micro-batch execution (using `ProcessingTime`). Default: 0ms, i.e. execution as fast as possible. |
| `writer.common.trigger.type` | No | Either `Once` for one-time execution or `ProcessingTime` for micro-batch execution. Default: `Once`. |
| `writer.common.trigger.processing.time` | No | Interval in ms for micro-batch execution (using `ProcessingTime`). Default: 0ms, i.e. execution as fast as possible. |
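The writer-side counterparts can be sketched in the same style (the checkpoint path and interval are placeholder values):

```scala
// Assumed example values for the common writer properties listed above.
val writerConf = Map(
  "writer.common.checkpoint.location" -> "/tmp/checkpoints/my-workflow", // required, unique per workflow
  "writer.common.trigger.type" -> "ProcessingTime",                      // default would be "Once"
  "writer.common.trigger.processing.time" -> "1000"                      // micro-batch every 1000 ms
)
```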

#### Combination of Trigger and termination method
#### Behavior of Triggers

| Trigger (`writer.common.trigger.type`) | Termination method (`ingestor.spark.termination.method`) | Runtime | Details |
| Trigger (`writer.common.trigger.type`) | Timeout (`ingestor.spark.await.termination.timeout`) | Runtime | Details |
| :--- | :--- | :--- | :--- |
| Once | AwaitTermination or ProcessAllAvailable | Limited | Consumes all data that is available at the beginning of the micro-batch. The query processes exactly one micro-batch and stops then, even if more data would be available at the end of the micro-batch. |
| Once | AwaitTermination with timeout | Limited | Same as above, but terminates at the timeout. If the timeout is reached before the micro-batch is processed, it won't be completed and no data will be committed. |
| ProcessingTime | ProcessAllAvailable | Only long-running if topic continuously produces messages, otherwise limited | Consumes all available data in micro-batches and only stops when no new data arrives, i.e. when the available offsets are the same as in the previous micro-batch. Thus, it completely depends on the topic, if and when the query terminates. |
| ProcessingTime | AwaitTermination with timeout | Limited | Consumes data in micro-batches and only stops when the timeout is reached or the query is killed. |
| ProcessingTime | AwaitTermination | Long-running | Consumes data in micro-batches and only stops when the query is killed. |
| Once | No timeout | Limited | Consumes all data that is available at the beginning of the micro-batch. The query processes exactly one micro-batch and then stops, even if more data is available at the end of the micro-batch. |
| ProcessingTime | With timeout | Limited | Consumes data in micro-batches and only stops when the timeout is reached or the query is killed. |
| ProcessingTime | No timeout | Long-running | Consumes data in micro-batches and only stops when the query is killed. |

- Note 1: The first micro-batch of the query will contain all available messages to consume and can therefore be quite large,
even if the trigger `ProcessingTime` is configured, and regardless of what micro-batch interval is configured.
To limit the size of a micro-batch, the property `reader.option.maxOffsetsPerTrigger` should be used. See also http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- Note 2: It's possible to define a timeout for trigger `Once`. If the timeout is reached before the micro-batch has been processed, the micro-batch won't be completed and no data will be committed. This behavior is unpredictable, so we don't recommend it.

See the [Spark Documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) for more information about triggers.
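As a rough illustration of the table above, the two trigger types map onto Spark's `DataStreamWriter` API roughly as follows. This is a generic sketch, not code from this repository: the streaming DataFrame `df`, the parquet sink paths and the 60 s timeout are assumptions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val df: DataFrame = ??? // assumed: a streaming DataFrame obtained from some reader

// Trigger.Once: a single micro-batch over the offsets available at start, then the query stops.
val onceQuery = df.writeStream
  .trigger(Trigger.Once())
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoints/once")
  .start("/tmp/output/once")
onceQuery.awaitTermination() // returns after that one micro-batch has been committed

// Trigger.ProcessingTime: micro-batches at the given interval; long-running unless a timeout is applied.
val continuousQuery = df.writeStream
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoints/processing-time")
  .start("/tmp/output/processing-time")
// Roughly what ingestor.spark.await.termination.timeout configures: wait up to 60 s, then stop.
if (!continuousQuery.awaitTermination(60000L)) {
  continuousQuery.stop()
}
```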

SparkIngestor.scala
@@ -20,40 +20,34 @@ import java.util.UUID
import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.SparkSession
import za.co.absa.hyperdrive.driver.TerminationMethodEnum.{AwaitTermination, ProcessAllAvailable, TerminationMethod}
import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader
import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer
import za.co.absa.hyperdrive.ingestor.api.utils.{ComponentFactoryUtil, ConfigUtils}
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter
import za.co.absa.hyperdrive.shared.exceptions.{IngestionException, IngestionStartException}

import scala.util.control.NonFatal
import scala.util.{Failure, Success}

/**
* This object is responsible for running the ingestion job by using the components it
* receives upon invocation.
*/
* This object is responsible for running the ingestion job by using the components it
* receives upon invocation.
*/
class SparkIngestor(val spark: SparkSession,
val terminationMethod: TerminationMethod,
val awaitTerminationTimeout: Option[Long],
val conf: Configuration) {

private val logger = LogManager.getLogger

/**
* This method performs the ingestion according to the components it receives.
*
* THIS METHOD IS BLOCKING, which is achieved by invoking "processAllAvailable" on the streaming query, thus, if you
* do not want a blocking behaviour, make sure you invoke it from inside a separate thread (or similar approach).
*
* IF this method is invoked to ingest from a continuous source (e.g. a topic that is receiving data no-stop), it WILL
* BLOCK UNTIL THERE IS NO MORE DATA because of how "processAllAvailable" works.
*
* @param streamReader [[StreamReader]] implementation responsible for connecting to the source stream.
* @param streamTransformers List of [[StreamTransformer]] implementation responsible for performing any transformations on the stream data (e.g. conformance)
* @param streamWriter [[StreamWriter]] implementation responsible for defining how and where the stream will be sent.
*/
* This method performs the ingestion according to the components it receives.
*
* THIS METHOD IS BLOCKING, which is achieved by invoking "awaitTermination" on the streaming query, thus, if you
* do not want a blocking behaviour, make sure you invoke it from inside a separate thread (or similar approach).
*
* @param streamReader [[StreamReader]] implementation responsible for connecting to the source stream.
* @param streamTransformers List of [[StreamTransformer]] implementation responsible for performing any transformations on the stream data (e.g. conformance)
* @param streamWriter [[StreamWriter]] implementation responsible for defining how and where the stream will be sent.
*/
@throws(classOf[IllegalArgumentException])
@throws(classOf[IngestionStartException])
@throws(classOf[IngestionException])
@@ -75,18 +69,12 @@ class SparkIngestor(val spark: SparkSession,
}

try {
terminationMethod match {
case ProcessAllAvailable =>
ingestionQuery.processAllAvailable() // processes everything available at the source and stops after that
awaitTerminationTimeout match {
case Some(timeout) =>
ingestionQuery.awaitTermination(timeout)
ingestionQuery.stop()
case AwaitTermination =>
awaitTerminationTimeout match {
case Some(timeout) =>
ingestionQuery.awaitTermination(timeout)
ingestionQuery.stop()
case None =>
ingestionQuery.awaitTermination()
}
case None =>
ingestionQuery.awaitTermination()
}
} catch {
case NonFatal(e) =>
@@ -108,26 +96,10 @@ object SparkIngestor extends SparkIngestorAttributes {
def apply(conf: Configuration): SparkIngestor = {
ComponentFactoryUtil.validateConfiguration(conf, getProperties)
val spark = getSparkSession(conf)
val terminationMethod = getTerminationMethod(conf)
val awaitTerminationTimeout = getAwaitTerminationTimeoutMs(conf)

logger.info(s"Creating ingestor: termination method = '$terminationMethod', " +
s"await termination timeout = '$awaitTerminationTimeout'")
new SparkIngestor(spark, terminationMethod, awaitTerminationTimeout, conf)
}

private def getTerminationMethod(conf: Configuration): TerminationMethod = {
ConfigUtils.getOrNone(KEY_TERMINATION_METHOD, conf) match {
case Some(name) => parseTerminationMethod(name)
case None => ProcessAllAvailable
}
}

private def parseTerminationMethod(name: String) = {
TerminationMethodEnum.of(name) match {
case Failure(exception) => throw new IllegalArgumentException(s"Invalid value for $KEY_TERMINATION_METHOD", exception)
case Success(value) => value
}
logger.info(s"Creating ingestor: await termination timeout = '$awaitTerminationTimeout'")
new SparkIngestor(spark, awaitTerminationTimeout, conf)
}

private def getAwaitTerminationTimeoutMs(conf: Configuration): Option[Long] = {
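Pieced together from the added lines above, the new termination handling reduces to a single match on the optional timeout. A sketch, with the surrounding class, logging and error handling omitted and names as they appear in the diff:

```scala
awaitTerminationTimeout match {
  case Some(timeout) =>
    // Block for at most `timeout` milliseconds, then stop the query explicitly.
    ingestionQuery.awaitTermination(timeout)
    ingestionQuery.stop()
  case None =>
    // Block until the query is terminated externally, e.g. when the process receives a signal.
    ingestionQuery.awaitTermination()
}
```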
SparkIngestorAttributes.scala
@@ -15,13 +15,11 @@

package za.co.absa.hyperdrive.driver

import za.co.absa.hyperdrive.driver.TerminationMethodEnum.{AwaitTermination, ProcessAllAvailable}
import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}

trait SparkIngestorAttributes extends HasComponentAttributes {
val keysPrefix = "ingestor.spark"
val KEY_APP_NAME = s"$keysPrefix.app.name"
val KEY_TERMINATION_METHOD = s"$keysPrefix.termination.method"
val KEY_AWAIT_TERMINATION_TIMEOUT = s"$keysPrefix.await.termination.timeout"

override def getName: String = "Spark Ingestor"
@@ -30,11 +28,7 @@ trait SparkIngestorAttributes extends HasComponentAttributes {

override def getProperties: Map[String, PropertyMetadata] = Map(
KEY_APP_NAME -> PropertyMetadata("Name of Spark application", None, required = true),
KEY_TERMINATION_METHOD -> PropertyMetadata("Termination method",
Some(s"Either '$ProcessAllAvailable' (stop when no more messages arrive) or '$AwaitTermination' (stop on signal)." +
s" Default is '$ProcessAllAvailable'"), required = false),
KEY_AWAIT_TERMINATION_TIMEOUT -> PropertyMetadata("Await Termination: Timeout(ms)", Some("Stops query when timeout is reached." +
s" This option is only valid with termination method '$AwaitTermination'"), required = false)
KEY_AWAIT_TERMINATION_TIMEOUT -> PropertyMetadata("Await Termination: Timeout(ms)", Some("Stops query when timeout is reached."), required = false)
)

override def getExtraConfigurationPrefix: Option[String] = None

This file was deleted. (Judging by the removed imports above, this is the `TerminationMethodEnum`.)

TestSparkIngestor.scala
@@ -24,7 +24,6 @@ import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
import za.co.absa.commons.spark.SparkTestBase
import za.co.absa.hyperdrive.driver.TerminationMethodEnum.AwaitTermination
import za.co.absa.hyperdrive.ingestor.api.reader.StreamReader
import za.co.absa.hyperdrive.ingestor.api.transformer.StreamTransformer
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter
@@ -81,7 +80,7 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
when(streamWriter.write(dataFrame)).thenReturn(streamingQuery)
when(streamingQuery.stop()).thenThrow(classOf[RuntimeException])
when(streamingQuery.awaitTermination).thenThrow(classOf[RuntimeException])
assertThrows[IngestionException](sparkIngestor.ingest(streamReader, Seq(streamTransformer), streamWriter))
}

@@ -97,8 +96,7 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
inOrderCheck.verify(streamReader).read(any[SparkSession])
inOrderCheck.verify(streamTransformer).transform(dataFrame)
inOrderCheck.verify(streamWriter).write(dataFrame)
verify(streamingQuery).processAllAvailable
verify(streamingQuery).stop
verify(streamingQuery).awaitTermination
}

it should "use the configured app name" in {
@@ -118,7 +116,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
it should "use terminationMethod awaitTermination if configured" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
config.addProperty(s"${SparkIngestor.KEY_TERMINATION_METHOD}", AwaitTermination)
val sparkIngestor = SparkIngestor(config)
when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
when(streamTransformer.transform(dataFrame)).thenReturn(dataFrame)
@@ -132,7 +129,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
it should "use timeout if configured with terminationMethod awaitTermination" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
config.addProperty(s"${SparkIngestor.KEY_TERMINATION_METHOD}", AwaitTermination)
config.addProperty(s"${SparkIngestor.KEY_AWAIT_TERMINATION_TIMEOUT}", "10000")
val sparkIngestor = SparkIngestor(config)
when(streamReader.read(any[SparkSession])).thenReturn(dataFrame)
@@ -144,15 +140,6 @@ class TestSparkIngestor extends FlatSpec with BeforeAndAfterEach with MockitoSugar
verify(streamingQuery).awaitTermination(eqTo(10000L))
}

it should "throw if an invalid terminationMethod is configured" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
config.addProperty(s"${SparkIngestor.KEY_TERMINATION_METHOD}", "non-existent")
val throwable = intercept[IllegalArgumentException](SparkIngestor(config))

throwable.getMessage should include(SparkIngestor.KEY_TERMINATION_METHOD)
}

it should "throw if a timeout is not a number" in {
val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")
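For reference, after this change a caller configures the ingestor with just the application name and, optionally, the timeout (the same pattern the tests above use). A sketch; running it still requires a Spark environment on the classpath:

```scala
import org.apache.commons.configuration2.BaseConfiguration
import za.co.absa.hyperdrive.driver.SparkIngestor

val config = new BaseConfiguration
config.addProperty(SparkIngestor.KEY_APP_NAME, "my-spark-app")            // required
config.addProperty(SparkIngestor.KEY_AWAIT_TERMINATION_TIMEOUT, "10000")  // optional, in milliseconds
val sparkIngestor = SparkIngestor(config) // the former termination.method key no longer exists
```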
KafkaToKafkaDockerTest.scala
@@ -92,7 +92,6 @@ class KafkaToKafkaDockerTest extends FlatSpec with Matchers with SparkTestBase w

// Spark settings
"ingestor.spark.app.name" -> "ingestor-app",
"ingestor.spark.termination.method" -> "ProcessAllAvailable",

// Source(Kafka) settings
"reader.kafka.topic" -> sourceTopic,
@@ -117,8 +116,7 @@

// Sink(Kafka) settings
"writer.common.checkpoint.location" -> (checkpointDir + "/${reader.kafka.topic}"),
"writer.common.trigger.type" -> "ProcessingTime",
"writer.common.trigger.processing.time" -> "1000",
"writer.common.trigger.type" -> "Once",
"writer.kafka.topic" -> destinationTopic,
"writer.kafka.brokers" -> "${reader.kafka.brokers}"
)
KafkaToParquetDockerTest.scala
@@ -80,7 +80,6 @@ class KafkaToParquetDockerTest extends FlatSpec with Matchers with SparkTestBase

// Spark settings
"ingestor.spark.app.name" -> "ingestor-app",
"ingestor.spark.termination.method" -> "AwaitTermination",

// Source(Kafka) settings
"reader.kafka.topic" -> topic,
KafkaToParquetIncrementingVersionDockerTest.scala
@@ -68,7 +68,6 @@ class KafkaToParquetIncrementingVersionDockerTest extends FlatSpec with Matchers

// Spark settings
"ingestor.spark.app.name" -> "ingestor-app",
"ingestor.spark.termination.method" -> "AwaitTermination",

// Source(Kafka) settings
"reader.kafka.topic" -> topic,
