目錄
- RxJava是什么?為什么使用。
- RxJava是如何使用的呢?
- RxJava如何和Retrofit一起使用。
- RxJava源碼分析。
- (1)他執行流程是如何的。
- (2)map
- (3)線程的切換。
- 如何自定義RxJava操作符?
一、RxJava是什么?為什么使用
RxJava 是一個基于 ??響應式編程范式?? 的庫,用于通過??觀察者模式??和??鏈式操作符??,簡化異步、事件驅動、多線程數據流處理的開發。
簡單來說,RxJava 就像是一個??“流水線工廠”??,專門處理需要等待的任務(比如網絡請求、數據庫查詢、復雜計算等)。它能把這些任務串成一條流水線,每個環節處理完數據后,自動傳給下一個環節,還能靈活控制任務在哪個線程執行(比如后臺線程干活,主線程更新UI)。
1.1 為什么要使用RxJava呢?
接下來,我們看看不使用的問題,以網絡請求為例。
需求??:按順序做三件事(登錄 → 查詢訂單 → 更新UI)。
??傳統寫法??:??“回調地獄”??,層層嵌套,像俄羅斯套娃!
// 偽代碼:傳統嵌套回調(問題代碼)
api.login(new Callback<User>() {@Overridepublic void onSuccess(User user) {api.getOrders(user.getId(), new Callback<List<Order>>() {@Overridepublic void onSuccess(List<Order> orders) {runOnUiThread(() -> {showOrders(orders); // 切主線程更新UI});}@Overridepublic void onFailure(Throwable error) {showError(error); // 每個回調都要處理錯誤!}});}@Overridepublic void onFailure(Throwable error) {showError(error);}
});
??問題總結??:
- ??代碼縮進成“金字塔”??,維護困難。
- ??重復處理錯誤??,每個回調都要寫
onFailure
。 - ??手動切換線程??(如
runOnUiThread
),容易遺漏。
??RxJava寫法??:
// RxJava 鏈式調用(解決方案)
api.rxLogin() // 1. 登錄(被觀察者).flatMap(user -> api.rxGetOrders(user.getId())) // 2. 查詢訂單(操作符).observeOn(AndroidSchedulers.mainThread()) // 3. 切到主線程.subscribe(orders -> showOrders(orders), // 4. 觀察者消費數據error -> showError(error)); // 統一錯誤處理!
優勢??:
1? ??代碼變“直線”??,邏輯清晰。
2? ??統一錯誤處理??,一個 onError
搞定所有。
3? ??自動線程切換??,不用寫 runOnUiThread
。
二、RxJava是如何使用的呢?
(1)添加依賴
implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2' // Android 需要
(2)使用
Observable.just("你好").subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe: 訂閱開始");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext: 拿到事件"+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: 錯誤事件");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: 事件完成");}});
解釋一下執行流程:
(1)首先我們先需要知道幾個角色:觀察者( Observer
),被觀察者( Observable
)。
(2)當被觀察者發送出數據(調用just方法)你好
的時候,那么觀察者就會收到消息(subscribe方法就是訂閱)。
(3) subscribe()
將觀察者與被觀察者連接,觸發 Observable 開始發射數據,Observer接收并處理事件(數據、錯誤、完成信號)。
(4)Observer的 onSubscribe
方法訂閱時立即調用??(最先執行)。 通知觀察者訂閱已建立。
(5) onNext
方法Observable 發射數據時調用。
(6) onComplete()
方法Observable ??正常完成數據發射??后調用 。onError
反之。
三、RxJava如何和Retrofit一起使用
其實就是將Retrofit的響應結果交給RxJava來處理
3.1 發起一個請求
(1)我們需要在Retrofit這里配置 RxJava
適配器
public class RetrofitClient {private static final String BASE_URL = "https://www.wanandroid.com/";private static Retrofit retrofit;public static WanAndroidService getService() {if (retrofit == null) {retrofit = new Retrofit.Builder().baseUrl(BASE_URL).addConverterFactory(GsonConverterFactory.create()) // Gson 解析.addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // RxJava 支持.build();}return retrofit.create(WanAndroidService.class);}
}
這行代碼的作用是 ??讓 Retrofit 支持返回 RxJava 3 的響應式類型??(如 Observable
、Flowable
、Single
等),使得網絡請求的結果可以直接通過 RxJava 的流式操作符處理。
(2)在接口這里,我們也是使用Observable來接收。
public interface WanAndroidService {// 示例1:登錄接口(POST)@FormUrlEncoded@POST("/user/login")Observable<ApiResponse<User>> login(@Field("username") String username,@Field("password") String password);// 示例2:獲取首頁文章列表(GET)@GET("/article/list/{page}/json")Observable<ApiResponse<List<Article>>> getHomeArticles(@Path("page") int page);
}
(3)調用接口
private void login(String username, String password) {WanAndroidService service = RetrofitClient.getService();service.login(username, password).subscribeOn(Schedulers.io()) // 在IO線程發起請求.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理結果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 統一管理訂閱Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(@NonNull ApiResponse<User> response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {// 網絡錯誤Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {// 請求完成Log.d(TAG, "onComplete: ");}});
}
執行邏輯:
service.login(username, password)
??:
通過 Retrofit 定義的接口發起網絡請求,返回一個Observable<ApiResponse<User>>
。注意 ??此時網絡請求尚未執行??,只是定義了數據源。.subscribeOn(Schedulers.io())
??:
指定 Observable 的工作線程為 ??IO 線程??observeOn(AndroidSchedulers.mainThread())
??:
指定 Observer 的回調方法(onNext
、onError
、onComplete
)在 ??主線程?? 執行。- ??
.subscribe(Observer)
??:
訂閱 Observable,觸發網絡請求執行,并綁定觀察者處理結果。 ??此時網絡請求正式啟動??。
3.2 發起兩個請求,網絡嵌套
需求:先登錄,登錄成功后再獲取用戶的收藏列表。
// 獲取收藏列表 (GET, 需要登錄態)
@GET("/lg/collect/list/{page}/json")
Observable<CollectListResponse> getCollectList(@Path("page") int page
);
private void nestedNetworkRequest() {WanAndroidService service = RetrofitClient.getService();service.login("xxx", "xxx").flatMap(loginResponse -> {Log.d(TAG, "nestedNetworkRequest: "+loginResponse);// 登錄成功后,獲取收藏列表if (loginResponse.getErrorCode() == 0) {return service.getCollectList(0); // 第0頁} else {return Observable.error(new Throwable("登錄失敗"));}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<CollectListResponse>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull CollectListResponse response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: "+e.getMessage());}@Overridepublic void onComplete() {// 請求完成}});
}
flatMap方法是當 login
請求成功返回數據后才會調用。
四、源碼分析:
4.1 先從Observer(觀察者)開始
里面會有一個泛型,onNext也是使用這個泛型。
4.2 Observable(被觀察者)
(1)調用create方法的時候,就會創建一個ObservableCreate
ObservableCreate里面,包裹了source,按照上面的例子,就類似于調用了我們的login方法,將要發送的請求包裹起來。那么包裹起來干嘛?因為他不是現在執行的,我們都知道,需要調用訂閱,才會觸發整個流程執行。
4.3 Observable的subscribe訂閱過程
??訂閱發生??:調用 subscribe()
時,觸發 subscribeActual
。
進來以后,我們就可以看到,先執行了我們的onSubscribe方法,然后再去執行source,source就是我們前面說的,將請求包裹起來的內容。
我們看一個非常原始的代碼。
然后再里面調用了onNext,就是執行了觀察者的onNext方法,然后執行onCompleter那么到這里,整個執行流程就結束了。
4.4 Map變換操作符原理
為什么map可以改變onnext的接收類型呢?我們繼續看看。
可以看到,這里的類型就進行了轉換。但是為什么觀察者也跟著變了呢?
在這里的時候,就已經處理。 返回一個R類型的Observable實例,那么T也就變成了R。
4.5 異步線程切換:subscribeOn(Schedulers.io())
我們拿一個代碼來進行分析。
service.login(username, password).subscribeOn(Schedulers.io()) // 在IO線程發起請求.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理結果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 統一管理訂閱Log.d(TAG, "onSubscribe: ");}...}
}
subscribeOn接收一個Scheduler參數,Scheduler類它封裝了線程池和線程切換邏輯。
那么Schedulers.io()做了什么?Schedulers.io()
??是一種策略,他會將內部的線程池配置成IO密集型的。我們會發現里面有很多種策略。
那么我們看看subscribeOn做了什么呢?他拿到線程池以后,做什么呢?先保存起來。我們先記住subscribeOn返回了一個ObservableSubscribeOn類
因為我們知道任務需要靠訂閱方法才觸發的,所以我們來看看ObservableSubscribeOn中的
subscribeActual訂閱方法
scheduler.scheduleDirect
createWorker是一個抽象方法,因為前面我們配置的是Schedulers.io(),所以打開IoScheduler的createWorker
,然后會調用schedult方法執行現成。
所以我們的任務就被異步線程執行了。
4.5 主線程切換
那么他如何從異步線程,又切換回主線程的?
.observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers.mainThread()也是一個Scheduler,這里就不多介紹了。我們主要看看他返回的Scheduler
最終切換主線程,還是使用到了handler
我們記住HandlerScheduler類
我們來到observeOn方法
所以,然后最終交給了HandlerScheduler類來執行。
好了,到這里,源碼分析就結束了。
那么我們看源碼是為了什么?我們可以自定義RxJava操作符來玩玩,也會讓我們對前面學習更加的通透理解。
五、自定義RxJava操作符
我們之定義RxJava操作符,并不是說我們自己實現Observable,而是繼承他去實現一些功能。
我們看過just方法就知道,其實繼承了Observable
然后重寫subscribeActual方法,將value包裹進行處理,然后再調用觀察者進行分發。
下面我們就寫一個防抖操作符,幫助我們理解整個流程。
5.1 自定義防抖操作符
public class RxView {private final static String TAG = RxView.class.getSimpleName();// 我們自己的操作符 == 函數public static Observable<Object> clicks(View view) {return new ViewClickObservable(view);}}
public class ViewClickObservable extends Observable<Object> {private final View view;private static final Object EVENT = new Object();public ViewClickObservable(View view) {this.view = view;}@Overrideprotected void subscribeActual(Observer<? super Object> observer) {MyListener myListener = new MyListener(view, observer);//1.拿到view進行處理observer.onSubscribe(myListener);this.view.setOnClickListener(myListener);}// 拿到viewstatic final class MyListener implements View.OnClickListener, Disposable {private final View view;private Observer<Object> observer;private final AtomicBoolean isDisposable = new AtomicBoolean();public MyListener(View view, Observer<Object> observer) {this.view = view;this.observer = observer;}@Overridepublic void onClick(View v) {if (isDisposed() == false) {observer.onNext(EVENT);}}// 如果用調用了 中斷@Overridepublic void dispose() {// 如果沒有中斷過,才有資格, 取消view.setOnClickListener(null);if (isDisposable.compareAndSet(false, true)) {// 主線程 很好的中斷if (Looper.myLooper() == Looper.getMainLooper()) {view.setOnClickListener(null);} else { // 主線程,通過Handler的切換/*new Handler(Looper.getMainLooper()) {@Overridepublic void handleMessage(@NonNull Message msg) {super.handleMessage(msg);view.setOnClickListener(null);}};*///放到主線程執行。AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {@Overridepublic void run() {view.setOnClickListener(null);}});}}}@Overridepublic boolean isDisposed() {return isDisposable.get();}}
}
RxView.clicks(button).throttleFirst(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {}});}
}
執行流程:
- 用戶通過
RxView.clicks(view)
創建ViewClickObservable
。 - 調用
subscribe()
后觸發subscribeActual
,創建MyListener
并綁定到View
的點擊事件。 - 點擊事件觸發
onClick
,通過Observer
發送onNext
事件。然后Observer
的accept
方法就收到了事件(object) - 調用
dispose()
時移除View
的點擊監聽。