محمد حسینی راد
محمد حسینی راد
خواندن ۴ دقیقه·۳ سال پیش

افزایش پرفورمنس سیستم با دسته دسته کردن درخواست ها

توی این مقاله در مورد batch select ها صحبت کردیم. اما واقعیت اینه نیاز سیستم ما فقط select ها نیستن که در بعضی شرایط نیاز به batch شدن دارن. تاثیر این روش در insert ها به مراتب بیشتر خواهد بود. بیاید یک سناریو رو با هم فرض کنیم و مقاله رو طبق اون جلو ببریم

مقالمون خیلی ربطی به گو نداره ولی عکسشو دوس داشتم:)
مقالمون خیلی ربطی به گو نداره ولی عکسشو دوس داشتم:)

یک مثال ساده

فرض کنید با دانش مقاله قبلی و این مقاله میخوایم یک سیستم کوتاه کننده لینک طراحی کنیم که بتونه لود بالا رو پشتیبانی کنه. برای این کار دو بخش رو باید در نظر بگیریم که روی دیتابیسمون لود میاره.

  • توی یوز کیس میخوایم دیتاهایی که توی دیتابیس هست رو توی یک کش key/value بریزیم که بتونیم راحت تر scale کنیم و ریسپانس تایممون به کاربرایی که روی لینکامون کلیک کردن کمتر شه. این کار رو توی مقاله قبلی انجام دادیم
  • توی بخش دوم که مربوط به این مقاله هست میخوایم روی یوز کیسی تمرکز کنیم که مربوط به کاربرایی هستن که میخوان لینک بسازن یا لینکاشون رو آپدیت کنن


من دارم روی یک مقاله کار میکنم که با هم یک سیستم شبیه توییتر که توزیع پذیر باشه طراحی کنیم. برای همین از مثال توییتر استفاده میکنم.

فرض کنید داریم یک سیستم شبیه توییتر طراحی میکنیم و میخوایم سرویس هش تگ ها رو پیاده سازی کنیم. فرض کنید در ثانیه ۵ هزار هشتگ درحال ساخته شدن هست. میخوایم طوری سیستممون رو طراحی کنیم که بتونه این عدد رو پشتیبانی کنه. فرض کنید طراحی دیتابیسمون اینطوره ( من برای تست از پستگرس استفاده کردم)

CREATE SEQUENCE hashtags_id_seq; CREATE TABLE hashtags ( id int default nextval('hashtags_id_seq'::regclass), hashtag text NOT NULL, PRIMARY KEY (id), CONSTRAINT hashtag UNIQUE (hashtag) );

یک جدول به اسم hashtags داریم که دو ستون id و hashtag داره و id کلید اصلی ما هست و hashtag هم unique هست.

راه حل ساده! به ازای هر درخواست یک INSERT

ساده ترین راه حل که به ذهن میاد اینه به ازای هر درخواستی که برامون میاد یک INSERT توی دیتابیس انجام بدیم. بیاید با هم ببینیم چقدر این راه بده! من یک تابع توی Go نوشتم که ورودیش تعدادی هشتگ و کانکشن اتصال به دیتابیس هست.

func CreateHashtagsOneByOne(db *sql.DB, hashtags []string) error { var placeholder []string for _, _ = range hashtags { placeholder = append(placeholder, &quot(?)&quot) } for _, hashtag := range hashtags { query := fmt.Sprintf(&quotINSERT INTO hashtags (hashtag) VALUES ($1)&quot) if _, err := db.Exec(query, hashtag); err != nil { return err } } return nil }

همون طوری که توی کد معلوم هست به ازای هر هشتگ یک کوئری INSERT INTO hashtags (hashtag) VALUES ($1) اجرا میشود. بیاید اول تست کنیم اگه خواستیم فقط ۵ تا هشتگ با این روش بسازیم چقدر طول میکشه. کد تست اولمون اینطوره:

var hashtags []string for i := 0; i < 5; i++ { hashtags = append(hashtags, fmt.Sprintf(&quothashtag_%d&quot, i)) } s := time.Now() if err := CreateHashtagsOneByOne(db, hashtags); err != nil { panic(err) } fmt.Println(time.Since(s))

