diff --git a/content/en/data_observability/jobs_monitoring/airflow.md b/content/en/data_observability/jobs_monitoring/airflow.md index ce2b210e594..886abd9484d 100644 --- a/content/en/data_observability/jobs_monitoring/airflow.md +++ b/content/en/data_observability/jobs_monitoring/airflow.md @@ -360,6 +360,8 @@ To run an automated check of your OpenLineage setup, see [Troubleshoot Airflow S ### Link your dbt jobs with Airflow tasks +
On Airflow 2.9.2 with apache-airflow-providers-openlineage 2.2.0 - the latest provider enabled for this Airflow version - the lineage_root_* macros required for root-parent linking are not available. To use them anyway, see Backport OpenLineage lineage macros for older provider versions.
+ You can monitor your dbt jobs that are running in Airflow by connecting the dbt telemetry with respective Airflow tasks, using [OpenLineage dbt integration][6]. To see the link between Airflow tasks and dbt jobs, follow those steps: @@ -394,6 +396,8 @@ dbt_run = BashOperator( ### Link your Spark jobs with Airflow tasks +
The lineage_root_* macros require apache-airflow-providers-openlineage 2.3.0 or later. On older provider versions (for example, Airflow 2.9.2 with provider 2.2.0), see Backport OpenLineage lineage macros for older provider versions.
+ OpenLineage integration can automatically inject Airflow's parent job information (namespace, job name, run id) into Spark application properties. This creates a parent-child relationship between Airflow tasks and Spark jobs, enabling you to troubleshoot both systems in one place. **Note**: This feature requires `apache-airflow-providers-openlineage` version 2.1.0 or later (supported from Airflow 2.9+). @@ -410,6 +414,141 @@ AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true This automatically injects parent job properties for all supported Spark Operators. To disable for specific operators, set `openlineage_inject_parent_job_info=False` on the operator. +#### Manually inject parent job information for unsupported operators + +For operators that do not support automatic injection (for example, a `BashOperator` running the AWS CLI to start an Amazon EMR Serverless job), use the [OpenLineage Airflow lineage macros][4] to pass parent and root-parent identifiers as Spark configuration properties: + +```python +from datetime import datetime + +from airflow import DAG +from airflow.operators.bash import BashOperator + +with DAG( + dag_id="emr_serverless_with_openlineage_parent", + start_date=datetime(2026, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + + submit = BashOperator( + task_id="start_emr_serverless_job", + bash_command=r""" +set -euo pipefail + +aws emr-serverless start-job-run \ + --application-id \ + --execution-role-arn \ + --name \ + --region \ + --mode STREAMING \ + --job-driver '{"sparkSubmit": { + "entryPoint": "", + "entryPointArguments": ["--config-key", "", "--stage", ""], + "sparkSubmitParameters": "\ +--conf spark.app.name= \ +--conf spark.openlineage.parentJobNamespace={{ lineage_job_namespace() }} \ +--conf spark.openlineage.parentJobName={{ lineage_job_name(task_instance) }} \ +--conf spark.openlineage.parentRunId={{ lineage_run_id(task_instance) }} \ +--conf spark.openlineage.rootParentJobNamespace={{ lineage_root_job_namespace(task_instance) }} \ +--conf spark.openlineage.rootParentJobName={{ lineage_root_job_name(task_instance) }} \ +--conf spark.openlineage.rootParentRunId={{ lineage_root_run_id(task_instance) }}\ +" + }}' +""", + ) +``` + +### Backport OpenLineage lineage macros for older provider versions + +The `lineage_root_job_namespace`, `lineage_root_job_name`, and `lineage_root_run_id` macros that emit the outermost DAG identifiers for root-parent linking were added in `apache-airflow-providers-openlineage` 2.3.0. If you cannot upgrade the provider (for example, Amazon MWAA on Airflow 2.9.2 pins provider 2.2.0), define the missing macros as `user_defined_macros` on the DAG: + +```python +from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.operators.bash import BashOperator +from airflow.providers.openlineage import conf as ol_conf +from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter + + +def _logical_date(ti): + return getattr(ti, "logical_date", None) or ti.execution_date + + +def _clear_number(ti) -> int: + return ti.get_dagrun().clear_number + + +def _root_from_conf(ti, key): + ol = (ti.get_dagrun().conf or {}).get("openlineage") or {} + return ol.get(key) or None + + +def lineage_job_namespace(): + return ol_conf.namespace() + + +def lineage_job_name(ti): + return f"{ti.dag_id}.{ti.task_id}" + + +def lineage_run_id(ti): + return OpenLineageAdapter.build_task_instance_run_id( + dag_id=ti.dag_id, + task_id=ti.task_id, + try_number=ti.try_number, + logical_date=_logical_date(ti), + map_index=ti.map_index, + ) + + +def lineage_root_job_namespace(ti): + return _root_from_conf(ti, "rootParentJobNamespace") or ol_conf.namespace() + + +def lineage_root_job_name(ti): + return _root_from_conf(ti, "rootParentJobName") or ti.dag_id + + +def lineage_root_run_id(ti): + forwarded = _root_from_conf(ti, "rootParentRunId") + if forwarded: + return forwarded + return OpenLineageAdapter.build_dag_run_id( + dag_id=ti.dag_id, + logical_date=_logical_date(ti), + clear_number=_clear_number(ti), + ) + + +OL_MACROS = { + "lineage_job_namespace": lineage_job_namespace, + "lineage_job_name": lineage_job_name, + "lineage_run_id": lineage_run_id, + "lineage_root_job_namespace": lineage_root_job_namespace, + "lineage_root_job_name": lineage_root_job_name, + "lineage_root_run_id": lineage_root_run_id, +} + + +with DAG( + dag_id="", + start_date=datetime(2026, 1, 1), + schedule_interval=None, + catchup=False, + user_defined_macros=OL_MACROS, +) as dag: + submit = BashOperator( + task_id="", + bash_command="...", + ) +``` + +With `user_defined_macros` set on the DAG, the `{{ lineage_*() }}` and `{{ lineage_root_*() }}` calls in your task templates resolve to values that match the built-in macros shipped with provider 2.3.0+, so downstream Spark or dbt jobs can link to the Airflow root parent in Datadog. + ## Further Reading