از این سری: یک دور با کافکا از صفر تا صد
این پست حاصل تلاش من برای فهم Apache Druid [که از این به بعد به آن برای سادگی دروید میگوییم] است و بیان روش من برای فهم یک موضوع: رفتن قدم به قدم در یک دور کامل از صفر تا صد. برای نوشتن این پست از نتایج سرچهایم، راهنما و سورس دروید استفاده کردهام اما به هر حال ممکن است فهم من در موردی غلط باشد که این لطف شماست اگر آنرا اصلاح کنید.
دروید از معماری «اشتراک در هیچ چیز» (Shared Nothing Architecture: SNA) پیروی میکنه. این به این معنیه که قسمتها مختلف روی نودهای جداگانه (یا بسته به شیوهی اجرا روی JVM مجزا) بالا میان و هیچ اشتراکی در منابع ندارن.
برای درک بهتر این معماری در دروید میتونیم پسگرس رو به عنوان یک پایگاه داده توزیعنیافته با معماری SNA مقایسه کنیم. وقتی یک نسخه از پسگرس بالا میاد، تمام اجزایی که نیازه تا پسگرس کار کنه (مثل انجین اجرا یا انجین ذخیرهسازی) با همون نسخهی اجرایی حاضرن و نیاز به کار بیشتری نیست. در دروید اما داستان به این شکل نیست. هر کلاستر دروید از پنج نوع مختلف نود تشکیل شده:
در کنار این نودها نوع Router هم وجود داره که بود و نبودش اختیاریه و در ادامه به همراه سایر نودها به بررسی اون میپردازیم.
این نود مسئولیت مدیریت دادهها و توزیع اونها رو بر عهده داره. یکی از مهمترین وظیفههای این نود کنترل بار به وسیلهی توزیع کردن دادهها روی نودهای مختلف Historicalه. یعنی اگه یه نود Historical میزبان حجم بالایی از دادهها باشه میاد به یکی دیگه از این نودها میگه داداش شما فلان دادهها رو بارگذاری کن و به اون نودی که شلوغ باشه میگه فلان دادهها (همونایی که به اون یکی نود Historical گفته بارگذاری کنه) رو بیخیال شو. ارتباط این نود با بقیه غیرمستقیمه و از طریق Zookeeper صورت میگیره.
این نود مسئولیت اجرای تسکها، توزیع تسکها و ساختن Supervisorها رو بر عهده داره. تسک کوچکترین واحد اجراست و Supervisor بسته به نوع Indexer چیزیه که میتونه تسکی رو ران کنه. Indexer هم چیزیه که باهاش دروید به یک منبع داده وصل میشه.
بروکر نودیایه که کوئریها رو دریافت میکنه، برای اجراشون برنامهریزی میکنه و در نهایت نتیجهی محاسبه رو به درخواستکننده بر میگردونه. این نود با Zookeeper در ارتباطه و از اونجا میتونه بفهمه که دادهها چطور روی کلاستر توزیع شدن و مسیر دسترسی بهشون چیه.
نود Middle Manager نودیه که تسکها رو میپذیره و شروع به بافر کردن دادهها موقع تزریق میکنه. این نود با شروع ساختن پارتیشن داده، اون رو در Zookeeper ثبت میکنه و با رسیدن کوئری از طرف Broker محاسبات رو روی دادههای بافرشده انجام میده و نتیجه رو به بروکر پس میده. در نهایت دادههای بافر شده رو روی Deep Storage ذخیره میکنه و انتشار اونا رو اعلام میکنه.
نود Historical مسئول محاسبات روی دادههای تاریخیه. در شروع وقتی Coordinator اعلام میکنه که دادهی تاریخیای منتشر شده، Historical کشش رو چک میکنه که ببینه داره اون داده رو یا نه، اگه نداشت داده رو از Deep Storage بارگذاری میکنه. در نهایت هم وقتی از طرف بروکر کوئریای بهش میرسه محاسبات رو روی دادهها انجام میده و نتیجه رو به بروکر بر میگردونه.
روتر نودیایه که کنسول (پنل گرافیکی کار با دروید) رو بالا میاره و میتونه درخواستها رو به نودهای مختلف پروکسی کنه. پروکسی کردن روتر خیلی قدرتمنده و میشه با استفاده از اون کوئریهای مختلف (بر اساس زمان یا تایپ کوئری) رو به بروکرهای مختلف هدایت کرد. این نود با صورت مستقیم با نودهای Coordinator، Overlord و بروکر در ارتباطه.
معماری پیشنهادی دروید یک دستهبندی منطقی سهگانه از نودهاست. سرورهای مستر شامل نودهای Coordinator و Overlord، سرورهای کوئری شامل نودهای بروکر و روتر و سرورهای دیتا شامل نودهای Middle Manager و Historical هستن. به صورت مشخص نودهای مستر با هیچ کدوم از نودها به صورت مستقیم کاری ندارن و ارتباط با اونها رو با Zookeeper انجام میدن. از نودهای کوئری، بروکر به نودهای دیتا دسترسی داره و از نودهای دیتا هم هیچ کدوم مستقیم با هم کاری ندارن.
حالت عادی ذخیرهسازی دادهها روی چیزی مثل HDFS و کوئری زدن با چیزی مثل اسپارک رو در نظر بگیریم. چیزی که واضحه اینه که سرعت اجرای کوئریها پایینان و به درد کوئریهای تقریباً در لحظه نمیخورن. راهحل عموماً ساختن مارتهای داده برای دادههای پردازش شدهست. این کار مستلزم ساختن پایپلاینهای متفاوت و نگهداری از اوناست. دروید رو میشه به نوعی مکمل این سیستم دونست: ذخیرهسازی دادههای تاریخی در جایی مثل HDFS و سپردن کوئریهای تحلیلی از ساخت پایپلاین تا بهینهکردن اجرای کوئریها به دروید. به صورت مشخص دروید Cubeعه که به صورت خفنی روی Roll-up تمرکز کرده.
قبل از رفتن سروقت roll-up اول بببینیم دروید چه اثرپذیریای از بقیهی پایگاه دادههای OLAP داشته. دروید از طرفی از cubeها بُعد (dimension) رو به ارث برده و از طرف دیگه متأثر از پایگاه دادههای سری زمانی (time series databases) از زمان به عنوان یک بعد ثابت و دارای تظریف استفاده کرده. مثل بقیهی OLAPها یک دسته از مقادیر هم به عنوان اندازهها (measures) یا متریکها وجود دارن که مقادیر عددی هستن و قراره در ترکیبی از بعدها معنای خاصی داشته باشن. مثلاً اگه سه بعد زمان، شهر و محصول رو به عنوان فاکتورهای موثر در یک گزارش در نظر بگیریم در یک ساختار cube به اینطور شکلی میرسیم:
همین شکل در یک ساختار جدولی به اینطور شمایلی در میاد:
اینجا جاییه که دروید از roll-up به عنوان یک عمل cube فراتر میره و اون رو به عنوان کنشی در هنگام ذخیرهی دادهها در نظر میگیره. هر خط منحصربفرد از ابعاد، دادههاش بنا بر عملگری که تعریف شدن، تجمیع میشن و اونطور ذخیره میشن. مثلاً برای خط time1-city2-product1 (ستون چهارم در تصویر بالا) اگه گفته باشیم عملگر بیشینه باید مورد استفاده قراره بگیره، به جای ذخیرهی اعداد ۴، ۱۵ و ۳۲ تنها عدد ۳۲ ذخیره میشه و یا اگه گفته باشیم از عملگر جمع استفاده بشه، عدد ۵۱ ذخیره میشه. محشر بودن دروید در roll-up به همینجا ختم نمیشه و دروید از برخی از دادهساختار احتمالی پیادهسازی شده توسط DataSketches هم پشتیبانی میکنه که فکر کنم اینجا جای باز کردنش نباشه و شاید تو یه پست دیگه برم سر وقتشون. در نهایت roll-up اجباری نیست و میشه متریکها رو به صورت خام ذخیره و بازیابی کرد، ولی اونطور دیگه صفایی نداره ماجرا.
تظریف یعنی میزان دقت موردنیاز. برای شرح بیشتر دروید از تظریف (ترجمهی granularity. لغت به این معنی نیست ولی فکر کنم درسته در اینجا این ترجمه) در دو جا پشتیبانی میکنه. یکی در سطح ذخیرهسازی دادهها (میزان دقت بُعد زمان) و یکی در سطح ذخیرهسازی فایلها. اول تظریف زمانی رو بررسی میکنیم و بعد از اینکه دیدیم دادهها چطور ذخیره میشن میریم سر وقت تظریف در ذخیرهی فایلها. مثلاً فرض کنیم فقط از زمان به عنوان بعد استفاده میکنیم و زمان الان هم که ۲۰ تیر ۱۴۰۱ - ۲۰:۱۶:۴۵عه. تظریف در اینجا مثل استفاده از تابع date_trunc داخل SQLه و مثلاً تظریف در مقایس ساعت، همهی دادهها در فاصلهی ۲۰ تیر ۱۴۰۱ - ۲۰:۰۰:۰۰ تا ۲۰ تیر ۱۴۰۱ - ۲۰:۵۹:۵۹ رو در یک سبد قرار میده.
اگه از سه منظر buffer کردن، mutable بودن یا نبودن و ترتیب بخوایم به انجین دروید نگاه بندازیم، همونطور که در بالا اومد، دروید دادهها رو در Middle Manager بافر میکنه، به صورت immutable ذخیره میکنه و دادهها بر اساس ابعاد مرتب میشن و طبعاً برای کوئریهای range خیلی مناسبن. برای توضیح از آخر به اول میریم. برای اینکه ببینیم دادهها چطور مرتب میشن دادههای زیر رو در نظر بگیریم:
دروید دادهها رو در سه دسته تقسیمبندی میکنه. اولین ستون زمانه (با نوع دادهی long)، دوم ستونهای بُعد میان (با نوع دادهی String) و سومی هم متریکها هستن (با نوع دادههای صحیح، اعشاری یا DataSketch). دستهی اول و سوم به صورت فشردهی LZ4 ذخیره میشن. دستهی دوم اما برای هر ستون از ترکیب سه دادهساختار نگاشت، دادههای نگاشتشده و ایندکس Bitset ساخته میشن. یعنی برای هر ستون بُعد با شروع از بعد زمان، دادهها مرتب میشن: برای ستون زمان در تصویر اول ساعت ۱ میاد و بعد ساعت ۲، برای ستون Page نامها برای ساعتهای ۱ و ۲ برابرن، برای ساعت ۱ و Page با مقدار Justin Bieber در ستون Username اول مقدار Boxer میاد و بعد Reach و قس علی هذا. بعد از این مرحله، برای هر ستون یک نگاشت رشته به عدد ساخته میشه. برای مثال برای ستون Page نگاشت اینطور چیزی میشه:
{ "Justin Bieber": 0, "Ke$ha": 1 }
بعد از اون ستون دادهها ساخته میشن:
[ 0, 0, 1, 1]
این یعنی در دو سطر اول برای ستون Page مقدار Justin Bieber و برای سطر سوم و چهارم مقدار Ke$ha بوده. در نهایت نوبت به ایندکس معکوس میرسه:
value="Justin Bieber": [1,1,0,0] value="Ke$ha": [0,0,1,1]
در ایندکس بالا اگه مقدار Ke$ha رو بخوایم سرچ کنیم، مقادیر ۱ نشون میدن که در چه سطرهایی میتونیم در این ستون اونها رو پیدا کنیم (در اینجا ۳ و ۴). پس در نهایت هر ستون با استفاده از نگاشت میتونه به حداکثر فشردهسازی برسه و با استفاده از ایندکس معکوس میتونه در جستجو سرعت بالاتری رو داشته باشه (جدا از اینکه با بالارفتن اندازهی (Cardinality) یک بُعد ایندکس هم خلوتتر میشه و دروید از این خاصیت برای فشردهسازی ایندکس استفاده میکنه).
در نهایت مقادیر هر ستون به صورت مجزا و باینری ذخیره میشن (به جز ابتدای ستون که به صورت یک Column Descriptorه که توسط Jackson قابل خوندنه و اطلاعات تصویر زیر در اون ذخیره شده). این محتوای باینری در چیزی به اسم Segmentفایلها ذخیره میشن.
سگمنت فایلیه که دروید دادهها رو با شرحی که رفت داخل اون ذخیره میکنه و به ازای هر بازهی زمانی تعریف شده توسط تظریف زمانی فایلها یا چند معیار دیگه ساخته میشه. این معیارها رو میشه به دو دسته تقسیم کرد: معیارهای پارتیشنبندی اولیه و معیارهای پارتیشنبندی ثانویه. پارتیشنبندی اولیه همون بخشبندی فایلها بر اساس واحدهای زمانیه. مثلاً وقتی تظریف فایلها رو روی هفته قرار میدیم، تمام دادههای یک هفته داخل یک فایل میان. اما این امر مشکلاتی رو هم داره. اینکه یک فایل میتونه خیلی حجیم بشه و پردازش رو سخت میکنه. برای حل این مشکل دروید روی پارتیشنبندی اولیه دو معیار دیگه هم قرار داده. معیار حجم فایل و معیار تعداد سطرهای ذخیره شده در فایل. یعنی اگه فرضاً حجم فایل رو ۵۰۰ مگ در نظر بگیریم و تعداد سطر رو ۵ میلیون، با رسیدن به یکی از این دو معیار، فایل اول بسته میشه و به صورت immutable آمادهی انتشار میشه و دادهها از اون به بعد برای نوشته شدن روی فایل دوم از اون تظریف زمانی آماده میشن. این رویکرد مشکل حجم فایل رو حل میکنه اما باز مشکلات دیگهای رو در پی خودش داره. یکی از اون مشکلات اینه که برای یک بُعد ممکنه دادهها داخل چند فایل پخش شن و برای کوئری زدن نیازه تا چند فایل خونده شه (مثلاً برای مثال قبل ممکنه Ke$ha داخل همهی فایلها باشه در زمانهای مختلف). برای حل این مشکل دروید راهحل پارتیشنبندی ثانویه رو ارائه میده.
پارتیشن -افراز- ثانویه میاد و در کنار پارتیشن اولیه قرار میگیره. یعنی اول فایلها بر اساس تظریف زمانی تعریفشده تقسیم میشن، در کنار اون، چینش دادهها داخل فایلها بستگی به پارتیشنبندی دوم داره. دروید از چهار نوع پارتیشنبندی ثانویه پشتیبانی میکنه:
نکتهی دیگهای که وجود داره اینه که انتخاب پارتیشن بستگی به Indexerی داره که برای تزریق دادهها به دروید استفاده میکنیم. برای مثال indexer کافکا فقط از نوع اول پشتیبانی میکنه و برای تغییر فرم ذخیرهسازی دادهها باید بعد از وارد کردن دادهها اونا رو reindex کرد یا براشون compaction تعریف کرد ولی برای indexerهای Batch مثل خوندن از Hadoop میشه در زمان تزریق شکل پارتیشن ثانویه رو مشخص کرد.
تا اینجا رسیدیم که سگمنتها چطور ساخته میشن و چه ساختاری دارن. یعنی گفتیم که ستونهای زمان و متریکها به صورت LZ4 ذخیره میشن و ستونهای بعد هم هر کدوم به صورت مستقل توسط Jackson قابل خوندنن و میشه به تنهایی از هر ترکیبی از اونها استفاده کرد. برای ذخیره کردن دادهها به صورت ستونی (Columnar) فرض کنید راهحل این باشه که ستونها رو تو فایلهای جدا ذخیره کنیم. این رویکرد یک مشکل بزرگ داره و اونم اینه که تعداد File Descriptorها برای خوندن فایلها بالا میره. دروید برای ذخیره کردن مستقل ستونها و در عین حال در کنترل نگه داشتن تعداد File Descriptorها میاد از تکنیکی به اسم Smoosh کردن فایلها استفاده میکنه. با Smoosh کردن چند فایل، یک فایل داده و یک فایل meta ساخته میشه که در اون آفست هر فایل ذکر شده (فایل اول از آفست ۰ تا فلان، فایل دوم از آفست فلان تا ...). در اینجا فایل متا برای هر ستون این کار رو انجام میده و به این وسیله میشه فقط ستونهایی که نیازه رو خوند.
ایونت به Middle Manager میرسه و با شرحی که رفت داخل یک Segment ذخیره میشه. بعد از اینکه یکی از معیارهای انتشار برای سگمنت اجرا شد، Middle Manager اعلام میکنه که سگمنت قابل انتشاره و اون رو داخل Deep Storage ذخیره میکنه.
کوئری به Broker میرسه و بروکر نودهای Middle Manager و Historical واجد شرایط محاسبهی کوئری رو انتخاب میکنه (در این بین اگه سگمنتی نیاز باشه که داخل هیچ نود Historicalی نباشه، Coordinator بر اساس توزیع بار و فشار، به یکی از Historicalها گفته که سگمنت مورد نیاز رو از Deep Storage بارگذاری کنه). بروکر بر اساس کوئری و محلی که دادهها حضور دارن، تعدادی سابکوئری میسازه و به نودهای مرتبط با اون سابکوئریها میده. اون نودها محاسبات رو انجام میدن و نتیجه رو به Broker تحویل میدن. در نهایت Broker نتایج رو ادغام میکنه و به درخواستکننده تحویل میده.
برای بخش پایانی فرض کنیم قراره یه بخش تحلیلی برای یک صرافی بسازیم. من از یه API رایگان برای گرفتن قیمت رمزارزها و ارزهای معروف استفاده کردم که کدش اینه:
و روالی که قراره جلو بریم اینطور چیزیه. اول دروید رو با indexer کافکا به کافکا و تاپیکی که میخوایم وصل میکنیم:
من در اینجا چون دروید رو با داکر آوردم بالا، داخل کافکا پروتکل و آدرس host.docker.internal رو دادم و پورتی که کافکا روی اون گوش بده.
وقتی دروید بتونه دادهای رو از کافکا واکشی کنه، اونا رو به صورت خام نمایش میده. بعد از اون نوبت پارس کردن دادهها میرسه. اینجا جاییه که میشه ساختارهای تودرتو رو مسطح کرد و یا اگه دادهها JSON باشن میشه با استفاده از jq مقادیر مورد نیاز رو استخراج کرد:
بعد از اون نوبت به پارس کردن زمان میرسه:
دروید خودش تلاش میکنه ستون زمان و فرمتش رو تشخیص بده. اگه نداد هم میشه با انتخاب ستون فرمتش رو مشخص کرد و یا با استفاده از تب Expression و تابع timestamp_parse ستون رو پارس کرد. بعد تبدیل و فیلتر کردن (که در اینجا نداریم) نوبت میرسه به کانفیگ کردن Schema:
در تصویر بالا من همهی ستونها رو حذف کردم و فقط ستون unit رو گذاشتم به عنوان پارتیشن بمونه. چندتا متریک هم بهش اضافه میکنم:
تظریف کوئری رو اینجا من ساعت گذاشتم. بالاخره ماجرا میرسه به کانفیگ کردن پارتیشنینگ روی فایل (که فقط از نوع پارتیشن اولیهست برای indexer کافکا) و انتشار اون:
بعد از انتشار میشه از داخل تب کوئری روی دادهها کوئری زد:
الان برای هر روز ما میتونیم بالاترین، پایینترین و میانگین قیمت (با استفاده از تقسیم جمع بر تعداد) رو داشته باشیم، در زمانی کمتر از ثانیه!
دروید یه راهحل خیلی خوب برای چیزی بین راهحلهای تحلیلی و سری زمانی محسوب میشه. معماری «اشتراک در هیچ چیز» این امکان رو میده که مقیاسپذیری هر بخش سیستم تحت کنترل و بر اساس نیاز باشه اما در کنارش بالا آوردن و نگهداری کلاستر رو هم کمی سختتر میکنه. استفاده از roll-up به عنوان یک عمل هنگام تزریق داده منجر به کاهش حجم داده و سرعت بالاتر در هنگام محاسبات میشه و امکان استفاده از DataSketchها به عنوان متریک و پشتیبانی اونا از عملیاتهای مجموعهای دست ما رو در ساختن کوئریهای تحلیلی جذاب (مثل funnel) باز میذاره، در عین حال برای داشتن تحلیلهای متفاوت تاریخی نیازه تا دیتای خام در جای دیگه نگهداری شه تا در آینده منابع جدید ساخته شن.