From 5d0a3a03cd2eb748ec1e609d35ccee59f120c13b Mon Sep 17 00:00:00 2001 From: stevie9868 <151791653+stevie9868@users.noreply.github.com> Date: Tue, 30 Apr 2024 18:18:32 -0700 Subject: [PATCH] Yingjianw/list db with follower reader timestamp (#589) * follower_read_timestamp for list db calls * address comments --------- Co-authored-by: Yingjian Wu --- .../common/server/properties/Config.java | 18 +- .../server/properties/DefaultConfigImpl.java | 10 ++ .../server/properties/ServiceProperties.java | 2 + ...onnectorDatabaseServiceFunctionalTest.java | 156 ++++++++++++++++++ ...isConnectorTableServiceFunctionalTest.java | 28 +--- .../PolarisStoreConnectorFunctionalTest.java | 60 +++++-- .../metacat/connector/polaris/TestUtil.java | 29 ++++ .../PolarisConnectorDatabaseService.java | 34 ++-- .../polaris/PolarisConnectorTableService.java | 4 +- .../polaris/store/PolarisStoreConnector.java | 39 ++--- .../polaris/store/PolarisStoreService.java | 15 +- .../PolarisDatabaseCustomRepository.java | 20 +++ .../PolarisDatabaseCustomRepositoryImpl.java | 100 +++++++++++ .../repos/PolarisDatabaseRepository.java | 2 +- .../PolarisConnectorDatabaseServiceTest.java | 33 +--- .../metacat/MetacatFunctionalSpec.groovy | 2 + 16 files changed, 436 insertions(+), 116 deletions(-) create mode 100644 metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceFunctionalTest.java create mode 100644 metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/TestUtil.java create mode 100644 metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepository.java create mode 100644 metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepositoryImpl.java diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/Config.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/Config.java index f2515a725..a22ea5885 100644 --- a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/Config.java +++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/Config.java @@ -555,17 +555,31 @@ public interface Config { /** * Get the page size when listing table entities. * - * @return True if it is. + * @return size of the page */ int getListTableEntitiesPageSize(); /** * Get the page size when listing table names. * - * @return True if it is. + * @return size of the page */ int getListTableNamesPageSize(); + /** + * Get the page size when listing db entities. + * + * @return size of the page + */ + int getListDatabaseEntitiesPageSize(); + + /** + * Get the page size when listing db names. + * + * @return size of the page + */ + int getListDatabaseNamesPageSize(); + /** * Metadata query timeout in seconds. * diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/DefaultConfigImpl.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/DefaultConfigImpl.java index ad27e0057..65b4f2a8e 100644 --- a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/DefaultConfigImpl.java +++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/DefaultConfigImpl.java @@ -649,6 +649,16 @@ public int getListTableNamesPageSize() { return this.metacatProperties.getService().getListTableNamesPageSize(); } + @Override + public int getListDatabaseEntitiesPageSize() { + return this.metacatProperties.getService().getListDatabaseEntitiesPageSize(); + } + + @Override + public int getListDatabaseNamesPageSize() { + return this.metacatProperties.getService().getListDatabaseNamesPageSize(); + } + @Override public int getMetadataQueryTimeout() { return this.metacatProperties.getUsermetadata().getQueryTimeoutInSeconds(); diff --git a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/ServiceProperties.java b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/ServiceProperties.java index cec2f452e..3f759b8b8 100644 --- a/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/ServiceProperties.java +++ b/metacat-common-server/src/main/java/com/netflix/metacat/common/server/properties/ServiceProperties.java @@ -39,6 +39,8 @@ public class ServiceProperties { private boolean listDatabaseNameByDefaultOnGetCatalog = true; private int listTableEntitiesPageSize = 1000; private int listTableNamesPageSize = 10000; + private int listDatabaseEntitiesPageSize = 1000; + private int listDatabaseNamesPageSize = 10000; /** * Max related properties. diff --git a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceFunctionalTest.java b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceFunctionalTest.java new file mode 100644 index 000000000..182766da1 --- /dev/null +++ b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceFunctionalTest.java @@ -0,0 +1,156 @@ + +package com.netflix.metacat.connector.polaris; + +import com.netflix.metacat.common.QualifiedName; +import com.netflix.metacat.common.dto.Pageable; +import com.netflix.metacat.common.dto.Sort; +import com.netflix.metacat.common.dto.SortOrder; +import com.netflix.metacat.common.server.connectors.model.DatabaseInfo; +import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig; +import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureDataJpa; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.Arrays; +import java.util.List; + + +/** + * Test PolarisConnectorTableService. + */ +@Slf4j +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = {PolarisPersistenceConfig.class}) +@ActiveProfiles(profiles = {"polaris_functional_test"}) +@AutoConfigureDataJpa +public class PolarisConnectorDatabaseServiceFunctionalTest extends PolarisConnectorDatabaseServiceTest { + /** + * Test SimpleDBList. + */ + @Test + public void testSimpleListDb() { + // Simulate a delay so that the dbs schema is visible + TestUtil.simulateDelay(); + final DatabaseInfo db1 = DatabaseInfo.builder().name(DB1_QUALIFIED_NAME).uri("uri1").build(); + final DatabaseInfo db2 = DatabaseInfo.builder().name(DB2_QUALIFIED_NAME).uri("uri2").build(); + getPolarisDBService().create(getRequestContext(), db1); + getPolarisDBService().create(getRequestContext(), db2); + Assert.assertTrue(getPolarisDBService().exists(getRequestContext(), DB1_QUALIFIED_NAME)); + Assert.assertTrue(getPolarisDBService().exists(getRequestContext(), DB2_QUALIFIED_NAME)); + + // Since now list dbs use follower_read_timestamp, we will not immediately get the newly created dbs + List dbNames = + getPolarisDBService().listNames( + getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null); + List dbs = + getPolarisDBService().list( + getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null); + Assert.assertTrue("Expected dbNames to be empty", dbNames.isEmpty()); + Assert.assertTrue("Expected dbs to be empty", dbs.isEmpty()); + + + // After sufficient time, the dbs should return using follower_read_timestamp + TestUtil.simulateDelay(); + dbNames = getPolarisDBService().listNames( + getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null); + Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), QualifiedName.ofCatalog(CATALOG_NAME), null, null, null); + Assert.assertEquals(dbs, Arrays.asList(db1, db2)); + + // Test Prefix + dbNames = getPolarisDBService().listNames( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), QualifiedName.ofDatabase(CATALOG_NAME, "db"), + null, + null); + Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), + QualifiedName.ofDatabase(CATALOG_NAME, "db"), + null, + null); + Assert.assertEquals(dbs, Arrays.asList(db1, db2)); + + dbNames = getPolarisDBService().listNames( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), + QualifiedName.ofDatabase(CATALOG_NAME, "db1_"), + null, + null); + Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), + QualifiedName.ofDatabase(CATALOG_NAME, "db1_"), + null, + null); + Assert.assertEquals(dbs, Arrays.asList(db1)); + + // Test Order desc + dbNames = getPolarisDBService().listNames( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), + null, + new Sort("name", SortOrder.DESC), + null); + Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME, DB1_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), + null, + new Sort("name", SortOrder.DESC), + null); + Assert.assertEquals(dbs, Arrays.asList(db2, db1)); + + // Test pageable + dbNames = getPolarisDBService().listNames( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), + null, + null, + new Pageable(5, 0)); + Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), + null, + null, + new Pageable(5, 0)); + Assert.assertEquals(dbs, Arrays.asList(db1, db2)); + + dbNames = getPolarisDBService().listNames( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 0)); + Assert.assertEquals(dbNames, Arrays.asList(DB1_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 0)); + Assert.assertEquals(dbs, Arrays.asList(db1)); + + dbNames = getPolarisDBService().listNames( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 1)); + Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(1, 1)); + Assert.assertEquals(dbs, Arrays.asList(db2)); + + dbNames = getPolarisDBService().listNames( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(5, 1)); + Assert.assertEquals(dbNames, Arrays.asList(DB2_QUALIFIED_NAME)); + dbs = getPolarisDBService().list( + getRequestContext(), + QualifiedName.ofCatalog(CATALOG_NAME), null, null, new Pageable(5, 1)); + Assert.assertEquals(dbs, Arrays.asList(db2)); + } +} + diff --git a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java index 5b647e348..278447119 100644 --- a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java +++ b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableServiceFunctionalTest.java @@ -57,12 +57,7 @@ public void testGetTableNames() { .build(); getPolarisTableService().create(getRequestContext(), tableInfo3); - try { - // pause execution for 10000 milliseconds (10 seconds) - Thread.sleep(10000); - } catch (InterruptedException e) { - log.debug("Sleep was interrupted"); - } + TestUtil.simulateDelay(); final List tables = getPolarisTableService() .getTableNames(getRequestContext(), DB_QUALIFIED_NAME, "", -1); @@ -77,12 +72,7 @@ public void testGetTableNames() { public void testListTablesEmpty() { final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, ""); - try { - // pause execution for 10000 milliseconds (10 seconds) - Thread.sleep(10000); - } catch (InterruptedException e) { - log.debug("Sleep was interrupted"); - } + TestUtil.simulateDelay(); final List names = getPolarisTableService().listNames( getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, @@ -102,12 +92,7 @@ public void testTableCreationAndList() { .build(); getPolarisTableService().create(getRequestContext(), tableInfo); - try { - // pause execution for 10000 milliseconds (10 seconds) - Thread.sleep(10000); - } catch (InterruptedException e) { - log.debug("Sleep was interrupted"); - } + TestUtil.simulateDelay(); final List names = getPolarisTableService().listNames( getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, @@ -136,12 +121,7 @@ public void testList() { final QualifiedName qualifiedName = QualifiedName.ofTable(CATALOG_NAME, DB_NAME, ""); - try { - // pause execution for 10000 milliseconds (10 seconds) - Thread.sleep(10000); - } catch (InterruptedException e) { - log.debug("Sleep was interrupted"); - } + TestUtil.simulateDelay(); List tables = this.getPolarisTableService().list( this.getRequestContext(), DB_QUALIFIED_NAME, qualifiedName, new Sort(null, SortOrder.ASC), diff --git a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java index 2cdf296ff..641e588f2 100644 --- a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java +++ b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/PolarisStoreConnectorFunctionalTest.java @@ -4,6 +4,7 @@ import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig; import com.netflix.metacat.connector.polaris.store.PolarisStoreConnectorTest; import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; +import com.netflix.metacat.connector.polaris.store.entities.PolarisDatabaseEntity; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.junit.jupiter.api.Test; @@ -13,7 +14,9 @@ import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit.jupiter.SpringExtension; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Test persistence operations on Database objects. @@ -42,12 +45,7 @@ public void testPaginatedFetch() { createTable(dbName, tblNameB); createTable(dbName, tblNameC); - try { - // pause execution for 10000 milliseconds (10 seconds) - Thread.sleep(10000); - } catch (InterruptedException e) { - log.debug("Sleep was interrupted"); - } + TestUtil.simulateDelay(); tblNames = getPolarisConnector().getTables(dbName, "", 1000); Assert.assertEquals(3, tblNames.size()); @@ -65,12 +63,7 @@ public void testGetTableEntities() { final String dbName = generateDatabaseName(); createDB(dbName); - try { - // pause execution for 10000 milliseconds (10 seconds) - Thread.sleep(10000); - } catch (InterruptedException e) { - log.debug("Sleep was interrupted"); - } + TestUtil.simulateDelay(); // Test when db is empty List entities = getPolarisConnector().getTableEntities(dbName, "", 1); @@ -85,12 +78,7 @@ public void testGetTableEntities() { createTable(dbName, tblNameB); createTable(dbName, tblNameC); - try { - // pause execution for 10000 milliseconds (10 seconds) - Thread.sleep(10000); - } catch (InterruptedException e) { - log.debug("Sleep was interrupted"); - } + TestUtil.simulateDelay(); // Test pagination and sort entities = getPolarisConnector().getTableEntities(dbName, "", 1); @@ -117,4 +105,40 @@ public void testGetTableEntities() { Assert.assertEquals(tblNameB, entities.get(1).getTblName()); Assert.assertEquals(tblNameC, entities.get(2).getTblName()); } + + /** + * test list database with different db page size config. + */ + @Test + public void testListDbPage() { + createDB("db1"); + createDB("db2"); + createDB("db3"); + + TestUtil.simulateDelay(); + + List dbNames = getPolarisConnector().getDatabaseNames("db", null, 1); + List dbs = getPolarisConnector().getDatabases("db", null, 1); + Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames); + Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"), + dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList())); + + dbNames = getPolarisConnector().getDatabaseNames("db", null, 2); + dbs = getPolarisConnector().getDatabases("db", null, 2); + Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames); + Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"), + dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList())); + + dbNames = getPolarisConnector().getDatabaseNames("db", null, 3); + dbs = getPolarisConnector().getDatabases("db", null, 3); + Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames); + Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"), + dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList())); + + dbNames = getPolarisConnector().getDatabaseNames("db", null, 4); + dbs = getPolarisConnector().getDatabases("db", null, 4); + Assert.assertEquals("Expected dbNames ", Arrays.asList("db1", "db2", "db3"), dbNames); + Assert.assertEquals("Expected dbs ", Arrays.asList("db1", "db2", "db3"), + dbs.stream().map(PolarisDatabaseEntity::getDbName).collect(Collectors.toList())); + } } diff --git a/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/TestUtil.java b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/TestUtil.java new file mode 100644 index 000000000..6ecaa5148 --- /dev/null +++ b/metacat-connector-polaris/src/functionalTest/java/com/netflix/metacat/connector/polaris/TestUtil.java @@ -0,0 +1,29 @@ +package com.netflix.metacat.connector.polaris; + +import lombok.extern.slf4j.Slf4j; + +/** + * Utility class containing methods to aid in testing by simulating various conditions. + */ +@Slf4j +public final class TestUtil { + + /** + * Private constructor to prevent instantiation of this utility class. + */ + private TestUtil() { + throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); + } + + /** + * Simulates a delay for a fixed period of time. + */ + public static void simulateDelay() { + try { + Thread.sleep(5000); // 5 seconds delay + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore the interrupted status + log.debug("Sleep was interrupted", e); + } + } +} diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseService.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseService.java index 6380ff45d..94d67780e 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseService.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseService.java @@ -21,7 +21,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -35,6 +34,8 @@ public class PolarisConnectorDatabaseService implements ConnectorDatabaseService private final String defaultLocationPrefix; private final PolarisStoreService polarisStoreService; + private final ConnectorContext connectorContext; + /** * Constructor. * @@ -46,6 +47,7 @@ public PolarisConnectorDatabaseService( final ConnectorContext connectorContext ) { this.polarisStoreService = polarisStoreService; + this.connectorContext = connectorContext; this.defaultLocationPrefix = connectorContext.getConfiguration().getOrDefault(DB_DEFAULT_LOCATION, ""); } @@ -159,17 +161,12 @@ public List listNames( @Nullable final Pageable pageable ) { try { - List qualifiedNames = polarisStoreService.getAllDatabases().stream() - .map(d -> QualifiedName.ofDatabase(name.getCatalogName(), d.getDbName())) + final String dbPrefix = prefix == null ? "" : prefix.getDatabaseName(); + final List qualifiedNames = polarisStoreService.getDatabaseNames( + dbPrefix, sort, this.connectorContext.getConfig().getListDatabaseNamesPageSize()) + .stream() + .map(dbName -> QualifiedName.ofDatabase(name.getCatalogName(), dbName)) .collect(Collectors.toCollection(ArrayList::new)); - if (prefix != null) { - qualifiedNames = qualifiedNames.stream() - .filter(n -> n.startsWith(prefix)) - .collect(Collectors.toCollection(ArrayList::new)); - } - if (sort != null) { - ConnectorUtils.sort(qualifiedNames, sort, Comparator.comparing(QualifiedName::toString)); - } return ConnectorUtils.paginate(qualifiedNames, pageable); } catch (Exception exception) { throw new ConnectorException( @@ -190,15 +187,12 @@ public List list( ) { try { final PolarisDatabaseMapper mapper = new PolarisDatabaseMapper(name.getCatalogName()); - List dbs = polarisStoreService.getAllDatabases(); - if (prefix != null) { - dbs = dbs.stream() - .filter(n -> QualifiedName.ofDatabase(name.getCatalogName(), n.getDbName()).startsWith(prefix)) - .collect(Collectors.toCollection(ArrayList::new)); - } - if (sort != null) { - ConnectorUtils.sort(dbs, sort, Comparator.comparing(p -> p.getDbName())); - } + final String dbPrefix = prefix == null ? "" : prefix.getDatabaseName(); + + final List dbs = polarisStoreService.getDatabases( + dbPrefix, sort, this.connectorContext.getConfig().getListDatabaseEntitiesPageSize() + ); + return ConnectorUtils.paginate(dbs, pageable).stream() .map(d -> mapper.toInfo(d)).collect(Collectors.toList()); } catch (Exception exception) { diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableService.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableService.java index d1ee5ea12..53b484449 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableService.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/PolarisConnectorTableService.java @@ -388,7 +388,9 @@ public List getTableNames( log.warn(String.format("Calling Polaris getTableNames with nonempty filter %s", filter)); } final List databaseNames = name.isDatabaseDefinition() ? ImmutableList.of(name.getDatabaseName()) - : polarisStoreService.getAllDatabases().stream().map(d -> d.getDbName()).collect(Collectors.toList()); + : polarisStoreService.getDatabaseNames(null, null, + connectorContext.getConfig().getListDatabaseNamesPageSize() + ); int limitSize = limit == null || limit < 0 ? Integer.MAX_VALUE : limit; final List result = Lists.newArrayList(); for (int i = 0; i < databaseNames.size() && limitSize > 0; i++) { diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java index 5c63890d9..94341ca6a 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreConnector.java @@ -1,19 +1,17 @@ package com.netflix.metacat.connector.polaris.store; +import com.netflix.metacat.common.dto.Sort; import com.netflix.metacat.connector.polaris.store.entities.AuditEntity; import com.netflix.metacat.connector.polaris.store.entities.PolarisDatabaseEntity; import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; import com.netflix.metacat.connector.polaris.store.repos.PolarisDatabaseRepository; import com.netflix.metacat.connector.polaris.store.repos.PolarisTableRepository; import lombok.RequiredArgsConstructor; -import org.springframework.data.domain.PageRequest; -import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Slice; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import javax.annotation.Nullable; import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -60,27 +58,22 @@ public void deleteDatabase(final String dbName) { dbRepo.deleteByName(dbName); } - /** - * Fetches all database entities. - * - * @return Polaris Database entities - */ @Override @Transactional(propagation = Propagation.SUPPORTS) - public List getAllDatabases() { - final int pageFetchSize = 1000; - final List retval = new ArrayList<>(); - Pageable page = PageRequest.of(0, pageFetchSize); - boolean hasNext; - do { - final Slice dbs = dbRepo.getDatabases(page); - retval.addAll(dbs.toList()); - hasNext = dbs.hasNext(); - if (hasNext) { - page = dbs.nextPageable(); - } - } while (hasNext); - return retval; + public List getDatabases( + @Nullable final String dbNamePrefix, + @Nullable final Sort sort, + final int pageSize) { + return (List) dbRepo.getAllDatabases(dbNamePrefix, sort, pageSize, true); + } + + @Override + @Transactional(propagation = Propagation.SUPPORTS) + public List getDatabaseNames( + @Nullable final String dbNamePrefix, + @Nullable final Sort sort, + final int pageSize) { + return (List) dbRepo.getAllDatabases(dbNamePrefix, sort, pageSize, false); } /** diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java index db358abce..1f576b92c 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/PolarisStoreService.java @@ -1,5 +1,6 @@ package com.netflix.metacat.connector.polaris.store; +import com.netflix.metacat.common.dto.Sort; import com.netflix.metacat.connector.polaris.store.entities.PolarisDatabaseEntity; import com.netflix.metacat.connector.polaris.store.entities.PolarisTableEntity; @@ -35,9 +36,21 @@ public interface PolarisStoreService { /** * Fetches all database entities. + * @param dbNamePrefix dbNamePrefix to return + * @param pageSize db page size + * @param sort the order of the result * @return Polaris Database entities */ - List getAllDatabases(); + List getDatabases(String dbNamePrefix, Sort sort, int pageSize); + + /** + * Fetches all database entities. + * @param dbNamePrefix dbNamePrefix to return + * @param sort the order of the result + * @param pageSize db page size + * @return Polaris Database entities + */ + List getDatabaseNames(String dbNamePrefix, Sort sort, int pageSize); /** * Checks if database with the name exists. diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepository.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepository.java new file mode 100644 index 000000000..c165c3cec --- /dev/null +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepository.java @@ -0,0 +1,20 @@ +package com.netflix.metacat.connector.polaris.store.repos; + +import com.netflix.metacat.common.dto.Sort; + +import java.util.List; + +/** + * Custom JPA repository implementation for storing PolarisDatabaseEntity. + */ +public interface PolarisDatabaseCustomRepository { + /** + * Fetch db entities for given database using AS OF SYSTEM TIME follower_read_timestamp(). + * @param dbNamePrefix db name prefix. can be empty. + * @param sort sort + * @param pageSize db pageSize + * @param selectAllColumns if true return the PolarisEntity else return name of the entity + * @return table entities in the database. + */ + List getAllDatabases(String dbNamePrefix, Sort sort, int pageSize, boolean selectAllColumns); +} diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepositoryImpl.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepositoryImpl.java new file mode 100644 index 000000000..47642da7e --- /dev/null +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseCustomRepositoryImpl.java @@ -0,0 +1,100 @@ +package com.netflix.metacat.connector.polaris.store.repos; + +import com.netflix.metacat.common.dto.SortOrder; +import com.netflix.metacat.connector.polaris.store.entities.PolarisDatabaseEntity; +import org.springframework.data.domain.Sort; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Slice; +import org.springframework.data.domain.SliceImpl; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Nullable; +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; +import javax.persistence.Query; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Implementation for Custom JPA repository implementation for interacting with PolarisDatabaseEntity. + */ +public class PolarisDatabaseCustomRepositoryImpl implements PolarisDatabaseCustomRepository { + @PersistenceContext + private EntityManager entityManager; + + private Slice getAllDatabasesForCurrentPage( + final String dbNamePrefix, final Pageable page, final boolean selectAllColumns) { + + // Generate ORDER BY clause + String orderBy = ""; + if (page.getSort().isSorted()) { + orderBy = page.getSort().stream() + .map(order -> order.getProperty() + " " + order.getDirection()) + .collect(Collectors.joining(", ")); + orderBy = " ORDER BY " + orderBy; + } + + final String selectClause = selectAllColumns ? "d.*" : "d.name"; + final String sql = "SELECT " + selectClause + " FROM DBS d " + + "WHERE d.name LIKE :dbNamePrefix" + orderBy; + + Query query; + if (selectAllColumns) { + query = entityManager.createNativeQuery(sql, PolarisDatabaseEntity.class); + } else { + query = entityManager.createNativeQuery(sql); + } + query.setParameter("dbNamePrefix", dbNamePrefix + "%"); + query.setFirstResult(page.getPageNumber() * page.getPageSize()); + query.setMaxResults(page.getPageSize() + 1); // Fetch one extra result to determine if there is a next page + final List resultList = query.getResultList(); + + // Check if there is a next page + final boolean hasNext = resultList.size() > page.getPageSize(); + + // If there is a next page, remove the last item from the list + if (hasNext) { + resultList.remove(resultList.size() - 1); + } + return new SliceImpl<>(resultList, page, hasNext); + } + + @Override + @Transactional + public List getAllDatabases( + @Nullable final String dbNamePrefix, + @Nullable final com.netflix.metacat.common.dto.Sort sort, + final int pageSize, + final boolean selectAllColumns) { + final List retval = new ArrayList<>(); + + final String dbPrefix = dbNamePrefix == null ? "" : dbNamePrefix; + + // by default sort name in ascending order + Sort dbSort = Sort.by("name").ascending(); + if (sort != null && sort.hasSort()) { + if (sort.getOrder() == SortOrder.DESC) { + dbSort = Sort.by(sort.getSortBy()).descending(); + } else { + dbSort = Sort.by(sort.getSortBy()).ascending(); + } + } + + Pageable page = PageRequest.of(0, pageSize, dbSort); + entityManager.createNativeQuery("SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()") + .executeUpdate(); + Slice dbs; + boolean hasNext; + do { + dbs = getAllDatabasesForCurrentPage(dbPrefix, page, selectAllColumns); + retval.addAll(dbs.getContent()); + hasNext = dbs.hasNext(); + if (hasNext) { + page = dbs.nextPageable(); + } + } while (hasNext); + return retval; + } +} diff --git a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseRepository.java b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseRepository.java index afad95a3a..c053a28c5 100644 --- a/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseRepository.java +++ b/metacat-connector-polaris/src/main/java/com/netflix/metacat/connector/polaris/store/repos/PolarisDatabaseRepository.java @@ -17,7 +17,7 @@ */ @Repository public interface PolarisDatabaseRepository extends JpaRepository, - JpaSpecificationExecutor { + JpaSpecificationExecutor, PolarisDatabaseCustomRepository { /** * Fetch database entry. diff --git a/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceTest.java b/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceTest.java index 9090482c3..98fb0eca7 100644 --- a/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceTest.java +++ b/metacat-connector-polaris/src/test/java/com/netflix/metacat/connector/polaris/PolarisConnectorDatabaseServiceTest.java @@ -2,7 +2,6 @@ package com.netflix.metacat.connector.polaris; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.netflix.metacat.common.QualifiedName; import com.netflix.metacat.common.server.connectors.ConnectorContext; import com.netflix.metacat.common.server.connectors.ConnectorRequestContext; @@ -15,6 +14,7 @@ import com.netflix.metacat.connector.polaris.configs.PolarisPersistenceConfig; import com.netflix.metacat.connector.polaris.store.PolarisStoreService; import com.netflix.spectator.api.NoopRegistry; +import lombok.Getter; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -29,7 +29,6 @@ import spock.lang.Shared; import java.util.Date; -import java.util.List; /** @@ -40,12 +39,13 @@ @ActiveProfiles(profiles = {"polarisconnectortest"}) @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) @AutoConfigureDataJpa +@Getter public class PolarisConnectorDatabaseServiceTest { - private static final String CATALOG_NAME = "catalog_name"; - private static final String DB1_NAME = "db1_name"; - private static final String DB2_NAME = "db2_name"; - private static final QualifiedName DB1_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB1_NAME); - private static final QualifiedName DB2_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB2_NAME); + public static final String CATALOG_NAME = "catalog_name"; + public static final String DB1_NAME = "db1_name"; + public static final String DB2_NAME = "db2_name"; + public static final QualifiedName DB1_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB1_NAME); + public static final QualifiedName DB2_QUALIFIED_NAME = QualifiedName.ofDatabase(CATALOG_NAME, DB2_NAME); @Autowired private PolarisStoreService polarisStoreService; @@ -166,24 +166,5 @@ public void testDeleteDb() { polarisDBService.delete(requestContext, DB1_QUALIFIED_NAME); Assert.assertFalse(polarisDBService.exists(requestContext, DB1_QUALIFIED_NAME)); } - - /** - * Test list databases. - */ - @Test - public void testListDb() { - final DatabaseInfo db1 = DatabaseInfo.builder().name(DB1_QUALIFIED_NAME).uri("uri1").build(); - final DatabaseInfo db2 = DatabaseInfo.builder().name(DB2_QUALIFIED_NAME).uri("uri2").build(); - polarisDBService.create(requestContext, db1); - polarisDBService.create(requestContext, db2); - Assert.assertTrue(polarisDBService.exists(requestContext, DB1_QUALIFIED_NAME)); - Assert.assertTrue(polarisDBService.exists(requestContext, DB2_QUALIFIED_NAME)); - final List dbNames = - polarisDBService.listNames(requestContext, QualifiedName.ofCatalog(CATALOG_NAME), null, null, null); - Assert.assertEquals(Sets.newHashSet(dbNames), Sets.newHashSet(DB1_QUALIFIED_NAME, DB2_QUALIFIED_NAME)); - final List dbs = - polarisDBService.list(requestContext, QualifiedName.ofCatalog(CATALOG_NAME), null, null, null); - Assert.assertEquals(Sets.newHashSet(dbs), Sets.newHashSet(db1, db2)); - } } diff --git a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatFunctionalSpec.groovy b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatFunctionalSpec.groovy index 7f9dc1d60..22c0aa16e 100644 --- a/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatFunctionalSpec.groovy +++ b/metacat-functional-tests/src/functionalTest/groovy/com/netflix/metacat/MetacatFunctionalSpec.groovy @@ -226,6 +226,7 @@ class MetacatFunctionalSpec extends Specification { when: api.createDatabase(catalog.name, databaseName, dto) + Thread.sleep(5000) catalogResponse = api.getCatalog(catalog.name) then: @@ -269,6 +270,7 @@ class MetacatFunctionalSpec extends Specification { when: api.createDatabase(catalog.name, databaseName, dto) + Thread.sleep(5000) catalogResponse = api.getCatalog(catalog.name) then: