Skip to content

Commit

Permalink
#963 Unimplemented PPL Sort Syntax (#994)
Browse files Browse the repository at this point in the history
* Initial implementation

Signed-off-by: currantw <[email protected]>

* Revert `cast` changes.

Signed-off-by: currantw <[email protected]>

* Revert changes, and update to mark syntax as deprecated and add TODOs.

Signed-off-by: currantw <[email protected]>

* Revert some unintended changes, minor doc change.

Signed-off-by: currantw <[email protected]>

* Address review comments, including reverting some changes.

Signed-off-by: currantw <[email protected]>

* Update sample queries for correctness

Signed-off-by: currantw <[email protected]>

* Update to use `eval` for casting

Signed-off-by: currantw <[email protected]>

* Update to mark sort syntax as "unimplemented" rather than "deprecated".

Signed-off-by: currantw <[email protected]>

* Update integration test, clarify TODO comments.

Signed-off-by: currantw <[email protected]>

* Clarify test name, use multi-line string.

Signed-off-by: currantw <[email protected]>

* Fix failing `FlintSparkPPLParseITSuite`

Signed-off-by: currantw <[email protected]>

* Remove note completely

Signed-off-by: currantw <[email protected]>

---------

Signed-off-by: currantw <[email protected]>
  • Loading branch information
currantw authored Jan 9, 2025
1 parent e42c535 commit 6610253
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 62 deletions.
2 changes: 1 addition & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ source = table | where ispresent(a) |
- `source=accounts | parse email '.+@(?<host>.+)' | stats count() by host`
- `source=accounts | parse email '.+@(?<host>.+)' | eval eval_result=1 | fields host, eval_result`
- `source=accounts | parse email '.+@(?<host>.+)' | where age > 45 | sort - age | fields age, email, host`
- `source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street`
- `source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | eval streetNumberInt = cast(streetNumber as integer) | where streetNumberInt > 500 | sort streetNumberInt | fields streetNumber, street`
- Limitation: [see limitations](ppl-parse-command.md#limitations)

#### **Grok**
Expand Down
2 changes: 1 addition & 1 deletion docs/ppl-lang/ppl-parse-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ The example shows how to sort street numbers that are higher than 500 in ``addre

PPL query:

os> source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where cast(streetNumber as int) > 500 | sort num(streetNumber) | fields streetNumber, street ;
os> source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | eval streetNumberInt = cast(streetNumber as integer) | where streetNumberInt > 500 | sort streetNumberInt | fields streetNumber, street ;
fetched rows / total rows = 3/3
+----------------+----------------+
| streetNumber | street |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@

package org.opensearch.flint.spark.ppl

import scala.reflect.internal.Reporter.Count

import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Cast, Descending, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, LogicalPlan, Project, Sort}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.IntegerType

class FlintSparkPPLParseITSuite
extends QueryTest
Expand Down Expand Up @@ -214,10 +211,16 @@ class FlintSparkPPLParseITSuite
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test parse email & host expressions including cast and sort commands") {
val frame = sql(s"""
| source = $testTable| parse street_address '(?<streetNumber>\\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street
| """.stripMargin)
test("test parse street number & street expressions including cast and sort commands") {

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
val query = s"source = $testTable | " +
"parse street_address '(?<streetNumber>\\d+) (?<street>.+)' | " +
"eval streetNumberInt = cast(streetNumber as integer) | " +
"where streetNumberInt > 500 | " +
"sort streetNumberInt | " +
"fields streetNumber, street"
val frame = sql(query)
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
Expand All @@ -233,36 +236,36 @@ class FlintSparkPPLParseITSuite
// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical

val addressAttribute = UnresolvedAttribute("street_address")
val streetAddressAttribute = UnresolvedAttribute("street_address")
val streetNumberAttribute = UnresolvedAttribute("streetNumber")
val streetAttribute = UnresolvedAttribute("street")
val streetNumberIntAttribute = UnresolvedAttribute("streetNumberInt")

val streetNumberExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("1")),
"streetNumber")()
val regexLiteral = Literal("(?<streetNumber>\\d+) (?<street>.+)")
val streetNumberExpression =
Alias(RegExpExtract(streetAddressAttribute, regexLiteral, Literal("1")), "streetNumber")()
val streetExpression =
Alias(RegExpExtract(streetAddressAttribute, regexLiteral, Literal("2")), "street")()

val streetExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("2")),
"street")()
val castExpression = Cast(streetNumberAttribute, IntegerType)

val expectedPlan = Project(
Seq(streetNumberAttribute, streetAttribute),
Sort(
Seq(SortOrder(streetNumberAttribute, Ascending, NullsFirst, Seq.empty)),
Seq(SortOrder(streetNumberIntAttribute, Ascending, NullsFirst, Seq.empty)),
global = true,
Filter(
GreaterThan(streetNumberAttribute, Literal(500)),
GreaterThan(streetNumberIntAttribute, Literal(500)),
Project(
Seq(addressAttribute, streetNumberExpression, streetExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))))
Seq(UnresolvedStar(None), Alias(castExpression, "streetNumberInt")()),
Project(
Seq(
streetAddressAttribute,
streetNumberExpression,
streetExpression,
UnresolvedStar(None)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))))))

assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

}
4 changes: 2 additions & 2 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ DATASOURCES: 'DATASOURCES';
USING: 'USING';
WITH: 'WITH';

// FIELD KEYWORDS
// SORT FIELD KEYWORDS
// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
AUTO: 'AUTO';
STR: 'STR';
IP: 'IP';
NUM: 'NUM';


// FIELDSUMMARY keywords
FIELDSUMMARY: 'FIELDSUMMARY';
INCLUDEFIELDS: 'INCLUDEFIELDS';
Expand Down
11 changes: 7 additions & 4 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ sortField

sortFieldExpression
: fieldExpression

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
| AUTO LT_PRTHS fieldExpression RT_PRTHS
| STR LT_PRTHS fieldExpression RT_PRTHS
| IP LT_PRTHS fieldExpression RT_PRTHS
Expand Down Expand Up @@ -1110,10 +1112,6 @@ keywordsCanBeId
| INDEX
| DESC
| DATASOURCES
| AUTO
| STR
| IP
| NUM
| FROM
| PATTERN
| NEW_FIELD
Expand Down Expand Up @@ -1196,4 +1194,9 @@ keywordsCanBeId
| BETWEEN
| CIDRMATCH
| trendlineType
// SORT FIELD KEYWORDS
| AUTO
| STR
| IP
| NUM
;
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.ppl.utils.ArgumentFactory;
import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator;
Expand Down Expand Up @@ -184,6 +182,8 @@ public UnresolvedExpression visitWcFieldExpression(OpenSearchPPLParser.WcFieldEx

@Override
public UnresolvedExpression visitSortField(OpenSearchPPLParser.SortFieldContext ctx) {

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
return new Field((QualifiedName)
visit(ctx.sortFieldExpression().fieldExpression().qualifiedName()),
ArgumentFactory.getArgumentList(ctx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import org.scalatest.matchers.should.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.ScalaReflection.universe.Star
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NamedExpression, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Cast, Coalesce, Descending, GreaterThan, Literal, NamedExpression, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Project, Sort}
import org.apache.spark.sql.types.IntegerType

class PPLLogicalPlanParseTranslatorTestSuite
extends SparkFunSuite
Expand Down Expand Up @@ -120,43 +121,49 @@ class PPLLogicalPlanParseTranslatorTestSuite
assert(compareByString(expectedPlan) === compareByString(logPlan))
}

test("test parse email & host expressions including cast and sort commands") {
test("test parse street number & street expressions including cast and sort commands") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=t | parse address '(?<streetNumber>\\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street"),
context)

// TODO #963: Implement 'num', 'str', and 'ip' sort syntax
val query =
"source=t" +
" | parse address '(?<streetNumber>\\d+) (?<street>.+)'" +
" | eval streetNumberInt = cast(streetNumber as integer)" +
" | where streetNumberInt > 500" +
" | sort streetNumberInt" +
" | fields streetNumber, street"

val logPlan = planTransformer.visit(plan(pplParser, query), context)

val addressAttribute = UnresolvedAttribute("address")
val streetNumberAttribute = UnresolvedAttribute("streetNumber")
val streetAttribute = UnresolvedAttribute("street")
val streetNumberIntAttribute = UnresolvedAttribute("streetNumberInt")

val streetNumberExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("1")),
"streetNumber")()
val regexLiteral = Literal("(?<streetNumber>\\d+) (?<street>.+)")
val streetNumberExpression =
Alias(RegExpExtract(addressAttribute, regexLiteral, Literal("1")), "streetNumber")()
val streetExpression =
Alias(RegExpExtract(addressAttribute, regexLiteral, Literal("2")), "street")()

val streetExpression = Alias(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("2")),
"street")()
val castExpression = Cast(streetNumberAttribute, IntegerType)

val expectedPlan = Project(
Seq(streetNumberAttribute, streetAttribute),
Sort(
Seq(SortOrder(streetNumberAttribute, Ascending, NullsFirst, Seq.empty)),
Seq(SortOrder(streetNumberIntAttribute, Ascending, NullsFirst, Seq.empty)),
global = true,
Filter(
GreaterThan(streetNumberAttribute, Literal(500)),
GreaterThan(streetNumberIntAttribute, Literal(500)),
Project(
Seq(addressAttribute, streetNumberExpression, streetExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("t"))))))
Seq(UnresolvedStar(None), Alias(castExpression, "streetNumberInt")()),
Project(
Seq(
addressAttribute,
streetNumberExpression,
streetExpression,
UnresolvedStar(None)),
UnresolvedRelation(Seq("t")))))))

assert(compareByString(expectedPlan) === compareByString(logPlan))
}
Expand Down

0 comments on commit 6610253

Please sign in to comment.