داستان‌های Data Delivery در زیرساخت داده‌ی دیوار

من محسن ایمانی هستم، دانشجوی ترم آخر کارشناسی مهندسی کامپیوتر دانشگاه تهران. ماجرای من و دیوار از سامرکمپ سال ۹۸ شروع شد که یک دوره‌ی آموزش دوماهه‌ی Software Engineering بود. بعد از این دوره با توجه به علاقه‌ای که به موضوعات مرتبط با دیتا و محصول دیوار داشتم، تصمیم بر این شد که به تیم زیرساخت داده‌ی دیوار اضافه بشم. شروع کار طبیعتا خیلی برام چالش برانگیز بود، چون قبل از اون هیچ تجربه‌‌ای تو این فیلد نداشتم و حتی اسم خیلی از تکنولوژی‌هایی که استفاده می‌کنیم رو هم نشنیده‌بودم؛ اما با اعتماد و کمک اعضای تیم، تونستم این چالش‌ها رو پشت سر بذارم و تجربه‌های جذاب و ارزشمندی رو به دست بیارم.

بعد از حدود یک سال و نیم فعالیت به عنوان Data Engineer در تیم زیرساخت داده، در این مقاله قصد دارم تعدادی از ماجراهایی رو که حول مسائل Data Delivery در تیم داشتیم، شرح بدم.

توی پست قبلی زیرساخت داده‌ی دیوار، درباره‌ی ماهیت کارمون، ضرورت استفاده از دانش بیگ‌دیتا، و تکنولوژی‌هایی که در کارهامون ازشون استفاده می‌کنیم و همین‌طور پایپلاین‌های دیتا صحبت کردیم. در این مقاله قصد داریم آستین‌ها رو بالا بزنیم و ابزارهایی که خودمون پیاده‌سازی کردیم و توسعه دادیم رو توضیح بدیم.

اگر یادتون باشه (اگر هم یادتون نیست می‌تونین پست قبلی رو بخونین) اواخر پست قبلی اشاره‌هایی شد به تمیزسازی داده و ابزارهایی که برای استفاده‌کننده‌های داده فراهم کردیم. این دفعه می‌خوایم یکم مفصل‌تر در مورد این موضوعات صحبت کنیم.

مفهوم ETL

عبارت ETL مخفف Extract Transform and Load به معنای استخراج، پالایش و بارگیری اطلاعاته. به طور خلاصه Extract می‌شه همون بخشی از پایپ‌لاین که دیتای خام رو از منابع مختلف به دست میاریم و توی Data Warehouse ذخیره می‌کنیم. Transform میشه بخش تمیزسازی و cleaning داده‌ها و در نهایت Load میشه بخش انتهای کار که استفاده‌کننده‌های داده، داده‌های تمیز شده رو از منابع مختلف می‌خونن و کاری که می‌خوان رو روش انجام میدن (این تعریف بسته به context میتونه متفاوت باشه. بنابراین اگه etl رو سرچ کردین و با تعاریف یکم متفاوت‌تری روبرو شدین خیلی تعجب نکنین). توی این مقاله روی بخش‌های Transform و Load صحبت می‌کنیم.

کیفیت داده یعنی چی؟

به طور کلی هدف از data cleaning خیلی ساده و مختصر، اینه که کیفیت داده رو بالا ببریم؛ حالا سؤالی که پیش میاد اینه که کیفیت داده یا data quality چیه؟
به طور خلاصه یعنی دیتا یه شکلی باشه که بشه گذاشت جلوی مهمون! (مهمونای ما رو هم که می‌شناسین دیگه؟ دیتاساینتیست‌ها، دیتاآنالیست‌ها). اما از اون‌جایی که ما اسم‌مون رو گذاشتیم مهندس، باید یکم تعریف دقیق‌تری ارائه بدیم که قابل اندازه‌گیری هم باشه. فاکتورهایی که برای data quality میشه تعریف کرد تو کاربردهای مختلف ممکنه فرق کنه؛ اینجا چندتا نمونه از فاکتورهای کیفیت داده رو نام می‌بریم و راجع به هر کدوم یه توضیح کوچیک می‌دیم که با مفهوم آشناتر بشیم:

  • فاکتور validity: این که چقد با محدودیت‌های دنیای واقعی و محدودیت‌های بیزینسی هم‌خوانی داره. مثلا ساعت ۲۶:۰۰:۰۰ ساعت validای نیست.
  • فاکتور accuracy: این که چقد دیتایی که داریم به مقدار واقعی اون دیتا نزدیکه. مثلا وقتی اکشن ساعت 12:22:45 اتفاق میفته ولی زمانی که ما ذخیره کردیم ۱۸:۳۱:۲۰ باشه، ینی فیلد created at اکشن‌لاگمون فاکتور accuracyاش پایینه.
  • فاکتور completeness: این که چقد از کل دیتایی که وجود داشته رو ما داریم. مثلا اگه ۱۰۰۰تا اکشن کلیک روی یه پستی اتفاق افتاده و توی دیتایی که ما نگه می‌داریم ۹۰۰تا رکورد کلیک‌پست روی پست مورد نظر داشته‌باشیم، یعنی ۹۰ درصد فاکتور completeness رو داریم.
  • فاکتور consistency: این که چقد دیتایی که داریم، همخوان با خودش و بقیه‌ی datasetها هست. مثلا اگه توی دیتای اکشن‌لاگ رکورد click post روی پست با توکن blahblahblah که توی زمان t اتفاق افتاده وجود داشته‌باشه، باید توی دیتای پست‌ها هم پستی با توکن blahblahblah وجود داشته‌باشه که توی زمان t جزء پست‌های منتشرشده باشه.
  • فاکتور conformity: این که چقد دیتایی که داریم یک شکله. مثلا اینکه همه‌ی ستون‌ها با فرمت camel case یا snake case نام‌گذاری شده‌باشند، همه‌ی دیتاهای یک دیتاسورس شمای یکسانی داشته‌باشند، همه‌ی داده‌های فیلد شماره تلفن در قالب یکسانی باشن(مثلا 09xxxxxxxxx) و … .

چرا کیفیت داده مهمه؟

جمله‌ای که ابتدای پست قبلی نقل کردیم رو به یاد بیارید: «اگر شرکتی تصمیم‌های محصولی مبتنی بر دیتا را مد نظر قرار ندهد، نمی‌تواند رشد کند.» هر چقدر کیفیت داده‌هامون بالاتر باشه، استفاده‌کننده‌های دیتا ترنزیشن‌های کمتری برای استخراج دیتای مورد نظرشون از دیتای اولیه انجام میدن (مثلا اگه همه‌جا فرمت شماره تلفن‌ها به شکل 09xxxxxxxxx باشه، دیگه لازم نیست برای تحلیل شماره تلفن‌ها اول روی این کار بشه که فرمت‌ها رو یکی کرد)، این اتفاق باعث میشه که تمرکزشون روی کار اصلی‌ای که قراره انجام بدن، یعنی تحلیل داده بمونه؛ از دوباره کاری هم جلوگیری میشه (به جای این که هر کی با دیتا کار میکنه یه دور از اول دیتا رو واسه خودش تمیز کنه، یه بار ما دیتا رو تمیز می‌کنیم و همه از دیتای تمیز شده استفاده می‌کنن). از طرفی بعضی از استفاده کننده‌های دیتا، مثلا بچه‌های محصول لزوما دانش فنی ندارن و نباید درگیر پیچیدگی‌های فنی بشن. بنابراین اگه دیتا تمیز باشه این افراد هم با یه سری join و aggregation ساده که نیاز به دانش فنی خاصی هم نداره می‌تونن متریک‌هایی که نیاز دارن رو ببینن. نتیجه‌ی همه‌ی این اتفاق‌ها این میشه که پرفورمنس افراد بالاتر میره (با توجه به این که تمرکزشون رو حیطه‌ی تخصصی خودشون می‌مونه) و هم این که خروجی‌ای که به دست میاد هم با کیفیت‌تر و در نتیجه قابل اعتمادتر خواهد بود.

حالا که با مفاهیم اولیه‌ی بحثمون آشنا شدیم، بریم ببینیم که چه کارهایی توی تیم ما در راستای مواردی که بالاتر بهش اشاره کردیم انجام شده.

سرویس Metadata Collector (MDC)

