Add the SpatialOptimizations injector function (#17)

pomadchin authored Apr 13, 2022
1 parent a38db7f commit fe486d7
Showing 6 changed files with 264 additions and 11 deletions.
@@ -0,0 +1,24 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless.spark.sql

import com.azavea.hiveless.spark.sql.rules.SpatialFilterPushdownRules
import org.apache.spark.sql.SparkSessionExtensions

class SpatialFilterPushdownOptimizations extends (SparkSessionExtensions => Unit) {
def apply(e: SparkSessionExtensions): Unit = e.injectOptimizerRule(_ => SpatialFilterPushdownRules)
}
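
Aside: an application enables these rules the same way the test environment below does, by naming this class in Spark's standard spark.sql.extensions setting. A minimal sketch, assuming a local session (the master and app name are illustrative):

import org.apache.spark.sql.SparkSession

// Spark instantiates the class named in spark.sql.extensions and applies it
// to the session's SparkSessionExtensions before the session is built.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("hiveless-example")
  .config("spark.sql.extensions", "com.azavea.hiveless.spark.sql.SpatialFilterPushdownOptimizations")
  .getOrCreate()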
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-package com.azavea.hiveless.spark.spatial.rules
+package com.azavea.hiveless.spark.sql.rules

import com.azavea.hiveless.spark.rules.syntax._
import com.azavea.hiveless.spatial._
@@ -0,0 +1,32 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless

import com.azavea.hiveless.spark.sql.SpatialFilterPushdownOptimizations
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.scalatest.{BeforeAndAfterAll, Suite}

trait InjectOptimizerTestEnvironment extends SpatialIndexHiveTestEnvironment { self: Suite with BeforeAndAfterAll =>

// disable manual optimizations registration
override def registerOptimizations(sqlContext: SQLContext): Unit = {}

// enable plan optimizations by using the plan injector
override def addSparkConfigProperties(config: SparkConf): Unit =
config.set("spark.sql.extensions", classOf[SpatialFilterPushdownOptimizations].getName)
}
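
For comparison, the manual registration that registerOptimizations would otherwise perform can be sketched as appending the rule to Spark's experimental optimizer hooks; this assumes SpatialFilterPushdownRules is the Rule[LogicalPlan] object injected above:

import com.azavea.hiveless.spark.sql.rules.SpatialFilterPushdownRules
import org.apache.spark.sql.SparkSession

// Hand registration: extraOptimizations is a mutable Seq of optimizer rules
// that Spark applies in a dedicated batch after its built-in ones.
def registerManually(spark: SparkSession): Unit =
  spark.experimental.extraOptimizations ++= Seq(SpatialFilterPushdownRules)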
@@ -16,7 +16,7 @@

package com.azavea.hiveless

-import com.azavea.hiveless.spark.spatial.rules.SpatialFilterPushdownRules
+import com.azavea.hiveless.spark.sql.rules.SpatialFilterPushdownRules
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{BeforeAndAfterAll, Suite}

@@ -0,0 +1,183 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless.spatial.index

import com.azavea.hiveless.{InjectOptimizerTestEnvironment, SpatialIndexTestTables}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.scalatest.funspec.AnyFunSpec

class STIndexInjectorSpec extends AnyFunSpec with InjectOptimizerTestEnvironment with SpatialIndexTestTables {

describe("ST Index functions spec") {
it("ST_Intersects plan should be optimized") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_Intersects(bbox, ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-75.5859375,40.32517767999294],[-75.5859375,43.197167282501276],[-72.41015625,43.197167282501276],[-72.41015625,40.32517767999294],[-75.5859375,40.32517767999294]]]}'))
|""".stripMargin
)

val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|AND ST_Intersects(bbox, ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-75.5859375,40.32517767999294],[-75.5859375,43.197167282501276],[-72.41015625,43.197167282501276],[-72.41015625,40.32517767999294],[-75.5859375,40.32517767999294]]]}'))
|""".stripMargin
)

df.count() shouldBe dfe.count()

// compare optimized plans filters
val dfc = df.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }
val dfec = dfe.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }

dfc shouldBe dfec
}

it("ST_Intersects by Extent plan should be optimized") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_Intersects(bbox, ST_MakeExtent(-75.5859375, 40.3251777, -72.4101562, 43.1971673))
|""".stripMargin
)

val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|""".stripMargin
)

df.count() shouldBe dfe.count()

// compare optimized plans filters
val dfc = df.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }
val dfec = dfe.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }

dfc shouldBe dfec
}

it("ST_Intersects optimization failure (Extent, Extent)") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_Intersects(bbox, bbox)
|""".stripMargin
)

val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|""".stripMargin
)

df.count() shouldBe dfe.count()

// compare optimized plans filters
val dfc = df.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }
val dfec = dfe.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }

dfc shouldNot be(dfec)
}

it("ST_Intersects optimization failure (Extent, Geometry)") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_Intersects(bbox, geom)
|""".stripMargin
)

val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|""".stripMargin
)

df.count() shouldBe dfe.count()

// compare optimized plans filters
val dfc = df.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }
val dfec = dfe.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }

dfc shouldNot be(dfec)
}

it("ST_Intersects optimization failure (Geometry, Geometry)") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_Intersects(geom, geom)
|""".stripMargin
)

val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|""".stripMargin
)

df.count() shouldBe dfe.count()

// compare optimized plans filters
val dfc = df.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }
val dfec = dfe.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }

dfc shouldNot be(dfec)
}

it("ST_Intersects optimization failure (Geometry, Extent)") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_Intersects(geom, bbox)
|""".stripMargin
)

val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|""".stripMargin
)

df.count() shouldBe dfe.count()

// compare optimized plans filters
val dfc = df.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }
val dfec = dfe.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }

dfc shouldNot be(dfec)
}
}
}
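
The assertions above hinge on inspecting the optimized logical plan, and the same check can be reproduced by hand. A minimal sketch, assuming a session with the extension installed and the polygons_parquet table registered:

import org.apache.spark.sql.catalyst.plans.logical.Filter

// Collect the Filter conditions that survive optimization; when the pushdown
// fires, the rewritten bbox range predicates appear alongside ST_Intersects.
val conditions = ssc
  .sql("SELECT * FROM polygons_parquet WHERE ST_Intersects(bbox, ST_MakeExtent(-75.5859375, 40.3251777, -72.4101562, 43.1971673))")
  .queryExecution
  .optimizedPlan
  .collect { case Filter(condition, _) => condition }

conditions.foreach(c => println(c.sql))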
@@ -43,7 +43,10 @@ trait SpatialHiveTestEnvironment extends TestEnvironment { self: Suite with Befo
// function to override optimizations
def registerOptimizations(sqlContext: SQLContext): Unit = {}

-val (warehouseDir, derbyConnectionURL) = {
+def addSparkConfigProperties(config: SparkConf): Unit = {}
+
+// returns (warehouseDir, derbyConnectionURL)
+def warehouseLocation: (String, String) = {
val tmpDir = System.getProperty("java.io.tmpdir")
// a separate warehouse for each spec, JDK 8 is unhappy with the old directory being populated
val wdir = s"${tmpDir}/cartoanalyticstoolbox-warehouse/${self.getClass.getName}"
@@ -52,13 +55,9 @@
(wdir, connectionURL)
}

-// override the SparkSession construction to enable Hive support
-override lazy val _ssc: SparkSession = {
-System.setProperty("spark.driver.port", "0")
-System.setProperty("spark.hostPort", "0")
-System.setProperty("spark.ui.enabled", "false")
-
-val conf = new SparkConf()
+lazy val sparkConfig: SparkConf = {
+val (warehouseDir, derbyConnectionURL) = warehouseLocation
+val conf = new SparkConf()
conf
.setMaster(sparkMaster)
.setAppName("Test Hive Context")
@@ -79,7 +78,22 @@
setKryoRegistrator(conf)
}

-val sparkContext = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+addSparkConfigProperties(conf)
+conf
+}
+
+// override the SparkSession construction to enable Hive support
+override lazy val _ssc: SparkSession = {
+System.setProperty("spark.driver.port", "0")
+System.setProperty("spark.hostPort", "0")
+System.setProperty("spark.ui.enabled", "false")
+
+val sparkContext =
+SparkSession
+.builder()
+.config(sparkConfig)
+.enableHiveSupport()
+.getOrCreate()

System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
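
Factoring sparkConfig out of _ssc lets subclasses contribute settings before the session is created, which is exactly the hook the injector test environment uses. A hypothetical mix-in under the same pattern (the property and its value are illustrative):

import org.apache.spark.SparkConf
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical spec mix-in: add extra properties to the shared SparkConf
// before the Hive-enabled session is built.
trait SmallShuffleTestEnvironment extends SpatialHiveTestEnvironment { self: Suite with BeforeAndAfterAll =>
  override def addSparkConfigProperties(config: SparkConf): Unit =
    config.set("spark.sql.shuffle.partitions", "4")
}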
