آشنایی با توابع ObserveOn , SubscribeOn :
یک Observable بعنوان منبع داده شروع به تولید و انتشار داده ها میکند. قانون کلی حاکم بر Rx این است که در هر لحظه فقط یک داده مجاز به ورود به تابع onNext در subscription می باشد. بصورت پیش فرض تولید و انتشار داده ها در Observable در thread ای که تابع subscribe در ان ایجاد شده است شروع بکار و تولید و انتشار داده ها می کند.
public static void main(String[] args) {
Observable<String> source = Observable.just("Alpha","Beta","Gamma");
Observable<Integer> lengths = source.map(String::length);
lengths.subscribe(l -> System.out.println("Received " + l +" on thread " + Thread.currentThread().getName()));
}
//……output………
Received 5 on thread main
Received 4 on thread main
Received 5 on thread main
در مثال بالا تابع subscribe در thread اصلی برنامه شروع بکار کرده و بنابراین کل عملیات در thread اصلی برنامه و بصورت blocking انجام می شود. در این مثال تا چون عملیات در thread اصلی برنامه انجام می شود تا زمانی که تمام داده ها توسط observable منتشر نشوند و observable سیگنال onCompleted را ارسال نکند برنامه خاتمه پیدا نمیکند. در صورتی که بخواهیم کل عملیات تولید و انتشار داده توسط observable در thread ای غیر از thread اصلی برنامه اتفاق بیفتد میتوانیم از تابع SubscribeOn() و مشخص نمودن یک Scheduler استفاده نماییم. با اینکار تولید و انتشار داده ها در thread غیر از thread main انجام می شود. اگر عملیاتی محاسباتی در حال انجام باشد از Schedulers.computation() استفاده مینماییم. این scheduler بصورت پیش فرض به تعداد core های cpu ، thread در اختیار برنامه قرار می دهد.
public static void main(String[] args) {
Observable<String> source = Observable.just("Alpha", "Beta", "Gamma");
Observable<Integer> lengths = source
.subscribeOn(Schedulers.computation())
.map(String::length);
lengths.subscribe(sum -> System.out.println("Received " + sum +
" on thread " + Thread.currentThread().getName()));
}
// output……
Received 5 on thread RxComputationThreadPool-1
Received 4 on thread RxComputationThreadPool-1
Received 5 on thread RxComputationThreadPool-1
اگر مثال بالا را اجرا کنید قبل از نمایش نتیجه، برنامه خاتمه پیدا میکند به این دلیل که محاسبات بالا در یک thread دیگر اتفاق افتاده و thread اصلی برنامه قبل از اینکه thread محاسباتی خروجی را نمایش دهد به اتمام می رسد. یک برداشت اشتباه زمانی که از SubscribeOn استفاده می کنیم این است که برنامه باید بصورت خودکار بصورت پارالل اجرا شود یعنی در مثال بالا ارسال و نمایش اطلاعات بصورت موازی و در چند thread جداگانه انجام شود، ولی همانطور که از خروجی پیداست تمامی محاسبات و خروجی در یک thread یکسان انجام شده است و این رفتار پیش فرض Rx می باشد.
مکان استفاده از SubscribeOn() تفاوتی در رفتار ان ایجاد نمیکند، در واقع حتی اگر چند بار از این تابع در کد خود استفاده کنید فقط نسخه ای که به Observable نزدیکتر است مورد استفاده قرار گرفته و مابقی SubscribeOn ها نادیده گرفته می شوند.
Source1.map().subscribeOn(Scheduler.IO).filter.subscribeOn(Scheduler.Computation())
در مثال بالا عملیات در IO thread انجام می شود و دومین SubscribeOn نادیده گرفته می شود. تابعی مثل Observable.interval() بصورت پیش فرض و داخلی یک تابع SubscribeOn با Schedulers.Computation دارد و این تابع بصورت پیش فرض عملیاتش را یک thread محاسباتی انجام می دهد. بطور کلی تابع SubscribeOn به Observable اعلام میکند که در چه thread ای باید عملیات خود را انجام دهد.
ObserveOn() تابع
بعضی مواقع پیش می آید که در زنجیره محاسبات و عملگرهای که روی یک Observableایجاد کرده اید ادامه کار را روی thread دیگری در scheduler دیگری اجرا کنید. بعنوان مثال تولید داده در IO Scheduler در حال انجام است و در عملگرهای بعدی که روی داده های منتشر شده قرار است انجام شود محاسبات را بخواهیم روی یک Thread از Computation Scheduler انجام دهیم. در این حالت میتوانیم از تابع ObserveOn استفاده نماییم. در مثال زیر چونکه از تابع SubscribeOn استفاده نکرده ایم انتشار داده ها در Thread اصلی برنامه انجام می شود و قبل از عملگر map از تابع ObserveOn استفاده کرده ایم و عملا ادامه زنجیره تا انتها روی Thread دیگری از Computation Schedulerانجام می شود. در واقع به هر تعدادی که بخواهیم میتوانیم از تابع ObserveOn در زنجیره توابع Rx استفاده کنیم و در هر بار استفاده از محل استفاده به بعد Thread اجرای زنجیره تغییر میکند و توابع از قبل از ObserveOn تحت تاثیر تغییر Thread قرار نمیگیرند.
public static void main(String[] args) {Observable<Integer> source = Observable.range(1,10);source.map(i -> i * 100).doOnNext(i -> System.out.println("Emitting " + i+ " on thread " + Thread.currentThread().getName())).observeOn(Schedulers.computation()).map(i -> i * 10).subscribe(i -> System.out.println("Received " + i + " on thread "+ Thread.currentThread().getName()));sleep(3000);}
// output…..
Emitting 100 on thread mainEmitting 200 on thread mainEmitting 300 on thread mainEmitting 400 on thread mainEmitting 500 on thread mainEmitting 600 on thread mainEmitting 700 on thread mainEmitting 800 on thread mainEmitting 900 on thread mainEmitting 1000 on thread mainReceived 1000 on thread RxComputationThreadPool-3Received 2000 on thread RxComputationThreadPool-3Received 3000 on thread RxComputationThreadPool-3Received 4000 on thread RxComputationThreadPool-3Received 5000 on thread RxComputationThreadPool-3Received 6000 on thread RxComputationThreadPool-3Received 7000 on thread RxComputationThreadPool-3Received 8000 on thread RxComputationThreadPool-3Received 9000 on thread RxComputationThreadPool-3Received 10000 on thread RxComputationThreadPool-3
انتخاب Scheduler :
اسکجولر های متنوعی در rx برای انجام کارهای متفاوت تعبیه شده است. Schedulers.io() که برای انجام کارهای مرتبط با IO بهینه شده است و بصورت پیش فرض thread های داخل این scheduler کش شده و مورد استفاده مجدد قرار میگرند. Schedulers.newThread() که به ازای هر subscription یک thread جدید ایجاد میکند. در هنگام استفاده از این دو scheduler باید با احتیاط زیادی عمل کنید چون میتوانند منجر به تولید تعداد زیادی thread شده و کارایی برنامه را کاهش خواهند داد. برای کارهای محاسباتی میتوان از Schedulers.computation() استفاده نمایید که حداکثر تعداد thread های آن به تعداد core های cpu محدود شده است. همچنین از Schedulers.from() نیز میتوان استفاده کرد و در واقع Executor مد نظر خود با تعداد thread های مد نظر خود را ایجاد نمایید.
int computationalThreadCount = Runtime.getRuntime().availableProcessors() + 1; ExecutorService executor = Executors.newFixedThreadPool(computationalThreadCount); Scheduler schedulerComputation = Schedulers.from(executor);