یکی از مشکلاتی که همیشه بین اعضای چپتر دیتا وجود داشت، این بود که پیدا کردن دیتای متناسب با نیازی که داشتن بعضا سخت بود. مثلا برای یه تحلیل روی آمار پست‌های دسته‌بندی‌های مختلف، ممکنه به دیتای پست‌های منتشر شده نیاز داشته‌باشن اما نمیدونستن که این دیتا رو از کجا می‌تونن به دست بیارن. قبلا این اطلاعات جایی داکیومنت نشده‌بود و به صورت شفاهی بین افراد منتقل می‌شد، یا افراد با آزمون و خطا سعی می‌کردن دیتای مورد نظرشون رو پیدا کنن. ایرادات این وضعیت نسبتا واضحه، ولی اگر بخوام چندتا نمونه بگم، می‌تونم به این موضوع اشاره کنم که پیدا کردن کسی که جواب سؤالمونو بدونه، معمولا کار سختیه. علاوه بر این، بلاک شدن بعضی کارها به خاطر در دسترس نبودن افراد و آپدیت نبودن اطلاعات (مثلا تیبلی deprecate میشه و یه تیبل دیگه باهاش جایگزین میشه، ولی خیلیا در جریان قرار نمی‌گیرن و از دیتای deprecate شده استفاده می‌کنن) از مهم‌ترین مشکلاتی بوده که ما تجربه کردیم. برای حل این مشکل، ما سرویس Metadata Collector یا به اختصار MDC رو راه‌اندازی کردیم که هدف اصلیش کمک به تسهیل Data Discovery بود. توی MDC برای هر کدوم از دیتاسورس‌ها، اطلاعاتی مثل استوریج ذخیره‌سازی، آدرس فایل‌ها، فرمت ذخیره‌سازی، اسکجول پر شدن، شِمای داده، معنی هر کدوم از فیلدها شِما و … رو نشون میدیم. این سیستم تا حدی مسئله‌ی Data Discovery رو راحت‌تر کرد برامون. البته هنوز در حال توسعه‌س و داریم بهبودش می‌دیم. سرویس MDC توی پایپ‌لاین‌های زیرساخت دیتا هم کاربردهایی داشته که در ادامه‌ی همین متن به بعضی از این کاربردها اشاره خواهیم کرد.

پروژه Data Interface – Data Schema Conformity

یکی دیگه از نیازمندی‌هایی که توی تیم ما و چپتر دیتا همیشه حس میشد، داشتن اسکیمای مشخص برای هر کدوم از دیتاسورس‌ها بود، به خصوص وقتی یه دیتاسورس از پایپ‌لاین‌های مختلف پر میشه (مثلا بعضی از دیتاسورس‌ها دوتا پایپ‌لاین براشون پیاده‌سازی شده که یکیش به عنوان بکاپ استفاده میشه، برای وقتایی که تو پایپ‌لاین اصلی مشکل به وجود بیاد). چیزی که در ابتدای کار طراحی کردیم، به این صورت کار میکرد که انتهای هر روز دیتاهای کلین‌شده‌ی روزهای قبلی رو میخوندیم و اسکیمای دیتا رو ازش استخراج و تو دیتابیس MDC ذخیره می‌کردیم. مشکلی که این سیستم داشت این بود که اسکیما رو در واقع ما محدود نکرده‌بودیم؛ از سورس اولیه هر چی میومد همون اسکیما ذخیره می‌شد. از طرفی گوناگونی داده‌ها توی بعضی دیتاسورس‌ها باعث می‌شد که اسکیماشون در ظاهر هر روز تغییر کنه. مثلا در مورد اکشن‌لاگ، ممکنه یه روز یه اکشنی رو disable کنیم یا این که کلا اون اکشن تو کل روز اتفاق نیفته یا یه مدت کوتاهی برای تست ریلیز بشه و بعد حذف شه، این جور اتفاقات بعضا باعث میشد که فیلدهای اکشن‌لاگ کم و زیاد بشه و هر روز ورژن جدیدی از اسکیما ایجاد بشه. یه مثال دیگه، برای دیتاهایی که دیتای خامشون به شکل jsonئه، فیلد‌های integerاشون بسته به بزرگی عدد تو اسپارک میتونه به integer یا long ترجمه بشه؛‌ این تفاوت تایپ یه ستون مثلا موقع union کردن دیتاهای چند روز مختلف با pyspark پیچیدگی ایجاد میکنه. این دست مشکلات ما رو برد به این سمت که به یه سرویس جدیدی فکر کنیم که همه‌ی اسکیماها رو خودمون توش وارد کنیم و روی اسکیمای دیتاها کنترل داشته‌باشیم. اینجا بود که پروژه‌ی Data Interface رو شروع کردیم. توی این پروژه، اسکیمای دیتاسورس‌ها رو در قالب json با استفاده از dictionary پایتون ذخیره و نگهداری می کنیم. خروجی این پروژه یه پکیج پایتونه که یه سری تابع در اختیارمون قرار میده که با استفاده از اونا می‌تونیم اسکیمای دیتاسورس‌ها رو در قالب‌های مختلف مثل pyspark dataframe schema، hocon schema و json string دریافت کنیم.

