Orchestrating jobs with Apache Airflow/Cloud Composer

If you have complex-ish pipelines, especially ETL pipelines, chances are you run a lot of batch jobs. How do you schedule them? Cron jobs? Cron jobs are great when you just want to run tasks X times per day and they are fairly independent from each other. But what if the scenario is a little more complex and you have a dependency tree? For example, consider the following task tree:

```bash
        |-> Task B.1 -|
        |-> Task B.2 -|
Task A -|-> Task B.3 -|-> Task C
        |-> Task B.4 -|
        |-> Task B.5 -|

Cron Job schedule:
Task A  - Starts at 00:30
Tasks B - Starts at 01:30
Task C  - Starts at 03:30

- Tasks B depend on the output of Task A and Task C depends on the outputs from Tasks B
- Task A has a 1h window to finish before Tasks B start and Tasks B have a 2h window to finish before Task C starts
```

Looks wonderful, but there are several problems there:

- What if Task A or Tasks B exceed their time window? The downstream tasks will start even without the correct output!
- What if one of the tasks fails? Can you rerun the tasks easily?
- Do you have a limit on the number of concurrent tasks? What if you can't run all 5 Tasks B at once?
- Can you do backfills?
- Can you see the progress and logs of your task tree easily?
- If you are keeping the state of the tasks in memory, what happens if the task manager goes down?

**Apache Airflow** aims to be a solution for these problems, and recently *Google Cloud Platform* announced its own managed version of Apache Airflow: **Cloud Composer**. In this post I will run some tests on Airflow and Composer and talk about a few issues and solutions I would have if using them in production.

First, I am going to install Airflow on my local machine. It is very easy to install. All you need is **pip** (in my case, I am using Python 3):

```bash
~ > pip install apache-airflow
~ > airflow upgradedb
```

Alright.
Now, just a little more theory about Airflow: the "main piece" of your Airflow workflow is a DAG (directed acyclic graph), which is simply a Python script that tells Airflow what to execute and in which order. A DAG should not have data processing logic! It is just a file that describes your workflow. The default location for DAGs is `~/airflow/dags`. If this directory doesn't exist, create it.

Let's try creating a very simple DAG and running it, just to see if everything is properly installed and to get a feeling for how it all works.

# First DAG

This DAG will be very simple. It will have three tasks:

1. **print_date** - Will run the `date` bash command
1. **sleep** - Will sleep for 5 seconds
1. **done** - Will echo "done"

They will be executed in sequence (each one waiting for the previous one to finish). Here is what the DAG looks like when finished (don't worry, I will explain everything). Make sure to save it in the `~/airflow/dags` directory:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 17),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sampledag', default_args=default_args)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='done',
    bash_command='echo done',
    dag=dag)

t1 >> t2 >> t3
```

Let's take a look at what this DAG is doing:

### Imports

```python
# Importing a constructor for a DAG
from airflow import DAG

# Importing the Bash Operator - Operators are, putting it simply, a constructor
# for a task. So, in this case, a Bash operator is something you would use
# to build a task that runs a bash script
from airflow.operators.bash_operator import BashOperator

# Just a few functions to work with dates
from datetime import datetime, timedelta
```

### Default arguments

These are default arguments for a DAG. You can see what these properties do here.

```python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 17),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
```

### Creating the DAG

Here I am instantiating my main DAG. The id will be `sampledag`.

```python
dag = DAG('sampledag', default_args=default_args)
```

### Tasks

Here I am defining my three tasks. They are all bash commands (which is why I am using BashOperator). I am assigning them to my main DAG. I am also allowing `t2` to be retried three times, in case it fails.

```python
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='done',
    bash_command='echo done',
    dag=dag)
```

### Dependency tree

Now this is cool. The `>>` and `<<` operators are overloaded for tasks, and I can use them to say "Execute t1, and then t2, and then t3", or, "t3 depends on t2, which depends on t1".

```python
t1 >> t2 >> t3
```

### Testing the first DAG

First, let's try parsing the DAG. Executing the DAG directly with `python` won't execute any jobs - we just want to see if it will parse:

```bash
~/airflow/dags > python
~/airflow/dags >
```

No output. This is good. It means that everything parsed!
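As a side note on the `>>` syntax from the dependency tree section: the chaining works because Python lets a class define what `>>` and `<<` mean. The sketch below is a toy illustration of that idea only. The `Task` class and its `downstream` list are hypothetical names made up for this example; this is not Airflow's actual implementation.

```python
class Task:
    """Toy stand-in for an Airflow task (hypothetical, for illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other):
        # self >> other: "other runs after self"
        self.downstream.append(other)
        return other  # returning the right-hand side is what makes chains work

    def __lshift__(self, other):
        # self << other: "self runs after other"
        other.downstream.append(self)
        return other


t1, t2, t3 = Task("print_date"), Task("sleep"), Task("done")

# (t1 >> t2) returns t2, so the chain continues as t2 >> t3
t1 >> t2 >> t3

print([t.task_id for t in t1.downstream])  # ['sleep']
print([t.task_id for t in t2.downstream])  # ['done']
```

Returning the right-hand operand from `__rshift__` is the small design choice that lets `t1 >> t2 >> t3` read left to right as a pipeline.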
Now let's see if Airflow sees our DAG using the command `airflow list_dags`:

```bash
~/airflow/dags > airflow list_dags
[2018-08-19 12:13:54,799] {} INFO - Using executor SequentialExecutor
[2018-08-19 12:13:54,833] {} INFO - Filling up the DagBag from /home/hscasn/airflow/dags

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_http_operator
example_passing_params_via_test_command
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
sampledag <---------------- HERE IT IS!!!!
test_utils
tutorial
```

Cool! Let's see if Airflow sees our task tree. For this, use the command `airflow list_tasks sampledag --tree`. It will print all our task names and their dependencies:

```bash
~/airflow/dags > airflow list_tasks sampledag --tree
[2018-08-19 12:16:28,518] {} INFO - Using executor SequentialExecutor
[2018-08-19 12:16:28,548] {} INFO - Filling up the DagBag from /home/hscasn/airflow/dags
```

That strange tree tells us that the `done` task depends on the `sleep` task, which depends on the `print_date` task. Looks good to me!

Now we can try testing one of the tasks. We can try running the `print_date` task for day `2018-08-08`. Notice that although we are passing August 8th 2018, the command `date` will still print today's date (just another name we gave to August 19th 2018; it will probably change in the future). We can use that date argument for backfilling, but it's not something we are doing now.
```bash
~/airflow/dags > airflow test sampledag print_date 2018-08-08
[2018-08-19 12:26:20,468] {} INFO - Using executor SequentialExecutor
[2018-08-19 12:26:20,500] {} INFO - Filling up the DagBag from /home/hscasn/airflow/dags
[2018-08-19 12:26:20,563] {} INFO - Tmp dir root location: /tmp
[2018-08-19 12:26:20,563] {} INFO - Temporary script location: /tmp/airflowtmpvja081o7//tmp/airflowtmpvja081o7/print_dateg_2cg9xg
[2018-08-19 12:26:20,563] {} INFO - Running command: date
[2018-08-19 12:26:20,565] {} INFO - Output:
[2018-08-19 12:26:20,568] {} INFO - Sun Aug 19 12:26:20 EDT 2018
[2018-08-19 12:26:20,568] {} INFO - Command exited with return code 0
```

Ok. I am convinced that this will work. Let's try running the DAG for the days August 16th, August 17th, and August 18th.

```bash
~/airflow/dags > airflow backfill sampledag -s 2018-08-16 -e 2018-08-18
... lots of verbosity here
[2018-08-19 12:31:23,545] {} INFO - [backfill progress] | finished run 3 of 3 | tasks waiting: 0 | succeeded: 9 | kicked_off: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2018-08-19 12:31:23,545] {} INFO - Backfill done. Exiting.
```

Boring, isn't it? Well, the tasks succeeded, but this is far from the "Can you see the progress and logs of your task tree easily?" I promised in the beginning. Fear not! Run this command and you will have an Airflow web interface running on `localhost:8080`:

```bash
~ > airflow webserver
```

Now let's go to `localhost:8080`:

Figure: Airflow Webserver - DAGs view

This is a lot more interesting. Look! Our DAG is right there at the end! Let's click on it and see what happens!

Figure: Airflow Webserver - sampledag tree view

This is awesome: it shows the three runs that we had before and their status (dark green = success)! That is the **Tree view**. Let's take a look at the **Graph view**:

Figure: Airflow Webserver - sampledag graph view

Very nice. Let's try something more advanced now.

# Second DAG

My second DAG will be more like the task tree I described in the beginning of the article (Tasks A, B, and C). This time I will also deploy my DAG in Cloud Composer instead of locally. This is my new DAG:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 10),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

PARENT_DAG_NAME = 'my_main_dag'
SUB_DAG_NAME = 'my_sub_dag'

with DAG(PARENT_DAG_NAME,
         schedule_interval=timedelta(days=1),
         default_args=args) as main_dag:

    initial = BashOperator(
        task_id='initialtask',
        bash_command='sleep $(( RANDOM % 10 )); echo "trying initial task"; exit $(( (RANDOM % 3) == 0 ))',
        retries=10,
        dag=main_dag)

    tasks_dag = DAG('%s.%s' % (PARENT_DAG_NAME, SUB_DAG_NAME),
                    default_args=main_dag.default_args,
                    concurrency=3)

    tasks = SubDagOperator(
        subdag=tasks_dag,
        task_id=SUB_DAG_NAME,
        dag=main_dag)

    for x in range(0, 20):
        BashOperator(
            task_id="task%d" % (x),
            bash_command="sleep $(( RANDOM %% 10 )); echo 'trying task %d'; exit $(( (RANDOM %% 3) == 0 ))" % (x),
            retries=10,
            dag=tasks_dag)

    final = BashOperator(
        task_id='finaltask',
        bash_command='sleep $(( RANDOM % 10 )); echo "trying final task"; exit $(( (RANDOM % 3) == 0 ))',
        retries=10,
        dag=main_dag)

    initial >> tasks >> final
```

The first and last tasks should be no surprise to you, except for the command. What is that command doing? It sleeps for a random number of seconds between 0 and 9 (so it will feel more realistic) and has roughly a one-in-three chance of failing (exiting with code 1 whenever `RANDOM % 3` is 0).

Now, what is that second part with **tasks_dag**, **tasks**, and the **for** loop?
Let's take a look:

```python
tasks_dag = DAG('%s.%s' % (PARENT_DAG_NAME, SUB_DAG_NAME),
                default_args=main_dag.default_args,
                concurrency=3)
```

Notice that I am setting `concurrency=3` for that DAG. This means that it will only run 3 tasks in parallel at a time!

Why set "concurrency"?

I find "concurrency" useful if I want to run my jobs all in parallel, but there is a limit to the number of jobs running at the same time.

Let's say, for example, that you have 16 different Google Dataflow jobs (a managed service for batch/streaming tasks) that will run once a day. Since these jobs are independent, you can run them all at once. However, there is a problem: you find out that there is a limit on the number of Dataflow jobs, and since you already have some jobs running, you find out that it is only safe to have 3 more jobs running at the same time :(

Well, now you have two options:

1- You can organize the DAG so it runs 3 jobs first, and when they are all done, you run three more, and then three more, etc

2- You can tell your DAG to run them in parallel, but having no more than 3 jobs running at once

The first option is not very good: it's harder to maintain, and it wastes time. It wastes time because one job may finish faster than the other two (say, one job takes 1 hour, the second one takes 2 hours, and the third one takes 3 hours), but for the next batch to start, all of them must be done.

The second option is a lot better. You can tell Airflow to run a maximum of 3 jobs at a time by defining a "concurrency". It will start three jobs, and as soon as one job is done, it will start another one.

Say our jobs are Job A, B, C, D, E, etc. We will start with jobs A, B, and C. A finishes in 1 hour, B in 2, and C in 3 hours. The other jobs will follow the same pattern (1h, 2h, 3h). Let's try visualizing what this would look like hour by hour.

First option:
A B C -> _ B C -> _ _ C -> D E F -> _ E F -> _ _ F -> G H I -> ...

Second option:
A B C -> D B C -> E F C -> E F G -> H F I -> ...

The second option will make it finish much faster because we won't have "gaps" between the jobs!
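The hour-by-hour comparison above can be reproduced with a small simulation. This is only a toy model of the two strategies, not how Airflow's scheduler is implemented; the function names `simulate` and `simulate_batches` and the nine example jobs are assumptions made up for this sketch.

```python
def simulate(durations, limit):
    """Second option: keep up to `limit` jobs running, refill slots as they free up."""
    pending = list(durations.items())  # (name, hours) in submission order
    slots = [None] * limit             # each slot holds [name, hours_left] or None
    timeline = []
    while pending or any(slots):
        # Put the next pending job into every free slot.
        for i in range(limit):
            if slots[i] is None and pending:
                name, hours = pending.pop(0)
                slots[i] = [name, hours]
        timeline.append(" ".join(s[0] if s else "_" for s in slots))
        # Let one hour pass and free the slots of finished jobs.
        for i, s in enumerate(slots):
            if s:
                s[1] -= 1
                if s[1] == 0:
                    slots[i] = None
    return timeline


def simulate_batches(durations, limit):
    """First option: run `limit` jobs, wait for all of them, then run the next batch."""
    jobs = list(durations.items())
    timeline = []
    for start in range(0, len(jobs), limit):
        batch = jobs[start:start + limit]
        for hour in range(max(h for _, h in batch)):
            timeline.append(" ".join(n if h > hour else "_" for n, h in batch))
    return timeline


# Jobs A..I take 1h, 2h, 3h, 1h, 2h, 3h, ...
jobs = {name: 1 + i % 3 for i, name in enumerate("ABCDEFGHI")}

print(" -> ".join(simulate_batches(jobs, 3)))
# A B C -> _ B C -> _ _ C -> D E F -> _ E F -> _ _ F -> G H I -> _ H I -> _ _ I
print(" -> ".join(simulate(jobs, 3)))
# A B C -> D B C -> E F C -> E F G -> H F I -> H _ I -> _ _ I
```

With these nine jobs, the batched version needs 9 hours while the slot-refilling version needs 7, and the gap grows as more jobs are added.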
Here I am creating a new DAG, which will be a sub-DAG. This is a great way to fork tasks. From the top level, it looks like this:

```
Initial task -> Sub Dag -> Final task
```

The Sub Dag, in my case, will contain 20 tasks executed in parallel:

```bash
# Sub Dag
-|-> Task 0  -|->
 |-> Task 1  -|
 |-> Task 2  -|
 |-> ...     -|
 |-> Task 19 -|
```

The result, if we opened the "Sub Dag" black box:

```bash
Initial Task -|-> Task 0  -|-> Final Task
              |-> Task 1  -|
              |-> Task 2  -|
              |-> ...     -|
              |-> Task 19 -|
```

Now let's see how I can create the tasks and join them with the main DAG:

```python
# Remember: tasks are operators, so we need to "cast" our DAG as an operator.
# We can use the SubDagOperator for this:
tasks = SubDagOperator(
    subdag=tasks_dag,
    task_id=SUB_DAG_NAME,
    dag=main_dag)
```

Now let's populate our sub-DAG with some bash commands:

```python
for x in range(0, 20):
    BashOperator(
        task_id="task%d" % (x),
        bash_command="sleep $(( RANDOM %% 10 )); echo 'trying task %d'; exit $(( (RANDOM %% 3) == 0 ))" % (x),
        retries=10,
        dag=tasks_dag)
```

There we go. Our new DAG is ready. Now we need to set up Cloud Composer. Setting it up is very simple - just follow the tutorial and you'll be up and running in no time. All you have to do is set up the cluster (it's easy - trust me) and upload the DAG file.

Let's have a look at the webserver. This is the graph view:

Figure: Composer Webserver - second dag graph view

To see what is inside the Sub Dag, we have to zoom into it:

Figure: Composer Webserver - second dag subdag graph view

Looks nice. Now let's see the tree view for all the days I backfilled:

Figure: Composer Webserver - second dag tree view

Now this is awesome! We can clearly see the tasks that failed (red), are being retried (yellow), are still running (light green), and succeeded (dark green)!

# Last thing

Bash scripts are cool and everyone who disagrees is wrong, but you probably won't be launching jobs from bash scripts inside the Airflow pod/container/machine. What else can you do?

Well, if you are really OK with launching scripts from your Airflow pod/container/machine, you can use the `PythonOperator` to assign a Python function as a task:

```python
from airflow.operators.python_operator import PythonOperator

def greeting():
    print('Hello!')

PythonOperator(
    task_id='hello',
    python_callable=greeting,
    dag=dag)
```

Still, I don't like this because I don't feel like rewriting my entire ETL jobs in Python and stuffing them inside my Airflow pods. If you are using Google Cloud products, Airflow (and therefore Composer) gives us operators made especially for launching tasks with these products, such as **Dataflow**, **Dataproc**, **Datastore**, etc. Take a look: Google Cloud Platform Operators.

```python
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

# Launching a Dataflow task from Airflow
task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=dag)
```

Now this is more interesting, huh? But if this is still not enough, Airflow has a `KubernetesPodOperator`! This operator allows you to launch Kubernetes Pods as tasks!

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

KubernetesPodOperator(
    task_id='pod-ex-minimum',
    name='pod-ex-minimum',
    namespace='default',
    image='')
```

This will allow you to launch any task you want, in any language you want. Now things got serious! Node.js, Rust, C? No problem. Just launch the pod and Airflow will check its exit status!