مهرداد
خواندن ۷ دقیقه·۳ ماه پیش

توسعه میکروسرویس های event driven با ksqlDB

معماری event driven معماری ایه که توش میکروسرویس ها به مجموعه‌ای خاص از رویدادها (events) واکنش نشان می دن. تو ساخت میکروسرویس‌ها، رویدادها اطلاع‌رسانی‌هایی هستن که تغییرات وضعیت توی سیستم شما را منعکس می‌کنن. تو این معماری، یک میکروسرویس عملیاتی را انجام می‌ده که یک رویداد تولید می‌کنه، در حالی که میکروسرویس دیگری برای آن رویداد گوش می‌ده و مطابق آن عملیاتی را انجام می‌ده. بنابراین، اصطلاح "مبتنی بر رویداد" یا event driven از همین مفهوم سرچشمه می‌گیره، جایی که اجزای سیستم در واکنش به رویدادها عمل می‌کنن.

به عنوان مثال تصور کنید که تو یه شرکت خدمات مالی کار می‌کنید که هر روز تعداد زیادی از تراکنش‌های کارت اعتباری تسویه می‌کنه. شما می‌خواید از فعالیت‌های مخرب مشتریان خود جلوگیری کنید. وقتی که تعداد زیادی تراکنش در یک بازه زمانی کوتاه اتفاق می‌افته، شما می‌خواهید از طریق یک اعلان به صاحب کارت در مورد فعالیت مشکوک اطلاع بدید. توی این مثال هر تراکنش یه event هستش و ما باید اپلیکیشنی طراحی کنیم که با پراسس event ها، به اونا پاسخ صحیحی بده.

یک روش رایج برای پیاده‌سازی این معماری اینه که event ها رو به کافکا ارسال کنیم و میکروسرویسی بنویسیم که رویداد های مشکوک رو تشخیص می ده و یه ایمیل امنیتی به صاحبان حساب می فرسته. شکل پایین معماری کلی این اپلیکیشن رو نشون می ده.

در اینجا میکروسرویسی که نوشتیم باید اصطلاحا state هر کارت رو ذخیره کنه، یعنی باید بدونه که هر کارت توی ۳۰ ثانیه اخیر چند تا تراکنش داشته. بعلاوه میکروسرویس ما وظیفه ارسال کردن ایمیل رو هم بر عهده داره. می دونیم که مقایس کردن اپلیکیشن های stateful کار چالشی ای هستش. از طرفی اگر تغییری در منطق ارسال ایمیل بدیم باید کل میکروسرویس، اعم از بخشی که وظیفه پروسس event ها و ذخیره state رو داره، آپدیت شه. آیا می تونیم بهتر عمل کنیم ؟‌ جواب بله هست، اما قبل از اینکه راه حلمون رو شرح بدیم بهتره یکم راجب کافکا صحبت کنیم.

کافکا

کافکا یه پلتفرم متن‌بازه که برای پردازش و مدیریت جریان‌های داده در مقیاس بالا طراحی شده. تو کافکا، داده‌ها به صورت تاپیک سازماندهی می‌شن. تایپک ها اطلاعات مربوط به یه موضوع خاص رو نگه می‌دارن. منتشرین (publishers) پیام‌ها رو به تاپیک ها مشخص می‌فرستن و مشترکین (subscriber) این پیام‌ها رو از همون تاپیک ها دریافت می‌کنن. این مدل انتشار-اشتراک، publisher-subscriber، باعث می‌شه سیستم‌ها بتونن به‌طور غیرهمزمان (asynchronous) با هم ارتباط داشته باشن. به عبارتی، publisher ها نیازی ندارن بدونن کدوم subscriber ها پیام‌ها رو می‌گیره، و subscriber ها هم نیازی ندارن نگران publisher ها باشن. این ساختار باعث انعطاف‌پذیری بالای کافکا می‌شه و به میکروسرویس‌ها اجازه می‌ده به راحتی داده‌ها رو تبادل کنن و بدون وابستگی به هم کار کنن. علاوه بر این، کافکا می‌تونه داده‌ها رو به صورت توزیع‌شده ذخیره کنه و از خطاهای احتمالی جلوگیری کنه، که این ویژگی‌ها اونو برای سیستم‌های بزرگ و پیچیده ایده‌آل کرده.

کافکا استریمز یه کتابخونه جاواست که برای پردازش جریان‌های داده ها در کافکا طراحی شده. با استفاده از کافکا استریمز می‌تونید داده‌هایی که از تاپیک‌ها می‌خوانید رو پردازش و تحلیل کنید، فیلتر (filter) کنید، یا حتی ترکیب (join) و گروهبندی (aggregate) کنید. این کتابخونه به شما این امکان رو می‌ده که بدون نیاز به سرویس‌های جداگانه پردازش جریان‌های پیچیده رو درون خود کافکا انجام بدید. همچنین کافکا استریمز می‌تونه وضعیت پردازش‌ها رو در یک پایگاه داده محلی ذخیره کنه تا بتونید پردازش‌ها رو در صورت نیاز از سر بگیرید.

ksqlDB یه موتور SQL هست که روی API کافکا استریمز ساخته شده و کوئری‌های SQL رو به دستورات قابل اجرا در API کافکا استریمز تبدیل می‌کنه. این قابلیت باعث می‌شه که کاربران بتونن مستقیماً با استفاده از SQL روی استیریم ها کار کنن، بدون اینکه نیازی به استفاده از زبان‌های برنامه‌نویسی مثل جاوا یا اسکالا باشه.

طراحی معماری جدید با استفاده از ksqlDB

ما میخوایم وظیفه پروسس کردن event ها و ذخیره state رو به ksqlDB واگذار کنیم و تنها میکروسرویسی بنویسیم که با دریافت event های مشکوک، به صاحبان حساب ایمیل ارسال می کنه. شکل زیر شمای کلی معماری رو نشون میده.

قبل از ادامه کار، باید استک کافکا و سرور ksqlDB رو با استفاده از داکر بالا بیارید. پیشنهاد می کنم برای اینکار به داکیومنت های confluent یه سری بزنید. confluent ارائه دهنده پلتفرم data streaming هستش. به بیان ساده، کافکا رو به عنوان سرویس به مشتری ارائه میدن تا شما نیاز نداشته باشید اون رو نصب یا نگه داری کنید.

ساخت استریم تراکنش ها

با اجرای کامند بعدی می تونید با استفاده از CLI به سرور ksqlDB وصل شید.

docker exec -it ksqldb-cli ksql <KSQL_SERVER_ADDRESS>

کامند پایین رو حتما اجرا کنید تا از اولین پیام در استریم پیام ها رو بخونید.

SET 'auto.offset.reset' = 'earliest';

حالا باید استریمی برای تراکنش ها بسازیم. هر تراکنش شماره کارت، آدرس ایمیل،‌ و میزان تراکنش رو داره. می تونیم برای ساخت استریم تراکنش ها کامند SQL پایین رو وارد CLI کنیم.

CREATE STREAM transactions ( tx_id VARCHAR KEY, email_address VARCHAR, card_number VARCHAR, timestamp VARCHAR, amount DECIMAL(12, 2) ) WITH ( kafka_topic = 'transactions', partitions = 8, value_format = 'avro', timestamp = 'timestamp', timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss' );

در ادامه کامند زیر رو داخل ksql CLI وارد کنید تا تعداد تراکنش غیرواقعی داخل استریممون داشته باشیم.

INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'michael@example.com', '358579699410099', 'f88c5ebb-699c-4a7b-b544-45b30681cc39', '2020-04-22T03:19:58', 50.25 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'derek@example.com', '352642227248344', '0cf100ca-993c-427f-9ea5-e892ef350363', '2020-04-22T12:50:30', 18.97 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'colin@example.com', '373913272311617', 'de9831c0-7cf1-4ebf-881d-0415edec0d6b', '2020-04-22T09:45:15', 12.50 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'michael@example.com', '358579699410099', '044530c0-b15d-4648-8f05-940acc321eb7', '2020-04-22T03:19:54', 103.43 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'derek@example.com', '352642227248344', '5d916e65-1af3-4142-9fd3-302dd55c512f', '2020-04-22T12:50:25', 3200.80 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'derek@example.com', '352642227248344', 'd7d47fdb-75e9-46c0-93f6-d42ff1432eea', '2020-04-22T12:51:55', 154.32 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'michael@example.com', '358579699410099', 'c5719d20-8d4a-47d4-8cd3-52ed784c89dc', '2020-04-22T03:19:32', 78.73 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'colin@example.com', '373913272311617', '2360d53e-3fad-4e9a-b306-b166b7ca4f64', '2020-04-22T09:45:35', 234.65 ); INSERT INTO transactions ( email_address, card_number, tx_id, timestamp, amount ) VALUES ( 'colin@example.com', '373913272311617', 'de9831c0-7cf1-4ebf-881d-0415edec0d6b', '2020-04-22T09:44:03', 150.00 );

