From f0d333815055824a782eedd978d04ee9797cfbdc Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 28 Jun 2024 17:48:06 +0100 Subject: [PATCH] Use an Airflow config to manage provider configuration (#41) Before this change, the provider used standard operating system environment variables to receive user configuration. This PR changes these to Airflow config, which already gracefully handles booleans and supports setting the configuration via environment variables and via `airflow.cfg`. This PR introduces these changes: * Replace `ANYSCALE__AIRFLOW_TELEMETRY_ENABLED` by `AIRFLOW__ANYSCALE__CLI_TOKEN` (or equivalent `airflow.cfg`) * Support `AIRFLOW__ANYSCALE__CLI_TOKEN` (or equivalent `airflow.cfg`) as an alternative to `ANYSCALE_CLI_TOKEN` Note: the following is discussed in a separate PR (#42), so I removed changing the default value for telemetry from the current PR. ~~The current documentation needs to provide more information about the telemetry and data collection, which is a crucial aspect that needs to be addressed.~~ ~~Until the following questions are addressed, I believe it should be disabled by default:~~ ~~* How does enabling telemetry help users?~~ ~~* Where is the data being sent?~~ ~~* What data is being collected?~~ ~~* How is the data being used?~~ --- anyscale_provider/hooks/anyscale.py | 8 ++++---- anyscale_provider/operators/anyscale.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/anyscale_provider/hooks/anyscale.py b/anyscale_provider/hooks/anyscale.py index 7ab9049..9172cd1 100644 --- a/anyscale_provider/hooks/anyscale.py +++ b/anyscale_provider/hooks/anyscale.py @@ -5,6 +5,7 @@ from functools import cached_property from typing import Any +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook from anyscale import Anyscale @@ -36,16 +37,15 @@ def client(self) -> Anyscale: # If the token is not found in the connection, try to get it from the environment variable if not token: - self.log.info(f"Using token from ENV") - token = os.getenv("ANYSCALE_CLI_TOKEN") + self.log.info(f"Using token from config or ENV") + token = conf.get("anyscale", "cli_token", fallback=os.getenv("ANYSCALE_CLI_TOKEN")) if not token: raise AirflowException(f"Missing API token for connection id {self.conn_id}") # Add custom headers if telemetry is enabled - by default telemetry is enabled. headers = {} - telemetry_env = os.getenv("ANYSCALE__AIRFLOW_TELEMETRY_ENABLED", "true") - telemetry_enabled = telemetry_env.lower() in ["true", "1", "yes", "on"] + telemetry_enabled = conf.getboolean("anyscale", "telemetry_enabled", fallback=True) if telemetry_enabled: headers["X-Anyscale-Source"] = "airflow" diff --git a/anyscale_provider/operators/anyscale.py b/anyscale_provider/operators/anyscale.py index c6bdf7a..98f8923 100644 --- a/anyscale_provider/operators/anyscale.py +++ b/anyscale_provider/operators/anyscale.py @@ -329,7 +329,7 @@ def on_kill(self) -> None: self.log.info("Termination request received. Submitted request to terminate the anyscale service rollout.") return - def execute(self, context: Context) -> str | None: + def execute(self, context: Context) -> None: """ Execute the service rollout to Anyscale.