استفاده از celery و sqalchemy با فکتوری‌های فلسک

فلسک یه میکروفریم‌ورک بسیار جذاب و باحاله. مهم‌ترین چیز در موردش این هست که می‌تونی به سادگی چیزی که میخوای رو به سرعت باهاش تولید کنی، و هیچ کد اضافی‌ای داخلش نیست. همه چیز به همون شکل و فرم‌ای هست که خودت میخوای.

الزامی نیست که حتما پروژه رو به فلان شکل بخصوص بسازی. متودولوژی‌ها و غیره و ذلک همه به خود شخص بستگی داره.

این قضیه، هم خوبه و هم بد. خیلی بد و ترسناکه چون اگر ندونی که داری چیکار میکنی، ممکنه به صورت جد به مشکل بخوری و نتونی محصولی تولید کنی.

در ادامه، شما حتما با celery هم آشنا هستید. اگر نه، خیلی خلاصه براتون به این صورت بگم که یه پراسس هست که شما کارهایی که طول می‌کشه رو بهش میدید تا براتون انجام بده، بدون اینکه برنامتون درگیر بشه. یه مثال بعدا براتون می‌زنم که دقیقا بدونید جریان چیه و منظورمون چیه.

در نهایت، sqlalchemy هم یه orm هستش که ازش استفاده می‌کنیم تا کار کردن با دیتابیس برامون راحت‌تر و به صورت ابجکت‌اورینتد باشه، امن‌تر باشه و نیاز نباشه کوری بنویسیم. خلاصه کلی کاربرد که بهتره همینجا ببندیمش و درگیرش نشیم بیشتر!

نکته: این مطلب برای کسانی‌هست که با همهٔ موارد بلدن کار بکنن و فقط داره این شیوهٔ بخصوص استفاده از اونها در کنار هم رو بررسی می‌کنه و به هیچ عنوان به عنوان آموزش اولیه کاربرد نداره.

اکسپورت ریپورت!

برای پروژهٔ شرکت، نیاز هستش که یه سیستم ریپورتر داشته باشیم. کاربر بتونه گزارش‌های مختلف رو ببینه و دسته‌بندی کنه و چیزهای مثل اون. خوب، طبیعی هستش که یکی از نیازهایی که همراه همچین چیزی پیش میاد، امکان اکسپورت کردن(Export) یا دانلود کل گزارش در یک فایل هستش.

پروژهٔ ما گاها حتی تا چند میلیون خط هم گزارش باید جنریت میکرد و کوری‌های سنگینی باید اجرا می‌شد. بنابراین ممکن بود شما بعد از کلیک روی لینک اکسپورت، مجبور باشید ۱۰-۱۲ ثانیه منتظر باشید یا حتی ممکن بود با ارور ۵۰۲ بخورید. مسئله این هست که این کار طول می‌کشه و احمقانه‌ست که کاربر رو منتظر بذاریم تا بخواد بیاد نتیجه رو ببینه.

کاری که می‌کنیم این هست که دستور اکسپورت رو می‌گیریم، سریعا به کاربر اطلاع می‌دیم که افتادیم دنبال کارهاش و بعدا میتونه سر بزنه ببینه وضعیت گزارش مد نظرش چی هست یا اینکه خودمون بهش ایمیلی چیزی بزنیم و خبرش کنیم که کارش انجام شده.

در عین حال، درخواست کاربر رو به سلری می‌دیم تا کار رو برامون انجام بده.

امروز، بعد از حدود دو ماه از تعریف این پروژه و برنامه ریزی، تسک انجام این کار به من داده شد. نیاز بود که در بستر فلسک و sqlalchemy، من سلری رو اجرا کنم و تسک‌ها رو به سیستم بدم تا برام انجام بده.



انجام تسک

نترسید!

نمی‌خوام خط به خط و ریز به ریز توضیح بدم که چیکار کردم! به نوشتن کلیت کاری که صورت گرفته تا پروژه به درستی کار کنه بسنده می‌کنم و امیدوارم که بتونه مشکلی از کسی حل کنه و کمی بندازتش جلو.

ساختار فعلی پروژه

├── deploy.py
├── __init__.py
└── project
├── application.py
├── apps
│   ├── auth
│   │   └── __init__.py
│   ├── __init__.py
│   └── report
│       ├── __init__.py
│       ├── models.py
│       └── views.py
├── config.py
├── extensions.py
    └── __init__.py


توی چند خط خیلی سریع توضیح می‌دم که هر کدوم از این فایل‌ها چیکار میکنن

فایل deploy فایلی هستش که برای اجرای اپ فلسک ازش استفاده می‌کنیم. application فایلی هست که تابع‌های مورد نیاز برای ساختن یه اپ flask توش قرار داره. به قول معروف تابع `create_app` اینجاست. فایل extensions که شامل پلاگین‌هایی هستش که برای فلسک نصب کردیم. پوشهٔ apps هم شامل app هایی هستش که توی اپلیکیشن وجود دارن. بعدا در موردشون بیشتر حرف می‌زنیم. نهایتا فایلی config جایی هست که تنظیمات اپلیکیشن درش قرار دارند.

