Allow S3 checkpoint folder (#225)
* file system change

* adding generic file system

* pr fixes

* undo whitespaces
senelesithole authored Apr 30, 2021
1 parent 16a23c7 commit d154403
Showing 4 changed files with 30 additions and 31 deletions.
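The thrust of the change: FileUtils previously obtained a FileSystem from the Hadoop Configuration alone, which always yields the default filesystem (typically HDFS or the local FS). The reworked code resolves the FileSystem from the URI of the path being inspected and passes it around implicitly, so a checkpoint location on S3 (an s3a:// URI) resolves to the S3A filesystem. A minimal sketch of the difference, assuming the S3A connector is on the classpath; the bucket and path are placeholders, not values from this commit:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsResolutionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Old behaviour: FileSystem.get(conf) returns the default filesystem from the
    // configuration, regardless of where the checkpoint folder actually lives.
    val defaultFs: FileSystem = FileSystem.get(conf)

    // New behaviour: the filesystem is resolved from the scheme of the checkpoint URI,
    // so an S3 location picks up the S3A filesystem implementation.
    val checkpointLocation = "s3a://example-bucket/checkpoints/ingestion" // placeholder
    val checkpointFs: FileSystem = FileSystem.get(new URI(checkpointLocation), conf)

    println(s"default: ${defaultFs.getUri}, checkpoint: ${checkpointFs.getUri}")
  }
}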
KafkaStreamReader.scala

@@ -15,7 +15,10 @@

 package za.co.absa.hyperdrive.ingestor.implementation.reader.kafka

+import java.net.URI
+
 import org.apache.commons.configuration2.Configuration
+import org.apache.hadoop.fs.FileSystem
 import org.apache.commons.lang3.StringUtils
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SparkSession}

@@ -62,6 +65,7 @@ private[reader] class KafkaStreamReader(
    * thus, if not properly configured, the issue will ONLY BE FOUND AT RUNTIME.
    */
   override def read(spark: SparkSession): DataFrame = {
+    implicit val fs: FileSystem = FileSystem.get(new URI(checkpointLocation), spark.sparkContext.hadoopConfiguration)

     if (spark.sparkContext.isStopped) {
       throw new IllegalStateException("SparkSession is stopped.")

@@ -75,14 +79,14 @@
       .option(TOPIC_SUBSCRIPTION_KEY, topic)
       .option(SPARK_BROKERS_SETTING_KEY, brokers)

-    val streamReaderWithStartingOffsets = configureStartingOffsets(streamReader, spark.sparkContext.hadoopConfiguration)
+    val streamReaderWithStartingOffsets = configureStartingOffsets(streamReader)
     streamReaderWithStartingOffsets
       .options(extraConfs)
       .load()
   }

-  private def configureStartingOffsets(streamReader: DataStreamReader, configuration: org.apache.hadoop.conf.Configuration): DataStreamReader = {
-    val startingOffsets = getStartingOffsets(checkpointLocation, configuration)
+  private def configureStartingOffsets(streamReader: DataStreamReader)(implicit fileSystem: FileSystem): DataStreamReader = {
+    val startingOffsets = getStartingOffsets(checkpointLocation)

     startingOffsets match {
       case Some(startOffset) =>

@@ -94,9 +98,9 @@
     }
   }

-  private def getStartingOffsets(checkpointLocation: String, configuration: org.apache.hadoop.conf.Configuration): Option[String] = {
-    if (FileUtils.exists(checkpointLocation, configuration) && !FileUtils.isEmpty(checkpointLocation, configuration)) {
-      Option.empty
+  private def getStartingOffsets(checkpointLocation: String)(implicit fileSystem: FileSystem): Option[String] = {
+    if (FileUtils.exists(checkpointLocation) && !FileUtils.isEmpty(checkpointLocation)) {
+      Option.empty
     }
     else {
       Option(STARTING_OFFSETS_EARLIEST)
MetadataLogUtil.scala

@@ -15,6 +15,9 @@

 package za.co.absa.hyperdrive.ingestor.implementation.utils

+import java.net.URI
+
+import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation}

@@ -26,8 +29,8 @@ import scala.util.{Failure, Success, Try}

 object MetadataLogUtil {
   def getParquetFilesNotListedInMetadataLog(spark: SparkSession, rootPath: String): Try[Set[String]] = {
-    val config = spark.sparkContext.hadoopConfiguration
-    if(FileUtils.notExists(rootPath, config) || FileUtils.isEmpty(rootPath, config)) {
+    implicit val fs: FileSystem = FileSystem.get(new URI(rootPath), spark.sparkContext.hadoopConfiguration)
+    if(FileUtils.notExists(rootPath) || FileUtils.isEmpty(rootPath)) {
       Success(Set.empty)
     } else {
       for {
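Because the filesystem is now resolved from the rootPath URI, this consistency check can also run directly against an object store. A hypothetical invocation, not taken from the commit; the bucket name is a placeholder and the S3A connector is assumed to be available:

import org.apache.spark.sql.SparkSession
import za.co.absa.hyperdrive.ingestor.implementation.utils.MetadataLogUtil

val spark = SparkSession.builder().appName("metadata-log-check").master("local[*]").getOrCreate()

// Parquet files present under the root path but not listed in Spark's metadata log.
val result = MetadataLogUtil.getParquetFilesNotListedInMetadataLog(spark, "s3a://example-bucket/ingestion/output")
result.foreach(files => files.foreach(println))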
FileUtils.scala

@@ -15,32 +15,24 @@

 package za.co.absa.hyperdrive.shared.utils

-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

 private[hyperdrive] object FileUtils {

-  def exists(file: String, configuration: Configuration): Boolean = {
-    val fileSystem = getFileSystem(configuration)
-    fileSystem.exists(new Path(file))
+  def exists(file: String)(implicit fs: FileSystem): Boolean = {
+    fs.exists(new Path(file))
   }

-  def notExists(file: String, configuration: Configuration): Boolean = !exists(file, configuration)
+  def notExists(file: String)(implicit fs: FileSystem): Boolean = !exists(file)

-  def isDirectory(file: String, configuration: Configuration): Boolean = {
-    val fileSystem = getFileSystem(configuration)
-    fileSystem.isDirectory(new Path(file))
+  def isDirectory(file: String)(implicit fs: FileSystem): Boolean = {
+    fs.isDirectory(new Path(file))
   }

-  def isNotDirectory(file: String, configuration: Configuration): Boolean = !isDirectory(file, configuration)
+  def isNotDirectory(file: String)(implicit fs: FileSystem): Boolean = !isDirectory(file)

-  def isEmpty(directory: String, configuration: Configuration): Boolean = {
-    val fs = getFileSystem(configuration)
+  def isEmpty(directory: String)(implicit fs: FileSystem): Boolean = {
     val path = new Path(directory)
     fs.exists(path) && !fs.listFiles(path, true).hasNext
   }
-
-  private def getFileSystem(configuration: Configuration): FileSystem = {
-    FileSystem.get(configuration)
-  }
 }
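Call sites bind the implicit FileSystem once, from the URI of the path they are about to inspect, and every FileUtils helper then runs against that filesystem. A small usage sketch; FileUtils is private[hyperdrive], so code like this would live inside the za.co.absa.hyperdrive namespace, and the checkpoint path is a placeholder:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import za.co.absa.hyperdrive.shared.utils.FileUtils

val checkpointLocation = "s3a://example-bucket/checkpoints/topic-a" // placeholder
implicit val fs: FileSystem = FileSystem.get(new URI(checkpointLocation), new Configuration())

// Both helpers pick up the implicit FileSystem bound above.
if (FileUtils.notExists(checkpointLocation) || FileUtils.isEmpty(checkpointLocation)) {
  println("fresh checkpoint folder: start from the earliest offsets")
}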
TestFileUtils.scala

@@ -16,18 +16,18 @@
 package za.co.absa.hyperdrive.shared.utils

 import java.util.UUID
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
 import za.co.absa.commons.io.TempDirectory
 import za.co.absa.commons.spark.SparkTestBase

 class TestFileUtils extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter {

   behavior of "FileUtils"
-  private val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
-  private val config = spark.sparkContext.hadoopConfiguration
   private var baseDirectory: TempDirectory = _
   private var baseDirPath: String = _
+  private val dummyDirectory = TempDirectory("DummyDirectory")
+  private implicit val fs: FileSystem = FileSystem.get(dummyDirectory.path.toUri, spark.sparkContext.hadoopConfiguration)

   before {
     baseDirectory = TempDirectory("FileUtilsTest")

@@ -45,7 +45,7 @@
     fs.mkdirs(new Path(directory))

     // when, then
-    FileUtils.isEmpty(directory, config) shouldBe true
+    FileUtils.isEmpty(directory) shouldBe true
   }

   it should "return true if the directory only contains other directories, but no files" in {

@@ -55,7 +55,7 @@
     fs.mkdirs(new Path(subDirectory))

     // when, then
-    FileUtils.isEmpty(directory, config) shouldBe true
+    FileUtils.isEmpty(directory) shouldBe true
   }

   it should "return false if the directory is not empty" in {

@@ -66,15 +66,15 @@
     fs.create(new Path(subDirectory, "_INFO"))

     // when, then
-    FileUtils.isEmpty(directory, config) shouldBe false
+    FileUtils.isEmpty(directory) shouldBe false
   }

   it should "return false if the directory does not exist" in {
     // given
     val doesNotExist = s"$baseDirPath/${UUID.randomUUID().toString}"

     // when, then
-    FileUtils.isEmpty(doesNotExist, config) shouldBe false
+    FileUtils.isEmpty(doesNotExist) shouldBe false
   }

   it should "return false if the argument is a file" in {

@@ -83,6 +83,6 @@
     fs.create(new Path(file))

     // when, then
-    FileUtils.isEmpty(file, config) shouldBe false
+    FileUtils.isEmpty(file) shouldBe false
   }
 }
