What’s New with Airflow 2.0

Archit Pandita
Hashmap, an NTT DATA Company
7 min read · Dec 21, 2020


That’s right, Airflow has changed its API in the new major version. This is not unexpected for a major release, but it can cause issues when upgrading. Have no fear, though: the earlier functionality has been preserved, only deprecated. Going forward, you should look into the simplifying changes that have been made.

What’s here for you?

This article will go through some of the new features of Airflow 2.0 (Python 3.6+) and how we can use them to build highly available, scalable, and efficient solutions.

What’s exciting about 2.0?

1) Quicker Multi-scheduler: moving from a single scheduler to multiple schedulers.

Working with multiple workers managed by a single scheduler makes the system vulnerable to breakdown: when that one scheduler breaks, everything stops. It is like putting a single load balancer in front of multiple web servers; the design may look highly available and resilient, yet it still has a single point of failure, which is obviously what we want to avoid. To address this, Airflow 2.0 supports running multiple schedulers at once, which improves performance, scalability, high availability, and overall system resiliency, as you would expect. See Scheduler — Airflow Documentation (apache.org)

Visualization of multiple (parallel) schedulers

Benchmarking: The Airflow 2.0 Scheduler | Astronomer
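Nothing special is required to try this out: every scheduler points at the same metadata database, and you simply start additional scheduler processes. Here is a minimal sketch, assuming a database that supports row-level locking (the documentation recommends PostgreSQL 9.6+ or MySQL 8+) and default settings; verify the option against your version's docs:

# airflow.cfg (default shown): the HA scheduler relies on row-level locking
[scheduler]
use_row_level_locking = True

# Then start another scheduler process on each additional host,
# all pointing at the same metadata database:
#   airflow scheduler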

2) Mandatory Dag Serialization: It is now mandatory to serialize and store DAGs.

Earlier, the DAG file being run had to be present on both the web server and the scheduler, which is inefficient since it requires unnecessary duplication. DAG serialization fixes this: the scheduler parses the DAG and serializes it into the metadata database, and the webserver reads the serialized DAG from metadata only when it needs it, giving it a lazy-read capability that improves overall performance. See DAG Serialization — Airflow Documentation (apache.org)

With this, our web server becomes stateless and easy to maintain too.

Visualization of the change in architecture wrt DAG usage.
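DAG serialization is always on in Airflow 2.0, so there is nothing to enable; a couple of settings simply control how often the serialized DAGs are refreshed. A hedged sketch of the relevant airflow.cfg entries (option names and defaults as I understand the 2.0 documentation; verify against your version):

[core]
# minimum seconds between re-serializing an updated DAG into the metadata DB
min_serialized_dag_update_interval = 30
# minimum seconds before the webserver re-fetches a serialized DAG it has cached
min_serialized_dag_fetch_interval = 10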

3) Smart Sensor:

When working with many sensors, efficiency matters: previously, every sensor task ran as its own long-running process, constantly poking for its condition and tying up resources. Smart sensors centralize this work so the checks are processed in batches.

First, the sensor task is serialized and stored in the metadata database.

Second, centralized built-in DAGs containing SmartSensorOperator tasks pick up these serialized sensor tasks, execute the checks in batches, and manage the smart sensor jobs for the whole Airflow cluster.

Smart Sensor — Airflow Documentation (apache.org)

(Note that this feature is still in its early stage)

To use Smart Sensors, add the following settings in the airflow.cfg:

[smart_sensor]
use_smart_sensor = true
shard_code_upper_limit = 10000
# Users can change the following config based on their requirements
shards = 5
sensors_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
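Once this is enabled, sensors of the classes listed in sensors_enabled are consolidated automatically; the task definitions themselves do not change. A minimal sketch, assuming the Hive provider package is installed (the import path is the 2.0 provider path as I understand it, and the connection id and partition name below are made up for illustration):

from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
from airflow.utils.dates import days_ago

with DAG('smart_sensor_example', start_date=days_ago(1), schedule_interval=None) as dag:
    # An ordinary sensor task; with use_smart_sensor = true its poke work is
    # handed off to the centralized smart-sensor DAGs instead of holding a worker slot
    wait_for_partition = NamedHivePartitionSensor(
        task_id='wait_for_partition',
        partition_names=['sales.orders/ds=2020-12-21'],  # hypothetical partition
        metastore_conn_id='metastore_default',           # hypothetical connection id
        poke_interval=60,
    )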

4) Re-architected Kubernetes executor: a pod template defined in YAML and simplified parameterization for simpler and quicker coding.

The changes are as follows:

  • Define a pod_template_file YAML file to hold the pod parameters instead of keeping them in airflow.cfg.
  • Use the new pod_override key, passed to an operator through its executor_config argument, to override fields of that task's Pod object (see the sketch after this list).
    This change is significant: it removed roughly 3,000 lines of code.
    Kubernetes Executor — Airflow Documentation (apache.org)
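Here is a minimal sketch of the pod_override approach, assuming the KubernetesExecutor is in use and the kubernetes Python client is installed; the DAG/task names and resource values are illustrative only:

from kubernetes.client import models as k8s

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def say_hello():
    print("hello from a customized pod")

with DAG('k8s_pod_override_example', start_date=days_ago(1), schedule_interval=None) as dag:
    hello = PythonOperator(
        task_id='hello',
        python_callable=say_hello,
        # Override only what differs from the pod_template_file for this one task
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",  # the main task container is named "base"
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "500m", "memory": "512Mi"}  # illustrative values
                            ),
                        )
                    ]
                )
            )
        },
    )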

5) TaskFlow API (functional DAGs): using decorators to define DAGs.

The way we are used to defining a DAG has changed. Airflow has introduced decorator-based DAG creation with the TaskFlow API. If you are familiar with Prefect, another DAG/scheduling framework in Python, this will look and feel very similar, if not nearly identical on the surface. Underneath, it is still just Airflow.

@task -> is used to define the task

@dag -> to instantiate the dag

These decorators did not exist in the previous version; there, tasks were plain operator objects defined at module level and wired together by hand.

Let’s see some new dag code:

# Example DAG built with the TaskFlow API
# (Requires: json, typing.Dict, DAG, SimpleHttpOperator, EmailOperator, and the
#  default_args dict shown in the next snippet.)
with DAG(
    'send_server_ip', default_args=default_args, schedule_interval=None
) as dag:

    # Using default connection as it's set to httpbin.org by default
    get_ip = SimpleHttpOperator(
        task_id='get_ip', endpoint='get', method='GET', xcom_push=True
    )

    @dag.task(multiple_outputs=True)
    def prepare_email(raw_json: str) -> Dict[str, str]:
        external_ip = json.loads(raw_json)['origin']
        return {
            'subject': f'Server connected from {external_ip}',
            'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
        }

    email_info = prepare_email(get_ip.output)

    send_email = EmailOperator(
        task_id='send_email',
        to='example@example.com',
        subject=email_info['subject'],
        html_content=email_info['body']
    )

Next, let's build a small ETL pipeline step by step with the TaskFlow API. Imports required:

import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
}

DAG instantiation (@dag decorator):
This is where we instantiate the DAG and define its properties, such as the default arguments, schedule interval, and start date.

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def taskflow_api_etl():
    """
    ### TaskFlow API
    ETL DAG with the configuration we want for this DAG;
    for example, it has no schedule interval.
    """

Task definition (@task decorator):
@task wraps a plain Python function as a task in the pipeline, and the function name serves as the task's unique id.

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict

Multiple outputs and XCom:
If the function returns several values as a dictionary, multiple_outputs=True unrolls that dictionary into separate XCom values keyed by the dictionary keys (with a Dict return type annotation, Airflow can set this automatically).

Behind the scenes, XCom is used to pass values between tasks just as before, and the values remain readable in the UI.

@task(multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

Let's add a third function, "load":

@task()
def load(total_order_value: float):
    """
    #### Load task
    A simple Load task which takes in the result of the Transform task and,
    instead of saving it for end user review, just prints it out.
    """

    print("Total order value is: %.2f" % total_order_value)

Creating the flow, DAG, or pipeline (building the relationships):

Remember XCom? That is what is used behind the scenes to pass the output of a parent function to a child or dependent function.
With this new technique, defining the pipeline becomes straightforward and reads like ordinary Python function calls. Because of @task, Airflow understands the dependencies and runs the whole pipeline or DAG efficiently.

# call the extract task and store its return value
order_data = extract()
# call transform, which takes the previous task's output as input
order_summary = transform(order_data)
# call load, which gets its input from transform
load(order_summary["total_order_value"])

The old way of building dependencies:

extract_task >> transform_task >> load_task
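For contrast, here is a rough, hedged sketch of how the transform step above would have been wired up without the TaskFlow API, with an explicit operator and manual XCom calls (the callable name is made up for illustration, and the sketch assumes a DAG object named dag is in scope):

from airflow.operators.python import PythonOperator

def transform_callable(ti, **kwargs):
    # pull the value the extract task pushed to XCom, by hand
    order_data_dict = ti.xcom_pull(task_ids='extract')
    # push the computed total back to XCom for the load task
    ti.xcom_push(key='total_order_value', value=sum(order_data_dict.values()))

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_callable,
    dag=dag,  # assumes an existing DAG object
)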

Now that the pipeline is defined, at execution time the tasks are run by workers, possibly on different nodes, and Airflow manages all of this for us.

Enabling the DAG:

The whole DAG is defined by the function "taskflow_api_etl", which is wrapped with the @dag decorator. We still need to call that function so the DAG gets built and registered:

tutorial_etl_dag = taskflow_api_etl()

That should give you a good idea of how Airflow 2.0 makes it easier to achieve your goals.

Extra point:

Scope

Airflow will load any DAG object it can import from a DAG file. Critically, that means the DAG must appear in globals(). Consider the following two DAGs: only dag_1 will be loaded; the other one appears only in a local scope.

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

Sometimes this can be put to good use. For example, a common pattern with SubDagOperator defines the subdag inside a function so that Airflow doesn't try to load it as a standalone DAG.
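A hedged sketch of that SubDagOperator pattern, assuming Airflow 2.0 import paths (module names changed slightly between versions, so check the documentation for yours); the DAG and task ids are illustrative:

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.utils.dates import days_ago

default_args = {'owner': 'airflow', 'start_date': days_ago(1)}

def make_subdag(parent_dag_id, task_id, args):
    # Built inside a function, so the child DAG never appears in globals()
    # and is not loaded as a standalone DAG.
    return DAG(
        dag_id=f"{parent_dag_id}.{task_id}",  # convention: <parent dag id>.<task id>
        default_args=args,
        schedule_interval=None,
    )

with DAG('parent_dag', default_args=default_args, schedule_interval=None) as parent_dag:
    section_1 = SubDagOperator(
        task_id='section_1',
        subdag=make_subdag('parent_dag', 'section_1', default_args),
    )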

Ready to Accelerate Your Digital Transformation?

At Hashmap, we work with our clients to build better together.

If you’d like additional assistance in this area, Hashmap offers a range of enablement workshops and consulting service packages as part of our consulting service offerings, and would be glad to work through your specifics. Reach out to us here.

Feel free to share on other channels, and be sure to keep up with all new content from Hashmap here. To listen in on a casual conversation about all things data engineering and the cloud, check out Hashmap’s podcast Hashmap on Tap on Spotify, Apple, Google, and other popular streaming apps.


Archit Pandita is a Python developer and Data Engineer with Hashmap providing Data, Cloud, IoT and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers.
Have a question? Don’t hesitate to reach out to connect or exchange more information. I’m happy to help:
Archit’s LinkedIn