ساخت جدول داده های غیرطبیعی (anomalous)

برای ساخت جدول کامند زیر رو وارد کنید

CREATE TABLE possible_anomalies WITH ( kafka_topic = 'possible_anomalies', VALUE_AVRO_SCHEMA_FULL_NAME = 'io.ksqldb.tutorial.PossibleAnomaly' ) AS SELECT card_number AS `card_number_key`, as_value(card_number) AS `card_number`, latest_by_offset(email_address) AS `email_address`, count(*) AS `n_attempts`, sum(amount) AS `total_amount`, collect_list(tx_id) AS `tx_ids`, WINDOWSTART as `start_boundary`, WINDOWEND as `end_boundary` FROM transactions WINDOW TUMBLING (SIZE 30 SECONDS, RETENTION 1000 DAYS) GROUP BY card_number HAVING count(*) >= 3 EMIT CHANGES;

این دستور به صورت زیر کار می‌کنه:

  • برای هر شماره کارت اعتباری، پنجره‌های ۳۰ ثانیه‌ای به‌طور متوالی ساخته می‌شه تا فعالیت‌ها رو گروه‌بندی کنه. وقتی حداقل ۳ تراکنش تو یه پنجره مشخص انجام بشه، یه ردیف جدید به جدول اضافه می‌شه.
  • هر ردیف برای ۱۰۰۰ روز آخر نگه داری می شه. به‌طور کلی، باید مدت‌زمان نگهداری رو با دقت انتخاب کنید. این یه تعادل بین ذخیره‌سازی داده‌ها برای مدت طولانی‌تر و مصرف فضای ذخیره سازیه.
  • شناسه‌های تراکنش و مبلغ‌ها که پنجره رو می‌سازن، به‌عنوان لیست جمع‌آوری می‌شن.
  • اسم مستعار ستون‌ها با backtick محاط شدن که به ksqlDB می‌گه باید دقیقاً از همون اسم استفاده کنه. به‌طور پیش‌فرض، ksqlDB نام‌ها رو به‌صورت بزرگ‌نویس(uppercase) می‌کنه.
  • تاپیک کافکا برای این جدول به possible_anomalies تنظیم شده.
  • اسکیما Avro که ksqlDB برای بخش مقدار رکوردها می‌سازه تو فضای نام io.ksqldb.tutorial.PossibleAnomaly ذخیره می‌شه.

با کامند پایین می تونید ببینید جدول anomaly ها چی تو خودش داره.

SELECT * FROM possible_anomalies EMIT CHANGES;

خروجی باید یه سطر با شماره کارت 358579699410099رو نشون بده.

کار ما تموم شده. تنها کاری که میکروسرویسمون باید انجام بده خوندن مسیج ها از استریم جدیده. در نهایت، توجه داشته باشید که ksqlDB هر بار که پنجره‌ی متوالی تغییر می‌کنه، یه رویداد جدید تولید می‌کنه. ksqlDB تغییرات جدید رو به‌طور مداوم مبتنی بر وضعیت منتشر میکنه. مثلاً، اگه یک anomaly تشخیص داده بشه چون سه تراکنش کارت اعتباری در یه بازه زمانی خاص پیدا شده باشه، یک رویداد از جدول منتشر می‌شه. اگه یه تراکنش چهارم تو همون بازه زمانی شناسایی بشه، یه رویداد دیگه منتشر می‌شه. چون SendGrid (در زمان نوشتن این مطلب) از ارسال ایمیل ایدمپوتنت (idempotent) پشتیبانی نمی‌کنه، باید یه قطعه کد کوچک تو برنامتون داشته باشید تا از ارسال ایمیل‌های تکراری برای همون دوره جلوگیری کنید.

شاید از این پست‌ها خوشتان بیاید