diff --git a/config/redshift.config.reference.hocon b/config/redshift.config.reference.hocon index 95f6f34ae..df4ce2219 100644 --- a/config/redshift.config.reference.hocon +++ b/config/redshift.config.reference.hocon @@ -32,7 +32,11 @@ # Custom JDBC configuration. Optional, default value { "ssl": true } "jdbc": { "ssl": true }, # MAXERROR, amount of acceptable loading errors. Optional, default value 10 - "maxError": 10 + "maxError": 10, + # unlock experimental features + "experimental": { + "enableWideRow": false + } }, "schedules": { diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala index 535ad3439..f84b8298e 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala @@ -81,7 +81,8 @@ object StorageTarget { username: String, password: PasswordConfig, maxError: Int, - sshTunnel: Option[TunnelConfig]) extends StorageTarget { + sshTunnel: Option[TunnelConfig], + experimental: RedshiftExperimentalFeatures) extends StorageTarget { override def driver: String = "com.amazon.redshift.jdbc42.Driver" override def connectionUrl: String = s"jdbc:redshift://$host:$port/$database" @@ -275,6 +276,8 @@ object StorageTarget { j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes)) } + final case class RedshiftExperimentalFeatures(enableWideRow: Boolean) + /** Reference to encrypted entity inside EC2 Parameter Store */ final case class ParameterStoreConfig(parameterName: String) @@ -328,6 +331,9 @@ object StorageTarget { */ final case class TunnelConfig(bastion: BastionConfig, localPort: Int, destination: DestinationConfig) + implicit def redshiftExperimentalFeaturesDecoder: Decoder[RedshiftExperimentalFeatures] = + deriveDecoder[RedshiftExperimentalFeatures] + implicit def redshiftConfigDecoder: Decoder[Redshift] = deriveDecoder[Redshift] diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala index 03de27d68..e2357d3ba 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala @@ -104,7 +104,8 @@ object ConfigSpec { "admin", StorageTarget.PasswordConfig.PlainText("Supersecret1"), 10, - None + None, + StorageTarget.RedshiftExperimentalFeatures(false) ) val exampleSnowflake = StorageTarget.Snowflake( Some("us-west-2"), diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala index d432a00c5..c50fc253a 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala @@ -33,7 +33,8 @@ class StorageTargetSpec extends Specification { "schema": "atomic", "maxError": 1, "compRows": 20000, - "purpose": "ENRICHED_EVENTS" + "purpose": "ENRICHED_EVENTS", + "experimental": {"enableWideRow": true} }""" val expected = StorageTarget.Redshift( @@ -46,7 +47,8 @@ class StorageTargetSpec extends Specification { "ADD HERE", StorageTarget.PasswordConfig.PlainText("ADD HERE"), 1, - None) + None, + StorageTarget.RedshiftExperimentalFeatures(true)) config.as[StorageTarget] must beRight(expected) } diff --git a/modules/redshift-loader/src/main/resources/application.conf b/modules/redshift-loader/src/main/resources/application.conf index 1fbabefb7..509b11fd8 100644 --- a/modules/redshift-loader/src/main/resources/application.conf +++ b/modules/redshift-loader/src/main/resources/application.conf @@ -4,5 +4,8 @@ "port": 5439 "jdbc": { "ssl": true } "maxError": 10 + "experimental": { + "enableWideRow": false + } } } diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala index f86f26daf..f08656f34 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala @@ -21,6 +21,8 @@ import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlFile, DdlGen import com.snowplowanalytics.snowplow.rdbloader.LoadStatements import com.snowplowanalytics.snowplow.rdbloader.common.Common import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat.{JSON, PARQUET} +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item, NoPreStatements, NoStatements} @@ -43,7 +45,7 @@ object Redshift { def build(config: Config[StorageTarget]): Either[String, Target] = { config.storage match { - case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _) => + case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _, experimentalFeatures) => val result = new Target { override val requiresEventsColumns: Boolean = false @@ -67,19 +69,39 @@ object Redshift { Block(preTransaction.reverse, inTransaction.reverse, Entity.Table(schema, target)) } - override def extendTable(info: ShreddedType.Info): Option[Block] = - throw new IllegalStateException("Redshift Loader does not support loading wide row") - - override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = { - val shreddedStatements = discovery - .shreddedTypes - .filterNot(_.isAtomic) - .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression)) - - val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod) - NonEmptyList(atomic, shreddedStatements) + override def extendTable(info: ShreddedType.Info): Option[Block] = { + val frColumnName = Fragment.const(info.getNameFull) + val frTableName = Fragment.const(EventsTable.withSchema(schema)) + val addColumnSql = sql"ALTER TABLE $frTableName ADD COLUMN $frColumnName SUPER" + Some(Block(List(Item.AddColumn(addColumnSql, Nil)), Nil, Entity.Column(info))) } + override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = + discovery.typesInfo match { + case TypesInfo.Shredded(_) => + val shreddedStatements = discovery + .shreddedTypes + .filterNot(_.isAtomic) + .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression)) + + val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod) + NonEmptyList(atomic, shreddedStatements) + case TypesInfo.WideRow(_, _) if !experimentalFeatures.enableWideRow => + throw new IllegalStateException("Experimental widerow loading for Redshift is not enabled") + case TypesInfo.WideRow(_, _) => + val columnsToCopy = ColumnsToCopy.fromDiscoveredData(discovery) + NonEmptyList.one( + Statement.EventsCopy( + discovery.base, + discovery.compression, + columnsToCopy, + ColumnsToSkip.none, + discovery.typesInfo, + loadAuthMethod + ) + ) + } + override def createTable(schemas: SchemaList): Block = { val subschemas = FlatSchema.extractProperties(schemas) val tableName = StringUtils.getTableName(schemas.latest) @@ -110,7 +132,7 @@ object Redshift { val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn") val frPath = Fragment.const0(source) sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'" - case Statement.EventsCopy(path, compression, columnsToCopy, _, _, _) => + case Statement.EventsCopy(path, compression, columnsToCopy, _, typesInfo, _) => // For some reasons Redshift JDBC doesn't handle interpolation in COPY statements val frTableName = Fragment.const(EventsTable.withSchema(schema)) val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType)) @@ -119,13 +141,18 @@ object Redshift { val frMaxError = Fragment.const0(maxError.toString) val frCompression = getCompressionFormat(compression) val frColumns = Fragment.const0(columnsToCopy.names.map(_.value).mkString(",")) + val frFileFormat = typesInfo match { + case TypesInfo.Shredded(_) => "CSV DELIMITER '$EventsFieldSeparator'" + case TypesInfo.WideRow(JSON, _) => "JSON 'auto'" + case TypesInfo.WideRow(PARQUET, _) => "PARQUET" + } sql"""COPY $frTableName ($frColumns) FROM '$frPath' | CREDENTIALS '$frRoleArn' + | FORMAT $frFileFormat | REGION '$frRegion' | MAXERROR $frMaxError | TIMEFORMAT 'auto' - | DELIMITER '$EventFieldSeparator' | EMPTYASNULL | FILLRECORD | TRUNCATECOLUMNS @@ -163,7 +190,7 @@ object Redshift { | ACCEPTINVCHARS | $frCompression""".stripMargin case ShreddedType.Widerow(_) => - throw new IllegalStateException("Widerow loading is not yet supported for Redshift") + throw new IllegalStateException("Cannot perform a shredded copy from widerow files") } case Statement.CreateTransient => Fragment.const0(s"CREATE TABLE ${EventsTable.TransitTable(schema).withSchema} ( LIKE ${EventsTable.AtomicEvents(schema).withSchema} )")