Skip to content

Commit

Permalink
feat: use order key as mv's dist key (#20176)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Jan 17, 2025
1 parent f0db87b commit 27efddf
Show file tree
Hide file tree
Showing 28 changed files with 2,366 additions and 2,167 deletions.
6 changes: 2 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchSortAgg { group_key: [mv.v1], aggs: [max(mv.v2)] }
└─BatchExchange { order: [mv.v1 DESC], dist: HashShard(mv.v1) }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: UpstreamHashShard(mv.v1) }
- sql: |
create table t(v1 int, v2 int);
select v1, max(v2) from t group by v1 order by v1 desc;
Expand Down Expand Up @@ -367,8 +366,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [mv.v1], aggs: [max(mv.v2)] }
└─BatchExchange { order: [], dist: HashShard(mv.v1) }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: UpstreamHashShard(mv.v1) }
with_config_map:
RW_BATCH_ENABLE_SORT_AGG: 'false'
- name: Not use BatchSortAgg, when output requires order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
select v1 from t1 order by v1 limit 3 offset 3;
stream_plan: |-
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [v1, t1._row_id], pk_conflict: NoCheck }
└─StreamTopN [append_only] { order: [t1.v1 ASC], limit: 3, offset: 3 }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t1.v1) }
└─StreamTopN [append_only] { order: [t1.v1 ASC], limit: 3, offset: 3 }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
- sql: |
create table t1 (v1 int, v2 int) append only;
select max(v1) as max_v1 from t1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
sql: |
select max(v) as a1 from S;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchScan { table: s, columns: [s.v], distribution: Single }
BatchSimpleAgg { aggs: [max(max(s.v))] }
└─BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchScan { table: s, columns: [s.v], distribution: SomeShard }
batch_local_plan: |-
BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchExchange { order: [], dist: Single }
Expand Down Expand Up @@ -160,7 +161,7 @@
└─BatchProject { exprs: [max(s.v)] }
└─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
└─BatchExchange { order: [], dist: HashShard(s.k) }
└─BatchScan { table: s, columns: [s.k, s.v], distribution: Single }
└─BatchScan { table: s, columns: [s.k, s.v], distribution: SomeShard }
batch_local_plan: |-
BatchProject { exprs: [max(s.v)] }
└─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
Expand Down
563 changes: 295 additions & 268 deletions src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
└─BatchScan { table: t1, columns: [t1.id, t1.i], limit: 2, distribution: UpstreamHashShard(t1.id) }
stream_plan: |-
StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
└─StreamExchange { dist: HashShard(t1.id) }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
- name: test functional dependency for order key pruning (order by - suffix fd)
sql: |
create table t1 (id int primary key, i int);
Expand All @@ -35,12 +36,13 @@
└─BatchScan { table: t1, columns: [t1.id, t1.i], distribution: UpstreamHashShard(t1.id) }
stream_plan: |-
StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [i, id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
└─StreamExchange { dist: HashShard(t1.i, t1.id) }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
- name: test functional dependency for order key pruning on singleton
sql: |
create table t1 (id int primary key, i int);
Expand All @@ -54,8 +56,9 @@
└─BatchSort { order: [v.cnt ASC] }
└─BatchScan { table: v, columns: [v.cnt], distribution: Single }
stream_plan: |-
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(v.cnt) }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
- name: test functional dependency for order key pruning (index)
sql: |
create table t1 (v1 int, v2 int);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,13 @@
select * from t1 order by a limit 1;
stream_plan: |-
StreamMaterialize { columns: [a], stream_key: [], pk_columns: [a], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.a] }
└─StreamTopN { order: [t1.a ASC], limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.a ASC], limit: 1, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.a, Vnode(t1.a) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.a], stream_scan_type: ArrangementBackfill, stream_key: [t1.a], pk: [a], dist: UpstreamHashShard(t1.a) }
└─StreamExchange { dist: HashShard(t1.a) }
└─StreamProject { exprs: [t1.a] }
└─StreamTopN { order: [t1.a ASC], limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.a ASC], limit: 1, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.a, Vnode(t1.a) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.a], stream_scan_type: ArrangementBackfill, stream_key: [t1.a], pk: [a], dist: UpstreamHashShard(t1.a) }
- sql: |
create table t1 (a varchar, b int, c int, d int);
create index idx on t1(a);
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@
└─BatchScan { table: b, columns: [b.x], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [y, z, $expr2(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], stream_key: [a._row_id, b._row_id, a.x, b.x], pk_columns: [$expr2, a._row_id, b._row_id, a.x, b.x], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
└─StreamExchange { dist: HashShard($expr2) }
└─StreamProject { exprs: [(2:Int32 * $expr1) as $expr3, $expr2, $expr2, a._row_id, b._row_id, a.x, b.x] }
└─StreamProject { exprs: [a.x, b.x, $expr1, ($expr1 + $expr1) as $expr2, a._row_id, b._row_id] }
└─StreamProject { exprs: [a.x, b.x, Coalesce(a.x, b.x) as $expr1, a._row_id, b._row_id] }
Expand Down
10 changes: 6 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/limit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@
└─BatchValues { rows: [[1:Int32]] }
stream_plan: |-
StreamMaterialize { columns: [c, _row_id(hidden)], stream_key: [], pk_columns: [c], pk_conflict: NoCheck }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 1, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64]] }
└─StreamExchange { dist: HashShard(1:Int32) }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 1, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64]] }
- sql: |
select 1 c union all select 1 c limit 10
batch_plan: |-
Expand All @@ -117,8 +118,9 @@
└─BatchValues { rows: [[1:Int32], [1:Int32]] }
stream_plan: |-
StreamMaterialize { columns: [c, _row_id(hidden)], stream_key: [_row_id], pk_columns: [c, _row_id], pk_conflict: NoCheck }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 10, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64], [1:Int32, 1:Int64]] }
└─StreamExchange { dist: HashShard(1:Int32) }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 10, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64], [1:Int32, 1:Int64]] }
- sql: |
create table t (a int);
select count(*) from t limit 1;
Expand Down
42 changes: 23 additions & 19 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2271,42 +2271,46 @@
└─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [bid_count, auction_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└─StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction), Vnode(auction.id) as $expr1] }
└─StreamHashAgg { group_key: [auction.id], aggs: [internal_last_seen_value(auction.item_name), count(bid.auction), count] }
└─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all }
├─StreamExchange { dist: HashShard(auction.id) }
│ └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], stream_scan_type: ArrangementBackfill, stream_key: [auction.id], pk: [id], dist: UpstreamHashShard(auction.id) }
└─StreamExchange { dist: HashShard(bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamExchange { dist: HashShard(count(bid.auction)) }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└─StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction), Vnode(auction.id) as $expr1] }
└─StreamHashAgg { group_key: [auction.id], aggs: [internal_last_seen_value(auction.item_name), count(bid.auction), count] }
└─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all }
├─StreamExchange { dist: HashShard(auction.id) }
│ └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], stream_scan_type: ArrangementBackfill, stream_key: [auction.id], pk: [id], dist: UpstreamHashShard(auction.id) }
└─StreamExchange { dist: HashShard(bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [bid_count, auction_id], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└── StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } { tables: [ TopN: 0 ] }
└── StreamExchange Single from 1
└── StreamExchange Hash([2]) from 1
Fragment 1
StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└── StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } { tables: [ TopN: 0 ] }
└── StreamExchange Single from 2
Fragment 2
StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] } { tables: [ GroupTopN: 1 ] }
└── StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction), Vnode(auction.id) as $expr1] }
└── StreamHashAgg { group_key: [auction.id], aggs: [internal_last_seen_value(auction.item_name), count(bid.auction), count] }
├── tables: [ HashAggState: 2 ]
└── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all }
├── tables: [ HashJoinLeft: 3, HashJoinDegreeLeft: 4, HashJoinRight: 5, HashJoinDegreeRight: 6 ]
├── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([0]) from 3
├── StreamExchange Hash([0]) from 3
└── StreamExchange Hash([0]) from 4
Fragment 2
Fragment 3
StreamTableScan { table: auction, columns: [auction.id, auction.item_name], stream_scan_type: ArrangementBackfill, stream_key: [auction.id], pk: [id], dist: UpstreamHashShard(auction.id) }
├── tables: [ StreamScan: 7 ]
├── Upstream
└── BatchPlanNode
Fragment 3
Fragment 4
StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
├── tables: [ StreamScan: 8 ]
├── Upstream
Expand Down Expand Up @@ -2362,7 +2366,7 @@
├── columns: [ auction_id, auction_item_name, bid_count, _rw_timestamp ]
├── primary key: [ $2 DESC, $0 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: []
├── distribution key: [ 2 ]
└── read pk prefix len hint: 2
- id: nexmark_q106
Expand Down
Loading

0 comments on commit 27efddf

Please sign in to comment.