مجتبی میر یعقوب زاده
مجتبی میر یعقوب زاده
خواندن ۱۱ دقیقه·۴ سال پیش

ای‌پی‌آی‌های ساختار یافته در Apache Spark

پایه‌ترین مفهوم در اسپارک، (RDD)‌Resilient Distributed Dataset است. RDD سه مولفه‌ی اصلی دارد:

  • وابستگی‌ها‌ (Dependencies)
  • پارتیشن‌ها
  • تابع محاسبه: Partition => Iterator[T]

هر ۳ از اجزای اصلی RDD Programming API در اسپارک هستند. اول، یک لیست از Dependencyها که به اسپارک می‌گویند چگونه یک RDD با ورودی‌هایش باید ساخته شود، نیاز است. در مواقعی که نیاز است نتایج دوباره ساخته شوند، اسپارک می‌تواند با استفاده از این Dependencyها، RDD را بسازد و عملیات را دوباره انجام دهد. این مشخصه به RDDها انعطاف‌پذیری می‌بخشد.

پارتیشن‌ها این امکان را به اسپارک می‌دهند که محاسبات را به‌صورت موازی در Executorها انجام دهند. در بعضی مواقع، مثلا به هنگام خواندن از HDFS، اسپارک از اطلاعات محلی برای ارسال عملیات به Executorهایی که به داده نزدیک‌تر هستند، استفاده می‌کند. در این صورت داده‌ی کمتری در شبکه جابجا شده است.

و در نهایت،‌ RDD یک تابع محاسبه دارد که یک Iterator[T] برای داده‌ای که در RDD ذخیره خواهد شد، تولید می‌کند.

اما این مدل یک سری مشکل دارد. تابع محاسبه برای اسپارک مبهم است؛ اسپارک نمی‌داند که شما چه کاری در این تابع انجام می‌دهید. فارغ از اینکه شما join, filter,select, aggregation انجام می‌دهید، اسپارک آن را به شکل یک Lambda Expression می‌بینید. مشکل دیگر این است که نوع دا‌ده‌ی Iterator[T] برای Python RDD مبهم است؛ اسپارک فقط می‌داند که این یک Generic Object در پایتون است.

به‌همین دلایل، چون اسپارک نمی‌تواند گزاره یا محاسبه‌ی درون تابع را ببیند، هیچ راهی هم برای بهینه‌سازی آن ندارد. و در نهایت، اسپارک هیچ اطلاعی درباره‌ی نوع داده‌ی T ندارد، برای اسپارک این یک آبجکت مبهم است و تنها کاری که می‌تواند بکند این است که آبجکت مبهم را به‌عنوان سری‌هایی از بایت‌ها سریال‌سازی کند، بدون استفاده از تکنیک‌های فشرده‌سازی داده.

این ابهام‌ها توانایی اسپارک در بهینه کردن و مرتب‌سازی عملیات و تبدیل آن به یک Query Plan بهینه را می‌گیرد. پس راه حل چیست؟

ساختاربندی اسپارک

اسپارک ۲ برای ساختاربندی اسپارک، چند راه حل ارائه داد. یکی این بود که محاسبات، با استفاده از پترن‌های رایج در تحلیل داده بیان شوند. این پترن‌ها به‌عنوان عملیات سطح بالا بیان می‌شوند مانند: filter, select, count, aggregate, average, group. این‌ها باعث شدند به سادگی و شفافیت اسپارک اضافه شود.

از طریق مجموعه‌ای از دستورها در domain-specific languageها که به‌عنوان APIها در زبان‌های پشتیبانی‌شده‌ی اسپارک آمده‌اند، این دستورها به اسپارک می‌گویند که چه کاری می‌خواهید با داده‌ی خود انجام دهید و در نتیجه، اسپارک یک Query Plan بهینه برای اجرا می‌سازد.

راه حل بعدی این بود که اجازه می‌داد داده به فرمت Tabular ، مانند SQL، مرتب شوند.

اما همه‌ی این ساختاربندی‌ها چه فایده‌ای دارند؟

محسن کلیدی اسپارک

ساختاربندی، یک سری خوبی‌ها دارد مانند بهتر شدن عملکرد و مدیریت بهتر فضا در اجزای اسپارک. در اینجا می‌خواهیم درباره‌ی ویژگی‌های رسا بودن (Expressivity)، سادگی (Simplicity)، ترکیب‌پذیری (Composability) و یکنواختی (Uniformly) صحبت کنیم.

در مثال زیر می‌خواهیم رسا بودن و سادگی را نشان دهیم. در اینجا می‌خواهیم تمام سن‌ها برای هر نام را Aggregate کنیم، بر اساس نام group کنیم و سپس میانگین را حساب کنیم. این یک پترن رایج در تحلیل داده به‌حساب می‌آید.

اگر از Low-Level RDD API برای این کار استفاده کنیم،‌ کد آن به شکل زیر است:

این کد رمزآلود و برای خواندن سخت است. این کد به اسپارک می‌گوید که چطور aggregate را انجام دهد و میانگین را محاسبه کند، یعنی چطور کوئری را محاسبه کند.

حال می‌خواهیم همین کد را با استفاده از High-Level DSLها و DataFrame API بیان کنیم. در اینجا به اسپارک می‌گوییم که چه کار کند:


این نسخه از کد بسیار رساتر و ساده‌تر از نسخه‌ی قبلی است، چون از اپراتورهای High-Level DSL و APIها استفاده می‌کنیم تا به اسپارک بگوییم که چه کار کند. در نتیجه از این اپراتورها استفاده کرده‌ایم تا کوئری ما را بسازند. و چون اسپارک می‌تواند به این کوئری نگاه کند و هدف ما را بفهمد، می‌تواند آن را بهینه و یا ترتیب عملیات را تغییر دهد تا بهینه اجرا شود. اسپارک دقیقا می‌داند که ما می‌خواهیم چکار کنیم: اشخاص را با نام‌هایشان group کن، سن آن‌ها را aggregate کن و سپس میانگین را محاسبه کن. ما با استفاده از اپراتورهای سطح بالا تمام یک عملیات را به‌عنوان یک کوئری بیان کردیم- این کار چقدر هزینه‌بر است؟

بعضی افراد معتقدند که فقط استفاده کردن از اپراتورهای زبان‌های سطح بالا، توانایی توسعه‌دهندگان را در کنترل کوئری خود یا راهنمایی کامپایلر در چگونگی محاسبه کوئری، محدود می‌کند. بعضی دیگر می‌گویند که ما محدود به این پترن‌های ساختار یافته نیستیم؛ می‌توانیم هر وقت خواستیم از RDD API سطح پایین استفاده کنیم، اگرچه که معمولا احتیاجی به این کار پیدا نمی‌کنیم.

علاوه بر سادگی در خواندن، ساختار API سطح بالای اسپارک میان اجزا و زبان‌ها یکنواخت است. برای مثال این کد Scala همان کاری را انجام می‌دهد که در بالا انجام دادیم:

اگر با SQL آشنا باشید، بعضی از این اپراتورهای DSL می‌توانند عملیات Relational را انجام دهند. مانند select, filter, group, aggregate

تمام این سادگی و رسایی بخاطر وجود موتور Spark SQL است که Structered APIهای سطح بالا بر روی آن ساخته شده‌اند. بخاطر این موتور، که زیربنای تمام اجزای اسپارک است، ما به APIهای یکنواخت دسترسی داریم.

DataFrame API

در اسپارک، DataFrame در ساختار، فرمت و تعدادی از عملیات، از Pandas DataFrames الهام گرفته‌ است. Spark DataFrames را می‌توان تیبل‌های درون‌حافظه‌ای توزیع‌شده‌ای در نظر گرفت که ستون‌های آن نام‌گذاری شده‌اند؛ طوری که هر ستون یک نوع داده‌ی مخصوص دارد: integer, string, array, map, real, date, timestamp,....

وقتی داده‌ها به شکل یک تیبل ساختار یافته نمایش داده‌ می‌شود، نه تنها وارد کردن آن راحت‌تر می‌شود بلکه کار کردن با ردیف‌ها و ستون‌ها به‌هنگام اعمال عملیات هم راحت‌تر می‌شود. همچنین، اسپارک دیتافریم Immutable است و اسپارک یک Lineage از تمام ترنسفورم‌ها ذخیره می‌کند. می‌توانید به راحتی هر تغییری روی دیتافریم ایجاد کنید، بدون اینکه نسخه‌ی اصلی تغییری کند.

در اینجا می‌خواهیم نوع داده‌های موجود در اسپارک را بررسی کنیم.

انواع داده‌ی رایج در اسپارک

برای هماهنگی با زبان‌های برنامه‌نویسی‌ای که پشتیبانی می‌کند، اسپارک از انواع داده‌ی رایج پشتیبانی می‌کند. این انواع داده می‌توانند در برنامه‌ی اسپارک یا در تعریف Schema استفاده شوند. برای مثال در اسپارک می‌توانید از String , Byte , Long , or Map, و غیره استفاده کنید.

انواع داده‌ی پیچیده و ساختار یافته در اسپارک

در تحلیل‌ داده‌های پیچیده، فقط داده‌های رایج و معمولی حضور ندارند. داده‌ها پیچیده و معمولا ساختار یافته و یا درهم خواهند بود. در این مواقع نیاز داریم تا اسپارک این داده‌های پیچیده را برای ما مدیریت کند. این داده‌ها می‌توانند در شکل‌های maps, arrays, structs, dates, timestamps باشند.

ساختن DataFrame و Schema

در اسپارک، یک Schema نام ستون‌ها و نوع داده‌ی DataFrame را تعریف می‌کند. در بیشتر مواقع، Schemaها به‌هنگام خواندن داده‌های ساختار یافته از منابع داده‌ی خارجی به کار می‌آیند.

تعریف کردن Schema قبل از خواندن داده، در مقابل تعریف Schema به هنگام خواندن داده، چند مزیت دارد:

  • وظیفه‌ی تشخیص نوع داده‌ را از دوش اسپارک برمی‌دارید
  • اسپارک را از ساختن یک جاب جدید برای خواندن داده و تشخیص Schema منع می‌کنید. این کار برای داده‌های حجیم زمان‌بر و هزینه‌بر خواهد بود
  • اگر داده‌ مطابق با Schema نباشد، می‌توانید ارور ها را تشخیص دهید

پس پیشنهاد می‌شود همیشه قبل از خواندن داده از منبع، Schema آن را تعریف کنید. البته اگر نمی‌خواهید Schema را تعریف کنید، یک راه این است که یک سمپل از کل داده‌ی خود تهیه کنید و بگذارید اسپارک از آن سمپل کوچک، Schema کل داده را استخراج کند. برای این کار می‌توان از samplingRatio استفاده کرد.

Dataset API

اسپارک ۲ APIهای DataFrame و Dataset را با عنوان Structured API یکپارچه کرد تا توسعه‌دهندگان فقط یک API را یاد بگیرند. همانطور که در شکل زیر آمده، Datasetها ۲ ویژگی دارند: Typed API و Untyped API

می‌توانید یک DataFrame را در Scala، یک نام مستعار برای کالکشنی از Generic Objectها یعنی Dataset[Row] در نظر بگیرید؛ که Row یک Generic Untyped JVM Object است که ممکن است تایپ‌های متنوعی را نگه‌داری کند. در مقابل، Dataset یک کالکشن از Strongly Typed JVM Object در Scala یا یک کلاس در Java است. در داکیومنت Dataset آمده که:

یک کالکشن از Strongly Typed Object که می‌تواند به‌صورت موازی با استفاده از عملیات Functional یا Relational ترنسفورم شود. هر Dataset [در Scala] یک شکل Untyped به نام DataFrame نیز دارد که یک Dataset از Row است.

Typed Objects, Untyped Objects, and Generic Rows

در زبان‌هایی که اسپارک پشتیبانی می‌کند، Dataset فقط در Java و Scala، و DataFrame فقط در R و Python معنی پیدا می‌کند. علت این امر این است که R و پایتون، compile-time type-safe نیستند؛ تایپ‌ها در زمان اجرا به‌صورت داینامیک مشخص و معلوم می‌شوند - نه به هنگام کامپایل. در Scala و Java، تایپ‌ها به هنگام کامپایل مقید به متغیرها و آبجکت‌ها هستند. اگرچه در Scala یک Dataframe نام دیگر برای Untyped Dataset[Row] است.

در اسپارک، Row یک Generic Object Type است که یک کالکشن از تایپ‌های مختلف را نگه می‌دارد و می‌توان به آن‌ها از طریق ایندکس دست پیدا کرد. اسپارک این آبجکت‌های Row را تغییر می‌دهد و آن‌ها را به تایپ‌هایی مثل IntegerType و دیگر تایپ‌های رایج تبدیل می‌کند.

به طور خلاصه، عملیاتی که می‌توانیم روی Dataset اجرا کنیم- مانند filter() , map() , groupBy() ,select() , take() - در DataFrame هم یکسان است. به نوعی، ‌Dataset شبیه RDD است که توابع گفته شده را به همراه Compile-Time Safety در اختیار می‌گذارد اما خواندن اینترفیس آن راحت‌تر و به شکل Object-Oriented Programming است.

