From 337e904141ad85916ede62e9bf0fd127d0a7521f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 10 Apr 2024 11:03:39 -0700 Subject: [PATCH 1/7] improve Categorify inference testing --- tests/unit/ops/test_categorify.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/unit/ops/test_categorify.py b/tests/unit/ops/test_categorify.py index 41a69ef346..28c98854a2 100644 --- a/tests/unit/ops/test_categorify.py +++ b/tests/unit/ops/test_categorify.py @@ -734,3 +734,8 @@ def test_categorify_inference(): output_tensors = inference_op.transform(cats.input_columns, input_tensors) for key in input_tensors: assert output_tensors[key].dtype == np.dtype("int64") + + # Check results are consistent with python code path + expect = workflow.transform(df) + got = pd.DataFrame(output_tensors) + assert_eq(expect, got) From a92f0b97c9d89674a7cd8c56c56c3b76bcb9fc65 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Apr 2024 07:58:05 -0700 Subject: [PATCH 2/7] fix module name --- cpp/nvtabular/inference/categorify.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/nvtabular/inference/categorify.cc b/cpp/nvtabular/inference/categorify.cc index e9b50c0cdd..603a39e33b 100644 --- a/cpp/nvtabular/inference/categorify.cc +++ b/cpp/nvtabular/inference/categorify.cc @@ -337,12 +337,12 @@ namespace nvtabular // this operator currently only supports CPU arrays .def_property_readonly("supports", [](py::object self) { - py::object supports = py::module_::import("nvtabular").attr("graph").attr("base_operator").attr("Supports"); + py::object supports = py::module_::import("nvtabular").attr("graph").attr("operator").attr("Supports"); return supports.attr("CPU_DICT_ARRAY"); }) .def_property_readonly("supported_formats", [](py::object self) { - py::object supported = py::module_::import("nvtabular").attr("graph").attr("base_operator").attr("DataFormats"); + py::object supported = py::module_::import("nvtabular").attr("graph").attr("operator").attr("DataFormats"); return supported.attr("NUMPY_DICT_ARRAY"); }); } From f091625deca234453e01c96a5a47cb205c15fb5c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Apr 2024 08:49:43 -0700 Subject: [PATCH 3/7] avoid TENSOR_TABLE --- nvtabular/ops/groupby.py | 3 ++- nvtabular/ops/operator.py | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/nvtabular/ops/groupby.py b/nvtabular/ops/groupby.py index da343a5608..1b6084b15e 100644 --- a/nvtabular/ops/groupby.py +++ b/nvtabular/ops/groupby.py @@ -20,7 +20,7 @@ from merlin.core.dispatch import DataFrameType, annotate from merlin.dtypes.shape import DefaultShapes from merlin.schema import Schema -from nvtabular.ops.operator import ColumnSelector, Operator +from nvtabular.ops.operator import ColumnSelector, DataFormats, Operator class Groupby(Operator): @@ -109,6 +109,7 @@ def __init__( self.list_aggs[col] = list(_list_aggs) self.name_sep = name_sep + self.supported_formats = DataFormats.PANDAS_DATAFRAME | DataFormats.CUDF_DATAFRAME super().__init__() @annotate("Groupby_op", color="darkgreen", domain="nvt_python") diff --git a/nvtabular/ops/operator.py b/nvtabular/ops/operator.py index 0757557b12..41b3621643 100644 --- a/nvtabular/ops/operator.py +++ b/nvtabular/ops/operator.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from merlin.dag import BaseOperator, ColumnSelector # noqa pylint: disable=unused-import +from merlin.dag import ( # noqa pylint: disable=unused-import + BaseOperator, + ColumnSelector, + DataFormats, +) Operator = BaseOperator From f4c9de32e9720fe7043fcaf5c17bb21b111dff80 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Apr 2024 09:10:21 -0700 Subject: [PATCH 4/7] fix previous commit --- nvtabular/ops/groupby.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nvtabular/ops/groupby.py b/nvtabular/ops/groupby.py index 1b6084b15e..93748e4f19 100644 --- a/nvtabular/ops/groupby.py +++ b/nvtabular/ops/groupby.py @@ -109,9 +109,12 @@ def __init__( self.list_aggs[col] = list(_list_aggs) self.name_sep = name_sep - self.supported_formats = DataFormats.PANDAS_DATAFRAME | DataFormats.CUDF_DATAFRAME super().__init__() + @property + def supported_formats(self): + return DataFormats.PANDAS_DATAFRAME | DataFormats.CUDF_DATAFRAME + @annotate("Groupby_op", color="darkgreen", domain="nvt_python") def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: # Sort if necessary From 700b1966fd23d9f730b7eaef9991707885d190a3 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 17 Apr 2024 10:42:52 -0700 Subject: [PATCH 5/7] opt out of tensor-table support by default --- nvtabular/ops/groupby.py | 6 +----- nvtabular/ops/operator.py | 7 ++++++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/nvtabular/ops/groupby.py b/nvtabular/ops/groupby.py index 93748e4f19..da343a5608 100644 --- a/nvtabular/ops/groupby.py +++ b/nvtabular/ops/groupby.py @@ -20,7 +20,7 @@ from merlin.core.dispatch import DataFrameType, annotate from merlin.dtypes.shape import DefaultShapes from merlin.schema import Schema -from nvtabular.ops.operator import ColumnSelector, DataFormats, Operator +from nvtabular.ops.operator import ColumnSelector, Operator class Groupby(Operator): @@ -111,10 +111,6 @@ def __init__( self.name_sep = name_sep super().__init__() - @property - def supported_formats(self): - return DataFormats.PANDAS_DATAFRAME | DataFormats.CUDF_DATAFRAME - @annotate("Groupby_op", color="darkgreen", domain="nvt_python") def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: # Sort if necessary diff --git a/nvtabular/ops/operator.py b/nvtabular/ops/operator.py index 41b3621643..a0aa99ab6a 100644 --- a/nvtabular/ops/operator.py +++ b/nvtabular/ops/operator.py @@ -19,4 +19,9 @@ DataFormats, ) -Operator = BaseOperator + +# Avoid TENSOR_TABLE by default (for now) +class Operator(BaseOperator): + @property + def supported_formats(self): + return DataFormats.PANDAS_DATAFRAME | DataFormats.CUDF_DATAFRAME From e22a836a2b469cd306e902ca9469a03da254aeae Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 17 Apr 2024 13:06:09 -0700 Subject: [PATCH 6/7] simplify sort_values call --- nvtabular/ops/categorify.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/nvtabular/ops/categorify.py b/nvtabular/ops/categorify.py index 556e2a005a..cf5a486b54 100644 --- a/nvtabular/ops/categorify.py +++ b/nvtabular/ops/categorify.py @@ -1251,13 +1251,7 @@ def _drop_first_row(part, index): if has_size: # Avoid using dask_cudf to calculate divisions # (since it may produce too-few partitions) - df = df.sort_values( - name_size, - ascending=False, - divisions=dd.shuffle._calculate_divisions( - df, df[name_size], False, df.npartitions - )[0][::-1], - ) + df = df.sort_values(name_size, ascending=False) unique_path = _save_encodings( df, From c83d679b7e9c87b048d40600a68caf4889a21be6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 17 Apr 2024 13:47:45 -0700 Subject: [PATCH 7/7] avoid using p2p shuffle - sorting used to be unstable --- nvtabular/ops/categorify.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nvtabular/ops/categorify.py b/nvtabular/ops/categorify.py index cf5a486b54..4ebc878621 100644 --- a/nvtabular/ops/categorify.py +++ b/nvtabular/ops/categorify.py @@ -28,6 +28,7 @@ import pandas as pd import pyarrow as pa import pyarrow.dataset as pa_ds +from dask import config from dask.base import tokenize from dask.blockwise import BlockIndex from dask.core import flatten @@ -1251,7 +1252,8 @@ def _drop_first_row(part, index): if has_size: # Avoid using dask_cudf to calculate divisions # (since it may produce too-few partitions) - df = df.sort_values(name_size, ascending=False) + with config.set({"dataframe.shuffle.method": "tasks"}): + df = df.sort_values(name_size, ascending=False) unique_path = _save_encodings( df,