ویرگول
ورودثبت نام
بهرام انیژ
بهرام انیژمهندس نرم افزار
بهرام انیژ
بهرام انیژ
خواندن ۵ دقیقه·۲۴ روز پیش

سفرهای علمی: Rebalance ناخواسته در Kafka

ماجرا از اونجا شروع شد که یکی از سرویس‌هامون که مسئول پردازش رویدادهای یک تاپیک Kafka بود، دچار Lag شدید شد. با بررسی وضعیت Consumer Group مربوطه، متوجه شدیم که مصرف‌کننده‌ها در وضعیت "PREPARING_REBALANCE" گیر کرده‌ان و در یک حلقه بی‌انتها از ری‌بالانس گیر افتادن. همین موضوع باعث می‌شد که lag لحظه‌به‌لحظه بدتر بشه و سرویس عملاً از کار بیفته.

لگ در پردازش رویداد‌ها، شرایطی بود که قبلاً هم باهاش روبرو شده بودیم، اما این بار علاوه بر کندی در پردازش، مصرف‌کننده‌ها (Consumers) در یک حلقه بی‌پایان از Rebalance گیر کرده بودند و در وضعیت "PREPARING_REBALANCE" باقی می‌موندند. این مشکل باعث تشدید لگ و بدتر شدن وضعیت سرویس شده بود.

خب حالا برای اینکه بتونیم بریم سراغ مشکل و بیاین باهم دیگه گام به گام پیش بریم.

و فرض میگیریم که شما با مفاهیم اولیه کافکا مثل Brokers , Topic , Partition , Consumer group آشنا هستید.

اول بریم ببینیم این Rebalance کردن در Kafka چه معنی داره؟ و چه زمانی اتفاق میوفته؟ و چه ساید افکتی هایی داره؟


Rebalance یک فرایند داخلی در کافکاست که طی آن پارتیشن‌های یک تاپیک، بین کانسیومرهای فعال در یک گروه، مجدداً توزیع می‌شوند. این فرایند برای حفظ توازن و اطمینان از پردازش مستمر پیام‌ها ضروری است و معمولاً در شرایط زیر رخ می‌دهد و ما برای پیدا کردن علت مشکل این دلایل رو بررسی کردیم:

  • تغییر تعداد کانسیومرها: شاید یک کانسیومر به گروه اضافه یا ازش کم شده باشه. ولی چون ما هیچ Scale Up یا Scale Down نداشتیم، این مورد نمیتونه باشه.

  • از کار افتادن اپلیکیشن: ممکنه اپلیکیشن کرش کرده باشه یا داون شده باشه. با بررسی وضعیت پادها، مطمئن شدیم که اپلیکیشن در حال اجراست و داره کار میکنه.

  • تغییر تعداد پارتیشن‌های تاپیک: این هم می‌تونه باعث ری‌بالانس بشه، ولی تو اون بازه زمانی هیچ تغییری توی پارتیشن‌ها نداشتیم.


خب وقتی دلایل بالا رد شدن برای پیدا کردن سرنخ بهتری سراغ لاگ های اپلیکیشن رفتیم که با با این خط مهم مواجه شدیم:

consumer poll timeout has expired.

و این شد سرنخ اصلی ما برای حل معمامون. اما این خطا به چه معناست و چرا باعث Rebalance میشه؟ برای پاسخ به این سوال، لازمه که یک قدمی به عقب بریم و ببینیم کافکا چطور وضعیت سلامت کانسیومرها رو بررسی می‌کنه.


کافکا برای اینکه بفهمه کدوم کانسیومرها هنوز فعالن و درست کار می‌کنن، دو مکانیزم اصلی داره که هر کدوم توی یک ترد جدا اجرا می‌شن:
1. Heartbeat Thread
2. Poll Thread (Proccesing Thread)

- خب اولی برای این طراحی شده که بفهمه کانسیومر واقعاً زنده است یا نه.
کانسیومرها توی بازه‌های زمانی ثابت، یک سیگنال heartbeat برای Consumer Coordinator می‌فرستن که این بازه زمانی هم با پارامتر زیر قابل تنظیمه:

heartbeat.interval.ms

حالا اگر Coordinator برای یک مدت طولانی این سیگنال‌ها رو دریافت نکنه (مثلاً اپلیکیشن Shut down شده باشه یا ارتباط شبکه قطع شده باشه)، بعد از گذشت یک زمان مشخص، کانسیومر رو “dead” حساب می‌کنه.
این مدت زمان هم با پارامتر زیر تعیین میشه:

session.timeout.ms

وقتی این زمان تموم بشه، Group Coordinator فرایند Rebalance رو شروع می‌کنه و پارتیشن‌های مربوط به کانسیومر از کار افتاده رو به کانسیومرهای دیگه اساین می‌کنه.
این دقیقاً همون سناریویی بود که اول کار، احتمال دادیم دلیل مشکل ما باشه.

