Version: 0.11.0

Airflow Integration

DataHub supports integration of:

  • Airflow pipeline (DAG) metadata
  • DAG and task run information
  • Lineage information, when present

You can use either the DataHub Airflow lineage plugin (recommended) or the Airflow lineage backend (deprecated).

Using DataHub's Airflow lineage plugin

note

The Airflow lineage plugin is only supported with Airflow version >= 2.0.2, including MWAA environments running Airflow >= 2.0.2.

If you're using Airflow 1.x, use the Airflow lineage plugin with acryl-datahub-airflow-plugin <= 0.9.1.0.

This plugin registers a task success/failure callback on every task via a cluster policy and emits DataHub events from it. As a result, the plugin can report both task successes and failures, whereas the older Airflow lineage backend could only emit task successes.

Setup

  1. Install the required dependency in your Airflow environment:
pip install acryl-datahub-airflow-plugin
note

The DataHub REST emitter is included in the plugin package by default. To use the Kafka emitter instead, run pip install 'acryl-datahub-airflow-plugin[datahub-kafka]'.

  2. Disable lazy plugin loading in your airflow.cfg. On MWAA, add this setting to your Apache Airflow configuration options.
airflow.cfg
[core]
lazy_load_plugins = False
  3. Configure an Airflow hook for DataHub. We support both a DataHub REST hook and a Kafka-based hook, but you only need one.

    # For REST-based:
    airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password '<optional datahub auth token>'
    # For Kafka-based (standard Kafka sink config can be passed via extras):
    airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'
  4. Add your datahub_conn_id and/or cluster to your airflow.cfg file if they differ from the default values. See the configuration options below.

    Configuration options:

    • datahub.enabled (defaults to true): Whether the plugin should be enabled.
    • datahub.conn_id (defaults to datahub_rest_default): The name of the DataHub connection you set in step 3.
    • datahub.cluster (defaults to prod): The name of the Airflow cluster.
    • datahub.capture_ownership_info (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser.
    • datahub.capture_tags_info (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.
    • datahub.capture_executions (defaults to true): If true, we'll capture task runs in DataHub in addition to DAG definitions.
    • datahub.graceful_exceptions (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
  5. Configure inlets and outlets for your Airflow operators. For reference, look at the sample DAG in lineage_backend_demo.py, or at lineage_backend_taskflow_demo.py if you're using the TaskFlow API. A short sketch is also shown after this list.

  6. [optional] Learn more about Airflow lineage, including shorthand notation and some automation.
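
For illustration only, here is a minimal sketch of step 5, assuming the Dataset helper from datahub_provider.entities and a BashOperator; the DAG id, platform, and table names are placeholder values and not taken from the bundled demo DAGs.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from datahub_provider.entities import Dataset

with DAG(
    dag_id="datahub_lineage_example",  # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # The plugin reads each task's inlets/outlets after the task runs
    # and emits the corresponding lineage to DataHub.
    transform = BashOperator(
        task_id="transform",
        bash_command="echo 'running transformation'",
        inlets=[Dataset("snowflake", "mydb.schema.tableA")],
        outlets=[Dataset("snowflake", "mydb.schema.tableC")],
    )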

How to validate installation

  1. In the Airflow UI, go to Admin -> Plugins and check that the DataHub plugin is listed.
  2. Run an Airflow DAG. In the task logs, you should see DataHub-related log messages like:
Emitting DataHub ...

Emitting lineage via a custom operator to the Airflow Plugin

If you have created a custom Airflow operator (docs) that inherits from the BaseOperator class, then when overriding the execute function, set inlets and outlets via context['ti'].task.inlets and context['ti'].task.outlets. The DataHub Airflow plugin will then pick up those inlets and outlets after the task runs.

from airflow.models.baseoperator import BaseOperator


class DbtOperator(BaseOperator):
    ...

    def execute(self, context):
        # do something
        inlets, outlets = self._get_lineage()
        # inlets/outlets are lists of either datahub_provider.entities.Dataset or datahub_provider.entities.Urn
        context['ti'].task.inlets = inlets
        context['ti'].task.outlets = outlets

    def _get_lineage(self):
        # Do some processing to compute the inlets/outlets
        ...
        return inlets, outlets

If you override the pre_execute and post_execute functions, ensure they include the @prepare_lineage and @apply_lineage decorators respectively (source).
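
As a rough sketch of that rule, assuming Airflow 2.x where both decorators live in airflow.lineage (the operator name and bodies below are hypothetical):

from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.baseoperator import BaseOperator


class MyLineageAwareOperator(BaseOperator):
    @prepare_lineage
    def pre_execute(self, context):
        # custom pre-execution logic goes here;
        # @prepare_lineage resolves the task's inlets before execution
        ...

    def execute(self, context):
        ...

    @apply_lineage
    def post_execute(self, context, result=None):
        # custom post-execution logic goes here;
        # @apply_lineage forwards the task's lineage after execution
        ...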

Using DataHub's Airflow lineage backend (deprecated)

caution

The DataHub Airflow plugin (above) is the recommended way to integrate Airflow with DataHub. For managed services like MWAA, the lineage backend is not supported and so you must use the Airflow plugin.

If you're using Airflow 1.x, we recommend using the Airflow lineage backend with acryl-datahub <= 0.9.1.0.

note

If you are looking to run Airflow and DataHub using docker locally, follow the guide here. Otherwise proceed to follow the instructions below.

Setting up Airflow to use DataHub as Lineage Backend

  1. Install the required dependency in your Airflow environment. See https://registry.astronomer.io/providers/datahub/modules/datahublineagebackend for reference.
pip install acryl-datahub[airflow]
# If you need the Kafka-based emitter/hook:
pip install acryl-datahub[airflow,datahub-kafka]
  2. Configure an Airflow hook for DataHub. We support both a DataHub REST hook and a Kafka-based hook, but you only need one.

    # For REST-based:
    airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://datahub-gms:8080' --conn-password '<optional datahub auth token>'
    # For Kafka-based (standard Kafka sink config can be passed via extras):
    airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'
  3. Add the following lines to your airflow.cfg file.

    airflow.cfg
    [lineage]
    backend = datahub_provider.lineage.datahub.DatahubLineageBackend
    datahub_kwargs = {
    "enabled": true,
    "datahub_conn_id": "datahub_rest_default",
    "cluster": "prod",
    "capture_ownership_info": true,
    "capture_tags_info": true,
    "graceful_exceptions": true }
    # The above indentation is important!

    Configuration options:

    • datahub_conn_id (required): Usually datahub_rest_default or datahub_kafka_default, depending on what you named the connection in step 2.
    • cluster (defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with.
    • capture_ownership_info (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser.
    • capture_tags_info (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.
    • capture_executions (defaults to false): If true, it captures task runs as DataHub DataProcessInstances.
    • graceful_exceptions (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
  4. Configure inlets and outlets for your Airflow operators. For reference, look at the sample DAG in lineage_backend_demo.py, or at lineage_backend_taskflow_demo.py if you're using the TaskFlow API. A TaskFlow-style sketch is shown after this list.

  5. [optional] Learn more about Airflow lineage, including shorthand notation and some automation.
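
For the TaskFlow API, a minimal sketch might look like the following. It assumes that keyword arguments such as inlets and outlets are forwarded by the @task decorator to the underlying operator, as in the bundled taskflow demo; the DAG id, platform, and table names are placeholders.

from datetime import datetime

from airflow.decorators import dag, task
from datahub_provider.entities import Dataset


@dag(start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False)
def datahub_lineage_taskflow_example():  # placeholder DAG id
    @task(
        inlets=[Dataset("snowflake", "mydb.schema.tableA")],
        outlets=[Dataset("snowflake", "mydb.schema.tableC")],
    )
    def transform():
        # placeholder transformation logic
        pass

    transform()


datahub_lineage_taskflow_example()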

Emitting lineage via a separate operator

Take a look at the sample lineage emission DAG in the DataHub repository.

In order to use this approach, you must first configure the DataHub hook. Like in ingestion, we support a DataHub REST hook and a Kafka-based hook. See step 2 above for details.
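
Since the sample DAG itself is not reproduced here, the following is a rough sketch of the idea, assuming the DatahubEmitterOperator from datahub_provider and the datahub_rest_default connection configured above; the lineage edge and dataset names are placeholders.

from datetime import datetime

import datahub.emitter.mce_builder as builder
from airflow import DAG
from datahub_provider.operators.datahub import DatahubEmitterOperator

with DAG(
    dag_id="datahub_lineage_emission_example",  # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # A dedicated task that emits a lineage edge from two upstream tables
    # to one downstream table via the configured DataHub connection.
    emit_lineage = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",
        mces=[
            builder.make_lineage_mce(
                upstream_urns=[
                    builder.make_dataset_urn("snowflake", "mydb.schema.tableA"),
                    builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
                ],
                downstream_urn=builder.make_dataset_urn("snowflake", "mydb.schema.tableC"),
            )
        ],
    )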

Debugging

Incorrect URLs

If your URLs aren't being generated correctly (usually they'll start with http://localhost:8080 instead of the correct hostname), you may need to set the webserver base_url config.

airflow.cfg
[webserver]
base_url = http://airflow.example.com

Additional references

Related DataHub videos: