مجتبی میر یعقوب زاده
مجتبی میر یعقوب زاده
خواندن ۱۳ دقیقه·۳ سال پیش

بهینه‌سازی Apache Spark

بهینه‌سازی‌هایی که در این مطلب آمده، حاصل مشاهده‌های عملی افراد کامیونیتی اسپارک و تمرکز آن‌ها بر ماکسیسمم‌سازی اختصاص منابع در کلاسر برای افزایش کارایی است.


افزایش مقیاس اسپارک برای حجم کار زیاد

حجم کار زیاد در اسپارک، معمولا دسته‌ای از جاب‌ها است- بعضی از آن‌ها طبق عادت شبانه اجرا می‌شوند در حالیکه بعضی دیگر برنامه‌ریزی شده‌اند که در ساعات مشخصی از روز کار کنند. در بعضی مواقع، این جاب‌ها ممکن است ده‌ها ترابایت از داده‌ را پردازش کنند. برای جلوگیری از بروز مشکل بخاطر کمبود منابع یا کاهش عملکرد در انجام جاب، تعدادی کانفیگوریشن وجود دارد که می‌توانید فعال یا تغییر دهید. این کانفیگوریشن‌ها ۳ تا از اجزای اسپارک را تغییر می‌دهد: Spark Driver، Spark Executor و Shuffle Service که روی Executor در حال اجرا است.

وظیفه‌ی Spark Driver، هماهنگ شدن با Cluster Manager برای اجرای Executorها در یک کلاستر و برنامه‌ریزی تسک‌های اسپارک بر روی آن‌ها است. در حجم کار زیاد، ممکن است هزاران تسک داشته باشیم. بعضی از تنظیماتی که در اینجا معرفی می‌شوند، حاصل تجربه‌های شرکت‌های بزرگی مانند Facebook است که این تجربه‌ها را در سمینار Spark + AI Summit به اشتراک گذاشته‌اند.

تخصیص منابع ایستا (Static) در برابر پویا (Dynamic)

وقتی منابع محاسباتی را از طریق روشی مانند spark-submit در command-line مشخص می‌کنیم، حداکثر را مشخص می‌کنیم. این یعنی اگر بعد ها حین انجام عملیات، اسپارک به منابع بیشتر از حد مشخص شده احتیاج داشت، نمی‌تواند منابع بیشتری اختصاص دهد.

در برابر اگر از کانفیگوریشن تخصیص منابع پویا در اسپارک استفاده کنید، Spark Driver می‌تواند با افزایش حجم کار، درخواست افزایش منبع بدهد.

یکی از کاربردها در Streaming است، یعنی حالتی که حجم جریان داده غیر یکنواخت باشد. یکی دیگر از کاربردها تحلیل داده‌ی on-demand است، یعنی حالتی که شما حجم بالایی از کوئری‌های SQL در ساعات اوج دارید. با فعال‌سازی تخصیص حافظه‌ی پویا، به اسپارک اجازه می‌دهید بهتر منابع را مدیریت کند، Executorهایی را که استفاده نمی‌شوند آزاد کند و در مواقع نیاز از منابع جدید استفاده کند.

مقدار حافظه‌ی Spark Executor و Shuffle Service

فقط فعال کردن تخصیص منابع پویا کافی نیست. همچنین باید بدانید که تخصیص حافظه‌ی Executor چگونه است و اسپارک چگونه از آن‌ها استفاده می‌کند تا Executorها بدون حافظه نمانند یا JVM Garbage Collection به مشکل بر نخورد.

مقدار حافظه‌ای که هر Executor دارد توسط spark.executor.memory کنترل می‌شود. همانطور که در شکل زیر نمایش داده شده، به ۳ بخش تقسیم می‌شود: Execution Memory, Storage Memory, Reserved Memory. تقسیم دیفالت به صورت ۳۰۰ مگابایت برای Reserved Memory که با ارورهای OOM روبرو نشویم و سپس ۶۰ درصد برای Execution Memory و ۴۰ درصد برای Storage Memory است. داکیومنت اسپارک توصیه می‌کند که این حالت در بیشتر مواقع پاسخگو خواهد بود. هر وقت از Storage Memory استفاده نشود، اسپارک می‌تواند از آن برای Execution Memory استفاده کند و همینطور برعکس.