پس ترد Heartbeat در اصل یک جور سیستم مانیتورینگ داخلیه که اگر کانسیومر داون بشه یا مشکل ارتباطی پیدا کنه، سریع متوجه بشیم.

- خب حالا میرسیم به ترد بعدی که اتفاقا به مشکل ما مربوط میشه یعنی poll thread

توی کافکا کانسیومر ها برای اینکه مسیج ها را از بروکر دریافت کنند از روش long polling استفاده میکنند.


کانسیومر ها به صورت دوره ای عملیات poll انجام میدن ؛ یعنی به زبان ساده، میرن یک سرشماری از بروکرها می‌کنن تا ببینن پیام جدیدی رسیده یا نه و بعد اون پیام‌ها رو پردازش میکنن. این کار با صدا زدن متد ()poll انجام میشه.

حالا نکته اینجاست که اگه فاصله زمانی بین دو بار صدا زدن این متد بیشتر از یک حد مشخص بشه، کافکا فرض می‌کنه کانسیومرمون از گروه خارج شده و در نتیجه همون داستان قبلی اتفاق میفته؛ disconnect و در نهایت rebalance.

برای این ترد دو تا کانفیگ مهم داریم:
max.poll.interval.ms: این مربوط به همون حداکثر زمانی میشه که کانسیومر فرصت داره تا poll بعدیش رو انجام بده که به صورت پیش فرض هم روی ۵ دقیقه تنظیم شده.
یعنی مثلا اگه الان poll انجام شده کانسیومر تا حداکثر ۵ دقیقه بعد فرصت داره poll رو بازم صدا بزنه وگرنه disconnect میشه.

max.poll.records: این یکی که میتونه خیلی مهم باشه; تعداد رکوردهایی که توی هر بار صدا زدن متد poll دریافت میکنیم رو کنترل میکنه, یعنی در واقع حداکثر رکوردی که توی هر poll میتونیم پردازش کنیم.
مقدار پیش فرض این پارامتر هم ۵۰۰ رکورد هست.

حالا که می‌دونیم این دو تا پارامتر چه کاری انجام میدن، باید مقادیرشون رو جوری تنظیم کنیم که با سرعت پردازش اپلیکیشنمون و حجم پیام‌ها (Throughput) هماهنگ باشه.

پس Poll Thread همون جاییه که اگر پردازش داده‌ها کند بشه یا گیر کنه، سریع میشه متوجه شد.


حالا با توجه به همه نکاتی که تا اینجا فهمیدیم، می‌تونیم برگردیم به مشکل خودمون.

ما در آخر دیدیم که به ارور poll timeout has expired خورده بودیم و حالا با توجه به دانسته های الانمون یعنی کانسیومر نتونسته()pollبعدی رو به موقع اجرا کنه و بیشتر از ۵ دقیقه درگیر پردازش پیام‌هایpoll قبلی بوده.

در نتیجه، مشکل اصلی مربوط به کندی پردازش پیام‌هاست. مثلا در سناریوی ما، یک سرویس خارجی خیلی دیر جواب میداد و همین باعث میشد پردازش طولانی بشه و کانسیومر نتونه به موقع poll() بعدی رو صدا بزنه.


برای حل مشکل، اول باید خود سرویس رو بررسی کنیم و ببینیم آیا جایی از پردازش رو میشه بهترش کرد. مثلا می‌تونیم از resiliency patterns استفاده کنیم تا مطمئن بشیم که سرویس دیگه‌ای باعث گیر کردن پردازش پیام‌ها نمی‌شه، یا مثلا بعضی کارها رو به صورت async اجرا کنیم.

بعد از ریفکتور کد، می‌تونیم سراغ تیون کردن کانفیگ‌های poll بریم. این کانفیگ‌ها بسته به اپلیکیشن، نرخ پیام‌ها، میانگین زمان پردازش هر پیام، تعداد کانسیومرها و پارتیشن‌های تاپیک می‌تونن متفاوت باشن.

مثلا اگه پردازش هر پیام طولانیه، باید یا زمان بین pollها رو بیشتر کنیم یا تعداد رکوردهایی که هر بار poll می‌کنیم رو کمتر کنیم. فقط یه نکته هست: اگه تعداد رکوردها خیلی کم باشه، لود شبکه برای گرفتن پیام‌ها بیشتر می‌شه. در واقع این کانفیگ کمک می‌کنه که پیام‌ها رو به صورت دسته‌ای بگیریم و network overhead کمتر بشه.

منابع:

https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms
https://medium.com/trendyol-tech/rebalance-and-partition-assignment-strategies-for-kafka-consumers-f50573e49609

apache kafkamessage broker
۱
۰
بهرام انیژ
بهرام انیژ
مهندس نرم افزار
شاید از این پست‌ها خوشتان بیاید