diff --git a/docs/README.md b/docs/README.md index 6e26207b7..2d2d07d0c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -28,7 +28,6 @@ Feathr automatically computes your feature values and joins them to your trainin - **Native cloud integration** with simplified and scalable architecture, which is illustrated in the next section. - **Feature sharing and reuse made easy:** Feathr has built-in feature registry so that features can be easily shared across different teams and boost team productivity. - ## Running Feathr on Azure with 3 Simple Steps Feathr has native cloud integration. To use Feathr on Azure, you only need three steps: @@ -50,7 +49,7 @@ Feathr has native cloud integration. To use Feathr on Azure, you only need three If you are not using the above Jupyter Notebook and want to install Feathr client locally, use this: ```bash -pip install -U feathr +pip install feathr ``` Or use the latest code from GitHub: @@ -126,31 +125,30 @@ Read the [Streaming Source Ingestion Guide](https://linkedin.github.io/feathr/ho Read [Point-in-time Correctness and Point-in-time Join in Feathr](https://linkedin.github.io/feathr/concepts/point-in-time-join.html) for more details. - ## Running Feathr Examples -Follow the [quick start Jupyter Notebook](./feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb) to try it out. There is also a companion [quick start guide](https://linkedin.github.io/feathr/quickstart.html) containing a bit more explanation on the notebook. - +Follow the [quick start Jupyter Notebook](https://github.com/linkedin/feathr/blob/main/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb) to try it out. +There is also a companion [quick start guide](https://linkedin.github.io/feathr/quickstart_synapse.html) containing a bit more explanation on the notebook. ## Cloud Architecture Feathr has native integration with Azure and other cloud services, and here's the high-level architecture to help you get started. ![Architecture](images/architecture.png) -# Next Steps +## Next Steps -## Quickstart +### Quickstart - [Quickstart for Azure Synapse](quickstart_synapse.md) -## Concepts +### Concepts - [Feature Definition](concepts/feature-definition.md) - [Feature Generation](concepts/feature-generation.md) - [Feature Join](concepts/feature-join.md) - [Point-in-time Correctness](concepts/point-in-time-join.md) -## How-to-guides +### How-to-guides - [Azure Deployment](how-to-guides/azure-deployment.md) - [Local Feature Testing](how-to-guides/local-feature-testing.md) @@ -159,4 +157,5 @@ Feathr has native integration with Azure and other cloud services, and here's th - [Feathr Job Configuration](how-to-guides/feathr-job-configuration.md) ## API Documentation + - [Python API Documentation](https://feathr.readthedocs.io/en/latest/) diff --git a/docs/concepts/feature-generation.md b/docs/concepts/feature-generation.md index 297a1d6d4..db3b1c3ae 100644 --- a/docs/concepts/feature-generation.md +++ b/docs/concepts/feature-generation.md @@ -1,16 +1,19 @@ --- layout: default -title: Feathr Feature Generation +title: Feature Generation and Materialization parent: Feathr Concepts --- -# Feature Generation -Feature generation is the process to create features from raw source data into a certain persisted storage. +# Feature Generation and Materialization -User could utilize feature generation to pre-compute and materialize pre-defined features to online and/or offline storage. This is desirable when the feature transformation is computation intensive or when the features can be reused(usually in offline setting). Feature generation is also useful in generating embedding features. Embedding distill information from large data and it is usually more compact. +Feature generation (also known as feature materialization) is the process to create features from raw source data into a certain persisted storage in either offline store (for further reuse), or online store (for online inference). + +User can utilize feature generation to pre-compute and materialize pre-defined features to online and/or offline storage. This is desirable when the feature transformation is computation intensive or when the features can be reused (usually in offline setting). Feature generation is also useful in generating embedding features, where those embeddings distill information from large data and is usually more compact. ## Generating Features to Online Store -When we need to serve the models online, we also need to serve the features online. We provide APIs to generate features to online storage for future consumption. For example: + +When the models are served in an online environment, we also need to serve the corresponding features in the same online environment as well. Feathr provides APIs to generate features to online storage for future consumption. For example: + ```python client = FeathrClient() redisSink = RedisSink(table_name="nycTaxiDemoFeature") @@ -21,12 +24,16 @@ settings = MaterializationSettings("nycTaxiMaterializationJob", client.materialize_features(settings) ``` -([MaterializationSettings API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.MaterializationSettings), -[RedisSink API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.RedisSink) +More reference on the APIs: + +- [MaterializationSettings API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.MaterializationSettings) +- [RedisSink API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.RedisSink) In the above example, we define a Redis table called `nycTaxiDemoFeature` and materialize two features called `f_location_avg_fare` and `f_location_max_fare` to Redis. -It is also possible to backfill the features for a previous time range, like below. If the `BackfillTime` part is not specified, it's by default to `now()` (i.e. if not specified, it's equivilant to `BackfillTime(start=now, end=now, step=timedelta(days=1))`). +## Feature Backfill + +It is also possible to backfill the features for a particular time range, like below. If the `BackfillTime` part is not specified, it's by default to `now()` (i.e. if not specified, it's equivalent to `BackfillTime(start=now, end=now, step=timedelta(days=1))`). ```python client = FeathrClient() @@ -39,29 +46,34 @@ settings = MaterializationSettings("nycTaxiMaterializationJob", client.materialize_features(settings) ``` -([BackfillTime API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.BackfillTime), -[client.materialize_features() API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.FeathrClient.materialize_features)) +Note that if you don't have features available in `now`, you'd better specify a `BackfillTime` range where you have features. -## Consuming the online features +Also, Feathr will submit a materialization job for each of the step for performance reasons. I.e. if you have +`BackfillTime(start=datetime(2022, 2, 1), end=datetime(2022, 2, 20), step=timedelta(days=1))`, Feathr will submit 20 jobs to run in parallel for maximum performance. -```python -client.wait_job_to_finish(timeout_sec=600) +More reference on the APIs: + +- [BackfillTime API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.BackfillTime) +- [client.materialize_features() API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.FeathrClient.materialize_features) -res = client.get_online_features('nycTaxiDemoFeature', '265', [ - 'f_location_avg_fare', 'f_location_max_fare']) + + +## Consuming features in online environment + +After the materialization job is finished, we can get the online features by querying the `feature table`, corresponding `entity key` and a list of `feature names`. In the example below, we query the online features called `f_location_avg_fare` and `f_location_max_fare`, and query with a key `265` (which is the location ID). + +```python +res = client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare']) ``` -([client.get_online_features API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.FeathrClient.get_online_features)) +More reference on the APIs: +- [client.get_online_features API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.FeathrClient.get_online_features) -After we finish running the materialization job, we can get the online features by querying the feature name, with the -corresponding keys. In the example above, we query the online features called `f_location_avg_fare` and -`f_location_max_fare`, and query with a key `265` (which is the location ID). +## Materializing Features to Offline Store -## Generating Features to Offline Store +This is useful when the feature transformation is compute intensive and features can be re-used. For example, you have a feature that needs more than 24 hours to compute and the feature can be reused by more than one model training pipeline. In this case, you should consider generating features to offline. -This is a useful when the feature transformation is computation intensive and features can be re-used. For example, you -have a feature that needs more than 24 hours to compute and the feature can be reused by more than one model training -pipeline. In this case, you should consider generate features to offline. Here is an API example: +The API call is very similar to materializing features to online store, and here is an API example: ```python client = FeathrClient() @@ -73,14 +85,14 @@ settings = MaterializationSettings("nycTaxiMaterializationJob", client.materialize_features(settings) ``` -This will generate features on latest date(assuming it's `2022/05/21`) and output data to the following path: +This will generate features on latest date(assuming it's `2022/05/21`) and output data to the following path: `abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_test_data/df0/daily/2022/05/21` -You can also specify a BackfillTime so the features will be generated for those dates. For example: +You can also specify a `BackfillTime` so the features will be generated only for those dates. For example: ```Python backfill_time = BackfillTime(start=datetime( - 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) + 2020, 5, 10), end=datetime(2020, 5, 20), step=timedelta(days=1)) offline_sink = HdfsSink(output_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_test_data/") settings = MaterializationSettings("nycTaxiTable", sinks=[offline_sink], @@ -89,8 +101,32 @@ settings = MaterializationSettings("nycTaxiTable", backfill_time=backfill_time) ``` -This will generate features only for 2020/05/20 for me and it will be in folder: -`abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_test_data/df0/daily/2020/05/20` +This will generate features from `2020/05/10` to `2020/05/20` and the output will have 11 folders, from +`abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_test_data/df0/daily/2020/05/10` to `abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_test_data/df0/daily/2020/05/20`. Note that currently Feathr only supports materializing data in daily step (i.e. even if you specify an hourly step, the generated features in offline store will still be presented in a daily hierarchy). + +You can also specify the format of the materialized features in the offline store by using `execution_configurations` like below. Please refer to the [documentation](../how-to-guides/feathr-job-configuration.md) here for those configuration details. + +```python + +from feathr import HdfsSink +offlineSink = HdfsSink(output_path="abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_data/") +# Materialize two features into a Offline store. +settings = MaterializationSettings("nycTaxiMaterializationJob", + sinks=[offlineSink], + feature_names=["f_location_avg_fare", "f_location_max_fare"]) +client.materialize_features(settings, execution_configurations={ "spark.feathr.outputFormat": "parquet"}) + +``` + +For reading those materialized features, Feathr has a convenient helper function called `get_result_df` to help you view the data. For example, you can use the sample code below to read from the materialized result in offline store: + +```python + +path = "abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/materialize_offline_test_data/df0/daily/2020/05/20/" +res = get_result_df(client=client, format="parquet", res_url=path) +``` + +More reference on the APIs: -([MaterializationSettings API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.MaterializationSettings), -[HdfsSink API doc](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.HdfsSink)) +- [MaterializationSettings API](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.MaterializationSettings) +- [HdfsSink API](https://feathr.readthedocs.io/en/latest/feathr.html#feathr.HdfsSource) diff --git a/docs/how-to-guides/azure_resource_provision.json b/docs/how-to-guides/azure_resource_provision.json index 44dff8b16..c4891b93a 100644 --- a/docs/how-to-guides/azure_resource_provision.json +++ b/docs/how-to-guides/azure_resource_provision.json @@ -40,6 +40,52 @@ "description": "Whether or not to deploy eventhub provision script" } }, + "databaseServerName": { + "type": "string", + "defaultValue": "[concat('server-', uniqueString(resourceGroup().id, deployment().name))]", + "metadata": { + "description": "Specifies the name for the SQL server" + } + }, + "databaseName": { + "type": "string", + "defaultValue": "[concat('db-', uniqueString(resourceGroup().id, deployment().name), '-1')]", + "metadata": { + "description": "Specifies the name for the SQL database under the SQL server" + } + }, + "location": { + "type": "string", + "defaultValue": "[resourceGroup().location]", + "metadata": { + "description": "Specifies the location for server and database" + } + }, + "adminUser": { + "type": "string", + "metadata": { + "description": "Specifies the username for admin" + } + }, + "adminPassword": { + "type": "securestring", + "metadata": { + "description": "Specifies the password for admin" + } + }, + "storageAccountKey": { + "type": "string", + "metadata": { + "description": "Specifies the key of the storage account where the BACPAC file is stored." + } + }, + "bacpacUrl": { + "type": "string", + "defaultValue": "https://azurefeathrstorage.blob.core.windows.net/public/feathr-registry-schema.bacpac", + "metadata": { + "description": "This is the pre-created BACPAC file that contains required schemas by the registry server." + } + }, "dockerImage": { "defaultValue": "blrchen/feathr-sql-registry", "type": "String", @@ -393,6 +439,59 @@ "principalId": "[parameters('principalId')]", "scope": "[resourceGroup().id]" } + }, + { + "type": "Microsoft.Sql/servers", + "apiVersion": "2021-11-01-preview", + "name": "[parameters('databaseServerName')]", + "location": "[parameters('location')]", + "properties": { + "administratorLogin": "[parameters('adminUser')]", + "administratorLoginPassword": "[parameters('adminPassword')]", + "version": "12.0" + }, + "resources": [ + { + "type": "firewallrules", + "apiVersion": "2021-11-01-preview", + "name": "AllowAllAzureIps", + "location": "[parameters('location')]", + "dependsOn": [ + "[parameters('databaseServerName')]" + ], + "properties": { + "startIpAddress": "0.0.0.0", + "endIpAddress": "0.0.0.0" + } + } + ] + }, + { + "type": "Microsoft.Sql/servers/databases", + "apiVersion": "2021-11-01-preview", + "name": "[concat(string(parameters('databaseServerName')), '/', string(parameters('databaseName')))]", + "location": "[parameters('location')]", + "dependsOn": [ + "[concat('Microsoft.Sql/servers/', parameters('databaseServerName'))]" + ], + "resources": [ + { + "type": "extensions", + "apiVersion": "2021-11-01-preview", + "name": "Import", + "dependsOn": [ + "[resourceId('Microsoft.Sql/servers/databases', parameters('databaseServerName'), parameters('databaseName'))]" + ], + "properties": { + "storageKeyType": "StorageAccessKey", + "storageKey": "[parameters('storageAccountKey')]", + "storageUri": "[parameters('bacpacUrl')]", + "administratorLogin": "[parameters('adminUser')]", + "administratorLoginPassword": "[parameters('adminPassword')]", + "operationMode": "Import" + } + } + ] } ], "outputs": {} diff --git a/docs/how-to-guides/client-callback-function.md b/docs/how-to-guides/client-callback-function.md index 1d87fa57d..e7bfca830 100644 --- a/docs/how-to-guides/client-callback-function.md +++ b/docs/how-to-guides/client-callback-function.md @@ -10,7 +10,13 @@ A callback function is a function that is sent to another function as an argumen ## How to use callback functions -Currently the below functions in feathr client support passing a callback as an argument: +We can pass a callback function when initializing the feathr client. + +```python +client = FeathrClient(config_path, callback) +``` + +The below functions accept an optional parameters named **params**. params is a dictionary where user can pass the arguments for the callback function. - get_online_features - multi_get_online_features @@ -18,19 +24,15 @@ Currently the below functions in feathr client support passing a callback as an - monitor_features - materialize_features -These functions accept two optional parameters named **callback** and **params**. -callback is of type function and params is a dictionary where user can pass the arguments for the callback function. - An example on how to use it: ```python # inside notebook -client = FeathrClient(config_path) -client.get_offline_features(observation_settings,feature_query,output_path, callback, params) - -# users can define their own callback function and params +client = FeathrClient(config_path, callback) params = {"param1":"value1", "param2":"value2"} +client.get_offline_features(observation_settings,feature_query,output_path, params) +# users can define their own callback function async def callback(params): import httpx async with httpx.AsyncClient() as requestHandler: diff --git a/docs/how-to-guides/feathr-job-configuration.md b/docs/how-to-guides/feathr-job-configuration.md index e63781d7c..406fdcb21 100644 --- a/docs/how-to-guides/feathr-job-configuration.md +++ b/docs/how-to-guides/feathr-job-configuration.md @@ -6,7 +6,7 @@ parent: Feathr How-to Guides # Feathr Job Configuration -Since Feathr uses Spark as the underlying execution engine, there's a way to override Spark configuration by `FeathrClient.get_offline_features()` with `execution_configuratons` parameters. The complete list of the available spark configuration is located in [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html) (though not all of those are honored for cloud hosted Spark platforms such as Databricks), and there are a few Feathr specific ones that are documented here: +Since Feathr uses Spark as the underlying execution engine, there's a way to override Spark configuration by `FeathrClient.get_offline_features()` with `execution_configurations` parameters. The complete list of the available spark configuration is located in [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html) (though not all of those are honored for cloud hosted Spark platforms such as Databricks), and there are a few Feathr specific ones that are documented here: | Property Name | Default | Meaning | Since Version | | ------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- | diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index 6f8556f2c..2a8c4cd86 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -83,12 +83,13 @@ class FeathrClient(object): local_workspace_dir (str, optional): set where is the local work space dir. If not set, Feathr will create a temporary folder to store local workspace related files. credential (optional): credential to access cloud resources, most likely to be the returned result of DefaultAzureCredential(). If not set, Feathr will initialize DefaultAzureCredential() inside the __init__ function to get credentials. project_registry_tag (Dict[str, str]): adding tags for project in Feathr registry. This might be useful if you want to tag your project as deprecated, or allow certain customizations on project leve. Default is empty + callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. This is optional. Raises: RuntimeError: Fail to create the client since necessary environment variables are not set for Redis client creation. """ - def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir: str = None, credential=None, project_registry_tag: Dict[str, str]=None): + def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir: str = None, credential=None, project_registry_tag: Dict[str, str]=None, callback:callable = None): self.logger = logging.getLogger(__name__) # Redis key separator self._KEY_SEPARATOR = ':' @@ -141,7 +142,7 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir if self.credential is None: self.credential = DefaultAzureCredential(exclude_interactive_browser_credential=False) - self.feathr_spark_laucher = _FeathrSynapseJobLauncher( + self.feathr_spark_launcher = _FeathrSynapseJobLauncher( synapse_dev_url=self.envutils.get_environment_variable_with_default( 'spark_config', 'azure_synapse', 'dev_url'), pool_name=self.envutils.get_environment_variable_with_default( @@ -162,7 +163,7 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir self.envutils.get_environment_variable_with_default( 'spark_config', 'databricks', 'feathr_runtime_location') - self.feathr_spark_laucher = _FeathrDatabricksJobLauncher( + self.feathr_spark_launcher = _FeathrDatabricksJobLauncher( workspace_instance_url=self.envutils.get_environment_variable_with_default( 'spark_config', 'databricks', 'workspace_instance_url'), token_value=self.envutils.get_environment_variable( @@ -183,6 +184,7 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir 'feature_registry', 'purview', 'purview_name') # initialize the registry no matter whether we set purview name or not, given some of the methods are used there. self.registry = _FeatureRegistry(self.project_name, self.azure_purview_name, self.registry_delimiter, project_registry_tag, config_path = config_path, credential=self.credential) + self.callback = callback def _check_required_environment_variables_exist(self): """Checks if the required environment variables(form feathr_config.yaml) is set. @@ -265,7 +267,7 @@ def _get_registry_client(self): """ return self.registry._get_registry_client() - def get_online_features(self, feature_table, key, feature_names, callback: callable = None, params: dict = None): + def get_online_features(self, feature_table, key, feature_names, params: dict = None): """Fetches feature value for a certain key from a online feature table. There is an optional callback function and the params to extend this function's capability.For eg. cosumer of the features. @@ -273,7 +275,6 @@ def get_online_features(self, feature_table, key, feature_names, callback: calla feature_table: the name of the feature table. key: the key of the entity feature_names: list of feature names to fetch - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function Return: @@ -288,19 +289,18 @@ def get_online_features(self, feature_table, key, feature_names, callback: calla redis_key = self._construct_redis_key(feature_table, key) res = self.redis_clint.hmget(redis_key, *feature_names) feature_values = self._decode_proto(res) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) return feature_values - def multi_get_online_features(self, feature_table, keys, feature_names, callback: callable = None, params: dict = None): + def multi_get_online_features(self, feature_table, keys, feature_names, params: dict = None): """Fetches feature value for a list of keys from a online feature table. This is the batch version of the get API. Args: feature_table: the name of the feature table. keys: list of keys for the entities feature_names: list of feature names to fetch - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function Return: @@ -322,9 +322,9 @@ def multi_get_online_features(self, feature_table, keys, feature_names, callback for feature_list in pipeline_result: decoded_pipeline_result.append(self._decode_proto(feature_list)) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) return dict(zip(keys, decoded_pipeline_result)) @@ -424,10 +424,9 @@ def get_offline_features(self, observation_settings: ObservationSettings, feature_query: Union[FeatureQuery, List[FeatureQuery]], output_path: str, - execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, + execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, udf_files = None, verbose: bool = False, - callback: callable = None, params: dict = None ): """ @@ -438,7 +437,6 @@ def get_offline_features(self, feature_query: features that are requested to add onto the observation data output_path: output path of job, i.e. the observation data with features attached. execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function """ feature_queries = feature_query if isinstance(feature_query, List) else [feature_query] @@ -476,19 +474,19 @@ def get_offline_features(self, FeaturePrinter.pretty_print_feature_query(feature_query) write_to_file(content=config, full_file_name=config_file_path) - job_info = self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files) - if (callback is not None) and (params is not None): + job_info = self._get_offline_features_with_config(config_file_path, execution_configurations, udf_files=udf_files) + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) return job_info - def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configuratons: Dict[str,str] = {}, udf_files=[]): + def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configurations: Dict[str,str] = {}, udf_files=[]): """Joins the features to your offline observation dataset based on the join config. Args: feature_join_conf_path: Relative path to your feature join config file. """ - cloud_udf_paths = [self.feathr_spark_laucher.upload_or_get_cloud_path(udf_local_path) for udf_local_path in udf_files] + cloud_udf_paths = [self.feathr_spark_launcher.upload_or_get_cloud_path(udf_local_path) for udf_local_path in udf_files] feathr_feature = ConfigFactory.parse_file(feature_join_conf_path) feature_join_job_params = FeatureJoinJobParams(join_config_path=os.path.abspath(feature_join_conf_path), @@ -498,8 +496,8 @@ def _get_offline_features_with_config(self, feature_join_conf_path='feature_join ) job_tags = {OUTPUT_PATH_TAG:feature_join_job_params.job_output_path} # set output format in job tags if it's set by user, so that it can be used to parse the job result in the helper function - if execution_configuratons is not None and OUTPUT_FORMAT in execution_configuratons: - job_tags[OUTPUT_FORMAT]= execution_configuratons[OUTPUT_FORMAT] + if execution_configurations is not None and OUTPUT_FORMAT in execution_configurations: + job_tags[OUTPUT_FORMAT]= execution_configurations[OUTPUT_FORMAT] ''' - Job tags are for job metadata and it's not passed to the actual spark job (i.e. not visible to spark job), more like a platform related thing that Feathr want to add (currently job tags only have job output URL and job output format, ). They are carried over with the job and is visible to every Feathr client. Think this more like some customized metadata for the job which would be weird to be put in the spark job itself. - Job arguments (or sometimes called job parameters)are the arguments which are command line arguments passed into the actual spark job. This is usually highly related with the spark job. In Feathr it's like the input to the scala spark CLI. They are usually not spark specific (for example if we want to specify the location of the feature files, or want to @@ -507,18 +505,18 @@ def _get_offline_features_with_config(self, feature_join_conf_path='feature_join Job configurations and job arguments (or sometimes called job parameters) have quite some overlaps (i.e. you can achieve the same goal by either using the job arguments/parameters vs. job configurations). But the job tags should just be used for metadata purpose. ''' # submit the jars - return self.feathr_spark_laucher.submit_feathr_job( + return self.feathr_spark_launcher.submit_feathr_job( job_name=self.project_name + '_feathr_feature_join_job', main_jar_path=self._FEATHR_JOB_JAR_PATH, python_files=cloud_udf_paths, job_tags=job_tags, main_class_name='com.linkedin.feathr.offline.job.FeatureJoinJob', arguments=[ - '--join-config', self.feathr_spark_laucher.upload_or_get_cloud_path( + '--join-config', self.feathr_spark_launcher.upload_or_get_cloud_path( feature_join_job_params.join_config_path), '--input', feature_join_job_params.observation_path, '--output', feature_join_job_params.job_output_path, - '--feature-config', self.feathr_spark_laucher.upload_or_get_cloud_path( + '--feature-config', self.feathr_spark_launcher.upload_or_get_cloud_path( feature_join_job_params.feature_config), '--num-parts', self.output_num_parts, '--s3-config', self._get_s3_config_str(), @@ -528,7 +526,7 @@ def _get_offline_features_with_config(self, feature_join_conf_path='feature_join '--snowflake-config', self._get_snowflake_config_str() ], reference_files_path=[], - configuration=execution_configuratons, + configuration=execution_configurations, properties=self._get_system_properties() ) @@ -536,10 +534,10 @@ def get_job_result_uri(self, block=True, timeout_sec=300) -> str: """Gets the job output URI """ if not block: - return self.feathr_spark_laucher.get_job_result_uri() + return self.feathr_spark_launcher.get_job_result_uri() # Block the API by pooling the job status and wait for complete - if self.feathr_spark_laucher.wait_for_completion(timeout_sec): - return self.feathr_spark_laucher.get_job_result_uri() + if self.feathr_spark_launcher.wait_for_completion(timeout_sec): + return self.feathr_spark_launcher.get_job_result_uri() else: raise RuntimeError( 'Spark job failed so output cannot be retrieved.') @@ -547,39 +545,37 @@ def get_job_result_uri(self, block=True, timeout_sec=300) -> str: def get_job_tags(self) -> Dict[str, str]: """Gets the job tags """ - return self.feathr_spark_laucher.get_job_tags() + return self.feathr_spark_launcher.get_job_tags() def wait_job_to_finish(self, timeout_sec: int = 300): """Waits for the job to finish in a blocking way unless it times out """ - if self.feathr_spark_laucher.wait_for_completion(timeout_sec): + if self.feathr_spark_launcher.wait_for_completion(timeout_sec): return else: raise RuntimeError('Spark job failed.') - def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, callback: callable = None, params: dict = None): + def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, params: dict = None): """Create a offline job to generate statistics to monitor feature data. There is an optional callback function and the params to extend this function's capability.For eg. cosumer of the features. Args: settings: Feature monitoring settings execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function. """ self.materialize_features(settings, execution_configuratons, verbose) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) - def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, callback: callable = None, params: dict = None): + def materialize_features(self, settings: MaterializationSettings, execution_configurations: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False, params: dict = None): """Materialize feature data. There is an optional callback function and the params to extend this function's capability.For eg. cosumer of the feature store. Args: settings: Feature materialization settings execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations. - callback: an async callback function that will be called after execution of the original logic. This callback should not block the thread. params: a dictionary of parameters for the callback function """ # produce materialization config @@ -600,7 +596,7 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf udf_files = _PreprocessingPyudfManager.prepare_pyspark_udf_files(settings.feature_names, self.local_workspace_dir) # CLI will directly call this so the experiene won't be broken - self._materialize_features_with_config(config_file_path, execution_configuratons, udf_files) + self._materialize_features_with_config(config_file_path, execution_configurations, udf_files) if os.path.exists(config_file_path): os.remove(config_file_path) @@ -608,18 +604,18 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf if verbose and settings: FeaturePrinter.pretty_print_materialize_features(settings) - if (callback is not None) and (params is not None): + if (self.callback is not None) and (params is not None): event_loop = asyncio.get_event_loop() - event_loop.create_task(callback(params)) + event_loop.create_task(self.callback(params)) - def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = {}, udf_files=[]): + def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configurations: Dict[str,str] = {}, udf_files=[]): """Materializes feature data based on the feature generation config. The feature data will be materialized to the destination specified in the feature generation config. Args feature_gen_conf_path: Relative path to the feature generation config you want to materialize. """ - cloud_udf_paths = [self.feathr_spark_laucher.upload_or_get_cloud_path(udf_local_path) for udf_local_path in udf_files] + cloud_udf_paths = [self.feathr_spark_launcher.upload_or_get_cloud_path(udf_local_path) for udf_local_path in udf_files] # Read all features conf generation_config = FeatureGenerationJobParams( @@ -635,10 +631,10 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur if self.envutils.get_environment_variable('KAFKA_SASL_JAAS_CONFIG'): optional_params = optional_params + ['--kafka-config', self._get_kafka_config_str()] arguments = [ - '--generation-config', self.feathr_spark_laucher.upload_or_get_cloud_path( + '--generation-config', self.feathr_spark_launcher.upload_or_get_cloud_path( generation_config.generation_config_path), # Local Config, comma seperated file names - '--feature-config', self.feathr_spark_laucher.upload_or_get_cloud_path( + '--feature-config', self.feathr_spark_launcher.upload_or_get_cloud_path( generation_config.feature_config), '--redis-config', self._getRedisConfigStr(), '--s3-config', self._get_s3_config_str(), @@ -651,14 +647,14 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur if monitoring_config_str: arguments.append('--monitoring-config') arguments.append(monitoring_config_str) - return self.feathr_spark_laucher.submit_feathr_job( + return self.feathr_spark_launcher.submit_feathr_job( job_name=self.project_name + '_feathr_feature_materialization_job', main_jar_path=self._FEATHR_JOB_JAR_PATH, python_files=cloud_udf_paths, main_class_name='com.linkedin.feathr.offline.job.FeatureGenJob', arguments=arguments, reference_files_path=[], - configuration=execution_configuratons, + configuration=execution_configurations, properties=self._get_system_properties() ) @@ -666,7 +662,7 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur def wait_job_to_finish(self, timeout_sec: int = 300): """Waits for the job to finish in a blocking way unless it times out """ - if self.feathr_spark_laucher.wait_for_completion(timeout_sec): + if self.feathr_spark_launcher.wait_for_completion(timeout_sec): return else: raise RuntimeError('Spark job failed.') diff --git a/feathr_project/feathr/definition/materialization_settings.py b/feathr_project/feathr/definition/materialization_settings.py index 7794ab51e..c21a76b14 100644 --- a/feathr_project/feathr/definition/materialization_settings.py +++ b/feathr_project/feathr/definition/materialization_settings.py @@ -10,7 +10,7 @@ class BackfillTime: Attributes: start: start time of the backfill, inclusive. end: end time of the backfill, inclusive. - step: duration of each backfill step. e.g. if daily, use timedelta(days=1) + step: duration of each backfill step. e.g. if you want to materialize features on daily basis, use timedelta(days=1) """ def __init__(self, start: datetime, end: datetime, step: timedelta): self.start = start diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 4ac1a7a88..8de787dd0 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -16,7 +16,7 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None) res_url: str = res_url or client.get_job_result_uri(block=True, timeout_sec=1200) format: str = format or client.get_job_tags().get(OUTPUT_FORMAT, "") tmp_dir = tempfile.TemporaryDirectory() - client.feathr_spark_laucher.download_result(result_path=res_url, local_folder=tmp_dir.name) + client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=tmp_dir.name) dataframe_list = [] # by default the result are in avro format if format: diff --git a/feathr_project/feathrcli/data/feathr_user_workspace/nyc_driver_demo.ipynb b/feathr_project/feathrcli/data/feathr_user_workspace/nyc_driver_demo.ipynb index 0ed133169..f1980ec1e 100644 --- a/feathr_project/feathrcli/data/feathr_user_workspace/nyc_driver_demo.ipynb +++ b/feathr_project/feathrcli/data/feathr_user_workspace/nyc_driver_demo.ipynb @@ -548,7 +548,7 @@ " \"\"\"Download the job result dataset from cloud as a Pandas dataframe.\"\"\"\n", " res_url = client.get_job_result_uri(block=True, timeout_sec=600)\n", " tmp_dir = tempfile.TemporaryDirectory()\n", - " client.feathr_spark_laucher.download_result(result_path=res_url, local_folder=tmp_dir.name)\n", + " client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=tmp_dir.name)\n", " dataframe_list = []\n", " # assuming the result are in avro format\n", " for file in glob.glob(os.path.join(tmp_dir.name, '*.avro')):\n", diff --git a/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb b/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb index 7e1fd584b..37317ba3d 100644 --- a/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb +++ b/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo.ipynb @@ -576,7 +576,7 @@ " \"\"\"Download the job result dataset from cloud as a Pandas dataframe.\"\"\"\n", " res_url = client.get_job_result_uri(block=True, timeout_sec=600)\n", " tmp_dir = tempfile.TemporaryDirectory()\n", - " client.feathr_spark_laucher.download_result(result_path=res_url, local_folder=tmp_dir.name)\n", + " client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=tmp_dir.name)\n", " dataframe_list = []\n", " # assuming the result are in avro format\n", " for file in glob.glob(os.path.join(tmp_dir.name, '*.avro')):\n", diff --git a/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo_advanced.ipynb b/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo_advanced.ipynb index a381529b8..6230f1704 100644 --- a/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo_advanced.ipynb +++ b/feathr_project/feathrcli/data/feathr_user_workspace/product_recommendation_demo_advanced.ipynb @@ -634,7 +634,7 @@ " \"\"\"Download the job result dataset from cloud as a Pandas dataframe.\"\"\"\n", " res_url = client.get_job_result_uri(block=True, timeout_sec=600)\n", " tmp_dir = tempfile.TemporaryDirectory()\n", - " client.feathr_spark_laucher.download_result(result_path=res_url, local_folder=tmp_dir.name)\n", + " client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=tmp_dir.name)\n", " dataframe_list = []\n", " # assuming the result are in avro format\n", " for file in glob.glob(os.path.join(tmp_dir.name, '*.avro')):\n", diff --git a/feathr_project/test/test_azure_spark_e2e.py b/feathr_project/test/test_azure_spark_e2e.py index 62c4c39b4..bc6a155b8 100644 --- a/feathr_project/test/test_azure_spark_e2e.py +++ b/feathr_project/test/test_azure_spark_e2e.py @@ -28,7 +28,7 @@ def test_feathr_materialize_to_offline(): __file__).parent.resolve() / "test_user_workspace" # os.chdir(test_workspace_dir) - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) @@ -63,7 +63,7 @@ def test_feathr_online_store_agg_features(): __file__).parent.resolve() / "test_user_workspace" # os.chdir(test_workspace_dir) - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client: FeathrClient = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) backfill_time = BackfillTime(start=datetime( 2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) @@ -80,11 +80,11 @@ def test_feathr_online_store_agg_features(): res = client.get_online_features(online_test_table, '265', [ 'f_location_avg_fare', 'f_location_max_fare']) - # just assme there are values. We don't hard code the values for now for testing - # the correctness of the feature generation should be garunteed by feathr runtime. + # just assume there are values. We don't hard code the values for now for testing + # the correctness of the feature generation should be guaranteed by feathr runtime. # ID 239 and 265 are available in the `DOLocationID` column in this file: # https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-04.csv - # View more detials on this dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page + # View more details on this dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page assert len(res) == 2 assert res[0] != None assert res[1] != None @@ -121,11 +121,11 @@ def test_feathr_online_store_non_agg_features(): res = client.get_online_features(online_test_table, '111', ['f_gen_trip_distance', 'f_gen_is_long_trip_distance', 'f1', 'f2', 'f3', 'f4', 'f5', 'f6']) - # just assme there are values. We don't hard code the values for now for testing - # the correctness of the feature generation should be garunteed by feathr runtime. + # just assume there are values. We don't hard code the values for now for testing + # the correctness of the feature generation should be guaranteed by feathr runtime. # ID 239 and 265 are available in the `DOLocationID` column in this file: # https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-04.csv - # View more detials on this dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page + # View more detail on this dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page assert len(res) == 8 assert res[0] != None @@ -159,7 +159,7 @@ def test_dbfs_path(): if client.spark_runtime.casefold() == "databricks": # expect this raise an error since the result path is not in dbfs: format with pytest.raises(RuntimeError): - client.feathr_spark_laucher.download_result(result_path="wasb://res_url", local_folder="/tmp") + client.feathr_spark_launcher.download_result(result_path="wasb://res_url", local_folder="/tmp") def test_feathr_get_offline_features(): diff --git a/feathr_project/test/test_client_callback.py b/feathr_project/test/test_client_callback.py index b3b543426..544c4c20b 100644 --- a/feathr_project/test/test_client_callback.py +++ b/feathr_project/test/test_client_callback.py @@ -10,14 +10,16 @@ from feathr import FeatureQuery from feathr import ObservationSettings from feathr import TypedKey -from test_fixture import basic_test_setup from test_fixture import get_online_test_table_name from feathr.definition._materialization_utils import _to_materialization_config from feathr import (BackfillTime, MaterializationSettings) from feathr import (BackfillTime, MaterializationSettings, FeatureQuery, ObservationSettings, SparkExecutionConfiguration) from feathr import RedisSink, HdfsSink - +from feathr import (BOOLEAN, FLOAT, INPUT_CONTEXT, INT32, STRING, + DerivedFeature, Feature, FeatureAnchor, HdfsSource, + TypedKey, ValueType, WindowAggTransformation) +from feathr import FeathrClient params = {"wait" : 0.1} async def sample_callback(params): @@ -26,9 +28,85 @@ async def sample_callback(params): callback = mock.MagicMock(return_value=sample_callback(params)) + +def basic_test_setup_with_callback(config_path: str, callback: callable): + + now = datetime.now() + # set workspace folder by time; make sure we don't have write conflict if there are many CI tests running + os.environ['SPARK_CONFIG__DATABRICKS__WORK_DIR'] = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) + os.environ['SPARK_CONFIG__AZURE_SYNAPSE__WORKSPACE_DIR'] = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/feathr_github_ci','_', str(now.minute), '_', str(now.second) ,'_', str(now.microsecond)]) + + client = FeathrClient(config_path=config_path, callback=callback) + batch_source = HdfsSource(name="nycTaxiBatchSource", + path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", + event_timestamp_column="lpep_dropoff_datetime", + timestamp_format="yyyy-MM-dd HH:mm:ss") + + f_trip_distance = Feature(name="f_trip_distance", + feature_type=FLOAT, transform="trip_distance") + f_trip_time_duration = Feature(name="f_trip_time_duration", + feature_type=INT32, + transform="(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60") + + features = [ + f_trip_distance, + f_trip_time_duration, + Feature(name="f_is_long_trip_distance", + feature_type=BOOLEAN, + transform="cast_float(trip_distance)>30"), + Feature(name="f_day_of_week", + feature_type=INT32, + transform="dayofweek(lpep_dropoff_datetime)"), + ] + + + request_anchor = FeatureAnchor(name="request_features", + source=INPUT_CONTEXT, + features=features) + + f_trip_time_distance = DerivedFeature(name="f_trip_time_distance", + feature_type=FLOAT, + input_features=[ + f_trip_distance, f_trip_time_duration], + transform="f_trip_distance * f_trip_time_duration") + + f_trip_time_rounded = DerivedFeature(name="f_trip_time_rounded", + feature_type=INT32, + input_features=[f_trip_time_duration], + transform="f_trip_time_duration % 10") + + location_id = TypedKey(key_column="DOLocationID", + key_column_type=ValueType.INT32, + description="location id in NYC", + full_name="nyc_taxi.location_id") + agg_features = [Feature(name="f_location_avg_fare", + key=location_id, + feature_type=FLOAT, + transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", + agg_func="AVG", + window="90d")), + Feature(name="f_location_max_fare", + key=location_id, + feature_type=FLOAT, + transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", + agg_func="MAX", + window="90d")) + ] + + agg_anchor = FeatureAnchor(name="aggregationFeatures", + source=batch_source, + features=agg_features) + + client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=[ + f_trip_time_distance, f_trip_time_rounded]) + + return client + + + def test_client_callback_offline_feature(): test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) location_id = TypedKey(key_column="DOLocationID", key_column_type=ValueType.INT32, @@ -47,7 +125,6 @@ def test_client_callback_offline_feature(): res = client.get_offline_features(observation_settings=settings, feature_query=feature_query, output_path=output_path, - callback=callback, params=params) callback.assert_called_with(params) @@ -56,7 +133,7 @@ def test_client_callback_materialization(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -64,14 +141,14 @@ def test_client_callback_materialization(): feature_names=[ "f_location_avg_fare", "f_location_max_fare"], backfill_time=backfill_time) - client.materialize_features(settings, callback=callback, params=params) + client.materialize_features(settings, params=params) callback.assert_called_with(params) def test_client_callback_monitor_features(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -79,14 +156,14 @@ def test_client_callback_monitor_features(): feature_names=[ "f_location_avg_fare", "f_location_max_fare"], backfill_time=backfill_time) - client.monitor_features(settings, callback=callback, params=params) + client.monitor_features(settings, params=params) callback.assert_called_with(params) def test_client_callback_get_online_features(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -99,7 +176,7 @@ def test_client_callback_get_online_features(): client.wait_job_to_finish(timeout_sec=900) # wait for a few secs for the data to come in redis time.sleep(5) - client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + client.get_online_features('nycTaxiDemoFeature', '265', ['f_location_avg_fare', 'f_location_max_fare'], params=params) callback.assert_called_with(params) @@ -107,7 +184,7 @@ def test_client_callback_multi_get_online_features(): online_test_table = get_online_test_table_name("nycTaxiCITable") test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace" - client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + client = basic_test_setup_with_callback(os.path.join(test_workspace_dir, "feathr_config.yaml"),callback) backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1)) redisSink = RedisSink(table_name=online_test_table) settings = MaterializationSettings("nycTaxiTable", @@ -120,5 +197,5 @@ def test_client_callback_multi_get_online_features(): client.wait_job_to_finish(timeout_sec=900) # wait for a few secs for the data to come in redis time.sleep(5) - client.multi_get_online_features('nycTaxiDemoFeature', ["239", "265"], ['f_location_avg_fare', 'f_location_max_fare'], callback=callback, params=params) + client.multi_get_online_features('nycTaxiDemoFeature', ["239", "265"], ['f_location_avg_fare', 'f_location_max_fare'], params=params) callback.assert_called_with(params) \ No newline at end of file diff --git a/feathr_project/test/test_feature_materialization.py b/feathr_project/test/test_feature_materialization.py index 4952bb764..9539af97a 100644 --- a/feathr_project/test/test_feature_materialization.py +++ b/feathr_project/test/test_feature_materialization.py @@ -153,7 +153,7 @@ def test_get_offline_features_verbose(): observation_settings=settings, feature_query=feature_query, output_path=output_path, - execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "parquet", "spark.feathr.outputFormat": "parquet"}), + execution_configurations=SparkExecutionConfiguration({"spark.feathr.inputFormat": "parquet", "spark.feathr.outputFormat": "parquet"}), verbose=True ) diff --git a/feathr_project/test/test_input_output_sources.py b/feathr_project/test/test_input_output_sources.py index c100ccd7e..932347145 100644 --- a/feathr_project/test/test_input_output_sources.py +++ b/feathr_project/test/test_input_output_sources.py @@ -46,7 +46,7 @@ def test_feathr_get_offline_features_with_parquet(): client.get_offline_features(observation_settings=settings, feature_query=feature_query, output_path=output_path, - execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "parquet", "spark.feathr.outputFormat": "parquet"}) + execution_configurations=SparkExecutionConfiguration({"spark.feathr.inputFormat": "parquet", "spark.feathr.outputFormat": "parquet"}) ) # assuming the job can successfully run; otherwise it will throw exception @@ -91,7 +91,7 @@ def test_feathr_get_offline_features_with_delta_lake(): client.get_offline_features(observation_settings=settings, feature_query=feature_query, output_path=output_path, - execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta", "spark.feathr.outputFormat": "delta"}) + execution_configurations=SparkExecutionConfiguration({"spark.feathr.inputFormat": "delta", "spark.feathr.outputFormat": "delta"}) ) # assuming the job can successfully run; otherwise it will throw exception diff --git a/registry/sql-registry/main.py b/registry/sql-registry/main.py index a40fae89c..bd04c73ff 100644 --- a/registry/sql-registry/main.py +++ b/registry/sql-registry/main.py @@ -1,10 +1,11 @@ import os from typing import Optional +from uuid import UUID from fastapi import APIRouter, FastAPI, HTTPException from starlette.middleware.cors import CORSMiddleware from registry import * from registry.db_registry import DbRegistry -from registry.models import EntityType +from registry.models import AnchorDef, AnchorFeatureDef, DerivedFeatureDef, EntityType, ProjectDef, SourceDef, to_snake rp = "/" try: @@ -21,11 +22,12 @@ # Enables CORS app.add_middleware(CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + @router.get("/projects") def get_projects() -> list[str]: @@ -47,15 +49,16 @@ def get_project_datasources(project: str) -> list: @router.get("/projects/{project}/features") def get_project_features(project: str, keyword: Optional[str] = None) -> list: - if keyword is None: - p = registry.get_entity(project) - feature_ids = [s.id for s in p.attributes.anchor_features] + \ - [s.id for s in p.attributes.derived_features] + if keyword: + efs = registry.search_entity( + keyword, [EntityType.AnchorFeature, EntityType.DerivedFeature]) + feature_ids = [ef.id for ef in efs] features = registry.get_entities(feature_ids) return list([e.to_dict() for e in features]) else: - efs = registry.search_entity(keyword, [EntityType.AnchorFeature, EntityType.DerivedFeature]) - feature_ids = [ef.id for ef in efs] + p = registry.get_entity(project) + feature_ids = [s.id for s in p.attributes.anchor_features] + \ + [s.id for s in p.attributes.derived_features] features = registry.get_entities(feature_ids) return list([e.to_dict() for e in features]) @@ -64,8 +67,9 @@ def get_project_features(project: str, keyword: Optional[str] = None) -> list: def get_feature(feature: str) -> dict: e = registry.get_entity(feature) if e.entity_type not in [EntityType.DerivedFeature, EntityType.AnchorFeature]: - raise HTTPException(status_code=404, detail=f"Feature {feature} not found") - return e + raise HTTPException( + status_code=404, detail=f"Feature {feature} not found") + return e.to_dict() @router.get("/features/{feature}/lineage") @@ -74,4 +78,39 @@ def get_feature_lineage(feature: str) -> dict: return lineage.to_dict() -app.include_router(prefix = rp, router=router) +@router.post("/projects") +def new_project(definition: dict) -> dict: + id = registry.create_project(ProjectDef(**to_snake(definition))) + return {"guid": str(id)} + + +@router.post("/projects/{project}/datasources") +def new_project_datasource(project: str, definition: dict) -> dict: + project_id = registry.get_entity_id(project) + id = registry.create_project_datasource(project_id, SourceDef(**to_snake(definition))) + return {"guid": str(id)} + + +@router.post("/projects/{project}/anchors") +def new_project_anchor(project: str, definition: dict) -> dict: + project_id = registry.get_entity_id(project) + id = registry.create_project_anchor(project_id, AnchorDef(**to_snake(definition))) + return {"guid": str(id)} + + +@router.post("/projects/{project}/anchors/{anchor}/features") +def new_project_anchor_feature(project: str, anchor: str, definition: dict) -> dict: + project_id = registry.get_entity_id(project) + anchor_id = registry.get_entity_id(anchor) + id = registry.create_project_anchor_feature(project_id, anchor_id, AnchorFeatureDef(**to_snake(definition))) + return {"guid": str(id)} + + +@router.post("/projects/{project}/derivedfeatures") +def new_project_derived_feature(project: str, definition: dict) -> dict: + project_id = registry.get_entity_id(project) + id = registry.create_project_derived_feature(project_id, DerivedFeatureDef(**to_snake(definition))) + return {"guid": str(id)} + + +app.include_router(prefix=rp, router=router) diff --git a/registry/sql-registry/registry/database.py b/registry/sql-registry/registry/database.py index d82568972..39bab8ec4 100644 --- a/registry/sql-registry/registry/database.py +++ b/registry/sql-registry/registry/database.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod +from contextlib import contextmanager +import logging import threading -from distutils.log import debug, warn import os import pymssql @@ -9,7 +10,7 @@ class DbConnection(ABC): @abstractmethod - def execute(self, sql: str, *args, **kwargs) -> list[dict]: + def query(self, sql: str, *args, **kwargs) -> list[dict]: pass def quote(id): @@ -38,12 +39,15 @@ def parse_conn_str(s: str) -> dict: class MssqlConnection(DbConnection): @staticmethod - def connect(*args, **kwargs): + def connect(autocommit = True): conn_str = os.environ["CONNECTION_STR"] if "Server=" not in conn_str: - debug("`CONNECTION_STR` is not in ADO connection string format") + logging.debug("`CONNECTION_STR` is not in ADO connection string format") return None - return MssqlConnection(parse_conn_str(conn_str)) + params = parse_conn_str(conn_str) + if not autocommit: + params["autocommit"] = False + return MssqlConnection(params) def __init__(self, params): self.params = params @@ -53,8 +57,11 @@ def __init__(self, params): def make_connection(self): self.conn = pymssql.connect(**self.params) - def execute(self, sql: str, *args, **kwargs) -> list[dict]: - debug(f"SQL: `{sql}`") + def query(self, sql: str, *args, **kwargs) -> list[dict]: + """ + Make SQL query and return result + """ + logging.debug(f"SQL: `{sql}`") # NOTE: Only one cursor is allowed at the same time retry = 0 while True: @@ -64,7 +71,7 @@ def execute(self, sql: str, *args, **kwargs) -> list[dict]: c.execute(sql, *args, **kwargs) return c.fetchall() except pymssql.OperationalError: - warn("Database error, retrying...") + logging.warning("Database error, retrying...") # Reconnect self.make_connection() retry += 1 @@ -73,13 +80,49 @@ def execute(self, sql: str, *args, **kwargs) -> list[dict]: raise pass + @contextmanager + def transaction(self): + """ + Start a transaction so we can run multiple SQL in one batch. + User should use `with` with the returned value, look into db_registry.py for more real usage. + + NOTE: `self.query` and `self.execute` will use a different MSSQL connection so any change made + in this transaction will *not* be visible in these calls. + + The minimal implementation could look like this if the underlying engine doesn't support transaction. + ``` + @contextmanager + def transaction(self): + try: + c = self.create_or_get_connection(...) + yield c + finally: + c.close(...) + ``` + """ + conn = None + cursor = None + try: + # As one MssqlConnection has only one connection, we need to create a new one to disable `autocommit` + conn = MssqlConnection.connect(autocommit=False).conn + cursor = conn.cursor(as_dict=True) + yield cursor + except Exception as e: + logging.warning(f"Exception: {e}") + if conn: + conn.rollback() + raise e + finally: + if conn: + conn.commit() + providers.append(MssqlConnection) -def connect(): +def connect(*args, **kargs): for p in providers: - ret = p.connect() + ret = p.connect(*args, **kargs) if ret is not None: return ret raise RuntimeError("Cannot connect to database") \ No newline at end of file diff --git a/registry/sql-registry/registry/db_registry.py b/registry/sql-registry/registry/db_registry.py index f5456c5e5..ce636db86 100644 --- a/registry/sql-registry/registry/db_registry.py +++ b/registry/sql-registry/registry/db_registry.py @@ -1,16 +1,20 @@ from typing import Optional, Tuple, Union -from uuid import UUID +from uuid import UUID, uuid4 + +from pydantic import UUID4 from registry import Registry from registry import connect -from registry.models import Edge, EntitiesAndRelations, Entity, EntityRef, EntityType, RelationshipType, _to_type, _to_uuid +from registry.models import AnchorAttributes, AnchorDef, AnchorFeatureAttributes, AnchorFeatureDef, DerivedFeatureAttributes, DerivedFeatureDef, Edge, EntitiesAndRelations, Entity, EntityRef, EntityType, ProjectAttributes, ProjectDef, RelationshipType, SourceAttributes, SourceDef, _to_type, _to_uuid import json def quote(id): if isinstance(id, str): return f"'{id}'" + if isinstance(id, UUID): + return f"'{str(id)}'" else: - return ",".join([f"'{i}'" for i in id]) + return ",".join([quote(i) for i in id]) class DbRegistry(Registry): @@ -18,7 +22,7 @@ def __init__(self): self.conn = connect() def get_projects(self) -> list[str]: - ret = self.conn.execute( + ret = self.conn.query( f"select qualified_name from entities where entity_type='{EntityType.Project}'") return list([r["qualified_name"] for r in ret]) @@ -35,12 +39,12 @@ def get_entity_id(self, id_or_name: Union[str, UUID]) -> UUID: except ValueError: pass # It is a name - ret = self.conn.execute( + ret = self.conn.query( f"select entity_id from entities where qualified_name='{id_or_name}'") return ret[0]["entity_id"] def get_neighbors(self, id_or_name: Union[str, UUID], relationship: RelationshipType) -> list[Edge]: - rows = self.conn.execute(fr''' + rows = self.conn.query(fr''' select edge_id, from_id, to_id, conn_type from edges where from_id = '{self.get_entity_id(id_or_name)}' @@ -78,7 +82,8 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: edges = edges.union(conn) features = list([child_map[id] for id in feature_ids]) anchor.attributes.features = features - source_id = self.get_neighbors(anchor.id, RelationshipType.Consumes)[0].to_id + source_id = self.get_neighbors( + anchor.id, RelationshipType.Consumes)[0].to_id anchor.attributes.source = child_map[source_id] for df in project.attributes.derived_features: conn = self.get_neighbors(anchor.id, RelationshipType.Consumes) @@ -88,7 +93,269 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: df.attributes.input_features = features all_edges = self._get_edges(ids) return EntitiesAndRelations([project] + children, list(edges.union(all_edges))) - + + def search_entity(self, + keyword: str, + type: list[EntityType]) -> list[EntityRef]: + """ + WARN: This search function is implemented via `like` operator, which could be extremely slow. + """ + types = ",".join([quote(str(t)) for t in type]) + sql = fr'''select entity_id as id, qualified_name, entity_type as type from entities where qualified_name like %s and entity_type in ({types})''' + rows = self.conn.query(sql, ('%' + keyword + '%', )) + return list([EntityRef(**row) for row in rows]) + + def create_project(self, definition: ProjectDef) -> UUID: + # Here we start a transaction, any following step failed, everything rolls back + definition.qualified_name = definition.name + with self.conn.transaction() as c: + # First we try to find existing entity with the same qualified name + c.execute(f'''select entity_id, entity_type, attributes from entities where qualified_name = %s''', + definition.qualified_name) + r = c.fetchall() + if r: + if len(r) > 1: + assert False, "Data inconsistency detected, %d entities have same qualified_name %s" % ( + len(r), definition.qualified_name) + # The entity with same name already exists but with different type + if _to_type(r[0]["entity_type"], EntityType) != EntityType.Project: + raise ValueError("Entity %s already exists" % + definition.qualified_name) + # Just return the existing project id + return _to_uuid(r[0]["entity_id"]) + id = uuid4() + c.execute(f"insert into entities (entity_id, entity_type, qualified_name, attributes) values (%s, %s, %s, %s)", + (str(id), + str(EntityType.Project), + definition.qualified_name, + definition.to_attr().to_json())) + return id + + def create_project_datasource(self, project_id: UUID, definition: SourceDef) -> UUID: + project = self.get_entity(project_id) + definition.qualified_name = f"{project.qualified_name}__{definition.name}" + # Here we start a transaction, any following step failed, everything rolls back + with self.conn.transaction() as c: + # First we try to find existing entity with the same qualified name + c.execute(f'''select entity_id, entity_type, attributes from entities where qualified_name = %s''', + definition.qualified_name) + r = c.fetchall() + if r: + if len(r) > 1: + # There are multiple entities with same qualified name, that means we already have errors in the db + assert False, "Data inconsistency detected, %d entities have same qualified_name %s" % ( + len(r), definition.qualified_name) + # The entity with same name already exists but with different type + if _to_type(r[0]["entity_type"], EntityType) != EntityType.Source: + raise ValueError("Entity %s already exists" % + definition.qualified_name) + attr: SourceAttributes = _to_type( + json.loads(r[0]["attributes"]), SourceAttributes) + if attr.name == definition.name \ + and attr.type == definition.type \ + and attr.path == definition.path \ + and attr.preprocessing == definition.preprocessing \ + and attr.event_timestamp_column == definition.event_timestamp_column \ + and attr.timestamp_format == definition.timestamp_format: + # Creating exactly same entity + # Just return the existing id + return _to_uuid(r[0]["entity_id"]) + raise ValueError("Entity %s already exists" % + definition.qualified_name) + id = uuid4() + c.execute(f"insert into entities (entity_id, entity_type, qualified_name, attributes) values (%s, %s, %s, %s)", + (str(id), + str(EntityType.Source), + definition.qualified_name, + definition.to_attr().to_json())) + self._create_edge(c, project_id, id, RelationshipType.Contains) + self._create_edge(c, id, project_id, RelationshipType.BelongsTo) + return id + + def create_project_anchor(self, project_id: UUID, definition: AnchorDef) -> UUID: + project = self.get_entity(project_id) + definition.qualified_name = f"{project.qualified_name}__{definition.name}" + # Here we start a transaction, any following step failed, everything rolls back + with self.conn.transaction() as c: + # First we try to find existing entity with the same qualified name + c.execute(f'''select entity_id, entity_type, attributes from entities where qualified_name = %s''', + definition.qualified_name) + r = c.fetchall() + if r: + if len(r) > 1: + # There are multiple entities with same qualified name, that means we already have errors in the db + assert False, "Data inconsistency detected, %d entities have same qualified_name %s" % ( + len(r), definition.qualified_name) + # The entity with same name already exists but with different type + if _to_type(r[0]["entity_type"], EntityType) != EntityType.Anchor: + raise ValueError("Entity %s already exists" % + definition.qualified_name) + attr: AnchorAttributes = _to_type( + json.loads(r[0]["attributes"]), AnchorAttributes) + if attr.name == definition.name: + # Creating exactly same entity + # Just return the existing id + return _to_uuid(r[0]["entity_id"]) + raise ValueError("Entity %s already exists" % + definition.qualified_name) + c.execute("select entity_id, qualified_name from entities where entity_id = %s and entity_type = %s", (str( + definition.source_id), str(EntityType.Source))) + r = c.fetchall() + if not r: + raise ValueError("Source %s does not exist" % + definition.source_id) + ref = EntityRef(r[0]["entity_id"], + EntityType.Source, r[0]["qualified_name"]) + id = uuid4() + c.execute(f"insert into entities (entity_id, entity_type, qualified_name, attributes) values (%s, %s, %s, %s)", + (str(id), + str(EntityType.Anchor), + definition.qualified_name, + definition.to_attr(ref).to_json())) + # Add "Contains/BelongsTo" relations between anchor and project + self._create_edge(c, project_id, id, RelationshipType.Contains) + self._create_edge(c, id, project_id, RelationshipType.BelongsTo) + # Add "Consumes/Produces" relations between anchor and datasource + self._create_edge(c, id, definition.source_id, + RelationshipType.Consumes) + self._create_edge(c, definition.source_id, id, + RelationshipType.Produces) + return id + + def create_project_anchor_feature(self, project_id: UUID, anchor_id: UUID, definition: AnchorFeatureDef) -> UUID: + anchor = self.get_entity(anchor_id) + definition.qualified_name = f"{anchor.qualified_name}__{definition.name}" + # Here we start a transaction, any following step failed, everything rolls back + with self.conn.transaction() as c: + # First we try to find existing entity with the same qualified name + c.execute(f'''select entity_id, entity_type, attributes from entities where qualified_name = %s''', + definition.qualified_name) + r = c.fetchall() + if r: + if len(r) > 1: + # There are multiple entities with same qualified name, that means we already have errors in the db + assert False, "Data inconsistency detected, %d entities have same qualified_name %s" % ( + len(r), definition.qualified_name) + # The entity with same name already exists but with different type + if _to_type(r[0]["entity_type"], EntityType) != EntityType.AnchorFeature: + raise ValueError("Entity %s already exists" % + definition.qualified_name) + attr: AnchorFeatureAttributes = _to_type( + json.loads(r[0]["attributes"]), AnchorFeatureAttributes) + if attr.name == definition.name \ + and attr.type == definition.feature_type \ + and attr.transformation == definition.transformation \ + and attr.key == definition.key: + # Creating exactly same entity + # Just return the existing id + return _to_uuid(r[0]["entity_id"]) + # The existing entity has different definition, that's a conflict + raise ValueError("Entity %s already exists" % + definition.qualified_name) + source_id = anchor.attributes.source.id + id = uuid4() + c.execute(f"insert into entities (entity_id, entity_type, qualified_name, attributes) values (%s, %s, %s, %s)", + (str(id), + str(EntityType.AnchorFeature), + definition.qualified_name, + definition.to_attr().to_json())) + # Add "Contains/BelongsTo" relations between anchor feature and project + self._create_edge(c, project_id, id, RelationshipType.Contains) + self._create_edge(c, id, project_id, RelationshipType.BelongsTo) + # Add "Contains/BelongsTo" relations between anchor feature and anchor + self._create_edge(c, anchor_id, id, RelationshipType.Contains) + self._create_edge(c, id, anchor_id, RelationshipType.BelongsTo) + # Add "Consumes/Produces" relations between anchor feature and datasource used by anchor + self._create_edge(c, id, source_id, RelationshipType.Consumes) + self._create_edge(c, source_id, id, RelationshipType.Produces) + return id + + def create_project_derived_feature(self, project_id: UUID, definition: DerivedFeatureDef) -> UUID: + project = self.get_entity(project_id) + definition.qualified_name = f"{project.qualified_name}__{definition.name}" + # Here we start a transaction, any following step failed, everything rolls back + with self.conn.transaction() as c: + # First we try to find existing entity with the same qualified name + c.execute(f'''select entity_id, entity_type, attributes from entities where qualified_name = %s''', + definition.qualified_name) + r = c.fetchall() + if r: + if len(r) > 1: + # There are multiple entities with same qualified name, that means we already have errors in the db + assert False, "Data inconsistency detected, %d entities have same qualified_name %s" % ( + len(r), definition.qualified_name) + # The entity with same name already exists but with different type, that's conflict + if _to_type(r[0]["entity_type"], EntityType) != EntityType.DerivedFeature: + raise ValueError("Entity %s already exists" % + definition.qualified_name) + attr: DerivedFeatureAttributes = _to_type( + json.loads(r[0]["attributes"]), DerivedFeatureAttributes) + if attr.name == definition.name \ + and attr.type == definition.feature_type \ + and attr.transformation == definition.transformation \ + and attr.key == definition.key: + # Creating exactly same entity + # Just return the existing id + return _to_uuid(r[0]["entity_id"]) + # The existing entity has different definition, that's a conflict + raise ValueError("Entity %s already exists" % + definition.qualified_name) + r1 = [] + # Fill `input_anchor_features`, from `definition` we have ids only, we still need qualified names + if definition.input_anchor_features: + c.execute( + fr'''select entity_id, entity_type, qualified_name from entities where entity_id in ({quote(definition.input_anchor_features)}) and entity_type = %s ''', str(EntityType.AnchorFeature)) + r1 = c.fetchall() + if len(r1) != len(definition.input_anchor_features): + # TODO: More detailed error + raise(ValueError("Missing input anchor features")) + # Fill `input_derived_features`, from `definition` we have ids only, we still need qualified names + r2 = [] + if definition.input_derived_features: + c.execute( + fr'''select entity_id, entity_type, qualified_name from entities where entity_id in ({quote(definition.input_derived_features)}) and entity_type = %s ''', str(EntityType.DerivedFeature)) + r2 = c.fetchall() + if len(r2) != len(definition.input_derived_features): + # TODO: More detailed error + raise(ValueError("Missing input derived features")) + refs = list([EntityRef(r["entity_id"], r["entity_type"], r["qualified_name"]) for r in r1+r2]) + id = uuid4() + c.execute(f"insert into entities (entity_id, entity_type, qualified_name, attributes) values (%s, %s, %s, %s)", + (str(id), + str(EntityType.DerivedFeature), + definition.qualified_name, + definition.to_attr(refs).to_json())) + # Add "Contains/BelongsTo" relations between derived feature and project + self._create_edge(c, project_id, id, RelationshipType.Contains) + self._create_edge(c, id, project_id, RelationshipType.BelongsTo) + for r in r1+r2: + # Add "Consumes/Produces" relations between derived feature and all its upstream + input_feature_id = r["entity_id"] + self._create_edge(c, id, input_feature_id, + RelationshipType.Consumes) + self._create_edge(c, input_feature_id, id, + RelationshipType.Produces) + return id + + def _create_edge(self, cursor, from_id: UUID, to_id: UUID, type: RelationshipType): + """ + Create an edge with specified type between 2 entities, skip if the same connection already exists + """ + sql = r''' + IF NOT EXISTS (SELECT 1 FROM edges WHERE from_id=%(from_id)s and to_id=%(to_id)s and conn_type=%(type)s) + BEGIN + INSERT INTO edges + (edge_id, from_id, to_id, conn_type) + values + (%(edge_id)s, %(from_id)s, %(to_id)s, %(type)s) + END''' + cursor.execute(sql, { + "edge_id": str(uuid4()), + "from_id": str(from_id), + "to_id": str(to_id), + "type": type.name + }) + def _fill_entity(self, e: Entity) -> Entity: """ Entities in the DB contains only attributes belong to itself, but the returned @@ -105,7 +372,8 @@ def _fill_entity(self, e: Entity) -> Entity: feature_ids = [e.to_id for e in conn] features = self._get_entities(feature_ids) e.attributes.features = features - source_id = self.get_neighbors(e.id, RelationshipType.Consumes)[0].to_id + source_id = self.get_neighbors( + e.id, RelationshipType.Consumes)[0].to_id source = self.get_entity(source_id) e.attributes.source = source return e @@ -116,21 +384,21 @@ def _fill_entity(self, e: Entity) -> Entity: e.attributes.input_features = features return e return e - + def _get_edges(self, ids: list[UUID], types: list[RelationshipType] = []) -> list[Edge]: sql = fr"""select edge_id, from_id, to_id, conn_type from edges where from_id in ({quote(ids)}) and to_id in ({quote(ids)})""" - if len(types)>0: + if len(types) > 0: sql = fr"""select edge_id, from_id, to_id, conn_type from edges where conn_type in ({quote(types)}) and from_id in ({quote(ids)}) and to_id in ({quote(ids)})""" - rows = self.conn.execute(sql) + rows = self.conn.query(sql) return list([_to_type(row, Edge) for row in rows]) - + def _get_entity(self, id_or_name: Union[str, UUID]) -> Entity: - row = self.conn.execute(fr''' + row = self.conn.query(fr''' select entity_id, qualified_name, entity_type, attributes from entities where entity_id = '{self.get_entity_id(id_or_name)}' @@ -139,8 +407,9 @@ def _get_entity(self, id_or_name: Union[str, UUID]) -> Entity: return _to_type(row, Entity) def _get_entities(self, ids: list[UUID]) -> list[Entity]: - rows = self.conn.execute(fr''' - select entity_id, qualified_name, entity_type, attributes + if not ids: + return [] + rows = self.conn.query(fr'''select entity_id, qualified_name, entity_type, attributes from entities where entity_id in ({quote(ids)}) ''') @@ -154,7 +423,7 @@ def _bfs(self, id: UUID, conn_type: RelationshipType) -> Tuple[list[Entity], lis """ Breadth first traversal Starts from `id`, follow edges with `conn_type` only. - + WARN: There is no depth limit. """ connections = [] @@ -180,15 +449,4 @@ def _bfs_step(self, ids: list[UUID], conn_type: RelationshipType) -> set[dict]: """ ids = list([id["to_id"] for id in ids]) sql = fr"""select edge_id, from_id, to_id, conn_type from edges where conn_type = '{conn_type.name}' and from_id in ({quote(ids)})""" - return self.conn.execute(sql) - - def search_entity(self, - keyword: str, - type: list[EntityType]) -> list[EntityRef]: - """ - WARN: This search function is implemented via `like` operator, which could be extremely slow. - """ - types = ",".join([quote(str(t)) for t in type]) - sql = fr'''select entity_id as id, qualified_name, entity_type as type from entities where qualified_name like %s and entity_type in ({types})''' - rows = self.conn.execute(sql, ('%' + keyword + '%', )) - return list([EntityRef(**row) for row in rows]) + return self.conn.query(sql) diff --git a/registry/sql-registry/registry/interface.py b/registry/sql-registry/registry/interface.py index 406c52ace..0532af46b 100644 --- a/registry/sql-registry/registry/interface.py +++ b/registry/sql-registry/registry/interface.py @@ -1,10 +1,11 @@ -from abc import ABC, abstractmethod +from abc import ABC, abstractclassmethod, abstractmethod from typing import Union from uuid import UUID from registry.database import DbConnection from registry.models import * + class Registry(ABC): @abstractmethod def get_projects(self) -> list[str]: @@ -33,7 +34,7 @@ def get_entity_id(self, id_or_name: Union[str, UUID]) -> UUID: Get entity id by its name """ pass - + @abstractmethod def get_neighbors(self, id_or_name: Union[str, UUID], relationship: RelationshipType) -> list[Edge]: """ @@ -67,3 +68,37 @@ def search_entity(self, """ pass + @abstractmethod + def create_project(self, definition: ProjectDef) -> UUID: + """ + Create a new project + """ + pass + + @abstractmethod + def create_project_datasource(self, project_id: UUID, definition: SourceDef) -> UUID: + """ + Create a new datasource under the project + """ + pass + + @abstractmethod + def create_project_anchor(self, project_id: UUID, definition: AnchorDef) -> UUID: + """ + Create a new anchor under the project + """ + pass + + @abstractmethod + def create_project_anchor_feature(self, project_id: UUID, anchor_id: UUID, definition: AnchorFeatureDef) -> UUID: + """ + Create a new anchor feature under the anchor in the project + """ + pass + + @abstractmethod + def create_project_derived_feature(self, project_id: UUID, definition: DerivedFeatureDef) -> UUID: + """ + Create a new derived feature under the project + """ + pass diff --git a/registry/sql-registry/registry/models.py b/registry/sql-registry/registry/models.py index 3c08d2692..c613026bd 100644 --- a/registry/sql-registry/registry/models.py +++ b/registry/sql-registry/registry/models.py @@ -6,7 +6,7 @@ import re -def _to_snake(d, level: int = 0): +def to_snake(d, level: int = 0): """ Convert `string`, `list[string]`, or all keys in a `dict` into snake case The maximum length of input string or list is 100, or it will be truncated before being processed, for dict, the exception will be thrown if it has more than 100 keys. @@ -16,13 +16,13 @@ def _to_snake(d, level: int = 0): raise ValueError("Too many nested levels") if isinstance(d, str): d = d[:100] - return re.sub(r'([A-Z]\w+$)', r'_\1', d).lower() + return re.sub(r'(? 100: raise ValueError("Dict has too many keys") - return {_to_snake(a, level + 1): _to_snake(b, level + 1) if isinstance(b, (dict, list)) else b for a, b in d.items()} + return {to_snake(a, level + 1): to_snake(b, level + 1) if isinstance(b, (dict, list)) else b for a, b in d.items()} def _to_type(value, type): @@ -39,10 +39,10 @@ def _to_type(value, type): if hasattr(type, "new"): try: # The convention is to use `new` method to create the object from a dict - return type.new(**_to_snake(value)) + return type.new(**to_snake(value)) except TypeError: pass - return type(**_to_snake(value)) + return type(**to_snake(value)) if issubclass(type, Enum): try: n = int(value) @@ -140,6 +140,12 @@ def __init__(self, self.dimension_type = _to_type(dimension_type, ValueType) self.val_type = _to_type(val_type, ValueType) + def __eq__(self, o: object) -> bool: + return self.type == o.type \ + and self.tensor_category == o.tensor_category \ + and self.dimension_type == o.dimension_type \ + and self.val_type == o.val_type + def to_dict(self) -> dict: return { "type": self.type.name, @@ -162,6 +168,13 @@ def __init__(self, self.description = description self.key_column_alias = key_column_alias + def __eq__(self, o: object) -> bool: + if not isinstance(o, TypedKey): + return False + return self.key_column == o.key_column \ + and self.key_column_type == o.key_column_type \ + and self.key_column_alias == o.key_column_alias + def to_dict(self) -> dict: ret = { "key_column": self.key_column, @@ -193,6 +206,11 @@ class ExpressionTransformation(Transformation): def __init__(self, transform_expr: str): self.transform_expr = transform_expr + def __eq__(self, o: object) -> bool: + if not isinstance(o, ExpressionTransformation): + return False + return self.transform_expr == o.transform_expr + def to_dict(self) -> dict: return { "transform_expr": self.transform_expr @@ -214,6 +232,16 @@ def __init__(self, self.filter = filter self.limit = limit + def __eq__(self, o: object) -> bool: + if not isinstance(o, WindowAggregationTransformation): + return False + return self.def_expr == o.def_expr \ + and self.agg_func == o.agg_func \ + and self.window == o.window \ + and self.group_by == o.group_by \ + and self.filter == o.filter \ + and self.limit == o.limit + def to_dict(self) -> dict: ret = { "def_expr": self.def_expr, @@ -235,6 +263,11 @@ class UdfTransformation(Transformation): def __init__(self, name: str): self.name = name + def __eq__(self, o: object) -> bool: + if not isinstance(o, UdfTransformation): + return False + return self.name == o.name + def to_dict(self) -> dict: return { "name": self.name @@ -299,7 +332,7 @@ def __init__(self, self.attributes = attributes else: self.attributes = Attributes.new( - entity_type, **_to_snake(attributes)) + entity_type, **to_snake(attributes)) def get_ref(self) -> EntityRef: return EntityRef(self.id, @@ -432,9 +465,9 @@ def __init__(self, self._source = None self._features = [] # if source is not None: - # self._source = source.get_ref() - # if len(features)>0: - # self._set_feature(features) + # self._source = _to_type(source, Entity).get_ref() + # if features: + # self.features = features self.tags = tags @property @@ -522,8 +555,8 @@ def __init__(self, type: Union[dict, FeatureType], transformation: Union[dict, Transformation], key: list[Union[dict, TypedKey]], - # input_anchor_features: list[Union[dict, EntityRef, Entity]] = [], - # input_derived_features: list[Union[dict, EntityRef, Entity]] = [], + input_anchor_features: list[Union[dict, EntityRef, Entity]] = [], + input_derived_features: list[Union[dict, EntityRef, Entity]] = [], tags: dict = {}, **kwargs): self.qualified_name = qualified_name @@ -534,8 +567,6 @@ def __init__(self, self._input_anchor_features = [] self._input_derived_features = [] self.tags = tags - # self._set_input_anchor_features(input_anchor_features) - # self._set_input_derived_features(input_derived_features) @property def entity_type(self) -> EntityType: @@ -546,22 +577,27 @@ def input_features(self): return self._input_anchor_features + self._input_derived_features @input_features.setter - def input_features(self, v: Union[dict, Entity]): + def input_features(self, input_features_list: Union[dict, Entity, EntityRef]): self._input_anchor_features = [] self._input_derived_features = [] - for f in v: - e = None - if isinstance(f, Entity): - e = f - elif isinstance(f, dict): - e = _to_type(f, Entity) + for feature in input_features_list: + entity = None + if isinstance(feature, EntityRef): + entity = feature + elif isinstance(feature, Entity): + entity = feature.get_ref() + elif isinstance(feature, dict): + try: + entity = _to_type(feature, Entity).get_ref() + except: + entity = _to_type(feature, EntityRef) else: - raise TypeError(f) + raise TypeError(feature) - if e.entity_type == EntityType.AnchorFeature: - self._input_anchor_features.append(e) - elif e.entity_type == EntityType.DerivedFeature: - self._input_derived_features.append(e) + if entity.entity_type == EntityType.AnchorFeature: + self._input_anchor_features.append(entity) + elif entity.entity_type == EntityType.DerivedFeature: + self._input_derived_features.append(entity) else: pass @@ -569,38 +605,10 @@ def input_features(self, v: Union[dict, Entity]): def input_anchor_features(self): return self._input_anchor_features - # @input_anchor_features.setter - # def input_anchor_features(self, v): - # self._input_anchor_features = [] - # for f in v: - # if isinstance(f, Entity): - # self._input_anchor_features.append(f.get_ref()) - # elif isinstance(f, EntityRef): - # self._input_anchor_features.append(f) - # elif isinstance(f, dict): - # self._input_anchor_features.append( - # to_type(f, Entity).get_ref()) - # else: - # raise TypeError(f) - @property def input_derived_features(self): return self._input_derived_features - # @input_derived_features.setter - # def input_derived_features(self, v): - # self._input_derived_features = [] - # for f in v: - # if isinstance(f, Entity): - # self._input_derived_features.append(f.get_ref()) - # elif isinstance(f, EntityRef): - # self._input_derived_features.append(f) - # elif isinstance(f, dict): - # self._input_derived_features.append( - # to_type(f, Entity).get_ref()) - # else: - # raise TypeError(f) - def to_dict(self) -> dict: return { "qualifiedName": self.qualified_name, @@ -608,8 +616,8 @@ def to_dict(self) -> dict: "type": self.type.to_dict(), "transformation": self.transformation.to_dict(), "key": list([k.to_dict() for k in self.key]), - "input_anchor_features": [e.get_ref().to_dict() for e in self.input_anchor_features], - "input_derived_features": [e.get_ref().to_dict() for e in self.input_derived_features], + "input_anchor_features": [e.to_dict() for e in self.input_anchor_features], + "input_derived_features": [e.to_dict() for e in self.input_derived_features], "tags": self.tags, } @@ -654,18 +662,21 @@ def to_dict(self) -> dict: class ProjectDef: - def __init__(self, qualified_name: str, tags: dict = {}): + def __init__(self, name: str, qualified_name: str = "", tags: dict = {}): + self.name = name self.qualified_name = qualified_name - self.name = qualified_name self.tags = tags + + def to_attr(self) -> ProjectAttributes: + return ProjectAttributes(name=self.name, tags=self.tags) class SourceDef: def __init__(self, - qualified_name: str, name: str, path: str, type: str, + qualified_name: str = "", preprocessing: Optional[str] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = None, @@ -679,26 +690,41 @@ def __init__(self, self.timestamp_format = timestamp_format self.tags = tags + def to_attr(self) -> SourceAttributes: + return SourceAttributes(qualified_name=self.qualified_name, + name=self.name, + type=self.type, + path=self.path, + preprocessing=self.preprocessing, + event_timestamp_column=self.event_timestamp_column, + timestamp_format=self.timestamp_format, + tags=self.tags) class AnchorDef: def __init__(self, - qualified_name: str, name: str, source_id: Union[str, UUID], + qualified_name: str = "", tags: dict = {}): self.qualified_name = qualified_name self.name = name self.source_id = _to_uuid(source_id) self.tags = tags + def to_attr(self, source: EntityRef) -> AnchorAttributes: + attr = AnchorAttributes(qualified_name=self.qualified_name, + name=self.name, + tags=self.tags) + attr.source = source + return attr class AnchorFeatureDef: def __init__(self, - qualified_name: str, name: str, feature_type: Union[dict, FeatureType], transformation: Union[dict, Transformation], key: list[Union[dict, TypedKey]], + qualified_name: str = "", tags: dict = {}): self.qualified_name = qualified_name self.name = name @@ -707,16 +733,24 @@ def __init__(self, self.key = _to_type(key, TypedKey) self.tags = tags + def to_attr(self) -> AnchorFeatureAttributes: + return AnchorFeatureAttributes(qualified_name=self.qualified_name, + name=self.name, + type=self.feature_type, + transformation=self.transformation, + key=self.key, + tags=self.tags) + class DerivedFeatureDef: def __init__(self, - qualified_name: str, name: str, feature_type: Union[dict, FeatureType], transformation: Union[dict, Transformation], key: list[Union[dict, TypedKey]], input_anchor_features: list[Union[str, UUID]], input_derived_features: list[Union[str, UUID]], + qualified_name: str = "", tags: dict = {}): self.qualified_name = qualified_name self.name = name @@ -726,3 +760,14 @@ def __init__(self, self.input_anchor_features = _to_uuid(input_anchor_features) self.input_derived_features = _to_uuid(input_derived_features) self.tags = tags + + def to_attr(self, input_features: list[EntityRef]) -> DerivedFeatureAttributes: + attr = DerivedFeatureAttributes(qualified_name=self.qualified_name, + name=self.name, + type=self.feature_type, + transformation=self.transformation, + key=self.key, + tags=self.tags) + attr.input_features = input_features + return attr + diff --git a/registry/sql-registry/test/test_create.py b/registry/sql-registry/test/test_create.py new file mode 100644 index 000000000..d3077698b --- /dev/null +++ b/registry/sql-registry/test/test_create.py @@ -0,0 +1,58 @@ +import registry +from registry.db_registry import quote +from registry.models import AnchorDef, AnchorFeatureDef, DerivedFeatureDef, ExpressionTransformation, FeatureType, ProjectDef, SourceDef, TensorCategory, Transformation, TypedKey, ValueType, VectorType + +r = registry.DbRegistry() + + +def cleanup(): + with r.conn.transaction() as c: + ids = quote([project1_id, source1_id, anchor1_id, af1_id, df1_id]) + c.execute( + f"delete from edges where from_id in ({ids}) or to_id in ({ids})") + c.execute( + f"delete from entities where entity_id in ({ids})") + + +project1_id = r.create_project(ProjectDef("unit_test_project_1")) +print("project1 id ", project1_id) +project1 = r.get_entity(project1_id) +assert project1.qualified_name == "unit_test_project_1" + +# Re-create project, should return the same id +id = r.create_project(ProjectDef("unit_test_project_1")) +assert project1_id == id + +source1_id = r.create_project_datasource(project1_id, SourceDef( + qualified_name="unit_test_project_1__source1", name="source1", path="hdfs://somewhere", type="hdfs")) +print("source1 id ", source1_id) +source1 = r.get_entity(source1_id) +assert source1.qualified_name == "unit_test_project_1__source1" + +anchor1_id = r.create_project_anchor(project1_id, AnchorDef( + qualified_name="unit_test_project_1__anchor1", name="anchor1", source_id=source1_id)) +print("anchor1 id ", anchor1_id) +anchor1 = r.get_entity(anchor1_id) +assert anchor1.qualified_name == "unit_test_project_1__anchor1" +# anchor1 has source "source1" +assert anchor1.attributes.source.id == source1_id + +ft1 = FeatureType(type=VectorType.TENSOR, tensor_category=TensorCategory.DENSE, + dimension_type=[], val_type=ValueType.INT) +t1 = ExpressionTransformation("af1") +k = TypedKey(key_column="c1", key_column_type=ValueType.INT) +af1_id = r.create_project_anchor_feature(project1_id, anchor1_id, AnchorFeatureDef( + qualified_name="unit_test_project_1__anchor1__af1", name="af1", feature_type=ft1, transformation=t1, key=[k])) +print("af1 id ", af1_id) +af1 = r.get_entity(af1_id) +assert af1.qualified_name == "unit_test_project_1__anchor1__af1" + +df1_id = r.create_project_derived_feature(project1_id, DerivedFeatureDef(qualified_name="unit_test_project_1__df1", + name="df1", feature_type=ft1, transformation=t1, key=[k], input_anchor_features=[af1_id], input_derived_features=[])) +print("df1 id ", df1_id) +df1 = r.get_entity(df1_id) +assert df1.qualified_name == "unit_test_project_1__df1" +# df1 has only 1 input anchor feature "af1" +assert df1.attributes.input_anchor_features[0].id == af1_id + +# cleanup() diff --git a/ui/src/components/featureList.tsx b/ui/src/components/featureList.tsx index 429fea677..0b4f72dd2 100644 --- a/ui/src/components/featureList.tsx +++ b/ui/src/components/featureList.tsx @@ -120,7 +120,7 @@ const FeatureList: React.FC = () => { onKeywordChange(e.target.value) } onPressEnter={ fetchData } /> + onChange={ (e) => onKeywordChange(e.target.value) } onPressEnter={ onClickSearch } />