PublishSubject、ReplaySubject、BehaviorSubject、AsyncSubject的區別

python容易編輯,因此用pyrx代替rxjava3做演示會比較快捷。

pyrx安裝命令: pip install rx

一、Subject(相當于 RxJava 的?PublishSubject

PublishSubject

PublishSubject?將對觀察者發送訂閱后產生的元素,而在訂閱前發出的元素將不會發送給觀察者。如果你希望觀察者接收到所有的元素,你可以通過使用?Observable?的?create?方法來創建?Observable,或者使用?ReplaySubject。

如果源?Observable?因為產生了一個?error?事件而中止,?PublishSubject?就不會發出任何元素,而是將這個?error?事件發送出來。

特性

  • 只發送訂閱后產生的事件,不保留歷史值。
  • 新訂閱者只能收到訂閱后發射的元素。

適用場景
實時數據流(如用戶輸入、網絡事件)。

示例代碼
from rx.subject import Subjectsubject = Subject()# 訂閱1在事件發射前訂閱
subject.subscribe(on_next=lambda value: print("訂閱1:", value),on_error=lambda error: print("錯誤:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶")  # 訂閱1收到: 🐶# 訂閱2在事件發射后訂閱
subject.subscribe(on_next=lambda value: print("訂閱2:", value),on_error=lambda error: print("錯誤:", error),on_completed=lambda: print("完成")
)subject.on_next("🐱")  # 訂閱1收到: 🐱,訂閱2收到: 🐱

二、ReplaySubject

ReplaySubject

ReplaySubject?將對觀察者發送全部的元素,無論觀察者是何時進行訂閱的。

這里存在多個版本的?ReplaySubject,有的只會將最新的 n 個元素發送給觀察者,有的只會將限制時間段內最新的元素發送給觀察者。

如果把?ReplaySubject?當作觀察者來使用,注意不要在多個線程調用?onNext,?onError?或?onCompleted。這樣會導致無序調用,將造成意想不到的結果。

特性

  • 緩存所有發射過的事件,新訂閱者會收到全部歷史事件。
  • 可通過?buffer_size?參數限制緩存數量。

適用場景
需要回放歷史數據的場景(如配置變更、初始化數據)。

示例代碼
from rx.subject import ReplaySubjectsubject = ReplaySubject(buffer_size=2)  # 只緩存最近2個事件subject.on_next("🐶")
subject.on_next("🐱")
subject.on_next("🐭")# 訂閱時會收到緩存的最后2個事件: 🐱, 🐭
subject.subscribe(on_next=lambda value: print("訂閱1:", value))subject.on_next("🐹")  # 訂閱1收到: 🐹

三、BehaviorSubject

BehaviorSubject

當觀察者對?BehaviorSubject?進行訂閱時,它會將源?Observable?中最新的元素發送出來(如果不存在最新的元素,就發出默認元素)。然后將隨后產生的元素發送出來。

如果源?Observable?因為產生了一個?error?事件而中止,?BehaviorSubject?就不會發出任何元素,而是將這個?error?事件發送出來。

特性

  • 緩存最后一個發射的事件,新訂閱者會立即收到該值。
  • 創建時必須提供初始值。

適用場景
狀態管理(如用戶登錄狀態、系統配置)。

示例代碼
from rx.subject import BehaviorSubjectsubject = BehaviorSubject("初始值")subject.on_next("🐶")# 訂閱時會收到最后一個值: 🐶
subject.subscribe(on_next=lambda value: print("訂閱1:", value))subject.on_next("🐱")  # 訂閱1收到: 🐱

四、AsyncSubject

AsyncSubject

AsyncSubject?將在源?Observable?產生完成事件后,發出最后一個元素(僅僅只有最后一個元素),如果源?Observable?沒有發出任何元素,只有一個完成事件。那?AsyncSubject?也只有一個完成事件。

它會對隨后的觀察者發出最終元素。如果源?Observable?因為產生了一個?error?事件而中止,?AsyncSubject?就不會發出任何元素,而是將這個?error?事件發送出來。

特性

  • 只發射最后一個事件,且僅在?on_completed()?后發射。
  • 如果未調用?on_completed(),訂閱者不會收到任何值。

適用場景
只關心最終結果的場景(如計算完成后的結果)。

示例代碼
from rx.subject import AsyncSubjectsubject = AsyncSubject()subject.subscribe(on_next=lambda value: print("訂閱1:", value),on_error=lambda error: print("錯誤:", error),on_completed=lambda: print("完成")
)subject.on_next("🐶")
subject.on_next("🐱")
subject.on_completed()  # 訂閱1收到: 🐱(最后一個值)并觸發完成

五、對比表格

Subject 類型歷史值處理新訂閱者行為觸發條件
Subject不保留歷史值只接收訂閱后的事件無特殊條件
ReplaySubject緩存所有或部分歷史值接收全部緩存的歷史事件無特殊條件
BehaviorSubject緩存最后一個值立即接收最后一個值無特殊條件
AsyncSubject緩存最后一個值僅在?on_completed()?后接收必須調用?on_completed()

六、注意事項

  1. 內存管理
    ReplaySubject?和?BehaviorSubject?會持有歷史值,需注意避免內存泄漏。

  2. 線程安全
    RxPY 的?Subject?默認非線程安全,多線程環境下需自行處理同步。

  3. 生命周期管理
    使用?dispose()?方法釋放資源,避免不必要的事件處理。

rxjava3具體實例:

在引入rxjava3之前的寫法:通過監聽器,實現register、unregister,代碼邏輯臃腫、結構復雜、過一段時間之后自己寫的代碼都看起來很費勁。

引入rxjava3之后,activity、fragment、service之間解除了強耦合,代碼嵌套深度降低、邏輯交叉點減少,代碼清爽很多。

rx是響應式編程框架的集大成者,相當于應用內部的輕量級的ASMQ(高級消息隊列),前端是ui和邏輯分離的特點,需要大量的數據雙向多層傳遞。? 用rx可以從出發點直達終點,數據不需要層層傳遞,比如說原來的傳遞路徑是6層,你修改一次數據類,你就需要修改6個地方的代碼,用rx只需要修改前后緊挨著的2個數據管道之間的代碼。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/82594.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/82594.shtml
英文地址,請注明出處:http://en.pswp.cn/web/82594.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

BLE中心與外圍設備MTU協商過程詳解

一、MTU基礎概念?? 1. ??MTU定義?? ??最大傳輸單元(MTU)?? 指單次數據傳輸中允許的最大字節數,包含協議頭部(3字節)和有效載荷(最多517字節)。BLE默認MTU為??23字節??&a…

【華為云Astro-服務編排】服務編排使用全攻略

目錄 概述 為什么使用服務編排 服務編排基本能力 拖拉拽式編排流程 邏輯處理 對象處理 服務單元組合腳本、原生服務、BO、第三方服務 服務編排與模塊間調用關系 腳本 對象 標準頁面 BPM API接口 BO 連接器 如何創建服務編排 創建服務編排 如何開發服務編排 服…

centos實現SSH遠程登錄

1. 生成SSH密鑰對 首先,你需要在客戶端機器上生成一個SSH密鑰對。打開終端,執行以下命令 ssh-keygen 或ssh-keygen -t rsa -b 2048(效果相同) 按照提示操作,可以按回車鍵接受默認的文件名(通常是~/.ssh/id_…

定制開發開源AI智能名片S2B2C商城小程序在無界零售中的應用與行業智能升級示范研究

摘要:本文聚焦無界零售背景下京東從零售產品提供者向零售基礎設施提供者的轉變,探討定制開發開源AI智能名片S2B2C商城小程序在這一轉變中的應用。通過分析該小程序在商業運營成本降低、效率提升、用戶體驗優化等方面的作用,以及其與京東AI和馮…

ZooKeeper 安裝教程(Windows + Linux 雙平臺)

ZooKeeper 安裝教程(Windows + Linux 雙平臺) Zookeeper 和 Kafka 版本與 JDK 要求 一、安裝前準備 系統要求 Java 環境(JDK17+)開放端口:2181(客戶端),2888(集群通信),3888(選舉)安裝 Java Linux(Ubuntu/CentOS) # Ubuntu

【Git系列】如何同步原始倉庫的更新到你的fork倉庫?

🎉🎉🎉歡迎來到我們的博客!無論您是第一次訪問,還是我們的老朋友,我們都由衷地感謝您的到來。無論您是來尋找靈感、獲取知識,還是單純地享受閱讀的樂趣,我們都希望您能在這里找到屬于…

Could not obtain transaction-synchronized Session for current thread

背景 寫了一個函數,分別支持手動調用和定時任務調用。 測試的時候一直用手動點擊按鈕觸發函數,功能可用 等到了測試定時任務的時候,后臺報錯 Could not obtain transaction-synchronized Session for current thread錯誤分析 事務管理不匹…

linux nm/objdump/readelf/addr2line命令詳解

我們在開發過程中通過需要反匯編查看問題,那么我們這里使用rk3568開發板來舉例nm/objdump/readelf/addr2line 分析動態庫和可執行文件以及.o文件。 1,我們舉例nm/objdump/readelf/addr2line解析linux 內核文件vmlinux (1),addr2…

C++自定義簡單的內存池

內存池簡述 在C的STL的容器中的容器如vector、deque等用的默認分配器(allocator)都是從直接從系統的堆中申請內存,用一點申請一點,效率極低。這就是設計內存池的意義,所謂內存池,就是一次性向系統申請一大片內存(預分…

【極客日常】分享go開發中wire和interface配合的一些經驗

在先前一篇文章中,筆者給大家提到了go語言后端編程可以用wire依賴注入模塊去簡化單例服務的初始化,同時也可以解決服務單例之間復雜依賴的問題。但實事求是來講,用wire也是有一些學習成本的,wire在幫助解決復雜依賴的問題同時&…

20250605車充安服務器受木馬攻擊導致服務不可用

https://mp.weixin.qq.com/s/2JyxmDIDBa9_owNjIJ6UIg 因業務服務器受木馬攻擊,服務器網絡資源損耗,業務負載能力受損

web3-虛擬合約 vs 現實合同:權利、義務與資產的鏈上新秩序

web3-虛擬合約 vs 現實合同:權利、義務與資產的鏈上新秩序 一、智能合約vs真實世界合約 傳統合約:基礎要素 如下圖,現實世界的合約,會有一個條款,然后下面還有一個“Alice”的簽名 提出合約和接受合約; …

【面經分享】京東

線程池核心參數 7 個參數。 coreSize maxSize 阻塞隊列 時間 時間 線程工廠 拒絕策略 核心參數的話,有 coreSize、阻塞隊列、拒絕策略。 JVM 組成 內存上劃分: 線程私有:Java 虛擬機棧,本地方法棧、Tlab、程序計數器 …

工作流引擎-11-開源 BPM 項目 jbpm

工作流引擎系列 工作流引擎-00-流程引擎概覽 工作流引擎-01-Activiti 是領先的輕量級、以 Java 為中心的開源 BPMN 引擎,支持現實世界的流程自動化需求 工作流引擎-02-BPM OA ERP 區別和聯系 工作流引擎-03-聊一聊流程引擎 工作流引擎-04-流程引擎 activiti 優…

深度學習在非線性場景中的核心應用領域及向量/張量數據處理案例,結合工業、金融等領域的實際落地場景分析

一、工業場景:非線性缺陷檢測與預測 1. ?半導體晶圓缺陷檢測? ?問題?:微米級劃痕、顆粒污染等缺陷形態復雜,與正常紋理呈非線性關系。?解決方案?: ?輸入張量?:高分辨率晶圓圖像 → 三維張量 (Batch, Height,…

Python-線程同步

多線程 案例 說明: 唱歌方法 sing()跳舞方法 dance()啟用兩個線程調用主線程結束 代碼 # 導入線程模塊 import threading import timedef sing(name,age):time.sleep(2)print(唱歌者姓名: name ,年齡: str(age))print(正在唱…

前端八股之JS的原型鏈

1.原型的定義 每一個對象從被創建開始就和另一個對象關聯,從另一個對象上繼承其屬性,這個另一個對象就是 原型。 當訪問一個對象的屬性時,先在對象的本身找,找不到就去對象的原型上找,如果還是找不到,就去…

kafka命令

kafka安裝先安裝zookeeper,jdk 確保jdk版本與kafka版本匹配: 先啟動zookeeper: # 啟動獨立安裝的zookeeper ./zkServer.sh start # 也可以自動kafka自帶的zookerper ./zookeeper-server-start.sh ../config/zookeeper.pr…

微服務面試(分布式事務、注冊中心、遠程調用、服務保護)

1.分布式事務 分布式事務,就是指不是在單個服務或單個數據庫架構下,產生的事務,例如: 跨數據源的分布式事務跨服務的分布式事務綜合情況 我們之前解決分布式事務問題是直接使用Seata框架的AT模式,但是解決分布式事務…

Linux --進程優先級

概念 什么是進程優先級,為什么需要進程優先級,怎么做到進程優先級這是本文需要解釋清楚的。 優先級的本質其實就是排隊,為了去爭奪有限的資源,比如cpu的調度。cpu資源分配的先后性就是指進程的優先級。優先級高的進程有優先執行的…