Android RxJava框架分析:它的執行流程是如何的?它的線程是如何切換的?如何自定義RxJava操作符?

目錄

  1. RxJava是什么?為什么使用。
  2. RxJava是如何使用的呢?
  3. RxJava如何和Retrofit一起使用。
  4. RxJava源碼分析。
  • (1)他執行流程是如何的。
  • (2)map
  • (3)線程的切換。
  1. 如何自定義RxJava操作符?

一、RxJava是什么?為什么使用

RxJava 是一個基于 ??響應式編程范式?? 的庫,用于通過??觀察者模式??和??鏈式操作符??,簡化異步、事件驅動、多線程數據流處理的開發。

簡單來說,RxJava 就像是一個??“流水線工廠”??,專門處理需要等待的任務(比如網絡請求、數據庫查詢、復雜計算等)。它能把這些任務串成一條流水線,每個環節處理完數據后,自動傳給下一個環節,還能靈活控制任務在哪個線程執行(比如后臺線程干活,主線程更新UI)。

1.1 為什么要使用RxJava呢?

接下來,我們看看不使用的問題,以網絡請求為例

需求??:按順序做三件事(登錄 → 查詢訂單 → 更新UI)。

??傳統寫法??:??“回調地獄”??,層層嵌套,像俄羅斯套娃!