کاربرد Execution Memory در Spark Shuffle, join, sort, aggregation است. از آنجایی که کوئری‌های مختلف ممکن است به مقدار حافظه‌ی متفاوت احتیاج داشته باشند، اختصاص کسر مقدار حافظه ی موجود احتیاج به کمی مهارت دارد. (spark.memory.fraction مقدار دیفالت ۰.۶ است) Storage Memory در کش‌کردن ساختمان داده‌های کاربر و پارتیشن‌هایی که از DataFrame استخراج شده، کاربرد دارد.

در طی عملیات map و shuffle، اسپارک از فایل‌های شافل دیسک محلی می‌خواند و به آن می‌نویسد، پس فعالیت I/O فراوانی وجود دارد. این منجر به باتل‌نک می‌شود چون کانفیگوریشن دیفالت، مناسب جاب‌های اسپارک در مقیاس بالا است. دانستن اینکه کدام کانفیگوریشن‌ها را باید تغییر بدهیم می‌تواند ریسک را در این مرحله کاهش دهد.

در شکر زیر تعدادی کانفیگوریشن توصیه شده را می‌بینید که با تنظیم آن‌ها، map, spill و merge با کمبود I/O روبرو نمی‌شوند و می‌توانند قبل از نوشتن پارتیشن‌های شافل به دیسک، از بافر مموری استفاده کنند. تنظیم Shuffle Serviceای که در هر Executor در حال اجرا می‌باشد هم می‌تواند عملکرد کلی را افزایش دهد.

موارد پیشنهادی در این عکس در همه‌ی مواقع صادق نیستند اما می‌توانند ایده‌ی خوبی بدهند که چگونه طبق حجم کار،‌آن‌ها را تغییر دهید.

افزایش موازی‌کاری اسپارک

بیشتر کارایی اسپارک در توانایی آن در انجام چند تسک به‌صورت موازی است. برای اینکه بفهمید چطور می‌توانید موازی‌کاری را به بیشترین مقدار ممکن برسانید، باید ببینید که اسپارک چگونه داده را از فضای ذخیره‌سازی به حافظه می‌خواند و پارتیشن‌ها چه معنایی برای اسپارک دارند.

در اصطلاحات مدیریت داده، پارتیشن یک روش برای مرتب‌کردن داده به زیرمجموعه‌ای از تکه‌ها یا بلاک‌های قابل خواندن و قابل کانفیگ‌کردن داده‌‌های مجاور روی دیسک است. این زیرمجموعه‌های داده می‌توانند به‌صورت مستقل یا موازی، در حداقل یک thread، خوانده و یا پردازش شوند. این استقلال مهم است چون اجازه‌ی موازی‌سازی پردازش داده در مقیاس بالا را می‌دهد.

اسپارک به طرز وحشتناکی در پردازش تسک‌ها به ‌صورت موازی بهینه است. همانطور که در مطلب ****لینک به مطلب **** خواندیم، در حجم کار بالا، یک جاب Stageهای زیادی خواهد داشت و در هر Stage تعداد زیادی تسک خواهد بود. اسپارک به بهترین حالت یک thread به ازای هر تسک به ازای هر هسته برنامه‌ریزی می‌کند و هر تسک یک پارتیشن مجزا را پردازش خواهد کرد. برای بهینه‌سازی تخصیص منابع و حداکثر کردن موازی‌کاری، ایده‌ال این است که تعداد پارتیشن‌ها حداقل به اندازه‌ی هسته‌های Executorها باشد. اگر تعداد پارتیشن‌ها از تعداد هسته‌های Executorها بیشتر باشد، تمام هسته‌ها مشغول خواهند شد. می‌توانید پارتیشن را واحدهای اتمی در موازی‌کاری در نظر بگیرید: یک thread در یک هسته می‌توانید روی یک پارتیشن کار کند.

پارتیشن‌ها چگونه ساخته می‌شوند

همانطور که گفتیم، تسک‌های اسپارک داده را به صورت پارتیشن‌هایی از دیسک به حافظه می‌خوانند.بسته به نوع ذخیره‌سازی، داده‌ی روی دیسک به صورت chunkها یا بلاک‌های فایل پشت سر هم ذخیره شده است. به‌صورت دیفالت، بلاک‌های فایل در دیتا استورها سایزی بین ۶۴ مگابایت تا ۱۲۸ مگابایت دارند. برای مثال در HDFS و S3 سایز دیفالت ۱۲۸ مگابایت است. کالکشنی مجاور از این بلاک‌ها تشکیل پارتیشن می‌دهند.

