توی این مقاله در مورد batch select ها صحبت کردیم. اما واقعیت اینه نیاز سیستم ما فقط select ها نیستن که در بعضی شرایط نیاز به batch شدن دارن. تاثیر این روش در insert ها به مراتب بیشتر خواهد بود. بیاید یک سناریو رو با هم فرض کنیم و مقاله رو طبق اون جلو ببریم
فرض کنید با دانش مقاله قبلی و این مقاله میخوایم یک سیستم کوتاه کننده لینک طراحی کنیم که بتونه لود بالا رو پشتیبانی کنه. برای این کار دو بخش رو باید در نظر بگیریم که روی دیتابیسمون لود میاره.
من دارم روی یک مقاله کار میکنم که با هم یک سیستم شبیه توییتر که توزیع پذیر باشه طراحی کنیم. برای همین از مثال توییتر استفاده میکنم.
فرض کنید داریم یک سیستم شبیه توییتر طراحی میکنیم و میخوایم سرویس هش تگ ها رو پیاده سازی کنیم. فرض کنید در ثانیه ۵ هزار هشتگ درحال ساخته شدن هست. میخوایم طوری سیستممون رو طراحی کنیم که بتونه این عدد رو پشتیبانی کنه. فرض کنید طراحی دیتابیسمون اینطوره ( من برای تست از پستگرس استفاده کردم)
CREATE SEQUENCE hashtags_id_seq; CREATE TABLE hashtags ( id int default nextval('hashtags_id_seq'::regclass), hashtag text NOT NULL, PRIMARY KEY (id), CONSTRAINT hashtag UNIQUE (hashtag) );
یک جدول به اسم hashtags داریم که دو ستون id و hashtag داره و id کلید اصلی ما هست و hashtag هم unique هست.
ساده ترین راه حل که به ذهن میاد اینه به ازای هر درخواستی که برامون میاد یک INSERT توی دیتابیس انجام بدیم. بیاید با هم ببینیم چقدر این راه بده! من یک تابع توی Go نوشتم که ورودیش تعدادی هشتگ و کانکشن اتصال به دیتابیس هست.
func CreateHashtagsOneByOne(db *sql.DB, hashtags []string) error { var placeholder []string for _, _ = range hashtags { placeholder = append(placeholder, "(?)") } for _, hashtag := range hashtags { query := fmt.Sprintf("INSERT INTO hashtags (hashtag) VALUES ($1)") if _, err := db.Exec(query, hashtag); err != nil { return err } } return nil }
همون طوری که توی کد معلوم هست به ازای هر هشتگ یک کوئری INSERT INTO hashtags (hashtag) VALUES ($1) اجرا میشود. بیاید اول تست کنیم اگه خواستیم فقط ۵ تا هشتگ با این روش بسازیم چقدر طول میکشه. کد تست اولمون اینطوره:
var hashtags []string for i := 0; i < 5; i++ { hashtags = append(hashtags, fmt.Sprintf("hashtag_%d", i)) } s := time.Now() if err := CreateHashtagsOneByOne(db, hashtags); err != nil { panic(err) } fmt.Println(time.Since(s))
اگه این کد رو اجرا کنیم خروجی ما ۴ میلی ثانیه خواهد بود (یا یه چیزی همین حدودا). عدد خیلی کمیه. حالا اگه جای ۵ تا خواستیم ۵۰ تا هشتگ جدید بسازیم چی ؟ خروجی ما میشه ۷۰ میلی ثانیه! اگه بخوایم ۵۰۰ تا رو ذخیره کنیم چی؟ خروجیمون میشه ۷۰۰ میلی ثانیه. با توجه به اسکیلی که سیستم ما داره میتونه این عددا برای ما قابل قبول باشن. اما چون میدونیم قراره ۵ هزار ریکوئست برثانیه بگیریم با توجه به عددا حس میکنیم ۷ ثانیه طول بکشه! اگه کد رو برای ۵ هزار هشتگ اجرا کنیم عددی حدود ۷ ثانیه میگیریم و این برای سیستم ما اصلا قابل قبول نخواهد بود.
حالا میتونیم یک سری گیر به بنچ مارک من بدیم که قبول هم دارم درست هستن
من اول از همه سراغ اینا نرفتم تا یکم مسئله رو باز تر کنم. برای الان بهتره واقعی تر کنیم بنچ مارک رو:
کد ما اینطور میشه:
db.SetMaxOpenConns(100) db.SetMaxIdleConns(100) ... func CreateHashtagsOneByOne(db *sql.DB, hashtags []string) error { var placeholder []string for _, _ = range hashtags { placeholder = append(placeholder, "(?)") } queue := make(chan struct{}, 90) var wg sync.WaitGroup for _, hashtag := range hashtags { wg.Add(1) go func(hashtag string) { queue <- struct{}{} query := fmt.Sprintf("INSERT INTO hashtags (hashtag) VALUES ($1)") if _, err := db.Exec(query, hashtag); err != nil { panic(err) } wg.Done() <-queue }(hashtag) } wg.Wait() return nil }
خب ببینیم خروجی چطور شد؟ توی رنج ۲۵۰ میلی ثانیه کدمون اجرا میشه که قابل قبوله. حالا اگه جای ۵ هزار تا خواستیم ۵۰ هزار تا insert کنیم چطور؟ حدود دو ثانیه.
با استفاده از SELECT sum(numbackends) FROM pg_stat_database; هم میتونیم تعداد کانکشن های باز رو ببینیم که بیشتر از ۱۰۰ تا نمیشه.
اما چطور این عددا رو بهتر کنیم؟
راه حل دوم ما مثل مقاله قبل اینه یک پنجره داشته باشیم و تمام درخواست های insert توی اون پنجره رو با هم بزنیم. مثلا میتونیم تمام درخواست ها رو توی مموری ذخیره کنیم و هر ۵۰ میلی ثانیه یک بار همشون رو با هم بزنیم. برای این کار از bulk insert استفاده میکنیم که احتمالا باهاش آشنایی دارید و خیلی ازشم استفاده کردید.
func CreateHashtagsBatch(db *sql.DB, hashtags []string) error { var ( placeholder []string values []interface{} ) for i, hashtag := range hashtags { placeholder = append(placeholder, fmt.Sprintf("($%d)", i+1)) values = append(values, hashtag) } query := fmt.Sprintf("INSERT INTO hashtags (hashtag) VALUES %s", strings.Join(placeholder, ",")) _, err := db.Exec(query, values...) return err }
با استفاده از کوئری INSERT INTO hashtags (hashtag) VALUES ($1) , ($2),($3)و... میتونیم همزمان چندین هشتگ رو insert کنیم. ببینیم پرفورمنس این کد چطوره.
برای ۵ هزار هشتگ با یک کانکشن تونستیم توی ۵۰ میلی ثانیه INSERT کنیم. تا همین الان بهبود خیلی خوبی دادیم به سیستم. حالا اگه بخوایم ۵۰ هزار تا بسازیم چی؟ حدود ۵۰۰ میلی ثانیه. همونطوری که می بینید عددا تا همین الان کلی بهبود پیدا کردن و نیازیم به باز کردن کانکشن جدید و ساختن گوروتین ها نبوده.
حالا بیایم یکم عدالت رو رعایت کنیم :)) اینبار با ۱۰ تا کانکشن همزمان INSERT رو برای ۵۰ هزار هشتگ انجام بدیم.
func CreateHashtagsBatch(db *sql.DB, hashtags []string) error { var ( concurrentRequests = 10 eachBatchSize = len(hashtags) / concurrentRequests ) var batches [][]string for i := 0; i < concurrentRequests; i++ { startIndex := i * eachBatchSize endIndex := (i + 1) * eachBatchSize var batchHashtags []string if i == 0 { batchHashtags = hashtags[:endIndex] } else if i == concurrentRequests-1 { batchHashtags = hashtags[startIndex:] } else { batchHashtags = hashtags[startIndex:endIndex] } batches = append(batches, batchHashtags) } var wg sync.WaitGroup for _, batchHashtags := range batches { wg.Add(1) go func(hashtags []string) { var ( placeholder []string values []interface{} ) for i, hashtag := range hashtags { placeholder = append(placeholder, fmt.Sprintf("($%d)", i+1)) values = append(values, hashtag) } query := fmt.Sprintf("INSERT INTO hashtags (hashtag) VALUES %s", strings.Join(placeholder, ",")) if _, err := db.Exec(query, values...); err != nil { panic(err) } wg.Done() }(batchHashtags) } wg.Wait() return nil }
اگر کد رو اجرا کنیم می بینیم که ۵۰هزار تا در حدود ۲۵۰ میلی ثانیه INSERT میشوند که به نسبت ۲ ثانیه پیشرفت زیادی هست. اما یک مسئله مهم رو تا الان در نظر نگرفتیم.
فرض کنید طراحی سیستم ما طوریه که نمیدونیم یک هشتگ وجود خارجی داره یا نه. و انبوهی از درخواست ها سمت سیستم ما میاد که نمیدونیم کدوم هشتگ از قبل وجود داره و کدوم رو باید ذخیره کنیم. روش اول و کثیف اینطوره که کل درخواست ها رو بگیریم و یک کوئری بزنیم ببینیم کدوم هشتگ ها وجود دارن و اونایی که وجود ندارن رو جدا کنیم و ذخیره کنیم. اول ۵ میلیون هشتگ برای واقعی تر کردن تست ریختم توی دیتابیس.
من کد رو تا جایی که تونستم بهینه زدم. در مرحله اول تمام هشتگ ها رو با هم سعی میکنیم از دیتابیس بگیریم و میریزیمشون توی یک set. حالا یکی یکی چک میکنیم هشتگ هایی که توی ست نیستن رو توی یک اسلایس جدید میریزیم و میدیم همه رو با هم کد batch مون ذخیره کنه.
func UpsertOneByOne(db *sql.DB, hashtags []string) { var ( placeholder []string data []interface{} ) for i, hashtag := range hashtags { placeholder = append(placeholder, fmt.Sprintf("$%d", i+1)) data = append(data, hashtag) } rows, err := db.Query(fmt.Sprintf("SELECT hashtag FROM hashtags WHERE hashtag IN (%s)", strings.Join(placeholder, ",")), data...) if err != nil { panic(err) } availableSet := make(map[string]struct{}) for rows.Next() { var hashtag string if err := rows.Scan(&hashtag); err != nil { panic(err) } availableSet[hashtag] = struct{}{} } var newHashtags []string for _, hashtag := range hashtags { if _, ok := availableSet[hashtag]; !ok { newHashtags = append(newHashtags, hashtag) } } CreateHashtagsBatch(db, newHashtags) }
کد رو برای ۵۰هزار هشتگ (۲۵۰۰۰ تا جدید و ۲۵۰۰۰ قدیمی) اجرا کردم. فرایند پیدا کردن هشتگ های قدیمی ۳۰۰ میلی ثانیه طول کشید و فرایند ذخیره سازی نیست ۱۵۰ میلی ثانیه که در کل میشه ۴۵۰ میلی ثانیه.
احتمالا الان براتون سواله خب چرا؟ از قابلیت های دیتابیس استفاده کنیم و کار خودمون رو راحت کنیم دیگه :)) منم باهاتون موافقم.
توی کوئریمون میتونیم تعریف کنیم که شما بیا INSERT رو انجام بده حالا اگر یک هشتگی از قبل وجود داشت کاری نکن. برای این کار از ON CONFLICTاستفاده میکنیم که خیلی راحت با اضافه کردن کد زیر به تابع Bulk insert مون میتونیم از همون تابع برای این منظور هم استفاده کنیم:
query += "ON CONFLICT (hashtag) DO NOTHING"
شکل جدید تابعمون مثال قبل رو توی ۲۵۰ میلی ثانیه اجرا میشه که بهبود تقریبا ۲ برابری نسبت به حالت قبل و کد بسیار بسیار تمیز تر هست.
فرض کنید اطلاعات کاربرها همراه هر ریکوئست براتون ارسال میشه. میخواد اگر کاربر وجود داشت اون رو با اطلاعات جدید آپدیت کنید و اگر وجود نداشت اون رو بسازیم. با استفاده از ON CONFLICT به راحتی میتونیم همین کارو کنیم.
بیایم اول یک ستون جدید به دیتابیس به اسم description اضافه کنیم:
ALTER TABLE hashtags ADD COLUMN description text;
حالا میخوایم تعدادی هشتگ با description بسازیم. میخوایم اگر هشتگ از قبل وجود نداشت ذخیره شه و اگر وجود داشت ستون description (فقط) آپدیت بشه. برای این کار میتونیم از کوئری زیر استفاده کنیم:
query := fmt.Sprintf("INSERT INTO hashtags (hashtag,description) VALUES %s", strings.Join(placeholder, ",")) query += " ON CONFLICT (hashtag) DO UPDATE SET description = EXCLUDED.description"
برای ۵۰هزار تا مثل مثال های قبل کد رو اجرا کردم و ۵۰۰ میلی ثانیه طول کشید.
توی مقاله قبل من سعی ام بر روی این بود که با شو آف کردن قدرت گو :)) یکم در مورد batch کردن درخواست و کم کردن تاثیر شبکه برروی سیستم کیفیت سیستمون صحبت کنیم. با ترکیب این مقاله و مقاله قبلی میتونیم درخواست های SELECT, INSERT و UPDATE سیستممون رو Batch کنیم و فشار روی سیستم رو کمتر کنیم و کیفیتش رو بالا تر ببریم.