خوب فرصتی دست داد تا راجع به تجربه کاری که در مورد Airflow داشتیم، بنویسم. بسیاری از راهنماهای کاربری و آموزش ها صرفا به جنبه های و ویژگی های گوناگون مورد آموزش می پردازند. بعضی وقت ها اگر بدانیم یک تکنولوژی به درد چه کار هایی نمی خورد، به مراتب راه گشاتر از دانستن موارد مناسب بکارگیری آن است. چرا که استفاده از یک تکنولوژی در جای نادرست می تواند منجر به افزایش هزینه تولید محصول شود.
اخیرا در یکی از پروژه ها می بایست یک ETL پیاده سازی کنیم. در حقیقت نیاز به Fetch مجموعه متنوعی داده، دستکاری آن و در نهایت ذخیره سازی در پایگاه داده داشتیم. یکی از کاندیداهای پیاده سازی ETL نرم افزار Airflow بود. چرا؟ چون وقتی کلمات کلیدی نظیر etl python و یا data pipeline python رو جستجو کنید هزار و شونصد تا پست میاد و میگه که میشه با Airflow یک پایپلاین دیتا راه اندازی کرد. برای تیم ما مهم بود تکنولوژی که استفاده می کنیم برای این کار مبتنی بر پایتون باشد (که اشتباه می کردیم، حتی اگر نصفه و نیمه پایتون رو هم پشتیبانی می کرد کافی بود). حتی شما وقتی etl airflow رو هم جستجو کنید به تعداد بسیاری مقاله می رسید که به شما می فهمانند که Airflow برای این کار هست. این طور شد که به دام افتادیم!
در مستندات Airflow می خوانیم:
نرم افزار Airflow یک پلتفورم است که امکان ساخت، زمانبندی و مانیتور گردشهای کاری را از طریق برنامه نویسی بدست می دهد.از Airflow می توان برای نوشتن گردش کار به شکل یک گراف بدون دور جهت دار (DAG) از وظایف استفاده کرد. زمانبند Airflow در حالی که وابستگی های مشخص شده در گراف را دنبال می کند، وظایف شما را در قالب یک آرایه از گردش های کاری اجرا می کند.
به بیان دیگر شما وظایف را در قالب یک تابع به زبان پایتون پیاده سازی می کند، از طرف دیگر با همین زبان وابستگی بین وظایف از نظر اجرا را مشخص می کنید که در نتیجه آن گردش کاری تعریف می شود. در نهایت زمابند Airflow این گردش های کاری را اجرا طبق زمانبندی اجرا می کند. یعنی یک Job Scheduler که وابستگی زمانی اجرای بین Jobها رو هم در نظر میگیره. برای یک ETL که قرار هست یه سری کار پشت سر هم روی دیتا اعمال بشه چی از این بهتر!
این پلتفرم یک رابط کاربری در اختیار شما قرار می دهد تا گردش های کاری را مانیتور کنید، اجرای گردش کاری را متوقف و یا آغاز نمایید و یا اقدام به عیب یابی در صورت بروز خطا کنید. Airflow همچنین اولین پیاده سازی موفق از یک الگوی مفید و انعطاف پذیر گردش کاری به شکل کد (Workflow-as-Code) است.
اما نکته مهمی که باید به آن توجه کرد این است که Airflow ابزار خوبی برای طراحی گردش های کاری با حرکت کند و اجرای ایستا در یک زمانبندی ثابت است. یعنی
در این مقاله قصد آموزش Airflow رو ندارم؛ تا دلتون بخواد منابع خوب برای یادگیری Airflow روی وب پیدا میشه؛ مستندات خوبی هم داره.
بعد از تجربه تلخ بکارگیری Airflow برای راه اندازی یک ETL، و در نهایت کنار گذاشتن آن، اخیرا به مقاله ای بر خوردم که به طور فنی به این مساله پرداخته است که «چرا Airflow نه؟». برای دانستن این مساله بهتر است به مقاله مراجعه فرمایید. این مقاله نسبتا طولانی است و در این پست بنده بخشی از مقاله را به صورت خلاصه، بخصوص مواردی که ما در پروژه آن را تجربه کرده ایم، برجسته کرده ام.
در مقاله یاد شده آمده است:
کاربران غالباً با چپاندن مورد استفاده خود در مدل Airflow خودشون رو به دردسر می اندازند. (ما رو میگه! ) برای نمونه از مواردی که Airflow نمی تواند به طور قابل قبولی برآورده کند می توان به موارد زیر اشاره کرد:۱. پیاده سازی DAG هایی که بخواهیم آن ها رو بدون زمانبندی یا خارج از زمانبندی اجرا کنیم (ما به این مشکل خوردیم. در مورد اجرای یک DAG بر اساس تقاضا – OnDemand DAG)
۲. اجرای DAG ها به صورت موازی با زمان شروع یکسان
۳. DAG های حاوی انشعابات با منطق های پیچیده
۴. DAG های با تعداد زیاد وظایف سریع
۵. DAG هایی که بر روی تبادل داده بین وظایف تکیه دارند (این بزرگترین مشکل ما در جابجا کردن دیتا بین وظایف بود، هم از نظر حجم و هم از نظر نوع)
۶. DAG های پارامتری
۷. DAG های پویا
معمولا شرکت های متوسط-تا-بزرگ برای این که بر موارد فوق فائق بیان در نهایت مجبور میشن یک DSL سفارشی شده بنویسند یا پلاگین های بخصوصی رو پیاده سازی کنند تا این دست نیازمندی ها را رفع کند. در نهایت ارتقاء سیستم سخت میشه و سربار نگهداری به طور چشمگیری افزایش پیدا می کند. این یعنی بهتره دنبال ابزار بهتری باشید کاری که ما در نهایت مجبور به انجام آن شدیم.
در ادامه مقاله به نقایص بخش های مختلف Airflow پرداخته که در اینجا تنها به برخی از موارد تجربه شده توسط خود ما اشاره می شود.
این یک دام است، لطفا گول نخورید.
با توجه به آنچه که در مقدمه گفته شده، یکی از متداول ترین موارد استفاده از Airflow ایجاد نوعی خط لوله داده است. این خیلی مسخره است؛ و واقعا من موندم چرا اینقدر این مورد تبلیغ میشه. چون اصلا Airflow از جریان داده به طور خیلی خوب (و یا حتی خوب) پشتیبانی نمی کنه.
یه امکانی به اسم XCom، (چه اسم خفنی) برای تبادل قطعات کوچک متادیتا بین وظایف ارائه کرده است. بعدها از روی بررسی schema دیتابیسی که Airflow داره ازش استفاده می کنه متوجه شدیم این XCom در قالب یک جدول پیاده سازی شده! این یعنی اگر شما نیاز به تبادل داده زیاد بین وظایف داشته باشید، یکی از جدول های پایگاه داده شما مدام در حال نوشته شدن و خوانده شده است. عجب! استفاده از جدول به عنوان فضای اشتراکی بین وظایف!
حالا ما چطور می خواستیم از این امکان برای تبادل دیتا (ی بخصوص حجیم) بین وظایف استفاده کنیم. قاعدتا باید یک فضای ذخیره سازی دیگری می داشتیم تا داده ها را در اون ذخیره کنیم و بهشون ID و یا آدرس بدیم و بجای داده این ID یا آدرس را از طریق XCom بین وظایف جابجا کنیم!!!
معایب این مکانیزم:
def puller(**kwargs): ti = kwargs['ti'] # get value_1 v1 = ti.xcom_pull(key=None, task_ids='push')
مشکل دیگه این که تازه کارها، پایگاه داده خود را به خاطر استفاده بیش از حد از XCom از بین می برند. دیده شده کسی یک داده کوچک (۱۰ گیگابایتی!) ایجاد کرده و با کمک XCom آن را بین چند وظیفه جابجا کرده است. حالا اگر ۱۰ تا وظیفه اینجا داشته باشیم هر اجرا ۱۰۰ گیگابایت داده دائمی را در پایگاه داده فراداده XCom ذخیره می کند!!!
یه تعداد محدودیت دیگه هم در مقاله اشاره شده که در ادامه به چندتا از آن ها به طور خلاصه اشاره کردم.
یکی از گیج کننده ترین موارد برای تازه واردها به دنیای Airflow مساله زمانبندی است. مقاله جزئیاتی در مورد این امکان بیان می کند؛ به طور خلاصه اگر بخواهید
راه حل Airflow ابزار درستی برای کارتان نخواهد بود.
ستون فقرات Airflow سرویس زمانبند آن است؛ سرویس زمانبند مسئول موارد زیر است:
سرویس زمانبند Airflow از مشکلاتی رنج می برد، که می توان به موارد زیر اشاره کرد:
یکی دیگه از مسایلی که ما تو کار باهاش مواجه شدیم این بود که DAG ها باید به صورت ایستا تعریف می شدند و امکان اجرای پارامتری آن وجود ندارد. بنابراین برای انجام یه کار مشخص برای منابع دیتای مختلف، API های گوناگون باید DAG های مختلفی با بدنه یکسان تولید می کردیم.
خیلی خوب میشه اگر گردش کاری داشته باشیم که بتونه به ورودی های مختلف پاسخ بده. یه گردش کار ممکنه مراحلی رو داشته باشه که بشه اون ها رو برای اطلاعاتی که از API ها، دیتابیس ها و یا ID های مختلف میاد تکرار کنه. یعنی یه منطق پردازشی یکسان داشته باشیم برای ورودی های مختلف.
این مدل جزء الگوهای اصلی Airflow نیست؛ با این حال با توجه به این که پوشه DAG ها هر چند ثانیه مجددا چک می شود، می تونیم با استفاده از متغیر ها در Airflow این الگو رو پیاده سازی کنیم.
بعضی وقت ها نیاز هست یه وظیفه مشخص در گردش کار به تعداد نامعلومی تکرار شود. برای مثال فرض کنید وظیفه A یه کوئری به دیتابیس میزنه و فهرست مشتریان جدید رو بازیابی می کنه بعد قرار هست CustomerID بره برای وظیفه دیگه ای که یه سری پردازش روش انجام بشه. تنها گزینه پیاده سازی این سناریو در Airflow پیاده سازی یک وظیفه downstream هست که لیستی از ID ها رو به عنوان ورودی بگیره و با یه حلقه روی اون یه تعداد عملیات انجام بده. مشکلش چیه:
امکان تخصیص شماره نسخه به DAG به صورت داخلی در Airflow در نظر گرفته نشده است. اگر بخواهید نسخه های قدیمی DAG خود را داشته باشید باید با نام دیگری آن را ذخیره کنید. از طرف دیگه رابط کاربری اصلا چیزی راجع به نسخه DAG شما نمی دونه. در عمل شما باید DAG هاتون رو توی Git مدیریت کنید و در Airflow از روش های قدیمی تر مثل اضافه کردن شماره نسخه به نام فایل DAG برای مشخص کرده نسخه DAG استفاده کنید. یعنی مدیریت نسخه اینقدر فشل!
ما به جایگزین به مراتب بهتری برای راه اندازی یک خط لوله داده رسیده ایم. این جایگزین ذاتا برای این کار طراحی شده است. ما از Apache NiFi برای دریافت داده، پردازش آن و همچنین ذخیره سازی در پایگاه داده استفاده کردیم.
مزایای استفاده از این ابزار عبارت است از:
ان شاءالله اگر عمری باقی بود، جزئیات بیشتری در پست های بعدی در مورد نای فای ارائه خواهد شد.
توازن بار با کمک HAProxy
رابط کاربری خط فرمان سفارشی
این نوشتار پیشتر در سفر خاک منتشر شده است.