وقتی از Dataset استفاده می‌کنیم، موتور Spark SQL وظیفه‌ی ساخت، تبدیل و سریال‌سازی شی‌های JVM را برعهده دارد. همچنین با استفاده از Dataset Encoderها، وظیفه‌ی مدیریت حافظه‌ی Java Heap را هم برعهده دارد.

DataFrames Versus Datasets

تا به اینجا ممکن است سوال پیش بیاید که چرا و چه موقع باید از DataFrame و یا Dataset استفاده کرد. در بیشتر مواقع استفاده از هر کدام ممکن است، بسته به اینکه از چه زبانی استفاده می‌کنید، اما بعضی مواقع یکی از آن دو به دیگری ارجحیت دارد. مثال‌هایی را در زیر می‌بینیم:

  • اگر می‌خواید به اسپارک بگویید که چه کار کند، نه اینکه چگونه انجامش دهد، از DataFrame یا Dataset استفاده کنید.
  • اگر فهم بیشتر، مفاهیم سطح بالا و عملیات DSL را می‌خواهید، از DataFrame یا Dataset استفاده کنید.
  • اگر Compile-Time Type Safety می‌خواهید و مشکلی با ساختن چند Case Class برای یک Dataset[T] به خصوص ندارید، از Dataset استفاده کنید.
  • اگر پردازش شما به عبارت‌های سطح بالا، filter، map، aggregation، محاسبه‌ی میانگین و مجموعه، کوئری‌های SQL، دسترسی به ستون‌ها و یا عملیات Relational روی داده‌های نیمه‌ ساختار یافته احتیاج دارد، از DataFrame یا Dataset استفاده کنید.
  • اگر پردازش شما اجبار می‌کند که از Relational Transformation مانند کوئری‌های SQLمانند استفاده شود، از DataFrame استفاده کنید.
  • اگر می‌خواهید از از مزایای سریال‌سازی بهینه‌ی Tungesten به‌همراه Encoderها بهره‌مند شوید، از Dataset استفاده کنید.
  • اگر یکپارچگی، بهینگی کد و سادگی API میان تمام اجزای اسپارک احتیاج دارید، از DataFrame استفاده کنید.
  • اگر برنامه‌نویس R هستید، از DataFrame استفاده کنید
  • اگر برنامه‌نویس Python هستید، از DataFrame استفاده کنید و اگر کنترل بیشتری می‌خواهید، از RDD استفاده کنید
  • اگر فضا و کارآمدی سرعت می‌خواهید، از DataFrame استفاده کنید.
  • اگر می‌خواهید ارورها به‌هنگام کامپایل مشخص شوند و نه ران‌تایم، API مناسب را مطابق شکل زیر انتخاب کنید.

چه موقع از RDD استفاده کنیم

ممکن است سوال پیش بیاید که با این اوصاف، آیا RDD دارد تبدیل به شهروند درجه‌ی دوم می‌شود؟ آیا آن Deprecate می‌شود؟ جواب نه است. RDD API همچنان پشتیبانی خواهد شد، اگرچه تمام ویژگی‌هایی که در اسپارک ۲ و ۳ هستند و خواهند بود، اینترفیس DataFrame خواهد داشت.

مواقعی هستند که شما از RDD استفاده می‌کنید مانند:

  • از یک پکیج third-party استفاده می‌کنید که با استفاده از RDD نوشته شده است
  • می‌توانید از بهینه‌سازی کد، استفاده بهینه از فضا و کارایی عملکرد که در DataFrame و Dataset وجود دارد، چشم‌پوشی کنید
  • می‌خواهید دقیقا به اسپارک بگویید که چگونه یک کوئری را انجام بدهد

می‌توانید بین DataFrame یا Dataset و RDD با فراخوانی تابع df.rdd جابجا شوید. (البته این کار هزینه دارد و باید جلوگیری شود مگر اینکه واجب است) بالاخره، DataFrame و Dataset بر روی RDD ساخته شده‌اند و به‌هنگام مرحله‌ی ساخت کد، به RDD فشرده تجزیه می‌شوند.

موتور Spark SQL

در سطح بالا، Spark SQL به توسعه دهنده اجازه می‌دهد تا کوئری‌های مطابق با ANSI SQL:2003 بر روی داده‌ی ساختار یافته‌ منتشر کند. از ابتدای منتشر شدن آن در نسخه‌ی ۱.۳، Spark SQL به یک موتور قابل توجه تبدیل شده است که بسیاری از کاربری‌های سطح بالا با استفاده از آن ساخته شده است. علاوه بر امکان انتشار کوئری‌های SQL مانند، Spark SQL کارهای زیر را انجام می‌دهد:

  • اجزای اسپارک را یکپارچه و یکنواخت می‌کند و اجازه استفاده از مفاهیم DataFrame/Dateset را می‌دهد
  • امکان اتصال به Apache Hive را فراهم می‌کند
  • داده‌های ساختاریافته با Schema مشخص را از فایل‌های با فرمت JSON, CSV, Text, Avro, Parquet, ORC و... می‌خواند و آن‌ها را به تیبل‌های موقت تبدیل می‌کند
  • پلی است برای اتصال به JDBC/ODBC
  • در اجرای نهایی، Query Plan بهینه و کد فشرده برای JVM می‌سازد

در مرکز Spark SQL Engine، بهینه‌ساز Catalyst Optimizer و Project Tungsten قرار دارد. این دو با هم امکان دسترسی به APIهای سطح بالای DataFrame و Dataset و کوئری‌های SQL را فراهم می‌سازند.

Catalyst Optimizer

بهینه‌ساز Catalyst Optimizer یک کوئری محاسباتی را می‌گیرد و آن را تبدیل به برنامه‌ی اجرا (Execution Plan) می‌کند. این عملیات شامل ۴ فاز است:

1- تحلیل (Analysis)

2- بهینه‌سازی منطقی (Logical Optimization)

3- برنامه‌ریزی فیزیکی (Physical Planning)

4- تولید کد (Code Generation)

فاز اول: تحلیل (Analysis)

موتور Spark SQL کار خود را با ایجاد یک Abstract Syntax Tree برای کوئری SQL یا DataFrame شروع می‌کند. در این مرحله‌ی آغازین، هر ستون یا نام تیبل با استفاده از Catalog داخلی به‌دست خواهد آمد. Catalog یک اینترفیس برای Spark SQL است که لیست نام‌های ستون‌ها، انواع داده، تابع‌ها، تیبل‌ها، دیتابیس‌ها و ... را ذخیره می‌کند. وقتی همه‌ی این‌ها به ‌دست آمدند، وارد فاز بعدی می‌شویم.

فاز دوم: بهینه‌سازی منطقی (Logical Optimization)

همانطور که شکل بالا نشان می‌دهد، این فاز شامل ۲ مرحله‌ی درونی است. Catalyst Optimizer ابتدا مجموعه‌ای از برنامه‌ریزی‌ها را ایجاد خواهد کرد و سپس با استفاده از Cost-Based Optimizer(CBO)، هزینه‌ی هر برنامه‌ریزی را به آن متصل خواهد کرد.

فاز سوم: برنامه‌ریزی فیزیکی (Physical Planning)

در این فاز، Spark SQL یک برنامه‌ریزی فیزیکی بهینه را برای Logical Plan انتخاب شده در مرحله‌ی قبلی تولید می‌کند.

فاز چهارم: تولید کد (Code Generation)

فاز آخر بهینه‌سازی کوئری شامل تولید بایت‌کدهای بهینه‌ی جاوا است که بر روی هر ماشین اجرا شود. چون Spark SQL می‌تواند روی دیتاست‌های لود شده در حافظه عملیات انجام دهد، اسپارک می‌تواند از آخرین تکنولوژی‌های کامپایلر برای تولید کد استفاده کند تا سرعت اجرا را افزایش دهد. به عبارت دیگر، به‌عنوان یک کامپایلر عمل می‌کند. Project Tungsten که امکان تولید کد تمام مرحله‌ای (Whole-Stage Code Generation) را فراهم می‌کند، در اینجا نقش دارد.

ممکن است بپرسید Whole-Stage Code Generation چیست؟ یک فاز بهینه‌سازی فیزیکی کوئری است که کل کوئری را به یک فانکشن تبدیل می‌کند؛ با این کار از شر فراخوانی‌های مجازی فانکشن راحت می‌شود و از CPU برای داده‌های میانی استفاده می‌کند. نسخه‌ی دوم موتور Tungsten که در اسپارک ۲ معرفی شد، از این روش برای تولید کد‌های فشرده‌ی RDD استفاده می‌کند. این استراتژی به طرز فوق‌العاده‌ای کارآمدی CPU و عملکرد را افزایش می‌دهد.

apache sparkبیگ دیتا
فارغ التحصیل علوم کامپیوتر
شاید از این پست‌ها خوشتان بیاید