ویرگول
ورودثبت نام
صولت هوشمند
صولت هوشمند
خواندن ۹ دقیقه·۵ سال پیش

مدیریت صحیح پارتیشن های اسپارک

اسپارک
اسپارک

امروز بعد از مدت ها فرصت پیدا کردم تا مطلبی که خودم اولین بار با خوندنش تونسته بودم سرعت عملم توی تحلیل داده رو بهبود بدم رو ترجمه کنم. مطلب اصلی رو میتونین اینجا مطالعه کنین. البته مطلب اصلی با Scala نوشته شده که اینجا من از کد های PySpark استفاده میکنم. همچنین مواردی که به نظرم ذکرش خالی از لطف نیست رو سعی کردم به مقاله اضافه کنم.


اسپارک داده هارا به پارتیشن های جداگانه تقسیم می‌کند و محاسبات خود را به صورت موازی روی این پارتیشن‌ها انجام می‌دهد. شما باید متوجه شوید که چگونه دادگان پارتیشن می‌شوند و چه موقع نیاز داریم تا به صورت دستی عمل پارتیشن کردن را انجام بدهیم تا محاسباتمان در بهینه‌ترین زمان ممکن انجام شوند.

معرفی پارتیشن

بیایید یک دیتافریم از اعداد ایجاد کنیم تا بتوانیم نمایش دهیم چگونه دادگان پارتیشن می‌شوند.

x = [i for i in range(1,11)]
row = Row(&quotvalues&quot)
df = sc.parallelize(x).map(row).toDF()

روی ماشین من این دیتافریم به ۴ پارتیشن تقسیم شده است:

df.rdd.getNumPartitions() # 4

هر پارتیشن به یک فایل جداگانه CSV تبدیل می‌گردد هنگامی که می‌خواهیم آن را به CSV تبدیل کنیم:

df.write.csv(“/Users/powers/Desktop/spark_output/numbers”)

اینجا می‌توانید نوع توزیع دادگان بر روی پارتیشن های متفاوت را ببینید.

Partition A: 1, 2 Partition B: 3, 4, 5 Partition C: 6, 7 Partition D: 8, 9, 10

متد coalesce

متد coalesce تعداد پارتیشن های دیتافریم را کاهش می‌دهد. اینجا می‌توانید ببینید که چگونه می‌توان از coalesce استفاده کرد:

df = df.coalesce(2)

با استفاده از دستور زیر می‌توانیم تایید کنیم که این روش دیتافریم مارا به دو پارتیشن تقسیم کرده است:

df.rdd.getNumPartitions() # 2

همچنین حالا دیتافریم ما به به صورت دو فایل CSV روی هارد ذخیره می‌گردد:

df.write.csv(“/Users/powers/Desktop/spark_output/numbers”)

پارتیشن های این دیتافریم را می‌توانید در اینجا مشاهده کنید:

Partition A: 1, 2, 3, 4, 5 Partition C: 6, 7, 8, 9, 10

الگوریتم coalesce داده هارا از پارتیشن B به پارتیشن A و همچنین از پارتیشن D به پارتیشن C منتقل می‌کند. دیتای پارتیشن های A و C جابجا نمی‌شوند. این الگوریتم سرعت بالایی دارد زیرا تلاش می‌کند جابجایی دیتا بین نود هارا به حداقل برساند.

افزایش تعداد پارتیشن ها

شما می‌توانید سعی کنید که با coalesce تعداد پارتیشن هارا بیشتر کنید ولی کار نمی‌کند!

df = df.coalesce(6) df.rdd.getNumPartitions() # 4

دیتافریم ما با ۴ پارتیشن می‌ماند با این که ما سعی کردیم تعداد آن را با این روش به ۶ برسانیم.

متد coalesce تعداد پارتیشن هارا با جابجایی مقداری از دادگان پارتیشن های حال به باقی پارتیشن ها کمتر می‌کند. این روش به وضوح نمی‌تواند تعداد پارتیشن هارا افزایش دهد.

متد repartition

متد repartition را می‌توان هم برای افزایش و هم کاهش تعداد پارتیشن های دیتافریم اسپارک استفاده کرد.

بیایید با استفاده از این روش دیتافریم را ریپارتیشن نماییم:

df = df.repartition(2) df.rdd.getNumPartitions() # 2

