پایهترین مفهوم در اسپارک، (RDD)Resilient Distributed Dataset است. RDD سه مولفهی اصلی دارد:
هر ۳ از اجزای اصلی RDD Programming API در اسپارک هستند. اول، یک لیست از Dependencyها که به اسپارک میگویند چگونه یک RDD با ورودیهایش باید ساخته شود، نیاز است. در مواقعی که نیاز است نتایج دوباره ساخته شوند، اسپارک میتواند با استفاده از این Dependencyها، RDD را بسازد و عملیات را دوباره انجام دهد. این مشخصه به RDDها انعطافپذیری میبخشد.
پارتیشنها این امکان را به اسپارک میدهند که محاسبات را بهصورت موازی در Executorها انجام دهند. در بعضی مواقع، مثلا به هنگام خواندن از HDFS، اسپارک از اطلاعات محلی برای ارسال عملیات به Executorهایی که به داده نزدیکتر هستند، استفاده میکند. در این صورت دادهی کمتری در شبکه جابجا شده است.
و در نهایت، RDD یک تابع محاسبه دارد که یک Iterator[T] برای دادهای که در RDD ذخیره خواهد شد، تولید میکند.
اما این مدل یک سری مشکل دارد. تابع محاسبه برای اسپارک مبهم است؛ اسپارک نمیداند که شما چه کاری در این تابع انجام میدهید. فارغ از اینکه شما join, filter,select, aggregation انجام میدهید، اسپارک آن را به شکل یک Lambda Expression میبینید. مشکل دیگر این است که نوع دادهی Iterator[T] برای Python RDD مبهم است؛ اسپارک فقط میداند که این یک Generic Object در پایتون است.
بههمین دلایل، چون اسپارک نمیتواند گزاره یا محاسبهی درون تابع را ببیند، هیچ راهی هم برای بهینهسازی آن ندارد. و در نهایت، اسپارک هیچ اطلاعی دربارهی نوع دادهی T ندارد، برای اسپارک این یک آبجکت مبهم است و تنها کاری که میتواند بکند این است که آبجکت مبهم را بهعنوان سریهایی از بایتها سریالسازی کند، بدون استفاده از تکنیکهای فشردهسازی داده.
این ابهامها توانایی اسپارک در بهینه کردن و مرتبسازی عملیات و تبدیل آن به یک Query Plan بهینه را میگیرد. پس راه حل چیست؟
اسپارک ۲ برای ساختاربندی اسپارک، چند راه حل ارائه داد. یکی این بود که محاسبات، با استفاده از پترنهای رایج در تحلیل داده بیان شوند. این پترنها بهعنوان عملیات سطح بالا بیان میشوند مانند: filter, select, count, aggregate, average, group. اینها باعث شدند به سادگی و شفافیت اسپارک اضافه شود.
از طریق مجموعهای از دستورها در domain-specific languageها که بهعنوان APIها در زبانهای پشتیبانیشدهی اسپارک آمدهاند، این دستورها به اسپارک میگویند که چه کاری میخواهید با دادهی خود انجام دهید و در نتیجه، اسپارک یک Query Plan بهینه برای اجرا میسازد.
راه حل بعدی این بود که اجازه میداد داده به فرمت Tabular ، مانند SQL، مرتب شوند.
اما همهی این ساختاربندیها چه فایدهای دارند؟
ساختاربندی، یک سری خوبیها دارد مانند بهتر شدن عملکرد و مدیریت بهتر فضا در اجزای اسپارک. در اینجا میخواهیم دربارهی ویژگیهای رسا بودن (Expressivity)، سادگی (Simplicity)، ترکیبپذیری (Composability) و یکنواختی (Uniformly) صحبت کنیم.
در مثال زیر میخواهیم رسا بودن و سادگی را نشان دهیم. در اینجا میخواهیم تمام سنها برای هر نام را Aggregate کنیم، بر اساس نام group کنیم و سپس میانگین را حساب کنیم. این یک پترن رایج در تحلیل داده بهحساب میآید.
اگر از Low-Level RDD API برای این کار استفاده کنیم، کد آن به شکل زیر است:
این کد رمزآلود و برای خواندن سخت است. این کد به اسپارک میگوید که چطور aggregate را انجام دهد و میانگین را محاسبه کند، یعنی چطور کوئری را محاسبه کند.
حال میخواهیم همین کد را با استفاده از High-Level DSLها و DataFrame API بیان کنیم. در اینجا به اسپارک میگوییم که چه کار کند:
این نسخه از کد بسیار رساتر و سادهتر از نسخهی قبلی است، چون از اپراتورهای High-Level DSL و APIها استفاده میکنیم تا به اسپارک بگوییم که چه کار کند. در نتیجه از این اپراتورها استفاده کردهایم تا کوئری ما را بسازند. و چون اسپارک میتواند به این کوئری نگاه کند و هدف ما را بفهمد، میتواند آن را بهینه و یا ترتیب عملیات را تغییر دهد تا بهینه اجرا شود. اسپارک دقیقا میداند که ما میخواهیم چکار کنیم: اشخاص را با نامهایشان group کن، سن آنها را aggregate کن و سپس میانگین را محاسبه کن. ما با استفاده از اپراتورهای سطح بالا تمام یک عملیات را بهعنوان یک کوئری بیان کردیم- این کار چقدر هزینهبر است؟
بعضی افراد معتقدند که فقط استفاده کردن از اپراتورهای زبانهای سطح بالا، توانایی توسعهدهندگان را در کنترل کوئری خود یا راهنمایی کامپایلر در چگونگی محاسبه کوئری، محدود میکند. بعضی دیگر میگویند که ما محدود به این پترنهای ساختار یافته نیستیم؛ میتوانیم هر وقت خواستیم از RDD API سطح پایین استفاده کنیم، اگرچه که معمولا احتیاجی به این کار پیدا نمیکنیم.
علاوه بر سادگی در خواندن، ساختار API سطح بالای اسپارک میان اجزا و زبانها یکنواخت است. برای مثال این کد Scala همان کاری را انجام میدهد که در بالا انجام دادیم:
اگر با SQL آشنا باشید، بعضی از این اپراتورهای DSL میتوانند عملیات Relational را انجام دهند. مانند select, filter, group, aggregate
تمام این سادگی و رسایی بخاطر وجود موتور Spark SQL است که Structered APIهای سطح بالا بر روی آن ساخته شدهاند. بخاطر این موتور، که زیربنای تمام اجزای اسپارک است، ما به APIهای یکنواخت دسترسی داریم.
در اسپارک، DataFrame در ساختار، فرمت و تعدادی از عملیات، از Pandas DataFrames الهام گرفته است. Spark DataFrames را میتوان تیبلهای درونحافظهای توزیعشدهای در نظر گرفت که ستونهای آن نامگذاری شدهاند؛ طوری که هر ستون یک نوع دادهی مخصوص دارد: integer, string, array, map, real, date, timestamp,....
وقتی دادهها به شکل یک تیبل ساختار یافته نمایش داده میشود، نه تنها وارد کردن آن راحتتر میشود بلکه کار کردن با ردیفها و ستونها بههنگام اعمال عملیات هم راحتتر میشود. همچنین، اسپارک دیتافریم Immutable است و اسپارک یک Lineage از تمام ترنسفورمها ذخیره میکند. میتوانید به راحتی هر تغییری روی دیتافریم ایجاد کنید، بدون اینکه نسخهی اصلی تغییری کند.
در اینجا میخواهیم نوع دادههای موجود در اسپارک را بررسی کنیم.
برای هماهنگی با زبانهای برنامهنویسیای که پشتیبانی میکند، اسپارک از انواع دادهی رایج پشتیبانی میکند. این انواع داده میتوانند در برنامهی اسپارک یا در تعریف Schema استفاده شوند. برای مثال در اسپارک میتوانید از String , Byte , Long , or Map, و غیره استفاده کنید.
در تحلیل دادههای پیچیده، فقط دادههای رایج و معمولی حضور ندارند. دادهها پیچیده و معمولا ساختار یافته و یا درهم خواهند بود. در این مواقع نیاز داریم تا اسپارک این دادههای پیچیده را برای ما مدیریت کند. این دادهها میتوانند در شکلهای maps, arrays, structs, dates, timestamps باشند.
در اسپارک، یک Schema نام ستونها و نوع دادهی DataFrame را تعریف میکند. در بیشتر مواقع، Schemaها بههنگام خواندن دادههای ساختار یافته از منابع دادهی خارجی به کار میآیند.
تعریف کردن Schema قبل از خواندن داده، در مقابل تعریف Schema به هنگام خواندن داده، چند مزیت دارد:
پس پیشنهاد میشود همیشه قبل از خواندن داده از منبع، Schema آن را تعریف کنید. البته اگر نمیخواهید Schema را تعریف کنید، یک راه این است که یک سمپل از کل دادهی خود تهیه کنید و بگذارید اسپارک از آن سمپل کوچک، Schema کل داده را استخراج کند. برای این کار میتوان از samplingRatio استفاده کرد.
اسپارک ۲ APIهای DataFrame و Dataset را با عنوان Structured API یکپارچه کرد تا توسعهدهندگان فقط یک API را یاد بگیرند. همانطور که در شکل زیر آمده، Datasetها ۲ ویژگی دارند: Typed API و Untyped API
میتوانید یک DataFrame را در Scala، یک نام مستعار برای کالکشنی از Generic Objectها یعنی Dataset[Row] در نظر بگیرید؛ که Row یک Generic Untyped JVM Object است که ممکن است تایپهای متنوعی را نگهداری کند. در مقابل، Dataset یک کالکشن از Strongly Typed JVM Object در Scala یا یک کلاس در Java است. در داکیومنت Dataset آمده که:
یک کالکشن از Strongly Typed Object که میتواند بهصورت موازی با استفاده از عملیات Functional یا Relational ترنسفورم شود. هر Dataset [در Scala] یک شکل Untyped به نام DataFrame نیز دارد که یک Dataset از Row است.
در زبانهایی که اسپارک پشتیبانی میکند، Dataset فقط در Java و Scala، و DataFrame فقط در R و Python معنی پیدا میکند. علت این امر این است که R و پایتون، compile-time type-safe نیستند؛ تایپها در زمان اجرا بهصورت داینامیک مشخص و معلوم میشوند - نه به هنگام کامپایل. در Scala و Java، تایپها به هنگام کامپایل مقید به متغیرها و آبجکتها هستند. اگرچه در Scala یک Dataframe نام دیگر برای Untyped Dataset[Row] است.
در اسپارک، Row یک Generic Object Type است که یک کالکشن از تایپهای مختلف را نگه میدارد و میتوان به آنها از طریق ایندکس دست پیدا کرد. اسپارک این آبجکتهای Row را تغییر میدهد و آنها را به تایپهایی مثل IntegerType و دیگر تایپهای رایج تبدیل میکند.
به طور خلاصه، عملیاتی که میتوانیم روی Dataset اجرا کنیم- مانند filter() , map() , groupBy() ,select() , take() - در DataFrame هم یکسان است. به نوعی، Dataset شبیه RDD است که توابع گفته شده را به همراه Compile-Time Safety در اختیار میگذارد اما خواندن اینترفیس آن راحتتر و به شکل Object-Oriented Programming است.
وقتی از Dataset استفاده میکنیم، موتور Spark SQL وظیفهی ساخت، تبدیل و سریالسازی شیهای JVM را برعهده دارد. همچنین با استفاده از Dataset Encoderها، وظیفهی مدیریت حافظهی Java Heap را هم برعهده دارد.
تا به اینجا ممکن است سوال پیش بیاید که چرا و چه موقع باید از DataFrame و یا Dataset استفاده کرد. در بیشتر مواقع استفاده از هر کدام ممکن است، بسته به اینکه از چه زبانی استفاده میکنید، اما بعضی مواقع یکی از آن دو به دیگری ارجحیت دارد. مثالهایی را در زیر میبینیم:
ممکن است سوال پیش بیاید که با این اوصاف، آیا RDD دارد تبدیل به شهروند درجهی دوم میشود؟ آیا آن Deprecate میشود؟ جواب نه است. RDD API همچنان پشتیبانی خواهد شد، اگرچه تمام ویژگیهایی که در اسپارک ۲ و ۳ هستند و خواهند بود، اینترفیس DataFrame خواهد داشت.
مواقعی هستند که شما از RDD استفاده میکنید مانند:
میتوانید بین DataFrame یا Dataset و RDD با فراخوانی تابع df.rdd جابجا شوید. (البته این کار هزینه دارد و باید جلوگیری شود مگر اینکه واجب است) بالاخره، DataFrame و Dataset بر روی RDD ساخته شدهاند و بههنگام مرحلهی ساخت کد، به RDD فشرده تجزیه میشوند.
در سطح بالا، Spark SQL به توسعه دهنده اجازه میدهد تا کوئریهای مطابق با ANSI SQL:2003 بر روی دادهی ساختار یافته منتشر کند. از ابتدای منتشر شدن آن در نسخهی ۱.۳، Spark SQL به یک موتور قابل توجه تبدیل شده است که بسیاری از کاربریهای سطح بالا با استفاده از آن ساخته شده است. علاوه بر امکان انتشار کوئریهای SQL مانند، Spark SQL کارهای زیر را انجام میدهد:
در مرکز Spark SQL Engine، بهینهساز Catalyst Optimizer و Project Tungsten قرار دارد. این دو با هم امکان دسترسی به APIهای سطح بالای DataFrame و Dataset و کوئریهای SQL را فراهم میسازند.
بهینهساز Catalyst Optimizer یک کوئری محاسباتی را میگیرد و آن را تبدیل به برنامهی اجرا (Execution Plan) میکند. این عملیات شامل ۴ فاز است:
1- تحلیل (Analysis)
2- بهینهسازی منطقی (Logical Optimization)
3- برنامهریزی فیزیکی (Physical Planning)
4- تولید کد (Code Generation)
موتور Spark SQL کار خود را با ایجاد یک Abstract Syntax Tree برای کوئری SQL یا DataFrame شروع میکند. در این مرحلهی آغازین، هر ستون یا نام تیبل با استفاده از Catalog داخلی بهدست خواهد آمد. Catalog یک اینترفیس برای Spark SQL است که لیست نامهای ستونها، انواع داده، تابعها، تیبلها، دیتابیسها و ... را ذخیره میکند. وقتی همهی اینها به دست آمدند، وارد فاز بعدی میشویم.
همانطور که شکل بالا نشان میدهد، این فاز شامل ۲ مرحلهی درونی است. Catalyst Optimizer ابتدا مجموعهای از برنامهریزیها را ایجاد خواهد کرد و سپس با استفاده از Cost-Based Optimizer(CBO)، هزینهی هر برنامهریزی را به آن متصل خواهد کرد.
در این فاز، Spark SQL یک برنامهریزی فیزیکی بهینه را برای Logical Plan انتخاب شده در مرحلهی قبلی تولید میکند.
فاز آخر بهینهسازی کوئری شامل تولید بایتکدهای بهینهی جاوا است که بر روی هر ماشین اجرا شود. چون Spark SQL میتواند روی دیتاستهای لود شده در حافظه عملیات انجام دهد، اسپارک میتواند از آخرین تکنولوژیهای کامپایلر برای تولید کد استفاده کند تا سرعت اجرا را افزایش دهد. به عبارت دیگر، بهعنوان یک کامپایلر عمل میکند. Project Tungsten که امکان تولید کد تمام مرحلهای (Whole-Stage Code Generation) را فراهم میکند، در اینجا نقش دارد.
ممکن است بپرسید Whole-Stage Code Generation چیست؟ یک فاز بهینهسازی فیزیکی کوئری است که کل کوئری را به یک فانکشن تبدیل میکند؛ با این کار از شر فراخوانیهای مجازی فانکشن راحت میشود و از CPU برای دادههای میانی استفاده میکند. نسخهی دوم موتور Tungsten که در اسپارک ۲ معرفی شد، از این روش برای تولید کدهای فشردهی RDD استفاده میکند. این استراتژی به طرز فوقالعادهای کارآمدی CPU و عملکرد را افزایش میدهد.