Orchestrating jobs with Apache Airflow / Cloud Composer

Date posted: 2018-08-19

If you have complex-ish pipelines, especially ETL pipelines, chances are you run a lot of batch jobs. How do you schedule them? Cron jobs? Cron jobs are really great when you just want to run tasks X times per day and they are fairly independent of each other. But what if the scenario is a little more complex and you have a dependency tree? For example, consider the following task tree:

        |-> Task B.1 -|
        |-> Task B.2 -|
Task A -|-> Task B.3 -|-> Task C
        |-> Task B.4 -|
        |-> Task B.5 -|

Cron Job schedule:
Task  A - Starts at 00:30
Tasks B - Starts at 01:30
Task  C - Starts at 03:30

- Tasks B depend on the output of Task A
  and Task C depends on the outputs from Tasks B
- Task A has a 1h window to finish before Tasks B start
  and Tasks B have a 2h window to finish before Task C starts

Looks wonderful, but there are several problems there:

- What if Task A takes more than 1 hour to finish? Tasks B will start anyway and read incomplete (or missing) output, and the same goes for Task C. The windows are guesses, and generous windows waste time.
- What happens when a task fails? Cron will not retry it for you, and the downstream tasks will run regardless.
- Can you see the progress and logs of your task tree easily? With cron, not really: you are left digging through logs on whatever machine ran the jobs.

Apache Airflow aims to be a solution for these problems, and recently Google Cloud Platform announced its own managed version of Apache Airflow: Cloud Composer. In this post I will run some tests on Airflow and Composer and talk about a few issues (and solutions) I would expect if I were using them in production.

First, I am going to install Airflow on my local machine. It is very easy to install; all you need is pip (in my case, I am using Python 3):

~ > pip install apache-airflow
~ > airflow upgradedb

Alright. Now, just a little more theory about Airflow: the "main piece" of your Airflow workflow is a DAG (directed acyclic graph), which is simply a Python script that tells Airflow what to execute and in which order. A DAG should not have data processing logic! It is just a file that describes your workflow.

The default location for dags is ~/airflow/dags. If this directory doesn't exist, create it.

Let's try creating a very simple DAG and run it, just to see if everything is properly installed and get a feeling for how it all works.

First DAG

This dag will be very simple. It will have three tasks:

  1. print_date - Will run the date bash command
  2. sleep - Will sleep for 5 seconds
  3. done - Will echo "done"

They will be executed in sequence (each one waiting for the previous one to finish).

Here is what the DAG looks like when finished (don't worry, I will explain everything). Make sure to save it in the ~/airflow/dags directory:

# sampledag.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 17),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sampledag', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='done',
    bash_command='echo done',
    dag=dag)

t1 >> t2 >> t3

Let's take a look at what this dag is doing:

Imports

# Importing a constructor for a DAG
from airflow import DAG

# Importing the Bash Operator - Operators are, putting it simply, constructors
# for tasks. So, in this case, the BashOperator is something you would use
# to build a task that runs a bash script
from airflow.operators.bash_operator import BashOperator

# Just a few functions to work with dates
from datetime import datetime, timedelta

Default arguments

These are the default arguments for a DAG. You can see what these properties do in the Airflow documentation.

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 17),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

Creating the DAG

Here I am instantiating my main DAG. The id will be sampledag.

dag = DAG('sampledag', default_args=default_args)
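
One thing I am not passing here is a schedule_interval. When it is omitted, Airflow falls back to a daily schedule; if you want to be explicit about it, you can pass it yourself (just a variation, not part of the DAG above):

dag = DAG(
    'sampledag',
    default_args=default_args,
    schedule_interval=timedelta(days=1))  # run once a day (also Airflow's default)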

Tasks

Here I am defining my three tasks. They are all bash commands (which is why I am using BashOperator), and I am assigning them to my main DAG. I am also allowing t2 to be retried three times in case it fails (this overrides the retries=1 from default_args).

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='done',
    bash_command='echo done',
    dag=dag)

Dependency tree

Now this is cool. The >> and << operators are overloaded for tasks, and I can use them to say "execute t1, then t2, then t3", or, in other words, "t3 depends on t2, which depends on t1".

t1 >> t2 >> t3
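
By the way, the same dependencies can be written in a couple of equivalent ways, using the << operator or the set_downstream/set_upstream methods (same t1, t2, and t3 as above):

# Same chain, read from right to left
t3 << t2 << t1

# Or spelled out with methods
t1.set_downstream(t2)
t2.set_downstream(t3)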

Testing the first DAG

First, let's try parsing the DAG. Executing the DAG directly with python won't execute any jobs - we just want to see if it will parse:

~/airflow/dags > python sampledag.py
~/airflow/dags > 

No output. This is good. It means that everything parsed!

Now let's see if Airflow sees our dag using the command airflow list_dags:

~/airflow/dags > airflow list_dags
[2018-08-19 12:13:54,799] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-08-19 12:13:54,833] {models.py:189} INFO - Filling up the DagBag from /home/hscasn/airflow/dags

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
sampledag <---------------- HERE IT IS!!!!
test_utils
tutorial

Cool! Let's see if Airflow sees our task tree. For this, use the command airflow list_tasks <dag> --tree. It will print all our task names and their dependencies:

~/airflow/dags > airflow list_tasks sampledag --tree
[2018-08-19 12:16:28,518] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-08-19 12:16:28,548] {models.py:189} INFO - Filling up the DagBag from /home/hscasn/airflow/dags
<Task(BashOperator): done>
    <Task(BashOperator): sleep>
        <Task(BashOperator): print_date>

That strange tree tells us that the done task depends on the sleep task, which depends on the print_date task. Looks good to me!

Now we can try testing one of the tasks. We can try running the print_date task for the day 2018-08-08. Notice that although we are passing August 8th 2018, the date command will still print today's date ("today" being just another name for August 19th 2018; it will probably change in the future). That date argument is the execution date, which is useful for backfilling, but it's not something we are doing now.

~/airflow/dags > airflow test sampledag print_date 2018-08-08
[2018-08-19 12:26:20,468] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-08-19 12:26:20,500] {models.py:189} INFO - Filling up the DagBag from /home/hscasn/airflow/dags
[2018-08-19 12:26:20,563] {bash_operator.py:70} INFO - Tmp dir root location: 
 /tmp
[2018-08-19 12:26:20,563] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpvja081o7//tmp/airflowtmpvja081o7/print_dateg_2cg9xg
[2018-08-19 12:26:20,563] {bash_operator.py:88} INFO - Running command: date
[2018-08-19 12:26:20,565] {bash_operator.py:97} INFO - Output:
[2018-08-19 12:26:20,568] {bash_operator.py:101} INFO - Sun Aug 19 12:26:20 EDT 2018
[2018-08-19 12:26:20,568] {bash_operator.py:105} INFO - Command exited with return code 0
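
A side note: if you want a task to actually use that execution date, Airflow exposes it to templated fields (like bash_command) through Jinja macros such as {{ ds }}, which expands to the execution date in YYYY-MM-DD form. A minimal sketch, not part of the DAG above:

print_exec_date = BashOperator(
    task_id='print_exec_date',
    # {{ ds }} is replaced with the execution date (e.g. 2018-08-08), not today's date
    bash_command='echo "execution date: {{ ds }}"',
    dag=dag)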

Ok. I am convinced that this will work. Let's try running the dag for the days August 16th, August 17th, and August 18th.

~/airflow/dags > airflow backfill sampledag -s 2018-08-16 -e 2018-08-18
... lots of verbosity here
[2018-08-19 12:31:23,545] {jobs.py:2125} INFO - [backfill progress] | finished run 3 of 3 | tasks waiting: 0 | succeeded: 9 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2018-08-19 12:31:23,545] {jobs.py:2455} INFO - Backfill done. Exiting.

Boring, isn't it? Well, the tasks succeeded, but this is far from the "Can you see the progress and logs of your task tree easily?" I promised in the beginning. Fear not! Run this command and you will have an Airflow web interface running on localhost:8080:

~ > airflow webserver

Now let's go to localhost:8080:

Airflow Webserver - DAGs view

This is a lot more interesting. Look! Our DAG is right there at the end! Let's click on it and see what happens!

Airflow Webserver - sampledag tree view

This is awesome: it shows the three runs that we had before and their status (dark green = success)!

That is the Tree view. Let's take a look at the Graph view:

Airflow Webserver - sampledag graph view

Very nice. Let's try something more advanced now.

Second DAG

My second DAG will be more like the task tree I described at the beginning of the article (Tasks A, B, and C). This time I will also deploy my DAG to Cloud Composer instead of running it locally.

This is my new DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 10),
    'email': ['test@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

PARENT_DAG_NAME='my_main_dag'
SUB_DAG_NAME='my_sub_dag'

with DAG(PARENT_DAG_NAME, schedule_interval=timedelta(days=1), default_args=args) as main_dag:

    initial = BashOperator(
        task_id='initialtask',
        bash_command='sleep $(( RANDOM % 10 )); echo "trying initial task"; exit $(( (RANDOM % 3) == 0 ))',
        retries=10,
        dag=main_dag)

    tasks_dag = DAG('%s.%s' % (PARENT_DAG_NAME, SUB_DAG_NAME), default_args=main_dag.default_args, concurrency=3)
    tasks = SubDagOperator(
        subdag=tasks_dag,
        task_id=SUB_DAG_NAME,
        dag=main_dag)
    for x in range(0, 16):
        BashOperator(
            task_id="task%d" % (x),
            bash_command="sleep $(( RANDOM %% 10 )); echo 'trying task %d'; exit $(( (RANDOM %% 3) == 0 ))" % (x),
            retries=10,
            dag=tasks_dag)

    final = BashOperator(
        task_id='finaltask',
        bash_command='sleep $(( RANDOM % 10 )); echo "trying final task"; exit $(( (RANDOM % 3) == 0 ))',
        retries=10,
        dag=main_dag)

    initial >> tasks >> final

The first and last tasks should be no surprise to you, except for the command. What is that command doing? It sleeps for a random number of seconds (so the run feels a bit more realistic) and then has a roughly one-in-three chance of failing: exit $(( (RANDOM % 3) == 0 )) exits with code 1 whenever RANDOM happens to be divisible by 3, and a non-zero exit code is how Airflow knows the task failed (which is why these tasks get retries=10).

Now, what is that second part with tasks_dag, tasks, and the for loop? Let's take a look:

tasks_dag = DAG('%s.%s' % (PARENT_DAG_NAME, SUB_DAG_NAME), default_args=main_dag.default_args, concurrency=3)

Notice that I am setting concurrency=3 for that DAG. This means that it will only run 3 of its tasks at a time!

Why set "concurrency"?

I find "concurrency" useful when I want to run my jobs in parallel, but there is a limit to how many can run at the same time.

Let's say, for example, that you have 16 different Google Dataflow jobs (Dataflow is a managed service for batch/streaming data processing) that run once a day. Since these jobs are independent, you could run them all at once. However, there is a problem: there is a limit on how many Dataflow jobs you can run concurrently, and since you already have some jobs running, it is only safe to have 3 more running at the same time :(

Well, now you have two options:

1- You can organize the DAG so it runs 3 jobs first, and when they are all done, you run three more, and then three more, etc

2- You can tell your DAG to run them in parallel, but having no more than 3 jobs running at once

The first option is not very good: it's harder to maintain, and it wastes time. It wastes time because one job may finish faster than the others (say, one job takes 1 hour, the second takes 2 hours, and the third takes 3 hours), but the next batch cannot start until all of them are done.

The second option is a lot better. You can tell Airflow to run a maximum of 3 jobs at a time by defining a "concurrency". It will start three jobs, and as soon as one job is done, it will start another one.

Say our jobs are Job A, B, C, D, E, etc. We will start with jobs A, B, and C. A finishes in 1 hour, B in 2, and C in 3 hours. The other jobs will follow the same pattern (1h, 2h, 3h). Let's try visualizing what this would look like hour by hour.

First option:
A B C -> _ B C -> _ _ C -> D E F -> _ E F -> _ _ F -> G H I -> ...

Second option:
A B C -> D B C -> E F C -> E F G -> H F I -> ...

The second option will make it finish much faster because we won't have "gaps" between the jobs!
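
If you want to convince yourself of that, here is a quick back-of-the-envelope simulation (plain Python, nothing Airflow-specific; the nine jobs with 1h/2h/3h durations are just the example above):

import heapq

durations = [1, 2, 3] * 3  # nine jobs following the 1h/2h/3h pattern

# Option 1: batches of 3; each batch waits for its slowest job
batch_hours = sum(max(durations[i:i + 3]) for i in range(0, len(durations), 3))

# Option 2: a pool of 3 slots; a new job starts as soon as a slot frees up
slots = [0, 0, 0]                    # finish time of each slot
heapq.heapify(slots)
for d in durations:
    earliest = heapq.heappop(slots)  # the slot that frees up first
    heapq.heappush(slots, earliest + d)
pool_hours = max(slots)

print(batch_hours, pool_hours)  # 9 vs 7 hours for this example

And the difference only grows as you add more jobs.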

Back to that tasks_dag line: here I am creating a new DAG, which will be used as a sub-DAG. This is a great way to fork tasks. A top-level view would look like this:

Initial task -> Sub Dag -> Final task

The Sub Dag, in my case, will contain 16 tasks executed in parallel:

# Sub Dag
-|-> Task 0  -|->
 |-> Task 1  -|
 |-> Task 2  -|
 |-> ...     -|
 |-> Task 15 -|

The result, if we opened the "Sub Dag" blackbox:

Initial Task -|-> Task 0  -|-> Final Task
              |-> Task 1  -|
              |-> Task 2  -|
              |-> ...     -|
              |-> Task 15 -|

Now let's see how I can create the tasks and join them with the main DAG:

# Remember: tasks are created from operators, so to plug our sub-DAG into the
# main DAG as a single task, we wrap it in a SubDagOperator:

tasks = SubDagOperator(
    subdag=tasks_dag,
    task_id=SUB_DAG_NAME,
    dag=main_dag)

Now let's populate our sub-DAG with some bash commands:

for x in range(0, 16):
    BashOperator(
        task_id="task%d" % (x),
        bash_command="sleep $(( RANDOM %% 10 )); echo 'trying task %d'; exit $(( (RANDOM %% 3) == 0 ))" % (x),
        retries=10,
        dag=tasks_dag)

There we go. Our new DAG is ready. Now we need to set up Cloud Composer.

Setting it up is very simple - just follow Google's tutorial and you'll be up and running in no time. All you have to do is set up the Composer environment (it's easy - trust me) and upload the DAG file.

Let's have a look at the webserver.

This is the graph view:

Composer Webserver - second dag graph view

To see what is inside the Sub Dag, we have to zoom into it:

Composer Webserver - second dag subdag graph view

Looks nice. Now let's see the tree view for all the days I backfilled:

Composer Webserver - second dag tree view

Now this is awesome! We can clearly see the tasks that failed (red), are being retried (yellow), are still running (light green), and succeeded (dark green)!

...one last thing

Bash scripts are cool and everyone who disagrees is wrong, but you probably won't be launching jobs from bash scripts inside the Airflow pod/container/machine. So what else can you do?

Well, if you are really OK with launching scripts from your Airflow pod/container/machine, you can use the PythonOperator to run a Python function as a task:

from airflow.operators.python_operator import PythonOperator

def greeting():
    print('Hello!')

PythonOperator(
    task_id='hello',
    python_callable=greeting,
    dag=dag)

Still, I don't like this, because I don't feel like rewriting my entire ETL jobs in Python and stuffing them inside my Airflow pods.

If you are using Google Cloud Platform, Composer gives us operators made especially for launching tasks with its products, such as Dataflow, Dataproc, Datastore, etc. Take a look: Google Cloud Platform Operators.

# Launching a Dataflow job from Airflow
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=dag)

Now this is more interesting, huh? But if this is still not enough, Airflow has a KubernetesPodOperator! This operator allows you to launch Kubernetes Pods as tasks!

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

KubernetesPodOperator(
    task_id='pod-ex-minimum',
    name='pod-ex-minimum',
    namespace='default',
    image='gcr.io/gcp-runtimes/ubuntu_16_0_4')

This will allow you to launch any task you want, in any language you want. Now things got serious! Node.js, Rust, C? No problem. Just launch the pod and Airflow will check its exit status!