RxJava آموزش برنامه نویسی ریکتیو در جاوا - بخش اول

سلام به همه دوستان. توی اولین پستم میخام مطالعاتی که در مورد RxJava از منابع مختلف داشتم رو براتون به اشتراک بزارم. امیدوارم براتون مفید باشه.

آرایکس جاوا (RxJava : Reactive Extension for JVM) یک کتابخانه برای برنامه نویسی غیرهمزمان (asynchronous) و مبتنی بر رخداد (event-based) با استفاده از الگوی Observer می باشد. این کتابخانه مزیتهای بسیاری برای ما ایجاد میکند که از جمله مهمترین آنها استفاده از thread ها و thread pool ها را برای برنامه نویسی راحت تر کرده و دیگر برنامه نویس نیازی نیست نگران جزییات اجرای کارهای خود بروی Thread های مجزا از thread اصلی برنامه باشد و همچنین امکان هر دو مدل برنامه نویسی همزمان و غیرهمزمان را فراهم میکند همچنین این کتابخانه، توابع بسیار زیادی برای کار بر روی داده های تولید شده توسط تولید کننده یا همان منبع داده (observable) فراهم میکند. الگوی برنامه نویسی reactive این مفهوم را بیان میکند که در زمان نیاز بتوانیم به تغییرات پاسخ دهیم به این معنی که یک منبع داده شروع به تولید داده ها میکند که به اصطلاح به آن مشاهده شونده یا تولید کننده Observable میگوییم. هر منبع تولید داده ای نیاز به یک یا چند مشاهده کننده یا مصرف کننده Observer ویا به اصطلاح Subscriber دارد و ارتباط بین مشاهده شونده و مشاهده کننده با مفهومی بنام مشترک شدن یا Subscribe برقرار می شود. به محض اینکه ارتباط بین تولید کننده و مصرف کننده توسط subscribe برقرار شود کانال ارسال داده های تولید کننده به سمت مصرف کننده برقرار شده و تولید کننده شروع به تولید داده و مصرف کننده شروع به مصرف ان می کند.

برای استفاده از این کتابخانه در spring boot وابستگی زیر را به فایل pom.xml اضافه میکنیم.

<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.8</version>
</dependency>


شکل کلی دستورات در RxJava بصورت زیر هستش.

source.operator1().operator2().operator3().subscribe(consumer);

یک منبع داده، صفر یا چند عملگر میانی و در انتها مصرف کننده

سه مفهوم کلی در برنامه نویسی با RxJava :

1- منبع داده (تولید کننده) Source of data

2- مصرف کننده داده Consumer of data

3- اتصال مصرف کننده به منبع داده Connecting Consumer to Source

// defining the source
Observable<Integer> source = Observable.range(1, 5);
// defining the consumer
Subscriber<Integer> consumer = new Subscriber<Integer>() {
@Override
public void onNext(Integer number) { System.out.println(number); }
@Override
public void (Throwable e) { System.out.println(&quoterror&quot); }
@Override
public void onCompleted() { System.out.println(&quotcompleted&quot); }
};
// connecting the consumer to source
source.subscribe(consumer);

تابع onNext می تواند اصلا فراخوانی نشود و یا یک و یا چندین بار و یا حتی بصورت بی نهایت فراخوانی شود. توابع و onCompleted() را رخدادهای پایانی یا به اصطلاح terminal events می نامند به این معنی که فقط و فقط یکبار میتوانند فراخوانی شوند.زمانی که یک terminal event فراخوانی شود استریم تولید داده observable به اتمام می رسد و هیج داده ای دیگری ارسال نخواهد شد. رخدادهای پایانی اگر استریم تولید داده بی نهایت باشد ممکن است اصلا فراخوانی نشوند.

منبع داده:

منبع تولید داده Observable<T> می باشد که اعداد بین 1 تا 5 را بعنوان داده تولید خواهد کرد.

مصرف کننده:

