文章目錄
- 接口變化
- 操作符
- map
- flatmap
- debounce
- throttleFirst()
- take
- concat
RxJava 是一個基于 觀察者模式的異步編程庫,它提供了豐富的操作符來處理和轉換數據流。 操作符是 RxJava 的核心組成部分,它們提供了一種靈活、可組合的方式來處理數據流,使得開發者可以更加便捷地進行數據處理和流程控制。
接口變化
RxJava 2.x
擁有了新的特性,其依賴于4個接口:
Publisher
Subscriber
Subscription
Processor
- 為
Subscriber
,Func1
變為Function
等等。此外,還引入了Single
、Maybe
和Completable
等新的可觀察類型。- RxJava 2.x 中的背壓支持:RxJava 2.x 引入了對背壓的支持,新增了
Flowable
類型來處理背壓場景。同時,對一些操作符的行為進行了一些修改以適應背壓機制。- 異常處理方式的變化:在RxJava 1.x中,異常處理是通過
onError()
方法來處理,而在RxJava 2.x中,引入了onError(Throwable)
方法和onError(Throwable, boolean)
方法,允許開發者控制是否中斷流程。- 取消訂閱的方式變化:在RxJava 1.x中,使用
unsubscribe()
方法取消訂閱,而在RxJava 2.x中,使用dispose()
方法取消訂閱
關于背壓:
在RxJava中,背壓(Backpressure)是一種處理生產者和消費者之間速度不匹配的機制。通過背壓,可以使得消費者根據自身的處理能力告知生產者它們能夠接受的數據量,從而避免生產者產生過多的數據導致消費者無法處理的情況。
而在RxJava 2.x ~ RxJava 3.x,發生以下變化:
- 不再支持
Backpressure
:RxJava 3.x不再內置支持背壓機制,而是采用基于Reactive-Streams的響應式規范,并提供了相應的Flowable類型。因此,在RxJava 3.x中,需要使用Flowable
來處理背壓場景。 Observer
接口的變化:在RxJava 3.x中,Observer接口被拆分為兩個接口:Observer
和Disposable
。Observer
接口用于處理事件的消費,而Disposable
接口用于取消訂閱。SingleObserver
和CompletableObserver
的變化:在RxJava 3.x中,SingleObserver
和CompletableObserver
接口的方法簽名有所變化,取消訂閱的方法從dispose()
改為了onDispose()
。
操作符
RxJava
提供了對事件序列進行變換的支持,這是它的核心功能之一.所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。
操作符是用來對Observable(或Flowable)流進行轉換、過濾、組合和操作的方法。
RxJava
提供了很多很有用的操作符。多的要死
在Rxjava 3.x 下,有以下常見的操作符:
map
:將Observable發射的數據項通過指定的函數進行轉換,并發射轉換后的數據項。filter
:根據指定的條件過濾Observable發射的數據項,只發射滿足條件的數據項。take
:只發射Observable發射的前N個數據項,忽略后面的數據項。skip
:跳過Observable發射的前N個數據項,只發射后面的數據項。merge
:將多個Observable合并成一個Observable,按照時間順序發射合并后的數據項。zip
:將多個Observable按照順序進行合并,每個數據項都是由對應位置的Observable發射的數據項組合而成。concat
:按照順序連接多個Observable,依次發射它們的數據項,等前一個Observable完成后才會訂閱下一個Observable。onErrorResumeNext
:在Observable發生錯誤時,使用備用的Observable繼續發射數據項。retry
:在Observable發生錯誤時,進行錯誤重試,重新訂閱Observable。interval
:創建一個按照固定時間間隔發射遞增數值的Observable。debounce
:只有在指定的時間間隔內沒有發射新的數據項時,才發射最后一個數據項。distinct
:過濾掉重復的數據項,只發射不重復的數據項。flatMap
:將Observable發射的數據項轉換為Observable集合,并按順序發射這些Observable發射的數據項。reduce
:對Observable發射的數據項進行累積操作,返回最終的累積結果。scan
:對Observable發射的數據項進行累積操作,并按順序發射每次累積的結果
在此簡單介紹其中幾個的用法:
map
示意圖:
實際上,map操作符可以理解為對Observable
發射的每個數據項都應用一個函數,將原始數據項轉換為另一種形式的數據項,然后再發射出去。(感覺Kotlin里有)
假設我們有一個Observable發射的是整數序列,我們想將每個整數乘以2,并發射出去。
Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);observable.map(number -> number * 2).subscribe(result -> System.out.println(result));輸出:
2
4
6
8
10
flatmap
flatMap操作符會對Observable的每個數據項應用一個函數,這個函數返回一個新的Observable。然后,它會將這些新的Observable合并成一個Observable,并發射合并后的數據項。
假設我們有一個Observable發射的是字符串數組,我們想將每個字符串拆分為字符數組,并發射出去。
Observable<String> observable = Observable.just("Hello", "World", "RxJava");observable.flatMap(str -> Observable.fromArray(str.split(""))).subscribe(character -> System.out.print(character + " "));
輸出:
H e l l o W o r l d R x J a v a
flatMap操作符將每個字符串拆分為字符數組,并將所有的字符合并成了一個Observable,最終發射出去。
debounce
debounce
操作符也是RxJava中常用的操作符之一,它用于在一定時間間隔內只發射最后一個數據項,忽略中間的數據項。debounce
操作符主要用于處理需要在一定時間內連續發生的事件,但只關心最后一個事件的場景。
在安卓開發中,debounce
操作符可以用于處理用戶輸入場景,比如搜索框輸入關鍵詞時,通常需要等待用戶停止輸入一段時間后再進行搜索,以減少不必要的網絡請求。
Observable<String> observable = Observable.create(emitter -> {editText.addTextChangedListener(new TextWatcher() {@Overridepublic void beforeTextChanged(CharSequence s, int start, int count, int after) {}@Overridepublic void onTextChanged(CharSequence s, int start, int before, int count) {}@Overridepublic void afterTextChanged(Editable s) {emitter.onNext(s.toString());}});
});observable.debounce(500, TimeUnit.MILLISECONDS).subscribe(keyword -> {// 進行搜索操作performSearch(keyword);});
我們首先創建了一個Observable
對象,該Observable
通過監聽EditText
的文本變化事件,將用戶輸入的關鍵詞發射出去。
然后,我們使用debounce
操作符,設置一個時間間隔(這里是500毫秒),它會在用戶輸入停止500毫秒后才發射最后一個關鍵詞。
最后,通過subscribe
方法訂閱Observable
,并在訂閱中執行搜索操作。
這樣做的好處是,用戶在連續輸入時,debounce
操作符會忽略中間的輸入,只關注最后一個輸入,在用戶停止輸入一段時間后才執行搜索操作,避免不必要的網絡請求。
throttleFirst()
throttleFirst()
操作符也是RxJava中常用的操作符之一,它用于在指定時間間隔內只發射第一個數據項,忽略后續的數據項。throttleFirst
操作符主要用于處理需要限制觸發頻率的事件,保證在指定時間間隔內只處理一次。
在安卓開發中,throttleFirst
操作符可以用于處理按鈕點擊事件,防止用戶重復點擊按鈕造成重復操作:
Observable<Object> observable = Observable.create(emitter -> {button.setOnClickListener(v -> {emitter.onNext(new Object());});
});observable.throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(event -> {// 執行按鈕點擊操作ClickAction();});
take
take()操作符也是RxJava中常用的操作符之一,用于從Observable中取出一定數量的數據項,并在達到指定數量后停止發射。它可以與Retrofit和RxJava的線程切換一起使用,來控制網絡請求結果的數量和線程切換。
在安卓開發中,通常使用Retrofit進行網絡請求,而結合RxJava可以實現異步操作和線程切換。下面是一個結合Retrofit和RxJava的實例,使用take操作符來限制結果數量,并配合線程切換:
首先,創建一個網絡請求的接口:
public interface ApiInterface {@GET("data")Observable<List<Data>> getData();
}
然后,創建一個Retrofit實例,并結合RxJava的Observable進行網絡請求:
ApiInterface apiInterface = RetrofitClient.getClient().create(ApiInterface.class);apiInterface.getData().subscribeOn(Schedulers.io()) // 在IO線程進行網絡請求.observeOn(AndroidSchedulers.mainThread()) // 在主線程接收和處理結果.take(5) // 只接收前5個數據項.subscribe(dataList -> {// 處理獲取到的數據for (Data data : dataList) {Log.d(TAG, "Received data: " + data.toString());}}, throwable -> {// 處理錯誤Log.e(TAG, "Error: " + throwable.getMessage());});
先建了一個ApiInterface
的實例,用于定義網絡接口。
然后用Retrofit和RxJava的Observable
結合進行網絡請求。通過subscribeOn()
方法指定在IO線程進行網絡請求,observeOn()
方法指定在主線程接收和處理結果。使用take(5)
操作符來限制只接收前5個數據項,即結果數量限制為5。
最后,通過subscribe方法訂閱Observable,并在訂閱中處理獲取到的數據或錯誤。
concat
concat()
操作符是RxJava中常用的操作符之一,用于將多個Observable
按順序連接在一起,并依次發射數據。它可以在安卓開發中用于實現多個下載任務的順序執行。
在安卓開發中,有時需要進行多個文件的下載操作,可以使用concat操作符來依次執行下載任務:
創建一個下載任務的接口:
public interface DownloadService {@GETObservable<ResponseBody> downloadFile(@Url String fileUrl);
}
Retrofit結合RxJava的Observable進行下載任務:
DownloadService service = RetrofitClient.getClient().create(DownloadService.class);Observable<ResponseBody> downloadTask1 = service.downloadFile("http://example.com/file1");
Observable<ResponseBody> downloadTask2 = service.downloadFile("http://example.com/file2");
Observable<ResponseBody> downloadTask3 = service.downloadFile("http://example.com/file3");Observable.concat(downloadTask1, downloadTask2, downloadTask3).subscribeOn(Schedulers.io()) // 在IO線程進行下載任務.observeOn(AndroidSchedulers.mainThread()) // 在主線程接收和處理結果.subscribe(responseBody -> {// 處理下載完成的文件saveFile(responseBody);}, throwable -> {// 處理錯誤Log.e(TAG, "Error: " + throwable.getMessage());});
操作符真的很多,其他的可以看詳細文檔進行轉換,學習RxJava的操作符的關鍵是理解其原理和使用場景,以及熟悉常用的操作符和它們的功能。
操作符可以總結為以下幾種:
- 轉換操作符:用來對數據進行轉換,比如將一個數據類型轉換成另一個數據類型,或者對數據進行映射或扁平化處理。
- 過濾操作符:用來過濾數據流中的元素,比如只保留滿足特定條件的元素,或者去除重復的元素。
- 組合操作符:用來將多個數據流進行組合,比如將多個流依次連接在一起,或者合并多個流的元素。
- 錯誤處理操作符:用來處理異常和錯誤情況,比如在遇到錯誤時返回一個默認值,或者在錯誤發生時切換到另一個數據流。
- 調度操作符:用來控制數據流在不同線程之間的切換,比如將數據流切換到IO線程執行耗時操作,或者將結果切換回主線程更新UI。