این پروژه به خودی خود کار می‌کنه و مشکلی نداره.

ساخت ورکر و تسک‌ها

اولین ادیت به فایل extensions اضافه می‌شه

from celery import Celery  
celery = Celery() 

ابجکت رو خام اینجا می‌سازیم تا درگیر circular import نشیم. بعدا می‌بینید کاربردش رو.

حالا باید بریم داخل فایل application و کدهای زیر رو اضافه می‌کنیم

def create_celery(app):
    celery.config_from_object(app.config)
     return celery

ابجکت اینجوری کار می‌کنه که ما هر وقت می‌خوایم یه ورکر سلری رو استفاده کنیم، این تابع رو صدا می‌زنیم و ازش استفاده می‌کنیم. بنابراین میشه با یه کد، چندین اینستنس مختلف با تنظیمات مختلف رو اجرا کرد، بدون اینکه نیازی باشه کدهای ساخت سلری رو عوض کنیم.

قبل از اینکه بریم فایل ورکر رو بسازیم، نیاز هست که یه سری تنظیمات برای سلری به فایل config اضافه کنیم. سلری برای کار کردنش، نیاز به یه روشی داره که بتونه پیام‌ها رو از ما بگیره و نتیجه‌اش رو بهمون برگردونه. برای اینکار باید از یه مسیج‌کیو (message queue) استفاده بشه. بعضی‌های ردیس استفاده میکنن ولی من ربیت(rabbitmq) استفاده کردم. دلیل هم این هست که توی این پروژهٔ بخصوص ما، جاهای دیگه‌ای داریم از ربیت استفاده میکنیم و از قبل نصب هست. پس میشه ازش استفاده کرد بدون مشکل.

توی فایل config باید چیزی اضافه بشه. ساختار این برای من به صورت کلاس‌های مختلف برای محیط های مختلف هستش. ProductionConfig برای محیط پروداکشن، DevelopmentConfig برای وقتی که دارم دولوپ می‌کنم و... تمام این‌کلاس‌ها یه سری دیتای مشترک با هم دارن که توی یه کلاس به اسم DefaultConfigs قرار دارن و کلاس‌های کانفیگ پروداکشن، دولوپ، تست و ... از اون ارث‌بردن. بنابراین با خیال راحت تنظیمات رو توی همین کلاس دیفالت اضافه میکنم که همه داشته باشنشون.

 class DefaultConfigs: 
    ...
    CELERY_BROKER_URL = "amqp://localhost/"
    CELERY_RESULT_BACKEND = "rpc://"

تو خط اول، داریم ادرس بروکر رو بهش میدیم که اینجا ربیت‌هستش و دومین، ادرس جایی هست که نتیجه عملیات‌ها توش ذخیره خواهند شد! میتونه یه دیتابیس باشه یا هرچیزی. اینجا من فعلا درگیر این قضیه نمی‌شم.

حالا باید بریم یه ورکر بسازیم که بتونه اجرا بشه.

توی دایرکتوری روت پروژه و کنار deploy ام یه فایل درست می‌کنم به اسم worker.py و کدهای زیر رو داخلش می‌نویسم:

from project.application import create_app, create_celery
from project.config import ProductionConfig

app = create_app(ProductionConfig)
celery = create_celery(app)

with app.app_context():
    celery.start()

توی خط اول، داریم توابع فکتوری رو ایمپورت می‌کنیم. این توابع رو نیاز داریم تا بتونیم ابجکت‌های سلری و فلسک رو باهاشون بسازیم.

خط دوم داریم کانفیگ پروداکشن رو ایمپورت می‌کنیم.

دو خط بعدی دارن با استفاده از توابع فکتوری، دوتا ابجکت فلسک و سلری می‌سازن. می‌بینید که کاملا توابع می‌تونن به سادگی و فقط با پاس دادن یه کانفیگ دیگه، یا اپلیکیشن دیگه، کاملا رفتارشون عوض بشه و این زیبایی استفاده از فکتوری‌هاست.

در نهایت، چون میخوایم از دیتابیس‌ها استفاده کنیم، و دیتابیس ها توی کانتکست فلسک فقط درست کار میکنن، تابع celery.start رو داخل کانتکست فلسک‌مون اجرا می‌کنیم.

خوب، تبریک می‌گم! مراحل راه‌اندازی سلری و فلسک به پایان رسید. شما می‌تونید برید و تسک‌هاتون رو اضافه کنین.

من توی پوشه project ام، یه دایرکتوری دارم به اسم utils که داخلش کدهایی که استفاده می‌کنم هستن. یه فایل tasks.py توی این پوشه درست میکنم و کدهای زیر رو توش می‌نویسم

from project.extensions import db, celery

همینجور که می‌بینید، رفتم از فایل extensions ابجکت سلری و دیتابیس رو ایمپورت کردم. (ابجکت db هم دقیقا به همون شکلی اونجا ساخته شده که ابجکت سلری ساخته شده)

