
Apache Airflow is a powerful, open-source tool for defining workflows, whether for data engineering pipelines or more general processes.
A useful feature of Apache Airflow is the ability to control the direction a workflow takes through the use of branches.
This allows for more complex flows and means you aren't limited to a strictly linear sequence such as Task A > Task B > Task C.
Often at workflow design time you won't know exactly which tasks need to run. For example, you might have data cleaning steps that should only run if the source data is dirty, or you might need to look something up in an external system and make a decision based on the result.
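As a rough sketch of that second idea (everything here is hypothetical: the check_data_quality helper and the clean_data_task / load_data_task ids are stand-ins, not part of the example that follows), a branch callable might look something like this:

def check_data_quality(table_name: str) -> bool:
    # Stand-in for a real check (row counts, null ratios, a call to an
    # external data-quality service, etc.); always reports "dirty" here.
    return True

def choose_cleaning_path(**context):
    # A branch callable simply returns the task_id (or ids) to run next
    if check_data_quality(context["params"].get("source_table", "")):
        return "clean_data_task"    # take the cleaning branch first
    return "load_data_task"         # data is clean, load it straight away

The working example below follows exactly this shape, just with a simpler decision.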
To show a simple example, we are going to create a workflow that checks whether you've passed in a “name” parameter and then either greets you by name or just says hello to a stranger.
I'll do all the examples in the 'traditional' operator syntax because I think it's easier to follow, but branching works with the TaskFlow syntax as well (there's a short TaskFlow sketch further down).
import logging

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
    'branching_example',
    default_args=default_args,
    description='A basic DAG branching example',
    schedule_interval=None,  # No schedule required
    start_date=days_ago(0),  # Don't backdate executions
) as dag:  # Context manager

    def hello_stranger():
        logging.info("Hello Stranger")

    def hello_name(**context):
        logging.info(f"Hello {context['params']['name']}")

    def branching_function(**context):
        if "name" in context['params']:
            return "hello_name_task"
        else:
            return "hello_stranger_task"

    branching_function_task = BranchPythonOperator(
        task_id="branching_function_task",
        python_callable=branching_function,
    )

    hello_name_task = PythonOperator(
        task_id='hello_name_task',
        python_callable=hello_name,
    )

    hello_stranger_task = PythonOperator(
        task_id='hello_stranger_task',
        python_callable=hello_stranger,
    )

    # Set dependency relationship
    branching_function_task >> [hello_name_task, hello_stranger_task]
Here we define a task using the BranchPythonOperator and link it to a basic function (branching_function) which checks whether there is a “name” parameter in the context dictionary.
The function returns the task id of the next step that should be executed.
We then add the task instance to the dependency relationship, using a list of the possible branch options as the downstream dependency.
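It's worth noting that the branch callable doesn't have to return a single task id: it can return a list of task ids if more than one downstream branch should run. A small sketch (the audit_task id is hypothetical and not part of the example above):

def branching_function_multi(**context):
    # If a name was supplied, greet by name AND record an audit entry;
    # otherwise fall back to the stranger greeting only.
    if "name" in context['params']:
        return ["hello_name_task", "audit_task"]  # audit_task is hypothetical
    return "hello_stranger_task"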
If we have a look at the DAG run in the UI we can see the simple choice it makes.

From the green boxes you can see that we didn't provide a name, so it greets us as a stranger.
If we run it again but provide a runtime configuration payload of
{"name": "fred"}
then a different decision is made.
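(If you prefer the command line to the UI for this, the same payload can be passed as the trigger configuration, e.g. airflow dags trigger branching_example --conf '{"name": "fred"}'; the exact flags can vary slightly between Airflow versions.)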

This shows how simple it is to create a decision branch within a workflow.
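For reference, the same branch in TaskFlow syntax looks roughly like the sketch below. Treat it as a sketch rather than a drop-in replacement, and note it assumes a release where the @task.branch decorator is available (Airflow 2.3 or later, as far as I'm aware):

import logging

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(0), catchup=False)
def branching_example_taskflow():

    @task
    def hello_stranger():
        logging.info("Hello Stranger")

    @task
    def hello_name():
        logging.info(f"Hello {get_current_context()['params']['name']}")

    @task.branch
    def choose_greeting():
        # TaskFlow task ids default to the function names, so return those
        return "hello_name" if "name" in get_current_context()["params"] else "hello_stranger"

    choose_greeting() >> [hello_name(), hello_stranger()]

branching_example_taskflow()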
The Gotcha!
Most of the examples you find are similar to this one: a small workflow that follows a single branch and then ends.
Most of the time, however, a real workflow follows a more complex path, with further tasks downstream of the branch choice.
I found that as I was creating more complex workflows I was seeing behaviour that didn't match what I expected.
The branch choice happened fine, but then execution of the downstream tasks just seemed to stop, without any error.
To demonstrate this, let's expand our example to be polite and include a goodbye task.
We simply add a new goodbye_task and include it in the dependency flow.
import logging

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
    'branching_example',
    default_args=default_args,
    description='A basic DAG branching example',
    schedule_interval=None,  # No schedule required
    start_date=days_ago(0),  # Don't backdate executions
) as dag:  # Context manager

    def hello_stranger():
        logging.info("Hello Stranger")

    def hello_name(**context):
        logging.info(f"Hello {context['params']['name']}")

    def goodbye():
        logging.info("Goodbye!")

    def branching_function(**context):
        if "name" in context['params']:
            return "hello_name_task"
        else:
            return "hello_stranger_task"

    branching_function_task = BranchPythonOperator(
        task_id="branching_function_task",
        python_callable=branching_function,
    )

    hello_name_task = PythonOperator(
        task_id='hello_name_task',
        python_callable=hello_name,
    )

    hello_stranger_task = PythonOperator(
        task_id='hello_stranger_task',
        python_callable=hello_stranger,
    )

    goodbye_task = PythonOperator(
        task_id="goodbye_task",
        python_callable=goodbye,
    )

    # Set dependency relationship
    branching_function_task >> [hello_name_task, hello_stranger_task] >> goodbye_task
When we run it again we see that the goodbye task gets skipped, even though it is clearly linked as a downstream task of hello_stranger_task.

The issue here is the default trigger rule that gets applied to every task.
Airflow uses the trigger rule to decide when a task is allowed to run, and the default rule is “all_success”.
When an upstream task is not chosen by a branch its status is set to “skipped”, and skipped does not count as success, so a downstream task still using the default rule will not be executed.
The solution is to apply a different trigger rule to the downstream tasks that follow the branching point.
trigger_rule=TriggerRule.NONE_FAILED
Setting it to “none_failed” allows the task to run as long as no upstream task failed, even if some of them were skipped.
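On more recent Airflow releases there is also a slightly stricter variant, which additionally requires at least one upstream task to have actually succeeded; I believe it was added around Airflow 2.2, so check that it exists in your version before relying on it:
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
Here is the full DAG again with NONE_FAILED applied to goodbye_task.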
import logging

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
    'branching_example',
    default_args=default_args,
    description='A basic DAG branching example',
    schedule_interval=None,  # No schedule required
    start_date=days_ago(0),  # Don't backdate executions
) as dag:  # Context manager

    def hello_stranger():
        logging.info("Hello Stranger")

    def hello_name(**context):
        logging.info(f"Hello {context['params']['name']}")

    def goodbye():
        logging.info("Goodbye!")

    def branching_function(**context):
        if "name" in context['params']:
            return "hello_name_task"
        else:
            return "hello_stranger_task"

    branching_function_task = BranchPythonOperator(
        task_id="branching_function_task",
        python_callable=branching_function,
    )

    hello_name_task = PythonOperator(
        task_id='hello_name_task',
        python_callable=hello_name,
    )

    hello_stranger_task = PythonOperator(
        task_id='hello_stranger_task',
        python_callable=hello_stranger,
    )

    goodbye_task = PythonOperator(
        task_id="goodbye_task",
        python_callable=goodbye,
        trigger_rule=TriggerRule.NONE_FAILED,  # Trigger rule
    )

    # Set dependency relationship
    branching_function_task >> [hello_name_task, hello_stranger_task] >> goodbye_task
If we run this now we should see all downstream tasks execute.
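(A quick way to check this locally, without going through the UI, is to run the DAG with airflow dags test branching_example plus an execution date; the exact arguments vary a little between Airflow versions.)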

Hopefully this has given you an idea of how you can use branching in your Airflow workflows to create more advanced task relationships and to avoid the trigger issues that can prevent tasks from executing.
If you would like to know more about how to implement modern data, streaming and cloud technologies, such as Apache Kafka, into your business, we at Digitalis do it all: from data engineering, cloud migration to fully managed services, we can help you modernize your applications, operations, and data. We provide consulting and managed services on cloud, data, and DevOps for any business type. Contact us for more information.