程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫

RxJava

編輯:JAVA綜合教程

RxJava


寫在前面

什麼是 ReactiveX?

ReactiveX 是一個專注於異步編程與控制可觀察數據(或者事件)流的API。
它組合了觀察者模式,迭代器模式和函數式編程的優秀思想。
實時數據處理是一件普通的現象,有一個高效、干淨和可擴展的方式來處理這些情景是重要的。
使用 Observables 和 Operators 來熟練操作它們。
ReactiveX 提供一個可組合又靈活的 API 來創建和處理數據流,同時簡化了異步編程帶來的一些擔憂,如:線程創建和並發問題。

RxJava 簡介

RxJava 是 ReactiveX 在 Java 上的開源的實現。
Observable(可觀察者,即被觀察者) 和 Subscriber / Observaber(訂閱者 / 觀察者)是兩個主要的類。
Subscriber是一個實現了 Observer 的抽象類。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的
在RxJava 上,一個 Observable 是一個發出數據流或者事件的類,Subscriber 是一個對這些發出的 items(數據流或者事件)進行處理(采取行動)的類。
一個 Observable 的標准流發出一個或多個 item,然後成功完成或者出錯。
一個Observable 可以有多個 Subscribers,並且通過 Observable 發出的每一個 item,該 item 將會被發送到Subscriber.onNext() 方法來進行處理。
一旦 Observable 不再發出 items,它將會調用 Subscriber.onCompleted() 方法,或如果有一個出錯的話 Observable 會調用 Subscriber.onError() 方法。

Subscriber和Observer的區別

Subscriber和Observer不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區別對於使用者來說主要有兩點:

onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用於做一些准備工作,例如數據的清零或重置。這是一個可選方法,默認情況下它的實現為空。需要注意的是,如果對准備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做准備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。 unsubscribe(): 這是 Subscriber 所實現的另一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內存洩露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免內存洩露的發生。

操作符

其實操作符就是為了提供一些函數式的特性。函數式最大的好處就是處理數據簡潔易懂。

map就是相當於對每一個元素進行變換,返回變換後的集合
filter就是對集合進行過濾
each就是遍歷集合
take取出集合中的前幾個
skip跳過前幾個元素
unique相當於按照數學上的集合處理,去重

其實我覺得題主覺得最難理解的應該是flatMap和Observable的概念吧。Observable可以理解成lazy load的集合。flatMap想當於對lazyLoad的集合中的每個元素再進行一次lazy load。

用於創建Observable的操作符

操作符 含義 Create 通過調用觀察者的方法從頭創建一個Observable Defer 在觀察者訂閱之前不創建這個Observable,為每一個觀察者創建一個新的Observable Empty/Never/Throw 創建行為受限的特殊Observable From 將其它的對象或數據結構轉換為Observable Interval 創建一個定時發射整數序列的Observable Just 將對象或者對象集合轉換為一個會發射這些對象的Observable Range 創建發射指定范圍的整數序列的Observable Repeat 創建重復發射特定的數據或數據序列的Observable Start 創建發射一個函數的返回值的Observable Timer 創建在一個指定的延遲之後發射單個數據的Observable

變換操作

這些操作符可用於對Observable發射的數據進行變換,詳細解釋可以看每個操作符的文檔

操作符 含義 Buffer 緩存,可以簡單的理解為緩存,它定期從Observable收集數據到一個集合,然後把這些數據集合打包發射,而不是一次發射一個 FlatMap 扁平映射,將Observable發射的數據變換為Observables集合,然後將這些Observable發射的數據平坦化的放進一個單獨的Observable,可以認為是一個將嵌套的數據結構展開的過程。 GroupBy 分組,將原來的Observable分拆為Observable集合,將原始Observable發射的數據按Key分組,每一個Observable發射一組不同的數據 Map 映射,通過對序列的每一項都應用一個函數變換Observable發射的數據,實質是對序列中的每一項執行一個函數,函數的參數就是這個數據項 Scan 掃描,對Observable發射的每一項數據應用一個函數,然後按順序依次發射這些值 Window 窗口,定期將來自Observable的數據分拆成一些Observable窗口,然後發射這些窗口,而不是每次發射一項。類似於Buffer,但Buffer發射的是數據,Window發射的是Observable,每一個Observable發射原始Observable的數據的一個子集

過濾操作

這些操作符用於從Observable發射的數據中進行選擇