مصرف کننده Subscriber<T> داده های تولید شده می باشد. RxJava داده های تولید شده توسط تولید کننده را توابع onNext مصرف کننده ارسال میکند. در صورتی که در فرایند ارسال داده یا به اصطلاح emit کردن داده به سمت مصرف کننده هرگونه خطایی رخ دهد تابع مصرف کننده فراخوانی شده و کانال ارسال داده بسته خواهد شد. وقتی تمامی داده های تولید کننده به سمت مصرف کننده با موفقیت ارسال شد، RxJava سیگنال اتمام تولید داده را به تابع onCompleted مصرف کننده ارسال میکند.

اتصال مصرف کننده به منبع داده:

در مثال بالا تمام محاسبات و کدهایی که بعنوان بخشی از تولید کننده و مصرف کننده نوشته شده فقط زمانی که اتصال بین منبع داده و مصرف کننده برقرار شده باشد، اجرا خواهد شد. در واقع در مثال بالا تا زمانی که مصرف کننده ارتباطش با تولید کننده از طریق تابع subscribe برقرار نشده باشد، منبع داده هیچ داده ای را تولید نخواهد کرد و فرایند مربوطه شروع نخواهد شد.

انواع منبع داده در RxJava:

io.reactivex.rxjava3.core.Flowable: 
انتشار صفر یا بی نهایت داده با قابلیت بافر کردن داده های ارسالی
io.reactivex.rxjava3.core.Observable: 
انتشار صفر یا بی نهایت داده بدون قابلیت بافرینگ داده های ارسالی
io.reactivex.rxjava3.core.Single: 
منبع داده ای با قابلیت انتشار دقیقا یک داده و یا ارسال سیگنال خطا
io.reactivex.rxjava3.core.Completable: 
منبع داده ای بدون انتشار داده و فقط ارسال سیگنال اتمام کار و یا سیگنال خطا
io.reactivex.rxjava3.core.Maybe: 
منبع داده ای بدون انتشار داده یا دقیقا انتشار فقط یک داده و یا ارسال سیگنال خطا

منبع داده Observable :

این منبع داده را میتوان در واقع قلب RxJava نامید. منبع تولید داده یا رخداد (event) در RxJavaمی باشد. RxJava توابع زیادی را برای ایجاد یک منبع داده (observable) در اختیار ما قرار می دهد. انواع توابعی که برای ایجاد یک منبع داده وجود دارد را در ادامه با هم بررسی میکنیم. توابع زیر اکثراً در observable و flowable مشترک می باشند.

منبع داده Flowable :

این منبع داده علاوه برداشتن قابلیتهای observable بعنوان یک منبع داده قابلیتی بنام BackPressure دارد. زمانی که سرعت تولید و انتشار داده توسط یک observable بیشتر از توان مصرف کننده در مصرف داده های ارسالی باشد چه اتفاقی رخ میدهد؟ observable یک بافر با اندازه بی نهایت دارد به این معنی که تمامی داده هایی که قرار است به سمت مصرف کننده ارسال کند را بافر کرده و سپس ارسال می کند و در حالتی که مصرف کننده توان مصرف نرخ داده ارسالی را نداشته باشد خطای outOfMemoryException رخ می دهد. با اعمال قابلیت backPressure بر روی یک استریم داده میتوان داده های ارسالی را به روش دلخواه مدیریت کرد. در نسخه 1 rxJava اعمال این قابلیت بر عهده observable بود ولی از نسخه 2 rxJava این قابلیت بر عهده کلاس flowable قرار داده شده است. همچنین میتوانیم یک observable را با تابع زیر به یک flowable با قابلیت backpressure تبدیل کنیم.

Observable.range(1, 1_000_000).toFlowable(BackpressureStrategy.Drop)

انواع حالتهایی که backpressure در اختیار ما قرار میدهد بصورت زیر می باشد:

Drop:
 اگر تعداد ایتمهای تولید شده از سایز بافر بزرگتر شد دور ریخته می شود.
Buffer: 
تمامی داده های تولید شده را بافر میکند.
Latest:
 فقط آخرین مورد را نگه می دارد
