
Spark Hive connector supports dynamic partition pruning #6943

jackyjfhu opened this issue Feb 27, 2025 · 2 comments

jackyjfhu commented Feb 27, 2025

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

Kyuubi: 1.9.0
Spark: 3.4.2

Currently, when connecting Spark to the Hive Metastore through the kyuubi-hive-connector and testing performance with TPC-DS, we found that some SQL queries cannot benefit from dynamic partition pruning.

q33.sql

with ss as (
  select i_manufact_id, sum(ss_ext_sales_price) total_sales
  from store_sales, date_dim, customer_address, item
  where i_manufact_id in (select i_manufact_id from item where i_category in ('Home'))
    and ss_item_sk = i_item_sk
    and ss_sold_date_sk = d_date_sk
    and d_year = 1998
    and d_moy = 5
    and ss_addr_sk = ca_address_sk
    and ca_gmt_offset = -6
  group by i_manufact_id),
cs as (
  select i_manufact_id, sum(cs_ext_sales_price) total_sales
  from catalog_sales, date_dim, customer_address, item
  where i_manufact_id in (select i_manufact_id from item where i_category in ('Home'))
    and cs_item_sk = i_item_sk
    and cs_sold_date_sk = d_date_sk
    and d_year = 1998
    and d_moy = 5
    and cs_bill_addr_sk = ca_address_sk
    and ca_gmt_offset = -6
  group by i_manufact_id),
ws as (
  select i_manufact_id, sum(ws_ext_sales_price) total_sales
  from web_sales, date_dim, customer_address, item
  where i_manufact_id in (select i_manufact_id from item where i_category in ('Home'))
    and ws_item_sk = i_item_sk
    and ws_sold_date_sk = d_date_sk
    and d_year = 1998
    and d_moy = 5
    and ws_bill_addr_sk = ca_address_sk
    and ca_gmt_offset = -6
  group by i_manufact_id)
select i_manufact_id, sum(total_sales) total_sales
from (select * from ss
      union all
      select * from cs
      union all
      select * from ws) tmp1
group by i_manufact_id
order by total_sales
limit 100
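
One way to check whether DPP actually kicked in is to inspect the physical plan text. A minimal sketch, assuming an active SparkSession named spark and the query text above bound to a variable q33 (the exact marker varies across Spark versions, but a pruned scan carries a dynamicpruningexpression in its partition filters):

// Sketch only: q33 is assumed to hold the query text shown above.
val plan = spark.sql(q33).queryExecution.executedPlan.toString
println(s"DPP applied: ${plan.toLowerCase.contains("dynamicpruning")}")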

If the kyuubi-hive-connector is not used, the execution plan contains dynamic partition pruning:

[screenshot: execution plan with a dynamic partition pruning subquery]

If the kyuubi-hive-connector is used, there is no dynamic partition pruning in the execution plan:

[screenshot: execution plan without dynamic partition pruning]

Affects Version(s)

1.9.0

Kyuubi Server Log Output

Kyuubi Engine Log Output

Kyuubi Server Configurations

Kyuubi Engine Configurations

spark.plugins com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin
spark.sql.catalog.cos8lcu4e_hive.hadoop.hive.metastore.token.signature thrift://xxx:7004,thrift://xxxx:7004
spark.sql.catalog.cos8lcu4e_iceberg.hadoop.hive.metastore.token.signature thrift://xxxx:7004,thrift://xxxx:7004
spark.sql.catalog.default_catalog_hive_0o29qw56.hadoop.hive.metastore.token.signature thrift://xxx:40016
spark.sql.catalog.default_catalog_iceberg_0o29qw56.hadoop.hive.metastore.token.signature thrift://xxx:40016
spark.sql.catalog.kyuubi_cos8lcu4e_hive org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
spark.sql.catalog.kyuubi_cos8lcu4e_hive.hadoop.hive.metastore.token.signature thrift://xxx:7004,thrift://xxx:7004
spark.sql.catalog.kyuubi_cos8lcu4e_hive.hive.metastore.kerberos.principal hadoop/_HOST@TBDS-0O29QW56
spark.sql.catalog.kyuubi_cos8lcu4e_hive.hive.metastore.uris thrift://xxx:7004,thrift://xxxx:7004
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56 org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hadoop.hive.metastore.token.signature thrift://xxx:40016
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hive.metastore.kerberos.principal hadoop/xxxx@xxx
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hive.metastore.uris thrift://xxx:xxx
spark.sql.catalog.origin_cos8lcu4e_iceberg org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.origin_cos8lcu4e_iceberg.hadoop.hive.metastore.token.signature thrift://xxxx:7004,thrift://xxx:7004
spark.sql.catalog.origin_cos8lcu4e_iceberg.type hive
spark.sql.catalog.origin_cos8lcu4e_iceberg.uri thrift://xxx:7004,thrift://xxx:7004
spark.sql.catalog.origin_default_catalog_iceberg_0o29qw56 org.apache.iceberg.spark.SparkCatalog

Additional context

Core logic of dynamic partition pruning (Spark's PartitionPruning rule):

private def prune(plan: LogicalPlan): LogicalPlan = {
  plan transformUp {
    .......
          // Try to prune the left side of the join first; if that is not
          // possible, fall back to pruning the right side.
          var filterableScan = getFilterableTableScan(l, left)
          if (filterableScan.isDefined && canPruneLeft(joinType) &&
              hasPartitionPruningFilter(right)) {
            newLeft = insertPredicate(l, newLeft, r, right, rightKeys, filterableScan.get)
          } else {
            filterableScan = getFilterableTableScan(r, right)
            if (filterableScan.isDefined && canPruneRight(joinType) &&
                hasPartitionPruningFilter(left)) {
              newRight = insertPredicate(r, newRight, l, left, leftKeys, filterableScan.get)
            }
          }
        case _ =>
      }
      .......
      Join(newLeft, newRight, joinType, Some(condition), hint)
  }
}

def getFilterableTableScan(a: Expression, plan: LogicalPlan): Option[LogicalPlan] = {
  val srcInfo: Option[(Expression, LogicalPlan)] = findExpressionAndTrackLineageDown(a, plan)
  srcInfo.flatMap {
    // V1 file-source relations: prunable when the join key only references partition columns.
    case (resExp, l: LogicalRelation) =>
      l.relation match {
        case fs: HadoopFsRelation =>
          val partitionColumns = AttributeSet(
            l.resolve(fs.partitionSchema, fs.sparkSession.sessionState.analyzer.resolver))
          if (resExp.references.subsetOf(partitionColumns)) {
            return Some(l)
          } else {
            None
          }
        case _ => None
      }
    // V1 Hive catalog tables: prunable when the join key only references partition columns.
    case (resExp, l: HiveTableRelation) =>
      if (resExp.references.subsetOf(AttributeSet(l.partitionCols))) {
        return Some(l)
      } else {
        None
      }
    // DataSource V2 relations: prunable only when the scan implements
    // SupportsRuntimeV2Filtering and the join key is among its filterAttributes.
    case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _)) =>
      val filterAttrs = V2ExpressionUtils.resolveRefs[Attribute](scan.filterAttributes, r)
      if (resExp.references.subsetOf(AttributeSet(filterAttrs))) {
        Some(r)
      } else {
        None
      }
    case _ => None
  }
}


Are you willing to submit PR?

  • Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • No. I cannot submit a PR at this time.
jackyjfhu added the kind:bug and priority:major labels on Feb 27, 2025
pan3793 (Member) commented Feb 27, 2025

DPP requires the data source to implement either SupportsRuntimeFiltering or SupportsRuntimeV2Filtering; unfortunately, KSHC does not.

This is a feature request, not a bug.
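
For reference, a minimal sketch of what opting into Spark's runtime filtering could look like on a partition-aware scan. The class and field names below are hypothetical, not KSHC's actual code; only the imported Spark types (SupportsRuntimeFiltering, NamedReference, Filter, In) are real.

// Hypothetical sketch, not KSHC's implementation: a partitioned scan that
// opts into Spark's runtime (dynamic partition pruning) filtering.
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType

class PartitionedHiveScan(
    dataSchema: StructType,
    partitionColumns: Seq[String],
    private var partitions: Seq[Map[String, String]]) // one partition-spec map per partition
  extends Scan with SupportsRuntimeFiltering {

  override def readSchema(): StructType = dataSchema

  // Advertise the partition columns as runtime-filterable; Spark's
  // PartitionPruning rule only plants a DPP subquery when the join key
  // references one of these attributes.
  override def filterAttributes(): Array[NamedReference] =
    partitionColumns.map(Expressions.column).toArray

  // Invoked at runtime with values collected from the other side of the
  // join; drop partitions that cannot match before planning input splits.
  override def filter(filters: Array[Filter]): Unit = filters.foreach {
    case In(attr, values) if partitionColumns.contains(attr) =>
      val allowed = values.map(_.toString).toSet
      partitions = partitions.filter(p => p.get(attr).forall(allowed.contains))
    case _ => // ignore filters that do not touch partition columns
  }
}

In practice, the connector would also need to recompute its input splits from the surviving partitions after filter() is called.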

pan3793 added the kind:feature label and removed the kind:bug label on Feb 27, 2025
pan3793 changed the title from "[Bug] when I use kyuubi's connector, Spark dynamic partitioning cannot be used" to "Spark Hive connector supports dynamic partition pruning" on Feb 27, 2025
jackyjfhu (Author) commented:

> DPP requires the data source to implement either SupportsRuntimeFiltering or SupportsRuntimeV2Filtering; unfortunately, KSHC does not.
> This is a feature request, not a bug.

OK, thanks for the reply.

Another question: when I execute the SQL ANALYZE TABLE COMPUTE STATISTICS FOR ALL COLUMNS, I encounter the following error:

[screenshot: error message]

Through KSHC, Spark resolves the table as a V2 table.
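
A quick way to confirm that (catalog, database, and table names below are placeholders, not the real ones) is to look at the analyzed plan:

// Sketch only: a table loaded through the KSHC HiveTableCatalog shows up as a
// DataSourceV2Relation in the analyzed plan rather than a V1 HiveTableRelation.
val analyzed = spark.table("kyuubi_cos8lcu4e_hive.tpcds.store_sales").queryExecution.analyzed
println(analyzed.treeString)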