// 偽代碼:傳統嵌套回調(問題代碼)
api.login(new Callback<User>() {@Overridepublic void onSuccess(User user) {api.getOrders(user.getId(), new Callback<List<Order>>() {@Overridepublic void onSuccess(List<Order> orders) {runOnUiThread(() -> {showOrders(orders); // 切主線程更新UI});}@Overridepublic void onFailure(Throwable error) {showError(error); // 每個回調都要處理錯誤!}});}@Overridepublic void onFailure(Throwable error) {showError(error);}
});

??問題總結??:

  • ??代碼縮進成“金字塔”??,維護困難。
  • ??重復處理錯誤??,每個回調都要寫 onFailure
  • ??手動切換線程??(如 runOnUiThread),容易遺漏。

??RxJava寫法??:

// RxJava 鏈式調用(解決方案)
api.rxLogin()                   // 1. 登錄(被觀察者).flatMap(user -> api.rxGetOrders(user.getId())) // 2. 查詢訂單(操作符).observeOn(AndroidSchedulers.mainThread()) // 3. 切到主線程.subscribe(orders -> showOrders(orders), // 4. 觀察者消費數據error -> showError(error));   // 統一錯誤處理!

優勢??:
1? ??代碼變“直線”??,邏輯清晰。
2? ??統一錯誤處理??,一個 onError 搞定所有。
3? ??自動線程切換??,不用寫 runOnUiThread


二、RxJava是如何使用的呢?

(1)添加依賴

implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2' // Android 需要

(2)使用

Observable.just("你好").subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe: 訂閱開始");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext: 拿到事件"+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: 錯誤事件");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: 事件完成");}});

在這里插入圖片描述

解釋一下執行流程:

(1)首先我們先需要知道幾個角色:觀察者( Observer),被觀察者( Observable)。

(2)當被觀察者發送出數據(調用just方法)你好的時候,那么觀察者就會收到消息(subscribe方法就是訂閱)。

(3) subscribe() 將觀察者與被觀察者連接,觸發 Observable 開始發射數據,Observer接收并處理事件(數據、錯誤、完成信號)。

(4)Observer的 onSubscribe 方法訂閱時立即調用??(最先執行)。 通知觀察者訂閱已建立。

(5) onNext 方法Observable 發射數據時調用。

(6) onComplete() 方法Observable ??正常完成數據發射??后調用 。onError 反之。


三、RxJava如何和Retrofit一起使用

其實就是將Retrofit的響應結果交給RxJava來處理

3.1 發起一個請求

(1)我們需要在Retrofit這里配置 RxJava適配器

public class RetrofitClient {private static final String BASE_URL = "https://www.wanandroid.com/";private static Retrofit retrofit;public static WanAndroidService getService() {if (retrofit == null) {retrofit = new Retrofit.Builder().baseUrl(BASE_URL).addConverterFactory(GsonConverterFactory.create()) // Gson 解析.addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // RxJava 支持.build();}return retrofit.create(WanAndroidService.class);}
}

這行代碼的作用是 ??讓 Retrofit 支持返回 RxJava 3 的響應式類型??(如 ObservableFlowableSingle 等),使得網絡請求的結果可以直接通過 RxJava 的流式操作符處理。

(2)在接口這里,我們也是使用Observable來接收。

public interface WanAndroidService {// 示例1:登錄接口(POST)@FormUrlEncoded@POST("/user/login")Observable<ApiResponse<User>> login(@Field("username") String username,@Field("password") String password);// 示例2:獲取首頁文章列表(GET)@GET("/article/list/{page}/json")Observable<ApiResponse<List<Article>>> getHomeArticles(@Path("page") int page);
}

(3)調用接口

private void login(String username, String password) {WanAndroidService service = RetrofitClient.getService();service.login(username, password).subscribeOn(Schedulers.io()) // 在IO線程發起請求.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理結果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 統一管理訂閱Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(@NonNull ApiResponse<User> response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {// 網絡錯誤Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {// 請求完成Log.d(TAG, "onComplete: ");}});
}

執行邏輯:

  1. service.login(username, password)??:
    通過 Retrofit 定義的接口發起網絡請求,返回一個 Observable<ApiResponse<User>>。注意 ??此時網絡請求尚未執行??,只是定義了數據源。
  2. .subscribeOn(Schedulers.io())??:
    指定 Observable 的工作線程為 ??IO 線程??
  3. observeOn(AndroidSchedulers.mainThread())??:
    指定 Observer 的回調方法(onNextonErroronComplete)在 ??主線程?? 執行。
  4. ??.subscribe(Observer)??:
    訂閱 Observable,觸發網絡請求執行,并綁定觀察者處理結果。 ??此時網絡請求正式啟動??。

3.2 發起兩個請求,網絡嵌套

需求:先登錄,登錄成功后再獲取用戶的收藏列表。

// 獲取收藏列表 (GET, 需要登錄態)
@GET("/lg/collect/list/{page}/json")
Observable<CollectListResponse> getCollectList(@Path("page") int page
);
private void nestedNetworkRequest() {WanAndroidService service = RetrofitClient.getService();service.login("xxx", "xxx").flatMap(loginResponse -> {Log.d(TAG, "nestedNetworkRequest: "+loginResponse);// 登錄成功后,獲取收藏列表if (loginResponse.getErrorCode() == 0) {return service.getCollectList(0); // 第0頁} else {return Observable.error(new Throwable("登錄失敗"));}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<CollectListResponse>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull CollectListResponse response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: "+e.getMessage());}@Overridepublic void onComplete() {// 請求完成}});
}

flatMap方法是當 login 請求成功返回數據后才會調用。

在這里插入圖片描述

四、源碼分析:

4.1 先從Observer(觀察者)開始

里面會有一個泛型,onNext也是使用這個泛型。

在這里插入圖片描述

4.2 Observable(被觀察者)

(1)調用create方法的時候,就會創建一個ObservableCreate
在這里插入圖片描述

ObservableCreate里面,包裹了source,按照上面的例子,就類似于調用了我們的login方法,將要發送的請求包裹起來。那么包裹起來干嘛?因為他不是現在執行的,我們都知道,需要調用訂閱,才會觸發整個流程執行。

4.3 Observable的subscribe訂閱過程

在這里插入圖片描述

??訂閱發生??:調用 subscribe() 時,觸發 subscribeActual

在這里插入圖片描述

進來以后,我們就可以看到,先執行了我們的onSubscribe方法,然后再去執行source,source就是我們前面說的,將請求包裹起來的內容。

我們看一個非常原始的代碼。
在這里插入圖片描述

然后再里面調用了onNext,就是執行了觀察者的onNext方法,然后執行onCompleter那么到這里,整個執行流程就結束了。

4.4 Map變換操作符原理

為什么map可以改變onnext的接收類型呢?我們繼續看看。

在這里插入圖片描述

可以看到,這里的類型就進行了轉換。但是為什么觀察者也跟著變了呢?
在這里插入圖片描述

在這里的時候,就已經處理。 返回一個R類型的Observable實例,那么T也就變成了R。

4.5 異步線程切換:subscribeOn(Schedulers.io())

我們拿一個代碼來進行分析。

service.login(username, password).subscribeOn(Schedulers.io()) // 在IO線程發起請求.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理結果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 統一管理訂閱Log.d(TAG, "onSubscribe: ");}...}
}

subscribeOn接收一個Scheduler參數,Scheduler類它封裝了線程池和線程切換邏輯。

在這里插入圖片描述

在這里插入圖片描述

那么Schedulers.io()做了什么?Schedulers.io()??是一種策略,他會將內部的線程池配置成IO密集型的。我們會發現里面有很多種策略。

在這里插入圖片描述

那么我們看看subscribeOn做了什么呢?他拿到線程池以后,做什么呢?先保存起來。我們先記住subscribeOn返回了一個ObservableSubscribeOn類
在這里插入圖片描述

因為我們知道任務需要靠訂閱方法才觸發的,所以我們來看看ObservableSubscribeOn中的
subscribeActual訂閱方法
在這里插入圖片描述

scheduler.scheduleDirect

在這里插入圖片描述

createWorker是一個抽象方法,因為前面我們配置的是Schedulers.io(),所以打開IoScheduler的createWorker
,然后會調用schedult方法執行現成。

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

所以我們的任務就被異步線程執行了。

4.5 主線程切換

那么他如何從異步線程,又切換回主線程的?

.observeOn(AndroidSchedulers.mainThread())

AndroidSchedulers.mainThread()也是一個Scheduler,這里就不多介紹了。我們主要看看他返回的Scheduler

在這里插入圖片描述

最終切換主線程,還是使用到了handler

在這里插入圖片描述

我們記住HandlerScheduler類

在這里插入圖片描述

我們來到observeOn方法

在這里插入圖片描述

所以,然后最終交給了HandlerScheduler類來執行。

好了,到這里,源碼分析就結束了。

那么我們看源碼是為了什么?我們可以自定義RxJava操作符來玩玩,也會讓我們對前面學習更加的通透理解。

五、自定義RxJava操作符

我們之定義RxJava操作符,并不是說我們自己實現Observable,而是繼承他去實現一些功能。

我們看過just方法就知道,其實繼承了Observable
在這里插入圖片描述

在這里插入圖片描述

然后重寫subscribeActual方法,將value包裹進行處理,然后再調用觀察者進行分發。

下面我們就寫一個防抖操作符,幫助我們理解整個流程。

5.1 自定義防抖操作符

public class RxView {private final static String TAG = RxView.class.getSimpleName();// 我們自己的操作符 == 函數public static Observable<Object> clicks(View view) {return new ViewClickObservable(view);}}

public class ViewClickObservable extends Observable<Object> {private final View view;private static final Object EVENT = new Object();public ViewClickObservable(View view) {this.view = view;}@Overrideprotected void subscribeActual(Observer<? super Object> observer) {MyListener myListener = new MyListener(view, observer);//1.拿到view進行處理observer.onSubscribe(myListener);this.view.setOnClickListener(myListener);}// 拿到viewstatic final class MyListener implements View.OnClickListener, Disposable {private final View view;private Observer<Object> observer;private final AtomicBoolean isDisposable = new AtomicBoolean();public MyListener(View view, Observer<Object> observer) {this.view = view;this.observer = observer;}@Overridepublic void onClick(View v) {if (isDisposed() == false) {observer.onNext(EVENT);}}// 如果用調用了 中斷@Overridepublic void dispose() {// 如果沒有中斷過,才有資格,   取消view.setOnClickListener(null);if (isDisposable.compareAndSet(false, true)) {// 主線程 很好的中斷if (Looper.myLooper() == Looper.getMainLooper()) {view.setOnClickListener(null);} else { // 主線程,通過Handler的切換/*new Handler(Looper.getMainLooper()) {@Overridepublic void handleMessage(@NonNull Message msg) {super.handleMessage(msg);view.setOnClickListener(null);}};*///放到主線程執行。AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {@Overridepublic void run() {view.setOnClickListener(null);}});}}}@Overridepublic boolean isDisposed() {return isDisposable.get();}}
}

在這里插入圖片描述

RxView.clicks(button).throttleFirst(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {}});}
}

執行流程:

  • 用戶通過 RxView.clicks(view) 創建 ViewClickObservable
  • 調用 subscribe() 后觸發 subscribeActual,創建 MyListener 并綁定到 View 的點擊事件。
  • 點擊事件觸發 onClick,通過 Observer 發送 onNext 事件。然后Observeraccept方法就收到了事件(object)
  • 調用 dispose() 時移除 View 的點擊監聽。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/80330.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/80330.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/80330.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

QT的初始代碼解讀及其布局和彈簧

this指的是真正的當前正在顯示的窗口 main函數&#xff1a; Widget w是生成了一個主窗口&#xff0c;QT Designer是在這個主窗口里塞組件 w.show()用來展示這個主窗口 頭文件&#xff1a; namespace Ui{class Widget;}中的class Widget和下面的class Widget不是一個東西 Ui…

什么是AI寫作

一、AI寫作簡介 AI 寫作正在成為未來 10 年最炙手可熱的超級技能。已經有越來越多的人通過 AI 寫作&#xff0c;在自媒體、公文寫作、商業策劃等領域實現了提效&#xff0c;甚至產生了變現收益。 掌握 AI 寫作技能&#xff0c;不僅能提高個人生產力&#xff0c;還可能在未來的 …

13.原生測試框架Unittest解決用例組織問題 與測試套件的使用

13. 原生測試框架Unittest解決用例組織問題 與測試套件的使用 一、測試架構核心組件解析 1.1 系統組成模塊 #mermaid-svg-bYie0B3MLRp0HL4g {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-bYie0B3MLRp0HL4g .erro…

UE5 腳部貼地不穿過地板方案

UE自帶的IK RIG和ControlRig技術 【UE5】角色腳部IK——如何讓腳貼在不同斜度的地面(設置腳的旋轉)_嗶哩嗶哩_bilibili 實驗后這個還是有一部分問題,首先只能保證高度不能穿過,但是腳步旋轉還是會導致穿模 IK前,整個模型在斜坡上會浮空 參考制作:https://www.youtube.com/w…

關于 js:4. 異步機制與事件循環

一、同步 vs 異步 1. 什么是同步&#xff08;Synchronous&#xff09; 同步代碼就是一行一行、按順序執行的。當前行沒有執行完&#xff0c;下一行不能動。 示例&#xff1a; console.log("A"); console.log("B"); console.log("C");輸出&am…

如何通過外網訪問內網?對比5個簡單的局域網讓互聯網連接方案

在實際應用中&#xff0c;常常需要從外網訪問內網資源&#xff0c;如遠程辦公訪問公司內部服務器、在家訪問家庭網絡中的設備等。又或者在本地內網搭建的項目應用需要提供互聯網服務。以下介紹幾種常見的外網訪問內網、內網提供公網連接實現方法參考。 一、公網IP路由器端口映…

java的輸入輸出模板(ACM模式)

文章目錄 1、前置準備2、普通輸入輸出API①、輸入API②、輸出API 3、快速輸入輸出API①、BufferedReader②、BufferedWriter 案例題目描述代碼 面試有時候要acm模式&#xff0c;刷慣leetcode可能會手生不會acm模式&#xff0c;該文直接通過幾個題來熟悉java的輸入輸出模板&…

什么是移動設備管理(MDM)

移動設備管理&#xff08;MDM&#xff09;是一種安全解決方案&#xff0c;旨在監控、管理和保護企業的移動設備&#xff08;包括智能手機、平板電腦、筆記本電腦和計算機&#xff09;。MDM軟件是IT部門的關鍵工具&#xff0c;其核心功能包括設備配置、安全策略實施、遠程控制及…

c++中構造對象實例的兩種方式及其返回值

c中&#xff0c;構造對象實例有兩種方式&#xff0c;一種返回對象實例&#xff0c;一種返回該對象實例的指針。如下所示&#xff1a; 一、兩種返回值 RedisConn conn1; //得到實例conn1;RedisConn *conn2 new RedisConn();//得到指針conn2;RedisConn conn3 new RedisConn()…

【Unity筆記】PathCreator使用教程:用PathCreator實現自定義軌跡動畫與路徑控制

在Unity開發過程中&#xff0c;角色移動、攝像機動畫、軌道系統、AI巡邏等功能中&#xff0c;路徑控制是常見又復雜的需求之一。如何優雅、高效地創建路徑并控制對象沿路徑運動&#xff0c;是游戲開發、動畫制作乃至工業仿真中的關鍵問題。 在這篇文章中&#xff0c;我將介紹一…

JAVA實戰開源項目:健身房管理系統 (Vue+SpringBoot) 附源碼

本文項目編號 T 180 &#xff0c;文末自助獲取源碼 \color{red}{T180&#xff0c;文末自助獲取源碼} T180&#xff0c;文末自助獲取源碼 目錄 一、系統介紹二、數據庫設計三、配套教程3.1 啟動教程3.2 講解視頻3.3 二次開發教程 四、功能截圖五、文案資料5.1 選題背景5.2 國內…

[人機交互]交互設計過程

*一.設計 1.1什么是設計 設計是一項創新活動&#xff0c;旨在為用戶提供可用的產品 –交互設計是“設計交互式產品、以支持人們的生活和工作” 1.2設計包含的四個活動 – 識別用戶的需要&#xff08; needs &#xff09;并建立需求&#xff08; requirements &…

1. 視頻基礎知識

1. 圖像基礎概念 像素&#xff1a;像素是一個圖片的基本單位&#xff0c;pix是英語單詞picture&#xff0c;加上英語單詞“元素element”&#xff0c;就得到了pixel&#xff0c;簡稱px。所以“像素”有“圖像元素”之意。分辨率&#xff1a;指的是圖像的大小或者尺寸。比如 19…

代理IP是什么,有什么用?

一、什么是代理IP&#xff1f; 簡單理解&#xff0c;代理IP是一座橋梁——你通過它連接到目標服務器&#xff0c;而不是直接暴露自己。這里的“IP”是網絡世界中的地址標簽&#xff0c;而代理IP在運行時&#xff0c;蹦跶到臺前&#xff0c;成為目標服務器看到的那個“地址”。…

日常代碼邏輯實現

日常代碼邏輯實現&#xff1a; 1.防抖 解釋&#xff1a; 防抖是指n秒內只執行一次&#xff0c;如果n秒內事件再次觸發&#xff0c;則重新計算時間 應用場景&#xff1a; 搜索框輸入聯想&#xff08;避免每次按鍵都發送請求&#xff09;窗口尺寸調整 代碼實現&#xff1a;…

北斗導航 | RTKLib中模糊度解算詳解,公式,代碼

模糊度解算 一、模糊度解算總體流程二、核心算法與公式推導1. **雙差模糊度定義**2. **浮點解方程**三、LAMBDA算法實現細節1. **降相關變換(Z-transform)**2. **整數最小二乘搜索**3. **Ratio檢驗**四、部分模糊度固定(Partial Ambiguity Resolution, PAR)1. **子集選擇策…

基于大模型的母嬰ABO血型不合溶血病全方位預測與診療方案研究

目錄 一、引言 1.1 研究背景與目的 1.2 國內外研究現狀 1.3 研究方法與創新點 二、母嬰 ABO 血型不合溶血病概述 2.1 發病機制 2.2 臨床表現 2.3 流行病學特征 三、大模型在母嬰 ABO 血型不合溶血病預測中的應用 3.1 模型選擇與構建 3.2 預測指標與數據輸入 3.3 模…

驅動-互斥鎖

互斥鎖可以說是“量值” 為 1 的 信號量&#xff0c; 最終實現的效果相同&#xff0c; 既然有了信號量&#xff0c; 那為什么還要有互斥鎖呢&#xff1f; 這就是我們這里需要了解并掌握的 文章目錄 參考資料互斥鎖的介紹互斥鎖結構體 - mutex互斥鎖 API互斥鎖實驗源碼程序-mute…

人工智能100問?第17問:智能體的定義及其基本特征?

目錄 一、通俗解釋 二、專業解析 三、權威參考 智能體是能夠通過傳感器感知環境、自主決策并借助執行器采取行動以實現特定目標的智能實體或系統。 一、通俗解釋 智能體就像一臺能自己“看、想、動”的智能機器。比如你手機里的語音助手&#xff0c;它能聽懂你說的話&…

Linux系統入門第十一章 --Shell編程之函數與數組

一、Shell函數 1、函數的用法 Shell函數可用于存放一系列的指令。在Shell腳本執行的過程中&#xff0c;函數被置于內存中&#xff0c;每次調用函數時不需要從硬盤讀取&#xff0c;因此運行的速度比較快。在Shell編程中函數并非是必須的元素&#xff0c;但使用函數可以對程序進…