Skip to content

Commit

Permalink
Merge pull request #45 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.1.5
  • Loading branch information
civitaspo authored Dec 11, 2018
2 parents 947be0f + 7978e35 commit ba6df44
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 29 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
0.1.5 (2018-12-11)
==================

* [Enhancement] Expose more logs when loading queries.
* [Enhancement] * Use an unique table name as default in `athena.ctas>`

0.1.4 (2018-12-07)
==================

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.1.4
- pro.civitaspo:digdag-operator-athena:0.1.5
athena:
auth_method: profile

Expand Down Expand Up @@ -133,7 +133,7 @@ Define the below options on properties (which is indicated by `-c`, `--config`).

- **select_query**: The select SQL statements or file location (in local or Amazon S3) to be executed for a new table by [`Create Table As Select`]((https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/)). You can use digdag's template engine like `${...}` in the SQL query. (string, required)
- **database**: The database name for query execution context. (string, optional)
- **table**: The table name for the new table (string, default: `digdag_athena_ctas_${session_uuid.replaceAll("-", "_")}`)
- **table**: The table name for the new table (string, default: `digdag_athena_ctas_${session_uuid.replaceAll("-", "")}_${random}`)
- **output**: Output location for data created by CTAS (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-<AWS_REGION>/Unsaved/${YEAR}/${MONTH}/${DAY}/${athena_query_id}/"`)
- **format**: The data format for the CTAS query results, such as `"orc"`, `"parquet"`, `"avro"`, `"json"`, or `"textfile"`. (string, default: `"parquet"`)
- **compression**: The compression type to use for `"orc"` or `"parquet"`. (string, default: `"snappy"`)
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.1.4'
version = '0.1.5'

def digdagVersion = '0.9.27'
def awsSdkVersion = "1.11.372"
Expand Down
2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.1.4
- pro.civitaspo:digdag-operator-athena:0.1.5
athena:
auth_method: profile
value: 5
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pro.civitaspo.digdag.plugin.athena.operator

import java.io.File
import java.nio.charset.StandardCharsets.UTF_8

import com.amazonaws.services.s3.AmazonS3URI
Expand All @@ -11,7 +10,7 @@ import io.digdag.spi.{ImmutableTaskResult, OperatorContext, TaskResult, Template
import io.digdag.util.DurationParam

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.{Failure, Random, Success, Try}

class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) {
Expand Down Expand Up @@ -56,9 +55,15 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC
}
}

protected lazy val defaultTableName: String = {
val normalizedSessionUuid: String = sessionUuid.replaceAll("-", "")
val random: String = Random.alphanumeric.take(5).mkString
s"digdag_athena_ctas_${normalizedSessionUuid}_$random"
}

protected val selectQueryOrFile: String = params.get("select_query", classOf[String])
protected val database: Optional[String] = params.getOptional("database", classOf[String])
protected val table: String = params.get("table", classOf[String], s"digdag_athena_ctas_${sessionUuid.replaceAll("-", "_")}")
protected val table: String = params.get("table", classOf[String], defaultTableName)
protected val output: Optional[String] = params.getOptional("output", classOf[String])
protected val format: String = params.get("format", classOf[String], "parquet")
protected val compression: String = params.get("compression", classOf[String], "snappy")
Expand All @@ -73,20 +78,38 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC
protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m"))

protected lazy val selectQuery: String = {
val t: Try[String] = Try {
if (selectQueryOrFile.startsWith("s3://")) {
val uri = AmazonS3URI(selectQueryOrFile)
val content = withS3(_.getObjectAsString(uri.getBucket, uri.getKey))
templateEngine.template(content, params)
}
else {
val f: File = workspace.getFile(selectQueryOrFile)
workspace.templateFile(templateEngine, f.getPath, UTF_8, params)
}
}
val t: Try[String] =
if (selectQueryOrFile.startsWith("s3://")) loadQueryOnS3(selectQueryOrFile)
else loadQueryOnLocalFileSystem(selectQueryOrFile)

t.getOrElse(selectQueryOrFile)
}

protected def loadQueryOnS3(uriString: String): Try[String] = {
val t = Try {
val uri: AmazonS3URI = AmazonS3URI(uriString)
val content = withS3(_.getObjectAsString(uri.getBucket, uri.getKey))
templateEngine.template(content, params)
}
t match {
case Success(_) => logger.info("Succeeded to load the query on S3.")
case Failure(e) => logger.warn(s"Failed to load the query on S3.: ${e.getMessage}")
}
t
}

protected def loadQueryOnLocalFileSystem(path: String): Try[String] = {
val t = Try {
val f = workspace.getFile(path)
workspace.templateFile(templateEngine, f.getPath, UTF_8, params)
}
t match {
case Success(_) => logger.info("Succeeded to load the query on LocalFileSystem.")
case Failure(e) => logger.warn(s"Failed to load the query on LocalFileSystem.: ${e.getMessage}")
}
t
}

override def runTask(): TaskResult = {
saveMode match {
case SaveMode.ErrorIfExists if output.isPresent && hasObjects(output.get) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
import io.digdag.util.DurationParam
import pro.civitaspo.digdag.plugin.athena.wrapper.{NotRetryableException, ParamInGiveup, ParamInRetry, RetryableException, RetryExecutorWrapper}

import scala.util.{Random, Try}
import scala.util.{Failure, Random, Success, Try}
import scala.util.hashing.MurmurHash3

class AthenaQueryOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
Expand Down Expand Up @@ -71,18 +71,36 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system
protected val preview: Boolean = params.get("preview", classOf[Boolean], true)

protected lazy val query: String = {
val t: Try[String] =
if (queryOrFile.startsWith("s3://")) loadQueryOnS3(queryOrFile)
else loadQueryOnLocalFileSystem(queryOrFile)

t.getOrElse(queryOrFile)
}

protected def loadQueryOnS3(uriString: String): Try[String] = {
val t = Try {
if (queryOrFile.startsWith("s3://")) {
val uri = AmazonS3URI(queryOrFile)
val content = withS3(_.getObjectAsString(uri.getBucket, uri.getKey))
templateEngine.template(content, params)
}
else {
val f = workspace.getFile(queryOrFile)
workspace.templateFile(templateEngine, f.getPath, UTF_8, params)
}
val uri: AmazonS3URI = AmazonS3URI(uriString)
val content = withS3(_.getObjectAsString(uri.getBucket, uri.getKey))
templateEngine.template(content, params)
}
t.getOrElse(queryOrFile)
t match {
case Success(_) => logger.info("Succeeded to load the query on S3.")
case Failure(e) => logger.warn(s"Failed to load the query on S3.: ${e.getMessage}")
}
t
}

protected def loadQueryOnLocalFileSystem(path: String): Try[String] = {
val t = Try {
val f = workspace.getFile(path)
workspace.templateFile(templateEngine, f.getPath, UTF_8, params)
}
t match {
case Success(_) => logger.debug("Succeeded to load the query on LocalFileSystem.")
case Failure(e) => logger.debug(s"Failed to load the query on LocalFileSystem.: ${e.getMessage}")
}
t
}

protected lazy val clientRequestToken: String = {
Expand Down

0 comments on commit ba6df44

Please sign in to comment.