Skip to content

Commit

Permalink
[AMORO-3210] Support loading flink mixed catalog without ams url (apa…
Browse files Browse the repository at this point in the history
…che#3211)

* Support loading flink mixed catalog without ams url

* Change some flink doc description

* Fix a spelling issue in method name

* Fix a style error
  • Loading branch information
zhoujinsong authored Sep 20, 2024
1 parent 7081bbc commit a8cc843
Show file tree
Hide file tree
Showing 22 changed files with 250 additions and 911 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public static TableIdentifier tableId(TableMeta tableMeta) {
tableMeta.getTableIdentifier().getTableName());
}

public static org.apache.amoro.api.TableIdentifier amsTaleId(TableIdentifier tableIdentifier) {
public static org.apache.amoro.api.TableIdentifier amsTableId(TableIdentifier tableIdentifier) {
return new org.apache.amoro.api.TableIdentifier(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.utils.CatalogUtil;
import org.apache.amoro.utils.MixedFormatCatalogUtil;
import org.apache.iceberg.common.DynConstructors;

Expand Down Expand Up @@ -86,8 +87,7 @@ private static String catalogImpl(String metastoreType, Map<String, String> cata
catalogImpl = MIXED_ICEBERG_CATALOG_IMP;
break;
case CatalogMetaProperties.CATALOG_TYPE_HIVE:
Set<TableFormat> tableFormats =
MixedFormatCatalogUtil.tableFormats(metastoreType, catalogProperties);
Set<TableFormat> tableFormats = CatalogUtil.tableFormats(metastoreType, catalogProperties);
if (tableFormats.contains(TableFormat.MIXED_HIVE)) {
catalogImpl = HIVE_CATALOG_IMPL;
} else {
Expand Down Expand Up @@ -142,7 +142,7 @@ private static MixedFormatCatalog loadCatalog(
catalogName,
type,
catalogMeta.getCatalogProperties(),
MixedFormatCatalogUtil.buildMetaStore(catalogMeta));
CatalogUtil.buildMetaStore(catalogMeta));
} catch (NoSuchObjectException e1) {
throw new IllegalArgumentException("catalog not found, please check catalog name", e1);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@

package org.apache.amoro.utils;

import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.TableMeta;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.op.MixedHadoopTableOperations;
import org.apache.amoro.op.MixedTableOperations;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.table.TableProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
Expand All @@ -46,49 +40,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MixedFormatCatalogUtil {

private static final Logger LOG = LoggerFactory.getLogger(MixedFormatCatalogUtil.class);

/** Return table format set catalog supported. */
public static Set<TableFormat> tableFormats(CatalogMeta meta) {
return tableFormats(meta.getCatalogType(), meta.getCatalogProperties());
}

/** Return table format set catalog supported. */
public static Set<TableFormat> tableFormats(
String metastoreType, Map<String, String> catalogProperties) {
if (catalogProperties != null
&& catalogProperties.containsKey(CatalogMetaProperties.TABLE_FORMATS)) {
String tableFormatsProperty = catalogProperties.get(CatalogMetaProperties.TABLE_FORMATS);
return Arrays.stream(tableFormatsProperty.split(","))
.map(
tableFormatString ->
TableFormat.valueOf(tableFormatString.trim().toUpperCase(Locale.ROOT)))
.collect(Collectors.toSet());
} else {
// Generate table format from catalog type for compatibility with older versions
switch (metastoreType) {
case CatalogMetaProperties.CATALOG_TYPE_AMS:
return Sets.newHashSet(TableFormat.MIXED_ICEBERG);
case CatalogMetaProperties.CATALOG_TYPE_CUSTOM:
case CatalogMetaProperties.CATALOG_TYPE_HADOOP:
case CatalogMetaProperties.CATALOG_TYPE_GLUE:
return Sets.newHashSet(TableFormat.ICEBERG);
case CatalogMetaProperties.CATALOG_TYPE_HIVE:
return Sets.newHashSet(TableFormat.MIXED_HIVE);
default:
throw new IllegalArgumentException("Unsupported catalog type:" + metastoreType);
}
}
}

/** Merge catalog properties in client side into catalog meta. */
public static void mergeCatalogProperties(CatalogMeta meta, Map<String, String> properties) {
if (meta.getCatalogProperties() == null) {
Expand Down Expand Up @@ -135,90 +93,6 @@ public static Map<String, String> withIcebergCatalogInitializeProperties(
return icebergCatalogProperties;
}

/** Build {@link TableMetaStore} from catalog meta. */
public static TableMetaStore buildMetaStore(CatalogMeta catalogMeta) {
// load storage configs
TableMetaStore.Builder builder = TableMetaStore.builder();
if (catalogMeta.getStorageConfigs() != null) {
Map<String, String> storageConfigs = catalogMeta.getStorageConfigs();
if (CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HADOOP.equalsIgnoreCase(
MixedFormatCatalogUtil.getCompatibleStorageType(storageConfigs))) {
String coreSite = storageConfigs.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_CORE_SITE);
String hdfsSite = storageConfigs.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_HDFS_SITE);
String hiveSite = storageConfigs.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_HIVE_SITE);
builder
.withBase64CoreSite(coreSite)
.withBase64MetaStoreSite(hiveSite)
.withBase64HdfsSite(hdfsSite);
}
}

boolean loadAuthFromAMS =
PropertyUtil.propertyAsBoolean(
catalogMeta.getCatalogProperties(),
CatalogMetaProperties.LOAD_AUTH_FROM_AMS,
CatalogMetaProperties.LOAD_AUTH_FROM_AMS_DEFAULT);
// load auth configs from ams
if (loadAuthFromAMS) {
if (catalogMeta.getAuthConfigs() != null) {
Map<String, String> authConfigs = catalogMeta.getAuthConfigs();
String authType = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_TYPE);
LOG.info("TableMetaStore use auth config in catalog meta, authType is {}", authType);
if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_SIMPLE.equalsIgnoreCase(authType)) {
String hadoopUsername =
authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_HADOOP_USERNAME);
builder.withSimpleAuth(hadoopUsername);
} else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_KERBEROS.equalsIgnoreCase(
authType)) {
String krb5 = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KRB5);
String keytab = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB);
String principal = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL);
builder.withBase64KrbAuth(keytab, krb5, principal);
} else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(authType)) {
String accessKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY);
String secretKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY);
builder.withAkSkAuth(accessKey, secretKey);
}
}
}

