Skip to content

Commit

Permalink
Merge pull request #76 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.3.1
  • Loading branch information
civitaspo authored Aug 5, 2019
2 parents 56e7d18 + 679e7fc commit eb918f9
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 25 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
0.3.1 (2019-08-05)
==================

* [Fix -- `athena.ctas>`] When using `save_mode: overwrite`, delete the specified table and location, not the table location that the data catalog has.
* [New featuere -- `athena.drop_table_multi>`] `protect` option.

0.3.0 (2019-07-30)
==================

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.3.0
- pro.civitaspo:digdag-operator-athena:0.3.1
athena:
auth_method: profile

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.3.0'
version = '0.3.1'

def digdagVersion = '0.9.37'
def awsSdkVersion = "1.11.587"
Expand Down
20 changes: 14 additions & 6 deletions example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.3.0
- pro.civitaspo:digdag-operator-athena:0.3.1
athena:
auth_method: profile
value: 5
Expand Down Expand Up @@ -138,11 +138,19 @@ _export:
echo>: ${athena.last_table_exists}

+step17:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
limit: 3
+step1:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
limit: 3
+step2:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
protect:
created_within: 1h

+step18:
athena.drop_table_multi>:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,23 @@ class AthenaCtasOperator(operatorName: String,
return TaskResult.empty(request)
}

case SaveMode.Overwrite =>
if (aws.glue.table.exists(catalogId, database, table)) {
logger.info(s"'$database.$table' already exists, so delete this.")
aws.glue.table.delete(catalogId, database, table)
}
if (location.exists(aws.s3.hasObjects)) {
logger.info(s"${location.get} already exists, so delete this.")
aws.s3.rm_r(location.get).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
}

case _ => // do nothing
}

val subTask: Config = cf.create()
if (saveMode.equals(SaveMode.Overwrite)) {
subTask.setNested("+drop-before-ctas", buildDropTableSubTaskConfig(with_location = true))
}
subTask.setNested("+ctas", buildCtasQuerySubTaskConfig())
if (tableMode.equals(TableMode.DataOnly)) {
subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig(with_location = false))
subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig())
}

val builder: ImmutableTaskResult.Builder = TaskResult.defaultBuilder(cf)
Expand Down Expand Up @@ -235,14 +242,14 @@ class AthenaCtasOperator(operatorName: String,
subTask
}

protected def buildDropTableSubTaskConfig(with_location: Boolean): Config =
protected def buildDropTableSubTaskConfig(): Config =
{
val subTask: Config = cf.create()

subTask.set("_type", "athena.drop_table")
subTask.set("database", database)
subTask.set("table", table)
subTask.set("with_location", with_location)
subTask.set("with_location", false)
catalogId.foreach(cid => subTask.set("catalog_id", cid))

putCommonSettingToSubTask(subTask)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package pro.civitaspo.digdag.plugin.athena.drop_table_multi


import java.util.Date

import com.amazonaws.services.glue.model.Table
import io.digdag.client.config.Config
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
import io.digdag.util.DurationParam
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator


Expand All @@ -14,29 +18,83 @@ class AthenaDropTableMultiOperator(operatorName: String,
{
val database: String = params.get("database", classOf[String])
val regexp: String = params.getOptional("regexp", classOf[String]).orNull()
val protect: Option[Config] = Option(params.getOptionalNested("protect").orNull())
val limit: Option[Int] = Option(params.getOptional("limit", classOf[Int]).orNull())
val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())

val now: Long = System.currentTimeMillis()

override def runTask(): TaskResult =
{
logger.info(s"Drop tables matched by the expression: /$regexp/ in $database")
aws.glue.table.list(catalogId, database, Option(regexp), limit).foreach { t =>
if (withLocation) {
val location: String = {
val l = t.getStorageDescriptor.getLocation
if (l.endsWith("/")) l
else l + "/"
}
if (aws.s3.hasObjects(location)) {
logger.info(s"Delete objects because the location $location has objects.")
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
if (!isProtected(t)) {
if (withLocation) {
val location: String = {
val l = t.getStorageDescriptor.getLocation
if (l.endsWith("/")) l
else l + "/"
}
if (aws.s3.hasObjects(location)) {
logger.info(s"Delete objects because the location $location has objects.")
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
}
}
logger.info(s"Drop the table '$database.${t.getName}'")
aws.glue.table.delete(catalogId, database, t.getName)
}
logger.info(s"Drop the table '$database.${t.getName}'")
aws.glue.table.delete(catalogId, database, t.getName)
}
TaskResult.empty(cf)
}

protected def isProtected(t: Table): Boolean =
{
protect match {
case None => return false
case Some(c) =>
Option(c.getOptional("created_within", classOf[DurationParam]).orNull()) match {
case None => // do nothing
case Some(d) =>
Option(t.getCreateTime).foreach { date =>
if (isDateWithin(date, d)) {
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is created" +
s" within ${d.toString} (created at ${t.getCreateTime.toString}).")
return true
}
}
}
Option(c.getOptional("updated_within", classOf[DurationParam]).orNull()) match {
case None => // do nothing
case Some(d) =>
Option(t.getUpdateTime).foreach { date =>
if (isDateWithin(date, d)) {
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is updated" +
s" within ${d.toString} (updated at ${t.getUpdateTime.toString}).")
return true
}
}
}
Option(c.getOptional("accessed_within", classOf[DurationParam]).orNull()) match {
case None => // do nothing
case Some(d) =>
Option(t.getLastAccessTime).foreach { date =>
if (isDateWithin(date, d)) {
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is accessed" +
s" within ${d.toString} (last accessed at ${t.getLastAccessTime.toString}).")
return true
}
}
}
}
false
}

protected def isDateWithin(target: Date,
durationWithin: DurationParam): Boolean =
{
val dateWithin: Date = new Date(now - durationWithin.getDuration.toMillis)
target.after(dateWithin)
}

}

0 comments on commit eb918f9

Please sign in to comment.