در بخش های اول و نصب، معماری کلی و نحوه ی نصب کافکا بیان شد. در این بخش دمویی از انتقال پیام بین consumer و producer ها و سپس مفاهیم عمیقتر و معماری جزیی تری از کافکا آمده است.
Kafka Client Library
در ابتدا کافکا تنها برای جاوا و scala قابل استفاده بود ولی اکنون اغلب زبان های محبوب مانند C/C++، Go، Java،، .NET، Python و Scala در کنار سایر زبان هایی مانند Erlang, Groovy, Haskell, Kotlin, Lua, Node.js, OCaml, PHP, Ruby, Rust, Tcl, Swift قابلیت استفاده از کافکا را دارند.
برای نوشتن دموی اولیه از لایبرری Kafka Client استفاده میکنیم. این لایبرری از سه قسمت تشکیل شده است. کارهای administrative مانند ایجاد تاپیک، مربوط به بخش Admin، تولید پیغام مربوط به Producer و مصرف پیغام مربوط به Consumer است.
برای بررسی دقیق ویژگی های موجود در کافکا میتوان به سایت که توسط سازنده های کافکا ایجاد شده است، مراجعه کرد.
برای ارسال و دریافت پیغام نیاز به Topic ای برای نگهداری پیام، Producer ای که پیغام را send کند و consumer ای که پبغام را poll(دریافت) کند، داریم.
برای این منظور دو/سه برنامه مینویسیم. من با استفاده از زبان جاوا ورژن 11 این برنامه ها را نوشته ام.
برای ساخت Topic میتوان با استفاده از CLI، Kafka tools و یا کد نویسی استفاده شود. تجربه من در شرکتم این هست که تاپیک ها با استفاده از kafka tools در هنگام عملیاتی شدن ساخته میشوند.
در همه حالتها نیز لازم است، آدرس بروکر را در غالب متغیر bootstrap.server داشته باشیم.
در صورت استفاده از CLI میتوان بعد از بالا بودن کافکا با استفاده از دستور زیر تاپیک مورد نظر را ساخت.
kafka-topics.sh --create --topic <topic-name> --partitions <num-partitions> --replication-factor <replication-factor> --zookeeper <zookeeper-host:port>
در اینجا <topic-names> نام تاپیک مورد نظر، <num-partitions> اندازه ی partition و سایر متغیر ها نیز به ترتیب تعداد replication مورد نیاز و آدرس zookeeper و port ای روی آن بالا آمده است، میباشد.
در صورت استفاده از kafka tools که به نظرم برای این کار بهترین ابزار میباشد، ابتدا kafka-tools را نصب کرده سپس لازم است یک connection جدید با استفاده از انتخاب Add new connection از منوی File ساخته شود. نحوه ساخت connection به صورت زیر است.
در اینجا لازم است آدرس zookeeper، پورت آن و نامی برای کلاستر انتخاب شود. بعد از اتصال به آن در قسمت topic میتوان تمامی تاپیک های ایجاد شده را مشاهده کرد و یا تاپیک جدیدی را ایجاد کرد.
در منوی سمت چپ از cluster اضافه شده، ابتدا لیست بروکر ها(در اینجا من براساس نصب اشاره کردم که تنها یک بروکر را بالا می آورم که آدرس و پورت آن آمده است) سپس لیست تاپیک ها( که در اینجا خالی است) آمده است که با right click بر روی Topics و انتخاب گزینه ی create topics پنجره نشان داده میشود(شکل بالا). نام topic، تعداد replication ها از آن و مقدار partition نیز قابل مقداردهی میباشد و با گزینه ی Add، تاپیک مورد نظر اضافه میشود.
حالت سومی نیز برای ایجاد تاپیک وجود دارد که با استفاده از کدنویسی است. واقعیت من چون تجربه کمی در کافکا دارم تا کنون ندیده ام با کدنویسی تاپیک نیز ساخته شود در این صورت کد زیر قابل استفاده است. ابتدا لازم است Admin کانفیگ شود که برای اینکار آدرس بروکر را قرار میدهیم . سپس با استفاده از متد createTopic از admin یک تاپیک ایجاد میکنیم. برای ساخت تاپیک فعلا نیاز به نام آن تنها داریم و بقیه ی موارد آن را خالی قرار میدهیم. من در اینجا یک تاپیک با نام "my_topic1" ایجاد کردم. چون کافکا را بر روی سیستم خود تنها اجرا کرده ام آدرس بروکر نیز localhost است که بر روی پورت 9092 قابل استفاده است. کد این قسمت در زیر آمده است.
public static void main(String[] args) { Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092")); admin.createTopics(Arrays.asList(new NewTopic("my_topic1", Optional.empty(), Optional.empty()))); }
تا الان ما تاپیک را ایجاد کردیم در مرحله ی بعد لازمه پیغام را produce کرده به topic ایجاد شده ارسال کنیم. در اینجا کدهای لازم producer و consumer با استفاده از جاوا آمده است ولی در صورتی که بر بستر اسپرینگ کار میکنین spring kafka بسیار ساده تر است که در این نوشتار به اون نمیپردازیم.
پیغام با استفاده از ProducerRecord تولید میشه و از طریق KafkaProducer ارسال میشه. حالا برنامه ی دیگه ای مینویسیم که پیغام را تولید و ارسال کند. برای تولید پیغام نام تاپیک و محتوای پیغام کافی است. ایجاد KafkaProducer نیاز به آدرس بروکر، Serializer برای key و value دارد. پیغام ایجاد شده نیز توسط متد send از KafkaProducer به تاپیک ارسال میشود. در انتها نیز برای آنکه پیغام های buffer شده یکجا ارسال شود، آن را flush میکنیم. در ادامه دلیل وجود serializer ها و فلاش را میگم. با اجرای برنامه پیام "Hi hi" در تاپیک قرار میگیرد. کد این قسمت از برنامه در زیر آمده است.
public static void main(String[] args) { Properties prop = new Properties(); prop.put("bootstrap.servers", "localhost:9092"); prop.put("key.serializer", StringSerializer.class); prop.put("value.serializer", StringSerializer.class); KafkaProducer<String, String> kafkaProducer = new KafkaProducer(prop); ProducerRecord<String, String> producerRecord1 = new ProducerRecord<String, String>("my_topic1", "Hi hi");
kafkaProducer.send(producerRecord1); kafkaProducer.flush(); kafkaProducer.close(); }
تا اینجا تاپیک و تولید کننده ی پیغام را ساخته ایم. در ادامه برنامه ی دیگری مینویسیم که Consumer را ساخته و به تاپیک subscribe کند. ساخت Consumer با استفاده از کلاس KafkaConsumer انجام میشود که به آدرس بروکر، نام گروهی که به آن تعلق دارد و desrializer برای deserialize شدن key و value نیاز دارد و از طریق متد subscribe به تاپیک مورد نظر subscribe شده. Fetch کردن دیتا از تاپیک (یا پارتیشن خاصی در تاپیک) با استفاده از متد poll است. پیغام در غالب کلاس ConsumerRecord دریافت میشود. هر پیغام حاوی value یا مقدار پیغام، شماره ی پارتیشن(partition)، شماره آفست (offset) است. ما در این برنامه فقط میخواهیم همه پیغام های رسیده را چاپ کنیم. چون میخواهیم هر پیغامی که آمده باشد را بگیریم، انتظار دریافت پیغام را در یک loop قرار داده ایم، بنابراین کد زیر را اجرا کرده و منتظر ارسال پیغام میمانیم. سپس با اجرای برنامه ی تولید پیغام، پیغام ها را به تاپیک ارسال میکنیم و میبینیم که پیغام بلافاصله به دست consumer میرسد.
public static void main(String[] args) { String consumerGroup = "dispatch-service" Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", consumerGroup); props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); kafkaConsumer.subscribe(List.of("my_topic1")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll ( Duration.ofMillis ( 1000 )); for (ConsumerRecord<String, String> record : records) { System.out.printf("Message %s at offset %d of partition %d%n", record.value(), record.offset(), record.partition()); } } }
با اجرای برنامه ها به ترتیب تولید تاپیک، ساخت consumer و ساخت producer، خروجی زیر که نشان دهنده ی رسیدن پیغام به consumer است در کنسول consumer چاپ میشود.
کافکا ترتیب پیغام ها را در هر partition تضمین میکند. بنابراین اگر لازم است پیغامی به ترتیب به دست مصرف کننده برسد لازم است key آنها یکسان باشد.
در صورتی که بروکر بالا نباشد و producer پیغام را ارسال کند و ack از جانب بروکر به دست producer نرسیده باشد(در صورت فعال بودن)، producer فرض میکند پیغام به دست بروکر نرسیده یا خطایی پیش آمده، به همین دلیل به صورت اتوماتیک پیغام را مجدد ارسال میکند. این تلاش مجدد به تعداد retries انجام میشود که در غالب متغیری به نام "retries" قابل تنظیم است. در کنار این retries زمان کل ارسال پیغام از زمانی که با متد send ارسال شود تا ack به دست producer برسد یک Timeout دارد که آن نیز با استفاده از متغیر "delivery.timeout.ms" قابل تنظیم است. در صورتی که timeout.ms سپری شود فارغ از اینکه چه تعداد retries برای ارسال پیغام انجام شده، پیغام ارسال نشده فرض میشود.
برای مثال حالتی را در نظر بگیرید که retries = 5 و timeout =5ms باشد و زمان 5ms از send شدن پیغام گذشته باشد در حالی که تنها 2 بار تلاش مجدد برای ارسال پیغام صورت گرفته باشد و ack فعال باشد، پیغام ارسال نشده فرض میشود و خطا برمیگردد. به صورت دیفالت timeout= 2 minutes و retires=Integer.MAX_VALUE است. یعنی به صورت دیفالت تا زمانی که timeout نرسد، producer به هر تعداد مجدد برای ارسال پیغام تلاش میکند. Producer برای همه ی خطاها مجدد ارسال نمیکند برای مثال اگر serializer اشتباه ست شده باشد ارسال مجدد تاثیری ندارد و برای این خطا پیغام مجدد ارسال نمیشود.
در کافکا producer برای هر retry به اندازه ی retry.backoff.ms صبر کرده و سپس تلاش بعدی را انجام میدهد. این زمان جلوی ارسال مجدد و دائم در یک چرخه را میگیرد.
در کنار این مفهوم و متغیرهای زمانی، کافکا متغیر دیگری به نام "max.in.flight.requests.per.connection" نیز دارد. این متغیر به معنی حداکثر تعداد ارسال های unacknowledged (بدون دریافت ack برای آن) است. در صورتی که این مقدار از 1 بزرگتر باشد، به این معنی است که ترتیب قرار گیری پیغام ها در partitionها به همان ترتیب ارسال ممکن است نباشد. برای مثال حالتی را فرض کنید که این متغیر 2 باشد یعنی میتوان دو request در یک ارتباط با بروکر ارسال کرد و برای دومی محدودیتی برای اینکه اولی توسط بروکر ذخیره شده است را نداشت. حال producer این دو درخواست را ارسال میکند و پیغام های درخواست اول fail میشود ولی درخواست دوم در بروکر ذخیره میشود بنابراین پیغام های سری دوم قبل از پیغام های اول در بروکر ذخیره میشود و ترتیب ذخیره سازی پیغام ها به همان ترتیب ارسال نشده است.
در demo بالا، پیغام بعد از ارسال flush نیز شده.
kafkaProducer.send(producerRecord1);
kafkaProducer.flush();
برای علت اینکار لازمه flow ارسال پیغام توسط producer را با جزییات بیشتر بررسی کنیم.
کافکا چون میخواد سرعت انتقال بالایی داشته باشه، پیغام ها را به صورت batch و دسته ای ارسال میکنه. چون ارسال هر پیغام به تنهایی سربار زیادی داره. بنابراین به ازای پیغام های هر پارتیشن، آن ها را در بافری نگهداری میکنه و بعد همه را با هم به آن پارتیشن میفرسته.
در کنار serialize کردن پیغام ها، میتوان آنها را compress و حجم آنها را کاهش داد. برای اینکار لازم است الگوریتمی که برای compress به کار میرود را با استفاده از متغیر "compression.type" ست کرد. با ست شدن این متغیر، کافکا هر پیغامی که در buffer نگهداری میکند و هم batch ارسالی را compress میکند. کافکا اینکار را تنها برای سمت producer انجام میدهد و سمت consumer بی تغییر باقی میماند. کاهش حجم باعث افزایش بار بر روی cpu و زمان برای compress شدن دارد ولی چون سربار cpu برای تولید پیغام بالا نیست اغلب پیغام ها به صورت compress ارسال میشود.
در بخش قبل گفتیم consumer ها offset پیغام دریافتی را برای بروکر تاپیک ارسال میکنند. در ادامه جزییات بیشتری از نحوه ی تعامل consumer با کافکا آمده است.
اولین consumer ای که به کافکا متصل میشود، کافکا یک بروکر به آن اختصاص میدهد که اصطلاحا Group Coordinator برای consumer های یک گروه نامیده میشود. این coordinator لیست تمامی consumer های همگروه و متصل را نگه میدارد و با اضافه یا حذف شدن consumer ها این لیست را به روز میکند.
زمانی که یک consumer حذف میشود، این coordinator کار partition rebalance را انجام میدهد. برای partition rebalance ابتدا تمامی تخصیص های انجام شده برای پارتیشن ها از بین میرود.(به این partition revoke میگویند) سپس coordinator لیست consumerها را به یکی از consumer ها به نام consumer g leader میفرستند. (Consumer leader هم اولین consumer ای هست که گروه را ایجاد کرده)
این consumer مسئول اختصاص partition به consumerهای همگروه است و بعد لیست این partitionهای تخصیص یافته را برای coordinator باز میگرداند.
این coordinator نیز براساس لیست بازگشته، partitionها را تخصیص میدهد.
در مورد rebalance شدن لازمه به این نکات توجه بشه:
هر خطایی که باعث fail شدن اپلیکیشن مصرف کننده شود لازم است close روی آن صدا زده شود تا coordinator سریعتر partition rebalance را انجام دهد. گاهی این مشکل خطای فیزیکی است و سرور نتوانسته به کافکا خبر دهد. در این حالت coordinator باید به نحوی این را متوجه شود. روش کافکا برای اینکار heart beat است. consumer ها هر چند مدت یکبار یک پیغامی مبنی بر سالم بودن به coordinator ارسال کند.
زمان بین ارسال ها توسط متغیری به نام "heartbeat.interval.ms" قابل تنظیم است. در کنار این interval ها مدت زمانی نیز لازم است به عنوان timeout لحاظ شود که بعد از عدم ارسال heartbeat، کافکا consumer را از دست رفته فرض کند. این مدت زمان نیز توسط متغیر "session.timeout.ms" تعیین میشود. بنابراین heartbeat.interval لازم است کوچکتر از session.timeout باشد وگرنه coordinator آن را حذف کرده و partition rebalance انجام میدهد.
در این پست دموی کوچکی از نحوه ی ارسال و دریافت پیغام ها آمد. نحوه ی کارکرد کافکا برای ارسال و دریافت ها با جزییات بیشتر بیان شد. کافکا سه خصیصه برای خود بیان کرده بود که شامل fault tolerance، high availabilibty و consistency است. بعضی از این خصیصه ها با این موارد تا کنون گفته شده پوشش داده نمیشن برای مثال در صورتی که بروکر fail شود این موارد نقض میشه. در پست سوم ویژگی های دیگه ای از کافکا بررسی میشه که در صورت بروز مشکل در بروکر باز هم ویژگی های اصلی کافکا حفظ بشن.