اگه این کد رو اجرا کنیم خروجی ما ۴ میلی ثانیه خواهد بود (یا یه چیزی همین حدودا). عدد خیلی کمیه. حالا اگه جای ۵ تا خواستیم ۵۰ تا هشتگ جدید بسازیم چی ؟ خروجی ما میشه ۷۰ میلی ثانیه! اگه بخوایم ۵۰۰ تا رو ذخیره کنیم چی؟ خروجیمون میشه ۷۰۰ میلی ثانیه. با توجه به اسکیلی که سیستم ما داره میتونه این عددا برای ما قابل قبول باشن. اما چون میدونیم قراره ۵ هزار ریکوئست برثانیه بگیریم با توجه به عددا حس میکنیم ۷ ثانیه طول بکشه! اگه کد رو برای ۵ هزار هشتگ اجرا کنیم عددی حدود ۷ ثانیه میگیریم و این برای سیستم ما اصلا قابل قبول نخواهد بود.

حالا میتونیم یک سری گیر به بنچ مارک من بدیم که قبول هم دارم درست هستن

  • یک) ما تمام insert ها رو داریم توی یک ترد انجام میدیم و این باعث میشه هر کوئری منتظر کوئری بعدی بمونه تا تموم شه در صورتی که توی دنیای واقعی اینطور نیست. هر ریکوئست کاربرا میره روی یک گوروتین و هیج وقت منتظر ریکوئست کاربر دیگه ایی نمیمونیم که تموم شه و نوبت کوئری ما برسه.
  • دو) ما فقط از یک کانکشن به دیتابیس داریم استفاده میکنیم. در صورتی که برای بهبود پرفورمنس میتونیم چند تا کانکشن باز کنیم

من اول از همه سراغ اینا نرفتم تا یکم مسئله رو باز تر کنم. برای الان بهتره واقعی تر کنیم بنچ مارک رو:

  • برای حل مشکل اول من تک تک INSERT ها رو توی یک گوروتین جدا انجام میدم و در نهایت منتظر میمونم تا همشون انجام بشن
  • برای حل مشکل دوم هم تنظیمات کانکشنمون رو توی Go اینطور تنظیم میکنم که ۱۰۰ کانکشن باز و IDLE بتونیم داشته باشیم. فقط برای اینکه مطمئن شیم بیشتر از ۱۰۰ کانکشن باز نمیکنیم من یک queue با چنل ها ساختم که اجازه نمیده همزمان بیشتر از ۹۰ تا گوروتین INSERT بزنن.

کد ما اینطور میشه:

db.SetMaxOpenConns(100) db.SetMaxIdleConns(100) ... func CreateHashtagsOneByOne(db *sql.DB, hashtags []string) error { var placeholder []string for _, _ = range hashtags { placeholder = append(placeholder, &quot(?)&quot) } queue := make(chan struct{}, 90) var wg sync.WaitGroup for _, hashtag := range hashtags { wg.Add(1) go func(hashtag string) { queue <- struct{}{} query := fmt.Sprintf(&quotINSERT INTO hashtags (hashtag) VALUES ($1)&quot) if _, err := db.Exec(query, hashtag); err != nil { panic(err) } wg.Done() <-queue }(hashtag) } wg.Wait() return nil }

خب ببینیم خروجی چطور شد؟ توی رنج ۲۵۰ میلی ثانیه کدمون اجرا میشه که قابل قبوله. حالا اگه جای ۵ هزار تا خواستیم ۵۰ هزار تا insert کنیم چطور؟ حدود دو ثانیه.

با استفاده از SELECT sum(numbackends) FROM pg_stat_database; هم میتونیم تعداد کانکشن های باز رو ببینیم که بیشتر از ۱۰۰ تا نمیشه.

اما چطور این عددا رو بهتر کنیم؟

راه حل دوم) چندین درخواست رو با هم بزنیم

راه حل دوم ما مثل مقاله قبل اینه یک پنجره داشته باشیم و تمام درخواست های insert توی اون پنجره رو با هم بزنیم. مثلا میتونیم تمام درخواست ها رو توی مموری ذخیره کنیم و هر ۵۰ میلی ثانیه یک بار همشون رو با هم بزنیم. برای این کار از bulk insert استفاده میکنیم که احتمالا باهاش آشنایی دارید و خیلی ازشم استفاده کردید.

