Skip to content

Commit

Permalink
Restore base lineage backend (apache#14146)
Browse files Browse the repository at this point in the history
This adds back the base lineage backend which can be extended to send lineage metadata to any custom backend.
closes: apache#14106

Co-authored-by: Joao Ponte <[email protected]>
Co-authored-by: Tomek Urbaszek <[email protected]>
  • Loading branch information
3 people authored Apr 3, 2021
1 parent 9ac1d0a commit af2d11e
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 1 deletion.
22 changes: 22 additions & 0 deletions airflow/lineage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import jinja2
from cattr import structure, unstructure

from airflow.configuration import conf
from airflow.lineage.backend import LineageBackend
from airflow.utils.module_loading import import_string

ENV = jinja2.Environment()
Expand All @@ -45,6 +47,22 @@ class Metadata:
data: Dict = attr.ib()


def get_backend() -> Optional[LineageBackend]:
    """
    Return an instance of the configured lineage backend, if any.

    The backend class is resolved from the ``backend`` option of the
    ``lineage`` config section; ``None`` is returned when nothing is set.

    :return: a freshly constructed backend, or ``None`` when none is configured
    :raises TypeError: if the configured class does not subclass ``LineageBackend``
    """
    backend_cls = conf.getimport("lineage", "backend", fallback=None)

    # Nothing configured: lineage is not forwarded to any backend.
    if not backend_cls:
        return None

    if issubclass(backend_cls, LineageBackend):
        return backend_cls()

    raise TypeError(
        f"Your custom Lineage class `{backend_cls.__name__}` "
        f"is not a subclass of `{LineageBackend.__name__}`."
    )


def _get_instance(meta: Metadata):
"""Instantiate an object from Metadata"""
cls = import_string(meta.type_name)
Expand Down Expand Up @@ -82,6 +100,7 @@ def apply_lineage(func: T) -> T:
Saves the lineage to XCom and if configured to do so sends it
to the backend.
"""
_backend = get_backend()

@wraps(func)
def wrapper(self, context, *args, **kwargs):
Expand All @@ -101,6 +120,9 @@ def wrapper(self, context, *args, **kwargs):
context, key=PIPELINE_INLETS, value=inlets, execution_date=context['ti'].execution_date
)

if _backend:
_backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)

return ret_val

return cast(T, wrapper)
Expand Down
47 changes: 47 additions & 0 deletions airflow/lineage/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Sends lineage metadata to a backend"""
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator # pylint: disable=cyclic-import


class LineageBackend:
    """Base class for backends that receive lineage metadata."""

    def send_lineage(
        self,
        operator: 'BaseOperator',
        inlets: Optional[list] = None,
        outlets: Optional[list] = None,
        context: Optional[dict] = None,
    ):
        """
        Send lineage metadata to the backend.

        This base implementation always raises; a concrete backend is expected
        to override it.

        :param operator: the operator executing a transformation on the inlets and outlets
        :type operator: airflow.models.baseoperator.BaseOperator
        :param inlets: the inlets to this operator
        :type inlets: list
        :param outlets: the outlets from this operator
        :type outlets: list
        :param context: the current context of the task instance
        :type context: dict
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
21 changes: 21 additions & 0 deletions docs/apache-airflow/lineage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,24 @@ has outlets defined (e.g. by using ``add_outlets(..)`` or has out of the box sup
f_in > run_this | (run_this_last > outlets)
.. _precedence: https://docs.python.org/3/reference/expressions.html


Lineage Backend
---------------

It's possible to push the lineage metrics to a custom backend by specifying the import path of a ``LineageBackend`` subclass in the config:

.. code-block:: ini
[lineage]
backend = my.lineage.CustomBackend
The backend should inherit from ``airflow.lineage.backend.LineageBackend``.

.. code-block:: python
from airflow.lineage.backend import LineageBackend
class ExampleBackend(LineageBackend):
def send_lineage(self, operator, inlets=None, outlets=None, context=None):
# Send the info to some external service
49 changes: 48 additions & 1 deletion tests/lineage/test_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@
# specific language governing permissions and limitations
# under the License.
import unittest
from unittest import mock

from airflow.lineage import AUTO
from airflow.lineage import AUTO, apply_lineage, get_backend, prepare_lineage
from airflow.lineage.backend import LineageBackend
from airflow.lineage.entities import File
from airflow.models import DAG, TaskInstance as TI
from airflow.operators.dummy import DummyOperator
from airflow.utils import timezone
from tests.test_utils.config import conf_vars

DEFAULT_DATE = timezone.datetime(2016, 1, 1)


class CustomLineageBackend(LineageBackend):
    """Minimal lineage backend for tests; it discards whatever it is sent."""

    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
        """Accept the lineage payload and do nothing with it."""


class TestLineage(unittest.TestCase):
def test_lineage(self):
dag = DAG(dag_id='test_prepare_lineage', start_date=DEFAULT_DATE)
Expand Down Expand Up @@ -111,3 +119,42 @@ def test_lineage_render(self):
op1.pre_execute(ctx1)
assert op1.inlets[0].url == f1s.format(DEFAULT_DATE)
assert op1.outlets[0].url == f1s.format(DEFAULT_DATE)

@mock.patch("airflow.lineage.get_backend")
def test_lineage_is_sent_to_backend(self, mock_get_backend):
    """Lineage gathered for a task is forwarded to the configured backend."""
    # Operators received by the backend, in call order.
    sent_operators = []

    class TestBackend(LineageBackend):
        def send_lineage(self, operator, inlets=None, outlets=None, context=None):
            assert len(inlets) == 1
            assert len(outlets) == 1
            # Record the call so the test can prove the backend was really invoked.
            sent_operators.append(operator)

    func = mock.Mock()
    func.__name__ = 'foo'

    # get_backend is patched, so apply_lineage below picks up this backend.
    mock_get_backend.return_value = TestBackend()

    dag = DAG(dag_id='test_lineage_is_sent_to_backend', start_date=DEFAULT_DATE)

    with dag:
        op1 = DummyOperator(task_id='task1')

    file1 = File("/tmp/some_file")

    op1.inlets.append(file1)
    op1.outlets.append(file1)

    ctx1 = {"ti": TI(task=op1, execution_date=DEFAULT_DATE), "execution_date": DEFAULT_DATE}

    prep = prepare_lineage(func)
    prep(op1, ctx1)
    post = apply_lineage(func)
    post(op1, ctx1)

    # The assertions inside TestBackend only run if send_lineage is called;
    # without this check the test would pass even if the backend were skipped.
    assert len(sent_operators) == 1
    assert sent_operators[0] is op1

def test_empty_lineage_backend(self):
    """With no backend configured, ``get_backend`` yields ``None``."""
    assert get_backend() is None

@conf_vars({("lineage", "backend"): "tests.lineage.test_lineage.CustomLineageBackend"})
def test_resolve_lineage_class(self):
    """A backend class named in the config is resolved and instantiated."""
    resolved = get_backend()
    resolved_cls = resolved.__class__

    assert issubclass(resolved_cls, LineageBackend)
    assert isinstance(resolved, CustomLineageBackend)

0 comments on commit af2d11e

Please sign in to comment.