
Apache Kafka یک Event-Streaming Platform اپن سورس و بسیار محبوب است که ترکیب Horizontal Scaling و Strong Ordering آن را به ابزار قدرتمندی برای ایجاد معماری های مدرن، Reactive و Event-Driven تبدیل کرده است. طی سال هایی که با کاربران Kafka کار کرده ام چالش اصلی که بیشتر اوقات به آن اشاره می کنند مربوط به Consumer Groupها و افزایش تعداد Consumerها است.
Consumer Groupها برای تحویل مرتب جریان رویدادها به یک گروه از Consumerها طراحی شده اند. در این روش Kafka بررسی می کند که Consumerها در یک گروه به چه Topicهایی مشترک شده اند و Partitionهای این تاپیک ها را میان اعضای گروه توزیع می کند. هر Consumer در یک گروه دسترسی انحصاری به پارتیشن هایی که به آن اختصاص داده شده دارد. یعنی اینکه حداقل باید به تعداد Consumerها، پارتیشن وجود داشته باشد. در نتیجه برخی از کاربران دست به Over-Partitioning می زنند تا بتوانند تعداد بیشتری Consumer را اضافه کنند و با Peak Load مقابله کنند. این کار برای افزایش Throughput و Ordering بسیار مفید می باشد، اما بسیاری از افراد همیشه به این سطح از تضمین های Kafka نیازی ندارند. خیلی ها فقط چون کافکا این قابلیت رو دارد از آن استفاده می کنند، حتی وقتی نیاز خاصی به آن ندارند!
اKIP-932 یک نوع جدید از گروه ها به نام Share Groupها را معرفی می کند. این گروه ها جایگزین Consumer Groupها نیستند، بلکه یک گزینه جدید هستند که به کاربران این قابلیت را می دهند تا بر اساس Consumption Behavior مورد نظرشان یکی از این دو روش را انتخاب کنند
در یک Share Group، Consumerها بهصورت Cooperative با یکدیگر کار می کنند، درست مانند چیزی که در Traditional Message Queueها دیده اید. شما می توانید Consumerها را طوری طراحی کنید که با استفاده از این گروهها، داده ها را از Topicها دریافت کنند. در این روش دیگر نیازی نیست نگران پارتیشن ها باشید. فقط کافی است تعداد Consumerها را افزایش دهید تا مقیاس پذیری بیشتر شود(البته به شکل منطقی). این کار دقیقا مانند حالتی است که در یک Message Queue چندین Consumer هم زمان از پیام ها استفاده می کنند.
در یک Share Group همه ی Consumerها به تمام پارتیشن ها دسترسی دارند، اما پیام ها بین آن ها تقسیم می شوند، یعنی هر پیام فقط توسط یکی از Consumerها پردازش می شود. این روش بسیار شبیه به چیزی است که در JMS بهعنوان Durable Shared Subscription می شناسیم.
Consumerها می توانند دریافت رکوردها را به صورت جداگانه تایید کنند، اما معمولا این کار را به صورت Batch انجام می دهند که باعث بهبود Efficiency می شود.
وقتی از Share Group استفاده می کنیم، رفتار سیستم خیلی شبیه به Message Queue خواهد بود. در یک Message Queue پیام ها همیشه در وضعیت های مختلفی قرار دارند، حتی اگر آن ها را به شکل مستقیم مشاهده نکنیم.
فرض کنید یک سیستم Message Queue داریم که از JMS API پشتیبانی می کند. وقتی یک پیام در صف قرار می گیرد، اول قابل دریافت هست. بعد از اینکه یکی از Consumerها آن را پردازش کرد، از نظر منطقی از Queue حذف می شود، اما هنوز به صورت فیزیکی حذف نشده و برای بقیه قابل مشاهده نمی باشد. وقتی Consumer دریافت پیام رو تایید کند، پیام به شکلی واقعی حذف می شود. پس در این فرآیند هر پیام چندین وضعیت مختلف رو تجربه می کند.
حالا Queues for Kafka هم تقریبا همین ایده را دارد. این ویژگی وضعیت رکوردهای در حال تحویل (In-Flight Records) را مدیریت می کند. در این مدل Share Groupها مسئولیت کنترل و تحویل این رکوردها را دارند، درست مانند چیزی که در یک Message Queue سنتی اتفاق می افتد.
هر Share-Partition یکسری رکورد In-Flight دارد که با دو مقدار Start Offset و End Offset مشخص می شوند. وقتی یک Consumer پیام ها را دریافت و پردازش می کند، Start Offset به جلو حرکت کرده و همزمان End Offset هم جلوتر می رود.
این فاصله بین Start Offset و End Offset همیشه یک حد مشخصی دارد تا تعداد پیام های در حال پردازش کنترل شود و Resource Management بهینه تر باشد. در واقع می توانیم این بخش را مثل یک Sliding Window در نظر بگیریم که همراه با پردازش داده ها حرکت می کند.
در حالت ایده آل این Sliding Window همیشه نزدیک به انتهای پارتیشن قرار دارد، یعنی Consumerها همگام با تولید داده های جدید پیام ها را پردازش می کنند و هیچ تاخیری در روند پردازش داده ها وجود ندارد.
وقتی یک Consumer در یک Share Group یک رکورد را دریافت می کند، آن رکورد به طور موقت در اختیارش قرار می گیرد. یعنی رکورد یک وضعیت جدید پیدا می کند که این وضعیت موقتی بوده و به طور پیش فرض ۳۰ ثانیه زمان می برد تا پردازش شده و تایید دریافتش ارسال شود.
اگر Consumer پیام را پردازش کرده و تایید بدهد، رکورد به وضعیت بعدی می رود و کارش تمام می شود. اما اگر Consumer کرش کند یا نتواند پیام را پردازش کند، آن رکورد دوباره در Queue قرار می گیرد و یک Consumer دیگر سعی می کند آن را پردازش کند. Kafka تعداد Retryها رو دنبال کرده و به طور پیش فرض هر رکورد 5 بار شانس پردازش دارد. اگر بعد از این تلاش ها باز هم پردازش نشود، آرشیو شده و دیگر تحویل داده نمی شود.
این مکانیزم باعث می شود که پیام های خراب باعث کرش کل سیستم نشوند و فقط یک اختلال موقتی ایجاد کنند.
چرا Kafka یک مفهوم جدا به اسم Queue معرفی نکرد؟
خب Kafka نمی خواست یک چیز جدید به اسم Queue را در کنار Topic اضافه کند. به همین خاطر در KIP-932 تصمیم گرفته شد که Share Groupها معرفی شوند. یعنی یک روش جدید برای مصرف رکوردها از Topicها، بدون اینکه نیازی به تغییر در Producerها باشد.
اگر دو Share Group به یک Topic متصل شوند، چی اتفاقی می افتد؟
در این حالت عملا دو Queue مستقل خواهیم داشت که هر کدام از آن ها رکوردهای خودشان رو مدیریت می کنند. یعنی هر گروه به صورت جداگانه پیام ها را پردازش می کند و با گروه دیگر تداخل نخواهد داشت.
آیا میتوان Consumer Groupها و Share Groupها را بر روی یک Topic استفاده کرد؟
بله، این کار قابل انجام است.
Share Groupها مدل Point-to-Point می باشند یا Publish/Subscribe؟
از نظر JMS این روش Publish/Subscribe محسوب می شود. اما اگر فقط یک Share Group به یک Topic متصل باشد و هیچ Consumer Group دیگری وجود نداشته باشد، از نظر رفتاری بیشتر شبیه به Point-to-Point عمل می کند.
آیا Share Groupها از Ordering پشتیبانی می کنند؟
خیر، پیام ها به صورت Unordered تحویل داده می شوند. البته Ordering بر اساس Key-Based Ordering می تواند در آینده یک تغییر کاربردی باشد، اما در KIP-932 وجود ندارد. احتمالا در آینده در یک KIP دیگر بررسی شود.
آیا Share Groupها از Exactly-Once Semantics پشتیبانی می کنند؟
در حال حاضر خیر. تحویل پیام ها به صورت At-Least-Once انجام می شود. البته پیاده سازی آن را در آینده ممکن می باشد.
آیا کاربران سیستم هایی مانند ActiveMQ می توانند به راحتی به Kafka مهاجرت کنند؟
نه به این سرعت! Kafka الان می تواند بسیاری از سناریوهایی که قبلا نیاز به یک Message Queue جداگانه داشتند رو پوشش دهد، اما هنوز باید یک Kafka Client Application برای مصرف پیام ها نوشت. اگر صرفا برای مدیریت Queueها یک MQ Broker در کنار Kafka داشتید، شاید دیگر نیازی به آن نداشته باشید، ولی این به معماری سیستم بستگی دارد.
KIP-932: https://cwiki. apache. org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
Andrew Schofield - Medium: https://lnkd. in/eVPfCpX4
KAFKA-16092: https://issues.apache. org/jira/browse/KAFKA-16092