python容易編輯,因此用pyrx代替rxjava3做演示會比較快捷。
pyrx安裝命令: pip install rx
一、Subject
(相當于 RxJava 的?PublishSubject
)
PublishSubject
PublishSubject?將對觀察者發送訂閱后產生的元素,而在訂閱前發出的元素將不會發送給觀察者。如果你希望觀察者接收到所有的元素,你可以通過使用?Observable
?的?create
?方法來創建?Observable
,或者使用?ReplaySubject。
如果源?Observable
?因為產生了一個?error
?事件而中止,?PublishSubject?就不會發出任何元素,而是將這個?error
?事件發送出來。
特性:
- 只發送訂閱后產生的事件,不保留歷史值。
- 新訂閱者只能收到訂閱后發射的元素。
適用場景:
實時數據流(如用戶輸入、網絡事件)。
示例代碼
from rx.subject import Subjectsubject = Subject()# 訂閱1在事件發射前訂閱
subject.subscribe(on_next=lambda value: print("訂閱1:", value),on_error=lambda error: print("錯誤:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶") # 訂閱1收到: 🐶# 訂閱2在事件發射后訂閱
subject.subscribe(on_next=lambda value: print("訂閱2:", value),on_error=lambda error: print("錯誤:", error),on_completed=lambda: print("完成")
)subject.on_next("🐱") # 訂閱1收到: 🐱,訂閱2收到: 🐱
二、ReplaySubject
ReplaySubject
ReplaySubject?將對觀察者發送全部的元素,無論觀察者是何時進行訂閱的。
這里存在多個版本的?ReplaySubject,有的只會將最新的 n 個元素發送給觀察者,有的只會將限制時間段內最新的元素發送給觀察者。
如果把?ReplaySubject?當作觀察者來使用,注意不要在多個線程調用?onNext
,?onError
?或?onCompleted
。這樣會導致無序調用,將造成意想不到的結果。
特性:
- 緩存所有發射過的事件,新訂閱者會收到全部歷史事件。
- 可通過?
buffer_size
?參數限制緩存數量。
適用場景:
需要回放歷史數據的場景(如配置變更、初始化數據)。
示例代碼
from rx.subject import ReplaySubjectsubject = ReplaySubject(buffer_size=2) # 只緩存最近2個事件subject.on_next("🐶")
subject.on_next("🐱")
subject.on_next("🐭")# 訂閱時會收到緩存的最后2個事件: 🐱, 🐭
subject.subscribe(on_next=lambda value: print("訂閱1:", value))subject.on_next("🐹") # 訂閱1收到: 🐹
三、BehaviorSubject
BehaviorSubject
當觀察者對?BehaviorSubject?進行訂閱時,它會將源?Observable
?中最新的元素發送出來(如果不存在最新的元素,就發出默認元素)。然后將隨后產生的元素發送出來。
如果源?Observable
?因為產生了一個?error
?事件而中止,?BehaviorSubject?就不會發出任何元素,而是將這個?error
?事件發送出來。
特性:
- 緩存最后一個發射的事件,新訂閱者會立即收到該值。
- 創建時必須提供初始值。
適用場景:
狀態管理(如用戶登錄狀態、系統配置)。
示例代碼
from rx.subject import BehaviorSubjectsubject = BehaviorSubject("初始值")subject.on_next("🐶")# 訂閱時會收到最后一個值: 🐶
subject.subscribe(on_next=lambda value: print("訂閱1:", value))subject.on_next("🐱") # 訂閱1收到: 🐱
四、AsyncSubject
AsyncSubject
AsyncSubject?將在源?Observable
?產生完成事件后,發出最后一個元素(僅僅只有最后一個元素),如果源?Observable
?沒有發出任何元素,只有一個完成事件。那?AsyncSubject?也只有一個完成事件。
它會對隨后的觀察者發出最終元素。如果源?Observable
?因為產生了一個?error
?事件而中止,?AsyncSubject?就不會發出任何元素,而是將這個?error
?事件發送出來。
特性:
- 只發射最后一個事件,且僅在?
on_completed()
?后發射。 - 如果未調用?
on_completed()
,訂閱者不會收到任何值。
適用場景:
只關心最終結果的場景(如計算完成后的結果)。
示例代碼
from rx.subject import AsyncSubjectsubject = AsyncSubject()subject.subscribe(on_next=lambda value: print("訂閱1:", value),on_error=lambda error: print("錯誤:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶")
subject.on_next("🐱")
subject.on_completed() # 訂閱1收到: 🐱(最后一個值)并觸發完成
五、對比表格
Subject 類型 | 歷史值處理 | 新訂閱者行為 | 觸發條件 |
---|---|---|---|
Subject | 不保留歷史值 | 只接收訂閱后的事件 | 無特殊條件 |
ReplaySubject | 緩存所有或部分歷史值 | 接收全部緩存的歷史事件 | 無特殊條件 |
BehaviorSubject | 緩存最后一個值 | 立即接收最后一個值 | 無特殊條件 |
AsyncSubject | 緩存最后一個值 | 僅在?on_completed() ?后接收 | 必須調用?on_completed() |
六、注意事項
-
內存管理:
ReplaySubject
?和?BehaviorSubject
?會持有歷史值,需注意避免內存泄漏。 -
線程安全:
RxPY 的?Subject
?默認非線程安全,多線程環境下需自行處理同步。 -
生命周期管理:
使用?dispose()
?方法釋放資源,避免不必要的事件處理。
rxjava3具體實例:
在引入rxjava3之前的寫法:通過監聽器,實現register、unregister,代碼邏輯臃腫、結構復雜、過一段時間之后自己寫的代碼都看起來很費勁。
引入rxjava3之后,activity、fragment、service之間解除了強耦合,代碼嵌套深度降低、邏輯交叉點減少,代碼清爽很多。
rx是響應式編程框架的集大成者,相當于應用內部的輕量級的ASMQ(高級消息隊列),前端是ui和邏輯分離的特點,需要大量的數據雙向多層傳遞。? 用rx可以從出發點直達終點,數據不需要層層傳遞,比如說原來的傳遞路徑是6層,你修改一次數據類,你就需要修改6個地方的代碼,用rx只需要修改前后緊挨著的2個數據管道之間的代碼。