func CreateHashtagsBatch(db *sql.DB, hashtags []string) error { var ( placeholder []string values []interface{} ) for i, hashtag := range hashtags { placeholder = append(placeholder, fmt.Sprintf(&quot($%d)&quot, i+1)) values = append(values, hashtag) } query := fmt.Sprintf(&quotINSERT INTO hashtags (hashtag) VALUES %s&quot, strings.Join(placeholder, &quot,&quot)) _, err := db.Exec(query, values...) return err }

با استفاده از کوئری INSERT INTO hashtags (hashtag) VALUES ($1) , ($2),($3)و... میتونیم همزمان چندین هشتگ رو insert کنیم. ببینیم پرفورمنس این کد چطوره.

برای ۵ هزار هشتگ با یک کانکشن تونستیم توی ۵۰ میلی ثانیه INSERT کنیم. تا همین الان بهبود خیلی خوبی دادیم به سیستم. حالا اگه بخوایم ۵۰ هزار تا بسازیم چی؟ حدود ۵۰۰ میلی ثانیه. همونطوری که می بینید عددا تا همین الان کلی بهبود پیدا کردن و نیازیم به باز کردن کانکشن جدید و ساختن گوروتین ها نبوده.

حالا بیایم یکم عدالت رو رعایت کنیم :)) اینبار با ۱۰ تا کانکشن همزمان INSERT رو برای ۵۰ هزار هشتگ انجام بدیم.

func CreateHashtagsBatch(db *sql.DB, hashtags []string) error { var ( concurrentRequests = 10 eachBatchSize = len(hashtags) / concurrentRequests ) var batches [][]string for i := 0; i < concurrentRequests; i++ { startIndex := i * eachBatchSize endIndex := (i + 1) * eachBatchSize var batchHashtags []string if i == 0 { batchHashtags = hashtags[:endIndex] } else if i == concurrentRequests-1 { batchHashtags = hashtags[startIndex:] } else { batchHashtags = hashtags[startIndex:endIndex] } batches = append(batches, batchHashtags) } var wg sync.WaitGroup for _, batchHashtags := range batches { wg.Add(1) go func(hashtags []string) { var ( placeholder []string values []interface{} ) for i, hashtag := range hashtags { placeholder = append(placeholder, fmt.Sprintf(&quot($%d)&quot, i+1)) values = append(values, hashtag) } query := fmt.Sprintf(&quotINSERT INTO hashtags (hashtag) VALUES %s&quot, strings.Join(placeholder, &quot,&quot)) if _, err := db.Exec(query, values...); err != nil { panic(err) } wg.Done() }(batchHashtags) } wg.Wait() return nil }

اگر کد رو اجرا کنیم می بینیم که ۵۰هزار تا در حدود ۲۵۰ میلی ثانیه INSERT میشوند که به نسبت ۲ ثانیه پیشرفت زیادی هست. اما یک مسئله مهم رو تا الان در نظر نگرفتیم.

اگر بین درخواست ها هشتگی بود که از قبل وجود داره چطور؟

فرض کنید طراحی سیستم ما طوریه که نمیدونیم یک هشتگ وجود خارجی داره یا نه. و انبوهی از درخواست ها سمت سیستم ما میاد که نمیدونیم کدوم هشتگ از قبل وجود داره و کدوم رو باید ذخیره کنیم. روش اول و کثیف اینطوره که کل درخواست ها رو بگیریم و یک کوئری بزنیم ببینیم کدوم هشتگ ها وجود دارن و اونایی که وجود ندارن رو جدا کنیم و ذخیره کنیم. اول ۵ میلیون هشتگ برای واقعی تر کردن تست ریختم توی دیتابیس.

من کد رو تا جایی که تونستم بهینه زدم. در مرحله اول تمام هشتگ ها رو با هم سعی میکنیم از دیتابیس بگیریم و میریزیمشون توی یک set. حالا یکی یکی چک میکنیم هشتگ هایی که توی ست نیستن رو توی یک اسلایس جدید میریزیم و میدیم همه رو با هم کد batch مون ذخیره کنه.

