Apache Airflow Branching (and a gotcha!)

February 21, 2024
Karl Sorensen

Apache Airflow is a powerful, open-source tool that allows you to define workflows for data engineering and even for more general business processes.

A useful feature of Apache Airflow is the ability to control the direction a workflow takes through the use of branches.

This allows for the creation of more complex flows and means you aren’t restricted to a linear sequence such as Task A > Task B > Task C.

Often at workflow design time you won’t know exactly which tasks need to be run. For example, you might have data cleaning steps that only run if the source data is dirty, or you might need to do an external system lookup and make a decision based on the returned result.

To show a simple example, we are going to create a workflow that checks whether you’ve passed in a “name” parameter and then either greets you by name or just says hello to a stranger.

I’ll do all the examples in the ‘traditional’ syntax because I think it’s easier to understand, but branching works using the TaskFlow syntax as well (there’s a short TaskFlow sketch further down).

import logging
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

with DAG (
    'branching_example',
    default_args=default_args,
    description='A basic DAG branching example',
    schedule_interval=None, # No Schedule required 
    start_date=days_ago(0), # Don't backdate executions
) as dag: #Context Manager
    def hello_stranger():
        logging.info("Hello Stranger")

    def hello_name(**context):
        logging.info(f"Hello {context['params']['name']}")

    def branching_function(**context):
        if "name" in context['params']:
            return "hello_name_task"
        else:
            return "hello_stranger_task"

    branching_function_task = BranchPythonOperator(
        task_id="branching_function_task",
        python_callable=branching_function,
    )

    hello_name_task = PythonOperator(
        task_id='hello_name_task',
        python_callable=hello_name,
    )

    hello_stranger_task = PythonOperator(
        task_id='hello_stranger_task',
        python_callable=hello_stranger,
    )

# Set dependency relationship
branching_function_task >> [hello_name_task, hello_stranger_task]

Here we define a task using the BranchPythonOperator and link it to a basic function (branching_function) which checks whether there is a “name” parameter in the params dictionary of the task context.

The function returns the task ID of the next task that should be executed.
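As a side note, the branch callable doesn’t have to pick a single path: it can also return a list of task IDs if more than one branch should run. A quick sketch, using a hypothetical “shout” parameter purely for illustration:

    def branching_function(**context):
        # Follow both greeting tasks when the (hypothetical) "shout" flag is set,
        # otherwise pick a single branch as before
        if context['params'].get('shout'):
            return ["hello_name_task", "hello_stranger_task"]
        if "name" in context['params']:
            return "hello_name_task"
        return "hello_stranger_task"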

We then add the branching task to the dependency relationship, using a list of the possible branch options as its downstream dependencies.

If we have a look at the DAG in the UI, we can see the simple choice it makes.

From the green boxes you can see that we didn’t provide a name, so it greets us as a stranger.

If we run it again but provide a runtime payload of

{"name": "fred"}

Then we get a different decision.
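If you prefer the command line to the UI’s “Trigger DAG w/ config” option, the same payload can be passed with the --conf flag (this relies on the run configuration being merged into params, which is Airflow’s default dag_run_conf_overrides_params behavior):

airflow dags trigger branching_example --conf '{"name": "fred"}'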

This shows how simple it is to create a decision branch within a workflow.
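As mentioned earlier, branching also works with the TaskFlow syntax. Here’s a minimal sketch of roughly the same DAG written that way, assuming a recent Airflow 2.x where the @task.branch decorator is available (on older versions you can still mix BranchPythonOperator into a TaskFlow DAG):

import logging
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

@dag(
    schedule_interval=None,
    start_date=days_ago(0),
)
def branching_example_taskflow():

    @task.branch
    def branching_function(**context):
        # Return the task_id (the function name by default) of the branch to follow
        if "name" in context['params']:
            return "hello_name"
        return "hello_stranger"

    @task
    def hello_name(**context):
        logging.info(f"Hello {context['params']['name']}")

    @task
    def hello_stranger():
        logging.info("Hello Stranger")

    branching_function() >> [hello_name(), hello_stranger()]

branching_example_taskflow()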

The Gotcha!

Most of the examples you find are similar to this one: a simple workflow that follows a single branch and then ends.

In practice, however, your workflow will usually follow a more complex path, with multiple tasks running after the branch choice.

I found that as I was creating more complex workflows I was seeing behavior that didn’t match what I expected.

The branch choice happened fine, but then execution of the downstream tasks just seemed to stop without any error.

To demonstrate this, let’s expand our example to be polite and include a goodbye task.

We simply add a new goodbye_task and then add it to the dependency flow.

import logging
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

with DAG (
    'branching_example',
    default_args=default_args,
    description='A basic DAG branching example',
    schedule_interval=None, # No Schedule required 
    start_date=days_ago(0), # Don't backdate executions
) as dag: #Context Manager
    def hello_stranger():
        logging.info("Hello Stranger")

    def hello_name(**context):
        logging.info(f"Hello {context['params']['name']}")

    def goodbye():
        logging.info("Goodbye!")

    def branching_function(**context):
        if "name" in context['params']:
            return "hello_name_task"
        else:
            return "hello_stranger_task"

    branching_function_task = BranchPythonOperator(
        task_id="branching_function_task",
        python_callable=branching_function,
    )

    hello_name_task = PythonOperator(
        task_id='hello_name_task',
        python_callable=hello_name,
    )

    hello_stranger_task = PythonOperator(
        task_id='hello_stranger_task',
        python_callable=hello_stranger,
    )

    goodbye_task = PythonOperator(
        task_id="goodbye_task",
        python_callable=goodbye,
    )

# Set dependency relationship
branching_function_task >> [hello_name_task, hello_stranger_task] >> goodbye_task

When we run it again, we see that the goodbye task gets skipped, even though it is clearly linked as a downstream task of hello_stranger_task.

The issue here is the default trigger rule that gets applied to all tasks.

Airflow uses the trigger rule to help determine when a task should be executed and the default rule is “all_success”.

When an upstream task gets skipped because of a branching decision, its status is set to “skipped”, which is not considered a success, so the downstream task will not be executed.

The solution is to apply a different trigger rule to the tasks downstream of the point where the branches rejoin.

trigger_rule=TriggerRule.NONE_FAILED

By setting it to “none_failed”, the task will run as long as none of its upstream tasks have failed; upstream tasks that were skipped are acceptable.

import logging
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

with DAG (
    'branching_example',
    default_args=default_args,
    description='A basic DAG branching example',
    schedule_interval=None, # No Schedule required 
    start_date=days_ago(0), # Don't backdate executions
) as dag: #Context Manager
    def hello_stranger():
        logging.info("Hello Stranger")

    def hello_name(**context):
        logging.info(f"Hello {context['params']['name']}")

    def goodbye():
        logging.info("Goodbye!")

    def branching_function(**context):
        if "name" in context['params']:
            return "hello_name_task"
        else:
            return "hello_stranger_task"

    branching_function_task = BranchPythonOperator(
        task_id="branching_function_task",
        python_callable=branching_function,
    )

    hello_name_task = PythonOperator(
        task_id='hello_name_task',
        python_callable=hello_name,
    )

    hello_stranger_task = PythonOperator(
        task_id='hello_stranger_task',
        python_callable=hello_stranger,
    )

    goodbye_task = PythonOperator(
        task_id="goodbye_task",
        python_callable=goodbye,
        trigger_rule=TriggerRule.NONE_FAILED, # Run even if an upstream task was skipped
    )


# Set dependency relationship
branching_function_task >> [hello_name_task, hello_stranger_task] >> goodbye_task

If we run this now, we should see the goodbye task execute after whichever branch was chosen.
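One thing to keep in mind is that none_failed is quite permissive: goodbye_task would run even if every one of its upstream tasks was skipped. In Airflow 2.2 and later there is a slightly stricter rule that additionally requires at least one upstream task to have succeeded, which is often a better fit for this fan-in pattern:

trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS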

Hopefully this has given you an idea of how you can use branching in your Airflow workflows to create more advanced task relationships and to avoid the trigger issues that can prevent tasks from executing.

If you would like to know more about how to implement modern data, streaming and cloud technologies, such as Apache Kafka, into your business, we at Digitalis do it all: from data engineering and cloud migration to fully managed services, we can help you modernize your applications, operations, and data. We provide consulting and managed services on cloud, data, and DevOps for any business type. Contact us for more information.

