diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java index a543a1130..90ec995bb 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableSnapshotsExpirationTask.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Objects; /** * A task to expire snapshots from a table. @@ -31,15 +32,19 @@ public JobConf.JobTypeEnum getType() { @Override protected List getArgs() { HistoryConfig config = metadata.getHistoryConfig(); - List jobArgs = new ArrayList<>(); - if (config.getMaxAge() > 0) { - jobArgs.addAll( - Arrays.asList( - "--maxAge", Integer.toString(config.getMaxAge()), - "--granularity", config.getGranularity().getValue())); - } - if (config.getVersions() > 0) { - jobArgs.addAll(Arrays.asList("--versions", Integer.toString(config.getVersions()))); + List jobArgs = new ArrayList<>(Arrays.asList("--tableName", metadata.fqtn())); + if (config != null) { + if (config.getMaxAge() > 0) { + jobArgs.addAll( + Arrays.asList( + "--maxAge", + Objects.toString(config.getMaxAge()), + "--granularity", + config.getGranularity().getValue())); + } + if (config.getVersions() > 0) { + jobArgs.addAll(Arrays.asList("--versions", Objects.toString(config.getVersions()))); + } } return jobArgs; } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index 2b86ec38e..a084c068f 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -216,14 +216,13 @@ public void expireSnapshots(String fqtn, int maxAge, String granularity, int ver public void expireSnapshots(Table table, int maxAge, String granularity, int versions) { ExpireSnapshots expireSnapshotsCommand = table.expireSnapshots().cleanExpiredFiles(false); - // maxAge is always defined with granularity - if (!granularity.isEmpty()) { - TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase()); - long expireBeforeTimestampMs = - System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge); - log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs); - expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit(); - } + // maxAge will always be defined + TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase()); + long expireBeforeTimestampMs = + System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge); + log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs); + expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit(); + if (versions > 0 && Iterators.size(table.snapshots().iterator()) > versions) { log.info("Expiring snapshots for table: {} retaining last {} versions", table, versions); // Note: retainLast keeps the last N snapshots that WOULD be expired, hence expireOlderThan diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java index 1d653858b..5c042e3e9 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/SnapshotsExpirationSparkApp.java @@ -12,7 +12,7 @@ * are older than provided count of granularities are deleted. Current snapshot is always preserved. * *

Example of invocation: com.linkedin.openhouse.jobs.spark.SnapshotsExpirationSparkApp - * --tableName db.testTable --count 7 --granularity day + * --tableName db.testTable --maxAge 3 --granularity day --versions 10 */ @Slf4j public class SnapshotsExpirationSparkApp extends BaseTableSparkApp { @@ -26,11 +26,6 @@ public static class DEFAULT_CONFIGURATION { public static final int VERSIONS = 0; } - private static final String DEFAULT_GRANULARITY = ""; - - // By default do not define versions, and only retain snapshots based on max age - private static final String DEFAULT_VERSIONS = "0"; - public SnapshotsExpirationSparkApp( String jobId, StateManager stateManager, @@ -39,15 +34,15 @@ public SnapshotsExpirationSparkApp( String granularity, int versions) { super(jobId, stateManager, fqtn); - if (maxAge == 0 && versions == 0) { + // By default, always enforce a time to live for snapshots even if unconfigured + if (maxAge == 0) { this.maxAge = DEFAULT_CONFIGURATION.MAX_AGE; this.granularity = DEFAULT_CONFIGURATION.GRANULARITY; - this.versions = DEFAULT_CONFIGURATION.VERSIONS; } else { - this.granularity = granularity; this.maxAge = maxAge; - this.versions = versions; + this.granularity = granularity; } + this.versions = versions; } @Override @@ -78,7 +73,7 @@ public static void main(String[] args) { cmdLine.getOptionValue("tableName"), Integer.parseInt(cmdLine.getOptionValue("maxAge", "0")), cmdLine.getOptionValue("granularity", ""), - Integer.parseInt(cmdLine.getOptionValue("minVersions", "0"))); + Integer.parseInt(cmdLine.getOptionValue("versions", "0"))); app.run(); } } diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/SnapshotExpirationTaskTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/SnapshotExpirationTaskTest.java new file mode 100644 index 000000000..bc284bb46 --- /dev/null +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/SnapshotExpirationTaskTest.java @@ -0,0 +1,107 @@ +package com.linkedin.openhouse.jobs.scheduler.tasks; + +import com.linkedin.openhouse.jobs.client.JobsClient; +import com.linkedin.openhouse.jobs.client.TablesClient; +import com.linkedin.openhouse.jobs.util.HistoryConfig; +import com.linkedin.openhouse.jobs.util.TableMetadata; +import com.linkedin.openhouse.tables.client.model.History; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class SnapshotExpirationTaskTest { + private TablesClient tablesClient; + private JobsClient jobsClient; + private TableMetadata tableMetadata; + + @BeforeEach + void setup() { + tablesClient = Mockito.mock(TablesClient.class); + jobsClient = Mockito.mock(JobsClient.class); + tableMetadata = Mockito.mock(TableMetadata.class); + Mockito.when(tableMetadata.fqtn()).thenReturn("db.table"); + } + + @Test + void testSnapshotExpirationForTableWithoutConfig() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata); + + List expectedArgs = + Stream.of("--tableName", tableMetadata.fqtn()).collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationJobWithOnlyMaxAgeConfig() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata); + + HistoryConfig historyConfigMock = Mockito.mock(HistoryConfig.class); + int maxAge = 1; + History.GranularityEnum granularity = History.GranularityEnum.DAY; + + Mockito.when(tableMetadata.getHistoryConfig()).thenReturn(historyConfigMock); + Mockito.when(historyConfigMock.getMaxAge()).thenReturn(maxAge); + Mockito.when(historyConfigMock.getGranularity()).thenReturn(granularity); + List expectedArgs = + Stream.of( + "--tableName", + tableMetadata.fqtn(), + "--maxAge", + String.valueOf(maxAge), + "--granularity", + granularity.getValue()) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationJobWithOnlyVersionsConfig() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata); + + HistoryConfig historyConfigMock = Mockito.mock(HistoryConfig.class); + int versions = 3; + + Mockito.when(tableMetadata.getHistoryConfig()).thenReturn(historyConfigMock); + Mockito.when(historyConfigMock.getVersions()).thenReturn(versions); + List expectedArgs = + Stream.of("--tableName", tableMetadata.fqtn(), "--versions", String.valueOf(versions)) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } + + @Test + void testSnapshotExpirationJobWithMaxAgeAndVersions() { + TableSnapshotsExpirationTask tableRetentionTask = + new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata); + + HistoryConfig historyConfigMock = Mockito.mock(HistoryConfig.class); + int maxAge = 3; + History.GranularityEnum granularity = History.GranularityEnum.DAY; + int versions = 3; + + Mockito.when(tableMetadata.getHistoryConfig()).thenReturn(historyConfigMock); + Mockito.when(historyConfigMock.getMaxAge()).thenReturn(maxAge); + Mockito.when(historyConfigMock.getGranularity()).thenReturn(granularity); + Mockito.when(historyConfigMock.getVersions()).thenReturn(versions); + + List expectedArgs = + Stream.of( + "--tableName", + tableMetadata.fqtn(), + "--maxAge", + String.valueOf(maxAge), + "--granularity", + granularity.getValue(), + "--versions", + String.valueOf(versions)) + .collect(Collectors.toList()); + Assertions.assertEquals(expectedArgs, tableRetentionTask.getArgs()); + } +} diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java index b4156de62..95cbd8c27 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java @@ -339,6 +339,8 @@ public void testSnapshotsExpirationVersionsNoop() throws Exception { final String tableName = "db.test_es_versions_noop_java"; final int numInserts = 3; final int versionsToKeep = 5; // Should keep all versions given that there are fewer versions + final int maxAge = 3; + final String timeGranularity = "DAYS"; List snapshotIds; try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { prepareTable(ops, tableName); @@ -351,7 +353,7 @@ public void testSnapshotsExpirationVersionsNoop() throws Exception { Table table = ops.getTable(tableName); log.info("Loaded table {}, location {}", table.name(), table.location()); - ops.expireSnapshots(table, 0, "", versionsToKeep); + ops.expireSnapshots(table, maxAge, timeGranularity, versionsToKeep); // verify that table object snapshots are updated checkSnapshots(table, snapshotIds); } @@ -367,6 +369,8 @@ public void testSnapshotsExpirationVersions() throws Exception { final String tableName = "db.test_es_versions_java"; final int numInserts = 3; final int versionsToKeep = 2; + final int maxAge = 3; + final String timeGranularity = "DAYS"; List snapshotIds; try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) { prepareTable(ops, tableName); @@ -379,7 +383,7 @@ public void testSnapshotsExpirationVersions() throws Exception { Table table = ops.getTable(tableName); log.info("Loaded table {}, location {}", table.name(), table.location()); - ops.expireSnapshots(table, 0, "", versionsToKeep); + ops.expireSnapshots(table, maxAge, timeGranularity, versionsToKeep); // verify that table object snapshots are updated checkSnapshots( table, snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size())); diff --git a/services/jobs/src/test/http/snapshot_expiration_job.http b/services/jobs/src/test/http/snapshot_expiration_job.http index bddf45039..143d86d5f 100644 --- a/services/jobs/src/test/http/snapshot_expiration_job.http +++ b/services/jobs/src/test/http/snapshot_expiration_job.http @@ -10,7 +10,7 @@ Content-Type: application/json "args": [ "--tableName", "db.tb", "--granularity", "day", - "--count", 1 + "--maxAge", 1 ] } }