diff --git a/CHANGELOG.md b/CHANGELOG.md index 60a2641..366eed2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +0.1.5 (2018-12-11) +================== + +* [Enhancement] Expose more logs when loading queries. +* [Enhancement] Use a unique table name as default in `athena.ctas>` + 0.1.4 (2018-12-07) ================== diff --git a/README.md b/README.md index a5a1b2e..500667d 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.1.4 + - pro.civitaspo:digdag-operator-athena:0.1.5 athena: auth_method: profile @@ -133,7 +133,7 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **select_query**: The select SQL statements or file location (in local or Amazon S3) to be executed for a new table by [`Create Table As Select`]((https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/)). You can use digdag's template engine like `${...}` in the SQL query. (string, required) - **database**: The database name for query execution context. (string, optional) -- **table**: The table name for the new table (string, default: `digdag_athena_ctas_${session_uuid.replaceAll("-", "_")}`) +- **table**: The table name for the new table (string, default: `digdag_athena_ctas_${session_uuid.replaceAll("-", "")}_${random}`) - **output**: Output location for data created by CTAS (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-/Unsaved/${YEAR}/${MONTH}/${DAY}/${athena_query_id}/"`) - **format**: The data format for the CTAS query results, such as `"orc"`, `"parquet"`, `"avro"`, `"json"`, or `"textfile"`. (string, default: `"parquet"`) - **compression**: The compression type to use for `"orc"` or `"parquet"`. 
(string, default: `"snappy"`) diff --git a/build.gradle b/build.gradle index dba15d6..ca42569 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.1.4' +version = '0.1.5' def digdagVersion = '0.9.27' def awsSdkVersion = "1.11.372" diff --git a/example/example.dig b/example/example.dig index ab148d7..417a8a4 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.1.4 + - pro.civitaspo:digdag-operator-athena:0.1.5 athena: auth_method: profile value: 5 diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 9f23652..c91a0a6 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -1,6 +1,5 @@ package pro.civitaspo.digdag.plugin.athena.operator -import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import com.amazonaws.services.s3.AmazonS3URI @@ -11,7 +10,7 @@ import io.digdag.spi.{ImmutableTaskResult, OperatorContext, TaskResult, Template import io.digdag.util.DurationParam import scala.collection.JavaConverters._ -import scala.util.Try +import scala.util.{Failure, Random, Success, Try} class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { @@ -56,9 +55,15 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC } } + protected lazy val defaultTableName: String = { + val normalizedSessionUuid: String = sessionUuid.replaceAll("-", "") + val random: String = Random.alphanumeric.take(5).mkString + 
s"digdag_athena_ctas_${normalizedSessionUuid}_$random" + } + protected val selectQueryOrFile: String = params.get("select_query", classOf[String]) protected val database: Optional[String] = params.getOptional("database", classOf[String]) - protected val table: String = params.get("table", classOf[String], s"digdag_athena_ctas_${sessionUuid.replaceAll("-", "_")}") + protected val table: String = params.get("table", classOf[String], defaultTableName) protected val output: Optional[String] = params.getOptional("output", classOf[String]) protected val format: String = params.get("format", classOf[String], "parquet") protected val compression: String = params.get("compression", classOf[String], "snappy") @@ -73,20 +78,38 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) protected lazy val selectQuery: String = { - val t: Try[String] = Try { - if (selectQueryOrFile.startsWith("s3://")) { - val uri = AmazonS3URI(selectQueryOrFile) - val content = withS3(_.getObjectAsString(uri.getBucket, uri.getKey)) - templateEngine.template(content, params) - } - else { - val f: File = workspace.getFile(selectQueryOrFile) - workspace.templateFile(templateEngine, f.getPath, UTF_8, params) - } - } + val t: Try[String] = + if (selectQueryOrFile.startsWith("s3://")) loadQueryOnS3(selectQueryOrFile) + else loadQueryOnLocalFileSystem(selectQueryOrFile) + t.getOrElse(selectQueryOrFile) } + protected def loadQueryOnS3(uriString: String): Try[String] = { + val t = Try { + val uri: AmazonS3URI = AmazonS3URI(uriString) + val content = withS3(_.getObjectAsString(uri.getBucket, uri.getKey)) + templateEngine.template(content, params) + } + t match { + case Success(_) => logger.info("Succeeded to load the query on S3.") + case Failure(e) => logger.warn(s"Failed to load the query on S3.: ${e.getMessage}") + } + t + } + + protected def 
loadQueryOnLocalFileSystem(path: String): Try[String] = { + val t = Try { + val f = workspace.getFile(path) + workspace.templateFile(templateEngine, f.getPath, UTF_8, params) + } + t match { + case Success(_) => logger.info("Succeeded to load the query on LocalFileSystem.") + case Failure(e) => logger.warn(s"Failed to load the query on LocalFileSystem.: ${e.getMessage}") + } + t + } + override def runTask(): TaskResult = { saveMode match { case SaveMode.ErrorIfExists if output.isPresent && hasObjects(output.get) => diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index 9ad0135..888d26c 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -22,7 +22,7 @@ import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} import io.digdag.util.DurationParam import pro.civitaspo.digdag.plugin.athena.wrapper.{NotRetryableException, ParamInGiveup, ParamInRetry, RetryableException, RetryExecutorWrapper} -import scala.util.{Random, Try} +import scala.util.{Failure, Random, Success, Try} import scala.util.hashing.MurmurHash3 class AthenaQueryOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) @@ -71,18 +71,36 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system protected val preview: Boolean = params.get("preview", classOf[Boolean], true) protected lazy val query: String = { + val t: Try[String] = + if (queryOrFile.startsWith("s3://")) loadQueryOnS3(queryOrFile) + else loadQueryOnLocalFileSystem(queryOrFile) + + t.getOrElse(queryOrFile) + } + + protected def loadQueryOnS3(uriString: String): Try[String] = { val t = Try { - if (queryOrFile.startsWith("s3://")) { - val uri = AmazonS3URI(queryOrFile) - val content = 
withS3(_.getObjectAsString(uri.getBucket, uri.getKey)) - templateEngine.template(content, params) - } - else { - val f = workspace.getFile(queryOrFile) - workspace.templateFile(templateEngine, f.getPath, UTF_8, params) - } + val uri: AmazonS3URI = AmazonS3URI(uriString) + val content = withS3(_.getObjectAsString(uri.getBucket, uri.getKey)) + templateEngine.template(content, params) } - t.getOrElse(queryOrFile) + t match { + case Success(_) => logger.info("Succeeded to load the query on S3.") + case Failure(e) => logger.warn(s"Failed to load the query on S3.: ${e.getMessage}") + } + t + } + + protected def loadQueryOnLocalFileSystem(path: String): Try[String] = { + val t = Try { + val f = workspace.getFile(path) + workspace.templateFile(templateEngine, f.getPath, UTF_8, params) + } + t match { + case Success(_) => logger.debug("Succeeded to load the query on LocalFileSystem.") + case Failure(e) => logger.debug(s"Failed to load the query on LocalFileSystem.: ${e.getMessage}") + } + t } protected lazy val clientRequestToken: String = {