From 2b6d4c810addb76b6cc24ca980252b0d5ebe9aae Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 19 Mar 2024 15:59:51 +0530 Subject: [PATCH] Add docs --- .../providers/apache/hive/example_dags/example_hive.py | 5 +++-- .../providers/apache/livy/example_dags/example_livy.py | 5 +++-- astronomer/providers/sftp/example_dags/example_sftp.py | 9 ++++++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/astronomer/providers/apache/hive/example_dags/example_hive.py b/astronomer/providers/apache/hive/example_dags/example_hive.py index c6d8c50e4..5123634ad 100644 --- a/astronomer/providers/apache/hive/example_dags/example_hive.py +++ b/astronomer/providers/apache/hive/example_dags/example_hive.py @@ -212,6 +212,7 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None: def revoke_inbound_rules(task_instance): + """Remove an ingress rule from security group""" import boto3 current_docker_ip = get("https://api.ipify.org").text @@ -222,8 +223,8 @@ def revoke_inbound_rules(task_instance): CidrIp=ip_range, FromPort=22, GroupId=task_instance.xcom_pull( - key="cluster_response_master_security_group", task_ids=["describe_created_cluster"] - )[0], + key="cluster_response_master_security_group", task_ids=["describe_created_cluster"] + )[0], IpProtocol="tcp", ) logging.info("%s", response) diff --git a/astronomer/providers/apache/livy/example_dags/example_livy.py b/astronomer/providers/apache/livy/example_dags/example_livy.py index bad0fb420..171f54231 100644 --- a/astronomer/providers/apache/livy/example_dags/example_livy.py +++ b/astronomer/providers/apache/livy/example_dags/example_livy.py @@ -175,6 +175,7 @@ def add_inbound_rule_for_security_group(task_instance: Any) -> None: def revoke_inbound_rules(task_instance): + """Remove an ingress rule from security group""" import boto3 current_docker_ip = get("https://api.ipify.org").text @@ -186,8 +187,8 @@ def revoke_inbound_rules(task_instance): FromPort=LIVY_OPERATOR_INGRESS_PORT, ToPort=LIVY_OPERATOR_INGRESS_PORT, GroupId=task_instance.xcom_pull( - key="cluster_response_master_security_group", task_ids=["describe_created_cluster"] - )[0], + key="cluster_response_master_security_group", task_ids=["describe_created_cluster"] + )[0], IpProtocol="tcp", ) logging.info("%s", response) diff --git a/astronomer/providers/sftp/example_dags/example_sftp.py b/astronomer/providers/sftp/example_dags/example_sftp.py index a5be711a4..2d9d08c9b 100644 --- a/astronomer/providers/sftp/example_dags/example_sftp.py +++ b/astronomer/providers/sftp/example_dags/example_sftp.py @@ -158,6 +158,7 @@ def add_inbound_rule_for_security_group(task_instance: "TaskInstance") -> None: def revoke_inbound_rules(task_instance): + """Remove an ingress rule from security group""" import boto3 current_docker_ip = get("https://api.ipify.org").text @@ -169,8 +170,8 @@ def revoke_inbound_rules(task_instance): FromPort=22, ToPort=22, GroupId=task_instance.xcom_pull( - key="cluster_response_master_security_group", task_ids=["describe_created_cluster"] - )[0], + key="cluster_response_master_security_group", task_ids=["describe_created_cluster"] + )[0], IpProtocol="tcp", ) logging.info("%s", response) @@ -299,7 +300,9 @@ def check_dag_status(**kwargs: Any) -> None: ) revoke_inbound_rules = PythonOperator( - task_id="revoke_inbound_rules", trigger_rule=TriggerRule.ALL_DONE, python_callable=revoke_inbound_rules + task_id="revoke_inbound_rules", + trigger_rule=TriggerRule.ALL_DONE, + python_callable=revoke_inbound_rules, ) dag_final_status = PythonOperator(