操作符 含義 Debounce 只有在空閒了一段時間後才發射數據,通俗的說,就是如果一段時間沒有操作,就執行一次操作 Distinct 去重,過濾掉重復數據項 ElementAt 取值,取特定位置的數據項 Filter 過濾,過濾掉沒有通過謂詞測試的數據項,只發射通過測試的 First 首項,只發射滿足條件的第一條數據 IgnoreElements 忽略所有的數據,只保留終止通知(onError或onCompleted) Last 末項,只發射最後一條數據 Sample 取樣,定期發射最新的數據,等於是數據抽樣,有的實現裡叫ThrottleFirst Skip 跳過前面的若干項數據 SkipLast 跳過後面的若干項數據 Take 只保留前面的若干項數據 TakeLast 只保留後面的若干項數據

組合操作

組合操作符用於將多個Observable組合成一個單一的Observable

操作符 含義 And/Then/When 通過模式(And條件)和計劃(Then次序)組合兩個或多個Observable發射的數據集 CombineLatest 當兩個Observables中的任何一個發射了一個數據時,通過一個指定的函數組合每個Observable發射的最新數據(一共兩個數據),然後發射這個函數的結果 Join 無論何時,如果一個Observable發射了一個數據項,只要在另一個Observable發射的數據項定義的時間窗口內,就將兩個Observable發射的數據合並發射 Merge 將兩個Observable發射的數據組合並成一個 StartWith 在發射原來的Observable的數據序列之前,先發射一個指定的數據序列或數據項 Switch 將一個發射Observable序列的Observable轉換為這樣一個Observable:它逐個發射那些Observable最近發射的數據 Zip 打包,使用一個指定的函數將多個Observable發射的數據組合在一起,然後將這個函數的結果作為單項數據發射

錯誤處理

這些操作符用於從錯誤通知中恢復

操作符 含義 Catch 捕獲,繼續序列操作,將錯誤替換為正常的數據,從onError通知中恢復 Retry 重試,如果Observable發射了一個錯誤通知,重新訂閱它,期待它正常終止

輔助操作

一組用於處理Observable的操作符

操作符 含義 Delay 延遲一段時間發射結果數據 Do 注冊一個動作占用一些Observable的生命周期事件,相當於Mock某個操作 Materialize/Dematerialize 將發射的數據和通知都當做數據發射,或者反過來 ObserveOn 指定觀察者觀察Observable的調度程序(工作線程) Serialize 強制Observable按次序發射數據並且功能是有效的 Subscribe 收到Observable發射的數據和通知後執行的操作 SubscribeOn 指定Observable應該在哪個調度程序上執行 TimeInterval 將一個Observable轉換為發射兩個數據之間所耗費時間的Observable Timeout 添加超時機制,如果過了指定的一段時間沒有發射數據,就發射一個錯誤通知 Timestamp 給Observable發射的每個數據項添加一個時間戳 Using 創建一個只在Observable的生命周期內存在的一次性資源

條件和布爾操作

這些操作符可用於單個或多個數據項,也可用於Observable

操作符 含義 All 判斷Observable發射的所有的數據項是否都滿足某個條件 Amb 給定多個Observable,只讓第一個發射數據的Observable發射全部數據 Contains 判斷Observable是否會發射一個指定的數據項 DefaultIfEmpty 發射來自原始Observable的數據,如果原始Observable沒有發射數據,就發射一個默認數據 SequenceEqual 判斷兩個Observable是否按相同的數據序列 SkipUntil 丟棄原始Observable發射的數據,直到第二個Observable發射了一個數據,然後發射原始 SkipWhile 丟棄原始Observable發射的數據,直到一個特定的條件為假,然後發射原始Observable剩余的數據 TakeUntil 發射來自原始Observable的數據,直到第二個Observable發射了一個數據或一個通知 TakeWhile 發射原始Observable的數據,直到一個特定的條件為真,然後跳過剩余的數據

算術和聚合操作

這些操作符可用於整個數據序列

操作符 含義 Average 計算Observable發射的數據序列的平均值,然後發射這個結果 Concat 不交錯的連接多個Observable的數據 Count 計算Observable發射的數據個數,然後發射這個結果 Max 計算並發射數據序列的最大值 Min 計算並發射數據序列的最小值 Reduce 按順序對數據序列的每一個應用某個函數,然後返回這個值 Sum 計算並發射數據序列的和

連接操作

一些有精確可控的訂閱行為的特殊Observable

操作符 含義 Connect 指示一個可連接的Observable開始發射數據給訂閱者 Publish 將一個普通的Observable轉換為可連接的 RefCount 使一個可連接的Observable表現得像一個普通的Observable Replay 確保所有的觀察者收到同樣的數據序列,即使他們在Observable開始發射數據之後才訂閱

轉換操作

操作符 含義 To 將Observable轉換為其它的對象或數據結構 Blocking 阻塞Observable的操作符

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved