کانالهای رویداد مسیرهایی هستند که از طریق آنها رویدادها بین سرویسهای مختلف منتقل میشوند. میتوان این کانالها را مشابه خطوط لوله (Pipeline) یا صفهای پیام (Queue) در نظر گرفت که وظیفه انتقال رویدادها از یک سرویس تولیدکننده (Producer) به یک یا چند سرویس مصرفکننده (Consumer) را بر عهده دارند.
زمانی که یک Event Generator (مانند یک سرویس پرداخت یا ثبت سفارش) رویدادی را ایجاد میکند، این رویداد باید از طریق یک کانال مناسب به مقصد برسد. این کانال میتواند به صورت صف (Queue)، موضوع (Topic) یا جریان داده (Stream) پیادهسازی شود.
OrderQueue
برای پردازش سفارشها.OrderPlacedTopic
که هم توسط سرویس ارسال و هم سرویس فاکتورخوانی استفاده میشود.PaymentStream
که تاریخچه تراکنشها را نگهداری میکند.public class OrderPlacedHandler { private readonly IEventBus _eventBus; public OrderPlacedHandler(IEventBus eventBus) { _eventBus = eventBus; } public void HandleOrderPlaced(OrderPlacedEvent orderEvent) { Console.WriteLine($"Processing order {orderEvent.OrderId}"); _eventBus.PublishToQueue("OrderQueue", orderEvent); } }
2. تعریف موضوع برای اطلاعرسانی به چند سرویس
public class OrderNotifier { private readonly IEventBus _eventBus; public OrderNotifier(IEventBus eventBus) { _eventBus = eventBus; } public void NotifyServices(OrderPlacedEvent orderEvent) { _eventBus.PublishToTopic("OrderPlacedTopic", orderEvent); } }
چند مصرفکننده (Multiple Consumers): در مدل Topic، چندین سرویس میتوانند یک رویداد را دریافت کنند.
صف مرده (Dead Letter Queue - DLQ): رویدادهای پردازشنشده به یک صف جداگانه منتقل میشوند تا بعداً بررسی شوند.
حفظ ترتیب رویدادها (Event Ordering): در برخی سیستمها مانند Kafka، ترتیب رویدادها حفظ میشود.
PaymentProcessed
به سرویسهای حسابداری و انبارداری.ErrorOccurred
به سرویسهای گزارشگیری.زمانی که حجم رویدادها از حد معمول فراتر رود، سیستم با مشکلات متعددی مواجه میشود:
مشکل: عدم توانایی مصرفکنندهها در پردازش رویدادها با سرعت مناسب
راهکارها:
مثال پیادهسازی در RabbitMQ:
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
مشکل: توزیع ناعادلانه بار پردازش بین مصرفکنندهها
راهکارها:
مثال پیادهسازی در Kafka:
var config = new ConsumerConfig { BootstrapServers = "localhost:9092", GroupId = "order-processing-group", EnableAutoCommit = false }; using var consumer = new ConsumerBuilder<Ignore, string>(config).Build(); consumer.Subscribe("order-events"); var consumeResult = consumer.Consume(); Console.WriteLine($"Processing event: {consumeResult.Message.Value}"); consumer.Commit(consumeResult);
مشکل: تأخیر در پردازش رویدادهای حیاتی
راهکارها:
مثال در RabbitMQ:
var args = new Dictionary<string, object> { { "x-max-priority", 10 } }; channel.QueueDeclare("priority-queue", true, false, false, args);
مشکل: از دست رفتن پیامهای پردازش نشده
راهکارها:
مثال در RabbitMQ:
var args = new Dictionary<string, object> { { "x-dead-letter-exchange", "dlx-exchange" } }; channel.QueueDeclare("order-queue", true, false, false, args);
مشکل: ناکارآمدی در پردازش تکتک پیامها
راهکارها:
مثال در Kafka:
public void ProcessBatch(IEnumerable<ConsumeResult<Ignore, string>> messages) { foreach (var message in messages) { Console.WriteLine($"Processing batch message: {message.Value}"); } }
مشکل: چالش در پردازش بلادرنگ حجم بالای رویدادها
راهکارها:
مثال در Kafka Streams:
var builder = new StreamsBuilder(); var stream = builder.Stream<string, string>("payment-stream"); stream.GroupByKey() .WindowedBy(TimeWindows.Of(TimeSpan.FromSeconds(5))) .Reduce((value1, value2) => value1 + value2);