Skip to content

Commit

Permalink
Support for wide row tables on Redshift
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Sep 27, 2022
1 parent 1cc45f9 commit 4f5fcd8
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 20 deletions.
6 changes: 5 additions & 1 deletion config/redshift.config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
# Custom JDBC configuration. Optional, default value { "ssl": true }
"jdbc": { "ssl": true },
# MAXERROR, amount of acceptable loading errors. Optional, default value 10
"maxError": 10
"maxError": 10,
# unlock experimental features
"experimental": {
"enableWideRow": false
}
},

"schedules": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ object StorageTarget {
username: String,
password: PasswordConfig,
maxError: Int,
sshTunnel: Option[TunnelConfig]) extends StorageTarget {
sshTunnel: Option[TunnelConfig],
experimental: RedshiftExperimentalFeatures) extends StorageTarget {
override def driver: String = "com.amazon.redshift.jdbc42.Driver"

override def connectionUrl: String = s"jdbc:redshift://$host:$port/$database"
Expand Down Expand Up @@ -275,6 +276,8 @@ object StorageTarget {
j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes))
}

final case class RedshiftExperimentalFeatures(enableWideRow: Boolean)

/** Reference to encrypted entity inside EC2 Parameter Store */
final case class ParameterStoreConfig(parameterName: String)

Expand Down Expand Up @@ -328,6 +331,9 @@ object StorageTarget {
*/
final case class TunnelConfig(bastion: BastionConfig, localPort: Int, destination: DestinationConfig)

implicit def redshiftExperimentalFeaturesDecoder: Decoder[RedshiftExperimentalFeatures] =
deriveDecoder[RedshiftExperimentalFeatures]

implicit def redshiftConfigDecoder: Decoder[Redshift] =
deriveDecoder[Redshift]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ object ConfigSpec {
"admin",
StorageTarget.PasswordConfig.PlainText("Supersecret1"),
10,
None
None,
StorageTarget.RedshiftExperimentalFeatures(false)
)
val exampleSnowflake = StorageTarget.Snowflake(
Some("us-west-2"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class StorageTargetSpec extends Specification {
"schema": "atomic",
"maxError": 1,
"compRows": 20000,
"purpose": "ENRICHED_EVENTS"
"purpose": "ENRICHED_EVENTS",
"experimental": {"enableWideRow": true}
}"""

val expected = StorageTarget.Redshift(
Expand All @@ -46,7 +47,8 @@ class StorageTargetSpec extends Specification {
"ADD HERE",
StorageTarget.PasswordConfig.PlainText("ADD HERE"),
1,
None)
None,
StorageTarget.RedshiftExperimentalFeatures(true))

config.as[StorageTarget] must beRight(expected)
}
Expand Down
3 changes: 3 additions & 0 deletions modules/redshift-loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
"port": 5439
"jdbc": { "ssl": true }
"maxError": 10
"experimental": {
"enableWideRow": false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlFile, DdlGen
import com.snowplowanalytics.snowplow.rdbloader.LoadStatements
import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat.{JSON, PARQUET}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns}
import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item, NoPreStatements, NoStatements}
Expand All @@ -43,7 +45,7 @@ object Redshift {

def build(config: Config[StorageTarget]): Either[String, Target] = {
config.storage match {
case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _) =>
case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _, experimentalFeatures) =>
val result = new Target {

override val requiresEventsColumns: Boolean = false
Expand All @@ -67,19 +69,39 @@ object Redshift {
Block(preTransaction.reverse, inTransaction.reverse, Entity.Table(schema, target))
}

override def extendTable(info: ShreddedType.Info): Option[Block] =
throw new IllegalStateException("Redshift Loader does not support loading wide row")

override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = {
val shreddedStatements = discovery
.shreddedTypes
.filterNot(_.isAtomic)
.map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression))

val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod)
NonEmptyList(atomic, shreddedStatements)
override def extendTable(info: ShreddedType.Info): Option[Block] = {
val frColumnName = Fragment.const(info.getNameFull)
val frTableName = Fragment.const(EventsTable.withSchema(schema))
val addColumnSql = sql"ALTER TABLE $frTableName ADD COLUMN $frColumnName SUPER"
Some(Block(List(Item.AddColumn(addColumnSql, Nil)), Nil, Entity.Column(info)))
}

override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements =
discovery.typesInfo match {
case TypesInfo.Shredded(_) =>
val shreddedStatements = discovery
.shreddedTypes
.filterNot(_.isAtomic)
.map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression))

val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod)
NonEmptyList(atomic, shreddedStatements)
case TypesInfo.WideRow(_, _) if !experimentalFeatures.enableWideRow =>
throw new IllegalStateException("Experimental widerow loading for Redshift is not enabled")
case TypesInfo.WideRow(_, _) =>
val columnsToCopy = ColumnsToCopy.fromDiscoveredData(discovery)
NonEmptyList.one(
Statement.EventsCopy(
discovery.base,
discovery.compression,
columnsToCopy,
ColumnsToSkip.none,
discovery.typesInfo,
loadAuthMethod
)
)
}

override def createTable(schemas: SchemaList): Block = {
val subschemas = FlatSchema.extractProperties(schemas)
val tableName = StringUtils.getTableName(schemas.latest)
Expand Down Expand Up @@ -110,7 +132,7 @@ object Redshift {
val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn")
val frPath = Fragment.const0(source)
sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'"
case Statement.EventsCopy(path, compression, columnsToCopy, _, _, _) =>
case Statement.EventsCopy(path, compression, columnsToCopy, _, typesInfo, _) =>
// For some reasons Redshift JDBC doesn't handle interpolation in COPY statements
val frTableName = Fragment.const(EventsTable.withSchema(schema))
val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType))
Expand All @@ -119,13 +141,18 @@ object Redshift {
val frMaxError = Fragment.const0(maxError.toString)
val frCompression = getCompressionFormat(compression)
val frColumns = Fragment.const0(columnsToCopy.names.map(_.value).mkString(","))
val frFileFormat = typesInfo match {
case TypesInfo.Shredded(_) => "CSV DELIMITER '$EventsFieldSeparator'"
case TypesInfo.WideRow(JSON, _) => "JSON 'auto'"
case TypesInfo.WideRow(PARQUET, _) => "PARQUET"
}

sql"""COPY $frTableName ($frColumns) FROM '$frPath'
| CREDENTIALS '$frRoleArn'
| FORMAT $frFileFormat
| REGION '$frRegion'
| MAXERROR $frMaxError
| TIMEFORMAT 'auto'
| DELIMITER '$EventFieldSeparator'
| EMPTYASNULL
| FILLRECORD
| TRUNCATECOLUMNS
Expand Down Expand Up @@ -163,7 +190,7 @@ object Redshift {
| ACCEPTINVCHARS
| $frCompression""".stripMargin
case ShreddedType.Widerow(_) =>
throw new IllegalStateException("Widerow loading is not yet supported for Redshift")
throw new IllegalStateException("Cannot perform a shredded copy from widerow files")
}
case Statement.CreateTransient =>
Fragment.const0(s"CREATE TABLE ${EventsTable.TransitTable(schema).withSchema} ( LIKE ${EventsTable.AtomicEvents(schema).withSchema} )")
Expand Down

0 comments on commit 4f5fcd8

Please sign in to comment.