این سرویس جدید یه مزیت دیگه‌ای هم داشت. قبلا برای حل کردن مشکلاتی که بالاتر توضیح دادیم، تو قسمت‌های مختلف کدها و پایپ‌لاین‌هامون راه‌حل‌های تریکی و بعضا کثیف و کلی کد duplicate داشتیم. وجود Data Interface کمک کرد که integrity اسکیماها توی پایپ‌لاین‌های مختلف رو بتونیم به شکل ساده‌تر و تمیزتری حفظ کنیم.

داده‌ها دو دسته‌اند: دسته‌ی اول و دسته‌ی دوم!

خب حالا که خیالمون از بابت اسکیمای خروجی هم راحت شد، بریم سراغ مسئله‌ی اصلی؛ data cleaning! توی data cleaning میخوایم دیتای خامی که از سمت کلاینت یا بکند برامون فرستاده میشه رو تبدیل کنیم به فرمتی که تو کار کردن باهاش راحت‌تریم و با اسکیمایی که توی Data Interface براش تعریف کردیم ذخیره‌ش کنیم؛‌ در رابطه با cleaning، میتونیم دیتاها رو به دو دسته تقسیم کنیم:

  • یه سری دیتا هستن – مثل دامپ دیتابیس‌ها – که معمولا فرمت دیتای خامشون هم تمیزه و توی پروسه‌ی cleaning خیلی نیاز نمیشه که transform خاصی روشون انجام بشه و در حد یه سری type casting واسه adjust کردن اسکیما تمیزسازی روشون انجام میشه. از نظر validity هم معمولا قبل این که به دست ما برسه چک شده و از این بابت هم خیلی نیازی نمیشه که ما کاری بکنیم.
  • یه سری دیتای دیگه هستن که دیتای خامشون یا structure خیلی خاصی نداره (مثلا در قالب json میاد) یا این که به دلیل نیازمندی‌هایی که تو قسمت‌های دیگه‌ی محصول وجود داره structureاشون با structureای که برای ما مطلوبه اختلاف زیادی داره. بعضی از دیتاها هم – مثل اکشن‌لاگ – اولین بار سمت ما ذخیره میشن و در نتیجه معمولا تضمینی روی validity دیتای خام و همچنین consistencyشون با بقیه‌ی دیتاها وجود نداره (مثلا ممکنه ساعت دیوایس کاربر مشکل داشته باشه و در نتیجه تایمی که برای اتفاق افتادن اکشن‌لاگ میخوره درست نباشه). همون طور که متوجه شدین، این دسته از دیتاها پیچیدگی بیشتری برای پیاده‌سازی cleaning و چک کردن quality دارن.

شما داده‌هاتون رو با چی تمیز میکنین؟

همونطور که گفتیم تمیزسازی داده‌هایی که در دسته‌ی اول قرار می‌گیرن پیچیدگی خاصی نداره. پس یکم در مورد نحوه‌ی پیاده‌سازی Data Cleaning دسته‌ی دوم صحبت می‌کنیم. کاری که لازمه اینجا انجام بشه توی کیس اکستریم اینه که دونه دونه فیلدهای دیتای کلین شده رو از روی دیتای خام درست کنیم. این حالت وقتی به وجود میاد که اسکیمای دیتای خام با دیتای کلین شده تفاوت اساسی بکنه. مثلا در مورد اکشن‌لاگ‌های صفحات widget based این اتفاق میفته. دیتای این اکشن‌لاگ‌ها یه بخشی از اون سمت کلاینت و یه بخشی هم سمت بکند پر میشه؛ برای این که این هماهنگی بین بکند و کلاینت حفظ بشه نیازه که فرمت خاصی توش رعایت بشه. اما این فرمت برای استفاده‌کننده‌های دیتا خیلی خوب نیست و پیچیدگی‌های غیرضروری ایجاد میکنه. بنابراین لازمه که دیتای کلین شده با اسکیمای متفاوتی ذخیره بشه.

در نگاه اول به نظر میرسه که پیاده‌سازی کدی که این کارو برامون انجام بده پیچیده و کثیف از آب دربیاد. اما کاری که ما کردیم چی بود؟ ما قالبی رو طراحی کردیم به این شکل که یه تابعی (مثلا اسمشو بذاریم driver) وجود داره که یه سطر از دیتای خام و یه کانفیگ دریافت میکنه، و یه سطر دیتای clean شده خروجی میده. توی کانفیگی که به این تابع پاس داده‌میشه، مشخص شده که چه تابع‌هایی رو چه فیلدهایی از دیتای خام و با چه آرگومان‌هایی باید اجرا بشه تا دیتای clean شده به دست بیاد. کاری که driver میکنه اینه که آرگومان‌های این تابع‌ها رو استخراج کنه و روی سطری از دیتا که توی ورودی دریافت کرده اعمال کنه تا دیتای clean شده به دست بیاد. توابعی هم که توی کانفیگ قرار داده‌میشن تا حد امکان سعی میشه که کوچیک و ساده باشن. هر کدوم از این تابع‌ها میتونن در راستای دستیابی به یک یا چندتا از متریک‌های data quality که ابتدای این مقاله بهشون اشاره شد ایجاد شده‌باشن.

