Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

list table names with snapshot #566

Merged
merged 3 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,20 @@ public interface Config {
*/
boolean listDatabaseNameByDefaultOnGetCatalog();

/**
* Get the page size when listing table entities.
*
* @return True if it is.
*/
int getListTableEntitiesPageSize();

/**
* Get the page size when listing table names.
*
* @return True if it is.
*/
int getListTableNamesPageSize();

/**
* Metadata query timeout in seconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,16 @@ public boolean listDatabaseNameByDefaultOnGetCatalog() {
return this.metacatProperties.getService().isListDatabaseNameByDefaultOnGetCatalog();
}

@Override
public int getListTableEntitiesPageSize() {
return this.metacatProperties.getService().getListTableEntitiesPageSize();
}

@Override
public int getListTableNamesPageSize() {
return this.metacatProperties.getService().getListTableNamesPageSize();
}

@Override
public int getMetadataQueryTimeout() {
return this.metacatProperties.getUsermetadata().getQueryTimeoutInSeconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ServiceProperties {
private Tables tables = new Tables();
private boolean listTableNamesByDefaultOnGetDatabase = true;
private boolean listDatabaseNameByDefaultOnGetCatalog = true;
private int listTableEntitiesPageSize = 1000;
private int listTableNamesPageSize = 10000;

/**
* Max related properties.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netflix.metacat.connector.polaris;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.netflix.metacat.common.QualifiedName;
Expand All @@ -17,6 +18,7 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -31,6 +33,88 @@
@ActiveProfiles(profiles = {"polaris_functional_test"})
@AutoConfigureDataJpa
public class PolarisConnectorTableServiceFunctionalTest extends PolarisConnectorTableServiceTest {
/**
* Test get table names.
*/
@Test
public void testGetTableNames() {
final QualifiedName name1 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo1 = TableInfo.builder()
.name(name1)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo1);
final QualifiedName name2 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table2");
final TableInfo tableInfo2 = TableInfo.builder()
.name(name2)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc2"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo2);
final QualifiedName name3 = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table3");
final TableInfo tableInfo3 = TableInfo.builder()
.name(name3)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc3"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo3);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> tables = getPolarisTableService()
.getTableNames(getRequestContext(), DB_QUALIFIED_NAME, "", -1);
Assert.assertEquals(tables.size(), 3);
Assert.assertEquals(tables, ImmutableList.of(name1, name2, name3));
}

/**
* Test empty list tables.
*/
@Test
public void testListTablesEmpty() {
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "");

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(names, Arrays.asList());
}

/**
* Test table creation then list tables.
*/
@Test
public void testTableCreationAndList() {
final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, "table1");
final TableInfo tableInfo = TableInfo.builder()
.name(qualifiedName)
.metadata(ImmutableMap.of("table_type", "ICEBERG", "metadata_location", "loc1"))
.build();
getPolarisTableService().create(getRequestContext(), tableInfo);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

final List<QualifiedName> names = getPolarisTableService().listNames(
getRequestContext(), DB_QUALIFIED_NAME, qualifiedName,
new Sort(null, SortOrder.ASC), new Pageable(2, 0));
Assert.assertEquals(names, Arrays.asList(qualifiedName));
}

/**
* Test table list.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,37 @@
@AutoConfigureDataJpa
public class PolarisStoreConnectorFunctionalTest extends PolarisStoreConnectorTest {

/**
* Test to verify that table names fetch works.
*/
@Test
public void testPaginatedFetch() {
final String dbName = generateDatabaseName();
createDB(dbName);
List<String> tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(0, tblNames.size());

final String tblNameA = "A_" + generateTableName();
final String tblNameB = "B_" + generateTableName();
final String tblNameC = "C_" + generateTableName();
createTable(dbName, tblNameA);
createTable(dbName, tblNameB);
createTable(dbName, tblNameC);

try {
// pause execution for 10000 milliseconds (10 seconds)
Thread.sleep(10000);
} catch (InterruptedException e) {
log.debug("Sleep was interrupted");
}

tblNames = getPolarisConnector().getTables(dbName, "", 1000);
Assert.assertEquals(3, tblNames.size());
Assert.assertEquals(tblNameA, tblNames.get(0));
Assert.assertEquals(tblNameB, tblNames.get(1));
Assert.assertEquals(tblNameC, tblNames.get(2));
}

/**
* Test getTableEntities.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ public List<QualifiedName> listNames(
try {
final List<QualifiedName> qualifiedNames = Lists.newArrayList();
final String tableFilter = (prefix != null && prefix.isTableDefinition()) ? prefix.getTableName() : "";
for (String tableName : polarisStoreService.getTables(name.getDatabaseName(), tableFilter)) {
for (String tableName : polarisStoreService.getTables(name.getDatabaseName(),
tableFilter,
connectorContext.getConfig().getListTableNamesPageSize())
) {
final QualifiedName qualifiedName =
QualifiedName.ofTable(name.getCatalogName(), name.getDatabaseName(), tableName);
if (prefix != null && !qualifiedName.toString().startsWith(prefix.toString())) {
Expand Down Expand Up @@ -338,7 +341,9 @@ public List<TableInfo> list(
try {
final String tableFilter = (prefix != null && prefix.isTableDefinition()) ? prefix.getTableName() : "";
final List<PolarisTableEntity> tbls =
polarisStoreService.getTableEntities(name.getDatabaseName(), tableFilter, 1000);
polarisStoreService.getTableEntities(name.getDatabaseName(),
tableFilter,
connectorContext.getConfig().getListTableEntitiesPageSize());
if (sort != null) {
ConnectorUtils.sort(tbls, sort, Comparator.comparing(t -> t.getTblName()));
}
Expand Down Expand Up @@ -388,7 +393,10 @@ public List<QualifiedName> getTableNames(
final List<QualifiedName> result = Lists.newArrayList();
for (int i = 0; i < databaseNames.size() && limitSize > 0; i++) {
final String databaseName = databaseNames.get(i);
final List<String> tableNames = polarisStoreService.getTables(name.getDatabaseName(), "");
final List<String> tableNames = polarisStoreService.getTables(
name.getDatabaseName(),
"",
connectorContext.getConfig().getListTableNamesPageSize());
result.addAll(tableNames.stream()
.map(n -> QualifiedName.ofTable(name.getCatalogName(), databaseName, n))
.limit(limitSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -158,7 +157,8 @@ public Optional<PolarisTableEntity> getTable(final String dbName, final String t
public List<PolarisTableEntity> getTableEntities(final String databaseName,
final String tableNamePrefix,
final int pageFetchSize) {
return tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize);
return (List<PolarisTableEntity>)
tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize, true);
}

/**
Expand Down Expand Up @@ -205,22 +205,9 @@ boolean tableExistsById(final String tblId) {
*/
@Override
@Transactional(propagation = Propagation.SUPPORTS)
public List<String> getTables(final String databaseName, final String tableNamePrefix) {
final int pageFetchSize = 1000;
final List<String> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tblName").ascending());
Slice<String> tblNames = null;
boolean hasNext = true;
do {
tblNames = tblRepo.findAllByDbNameAndTablePrefix(databaseName, tblPrefix, page);
retval.addAll(tblNames.toList());
hasNext = tblNames.hasNext();
if (hasNext) {
page = tblNames.nextPageable();
}
} while (hasNext);
return retval;
public List<String> getTables(final String databaseName, final String tableNamePrefix, final int pageFetchSize) {
return (List<String>)
tblRepo.findAllTablesByDbNameAndTablePrefix(databaseName, tableNamePrefix, pageFetchSize, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ public interface PolarisStoreService {
* Gets tables in the database and tableName prefix.
* @param databaseName database name
* @param tableNamePrefix table name prefix
* @param pageFetchSize size of each page
* @return list of table names in the database with the table name prefix.
*/
List<String> getTables(String databaseName, String tableNamePrefix);
List<String> getTables(String databaseName, String tableNamePrefix, int pageFetchSize);

/**
* Do an atomic compare-and-swap to update the table's metadata location.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.netflix.metacat.connector.polaris.store.repos;

import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity;
import java.util.List;

/**
Expand All @@ -12,8 +11,9 @@ public interface PolarisTableCustomRepository {
* @param dbName database name
* @param tableNamePrefix table name prefix. can be empty.
* @param pageSize target size for each page
* @param selectAllColumns if true return the PolarisEntity else return name of the entity
* @return table entities in the database.
*/
List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix, int pageSize);
List<?> findAllTablesByDbNameAndTablePrefix(
String dbName, String tableNamePrefix, int pageSize, boolean selectAllColumns);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class PolarisTableCustomRepositoryImpl implements PolarisTableCustomRepos
@PersistenceContext
private EntityManager entityManager;

private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page) {
private <T> Slice<T> findAllTablesByDbNameAndTablePrefixForCurrentPage(
final String dbName, final String tableNamePrefix, final Pageable page, final boolean selectAllColumns) {

// Generate ORDER BY clause
String orderBy = "";
Expand All @@ -37,14 +37,21 @@ private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentP
orderBy = " ORDER BY " + orderBy;
}

final String sql = "SELECT t.* FROM TBLS t "
final String selectClause = selectAllColumns ? "t.*" : "t.tbl_name";
final String sql = "SELECT " + selectClause + " FROM TBLS t "
+ "WHERE t.db_name = :dbName AND t.tbl_name LIKE :tableNamePrefix" + orderBy;
final Query query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);

Query query;
if (selectAllColumns) {
query = entityManager.createNativeQuery(sql, PolarisTableEntity.class);
} else {
query = entityManager.createNativeQuery(sql);
}
query.setParameter("dbName", dbName);
query.setParameter("tableNamePrefix", tableNamePrefix + "%");
query.setFirstResult(page.getPageNumber() * page.getPageSize());
query.setMaxResults(page.getPageSize() + 1); // Fetch one extra result to determine if there is a next page
final List<PolarisTableEntity> resultList = query.getResultList();
final List<T> resultList = query.getResultList();
// Check if there is a next page
final boolean hasNext = resultList.size() > page.getPageSize();
// If there is a next page, remove the last item from the list
Expand All @@ -56,18 +63,18 @@ private Slice<PolarisTableEntity> findAllTablesByDbNameAndTablePrefixForCurrentP

@Override
@Transactional
public List<PolarisTableEntity> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize) {
public List<?> findAllTablesByDbNameAndTablePrefix(
final String dbName, final String tableNamePrefix, final int pageFetchSize, final boolean selectAllColumns) {
Pageable page = PageRequest.of(0, pageFetchSize, Sort.by("tbl_name").ascending());
entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
.executeUpdate();
final List<PolarisTableEntity> retval = new ArrayList<>();
final List<Object> retval = new ArrayList<>();
final String tblPrefix = tableNamePrefix == null ? "" : tableNamePrefix;
Slice<PolarisTableEntity> tbls;
Slice<?> tbls;
boolean hasNext;
do {
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page);
retval.addAll(tbls.toList());
tbls = findAllTablesByDbNameAndTablePrefixForCurrentPage(dbName, tblPrefix, page, selectAllColumns);
retval.addAll(tbls.getContent());
hasNext = tbls.hasNext();
if (hasNext) {
page = tbls.nextPageable();
Expand Down
Loading
Loading