文章目錄
- 1. 總覽
- 1.1 基本原理
- 1.2 導入包和依賴
- 2. 操作符
- 2.1 創建操作符
- 2.2 轉換操作符
- 2.3 組合操作符
- 2.4 功能操作符
1. 總覽
1.1 基本原理
參考文獻
構建流:每一步操作都會生成一個新的Observable節點(沒錯,包括ObserveOn和SubscribeOn線程變換操作),并將新生成的Observable返回,直到最后一步執行subscribe方法。編寫Rxjava代碼的過程其實就是構建一個一個Observable節點的過程
訂閱流:從最后一個N5節點的訂閱行為開始,依次執行前面各個節點真正的訂閱方法。在每個節點的訂閱方法中,都會生成一個新的Observer**,這個Observer會包含“下游”的Observer,這樣當每個節點都執行完訂閱(subscribeActual)后,也就生成了一串Observer,它們通過downstream,upstream引用連接
回調流: 當訂閱流執行到最后,也就是第一個節點N0時,用onNext方法,兩個作用,一個是把上個節點返回的數據進行一次map變換,另一個就是將map后的結果傳遞給下游。
小結:先從上到下把各個變換的Observable連成鏈(拼裝流水線),然后在最后subscribe的時候,又從下到上通過每個Observable的OnSubscribe從最下的Subscriber對象開始連成鏈(流水線開始工作包裝Subscriber),直到頂端,當頂端的Subscriber對象調用了onNext方法的時候,又從上往下調用Subscriber鏈的onNext(用戶一層層拆開包裝盒),里面執行了每個操作的變換邏輯。
1.2 導入包和依賴
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
2. 操作符
添加鏈接描述
2.1 創建操作符
- Create
private void test1() {//被觀察者Observable;觀察者Observer/消費者consumer;通過subsribe訂閱Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext("1");
// emitter.onError(new Throwable("異常模擬"));emitter.onComplete();}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {System.out.println("subscribe");}@Overridepublic void onNext(Object o) {System.out.println("onNext Observer " + o);}@Overridepublic void onError(Throwable e) {System.out.println("erro");}@Overridepublic void onComplete() {System.out.println("Complete Observer....");}});}private void test2() {Disposable d = Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext("2");emitter.onError(new Throwable("模擬異常"));emitter.onComplete();}}).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("Accept " + o);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("Accept " + throwable);}});}
Observer:
適合需要完整事件處理的場景,包括處理數據、錯誤和完成信號。
提供了更靈活的事件處理能力,可以根據需求實現對錯誤和完成事件的響應。
Consumer:
適合簡單的場景,只需處理每個發出的數據項,而不需要關心錯誤或完成事件。
簡化了代碼結構,特別是在處理簡單流時,使用起來更為便捷和直觀。
- 其他
just 10個發射源
from 將一個Iterable、一個Future、 或者一個數組,內部通過代理的方式轉換成一個Observable
interval操作符 創建一個按固定時間間隔發射整數序列的Observable,這個序列為一個無限遞增的整數序列
range操作符 發射一個范圍內的有序整數序列,并且我們可以指定范圍的起始和長度
repeat操作符 重復發射原始Observable的數據序列,這個序列或者是無限的,或者通過repeat(n)指定重復次數
2.2 轉換操作符
map
將源Observable發送的數據轉換為一個新的Observable對象
private void test3(){Observable.just("111").map(new Function<String, Object>() {@Overridepublic Object apply(String s) throws Exception {return "my name is " + s;}}).subscribe(ob);}//subscribe
//onNext Observer my name is 111
//Complete Observer....
flatmap
添加鏈接描述
將一個發送事件的上游Observable變換為多個發送事件的Observables,然后將它們發射的事件合并后放進一個單獨的Observable里(但是是無序的)
private void test4(){Disposable ob = Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);}}).flatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(Integer o) throws Exception {final List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + o);}return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//為了無序 加了延遲}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String o) throws Exception {System.out.println(o);}});}//出現的 1 2 3會隨機出現
concatMap
concatMap操作符類似于flatMap操作符,不同的一點是它按次序連接。
2.3 組合操作符
concat
concatArray 合并多個對象,按照一定的順序
merge
2.4 功能操作符
SubscribeOn 改變調用它之前代碼的線程,只有第一次有效
ObserveOn 改變調用它之后代碼的線程, 可以多次調用
Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {Log.d(TAG,"加了subscribeOn和observeOn: " + Thread.currentThread().getName());emitter.onNext("1111");emitter.onNext("22222");emitter.onComplete();}}).subscribeOn(Schedulers.newThread()) //1 進行創建和發射在子線程.observeOn(AndroidSchedulers.mainThread())// 2 在主線程消費;由于程序是test里面執行,所以不是main線程;后續改成了main是一樣的道理.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG,"onSubscribe " + Thread.currentThread().getName());}@Overridepublic void onNext(Object o) {Log.d(TAG,"onNext " + Thread.currentThread().getName());}@Overridepublic void onError(Throwable e) {Log.d(TAG,"onError " + Thread.currentThread().getName());}@Overridepublic void onComplete() {Log.d(TAG,"onComplete " + Thread.currentThread().getName());}});}
這一個onSubsribe 一直是在測試線程里
1. **Observable 的創建和訂閱**:- 在 `subscribe()` 方法中,你創建了一個 `Observer` 對象,并將其訂閱到了 `Observable` 對象上。2. **onSubscribe 方法執行**:- 當 `subscribe()` 方法被調用后,`Observer` 對象的 `onSubscribe` 方法會立即執行。這是因為 `onSubscribe` 是 `Observer` 接口的一部分,它負責接收 `Disposable` 對象,表示訂閱關系,而不是響應數據流本身。3. **異步操作執行**:- 然后,`Observable` 中的異步操作開始執行。在你的例子中,通過 `Observable.create()` 創建了一個新的數據流,該數據流會在新線程(通過 `subscribeOn(Schedulers.newThread())` 指定的線程)中執行。這意味著 `Observable.create()` 中的代碼塊會在新線程中運行,而不會阻塞主線程。4. **數據流發射和消費**:- 在新線程中,`ObservableEmitter` 會發射數據項(通過 `emitter.onNext()` 發送數據)并在合適的時機調用 `onComplete()` 或者 `onError()`,表示數據流的結束。5. **observeOn 切換到主線程**:- 通過 `observeOn(AndroidSchedulers.mainThread())`,確保在數據流中的消費者部分(即 `Observer` 的 `onNext()`, `onError()`, `onComplete()` 方法)在主線程中執行。這個切換保證了在主線程更新UI或處理數據,從而避免了在主線程中執行耗時操作而導致的UI阻塞問題。