Skip to content

Commit

Permalink
Merge pull request #74 from civitaspo/protect-created-within
Browse files Browse the repository at this point in the history
Add a new feature: `protect` option for `athena.drop_table_multi>` op…
  • Loading branch information
civitaspo authored Aug 5, 2019
2 parents 32215cf + fb60431 commit 7ea9ee7
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 16 deletions.
18 changes: 13 additions & 5 deletions example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,19 @@ _export:
echo>: ${athena.last_table_exists}

+step17:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
limit: 3
+step1:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
limit: 3
+step2:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
protect:
created_within: 1h

+step18:
athena.drop_table_multi>:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package pro.civitaspo.digdag.plugin.athena.drop_table_multi


import java.util.Date

import com.amazonaws.services.glue.model.Table
import io.digdag.client.config.Config
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
import io.digdag.util.DurationParam
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator


Expand All @@ -14,29 +18,83 @@ class AthenaDropTableMultiOperator(operatorName: String,
{
val database: String = params.get("database", classOf[String])
val regexp: String = params.getOptional("regexp", classOf[String]).orNull()
val protect: Option[Config] = Option(params.getOptionalNested("protect").orNull())
val limit: Option[Int] = Option(params.getOptional("limit", classOf[Int]).orNull())
val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())

val now: Long = System.currentTimeMillis()

override def runTask(): TaskResult =
{
logger.info(s"Drop tables matched by the expression: /$regexp/ in $database")
aws.glue.table.list(catalogId, database, Option(regexp), limit).foreach { t =>
if (withLocation) {
val location: String = {
val l = t.getStorageDescriptor.getLocation
if (l.endsWith("/")) l
else l + "/"
}
if (aws.s3.hasObjects(location)) {
logger.info(s"Delete objects because the location $location has objects.")
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
if (!isProtected(t)) {
if (withLocation) {
val location: String = {
val l = t.getStorageDescriptor.getLocation
if (l.endsWith("/")) l
else l + "/"
}
if (aws.s3.hasObjects(location)) {
logger.info(s"Delete objects because the location $location has objects.")
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
}
}
logger.info(s"Drop the table '$database.${t.getName}'")
aws.glue.table.delete(catalogId, database, t.getName)
}
logger.info(s"Drop the table '$database.${t.getName}'")
aws.glue.table.delete(catalogId, database, t.getName)
}
TaskResult.empty(cf)
}

protected def isProtected(t: Table): Boolean =
{
protect match {
case None => return false
case Some(c) =>
Option(c.getOptional("created_within", classOf[DurationParam]).orNull()) match {
case None => // do nothing
case Some(d) =>
Option(t.getCreateTime).foreach { date =>
if (isDateWithin(date, d)) {
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is created" +
s" within ${d.toString} (created at ${t.getCreateTime.toString}).")
return true
}
}
}
Option(c.getOptional("updated_within", classOf[DurationParam]).orNull()) match {
case None => // do nothing
case Some(d) =>
Option(t.getUpdateTime).foreach { date =>
if (isDateWithin(date, d)) {
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is updated" +
s" within ${d.toString} (updated at ${t.getUpdateTime.toString}).")
return true
}
}
}
Option(c.getOptional("accessed_within", classOf[DurationParam]).orNull()) match {
case None => // do nothing
case Some(d) =>
Option(t.getLastAccessTime).foreach { date =>
if (isDateWithin(date, d)) {
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is accessed" +
s" within ${d.toString} (last accessed at ${t.getLastAccessTime.toString}).")
return true
}
}
}
}
false
}

protected def isDateWithin(target: Date,
durationWithin: DurationParam): Boolean =
{
val dateWithin: Date = new Date(now - durationWithin.getDuration.toMillis)
target.after(dateWithin)
}

}

0 comments on commit 7ea9ee7

Please sign in to comment.