From 15ede4a1a1af8bebb52799420bf2f328406c126e Mon Sep 17 00:00:00 2001
From: VladaZakharova <80038284+VladaZakharova@users.noreply.github.com>
Date: Mon, 7 Aug 2023 22:50:11 +0200
Subject: [PATCH] Fix DataflowStartSqlJobOperator system test (#32823)

---
 .../cloud/example_dags/example_dataflow.py    | 291 --------------
 .../example_dataflow_flex_template.py         |  69 ----
 .../example_dags/example_dataflow_sql.py      |  67 ----
 .../operators/cloud/dataflow.rst              |   4 +-
 .../cloud/operators/test_dataflow_system.py   | 368 ------------------
 .../example_dataflow_native_python.py         |  12 +-
 .../cloud/dataflow/example_dataflow_sql.py    | 145 +++++++
 7 files changed, 158 insertions(+), 798 deletions(-)
 delete mode 100644 airflow/providers/google/cloud/example_dags/example_dataflow.py
 delete mode 100644 airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
 delete mode 100644 airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
 delete mode 100644 tests/providers/google/cloud/operators/test_dataflow_system.py
 create mode 100644 tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py

diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py
deleted file mode 100644
index 2ab4c04f5bb75..0000000000000
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ /dev/null
@@ -1,291 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-""" -Example Airflow DAG for Google Cloud Dataflow service -""" -from __future__ import annotations - -import os -from datetime import datetime -from typing import Callable -from urllib.parse import urlsplit - -from airflow import models -from airflow.exceptions import AirflowException -from airflow.providers.apache.beam.operators.beam import ( - BeamRunJavaPipelineOperator, - BeamRunPythonPipelineOperator, -) -from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus -from airflow.providers.google.cloud.operators.dataflow import ( - CheckJobRunning, - DataflowStopJobOperator, - DataflowTemplatedJobStartOperator, -) -from airflow.providers.google.cloud.sensors.dataflow import ( - DataflowJobAutoScalingEventsSensor, - DataflowJobMessagesSensor, - DataflowJobMetricsSensor, - DataflowJobStatusSensor, -) -from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator - -START_DATE = datetime(2021, 1, 1) - -GCS_TMP = os.environ.get("GCP_DATAFLOW_GCS_TMP", "gs://INVALID BUCKET NAME/temp/") -GCS_STAGING = os.environ.get("GCP_DATAFLOW_GCS_STAGING", "gs://INVALID BUCKET NAME/staging/") -GCS_OUTPUT = os.environ.get("GCP_DATAFLOW_GCS_OUTPUT", "gs://INVALID BUCKET NAME/output") -GCS_JAR = os.environ.get("GCP_DATAFLOW_JAR", "gs://INVALID BUCKET NAME/word-count-beam-bundled-0.1.jar") -GCS_PYTHON = os.environ.get("GCP_DATAFLOW_PYTHON", "gs://INVALID BUCKET NAME/wordcount_debugging.py") -PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") - -GCS_JAR_PARTS = urlsplit(GCS_JAR) -GCS_JAR_BUCKET_NAME = GCS_JAR_PARTS.netloc -GCS_JAR_OBJECT_NAME = GCS_JAR_PARTS.path[1:] - -default_args = { - "dataflow_default_options": { - "tempLocation": GCS_TMP, - "stagingLocation": GCS_STAGING, - } -} - -with models.DAG( - "example_gcp_dataflow_native_java", - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag_native_java: - - # [START howto_operator_start_java_job_jar_on_gcs] - start_java_job = BeamRunJavaPipelineOperator( - task_id="start-java-job", - jar=GCS_JAR, - pipeline_options={ - "output": GCS_OUTPUT, - }, - job_class="org.apache.beam.examples.WordCount", - dataflow_config={ - "check_if_running": CheckJobRunning.IgnoreJob, - "location": "europe-west3", - "poll_sleep": 10, - }, - ) - # [END howto_operator_start_java_job_jar_on_gcs] - - # [START howto_operator_start_java_job_local_jar] - jar_to_local = GCSToLocalFilesystemOperator( - task_id="jar-to-local", - bucket=GCS_JAR_BUCKET_NAME, - object_name=GCS_JAR_OBJECT_NAME, - filename="/tmp/dataflow-{{ ds_nodash }}.jar", - ) - - start_java_job_local = BeamRunJavaPipelineOperator( - task_id="start-java-job-local", - jar="/tmp/dataflow-{{ ds_nodash }}.jar", - pipeline_options={ - "output": GCS_OUTPUT, - }, - job_class="org.apache.beam.examples.WordCount", - dataflow_config={ - "check_if_running": CheckJobRunning.WaitForRun, - "location": "europe-west3", - "poll_sleep": 10, - }, - ) - jar_to_local >> start_java_job_local - # [END howto_operator_start_java_job_local_jar] - -with models.DAG( - "example_gcp_dataflow_native_python", - default_args=default_args, - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag_native_python: - - # [START howto_operator_start_python_job] - start_python_job = BeamRunPythonPipelineOperator( - task_id="start-python-job", - py_file=GCS_PYTHON, - py_options=[], - pipeline_options={ - "output": GCS_OUTPUT, - }, - py_requirements=["apache-beam[gcp]==2.21.0"], - py_interpreter="python3", - py_system_site_packages=False, - 
dataflow_config={"location": "europe-west3"}, - ) - # [END howto_operator_start_python_job] - - start_python_job_local = BeamRunPythonPipelineOperator( - task_id="start-python-job-local", - py_file="apache_beam.examples.wordcount", - py_options=["-m"], - pipeline_options={ - "output": GCS_OUTPUT, - }, - py_requirements=["apache-beam[gcp]==2.14.0"], - py_interpreter="python3", - py_system_site_packages=False, - ) - -with models.DAG( - "example_gcp_dataflow_native_python_async", - default_args=default_args, - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag_native_python_async: - # [START howto_operator_start_python_job_async] - start_python_job_async = BeamRunPythonPipelineOperator( - task_id="start-python-job-async", - runner="DataflowRunner", - py_file=GCS_PYTHON, - py_options=[], - pipeline_options={ - "output": GCS_OUTPUT, - }, - py_requirements=["apache-beam[gcp]==2.25.0"], - py_interpreter="python3", - py_system_site_packages=False, - dataflow_config={ - "job_name": "start-python-job-async", - "location": "europe-west3", - "wait_until_finished": False, - }, - ) - # [END howto_operator_start_python_job_async] - - # [START howto_sensor_wait_for_job_status] - wait_for_python_job_async_done = DataflowJobStatusSensor( - task_id="wait-for-python-job-async-done", - job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}", - expected_statuses={DataflowJobStatus.JOB_STATE_DONE}, - location="europe-west3", - ) - # [END howto_sensor_wait_for_job_status] - - # [START howto_sensor_wait_for_job_metric] - def check_metric_scalar_gte(metric_name: str, value: int) -> Callable: - """Check is metric greater than equals to given value.""" - - def callback(metrics: list[dict]) -> bool: - dag_native_python_async.log.info("Looking for '%s' >= %d", metric_name, value) - for metric in metrics: - context = metric.get("name", {}).get("context", {}) - original_name = context.get("original_name", "") - tentative = context.get("tentative", "") - if original_name == "Service-cpu_num_seconds" and not tentative: - return metric["scalar"] >= value - raise AirflowException(f"Metric '{metric_name}' not found in metrics") - - return callback - - wait_for_python_job_async_metric = DataflowJobMetricsSensor( - task_id="wait-for-python-job-async-metric", - job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}", - location="europe-west3", - callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100), - fail_on_terminal_state=False, - ) - # [END howto_sensor_wait_for_job_metric] - - # [START howto_sensor_wait_for_job_message] - def check_message(messages: list[dict]) -> bool: - """Check message""" - for message in messages: - if "Adding workflow start and stop steps." in message.get("messageText", ""): - return True - return False - - wait_for_python_job_async_message = DataflowJobMessagesSensor( - task_id="wait-for-python-job-async-message", - job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}", - location="europe-west3", - callback=check_message, - fail_on_terminal_state=False, - ) - # [END howto_sensor_wait_for_job_message] - - # [START howto_sensor_wait_for_job_autoscaling_event] - def check_autoscaling_event(autoscaling_events: list[dict]) -> bool: - """Check autoscaling event""" - for autoscaling_event in autoscaling_events: - if "Worker pool started." 
in autoscaling_event.get("description", {}).get("messageText", ""): - return True - return False - - wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor( - task_id="wait-for-python-job-async-autoscaling-event", - job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}", - location="europe-west3", - callback=check_autoscaling_event, - fail_on_terminal_state=False, - ) - # [END howto_sensor_wait_for_job_autoscaling_event] - - start_python_job_async >> wait_for_python_job_async_done - start_python_job_async >> wait_for_python_job_async_metric - start_python_job_async >> wait_for_python_job_async_message - start_python_job_async >> wait_for_python_job_async_autoscaling_event - - -with models.DAG( - "example_gcp_dataflow_template", - default_args=default_args, - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag_template: - # [START howto_operator_start_template_job] - start_template_job = DataflowTemplatedJobStartOperator( - task_id="start-template-job", - project_id=PROJECT_ID, - template="gs://dataflow-templates/latest/Word_Count", - parameters={"inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt", "output": GCS_OUTPUT}, - location="europe-west3", - ) - # [END howto_operator_start_template_job] - -with models.DAG( - "example_gcp_stop_dataflow_job", - default_args=default_args, - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag_template: - # [START howto_operator_stop_dataflow_job] - stop_dataflow_job = DataflowStopJobOperator( - task_id="stop-dataflow-job", - location="europe-west3", - job_name_prefix="start-template-job", - ) - # [END howto_operator_stop_dataflow_job] - start_template_job = DataflowTemplatedJobStartOperator( - task_id="start-template-job", - project_id=PROJECT_ID, - template="gs://dataflow-templates/latest/Word_Count", - parameters={"inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt", "output": GCS_OUTPUT}, - location="europe-west3", - append_job_name=False, - ) - - stop_dataflow_job >> start_template_job diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py b/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py deleted file mode 100644 index 8af748e8a70c5..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-""" -Example Airflow DAG for Google Cloud Dataflow service -""" -from __future__ import annotations - -import os -from datetime import datetime - -from airflow import models -from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator - -GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") - -DATAFLOW_FLEX_TEMPLATE_JOB_NAME = os.environ.get( - "GCP_DATAFLOW_FLEX_TEMPLATE_JOB_NAME", "dataflow-flex-template" -) - -# For simplicity we use the same topic name as the subscription name. -PUBSUB_FLEX_TEMPLATE_TOPIC = os.environ.get( - "GCP_DATAFLOW_PUBSUB_FLEX_TEMPLATE_TOPIC", "dataflow-flex-template" -) -PUBSUB_FLEX_TEMPLATE_SUBSCRIPTION = PUBSUB_FLEX_TEMPLATE_TOPIC -GCS_FLEX_TEMPLATE_TEMPLATE_PATH = os.environ.get( - "GCP_DATAFLOW_GCS_FLEX_TEMPLATE_TEMPLATE_PATH", - "gs://INVALID BUCKET NAME/samples/dataflow/templates/streaming-beam-sql.json", -) -BQ_FLEX_TEMPLATE_DATASET = os.environ.get("GCP_DATAFLOW_BQ_FLEX_TEMPLATE_DATASET", "airflow_dataflow_samples") -BQ_FLEX_TEMPLATE_LOCATION = os.environ.get("GCP_DATAFLOW_BQ_FLEX_TEMPLATE_LOCATION>", "us-west1") - -with models.DAG( - dag_id="example_gcp_dataflow_flex_template_java", - start_date=datetime(2021, 1, 1), - catchup=False, -) as dag_flex_template: - # [START howto_operator_start_template_job] - start_flex_template = DataflowStartFlexTemplateOperator( - task_id="start_flex_template_streaming_beam_sql", - project_id=GCP_PROJECT_ID, - body={ - "launchParameter": { - "containerSpecGcsPath": GCS_FLEX_TEMPLATE_TEMPLATE_PATH, - "jobName": DATAFLOW_FLEX_TEMPLATE_JOB_NAME, - "parameters": { - "inputSubscription": PUBSUB_FLEX_TEMPLATE_SUBSCRIPTION, - "outputTable": f"{GCP_PROJECT_ID}:{BQ_FLEX_TEMPLATE_DATASET}.streaming_beam_sql", - }, - } - }, - do_xcom_push=True, - location=BQ_FLEX_TEMPLATE_LOCATION, - ) - # [END howto_operator_start_template_job] diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py b/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py deleted file mode 100644 index 3ef0626f6db10..0000000000000 --- a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py +++ /dev/null @@ -1,67 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-""" -Example Airflow DAG for Google Cloud Dataflow service -""" -from __future__ import annotations - -import os -from datetime import datetime - -from airflow import models -from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator - -GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") - -BQ_SQL_DATASET = os.environ.get("GCP_DATAFLOW_BQ_SQL_DATASET", "airflow_dataflow_samples") -BQ_SQL_TABLE_INPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_INPUT", "beam_input") -BQ_SQL_TABLE_OUTPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_OUTPUT", "beam_output") -DATAFLOW_SQL_JOB_NAME = os.environ.get("GCP_DATAFLOW_SQL_JOB_NAME", "dataflow-sql") -DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1") - - -with models.DAG( - dag_id="example_gcp_dataflow_sql", - start_date=datetime(2021, 1, 1), - catchup=False, - tags=["example"], -) as dag_sql: - # [START howto_operator_start_sql_job] - start_sql = DataflowStartSqlJobOperator( - task_id="start_sql_query", - job_name=DATAFLOW_SQL_JOB_NAME, - query=f""" - SELECT - sales_region as sales_region, - count(state_id) as count_state - FROM - bigquery.table.`{GCP_PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}` - WHERE state_id >= @state_id_min - GROUP BY sales_region; - """, - options={ - "bigquery-project": GCP_PROJECT_ID, - "bigquery-dataset": BQ_SQL_DATASET, - "bigquery-table": BQ_SQL_TABLE_OUTPUT, - "bigquery-write-disposition": "write-truncate", - "parameter": "state_id_min:INT64:2", - }, - location=DATAFLOW_SQL_LOCATION, - do_xcom_push=True, - ) - # [END howto_operator_start_sql_job] diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst index da4923c6e2d5d..366f14f872e03 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst @@ -225,7 +225,7 @@ extensions for running Dataflow streaming jobs. Here is an example of running Dataflow SQL job with :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartSqlJobOperator`: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow_sql.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py :language: python :dedent: 4 :start-after: [START howto_operator_start_sql_job] @@ -247,7 +247,7 @@ To stop one or more Dataflow pipelines you can use Streaming pipelines are drained by default, setting ``drain_pipeline`` to ``False`` will cancel them instead. Provide ``job_id`` to stop a specific job, or ``job_name_prefix`` to stop all jobs with provided name prefix. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataflow.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py :language: python :dedent: 4 :start-after: [START howto_operator_stop_dataflow_job] diff --git a/tests/providers/google/cloud/operators/test_dataflow_system.py b/tests/providers/google/cloud/operators/test_dataflow_system.py deleted file mode 100644 index cf971dfdd4dca..0000000000000 --- a/tests/providers/google/cloud/operators/test_dataflow_system.py +++ /dev/null @@ -1,368 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import json -import os -import shlex -import textwrap -from tempfile import NamedTemporaryFile -from urllib.parse import urlsplit - -import pytest -import requests - -from airflow.providers.google.cloud.example_dags.example_dataflow_flex_template import ( - BQ_FLEX_TEMPLATE_DATASET, - BQ_FLEX_TEMPLATE_LOCATION, - DATAFLOW_FLEX_TEMPLATE_JOB_NAME, - GCS_FLEX_TEMPLATE_TEMPLATE_PATH, - PUBSUB_FLEX_TEMPLATE_SUBSCRIPTION, - PUBSUB_FLEX_TEMPLATE_TOPIC, -) -from airflow.providers.google.cloud.example_dags.example_dataflow_sql import ( - BQ_SQL_DATASET, - DATAFLOW_SQL_JOB_NAME, - DATAFLOW_SQL_LOCATION, -) -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATAFLOW_KEY, GCP_GCS_TRANSFER_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_DATAFLOW_KEY) -class TestCloudDataflowExampleDagsSystem(GoogleSystemTest): - @provide_gcp_context(GCP_DATAFLOW_KEY) - def test_run_example_gcp_dataflow_native_java(self): - self.run_dag("example_gcp_dataflow_native_java", CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_DATAFLOW_KEY) - def test_run_example_gcp_dataflow_native_python(self): - self.run_dag("example_gcp_dataflow_native_python", CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_DATAFLOW_KEY) - def test_run_example_gcp_dataflow_native_python_async(self): - self.run_dag("example_gcp_dataflow_native_python_async", CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_DATAFLOW_KEY) - def test_run_example_gcp_dataflow_template(self): - self.run_dag("example_gcp_dataflow_template", CLOUD_DAG_FOLDER) - - -GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") -GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest" - -# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql -GCS_TEMPLATE_PARTS = urlsplit(GCS_FLEX_TEMPLATE_TEMPLATE_PATH) -GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc - - -EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples" -EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe" -EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql" - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) -class TestCloudDataflowExampleDagFlexTemplateJavaSystem(GoogleSystemTest): - @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) - def setup_method(self) -> None: - # Create a Cloud Storage bucket - self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) - - # Build image with pipeline - with NamedTemporaryFile("w") as f: - cloud_build_config = { - "steps": [ - {"name": "gcr.io/cloud-builders/git", "args": ["clone", "$_EXAMPLE_REPO", "repo_dir"]}, - { - "name": "gcr.io/cloud-builders/git", 
- "args": ["checkout", "$_EXAMPLE_COMMIT"], - "dir": "repo_dir", - }, - { - "name": "maven", - "args": ["mvn", "clean", "package"], - "dir": "repo_dir/$_EXAMPLE_SUBDIR", - }, - { - "name": "gcr.io/cloud-builders/docker", - "args": ["build", "-t", "$_TEMPLATE_IMAGE", "."], - "dir": "repo_dir/$_EXAMPLE_SUBDIR", - }, - ], - "images": ["$_TEMPLATE_IMAGE"], - } - f.write(json.dumps(cloud_build_config)) - f.flush() - self.execute_cmd(["cat", f.name]) - substitutions = { - "_TEMPLATE_IMAGE": GCR_FLEX_TEMPLATE_IMAGE, - "_EXAMPLE_REPO": f"https://github.com/{EXAMPLE_FLEX_TEMPLATE_REPO}.git", - "_EXAMPLE_SUBDIR": EXAMPLE_FLEX_TEMPLATE_SUBDIR, - "_EXAMPLE_COMMIT": EXAMPLE_FLEX_TEMPLATE_COMMIT, - } - self.execute_cmd( - [ - "gcloud", - "builds", - "submit", - "--substitutions=" + ",".join(f"{k}={shlex.quote(v)}" for k, v in substitutions.items()), - f"--config={f.name}", - "--no-source", - ] - ) - - # Build template - with NamedTemporaryFile() as f: # type: ignore - manifest_url = ( - f"https://raw.githubusercontent.com/" - f"{EXAMPLE_FLEX_TEMPLATE_REPO}/{EXAMPLE_FLEX_TEMPLATE_COMMIT}/" - f"{EXAMPLE_FLEX_TEMPLATE_SUBDIR}/metadata.json" - ) - f.write(requests.get(manifest_url).content) # type: ignore - f.flush() - self.execute_cmd( - [ - "gcloud", - "beta", - "dataflow", - "flex-template", - "build", - GCS_FLEX_TEMPLATE_TEMPLATE_PATH, - "--image", - GCR_FLEX_TEMPLATE_IMAGE, - "--sdk-language", - "JAVA", - "--metadata-file", - f.name, - ] - ) - - # Create a Pub/Sub topic and a subscription to that topic - self.execute_cmd(["gcloud", "pubsub", "topics", "create", PUBSUB_FLEX_TEMPLATE_TOPIC]) - self.execute_cmd( - [ - "gcloud", - "pubsub", - "subscriptions", - "create", - "--topic", - PUBSUB_FLEX_TEMPLATE_TOPIC, - PUBSUB_FLEX_TEMPLATE_SUBSCRIPTION, - ] - ) - # Create a publisher for "positive ratings" that publishes 1 message per minute - self.execute_cmd( - [ - "gcloud", - "scheduler", - "jobs", - "create", - "pubsub", - "positive-ratings-publisher", - "--schedule=* * * * *", - f"--topic={PUBSUB_FLEX_TEMPLATE_TOPIC}", - '--message-body=\'{"url": "https://beam.apache.org/", "review": "positive"}\'', - ] - ) - # Create and run another similar publisher for "negative ratings" that - self.execute_cmd( - [ - "gcloud", - "scheduler", - "jobs", - "create", - "pubsub", - "negative-ratings-publisher", - "--schedule=*/2 * * * *", - f"--topic={PUBSUB_FLEX_TEMPLATE_TOPIC}", - '--message-body=\'{"url": "https://beam.apache.org/", "review": "negative"}\'', - ] - ) - - # Create a BigQuery dataset - self.execute_cmd(["bq", "mk", "--dataset", f"{self._project_id()}:{BQ_FLEX_TEMPLATE_DATASET}"]) - - @provide_gcp_context(GCP_GCS_TRANSFER_KEY) - def test_run_example_dag_function(self): - self.run_dag("example_gcp_dataflow_flex_template_java", CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) - def teardown_method(self) -> None: - # Stop the Dataflow pipeline. - self.execute_cmd( - [ - "bash", - "-c", - textwrap.dedent( - f"""\ - gcloud dataflow jobs list \ - --region={BQ_FLEX_TEMPLATE_LOCATION} \ - --filter 'NAME:{DATAFLOW_FLEX_TEMPLATE_JOB_NAME} AND STATE=Running' \ - --format 'value(JOB_ID)' \ - | xargs -r gcloud dataflow jobs cancel --region={BQ_FLEX_TEMPLATE_LOCATION} - """ - ), - ] - ) - - # Delete the template spec file from Cloud Storage - self.execute_cmd(["gsutil", "rm", GCS_FLEX_TEMPLATE_TEMPLATE_PATH]) - - # Delete the Flex Template container image from Container Registry. 
- self.execute_cmd( - [ - "gcloud", - "container", - "images", - "delete", - GCR_FLEX_TEMPLATE_IMAGE, - "--force-delete-tags", - "--quiet", - ] - ) - - # Delete the Cloud Scheduler jobs. - self.execute_cmd(["gcloud", "scheduler", "jobs", "delete", "negative-ratings-publisher", "--quiet"]) - self.execute_cmd(["gcloud", "scheduler", "jobs", "delete", "positive-ratings-publisher", "--quiet"]) - - # Delete the Pub/Sub subscription and topic. - self.execute_cmd(["gcloud", "pubsub", "subscriptions", "delete", PUBSUB_FLEX_TEMPLATE_SUBSCRIPTION]) - self.execute_cmd(["gcloud", "pubsub", "topics", "delete", PUBSUB_FLEX_TEMPLATE_TOPIC]) - - # Delete the BigQuery dataset, - self.execute_cmd(["bq", "rm", "-r", "-f", "-d", f"{self._project_id()}:{BQ_FLEX_TEMPLATE_DATASET}"]) - - # Delete the Cloud Storage bucket - self.execute_cmd(["gsutil", "rm", "-r", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) - super().tearDown() - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) -class TestCloudDataflowExampleDagSqlSystem(GoogleSystemTest): - @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) - def setup_method(self) -> None: - # Build image with pipeline - with NamedTemporaryFile(suffix=".csv") as f: - f.write( - textwrap.dedent( - """\ - state_id,state_code,state_name,sales_region - 1,MO,Missouri,Region_1 - 2,SC,South Carolina,Region_1 - 3,IN,Indiana,Region_1 - 6,DE,Delaware,Region_2 - 15,VT,Vermont,Region_2 - 16,DC,District of Columbia,Region_2 - 19,CT,Connecticut,Region_2 - 20,ME,Maine,Region_2 - 35,PA,Pennsylvania,Region_2 - 38,NJ,New Jersey,Region_2 - 47,MA,Massachusetts,Region_2 - 54,RI,Rhode Island,Region_2 - 55,NY,New York,Region_2 - 60,MD,Maryland,Region_2 - 66,NH,New Hampshire,Region_2 - 4,CA,California,Region_3 - 8,AK,Alaska,Region_3 - 37,WA,Washington,Region_3 - 61,OR,Oregon,Region_3 - 33,HI,Hawaii,Region_4 - 59,AS,American Samoa,Region_4 - 65,GU,Guam,Region_4 - 5,IA,Iowa,Region_5 - 32,NV,Nevada,Region_5 - 11,PR,Puerto Rico,Region_6 - 17,CO,Colorado,Region_6 - 18,MS,Mississippi,Region_6 - 41,AL,Alabama,Region_6 - 42,AR,Arkansas,Region_6 - 43,FL,Florida,Region_6 - 44,NM,New Mexico,Region_6 - 46,GA,Georgia,Region_6 - 48,KS,Kansas,Region_6 - 52,AZ,Arizona,Region_6 - 56,TN,Tennessee,Region_6 - 58,TX,Texas,Region_6 - 63,LA,Louisiana,Region_6 - 7,ID,Idaho,Region_7 - 12,IL,Illinois,Region_7 - 13,ND,North Dakota,Region_7 - 31,MN,Minnesota,Region_7 - 34,MT,Montana,Region_7 - 36,SD,South Dakota,Region_7 - 50,MI,Michigan,Region_7 - 51,UT,Utah,Region_7 - 64,WY,Wyoming,Region_7 - 9,NE,Nebraska,Region_8 - 10,VA,Virginia,Region_8 - 14,OK,Oklahoma,Region_8 - 39,NC,North Carolina,Region_8 - 40,WV,West Virginia,Region_8 - 45,KY,Kentucky,Region_8 - 53,WI,Wisconsin,Region_8 - 57,OH,Ohio,Region_8 - 49,VI,United States Virgin Islands,Region_9 - 62,MP,Commonwealth of the Northern Mariana Islands,Region_9 - """ - ).encode() - ) - f.flush() - - self.execute_cmd(["bq", "mk", "--dataset", f"{self._project_id()}:{BQ_SQL_DATASET}"]) - - self.execute_cmd( - ["bq", "load", "--autodetect", "--source_format=CSV", f"{BQ_SQL_DATASET}.beam_input", f.name] - ) - - @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) - def test_run_example_dag_function(self): - self.run_dag("example_gcp_dataflow_sql", CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) - def teardown_method(self) -> None: - # Execute test query - self.execute_cmd( - [ - "bq", - "query", - 
"--use_legacy_sql=false", - f"select * FROM `{self._project_id()}.{BQ_SQL_DATASET}.beam_output`", - ] - ) - - # Stop the Dataflow pipelines. - self.execute_cmd( - [ - "bash", - "-c", - textwrap.dedent( - f"""\ - gcloud dataflow jobs list \ - --region={DATAFLOW_SQL_LOCATION} \ - --filter 'NAME:{DATAFLOW_SQL_JOB_NAME} AND STATE=Running' \ - --format 'value(JOB_ID)' \ - | xargs -r gcloud dataflow jobs cancel --region={DATAFLOW_SQL_LOCATION} - """ - ), - ] - ) - # Delete the BigQuery dataset, - self.execute_cmd(["bq", "rm", "-r", "-f", "-d", f"{self._project_id()}:{BQ_SQL_DATASET}"]) diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py index 6e80f8664c4a2..5b95bc8ffa663 100644 --- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py +++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py @@ -28,6 +28,7 @@ from airflow import models from airflow.providers.apache.beam.hooks.beam import BeamRunnerType from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator +from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator from airflow.utils.trigger_rule import TriggerRule @@ -37,7 +38,7 @@ BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" -PYTHON_FILE_NAME = "wordcount_debugging.txt" +PYTHON_FILE_NAME = "wordcount_debugging.py" GCS_TMP = f"gs://{BUCKET_NAME}/temp/" GCS_STAGING = f"gs://{BUCKET_NAME}/staging/" GCS_OUTPUT = f"gs://{BUCKET_NAME}/output" @@ -97,6 +98,14 @@ py_system_site_packages=False, ) + # [START howto_operator_stop_dataflow_job] + stop_dataflow_job = DataflowStopJobOperator( + task_id="stop_dataflow_job", + location=LOCATION, + job_name_prefix="start-python-pipeline", + ) + # [END howto_operator_stop_dataflow_job] + delete_bucket = GCSDeleteBucketOperator( task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE ) @@ -108,6 +117,7 @@ # TEST BODY >> start_python_job >> start_python_job_local + >> stop_dataflow_job # TEST TEARDOWN >> delete_bucket ) diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py new file mode 100644 index 0000000000000..f9c4b9a776e93 --- /dev/null +++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+""" +Example Airflow DAG for Google Cloud Dataflow service +""" +from __future__ import annotations + +import os +from datetime import datetime + +from airflow import models +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateEmptyTableOperator, + BigQueryDeleteDatasetOperator, + BigQueryDeleteTableOperator, + BigQueryInsertJobOperator, +) +from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator +from airflow.utils.trigger_rule import TriggerRule + +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_dataflow_sql" +LOCATION = "europe-west3" +DATAFLOW_SQL_JOB_NAME = f"{DAG_ID}_{ENV_ID}".replace("_", "-") +BQ_SQL_DATASET = f"{DAG_ID}_{ENV_ID}".replace("-", "_") +BQ_SQL_TABLE_INPUT = f"input_{ENV_ID}".replace("-", "_") +BQ_SQL_TABLE_OUTPUT = f"output_{ENV_ID}".replace("-", "_") +INSERT_ROWS_QUERY = ( + f"INSERT {BQ_SQL_DATASET}.{BQ_SQL_TABLE_INPUT} VALUES " + "('John Doe', 900), " + "('Alice Storm', 1200)," + "('Bob Max', 1000)," + "('Peter Jackson', 800)," + "('Mia Smith', 1100);" +) + + +with models.DAG( + dag_id=DAG_ID, + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "dataflow-sql"], +) as dag: + create_bq_dataset = BigQueryCreateEmptyDatasetOperator( + task_id="create_bq_dataset", + dataset_id=BQ_SQL_DATASET, + location=LOCATION, + ) + + create_bq_table = BigQueryCreateEmptyTableOperator( + task_id="create_bq_table", + dataset_id=BQ_SQL_DATASET, + table_id=BQ_SQL_TABLE_INPUT, + schema_fields=[ + {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, + ], + ) + + insert_query_job = BigQueryInsertJobOperator( + task_id="insert_query_job", + configuration={ + "query": { + "query": INSERT_ROWS_QUERY, + "useLegacySql": False, + "priority": "BATCH", + } + }, + location=LOCATION, + ) + + # [START howto_operator_start_sql_job] + start_sql = DataflowStartSqlJobOperator( + task_id="start_sql_query", + job_name=DATAFLOW_SQL_JOB_NAME, + query=f""" + SELECT + emp_name as employee, + salary as employee_salary + FROM + bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}` + WHERE salary >= 1000; + """, + options={ + "bigquery-project": PROJECT_ID, + "bigquery-dataset": BQ_SQL_DATASET, + "bigquery-table": BQ_SQL_TABLE_OUTPUT, + }, + location=LOCATION, + do_xcom_push=True, + ) + # [END howto_operator_start_sql_job] + + delete_bq_table = BigQueryDeleteTableOperator( + task_id="delete_bq_table", + deletion_dataset_table=f"{PROJECT_ID}.{BQ_SQL_DATASET}.{BQ_SQL_TABLE_INPUT}", + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_bq_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_bq_dataset", + dataset_id=BQ_SQL_DATASET, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST SETUP + create_bq_dataset + >> create_bq_table + >> insert_query_job + # TEST BODY + >> start_sql + # TEST TEARDOWN + >> delete_bq_table + >> delete_bq_dataset + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)