From 308bafa431f1476edb59e7270538bb9131800be1 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Sat, 25 Jun 2022 13:39:21 +0530 Subject: [PATCH 1/8] added documentation for using callback function in client.py --- .../how-to-guides/client-callback-function.md | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 docs/how-to-guides/client-callback-function.md diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md new file mode 100644 index 000000000..ddf1f8d29 --- /dev/null +++ b/docs/how-to-guides/client-callback-function.md @@ -0,0 +1,43 @@ +--- +layout: default +title: How to use callback function in feathr client +parent: Feathr How-to Guides +--- + +# How to use callback function in feathr client + +This doc shows how to build feathr registry docker image locally and publish to registry + +## What is a callback function + +A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per thr user needs. + +## How to use callback functions + +Currently these functions in feathr client support callbacks + +- get_online_features +- multi_get_online_features +- get_offline_features +- monitor_features +- materialize_features + +They accept two optional parameters named **callback** and **params**, where callback is of type function and params is a dictionary where user can pass the arguments for the callback function. + +An example on how to use it: + +```python +async def callback(params): + import httpx + async with httpx.AsyncClient() as client: + response = await client.post('https://some-endpoint', json = payload) + return response + +params = {"param1":"value1", "param2":"value2"} + +# inside the notebook +client = FeathrClient(config_path) +client.get_offline_features(observation_settings,feature_query,output_path, callback, params) + + +``` From abad7d4d6a3233d3d80aedf4b2015611a2dda5ed Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Sat, 25 Jun 2022 15:05:17 +0530 Subject: [PATCH 2/8] corrected payload to params --- docs/how-to-guides/client-callback-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index ddf1f8d29..fb005fd83 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -30,7 +30,7 @@ An example on how to use it: async def callback(params): import httpx async with httpx.AsyncClient() as client: - response = await client.post('https://some-endpoint', json = payload) + response = await client.post('https://some-endpoint', json = params) return response params = {"param1":"value1", "param2":"value2"} From c41f0aaf88d183f2d02b441422510ae286a93ef0 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Sun, 26 Jun 2022 21:12:43 +0530 Subject: [PATCH 3/8] added asyncio to setup.py and requirements.txt --- feathr_project/docs/requirements.txt | 3 ++- feathr_project/setup.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index eb4f05184..2bb0a9e42 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,4 +14,5 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -azure-core<=1.22.1 \ No newline at end of file +azure-core<=1.22.1 +asyncio \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index 9791fee95..7dda80343 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -51,7 +51,8 @@ # https://github.com/Azure/azure-sdk-for-python/pull/22891 # using a version lower than that to workaround this issue "azure-core<=1.22.1", - "typing_extensions>=4.2.0" + "typing_extensions>=4.2.0", + "asyncio" ], tests_require=[ 'pytest', From bc6b9c324c95c9b95d65e1b699abd9a982d9b9ec Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 11:32:42 +0000 Subject: [PATCH 4/8] removed asyncio and fixed documentation --- docs/how-to-guides/client-callback-function.md | 9 ++------- feathr_project/docs/requirements.txt | 3 +-- feathr_project/setup.py | 1 - 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index fb005fd83..dd37a622b 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -4,17 +4,13 @@ title: How to use callback function in feathr client parent: Feathr How-to Guides --- -# How to use callback function in feathr client - -This doc shows how to build feathr registry docker image locally and publish to registry - ## What is a callback function -A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per thr user needs. +A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per the user needs. ## How to use callback functions -Currently these functions in feathr client support callbacks +Currently below functions in feathr client support callback - get_online_features - multi_get_online_features @@ -39,5 +35,4 @@ params = {"param1":"value1", "param2":"value2"} client = FeathrClient(config_path) client.get_offline_features(observation_settings,feature_query,output_path, callback, params) - ``` diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 2bb0a9e42..eb4f05184 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,5 +14,4 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -azure-core<=1.22.1 -asyncio \ No newline at end of file +azure-core<=1.22.1 \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index 7dda80343..dd940cada 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -52,7 +52,6 @@ # using a version lower than that to workaround this issue "azure-core<=1.22.1", "typing_extensions>=4.2.0", - "asyncio" ], tests_require=[ 'pytest', From e5cd8375f51fb2bcce9de7ba17ac53d196dc2043 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 18:39:44 +0530 Subject: [PATCH 5/8] fixed docs and added tests --- .../how-to-guides/client-callback-function.md | 11 +- feathr_project/docs/requirements.txt | 2 +- feathr_project/setup.py | 2 +- feathr_project/test/test_client_callback.py | 124 ++++++++++++++++++ 4 files changed, 130 insertions(+), 9 deletions(-) create mode 100644 feathr_project/test/test_client_callback.py diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index fb005fd83..e213171e9 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -4,17 +4,13 @@ title: How to use callback function in feathr client parent: Feathr How-to Guides --- -# How to use callback function in feathr client - -This doc shows how to build feathr registry docker image locally and publish to registry - ## What is a callback function -A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per thr user needs. +A callback function is a function that is sent to another function as an argument. It can be used to extend the function as per the user needs. ## How to use callback functions -Currently these functions in feathr client support callbacks +Currently the below functions in feathr client support passing a callback as an argument: - get_online_features - multi_get_online_features @@ -22,7 +18,8 @@ Currently these functions in feathr client support callbacks - monitor_features - materialize_features -They accept two optional parameters named **callback** and **params**, where callback is of type function and params is a dictionary where user can pass the arguments for the callback function. +These functions accept two optional parameters named **callback** and **params**. +callback is of type function and params is a dictionary where user can pass the arguments for the callback function. An example on how to use it: diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 2bb0a9e42..fbdfd2def 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -15,4 +15,4 @@ google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka azure-core<=1.22.1 -asyncio \ No newline at end of file +mock \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index 7dda80343..8b7eadc7b 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -52,7 +52,7 @@ # using a version lower than that to workaround this issue "azure-core<=1.22.1", "typing_extensions>=4.2.0", - "asyncio" + "mock" ], tests_require=[ 'pytest', diff --git a/feathr_project/test/test_client_callback.py b/feathr_project/test/test_client_callback.py new file mode 100644 index 000000000..e8eed77b2 --- /dev/null +++ b/feathr_project/test/test_client_callback.py @@ -0,0 +1,124 @@ +import os +import asyncio +import mock +import time +from subprocess import call +from datetime import datetime, timedelta + +from pathlib import Path +from feathr import ValueType +from feathr import FeatureQuery +from feathr import ObservationSettings +from feathr import TypedKey +from test_fixture import basic_test_setup +from test_fixture import get_online_test_table_name +from feathr.definition._materialization_utils import _to_materialization_config +from feathr import (BackfillTime, MaterializationSettings) +from feathr import (BackfillTime, MaterializationSettings, FeatureQuery, + ObservationSettings, SparkExecutionConfiguration) +from feathr import RedisSink, HdfsSink + + +params = {"wait" : 0.1} +async def sample_callback(params): + print(params) + await asyncio.sleep(0.1) + +callback = mock.MagicMock(return_value=sample_callback(params)) + +def test_client_callback_offline_feature(): + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + + location_id = TypedKey(key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id in NYC", + full_name="nyc_taxi.location_id") + feature_query = FeatureQuery(feature_list=["f_location_avg_fare"], key=location_id) + + settings = ObservationSettings( + observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss") + + now = datetime.now() + output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".avro"]) + + res = client.get_offline_features(observation_settings=settings, + feature_query=feature_query, + output_path=output_path, + callback=callback, + params=params) + callback.assert_called_with(params) + + +def test_client_callback_materialization(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.materialize_features(settings, callback=callback, params=params) + callback.assert_called_with(params) + +def test_client_callback_monitor_features(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.monitor_features(settings, callback=callback, params=params) + callback.assert_called_with(params) + +def test_client_callback_get_online_features(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.materialize_features(settings) + callback.assert_called_with(params) + client.wait_job_to_finish(timeout_sec=900) + # wait for a few secs for the data to come in redis + time.sleep(5) + client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + callback.assert_called_with(params) + + +def test_client_callback_multi_get_online_features(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + redisSink = RedisSink(table_name=online_test_table) + settings = MaterializationSettings("nycTaxiTable", + sinks=[redisSink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"], + backfill_time=backfill_time) + client.materialize_features(settings) + callback.assert_called_with(params) + client.wait_job_to_finish(timeout_sec=900) + # wait for a few secs for the data to come in redis + time.sleep(5) + client.multi_get_online_features('nycTaxiDemoFeature', ["239", "265"], ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + callback.assert_called_with(params) \ No newline at end of file From 525e15b52b03546bd0c91f1846107bf447e8acca Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 18:47:05 +0530 Subject: [PATCH 6/8] fixed docs example code --- docs/how-to-guides/client-callback-function.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index 945d6c0aa..1d87fa57d 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -24,16 +24,17 @@ callback is of type function and params is a dictionary where user can pass the An example on how to use it: ```python -async def callback(params): - import httpx - async with httpx.AsyncClient() as client: - response = await client.post('https://some-endpoint', json = params) - return response +# inside notebook +client = FeathrClient(config_path) +client.get_offline_features(observation_settings,feature_query,output_path, callback, params) +# users can define their own callback function and params params = {"param1":"value1", "param2":"value2"} -# inside the notebook -client = FeathrClient(config_path) -client.get_offline_features(observation_settings,feature_query,output_path, callback, params) +async def callback(params): + import httpx + async with httpx.AsyncClient() as requestHandler: + response = await requestHandler.post('https://some-endpoint', json = params) + return response ``` From f39e07c8d223174afefdff5057b36eaeddaa4eb2 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 18:51:43 +0530 Subject: [PATCH 7/8] moved mock to tests_require in setup.py --- feathr_project/docs/requirements.txt | 4 ---- feathr_project/setup.py | 7 ++----- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 5ffc612a7..05db397e7 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,9 +14,5 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -<<<<<<< HEAD azure-core<=1.22.1 mock -======= -azure-core<=1.22.1 ->>>>>>> bc6b9c324c95c9b95d65e1b699abd9a982d9b9ec diff --git a/feathr_project/setup.py b/feathr_project/setup.py index f72ba234b..d22d7402a 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -51,14 +51,11 @@ # https://github.com/Azure/azure-sdk-for-python/pull/22891 # using a version lower than that to workaround this issue "azure-core<=1.22.1", - "typing_extensions>=4.2.0", -<<<<<<< HEAD - "mock" -======= ->>>>>>> bc6b9c324c95c9b95d65e1b699abd9a982d9b9ec + "typing_extensions>=4.2.0" ], tests_require=[ 'pytest', + 'mock' ], entry_points={ 'console_scripts': ['feathr=feathrcli.cli:cli'] From e20914bac6fe94b1869900d85653dccba830c3c9 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 19:19:42 +0530 Subject: [PATCH 8/8] changed mock to unittest.mock --- feathr_project/docs/requirements.txt | 3 +-- feathr_project/setup.py | 1 - feathr_project/test/test_client_callback.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/feathr_project/docs/requirements.txt b/feathr_project/docs/requirements.txt index 05db397e7..eb4f05184 100644 --- a/feathr_project/docs/requirements.txt +++ b/feathr_project/docs/requirements.txt @@ -14,5 +14,4 @@ google>=3.0.0 google-api-python-client>=2.41.0 azure-keyvault-secrets confluent-kafka -azure-core<=1.22.1 -mock +azure-core<=1.22.1 \ No newline at end of file diff --git a/feathr_project/setup.py b/feathr_project/setup.py index d22d7402a..9791fee95 100644 --- a/feathr_project/setup.py +++ b/feathr_project/setup.py @@ -55,7 +55,6 @@ ], tests_require=[ 'pytest', - 'mock' ], entry_points={ 'console_scripts': ['feathr=feathrcli.cli:cli'] diff --git a/feathr_project/test/test_client_callback.py b/feathr_project/test/test_client_callback.py index e8eed77b2..b3b543426 100644 --- a/feathr_project/test/test_client_callback.py +++ b/feathr_project/test/test_client_callback.py @@ -1,6 +1,6 @@ import os import asyncio -import mock +import unittest.mock as mock import time from subprocess import call from datetime import datetime, timedelta