حالا بگذارید ببینیم که در هر کدام از پارتیشن ها چه مقادیر داده ای وجود دارد:

Partition ABC: 1, 3, 5, 6, 8, 10 Partition XYZ: 2, 4, 7, 9

پارتیشن ABC شامل مقادیری از دیتافریم اولیه از پارتیشن A, B, C, D می‌باشد. پارتیشن XYZ هم همینطور. متد repartition ابتدا به صورت کامل دادگان را شافل می کند، سپس به صورت مساوی دادگان را به تعداد خواسته شده تقسیم می‌نماید. این متد سعی ندارد تا مانند روش coalesce میزان انتقال دادگان بین نود هارا کاهش دهد.

افزایش تعداد پارتیشن ها

از متد repartition می‌توان برای افزایش تعداد پارتیشن ها استفاده نمود:

df = df.repartition(6) df.rdd.getNumPartitions() # 6

اینجا می‌توانید ببینید تا این روش چگونه دادگان را بین پارتیشن های مختلف تقسیم نموده است:

Partition 00000: 5, 7 Partition 00001: 1 Partition 00002: 2 Partition 00003: 8 Partition 00004: 3, 9 Partition 00005: 4, 6, 10

متد repartition ابتدا دادگان را به صورت کامل برهم می‌ریزد سپس دوباره آن هارا به تعداد دسته خواسته شده تقسیم می‌کند، لذا این متد برعکس متد coalesce می‌تواند تعداد پارتیشن هارا افزایش دهد.

تفاوت های متد repartition و coalesce

متد repartition دادگان را ابتدا به صورت کامل شافل می‌کند و سپس به تعداد پارتیشن مورد نظر تقسیم می‌کند. ولی متد coalesce پارتیشن های حال را درهم ادغام می‌کند و از شافل کردن دادگان می‌پرهیزد.

انجام repartition بر روی ستون‌ها

بگذارید از دیتافریم زیر برای توضیح دادن این که چطور می‌توان بر روی ستون عمل repartition را انجام داد استفاده کنیم:

#+-----+-------+ #| age | color | #+-----+-------+ #| 10 | blue | #| 13 | red | #| 15 | blue | #| 99 | red | #| 67 | blue | #+-----+-------+

بگذارید بر روی ستون رنگ ریپارتیشن کنیم.

df = df.repartition('color')

وقتی بر روی ستون ها ریپارتیشن انجام می‌دهید اسپارک به طور دیفالت ۲۰۰ پارتیشن بر اساس آن ستون ایجاد می‌کند. به عنوان مثال داده مثال زده شده حاوی ۱۹۸ پارتیشن خالی و دو پارتیشن زیر خواهد بود.

Partition 00091: 13,red 99,red Partition 00168: 10,blue 15,blue 67,blue

این دیتافریم شامل ۲ پارتیشن دارای دیتا که هر کدام از آن ها یک دسته از رنگ هارا در خود جای می‌دهد می‌باشد. عمل پارتیشن کردن بر اساس ستون ها مشابه ایندکس براساس ستون در دیتابیس های سنتی است.

از پارتیشن بر اساس ستون می‌توان در عمل های join که سنگین هستند و در حالت عادی بسیار طولانی خواهد بود استفاده نمود. به این صورت که دیتافریم های rdd را بر اساس ستونی که قصد join روی آن داریم ریپارتیشن میکنیم و این باعث می‌شود تا انتقال دیتا بین نود ها کمتر شود و عملا join ما سریع تر انجام گردد.

یک مثال واقعی

فرض کنید یک دیتا لیک شامل ۲ میلیارد سطر و حدود ۱۳ هزار پارتیشن ذخیره شده بر روی فایل سیستم داریم.

شما می‌خواهید یک نمونه از داده که یک میلیون بار کمتر است برای استفاده توسعه کد خود انتخاب کنید. این نمونه معمولا برای توسعه استفاده می‌گردد و در نهایت باید مدل توسعه داده شده خود را بر روی تمام دیتاها پیاده سازی نمایید. همچنین اینجا این نمونه داده را بر روی S3 ذخیره می‌کنیم.