// cover auth configs from ams with auth configs in properties
String authType =
catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_TYPE);
if (StringUtils.isNotEmpty(authType)) {
LOG.info("TableMetaStore use auth config in properties, authType is {}", authType);
if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_SIMPLE.equalsIgnoreCase(authType)) {
String hadoopUsername =
catalogMeta
.getCatalogProperties()
.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_HADOOP_USERNAME);
builder.withSimpleAuth(hadoopUsername);
} else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_KERBEROS.equalsIgnoreCase(
authType)) {
String krb5 =
catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KRB5);
String keytab =
catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB);
String principal =
catalogMeta
.getCatalogProperties()
.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL);
builder.withBase64KrbAuth(keytab, krb5, principal);
} else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(authType)) {
String accessKey =
catalogMeta
.getCatalogProperties()
.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY);
String secretKey =
catalogMeta
.getCatalogProperties()
.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY);
builder.withAkSkAuth(accessKey, secretKey);
}
}
return builder.build();
}

/** Wrap table operation with authorization logic for {@link Table}. */
public static Table useMixedTableOperations(
Table table,
Expand Down Expand Up @@ -295,53 +169,6 @@ public static Map<String, String> mergeCatalogPropertiesToTable(
return mergedProperties;
}

public static TableIdentifier tableId(TableMeta tableMeta) {
return TableIdentifier.of(
tableMeta.getTableIdentifier().getCatalog(),
tableMeta.getTableIdentifier().getDatabase(),
tableMeta.getTableIdentifier().getTableName());
}

public static org.apache.amoro.api.TableIdentifier amsTaleId(TableIdentifier tableIdentifier) {
return new org.apache.amoro.api.TableIdentifier(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName());
}

/**
* Get storage type compatible with history storage type `hdfs`, which is `Hadoop` now.
*
* @param conf - configurations containing `storage.type`
* @return storage type, return `Hadoop` if `storage.type` is `hdfs`, return null if
* `storage.type` not exist.
*/
public static String getCompatibleStorageType(Map<String, String> conf) {
if (CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HDFS_LEGACY.equals(
conf.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_TYPE))) {
return CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HADOOP;
}
return conf.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_TYPE);
}

/**
* Copy property from source properties to target properties, support changing the key name.
*
* @param fromProperties - from these properties
* @param toProperties - to these properties
* @param fromKey - from key
* @param toKey - to key
*/
public static <T> void copyProperty(
Map<String, String> fromProperties,
Map<String, T> toProperties,
String fromKey,
String toKey) {
if (StringUtils.isNotEmpty(fromProperties.get(fromKey))) {
toProperties.put(toKey, (T) fromProperties.get(fromKey));
}
}

