یک راست برم سر اصل مطلب؛ تصمیم گرفتم با Hadoop و Spark آشنا بشم و موقع گشتن واسه دوره آموزشی رایگان، با این کلاس آنلاین تو سایت Udemy آشنا شدم که به مرور چیزایی رو که یاد میگیرم اینجا مینویسم. اینجا رو برای آموزش بقیه نمینویسم. مینویسم تا بعداً خودم بهش برگردم و بخونمش. این نکته رو هم بگم که این مطلب برای مبتدیها و noobهایی مثل خودمه :)
جواب سوال «Big Data چیست؟» اینه: «حجم بسیار بزرگی از داده». حالا سوالی که پیش میاد اینه که تعریف ما از «بسیار بزرگ» چیه؟ حقیقت اینه که جواب درست و مشخصی به این سوال وجود نداره. عدم وجود جواب مشخص برای این سوال به دو دلیله:
پس از کجا بدونیم با Big Data سروکار داریم یا نه؟
مثال: یه استارتاپ تو زمینه سرویس ایمیل راه اندازی کردیم که کاربران میتونند وارد سیستم ما بشن و ایمیل ارسال و دریافت کنند. در سه ماه، ۱۰۰ هزار کاربر در سیستم ما ثبتنام کرده و شروع به استفاده از سیستم ما میکنند. فرض کنیم از دیتابیسهای سنتی برای ذخیره ایمیلها استفاده میکنیم و اندازه فعلی دیتابیسمون یک ترابایت است. آیا الان مشکل Big Dataای داریم؟ جواب خیر است. ولی سوال درستتر این است که با این سرعت رشد، آیا در آینده مشکل Big Dataای خواهیم داشت؟ برای جواب به این سوال باید سه فاکتور Volume یعنی حجم، Velocity یعنی سرعت و Variety یعنی تنوع را در نظر بگیریم. تعداد کاربران در سه ماه ۱۰۰ هزار بود و در صورتی که با همین سرعت رشد کنیم، آخر سال، ۴۰۰ هزار کاربر خواهیم داشت و حجم داده ۴ ترابایت خواهد بود. نکتهای که وجود دارد این است که در این جا فقط حجم مهم نبود، بلکه سرعت رشد هم مهم بود. نکتهی بعدی که وجود دارد این است که در سیستم ما فقط متن وجود ندارد و ممکن است کاربران علاوه بر متن، به ارسال و دریافت عکس و ویدئو هم بپردازند که عملاً دیتابیسهای سنتی قابلیت ذخیره بهینه این نوع داده را ندارند. بنابراین در تعیین اینکه با مسئله Big Dataای روبرو هستیم یا نه، باید حجم، سرعت و تنوع داده را با هم در نظر بگیریم.
وقتی متوجه شدیم مسئلهای که با آن روبرو هستیم Big Dataای است، باید بدانیم که با چالشهای زیر روبرو خواهیم بود:
۳. از دست دادن داده یا Data Loss
۴. هزینه یا Cost
سوالی که به ذهن میرسد این است که آیا روشهای قدیمی توانایی حل این مشکلات را دارند؟ مشکل RDBMSها این است که قابلیت Scalability مناسبی ندارند؛ توانایی ذخیره داده Unstructured مثل تصویر و ویدئو را ندارند و در نهایت اینکه هزینه افزایش فضای ذخیرهسازی نیز در این سیستمها بالاست. راه حل سنتی دوم، استفاده از Grid Computing است که در آن چندین node کامپیوتر به صورت موازی پردازشها را انجام میدهند. مشکل این روش هم این است که وقتی حجم داده زیاد است، عملکرد مناسبی ندارند و مشکل بعدی آن است که لازم فرد مورد نظر دانش بسیار خوبی در زبانهای برنامهنویسی سطح پایین داشته باشه تا بتواند چنین سیستمی را پیادهسازی کند.
پس راه حل چیست؟ Hadoop
چرا Hadoop یه راهحل خوب است؟
سوالی که پیش میآید این است که آیا hadoop جایگزین دیتابیس است؟ جواب خیر است. چیزهایی هستند که hadoop در آنها بهتر است ولی در برخی زمینهها دیتابیسها قویتر هستند.
در حال حاضر hadoop تنها انتخاب برای Big Data نیست و دیتابیسهای NoSQL مثل کاساندرا هم گزینههای مناسبی هستند. برای مثال کاساندرا میتونه دادهای با میلیونها ستون و میلیاردها سطر رو پردازش کنه! (یا ابلفضل!)
مثال: فرض کنید برای یک بازار بورس کار میکنید. در دیتاست شما اطلاعات روزانه مربوط به تراکنشهای چندین سال بورس موجود است. اندازه دیتاست ۱ ترابایت است. یک روز از شما سوال میشود که حداکثر مقدار قیمت symbolهای مختلف را در طول این چند سال پیدا کنید. در اینجا لازم است به دو مقوله «فضای ذخیرهسازی» و «محاسبات» فکر کنیم. لپ تاپ شما فقط ۲۰ گیگ فضای خالی دارد. شما از مسئول شبکه میخواهید که فایل ۱ ترابایتی را در جایی از فضای ذخیرهسازی شبکه ذخیره کند و آدرس آن را به شما بدهد. و شما یک برنامه جاوا نوشتهاید که محاسبات مدنظرتان برای گرفتن خروجی مناسب از داده را انجام میدهد. حال لازم است محاسبه کنیم چقدر طول میکشد داده را بخوانیم و محاسبات لازم را روی آن انجام دهیم. با فرض اینکه سرعت انتقال داده از حافظه شبکه به لپتاپ شما ۱۲۲ مگابایت در ثانیه (Data Access Rate) است. بنابراین برای انتقال ۱ ترابایت، ۲ ساعت و ۲۲ دقیقه زمان میبرد. اگر پیشبینی کنیم که اجرای خود برنامه هم یک ساعت زمان ببرد و مقداری هم زمان با توجه به پهنای باند شبکه اضافه شود، میتوان گفت که Execution Time ما بیشتر از ۳ ساعت خواهد بود. ولی رییس شما به سرعت به نتیجه این عملیات نیاز دارد و نمیتواند ۳ ساعت صبر کند. بنابراین لازم است دنبال راهحل بهتری باشیم و مثلاً زیر ۳۰ دقیقه نتیجه را گزارش کنیم. بیشترین زمان صرف خواندن داده از حافظه شبکه و انجام محاسبات شده است. میتوان به جای HDD از SSD یا Solid State Drive استفاده کرد؛ ولی مشکل این است که قیمت SSD بسیار بیشتر از HDD است و وقتی حجم خیلی زیاد باشد، شاید خرید آن به صرفه نباشد. راه دوم میتواند این باشد که داده ۱ ترابایتی را به ۱۰۰ قسمت تقسیم کنیم و توسط ۱۰۰ کامپیوتر یا node شبکه آنها را بخوانیم و محاسبات را انجام دهیم. در این صورت ۲ ساعت و ۲۰ دقیقهای که صرف خواندن داده میشد تقسیم بر ۱۰۰ میشود به زیر دو دقیقه کاهش پیدا میکند و یک ساعتی که صرف انجام محاسبات میشد نیز تقسیم بر ۱۰۰ میشود و به زیر ۱ دقیقه کاهش پیدا میکند. مشکلی که راه حل دوم دارد این است که پهنای باند شبکه جوابگوی خواندن دیتا توسط ۱۰۰ کامپیوتر به صورت همزمان نیست و سرعت همهی کامپیوترهای شبکه هنگام خواندن داده به شدت کاهش پیدا میکند. برای حل این مشکل فرض میکنیم که هر ۱۰۰ بخش دیتا را که توسط هر کامپیوتر قرار بود خوانده شود به آن کامپویتر منتقل کردهایم و دیگر مشکل پهنای باند شبکه را نداریم. ولی مجدد مشکلی که در این جا ممکن است به وجود بیاید این است که چه اتفاقی میافتد اگر روی یک یا چندین تا از این کامپیوترها هارد دیسک دچار مشکل شود و داده از دست برود؟ برای مثال اگر یک تصویر داشته باشیم، میتوانیم آن را روی گوگل درایو هم داشته باشیم تا اگر مشکلی برای لپ تاپ ما به وجود آمد، مطمئن باشیم که یک نسخه پشتیبان از آن تصویر داریم. در مثال خودمان، کاری که میتوانیم بکنیم این است که داده هر کامپیوتر را روی دو کامپیوتر دیگر هم ذخیره کنیم تا در صورتی که مشکلی پیش آمد، بتوان نسخه پشتیبان را از آنجا بازیابی کرد. البته همین روش چالشهایی نیز دارد: لازم است کامپیوترها به یکدیگر متصل شوند. لازم است معلوم شود طبق چه منطقی دادهی هر کامپیوتر روی کدام کامپیوترهای دیگر ذخیره خواهد شد. در بخش محاسبات نیز مکشلاتی وجود دارد. برای مثال داده یک symbol میتواند روی کامپیوترهای مختلف باشد. حال، کدام کامپیوتر باید تمام خروجیهای کامپیوترهای مختلف را جمعآوری و همگامسازی کند؟ همانطور که مشخص است راهحلی که ارائه دادیم خیلی پیچیدگیهای مربوط به فضای ذخیرهسازی و محاسباتی دارد. پس راهحل بهینه برای ما در این وضعیت چیست؟ hadoop
در حقیقت، hadoop یک فریمورک برای پردازش موازی دیتاستهای بزرگ است که به صورت چندین cluster روی کامپیوترهای مختلف ذخیره شدهاند. نکته قابل توجه hadoop این است که این کامپیوترها لازم نیست سختافزار ویژهای داشته باشند. هر کامپیوتر یک هارد دیسک، رم و پردازنده داشته باشد،کافی است. البته توجه شود که این کامپیوترها احتمالا ارزان نیستند و سیستم آنها قویتر از کامپیوترهای معمولی است.
در اینجا، HDFS یا Hadoop Distributed File System تمام پیچیدگیهای مربوط به ذخیرهسازی داده مانند تقسیم داده به چندین بخش، تکرار هر بخش از داده در بیشتر از یک node را بر عهده میگیرد. همچنین MapReduce یک مدل در برنامهنویسی است که hadoop آن را پیادهسازی میکند و ظیفه آن برعهده گرفتن تمام پیچیدگیهای محاسباتی است. بنابراین فریمورک hadoop تمام منابع مورد نیاز را از منابع مختلف گردآوری کرده و یک خروجی یکپارچه شده را تحویل میدهد.
خب الان که با hadoop آشنا شدهایم کافی است پیش مدیر بورس رفته و hadoop را به او معرفی کرده و پیشنهاد خرید ۱۰۰ کامپیوتر را بدهیم. در این صورت احتمالا مدیر مربوطه، با قیافهای متعجب شما را نگاه خواهد کرد. مجدد اینجا hadoop به کمک شما میآید زیرا hadoop میتواند با تعداد کمتری کلاستر و یعنی حتی ۱۰ کامپیوتر هم کار کند. ولی در صورتی که بخواهید سرعت را افزایش دهید، تنها کافی است تعداد بیشتری node یا cluster به ساختار خود اضافه کنید؛ به عبارت دیگر hadoop قابلیت horizontally scale شدن دارد.
سوال ۱: حجم دیتای شما ۵۰۰ گیگ است و شواهد نشان میدهد که حجم دیتا در آینده افزایش چشمگیری نخواهد داشت. آیا شما با یک مسئله Big Data روبرو هستید؟
سوال ۲: به دلیل افزایش حجم داده از شما خواسته شده که مجدد ساختار این سایت فروشگاهی را بررسی کنید. در حال حاضر در back-end این سایت از MySQL Server استفاده میشود. این وبسایت داده را از دیتابیس میگیرد و همچنین داده جدید را به آن اضافه و داده قدیمی را پاک میکند. به دلیل اینکه hadoop یک راهحل batch و نه interactive است، بهتر است از دیتابیسهای NoSQL استفاده شود؛ آیا این جمله درست است؟
سوال ۳: کاری به شما محول شده که روی دیتابیس Oracle دو ساعت انجامش طول میکشد. از شما خواسته میشود این زمان را به یک ساعت کاهش دهید. آیا بهترین روش این است که تعداد سروهای بیشتری اضافه کنیم؟
سوال ۴: چرا پیشنهاد استفاده از SSD به جای HDD پیشنهاد خوبی برای افزایش نرخ خواندن داده نیست؟
سوال ۵: بهترین کار برای بازیابی داده چیست؟
سوال ۶: چه اتفاقی برای local file system میافتد وقتی HDFS را نصب میکنیم؟
سوال ۷: زمانی که چندین node به صورت همزمان درخواست داده روی پهنای بند شبکه میکنند، این موضوع موجب سرعت پایین انتقال داده روی شبکه میشود. بهترین کار افزایش پهنای باند شبکه است یا اینکه داده را روی nodeای که قرار است محاسبه را انجام دهد ذخیره کنیم؟
جواب ۱: خیر
جواب ۲: بله
جواب ۳: خیر
جواب ۴: هزینه خیلی بالاست.
جواب ۵: کپی کردن داده هر بلاک روی چندین بلاک دیگر
جواب ۶: هر دو در کنار یکدیگر قرار دارند و مشکلی پیش نمیآید و local file system با HDFS جایگزین نمیشود.
جواب ۷: داده را روی nodeای که محاسبه را انجام میدهد ذخیره کنیم.
سوالی که همان ابتدا لازم است پرسیده شود این است که آیا قبلاً از هیچگونه file system دیگری استفاده کردهاید؟ جواب طبیعتاً باید «بله» باشد. همین الان که دارید با لپتاپتان این متن را میخوانید یا اگر از دستگاه USB استفاده کردهاید، این یعنی از یک file system استفاده کردهاید. در حقیقت file system یکی از بخشهای اساسی یک سیستم عامل است. در حقیقت file system حافظه هارد دیسک شما را مدیریت میکند.
فرض کنید به یک نفر یک کتاب میدهید و به یک نفر دیگر برگههای آن کتاب را به صورت نامنظم میدهید. و به آنها میگوییم که به فصل ۳۴ بروند. با استفاده از فهرست مطالب کتاب، فرد اول، سریعتر صفحه مورد نظرش را پیدا میکند. بدون file system، اطلاعاتی که روی هارد دیسک ذخیره شدهاند، یک تیکه داده بزرگ خواهد بود که نمیتوان در آن تعیین کرد فلان اطلاعات یا فایل از کجای هارد شروع و در کجا تمام میشود.
عملکردهای اصلی file system موارد زیر هستند:
برگردیم به این سوال که چه نیازی به یک file system جدید به اسم HDFS است؟ به تصویر زیر توجه کنید:
فرض کنید یک کلاستر با ۱۰ node داریم و file system هر کدام EXT4 است. روی هر کدام از این nodeها، EXT4 را Local File System آن node مینامیم. اولین کار این است که وقتی یک فایل را روی file system پیشنهادیمان آپلود میکنیم، لازم است file system داده را به بلاکهایی با ندازه مساوی تقسیم کند.
توجه: مفهموم بلاک در HDFS با مفهوم آن روی file systemهای قدیمی متفاوت است که جلوتر در مورد آن صحبت خواهد شد.
خُب، file system ما باید یک دیدِ توزیع شده به فایلها و بلاکها کلاستر داشته باشد که این امر با EXT4 امکانپذیر نیست. یعنی local file systemعه node شماره یک، هیچ ایدهای ندارد که روی node شماره دو چه فایلهایی قرار دارند. به همین دلیل است که نام آن را local file system گذاشتهاند. در این یکی از مشکلات احتمالی، از دست دادنِ داده است. راهحل این است که یک file system روی EXT4 داریم که روی همه nodeها توزیع شده است که آن را HDFS مینامیم. حالا وقتی فایلی را روی HDFS آپلود میکنیم، HDFS آن را به صورت خودکار به بلاکهایی با اندازه ۱۲۸ مگابایت تقسیم میکند. HDFS خودش به صورت خودکار بلاکهای مختلف را روی nodeهای مختلف قرار میدهد و عمل کپی کردن هر بلاک روی بیشتر از یک node را انجام میدهد. به صورت پیشفرض HDFS هر بلاک را روی ۳ node کپی میکند. فرض کنید فایلی با حجم ۷۰۰ مگابایت را روی HDFS ذخیره میکنید. ۵ بلاک با اندازه ۱۲۸ مگابایت و یک بلاک با اندازه ۶۰ مگابایت خواهیم داشت. به این دلیل که HDFS یک دیدِ توزیع شده به کل کلاستر دارد، میتواند به سادگی تصمیم بگیرد که کدام node کدام بلاک را در خود داشته باشد و کدام nodeها کپیها را در خود داشته باشند.
سوال مصاحبه: وقتی HDFS داریم چه اتفاقی برای local file system میافتد؟ HDFS جایگزین آن نیست و هنوز هم سیستمعامل هر node از local file system استفاده میکند. همچنین HDFS از EXT4 در مثال ما برای ذخیره کردن داده استفاده میکند.
مزایای HDFS اینا هستند:
برای کار کردن با HDFS تو محیط واقعی، تو این لینک، اطلاعاتتون رو ثبت کنید تا اطلاعات لازم برای اتصال به یک کلاستر با ۳ تا node روی AWS براتون ارسال بشه. تو اون ایمیل دو تا فایل هست که یکیش با پسوند .pemعه. حالا تو ترمینال دستورات زیر رو وارد کنید تا به کلاستر مورد نظر روی AWS وصل شید. من این دستورات رو زدم:
cd ~ sudo ssh -i /home/mehrdad/Downloads/cluster_key/hirwuser150430.pem hirwuser120130@54.85.143.224
در حقیقت ساختار دستوری که میزنید باید به این صورت باشه:
sudo ssh -i <pem file> <user name>@<ip address>
بعد از اتصال به کلاستر تو ترمینال، اگه دستور زیر رو بزنید، فایلهای موجود تو پوشه root رو مشاهده میکنید:
ls /
دستورات مربوط به hadoop با hadoop fs آغاز میشن. حالا اگه دستور زیر رو بزنید، خروجی با خروجی دستور قبلی فرق داره:
hadoop fs -ls /
دلیل این تفاوت اینه که hadoop یک دید کلی نسبت به همه nodeهای کلاستر داره در صورتی که دستور قبلی فقط فایلهای موجود تو یک node رو نشون میده. یعنی اگه روی یک node دیگه دستور ls / رو بزنید، ممکنه فایلهای موجود روی اون node با node فعلی فرق کنه ولی اگه روی node دوم دستور `hadoop fs -ls /` رو بزنیم، خروجی باید یکسان باشه چون همونطور که گفتیم hadoop یک دید کلی نسبت به همه nodeها دارد و فرقی نمیکند دستور را روی کدام node بزنیم.
از دستور زیر میتوان برای ایجاد یک دایرکتوری جدید در hadoop استفاده کرد:
hadoop fs -mkdir hadoop-test1
بنابراین میتوان نتیجه گرفت view و محتوای HDFS متفاوت از local file system است.
برای کپی کردن فایل از local file system به HDFS از دستور زیر استفاده می کنیم:
hadoop fs -copyFromLocal /hirw-starterkit/hdfs/commands/dwp-payments-april10.csv hadoop-test1
برای برعکسش از دستور زیر استفاده میکنیم:
hadoop fs -copyToLocal hadoop-test1/dwp-payments-april10.csv .
برای کپی کردن فایل از یک دایرکتوری به دایرکتوری دیگر از دستور زیر استفاده میکنیم:
hadoop fs -cp hadoop-test1/dwp-payments-april10.csv hadoop-test2
برای انتقال فایل از یک دایرکتوری به دایرکتوری دیگر از دستور زیز استفاده میکنیم:
hadoop fs -mv hadoop-test1/dwp-payments-april10.csv hadoop-test3
خب بریم سراغ اینکه ببینیم HDFS فایلها رو چطوری روی nodeهای مختلف کپی کرده. به صورت پیشفرض، هر فایل سه بار کپی شده است. برای تغییر این عدد میتوان به صورت زیر عمل کرد:
hadoop fs -Ddfs.replication=2 -cp hadoop-test2/dwp-payments-april10.csv hadoop-test2/test_with_rep2.csv
اینجا خوبه مجدد یادآوری کنم که اون بلاکهای فایل که HDFS ذخیره میکنه، روی local file system مثلاً NTFS یا EXT4 ذخیره میشن و HDFS حواسش هست که کدوم بلاکها روی کدوم nodeها و کجا ذخیره شدن و میتونه اونها رو مدیریت کنه.
وقتی ۱۰۰ها node در کلاستر دارید و صدها دیتاست بزرگ، این دیتا به بلاکهای کوچکتر تقسیم میشود و روی nodeهای مختلف قرار میگیرد و کپیهای آنها نیز همینطور. HDFS باید بتواند این تعداد فایل را مدیریت کند و یک فایل را از روی چندین بلاک بسازد. ولی HDFS چطور این کار را میکند؟ بریم سراغ چند تا تعریف.
به nodeهایی که بلاکها به صورت فیزیکی روی آنها ذخیره شدهاند، DataNode میگویند. هر DataNode میداند مسئول چه بلاکهایی است.
ولی برای مثال nodeهای ۱ و ۲ نمیدانند که بلاکهایی که روی خود دارند، متعلق به کدام دیتاستها و فایلها هستند و همچنین هر node هیچ اطلاعی در مورد سایر بلاکهای سایر nodeها ندارد. ما به عنوان کاربر احتمالا اینکه نام فایل را بدانیم باید برایمان کافی باشد و لازم نباشد وارد جزییات بلاکها شویم. سوالی که وجود دارد این است پس اطلاعات مربوط به اینکه کدام بلاک متعلق به کدام فایل است، کجا قرار دارد؟ این اطلاعات در nodeای با نام NameNode قرار دارد.
اگر این node کار نکند عملاً هیچ کاری نمیتوانیم انجام دهیم و کارها جلو نمیرود. پس شاید لازم باشد یک NameNode ذخیره هم داشته باشیم.
یک نکته مهم این است که همه meta dataها روی NameNode قرار دارند و تنها چیزی که روی NameNode قرار ندارد، آدرس بلاکها روی nodeهاست. سوالی که پیش میآید این است که چرا آدرس بلاکها روی NameNode قرار ندارد؟ وقتی NameNode اجرا میشود، سایر Nodeها لیستی از بلاکهایی که در خود دارند را برای آن میفرستند و به این صورت NameNode میداند بلاکها کجا قرار دارند. در حقیقت NameNode محل هر بلاک را روی memory دارد ولی این دیتا را روی دیسک ذخیره نمیکند. دلیل این قضیه این است که به صورت پیوسته فایلهای جدید روی HDFS قرار میگیرند و اصلاح میشوندو اگر قرار باشد تمام این تغییرات روی دیسک ذخیره شود؛ مجدد خواندن آنها از دیسک یک bottleneck خواهد بود.
در زیر مشخصات احتمالی یک node در یک کلاستر معمولی آورده شده است:
خب بریم چند تا جارگون یا واژه پر استفاده تو این زمینه رو یاد بگیریم:
معنی Rack: گروهی از nodeها که در یک شبکه به یکدیگر متصل شدهاند.
معنی cluster: گروهی از rackها که در یک شبکه به یکدیگر متصل شدهاند.
معنی Data Center: یک مکان فیزیکی که که کلاستر در آن جا قرار دارد.
سوال ۱: وقتی file systemهای سنتی توانایی کار با فایلهایی به حجم چند اِگزا بایت را دارند، چه نیازی به HDFS است؟
سوال ۲: به صورت پیشفرض از هر بلاک چند کپی روی HDFS قرار دارد؟
سوال ۳: برای چک کردن یک بلاک و چک کردن مکان آن، از چه دستوری استفاده میشود؟
سوال ۴: آدرس بلاکها در nameNode روی هارد دیسک ذخیره میشود؟
سوال ۵: برای تغییر تعداد کپیها، چه متغییری را باید تغییر دهیم؟
سوال ۶: مهمترین Node در کلاستر؟
جواب ۱: چون file systemهای قدیمی مناسب انجام محاسبات توزیع شده نیستند.
جواب ۲: سه تا
جواب ۳: `fsck`
جواب ۴: خیر
جواب ۵: `dfs.replication`
جواب ۶: NameNode
فرض کنید فرماندار کالیفرنیا از شما میخواهد جمعیت شهرهای این ایالت را پیدا کنید. همه منابع در اختیار شما قرار داده شده ولی فقط ۴ ماه وقت دارید. کاری که میتوان کرد این است که برای هر شهر فردی را در نظر بگیرید و وی را مسئول آمارگیری آن شهر بکنید. حال به این افراد میگویید که سراغ تک تک خانهها بروید و در بزنید و از اهالی خانه بپرسید چند نفر در آن خانه زندگی میکنند. از هر فرد میخواهید وقتی سراغ هر خانه میروند و آمار آن خانه را میگیرند، در هر خط اسم آن شهر را بنویسند و جلوی آن تعداد ساکنان آن خانه و به همین ترتیب برای خانههای بعدی پیش بروند. این روش یک روش کلاسیک divide and conquer است. سپس از همه مسئولین شهر میخواهید که نتیجه را به دفتر مرکزی کالیفرنیا ارسال کنند. در دفتر مرکزی، آمار هر شهر جداگانه جمع زده میشود و آمار هر شهر مشخص میشود. فرماندار از نتیجه کار شما راضی است.
سال بعد مجدد همین کار به شما اختصاص داده میشود ولی فقط ۲ ماه وقت خواهید داشت. کاری که میتوانید بکنید این است که هر شهر را به دو بخش تقسیم کنید و مسئولیت هر بخش در هر شهر را به یک نفر بدهید (دو نفر در هر شهر) تا کارها سریعتر انجام شود. در دفتر مرکزی هم میتوان دو نفر را داشت و هر شهر را به یکی از این دو نفر داد تا همل جمع زدن را انجام دهد. در اینجا شما میخواهید مطمئن شوید که آمار هر دو بخش یک شهر به یکی از نیروهای دفتر مرکزی ارسال میشود و اینگونه نشود که آمار بخشی از شهر در درست نفر اول دفتر مرکزی و آمار بخش دوم شهر در دست نفر دوم دفتر مرکزی باشد. چه کار میتوان کرد؟ دستورالمعل مشخص برای هر شهر میفرستیم که آمارشان را به کدام نیرو در دفتر مرکزی ارسال کنند. کار انجام شد و مجدد فرماندار از نتیجه راضی بود.
در صورتی که سال بعد فرماندار از شما بخواهد همین کار را در یک ماه انجام دهید، دقیقا میدانید باید چه روندی را پیش بگیرید. در اینجا مدلی که ما داریم نه تنها کار میکند بلکه به راحتی scale میشود. این مدل را MapReduce مینامند. در حقیقت MapReduce یک مدل برنامهنویسی برای محاسبات توزیع شده است. یعنی روشی است برای پردازش دیتاستهای بزرگ به صورت توزیع شده. در شکل زیر، نام فازهای مختلف کار مشخص شدهاند:
توجه شود که مفهموم MapReduce توسط هر زبان برنامه نویسیای قابل پیادهسازی است. hadoop در حقیقت MapReduce را پیادهسازی میکند.
فرض کنید دیتاستی از بورس دارید که در آن اطلاعاتی مانند نام Symbol، تاریخ، قیمت باز آغاز و پایان آن روز و ... داریم. برای هر Symbol، بیشترین قیمت پایان روز را پیدا کنید.
فرض کنیم دیتا توزیع شده نبود و فقط یک node داشتیم. چگونه این مسئله را حل میکردیم؟ هر خط را میخواندیم و اگر قیمت پایانی کمتر از قیمت پایانیای که قبلاً ذخیره کردهایم بود، آن را جایزگین میکنیم و به خط بعد میرویم و در غیر این صورت به صورت مستقیم به خط بعد میرویم و این کار را تا رسیدن به خط آخر تکرار میکنیم.
خب حالا این مسئله را وقتی داده توزیع شده است چگونه حل کنیم؟ داده را به چند بخش تقسیم میکنیم و روی هر کدام از بخشهای داده، یک پروسه داریم که عمل میکند. بخشهای داده را input split مینامیم و به پروسههایی را که روی input splitها عملی را انجام میدهند mapper مینامیم. هر mapper در هر لحظه فقط یک رکورد از داده را پردازش میکند و هر کدام از mapperها یک سری دستورالعمل مشخص و یکسان را روی هر سطر داده انجام خواهند داد و خروجی از mapper یک key-value خواهد بود.
آیا فکر میکنید input split همان block (بلاک) است که در مبحث HDFS یاد گرفتیم؟ خب در اشتباه هستید. این دو تفاوتهایی دارند.
بلاک تقسیم فیزیکی داده با توجه به اندازه هر بلاک است. چون اندازه بلاک مشخص است، ممکن است ظرفیت هر بلاک قبل از اینکه کل یک رکورد را در بر بگیرد پر شود.
برای مثل در این دیتاست، ۴ رکورد با اندازه ۱۰۰ مگابایت داریم.اولین رکورد بدون مشکل در بلاک اول جا میگیرد. ولی دومین رکورد در بلاک اول جا نمیگیرد. بنابراین رکورد دوم در بلاک اول شروع میشود ولی در بلاک دوم تمام میشود. اگر یک mapper به بلاک اول اختصاص دهیم، نمیتواند رکورد دوم را پردازش کند به این دلیل که کل محتوای رکورد دوم روی بلاک اول قرار ندارد. این دقیقا مشکلی است که input split حل میکند. در این مثال، input split اول، هم رکورد اول و هم رکورد دوم را شامل میشود. input split دوم فقط شامل رکورد دوم خواد بود. input splitها کلاسهای جاوا هستند که که یک pointerای به محل شروع و پایان هر کورد درون بلاکها دارند. بنابراین وقتی mapper شروع به خواندن داده میکند، میداند کجا از کجا شروع کند و کجا پایان دهد. محل شروع یک input split میتواند یک بلاک باشد و محل پایانش یک بلاک دیگر. وقتی فرآیند MapReduce انجام میشود، hadoop بلاکها را اسکن میکند و input splitها را میسازد. (نفهمیدم چطوری از بلاکها input splitها رو ساخت.)
بریم سراغ mapperها. mapperها رو میشه با زبانهای برنامهنویسی مختلفی نوشت.در مثال ما، mapper یک برنامه جاوا است که توسط فریمورک hadoop برای هر رکورد در هر input split یک بار فراخوانی می شود.
به نظرتون چند تا mapper رو hadoop برای پردازش دیتاست میسازه؟ به تعداد input splitها. در مثال ما، خروجی هر بار اجرای mapper، قیمت پایانی برای هر symbol خواهد بود. symbol میشود key و مبلغ میشود value.
از کجا بدونیم کدوم متغییر رو key و کدوم یکی رو value در نظر بگیریم؟ ببینید چه چیزی قرار است reduced شود و آن را value در نظر بگیرید. Reducerها روی خروجی mapperها کار میکنند. خروجی mapperها group by میشوند با symbol و به reducer تحویل داده میشوند. اگر فرض کنید دیتای ما اطلاعاتی در مورد ۱۰ symbol دارد و برای هر symbol به تعداد ۱۰۰ رکورد دارد؛ که میشود ۱۰۰۰ رکورد در مجموع. بنابراین خروجی mapperها ۱۰۰۰ تا key-value خواهد بود چون mapperها روی هر رکورد اجرا میشوند. چون ۱۰ تا symbol داریم، reducer در ورودیاش ۱۰ رکورد دریافت خواهد کرد. در حقیقت به ازای هر symbol، یک بار reducer فراخوانده میشود.
نکته: تعداد reducerها توسط کاربر تعیین میشود و حتی میتوانیم reducerای نداشته باشیم.
به مرحلهای که در آن خروجی mapperها به reducerها تحویل داده میشوند shuffle phase میگویند. بقیه فازها در تصویر زیر قابل مشاهده هستند.
سوال ۱: کدام یک از موارد زیر جز فازهای MapReduce است؟ mapper - reducer - shuffle
سوال۲ : کاربر میتواند تعداد input splitها را مشخص کند؟
سوال ۳: کاربر میتواند تعداد reducerها را مشخص کند؟
جواب ۱: همگی
جواب ۲: خیر
جواب ۳: بله
چیزی که هست اینه که نوشتن یه برنامه mapreduce زمانبره و نیاز به تمرین و تکرار داره که دستتون راه بیفته. بهتر نبود که یه ابزاری داشته باشیم که فقط چند تا دستورالعمل بهش بدیم و توی کلاستر hadoopمون اجراش کنیم و همون نتیجه رو بهمون بده؟ جواب این سوال و مشکل Apache Pig است. این ابزار در شرکت یاهو توسعه داده شد تا امکان استفاده از mapreduce برای هر کسی که میخواهد با کلاستر hadoop کار کند فراهم شد.اگه میخواهید در وارد اکوسیستم hadoop شید، لازمه کار کردن با Pig رو یاد بگیرید. یه کد نمونه برای مسئله پیدا کردن بیشترین قیمت برای هر symbol توی بورس رو می بینید.
--Load dataset with column names and datatypes stock_records = LOAD '/user/hirw/input/stocks' USING PigStorage(',') as (exchange:chararray, symbol:chararray, date:datetime, open:float, high:float, low:float, close:float,volume:int, adj_close:float); --Group records by symbol grp_by_sym = GROUP stock_records BY symbol; --Calculate maximum closing price max_closing = FOREACH grp_by_sym GENERATE group, MAX(stock_records.close) as maxclose; --Store output STORE max_closing INTO 'output/pig/stocks' USING PigStorage(',');
وقتی به تحلیل داده فکر میکنید چه چیزی بلافاصله به ذهنتون میاد؟ برای خیلیهامون احتمالا جداول دیتابیس به ذهنمون میاد. در اکوسیستم hadoop دیتا به صورت فایل قرار داشتند. ما اینجا به دنبال ابزاری هستیم که دیتا رو به صورت جداول نشون بده و بشه روش کوئریهای SQL زد. خب چه ابزاری؟ Apache Hive.
پس Hive به عنوان ورودی، یک کوئری SQL میگیره و اون رو به یک یا چند jobعه mapreduce تبدیل میکنه و به کلاستر hadoop ارسال میکنه. سوالی که وجود داره اینه که چرا به دو تا ابزار Pig و Hive نیاز داریم؟ pig دستورالعمل Pig رو میگیره و تبدیل میکنه و Hive دستورالعمل SQL رو. دلیلش این بوده که تو دو تا شرکت مجزا و مستقل وی برای برای رسیدن به یه هدف مشابه این دو ابزار توسعهداده شدهاند و الان از هر دو استفاده میشه. یه کد نمونه برای مسئله پیدا کردن بیشترین قیمت برای هر symbol توی بورس رو می بینید.
### www.hadoopinrealworld ### ### Hive Queries To Compute Max Close Price By Stock Symbol ### ### CREATE EXTERNAL TABLE ### hive> CREATE EXTERNAL TABLE IF NOT EXISTS stocks_starterkit ( exch STRING, symbol STRING, ymd STRING, price_open FLOAT, price_high FLOAT, price_low FLOAT, price_close FLOAT, volume INT, price_adj_close FLOAT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/hirw/input/stocks'; ### SELECT 100 RECORDS ### hive> SELECT * FROM stocks_starterkit LIMIT 100; ### DESCRIBE TO GET MORE INFORMATION ABOUT TABLE ### hive> DESCRIBE FORMATTED stocks_starterkit; ### CALCULATE MAX CLOSING PRICE ### hive> SELECT symbol, max(price_close) max_close FROM stocks_starterkit GROUP BY symbol;
#تامام!#