程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> RxJava中多種場景的完成總結

RxJava中多種場景的完成總結

編輯:關於JAVA

RxJava中多種場景的完成總結。本站提示廣大學習愛好者:(RxJava中多種場景的完成總結)文章只能為提供參考,不一定能成為您想要的結果。以下是RxJava中多種場景的完成總結正文


1、推延履行舉措

可使用timer+map辦法完成.代碼以下:

Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
   return doSomething();
  }).subscribe(System.out::println);
 }

2、推延發送履行的成果

這類場景請求發生數據的舉措是立時履行,然則成果推延發送.這和下面場景的是紛歧樣的.

這類場景可使用Observable.zip來完成.

zip操作符將多個Observable發射的數據按次序組合起來,每一個數據只能組合一次,並且都是有序的。終究組合的數據的數目由發射數據起碼的Observable來決議。

關於各個observable雷同地位的數據,須要互相期待,也就說,第一個observable第一個地位的數據發生後,要期待第二個observable第一個地位的數據發生,等各個Observable雷同地位的數據都發生後,能力按指定例則停止組合.這真是我們要應用的.

zip有許多種聲明,但年夜致上是一樣的,就是傳入幾個observable,然後指定一個規矩,對每一個observable對應地位的數據停止處置,發生一個新的數據, 上面是個中一個最簡略的:

 public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);

用zip完成推送發送履行成果以下:

 Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
         ,Observable.just(doSomething()), (x,y)->y)
   .subscribe(System.out::println));

3、應用defer在指定線程裡履行某種舉措

以下面的代碼,固然我們指定了線程的運轉方法,然則doSomething()這個函數照樣在以後代碼挪用的線程中履行的.

 Observable.just(doSomething())
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
     .subscribe(v->Utils.printlnWithThread(v.toString()););

平日我們采取上面的辦法到達目標:

 Observable.create(s->{s.onNext(doSomething());})
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{
     Utils.printlnWithThread(v.toString());
  });

但其實我們采取defer也能到達雷同的目標.

關於defer

defer 操作符與create、just、from等操作符一樣,是創立類操作符,不外一切與該操作符相干的數據都是在定閱是才失效的。

聲明:

 public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);

defer的Func0裡的Observable是在定閱(subscribe)的時刻才創立的.

感化:

Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.

也就說observable是在定閱的時刻才創立的.

下面的成績用defer完成:

 Observable.defer(()->Observable.just(doSomething()))
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
  });

4、應用compose不要打斷鏈式構造

我們常常看到上面的代碼:

 Observable.just(doSomething())
    .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
    .subscribe(v->{Utils.printlnWithThread(v.toString());

下面的代碼中,subscribeOn(xxx).observeOn(xxx)能夠在許多處所都是一樣的, 假如我們盤算把它同一在某一個處所完成, 我們可以這麼寫:

 private static <T> Observable<T> applySchedulers(Observable<T> observable) {
  return observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }

然則如許每次我們須要挪用下面的辦法, 年夜致會像上面如許,最裡面是一個函數,等於打破了鏈接構造:

 applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
   @Override public Data call(Data data) {
   return manipulate(data);
   }
  })
 ).subscribe(new Action1<Data>() {
  @Override public void call(Data data) {
  doSomething(data);
  }
 });

可使用compose操作符到達不打破鏈接構造的目標.

compose的聲名以下:

 public Observable compose(Transformer<? super T, ? extends R> transformer);

它的入參是一個Transformer接口,輸入是一個Observable. 而Transformer現實上就是一個Func1<Observable<T>, Observable<R>> ,換言之就是:可以經由過程它將一品種型的Observable轉換成另外一品種型的Observable.

簡略的說,compose可以經由過程指定的轉化方法(輸出參數transformer),將本來的observable轉化為別的一種Observable.

經由過程compose, 采取上面方法指定線程方法:

 private static <T> Transformer<T, T> applySchedulers() {
   return new Transformer<T, T>() {
    @Override
    public Observable<T> call(Observable<T> observable) {
     return observable.subscribeOn(Schedulers.io())
       .observeOn(Schedulers.computation());
    }
   };
  }

 Observable.just(doSomething()).compose(applySchedulers())
    .subscribe(v->{Utils.printlnWithThread(v.toString());
   });

函數applySchedulers可使用lambda表達式進一步簡化為上面為:

 private static <T> Transformer<T, T> applySchedulers() { 
  return observable->observable.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation());
 }

5、按優先級應用分歧的履行成果

下面這個題目估量沒表達清晰我想表達的場景. 其實我想表達的場景相似於平凡的獲得收集數據場景:假如緩存有,從緩存獲得,假如沒有,再從收集獲得.

這裡請求,假如緩存有,不會做從收集獲得數據的舉措.

這個可以采取concat+first完成.

concat將幾個Observable歸並成一個Observable,前往終究的一個Observable. 而那些數據就像從一個Observable收回來一樣. 參數可所以多個Observable,也能夠是包括Observalbe的Iterator.

新的observable內的數據分列按本來concat裡的observable次序分列,即新成果內的數據是按本來的次序排序的.

上面是上述需求的完成:

 Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
    .subscribe(v->System.out.println("result:"+v));
 //從緩存獲得數據
 private static Observable<String> getDataFromCache(){
  return Observable.create(s -> {
   //dosomething to get data
   int value = new Random().nextInt();
   value = value%2;
   if (value!=0){
    s.onNext("data from cache:"+value); //發生數據
   }
   //s.onError(new Throwable("none"));
   s.onCompleted();
  }
    );
 }
 //從收集獲得數據
 private static Observable<String> getDataFromNetwork(){
  return Observable.create(s -> {
   for (int i = 0; i < 10; i++) {
    Utils.println("obs2 generate "+i);
    s.onNext("data from network:" + i); //發生數據
   }
   s.onCompleted();
  }
    );
 }

下面的完成,假如getDataFromCache稀有據, getDataFromNetwork這裡的代碼是不會履行的, 這恰是我們想要的.

下面完成有幾個須要留意:

    1、有能夠從兩個處所都獲得不到數據, 這類場景下應用first會拋出異常NoSuchElementException,假如是如許的場景,須要用firstOrDefault調換下面的first.

    2、下面getDataFromCache()裡,假如沒稀有據,我們直接挪用onCompleted,假如不挪用onCompleted,而是挪用onError,則上述采取concat是得不就任何成果的.由於concat在收就任何一個error,歸並就會停滯.所以,假如要用onError, 則須要用concatDelayError替換concat.concatDelayError會先疏忽error,將error推延到最初在處置.

總結

以上就是這篇文章的全體內容了,願望本文的內容對年夜家的進修或許任務能帶來必定的贊助,假如有疑問年夜家可以留言交換。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved