Skip to content

Commit

Permalink
[yugabyte#14559][YSQL][YCQL] Implement select distinct pushdown to DocDB
Browse files Browse the repository at this point in the history
Summary:
We implement pushdown for SELECT DISTINCT query.

We extend the Hybrid Scan work to support scanning tuple prefixes. Specifically, with a given prefix length, Hybrid Scan will advance to the next prefix that is different from the previous one.

We need to determine the prefix length to be used when scanning in DocDB. This should equal the index of the last column to be requested in the scan.

Test Plan:
unit test added.

An example to demonstrate the pushdown working as intended:

Populate data

```
yugabyte=# create table t(h int, c int);
CREATE TABLE
create index idx on t(h ASC, c ASC);

yugabyte=# insert into t (select 1, i from generate_series(1, 1000000) as i);
INSERT 0 1000000
yugabyte=# insert into t (select 2, i from generate_series(1, 1000000) as i);
INSERT 0 1000000
```

Before

```
yugabyte=# explain analyze select distinct h from t where h <= 2;
                                                          QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
 Unique  (cost=0.00..15.50 rows=82 width=4) (actual time=7.259..12649.788 rows=2 loops=1)
   ->  Index Only Scan using idx on t  (cost=0.00..15.25 rows=100 width=4) (actual time=7.255..12415.214 rows=2000000 loops=1)
         Index Cond: (h <= 2)
         Heap Fetches: 0
 Planning Time: 0.094 ms
 Execution Time: 12649.868 ms
 Peak Memory Usage: 8 kB
(7 rows)
```

After

```
yugabyte=# explain analyze select distinct h from t where h <= 2;
                                                     QUERY PLAN
---------------------------------------------------------------------------------------------------------------------
 Unique  (cost=0.00..15.50 rows=82 width=4) (actual time=2.182..2.191 rows=2 loops=1)
   ->  Index Only Scan using idx on t  (cost=0.00..15.25 rows=100 width=4) (actual time=2.178..2.183 rows=2 loops=1)
         Index Cond: (h <= 2)
         Heap Fetches: 0
 Planning Time: 0.113 ms
 Execution Time: 2.274 ms
 Peak Memory Usage: 8 kB
(7 rows)
```

Reviewers: smishra, amartsinchyk, pjain, tnayak

Reviewed By: tnayak

Subscribers: rskannan, yql, kannan, smishra

Differential Revision: https://phabricator.dev.yugabyte.com/D20742
  • Loading branch information
lnguyen-yugabyte authored and Linh Nguyen committed Jan 3, 2023
1 parent c97640f commit 1524735
Show file tree
Hide file tree
Showing 20 changed files with 375 additions and 37 deletions.
58 changes: 58 additions & 0 deletions java/yb-cql/src/test/java/org/yb/cql/TestSelect.java
Original file line number Diff line number Diff line change
Expand Up @@ -2252,6 +2252,64 @@ public void testDistinct() throws Exception {
runInvalidQuery("select distinct h, s from test_distinct where c < 0;");
}

@Test
public void testDistinctPushdown() throws Exception {
  session.execute("create table t(h int, c int, primary key(h, c))");
  // Three clustering values under each of the two partition keys.
  for (int h = 0; h <= 1; h++) {
    for (int c = 0; c <= 2; c++) {
      session.execute(String.format("insert into t(h, c) values (%d, %d)", h, c));
    }
  }

  // For both queries, the scan should jump directly to the relevant primary key,
  // so the number of seeks equals the number of items retrieved.
  {
    String distinctQuery = "select distinct h from t where h = 0";
    String[] expected = {"Row[0]"};

    RocksDBMetrics metrics = assertPartialRangeSpec("t", distinctQuery, expected);
    assertEquals(1, metrics.seekCount);
  }

  {
    String distinctQuery = "select distinct h from t where h in (0, 1)";
    String[] expected = {"Row[0]", "Row[1]"};

    RocksDBMetrics metrics = assertPartialRangeSpec("t", distinctQuery, expected);
    assertEquals(2, metrics.seekCount);
  }
}

@Test
public void testDistinctPushdownSecondColumn() throws Exception {
  // r1 is a regular column; the primary key starts at the second column r2.
  session.execute("create table t(r1 int, r2 int, r3 int, primary key(r2, r3))");
  for (int k = 0; k <= 1; k++) {
    for (int j = 0; j <= 2; j++) {
      session.execute(
          String.format("insert into t(r1, r2, r3) values (%d, %d, %d)", k, k, j));
    }
  }

  // For both queries, the scan should jump directly to the relevant primary key,
  // so the number of seeks equals the number of items retrieved.
  {
    String distinctQuery = "select distinct r2 from t where r2 = 0";
    String[] expected = {"Row[0]"};

    RocksDBMetrics metrics = assertPartialRangeSpec("t", distinctQuery, expected);
    assertEquals(1, metrics.seekCount);
  }

  {
    String distinctQuery = "select distinct r2 from t where r2 in (0, 1)";
    String[] expected = {"Row[0]", "Row[1]"};

    RocksDBMetrics metrics = assertPartialRangeSpec("t", distinctQuery, expected);
    assertEquals(2, metrics.seekCount);
  }
}

@Test
public void testToJson() throws Exception {
// Create test table.
Expand Down
198 changes: 198 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSelect.java
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,204 @@ public void testPartialKeyScan() throws Exception {
}
}

@Test
public void testIndexDistinctRangeScan() throws Exception {
  try (Statement stmt = connection.createStatement()) {
    stmt.execute("CREATE TABLE t(r1 INT, r2 INT, PRIMARY KEY(r1 ASC, r2 ASC))");

    // Ten rows under each of the three distinct r1 values.
    for (int r1 = 1; r1 <= 3; r1++) {
      stmt.execute(String.format(
          "INSERT INTO t (SELECT %d, i FROM GENERATE_SERIES(1, 10) AS i)", r1));
    }

    Set<Row> expectedRows = new HashSet<>();
    for (int r1 = 1; r1 <= 3; r1++) {
      expectedRows.add(new Row(r1));
    }
    String distinctQuery = "SELECT DISTINCT r1 FROM t WHERE r1 <= 3";
    assertRowSet(stmt, distinctQuery, expectedRows);

    // With DISTINCT pushed down to DocDB, we only need to scan three keys:
    // 1. From kLowest, seek to 1.
    // 2. From 1, seek to 2.
    // 3. From 2, seek to 3.
    // The constraint r1 <= 3 implies we are done after the third seek.
    RocksDBMetrics metrics = assertFullDocDBFilter(stmt, distinctQuery, "t");
    assertEquals(3, metrics.seekCount);
  }
}

@Test
public void testIndexDistinctMulticolumnsScan() throws Exception {
  try (Statement stmt = connection.createStatement()) {
    stmt.execute(
        "CREATE TABLE t(r1 INT, r2 INT, r3 INT, PRIMARY KEY(r1 ASC, r2 ASC, r3 ASC))");

    // 100 rows under each distinct (r1, r2) prefix; the same prefix list
    // doubles as the expected DISTINCT result below.
    int[][] prefixes = {{1, 1}, {2, 2}, {3, 1}, {3, 2}, {3, 3}};
    for (int[] p : prefixes) {
      stmt.execute(String.format(
          "INSERT INTO t (SELECT %d, %d, i FROM GENERATE_SERIES(1, 100) AS i)",
          p[0], p[1]));
    }

    Set<Row> expectedRows = new HashSet<>();
    for (int[] p : prefixes) {
      expectedRows.add(new Row(p[0], p[1]));
    }
    String distinctQuery = "SELECT DISTINCT r1, r2 FROM t WHERE r1 <= 3";
    assertRowSet(stmt, distinctQuery, expectedRows);

    // We need to do 6 seeks here:
    // 1. Seek from (kLowest, kLowest), found (1, 1).
    // 2. Seek from (1, 1), found (2, 2).
    // 3. Seek from (2, 2), found (3, 1).
    // 4. Seek from (3, 1), found (3, 2).
    // 5. Seek from (3, 2), found (3, 3).
    // 6. Seek from (3, 3), found no more keys.
    // The last seek is required because, under the condition r1 <= 3, we cannot
    // know whether more items remain to be scanned.
    RocksDBMetrics metrics = assertFullDocDBFilter(stmt, distinctQuery, "t");
    assertEquals(6, metrics.seekCount);
  }
}

@Test
public void testIndexDistinctSkipColumnScan() throws Exception {
  try (Statement stmt = connection.createStatement()) {
    stmt.execute("CREATE TABLE t(r1 INT, r2 INT, r3 INT, r4 INT, " +
                 " PRIMARY KEY(r1 ASC, r2 ASC, r3 ASC, r4 ASC))");

    // 100 rows under each of the (v, v, v) key prefixes, v in 1..3.
    for (int v = 1; v <= 3; v++) {
      stmt.execute(String.format(
          "INSERT INTO t (SELECT %d, %d, %d, i FROM GENERATE_SERIES(1, 100) AS i)",
          v, v, v));
    }

    Set<Row> expectedRows = new HashSet<>();
    for (int v = 1; v <= 3; v++) {
      expectedRows.add(new Row(v, v));
    }
    // DISTINCT targets r1 and r3, skipping the middle key column r2.
    String distinctQuery = "SELECT DISTINCT r1, r3 FROM t WHERE r3 <= 3";
    assertRowSet(stmt, distinctQuery, expectedRows);

    // NOTE(review): only a single seek is asserted here, unlike the prefix
    // scans above — presumably the non-contiguous (r1, r3) target prevents the
    // distinct-prefix pushdown; confirm the intended behavior.
    RocksDBMetrics metrics = assertFullDocDBFilter(stmt, distinctQuery, "t");
    assertEquals(1, metrics.seekCount);
  }
}

@Test
public void testDistinctScanHashColumn() throws Exception {
  try (Statement stmt = connection.createStatement()) {
    stmt.execute(
        "CREATE TABLE t(r1 INT, r2 INT, r3 INT, PRIMARY KEY(r1 HASH, r2 ASC, r3 ASC))");
    stmt.execute("INSERT INTO t (SELECT i, i, i FROM GENERATE_SERIES(1, 100) AS i)");

    {
      Set<Row> allKeys = new HashSet<>();
      for (int i = 1; i <= 100; i++) {
        allKeys.add(new Row(i));
      }

      String rangeQuery = "SELECT DISTINCT r1 FROM t WHERE r1 <= 100";
      assertRowSet(stmt, rangeQuery, allKeys);

      // A range condition on the hash column results in a sequential scan.
      RocksDBMetrics rangeMetrics = assertFullDocDBFilter(stmt, rangeQuery, "t");
      assertEquals(3, rangeMetrics.seekCount);
    }

    {
      Set<Row> singleKey = new HashSet<>();
      singleKey.add(new Row(100));

      String pointQuery = "SELECT DISTINCT r1 FROM t WHERE r1 = 100";
      assertRowSet(stmt, pointQuery, singleKey);

      // An equality condition on the hash column results in an index scan.
      RocksDBMetrics pointMetrics = assertFullDocDBFilter(stmt, pointQuery, "t");
      assertEquals(1, pointMetrics.seekCount);
    }
  }
}

@Test
public void testDistinctMultiHashColumns() throws Exception {
  try (Statement stmt = connection.createStatement()) {
    stmt.execute("CREATE TABLE t(h1 INT, h2 INT, r INT, PRIMARY KEY((h1, h2) HASH, r ASC))");
    stmt.execute("INSERT INTO t (SELECT i, i, i FROM GENERATE_SERIES(1, 100) AS i)");

    Set<Row> expectedRows = new HashSet<>();
    expectedRows.add(new Row(1, 1));

    // Fully specified composite hash key: one seek reaches the lone match.
    String distinctQuery = "SELECT DISTINCT h1, h2 FROM t WHERE h1 = 1 AND h2 = 1";
    assertRowSet(stmt, distinctQuery, expectedRows);

    RocksDBMetrics metrics = assertFullDocDBFilter(stmt, distinctQuery, "t");
    assertEquals(1, metrics.seekCount);
  }
}

@Test
public void testDistinctOnNonPrefixScan() throws Exception {
  try (Statement stmt = connection.createStatement()) {
    // r3 is not part of any primary key; a secondary index supplies the order.
    stmt.execute("CREATE TABLE t(r1 INT, r2 INT, r3 INT)");
    stmt.execute("CREATE INDEX idx on t(r3 ASC)");
    stmt.execute("INSERT INTO t (SELECT i, 1, 1 FROM GENERATE_SERIES(1, 100) AS i)");

    Set<Row> expectedRows = new HashSet<>();
    expectedRows.add(new Row(1));

    String distinctQuery = "SELECT DISTINCT r3 FROM t WHERE r3 <= 10";
    assertRowSet(stmt, distinctQuery, expectedRows);

    // The distinct scan is served entirely by the index table with two seeks:
    // 1. From kLowest, seek to 1.
    // 2. From 1, seek to the next key, which is not found.
    // The main table is never seeked.
    RocksDBMetrics mainMetrics = assertFullDocDBFilter(stmt, distinctQuery, "t");
    assertEquals(0, mainMetrics.seekCount);

    RocksDBMetrics indexMetrics = assertFullDocDBFilter(stmt, distinctQuery, "idx");
    assertEquals(2, indexMetrics.seekCount);
  }
}

@Test
public void testStrictInequalities() throws Exception {
String query = "CREATE TABLE sample_table(h INT, r1 INT, r2 INT, r3 INT, " +
Expand Down
7 changes: 7 additions & 0 deletions src/postgres/src/backend/executor/nodeUnique.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ ExecUnique(PlanState *pstate)
PlanState *outerPlan;

CHECK_FOR_INTERRUPTS();

/*
* SELECT DISTINCT is only enabled for an index scan. Specifically, for a scan on hash columns,
* the index scan will not be used.
*/
if (IsYugaByteEnabled())
pstate->state->yb_exec_params.is_select_distinct = true;

/*
* get information from the node
Expand Down
4 changes: 4 additions & 0 deletions src/yb/common/pgsql_protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ message PgsqlReadRequestPB {
// Reading distinct columns?
optional bool distinct = 11 [default = false];

// Currently only used by SELECT DISTINCT scans. If the value is greater than 0, use the
// specified prefix length when scanning the table.
optional uint64 prefix_length = 39 [default = 0];

// Flag for reading aggregate values.
optional bool is_aggregate = 12 [default = false];

Expand Down
4 changes: 4 additions & 0 deletions src/yb/common/ql_protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ message QLReadRequestPB {
// Reading distinct columns?
optional bool distinct = 12 [default = false];

// Currently only used by SELECT DISTINCT scans. If the value is greater than 0, use the
// specified prefix length when scanning the table.
optional uint64 prefix_length = 23 [default = 0];

// Limit number of rows to return. For QL SELECT, this limit is the smaller of the page size (max
// (max number of rows to return per fetch) & the LIMIT clause if present in the SELECT statement.
optional uint64 limit = 8;
Expand Down
12 changes: 8 additions & 4 deletions src/yb/docdb/doc_pgsql_scanspec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ DocPgsqlScanSpec::DocPgsqlScanSpec(const Schema& schema,
const boost::optional<int32_t> hash_code,
const boost::optional<int32_t> max_hash_code,
const DocKey& start_doc_key,
bool is_forward_scan)
bool is_forward_scan,
const size_t prefix_length)
: PgsqlScanSpec(nullptr),
schema_(schema),
query_id_(query_id),
Expand All @@ -47,7 +48,8 @@ DocPgsqlScanSpec::DocPgsqlScanSpec(const Schema& schema,
max_hash_code_(max_hash_code),
start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()),
lower_doc_key_(doc_key.Encode()),
is_forward_scan_(is_forward_scan) {
is_forward_scan_(is_forward_scan),
prefix_length_(prefix_length) {

// Compute lower and upper doc_key.
// We add +inf as an extra component to make sure this is greater than all keys in range.
Expand Down Expand Up @@ -87,7 +89,8 @@ DocPgsqlScanSpec::DocPgsqlScanSpec(
const DocKey& start_doc_key,
bool is_forward_scan,
const DocKey& lower_doc_key,
const DocKey& upper_doc_key)
const DocKey& upper_doc_key,
const size_t prefix_length)
: PgsqlScanSpec(where_expr),
range_bounds_(condition ? new QLScanRange(schema, *condition) : nullptr),
schema_(schema),
Expand All @@ -100,7 +103,8 @@ DocPgsqlScanSpec::DocPgsqlScanSpec(
start_doc_key_(start_doc_key.empty() ? KeyBytes() : start_doc_key.Encode()),
lower_doc_key_(lower_doc_key.Encode()),
upper_doc_key_(upper_doc_key.Encode()),
is_forward_scan_(is_forward_scan) {
is_forward_scan_(is_forward_scan),
prefix_length_(prefix_length) {

auto lower_bound_key = bound_key(schema, true);
lower_doc_key_ = lower_bound_key > lower_doc_key_
Expand Down
12 changes: 10 additions & 2 deletions src/yb/docdb/doc_pgsql_scanspec.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class DocPgsqlScanSpec : public PgsqlScanSpec {
const boost::optional<int32_t> hash_code = boost::none,
const boost::optional<int32_t> max_hash_code = boost::none,
const DocKey& start_doc_key = DefaultStartDocKey(),
bool is_forward_scan = true);
bool is_forward_scan = true,
const size_t prefix_length = 0);

// Scan for the given hash key, a condition, and optional doc_key.
//
Expand All @@ -56,7 +57,8 @@ class DocPgsqlScanSpec : public PgsqlScanSpec {
const DocKey& start_doc_key = DefaultStartDocKey(),
bool is_forward_scan = true,
const DocKey& lower_doc_key = DefaultStartDocKey(),
const DocKey& upper_doc_key = DefaultStartDocKey());
const DocKey& upper_doc_key = DefaultStartDocKey(),
const size_t prefix_length = 0);

//------------------------------------------------------------------------------------------------
// Access functions.
Expand All @@ -68,6 +70,10 @@ class DocPgsqlScanSpec : public PgsqlScanSpec {
return is_forward_scan_;
}

const size_t prefix_length() const {
return prefix_length_;
}

//------------------------------------------------------------------------------------------------
// Filters.
std::shared_ptr<rocksdb::ReadFileFilter> CreateFileFilter() const;
Expand Down Expand Up @@ -161,6 +167,8 @@ class DocPgsqlScanSpec : public PgsqlScanSpec {
// Scan behavior.
bool is_forward_scan_;

size_t prefix_length_ = 0;

DISALLOW_COPY_AND_ASSIGN(DocPgsqlScanSpec);
};

Expand Down
Loading

0 comments on commit 1524735

Please sign in to comment.