程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> RxJava操作符(三)Filtering

RxJava操作符(三)Filtering

編輯:關於PHP編程

RxJava操作符(三)Filtering


在上一篇文章裡,我們了解了轉化操作符,能將數據轉化為我們想要的格式,但是如果數據集合裡面有一些我們想要過濾掉的數據怎麼辦?這時候我們就需要使用過濾操作符了,有點類似於sql裡的where,讓Observable只返回滿足我們條件的數據。

一、debounce
debounce操作符就是起到了限流的作用,可以理解為閥門,當你半開閥門的時候,水會以較慢的速度流出來。不同之處就是閥門裡的水不會浪費掉,而debounce過濾掉的數據會被丟棄掉。在Rxjava中,將這個操作符氛圍了throttleWithTimeoutdebounce兩個操作符。先來看一下throttleWithTimeOut吧,如下圖所示,這個操作符通過時間來限流,源Observable每次發射出來一個數據後就會進行計時,如果在設定好的時間結束前源Observable有新的數據發射出來,這個數據就會被丟棄,同時重新開始計時。如果每次都是在計時結束前發射數據,那麼這個限流就會走向極端:只會發射最後一個數據。


首先我們來創建一個Observable,每隔100毫秒發射一個數據,當要發射的數據是3的倍數的時候,下一個數據就延遲到300毫秒再發射。

  1. private Observable<Integer> createObserver() {
  2. return Observable.create(new Observable.OnSubscribe<Integer>() {
  3. @Override
  4. public void call(Subscriber<? super Integer> subscriber) {
  5. for (int i = 0; i < 10; i++) {
  6. if (!subscriber.isUnsubscribed()) {
  7. subscriber.onNext(i);
  8. }
  9. int sleep = 100;
  10. if (i % 3 == 0) {
  11. sleep = 300;
  12. }
  13. try {
  14. Thread.sleep(sleep);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }

  18. }
  19. subscriber.onCompleted();
  20. }
  21. }).subscribeOn(Schedulers.computation());
  22. }
下面使用throttleWithTimeOut來過濾一下這個Observable,我們設定的過濾時間是200毫秒,也就說發射間隔小於200毫秒的數據會被過濾掉。

  1. private Observable<Integer> throttleWithTimeoutObserver() {
  2. return createObserver().throttleWithTimeout(200, TimeUnit.MILLISECONDS)
  3. .observeOn(AndroidSchedulers.mainThread());

  4. }
對其進行訂閱

  1. mLButton.setText("throttleWithTimeout");
  2. mLButton.setOnClickListener(e -> throttleWithTimeoutObserver().subscribe(i -> log("throttleWithTimeout:" + i)));
運行結果如下,可以看到,不是3的倍數的數據在發射後200毫秒內會發射出新的數據,所以會被過濾掉。

debounce操作符也可以使用時間來進行過濾,這時它跟throttleWithTimeOut使用起來是一樣,但是deounce操作符還可以根據一個函數來進行限流。這個函數的返回值是一個臨時Observable,如果源Observable在發射一個新的數據的時候,上一個數據根據函數所生成的臨時Observable還沒有結束,那麼上一個數據就會被過濾掉。

生成一個Observable並使用debounce對其進行過濾,只有發射來的數據為偶數的時候才會調用onCompleted方法來表示這個臨時的Observable已經終止。

  1. private Observable<Integer> debounceObserver() {
  2. return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).debounce(integer -> {
  3. log(integer);
  4. return Observable.create(new Observable.OnSubscribe<Integer>() {
  5. @Override
  6. public void call(Subscriber<? super Integer> subscriber) {
  7. if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
  8. log("complete:" + integer);
  9. subscriber.onNext(integer);
  10. subscriber.onCompleted();
  11. }
  12. }
  13. });
  14. })
  15. .observeOn(AndroidSchedulers.mainThread());
  16. }
對其進行訂閱

  1. mRButton.setOnClickListener(e -> debounceObserver().subscribe(i -> log("debounce:" + i)));
運行結果如下,可以看到,只有那些調用了onCompleted方法的數據才會被發射出來,其他的都過濾掉了。


二、Distinct
Distinct操作符的用處就是用來去重,非常好理解。如下圖所示,所有重復的數據都會被過濾掉。還有一個操作符distinctUntilChanged,是用來過濾掉連續的重復數據。

創建兩個Observable,並使用Distinct和DistinctUtilChanged操作符分別對其進行過濾

  1. private Observable<Integer> distinctObserver() {
  2. return Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1).distinct();

  3. }

  4. private Observable<Integer> distinctUntilChangedObserver() {
  5. return Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3).distinctUntilChanged();

  6. }
進行訂閱

  1. mLButton.setText("distinct");
  2. mLButton.setOnClickListener(e -> distinctObserver().subscribe(i -> log("distinct:" + i)));
  3. mRButton.setText("UntilChanged");
  4. mRButton.setOnClickListener(e -> distinctUntilChangedObserver().subscribe(i -> log("UntilChanged:" + i)));
運行結果如下所示,可以看到Distinct過濾掉了所有重復的數字,二DistinctUtilChanged只過濾掉重復的數字


三、ElementAt、Filter
這兩個操作符都很好理解,ElementAt只會返回指定位置的數據,而Filter只會返回滿足過濾條件的數據,其示意圖分別如下所示

