<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
    <channel>
        <title>نوشته های صولت هوشمند</title>
        <link>https://virgool.io/feed/@s.houshmand</link>
        <description></description>
        <language>fa</language>
        <pubDate>2026-06-17 03:34:13</pubDate>
        <image>
            <url>https://files.virgool.io/upload/users/39088/avatar/1Ruxxp.png?height=120&amp;width=120</url>
            <title>صولت هوشمند</title>
            <link>https://virgool.io/@s.houshmand</link>
        </image>

                    <item>
                <title>مدیریت صحیح پارتیشن های اسپارک</title>
                <link>https://virgool.io/@s.houshmand/%D9%85%D8%AF%DB%8C%D8%B1%DB%8C%D8%AA-%D8%B5%D8%AD%DB%8C%D8%AD-%D9%BE%D8%A7%D8%B1%D8%AA%DB%8C%D8%B4%D9%86-%D9%87%D8%A7%DB%8C-%D8%A7%D8%B3%D9%BE%D8%A7%D8%B1%DA%A9-uawu62z9sfbm</link>
                <description>اسپارکامروز بعد از مدت ها فرصت پیدا کردم تا مطلبی که خودم اولین بار با خوندنش تونسته بودم سرعت عملم توی تحلیل داده رو بهبود بدم رو ترجمه کنم. مطلب اصلی رو میتونین اینجا مطالعه کنین. البته مطلب اصلی با Scala نوشته شده که اینجا من از کد های PySpark استفاده میکنم. همچنین مواردی که به نظرم ذکرش خالی از لطف نیست رو سعی کردم به مقاله اضافه کنم.اسپارک داده هارا به پارتیشن های جداگانه تقسیم می‌کند و محاسبات خود را به صورت موازی روی این  پارتیشن‌ها انجام می‌دهد. شما باید متوجه شوید که چگونه دادگان پارتیشن می‌شوند و چه موقع نیاز داریم تا به صورت دستی عمل پارتیشن کردن را انجام بدهیم تا محاسباتمان در بهینه‌ترین زمان ممکن انجام شوند.معرفی پارتیشنبیایید یک دیتافریم از اعداد ایجاد کنیم تا بتوانیم نمایش دهیم چگونه دادگان پارتیشن می‌شوند.x = [i for i in range(1,11)]row = Row(&amp;quotvalues&amp;quot)df = sc.parallelize(x).map(row).toDF()روی ماشین من این دیتافریم به ۴ پارتیشن تقسیم شده است:df.rdd.getNumPartitions() 
