[SPARK-48530][SQL] Support for local variables in SQL Scripting #49445
Changes from 26 commits
@@ -19,9 +19,11 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces}
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.util.ArrayImplicits._
 
@@ -35,10 +37,28 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     // We only support temp variables for now and the system catalog is not properly implemented
     // yet. We need to resolve `UnresolvedIdentifier` for variable commands specially.
     case c @ CreateVariable(UnresolvedIdentifier(nameParts, _), _, _) =>
-      val resolved = resolveVariableName(nameParts)
+      // From scripts we can only create local variables, which must be unqualified,
+      // and must not be DECLARE OR REPLACE.
+      if (catalogManager.scriptingLocalVariableManager.isDefined) {
+        // TODO [SPARK-50785]: Uncomment this when For Statement starts properly using local vars.
+        // if (c.replace) {
+        //   throw new AnalysisException(
+        //     "INVALID_VARIABLE_DECLARATION.REPLACE_LOCAL_VARIABLE",
+        //     Map("varName" -> toSQLId(nameParts))
+        //   )
+        // }
+
+        if (nameParts.length != 1) {
+          throw new AnalysisException(
+            "INVALID_VARIABLE_DECLARATION.QUALIFIED_LOCAL_VARIABLE",
+            Map("varName" -> toSQLId(nameParts)))
+        }
+      }
+
+      val resolved = resolveCreateVariableName(nameParts)
       c.copy(name = resolved)
     case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) =>
-      val resolved = resolveVariableName(nameParts)
+      val resolved = resolveDropVariableName(nameParts)
       d.copy(name = resolved)
 
     case UnresolvedIdentifier(nameParts, allowTemp) =>
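
The DECLARE check in the hunk above can be modeled in isolation. The following is a minimal sketch, not Spark code: `SketchAnalysisException`, `quoteId`, and `validateDeclare` are hypothetical stand-ins, and the `inScript` flag is assumed to play the role of `catalogManager.scriptingLocalVariableManager.isDefined`.

```scala
// Simplified stand-in for Spark's AnalysisException; not the real class.
final case class SketchAnalysisException(errorClass: String, params: Map[String, String])
  extends Exception(s"$errorClass: $params")

object DeclareCheckSketch {
  // Hypothetical helper: wraps each name part in backticks.
  // Spark's toSQLId may differ in details such as escaping.
  def quoteId(parts: Seq[String]): String = parts.map(p => s"`$p`").mkString(".")

  /**
   * Validates the target of a DECLARE.
   * `inScript` models scriptingLocalVariableManager.isDefined (an assumption of this sketch).
   */
  def validateDeclare(nameParts: Seq[String], inScript: Boolean): Unit = {
    // Inside a script only local variables can be declared, and they must be unqualified.
    if (inScript && nameParts.length != 1) {
      throw SketchAnalysisException(
        "INVALID_VARIABLE_DECLARATION.QUALIFIED_LOCAL_VARIABLE",
        Map("varName" -> quoteId(nameParts)))
    }
  }
}
```

Outside a script the same qualified name passes this check and is left to the later name resolution step.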
@@ -73,28 +93,39 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     }
   }
 
-  private def resolveVariableName(nameParts: Seq[String]): ResolvedIdentifier = {
-    def ident: Identifier = Identifier.of(Array(CatalogManager.SESSION_NAMESPACE), nameParts.last)
-    if (nameParts.length == 1) {
+  private def resolveCreateVariableName(nameParts: Seq[String]): ResolvedIdentifier = {
+    val ident = catalogManager.scriptingLocalVariableManager
+      .getOrElse(catalogManager.tempVariableManager)
+      .createIdentifier(nameParts.last)

An [...]

Sorry, what do you mean by "already created"? Here we create the identifier, which depends on the scripting context in the case of local variables, and then we return it in [...]

+
+    resolveVariableName(nameParts, ident)
+  }
+
+  private def resolveDropVariableName(nameParts: Seq[String]): ResolvedIdentifier = {
+    // Only session variables can be dropped, so catalogManager.scriptingLocalVariableManager
+    // is not checked in the case of DropVariable.

So even if there is a name conflict between session and local variables, DROP VARIABLE always drops the session variable?

Yes, DROP will never consider local variables. It only works on session variables, per spec.

cc @srielau, can you chime in here? My current understanding is that we don't want to allow dropping local variables, and that makes total sense. However, the current agreement/implementation completely ignores local variables when resolving variable references in the [...]

input: [...]
output: 3

input: [...]
result: session var will be dropped (x = 1)

Here, we can see that the meaning of [...]

Anyway, I think that the second example should throw an exception stating that local variables cannot be dropped (because [...]

A simple example of why this might be important: a customer may want to drop a local variable (unaware that it's not allowed, or by mistake), and instead of getting an exception, the session variable would be silently dropped.

+1 to fail explicitly.

@srielau Could you provide your input here?

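
To make the two behaviors debated in this thread concrete, here is a minimal Scala sketch. It is a toy model, not Spark code and not a reconstruction of the examples lost from the comment above: variables are plain name sets, `DropResult` and both function names are hypothetical, and case-insensitive name matching is ignored.

```scala
object DropVariableSketch {
  sealed trait DropResult
  case object DroppedSessionVariable extends DropResult
  case object NoSuchVariable extends DropResult
  final case class Rejected(reason: String) extends DropResult

  /** Behavior described in the thread: DROP only ever consults session variables. */
  def dropIgnoringLocals(name: String, session: Set[String], locals: Set[String]): DropResult =
    // `locals` is deliberately unused: a same-named local variable has no effect here.
    if (session.contains(name)) DroppedSessionVariable else NoSuchVariable

  /** Alternative raised by reviewers: fail when the name matches a visible local variable. */
  def dropFailingOnLocals(name: String, session: Set[String], locals: Set[String]): DropResult =
    if (locals.contains(name)) Rejected(s"local variable $name cannot be dropped")
    else dropIgnoringLocals(name, session, locals)
}
```

With `x` present in both sets, the first function reports that the session variable was dropped, while the second rejects the statement. That difference is the silent-drop concern raised above.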
+    val ident = catalogManager.tempVariableManager.createIdentifier(nameParts.last)
+    resolveVariableName(nameParts, ident)
+  }
+
+  private def resolveVariableName(
+      nameParts: Seq[String],
+      ident: Identifier): ResolvedIdentifier = nameParts.length match {
+    case 1 => ResolvedIdentifier(FakeSystemCatalog, ident)
+
+    // On declare variable, local variables support only unqualified names.
+    // On drop variable, local variables are not supported at all.
+    case 2 if nameParts.head.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) =>
       ResolvedIdentifier(FakeSystemCatalog, ident)
-    } else if (nameParts.length == 2) {
-      if (nameParts.head.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE)) {
-        ResolvedIdentifier(FakeSystemCatalog, ident)
-      } else {
-        throw QueryCompilationErrors.unresolvedVariableError(
-          nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE))
-      }
-    } else if (nameParts.length == 3) {
-      if (nameParts(0).equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) &&
-        nameParts(1).equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE)) {
-        ResolvedIdentifier(FakeSystemCatalog, ident)
-      } else {
-        throw QueryCompilationErrors.unresolvedVariableError(
-          nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE))
-      }
-    } else {
+
+    // When there are 3 nameParts the variable must be a fully qualified session variable
+    // i.e. "system.session.<varName>"
+    case 3 if nameParts(0).equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) &&
+      nameParts(1).equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) =>
+      ResolvedIdentifier(FakeSystemCatalog, ident)
+
+    case _ =>
       throw QueryCompilationErrors.unresolvedVariableError(
-        nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE))
-    }
+        nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, ident.namespace().head))
   }
 }
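
The qualification rules that the rewritten `resolveVariableName` encodes can be exercised in isolation. Below is a minimal sketch, not the Spark implementation: it assumes the catalog and namespace constants are the literals `system` and `session` (as the `system.session.<varName>` comment in the diff suggests), and it returns an `Either` instead of building a `ResolvedIdentifier` or throwing.

```scala
object ResolveNameSketch {
  // Assumed values standing in for CatalogManager.SYSTEM_CATALOG_NAME and SESSION_NAMESPACE.
  val SystemCatalog = "system"
  val SessionNamespace = "session"

  /** Returns Right(variable name) for an accepted qualification, Left(message) otherwise. */
  def resolve(nameParts: Seq[String]): Either[String, String] = nameParts match {
    // Unqualified name: always accepted.
    case Seq(name) => Right(name)
    // Two parts: the qualifier must be the session namespace (case-insensitive).
    case Seq(ns, name) if ns.equalsIgnoreCase(SessionNamespace) => Right(name)
    // Three parts: must be fully qualified as system.session.<varName>.
    case Seq(cat, ns, name)
        if cat.equalsIgnoreCase(SystemCatalog) && ns.equalsIgnoreCase(SessionNamespace) =>
      Right(name)
    // Anything else is unresolved, mirroring the `case _` branch in the diff.
    case _ => Left(s"unresolved variable: ${nameParts.mkString(".")}")
  }
}
```

Matching on the shape of the sequence with guards gives the same flat structure as the `nameParts.length match` in the diff, with a single fallback branch for every rejected form.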

nit: `session` can't be a label name, so we can directly look up local variables if `nameParts.length <= 2` and then fall back to session variable lookup.

I think it is fine to leave the explicit checks here. It's more performant this way, because local variable lookup iterates through all frames and scopes, and there's no reason to do that if the name is qualified with `session` or `system.session`. Keeping the checks explicit is also safer if we make changes in the future.

Good point. How about this

A local variable can only be qualified by one label, right? See https://github.com/apache/spark/pull/49445/files#r1913307900
Yeah, we already have this check in the line
.filter(_ => namePartsCaseAdjusted.nonEmpty && namePartsCaseAdjusted.length <= 2)

Then the code here can be simplified.

Simplified to a single filter.
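
The lookup order discussed in this thread can be sketched as follows. This is a toy model under stated assumptions, not the code from the PR: local variables live in a flat map keyed by `Seq(name)` or `Seq(label, name)` rather than in frames and scopes, session variables live in a map keyed by name, case adjustment is ignored, and every identifier below is hypothetical.

```scala
object LookupOrderSketch {
  sealed trait Found
  final case class Local(value: Int) extends Found
  final case class Session(value: Int) extends Found

  /** True for session.<name> and system.session.<name> (case-insensitive qualifiers). */
  def isSessionQualified(parts: Seq[String]): Boolean = parts match {
    case Seq(ns, _) => ns.equalsIgnoreCase("session")
    case Seq(cat, ns, _) => cat.equalsIgnoreCase("system") && ns.equalsIgnoreCase("session")
    case _ => false
  }

  def lookup(
      nameParts: Seq[String],
      locals: Map[Seq[String], Int],
      session: Map[String, Int]): Option[Found] = {
    // A single filter decides whether the local lookup runs at all: the name must have
    // one or two parts and must not be explicitly qualified as a session variable.
    val local: Option[Found] = Some(nameParts)
      .filter(p => p.nonEmpty && p.length <= 2 && !isSessionQualified(p))
      .flatMap(p => locals.get(p))
      .map(v => Local(v))

    // Fall back to session variables for unqualified or session-qualified names.
    local.orElse {
      if (nameParts.length == 1 || isSessionQualified(nameParts)) {
        session.get(nameParts.last).map(v => Session(v))
      } else {
        None
      }
    }
  }
}
```

In this model a name such as `session.x` or `system.session.x` never reaches the local map, which is the saving the explicit checks were kept for, while an unqualified `x` prefers the local variable and falls back to the session one.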