創建兩個Observable對象並分別使用ElementAt和Filter操作符對其進行過濾

  1. private Observable<Integer> elementAtObserver() {
  2. return Observable.just(0, 1, 2, 3, 4, 5).elementAt(2);
  3. }

  4. private Observable<Integer> FilterObserver() {
  5. return Observable.just(0, 1, 2, 3, 4, 5).filter(i -> i < 3);
  6. }
分別對其進行訂閱

  1. mLButton.setText("elementAt");
  2. mLButton.setOnClickListener(e -> elementAtObserver().subscribe(i -> log("elementAt:" + i)));
  3. mRButton.setText("Filter");
  4. mRButton.setOnClickListener(e -> FilterObserver().subscribe(i -> log("Filter:" + i)));
運行結果如下


四、First、Last
First操作符只會返回第一條數據,並且還可以返回滿足條件的第一條數據。如果你看過我以前的博客,就會發現在我們使用Rxjava實現三級緩存的例子裡,就是使用first操作符來選擇所要使用的緩存。與First相反,Last操作符只返回最後一條滿足條件的數據。

另外還有一個BlockingObservable方法,這個方法不會對Observable做任何處理,只會阻塞住,當滿足條件的數據發射出來的時候才會返回一個BlockingObservable對象。可以使用Observable.toBlocking或者BlockingObservable.from方法來將一個Observable對象轉化為BlockingObservable對象。BlockingObservable可以和first操作符進行配合使用。

創建兩個Observable對象並分別使用first操作符進行處理

  1. private Observable<Integer> FirstObserver() {
  2. return Observable.just(0, 1, 2, 3, 4, 5).first(i -> i > 1);
  3. }

  4. private BlockingObservable<Integer> FilterObserver() {
  5. return Observable.create(new Observable.OnSubscribe<Integer>() {
  6. @Override
  7. public void call(Subscriber<? super Integer> subscriber) {
  8. for (int i = 0; i < 5; i++) {
  9. try {
  10. Thread.sleep(500);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. if (!subscriber.isUnsubscribed()) {
  15. log("onNext:" + i);
  16. subscriber.onNext(i);
  17. }
  18. }
  19. if (!subscriber.isUnsubscribed()) {
  20. subscriber.onCompleted();
  21. }
  22. }
  23. }).toBlocking();
  24. }
分別進行訂閱

  1. mLButton.setText("First");
  2. mLButton.setOnClickListener(e -> FirstObserver().subscribe(i -> log("First:" + i)));
  3. mRButton.setText(" Blocking");
  4. mRButton.setOnClickListener(e -> {
  5. log("blocking:" + FilterObserver().first(i -> i > 1));
  6. });
運行結果如下。可以看到first操作符返回了第一個大於1的數2,而BlockingObservable則一直阻塞著,直到第一個大於1的數據發射出來。


五、Skip、Take
Skip操作符將源Observable發射的數據過濾掉前n項,而Take操作符則只取前n項,理解和使用起來都很容易,但是用處很大。另外還有SkipLast和TakeLast操作符,分別是從後面進行過濾操作。

創建兩個Observable並分別使用skip和take操作符對其進行過濾操作

  1. private Observable<Integer> skipObserver() {
  2. return Observable.just(0, 1, 2, 3, 4, 5).skip(2);
  3. }
  4. private Observable<Integer> takeObserver() {
  5. return Observable.just(0, 1, 2, 3, 4, 5).take(2);
  6. }
分別進行訂閱

  1. mLButton.setText("Skip");
  2. mLButton.setOnClickListener(e -> skipObserver().subscribe(i -> log("Skip:" + i)));
  3. mRButton.setText("Take");
  4. mRButton.setOnClickListener(e -> takeObserver().subscribe(i -> log("Take:" + i)));
運行結果如下,可以看到skip過濾掉了前兩項,而take則過濾掉了除了前兩項的其他所有項。


六、Sample、ThrottleFirst
Sample操作符會定時地發射源Observable最近發射的數據,其他的都會被過濾掉,等效於ThrottleLast操作符,而ThrottleFirst操作符則會定期發射這個時間段裡源Observable發射的第一個數據

我們創建一個Observable每隔200毫秒發射一個數據,然後分別使用sample和throttleFirst操作符對其進行過濾

  1. private Observable<Integer> sampleObserver() {
  2. return createObserver().sample(1000, TimeUnit.MILLISECONDS);
  3. }

  4. private Observable<Integer> throttleFirstObserver() {
  5. return createObserver().throttleFirst(1000, TimeUnit.MILLISECONDS);
  6. }


  7. private Observable<Integer> createObserver() {
  8. return Observable.create(new Observable.OnSubscribe<Integer>() {
  9. @Override
  10. public void call(Subscriber<? super Integer> subscriber) {
  11. for (int i = 0; i < 20; i++) {
  12. try {
  13. Thread.sleep(200);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. subscriber.onNext(i);
  18. }
  19. subscriber.onCompleted();
  20. }
  21. });
  22. }
分別進行訂閱

  1. mLButton.setText("sample");
  2. mLButton.setOnClickListener(e -> sampleObserver().subscribe(i -> log("sample:" + i)));
  3. mRButton.setText("throttleFirst");
  4. mRButton.setOnClickListener(e -> throttleFirstObserver().subscribe(i -> log("throttleFirst:" + i)));
運行結果如下,可以看到sample操作符會每隔5個數字發射出一個數據來,而throttleFirst則會每隔5個數據發射第一個數據。


本文的demo程序見github:https://github.com/Chaoba/RxJavaDemo

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved