Replication job to trigger setup and carbon flow for replica tables
rohitkum2506 committed Jan 9, 2025
1 parent 3e8d387 commit 4ceabdd
Showing 9 changed files with 234 additions and 2 deletions.
@@ -4,6 +4,7 @@
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import com.linkedin.openhouse.jobs.util.DatabaseTableFilter;
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
import com.linkedin.openhouse.jobs.util.ReplicationConfig;
import com.linkedin.openhouse.jobs.util.RetentionConfig;
import com.linkedin.openhouse.jobs.util.RetryUtil;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
@@ -15,6 +16,7 @@
import com.linkedin.openhouse.tables.client.model.GetDatabaseResponseBody;
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
import com.linkedin.openhouse.tables.client.model.Policies;
import com.linkedin.openhouse.tables.client.model.Replication;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
@@ -86,6 +88,31 @@ private Optional<RetentionConfig> getTableRetention(GetTableResponseBody response
.build());
}

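/*
 * The method below maps the table's replication policy into app-side ReplicationConfig objects.
 * For illustration only, a policy payload compatible with the getters used here might look like
 * the following (field names inferred from getDestination()/getCronSchedule(); the exact wire
 * format is an assumption, not taken from this commit):
 *
 *   "policies": {
 *     "replication": {
 *       "config": [
 *         { "destination": "clusterA", "cronSchedule": "0 0 * * *" }
 *       ]
 *     }
 *   }
 */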
private Optional<List<ReplicationConfig>> getTableReplication(GetTableResponseBody response) {
// At least one replication config entry must be present for the table.
if (response == null
|| response.getPolicies() == null
|| response.getPolicies().getReplication() == null
|| response.getPolicies().getReplication().getConfig() == null
|| response.getPolicies().getReplication().getConfig().isEmpty()) {
return Optional.empty();
}
List<ReplicationConfig> replicationConfigList = new ArrayList<>();
Replication conf = response.getPolicies().getReplication();
List<com.linkedin.openhouse.tables.client.model.ReplicationConfig> replicationConfig =
conf.getConfig();

replicationConfig.forEach(
rc ->
replicationConfigList.add(
ReplicationConfig.builder()
.cluster(rc.getDestination())
.proxyUser(response.getTableCreator())
.schedule(rc.getCronSchedule())
.build()));
// since replicationConfigList is initialized, it cannot be null.
return Optional.of(replicationConfigList);
}

protected GetTableResponseBody getTable(TableMetadata tableMetadata) {
return getTable(tableMetadata.getDbName(), tableMetadata.getTableName());
}
@@ -281,6 +308,7 @@ protected Optional<TableMetadata> mapTableResponseToTableMetadata(
.isTimePartitioned(tableResponseBody.getTimePartitioning() != null)
.isClustered(tableResponseBody.getClustering() != null)
.retentionConfig(getTableRetention(tableResponseBody).orElse(null))
.replicationConfig(getTableReplication(tableResponseBody).orElse(null))
.jobExecutionProperties(getJobExecutionProperties(tableResponseBody));
builder.creationTimeMs(Objects.requireNonNull(tableResponseBody.getCreationTime()));
return Optional.of(builder.build());
@@ -291,6 +291,13 @@ protected static CommandLine parseArgs(String[] args) {
.longOpt("tableFilter")
.desc("Regexp for filtering tables, defaults to .*")
.build());
options.addOption(
Option.builder(null)
.required(false)
.hasArg()
.longOpt("tableTypeFilter")
.desc("Option for filtering tables based on tableType. Defaults to None")
.build());
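// Hypothetical usage (the flag value below is an assumption, not from this commit):
//   --tableTypeFilter PRIMARY_TABLE
// When the flag is omitted, no table-type filtering is applied.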
options.addOption(
Option.builder(null)
.required(false)
@@ -48,6 +48,19 @@ private List<OperationTask<?>> prepareTableOperationTaskList(JobConf.JobTypeEnum
return processMetadataList(tableMetadataList, jobType);
}

private List<OperationTask<?>> prepareReplicationOperationTaskList(JobConf.JobTypeEnum jobType) {
List<TableMetadata> replicationSetupTableMetadataList = tablesClient.getTableMetadataList();
// Filters tables which are primary and have a replication config defined
replicationSetupTableMetadataList =
replicationSetupTableMetadataList.stream()
.filter(m -> m.isPrimary() && (m.getReplicationConfig() != null))
.collect(Collectors.toList());
log.info(
"Fetched metadata for {} tables for replication setup task",
replicationSetupTableMetadataList.size());
return processMetadataList(replicationSetupTableMetadataList, jobType);
}

private List<OperationTask<?>> prepareTableDirectoryOperationTaskList(
JobConf.JobTypeEnum jobType) {
List<DirectoryMetadata> directoryMetadataList = tablesClient.getOrphanTableDirectories();
@@ -152,6 +165,8 @@ public List<OperationTask<?>> buildOperationTaskList(
case STAGED_FILES_DELETION:
case DATA_LAYOUT_STRATEGY_GENERATION:
return prepareTableOperationTaskList(jobType);
case REPLICATION:
return prepareReplicationOperationTaskList(jobType);
case DATA_LAYOUT_STRATEGY_EXECUTION:
return prepareDataLayoutOperationTaskList(jobType, properties, meter);
case ORPHAN_DIRECTORY_DELETION:
@@ -0,0 +1,97 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.common.JobState;
import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.OtelConfig;
import com.linkedin.openhouse.jobs.util.ReplicationConfig;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

/** A task to apply replication to a table. */
@Slf4j
public class TableReplicationTask extends TableOperationTask<TableMetadata> {
public static final JobConf.JobTypeEnum OPERATION_TYPE = JobConf.JobTypeEnum.REPLICATION;
private static final Meter METER = OtelConfig.getMeter(OperationTask.class.getName());

protected TableReplicationTask(
JobsClient jobsClient, TablesClient tablesClient, TableMetadata tableMetadata) {
super(jobsClient, tablesClient, tableMetadata);
}

@Override
public JobConf.JobTypeEnum getType() {
return OPERATION_TYPE;
}

@Override
protected List<String> getArgs() {
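// No Spark job arguments are needed here; this task triggers replication via the Airflow
// client in launchJob() rather than launching a Spark app with args.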
return null;
}

/* Returns an empty value if the operation doesn't need to run or a job launch fails. */
@Override
public Optional<JobState> call() {
if (!shouldRun()) {
log.info("Skipping job for {}, since the operation doesn't need to be run", metadata);
return Optional.empty();
}
List<ReplicationConfig> replicationConfigs = metadata.getReplicationConfig();
for (ReplicationConfig config : replicationConfigs) {
log.info("Launching job for {}", metadata);
Attributes typeAttributes =
Attributes.of(
AttributeKey.stringKey(AppConstants.TYPE),
getType().getValue(),
(metadata.getClass().equals(TableMetadata.class)
? AttributeKey.stringKey(AppConstants.TABLE_NAME)
: AttributeKey.stringKey(AppConstants.DATABASE_NAME)),
metadata.getEntityName());
try {
OtelConfig.executeWithStats(
() -> {
// this is a wrapper to convert boolean false to an exception
if (!launchJob(config)) {
throw new Exception();
}
return null;
},
METER,
"submit",
typeAttributes);
} catch (Exception e) {
log.error(
"Could not launch job {} for {}. Exception {}", getType(), metadata, e.getMessage());
return Optional.empty();
}
log.info("Launched a job for {}", metadata);
// TODO: implement wait loop for job to finish and update metrics and job state
// TODO: update the jobState with returned value from Airflow client
}
return Optional.of(JobState.FAILED);
}

protected boolean launchJob(ReplicationConfig config) {
String jobName =
String.format(
"%s_%s_%s_%s",
getType(), config.getCluster(), metadata.getDbName(), metadata.getTableName());
// TODO: Trigger Airflow job using airflow job client. Config can be used to create airflow
// client params
// TODO: Poll for job ID
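// A minimal sketch (assumption, not part of this commit) of what the Airflow trigger could
// look like using Airflow's stable REST API to create a DAG run; airflowBaseUrl and dagId are
// hypothetical parameters that would come from the airflow client config:
//
//   HttpRequest request =
//       HttpRequest.newBuilder()
//           .uri(URI.create(airflowBaseUrl + "/api/v1/dags/" + dagId + "/dagRuns"))
//           .header("Content-Type", "application/json")
//           .POST(
//               HttpRequest.BodyPublishers.ofString(
//                   String.format(
//                       "{\"conf\": {\"table\": \"%s\", \"destination\": \"%s\", \"cron\": \"%s\"}}",
//                       metadata.fqtn(), config.getCluster(), config.getSchedule())))
//           .build();
//   HttpResponse<String> response =
//       HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
//   return response.statusCode() == 200;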
log.info("Triggering Replication job: {} via airflow client", jobName);
return false;
}

@Override
protected boolean shouldRun() {
return metadata.isPrimary() && metadata.getReplicationConfig() != null;
}
}
@@ -0,0 +1,65 @@
package com.linkedin.openhouse.jobs.spark;

import com.linkedin.openhouse.jobs.spark.state.StateManager;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;

/**
* Class with main entry point to the replication job, which triggers an Airflow run to set up
* replication for a table with a defined replication config.
*
* <p>Example of invocation: com.linkedin.openhouse.jobs.spark.ReplicationSparkApp --tableName
* db.testTable
*/
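// A fuller invocation using the flags defined in main() below (values are hypothetical):
//   com.linkedin.openhouse.jobs.spark.ReplicationSparkApp --tableName db.testTable \
//     --schedule "0 0 * * *" --cluster destinationCluster --proxyUser openhouse_svc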
@Slf4j
public class ReplicationSparkApp extends BaseTableSparkApp {
private final String schedule;
private final String cluster;
private final String proxyUser;

public ReplicationSparkApp(
String jobId,
StateManager stateManager,
String fqtn,
String schedule,
String cluster,
String proxyUser) {
super(jobId, stateManager, fqtn);
this.schedule = schedule;
this.cluster = cluster;
this.proxyUser = proxyUser;
}

@Override
protected void runInner(Operations ops) {
log.info(
"Running ReplicationSparkApp for table {}, with parameters schedule: {}, cluster: {}, proxyUser: {}",
fqtn,
schedule,
cluster,
proxyUser);
}

public static void main(String[] args) {
List<Option> extraOptions = new ArrayList<>();
extraOptions.add(new Option("t", "tableName", true, "Fully-qualified table name"));
extraOptions.add(new Option("s", "schedule", true, "Replication job schedule in cron format"));
extraOptions.add(
new Option("p", "proxyUser", true, "Proxy user to run carbon replication job"));
extraOptions.add(new Option("p", "cluster", true, "Destination cluster for replication"));

CommandLine cmdLine = createCommandLine(args, extraOptions);
ReplicationSparkApp app =
new ReplicationSparkApp(
getJobId(cmdLine),
createStateManager(cmdLine),
cmdLine.getOptionValue("tableName"),
cmdLine.getOptionValue("schedule"),
cmdLine.getOptionValue("cluster"),
cmdLine.getOptionValue("proxyUser"));
app.run();
}
}
@@ -8,7 +8,7 @@
import org.apache.commons.cli.Option;

/**
- * Class with main entry point to run as a table retention job. The table doesn't have to be time
+ * Class with main entry point to run as a table replication job. The table doesn't have to be time
* partitioned, but the retention column must be a time column.
*
* <p>Example of invocation: com.linkedin.openhouse.jobs.spark.RetentionSparkApp --columnName
@@ -0,0 +1,17 @@
package com.linkedin.openhouse.jobs.util;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** Table replication config class. This is the app-side representation of /tables policies->replication */
@Builder
@Getter
@EqualsAndHashCode
@ToString
public class ReplicationConfig {
private final String schedule;
private final String proxyUser;
private final String cluster;
}
@@ -1,6 +1,7 @@
package com.linkedin.openhouse.jobs.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.Builder;
@@ -25,6 +26,7 @@ public class TableMetadata extends Metadata {
@Builder.Default protected @NonNull Map<String, String> jobExecutionProperties = new HashMap<>();
protected @Nullable RetentionConfig retentionConfig;
protected @Nullable HistoryConfig historyConfig;
protected @Nullable List<ReplicationConfig> replicationConfig;

public String fqtn() {
return String.format("%s.%s", dbName, tableName);
@@ -33,6 +33,7 @@ public enum JobType {
ORPHAN_DIRECTORY_DELETION,
TABLE_STATS_COLLECTION,
DATA_LAYOUT_STRATEGY_GENERATION,
- DATA_LAYOUT_STRATEGY_EXECUTION
+ DATA_LAYOUT_STRATEGY_EXECUTION,
+ REPLICATION
}
}
