Merge pull request #75 from AbsaOSS/feature/72-remove-unreachable-code
Feature/72 remove unreachable code
kevinwallimann authored Jan 22, 2020
2 parents a50542c + 30ef9b4 commit c6f94ed
Showing 10 changed files with 81 additions and 134 deletions.
@@ -21,24 +21,25 @@ import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.DataStreamReader
import za.co.absa.hyperdrive.ingestor.api.reader.{StreamReader, StreamReaderFactory}
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.KafkaStreamReaderKeys.{KEY_BROKERS, KEY_TOPIC, rootComponentConfKey, rootFactoryOptionalConfKey}
import za.co.absa.hyperdrive.shared.utils.ConfigUtils.{getOrNone, getOrThrow, getSeqOrThrow}
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.KafkaStreamReaderKeys.{KEY_BROKERS, KEY_TOPIC, rootFactoryOptionalConfKey}
import za.co.absa.hyperdrive.shared.utils.ConfigUtils
import za.co.absa.hyperdrive.shared.utils.ConfigUtils.{getOrThrow, getSeqOrThrow}

private[reader] object KafkaStreamReaderProps {
val STREAM_FORMAT_KAFKA_NAME = "kafka"
val BROKERS_SETTING_KEY = "bootstrap.servers"
val STREAM_FORMAT_KAFKA_NAME = "kafka"
val BROKERS_SETTING_KEY = "bootstrap.servers"
val SPARK_BROKERS_SETTING_KEY = "kafka.bootstrap.servers"
val TOPIC_SUBSCRIPTION_KEY = "subscribe"
val TOPIC_SUBSCRIPTION_KEY = "subscribe"
}

/**
* Creates a stream reader from Kafka.
*
* @param topic String containing the topic
* @param brokers String containing the brokers
* @param extraConfs Extra configurations, e.g. SSL params.
*/
private[reader] class KafkaStreamReader(val topic: String, val brokers: String, val extraConfs: Map[String,String]) extends StreamReader {
* Creates a stream reader from Kafka.
*
* @param topic String containing the topic
* @param brokers String containing the brokers
* @param extraConfs Extra configurations, e.g. SSL params.
*/
private[reader] class KafkaStreamReader(val topic: String, val brokers: String, val extraConfs: Map[String, String]) extends StreamReader {

private val logger = LogManager.getLogger()

@@ -50,16 +51,12 @@ private[reader] class KafkaStreamReader(val topic: String, val brokers: String,
throw new IllegalArgumentException(s"Invalid brokers: '$brokers'")
}

if (extraConfs == null) {
throw new IllegalArgumentException("Null extra configurations.")
}

/**
* Creates a [[DataStreamReader]] instance from a SparkSession
*
* IMPORTANT: this method does not check against malformed data (e.g. invalid broker protocols or certificate locations),
* thus, if not properly configured, the issue will ONLY BE FOUND AT RUNTIME.
*/
* Creates a [[DataStreamReader]] instance from a SparkSession
*
* IMPORTANT: this method does not check against malformed data (e.g. invalid broker protocols or certificate locations),
* thus, if not properly configured, the issue will ONLY BE FOUND AT RUNTIME.
*/
override def read(spark: SparkSession): DataStreamReader = {

import KafkaStreamReaderProps._
@@ -78,9 +75,7 @@ private[reader] class KafkaStreamReader(val topic: String, val brokers: String,
.option(TOPIC_SUBSCRIPTION_KEY, topic)
.option(SPARK_BROKERS_SETTING_KEY, brokers)

extraConfs.foldLeft(streamReader) {
case (previousStreamReader, (newConfKey, newConfValue)) => previousStreamReader.option(newConfKey, newConfValue)
}
streamReader.options(extraConfs)
}

override def getSourceName: String = s"Kafka topic: $topic"
@@ -108,46 +103,7 @@ object KafkaStreamReader extends StreamReaderFactory {
brokers.mkString(",")
}

private def getExtraOptions(configuration: Configuration): Map[String,String] = {
val optionalKeys = getKeysFromPrefix(configuration, rootFactoryOptionalConfKey)

val extraConfs = optionalKeys.foldLeft(Map[String,String]()) {
case (map,securityKey) =>
getOrNone(securityKey, configuration) match {
case Some(value) => map + (tweakOptionKeyName(securityKey) -> value)
case None => map
}
}

if (extraConfs.isEmpty || extraConfs.size == optionalKeys.size) {
extraConfs
}
else {
logger.warn(s"Assuming no security settings, since some appear to be missing: {${findMissingKeys(optionalKeys, extraConfs)}}")
Map[String,String]()
}
}

private def getKeysFromPrefix(configuration: Configuration, prefix: String): Seq[String] = {
val optionalKeys = configuration.getKeys(prefix)

if (optionalKeys != null) {
import scala.collection.JavaConverters._
optionalKeys.asScala.toSeq
} else {
Seq[String]()
}
}

private def findMissingKeys(keys: Seq[String], map: Map[String,String]): Seq[String] = keys.filterNot(map.contains)

private def tweakKeyName(key: String): String = {
key.replace(s"$rootComponentConfKey.", "") // remove the component root configuration key
}

private def tweakOptionKeyName(key: String): String = {
key.replace(s"$rootFactoryOptionalConfKey.", "") // remove the component root.option configuration key
}
private def getExtraOptions(configuration: Configuration): Map[String, String] = ConfigUtils.getPropertySubset(configuration, rootFactoryOptionalConfKey)

private def filterKeysContaining(map: Map[String,String], exclusionToken: String): Map[String,String] = map.filterKeys(!_.contains(exclusionToken))
private def filterKeysContaining(map: Map[String, String], exclusionToken: String) = map.filterKeys(!_.contains(exclusionToken))
}
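The reader-side change above applies all extra Kafka options in one call and drops the security-key bookkeeping (getKeysFromPrefix, findMissingKeys, tweakOptionKeyName) in favour of ConfigUtils.getPropertySubset. A minimal before/after sketch of the pattern, reconstructed from the hunks above:

// before: each extra configuration was applied individually via a fold
extraConfs.foldLeft(streamReader) {
  case (reader, (key, value)) => reader.option(key, value)
}

// after: DataStreamReader.options accepts the whole map at once
streamReader.options(extraConfs)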
@@ -25,7 +25,7 @@ import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetStr
import za.co.absa.hyperdrive.shared.utils.ConfigUtils
import za.co.absa.hyperdrive.shared.utils.ConfigUtils.getOrThrow

private[writer] abstract class AbstractParquetStreamWriter(destination: String, val extraConfOptions: Option[Map[String, String]]) extends StreamWriter(destination) {
private[writer] abstract class AbstractParquetStreamWriter(destination: String, val extraConfOptions: Map[String, String]) extends StreamWriter(destination) {

if (StringUtils.isBlank(destination)) {
throw new IllegalArgumentException(s"Invalid PARQUET destination: '$destination'")
@@ -57,14 +57,7 @@ private[writer] abstract class AbstractParquetStreamWriter(destination: String,
.outputMode(OutputMode.Append())
}

protected def addOptions(outStream: DataStreamWriter[Row], extraConfOptions: Option[Map[String, String]]): DataStreamWriter[Row] = {
extraConfOptions match {
case Some(options) => options.foldLeft(outStream) {
case (previousOutStream, (optionKey, optionValue)) => previousOutStream.option(optionKey, optionValue)
}
case None => outStream
}
}
protected def addOptions(outStream: DataStreamWriter[Row], extraConfOptions: Map[String, String]): DataStreamWriter[Row] = outStream.options(extraConfOptions)

protected def configureOffsets(outStream: DataStreamWriter[Row], offsetManager: OffsetManager, configuration: org.apache.hadoop.conf.Configuration): DataStreamWriter[Row] = offsetManager.configureOffsets(outStream, configuration)
}
@@ -73,7 +66,7 @@ object AbstractParquetStreamWriter {

def getDestinationDirectory(configuration: Configuration): String = getOrThrow(KEY_DESTINATION_DIRECTORY, configuration, errorMessage = s"Destination directory not found. Is '$KEY_DESTINATION_DIRECTORY' defined?")

def getExtraOptions(configuration: Configuration): Option[Map[String, String]] = ConfigUtils.getPropertySubset(configuration, KEY_EXTRA_CONFS_ROOT)
def getExtraOptions(configuration: Configuration): Map[String, String] = ConfigUtils.getPropertySubset(configuration, KEY_EXTRA_CONFS_ROOT)
}
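The writer-side signature change is analogous: extraConfOptions is now a plain Map[String, String] (empty when nothing is configured) rather than Option[Map[String, String]], so addOptions no longer needs a pattern match. A small sketch of the simplification, reconstructed from the hunk above:

// before: unwrap the Option, then fold the options in one by one
extraConfOptions match {
  case Some(options) => options.foldLeft(outStream) { case (s, (k, v)) => s.option(k, v) }
  case None          => outStream
}

// after: an empty map simply contributes no options
outStream.options(extraConfOptions)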


@@ -31,7 +31,7 @@ import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.AbstractParq
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.ParquetPartitioningStreamWriterKeys._


private[writer] class ParquetPartitioningStreamWriter(destination: String, reportDate: String, extraConfOptions: Option[Map[String, String]]) extends AbstractParquetStreamWriter(destination, extraConfOptions) {
private[writer] class ParquetPartitioningStreamWriter(destination: String, reportDate: String, extraConfOptions: Map[String, String]) extends AbstractParquetStreamWriter(destination, extraConfOptions) {
private val COL_DATE = "hyperdrive_date"
private val COL_VERSION = "hyperdrive_version"

@@ -20,7 +20,7 @@ import org.apache.logging.log4j.LogManager
import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory}
import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.AbstractParquetStreamWriter._

private[writer] class ParquetStreamWriter(destination: String, extraConfOptions: Option[Map[String, String]]) extends AbstractParquetStreamWriter(destination, extraConfOptions)
private[writer] class ParquetStreamWriter(destination: String, extraConfOptions: Map[String, String]) extends AbstractParquetStreamWriter(destination, extraConfOptions)

object ParquetStreamWriter extends StreamWriterFactory {

@@ -15,37 +15,36 @@

package za.co.absa.hyperdrive.ingestor.implementation.reader.factories

import org.apache.commons.configuration2.Configuration
import org.mockito.Mockito._
import org.apache.commons.configuration2.BaseConfiguration
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterEach, FlatSpec}
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.KafkaStreamReaderKeys._

class TestStreamReaderAbstractFactory extends FlatSpec with BeforeAndAfterEach with MockitoSugar {

private val configStub: Configuration = mock[Configuration]
class TestStreamReaderAbstractFactory extends FlatSpec with BeforeAndAfterEach with MockitoSugar with Matchers {

behavior of StreamReaderAbstractFactory.getClass.getSimpleName

override def beforeEach(): Unit = reset(configStub)

it should "create KafkaStreamReader" in {
when(configStub.getString(StreamReaderAbstractFactory.componentConfigKey)).thenReturn(KafkaStreamReader.getClass.getName)
when(configStub.getString(KEY_TOPIC)).thenReturn("topic")
when(configStub.getStringArray(KEY_BROKERS)).thenReturn(Array("http://localhost:9092"))
val config = new BaseConfiguration()
config.addProperty(StreamReaderAbstractFactory.componentConfigKey, KafkaStreamReader.getClass.getName)
config.addProperty(KEY_TOPIC, "topic")
config.addProperty(KEY_BROKERS, "http://localhost:9092")

assert(StreamReaderAbstractFactory.build(configStub).isInstanceOf[KafkaStreamReader])
assert(StreamReaderAbstractFactory.build(config).isInstanceOf[KafkaStreamReader])
}

it should "throw if reader parameter is invalid" in {
val invalidFactoryName = "an-invalid-factory-name"
when(configStub.getString(StreamReaderAbstractFactory.componentConfigKey)).thenReturn(invalidFactoryName)
val throwable = intercept[IllegalArgumentException](StreamReaderAbstractFactory.build(configStub))
assert(throwable.getMessage.contains(invalidFactoryName))
val config = new BaseConfiguration()
config.addProperty(StreamReaderAbstractFactory.componentConfigKey, invalidFactoryName)

val throwable = intercept[IllegalArgumentException](StreamReaderAbstractFactory.build(config))

throwable.getMessage should include(invalidFactoryName)
}

it should "throw if stream reader parameter is absent" in {
assertThrows[IllegalArgumentException](StreamReaderAbstractFactory.build(configStub))
assertThrows[IllegalArgumentException](StreamReaderAbstractFactory.build(new BaseConfiguration()))
}
}
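The factory test now builds a real commons-configuration2 BaseConfiguration instead of stubbing Configuration with Mockito, which also removes the per-test reset(configStub). Roughly, the substitution looks like this (illustrative, using the keys from the test above):

// before: stubbed lookups on a mocked Configuration
val configStub = mock[Configuration]
when(configStub.getString(KEY_TOPIC)).thenReturn("topic")

// after: real properties on an in-memory configuration
val config = new BaseConfiguration()
config.addProperty(KEY_TOPIC, "topic")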
@@ -55,12 +55,6 @@ class TestKafkaStreamReader extends FlatSpec with MockitoSugar {
)
}

it should "throw on null parameters map" in {
assertThrows[IllegalArgumentException]( // null topic
new KafkaStreamReader(validTopic, validBrokers, extraConfs = null)
)
}

it should "throw on null SparkSession" in {
val reader = new KafkaStreamReader(validTopic, validBrokers, validExtraConfs)
assertThrows[IllegalArgumentException](reader.read(spark = null))
@@ -86,8 +80,7 @@ class TestKafkaStreamReader extends FlatSpec with MockitoSugar {
verify(dataStreamReader).format(KafkaStreamReaderProps.STREAM_FORMAT_KAFKA_NAME)
verify(dataStreamReader).option(TOPIC_SUBSCRIPTION_KEY, validTopic)
verify(dataStreamReader).option(SPARK_BROKERS_SETTING_KEY, validBrokers)

validExtraConfs.foreach(pair => verify(dataStreamReader).option(pair._1, pair._2))
verify(dataStreamReader).options(validExtraConfs)
}

it should "set topic and brokers on SparkSession if no extra options informed" in {
@@ -102,8 +95,7 @@ class TestKafkaStreamReader extends FlatSpec with MockitoSugar {
verify(dataStreamReader).format(STREAM_FORMAT_KAFKA_NAME)
verify(dataStreamReader).option(TOPIC_SUBSCRIPTION_KEY, validTopic)
verify(dataStreamReader).option(SPARK_BROKERS_SETTING_KEY, validBrokers)

validExtraConfs.foreach(conf => verify(dataStreamReader, never()).option(conf._1, conf._2)) // verify never
verify(dataStreamReader, never()).options(validExtraConfs)
}

it should "include the topic in the source name" in {
@@ -37,8 +37,8 @@ class TestParquetStreamWriter extends FlatSpec with MockitoSugar {
behavior of "ParquetStreamWriter"

it should "throw on blank destination" in {
assertThrows[IllegalArgumentException](new ParquetStreamWriter(destination = null, None))
assertThrows[IllegalArgumentException](new ParquetStreamWriter(destination = " ", None))
assertThrows[IllegalArgumentException](new ParquetStreamWriter(destination = null, Map()))
assertThrows[IllegalArgumentException](new ParquetStreamWriter(destination = " ", Map()))
}

it should "throw on null DataFrame" in {
@@ -47,7 +47,7 @@ class TestParquetStreamWriter extends FlatSpec with MockitoSugar {
val offsetManager = mock[OffsetManager]
when(offsetManager.configureOffsets(dataStreamWriter, null)).thenReturn(dataStreamWriter)

val writer = new ParquetStreamWriter(parquetDestination.getAbsolutePath, None)
val writer = new ParquetStreamWriter(parquetDestination.getAbsolutePath, Map())
assertThrows[IllegalArgumentException](writer.write(dataFrame = null, offsetManager))
}

@@ -57,47 +57,47 @@ class TestParquetStreamWriter extends FlatSpec with MockitoSugar {
val dataFrame = mock[DataFrame]
when(dataFrame.writeStream).thenReturn(dataStreamWriter)

val writer = new ParquetStreamWriter(parquetDestination.getAbsolutePath, None)
val writer = new ParquetStreamWriter(parquetDestination.getAbsolutePath, Map())
assertThrows[IllegalArgumentException](writer.write(dataFrame, offsetManager = null))
}

it should "set format as 'parquet'" in {
val dataStreamWriter = getDataStreamWriter
val offsetManager = getOffsetManager(dataStreamWriter)

invokeWriter(dataStreamWriter, offsetManager, None)
invokeWriter(dataStreamWriter, offsetManager, Map())
verify(dataStreamWriter).format("parquet")
}

it should "set Trigger.Once" in {
val dataStreamWriter = getDataStreamWriter
val offsetManager = getOffsetManager(dataStreamWriter)

invokeWriter(dataStreamWriter, offsetManager, None)
invokeWriter(dataStreamWriter, offsetManager, Map())
verify(dataStreamWriter).trigger(Trigger.Once)
}

it should "set output mode as 'append'" in {
val dataStreamWriter = getDataStreamWriter
val offsetManager = getOffsetManager(dataStreamWriter)

invokeWriter(dataStreamWriter, offsetManager, None)
invokeWriter(dataStreamWriter, offsetManager, Map())
verify(dataStreamWriter).outputMode(OutputMode.Append())
}

it should "invoke OffsetManager passing DataStreamWriter" in {
val dataStreamWriter = getDataStreamWriter
val offsetManager = getOffsetManager(dataStreamWriter)

invokeWriter(dataStreamWriter, offsetManager, None)
invokeWriter(dataStreamWriter, offsetManager, Map())
verify(offsetManager).configureOffsets(dataStreamWriter, configuration)
}

it should "start DataStreamWriter" in {
val dataStreamWriter = getDataStreamWriter
val offsetManager = getOffsetManager(dataStreamWriter)

invokeWriter(dataStreamWriter, offsetManager, None)
invokeWriter(dataStreamWriter, offsetManager, Map())
verify(dataStreamWriter).start(parquetDestination.getAbsolutePath)
}

@@ -107,15 +107,13 @@ class TestParquetStreamWriter extends FlatSpec with MockitoSugar {

val extraConfs = Map("key.1" -> "value-1", "key.2" -> "value-2")

invokeWriter(dataStreamWriter, offsetManager, Some(extraConfs))
invokeWriter(dataStreamWriter, offsetManager, extraConfs)
verify(dataStreamWriter).start(parquetDestination.getAbsolutePath)

extraConfs.foreach {
case (key: String, value: String) => verify(dataStreamWriter).option(key, value)
}
verify(dataStreamWriter).options(extraConfs)
}

private def invokeWriter(dataStreamWriter: DataStreamWriter[Row], offsetManager: OffsetManager, extraOptions: Option[Map[String,String]]): Unit = {
private def invokeWriter(dataStreamWriter: DataStreamWriter[Row], offsetManager: OffsetManager, extraOptions: Map[String,String]): Unit = {
val dataFrame = getDataFrame(dataStreamWriter)
val writer = new ParquetStreamWriter(parquetDestination.getAbsolutePath, extraOptions)
writer.write(dataFrame, offsetManager)
@@ -126,7 +124,7 @@ class TestParquetStreamWriter extends FlatSpec with MockitoSugar {
when(dataStreamWriter.trigger(any(Trigger.Once().getClass))).thenReturn(dataStreamWriter)
when(dataStreamWriter.format(anyString())).thenReturn(dataStreamWriter)
when(dataStreamWriter.outputMode(any(OutputMode.Append().getClass))).thenReturn(dataStreamWriter)
when(dataStreamWriter.option(anyString(), anyString())).thenReturn(dataStreamWriter)
when(dataStreamWriter.options(any(classOf[scala.collection.Map[String, String]]))).thenReturn(dataStreamWriter)
dataStreamWriter
}

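Since the writer now forwards extra configurations through a single options(map) call, the mocked DataStreamWriter is stubbed and verified on that bulk call instead of on repeated option(key, value) invocations; condensed from the test changes above:

// stub the bulk call so the fluent builder keeps returning the mock
when(dataStreamWriter.options(any(classOf[scala.collection.Map[String, String]]))).thenReturn(dataStreamWriter)

// and verify it was invoked with the expected map
verify(dataStreamWriter).options(extraConfs)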
@@ -40,7 +40,7 @@ class TestParquetStreamWriterObject extends FlatSpec with Matchers with BeforeAn
val writer = ParquetStreamWriter(config).asInstanceOf[ParquetStreamWriter]

writer.getDestination shouldBe "/tmp/destination/parquet"
writer.extraConfOptions.get should contain theSameElementsAs Map(
writer.extraConfOptions should contain theSameElementsAs Map(
"key1" -> "value1",
"key2" -> "value2"
)
@@ -53,6 +53,6 @@ class TestParquetStreamWriterObject extends FlatSpec with Matchers with BeforeAn
val writer = ParquetStreamWriter(config).asInstanceOf[ParquetStreamWriter]

writer.getDestination shouldBe "/tmp/destination/parquet"
writer.extraConfOptions shouldBe None
writer.extraConfOptions shouldBe Map()
}
}