
ماجرا از اونجا شروع شد که یکی از سرویسهامون که مسئول پردازش رویدادهای یک تاپیک Kafka بود، دچار Lag شدید شد. با بررسی وضعیت Consumer Group مربوطه، متوجه شدیم که مصرفکنندهها در وضعیت "PREPARING_REBALANCE" گیر کردهان و در یک حلقه بیانتها از ریبالانس گیر افتادن. همین موضوع باعث میشد که lag لحظهبهلحظه بدتر بشه و سرویس عملاً از کار بیفته.
لگ در پردازش رویدادها، شرایطی بود که قبلاً هم باهاش روبرو شده بودیم، اما این بار علاوه بر کندی در پردازش، مصرفکنندهها (Consumers) در یک حلقه بیپایان از Rebalance گیر کرده بودند و در وضعیت "PREPARING_REBALANCE" باقی میموندند. این مشکل باعث تشدید لگ و بدتر شدن وضعیت سرویس شده بود.
خب حالا برای اینکه بتونیم بریم سراغ مشکل و بیاین باهم دیگه گام به گام پیش بریم.
و فرض میگیریم که شما با مفاهیم اولیه کافکا مثل Brokers , Topic , Partition , Consumer group آشنا هستید.
اول بریم ببینیم این Rebalance کردن در Kafka چه معنی داره؟ و چه زمانی اتفاق میوفته؟ و چه ساید افکتی هایی داره؟
Rebalance یک فرایند داخلی در کافکاست که طی آن پارتیشنهای یک تاپیک، بین کانسیومرهای فعال در یک گروه، مجدداً توزیع میشوند. این فرایند برای حفظ توازن و اطمینان از پردازش مستمر پیامها ضروری است و معمولاً در شرایط زیر رخ میدهد و ما برای پیدا کردن علت مشکل این دلایل رو بررسی کردیم:
تغییر تعداد کانسیومرها: شاید یک کانسیومر به گروه اضافه یا ازش کم شده باشه. ولی چون ما هیچ Scale Up یا Scale Down نداشتیم، این مورد نمیتونه باشه.
از کار افتادن اپلیکیشن: ممکنه اپلیکیشن کرش کرده باشه یا داون شده باشه. با بررسی وضعیت پادها، مطمئن شدیم که اپلیکیشن در حال اجراست و داره کار میکنه.
تغییر تعداد پارتیشنهای تاپیک: این هم میتونه باعث ریبالانس بشه، ولی تو اون بازه زمانی هیچ تغییری توی پارتیشنها نداشتیم.
خب وقتی دلایل بالا رد شدن برای پیدا کردن سرنخ بهتری سراغ لاگ های اپلیکیشن رفتیم که با با این خط مهم مواجه شدیم:
consumer poll timeout has expired.
و این شد سرنخ اصلی ما برای حل معمامون. اما این خطا به چه معناست و چرا باعث Rebalance میشه؟ برای پاسخ به این سوال، لازمه که یک قدمی به عقب بریم و ببینیم کافکا چطور وضعیت سلامت کانسیومرها رو بررسی میکنه.
کافکا برای اینکه بفهمه کدوم کانسیومرها هنوز فعالن و درست کار میکنن، دو مکانیزم اصلی داره که هر کدوم توی یک ترد جدا اجرا میشن:
1. Heartbeat Thread
2. Poll Thread (Proccesing Thread)
- خب اولی برای این طراحی شده که بفهمه کانسیومر واقعاً زنده است یا نه.
کانسیومرها توی بازههای زمانی ثابت، یک سیگنال heartbeat برای Consumer Coordinator میفرستن که این بازه زمانی هم با پارامتر زیر قابل تنظیمه:
heartbeat.interval.ms
حالا اگر Coordinator برای یک مدت طولانی این سیگنالها رو دریافت نکنه (مثلاً اپلیکیشن Shut down شده باشه یا ارتباط شبکه قطع شده باشه)، بعد از گذشت یک زمان مشخص، کانسیومر رو “dead” حساب میکنه.
این مدت زمان هم با پارامتر زیر تعیین میشه:
session.timeout.ms
وقتی این زمان تموم بشه، Group Coordinator فرایند Rebalance رو شروع میکنه و پارتیشنهای مربوط به کانسیومر از کار افتاده رو به کانسیومرهای دیگه اساین میکنه.
این دقیقاً همون سناریویی بود که اول کار، احتمال دادیم دلیل مشکل ما باشه.
پس ترد Heartbeat در اصل یک جور سیستم مانیتورینگ داخلیه که اگر کانسیومر داون بشه یا مشکل ارتباطی پیدا کنه، سریع متوجه بشیم.
- خب حالا میرسیم به ترد بعدی که اتفاقا به مشکل ما مربوط میشه یعنی poll thread
توی کافکا کانسیومر ها برای اینکه مسیج ها را از بروکر دریافت کنند از روش long polling استفاده میکنند.
کانسیومر ها به صورت دوره ای عملیات poll انجام میدن ؛ یعنی به زبان ساده، میرن یک سرشماری از بروکرها میکنن تا ببینن پیام جدیدی رسیده یا نه و بعد اون پیامها رو پردازش میکنن. این کار با صدا زدن متد ()poll انجام میشه.
حالا نکته اینجاست که اگه فاصله زمانی بین دو بار صدا زدن این متد بیشتر از یک حد مشخص بشه، کافکا فرض میکنه کانسیومرمون از گروه خارج شده و در نتیجه همون داستان قبلی اتفاق میفته؛ disconnect و در نهایت rebalance.
برای این ترد دو تا کانفیگ مهم داریم:
max.poll.interval.ms: این مربوط به همون حداکثر زمانی میشه که کانسیومر فرصت داره تا poll بعدیش رو انجام بده که به صورت پیش فرض هم روی ۵ دقیقه تنظیم شده.
یعنی مثلا اگه الان poll انجام شده کانسیومر تا حداکثر ۵ دقیقه بعد فرصت داره poll رو بازم صدا بزنه وگرنه disconnect میشه.
max.poll.records: این یکی که میتونه خیلی مهم باشه; تعداد رکوردهایی که توی هر بار صدا زدن متد poll دریافت میکنیم رو کنترل میکنه, یعنی در واقع حداکثر رکوردی که توی هر poll میتونیم پردازش کنیم.
مقدار پیش فرض این پارامتر هم ۵۰۰ رکورد هست.
حالا که میدونیم این دو تا پارامتر چه کاری انجام میدن، باید مقادیرشون رو جوری تنظیم کنیم که با سرعت پردازش اپلیکیشنمون و حجم پیامها (Throughput) هماهنگ باشه.
پس Poll Thread همون جاییه که اگر پردازش دادهها کند بشه یا گیر کنه، سریع میشه متوجه شد.
حالا با توجه به همه نکاتی که تا اینجا فهمیدیم، میتونیم برگردیم به مشکل خودمون.
ما در آخر دیدیم که به ارور poll timeout has expired خورده بودیم و حالا با توجه به دانسته های الانمون یعنی کانسیومر نتونسته()pollبعدی رو به موقع اجرا کنه و بیشتر از ۵ دقیقه درگیر پردازش پیامهایpoll قبلی بوده.
در نتیجه، مشکل اصلی مربوط به کندی پردازش پیامهاست. مثلا در سناریوی ما، یک سرویس خارجی خیلی دیر جواب میداد و همین باعث میشد پردازش طولانی بشه و کانسیومر نتونه به موقع poll() بعدی رو صدا بزنه.
برای حل مشکل، اول باید خود سرویس رو بررسی کنیم و ببینیم آیا جایی از پردازش رو میشه بهترش کرد. مثلا میتونیم از resiliency patterns استفاده کنیم تا مطمئن بشیم که سرویس دیگهای باعث گیر کردن پردازش پیامها نمیشه، یا مثلا بعضی کارها رو به صورت async اجرا کنیم.
بعد از ریفکتور کد، میتونیم سراغ تیون کردن کانفیگهای poll بریم. این کانفیگها بسته به اپلیکیشن، نرخ پیامها، میانگین زمان پردازش هر پیام، تعداد کانسیومرها و پارتیشنهای تاپیک میتونن متفاوت باشن.
مثلا اگه پردازش هر پیام طولانیه، باید یا زمان بین pollها رو بیشتر کنیم یا تعداد رکوردهایی که هر بار poll میکنیم رو کمتر کنیم. فقط یه نکته هست: اگه تعداد رکوردها خیلی کم باشه، لود شبکه برای گرفتن پیامها بیشتر میشه. در واقع این کانفیگ کمک میکنه که پیامها رو به صورت دستهای بگیریم و network overhead کمتر بشه.
https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms
https://medium.com/trendyol-tech/rebalance-and-partition-assignment-strategies-for-kafka-consumers-f50573e49609