در اسپارک، سایز پارتیشن‌ها توسط spark.sql.files.maxPartitionBytes کنترل می‌شود که دیفالت آن ۱۲۸ مگابایت است. می‌توانید آن را کاهش دهید اما ممکن است با مشکلی به نام Small File Problem روبرو شوید؛ تعداد خیلی زیادی از پارتیشن‌های کوچک فایل‌، که منجر به تعداد زیادی I/O می‌شود و به لطف عملیاتی مانند باز و بستن، کارایی کاهش میابد.

پارتیشن‌ها همچنین وقتی از متدهای خاصی از DataFrame API استفاده می‌کنید، ساخته می‌شوند. برای مثال، به هنگام ساختن یک DataFrame بزرگ یا خواندن یک فایل بزرگ از دیسک. می‌توانید به اسپارک بگویید که یک مقدار معین از پارتیشن‌ها را تولید کند. برای این کار از متد ()repartition استفاده می‌کنیم.

در نهایت، پارتیشن‌های شافل به هنگام مرحله‌ی شافل ساخته می‌شوند. به طور دیفالت تعداد پارتیشن‌های شافل ۲۰۰ است که می‌توانید بر اساس اندازه‌ی داده آن را تغییر دهید. با تغییر آن،می‌توانید مقدار پارتیشن‌های کوچکی که در در شبکه به Executorها ارسال می‌شود، تغییر دهید.

مقدار دیفالت spark.sql.shuffle.partitions برای کارها یا استریم‌های کوچک، بسیار زیاد است. بهتر است آن را به مقادیر کمی مانند تعداد هسته‌های Executorها و یا کمتر کاهش دهید.

شافل پارتیشن‌ها که به هنگام عملیات Wide Transformation مانند ()groupBy یا ()join ساخته می‌شوند، هم شبکه و هم منابع I/O را مصرف می‌کنند. در این عملیات، نتایج در دیسک‌های محلی Executorها در مسیری که توسط spark.local.directory مشخص شده است، ذخیره می‌شود. داشتن SSD منجر به بهتر شدن عملکرد این عملیات خواهد شد.

هیچ فرمول طلایی‌ برای تعیین تعداد شافل پارتیشن‌ها وجود ندارد؛ تعداد بسته به کار شما، دیتاست، تعداد هسته و مقدار حافظه‌ی Executorها متغیر است.

برای افزایش بیشتر کارایی اسپارک در حجم کاری بالا، بهتر است دیتافریم‌هایی را که مدام دسترسی پیدا می‌کند، ذخیره یا کش کنید.

کش کردن و ذخیره کردن داده

تفاوت بین کش‌کردن و ذخیره‌کردن چیست؟

DataFrame.cache()

تابع () cache تا جایی که حافظه اجازه دهد، هر مقدار پارتیشن خوانده شده در حافظه‌ی Executorها را ذخیره می‌کند. ممکن است یک کسری از DataFrame را بتوانید کش کنید اما پارتیشن‌ها را نمی‌توانید؛ یعنی اگر ۸ پارتیشن داشته باشید و فقط ۴.۵ تا پارتیشن در حافظه قرار بگیرد، فقط ۴ تا کش خواهد شد. اگر همه‌ی پارتیشن‌ها کش نشوند و بخواهید به داده دوباره دسترسی پیدا کنید، پارتیشن‌هایی که کش نشده‌اند دوباره باید محاسبه شوند که جاب را کند خواهند کرد.

وقتی از ()cache یا ()persist استفاده می‌کنید، تمام DataFrame کش نمی‌شود مگر تا زمانی که یک Action را فعال کنید که بر تمام نمونه‌ها تاثیر می‌گذارد( مثال ()count) اگر از یک Action مانند (1)take استفاده کنید، فقط یک پارتیشن کش می‌شود چون Catalyst Optimizer متوجه می‌شود که شما برای دسترسی به یک نمونه، نیازی به محاسبه‌ی تمام پارتیشن‌ها ندارید.

