在上一篇文章裡,我們了解了轉化操作符,能將數據轉化為我們想要的格式,但是如果數據集合裡面有一些我們想要過濾掉的數據怎麼辦?這時候我們就需要使用過濾操作符了,有點類似於sql裡的where,讓Observable只返回滿足我們條件的數據。
debounce操作符就是起到了限流的作用,可以理解為閥門,當你半開閥門的時候,水會以較慢的速度流出來。不同之處就是閥門裡的水不會浪費掉,而debounce過濾掉的數據會被丟棄掉。在Rxjava中,將這個操作符氛圍了
debounce兩個操作符。先來看一下throttleWithTimeOut吧,如下圖所示,這個操作符通過時間來限流,源Observable每次發射出來一個數據後就會進行計時,如果在設定好的時間結束前源Observable有新的數據發射出來,這個數據就會被丟棄,同時重新開始計時。如果每次都是在計時結束前發射數據,那麼這個限流就會走向極端:只會發射最後一個數據。
首先我們來創建一個Observable,每隔100毫秒發射一個數據,當要發射的數據是3的倍數的時候,下一個數據就延遲到300毫秒再發射。
- private Observable<Integer> createObserver() {
- return Observable.create(new Observable.OnSubscribe<Integer>() {
- @Override
- public void call(Subscriber<? super Integer> subscriber) {
- for (int i = 0; i < 10; i++) {
- if (!subscriber.isUnsubscribed()) {
- subscriber.onNext(i);
- }
- int sleep = 100;
- if (i % 3 == 0) {
- sleep = 300;
- }
- try {
- Thread.sleep(sleep);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- subscriber.onCompleted();
- }
- }).subscribeOn(Schedulers.computation());
- }
下面使用throttleWithTimeOut來過濾一下這個Observable,我們設定的過濾時間是200毫秒,也就說發射間隔小於200毫秒的數據會被過濾掉。
- private Observable<Integer> throttleWithTimeoutObserver() {
- return createObserver().throttleWithTimeout(200, TimeUnit.MILLISECONDS)
- .observeOn(AndroidSchedulers.mainThread());
- }
對其進行訂閱
- mLButton.setText("throttleWithTimeout");
- mLButton.setOnClickListener(e -> throttleWithTimeoutObserver().subscribe(i -> log("throttleWithTimeout:" + i)));
運行結果如下,可以看到,不是3的倍數的數據在發射後200毫秒內會發射出新的數據,所以會被過濾掉。
debounce操作符也可以使用時間來進行過濾,這時它跟throttleWithTimeOut使用起來是一樣,但是deounce操作符還可以根據一個函數來進行限流。這個函數的返回值是一個臨時Observable,如果源Observable在發射一個新的數據的時候,上一個數據根據函數所生成的臨時Observable還沒有結束,那麼上一個數據就會被過濾掉。
生成一個Observable並使用debounce對其進行過濾,只有發射來的數據為偶數的時候才會調用onCompleted方法來表示這個臨時的Observable已經終止。
- private Observable<Integer> debounceObserver() {
- return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).debounce(integer -> {
- log(integer);
- return Observable.create(new Observable.OnSubscribe<Integer>() {
- @Override
- public void call(Subscriber<? super Integer> subscriber) {
- if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
- log("complete:" + integer);
- subscriber.onNext(integer);
- subscriber.onCompleted();
- }
- }
- });
- })
- .observeOn(AndroidSchedulers.mainThread());
- }
對其進行訂閱
- mRButton.setOnClickListener(e -> debounceObserver().subscribe(i -> log("debounce:" + i)));
運行結果如下,可以看到,只有那些調用了onCompleted方法的數據才會被發射出來,其他的都過濾掉了。
二、Distinct
Distinct操作符的用處就是用來去重,非常好理解。如下圖所示,所有重復的數據都會被過濾掉。還有一個操作符distinctUntilChanged,是用來過濾掉連續的重復數據。
創建兩個Observable,並使用Distinct和DistinctUtilChanged操作符分別對其進行過濾
- private Observable<Integer> distinctObserver() {
- return Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1).distinct();
- }
- private Observable<Integer> distinctUntilChangedObserver() {
- return Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3).distinctUntilChanged();
- }
進行訂閱
- mLButton.setText("distinct");
- mLButton.setOnClickListener(e -> distinctObserver().subscribe(i -> log("distinct:" + i)));
- mRButton.setText("UntilChanged");
- mRButton.setOnClickListener(e -> distinctUntilChangedObserver().subscribe(i -> log("UntilChanged:" + i)));
運行結果如下所示,可以看到Distinct過濾掉了所有重復的數字,二DistinctUtilChanged只過濾掉重復的數字
三、ElementAt、Filter
這兩個操作符都很好理解,ElementAt只會返回指定位置的數據,而Filter只會返回滿足過濾條件的數據,其示意圖分別如下所示
創建兩個Observable對象並分別使用ElementAt和Filter操作符對其進行過濾
- private Observable<Integer> elementAtObserver() {
- return Observable.just(0, 1, 2, 3, 4, 5).elementAt(2);
- }
- private Observable<Integer> FilterObserver() {
- return Observable.just(0, 1, 2, 3, 4, 5).filter(i -> i < 3);
- }
分別對其進行訂閱
- mLButton.setText("elementAt");
- mLButton.setOnClickListener(e -> elementAtObserver().subscribe(i -> log("elementAt:" + i)));
- mRButton.setText("Filter");
- mRButton.setOnClickListener(e -> FilterObserver().subscribe(i -> log("Filter:" + i)));
運行結果如下
四、First、Last
First操作符只會返回第一條數據,並且還可以返回滿足條件的第一條數據。如果你看過我以前的博客,就會發現在我們使用Rxjava實現三級緩存的例子裡,就是使用first操作符來選擇所要使用的緩存。與First相反,Last操作符只返回最後一條滿足條件的數據。
另外還有一個BlockingObservable方法,這個方法不會對Observable做任何處理,只會阻塞住,當滿足條件的數據發射出來的時候才會返回一個BlockingObservable對象。可以使用Observable.toBlocking
或者BlockingObservable.from
方法來將一個Observable對象轉化為BlockingObservable對象。BlockingObservable可以和first操作符進行配合使用。
創建兩個Observable對象並分別使用first操作符進行處理
- private Observable<Integer> FirstObserver() {
- return Observable.just(0, 1, 2, 3, 4, 5).first(i -> i > 1);
- }
- private BlockingObservable<Integer> FilterObserver() {
- return Observable.create(new Observable.OnSubscribe<Integer>() {
- @Override
- public void call(Subscriber<? super Integer> subscriber) {
- for (int i = 0; i < 5; i++) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (!subscriber.isUnsubscribed()) {
- log("onNext:" + i);
- subscriber.onNext(i);
- }
- }
- if (!subscriber.isUnsubscribed()) {
- subscriber.onCompleted();
- }
- }
- }).toBlocking();
- }
分別進行訂閱
- mLButton.setText("First");
- mLButton.setOnClickListener(e -> FirstObserver().subscribe(i -> log("First:" + i)));
- mRButton.setText(" Blocking");
- mRButton.setOnClickListener(e -> {
- log("blocking:" + FilterObserver().first(i -> i > 1));
- });
運行結果如下。可以看到first操作符返回了第一個大於1的數2,而BlockingObservable則一直阻塞著,直到第一個大於1的數據發射出來。
五、Skip、Take
Skip操作符將源Observable發射的數據過濾掉前n項,而Take操作符則只取前n項,理解和使用起來都很容易,但是用處很大。另外還有SkipLast和TakeLast操作符,分別是從後面進行過濾操作。
創建兩個Observable並分別使用skip和take操作符對其進行過濾操作
- private Observable<Integer> skipObserver() {
- return Observable.just(0, 1, 2, 3, 4, 5).skip(2);
- }
- private Observable<Integer> takeObserver() {
- return Observable.just(0, 1, 2, 3, 4, 5).take(2);
- }
分別進行訂閱
- mLButton.setText("Skip");
- mLButton.setOnClickListener(e -> skipObserver().subscribe(i -> log("Skip:" + i)));
- mRButton.setText("Take");
- mRButton.setOnClickListener(e -> takeObserver().subscribe(i -> log("Take:" + i)));
運行結果如下,可以看到skip過濾掉了前兩項,而take則過濾掉了除了前兩項的其他所有項。
六、Sample、ThrottleFirst
Sample操作符會定時地發射源Observable最近發射的數據,其他的都會被過濾掉,等效於ThrottleLast操作符,而ThrottleFirst操作符則會定期發射這個時間段裡源Observable發射的第一個數據
我們創建一個Observable每隔200毫秒發射一個數據,然後分別使用sample和throttleFirst操作符對其進行過濾
- private Observable<Integer> sampleObserver() {
- return createObserver().sample(1000, TimeUnit.MILLISECONDS);
- }
- private Observable<Integer> throttleFirstObserver() {
- return createObserver().throttleFirst(1000, TimeUnit.MILLISECONDS);
- }
- private Observable<Integer> createObserver() {
- return Observable.create(new Observable.OnSubscribe<Integer>() {
- @Override
- public void call(Subscriber<? super Integer> subscriber) {
- for (int i = 0; i < 20; i++) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- subscriber.onNext(i);
- }
- subscriber.onCompleted();
- }
- });
- }
分別進行訂閱
- mLButton.setText("sample");
- mLButton.setOnClickListener(e -> sampleObserver().subscribe(i -> log("sample:" + i)));
- mRButton.setText("throttleFirst");
- mRButton.setOnClickListener(e -> throttleFirstObserver().subscribe(i -> log("throttleFirst:" + i)));
運行結果如下,可以看到sample操作符會每隔5個數字發射出一個數據來,而throttleFirst則會每隔5個數據發射第一個數據。
本文的demo程序見github:https://github.com/Chaoba/RxJavaDemo