# 4هر پارتیشن به یک فایل جداگانه CSV تبدیل می‌گردد هنگامی که می‌خواهیم آن را به CSV تبدیل کنیم:df.write.csv(“/Users/powers/Desktop/spark_output/numbers”)اینجا می‌توانید نوع توزیع دادگان بر روی پارتیشن های متفاوت را ببینید.Partition A: 1, 2
Partition B: 3, 4, 5
Partition C: 6, 7
Partition D: 8, 9, 10متد coalesceمتد coalesce تعداد پارتیشن های دیتافریم را کاهش می‌دهد. اینجا می‌توانید ببینید که چگونه می‌توان از coalesce استفاده کرد:df = df.coalesce(2)با استفاده از دستور زیر می‌توانیم تایید کنیم که این روش دیتافریم مارا به دو پارتیشن تقسیم کرده است:df.rdd.getNumPartitions()
# 2همچنین حالا دیتافریم ما به به صورت دو فایل CSV روی هارد ذخیره می‌گردد:df.write.csv(“/Users/powers/Desktop/spark_output/numbers”)پارتیشن های این دیتافریم را می‌توانید در اینجا مشاهده کنید:Partition A: 1, 2, 3, 4, 5 
Partition C: 6, 7, 8, 9, 10الگوریتم coalesce داده هارا از پارتیشن B به پارتیشن A و همچنین از پارتیشن D به پارتیشن C منتقل می‌کند. دیتای پارتیشن های A و C جابجا نمی‌شوند. این الگوریتم سرعت بالایی دارد زیرا تلاش می‌کند جابجایی دیتا بین نود هارا به حداقل برساند.افزایش تعداد پارتیشن هاشما می‌توانید سعی کنید که با coalesce تعداد پارتیشن هارا بیشتر کنید ولی کار نمی‌کند!df = df.coalesce(6) 
df.rdd.getNumPartitions() 
# 4دیتافریم ما با ۴ پارتیشن می‌ماند با این که ما سعی کردیم تعداد آن را با این روش به ۶ برسانیم.متد coalesce تعداد پارتیشن هارا با جابجایی مقداری از دادگان پارتیشن های حال به باقی پارتیشن ها کمتر می‌کند. این روش به وضوح نمی‌تواند تعداد پارتیشن هارا افزایش دهد.متد repartitionمتد repartition را می‌توان هم برای افزایش و هم کاهش تعداد پارتیشن های دیتافریم اسپارک استفاده کرد.بیایید با استفاده از این روش دیتافریم را ریپارتیشن نماییم:df = df.repartition(2) 
df.rdd.getNumPartitions() 
# 2حالا بگذارید ببینیم که در هر کدام از پارتیشن ها چه مقادیر داده ای وجود دارد:Partition ABC: 1, 3, 5, 6, 8, 10 
Partition XYZ: 2, 4, 7, 9پارتیشن ABC شامل مقادیری از دیتافریم اولیه از پارتیشن A, B, C, D می‌باشد. پارتیشن XYZ هم همینطور. متد repartition ابتدا به صورت کامل دادگان را شافل می کند، سپس به صورت مساوی دادگان را به تعداد خواسته شده تقسیم می‌نماید. این متد سعی ندارد تا مانند روش coalesce میزان انتقال دادگان بین نود هارا کاهش دهد.افزایش تعداد پارتیشن هااز متد repartition می‌توان برای افزایش تعداد پارتیشن ها استفاده نمود:df = df.repartition(6)  
df.rdd.getNumPartitions()  
# 6اینجا می‌توانید ببینید تا این روش چگونه دادگان را بین پارتیشن های مختلف تقسیم نموده است:Partition 00000: 5, 7 
Partition 00001: 1 
Partition 00002: 2 
Partition 00003: 8 
Partition 00004: 3, 9 
Partition 00005: 4, 6, 10متد repartition ابتدا دادگان را به صورت کامل برهم می‌ریزد سپس دوباره آن هارا به تعداد دسته خواسته شده تقسیم می‌کند، لذا این متد برعکس متد coalesce می‌تواند تعداد پارتیشن هارا افزایش دهد.تفاوت های متد repartition و coalesceمتد repartition دادگان را ابتدا به صورت کامل شافل می‌کند و سپس به تعداد پارتیشن مورد نظر تقسیم می‌کند. ولی متد coalesce پارتیشن های حال را درهم ادغام می‌کند و از شافل کردن دادگان می‌پرهیزد.انجام repartition بر روی ستون‌هابگذارید از دیتافریم زیر برای توضیح دادن این که چطور می‌توان بر روی ستون عمل repartition را انجام داد استفاده کنیم:#+-----+-------+ 
#| age | color | 
#+-----+-------+ 
#|  10 | blue  | 
#|  13 | red   | 
#|  15 | blue  | 
#|  99 | red   | 
#|  67 | blue  | 
#+-----+-------+بگذارید بر روی ستون رنگ ریپارتیشن کنیم.df = df.repartition(&#039;color&#039;)وقتی بر روی ستون ها ریپارتیشن انجام می‌دهید اسپارک به طور دیفالت ۲۰۰ پارتیشن بر اساس آن ستون ایجاد می‌کند. به عنوان مثال داده مثال زده شده حاوی ۱۹۸ پارتیشن خالی و دو پارتیشن زیر خواهد بود.Partition 00091: 
13,red
99,red
Partition 00168:
10,blue
15,blue
67,blueاین دیتافریم شامل ۲ پارتیشن دارای دیتا که هر کدام از آن ها یک دسته از رنگ هارا در خود جای می‌دهد می‌باشد. عمل پارتیشن کردن بر اساس ستون ها مشابه ایندکس براساس ستون در دیتابیس های سنتی است. از پارتیشن بر اساس ستون می‌توان در عمل های join که سنگین هستند و در حالت عادی بسیار طولانی خواهد بود استفاده نمود. به این صورت که دیتافریم های rdd را بر اساس ستونی که قصد join روی آن داریم ریپارتیشن میکنیم و این باعث می‌شود تا انتقال دیتا بین نود ها کمتر شود و عملا join ما سریع تر انجام گردد.یک مثال واقعیفرض کنید یک دیتا لیک شامل ۲ میلیارد سطر و حدود ۱۳ هزار پارتیشن ذخیره شده بر روی فایل سیستم داریم.شما می‌خواهید یک نمونه از داده که یک میلیون بار کمتر است برای استفاده توسعه کد خود انتخاب کنید. این نمونه معمولا برای توسعه استفاده می‌گردد و در نهایت باید مدل توسعه داده شده خود را بر روی تمام دیتاها پیاده سازی نمایید. همچنین اینجا این نمونه داده را بر روی S3 ذخیره می‌کنیم.dataPuddle = dataLake.sample(true, 0.000001)
dataPuddle.write.parquet(&amp;quots3a://my_bucket/puddle/&amp;quot)اسپارک به طور پیشفرض تعداد پارتیشن هارا بهینه نمی‌کند. پس نمونه دیتایی ما در واقع ۲۰۰۰ سطر دارد و همچنان ۱۳۰۰۰ پارتیشن، در نتیجه تعداد زیادی پارتیشن خواهیم داشت که خالی هستند. این موضوع اصلا برای محاسبات و نوشتن بر روی سرور S3 بهینه نیست لذا بهتر است تعداد پارتیشن هارا کاهش دهیم:dataPuddle = dataLake.sample(true, 0.000001)
goodPuddle = dataPuddle.repartition(4)
goodPuddle.write.parquet(&amp;quots3a://my_bucket/puddle/&amp;quot)چرا ۴ پارتیشن؟داده نمونه گیری شده یک میلیون بار کمتر است، پس ما باید تعداد پارتیشن هارا همان مقدار کمتر کنیم تا در هر پارتیشن میزان برابری دیتا داشته باشیم. ۱ = ۱۳۰۰۰/۱۰۰۰۰۰۰ (به بالا رند شده) ما تعداد پارتیشن هارا ۴ در نظر گرفتیم که پردازش موازی همچنان در سرور انجام گردد. به طور کلی بهتر هست که تعداد پارتیشن های دیتافریم اسپارکمون ۲،۳ و یا ۴ برابر تعداد کور های سرورمون باشه (برای داده های خیلی حجیم بهتره ۸ رو امتحان کنید!) number_of_partitions = number_of_cpus * 4اگه دیتاتون رو خارج از فایل سیستم میگیرین (مثلا فایل CSV یا از دیتابیس های سنتی) اسپارک به صورت دیفالت این دیتافریم اسپارک رو به تعداد پارتیشن بهینه با توجه به حجم دیتا و تعداد کور های سرور تبدیل می‌کنه و در این حالت نیازی به ریپارتیشن نیست.چرا repartition استفاده کردیم و نه coalesce ؟ شافل کردن داده ها یک عمل وقت گیر است. ولی از طرفی در نمونه گفته شده فقط ۲۰۰۰ سطر داده داریم لذا دیتاهارا شافل کردیم تا در هر پارتیشن میزان برابری دیتا وجود داشته باشد.میزان بهبود سرعت؟به طور کلی ۲۴۰ ثانیه طول کشید تا تعداد سطر های دیتافریم اولیه (که ۱۳۰۰۰ پارتیشن داشت) شمرده شود. در عوض در نمونه پارتیشن شده فقط ۲ ثانیه طول کشید تا همین عمل انجام گردد! این یعنی ۱۲۰ برابر سریع تر.توجه کنید که اگر دیتای خود رو از روی پارکت و یا فایل هایی از این نوع می‌خونید همونطور که گفته شد تعداد پارتیشن هاتون برابر با تعداد پارکت فایل هایی هست که بر روی سرور HDFS ذخیره شده. (این که روزانه چند تا پارکت فایل هست رو میتونید از مهندس داده تیمتون بپرسید). به عنوان مثال فرض کنید روزانه ۲۴ پارکت فایل بر روی سرور ذخیره میشه و ما میخوایم دیتای ۳ ماه رو تحلیل کنیم. این یعنی هنگامی که دیتا رو می‌خونیم دیتافریم اسپارکمون 24*90 پارتیشن خواهد داشت. ولی هر اپلیکشن سرور ما چند تا کور داره؟؟ تعداد پارتیشنمونو باید با توجه به تعداد کور های سرورمون بهینه کنیم.این مساله غالبا یک مشکل توی کد هایی هست که با داده ها آشنایی کافی رو ندارند. مخصوصا این که افراد کدشون رو برای دیتای یک روز توسعه می‌دهند و در نهایت میخوان برای بازه خیلی بزرگتری اینو دوباره اصطلاحا ران کنن. احتمالا باید به پارتیشن های خود فکر کنیدعمل پارتیشن کردن یک عمل سطح پایین است که احتمالا باید به صورت پیشفرض توسط اسپارک انجام می‌شد. ولی انجام نمی‌شود! وقتی دیتای بزرگی را فیلتر می‌کنید احتمالا باید همیشه عمل ریپارتیشن را بر روی دیتافریم rdd خود انجام دهید. </description>
                <category>صولت هوشمند</category>
                <author>صولت هوشمند</author>
                <pubDate>Sat, 07 Mar 2020 17:25:52 +0330</pubDate>
            </item>
            </channel>
</rss>