From 966b193923be8ecfb1eb697e809374a2be0a0d3c Mon Sep 17 00:00:00 2001 From: rhaffar <141745338+rhaffar@users.noreply.github.com> Date: Wed, 12 Feb 2025 11:39:17 -0500 Subject: [PATCH] fixing compare matching rows for spark compares and SF compare (#378) * fixing compare matching rows for spark compares and SF compare * adding new test case * bump version to 0.16.2 --------- Co-authored-by: Faisal --- datacompy/__init__.py | 2 +- datacompy/snowflake.py | 2 +- datacompy/spark/pandas.py | 2 +- datacompy/spark/sql.py | 2 +- tests/test_core.py | 7 +++++++ tests/test_fugue/test_duckdb.py | 9 +++++++++ tests/test_fugue/test_fugue_pandas.py | 9 +++++++++ tests/test_fugue/test_fugue_polars.py | 9 +++++++++ tests/test_fugue/test_fugue_spark.py | 9 +++++++++ tests/test_polars.py | 7 +++++++ tests/test_snowflake.py | 9 +++++++++ tests/test_spark/test_pandas_spark.py | 8 ++++++++ tests/test_spark/test_sql_spark.py | 7 +++++++ 13 files changed, 78 insertions(+), 4 deletions(-) diff --git a/datacompy/__init__.py b/datacompy/__init__.py index ea04402..347c449 100644 --- a/datacompy/__init__.py +++ b/datacompy/__init__.py @@ -18,7 +18,7 @@ Then extended to carry that functionality over to Spark Dataframes. """ -__version__ = "0.16.1" +__version__ = "0.16.2" import platform from warnings import warn diff --git a/datacompy/snowflake.py b/datacompy/snowflake.py index a12e132..d2fdaf7 100644 --- a/datacompy/snowflake.py +++ b/datacompy/snowflake.py @@ -546,7 +546,7 @@ def count_matching_rows(self) -> int: " and ".join(conditions) ).count() else: - match_columns_count = 0 + match_columns_count = self.intersect_rows.count() return match_columns_count def intersect_rows_match(self) -> bool: diff --git a/datacompy/spark/pandas.py b/datacompy/spark/pandas.py index be6ef08..b8dbc91 100644 --- a/datacompy/spark/pandas.py +++ b/datacompy/spark/pandas.py @@ -490,7 +490,7 @@ def count_matching_rows(self) -> bool: .shape[0] ) else: - match_columns_count = 0 + self.intersect_rows.shape[0] return match_columns_count def intersect_rows_match(self) -> bool: diff --git a/datacompy/spark/sql.py b/datacompy/spark/sql.py index f0574a1..80dcba1 100644 --- a/datacompy/spark/sql.py +++ b/datacompy/spark/sql.py @@ -556,7 +556,7 @@ def count_matching_rows(self) -> int: " and ".join(conditions) ).count() else: - match_columns_count = 0 + match_columns_count = self.intersect_rows.count() return match_columns_count def intersect_rows_match(self) -> bool: diff --git a/tests/test_core.py b/tests/test_core.py index 2f164b4..e3658f5 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -911,6 +911,13 @@ def test_index_with_joins_with_ignore_case(): assert compare.intersect_rows_match() +def test_full_join_counts_all_matches(): + df1 = pd.DataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + df2 = pd.DataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + compare = datacompy.Compare(df1, df2, ["a", "b"], ignore_spaces=False) + assert compare.count_matching_rows() == 2 + + def test_strings_with_ignore_spaces_and_join_columns(): df1 = pd.DataFrame([{"a": "hi", "b": "A"}, {"a": "bye", "b": "A"}]) df2 = pd.DataFrame([{"a": " hi ", "b": "A"}, {"a": " bye ", "b": "A"}]) diff --git a/tests/test_fugue/test_duckdb.py b/tests/test_fugue/test_duckdb.py index 8d9ee41..1340400 100644 --- a/tests/test_fugue/test_duckdb.py +++ b/tests/test_fugue/test_duckdb.py @@ -168,6 +168,15 @@ def test_count_matching_rows_duckdb(count_matching_rows_df): ) == 100 ) + assert ( + count_matching_rows( + df1, + df1_copy, + join_columns=["a", "b"], + parallelism=2, + ) + == 100 + ) assert ( count_matching_rows( df1, diff --git a/tests/test_fugue/test_fugue_pandas.py b/tests/test_fugue/test_fugue_pandas.py index db2ff30..dc41288 100644 --- a/tests/test_fugue/test_fugue_pandas.py +++ b/tests/test_fugue/test_fugue_pandas.py @@ -224,6 +224,15 @@ def test_count_matching_rows_native(count_matching_rows_df): ) == 100 ) + assert ( + count_matching_rows( + count_matching_rows_df[0], + count_matching_rows_df[0].copy(), + join_columns=["a", "b"], + parallelism=2, + ) + == 100 + ) assert ( count_matching_rows( count_matching_rows_df[0], diff --git a/tests/test_fugue/test_fugue_polars.py b/tests/test_fugue/test_fugue_polars.py index 0e77781..ad1d891 100644 --- a/tests/test_fugue/test_fugue_polars.py +++ b/tests/test_fugue/test_fugue_polars.py @@ -149,6 +149,15 @@ def test_count_matching_rows_polars(count_matching_rows_df): ) == 100 ) + assert ( + count_matching_rows( + df1, + df1.clone(), + join_columns=["a", "b"], + parallelism=2, + ) + == 100 + ) assert ( count_matching_rows( df1, diff --git a/tests/test_fugue/test_fugue_spark.py b/tests/test_fugue/test_fugue_spark.py index 008ef69..c6dc51d 100644 --- a/tests/test_fugue/test_fugue_spark.py +++ b/tests/test_fugue/test_fugue_spark.py @@ -239,6 +239,15 @@ def test_count_matching_rows_spark(spark_session, count_matching_rows_df): ) == 100 ) + assert ( + count_matching_rows( + df1, + df1_copy, + join_columns=["a", "b"], + parallelism=2, + ) + == 100 + ) assert ( count_matching_rows( df1, diff --git a/tests/test_polars.py b/tests/test_polars.py index 9e0cf27..1ee6122 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -793,6 +793,13 @@ def test_joins_with_ignore_case(): assert compare.intersect_rows_match() +def test_full_join_counts_all_matches(): + df1 = pl.DataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + df2 = pl.DataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + compare = PolarsCompare(df1, df2, ["a", "b"], ignore_spaces=False) + assert compare.count_matching_rows() == 2 + + def test_strings_with_ignore_spaces_and_join_columns(): df1 = pl.DataFrame([{"a": "hi", "b": "A"}, {"a": "bye", "b": "A"}]) df2 = pl.DataFrame([{"a": " hi ", "b": "A"}, {"a": " bye ", "b": "A"}]) diff --git a/tests/test_snowflake.py b/tests/test_snowflake.py index af6029f..b0af4f6 100644 --- a/tests/test_snowflake.py +++ b/tests/test_snowflake.py @@ -885,6 +885,15 @@ def test_joins_with_sensitive_lowercase_cols(snowpark_session): assert compare.intersect_rows_match() +def test_full_join_counts_all_matches(snowpark_session): + df1 = snowpark_session.createDataFrame([{"A": 1, "B": 2}, {"A": 1, "B": 2}]) + df2 = snowpark_session.createDataFrame([{"A": 1, "B": 2}, {"A": 1, "B": 2}]) + compare = SnowflakeCompare( + snowpark_session, df1, df2, ["A", "B"], ignore_spaces=False + ) + assert compare.count_matching_rows() == 2 + + def test_strings_with_ignore_spaces_and_join_columns(snowpark_session): df1 = snowpark_session.createDataFrame( [{"A": "HI", "B": "A"}, {"A": "BYE", "B": "A"}] diff --git a/tests/test_spark/test_pandas_spark.py b/tests/test_spark/test_pandas_spark.py index 068b133..17b1c11 100644 --- a/tests/test_spark/test_pandas_spark.py +++ b/tests/test_spark/test_pandas_spark.py @@ -862,6 +862,14 @@ def test_joins_with_ignore_case(): assert compare.intersect_rows_match() +@pandas_version +def test_full_join_counts_all_matches(): + df1 = ps.DataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + df2 = ps.DataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + compare = SparkPandasCompare(df1, df2, ["a", "b"], ignore_spaces=False) + assert compare.count_matching_rows() == 2 + + @pandas_version def test_strings_with_ignore_spaces_and_join_columns(): df1 = ps.DataFrame([{"a": "hi", "b": "A"}, {"a": "bye", "b": "A"}]) diff --git a/tests/test_spark/test_sql_spark.py b/tests/test_spark/test_sql_spark.py index 5b2686f..65c13c7 100644 --- a/tests/test_spark/test_sql_spark.py +++ b/tests/test_spark/test_sql_spark.py @@ -851,6 +851,13 @@ def test_joins_with_ignore_case(spark_session): assert compare.intersect_rows_match() +def test_full_join_counts_all_matches(spark_session): + df1 = spark_session.createDataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + df2 = spark_session.createDataFrame([{"a": 1, "b": 2}, {"a": 1, "b": 2}]) + compare = SparkSQLCompare(spark_session, df1, df2, ["a", "b"], ignore_spaces=False) + assert compare.count_matching_rows() == 2 + + def test_strings_with_ignore_spaces_and_join_columns(spark_session): df1 = spark_session.createDataFrame([{"a": "hi", "b": "A"}, {"a": "bye", "b": "A"}]) df2 = spark_session.createDataFrame(