Skip to content

Commit

Permalink
Always enforce a ttl on snapshot expiration jobs even if unconfigured (
Browse files Browse the repository at this point in the history
…#281)

## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

[Issue](https://github.com/linkedin/openhouse/issues/#nnn)] Briefly
discuss the summary of the changes made in this
pull request in 2-3 lines.


Currently it is possible to define tables with a version count, and the
snapshot expiration job will respect the version count instead of
enforcing a TTL. With this change the snapshot expiration job will
**always** enforce a TTL, currently defaulted to 3 days. This cannot be
overridden by users unless they explicitly define a TTL that is greater
than 3 days (but currently the maximum is 3 days due to the history
configuration validator)
https://github.com/linkedin/openhouse/blob/7f66a730ec7d611335917e29bfed01050090c397/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/HistoryPolicySpecValidator.java#L57

Also fix some bugs where `TableSnapshotsExpirationTask` is not sending
properties correctly from the JobScheduler

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [ ] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Tested E2E with acceptance tests from local

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
  • Loading branch information
Will-Lo authored Jan 29, 2025
1 parent 7f66a73 commit c8f2b5d
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
* A task to expire snapshots from a table.
Expand All @@ -31,15 +32,19 @@ public JobConf.JobTypeEnum getType() {
@Override
protected List<String> getArgs() {
HistoryConfig config = metadata.getHistoryConfig();
List<String> jobArgs = new ArrayList<>();
if (config.getMaxAge() > 0) {
jobArgs.addAll(
Arrays.asList(
"--maxAge", Integer.toString(config.getMaxAge()),
"--granularity", config.getGranularity().getValue()));
}
if (config.getVersions() > 0) {
jobArgs.addAll(Arrays.asList("--versions", Integer.toString(config.getVersions())));
List<String> jobArgs = new ArrayList<>(Arrays.asList("--tableName", metadata.fqtn()));
if (config != null) {
if (config.getMaxAge() > 0) {
jobArgs.addAll(
Arrays.asList(
"--maxAge",
Objects.toString(config.getMaxAge()),
"--granularity",
config.getGranularity().getValue()));
}
if (config.getVersions() > 0) {
jobArgs.addAll(Arrays.asList("--versions", Objects.toString(config.getVersions())));
}
}
return jobArgs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,13 @@ public void expireSnapshots(String fqtn, int maxAge, String granularity, int ver
public void expireSnapshots(Table table, int maxAge, String granularity, int versions) {
ExpireSnapshots expireSnapshotsCommand = table.expireSnapshots().cleanExpiredFiles(false);

// maxAge is always defined with granularity
if (!granularity.isEmpty()) {
TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase());
long expireBeforeTimestampMs =
System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge);
log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs);
expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit();
}
// maxAge will always be defined
TimeUnit timeUnitGranularity = TimeUnit.valueOf(granularity.toUpperCase());
long expireBeforeTimestampMs =
System.currentTimeMillis() - timeUnitGranularity.toMillis(maxAge);
log.info("Expiring snapshots for table: {} older than {}ms", table, expireBeforeTimestampMs);
expireSnapshotsCommand.expireOlderThan(expireBeforeTimestampMs).commit();

if (versions > 0 && Iterators.size(table.snapshots().iterator()) > versions) {
log.info("Expiring snapshots for table: {} retaining last {} versions", table, versions);
// Note: retainLast keeps the last N snapshots that WOULD be expired, hence expireOlderThan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* are older than provided count of granularities are deleted. Current snapshot is always preserved.
*
* <p>Example of invocation: com.linkedin.openhouse.jobs.spark.SnapshotsExpirationSparkApp
* --tableName db.testTable --count 7 --granularity day
* --tableName db.testTable --maxAge 3 --granularity day --versions 10
*/
@Slf4j
public class SnapshotsExpirationSparkApp extends BaseTableSparkApp {
Expand All @@ -26,11 +26,6 @@ public static class DEFAULT_CONFIGURATION {
public static final int VERSIONS = 0;
}

private static final String DEFAULT_GRANULARITY = "";

// By default do not define versions, and only retain snapshots based on max age
private static final String DEFAULT_VERSIONS = "0";

public SnapshotsExpirationSparkApp(
String jobId,
StateManager stateManager,
Expand All @@ -39,15 +34,15 @@ public SnapshotsExpirationSparkApp(
String granularity,
int versions) {
super(jobId, stateManager, fqtn);
if (maxAge == 0 && versions == 0) {
// By default, always enforce a time to live for snapshots even if unconfigured
if (maxAge == 0) {
this.maxAge = DEFAULT_CONFIGURATION.MAX_AGE;
this.granularity = DEFAULT_CONFIGURATION.GRANULARITY;
this.versions = DEFAULT_CONFIGURATION.VERSIONS;
} else {
this.granularity = granularity;
this.maxAge = maxAge;
this.versions = versions;
this.granularity = granularity;
}
this.versions = versions;
}

@Override
Expand Down Expand Up @@ -78,7 +73,7 @@ public static void main(String[] args) {
cmdLine.getOptionValue("tableName"),
Integer.parseInt(cmdLine.getOptionValue("maxAge", "0")),
cmdLine.getOptionValue("granularity", ""),
Integer.parseInt(cmdLine.getOptionValue("minVersions", "0")));
Integer.parseInt(cmdLine.getOptionValue("versions", "0")));
app.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.util.HistoryConfig;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import com.linkedin.openhouse.tables.client.model.History;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Unit tests for the command-line argument list produced by {@link
 * TableSnapshotsExpirationTask#getArgs()} under different history configurations.
 *
 * <p>All collaborators are Mockito mocks; each test stubs only the history settings it cares
 * about and leaves the rest unstubbed.
 */
public class SnapshotExpirationTaskTest {
  private TablesClient tablesClient;
  private JobsClient jobsClient;
  private TableMetadata tableMetadata;

  /** Recreates the mocks before every test and fixes the table name to {@code db.table}. */
  @BeforeEach
  void setup() {
    tablesClient = Mockito.mock(TablesClient.class);
    jobsClient = Mockito.mock(JobsClient.class);
    tableMetadata = Mockito.mock(TableMetadata.class);
    Mockito.when(tableMetadata.fqtn()).thenReturn("db.table");
  }

  /** With no history config stubbed on the metadata, only the table name is passed. */
  @Test
  void testSnapshotExpirationForTableWithoutConfig() {
    TableSnapshotsExpirationTask expirationTask = newExpirationTask();

    List<String> expected = toArgList("--tableName", tableMetadata.fqtn());

    Assertions.assertEquals(expected, expirationTask.getArgs());
  }

  /** A config that stubs only max age and granularity yields those two options. */
  @Test
  void testSnapshotExpirationJobWithOnlyMaxAgeConfig() {
    TableSnapshotsExpirationTask expirationTask = newExpirationTask();

    final int ttl = 1;
    final History.GranularityEnum unit = History.GranularityEnum.DAY;
    HistoryConfig historyConfig = attachHistoryConfigMock();
    Mockito.when(historyConfig.getMaxAge()).thenReturn(ttl);
    Mockito.when(historyConfig.getGranularity()).thenReturn(unit);

    List<String> expected =
        toArgList(
            "--tableName",
            tableMetadata.fqtn(),
            "--maxAge",
            Integer.toString(ttl),
            "--granularity",
            unit.getValue());

    Assertions.assertEquals(expected, expirationTask.getArgs());
  }

  /** A config that stubs only the version count yields just the versions option. */
  @Test
  void testSnapshotExpirationJobWithOnlyVersionsConfig() {
    TableSnapshotsExpirationTask expirationTask = newExpirationTask();

    final int retainedVersions = 3;
    HistoryConfig historyConfig = attachHistoryConfigMock();
    Mockito.when(historyConfig.getVersions()).thenReturn(retainedVersions);

    List<String> expected =
        toArgList(
            "--tableName", tableMetadata.fqtn(), "--versions", Integer.toString(retainedVersions));

    Assertions.assertEquals(expected, expirationTask.getArgs());
  }

  /** When both max age and versions are stubbed, all options appear, max age first. */
  @Test
  void testSnapshotExpirationJobWithMaxAgeAndVersions() {
    TableSnapshotsExpirationTask expirationTask = newExpirationTask();

    final int ttl = 3;
    final History.GranularityEnum unit = History.GranularityEnum.DAY;
    final int retainedVersions = 3;
    HistoryConfig historyConfig = attachHistoryConfigMock();
    Mockito.when(historyConfig.getMaxAge()).thenReturn(ttl);
    Mockito.when(historyConfig.getGranularity()).thenReturn(unit);
    Mockito.when(historyConfig.getVersions()).thenReturn(retainedVersions);

    List<String> expected =
        toArgList(
            "--tableName",
            tableMetadata.fqtn(),
            "--maxAge",
            Integer.toString(ttl),
            "--granularity",
            unit.getValue(),
            "--versions",
            Integer.toString(retainedVersions));

    Assertions.assertEquals(expected, expirationTask.getArgs());
  }

  /** Builds the task under test from the mocked clients and table metadata. */
  private TableSnapshotsExpirationTask newExpirationTask() {
    return new TableSnapshotsExpirationTask(jobsClient, tablesClient, tableMetadata);
  }

  /** Creates a {@link HistoryConfig} mock and makes the table metadata mock return it. */
  private HistoryConfig attachHistoryConfigMock() {
    HistoryConfig historyConfig = Mockito.mock(HistoryConfig.class);
    Mockito.when(tableMetadata.getHistoryConfig()).thenReturn(historyConfig);
    return historyConfig;
  }

  /** Collects the given values, in order, into a list for comparison with the task's args. */
  private static List<String> toArgList(String... values) {
    return Stream.of(values).collect(Collectors.toList());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ public void testSnapshotsExpirationVersionsNoop() throws Exception {
final String tableName = "db.test_es_versions_noop_java";
final int numInserts = 3;
final int versionsToKeep = 5; // Should keep all versions given that there are fewer versions
final int maxAge = 3;
final String timeGranularity = "DAYS";
List<Long> snapshotIds;
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
prepareTable(ops, tableName);
Expand All @@ -351,7 +353,7 @@ public void testSnapshotsExpirationVersionsNoop() throws Exception {
Table table = ops.getTable(tableName);
log.info("Loaded table {}, location {}", table.name(), table.location());

ops.expireSnapshots(table, 0, "", versionsToKeep);
ops.expireSnapshots(table, maxAge, timeGranularity, versionsToKeep);
// verify that table object snapshots are updated
checkSnapshots(table, snapshotIds);
}
Expand All @@ -367,6 +369,8 @@ public void testSnapshotsExpirationVersions() throws Exception {
final String tableName = "db.test_es_versions_java";
final int numInserts = 3;
final int versionsToKeep = 2;
final int maxAge = 3;
final String timeGranularity = "DAYS";
List<Long> snapshotIds;
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
prepareTable(ops, tableName);
Expand All @@ -379,7 +383,7 @@ public void testSnapshotsExpirationVersions() throws Exception {
Table table = ops.getTable(tableName);
log.info("Loaded table {}, location {}", table.name(), table.location());

ops.expireSnapshots(table, 0, "", versionsToKeep);
ops.expireSnapshots(table, maxAge, timeGranularity, versionsToKeep);
// verify that table object snapshots are updated
checkSnapshots(
table, snapshotIds.subList(snapshotIds.size() - versionsToKeep, snapshotIds.size()));
Expand Down
2 changes: 1 addition & 1 deletion services/jobs/src/test/http/snapshot_expiration_job.http
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Content-Type: application/json
"args": [
"--tableName", "db.tb",
"--granularity", "day",
"--count", 1
"--maxAge", 1
]
}
}
Expand Down

0 comments on commit c8f2b5d

Please sign in to comment.