adding in all_columns_match #219

Merged (1 commit, Jul 5, 2023)
8 changes: 7 additions & 1 deletion datacompy/__init__.py
@@ -16,5 +16,11 @@
__version__ = "0.10.2"

from datacompy.core import *
from datacompy.fugue import intersect_columns, is_match, report, unq_columns
from datacompy.fugue import (
all_columns_match,
intersect_columns,
is_match,
report,
unq_columns,
)
from datacompy.spark import NUMERIC_SPARK_TYPES, SparkCompare
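With this change, all_columns_match joins the other fugue helpers in the package's top-level namespace. A minimal import sketch, assuming a datacompy build that includes this PR:

from datacompy import all_columns_match, intersect_columns, is_match, report, unq_columns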
19 changes: 19 additions & 0 deletions datacompy/fugue.py
@@ -76,6 +76,25 @@ def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame):
return OrderedSet(col1) & OrderedSet(col2)


def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame):
"""Whether the columns all match in the dataframes

Parameters
----------
df1 : ``AnyDataFrame``
First dataframe to check

df2 : ``AnyDataFrame``
Second dataframe to check

Returns
-------
bool
Boolean indicating whether the columns all match in the dataframes
"""
return unq_columns(df1, df2) == unq_columns(df2, df1) == set()


def is_match(
df1: AnyDataFrame,
df2: AnyDataFrame,
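For context, a short usage sketch of the new function. The dataframes here are hypothetical, not part of the PR; all_columns_match only compares column names, so row values and column order do not affect the result:

import pandas as pd

from datacompy import all_columns_match

# Hypothetical frames: df1 and df2 share the same columns (in a different
# order); df3 is missing column "c".
df1 = pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0], "c": ["x", "y"]})
df2 = pd.DataFrame({"c": ["y", "x"], "a": [2, 1], "b": [4.0, 3.0]})
df3 = df1.drop(columns=["c"])

assert all_columns_match(df1, df2)      # True: identical column sets
assert not all_columns_match(df1, df3)  # False: "c" only exists in df1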
195 changes: 143 additions & 52 deletions tests/test_fugue.py
Expand Up @@ -27,24 +27,36 @@
from ordered_set import OrderedSet
from pytest import raises

from datacompy import Compare, intersect_columns, is_match, report, unq_columns
from datacompy import (
Compare,
all_columns_match,
intersect_columns,
is_match,
report,
unq_columns,
)


@pytest.fixture
def ref_df():
np.random.seed(0)
return pd.DataFrame(

df1 = pd.DataFrame(
dict(
a=np.random.randint(0, 10, 100),
b=np.random.rand(100),
c=np.random.choice(["aaa", "b_c", "csd"], 100),
)
)
df1_copy = df1.copy()
df2 = df1.copy().drop(columns=["c"])
df3 = df1.copy().drop(columns=["a", "b"])
return [df1, df1_copy, df2, df3]


@pytest.fixture
def shuffle_df(ref_df):
return ref_df.sample(frac=1.0)
return ref_df[0].sample(frac=1.0)


@pytest.fixture
@@ -86,37 +98,37 @@ def test_is_match_native(
upper_col_df,
):
# defaults to Compare class
assert is_match(ref_df, ref_df.copy(), join_columns="a")
assert not is_match(ref_df, shuffle_df, join_columns="a")
assert is_match(ref_df[0], ref_df[0].copy(), join_columns="a")
assert not is_match(ref_df[0], shuffle_df, join_columns="a")
# Fugue
assert is_match(ref_df, shuffle_df, join_columns="a", parallelism=2)
assert is_match(ref_df[0], shuffle_df, join_columns="a", parallelism=2)

assert not is_match(ref_df, float_off_df, join_columns="a", parallelism=2)
assert not is_match(ref_df[0], float_off_df, join_columns="a", parallelism=2)
assert not is_match(
ref_df, float_off_df, abs_tol=0.00001, join_columns="a", parallelism=2
ref_df[0], float_off_df, abs_tol=0.00001, join_columns="a", parallelism=2
)
assert is_match(
ref_df, float_off_df, abs_tol=0.001, join_columns="a", parallelism=2
ref_df[0], float_off_df, abs_tol=0.001, join_columns="a", parallelism=2
)
assert is_match(
ref_df, float_off_df, abs_tol=0.001, join_columns="a", parallelism=2
ref_df[0], float_off_df, abs_tol=0.001, join_columns="a", parallelism=2
)

assert not is_match(ref_df, upper_case_df, join_columns="a", parallelism=2)
assert not is_match(ref_df[0], upper_case_df, join_columns="a", parallelism=2)
assert is_match(
ref_df, upper_case_df, join_columns="a", ignore_case=True, parallelism=2
ref_df[0], upper_case_df, join_columns="a", ignore_case=True, parallelism=2
)

assert not is_match(ref_df, space_df, join_columns="a", parallelism=2)
assert not is_match(ref_df[0], space_df, join_columns="a", parallelism=2)
assert is_match(
ref_df, space_df, join_columns="a", ignore_spaces=True, parallelism=2
ref_df[0], space_df, join_columns="a", ignore_spaces=True, parallelism=2
)

assert is_match(ref_df, upper_col_df, join_columns="a", parallelism=2)
assert is_match(ref_df[0], upper_col_df, join_columns="a", parallelism=2)

with raises(AssertionError):
is_match(
ref_df,
ref_df[0],
upper_col_df,
join_columns="a",
cast_column_names_lower=False,
@@ -133,8 +145,8 @@ def test_is_match_spark(
space_df,
upper_col_df,
):
ref_df.iteritems = ref_df.items # pandas 2 compatibility
rdf = spark_session.createDataFrame(ref_df)
ref_df[0].iteritems = ref_df[0].items # pandas 2 compatibility
rdf = spark_session.createDataFrame(ref_df[0])

assert is_match(rdf, shuffle_df, join_columns="a")

@@ -169,7 +181,7 @@ def test_is_match_polars(
space_df,
upper_col_df,
):
rdf = pl.from_pandas(ref_df)
rdf = pl.from_pandas(ref_df[0])

assert is_match(rdf, shuffle_df, join_columns="a")

@@ -198,7 +210,7 @@ def test_is_match_duckdb(
upper_col_df,
):
with duckdb.connect():
rdf = duckdb.from_df(ref_df)
rdf = duckdb.from_df(ref_df[0])

assert is_match(rdf, shuffle_df, join_columns="a")

@@ -275,9 +287,10 @@ def test_report_spark(spark_session, simple_diff_df1, simple_diff_df2):


def test_unique_columns_native(ref_df):
df1 = ref_df
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

assert unq_columns(df1, df1.copy()) == OrderedSet()
assert unq_columns(df1, df2) == OrderedSet(["c"])
@@ -287,10 +300,10 @@


def test_unique_columns_spark(spark_session, ref_df):
df1 = ref_df
df1_copy = ref_df.copy()
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

df1.iteritems = df1.items # pandas 2 compatibility
df1_copy.iteritems = df1_copy.items # pandas 2 compatibility
@@ -310,12 +323,13 @@ def test_unique_columns_spark(spark_session, ref_df):


def test_unique_columns_polars(ref_df):
df1 = ref_df
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

pdf1 = pl.from_pandas(df1)
pdf1_copy = pl.from_pandas(df1.copy())
pdf1_copy = pl.from_pandas(df1_copy)
pdf2 = pl.from_pandas(df2)
pdf3 = pl.from_pandas(df3)

@@ -327,13 +341,14 @@


def test_unique_columns_duckdb(ref_df):
df1 = ref_df
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

with duckdb.connect():
ddf1 = duckdb.from_df(df1)
ddf1_copy = duckdb.from_df(df1.copy())
ddf1_copy = duckdb.from_df(df1_copy)
ddf2 = duckdb.from_df(df2)
ddf3 = duckdb.from_df(df3)

@@ -345,22 +360,23 @@


def test_intersect_columns_native(ref_df):
df1 = ref_df
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

assert intersect_columns(df1, df1.copy()) == OrderedSet(["a", "b", "c"])
assert intersect_columns(df1, df1_copy) == OrderedSet(["a", "b", "c"])
assert intersect_columns(df1, df2) == OrderedSet(["a", "b"])
assert intersect_columns(df1, df3) == OrderedSet(["c"])
assert intersect_columns(df1.copy(), df1) == OrderedSet(["a", "b", "c"])
assert intersect_columns(df1_copy, df1) == OrderedSet(["a", "b", "c"])
assert intersect_columns(df3, df2) == OrderedSet()


def test_intersect_columns_spark(spark_session, ref_df):
df1 = ref_df
df1_copy = ref_df.copy()
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

df1.iteritems = df1.items # pandas 2 compatibility
df1_copy.iteritems = df1_copy.items # pandas 2 compatibility
@@ -380,12 +396,13 @@ def test_intersect_columns_spark(spark_session, ref_df):


def test_intersect_columns_polars(ref_df):
df1 = ref_df
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

pdf1 = pl.from_pandas(df1)
pdf1_copy = pl.from_pandas(df1.copy())
pdf1_copy = pl.from_pandas(df1_copy)
pdf2 = pl.from_pandas(df2)
pdf3 = pl.from_pandas(df3)

@@ -397,13 +414,14 @@


def test_intersect_columns_duckdb(ref_df):
df1 = ref_df
df2 = ref_df.copy().drop(columns=["c"])
df3 = ref_df.copy().drop(columns=["a", "b"])
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

with duckdb.connect():
ddf1 = duckdb.from_df(df1)
ddf1_copy = duckdb.from_df(df1.copy())
ddf1_copy = duckdb.from_df(df1_copy)
ddf2 = duckdb.from_df(df2)
ddf3 = duckdb.from_df(df3)

@@ -412,3 +430,76 @@
assert intersect_columns(ddf1, ddf3) == OrderedSet(["c"])
assert intersect_columns(ddf1_copy, ddf1) == OrderedSet(["a", "b", "c"])
assert intersect_columns(ddf3, ddf2) == OrderedSet()


def test_all_columns_match_native(ref_df):
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

assert all_columns_match(df1, df1_copy) is True
assert all_columns_match(df1, df2) is False
assert all_columns_match(df1, df3) is False
assert all_columns_match(df1_copy, df1) is True
assert all_columns_match(df3, df2) is False


def test_all_columns_match_spark(spark_session, ref_df):
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

df1.iteritems = df1.items # pandas 2 compatibility
df1_copy.iteritems = df1_copy.items # pandas 2 compatibility
df2.iteritems = df2.items # pandas 2 compatibility
df3.iteritems = df3.items # pandas 2 compatibility

df1 = spark_session.createDataFrame(df1)
df1_copy = spark_session.createDataFrame(df1_copy)
df2 = spark_session.createDataFrame(df2)
df3 = spark_session.createDataFrame(df3)

assert all_columns_match(df1, df1_copy) is True
assert all_columns_match(df1, df2) is False
assert all_columns_match(df1, df3) is False
assert all_columns_match(df1_copy, df1) is True
assert all_columns_match(df3, df2) is False


def test_all_columns_match_polars(ref_df):
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

df1 = pl.from_pandas(df1)
df1_copy = pl.from_pandas(df1_copy)
df2 = pl.from_pandas(df2)
df3 = pl.from_pandas(df3)

assert all_columns_match(df1, df1_copy) is True
assert all_columns_match(df1, df2) is False
assert all_columns_match(df1, df3) is False
assert all_columns_match(df1_copy, df1) is True
assert all_columns_match(df3, df2) is False


def test_all_columns_match_duckdb(ref_df):
df1 = ref_df[0]
df1_copy = ref_df[1]
df2 = ref_df[2]
df3 = ref_df[3]

with duckdb.connect():
df1 = duckdb.from_df(df1)
df1_copy = duckdb.from_df(df1_copy)
df2 = duckdb.from_df(df2)
df3 = duckdb.from_df(df3)

assert all_columns_match(df1, df1_copy) is True
assert all_columns_match(df1, df2) is False
assert all_columns_match(df1, df3) is False
assert all_columns_match(df1_copy, df1) is True
assert all_columns_match(df3, df2) is False