در شکل زیر می‌بینیم که چگونه یک DataFrame در یک Executor ذخیره شده است. می‌بینیم که همه در حافظه جای گرفته‌اند.

DataFrame.persist()

تابع (StorageLevel.LEVEL)persist اختلاف جزئی‌ای با تابع قبلی دارد. این تابع امکان کنترل چگونگی کش شدن داده از طریق StorageLevel را می‌دهد. شکل زیر تفاوت سطوح‌ مختلف ذخیره‌سازی را نشان می‌دهد. داده‌ی روی دیسک همیشه می‌تواند با استفاده از Java Serilization یا Kyro Serialization سریال‌‌سازی‌شده شود.

هر StorageLevel(به جز OFF_HEAP) یک معادل LEVEL_NAME_2 دارد که یعنی در دو Spark Executor متفاوت ذخیره شود:‌MEMORY_ONLY_2, MEMORY_AND_DIST_SER_2 و ...اگرچه این کار هزینه‌بر است اما اجازه می‌دهد داده در ۲ جا ذخیره شود که باعث Fault Tolerance می‌شود.

چه موقع از Cache و Persist استفاده کنیم

استفاده‌های رایج از کش‌کردن در مواقعی است که می‌خواهید مدام برای ترنسفورم‌ها و کوئری‌ها، به دیتاستی بزرگ دست پیدا کنید؛ مانند

  • استفاده مداوم از DataFrameها به هنگام ترین یک مدل ماشین لرنینگ
  • دسترسی مداوم به DataFrameها به‌هنگام ترنسفورم‌های مداوم در ETL یا ساخت دیتا پایپ‌لاین


چه موقع از Cache و Persist استفاده نکنیم

استفاده از کش در همه‌ی کاربری‌ها الزامی نیست. مانند

  • دیتافریم‌هایی که برای گنجاندن در حافظه بسیار بزرگ هستند
  • یک ترنسفورم کم‌هزینه روی دیتافریم که قرار نیست مدام استفاده شود

به‌عنوان یک قانون رایج بهتر است از Memory Caching عاقلانه استفاده کنید چون می‌تواند منجر به هزینه در serialize و deserialize شود. (بسته به StorageLevel ای که استفاده می‌کنید)

خانواده‌ی Join در اسپارک

عملیات Join از ترنسفورم‌های رایج در تحلیل داده است که در آن ۲ دیتاست، به شکل تیبل یا DataFrame، توسط یک matching key ادغام می‌شوند. مانند Relational Databaseها، اسپارک در DataFrame و DatasetAPI ا، Joinهای inner join, outer join, left join, right join و ... را ارائه می‌دهد. تمام این عملیات باعث حرکت مقدار زیادی داده در Executorها می‌شوند.

در قلب این ترنسفورم‌ها، نحوه‌ی محاسبه‌ی اینکه چه داده‌ای تولید شود، چه keyها و کدام‌ داده‌های مربوط به آن به دیسک نوشته شود و چگونگی انتقال این keyها و داده‌ها به نودها به‌عنوان قسمتی از عملیاتی مانند ()groupBy، ()join, ()، agg و ... ، قرار دارد. به این حرکت معمولا Shuffle می‌گویند.

اسپارک پنج Join دارد که داده را در Executorها جابجا، مرتب، گروه و ادغام می‌کند:

  • ‌Broadcast Hash Join (BHJ)
  • Shuffle Hash Join (SHJ)
  • Shuffle Sort Merge Join (SMJ)
  • Broadcast Nested Loop Join (BNLJ)
  • Shuffle-and-Replicated Nested Loop Join (a.k.a. Cartesian product join)

در اینجا تمرکز خود را روی BHJ و SMJ می‌گذاریم چون پرکاربرد ترین هستند.

Broadcast Hash Join

نام دیگر آن Map-Side-Only Join است. Broadcast Hash Join موقعی به کار می‌رود که ۲ دیتاست، یکی کوچک (که در حافظه‌ی Driver و Executor جا می‌گیرد) و دیگری آنقدر بزرگ که از جابجایی آن صرف نظر کنیم، باید روی شرط‌ها یا ستون‌های معین Join شوند. با استفاده از یک Spark Broadcast Variable، دیتاست کوچکتر توسط Driver به تمام Executorها broadcast می‌شود و بعد روی هر Executor با دیتاست بزرگ‌تر Join می‌شود.

به‌صورت دیفالت، اگر دیتاست کوچک‌تر از ۱۰ مگابایت باشد، اسپارک از Broadcast Join استفاده خواهد کرد. می‌توانید این مقدار را با spark.sql.autoBroadcastJoinThreshold تغییر دهید.

یک کاربرد رایج آن وقتی است که بین ۲ دیتافریم، که یکی حجم کمتری نسبت به دیگری دارد، یک سری Keyهای مشترک وجود دارد و شما اختیاج دارید که یک صورت ادغام شده از این ۲ را ببینید.

راحت‌ترین و سریع‌ترین Join در اسپارک BHJ است چون هیچ جابجایی داده (Shuffle) وجود ندارد؛ بعد از Broadcast، تمام داده به‌صورت محلی برای Executor قابل دسترس است. فقط باید مطمئن شوید که Driver و Executorها حافظه‌ی کافی دارند تا بتوانند دیتاست کوچکتر را در حافظه نگه دارند.

چه موقع از ‌Broadcast Hash Join استفاده کنیم

  • وقتی اسپارک هر Key در دیتاست کوچک و بزرگ را به یک پارتیشن یکسان Hash کرده است
  • وقتی یک دیتاست بسیار کوچکتر از دیتاست دیگر است
  • وقتی می‌خواهید فقط یک Equi-Join انجام دهید تا دو دیتاست را بر اساس unsorted keyهای یکسان ادغام کنید
  • وقتی نگرانی‌ای بابت استفاده‌ی بیش از حد از پهنای باند شبکه یا ارورهای OOM ندارید چون دیتاست کوچکتر به تمام Executorها Broadcast خواهد شد.

اگر مقدار spark.sql.autoBroadcastJoinThreshold را ۱- قرار دهید، اسپارک همیشه از Shuffle Sort Merge Join استفاده خواهد کرد.


Shuffle Sort Merge Join

الگوریتم Sort-Merge یک راه بهینه است برای ادغام ۲ دیتاست بزرگ روی Key مشترک که یکتا و قابل Sort است و می‌توان آن را در پارتیشن یکسان ذخیره کرد؛ به این معنا که دو دیتاست با یک key قابل هش و مشترک را می‌توان در یک پارتیشن یکسان ذخیره کرد. در نگاه اسپارک این یعنی تمام ردیف‌های درون هر دیتاست با key یکسان بر روی پارتیشن یکسان در Executor یکسان هش می‌شوند. مشخص است که این یعنی داده باید مکان یکسانی داشته باشد یا بین Executorها جابجا شود.

همانطور که از نامش پیداست، این Join دو فاز دارد: یک فاز sort و یک فاز merge. فاز sort هر دیتاست را با join key دلخواه مرتب می‌کند؛ فاز merge در ردیف هر دیتاست، روی هر key پیمایش می‌کند و اگر دو key با هم مچ شدند، آن‌ها را merge می‌کند.

چه موقع از Shuffle Sort Merge Join استفاده کنیم

  • وقتی هر key در هر دو دیتاست بزرگ می‌تواند در یک پارتیشن یکسان sort و hash شود
  • وقتی می‌خواهید فقط equi-join انجام دهید تا دو دیتاست بر حسب keyهای مرتب‌شده‌ی مچ ادغام شوند
  • وقتی می‌خواهید از Exchange(یا همان Shuffle) و Sort جلوگیری کنید تا Shuffleهای بزرگ در شبکه رخ ندهند

آشنایی با Spark UI

اسپارک یک محیط کاربری تحت وب ارائه می‌دهد که امکان بررسی اجزای مختلف برنامه‌ی ما را فراهم می‌کند. امکاناتی که فراهم می‌کند شامل حافظه‌ی مصرفی، جاب‌ها، استیج‌ها، تسک‌ها، تایم‌لاین‌ها و آمار و معیارهایی است که برای درک کردن برنامه‌ی اسپارک ما مفید است. spark-submit محیط کاربری را آغاز می‌کند و می‌توانید در حالت لوکال در پورت دیفالت ۴۰۴۰ به آن دسترسی پیدا کنید.

تب‌های مختلف Spark UI