خوبی این روش اینه که نیازی نیست واسه هندل کردن چیزهای جدید توی کلینینگ، کد اصلی کلینینگ رو دستکاری و آپدیت کرد و صرفا کافیه که کانفیگ کیلینیگ رو آپدیت کنیم. همچنین کمک میکنه بدون این که خوانایی کد به شکل معناداری افت کنه روی جزئیات هم تسلط بالایی داشته‌باشیم.

نحوه‌ی استفاده از این سیستم هم به این شکله که اول با اسپارک دیتای خام رو از رو hdfs در قالب rdd میخونیم، روش یه map میزنیم و تابع driver رو روی همه‌ی سطرها اعمال می‌کنیم، rdd به دست اومده رو با اسکیمایی که توی data interface تعریف شده به pyspark dataframe تبدیل میکنیم و در قالب parquet توی مسیری که توی mdc براش تعریف شده ذخیره میکنیم.

خوان آخر؛ bil:

بعد از این که تونستیم دیتای تمیز شده رو هم به دست بیاریم و مرحله‌ی Transform رو هم با موفقیت پست بذاریم، میرسیم به مرحله‌ی آخر که Load کردن دیتاست. اینجا نیاز به ابزارهایی داریم که بتونیم داده‌ها رو باهاش بخونیم و باهاشون کار کنیم. قبل‌ترها اینطوری بود که هر کس یه سری تابع utility برای خودش تعریف می‌کرد که داده‌هایی که بیشتر باهاشون کار میکنه رو راحت‌تر بخونه. مشکلی که وجود داشت این بود که هم کلی کد دوپلیکیت تو جاهای مختلف به وجود میومد که خوب نبود، هم مشکلاتی که در بخش مربوط به mdc مطرح کردیم اینجا هم بازتولید میشد. این مشکلات ما رو برد به سمتی که یه ابزار متمرکز ارائه بدیم که همه بتونن ازش استفاده کنن. این ابزار رو در قالب یک پکیج پایتون که اسمشو گذاشتیم bil فراهم کردیم. برای داده‌های پرکاربرد تابع‌هایی با قالب‌های مشخص تعریف کردیم که همه بتونن راحت‌تر به داد‌ه‌های مورد نیازشون دسترسی پیدا کنن.

به جز خوندن دیتا یه سری کارهای جزئی دیگه هم تو bil انجام میشه. مثلا برای ستون‌هایی که از جنس زمان هستن، این که تایم‌زون اون ستون چی باشه مهمه. توی bil موقع خوندن دیتاها تعیین می‌کنیم که ستون‌های از جنس زمان تو چه تایم‌زونی باشن؛ یا مثلا برای دیتاهایی که حجم خیلی زیادی ندارن، تسک کلینینگ جدا پیاده‌سازی نمیکنیم و موقع خوندن با bil به شکل on the fly دیتا رو تمیز میکنیم.

همچنین برای به دست آوردن آدرس فایل‌های داده‌ها و از طرف دیگه برای به دست آوردن schema دیتاهایی که کلینینگشون توی bil به شکل on the fly انجام میشه این پکیج با MDC و data interface نیز integrate شده.

در تصویر زیر یه نمونه از تابع‌هایی که توی bil پیاده‌سازی شده و نحوه‌ی فراخوانی‌ش توی محیط zeppelin رو میتونید ببینید:

چی گفتیم چی شنیدین؟

اگر در نهایت بخوایم یه جمع‌بندی مختصر بکنیم، تو این مقاله اولش با مفهوم etl و بعدش data quality و ضرورت و اهمیتش آشنا شدیم؛ بعد در ادامه با ابزارها و سرویس‌هایی که برای پیاده‌سازی etl و رسیدن به data quality بهتر پیاده‌سازی و استفاده کردیم مثل mdc و data interface مختصرا آشنا شدیم و در نهایت پیاده‌سازی عملیات cleaning و integrationاش با mdc و data interface رو دیدیم. در نهایت هم با bil که ابزاری برای خوندن دیتا بود آشنا شدیم. امیدواریم مطالب براتون جذاب و مفید باشه؛ نظرات‌تون رو با ما در میون بذارید.