Monday, July 3, 2023

Dependencies between DAGs in Apache Airflow

 

This post explains how to create a DAG in Apache Airflow that runs a “goodbye” task only after two upstream DAGs have successfully finished.

In Apache Airflow we can have very complex DAGs with several tasks, and dependencies between the tasks.

But what if we have cross-DAG dependencies, and we want to make a DAG of DAGs? Normally, we would try to put all tasks that have dependencies in the same DAG. But sometimes you cannot modify the DAGs, and you may still want to add dependencies between them.

For that, we can use the ExternalTaskSensor.

This sensor looks up past runs of DAGs and tasks, and matches those runs that share the same execution_date as our DAG run. However, the name execution_date might be misleading: it is not a date, but an instant. So DAGs that depend on each other need to run at the same instant, or be offset from each other by a constant amount of time. In summary, we need alignment in the execution dates and times.

Let's see an example. We have two upstream DAGs, and we want to run another DAG after the first two DAGs have successfully finished.

This is the first DAG. It has only two dummy tasks.

The second upstream DAG is very similar to this one, so I don't show the code here, but you can have a look at the code on GitHub.

The important aspect is that both DAGs have the same schedule and start date. Notice that the DAGs run every minute. That's only for the sake of this demo. In a real setting, that would be a very high frequency, so beware if you copy-paste some code for your own DAGs.

The downstream DAG will be executed when both upstream DAGs succeed. This is the code of the downstream DAG:

Some important points to notice. The schedule and start date are the same as those of the upstream DAGs. This is crucial for this DAG to respond to the upstream DAGs, that is, to add a dependency between the runs of the upstream DAGs and the run of this DAG.

And what if the execution dates don't match but I still want to add a dependency? If the start dates differ by a constant amount of time, you can use the execution_delta parameter of ExternalTaskSensor. For more details, check the documentation of ExternalTaskSensor.
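To make the alignment rule concrete, here is a small plain-Python sketch (no Airflow needed) of the date arithmetic behind execution_delta: the sensor checks the upstream run whose logical date equals its own logical date minus execution_delta. The function name and the two schedules are hypothetical, chosen only for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def target_logical_date(own_logical_date: datetime, execution_delta: Optional[timedelta] = None) -> datetime:
    """Return the logical date of the upstream run a sensor would look for.

    Mirrors the documented rule: own logical date minus execution_delta,
    or the same logical date when no delta is given.
    """
    if execution_delta is None:
        return own_logical_date
    return own_logical_date - execution_delta


# Hypothetical schedules: upstream runs daily at 00:00 UTC ("0 0 * * *"),
# downstream runs daily at 01:00 UTC ("0 1 * * *"), one hour later.
downstream_run = datetime(2023, 7, 3, 1, 0, tzinfo=timezone.utc)

# With execution_delta=timedelta(hours=1) the sensor checks the 00:00 upstream run
print(target_logical_date(downstream_run, timedelta(hours=1)).isoformat())  # 2023-07-03T00:00:00+00:00

# Without it, the sensor looks for an upstream run at 01:00, which never exists
# on a 00:00 schedule, so it keeps poking until it times out.
print(target_logical_date(downstream_run).isoformat())  # 2023-07-03T01:00:00+00:00
```

In a DAG, this corresponds to passing execution_delta=timedelta(hours=1) to the ExternalTaskSensor.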

The documentation of Airflow includes an article about cross DAG dependencies: https://airflow.apache.org/docs/stable/howto/operator/external.html

And what if I want to branch to different downstream DAGs depending on the results of the previous DAGs? For that, you can use a branch operator and XCom to communicate values across DAGs.

The ExternalTaskSensor only observes the state (for example, success or failed) of the “sensed” DAG or task, not any output value. If you need to branch depending on the values calculated in a task, you can use the BranchPythonOperator (https://airflow.apache.org/docs/stable/concepts.html#branching).

The upstream DAG would have to publish the values in XCom, and the downstream DAG needs to provide a callback function to the branch operator. This callback function would read the XCom using the upstream task_id, and then return the id of the task to continue with (among the potential tasks to be executed downstream of the branch operator). I will cover this example with code snippets in a future post!

When you cannot modify existing DAGs, that does not mean that you cannot create dependencies between those DAGs. This post has shown how to create those dependencies even if you don't control the upstream DAGs: add a new DAG that uses the ExternalTaskSensor (one sensor per upstream DAG), encode the dependencies between the DAGs as dependencies between the sensor tasks, run the DAG encoding the dependencies on the same schedule as the upstream DAGs, and configure the sensors with the corresponding execution_delta if the DAGs' schedules are shifted by a constant amount of time.


Cross-DAG dependencies

When designing Airflow DAGs, it is often best practice to put all related tasks in the same DAG. However, it's sometimes necessary to create dependencies between your DAGs. In this scenario, one node of a DAG is its own complete DAG, rather than just a single task. Throughout this guide, the following terms are used to describe DAG dependencies:

  • Upstream DAG: A DAG that must reach a specified state before a downstream DAG can run
  • Downstream DAG: A DAG that cannot run until an upstream DAG reaches a specified state

The Airflow documentation on cross-DAG dependencies indicates that they can be helpful in the following situations:

  • A DAG should only run after one or more datasets have been updated by tasks in other DAGs.
  • Two DAGs are dependent, but they have different schedules.
  • Two DAGs are dependent, but they are owned by different teams.
  • A task depends on another task but for a different execution date.

In this guide, you'll review the methods for implementing cross-DAG dependencies, including how to implement dependencies if your dependent DAGs are located in different Airflow deployments.

All code used in this guide is available in the cross-dag-dependencies-tutorial registry.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Implement cross-DAG dependencies

There are multiple ways to implement cross-DAG dependencies in Airflow, including:

  • Dataset dependencies
  • The TriggerDagRunOperator
  • The ExternalTaskSensor
  • The Airflow API

In this section, you'll learn how and when you should use each method and how to view dependencies in the Airflow UI.

Using SubDAGs to handle DAG dependencies can cause performance issues. Instead, use one of the methods described in this guide.

Dataset dependencies

In Airflow 2.4 and later, you can use datasets to create data-driven dependencies between DAGs. DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to this data.

You should use this method if you have a downstream DAG that should only run after a dataset has been updated by an upstream DAG, especially if those updates are irregular. This type of dependency also provides you with increased observability into the dependencies between your DAGs and datasets in the Airflow UI.

Using datasets requires knowledge of the following scheduling concepts:

  • Producing task: A task that updates a specific dataset, defined by its outlets parameter.
  • Consuming DAG: A DAG that runs as soon as a specific dataset is updated.

Any task can be made into a producing task by providing one or more datasets to the outlets parameter. For example:

from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

dataset1 = Dataset('s3://folder1/dataset_1.txt')

# producing task in the upstream DAG
EmptyOperator(
    task_id="producing_task",
    outlets=[dataset1],  # flagging to Airflow that dataset1 was updated
)

The following downstream DAG is scheduled to run after dataset1 has been updated by providing it to the schedule parameter.

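import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

dataset1 = Dataset('s3://folder1/dataset_1.txt')

# consuming DAG: scheduled on updates to dataset1 instead of a time-based schedule
with DAG(
    dag_id='consuming_dag_1',
    catchup=False,
    start_date=datetime.datetime(2022, 1, 1),
    schedule=[dataset1],
) as dag:
    EmptyOperator(task_id="consuming_task")  # placeholder task for illustration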

In the Airflow UI, the Next Run column for the downstream DAG shows dataset dependencies for the DAG and how many dependencies have been updated since the last DAG run. The following image shows that the DAG dataset_dependent_example_dag runs only after two different datasets have been updated. One of those datasets has already been updated by an upstream DAG.

DAG Dependencies View

See Datasets and Data-Aware Scheduling in Airflow to learn more.

TriggerDagRunOperator

The TriggerDagRunOperator is a straightforward method of implementing cross-DAG dependencies from an upstream DAG. This operator allows you to have a task in one DAG that triggers another DAG in the same Airflow environment. For more information about this operator, see TriggerDagRunOperator.

You can trigger a downstream DAG with the TriggerDagRunOperator from any point in the upstream DAG. If you set the operator's wait_for_completion parameter to True, the upstream DAG will pause and resume only once the downstream DAG has finished running. As of Airflow 2.6 this waiting process can be deferred to the triggerer by setting the parameter deferrable to True, turning the operator into a deferrable operator which increases Airflow's scalability and can reduce cost.

A common use case for this implementation is when an upstream DAG fetches new testing data for a machine learning pipeline, runs and tests a model, and publishes the model's prediction. In case of the model underperforming, the TriggerDagRunOperator is used to start a separate DAG that retrains the model while the upstream DAG waits. Once the model is retrained and tested by the downstream DAG, the upstream DAG resumes and publishes the new model's results.

The following example DAG uses the TriggerDagRunOperator to trigger a DAG with the dag_id dependent_dag between two other tasks. Since both the wait_for_completion and the deferrable parameters of the trigger_dependent_dag task in the trigger_dagrun_dag are set to True, the task is deferred until dependent_dag has finished its run. Once the trigger_dependent_dag task completes, the end_task will run.

from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pendulum import datetime, duration


@task
def start_task(task_type):
    return f"The {task_type} task has completed."


@task
def end_task(task_type):
    return f"The {task_type} task has completed."


# Default settings applied to all tasks
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=5),
}


@dag(
    start_date=datetime(2023, 1, 1),
    max_active_runs=1,
    schedule="@daily",
    default_args=default_args,
    catchup=False,
)
def trigger_dagrun_dag():
    trigger_dependent_dag = TriggerDagRunOperator(
        task_id="trigger_dependent_dag",
        trigger_dag_id="dependent_dag",
        wait_for_completion=True,
        deferrable=True,  # Note that this parameter only exists in Airflow 2.6+
    )

    start_task("starting") >> trigger_dependent_dag >> end_task("ending")


trigger_dagrun_dag()

In the following image, you can see that the trigger_dependent_dag task in the middle is the TriggerDagRunOperator, which triggers dependent_dag.

Trigger DAG Graph

If your dependent DAG requires a config input or a specific execution date, you can specify them in the operator using the conf and execution_date params respectively.

ExternalTaskSensor

To create cross-DAG dependencies from a downstream DAG, consider using one or more ExternalTaskSensors. The downstream DAG will pause until a task is completed in the upstream DAG before resuming.

This method of creating cross-DAG dependencies is especially useful when you have a downstream DAG with different branches that depend on different tasks in one or more upstream DAGs. Instead of defining an entire DAG as being downstream of another DAG as you do with datasets, you can set a specific task in a downstream DAG to wait for a task to finish in an upstream DAG.

For example, you could have upstream tasks modifying different tables in a data warehouse and one downstream DAG running one branch of data quality checks for each of those tables. You can use one ExternalTaskSensor at the start of each branch to make sure that the checks running on each table only start after the update to the specific table is finished.

A deferrable version of the ExternalTaskSensor, the ExternalTaskSensorAsync, is available in the Astronomer Providers package for Airflow 2.2 and later. For more info on deferrable operators and their benefits, see Deferrable Operators.

The following example DAG uses three ExternalTaskSensors at the start of three parallel branches in the same DAG.

from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime, duration


@task
def downstream_function_branch_1():
    print("Upstream DAG 1 has completed. Starting tasks of branch 1.")


@task
def downstream_function_branch_2():
    print("Upstream DAG 2 has completed. Starting tasks of branch 2.")


@task
def downstream_function_branch_3():
    print("Upstream DAG 3 has completed. Starting tasks of branch 3.")


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=5),
}


@dag(
    start_date=datetime(2022, 8, 1),
    max_active_runs=3,
    schedule="*/1 * * * *",
    default_args=default_args,  # apply the settings defined above to all tasks
    catchup=False,
)
def external_task_sensor_taskflow_dag():
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Each sensor waits for my_task in one upstream DAG at the same logical date
    ets_branch_1 = ExternalTaskSensor(
        task_id="ets_branch_1",
        external_dag_id="upstream_dag_1",
        external_task_id="my_task",
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
    )

    task_branch_1 = downstream_function_branch_1()

    ets_branch_2 = ExternalTaskSensor(
        task_id="ets_branch_2",
        external_dag_id="upstream_dag_2",
        external_task_id="my_task",
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
    )

    task_branch_2 = downstream_function_branch_2()

    ets_branch_3 = ExternalTaskSensor(
        task_id="ets_branch_3",
        external_dag_id="upstream_dag_3",
        external_task_id="my_task",
        allowed_states=["success"],
        failed_states=["failed", "skipped"],
    )

    task_branch_3 = downstream_function_branch_3()

    start >> [ets_branch_1, ets_branch_2, ets_branch_3]

    ets_branch_1 >> task_branch_1
    ets_branch_2 >> task_branch_2
    ets_branch_3 >> task_branch_3

    [task_branch_1, task_branch_2, task_branch_3] >> end


external_task_sensor_taskflow_dag()

In this DAG:

  • ets_branch_1 waits for the my_task task of upstream_dag_1 to complete before moving on to execute task_branch_1.
  • ets_branch_2 waits for the my_task task of upstream_dag_2 to complete before moving on to execute task_branch_2.
  • ets_branch_3 waits for the my_task task of upstream_dag_3 to complete before moving on to execute task_branch_3.

These processes happen in parallel and are independent of each other. The following graph view shows the state of the DAG after my_task in upstream_dag_1 has finished, which caused ets_branch_1 and task_branch_1 to run. ets_branch_2 and ets_branch_3 are still waiting for their upstream tasks to finish.

ExternalTaskSensor 3 Branches

If you want the downstream DAG to wait for the entire upstream DAG run to finish instead of a specific task, you can set external_task_id to None. In the example above, allowed_states and failed_states specify that the external task must reach the success state for the sensor to succeed, and that the sensor fails if the external task fails or is skipped.

In the previous example, each upstream DAG (upstream_dag_1, upstream_dag_2 and upstream_dag_3) and the downstream DAG (external_task_sensor_taskflow_dag) must have the same start date and schedule. This is because the ExternalTaskSensor looks for completion of the specified task or DAG at the same logical_date (previously called execution_date). To look for completion of the external task at a different date, you can use either the execution_delta or the execution_date_fn parameter, but not both; see the ExternalTaskSensor documentation for details.

Airflow API

The Airflow API is another way of creating cross-DAG dependencies. This is especially useful in Airflow 2.0 and later, which have a fully stable REST API. To use the API to trigger a DAG run, you can make a POST request to the DAGRuns endpoint as described in the Airflow API documentation.

This method is useful if your dependent DAGs live in different Airflow environments (more on this in the Cross-Deployment Dependencies section below). The task triggering the downstream DAG will complete once the API call is complete.

Using the API to trigger a downstream DAG can be implemented within a DAG by using the SimpleHttpOperator as shown in the example DAG below:

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator
from pendulum import datetime, duration
import json

# Define body of POST request for the API call to trigger another DAG.
# The "data" field of SimpleHttpOperator is templated, so the Jinja
# expression is rendered at runtime.
date = "{{ execution_date }}"
request_body = {"execution_date": date}
json_body = json.dumps(request_body)


@task
def print_task_type(task_type):
    """
    Example function to call before and after downstream DAG.
    """
    print(f"The {task_type} task has completed.")
    print(request_body)


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=5),
}


@dag(
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,
    schedule="@daily",
    default_args=default_args,  # apply the settings defined above to all tasks
    catchup=False,
)
def api_dag_taskflow():
    start_task = print_task_type("starting")

    api_trigger_dependent_dag = SimpleHttpOperator(
        task_id="api_trigger_dependent_dag",
        http_conn_id="airflow-api",
        endpoint="/api/v1/dags/dependent-dag/dagRuns",
        method="POST",
        headers={"Content-Type": "application/json"},
        data=json_body,
    )

    end_task = print_task_type("ending")

    start_task >> api_trigger_dependent_dag >> end_task


api_dag_taskflow()

This DAG has a similar structure to the TriggerDagRunOperator DAG, but instead uses the SimpleHttpOperator to trigger the dependent-dag using the Airflow API. The graph view appears similar to the following image:

API Graph View

To use the SimpleHttpOperator to trigger another DAG, you need to define the following:

  • endpoint: This should be of the form '/api/v1/dags/<dag-id>/dagRuns' where <dag-id> is the ID of the DAG you want to trigger.
  • data: The JSON body of the request, which sets the execution date of the new DAG run. In the example above, we use the execution_date of the upstream DAG, but this can be any date of your choosing. You can also specify other information about the DAG run, as described in the Airflow API documentation.
  • http_conn_id: This should be an Airflow connection of type HTTP, with your Airflow domain as the Host. Any authentication should be provided either as a Login/Password (if using Basic auth) or as a JSON-formatted Extra. In the example below, we use an authorization token.

Http Connection
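The same endpoint can also be called from plain Python, which helps when testing the connection settings outside of a DAG. The sketch below only builds the request with the standard library; the host, DAG id and credentials are hypothetical, it assumes the basic_auth API backend is enabled on the target Airflow, and it uses the logical_date field name accepted by Airflow 2.2+ (older 2.x versions use execution_date).

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_id, logical_date, username, password, conf=None):
    """Build (but do not send) a POST request that triggers a DAG run
    through the Airflow 2 stable REST API, using basic auth."""
    body = {"logical_date": logical_date, "conf": conf or {}}
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )


req = build_trigger_request(
    "https://airflow.example.com",  # hypothetical host
    "dependent-dag",
    "2023-07-03T00:00:00+00:00",
    "admin",
    "admin",  # hypothetical credentials
)
print(req.get_method(), req.full_url)  # POST https://airflow.example.com/api/v1/dags/dependent-dag/dagRuns
# To actually send it: urllib.request.urlopen(req)
```

Note that the API returns an error if a run with the same logical date already exists for that DAG, so reusing the upstream date only works once per date.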

DAG dependencies view

In Airflow 2.1, a new cross-DAG dependencies view was added to the Airflow UI. This view shows all DAG dependencies in your Airflow environment as long as they are implemented using one of the following methods:

  • Using dataset driven scheduling
  • Using a TriggerDagRunOperator
  • Using an ExternalTaskSensor

To view dependencies in the UI, go to Browse > DAG Dependencies, or click Graph within the Datasets tab. The following image shows the dependencies created by the TriggerDagRunOperator and ExternalTaskSensor example DAGs.

DAG Dependencies View

When DAGs are scheduled depending on datasets, both the DAG containing the producing task and the dataset are shown upstream of the consuming DAG.

DAG Dependencies View Datasets

In Airflow 2.4, an additional Datasets tab was added, which shows all dependencies between datasets and DAGs.

DAG Dependencies View Datasets

Cross-deployment dependencies

It is sometimes necessary to implement cross-DAG dependencies where the DAGs do not exist in the same Airflow deployment. The TriggerDagRunOperator, ExternalTaskSensor, and dataset methods are designed to work with DAGs in the same Airflow environment, so they are not suited to cross-deployment dependencies. The Airflow API is ideal for this use case. In this section, you'll learn how to implement this method on Astro, but the general concepts also apply to other Airflow environments.

Cross-deployment dependencies on Astro

To implement cross-DAG dependencies on two different Airflow environments on Astro, follow the steps for triggering a DAG using the Airflow API. Before you get started, you should review Make requests to the Airflow REST API. When you're ready to implement a cross-deployment dependency, follow these steps:

  1. In the upstream DAG, create a SimpleHttpOperator task that will trigger the downstream DAG. Refer to the section above for details on configuring the operator.
  2. In the Deployment running the downstream DAG, create an API key and copy it.
  3. In the upstream DAG Airflow environment, create an Airflow connection as shown in the Airflow API section above. The Host should be https://<your-base-domain>/<deployment-release-name>/airflow where the base domain and deployment release name are from your downstream DAG's Airflow deployment. In the Extras field, enter {"Authorization": "api-token"} where api-token is the API key you copied in step 2.
  4. Ensure the downstream DAG is turned on, then run the upstream DAG.
