[Spark] Unity Catalog (UC) as Commit Coordinator for Delta tables #4185

Merged 2 commits on Mar 5, 2025
1 change: 1 addition & 0 deletions build.sbt
@@ -458,6 +458,7 @@ lazy val spark = (project in file("spark"))
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.mockito" % "mockito-inline" % "4.11.0" % "test",
),
Compile / packageBin / mappings := (Compile / packageBin / mappings).value ++
listPythonFiles(baseDirectory.value.getParentFile / "python"),
@@ -0,0 +1,107 @@
#
# Copyright (2024) The Delta Lake Project Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import sys
import threading
import json

from pyspark.sql import SparkSession
import time
import uuid

"""
Run this script from the root directory of the repository:

===== Mandatory input from user =====
export CATALOG_NAME=___
export CATALOG_URI=___
export CATALOG_TOKEN=___
export TABLE_NAME=___
export SCHEMA=___

./run-integration-tests.py --use-local --unity-catalog-commit-coordinator-integration-tests \
--packages \
io.unitycatalog:unitycatalog-spark_2.12:0.2.1,org.apache.spark:spark-hadoop-cloud_2.12:3.5.4
"""

CATALOG_NAME = os.environ.get("CATALOG_NAME")
CATALOG_TOKEN = os.environ.get("CATALOG_TOKEN")
CATALOG_URI = os.environ.get("CATALOG_URI")
TABLE_NAME = os.environ.get("TABLE_NAME")
SCHEMA = os.environ.get("SCHEMA")

spark = SparkSession \
    .builder \
    .appName("coordinated_commit_tester") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "io.unitycatalog.spark.UCSingleCatalog") \
    .config(f"spark.sql.catalog.{CATALOG_NAME}", "io.unitycatalog.spark.UCSingleCatalog") \
    .config(f"spark.sql.catalog.{CATALOG_NAME}.token", CATALOG_TOKEN) \
    .config(f"spark.sql.catalog.{CATALOG_NAME}.uri", CATALOG_URI) \
    .config("spark.sql.defaultCatalog", CATALOG_NAME) \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.databricks.delta.commitcoordinator.unity-catalog.impl",
            "org.delta.catalog.UCCoordinatedCommitClient") \
    .getOrCreate()

expected_error_tag = "UNITY_CATALOG_EXTERNAL_COORDINATED_COMMITS_REQUEST_DENIED"


def create() -> None:
    try:
        spark.sql(f"CREATE TABLE {SCHEMA}.{TABLE_NAME} (a INT)")
    except Exception:
        print("[UNSUPPORTED] Creating managed table using UC commit coordinator isn't allowed")


def insert() -> None:
    try:
        spark.sql(f"INSERT INTO {SCHEMA}.{TABLE_NAME} VALUES (1), (2)")
    except Exception as error:
        assert expected_error_tag in str(error)
        print("[UNSUPPORTED] Writing to managed table using UC commit coordinator isn't allowed")


def update() -> None:
    try:
        spark.sql(f"UPDATE {SCHEMA}.{TABLE_NAME} SET a=4")
    except Exception as error:
        assert expected_error_tag in str(error)
        print("[UNSUPPORTED] Updating managed table using UC commit coordinator isn't allowed")


def delete() -> None:
    try:
        spark.sql(f"DELETE FROM {SCHEMA}.{TABLE_NAME} where a=1")
    except Exception as error:
        assert expected_error_tag in str(error)
        print("[UNSUPPORTED] Deleting from managed table using UC commit coordinator isn't allowed")


def read() -> None:
    try:
        res = spark.sql(f"SELECT * FROM {SCHEMA}.{TABLE_NAME}")
    except Exception as error:
        assert expected_error_tag in str(error)
        print("[UNSUPPORTED] Reading from managed table using UC commit coordinator isn't allowed")

create()
insert()
update()
read()
delete()
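
The five calls above all follow the same try/except/assert pattern. A minimal sketch of how that pattern could be factored into a single helper is shown below; it is not part of this PR, `expect_denied` is a hypothetical name, and `spark` and `expected_error_tag` are the module-level names defined in the script.

# Hypothetical helper (not in this PR): run a statement and assert it is denied
# with the UNITY_CATALOG_EXTERNAL_COORDINATED_COMMITS_REQUEST_DENIED error tag.
def expect_denied(sql: str, action: str) -> None:
    try:
        spark.sql(sql)
    except Exception as error:
        assert expected_error_tag in str(error)
        print(f"[UNSUPPORTED] {action} managed table using UC commit coordinator isn't allowed")

# Example usage mirroring insert() above:
# expect_denied(f"INSERT INTO {SCHEMA}.{TABLE_NAME} VALUES (1), (2)", "Writing to")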
57 changes: 53 additions & 4 deletions run-integration-tests.py
@@ -358,6 +358,43 @@ def run_pip_installation_tests(root_dir, version, use_testpypi, use_localpypi, e
print("Failed pip installation tests in %s" % (test_file))
raise

def run_unity_catalog_commit_coordinator_integration_tests(root_dir, version, test_name, use_local, extra_packages):
    print(
        "\n\n##### Running Unity Catalog commit coordinator integration tests on version %s #####" % str(version)
    )

    if use_local:
        clear_artifact_cache()
        run_cmd(["build/sbt", "publishM2"])

    test_dir = path.join(root_dir, \
        path.join("python", "delta", "integration_tests"))
    test_files = [path.join(test_dir, f) for f in os.listdir(test_dir)
                  if path.isfile(path.join(test_dir, f)) and
                  f.endswith(".py") and not f.startswith("_")]

    print("\n\nTests compiled\n\n")

    python_root_dir = path.join(root_dir, "python")
    extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
    packages = "io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version)
    if extra_packages:
        packages += "," + extra_packages

    for test_file in test_files:
        if test_name is not None and test_name not in test_file:
            print("\nSkipping Unity Catalog commit coordinator integration tests in %s\n============" % test_file)
            continue
        try:
            cmd = ["spark-submit",
                   "--driver-class-path=%s" % extra_class_path,  # for less verbose logging
                   "--packages", packages] + [test_file]
            print("\nRunning Unity Catalog commit coordinator integration tests in %s\n=============" % test_file)
            print("Command: %s" % " ".join(cmd))
            run_cmd(cmd, stream_output=True)
        except:
            print("Failed Unity Catalog commit coordinator integration tests in %s" % (test_file))
            raise

def clear_artifact_cache():
    print("Clearing Delta artifacts from ivy2 and mvn cache")
@@ -495,10 +532,10 @@ def __exit__(self, tpe, value, traceback):
action="store_true",
help="Run the DynamoDB integration tests (and only them)")
parser.add_argument(
"--dbb-packages",
"--packages",
required=False,
default=None,
help="Additional packages required for Dynamodb logstore integration tests")
help="Additional packages required for integration tests")
parser.add_argument(
"--dbb-conf",
required=False,
@@ -544,6 +581,13 @@ def __exit__(self, tpe, value, traceback):
default="0.15.0",
help="Hudi library version"
)
parser.add_argument(
"--unity-catalog-commit-coordinator-integration-tests",
required=False,
default=False,
action="store_true",
help="Run the Unity Catalog Commit Coordinator tests (and only them)"
)

args = parser.parse_args()

@@ -574,18 +618,23 @@ def __exit__(self, tpe, value, traceback):

    if args.run_storage_s3_dynamodb_integration_tests:
        run_dynamodb_logstore_integration_tests(root_dir, args.version, args.test, args.maven_repo,
-                                               args.dbb_packages, args.dbb_conf, args.use_local)
+                                               args.packages, args.dbb_conf, args.use_local)
        quit()

    if args.run_dynamodb_commit_coordinator_integration_tests:
        run_dynamodb_commit_coordinator_integration_tests(root_dir, args.version, args.test, args.maven_repo,
-                                                         args.dbb_packages, args.dbb_conf, args.use_local)
+                                                         args.packages, args.dbb_conf, args.use_local)
        quit()

    if args.s3_log_store_util_only:
        run_s3_log_store_util_integration_tests()
        quit()

    if args.unity_catalog_commit_coordinator_integration_tests:
        run_unity_catalog_commit_coordinator_integration_tests(root_dir, args.version, args.test, args.use_local,
                                                               args.packages)
        quit()

    if run_scala:
        run_scala_integration_tests(root_dir, args.version, args.test, args.maven_repo,
                                    args.scala_version, args.use_local)
@@ -45,7 +45,7 @@
export DELTA_DYNAMO_TABLE_NAME=___
./run-integration-tests.py --use-local --run-dynamodb-commit-coordinator-integration-tests \
-  --dbb-packages org.apache.hadoop:hadoop-aws:3.4.0,com.amazonaws:aws-java-sdk-bundle:1.12.262 \
+  --packages org.apache.hadoop:hadoop-aws:3.4.0,com.amazonaws:aws-java-sdk-bundle:1.12.262 \
--dbb-conf io.delta.storage.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider \
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.profile.ProfileCredentialsProvider
"""
@@ -100,7 +100,12 @@ object CommitCoordinatorProvider {
    nameToBuilderMapping.retain((k, _) => initialCommitCoordinatorNames.contains(k))
  }

  private[delta] def clearAllBuilders(): Unit = synchronized {
    nameToBuilderMapping.clear()
  }

  private val initialCommitCoordinatorBuilders = Seq[CommitCoordinatorBuilder](
    UCCommitCoordinatorBuilder,
    new DynamoDBCommitCoordinatorClientBuilder()
  )
  initialCommitCoordinatorBuilders.foreach(registerBuilder)
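  // Note: clearAllBuilders() empties the builder registry entirely; since it is private[delta],
  // only code inside the delta package (such as test suites) can call it, presumably to reset
  // coordinator state between tests. UCCommitCoordinatorBuilder is now registered by default
  // alongside the DynamoDB builder via the foreach above.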