Error: 
هنگامی که سرعت انتشار داده از توان مصرف بیشتر باشد خطای MissingBackpressureException  ایجاد میکند.
Missing: 
No strategy, it would throw a MissingBackpressureException sooner or later somewhere on the  downstream

منبع داده Single :

زمانی که بخواهیم منبع داده ما دقیقا یک آیتم را به مصرف کننده ارسال کند و یا در صورت بروز خطا سیگنال error را ارسال کند از این نوع منبع داده استفاده می نماییم. این منبع داده سیگنال onComplete ارسال نمیکند.

Single.fromCallable(this::returnSomething)
      .subscribe(
                 (item) -> {
                              System.out.println(item);
                               },
                (err) -> {
                             System.out.println(err);
                });
private int returnSomething() {
         return 100;
}

منبع داده Completable :

یک منبع داده بدون تولید و انتشار هیچگونه داده ای که فقط سیگنال completionو یا errorرا صادر میکند. کاربرد آن زمانی است که یک تابعی قرار است یک کاری را انجام دهد ولی خروجی که برمیگرداند برای ما اهمیت ندارد و یا اینکه اصلا خروجی برنمیگرداند و صرفاً انجام خود کار برایمان مهم است مثلا یکسری مقداردهی های اولیه در برنامه یا مثلا بروز رسانی یک مقداری در کش و یا انجام یک سری عملیات در دیتابیس.

Completable.fromAction(this::call)
  .subscribe(
                    () -> {
                              System.out.println(&quotComplete&quot);
                           },
                   (err) -> {
                             System.out.println(err);
                 });
private void call() {
       System.out.println(1);
}

منبع داده Maybe :

مشابه با Single با این تفاوت که امکان انتشار و تولید هیچگونه داده ای را نیز فراهم میکند. یعنی عملا ترکیبی از Completable و single می باشد.

//Maybe is similar to Single only difference being that it allows for no emissions as well.
 Disposable disposable = Maybe.just(&quotHello World Maybe&quot)
    .delay(2, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableMaybeObserver<String>() {
                  @Override
                   public void (Throwable e) {
                       e.printStackTrace();
                   }
                 @Override
                 public void onSuccess(String value) {
                      System.out.println(value);
                   }
               @Override
               public void onComplete() {
                   System.out.println(&quotDone Maybe!&quot);
                }
  });
Thread.sleep(3000);
 //start observing
 disposable.dispose();
 
 //no emission
 Maybe<Integer> emptySource = Maybe.empty();
emptySource.subscribe(
  s -> System.out.println(&quotItem received: from emptySource&quot + s), // اجرا نمی شود
  Throwable::printStackTrace,
  () -> System.out.println(&quotDone from EmptySource&quot)
);

انواع توابعی که منابع داده بالا برای تولید و انتشار داده در اختیار ما قرار می دهند:

تابع just :

این تابع در منابع داده زیر موجود است:

observable,flowable,maybe,single

برای ایجاد منبع داده از روی یک سری از اشیا از پیش ساخته شده و اماده میتوانیم از این تابع استفاده کنیم. این منبع داده به محض اینکه یک مصرف کننده به ان متصل شود شروع به ارسال داده ها به سمت مصرف کننده خواهد کرد. این تابع ده overload مختلف دارد که از بین 1 تا 10 ایتم را بعنوان ورودی میپذیرد.

Observable<Object> observable = Observable.just(&quot100&quot, &quotmohammad&quot, 200, 2.4);
observable.subscribe(
            item -> System.out.print(item),
           error -> error.printStackTrace(),
           () -> System.out.println(&quotfinish&quot)
   );

تابع FromIterable:

این تابع در منابع داده زیر موجود است:

flowable,observable

برای ایجاد یک منبع تولید داده از روی یک لیست استفاده می شود. در واقع هر کدام از ایتمهای داخل لیست بصورت مجزا به سمت مصرف کننده ارسال می شوند.

List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
Observable<Integer> observable = Observable.fromIterable(list);
observable.subscribe(
          item -> System.out.println(item),
          error -> error.printStackTrace(),
          () -> System.out.println(&quotDone&quot)
);

