Basic Airflow in Python

Airflow: platform to program workflows.

DAG: workflow made up of tasks with dependencies.

Define a DAG in Python:

from airflow.models import DAG
from datetime import datetime

#create a default arguments dictionary (optional)
default_arguments = {
     'owner': 'massy',
     'start_date': datetime(2021,8,4)     #earliest datetime the DAG could be run
}

#define the DAG object
etl_dag = DAG('etl_workflow', default_args = default_arguments)
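
The effect of default_args can be pictured with plain dictionaries. The sketch below is not Airflow code: build_task_args is a hypothetical helper that only imitates the idea that an argument set directly on a task takes precedence over the DAG-level default.

```python
from datetime import datetime

# DAG-level defaults, as in the example above
default_arguments = {
    'owner': 'massy',
    'start_date': datetime(2021, 8, 4),
}

def build_task_args(defaults, **task_args):
    """Hypothetical helper: merge defaults with task-level arguments.

    Values passed directly to the task win over the defaults.
    """
    merged = dict(defaults)      # copy, so the defaults stay untouched
    merged.update(task_args)
    return merged

# The task overrides 'owner' and inherits 'start_date'
args = build_task_args(default_arguments, owner='ops', task_id='example')
print(args['owner'], args['start_date'].date())
```

Keeping shared settings in one dictionary means a change such as a new start date is made in a single place.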

Airflow's configuration settings are stored in the airflow.cfg file.

Operators

Most common Airflow task: Operator.

Bash Operator

from airflow.operators.bash_operator import BashOperator

example_task = BashOperator(
     task_id='example',
     bash_command='example.sh',
     dag=etl_dag)

Python Operator

from airflow.operators.python_operator import PythonOperator

def printme():
     print("well done!")

python_task = PythonOperator(
     task_id='print',
     #call the Python function
     python_callable=printme,
     dag=etl_dag)

To pass keyword arguments to the Python function, set the op_kwargs argument on the task. It is a dictionary whose keys are the parameter names of the target function.
Example of a task that downloads a file and saves it to the system within Airflow:

import requests

def pull_file(URL, savepath):
    #download the content of the URL
    r = requests.get(URL)
    #write the raw bytes to the target path
    with open(savepath, 'wb') as f:
        f.write(r.content)
    #print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

from airflow.operators.python_operator import PythonOperator

#Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    python_callable=pull_file,
    #Define the arguments
    op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'},
    dag=etl_dag
)
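
What op_kwargs does can be tried without Airflow, since it amounts to calling the function with the dictionary unpacked into named arguments. In this minimal sketch, describe_pull is a hypothetical stand-in for pull_file that skips the download so it runs anywhere.

```python
def describe_pull(URL, savepath):
    # Hypothetical stand-in for pull_file: no network or disk access
    return f"File pulled from {URL} and saved to {savepath}"

op_kwargs = {'URL': 'http://dataserver/sales.json', 'savepath': 'latestsales.json'}

# Dictionary unpacking turns each key into a named argument
message = describe_pull(**op_kwargs)
print(message)
```

A key in op_kwargs that does not match a parameter name of the function raises a TypeError, so the dictionary keys and the function signature have to agree.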

Email Operator

from airflow.operators.email_operator import EmailOperator

email_task = EmailOperator(
     task_id='Send_Email',
     to='manager@example.com',
     subject='Automated Daily Report',
     html_content='Attached is the daily report',
     files=['report.xlsx'],     #attachments are passed as a list
     dag=etl_dag)

Sensor

A sensor is a special type of operator that waits for a certain condition to be true (e.g. the creation of a file, the upload of a database record, a response from a web request). We define how often to check whether the condition is true.
Sensors are assigned to tasks like normal operators.
Arguments:

  • mode -> how to check for the condition

  • mode='poke' -> default, run repeatedly

  • mode='reschedule' -> give up task slot and try again later

  • poke_interval -> how long to wait between checks (in seconds)

  • timeout -> how long to wait before failing task

Ex. File Sensor (check for the existence of a file in a location):

from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_task = FileSensor(task_id='file_pres',                              
     filepath='data.csv',
     poke_interval=300,
     dag=etl_dag)
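
The roles of poke_interval and timeout can be sketched in plain Python. This is only an illustration of the poke idea, not how Airflow implements sensors; wait_for and file_is_present are hypothetical names, and the condition is simulated so the example finishes quickly.

```python
import time

def wait_for(condition, poke_interval, timeout):
    """Call condition() every poke_interval seconds until it returns True.

    Returns the number of checks made. Raises TimeoutError once
    `timeout` seconds have passed without success.
    """
    started = time.monotonic()
    checks = 0
    while True:
        checks += 1
        if condition():
            return checks
        if time.monotonic() - started >= timeout:
            raise TimeoutError(f"condition not met after {timeout} seconds")
        time.sleep(poke_interval)

# Simulated condition: becomes true on the third check
state = {'calls': 0}

def file_is_present():
    state['calls'] += 1
    return state['calls'] >= 3

checks_needed = wait_for(file_is_present, poke_interval=0.01, timeout=5)
print(checks_needed)
```

A short poke_interval reacts faster but checks more often; a long one is cheaper but adds latency before the downstream tasks start.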

Other sensors:

  • ExternalTaskSensor -> wait for a task in another DAG to complete

  • HttpSensor -> request a web URL and check for content

  • SqlSensor -> run a SQL query to check for content

Dependencies

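Task dependencies define the order in which tasks complete. A task is upstream of another when it runs before it, and downstream when it runs after it. The bitshift operators set the order: a >> b means a runs before b (a is upstream of b), while a << b means a runs after b (a is downstream of b).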

task1 = BashOperator(
     task_id='first',
     bash_command='example1.sh',
     dag=etl_dag)

task2 = BashOperator(
     task_id='second',
     bash_command='echo ok',
     dag=etl_dag)

#define the task order
task1 >> task2

#equivalent to: task2 << task1

Now task1 will be executed first and, once it completes, task2 will run.
You can also mix upstream and downstream operators in the same workflow:

task1 >> task2 << task3

#is the same as
task1 >> task2
task3 >> task2

Now task1 and task3 will be executed first and, once both have completed, task2 will start.
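
To see why a mixed chain works, here is a small stand-alone sketch of the operator mechanics. The Task class is a hypothetical stand-in for an operator and only records which tasks must finish first; it assumes, as Airflow does, that each operator returns its right-hand operand so that chains can continue.

```python
class Task:
    """Hypothetical stand-in for an operator; it only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must finish first

    def __rshift__(self, other):
        # self >> other: self runs before other
        other.upstream.add(self.task_id)
        return other             # returning the right-hand task allows chaining

    def __lshift__(self, other):
        # self << other: self runs after other
        self.upstream.add(other.task_id)
        return other

task1, task2, task3 = Task('first'), Task('second'), Task('third')

# Evaluated left to right: (task1 >> task2) << task3
task1 >> task2 << task3

print(sorted(task2.upstream))
```

After the chain is evaluated, task2 lists both other tasks as upstream, which matches the two separate statements shown above.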

Schedule

For scheduling we can use the Unix CRON syntax:

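A cron expression has five space-separated fields: minute (0-59), hour (0-23), day of month (1-31), month (1-12), day of week (0-6, Sunday = 0).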

So:
* * * * * means run every minute;
0 12 * * * means run daily at noon.

from datetime import datetime, timedelta

default_args = {
  'owner': 'massy',
  'start_date': datetime(2021, 8, 4),
}

#run every Friday at 12:30 pm
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 5')
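
To check how a cron expression is read, the following sketch tests whether a given datetime matches one. It is a deliberately simplified, hypothetical helper: it understands only plain numbers and *, not ranges, lists or steps, and it requires all five fields to match.

```python
from datetime import datetime

FIELDS = ('minute', 'hour', 'day_of_month', 'month', 'day_of_week')

def cron_matches(expression, moment):
    """Return True if `moment` matches a cron expression of numbers and '*'."""
    values = {
        'minute': moment.minute,
        'hour': moment.hour,
        'day_of_month': moment.day,
        'month': moment.month,
        'day_of_week': moment.isoweekday() % 7,   # cron counts Sunday as 0
    }
    parts = expression.split()
    if len(parts) != len(FIELDS):
        raise ValueError("expected five space-separated fields")
    return all(part == '*' or int(part) == values[name]
               for name, part in zip(FIELDS, parts))

friday_lunch = datetime(2021, 8, 6, 12, 30)       # a Friday
print(cron_matches('30 12 * * 5', friday_lunch))
print(cron_matches('0 12 * * *', friday_lunch))
```

The first expression matches because the minute, the hour and the day of week all agree; the second fails on the minute field alone.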

Run DAGs

To run a specific task in shell:
airflow run [dag_id] [task_id] [execution_date]

airflow run etl_workflow pull_file 2021-08-04

To run a full DAG:
airflow trigger_dag -e [execution_date] [dag_id]

airflow trigger_dag -e 2021-08-04 etl_workflow

Some other shell commands for airflow:

  • airflow -h -> show the help text with the available subcommands

  • airflow list_dags -> show all recognized DAGs, the executor used and if there are errors

  • airflow scheduler -> run the scheduler

Executors

Executors run tasks.
Some executors:

  • SequentialExecutor -> default, runs only a single task at a time

  • LocalExecutor -> runs on a single system, can execute multiple tasks in parallel

  • CeleryExecutor -> multiple Airflow systems can be configured as workers for a given set of workflows/tasks. Really powerful

It's also possible to create custom executors.

You can determine your executor by searching the airflow.cfg file for the executor = line, which specifies the executor in use:

cat airflow/airflow.cfg | grep "executor = "

# executor = SequentialExecutor
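
The same lookup can be done from Python with the standard configparser module. This is a minimal sketch: the sample text stands in for a real airflow.cfg, and it assumes the executor setting lives in the [core] section.

```python
from configparser import ConfigParser

# Sample content standing in for a real airflow.cfg (assumption for this sketch)
sample_cfg = """
[core]
dags_folder = /home/user/airflow/dags
executor = SequentialExecutor
"""

# interpolation=None so that '%' characters in other settings are read literally
parser = ConfigParser(interpolation=None)
parser.read_string(sample_cfg)   # for a real file: parser.read('airflow/airflow.cfg')

executor = parser.get('core', 'executor')
print(executor)
```

Compared with grep, reading the file this way ignores commented-out lines and returns only the value.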

SLAs

An SLA (Service Level Agreement) is the amount of time a task or a DAG is expected to take to run.
An SLA miss is any time the task/DAG doesn't meet the expected timing.
If an SLA is missed, a log entry is stored.
You can define SLAs using the sla argument on the task, or in the default_args dictionary, where you can also configure email alerts (for example, on task failure).

from datetime import datetime, timedelta

#sla argument
task1 = BashOperator(task_id='sla_task',
     bash_command='example.sh',
     sla=timedelta(minutes=30),
     dag=etl_dag)

#default_args dictionary
default_args={
     'sla': timedelta(minutes=20),
     'start_date': datetime(2020,2,20),
     'email': 'massy@example.com',
     'email_on_failure': True,
     'email_on_retry': True
}

dag = DAG('sla_dag', default_args=default_args)
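
The idea of an SLA miss reduces to comparing an elapsed time with a timedelta. The sketch below shows only that comparison; sla_missed is a hypothetical helper, and Airflow's scheduler does its own bookkeeping, which this does not reproduce.

```python
from datetime import datetime, timedelta

def sla_missed(started, finished, sla):
    """Return True when the elapsed time exceeds the allowed SLA."""
    return (finished - started) > sla

sla = timedelta(minutes=30)
started = datetime(2021, 8, 4, 12, 0)

# 25 minutes is within the SLA, 45 minutes is not
on_time = sla_missed(started, datetime(2021, 8, 4, 12, 25), sla)
too_slow = sla_missed(started, datetime(2021, 8, 4, 12, 45), sla)
print(on_time, too_slow)
```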

Variables

Airflow provides built-in runtime variables that can be used in templates. Some examples:

  • {{ ds }} -> execution date YYYY-MM-DD

  • {{ ds_nodash }} -> YYYYMMDD

  • {{ dag }} -> DAG object
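
The two date formats can be reproduced with strftime, which helps when testing code that expects these values. In this small sketch the execution date is hand-picked; in a real run Airflow supplies it.

```python
from datetime import datetime

# Hand-picked execution date for illustration
execution_date = datetime(2021, 8, 4)

ds = execution_date.strftime('%Y-%m-%d')       # format of {{ ds }}
ds_nodash = execution_date.strftime('%Y%m%d')  # format of {{ ds_nodash }}
print(ds, ds_nodash)
```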

Templates

Templates allow substitution of information during a DAG run, so you can add flexibility when defining tasks.
Templates are created using the Jinja templating language.

#iterate the same operation over multiple files
templated_command = """
{% for filename in params.filenames %}
     echo "Processing {{ filename }}"
{% endfor %}
"""

t1 = BashOperator(
     task_id='template_task',
     bash_command=templated_command,
     params={'filenames': ['example1.txt', 'example2.txt', 'example3.txt']},
     dag=etl_dag)


#another example
filelist = [f'file{x}.txt' for x in range(30)]

templated_command = """
     {% for filename in params.filenames %}
     bash example.sh {{ ds_nodash }} {{ filename }};
     {% endfor %}
"""

clean_task = BashOperator(task_id='example_task',
                          bash_command=templated_command,
                          params={'filenames': filelist},
                          dag=etl_dag)
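
To get a feel for what the loop in the second template produces, the expansion can be imitated in plain Python. This is only an approximation: Airflow renders the template with Jinja at run time, the exact whitespace differs, and expand_command is a hypothetical helper with a hand-picked date.

```python
def expand_command(filenames, ds_nodash):
    """Hypothetical helper: build the lines the template loop would emit."""
    return "\n".join(f"bash example.sh {ds_nodash} {name};" for name in filenames)

# Three files instead of thirty, to keep the output short
filelist = [f'file{x}.txt' for x in range(3)]
command = expand_command(filelist, ds_nodash='20210804')
print(command)
```

Each file name yields one shell command, so a single templated task replaces a set of near-identical tasks.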

Branching

Branching adds conditional logic to a DAG: the BranchPythonOperator takes a python_callable that returns the task id of the next task to follow.

from airflow.operators.python_operator import BranchPythonOperator

#different executions in even or odd date
def branch_to_use(**kwargs):
     if int(kwargs['ds_nodash']) % 2 == 0:
          return 'even_task'
     else:
          return 'odd_task'

branch_task = BranchPythonOperator(
     task_id='branch_task',
     provide_context=True,
     python_callable=branch_to_use,
     dag=etl_dag)

#define the dependencies in both cases
start_task >> branch_task >> even_task >> even_day_task2
branch_task >> odd_task >> odd_day_task2
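
Because the branch callable is an ordinary Python function, it can be checked outside Airflow by passing the context values by hand. In this minimal sketch the function is repeated so the block runs on its own, and the ds_nodash values are hand-built rather than supplied by Airflow.

```python
def branch_to_use(**kwargs):
    # ds_nodash arrives as a string such as '20210804'
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_task'
    else:
        return 'odd_task'

# Hand-built context values; Airflow would supply these at run time
even_choice = branch_to_use(ds_nodash='20210804')
odd_choice = branch_to_use(ds_nodash='20210805')
print(even_choice, odd_choice)
```

The returned strings must match the task_id values of tasks placed directly downstream of the branch task, otherwise no path is followed.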

A full example

from airflow.models import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from dags.process import process_data
from datetime import datetime, timedelta

#Update the default arguments and apply them to the DAG.
default_args = {
  'start_date': datetime(2021,8,6),
  'sla': timedelta(minutes=90)
}

ex_dag = DAG(dag_id='etl_up', default_args=default_args)

#check for a file
sensor = FileSensor(task_id='sense_file', 
                    filepath='/home/repl/workspace/startprocess.txt',
                    poke_interval=45,
                    dag=ex_dag)

#exec a command
bash_task = BashOperator(task_id='cleanup_tempfiles', 
                         bash_command='rm -f /home/repl/*.tmp',
                         dag=ex_dag)

#exec a python script
python_task = PythonOperator(task_id='run_processing', 
                             python_callable=process_data,
                             provide_context=True,
                             dag=ex_dag)

#email
email_subject="""
  Email report for {{ params.dep }} on {{ ds_nodash }}
"""

email_task = EmailOperator(task_id='email_task',
                                  to='massy@example.com',
                                  subject=email_subject,
                                  html_content='',
                                  params={'dep': 'Data Mining department'},
                                  dag=ex_dag)

#no email
no_email_task = DummyOperator(task_id='no_email_task', dag=ex_dag)

#if weekend don't send email
def check_weekday(**kwargs):
    #kwargs['ds'] is the execution date as a 'YYYY-MM-DD' string
    dt = datetime.strptime(kwargs['ds'],"%Y-%m-%d")
    if (dt.weekday() < 5):     #mon-fri
        return 'email_task'     #must match the task_id of the email task
    else:     #sat-sun
        return 'no_email_task'

branch_task = BranchPythonOperator(task_id='check_day',
                                   provide_context=True,
                                   python_callable=check_weekday,
                                   dag=ex_dag)


#define dependencies  
sensor >> bash_task >> python_task

python_task >> branch_task >> [email_task, no_email_task]