ما تا الان نحوه کار کلی کافکا، دمویی از اون و نصبش و جزییات بیشتری از نحوه ذخیره سازی پیغام ها توسط کافکا رو با هم مشاهده کردیم. در این نوشتار میخوایم موارد پیشرفته ی دیگه ای مانند نحوه تنظیم order و تضمین مصرف شدن پیغام توسط consumer رو با هم ببینیم. این جزییات برای مصارفی که در اون ترتیب دریافت پیام و گارانتی پردازش همه پیام ها برایشان ضرورت دارد بسیار کاربردی است.
در بخش های قبل دیدیم که هر تاپیک به یک یا چند پارتیشن تقسیم میشود. یک نکته ای را در مورد پارتیشن ها در نظر بگیریم هم این هست که هر پارتیشن به حداکثر یک consumer میرسد. یعنی یک consumer میتواند از تعدادی partition بخواند ولی دو consumer مختلف نمیتوانند از یک partition پیغام بگیرند. اضافه کردن consumer ها برای افزایش بار پردازشی است، چون به هر پارتیشن نهایت یک consumer متصل میشود، بنابراین برای زیاد شدن بار پردازشی لازم است متناسب با افزایش تعداد پارتیشن ها، تعداد consumer ها را نیز زیاد کرد.
پیغام های تولید شده برای هر تاپیک، براساس انتخاب بروکر به partition ای داخل آن تاپیک وارد میشود. نحوه این انتخاب پارتیشن ها، با الگوریتم های قابل تغییر انجام میشود. برای مثال نحوه نوشتن پیغام بر روی پارتیشن ها به صورت دیفالت، الگوریتم round robin است. کافکا key پیغام ها را hash کرده و بعد با استفاده از این hash و الگوریتم خود، شماره پارتیشن برای ذخیره سازی داده ها را به دست می آورد. کلیدهایی که یکسان هستند چون hash آنها ثابت است همگی در یک پارتیشن ذخیره سازی میشوند. بتابراین اگر order خواندن پیغام توسط consumer برای ما اهمیت داشته باشد، لازم است برای پیغام های تولیدی key یکسانی قرار دهیم.
در کافکا مفهومی به اسم consumer group وجود دارد که consumer هایی که میخواهند از یک topic مشترکا خوانده و پردازش کنند، با هم تشکیل یک consumer group را میدهند. بیشتر برای اینکه توان پردازشی پیام ها بالا برود این consumer group تشکیل میشود. نحوه هماهنگی پردازش پیغام های تاپیک، تحت عنوان consumer group انجام میشود. تشکیل consumer group با دادن id یکسان برای گروه به consumer ها تشکیل میشود. ایجاد consumer group برای افزایش بار پردازشی پیام ها ضروری است ولی در صورتی که یک consumer از دست برود و یا consumer دیگری بنابر نیاز اضافه شود لازم است به نحوی کافکا بداند که تا الان چه پیغام هایی توسط consumer ها خوانده شده است که با اضافه و کم شدن consumer ها پیغام تکراری ارسال نکند. این کار برای کافکا با استفاده از متغیری به نام offset commit انجام میشود که در ادامه نحوه ی کار آن را میبینیم.
با خواندن پیام ها از یک تاپیک توسط consumer، باعث میشود که offset هایی که توسط هر consumer خوانده شده است به کافکا از consumer ارسال گردد. کافکا به دلیل اینکه پیغامی را چند باره به consumer ارسال نکند و یا در صورت از دست رفتن consumer و اضافه شدن consumer جدید بتوان پردازش از سر گرفته شود این آفست ها را نگهداری میکند که با نام offset commit شناسایی میشود. کافکا اینها را در یک تاپیک سیستمی به نام __consumer_offsets برای هر consumer ای ذخیره میکند (برای consumer های داخل consumer group اینها برای گروه نگهداری میشود.) این ارسال آفست خوانده شده از سمت consumer تحت درخواستی به اسم offsetCommitRequest انجام میشود. بنابراین acknowledge در بحث consumer ها با استفاده از همین consumer offset است.
گاهی ممکن است بین این offset کامیت شده با آخرین پیام خوانده شده فاصله ای باشد چون این ارسال committed offset برای افزایش throughput به صورت asynch است و بسته به پیاده سازی library کافکا ممکن است مثلا بعد از خواندن یک batch کامل مقدار last commited offset به روز رسانی شود.
در هر پارتیشن دو تا offset دیگه هم هست به نام های log end offset که نشان میدهد offset آخرین پیامی که produce شده چیست و high watermark به آفست ای اشاره میکند که replicate شده و در اینجا نیز یک تاخیری بین پیغام های produce شده و high watermark وجود دارد.
یک سری key ها در کافکا وجود دارد که KPI هستند یعنی برای بررسی performance سیستم streaming بکار می روند. Consumer-Lag یکی از اینها است که تفاضل زمانی آخرین داده ای که خوانده شده و آخرین داده ای که produce شده است، میباشد.
Lag = Last commited Offset – Last Produced Message
این lag باید در کمترین مقدار نگهداری شود و برای مانیتور کردن سیستم لازم است مقدار این lag بررسی گردد. این lag در تاپیک سیستمی __consumer_offset توسط کافکا ذخیره میشود. این lag لازم است هواره کمینه باشد زیرا پیغام ها در کافکا based on retention هستند یعنی نهایتا تا یه بازه زمانی در کافکا ذخیره میشوند. (از دست نمیرن ولی فقط در آن بازه ی زمانی که قابل تغییر است ذخیره میشوند.)
کافکا را به چند طریق میتوان مانیتور کرد، یکی از ابزارها Burrow است که در کنار Lag تایم، نرخ produce پیغام و نرخ consume کردن پیغام را میتوان با آن مانیتور کرد و اگر تاخیر داریم متوجه بشیم کدام قسمت از سامانه مشکل دارد. burrow میتواند نمودارها را time-base نیز نشان دهد که lag را در زمان های مختلف نشان میدهد. نمودارهای partition-base نیز داریم که گاهی میتوان علت مشکل را ناشی از تنها یک پارتیشن باشد را تشخیص داد. یکی دیگر از مقادیری که میتوان مانیتور کرد lag به نرخ تولید پیام است که در ارزیابی ها بسیار مهم است.
در بحث consume پیام ها، مفهوم over consumption و under consumption داریم. این مفهوم به این اشاره میکند که گاهی پیش می آید که ما یک پیام را اصلا consume نکنیم ولی بعضی پیام ها را چندین بار consume کنیم. فرض کنیم consumer از دست میرود و commit offset ها دیر اتفاق افتاده باشد در این حالت با برگشت مجدد consumer، پیام های تکراری ارسال میگردد که over consumption است. Under consumption زمانی اتفاق می افتد که offset از دست برود برای مثال consumer تنظیم کند پیغام latest همواره ارسال گردد که در این حالت با از دست رفتن کوتاه مدت consumer یک سری پیغام ها پردازش نمیشود یا consumer دیگری اضافه شود و offset را reset کند.
در این پست به موارد و جزییات بیشتری از نحوه مصرف شدن داده ها توسط consumer ها و نیازمندی های سخت افزاری و نرم افزاری اشاره کردیم و برخی کانفیگ های مهم رو بررسی کردیم.
از اینکه وقت خود را در اختیار من قرار دادین بسیار سپاس گذارم. امیدوارم این مطالب برای شما موثر و مفید باشد. من تلاش کردم مطلب رو به ساده ترین حالت بیان کنم. در صورتی که پیشنهاد یا انتقادی در مورد نحوه ی نگارش و مطالب دارین خوشحال میشم با من در میان بذارین و هر سوال و ابهامی در مورد کافکا داشتین رو در کامنت ها بذارین که بتونم کمک کنم.