این محیط کاربری شامل ۶ تب است. در نسخه‌ی ۳ یک تب دیگر با نام Structured Streaming اضافه شده است.

Jobs and Stages

همانطور که می‌دانید، اسپارک یک اپلیکشن‌ را به جاب‌ها، استیج‌ها و تسک‌های مختلف تبدیل می‌کند. تب‌های جاب و استیج به شما اجازه می‌دهد ریز به ریز جزئیات‌ را بررسی کنید. می‌توانید میزان پیشرفت، معیارهای مربوط به I/O، حافظه‌ی مصرفی، مدت زمان اجرا و ... را مشاهده کنید.

شکل زیر تب جاب را با Event Timeline گسترش یافته نشان می‌دهد. می‌توان مشاهده کرد که کی Executorها اضافه یا حذف شده‌اند. همچنین یک لیست از جاب‌های تمام شده نشان می‌دهد. ستون Duration مدت زمان هر جاب‌ را نشان می‌دهد. اگر این زمان زیاد باشد، بهتر است استیج‌های آن جاب را ببینید و بفهمید که کدام تسک باعث طولانی شدن شده است. از این صفحه می‌توانید به صفحه‌ی پر جزيیات هر جاب که شامل DAG نیز هم هست، دسترسی پیدا کنید.

تب استیج یک خلاصه از وضعیت فعلی تمام استیج‌های تمام جاب‌ها می‌دهد. همچنین میتوانید به یک صفحه‌ی پرجزئیات برای هر استیج نیز دست پیدا کنید. می‌توانید میانگین زمان اجرا برای هر تسک، زمان سپری شده در Garbage Collection و تعداد بایت‌ها/نمونه‌های خوانده شده در شافل را ببینید. اگر داده‌ی شافل از Executorهای ریموت خوانده می‌شود، Shuffle Read Blocked Time بالا می‌تواند نشان‌دهنده‌ی مشکلات I/O باشد. زمان GC بالا می‌تواند نشان‌دهنده‌ی وجود آبجکت‌های زیادی در Heap باشد(ممکن است Executorهای شما به حافظه‌ی زیادی احتیاج داشته باشند) اگر بیشترین زمان یک تسک بسیار بزرگتر از متوسط باشد، احتمالا بخاطر توزیع نامساوی داده در پارتیشن‌ها، با Data Skew روبرو شده‌اید.

Executors

این تب اطلاعاتی درباره‌ی Executorهای برنامه در اختیار می‌گذارد. می‌توانید جزئیات شکل زیر را ببینید.

علاوه بر آمارها، می‌توانید ببینید که هر Executor چگونه از حافظه و برای چه کاری استفاده می‌کند. همچنین این کمک می‌کند تا میزان مصرف منابع را به هنگام استفاده از توابع cache یا persist بررسی کنیم.

Storage

این تب اطلاعات هر تیبل یا DataFrame که برنامه آن را کش کرده، نشان می‌دهد.

SQL

اثر کوئری‌های Spark SQL که در برنامه‌ی شما اجرا می‌شوند، قابل دنبال‌کردن و دیدن هستند. می‌توانید ببینید که هر جاب چه موقع کدام کوئری را اجرا می‌کند و مدت زمان آن چقدر بوده است.

با کلیک بر روی هر کوئری، جزيات و برنامه‌ی اجرای آن نمایش داده می‌شود. این جزئیات در مواقعی که می‌خواهیم جزئیات هر اپراتور و تغییرات آن‌ها را ببینیم، مفید است.

Environment

شناختن محیطی که برنامه‌ی اسپارک در آن در حال اجرا است، نکات مفید زیادی را ارائه می‌کند که در عیب‌یابی مفید هستند. در واقع، ضروری است که از متغیرهای محیط مطلع باشیم، بدانیم از چه jarهایی استفاده شده است، مقدار متغیرهای اسپارک و سیستم چیست و ... تمام این اطللاعات read-only یک معدن طلا از اطلاعات هستند که به هنگام برخورد با یک رفتار غیرطبیعی در برنامه، باید آن‌ها را بررسی کنیم.


apache sparkآپاچی اسپارکبیگ دیتاbig data
فارغ التحصیل علوم کامپیوتر
شاید از این پست‌ها خوشتان بیاید