Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34944][SQL][TESTS] Replace bigint with int for web_returns and store_returns in TPCDS tests to employ correct data type #32037

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,10 @@ jobs:
restore-keys: |
tpcds-
- name: Checkout TPC-DS (SF=1) generated data repository
if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to always fetch the data?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It failed the first time, so I turned it off, shall I turn it on now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

13:37:51.854 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 315.0 (TID 204)
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
	at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:45)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToInt(ParquetDictionary.java:38)
	at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:298)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:344)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Do we need to disable the cache and merge this PR, then enable the cache again? Still seeing the above error if cache on

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the mechanism to refresh the cache?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's based on the catch key here https://github.com/apache/spark/pull/32037/files#diff-48c0ee97c53013d18d6bbae44648f7fab9af2e0bf5b0dc1ca761e18ec5c478f2R484, which is based on the cache data itself(seems a mistake we made before?), and seems that it never changed if we have no opportunity to change the cache data entirely by disabling it. Or let me try to modify the key to rely on the lastest revision of maropu/tpcds-sf-1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rely on the lastest revision of maropu/tpcds-sf-1

SGTM

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may still hit the cache by fuzzy matching of restore-keys which seems useless for this case. if fails again, I will remove it too and try again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it looks better to check if it can work correctly without restore-keys.

uses: actions/checkout@v2
with:
repository: maropu/spark-tpcds-sf-1
ref: 6b660a53091bd6d23cbe58b0f09aae08e71cc667
ref: 556111e35d400f56cb0625dc16e9063d54628320
path: ./tpcds-sf-1
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(sr_returned_date_sk#4), dynamicpruningexpression(sr_returned_date_sk#4 IN dynamicpruning#5)]
PushedFilters: [IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)]
ReadSchema: struct<sr_customer_sk:bigint,sr_store_sk:bigint,sr_return_amt:decimal(7,2)>
ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>

(2) ColumnarToRow [codegen id : 2]
Input [4]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
Expand Down Expand Up @@ -87,7 +87,7 @@ Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))

