معماری event driven معماری ایه که توش میکروسرویس ها به مجموعهای خاص از رویدادها (events) واکنش نشان می دن. تو ساخت میکروسرویسها، رویدادها اطلاعرسانیهایی هستن که تغییرات وضعیت توی سیستم شما را منعکس میکنن. تو این معماری، یک میکروسرویس عملیاتی را انجام میده که یک رویداد تولید میکنه، در حالی که میکروسرویس دیگری برای آن رویداد گوش میده و مطابق آن عملیاتی را انجام میده. بنابراین، اصطلاح "مبتنی بر رویداد" یا event driven از همین مفهوم سرچشمه میگیره، جایی که اجزای سیستم در واکنش به رویدادها عمل میکنن.
به عنوان مثال تصور کنید که تو یه شرکت خدمات مالی کار میکنید که هر روز تعداد زیادی از تراکنشهای کارت اعتباری تسویه میکنه. شما میخواید از فعالیتهای مخرب مشتریان خود جلوگیری کنید. وقتی که تعداد زیادی تراکنش در یک بازه زمانی کوتاه اتفاق میافته، شما میخواهید از طریق یک اعلان به صاحب کارت در مورد فعالیت مشکوک اطلاع بدید. توی این مثال هر تراکنش یه event هستش و ما باید اپلیکیشنی طراحی کنیم که با پراسس event ها، به اونا پاسخ صحیحی بده.
یک روش رایج برای پیادهسازی این معماری اینه که event ها رو به کافکا ارسال کنیم و میکروسرویسی بنویسیم که رویداد های مشکوک رو تشخیص می ده و یه ایمیل امنیتی به صاحبان حساب می فرسته. شکل پایین معماری کلی این اپلیکیشن رو نشون می ده.
در اینجا میکروسرویسی که نوشتیم باید اصطلاحا state هر کارت رو ذخیره کنه، یعنی باید بدونه که هر کارت توی ۳۰ ثانیه اخیر چند تا تراکنش داشته. بعلاوه میکروسرویس ما وظیفه ارسال کردن ایمیل رو هم بر عهده داره. می دونیم که مقایس کردن اپلیکیشن های stateful کار چالشی ای هستش. از طرفی اگر تغییری در منطق ارسال ایمیل بدیم باید کل میکروسرویس، اعم از بخشی که وظیفه پروسس event ها و ذخیره state رو داره، آپدیت شه. آیا می تونیم بهتر عمل کنیم ؟ جواب بله هست، اما قبل از اینکه راه حلمون رو شرح بدیم بهتره یکم راجب کافکا صحبت کنیم.
کافکا یه پلتفرم متنبازه که برای پردازش و مدیریت جریانهای داده در مقیاس بالا طراحی شده. تو کافکا، دادهها به صورت تاپیک سازماندهی میشن. تایپک ها اطلاعات مربوط به یه موضوع خاص رو نگه میدارن. منتشرین (publishers) پیامها رو به تاپیک ها مشخص میفرستن و مشترکین (subscriber) این پیامها رو از همون تاپیک ها دریافت میکنن. این مدل انتشار-اشتراک، publisher-subscriber، باعث میشه سیستمها بتونن بهطور غیرهمزمان (asynchronous) با هم ارتباط داشته باشن. به عبارتی، publisher ها نیازی ندارن بدونن کدوم subscriber ها پیامها رو میگیره، و subscriber ها هم نیازی ندارن نگران publisher ها باشن. این ساختار باعث انعطافپذیری بالای کافکا میشه و به میکروسرویسها اجازه میده به راحتی دادهها رو تبادل کنن و بدون وابستگی به هم کار کنن. علاوه بر این، کافکا میتونه دادهها رو به صورت توزیعشده ذخیره کنه و از خطاهای احتمالی جلوگیری کنه، که این ویژگیها اونو برای سیستمهای بزرگ و پیچیده ایدهآل کرده.
کافکا استریمز یه کتابخونه جاواست که برای پردازش جریانهای داده ها در کافکا طراحی شده. با استفاده از کافکا استریمز میتونید دادههایی که از تاپیکها میخوانید رو پردازش و تحلیل کنید، فیلتر (filter) کنید، یا حتی ترکیب (join) و گروهبندی (aggregate) کنید. این کتابخونه به شما این امکان رو میده که بدون نیاز به سرویسهای جداگانه پردازش جریانهای پیچیده رو درون خود کافکا انجام بدید. همچنین کافکا استریمز میتونه وضعیت پردازشها رو در یک پایگاه داده محلی ذخیره کنه تا بتونید پردازشها رو در صورت نیاز از سر بگیرید.
ksqlDB یه موتور SQL هست که روی API کافکا استریمز ساخته شده و کوئریهای SQL رو به دستورات قابل اجرا در API کافکا استریمز تبدیل میکنه. این قابلیت باعث میشه که کاربران بتونن مستقیماً با استفاده از SQL روی استیریم ها کار کنن، بدون اینکه نیازی به استفاده از زبانهای برنامهنویسی مثل جاوا یا اسکالا باشه.
ما میخوایم وظیفه پروسس کردن event ها و ذخیره state رو به ksqlDB واگذار کنیم و تنها میکروسرویسی بنویسیم که با دریافت event های مشکوک، به صاحبان حساب ایمیل ارسال می کنه. شکل زیر شمای کلی معماری رو نشون میده.
قبل از ادامه کار، باید استک کافکا و سرور ksqlDB رو با استفاده از داکر بالا بیارید. پیشنهاد می کنم برای اینکار به داکیومنت های confluent یه سری بزنید. confluent ارائه دهنده پلتفرم data streaming هستش. به بیان ساده، کافکا رو به عنوان سرویس به مشتری ارائه میدن تا شما نیاز نداشته باشید اون رو نصب یا نگه داری کنید.
با اجرای کامند بعدی می تونید با استفاده از CLI به سرور ksqlDB وصل شید.
docker exec -it ksqldb-cli ksql <KSQL_SERVER_ADDRESS>
کامند پایین رو حتما اجرا کنید تا از اولین پیام در استریم پیام ها رو بخونید.
SET 'auto.offset.reset' = 'earliest';
حالا باید استریمی برای تراکنش ها بسازیم. هر تراکنش شماره کارت، آدرس ایمیل، و میزان تراکنش رو داره. می تونیم برای ساخت استریم تراکنش ها کامند SQL پایین رو وارد CLI کنیم.
CREATE STREAM transactions ( tx_id VARCHAR KEY, email_address VARCHAR, card_number VARCHAR, timestamp VARCHAR, amount DECIMAL(12, 2) ) WITH ( kafka_topic = 'transactions', partitions = 8, value_format = 'avro', timestamp = 'timestamp', timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss' );
در ادامه کامند زیر رو داخل ksql CLI وارد کنید تا تعداد تراکنش غیرواقعی داخل استریممون داشته باشیم.
INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'michael@example.com', '358579699410099', 'f88c5ebb-699c-4a7b-b544-45b30681cc39', '2020-04-22T03:19:58', 50.25 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'derek@example.com', '352642227248344', '0cf100ca-993c-427f-9ea5-e892ef350363', '2020-04-22T12:50:30', 18.97 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'colin@example.com', '373913272311617', 'de9831c0-7cf1-4ebf-881d-0415edec0d6b', '2020-04-22T09:45:15', 12.50 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'michael@example.com', '358579699410099', '044530c0-b15d-4648-8f05-940acc321eb7', '2020-04-22T03:19:54', 103.43 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'derek@example.com', '352642227248344', '5d916e65-1af3-4142-9fd3-302dd55c512f', '2020-04-22T12:50:25', 3200.80 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'derek@example.com', '352642227248344', 'd7d47fdb-75e9-46c0-93f6-d42ff1432eea', '2020-04-22T12:51:55', 154.32 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'michael@example.com', '358579699410099', 'c5719d20-8d4a-47d4-8cd3-52ed784c89dc', '2020-04-22T03:19:32', 78.73 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'colin@example.com', '373913272311617', '2360d53e-3fad-4e9a-b306-b166b7ca4f64', '2020-04-22T09:45:35', 234.65 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'colin@example.com', '373913272311617', 'de9831c0-7cf1-4ebf-881d-0415edec0d6b', '2020-04-22T09:44:03', 150.00 );
برای ساخت جدول کامند زیر رو وارد کنید
CREATE TABLE possible_anomalies WITH ( kafka_topic = 'possible_anomalies', VALUE_AVRO_SCHEMA_FULL_NAME = 'io.ksqldb.tutorial.PossibleAnomaly' ) AS SELECT card_number AS `card_number_key`, as_value(card_number) AS `card_number`, latest_by_offset(email_address) AS `email_address`, count(*) AS `n_attempts`, sum(amount) AS `total_amount`, collect_list(tx_id) AS `tx_ids`, WINDOWSTART as `start_boundary`, WINDOWEND as `end_boundary` FROM transactions WINDOW TUMBLING (SIZE 30 SECONDS, RETENTION 1000 DAYS) GROUP BY card_number HAVING count(*) >= 3 EMIT CHANGES;
این دستور به صورت زیر کار میکنه:
با کامند پایین می تونید ببینید جدول anomaly ها چی تو خودش داره.
SELECT * FROM possible_anomalies EMIT CHANGES;
خروجی باید یه سطر با شماره کارت 358579699410099
رو نشون بده.
کار ما تموم شده. تنها کاری که میکروسرویسمون باید انجام بده خوندن مسیج ها از استریم جدیده. در نهایت، توجه داشته باشید که ksqlDB هر بار که پنجرهی متوالی تغییر میکنه، یه رویداد جدید تولید میکنه. ksqlDB تغییرات جدید رو بهطور مداوم مبتنی بر وضعیت منتشر میکنه. مثلاً، اگه یک anomaly تشخیص داده بشه چون سه تراکنش کارت اعتباری در یه بازه زمانی خاص پیدا شده باشه، یک رویداد از جدول منتشر میشه. اگه یه تراکنش چهارم تو همون بازه زمانی شناسایی بشه، یه رویداد دیگه منتشر میشه. چون SendGrid (در زمان نوشتن این مطلب) از ارسال ایمیل ایدمپوتنت (idempotent) پشتیبانی نمیکنه، باید یه قطعه کد کوچک تو برنامتون داشته باشید تا از ارسال ایمیلهای تکراری برای همون دوره جلوگیری کنید.