بهینهسازیهایی که در این مطلب آمده، حاصل مشاهدههای عملی افراد کامیونیتی اسپارک و تمرکز آنها بر ماکسیسممسازی اختصاص منابع در کلاسر برای افزایش کارایی است.
حجم کار زیاد در اسپارک، معمولا دستهای از جابها است- بعضی از آنها طبق عادت شبانه اجرا میشوند در حالیکه بعضی دیگر برنامهریزی شدهاند که در ساعات مشخصی از روز کار کنند. در بعضی مواقع، این جابها ممکن است دهها ترابایت از داده را پردازش کنند. برای جلوگیری از بروز مشکل بخاطر کمبود منابع یا کاهش عملکرد در انجام جاب، تعدادی کانفیگوریشن وجود دارد که میتوانید فعال یا تغییر دهید. این کانفیگوریشنها ۳ تا از اجزای اسپارک را تغییر میدهد: Spark Driver، Spark Executor و Shuffle Service که روی Executor در حال اجرا است.
وظیفهی Spark Driver، هماهنگ شدن با Cluster Manager برای اجرای Executorها در یک کلاستر و برنامهریزی تسکهای اسپارک بر روی آنها است. در حجم کار زیاد، ممکن است هزاران تسک داشته باشیم. بعضی از تنظیماتی که در اینجا معرفی میشوند، حاصل تجربههای شرکتهای بزرگی مانند Facebook است که این تجربهها را در سمینار Spark + AI Summit به اشتراک گذاشتهاند.
وقتی منابع محاسباتی را از طریق روشی مانند spark-submit در command-line مشخص میکنیم، حداکثر را مشخص میکنیم. این یعنی اگر بعد ها حین انجام عملیات، اسپارک به منابع بیشتر از حد مشخص شده احتیاج داشت، نمیتواند منابع بیشتری اختصاص دهد.
در برابر اگر از کانفیگوریشن تخصیص منابع پویا در اسپارک استفاده کنید، Spark Driver میتواند با افزایش حجم کار، درخواست افزایش منبع بدهد.
یکی از کاربردها در Streaming است، یعنی حالتی که حجم جریان داده غیر یکنواخت باشد. یکی دیگر از کاربردها تحلیل دادهی on-demand است، یعنی حالتی که شما حجم بالایی از کوئریهای SQL در ساعات اوج دارید. با فعالسازی تخصیص حافظهی پویا، به اسپارک اجازه میدهید بهتر منابع را مدیریت کند، Executorهایی را که استفاده نمیشوند آزاد کند و در مواقع نیاز از منابع جدید استفاده کند.
فقط فعال کردن تخصیص منابع پویا کافی نیست. همچنین باید بدانید که تخصیص حافظهی Executor چگونه است و اسپارک چگونه از آنها استفاده میکند تا Executorها بدون حافظه نمانند یا JVM Garbage Collection به مشکل بر نخورد.
مقدار حافظهای که هر Executor دارد توسط spark.executor.memory کنترل میشود. همانطور که در شکل زیر نمایش داده شده، به ۳ بخش تقسیم میشود: Execution Memory, Storage Memory, Reserved Memory. تقسیم دیفالت به صورت ۳۰۰ مگابایت برای Reserved Memory که با ارورهای OOM روبرو نشویم و سپس ۶۰ درصد برای Execution Memory و ۴۰ درصد برای Storage Memory است. داکیومنت اسپارک توصیه میکند که این حالت در بیشتر مواقع پاسخگو خواهد بود. هر وقت از Storage Memory استفاده نشود، اسپارک میتواند از آن برای Execution Memory استفاده کند و همینطور برعکس.
کاربرد Execution Memory در Spark Shuffle, join, sort, aggregation است. از آنجایی که کوئریهای مختلف ممکن است به مقدار حافظهی متفاوت احتیاج داشته باشند، اختصاص کسر مقدار حافظه ی موجود احتیاج به کمی مهارت دارد. (spark.memory.fraction مقدار دیفالت ۰.۶ است) Storage Memory در کشکردن ساختمان دادههای کاربر و پارتیشنهایی که از DataFrame استخراج شده، کاربرد دارد.
در طی عملیات map و shuffle، اسپارک از فایلهای شافل دیسک محلی میخواند و به آن مینویسد، پس فعالیت I/O فراوانی وجود دارد. این منجر به باتلنک میشود چون کانفیگوریشن دیفالت، مناسب جابهای اسپارک در مقیاس بالا است. دانستن اینکه کدام کانفیگوریشنها را باید تغییر بدهیم میتواند ریسک را در این مرحله کاهش دهد.
در شکر زیر تعدادی کانفیگوریشن توصیه شده را میبینید که با تنظیم آنها، map, spill و merge با کمبود I/O روبرو نمیشوند و میتوانند قبل از نوشتن پارتیشنهای شافل به دیسک، از بافر مموری استفاده کنند. تنظیم Shuffle Serviceای که در هر Executor در حال اجرا میباشد هم میتواند عملکرد کلی را افزایش دهد.
موارد پیشنهادی در این عکس در همهی مواقع صادق نیستند اما میتوانند ایدهی خوبی بدهند که چگونه طبق حجم کار،آنها را تغییر دهید.
بیشتر کارایی اسپارک در توانایی آن در انجام چند تسک بهصورت موازی است. برای اینکه بفهمید چطور میتوانید موازیکاری را به بیشترین مقدار ممکن برسانید، باید ببینید که اسپارک چگونه داده را از فضای ذخیرهسازی به حافظه میخواند و پارتیشنها چه معنایی برای اسپارک دارند.
در اصطلاحات مدیریت داده، پارتیشن یک روش برای مرتبکردن داده به زیرمجموعهای از تکهها یا بلاکهای قابل خواندن و قابل کانفیگکردن دادههای مجاور روی دیسک است. این زیرمجموعههای داده میتوانند بهصورت مستقل یا موازی، در حداقل یک thread، خوانده و یا پردازش شوند. این استقلال مهم است چون اجازهی موازیسازی پردازش داده در مقیاس بالا را میدهد.
اسپارک به طرز وحشتناکی در پردازش تسکها به صورت موازی بهینه است. همانطور که در مطلب ****لینک به مطلب **** خواندیم، در حجم کار بالا، یک جاب Stageهای زیادی خواهد داشت و در هر Stage تعداد زیادی تسک خواهد بود. اسپارک به بهترین حالت یک thread به ازای هر تسک به ازای هر هسته برنامهریزی میکند و هر تسک یک پارتیشن مجزا را پردازش خواهد کرد. برای بهینهسازی تخصیص منابع و حداکثر کردن موازیکاری، ایدهال این است که تعداد پارتیشنها حداقل به اندازهی هستههای Executorها باشد. اگر تعداد پارتیشنها از تعداد هستههای Executorها بیشتر باشد، تمام هستهها مشغول خواهند شد. میتوانید پارتیشن را واحدهای اتمی در موازیکاری در نظر بگیرید: یک thread در یک هسته میتوانید روی یک پارتیشن کار کند.
پارتیشنها چگونه ساخته میشوند
همانطور که گفتیم، تسکهای اسپارک داده را به صورت پارتیشنهایی از دیسک به حافظه میخوانند.بسته به نوع ذخیرهسازی، دادهی روی دیسک به صورت chunkها یا بلاکهای فایل پشت سر هم ذخیره شده است. بهصورت دیفالت، بلاکهای فایل در دیتا استورها سایزی بین ۶۴ مگابایت تا ۱۲۸ مگابایت دارند. برای مثال در HDFS و S3 سایز دیفالت ۱۲۸ مگابایت است. کالکشنی مجاور از این بلاکها تشکیل پارتیشن میدهند.
در اسپارک، سایز پارتیشنها توسط spark.sql.files.maxPartitionBytes کنترل میشود که دیفالت آن ۱۲۸ مگابایت است. میتوانید آن را کاهش دهید اما ممکن است با مشکلی به نام Small File Problem روبرو شوید؛ تعداد خیلی زیادی از پارتیشنهای کوچک فایل، که منجر به تعداد زیادی I/O میشود و به لطف عملیاتی مانند باز و بستن، کارایی کاهش میابد.
پارتیشنها همچنین وقتی از متدهای خاصی از DataFrame API استفاده میکنید، ساخته میشوند. برای مثال، به هنگام ساختن یک DataFrame بزرگ یا خواندن یک فایل بزرگ از دیسک. میتوانید به اسپارک بگویید که یک مقدار معین از پارتیشنها را تولید کند. برای این کار از متد ()repartition استفاده میکنیم.
در نهایت، پارتیشنهای شافل به هنگام مرحلهی شافل ساخته میشوند. به طور دیفالت تعداد پارتیشنهای شافل ۲۰۰ است که میتوانید بر اساس اندازهی داده آن را تغییر دهید. با تغییر آن،میتوانید مقدار پارتیشنهای کوچکی که در در شبکه به Executorها ارسال میشود، تغییر دهید.
مقدار دیفالت spark.sql.shuffle.partitions برای کارها یا استریمهای کوچک، بسیار زیاد است. بهتر است آن را به مقادیر کمی مانند تعداد هستههای Executorها و یا کمتر کاهش دهید.
شافل پارتیشنها که به هنگام عملیات Wide Transformation مانند ()groupBy یا ()join ساخته میشوند، هم شبکه و هم منابع I/O را مصرف میکنند. در این عملیات، نتایج در دیسکهای محلی Executorها در مسیری که توسط spark.local.directory مشخص شده است، ذخیره میشود. داشتن SSD منجر به بهتر شدن عملکرد این عملیات خواهد شد.
هیچ فرمول طلایی برای تعیین تعداد شافل پارتیشنها وجود ندارد؛ تعداد بسته به کار شما، دیتاست، تعداد هسته و مقدار حافظهی Executorها متغیر است.
برای افزایش بیشتر کارایی اسپارک در حجم کاری بالا، بهتر است دیتافریمهایی را که مدام دسترسی پیدا میکند، ذخیره یا کش کنید.
تفاوت بین کشکردن و ذخیرهکردن چیست؟
تابع () cache تا جایی که حافظه اجازه دهد، هر مقدار پارتیشن خوانده شده در حافظهی Executorها را ذخیره میکند. ممکن است یک کسری از DataFrame را بتوانید کش کنید اما پارتیشنها را نمیتوانید؛ یعنی اگر ۸ پارتیشن داشته باشید و فقط ۴.۵ تا پارتیشن در حافظه قرار بگیرد، فقط ۴ تا کش خواهد شد. اگر همهی پارتیشنها کش نشوند و بخواهید به داده دوباره دسترسی پیدا کنید، پارتیشنهایی که کش نشدهاند دوباره باید محاسبه شوند که جاب را کند خواهند کرد.
وقتی از ()cache یا ()persist استفاده میکنید، تمام DataFrame کش نمیشود مگر تا زمانی که یک Action را فعال کنید که بر تمام نمونهها تاثیر میگذارد( مثال ()count) اگر از یک Action مانند (1)take استفاده کنید، فقط یک پارتیشن کش میشود چون Catalyst Optimizer متوجه میشود که شما برای دسترسی به یک نمونه، نیازی به محاسبهی تمام پارتیشنها ندارید.
در شکل زیر میبینیم که چگونه یک DataFrame در یک Executor ذخیره شده است. میبینیم که همه در حافظه جای گرفتهاند.
تابع (StorageLevel.LEVEL)persist اختلاف جزئیای با تابع قبلی دارد. این تابع امکان کنترل چگونگی کش شدن داده از طریق StorageLevel را میدهد. شکل زیر تفاوت سطوح مختلف ذخیرهسازی را نشان میدهد. دادهی روی دیسک همیشه میتواند با استفاده از Java Serilization یا Kyro Serialization سریالسازیشده شود.
هر StorageLevel(به جز OFF_HEAP) یک معادل LEVEL_NAME_2 دارد که یعنی در دو Spark Executor متفاوت ذخیره شود:MEMORY_ONLY_2, MEMORY_AND_DIST_SER_2 و ...اگرچه این کار هزینهبر است اما اجازه میدهد داده در ۲ جا ذخیره شود که باعث Fault Tolerance میشود.
استفادههای رایج از کشکردن در مواقعی است که میخواهید مدام برای ترنسفورمها و کوئریها، به دیتاستی بزرگ دست پیدا کنید؛ مانند
استفاده از کش در همهی کاربریها الزامی نیست. مانند
بهعنوان یک قانون رایج بهتر است از Memory Caching عاقلانه استفاده کنید چون میتواند منجر به هزینه در serialize و deserialize شود. (بسته به StorageLevel ای که استفاده میکنید)
عملیات Join از ترنسفورمهای رایج در تحلیل داده است که در آن ۲ دیتاست، به شکل تیبل یا DataFrame، توسط یک matching key ادغام میشوند. مانند Relational Databaseها، اسپارک در DataFrame و DatasetAPI ا، Joinهای inner join, outer join, left join, right join و ... را ارائه میدهد. تمام این عملیات باعث حرکت مقدار زیادی داده در Executorها میشوند.
در قلب این ترنسفورمها، نحوهی محاسبهی اینکه چه دادهای تولید شود، چه keyها و کدام دادههای مربوط به آن به دیسک نوشته شود و چگونگی انتقال این keyها و دادهها به نودها بهعنوان قسمتی از عملیاتی مانند ()groupBy، ()join, ()، agg و ... ، قرار دارد. به این حرکت معمولا Shuffle میگویند.
اسپارک پنج Join دارد که داده را در Executorها جابجا، مرتب، گروه و ادغام میکند:
در اینجا تمرکز خود را روی BHJ و SMJ میگذاریم چون پرکاربرد ترین هستند.
نام دیگر آن Map-Side-Only Join است. Broadcast Hash Join موقعی به کار میرود که ۲ دیتاست، یکی کوچک (که در حافظهی Driver و Executor جا میگیرد) و دیگری آنقدر بزرگ که از جابجایی آن صرف نظر کنیم، باید روی شرطها یا ستونهای معین Join شوند. با استفاده از یک Spark Broadcast Variable، دیتاست کوچکتر توسط Driver به تمام Executorها broadcast میشود و بعد روی هر Executor با دیتاست بزرگتر Join میشود.
بهصورت دیفالت، اگر دیتاست کوچکتر از ۱۰ مگابایت باشد، اسپارک از Broadcast Join استفاده خواهد کرد. میتوانید این مقدار را با spark.sql.autoBroadcastJoinThreshold تغییر دهید.
یک کاربرد رایج آن وقتی است که بین ۲ دیتافریم، که یکی حجم کمتری نسبت به دیگری دارد، یک سری Keyهای مشترک وجود دارد و شما اختیاج دارید که یک صورت ادغام شده از این ۲ را ببینید.
راحتترین و سریعترین Join در اسپارک BHJ است چون هیچ جابجایی داده (Shuffle) وجود ندارد؛ بعد از Broadcast، تمام داده بهصورت محلی برای Executor قابل دسترس است. فقط باید مطمئن شوید که Driver و Executorها حافظهی کافی دارند تا بتوانند دیتاست کوچکتر را در حافظه نگه دارند.
چه موقع از Broadcast Hash Join استفاده کنیم
اگر مقدار spark.sql.autoBroadcastJoinThreshold را ۱- قرار دهید، اسپارک همیشه از Shuffle Sort Merge Join استفاده خواهد کرد.
الگوریتم Sort-Merge یک راه بهینه است برای ادغام ۲ دیتاست بزرگ روی Key مشترک که یکتا و قابل Sort است و میتوان آن را در پارتیشن یکسان ذخیره کرد؛ به این معنا که دو دیتاست با یک key قابل هش و مشترک را میتوان در یک پارتیشن یکسان ذخیره کرد. در نگاه اسپارک این یعنی تمام ردیفهای درون هر دیتاست با key یکسان بر روی پارتیشن یکسان در Executor یکسان هش میشوند. مشخص است که این یعنی داده باید مکان یکسانی داشته باشد یا بین Executorها جابجا شود.
همانطور که از نامش پیداست، این Join دو فاز دارد: یک فاز sort و یک فاز merge. فاز sort هر دیتاست را با join key دلخواه مرتب میکند؛ فاز merge در ردیف هر دیتاست، روی هر key پیمایش میکند و اگر دو key با هم مچ شدند، آنها را merge میکند.
چه موقع از Shuffle Sort Merge Join استفاده کنیم
اسپارک یک محیط کاربری تحت وب ارائه میدهد که امکان بررسی اجزای مختلف برنامهی ما را فراهم میکند. امکاناتی که فراهم میکند شامل حافظهی مصرفی، جابها، استیجها، تسکها، تایملاینها و آمار و معیارهایی است که برای درک کردن برنامهی اسپارک ما مفید است. spark-submit محیط کاربری را آغاز میکند و میتوانید در حالت لوکال در پورت دیفالت ۴۰۴۰ به آن دسترسی پیدا کنید.
این محیط کاربری شامل ۶ تب است. در نسخهی ۳ یک تب دیگر با نام Structured Streaming اضافه شده است.
همانطور که میدانید، اسپارک یک اپلیکشن را به جابها، استیجها و تسکهای مختلف تبدیل میکند. تبهای جاب و استیج به شما اجازه میدهد ریز به ریز جزئیات را بررسی کنید. میتوانید میزان پیشرفت، معیارهای مربوط به I/O، حافظهی مصرفی، مدت زمان اجرا و ... را مشاهده کنید.
شکل زیر تب جاب را با Event Timeline گسترش یافته نشان میدهد. میتوان مشاهده کرد که کی Executorها اضافه یا حذف شدهاند. همچنین یک لیست از جابهای تمام شده نشان میدهد. ستون Duration مدت زمان هر جاب را نشان میدهد. اگر این زمان زیاد باشد، بهتر است استیجهای آن جاب را ببینید و بفهمید که کدام تسک باعث طولانی شدن شده است. از این صفحه میتوانید به صفحهی پر جزيیات هر جاب که شامل DAG نیز هم هست، دسترسی پیدا کنید.
تب استیج یک خلاصه از وضعیت فعلی تمام استیجهای تمام جابها میدهد. همچنین میتوانید به یک صفحهی پرجزئیات برای هر استیج نیز دست پیدا کنید. میتوانید میانگین زمان اجرا برای هر تسک، زمان سپری شده در Garbage Collection و تعداد بایتها/نمونههای خوانده شده در شافل را ببینید. اگر دادهی شافل از Executorهای ریموت خوانده میشود، Shuffle Read Blocked Time بالا میتواند نشاندهندهی مشکلات I/O باشد. زمان GC بالا میتواند نشاندهندهی وجود آبجکتهای زیادی در Heap باشد(ممکن است Executorهای شما به حافظهی زیادی احتیاج داشته باشند) اگر بیشترین زمان یک تسک بسیار بزرگتر از متوسط باشد، احتمالا بخاطر توزیع نامساوی داده در پارتیشنها، با Data Skew روبرو شدهاید.
این تب اطلاعاتی دربارهی Executorهای برنامه در اختیار میگذارد. میتوانید جزئیات شکل زیر را ببینید.
علاوه بر آمارها، میتوانید ببینید که هر Executor چگونه از حافظه و برای چه کاری استفاده میکند. همچنین این کمک میکند تا میزان مصرف منابع را به هنگام استفاده از توابع cache یا persist بررسی کنیم.
این تب اطلاعات هر تیبل یا DataFrame که برنامه آن را کش کرده، نشان میدهد.
اثر کوئریهای Spark SQL که در برنامهی شما اجرا میشوند، قابل دنبالکردن و دیدن هستند. میتوانید ببینید که هر جاب چه موقع کدام کوئری را اجرا میکند و مدت زمان آن چقدر بوده است.
با کلیک بر روی هر کوئری، جزيات و برنامهی اجرای آن نمایش داده میشود. این جزئیات در مواقعی که میخواهیم جزئیات هر اپراتور و تغییرات آنها را ببینیم، مفید است.
شناختن محیطی که برنامهی اسپارک در آن در حال اجرا است، نکات مفید زیادی را ارائه میکند که در عیبیابی مفید هستند. در واقع، ضروری است که از متغیرهای محیط مطلع باشیم، بدانیم از چه jarهایی استفاده شده است، مقدار متغیرهای اسپارک و سیستم چیست و ... تمام این اطللاعات read-only یک معدن طلا از اطلاعات هستند که به هنگام برخورد با یک رفتار غیرطبیعی در برنامه، باید آنها را بررسی کنیم.