تابع fromArray :

این تابع در منابع داده زیر موجود است:

flowable,observable

بعنوان یک منبع داده از روی یک آرایه عمل کرده و هر یک از عناصر آرایه را بعنوان یک آیتم مجزا به سمت مصرف کننده ارسال میکند.از آرایه های primitive پشتیبانی نمیکند و فقط مجاز به استفاده از Wrapper type ها می باشید.

Integer[] array = new Integer[10];
 for (int i = 0; i < array.length; i++) {
  array[i] = i;
}
Observable<Integer> observableArrays = Observable.fromArray(array);
observableArrays.subscribe(
               item -> System.out.println(item),
              error -> error.printStackTrace(),
              () -> System.out.println(&quotDone&quot));

تابع fromCallable :

در منابع داده زیر موجود است:

flowable,observable,maybe,single,completable

کارکرد این تابع زمانی است که یک تابع برای انجام کاری نوشته اید و این تابع پس از اتمام محاسبه یک نتیجه را برمیگرداند. بعنوان مثال تابعی که یک سرویس http را برای دریافت اطلاعاتی از سمت سرور فراخوانی کرده و بعد از فراخوانی آن سرویس، نتیجه را به قسمتی که تابع را فراخوانی کرده برگشت می دهد. مواردی از این قبیل با استفاده از اینترفیس callable قابل انجام می باشد.

Callable<String> callable = () -> {
       System.out.println(&quotinside callable!&quot);
      return &quotHello World!&quot // http method call or whatever
 };
Observable<String> observable = Observable.fromCallable(callable);
observable.subscribe(
          item -> System.out.println(item),
          error -> error.printStackTrace(),
          () -> System.out.println(&quotDone&quot)
);
//…. With lambda syntax
Observable.fromCallable((Callable<String>) () -> null)
.subscribe(item -> System.out.println(item),
                 error -> error.printStackTrace(),
                 () -> System.out.println(&quotDone&quot));

تابع Range :

در منابع داده زیر موجود است:

flowable,observable

با استفاده از این تابع یک دنباله افزایشی از اعداد صحیح را میتوانید بصورت یک به یک به سمت مصرف کننده ها ارسال نمایید. تابع rangeLong برای ایجاد یک دنباله از اعداد long استفاده می شود و اینکار پیش فرض بصورت blocking و روی thread اصلی برنامه اجرا می شود.

Observable.range(1,5).subscribe(item-> System.out.println(item));
Observable.rangeLong(1,5).subscribe(item-> System.out.println(item));

تابع Timer:

بعد از گذشت مدت زمان مورد نظر یک عدد 0 را به سمت مصرف کننده ارسال کرده و سپس سیگنال complete را ارسال میکند. در مثال زیر بعد از گذشت 5 ثانیه تابع onNext مصرف کننده را با مقدار 0 فراخوانی کرده و سپس خاتمه پیدا میکند. این کار را بصورت nonBlocking و در یک thread دیگر انجام می دهد.

Observable<Long> timer = Observable.timer(5, TimeUnit.SECONDS);
timer.subscribe(
           (v) -> System.out.println(Thread.currentThread().getName() + v + &quot onNext!&quot), 
         // RxComputationThreadPool-10 onNext!
       (err) -> System.out.println(err),
      () -> System.out.println(&quotfinish&quot)
);

مثال زیر بصورت blocking و در thread اصلی برنامه با استفاده از تابع blockingSubscribe اینکار را انجام می دهد.

Observable<Long> timer = Observable.timer(5, TimeUnit.SECONDS);
timer.blockingSubscribe(
           (v) -> System.out.println(Thread.currentThread().getName() + v + &quot onNext!&quot), 
// main0 onNext!
          (err) -> System.out.println(err),
          () -> System.out.println(&quotfinish&quot)
);

انشالا توی بخش دوم میخام در مورد توابع subscribeOn , observerOn و همچنین اجرای کدها بصورت موازی و در واقع MultiThread توی RxJava صحبت کنم.