
Commit

[SPARTA-817] [SPARTA-578] Save transformations step (#1845)
* Prepare for next version

* Noting 1.3.0 release date

* Transformations with writer (#1830)

* Validation outputs (#1838)

* Prepare for next version

* Noting 1.3.0 release date

* Zookeeper log level added and updated curator version (#1831)

* Prepare for next version

* Noting 1.3.0 release date

* zookeeper log level added

* [DCS-1371] [SPARTA-816] Postgres options spark 2.1 (#1829)

* Prepare for next version

* Noting 1.3.0 release date

* Postgres properties added

* [SPARTA-762] Save kafka offsets in kafka with Spark 2.x API (#1828)

* Prepare for next version

* Noting 1.3.0 release date

* Save kafka offsets in kafka with Spark 2.x API

* Auto offset latest by default

* [SPARTA-807] Edit transformations without deleting them (#1832)

* driver and plugins view

* Editable transformations

* merge

* [DCS-1368] Integration with Zookeeper securized with Kerberos (#1827)

* Added Spark Sql dependency in serving-api (#1834)

* Prepare for next version

* Noting 1.3.0 release date

* Spark sql dependency inside serving-api

* fix: HADOOP_PRINCIPAL_NAME and HADOOP_KEYTAB_PATH refactored to SPARTA_* (#1836)

* Transformations with writer (#1830)

* Removed validation outputs

* Transformations with writer (#1830)

* Merge from master (#1842)

* Prepare for next version

* Noting 1.3.0 release date

* Zookeeper log level added and updated curator version (#1831)

* Prepare for next version

* Noting 1.3.0 release date

* zookeeper log level added

* [DCS-1371] [SPARTA-816] Postgres options spark 2.1 (#1829)

* Prepare for next version

* Noting 1.3.0 release date

* Postgres properties added

* [SPARTA-762] Save kafka offsets in kafka with Spark 2.x API (#1828)

* Prepare for next version

* Noting 1.3.0 release date

* Save kafka offsets in kafka with Spark 2.x API

* Auto offset latest by default

* [SPARTA-807] Edit transformations without deleting them (#1832)

* driver and plugins view

* Editable transformations

* merge

* [DCS-1368] Integration with Zookeeper securized with Kerberos (#1827)

* Added Spark Sql dependency in serving-api (#1834)

* Prepare for next version

* Noting 1.3.0 release date

* Spark sql dependency inside serving-api

* fix: HADOOP_PRINCIPAL_NAME and HADOOP_KEYTAB_PATH refactored to SPARTA_* (#1836)

* Updated Jq version in sparta docker (#1835)

* Updated Jq version in sparta docker

* Update Jenkinsfile

* Update Jenkinsfile

* Delete dependencyfix.sh

* [SPARTA-813] Save raw data step in GUI (#1837)

* driver and plugins view

* Added new option to wizard panel

* merge

* fixed navigation with raw data option

* Zookeeper log level added and updated curator version (#1831)

* Prepare for next version

* Noting 1.3.0 release date

* zookeeper log level added

* [DCS-1371] [SPARTA-816] Postgres options spark 2.1 (#1829)

* Prepare for next version

* Noting 1.3.0 release date

* Postgres properties added

* [SPARTA-762] Save kafka offsets in kafka with Spark 2.x API (#1828)

* Prepare for next version

* Noting 1.3.0 release date

* Save kafka offsets in kafka with Spark 2.x API

* Auto offset latest by default

* [SPARTA-807] Edit transformations without deleting them (#1832)

* driver and plugins view

* Editable transformations

* merge

* [DCS-1368] Integration with Zookeeper securized with Kerberos (#1827)

* Added Spark Sql dependency in serving-api (#1834)

* Prepare for next version

* Noting 1.3.0 release date

* Spark sql dependency inside serving-api

* fixed menu wizard errors

fixed error

* fixed unit test

* [DCS-1372] Kafka securized with SSL (#1833)

* Prepare for next version

* Noting 1.3.0 release date

* Kafka jobs securized in Spark with SSL

* [DCS-1369] Hdfs configuration files with security (#1839)

* Prepare for next version

* Noting 1.3.0 release date

* Hdfs configuration files with security

* Transformations with writer (#1830)

* Merge from master

* [SPARTA-817] (#1844)

* merge with transformations-writer

* Added new option to wizard panel

* merge

* fixed navigation with raw data option

* [DCS-1368] Integration with Zookeeper securized with Kerberos (#1827)

* fix: HADOOP_PRINCIPAL_NAME and HADOOP_KEYTAB_PATH refactored to SPARTA_* (#1836)

* [SPARTA-762] Save kafka offsets in kafka with Spark 2.x API (#1828)

* Prepare for next version

* Noting 1.3.0 release date

* Save kafka offsets in kafka with Spark 2.x API

* Auto offset latest by default

* [DCS-1368] Integration with Zookeeper securized with Kerberos (#1827)

* Added Spark Sql dependency in serving-api (#1834)

* Prepare for next version

* Noting 1.3.0 release date

* Spark sql dependency inside serving-api

* Updated grunt tasks

* fix

* Updated Jq version in sparta docker (#1835)

* Updated Jq version in sparta docker

* Update Jenkinsfile

* Update Jenkinsfile

* Delete dependencyfix.sh

* [SPARTA-813] Save raw data step in GUI (#1837)

* driver and plugins view

* Added new option to wizard panel

* merge

* fixed navigation with raw data option

* Zookeeper log level added and updated curator version (#1831)

* Prepare for next version

* Noting 1.3.0 release date

* zookeeper log level added

* [DCS-1371] [SPARTA-816] Postgres options spark 2.1 (#1829)

* Prepare for next version

* Noting 1.3.0 release date

* Postgres properties added

* [SPARTA-762] Save kafka offsets in kafka with Spark 2.x API (#1828)

* Prepare for next version

* Noting 1.3.0 release date

* Save kafka offsets in kafka with Spark 2.x API

* Auto offset latest by default

* [SPARTA-807] Edit transformations without deleting them (#1832)

* driver and plugins view

* Editable transformations

* merge

* [DCS-1368] Integration with Zookeeper securized with Kerberos (#1827)

* Added Spark Sql dependency in serving-api (#1834)

* Prepare for next version

* Noting 1.3.0 release date

* Spark sql dependency inside serving-api

* fixed menu wizard errors

fixed error

* fixed unit test

* Added save options form in transformations

* fixed unit test

* Merge gruntfile
compae authored Apr 25, 2017
1 parent 06eb405 commit dc872db
Showing 68 changed files with 1,082 additions and 880 deletions.
1 change: 0 additions & 1 deletion dist/src/main/resources/marathon-app-template.json
@@ -72,4 +72,3 @@
"DCOS_SERVICE_SCHEME": "http"
}
}

1 change: 1 addition & 0 deletions docker/kerberos-server-config.sh
@@ -15,6 +15,7 @@ echo "Setting configuration options needed for securized Zookeeper"
##In sparta keytab is expected in SPARTA_KEYTAB_PATH
export SPARTA_KEYTAB_PATH=/etc/sds/sparta/security/sparta.keytab


## Creating a jaas.conf that must be used to connect to Zookeeper if Zookeeper is securized

cat > /etc/sds/sparta/security/jaas.conf<<EOF
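The jaas.conf body written by the heredoc is collapsed in this view. For orientation only: a JVM client normally picks such a file up through the java.security.auth.login.config system property before connecting to a Kerberized Zookeeper. The Curator sketch below is a hypothetical illustration of that wiring, not Sparta's actual bootstrap code; the connection string and lifecycle are assumptions.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

object KerberizedZkClientSketch extends App {
  // Hypothetical sketch: point the JVM at the JAAS file generated above so the
  // Zookeeper client authenticates with the keytab from SPARTA_KEYTAB_PATH.
  System.setProperty("java.security.auth.login.config", "/etc/sds/sparta/security/jaas.conf")

  // Assumed connection string; Sparta reads its own Zookeeper configuration elsewhere.
  val client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3))
  client.start()
  // ... interact with the secured Zookeeper ensemble here ...
  client.close()
}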
SpartaWorkflow.scala
@@ -17,8 +17,9 @@
package com.stratio.sparta.driver

import com.stratio.sparta.driver.factory.SparkContextFactory._
import com.stratio.sparta.driver.helper.SchemaHelper
import com.stratio.sparta.driver.schema.SchemaHelper
import com.stratio.sparta.driver.stage._
import com.stratio.sparta.driver.writer.TransformationsWriterHelper
import com.stratio.sparta.sdk.pipeline.input.Input
import com.stratio.sparta.sdk.utils.AggregationTime
import com.stratio.sparta.serving.core.helpers.PolicyHelper
@@ -46,11 +47,16 @@ class SpartaWorkflow(val policy: PolicyModel, val curatorFramework: CuratorFrame

saveRawData(policy.rawData, inputDStream, outputs)

val parserSchemas = SchemaHelper.getSchemasFromTransformations(policy.transformations, Input.InitSchema)
val parsedData = ParserStage.applyParsers(inputDStream, parserStage(ReflectionUtils, parserSchemas).sorted)
policy.transformations.foreach { transformationsModel =>
val parserSchemas = SchemaHelper.getSchemasFromTransformations(
transformationsModel.transformationsPipe, Input.InitSchema)
val (parsers, writerOptions) = parserStage(ReflectionUtils, parserSchemas)
val parsedData = ParserStage.applyParsers(
inputDStream, parsers, parserSchemas.values.last, outputs, writerOptions)

triggersStreamStage(parserSchemas.values.last, parsedData, outputs, window)
cubesStreamStage(ReflectionUtils, parserSchemas.values.last, parsedData, outputs)
triggersStreamStage(parserSchemas.values.last, parsedData, outputs, window)
cubesStreamStage(ReflectionUtils, parserSchemas.values.last, parsedData, outputs)
}

ssc.get
}
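After this change the workflow loops over policy.transformations, applying each pipeline's parsers and then handing the parsed stream to a writer. The sketch below only illustrates that parse-then-write shape with generic types; the names and signatures are assumptions and do not match the real ParserStage or TransformationsWriterHelper API.

import org.apache.spark.streaming.dstream.DStream

object TransformationsWithWriterSketch {
  type Event = Map[String, Any]
  type Parser = Event => Event

  // Apply an ordered chain of parsers to the incoming stream.
  def applyParsers(stream: DStream[Event], parsers: Seq[Parser]): DStream[Event] =
    parsers.foldLeft(stream)((parsed, parser) => parsed.map(parser))

  // Persist each parsed micro-batch; a real writer would build a DataFrame with
  // the transformation's schema and delegate to the configured outputs.
  def writeParsed(parsed: DStream[Event], tableName: String): Unit =
    parsed.foreachRDD(rdd => println(s"[$tableName] writing ${rdd.count()} parsed events"))
}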
SparkContextFactory.scala
@@ -42,8 +42,6 @@ object SparkContextFactory extends SLF4JLogging {
}
}

def setInitialSentences(sentences: Seq[String]): Unit = sqlInitialSentences = sentences

def sparkStreamingInstance(batchDuration: Duration, checkpointDir: String, remember: Option[String]):
Option[StreamingContext] = {
synchronized {
@@ -55,27 +53,48 @@
ssc
}

def setSparkContext(createdContext: SparkContext): Unit = sc = Option(createdContext)
def sparkStandAloneContextInstance(specificConfig: Map[String, String], jars: Seq[File]): SparkContext =
synchronized {
sc.getOrElse(instantiateSparkContext(specificConfig, jars))
}


def destroySparkContext(destroyStreamingContext: Boolean = true): Unit = {
if (destroyStreamingContext) destroySparkStreamingContext()

sc.fold(log.warn("Spark Context is empty")) { sparkContext =>
synchronized {
try {
log.info("Stopping SparkContext with name: " + sparkContext.appName)
sparkContext.stop()
log.info("Stopped SparkContext with name: " + sparkContext.appName)
} finally {
sparkSession = None
sqlInitialSentences = Seq.empty[String]
ssc = None
sc = None
}
}
}
}

private[driver] def setSparkContext(createdContext: SparkContext): Unit = sc = Option(createdContext)

def setSparkStreamingContext(createdContext: StreamingContext): Unit = ssc = Option(createdContext)
private[driver] def setSparkStreamingContext(createdContext: StreamingContext): Unit = ssc = Option(createdContext)

private def getNewStreamingContext(batchDuration: Duration, checkpointDir: String, remember: Option[String]):
private[driver] def getNewStreamingContext(batchDuration: Duration, checkpointDir: String, remember: Option[String]):
StreamingContext = {
val ssc = new StreamingContext(sc.get, batchDuration)
ssc.checkpoint(checkpointDir)
remember.foreach(value => ssc.remember(Duration(AggregationTime.parseValueToMilliSeconds(value))))
ssc
}

def sparkStandAloneContextInstance(specificConfig: Map[String, String], jars: Seq[File]): SparkContext =
synchronized {
sc.getOrElse(instantiateSparkContext(specificConfig, jars))
}

def sparkClusterContextInstance(specificConfig: Map[String, String], files: Seq[String]): SparkContext =
private[driver] def sparkClusterContextInstance(specificConfig: Map[String, String],
files: Seq[String]): SparkContext =
sc.getOrElse(instantiateClusterContext(specificConfig, files))

private def instantiateSparkContext(specificConfig: Map[String, String], jars: Seq[File]): SparkContext = {
private[driver] def instantiateSparkContext(specificConfig: Map[String, String], jars: Seq[File]): SparkContext = {
sc = Some(SparkContext.getOrCreate(configToSparkConf(specificConfig)))
jars.foreach(f => {
log.info(s"Adding jar ${f.getAbsolutePath} to Spark context")
@@ -84,7 +103,8 @@
sc.get
}

private def instantiateClusterContext(specificConfig: Map[String, String], files: Seq[String]): SparkContext = {
private[driver] def instantiateClusterContext(specificConfig: Map[String, String],
files: Seq[String]): SparkContext = {
sc = Some(SparkContext.getOrCreate(configToSparkConf(specificConfig)))
files.foreach(f => {
log.info(s"Adding jar $f to cluster Spark context")
Expand All @@ -93,13 +113,15 @@ object SparkContextFactory extends SLF4JLogging {
sc.get
}

private def configToSparkConf(specificConfig: Map[String, String]): SparkConf = {
private[driver] def configToSparkConf(specificConfig: Map[String, String]): SparkConf = {
val conf = new SparkConf()
specificConfig.foreach { case (key, value) => conf.set(key, value) }
conf
}

def destroySparkStreamingContext(): Unit = {
private[driver] def setInitialSentences(sentences: Seq[String]): Unit = sqlInitialSentences = sentences

private[driver] def destroySparkStreamingContext(): Unit = {
ssc.fold(log.warn("Spark Streaming Context is empty")) { streamingContext =>
try {
synchronized {
Expand All @@ -117,23 +139,4 @@ object SparkContextFactory extends SLF4JLogging {
}
}

def destroySparkContext(destroyStreamingContext: Boolean = true): Unit = {
if (destroyStreamingContext)
destroySparkStreamingContext()

sc.fold(log.warn("Spark Context is empty")) { sparkContext =>
synchronized {
try {
log.info("Stopping SparkContext with name: " + sparkContext.appName)
sparkContext.stop()
log.info("Stopped SparkContext with name: " + sparkContext.appName)
} finally {
sparkSession = None
sqlInitialSentences = Seq.empty[String]
ssc = None
sc = None
}
}
}
}
}
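With this reorganisation only the lifecycle entry points of SparkContextFactory stay public, while the setters and low-level builders become private[driver]. A minimal usage sketch of that public surface, assuming a local master and a temporary checkpoint directory (both values are made up for the example):

import java.io.File
import org.apache.spark.streaming.Seconds
import com.stratio.sparta.driver.factory.SparkContextFactory

object SparkContextFactoryUsageSketch extends App {
  // Assumed local configuration, illustration only.
  val sparkConfig = Map("spark.master" -> "local[2]", "spark.app.name" -> "sparta-sketch")

  // getOrElse-style singleton: repeated calls reuse the cached SparkContext.
  val sc = SparkContextFactory.sparkStandAloneContextInstance(sparkConfig, Seq.empty[File])

  // The streaming context is created lazily on top of the existing SparkContext.
  val ssc = SparkContextFactory.sparkStreamingInstance(Seconds(5), "/tmp/sparta-checkpoint", remember = None)

  // ... build and start the streaming job here ...

  // Stops both contexts and clears the factory's cached state.
  SparkContextFactory.destroySparkContext(destroyStreamingContext = true)
}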
SchemaHelper.scala
@@ -14,16 +14,15 @@
* limitations under the License.
*/

package com.stratio.sparta.driver.helper
package com.stratio.sparta.driver.schema

import com.stratio.sparta.driver.writer.TriggerWriterOptions
import com.stratio.sparta.sdk.pipeline.aggregation.cube.{Dimension, ExpiringData}
import com.stratio.sparta.sdk.pipeline.aggregation.operator.Operator
import com.stratio.sparta.sdk.pipeline.output.Output
import com.stratio.sparta.sdk.pipeline.schema.TypeOp
import com.stratio.sparta.sdk.pipeline.schema.TypeOp._
import com.stratio.sparta.sdk.properties.ValidatingPropertyMap._
import com.stratio.sparta.serving.core.models.policy.TransformationsModel
import com.stratio.sparta.serving.core.models.policy.TransformationModel
import com.stratio.sparta.serving.core.models.policy.cube.CubeModel
import org.apache.spark.sql.types.{StructType, _}

@@ -32,19 +31,12 @@ import scala.util.Try

object SchemaHelper {

val Default_Precision = 10
val Default_Scale = 0
val Nullable = true
val NotNullable = false
val DefaultTimeStampTypeString = "timestamp"
val DefaultTimeStampType = TypeOp.Timestamp
private val Default_Precision = 10
private val Default_Scale = 0
private val Nullable = true
private val NotNullable = false
private val MetadataBuilder = new MetadataBuilder
val MeasureMetadata = MetadataBuilder.putBoolean(Output.MeasureMetadataKey, value = true).build()
val PkMetadata = MetadataBuilder.putBoolean(Output.PrimaryKeyMetadataKey, value = true).build()
val PkTimeMetadata = MetadataBuilder.putBoolean(Output.PrimaryKeyMetadataKey, value = true)
.putBoolean(Output.TimeDimensionKey, value = true).build()

val mapTypes = Map(
private val mapTypes = Map(
TypeOp.Long -> LongType,
TypeOp.Double -> DoubleType,
TypeOp.BigDecimal -> DecimalType(Default_Precision, Default_Scale),
@@ -57,25 +49,8 @@
TypeOp.ArrayString -> ArrayType(StringType),
TypeOp.String -> StringType,
TypeOp.MapStringLong -> MapType(StringType, LongType),
TypeOp.MapStringDouble -> MapType(StringType, DoubleType, valueContainsNull = false)
)

val mapSparkTypes: Map[DataType, TypeOp] = Map(
LongType -> TypeOp.Long,
DoubleType -> TypeOp.Double,
DecimalType(Default_Precision, Default_Scale) -> TypeOp.BigDecimal,
IntegerType -> TypeOp.Int,
BooleanType -> TypeOp.Boolean,
DateType -> TypeOp.Date,
TimestampType -> TypeOp.Timestamp,
ArrayType(DoubleType) -> TypeOp.ArrayDouble,
ArrayType(StringType) -> TypeOp.ArrayString,
StringType -> TypeOp.String,
MapType(StringType, LongType) -> TypeOp.MapStringLong,
MapType(StringType, DoubleType, valueContainsNull = false) -> TypeOp.MapStringDouble
)

val mapStringSparkTypes = Map(
TypeOp.MapStringDouble -> MapType(StringType, DoubleType, valueContainsNull = false))
private val mapStringSparkTypes = Map(
"long" -> LongType,
"double" -> DoubleType,
"int" -> IntegerType,
@@ -88,10 +63,28 @@
"string" -> StringType,
"arraydouble" -> ArrayType(DoubleType),
"arraystring" -> ArrayType(StringType),
"text" -> StringType
)
"text" -> StringType)

private[driver] val DefaultTimeStampTypeString = "timestamp"
private[driver] val MeasureMetadata = MetadataBuilder.putBoolean(Output.MeasureMetadataKey, value = true).build()
private[driver] val PkMetadata = MetadataBuilder.putBoolean(Output.PrimaryKeyMetadataKey, value = true).build()
private[driver] val PkTimeMetadata = MetadataBuilder.putBoolean(Output.PrimaryKeyMetadataKey, value = true)
.putBoolean(Output.TimeDimensionKey, value = true).build()
private[driver] val mapSparkTypes: Map[DataType, TypeOp] = Map(
LongType -> TypeOp.Long,
DoubleType -> TypeOp.Double,
DecimalType(Default_Precision, Default_Scale) -> TypeOp.BigDecimal,
IntegerType -> TypeOp.Int,
BooleanType -> TypeOp.Boolean,
DateType -> TypeOp.Date,
TimestampType -> TypeOp.Timestamp,
ArrayType(DoubleType) -> TypeOp.ArrayDouble,
ArrayType(StringType) -> TypeOp.ArrayString,
StringType -> TypeOp.String,
MapType(StringType, LongType) -> TypeOp.MapStringLong,
MapType(StringType, DoubleType, valueContainsNull = false) -> TypeOp.MapStringDouble)

def getSchemasFromTransformations(transformationsModel: Seq[TransformationsModel],
def getSchemasFromTransformations(transformationsModel: Seq[TransformationModel],
initSchema: Map[String, StructType]): Map[String, StructType] =
initSchema ++ searchSchemasFromParsers(transformationsModel.sortBy(_.order), initSchema)

@@ -137,8 +130,38 @@
case _ => TypeOp.String
}

private def searchSchemasFromParsers(transformationsModel: Seq[TransformationsModel],
schemas: Map[String, StructType]): Map[String, StructType] =

def getTimeFieldType(dateTimeType: TypeOp,
fieldName: String,
nullable: Boolean,
metadata: Option[Metadata] = None): StructField =
dateTimeType match {
case TypeOp.Date | TypeOp.DateTime =>
Output.defaultDateField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case TypeOp.Timestamp =>
Output.defaultTimeStampField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case TypeOp.Long =>
Output.defaultLongField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case TypeOp.String =>
Output.defaultStringField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case _ =>
Output.defaultStringField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
}

private[driver] def measuresFields(operators: Seq[Operator], avoidNullValues: Boolean): Seq[StructField] =
operators.map(operator =>
StructField(operator.key, rowTypeFromOption(operator.returnType), !avoidNullValues, MeasureMetadata))

private[driver] def dimensionsFields(fields: Seq[Dimension], avoidNullValues: Boolean): Seq[StructField] =
fields.map(field =>
StructField(field.name, rowTypeFromOption(field.precision.typeOp), !avoidNullValues, PkMetadata)
)

private[driver] def rowTypeFromOption(optionType: TypeOp): DataType = mapTypes.getOrElse(optionType, StringType)


private[driver] def searchSchemasFromParsers(transformationsModel: Seq[TransformationModel],
schemas: Map[String, StructType]): Map[String, StructType] =
transformationsModel.headOption match {
case Some(transformationModel) =>
val schema = transformationModel.outputFieldsTransformed.map(outputField =>
@@ -166,48 +189,21 @@
schemas
}

private def filterDimensionsByTime(dimensions: Seq[Dimension], timeDimension: Option[String]): Seq[Dimension] =
private[driver] def filterDimensionsByTime(dimensions: Seq[Dimension],
timeDimension: Option[String]): Seq[Dimension] =
timeDimension match {
case Some(timeName) => dimensions.filter(dimension => dimension.name != timeName)
case None => dimensions
}

private def timeDimensionFieldType(timeDimension: Option[String],
dateType: TypeOp.Value,
avoidNullValues: Boolean): Seq[StructField] = {
private[driver] def timeDimensionFieldType(timeDimension: Option[String],
dateType: TypeOp.Value,
avoidNullValues: Boolean): Seq[StructField] = {
timeDimension match {
case None =>
Seq.empty[StructField]
case Some(timeDimensionName) =>
Seq(getTimeFieldType(dateType, timeDimensionName, !avoidNullValues, Some(PkTimeMetadata)))
}
}

def getTimeFieldType(dateTimeType: TypeOp,
fieldName: String,
nullable: Boolean,
metadata: Option[Metadata] = None): StructField =
dateTimeType match {
case TypeOp.Date | TypeOp.DateTime =>
Output.defaultDateField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case TypeOp.Timestamp =>
Output.defaultTimeStampField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case TypeOp.Long =>
Output.defaultLongField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case TypeOp.String =>
Output.defaultStringField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
case _ =>
Output.defaultStringField(fieldName, nullable, metadata.getOrElse(Metadata.empty))
}

private def measuresFields(operators: Seq[Operator], avoidNullValues: Boolean): Seq[StructField] =
operators.map(operator =>
StructField(operator.key, rowTypeFromOption(operator.returnType), !avoidNullValues, MeasureMetadata))

private def dimensionsFields(fields: Seq[Dimension], avoidNullValues: Boolean): Seq[StructField] =
fields.map(field =>
StructField(field.name, rowTypeFromOption(field.precision.typeOp), !avoidNullValues, PkMetadata)
)

private def rowTypeFromOption(optionType: TypeOp): DataType = mapTypes.getOrElse(optionType, StringType)
}
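SchemaHelper folds each transformation's output fields into a StructType, resolving declared type names through the string-to-Spark-type map shown above. The standalone sketch below reproduces that lookup idea with a made-up field list; it is an illustration, not the helper's actual logic.

import org.apache.spark.sql.types._

object OutputFieldsSchemaSketch extends App {
  // Reduced version of the string-to-type lookup; unknown names fall back to StringType,
  // mirroring the getOrElse fallback the helper uses for unresolved types.
  val stringToSparkType: Map[String, DataType] = Map(
    "long" -> LongType,
    "double" -> DoubleType,
    "int" -> IntegerType,
    "boolean" -> BooleanType,
    "date" -> DateType,
    "timestamp" -> TimestampType,
    "string" -> StringType)

  // Hypothetical output fields declared by a transformation.
  val outputFields = Seq("eventTime" -> "timestamp", "userId" -> "string", "amount" -> "double")

  val schema = StructType(outputFields.map { case (name, typeName) =>
    StructField(name, stringToSparkType.getOrElse(typeName, StringType), nullable = true)
  })

  println(schema.treeString)
}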
