RxJava
寫在前面
什麼是 ReactiveX?
ReactiveX 是一個專注於異步編程與控制可觀察數據(或者事件)流的API。
它組合了觀察者模式,迭代器模式和函數式編程的優秀思想。
實時數據處理是一件普通的現象,有一個高效、干淨和可擴展的方式來處理這些情景是重要的。
使用 Observables 和 Operators 來熟練操作它們。
ReactiveX 提供一個可組合又靈活的 API 來創建和處理數據流,同時簡化了異步編程帶來的一些擔憂,如:線程創建和並發問題。
RxJava 簡介
RxJava 是 ReactiveX 在 Java 上的開源的實現。
Observable(可觀察者,即被觀察者) 和 Subscriber / Observaber(訂閱者 / 觀察者)是兩個主要的類。
Subscriber是一個實現了 Observer 的抽象類。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的
在RxJava 上,一個 Observable 是一個發出數據流或者事件的類,Subscriber 是一個對這些發出的 items(數據流或者事件)進行處理(采取行動)的類。
一個 Observable 的標准流發出一個或多個 item,然後成功完成或者出錯。
一個Observable 可以有多個 Subscribers,並且通過 Observable 發出的每一個 item,該 item 將會被發送到Subscriber.onNext() 方法來進行處理。
一旦 Observable 不再發出 items,它將會調用 Subscriber.onCompleted() 方法,或如果有一個出錯的話 Observable 會調用 Subscriber.onError() 方法。
Subscriber和Observer的區別
Subscriber和Observer不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區別對於使用者來說主要有兩點:
onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用於做一些准備工作,例如數據的清零或重置。這是一個可選方法,默認情況下它的實現為空。需要注意的是,如果對准備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做准備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。 unsubscribe(): 這是 Subscriber 所實現的另一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內存洩露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免內存洩露的發生。
操作符
其實操作符就是為了提供一些函數式的特性。函數式最大的好處就是處理數據簡潔易懂。
map就是相當於對每一個元素進行變換,返回變換後的集合
filter就是對集合進行過濾
each就是遍歷集合
take取出集合中的前幾個
skip跳過前幾個元素
unique相當於按照數學上的集合處理,去重
其實我覺得題主覺得最難理解的應該是flatMap和Observable的概念吧。Observable可以理解成lazy load的集合。flatMap想當於對lazyLoad的集合中的每個元素再進行一次lazy load。
用於創建Observable的操作符
操作符 |
含義 |
Create
通過調用觀察者的方法從頭創建一個Observable
Defer
在觀察者訂閱之前不創建這個Observable,為每一個觀察者創建一個新的Observable
Empty/Never/Throw
創建行為受限的特殊Observable
From
將其它的對象或數據結構轉換為Observable
Interval
創建一個定時發射整數序列的Observable
Just
將對象或者對象集合轉換為一個會發射這些對象的Observable
Range
創建發射指定范圍的整數序列的Observable
Repeat
創建重復發射特定的數據或數據序列的Observable
Start
創建發射一個函數的返回值的Observable
Timer
創建在一個指定的延遲之後發射單個數據的Observable
變換操作
這些操作符可用於對Observable發射的數據進行變換,詳細解釋可以看每個操作符的文檔
操作符 |
含義 |
Buffer
緩存,可以簡單的理解為緩存,它定期從Observable收集數據到一個集合,然後把這些數據集合打包發射,而不是一次發射一個
FlatMap
扁平映射,將Observable發射的數據變換為Observables集合,然後將這些Observable發射的數據平坦化的放進一個單獨的Observable,可以認為是一個將嵌套的數據結構展開的過程。
GroupBy
分組,將原來的Observable分拆為Observable集合,將原始Observable發射的數據按Key分組,每一個Observable發射一組不同的數據
Map
映射,通過對序列的每一項都應用一個函數變換Observable發射的數據,實質是對序列中的每一項執行一個函數,函數的參數就是這個數據項
Scan
掃描,對Observable發射的每一項數據應用一個函數,然後按順序依次發射這些值
Window
窗口,定期將來自Observable的數據分拆成一些Observable窗口,然後發射這些窗口,而不是每次發射一項。類似於Buffer,但Buffer發射的是數據,Window發射的是Observable,每一個Observable發射原始Observable的數據的一個子集
過濾操作
這些操作符用於從Observable發射的數據中進行選擇
操作符 |
含義 |
Debounce
只有在空閒了一段時間後才發射數據,通俗的說,就是如果一段時間沒有操作,就執行一次操作
Distinct
去重,過濾掉重復數據項
ElementAt
取值,取特定位置的數據項
Filter
過濾,過濾掉沒有通過謂詞測試的數據項,只發射通過測試的
First
首項,只發射滿足條件的第一條數據
IgnoreElements
忽略所有的數據,只保留終止通知(onError或onCompleted)
Last
末項,只發射最後一條數據
Sample
取樣,定期發射最新的數據,等於是數據抽樣,有的實現裡叫ThrottleFirst
Skip
跳過前面的若干項數據
SkipLast
跳過後面的若干項數據
Take
只保留前面的若干項數據
TakeLast
只保留後面的若干項數據
組合操作
組合操作符用於將多個Observable組合成一個單一的Observable
操作符 |
含義 |
And/Then/When
通過模式(And條件)和計劃(Then次序)組合兩個或多個Observable發射的數據集
CombineLatest
當兩個Observables中的任何一個發射了一個數據時,通過一個指定的函數組合每個Observable發射的最新數據(一共兩個數據),然後發射這個函數的結果
Join
無論何時,如果一個Observable發射了一個數據項,只要在另一個Observable發射的數據項定義的時間窗口內,就將兩個Observable發射的數據合並發射
Merge
將兩個Observable發射的數據組合並成一個
StartWith
在發射原來的Observable的數據序列之前,先發射一個指定的數據序列或數據項
Switch
將一個發射Observable序列的Observable轉換為這樣一個Observable:它逐個發射那些Observable最近發射的數據
Zip
打包,使用一個指定的函數將多個Observable發射的數據組合在一起,然後將這個函數的結果作為單項數據發射
錯誤處理
這些操作符用於從錯誤通知中恢復
操作符 |
含義 |
Catch
捕獲,繼續序列操作,將錯誤替換為正常的數據,從onError通知中恢復
Retry
重試,如果Observable發射了一個錯誤通知,重新訂閱它,期待它正常終止
輔助操作
一組用於處理Observable的操作符
操作符 |
含義 |
Delay
延遲一段時間發射結果數據
Do
注冊一個動作占用一些Observable的生命周期事件,相當於Mock某個操作
Materialize/Dematerialize
將發射的數據和通知都當做數據發射,或者反過來
ObserveOn
指定觀察者觀察Observable的調度程序(工作線程)
Serialize
強制Observable按次序發射數據並且功能是有效的
Subscribe
收到Observable發射的數據和通知後執行的操作
SubscribeOn
指定Observable應該在哪個調度程序上執行
TimeInterval
將一個Observable轉換為發射兩個數據之間所耗費時間的Observable
Timeout
添加超時機制,如果過了指定的一段時間沒有發射數據,就發射一個錯誤通知
Timestamp
給Observable發射的每個數據項添加一個時間戳
Using
創建一個只在Observable的生命周期內存在的一次性資源
條件和布爾操作
這些操作符可用於單個或多個數據項,也可用於Observable
操作符 |
含義 |
All
判斷Observable發射的所有的數據項是否都滿足某個條件
Amb
給定多個Observable,只讓第一個發射數據的Observable發射全部數據
Contains
判斷Observable是否會發射一個指定的數據項
DefaultIfEmpty
發射來自原始Observable的數據,如果原始Observable沒有發射數據,就發射一個默認數據
SequenceEqual
判斷兩個Observable是否按相同的數據序列
SkipUntil
丟棄原始Observable發射的數據,直到第二個Observable發射了一個數據,然後發射原始
SkipWhile
丟棄原始Observable發射的數據,直到一個特定的條件為假,然後發射原始Observable剩余的數據
TakeUntil
發射來自原始Observable的數據,直到第二個Observable發射了一個數據或一個通知
TakeWhile
發射原始Observable的數據,直到一個特定的條件為真,然後跳過剩余的數據
算術和聚合操作
這些操作符可用於整個數據序列
操作符 |
含義 |
Average
計算Observable發射的數據序列的平均值,然後發射這個結果
Concat
不交錯的連接多個Observable的數據
Count
計算Observable發射的數據個數,然後發射這個結果
Max
計算並發射數據序列的最大值
Min
計算並發射數據序列的最小值
Reduce
按順序對數據序列的每一個應用某個函數,然後返回這個值
Sum
計算並發射數據序列的和
連接操作
一些有精確可控的訂閱行為的特殊Observable
操作符 |
含義 |
Connect
指示一個可連接的Observable開始發射數據給訂閱者
Publish
將一個普通的Observable轉換為可連接的
RefCount
使一個可連接的Observable表現得像一個普通的Observable
Replay
確保所有的觀察者收到同樣的數據序列,即使他們在Observable開始發射數據之後才訂閱
轉換操作
操作符 |
含義 |
To
將Observable轉換為其它的對象或數據結構
Blocking
阻塞Observable的操作符