diff --git a/content/en/data_observability/jobs_monitoring/airflow.md b/content/en/data_observability/jobs_monitoring/airflow.md
index ce2b210e594..886abd9484d 100644
--- a/content/en/data_observability/jobs_monitoring/airflow.md
+++ b/content/en/data_observability/jobs_monitoring/airflow.md
@@ -360,6 +360,8 @@ To run an automated check of your OpenLineage setup, see [Troubleshoot Airflow S
### Link your dbt jobs with Airflow tasks
+
+
You can monitor your dbt jobs that are running in Airflow by connecting the dbt telemetry with respective Airflow tasks, using [OpenLineage dbt integration][6].
To see the link between Airflow tasks and dbt jobs, follow those steps:
@@ -394,6 +396,8 @@ dbt_run = BashOperator(
### Link your Spark jobs with Airflow tasks
+
+
OpenLineage integration can automatically inject Airflow's parent job information (namespace, job name, run id) into Spark application properties. This creates a parent-child relationship between Airflow tasks and Spark jobs, enabling you to troubleshoot both systems in one place.
**Note**: This feature requires `apache-airflow-providers-openlineage` version 2.1.0 or later (supported from Airflow 2.9+).
@@ -410,6 +414,141 @@ AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
This automatically injects parent job properties for all supported Spark Operators. To disable for specific operators, set `openlineage_inject_parent_job_info=False` on the operator.
+#### Manually inject parent job information for unsupported operators
+
+For operators that do not support automatic injection (for example, a `BashOperator` running the AWS CLI to start an Amazon EMR Serverless job), use the [OpenLineage Airflow lineage macros][4] to pass parent and root-parent identifiers as Spark configuration properties:
+
+```python
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+
+with DAG(
+ dag_id="emr_serverless_with_openlineage_parent",
+ start_date=datetime(2026, 1, 1),
+ schedule_interval=None,
+ catchup=False,
+) as dag:
+
+ submit = BashOperator(
+ task_id="start_emr_serverless_job",
+ bash_command=r"""
+set -euo pipefail
+
+aws emr-serverless start-job-run \
+ --application-id \
+ --execution-role-arn \
+ --name \
+ --region \
+ --mode STREAMING \
+ --job-driver '{"sparkSubmit": {
+ "entryPoint": "",
+ "entryPointArguments": ["--config-key", "", "--stage", ""],
+ "sparkSubmitParameters": "\
+--conf spark.app.name= \
+--conf spark.openlineage.parentJobNamespace={{ lineage_job_namespace() }} \
+--conf spark.openlineage.parentJobName={{ lineage_job_name(task_instance) }} \
+--conf spark.openlineage.parentRunId={{ lineage_run_id(task_instance) }} \
+--conf spark.openlineage.rootParentJobNamespace={{ lineage_root_job_namespace(task_instance) }} \
+--conf spark.openlineage.rootParentJobName={{ lineage_root_job_name(task_instance) }} \
+--conf spark.openlineage.rootParentRunId={{ lineage_root_run_id(task_instance) }}\
+"
+ }}'
+""",
+ )
+```
+
+### Backport OpenLineage lineage macros for older provider versions
+
+The `lineage_root_job_namespace`, `lineage_root_job_name`, and `lineage_root_run_id` macros that emit the outermost DAG identifiers for root-parent linking were added in `apache-airflow-providers-openlineage` 2.3.0. If you cannot upgrade the provider (for example, Amazon MWAA on Airflow 2.9.2 pins provider 2.2.0), define the missing macros as `user_defined_macros` on the DAG:
+
+```python
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+from airflow.providers.openlineage import conf as ol_conf
+from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
+
+
+def _logical_date(ti):
+ return getattr(ti, "logical_date", None) or ti.execution_date
+
+
+def _clear_number(ti) -> int:
+ return ti.get_dagrun().clear_number
+
+
+def _root_from_conf(ti, key):
+ ol = (ti.get_dagrun().conf or {}).get("openlineage") or {}
+ return ol.get(key) or None
+
+
+def lineage_job_namespace():
+ return ol_conf.namespace()
+
+
+def lineage_job_name(ti):
+ return f"{ti.dag_id}.{ti.task_id}"
+
+
+def lineage_run_id(ti):
+ return OpenLineageAdapter.build_task_instance_run_id(
+ dag_id=ti.dag_id,
+ task_id=ti.task_id,
+ try_number=ti.try_number,
+ logical_date=_logical_date(ti),
+ map_index=ti.map_index,
+ )
+
+
+def lineage_root_job_namespace(ti):
+ return _root_from_conf(ti, "rootParentJobNamespace") or ol_conf.namespace()
+
+
+def lineage_root_job_name(ti):
+ return _root_from_conf(ti, "rootParentJobName") or ti.dag_id
+
+
+def lineage_root_run_id(ti):
+ forwarded = _root_from_conf(ti, "rootParentRunId")
+ if forwarded:
+ return forwarded
+ return OpenLineageAdapter.build_dag_run_id(
+ dag_id=ti.dag_id,
+ logical_date=_logical_date(ti),
+ clear_number=_clear_number(ti),
+ )
+
+
+OL_MACROS = {
+ "lineage_job_namespace": lineage_job_namespace,
+ "lineage_job_name": lineage_job_name,
+ "lineage_run_id": lineage_run_id,
+ "lineage_root_job_namespace": lineage_root_job_namespace,
+ "lineage_root_job_name": lineage_root_job_name,
+ "lineage_root_run_id": lineage_root_run_id,
+}
+
+
+with DAG(
+ dag_id="",
+ start_date=datetime(2026, 1, 1),
+ schedule_interval=None,
+ catchup=False,
+ user_defined_macros=OL_MACROS,
+) as dag:
+ submit = BashOperator(
+ task_id="",
+ bash_command="...",
+ )
+```
+
+With `user_defined_macros` set on the DAG, the `{{ lineage_*() }}` and `{{ lineage_root_*() }}` calls in your task templates resolve to values that match the built-in macros shipped with provider 2.3.0+, so downstream Spark or dbt jobs can link to the Airflow root parent in Datadog.
+
## Further Reading