(9) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [sr_returned_date_sk#4]
Right keys [1]: [cast(d_date_sk#6 as bigint)]
Right keys [1]: [d_date_sk#6]
Join condition: None

(10) Project [codegen id : 2]
Expand Down Expand Up @@ -122,7 +122,7 @@ Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(sr_returned_date_sk#19), dynamicpruningexpression(sr_returned_date_sk#19 IN dynamicpruning#5)]
PushedFilters: [IsNotNull(sr_store_sk)]
ReadSchema: struct<sr_customer_sk:bigint,sr_store_sk:bigint,sr_return_amt:decimal(7,2)>
ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>

(16) ColumnarToRow [codegen id : 4]
Input [4]: [sr_customer_sk#16, sr_store_sk#17, sr_return_amt#18, sr_returned_date_sk#19]
Expand All @@ -136,7 +136,7 @@ Output [1]: [d_date_sk#20]

(19) BroadcastHashJoin [codegen id : 4]
Left keys [1]: [sr_returned_date_sk#19]
Right keys [1]: [cast(d_date_sk#20 as bigint)]
Right keys [1]: [d_date_sk#20]
Join condition: None

(20) Project [codegen id : 4]
Expand Down Expand Up @@ -185,7 +185,7 @@ Condition : isnotnull((avg(ctr_total_return) * 1.2)#31)

(28) BroadcastExchange
Input [2]: [(avg(ctr_total_return) * 1.2)#31, ctr_store_sk#14#32]
Arguments: HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [id=#33]
Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)),false), [id=#33]

(29) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [ctr_store_sk#14]
Expand Down Expand Up @@ -220,7 +220,7 @@ Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))

(36) BroadcastHashJoin [codegen id : 8]
Left keys [1]: [ctr_store_sk#14]
Right keys [1]: [cast(s_store_sk#34 as bigint)]
Right keys [1]: [s_store_sk#34]
Join condition: None

(37) Project [codegen id : 8]
Expand Down Expand Up @@ -251,15 +251,15 @@ Condition : isnotnull(c_customer_sk#38)

(43) Exchange
Input [2]: [c_customer_sk#38, c_customer_id#39]
Arguments: hashpartitioning(cast(c_customer_sk#38 as bigint), 5), ENSURE_REQUIREMENTS, [id=#40]
Arguments: hashpartitioning(c_customer_sk#38, 5), ENSURE_REQUIREMENTS, [id=#40]

(44) Sort [codegen id : 11]
Input [2]: [c_customer_sk#38, c_customer_id#39]
Arguments: [cast(c_customer_sk#38 as bigint) ASC NULLS FIRST], false, 0
Arguments: [c_customer_sk#38 ASC NULLS FIRST], false, 0

(45) SortMergeJoin [codegen id : 12]
Left keys [1]: [ctr_customer_sk#13]
Right keys [1]: [cast(c_customer_sk#38 as bigint)]
Right keys [1]: [c_customer_sk#38]
Join condition: None

(46) Project [codegen id : 12]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ TakeOrderedAndProject [c_customer_id]
Project [ctr_customer_sk]
BroadcastHashJoin [ctr_store_sk,s_store_sk]
Project [ctr_customer_sk,ctr_store_sk]
BroadcastHashJoin [ctr_store_sk,ctr_store_skL,ctr_total_return,(avg(ctr_total_return) * 1.2)]
BroadcastHashJoin [ctr_store_sk,ctr_store_sk,ctr_total_return,(avg(ctr_total_return) * 1.2)]
Filter [ctr_total_return]
HashAggregate [sr_customer_sk,sr_store_sk,sum] [sum(UnscaledValue(sr_return_amt)),ctr_customer_sk,ctr_store_sk,ctr_total_return,sum]
InputAdapter
Expand All @@ -38,7 +38,7 @@ TakeOrderedAndProject [c_customer_id]
BroadcastExchange #4
WholeStageCodegen (6)
Filter [(avg(ctr_total_return) * 1.2)]
HashAggregate [ctr_store_sk,sum,count] [avg(ctr_total_return),(avg(ctr_total_return) * 1.2),ctr_store_skL,sum,count]
HashAggregate [ctr_store_sk,sum,count] [avg(ctr_total_return),(avg(ctr_total_return) * 1.2),ctr_store_sk,sum,count]
InputAdapter
Exchange [ctr_store_sk] #5
WholeStageCodegen (5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(sr_returned_date_sk#4), dynamicpruningexpression(sr_returned_date_sk#4 IN dynamicpruning#5)]
PushedFilters: [IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)]
ReadSchema: struct<sr_customer_sk:bigint,sr_store_sk:bigint,sr_return_amt:decimal(7,2)>
ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>

(2) ColumnarToRow [codegen id : 2]
Input [4]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
Expand Down Expand Up @@ -84,7 +84,7 @@ Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))

(9) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [sr_returned_date_sk#4]
Right keys [1]: [cast(d_date_sk#6 as bigint)]
Right keys [1]: [d_date_sk#6]
Join condition: None

(10) Project [codegen id : 2]
Expand Down Expand Up @@ -119,7 +119,7 @@ Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(sr_returned_date_sk#19), dynamicpruningexpression(sr_returned_date_sk#19 IN dynamicpruning#5)]
PushedFilters: [IsNotNull(sr_store_sk)]
ReadSchema: struct<sr_customer_sk:bigint,sr_store_sk:bigint,sr_return_amt:decimal(7,2)>
ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>

(16) ColumnarToRow [codegen id : 4]
Input [4]: [sr_customer_sk#16, sr_store_sk#17, sr_return_amt#18, sr_returned_date_sk#19]
Expand All @@ -133,7 +133,7 @@ Output [1]: [d_date_sk#20]

(19) BroadcastHashJoin [codegen id : 4]
Left keys [1]: [sr_returned_date_sk#19]
Right keys [1]: [cast(d_date_sk#20 as bigint)]
Right keys [1]: [d_date_sk#20]
Join condition: None

(20) Project [codegen id : 4]
Expand Down Expand Up @@ -182,7 +182,7 @@ Condition : isnotnull((avg(ctr_total_return) * 1.2)#31)

(28) BroadcastExchange
Input [2]: [(avg(ctr_total_return) * 1.2)#31, ctr_store_sk#14#32]
Arguments: HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [id=#33]
Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)),false), [id=#33]

(29) BroadcastHashJoin [codegen id : 9]
Left keys [1]: [ctr_store_sk#14]
Expand Down Expand Up @@ -217,7 +217,7 @@ Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))

(36) BroadcastHashJoin [codegen id : 9]
Left keys [1]: [ctr_store_sk#14]
Right keys [1]: [cast(s_store_sk#34 as bigint)]
Right keys [1]: [s_store_sk#34]
Join condition: None

(37) Project [codegen id : 9]
Expand All @@ -244,7 +244,7 @@ Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)

(42) BroadcastHashJoin [codegen id : 9]
Left keys [1]: [ctr_customer_sk#13]
Right keys [1]: [cast(c_customer_sk#37 as bigint)]
Right keys [1]: [c_customer_sk#37]
Join condition: None

(43) Project [codegen id : 9]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ TakeOrderedAndProject [c_customer_id]
Project [ctr_customer_sk]
BroadcastHashJoin [ctr_store_sk,s_store_sk]
Project [ctr_customer_sk,ctr_store_sk]
BroadcastHashJoin [ctr_store_sk,ctr_store_skL,ctr_total_return,(avg(ctr_total_return) * 1.2)]
BroadcastHashJoin [ctr_store_sk,ctr_store_sk,ctr_total_return,(avg(ctr_total_return) * 1.2)]
Filter [ctr_total_return]
HashAggregate [sr_customer_sk,sr_store_sk,sum] [sum(UnscaledValue(sr_return_amt)),ctr_customer_sk,ctr_store_sk,ctr_total_return,sum]
InputAdapter
Expand All @@ -32,7 +32,7 @@ TakeOrderedAndProject [c_customer_id]
BroadcastExchange #3
WholeStageCodegen (6)
Filter [(avg(ctr_total_return) * 1.2)]
HashAggregate [ctr_store_sk,sum,count] [avg(ctr_total_return),(avg(ctr_total_return) * 1.2),ctr_store_skL,sum,count]
HashAggregate [ctr_store_sk,sum,count] [avg(ctr_total_return),(avg(ctr_total_return) * 1.2),ctr_store_sk,sum,count]
InputAdapter
Exchange [ctr_store_sk] #4
WholeStageCodegen (5)
Expand Down
Loading