Airflow Webserver - DAGs view
This is a lot more interesting. Look! Our DAG is right there at the end! Let's click on it and see what happens!

Airflow Webserver - sampledag tree view
This is awesome: it shows the three runs that we had before and their status (dark green = success)!
That is the **Tree view**. Let's take a look at the **Graph view**:

Airflow Webserver - sampledag graph view
Very nice. Let's try something more advanced now.
# Second DAG
My second DAG will be more like the task tree I described at the beginning of the article (tasks A, B, and C). This time I will also deploy my DAG to Cloud Composer instead of running it locally.
This is my new DAG:
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 10),
    'email': ['test@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

PARENT_DAG_NAME = 'my_main_dag'
SUB_DAG_NAME = 'my_sub_dag'

with DAG(PARENT_DAG_NAME, schedule_interval=timedelta(days=1), default_args=args) as main_dag:

    initial = BashOperator(
        task_id='initialtask',
        bash_command='sleep $(( RANDOM % 10 )); echo "trying initial task"; exit $(( (RANDOM % 3) == 0 ))',
        retries=10,
        dag=main_dag)

    # The sub-DAG must be named "<parent_dag_id>.<subdag_task_id>"
    tasks_dag = DAG('%s.%s' % (PARENT_DAG_NAME, SUB_DAG_NAME),
                    default_args=main_dag.default_args,
                    concurrency=3)

    # Wrap the sub-DAG in an operator so it can be used as a task of the main DAG
    tasks = SubDagOperator(
        subdag=tasks_dag,
        task_id=SUB_DAG_NAME,
        dag=main_dag)

    # Populate the sub-DAG with independent tasks
    for x in range(0, 20):
        BashOperator(
            task_id="task%d" % (x),
            bash_command="sleep $(( RANDOM %% 10 )); echo 'trying task %d'; exit $(( (RANDOM %% 3) == 0 ))" % (x),
            retries=10,
            dag=tasks_dag)

    final = BashOperator(
        task_id='finaltask',
        bash_command='sleep $(( RANDOM % 10 )); echo "trying final task"; exit $(( (RANDOM % 3) == 0 ))',
        retries=10,
        dag=main_dag)

    initial >> tasks >> final
```
The first and last tasks should be no surprise to you, except for the bash command. What is that command doing? It simply sleeps for a random number of seconds (so the run feels more realistic) and then has a roughly one-in-three chance of failing (exiting with code 1), because `(RANDOM % 3) == 0` is true about a third of the time.
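If you want to convince yourself of that failure rate, here is a minimal sketch (not part of the DAG, just a local experiment) that runs the same kind of command a few hundred times from Python and counts the non-zero exits. The sleep is shortened so the test finishes quickly:
```python
import subprocess

# Same pattern as the DAG's bash_command, with the sleep shortened for the test
cmd = 'sleep 0; echo "trying task"; exit $(( (RANDOM % 3) == 0 ))'

runs = 300
failures = sum(
    subprocess.run(['bash', '-c', cmd], stdout=subprocess.DEVNULL).returncode != 0
    for _ in range(runs)
)
# Expect roughly a third of the runs to fail
print('%d failures out of %d runs (~%.0f%%)' % (failures, runs, 100.0 * failures / runs))
```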
Now, what is that second part with **tasks_dag**, **tasks**, and the **for** loop? Let's take a look:
```python
tasks_dag=DAG('%s.%s' % (PARENT_DAG_NAME, SUB_DAG_NAME), default_args=main_dag.default_args , concurrency=3)
```
Notice that I am setting `concurrency=3` for that DAG. This means that it will only run 3 tasks in parallel at a time!
Why set `concurrency` at all? I find it useful when I want to run all my jobs in parallel, but there is a limit on how many can run at the same time.

Let's say, for example, that you have 16 different Google Dataflow jobs (a managed service for batch/streaming data processing) that run once a day. Since these jobs are independent, you could run them all at once. There is a problem, though: there is a limit on the number of concurrent Dataflow jobs, and since you already have some running, it is only safe to have 3 more running at the same time :(

Well, now you have two options:

1- You can organize the DAG so it runs 3 jobs first, and when they are all done, run three more, and then three more, etc.

2- You can tell your DAG to run them all in parallel, but with no more than 3 jobs running at once.

The first option is not very good: it is harder to maintain, and it wastes time. It wastes time because one job may finish faster than the other two (say, one job takes 1 hour, the second takes 2 hours, and the third takes 3 hours), but the next batch cannot start until all of them are done.

The second option is a lot better. You can tell Airflow to run a maximum of 3 jobs at a time by setting `concurrency`. It will start three jobs, and as soon as one job is done, it will start another one.

Say our jobs are A, B, C, D, E, and so on. We start with jobs A, B, and C. A finishes in 1 hour, B in 2, and C in 3 hours, and the following jobs repeat the same pattern (1h, 2h, 3h). Hour by hour, the two options look like this:

First option:

A B C -> _ B C -> _ _ C -> D E F -> _ E F -> _ _ F -> G H I -> ...

Second option:

A B C -> D B C -> E F C -> G F H -> I J H -> ...

The second option finishes much faster because we never leave "gaps" between the jobs!

Back to the code: with that line I am creating a new DAG, which will be a sub-DAG. This is a great way to fork tasks. A top-level view looks like this:

```
Initial task -> Sub Dag -> Final task
```

The Sub Dag, in my case, will contain 20 tasks executed in parallel (at most 3 at a time, thanks to `concurrency=3`):

```bash
# Sub Dag
-|-> Task 0  -|->
 |-> Task 1  -|
 |-> Task 2  -|
 |-> ...     -|
 |-> Task 19 -|
```

The result, if we opened the "Sub Dag" black box:

```bash
Initial Task -|-> Task 0  -|-> Final Task
              |-> Task 1  -|
              |-> Task 2  -|
              |-> ...     -|
              |-> Task 19 -|
```

Now let's see how I can create the tasks and join them with the main DAG:

```python
# Remember: tasks are operators, so we need to "cast" our DAG as an operator.
# We can use the SubDagOperator for this:
tasks = SubDagOperator(
    subdag=tasks_dag,
    task_id=SUB_DAG_NAME,
    dag=main_dag)
```

Now let's populate our sub-DAG with some bash commands:

```python
for x in range(0, 20):
    BashOperator(
        task_id="task%d" % (x),
        bash_command="sleep $(( RANDOM %% 10 )); echo 'trying task %d'; exit $(( (RANDOM %% 3) == 0 ))" % (x),
        retries=10,
        dag=tasks_dag)
```

There we go. Our new DAG is ready. Now we need to set up Cloud Composer. Setting it up is very simple: just follow the tutorial and you will be up and running in no time. All you have to do is create the cluster (it's easy, trust me) and upload the DAG file.

Let's have a look at the webserver. This is the graph view:
Composer Webserver - second dag graph view
To see what is inside the Sub Dag, we have to zoom into it:

Composer Webserver - second dag subdag graph view
Looks nice. Now let's see the tree view for all the days I backfilled:

Composer Webserver - second dag tree view
Now this is awesome! We can clearly see the tasks that failed (red), are being retried (yellow), are still running (light green), and succeeded (dark green)!
# ...one last thing
Bash scripts are cool and everyone who disagrees is wrong, but you probably won't be launching your jobs from bash scripts inside the Airflow pod/container/machine. So what else can you do?
Well, if you really are OK with launching scripts from your Airflow pod/container/machine, you can use the `PythonOperator` to run a Python function as a task:
```python
from airflow.operators.python_operator import PythonOperator

def greeting():
    print('Hello!')

# Assumes this is defined inside a `with DAG(...)` block (otherwise pass dag=...)
PythonOperator(
    task_id='hello',
    python_callable=greeting)
```
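If you do go down this route, the callable can also take arguments through `op_kwargs`. A minimal sketch (the function and values below are made up for illustration, again assuming a surrounding `with DAG(...)` block):
```python
from airflow.operators.python_operator import PythonOperator

def greet(name):
    print('Hello, %s!' % name)

# op_kwargs is passed to the callable when the task actually runs
PythonOperator(
    task_id='hello_kwargs',
    python_callable=greet,
    op_kwargs={'name': 'Airflow'})
```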
Still, I don't like this, because I don't feel like rewriting entire ETL jobs in Python and stuffing them inside my Airflow pods.
If you are using Google Cloud products, Composer gives you operators made especially for launching tasks on these services, such as **Dataflow**, **Dataproc**, **Datastore**, etc. Take a look: Google Cloud Platform Operators.
```python
# Launching a Dataflow task from Airflow
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',       # {{ds}} is the execution date, filled in by Jinja at runtime
        'partitionType': 'DAY'
    },
    dag=dag)
```
Now this is more interesting, huh? But if this is still not enough, Airflow has a `KubernetesPodOperator`! This operator allows you to launch Kubernetes Pods as tasks!
```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
KubernetesPodOperator(
    task_id='pod-ex-minimum',
    name='pod-ex-minimum',
    namespace='default',
    image='gcr.io/gcp-runtimes/ubuntu_16_0_4')
```
This will allow you to launch any task you want, in any language you want. Now things got serious! Node.js, Rust, C? No problem. Just launch the pod and Airflow will check its exit status!
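As a sketch of what that could look like (the image, script path, and task name below are made up for illustration, not a real published image): you point the operator at a container that has your runtime baked in, pass the command through `cmds`/`arguments`, and the task succeeds or fails based on the container's exit code.
```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Hypothetical example: run a Node.js script from a custom image.
KubernetesPodOperator(
    task_id='node-etl',
    name='node-etl',
    namespace='default',
    image='gcr.io/my-project/my-node-etl:latest',          # assumed custom image
    cmds=['node'],
    arguments=['/app/run_etl.js', '--date', '{{ ds }}'],   # {{ ds }} is the execution date (Jinja)
    get_logs=True)
```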