
Merge SQL failing with ParseException #70

Closed
srinikvv opened this issue Jun 22, 2020 · 32 comments · May be fixed by #81

@srinikvv

srinikvv commented Jun 22, 2020

Hi Team,
I am trying to perform MERGE on Hive ACID tables using Qubole Spark-ACID, but I am running into the errors below:

I created an assembly jar from the latest code on master and tried to execute a MERGE statement using spark.sql from spark-shell:

/spark-2.4.3-bin-hadoop2.7/bin/spark-shell --jars /sandbox/spark-acid-assembly-0.5.0.jar --conf spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension --conf "spark.hadoop.yarn.timeline-service.enabled=false"
scala> spark.sql("MERGE INTO TARGET_TABLE AS T  USING SOURCE_TABLE AS S ON t.key_col= s.key_col WHEN MATCHED THEN UPDATE SET                    col_1 = s.col_1, col_2 = s.col_2 WHEN NOT MATCHED THEN INSERT VALUES ( s.key_col, s.col_1, s.col_2)")

org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'MERGE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)

== SQL ==
MERGE INTO TARGET_TABLE AS T  USING SOURCE_TABLE AS S ON t.key_col= s.key_col WHEN MATCHED THEN UPDATE SET                    col_1 = s.col_1, col_2 = s.col_2 WHEN NOT MATCHED THEN INSERT VALUES ( s.key_col, s.col_1, s.col_2)
^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
  at com.qubole.spark.datasources.hiveacid.sql.execution.SparkAcidSqlParser.parsePlan(SparkAcidSqlParser.scala:56)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
  ... 49 elided
@amoghmargoor
Collaborator

@srinikvv how was this created: /sandbox/spark-acid-assembly-0.5.0.jar ?

amoghmargoor self-assigned this Jun 22, 2020
@srinikvv
Author

@amoghmargoor

@srinikvv how was this created: /sandbox/spark-acid-assembly-0.5.0.jar ?

I built this assembly jar from the latest code on the master branch.

@amoghmargoor
Collaborator

amoghmargoor commented Jun 22, 2020

@srinikvv Can you check with this jar once: https://drive.google.com/file/d/1sqsFzUtyrWvXfE7g_Q8brNHqMnb14Gvv/view?usp=sharing ? And please also share your jar if the one I provided works.

@srinikvv
Author

@amoghmargoor I tried the jar provided and still see the same issue.

@amoghmargoor
Collaborator

amoghmargoor commented Jun 22, 2020

@srinikvv This is working fine; I rechecked. This would only happen if the SQL extension is not getting added on your end. It is difficult for me to figure out why it would not get added, but can you check that angle on your side? A quick way to verify from spark-shell is sketched below.
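
For example, one way to verify that the extension and its parser actually got injected is the minimal, unofficial check below, run from spark-shell on Spark 2.4.x and assuming the shell was started with the --conf flags shown earlier in this issue:

// Is the extension configured in this session at all?
println(spark.conf.get("spark.sql.extensions", "<not set>"))

// Which parser is active? If injection worked, this should print the
// plugin's parser class instead of Spark's default
// org.apache.spark.sql.execution.SparkSqlParser.
println(spark.sessionState.sqlParser.getClass.getName)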

@srinikvv
Author

srinikvv commented Jun 23, 2020

@amoghmargoor I checked; UPDATE is working fine.
Also, the error stack trace in the MERGE case indicates an exception at:


Hence I believe this is not an issue with the SQL extension not being added. Please check and advise.

@amoghmargoor
Collaborator

@srinikvv Can you check what token we get here:

? And is that function returning true or false?

@amoghmargoor
Collaborator

@srinikvv The stack trace you printed does not correspond to the current code either:
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69) at com.qubole.spark.datasources.hiveacid.sql.execution.SparkAcidSqlParser.parsePlan(SparkAcidSqlParser.scala:56)

Line 56 does not have a function call to parsePlan. Please recheck your jars.

@srinikvv
Author

@srinikvv The stack trace you printed does not correspond to the current code either:
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69) at com.qubole.spark.datasources.hiveacid.sql.execution.SparkAcidSqlParser.parsePlan(SparkAcidSqlParser.scala:56)

Line 56 does not have a function call to parsePlan. Please recheck your jars.

@amoghmargoor Below is from the master branch; SparkAcidSqlParser.scala:56 is part of the parsePlan function. Am I missing anything?
[screenshot: SparkAcidSqlParser.scala around line 56]

@amoghmargoor
Collaborator

amoghmargoor commented Jun 23, 2020

@srinikvv
I think you are reading the stack trace wrong. If you look at it carefully, it says:

SparkSession.sql(SparkSession.scala:642) calls SparkAcidSqlParser.parsePlan, i.e. SparkAcidSqlParser.parsePlan(SparkAcidSqlParser.scala:56). That in turn calls AbstractSqlParser.parsePlan, which calls SparkSqlParser.parse, and so on. So according to the stack trace, line 56 of SparkAcidSqlParser.scala should contain a call to parsePlan, but in the current code that line has a throw statement instead, which means you are still using old code.
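
In pseudo-Scala, the delegation looks roughly like the sketch below. This is a simplified illustration only, not the plugin's actual source; everything except the call to Spark's parsePlan is made up for clarity.

import scala.util.control.NonFatal
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Simplified sketch: try the plugin's ACID grammar (MERGE/UPDATE/DELETE)
// first and fall back to Spark's own parser for every other statement.
class DelegatingAcidParser(sparkParser: ParserInterface) {

  def parsePlan(sqlText: String): LogicalPlan =
    try {
      parseAcidCommand(sqlText)          // hypothetical MERGE/UPDATE/DELETE grammar
    } catch {
      case NonFatal(_) =>
        sparkParser.parsePlan(sqlText)   // delegate everything else to Spark's parser
    }

  // Placeholder for the plugin's ACID grammar. In a jar built before MERGE
  // support, MERGE effectively fell through to Spark's parser, which produced
  // the ParseException shown at the top of this issue.
  private def parseAcidCommand(sqlText: String): LogicalPlan =
    throw new UnsupportedOperationException("sketch only")
}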

@amoghmargoor
Collaborator

Hey @srinikvv, were you able to fix your issue?

@srinikvv
Author

srinikvv commented Jun 29, 2020

@amoghmargoor Your suspicion is correct; spark-shell was using a cached/previous version of the spark-acid assembly jar. I was able to test the latest version using pySpark with the conf "spark.driver.userClassPathFirst=true". However, I see the error below while trying to activate the extension:

>>> spark.sparkContext._jvm.com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension() \
...             .apply(spark._jsparkSession.extensions())
********HiveAcidAutoConvertExtension***********
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.apply.
: java.lang.LinkageError: loader constraint violation: when resolving method "org.apache.spark.sql.SparkSessionExtensions.injectResolutionRule(Lscala/Function1;)V" the class loader (instance of org/apache/spark/util/ChildFirstURLClassLoader) of the current class, com/qubole/spark/hiveacid/HiveAcidAutoConvertExtension, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/apache/spark/sql/SparkSessionExtensions, have different Class objects for the type scala/Function1 used in the signature
        at com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension.apply(HiveAcidAutoConvert.scala:72)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

@amoghmargoor
Collaborator

oh ... something like this just works for me:
bin/pyspark --master local[1] --jars /Users/amoghm/src/apache/spark-acid/target/scala-2.11/spark-acid-assembly-0.5.0.jar --conf spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension

@srinikvv
Author

srinikvv commented Jul 3, 2020

@amoghmargoor I was finally able to get this working after downloading the Hadoop 2.8.2 binaries and setting SPARK_DIST_CLASSPATH to reference those libraries.

However, the MERGE syntax only works with ACID tables that are not bucketed; I see the exception below when it is used against bucketed ACID tables:

java.lang.RuntimeException: Unsupported operation type - MERGE for Bucketed table stage.tmp_ren_data_set
        at com.qubole.spark.hiveacid.HiveAcidErrors$.unsupportedOperationTypeBucketedTable(HiveAcidErrors.scala:53)
        at com.qubole.spark.hiveacid.HiveAcidOperationDelegate.checkForSupport(AcidOperationDelegate.scala:471)

As per the documentation on the Apache Hive Confluence, all ACID tables must be bucketed, so a MERGE statement in Spark-ACID without support for bucketed tables is not practically usable.

Do you have any plans in the near future to support MERGE on bucketed tables?

@amoghmargoor
Collaborator

Hi @srinikvv ,
Good to hear you could fix the issue. Is this something we can add to the FAQ or troubleshooting guide to help others?

By the way, regarding bucketed tables: Hive ACID does not require them anymore. That restriction applied to the earlier ACID implementation and no longer holds. We create non-bucketed Hive ACID tables all the time internally at Qubole. If you are using Hive 3.1 onwards you should be good. We did not add bucketed-table support because bucketing hashes are not the same across engines.
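
For reference, a non-bucketed full ACID table on Hive 3.x can be declared roughly like this (illustrative table and column names, typically run from Hive/beeline rather than through the plugin):

CREATE TABLE acid_demo (
  key_col BIGINT,
  col_1   STRING,
  col_2   STRING)
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');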

@srinikvv
Author

srinikvv commented Jul 6, 2020

@amoghmargoor Yes, this can be added to the FAQ or troubleshooting guide. I believe the issue occurs when using Spark binaries compiled with Hadoop libraries older than 2.8.2 (I was using spark-2.4.3-bin-hadoop2.7). As a workaround we downloaded the Hadoop 2.8.2 libraries and set SPARK_DIST_CLASSPATH to refer to them as below, before running the spark-submit command:
export SPARK_DIST_CLASSPATH=$(<path_to_hadoop_2.8.2_bin>/hadoop classpath)

A better approach may be to build Spark 2.4.3 against the Hadoop 2.8.2 binaries. I am currently trying this and will let you know the result.

Also, I tried to perform MERGE on non-bucketed ACID tables and ran into org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException for one specific table; below is the stack trace:

20/07/06 07:58:26 INFO MergeImpl: MERGE Clause 1: UPDATE being executed
20/07/06 07:58:26 INFO TableWriter: Write Operation being performed to table booking_f: UPDATE
20/07/06 07:59:16 WARN TaskSetManager: Lost task 50.0 in stage 10.0 (TID 1753, hdop-stg-wrk2.vmware.com, executor 3): org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): /warehouse/tablespace/managed/hive/base.db/booking_f/delete_delta_0000011_0000011_0000/bucket_02359 path already being created, clientname=DFSClient_NONMAPREDUCE_1168652467_87
        at org.apache.hadoop.ipc.Client.call(Client.java:1475)
        at org.apache.hadoop.ipc.Client.call(Client.java:1412)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy15.create(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy16.create(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
        at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
        at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
        at com.qubole.shaded.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
        at com.qubole.shaded.orc.impl.WriterImpl.<init>(WriterImpl.java:177)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:334)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:456)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.update(OrcRecordUpdater.java:498)
        at com.qubole.spark.hiveacid.writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:275)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$apply$1.apply(TableWriter.scala:146)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$apply$1.apply(TableWriter.scala:146)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:146)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:138)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
This looks similar to issue 41. However, I realized that the MERGE statement is failing while trying to create a delete_delta folder. Can you assist me here?

@srinikvv
Author

srinikvv commented Jul 6, 2020

@amoghmargoor Is there a plan to modify the plugin to work with Spark 3?
We want to explore some features of Spark 3, and it also comes pre-compiled with Hadoop 3.2, which avoids the earlier issue out of the box.
FYI, I already tried initializing the spark-acid plugin with Spark 3, but it fails with the error below:
Py4JJavaError: An error occurred while calling o28.sessionState.
: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
        at com.qubole.spark.datasources.hiveacid.sql.execution.SparkAcidSqlParser.<init>(SparkAcidSqlParser.scala:20)
        at com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension$$anonfun$apply$3.apply(HiveAcidAutoConvert.scala:74)
        at com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension$$anonfun$apply$3.apply(HiveAcidAutoConvert.scala:73)
        at org.apache.spark.sql.SparkSessionExtensions.$anonfun$buildParser$1(SparkSessionExtensions.scala:205)
        at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
        at org.apache.spark.sql.SparkSessionExtensions.buildParser(SparkSessionExtensions.scala:204)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser$lzycompute(BaseSessionStateBuilder.scala:129)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.sqlParser(BaseSessionStateBuilder.scala:128)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:329)
        at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107)
        at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:157)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

@srinikvv
Author

srinikvv commented Jul 6, 2020

I also wanted to report another error I observed while trying to perform MERGE using a Spark table (an RDD-based table created with df.createOrReplaceTempView) as the source.

An error occurred while calling o82.sql.
: java.lang.UnsupportedOperationException: Update Dataframe doesn't have expected schema.
Provided: StructField(rowId,StructType(StructField(writeId,LongType,true), StructField(bucketId,IntegerType,true), StructField(rowId,LongType,true)),true),StructField(booking_id,LongType,true),StructField(edw_create_date,TimestampType,true),StructField(edw_update_date,TimestampType,false)
Expected: StructField(rowId,StructType(StructField(writeId,LongType,true), StructField(bucketId,IntegerType,true), StructField(rowId,LongType,true)),true),StructField(booking_id,LongType,true),StructField(edw_create_date,TimestampType,true),StructField(edw_update_date,TimestampType,true)
        at com.qubole.spark.hiveacid.HiveAcidOperationDelegate.mergeUpdate(AcidOperationDelegate.scala:403)
        at com.qubole.spark.hiveacid.merge.MergeImpl$$anonfun$runMergeOperations$2.apply(MergeImpl.scala:218)
        at com.qubole.spark.hiveacid.merge.MergeImpl$$anonfun$runMergeOperations$2.apply(MergeImpl.scala:210)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
        at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
        at com.qubole.spark.hiveacid.merge.MergeImpl.runMergeOperations(MergeImpl.scala:210)
        at com.qubole.spark.hiveacid.merge.MergeImpl.run(MergeImpl.scala:196)
        at com.qubole.spark.hiveacid.HiveAcidOperationDelegate.merge(AcidOperationDelegate.scala:444)
        at com.qubole.spark.hiveacid.HiveAcidTable$$anonfun$merge$1.apply(HiveAcidTable.scala:199)
        at com.qubole.spark.hiveacid.HiveAcidTable$$anonfun$merge$1.apply(HiveAcidTable.scala:199)
        at com.qubole.spark.hiveacid.HiveTxnWrapper.inTxnRetry$1(HiveAcidTable.scala:330)
        at com.qubole.spark.hiveacid.HiveTxnWrapper.inTxn(HiveAcidTable.scala:355)
        at com.qubole.spark.hiveacid.HiveAcidTable.merge(HiveAcidTable.scala:198)
        at com.qubole.spark.hiveacid.datasource.HiveAcidRelation.merge(HiveAcidRelation.scala:98)
        at com.qubole.spark.datasources.hiveacid.sql.catalyst.plans.command.MergeCommand.run(MergeCommand.scala:51)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)
The error suggests that the schemas of the source and the target do not match. However, the only discrepancy is that the column edw_update_date is nullable in the target but is populated with current_timestamp() in the source, which makes it a non-nullable column in the source schema. That alone should not cause an error while performing the merge.
@amoghmargoor Please let me know if you need me to open another issue for this.

@amoghmargoor
Collaborator

amoghmargoor commented Jul 7, 2020

@srinikvv Regarding org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: is this being thrown on task retries, similar to issue #43? That issue has been fixed now.

Regarding Spark 3, we are yet to start work on it. My guess is we might start looking at it toward the end of July.

Regarding the type issue: yes, writing data from a non-nullable column into a nullable one should be allowed; we will take a look at it. You can get unblocked by explicitly assigning a schema to your source DataFrame that matches the target schema, roughly as sketched below.
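
Something along these lines should work as a stopgap; it is only a sketch, with sourceDf standing in for your own source DataFrame and SOURCE_TABLE for the view referenced in the MERGE statement:

import org.apache.spark.sql.types.StructType

// Rebuild the source DataFrame with every column marked nullable, so a column
// derived from current_timestamp() no longer comes out as non-nullable and
// the merge's internal update DataFrame matches the target's schema.
val nullableSchema = StructType(sourceDf.schema.map(_.copy(nullable = true)))
val alignedSource = spark.createDataFrame(sourceDf.rdd, nullableSchema)
alignedSource.createOrReplaceTempView("SOURCE_TABLE")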

@amoghmargoor
Collaborator

Hey @srinikvv, how are things? Were you able to get MERGE working?

@srinikvv
Author

srinikvv commented Jul 15, 2020

@srinikvv Regarding org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: is this being thrown on task retries, similar to issue #43? That issue has been fixed now.

Regarding Spark 3, we are yet to start work on it. My guess is we might start looking at it toward the end of July.

Regarding the type issue: yes, writing data from a non-nullable column into a nullable one should be allowed; we will take a look at it. You can get unblocked by explicitly assigning a schema to your source DataFrame that matches the target schema.

@amoghmargoor I built a fresh jar from the master branch on 14 Jul and retested the failing MERGE statement; I still get this error, but only for one specific table. Below are the steps I follow:

  1. Perform MERGE on an empty table with the full dataset --> completes successfully (approx. 90M records)
  2. Wait until compaction completes successfully on this table
  3. Perform MERGE using only the changed data (13K records) --> fails during the UPDATE step with the error message below
    I believe multiple tasks are trying to write to the same file; I am not sure how to avoid this error.
    Note: The same MERGE statement runs fine from Hive.
20/07/15 14:55:08 INFO MergeImpl: MERGE requires right outer join between Target and Source.
20/07/15 14:55:08 INFO MergeImpl: MERGE Clause 1: UPDATE being executed
20/07/15 14:55:08 INFO TableWriter: Write Operation being performed to table service_contract_f: UPDATE
20/07/15 14:55:12 WARN TaskSetManager: Lost task 132.0 in stage 45.0 (TID 2473, hdop-stg-wrk12.vmware.com, executor 9): org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): /warehouse/tablespace/managed/hive/base.db/service_contract_f/delete_delta_0000027_0000027_0000/bucket_00004 path already being created, clientname=DFSClient_NONMAPREDUCE_-1263939954_80
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
        at org.apache.hadoop.ipc.Client.call(Client.java:1439)
        at org.apache.hadoop.ipc.Client.call(Client.java:1349)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
        at com.sun.proxy.$Proxy15.create(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:297)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy16.create(Unknown Source)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:269)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1274)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1216)
        at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:477)
        at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:474)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:474)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:415)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1067)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1048)
        at com.qubole.shaded.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
        at com.qubole.shaded.orc.impl.WriterImpl.<init>(WriterImpl.java:177)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:334)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:456)
        at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.update(OrcRecordUpdater.java:498)
        at com.qubole.spark.hiveacid.writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:292)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$apply$1.apply(TableWriter.scala:146)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$apply$1.apply(TableWriter.scala:146)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.foreach(WholeStageCodegenExec.scala:634)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:146)
        at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:138)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Please check this and let me know if I can do anything to fix this issue.

@srinikvv
Author

srinikvv commented Jul 16, 2020

@amoghmargoor When we create ACID tables without bucketing, we see a lot of unevenly sized files in the underlying HDFS storage. Please let me know if we can do a Zoom session so I can show you this issue.

@amoghmargoor
Collaborator

@srinikvv That would be great. @sourabh912 and I are in the PST timezone and would like to join the call. Send some time slots that would work for you. Also feel free to join the group [email protected].

@srinikvv
Author

@amoghmargoor We can meet on 16 July, 9:00 PM to 10:00 PM IST, if that works for you.

@amoghmargoor
Collaborator

Hey @srinikvv, I missed the message above. This timing should work for me on Friday (though I guess it would be Friday night for you). Otherwise I am also available for the call on Monday. Let me know if that works.

@srinikvv
Author

@amoghmargoor Let's meet today. Please use the Zoom link below:
Topic: Veera Venkata Rao's Zoom Meeting
Time: Jul 17, 2020 09:00 PM Mumbai, Kolkata, New Delhi

Join Zoom Meeting
https://VMware.zoom.us/j/98812494211?pwd=cjhqRktESTJ0L0l3elJxaVJ1YVMxUT09
Meeting ID: 988 1249 4211
Password: 494932

@amoghmargoor
Collaborator

amoghmargoor commented Jul 17, 2020 via email

@srinikvv
Author

srinikvv commented Jul 20, 2020

@amoghmargoor We appreciate you taking the time to understand and debug the issue.
As discussed, please let us know the steps to perform the update using an API. Also let us know once you have the debug jar or branch with the appropriate logging statements to help fix the issue.

@amoghmargoor
Collaborator

amoghmargoor commented Jul 20, 2020

@srinikvv I think the existing update API may not be able to support your use case. Can you try compiling the jars from this branch and running with them: https://github.com/amoghmargoor/spark-acid/pull/new/issue-70 ? I have added a few logs. Run it with just 2 executors and send me the logs for the driver and both executors after the failure. I may follow up with a few more such iterations. You can mail me the logs at [email protected] if you don't want to attach them here.

Another question we had: was speculative execution enabled? Thanks.

@srinikvv
Author

@amoghmargoor I shared the logs via email. Regarding speculative execution, we run our jobs with the default value for spark.speculation (which is false in Spark 2.4.3).
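
For reference, the effective value can also be read from the running session; just a minimal check, assuming the usual spark session variable:

// Prints the effective value of spark.speculation; "false" is the
// documented default when it has not been set explicitly.
println(spark.sparkContext.getConf.get("spark.speculation", "false"))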

@amoghmargoor
Collaborator

@srinikvv We have figured out an issue that could explain why this is happening, and I have added the fix here: https://github.com/amoghmargoor/spark-acid/pull/new/issue-70. Can you rebuild the jar from that branch and check whether it fixes your issue?

@sourabh912
Contributor

@srinikvv: Thanks again for reporting the issue. As part of this we identified a few issues (#83 and #93) and fixed them.
I am closing this one. Please feel free to reopen it, or open a new issue if you face any problems.
