
Commit

Polish

longvu-db committed Aug 13, 2024
1 parent babdddb commit 53e6c32
Showing 3 changed files with 13 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/spark_master_test.yaml
@@ -46,7 +46,7 @@ jobs:
       - name: Run Spark Master tests
         # when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_test.yaml
         run: |
-          TEST_PARALLELISM_COUNT=2 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
           TEST_PARALLELISM_COUNT=2 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
           TEST_PARALLELISM_COUNT=2 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
+          TEST_PARALLELISM_COUNT=2 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
         if: steps.git-diff.outputs.diff
@@ -18,7 +18,6 @@ package org.apache.spark.sql.connect.delta
 
 import java.util.Optional
 
-import com.databricks.spark.util.DatabricksLogging
 import com.google.protobuf
 import com.google.protobuf.{ByteString, InvalidProtocolBufferException}
 import io.delta.connect.proto
@@ -39,7 +38,7 @@ import org.apache.spark.sql.types.StructType
 /**
  * Planner plugin for relation extensions using [[proto.DeltaRelation]].
  */
-class DeltaRelationPlugin extends RelationPlugin with DeltaPlannerBase with DatabricksLogging {
+class DeltaRelationPlugin extends RelationPlugin with DeltaPlannerBase {
   override def transform(raw: Array[Byte], planner: SparkConnectPlanner): Optional[LogicalPlan] = {
     val relation = parseAnyFrom(raw,
       SparkEnv.get.conf.get(Connect.CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT))
@@ -58,7 +57,6 @@ class DeltaRelationPlugin extends RelationPlugin with DeltaPlannerBase with Data
 
   private def transform(
       relation: proto.DeltaRelation, planner: SparkConnectPlanner): LogicalPlan = {
-    logConsole("Transforming DeltaRelation " + relation.getRelationTypeCase)
     relation.getRelationTypeCase match {
       case proto.DeltaRelation.RelationTypeCase.SCAN =>
         transformScan(planner.session, relation.getScan)
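Context for the surviving code path above: transform decodes the raw bytes into a protobuf Any, bounding parser recursion via CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT, and a plugin returns Optional.empty() for messages it does not own. A minimal sketch of that parse-and-guard shape, under stated assumptions (ParseAnyExample, the 1024 limit, and the type-URL check are illustrative, not the actual Delta sources):

    import java.util.Optional

    import com.google.protobuf.{Any => ProtoAny, CodedInputStream}

    object ParseAnyExample {
      // Decode bytes into a protobuf Any, capping nested-message depth so a
      // deeply nested payload cannot drive the parser into unbounded recursion.
      def parseAnyFrom(raw: Array[Byte], recursionLimit: Int): ProtoAny = {
        val input = CodedInputStream.newInstance(raw)
        input.setRecursionLimit(recursionLimit)
        ProtoAny.parseFrom(input)
      }

      // Plugin-style guard: claim only messages of the expected type and
      // leave everything else for other plugins (Optional.empty()).
      def transform(raw: Array[Byte]): Optional[ProtoAny] = {
        val any = parseAnyFrom(raw, recursionLimit = 1024) // illustrative limit
        if (any.getTypeUrl.endsWith("DeltaRelation")) Optional.of(any)
        else Optional.empty()
      }
    }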
@@ -24,7 +24,6 @@ import com.google.protobuf
 import io.delta.connect.proto
 import io.delta.connect.spark.{proto => spark_proto}
 import io.delta.tables.DeltaTable
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, QueryTest, SparkSession}
@@ -131,7 +130,7 @@ class DeltaConnectPlannerSuite
     val plan = transform(input)
     val result = Dataset.ofRows(spark, plan).collect()
 
-    assert(result.length == 1)
+    assert(result.length === 1)
     val deltaTable = DeltaTable.forName(spark, result.head.getString(0))
     assert(!deltaTable.toDF.isEmpty)
   }
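The remaining hunks in this suite make the same substitution: plain == becomes ScalaTest's ===. Both forms compile, but === routes through ScalaTest's Assertions/TripleEquals, producing a failure message that shows both operands (and honoring a custom Equality if one is in scope). A standalone illustration, with a hypothetical suite name:

    import org.scalatest.funsuite.AnyFunSuite

    class TripleEqualsExample extends AnyFunSuite {
      test("=== reports both operands on a mismatch") {
        val result = Seq("row1", "row2")
        // Passes here; had the length been wrong, the failure would read
        // like "2 did not equal 1" rather than a bare "assertion failed".
        assert(result.length === 2)
      }
    }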
@@ -160,7 +159,7 @@ class DeltaConnectPlannerSuite
     val plan = transform(input)
     val result = Dataset.ofRows(spark, plan).collect()
 
-    assert(result.length == 1)
+    assert(result.length === 1)
     val deltaTable = DeltaTable.forName(spark, result.head.getString(0))
     assert(!deltaTable.toDF.isEmpty)
   }
@@ -207,7 +206,7 @@ class DeltaConnectPlannerSuite
     val plan = transform(input)
     val result = Dataset.ofRows(spark, plan).collect()
 
-    assert(result.length == 1)
+    assert(result.length === 1)
     val deltaTable = DeltaTable.forName(spark, result.head.getString(0))
     assert(!deltaTable.toDF.isEmpty)
   }
@@ -254,7 +253,7 @@ class DeltaConnectPlannerSuite
     val childResult = result.asInstanceOf[DescribeDeltaDetailCommand].child
     val childExpected = expected.asInstanceOf[DescribeDeltaDetailCommand].child
 
-    assert(childResult.asInstanceOf[ResolvedTable].identifier.name ==
+    assert(childResult.asInstanceOf[ResolvedTable].identifier.name ===
       childExpected.asInstanceOf[ResolvedTable].identifier.name)
   }
 }
@@ -288,9 +287,9 @@ class DeltaConnectPlannerSuite
       val plan = transform(input)
       assert(plan.columns.toSeq == expectedRestoreOutputColumns)
       val result = Dataset.ofRows(spark, plan).collect()
-      assert(result.length == 1)
-      assert(result.head.getLong(2) == 2) // Two files should have been removed.
-      assert(spark.read.table("table").count() == 1000)
+      assert(result.length === 1)
+      assert(result.head.getLong(2) === 2) // Two files should have been removed.
+      assert(spark.read.table("table").count() === 1000)
     }
   }
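The restore test above asserts against the single metrics row that a RESTORE produces. Through the public DeltaTable API, the equivalent round trip looks roughly like this sketch (the table name and target version are illustrative; the metric column order follows expectedRestoreOutputColumns in the suite, which is why the test reads the removed-file count at index 2):

    import io.delta.tables.DeltaTable

    // Assumes an active SparkSession in scope as `spark` and an existing
    // Delta table registered as "table".
    val restored = DeltaTable.forName(spark, "table")
      .restoreToVersion(0) // returns a one-row DataFrame of restore metrics
    restored.show()        // includes counts such as the number of removed files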

@@ -336,9 +335,9 @@ class DeltaConnectPlannerSuite
     )
 
     val plan = transform(input)
-    assert(plan.schema.length == 1)
+    assert(plan.schema.length === 1)
     val result = Dataset.ofRows(spark, plan).collect()
-    assert(result.length == 1)
+    assert(result.length === 1)
     assert(result.head.getBoolean(0))
   }
 }
@@ -354,9 +353,9 @@ class DeltaConnectPlannerSuite
     )
 
     val plan = transform(input)
-    assert(plan.schema.length == 1)
+    assert(plan.schema.length === 1)
     val result = Dataset.ofRows(spark, plan).collect()
-    assert(result.length == 1)
+    assert(result.length === 1)
     assert(!result.head.getBoolean(0))
   }
 }
