شروع ساده با Apache Airflow

آپاچی ایرفلو دوست داشتنی :)
آپاچی ایرفلو دوست داشتنی :)


وقتی تصمیم به خودکار سازی اجرای برنامه های مختلف و تکرارپذیر داشته باشید و ساده ترین راه ایجاد یک دستور در cronjob هستید، اما وقتی کم کم و طی زمان تعداد جاب ها و شرایط مختلف آن افزایش می یابد، مدیریت و بررسی جاب های مختلف سخت و در مواردی غیر ممکن خواهد شد در این زمان استفاده از آپاچی airflow توصیه می شود :)، به خصوص برای تیم های داده، که راه اندازی پایپلاین ها، ETL ها و... موضوع حیاتی هستش، ایرفلو به صورت خالص پایتونیه و برای همین کار باهاش راحته، برای شروع روش نصب و راه اندازی ایرفلو به وارد کردن دستورات زیر شروع می کنیم:

export AIRFLOW_HOME=~/airflow
pip install apache-airflow
airflow initdb
airflow webserver -p 8080 &
airflow scheduler &

با انجام این دستورات، رابط کاربری گرافیکی ایرفلو بر روی پورت 8080 و اسکجلر ایرفلو راه اندازی می شوند و میتوانید بر روی پورت 8080 رابط کاربری را ببینید:

رابط کاربری ایرفلو
رابط کاربری ایرفلو

برای شروع به کار بهتر است از طریق آموزش های خود سایت ایرفلو نسبت به نحوه نوشتن تسک ها آشنایی کافی داشته باشیم، اما برای شروع کد تستی به شکل زیر در نظر می گیریم:

ابتدا تسک اول انجام شود، سپس تسک دوم در صورت موفقیت تسک اول و در نهایت تسک های سوم و چهارم مستقلا در صورت موفقیت تسک دوم
ابتدا تسک اول انجام شود، سپس تسک دوم در صورت موفقیت تسک اول و در نهایت تسک های سوم و چهارم مستقلا در صورت موفقیت تسک دوم

در مسیر:

~/airflow/dags/

فایل پایتونی با نام simple_bash.py با محتوای زیر ایجاد می کنیم:

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'simple_bash',
    default_args=default_args,
    description='A simple bash DAG',
    schedule_interval=timedelta(days=1),
)

t1 = BashOperator(
    task_id='echo1',
    bash_command='echo &quotsimple task! by dag&quot ',
    dag=dag,
)

t2 = BashOperator(
    task_id='echo2',
    bash_command='echo &quotsimple task! by dag second step&quot ',
    dag=dag,
)

t3 = BashOperator(
    task_id='echo3',
    bash_command='echo &quotsimple task! by dag third step&quot ',
    dag=dag,
)

t4 = BashOperator(
    task_id='echo4',
    bash_command='echo &quotsimple task! by dag fourth step, concurrent&quot ',
    dag=dag,
)

t1 >> t2 >> [t3,t4] # declare dependencies between tasks

و در نهایت با بررسی مسیر و استفاده از دستور زیر لیست دگ هارو خواهیم دید:

airflow list_dags
همونجور که مشخصه، simple_bash که دگ ایجاد شده توسط ماست در لیست قرار داره.
همونجور که مشخصه، simple_bash که دگ ایجاد شده توسط ماست در لیست قرار داره.

سپس از طریق رابط گرافیکی، simple_bash را روشن می کنیم:

و حالا از طریق graph view می تونیم خروجی کار رو ببینیم:

فلو اجرایی مورد نظر ما
فلو اجرایی مورد نظر ما

انجام دادن تسک های بیشتر و کارهای بیشتر و اسکجل کردن نیز با همین مسیر امکان پذیره و برای داکیومنت های بیشتر سایت آپاچی ایرفلو پیشنهاد میشه.