dataPuddle = dataLake.sample(true, 0.000001) dataPuddle.write.parquet(&quots3a://my_bucket/puddle/&quot)

اسپارک به طور پیشفرض تعداد پارتیشن هارا بهینه نمی‌کند. پس نمونه دیتایی ما در واقع ۲۰۰۰ سطر دارد و همچنان ۱۳۰۰۰ پارتیشن، در نتیجه تعداد زیادی پارتیشن خواهیم داشت که خالی هستند. این موضوع اصلا برای محاسبات و نوشتن بر روی سرور S3 بهینه نیست لذا بهتر است تعداد پارتیشن هارا کاهش دهیم:

dataPuddle = dataLake.sample(true, 0.000001) goodPuddle = dataPuddle.repartition(4) goodPuddle.write.parquet(&quots3a://my_bucket/puddle/&quot)

چرا ۴ پارتیشن؟

داده نمونه گیری شده یک میلیون بار کمتر است، پس ما باید تعداد پارتیشن هارا همان مقدار کمتر کنیم تا در هر پارتیشن میزان برابری دیتا داشته باشیم. ۱ = ۱۳۰۰۰/۱۰۰۰۰۰۰ (به بالا رند شده) ما تعداد پارتیشن هارا ۴ در نظر گرفتیم که پردازش موازی همچنان در سرور انجام گردد.

به طور کلی بهتر هست که تعداد پارتیشن های دیتافریم اسپارکمون ۲،۳ و یا ۴ برابر تعداد کور های سرورمون باشه (برای داده های خیلی حجیم بهتره ۸ رو امتحان کنید!)

number_of_partitions = number_of_cpus * 4

اگه دیتاتون رو خارج از فایل سیستم میگیرین (مثلا فایل CSV یا از دیتابیس های سنتی) اسپارک به صورت دیفالت این دیتافریم اسپارک رو به تعداد پارتیشن بهینه با توجه به حجم دیتا و تعداد کور های سرور تبدیل می‌کنه و در این حالت نیازی به ریپارتیشن نیست.

چرا repartition استفاده کردیم و نه coalesce ؟

شافل کردن داده ها یک عمل وقت گیر است. ولی از طرفی در نمونه گفته شده فقط ۲۰۰۰ سطر داده داریم لذا دیتاهارا شافل کردیم تا در هر پارتیشن میزان برابری دیتا وجود داشته باشد.

میزان بهبود سرعت؟

به طور کلی ۲۴۰ ثانیه طول کشید تا تعداد سطر های دیتافریم اولیه (که ۱۳۰۰۰ پارتیشن داشت) شمرده شود. در عوض در نمونه پارتیشن شده فقط ۲ ثانیه طول کشید تا همین عمل انجام گردد! این یعنی ۱۲۰ برابر سریع تر.

توجه کنید که اگر دیتای خود رو از روی پارکت و یا فایل هایی از این نوع می‌خونید همونطور که گفته شد تعداد پارتیشن هاتون برابر با تعداد پارکت فایل هایی هست که بر روی سرور HDFS ذخیره شده. (این که روزانه چند تا پارکت فایل هست رو میتونید از مهندس داده تیمتون بپرسید). به عنوان مثال فرض کنید روزانه ۲۴ پارکت فایل بر روی سرور ذخیره میشه و ما میخوایم دیتای ۳ ماه رو تحلیل کنیم. این یعنی هنگامی که دیتا رو می‌خونیم دیتافریم اسپارکمون 24*90 پارتیشن خواهد داشت. ولی هر اپلیکشن سرور ما چند تا کور داره؟؟ تعداد پارتیشنمونو باید با توجه به تعداد کور های سرورمون بهینه کنیم.

این مساله غالبا یک مشکل توی کد هایی هست که با داده ها آشنایی کافی رو ندارند. مخصوصا این که افراد کدشون رو برای دیتای یک روز توسعه می‌دهند و در نهایت میخوان برای بازه خیلی بزرگتری اینو دوباره اصطلاحا ران کنن.

احتمالا باید به پارتیشن های خود فکر کنید

عمل پارتیشن کردن یک عمل سطح پایین است که احتمالا باید به صورت پیشفرض توسط اسپارک انجام می‌شد. ولی انجام نمی‌شود! وقتی دیتای بزرگی را فیلتر می‌کنید احتمالا باید همیشه عمل ریپارتیشن را بر روی دیتافریم rdd خود انجام دهید.

اسپارکsparkpartitionپارتیشنریپارتیشن
شاید از این پست‌ها خوشتان بیاید