/**
* Build cache catalog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.utils.MixedFormatCatalogUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
Expand Down Expand Up @@ -100,7 +99,8 @@ public MixedTables buildMixedTables(CatalogMeta catalogMeta) {
"Cannot build mixed-tables for table format:" + tableFormat);
}
return new MixedTables(
catalogMeta.getCatalogProperties(), MixedFormatCatalogUtil.buildMetaStore(catalogMeta));
catalogMeta.getCatalogProperties(),
org.apache.amoro.utils.CatalogUtil.buildMetaStore(catalogMeta));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.amoro.table.TableProperties;
import org.apache.amoro.table.blocker.BasicTableBlockerManager;
import org.apache.amoro.table.blocker.TableBlockerManager;
import org.apache.amoro.utils.CatalogUtil;
import org.apache.amoro.utils.CompatiblePropertyUtil;
import org.apache.amoro.utils.ConvertStructUtil;
import org.apache.amoro.utils.MixedFormatCatalogUtil;
Expand Down Expand Up @@ -214,7 +215,7 @@ public Map<String, String> properties() {
protected TableMeta getMixedTableMeta(TableIdentifier identifier) {
TableMeta tableMeta;
try {
tableMeta = getClient().getTable(MixedFormatCatalogUtil.amsTaleId(identifier));
tableMeta = getClient().getTable(CatalogUtil.amsTableId(identifier));
return tableMeta;
} catch (NoSuchObjectException e) {
throw new NoSuchTableException(e, "load table failed %s.", identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.amoro.table.TableBuilder;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.table.UnkeyedTable;
import org.apache.amoro.utils.MixedFormatCatalogUtil;
import org.apache.amoro.utils.CatalogUtil;
import org.apache.amoro.utils.MixedTableUtil;
import org.junit.After;
import org.junit.Before;
Expand All @@ -41,7 +41,7 @@ public TableTestBase(CatalogTestHelper catalogTestHelper, TableTestHelper tableT

@Before
public void setupTable() {
this.tableMetaStore = MixedFormatCatalogUtil.buildMetaStore(getCatalogMeta());
this.tableMetaStore = CatalogUtil.buildMetaStore(getCatalogMeta());

getUnifiedCatalog().createDatabase(TableTestHelper.TEST_DB_NAME);
switch (getTestFormat()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.utils.CatalogUtil;
import org.apache.amoro.utils.MixedFormatCatalogUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -73,7 +74,7 @@ public void initHiveConf(Configuration hiveConf) {
@Override
public AmoroCatalog amoroCatalog() {
IcebergCatalogFactory icebergCatalogFactory = new IcebergCatalogFactory();
TableMetaStore metaStore = MixedFormatCatalogUtil.buildMetaStore(getCatalogMeta());
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
Map<String, String> properties =
icebergCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), catalogProperties);
Expand All @@ -85,7 +86,7 @@ public Catalog originalCatalog() {
Map<String, String> props =
MixedFormatCatalogUtil.withIcebergCatalogInitializeProperties(
catalogName, getMetastoreType(), catalogProperties);
TableMetaStore metaStore = MixedFormatCatalogUtil.buildMetaStore(getCatalogMeta());
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
return org.apache.iceberg.CatalogUtil.buildIcebergCatalog(
catalogName, props, metaStore.getConfiguration());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.TableMetaStore;
import org.apache.amoro.utils.MixedFormatCatalogUtil;
import org.apache.amoro.utils.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -63,7 +63,7 @@ protected TableFormat format() {
@Override
public AmoroCatalog amoroCatalog() {
MixedIcebergCatalogFactory mixedIcebergCatalogFactory = new MixedIcebergCatalogFactory();
TableMetaStore metaStore = MixedFormatCatalogUtil.buildMetaStore(getCatalogMeta());
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
Map<String, String> properties =
mixedIcebergCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), catalogProperties);
Expand All @@ -74,7 +74,7 @@ public AmoroCatalog amoroCatalog() {
@Override
public MixedFormatCatalog originalCatalog() {
CatalogMeta meta = getCatalogMeta();
TableMetaStore metaStore = MixedFormatCatalogUtil.buildMetaStore(meta);
TableMetaStore metaStore = CatalogUtil.buildMetaStore(meta);
return CatalogLoader.createCatalog(
catalogName(), meta.getCatalogType(), meta.getCatalogProperties(), metaStore);
}
Expand Down
Loading

0 comments on commit a8cc843

Please sign in to comment.