func UpsertOneByOne(db *sql.DB, hashtags []string) { var ( placeholder []string data []interface{} ) for i, hashtag := range hashtags { placeholder = append(placeholder, fmt.Sprintf(&quot$%d&quot, i+1)) data = append(data, hashtag) } rows, err := db.Query(fmt.Sprintf(&quotSELECT hashtag FROM hashtags WHERE hashtag IN (%s)&quot, strings.Join(placeholder, &quot,&quot)), data...) if err != nil { panic(err) } availableSet := make(map[string]struct{}) for rows.Next() { var hashtag string if err := rows.Scan(&hashtag); err != nil { panic(err) } availableSet[hashtag] = struct{}{} } var newHashtags []string for _, hashtag := range hashtags { if _, ok := availableSet[hashtag]; !ok { newHashtags = append(newHashtags, hashtag) } } CreateHashtagsBatch(db, newHashtags) }

کد رو برای ۵۰هزار هشتگ (۲۵۰۰۰ تا جدید و ۲۵۰۰۰ قدیمی) اجرا کردم. فرایند پیدا کردن هشتگ های قدیمی ۳۰۰ میلی ثانیه طول کشید و فرایند ذخیره سازی نیست ۱۵۰ میلی ثانیه که در کل میشه ۴۵۰ میلی ثانیه.

احتمالا الان براتون سواله خب چرا؟ از قابلیت های دیتابیس استفاده کنیم و کار خودمون رو راحت کنیم دیگه :)) منم باهاتون موافقم.

استفاده از ON CONFLICT

توی کوئریمون میتونیم تعریف کنیم که شما بیا INSERT رو انجام بده حالا اگر یک هشتگی از قبل وجود داشت کاری نکن. برای این کار از ON CONFLICTاستفاده میکنیم که خیلی راحت با اضافه کردن کد زیر به تابع Bulk insert مون میتونیم از همون تابع برای این منظور هم استفاده کنیم:

query += &quotON CONFLICT (hashtag) DO NOTHING&quot

شکل جدید تابعمون مثال قبل رو توی ۲۵۰ میلی ثانیه اجرا میشه که بهبود تقریبا ۲ برابری نسبت به حالت قبل و کد بسیار بسیار تمیز تر هست.

حالا اگر خواستیم آپدیت کنیم چطور؟

فرض کنید اطلاعات کاربرها همراه هر ریکوئست براتون ارسال میشه. میخواد اگر کاربر وجود داشت اون رو با اطلاعات جدید آپدیت کنید و اگر وجود نداشت اون رو بسازیم. با استفاده از ON CONFLICT به راحتی میتونیم همین کارو کنیم.

بیایم اول یک ستون جدید به دیتابیس به اسم description اضافه کنیم:

ALTER TABLE hashtags ADD COLUMN description text;

حالا میخوایم تعدادی هشتگ با description بسازیم. میخوایم اگر هشتگ از قبل وجود نداشت ذخیره شه و اگر وجود داشت ستون description (فقط) آپدیت بشه. برای این کار میتونیم از کوئری زیر استفاده کنیم:

query := fmt.Sprintf(&quotINSERT INTO hashtags (hashtag,description) VALUES %s&quot, strings.Join(placeholder, &quot,&quot)) query += &quot ON CONFLICT (hashtag) DO UPDATE SET description = EXCLUDED.description&quot

برای ۵۰هزار تا مثل مثال های قبل کد رو اجرا کردم و ۵۰۰ میلی ثانیه طول کشید.

خب الان با ترکیب این مقاله و مقاله قبل به چی میرسیم؟

توی مقاله قبل من سعی ام بر روی این بود که با شو آف کردن قدرت گو :)) یکم در مورد batch کردن درخواست و کم کردن تاثیر شبکه برروی سیستم کیفیت سیستمون صحبت کنیم. با ترکیب این مقاله و مقاله قبلی میتونیم درخواست های SELECT, INSERT و UPDATE سیستممون رو Batch کنیم و فشار روی سیستم رو کمتر کنیم و کیفیتش رو بالا تر ببریم.

گوgolangپرفورمنسدیتابیسpostgres
یک مهندس نرم‌افزار در دیوار.
شاید از این پست‌ها خوشتان بیاید