امروز بعد از مدت ها فرصت پیدا کردم تا مطلبی که خودم اولین بار با خوندنش تونسته بودم سرعت عملم توی تحلیل داده رو بهبود بدم رو ترجمه کنم. مطلب اصلی رو میتونین اینجا مطالعه کنین. البته مطلب اصلی با Scala نوشته شده که اینجا من از کد های PySpark استفاده میکنم. همچنین مواردی که به نظرم ذکرش خالی از لطف نیست رو سعی کردم به مقاله اضافه کنم.
اسپارک داده هارا به پارتیشن های جداگانه تقسیم میکند و محاسبات خود را به صورت موازی روی این پارتیشنها انجام میدهد. شما باید متوجه شوید که چگونه دادگان پارتیشن میشوند و چه موقع نیاز داریم تا به صورت دستی عمل پارتیشن کردن را انجام بدهیم تا محاسباتمان در بهینهترین زمان ممکن انجام شوند.
بیایید یک دیتافریم از اعداد ایجاد کنیم تا بتوانیم نمایش دهیم چگونه دادگان پارتیشن میشوند.
x = [i for i in range(1,11)]
row = Row("values")
df = sc.parallelize(x).map(row).toDF()
روی ماشین من این دیتافریم به ۴ پارتیشن تقسیم شده است:
df.rdd.getNumPartitions() # 4
هر پارتیشن به یک فایل جداگانه CSV تبدیل میگردد هنگامی که میخواهیم آن را به CSV تبدیل کنیم:
df.write.csv(“/Users/powers/Desktop/spark_output/numbers”)
اینجا میتوانید نوع توزیع دادگان بر روی پارتیشن های متفاوت را ببینید.
Partition A: 1, 2 Partition B: 3, 4, 5 Partition C: 6, 7 Partition D: 8, 9, 10
متد coalesce تعداد پارتیشن های دیتافریم را کاهش میدهد. اینجا میتوانید ببینید که چگونه میتوان از coalesce استفاده کرد:
df = df.coalesce(2)
با استفاده از دستور زیر میتوانیم تایید کنیم که این روش دیتافریم مارا به دو پارتیشن تقسیم کرده است:
df.rdd.getNumPartitions() # 2
همچنین حالا دیتافریم ما به به صورت دو فایل CSV روی هارد ذخیره میگردد:
df.write.csv(“/Users/powers/Desktop/spark_output/numbers”)
پارتیشن های این دیتافریم را میتوانید در اینجا مشاهده کنید:
Partition A: 1, 2, 3, 4, 5 Partition C: 6, 7, 8, 9, 10
الگوریتم coalesce داده هارا از پارتیشن B به پارتیشن A و همچنین از پارتیشن D به پارتیشن C منتقل میکند. دیتای پارتیشن های A و C جابجا نمیشوند. این الگوریتم سرعت بالایی دارد زیرا تلاش میکند جابجایی دیتا بین نود هارا به حداقل برساند.
شما میتوانید سعی کنید که با coalesce تعداد پارتیشن هارا بیشتر کنید ولی کار نمیکند!
df = df.coalesce(6) df.rdd.getNumPartitions() # 4
دیتافریم ما با ۴ پارتیشن میماند با این که ما سعی کردیم تعداد آن را با این روش به ۶ برسانیم.
متد coalesce تعداد پارتیشن هارا با جابجایی مقداری از دادگان پارتیشن های حال به باقی پارتیشن ها کمتر میکند. این روش به وضوح نمیتواند تعداد پارتیشن هارا افزایش دهد.
متد repartition را میتوان هم برای افزایش و هم کاهش تعداد پارتیشن های دیتافریم اسپارک استفاده کرد.
بیایید با استفاده از این روش دیتافریم را ریپارتیشن نماییم:
df = df.repartition(2) df.rdd.getNumPartitions() # 2
حالا بگذارید ببینیم که در هر کدام از پارتیشن ها چه مقادیر داده ای وجود دارد:
Partition ABC: 1, 3, 5, 6, 8, 10 Partition XYZ: 2, 4, 7, 9
پارتیشن ABC شامل مقادیری از دیتافریم اولیه از پارتیشن A, B, C, D میباشد. پارتیشن XYZ هم همینطور. متد repartition ابتدا به صورت کامل دادگان را شافل می کند، سپس به صورت مساوی دادگان را به تعداد خواسته شده تقسیم مینماید. این متد سعی ندارد تا مانند روش coalesce میزان انتقال دادگان بین نود هارا کاهش دهد.
از متد repartition میتوان برای افزایش تعداد پارتیشن ها استفاده نمود:
df = df.repartition(6) df.rdd.getNumPartitions() # 6
اینجا میتوانید ببینید تا این روش چگونه دادگان را بین پارتیشن های مختلف تقسیم نموده است:
Partition 00000: 5, 7 Partition 00001: 1 Partition 00002: 2 Partition 00003: 8 Partition 00004: 3, 9 Partition 00005: 4, 6, 10
متد repartition ابتدا دادگان را به صورت کامل برهم میریزد سپس دوباره آن هارا به تعداد دسته خواسته شده تقسیم میکند، لذا این متد برعکس متد coalesce میتواند تعداد پارتیشن هارا افزایش دهد.
متد repartition دادگان را ابتدا به صورت کامل شافل میکند و سپس به تعداد پارتیشن مورد نظر تقسیم میکند. ولی متد coalesce پارتیشن های حال را درهم ادغام میکند و از شافل کردن دادگان میپرهیزد.
بگذارید از دیتافریم زیر برای توضیح دادن این که چطور میتوان بر روی ستون عمل repartition را انجام داد استفاده کنیم:
#+-----+-------+ #| age | color | #+-----+-------+ #| 10 | blue | #| 13 | red | #| 15 | blue | #| 99 | red | #| 67 | blue | #+-----+-------+
بگذارید بر روی ستون رنگ ریپارتیشن کنیم.
df = df.repartition('color')
وقتی بر روی ستون ها ریپارتیشن انجام میدهید اسپارک به طور دیفالت ۲۰۰ پارتیشن بر اساس آن ستون ایجاد میکند. به عنوان مثال داده مثال زده شده حاوی ۱۹۸ پارتیشن خالی و دو پارتیشن زیر خواهد بود.
Partition 00091: 13,red 99,red Partition 00168: 10,blue 15,blue 67,blue
این دیتافریم شامل ۲ پارتیشن دارای دیتا که هر کدام از آن ها یک دسته از رنگ هارا در خود جای میدهد میباشد. عمل پارتیشن کردن بر اساس ستون ها مشابه ایندکس براساس ستون در دیتابیس های سنتی است.
از پارتیشن بر اساس ستون میتوان در عمل های join که سنگین هستند و در حالت عادی بسیار طولانی خواهد بود استفاده نمود. به این صورت که دیتافریم های rdd را بر اساس ستونی که قصد join روی آن داریم ریپارتیشن میکنیم و این باعث میشود تا انتقال دیتا بین نود ها کمتر شود و عملا join ما سریع تر انجام گردد.
فرض کنید یک دیتا لیک شامل ۲ میلیارد سطر و حدود ۱۳ هزار پارتیشن ذخیره شده بر روی فایل سیستم داریم.
شما میخواهید یک نمونه از داده که یک میلیون بار کمتر است برای استفاده توسعه کد خود انتخاب کنید. این نمونه معمولا برای توسعه استفاده میگردد و در نهایت باید مدل توسعه داده شده خود را بر روی تمام دیتاها پیاده سازی نمایید. همچنین اینجا این نمونه داده را بر روی S3 ذخیره میکنیم.
dataPuddle = dataLake.sample(true, 0.000001) dataPuddle.write.parquet("s3a://my_bucket/puddle/")
اسپارک به طور پیشفرض تعداد پارتیشن هارا بهینه نمیکند. پس نمونه دیتایی ما در واقع ۲۰۰۰ سطر دارد و همچنان ۱۳۰۰۰ پارتیشن، در نتیجه تعداد زیادی پارتیشن خواهیم داشت که خالی هستند. این موضوع اصلا برای محاسبات و نوشتن بر روی سرور S3 بهینه نیست لذا بهتر است تعداد پارتیشن هارا کاهش دهیم:
dataPuddle = dataLake.sample(true, 0.000001) goodPuddle = dataPuddle.repartition(4) goodPuddle.write.parquet("s3a://my_bucket/puddle/")
داده نمونه گیری شده یک میلیون بار کمتر است، پس ما باید تعداد پارتیشن هارا همان مقدار کمتر کنیم تا در هر پارتیشن میزان برابری دیتا داشته باشیم. ۱ = ۱۳۰۰۰/۱۰۰۰۰۰۰ (به بالا رند شده) ما تعداد پارتیشن هارا ۴ در نظر گرفتیم که پردازش موازی همچنان در سرور انجام گردد.
به طور کلی بهتر هست که تعداد پارتیشن های دیتافریم اسپارکمون ۲،۳ و یا ۴ برابر تعداد کور های سرورمون باشه (برای داده های خیلی حجیم بهتره ۸ رو امتحان کنید!)
number_of_partitions = number_of_cpus * 4
اگه دیتاتون رو خارج از فایل سیستم میگیرین (مثلا فایل CSV یا از دیتابیس های سنتی) اسپارک به صورت دیفالت این دیتافریم اسپارک رو به تعداد پارتیشن بهینه با توجه به حجم دیتا و تعداد کور های سرور تبدیل میکنه و در این حالت نیازی به ریپارتیشن نیست.
شافل کردن داده ها یک عمل وقت گیر است. ولی از طرفی در نمونه گفته شده فقط ۲۰۰۰ سطر داده داریم لذا دیتاهارا شافل کردیم تا در هر پارتیشن میزان برابری دیتا وجود داشته باشد.
به طور کلی ۲۴۰ ثانیه طول کشید تا تعداد سطر های دیتافریم اولیه (که ۱۳۰۰۰ پارتیشن داشت) شمرده شود. در عوض در نمونه پارتیشن شده فقط ۲ ثانیه طول کشید تا همین عمل انجام گردد! این یعنی ۱۲۰ برابر سریع تر.
توجه کنید که اگر دیتای خود رو از روی پارکت و یا فایل هایی از این نوع میخونید همونطور که گفته شد تعداد پارتیشن هاتون برابر با تعداد پارکت فایل هایی هست که بر روی سرور HDFS ذخیره شده. (این که روزانه چند تا پارکت فایل هست رو میتونید از مهندس داده تیمتون بپرسید). به عنوان مثال فرض کنید روزانه ۲۴ پارکت فایل بر روی سرور ذخیره میشه و ما میخوایم دیتای ۳ ماه رو تحلیل کنیم. این یعنی هنگامی که دیتا رو میخونیم دیتافریم اسپارکمون 24*90 پارتیشن خواهد داشت. ولی هر اپلیکشن سرور ما چند تا کور داره؟؟ تعداد پارتیشنمونو باید با توجه به تعداد کور های سرورمون بهینه کنیم.
این مساله غالبا یک مشکل توی کد هایی هست که با داده ها آشنایی کافی رو ندارند. مخصوصا این که افراد کدشون رو برای دیتای یک روز توسعه میدهند و در نهایت میخوان برای بازه خیلی بزرگتری اینو دوباره اصطلاحا ران کنن.
عمل پارتیشن کردن یک عمل سطح پایین است که احتمالا باید به صورت پیشفرض توسط اسپارک انجام میشد. ولی انجام نمیشود! وقتی دیتای بزرگی را فیلتر میکنید احتمالا باید همیشه عمل ریپارتیشن را بر روی دیتافریم rdd خود انجام دهید.