حالا، باید مدل‌هام رو از داخل اپ‌هام ایمپورت کنم به داخل این فایل تا بتونم استفاده‌شون بکنم

from project.extensions import db, celery
from project.apps.report.models import Names

حالا باید کدها و توابع‌ام رو بنویسم.

@celery.task(name="cl_get_names")
def get_names():
    names = Names.query.all()
    return [x.name for x in names]


در کد بالا، با یه دکوریتور یه تسک درست می‌کنیم. ارگویمنت name برای این هست که سلری با این اسم بشناستش و اگر احتمالا توی کد‌هام جای دیگه‌ای همین تابع رو استفاده کرده بودم، قاطی پاتی نشه چیزا. همچنین من سعی می‌کنم اسمی که برای تسک‌هام میذارم از بیرون قابل فهم باشه. مثلا دو حرف اول اسم تسک رو از اسم اپ می‌گیرم(اینجا من cl برای سلری رو گذاشتم) و بعد اسم خود تسک.

در نهایت ادامه کد‌ها کاملا عادی هستند.

خوب، حالا باید از یه طریقی به سلری(ورکر) بفهمونیم که باید این تسک‌ها رو ارائه بکنه برای کاربرانمون.(بنابراین شما می‌تونین برای کاربردهای مختلف، ورکر های مختلفی رو با همون یک کد اجرا کنین و در نهایت نیازی هم به تغییر کدهاتون ندارید.)

من دوباره به فایل ورکر(worker) بر می‌گردیم تا بهش بگم چه تسک‌هایی رو باید برام اجرا کنه.

from project.application import create_app, create_celery
from project.config import ProductionConfig
from project.utils.tasks import (get_names,  )  # we added this!

app = create_app(ProductionConfig)
celery = create_celery(app)

with app.app_context():
   celery.start()

بدون تغییر در چیزی، فقط اسم تابع‌ای که تسک‌هام داخلش هست رو ایمپورت می‌کنم. اونها رو توی پرانتز می‌ذارم چون ممکنه کلی تسک داشته باشم و خوب، قائدتا میخوایم بریم توی چند خط احتمالا بنابراین از همین اول پرانتز رو می‌ذارم.

حالا یه تب جدید توی ترمینال باز می‌کنم(پیشنهاد میکنم از چیزی مثل tmux استفاده کنین این‌تیپ مواقع که چند تا سرور رو میخواید اجرا کنین) و با کد پایین ورکر‌ام رو اجرا می‌کنم.

celery -A worker:celery worker --loglevel=info

توی فایل worker کلی ابجکت و اینا ساختم، یکیش app که مربوط به فلسک هست. بنابراین با استایل بخصوص worker:celery به سلری می‌فهمونم که از فایل ورکر، ابجکت سلری رو بخونه و استفاده کنه. در نهایت لاگ‌لول رو هم اینفو میذارم تا بتونم ببینم چی داره میشه.

استفاده از تسک‌ها

برای استفاده از این تسک‌ها، جایی که میخوام ازشون استفاده کنم مثل یه تابع معمولی اونها رو باید ایمپورت کنم. برای مثال، من میخوام لیست کل نام‌ها رو بگیرم توی فایل views.py ام، بنابراین به این صورت تسک‌رو ایمپورت می‌کنم:

from project.utils.tasks import (get_names)

@app.route("/get_names/")
def get_names_view():
    get_names.delay()
    return "request sent to celery" 

همینجور که می‌بینید استفاده کاملا شبیه به یه تابع معمولی هست، با این تفاوت که تابع نیست و یه ابجکت هست. ما میتونیم مثل تابع معمولی صداش کنیم(()get_names) و نتیجه بگیریم، ولی نکته این هست که اینجا دیگه سلری کار رو انجام نداده. برای اینکه تاکید کنیم که سلری تابع رو اجرا کنه، باید ابجکت مد نظر رو با تابع delay اش صدا بزنیم. که توی کد می‌بینید.

همچنین در صورتی که ارگومانی باشه که بخوایم پاس‌بدیم خیلی عادی برای همین تابع delay می‌فرستیمش.

اجرای اپ فلسک کاملا طبیعی هستش و مثل همیشه که اجرا می‌کنید!

این شیوه از اجرا و استفاده از استک سلری، فلسک و سیکول‌الچمی(sqlalchemy) معمولا زیاد استفاده نمی‌شه ولی با توجه به ساختار بخصوص پروژه ما، شیوه اجرا به این صورت بودش.

متن بسیار طولانی هست، ولی انجام دادنش کمتر از ۱۰ دقیقه طول می‌کشه و واقعا راه‌اندازی تسک‌ها با سلری راحت و سریع هستش.

بعد از این چی؟

بعد از این، باید نتیجه رو یه جوری به کاربر نشون بدین! این بخش از داستان کار راحتیه و بعید می‌دونم مشکلی باشه براش، آموزش‌های زیادی هم براش وجود داره پس من متن رو همینجا کوتاه می‌کنم دیگه!

موفق باشید.