Python10天突擊--Day 2: 實現觀察者模式

以下是 Python 實現觀察者模式的完整方案,包含同步/異步支持、類型注解、線程安全等特性:


1. 經典觀察者模式實現

from abc import ABC, abstractmethod
from typing import List, Anyclass Observer(ABC):"""觀察者抽象基類"""@abstractmethoddef update(self, subject: Any) -> None:passclass Subject:"""被觀察對象基類"""def __init__(self):self._observers: List[Observer] = []def attach(self, observer: Observer) -> None:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""同步通知所有觀察者"""for observer in self._observers:observer.update(self)# 使用示例
class TemperatureSensor(Subject):"""具體被觀察者:溫度傳感器"""def __init__(self):super().__init__()self._temperature = 0.0@propertydef temperature(self) -> float:return self._temperature@temperature.setterdef temperature(self, value: float) -> None:self._temperature = valueself.notify()  # 溫度變化時通知觀察者class Display(Observer):"""具體觀察者:顯示屏"""def update(self, subject: TemperatureSensor) -> None:print(f"當前溫度: {subject.temperature}°C")# 客戶端代碼
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)sensor.temperature = 25.5  # 輸出: 當前溫度: 25.5°C

2. 線程安全增強版

import threading
from typing import List, Anyclass ThreadSafeSubject:"""線程安全的被觀察對象"""def __init__(self):self._observers: List[Observer] = []self._lock = threading.RLock()def attach(self, observer: Observer) -> None:with self._lock:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:with self._lock:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""線程安全的通知"""with self._lock:observers = self._observers.copy()for observer in observers:observer.update(self)

3. 異步觀察者模式

import asyncio
from abc import ABC, abstractmethod
from typing import List, Anyclass AsyncObserver(ABC):"""異步觀察者接口"""@abstractmethodasync def update(self, subject: Any) -> None:passclass AsyncSubject:"""支持異步通知的被觀察對象"""def __init__(self):self._observers: List[AsyncObserver] = []def attach(self, observer: AsyncObserver) -> None:if observer not in self._observers:self._observers.append(observer)async def notify(self) -> None:"""異步通知所有觀察者"""await asyncio.gather(*[observer.update(self) for observer in self._observers])# 使用示例
class AsyncTemperatureSensor(AsyncSubject):def __init__(self):super().__init__()self._temp = 0.0async def set_temperature(self, value: float) -> None:self._temp = valueawait self.notify()class CloudLogger(AsyncObserver):async def update(self, subject: AsyncTemperatureSensor) -> None:print(f"云端記錄溫度: {subject._temp}°C")await asyncio.sleep(0.1)  # 模擬網絡請求async def main():sensor = AsyncTemperatureSensor()sensor.attach(CloudLogger())await sensor.set_temperature(28.5)  # 輸出: 云端記錄溫度: 28.5°Casyncio.run(main())

4. 事件總線實現(發布-訂閱模式)

from typing import Dict, List, Callable, Any
import inspectclass EventBus:"""事件總線(高級觀察者模式)"""_instance = Nonedef __new__(cls):if not cls._instance:cls._instance = super().__new__(cls)cls._instance._subscriptions: Dict[str, List[Callable]] = {}return cls._instancedef subscribe(self, event_type: str, callback: Callable) -> None:if not inspect.iscoroutinefunction(callback):callback = self._sync_to_async(callback)if event_type not in self._subscriptions:self._subscriptions[event_type] = []self._subscriptions[event_type].append(callback)async def publish(self, event_type: str, **data) -> None:if event_type in self._subscriptions:await asyncio.gather(*[callback(**data) for callback in self._subscriptions[event_type]])@staticmethoddef _sync_to_async(func: Callable) -> Callable:async def wrapper(*args, **kwargs):return func(*args, **kwargs)return wrapper# 使用示例
bus = EventBus()@bus.subscribe("temperature_change")
async def log_temp_change(value: float):print(f"溫度變化記錄: {value}°C")async def trigger_events():await bus.publish("temperature_change", value=30.0)asyncio.run(trigger_events())  # 輸出: 溫度變化記錄: 30.0°C

5. 帶過濾器的觀察者模式

from typing import Callable, Anyclass FilteredObserver:"""帶條件過濾的觀察者"""def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):self.callback = callbackself.filter = filter_conditiondef update(self, subject: Any) -> None:if self.filter(subject):self.callback(subject)# 使用示例
sensor = TemperatureSensor()def alert(temp: float):print(f"警報!當前溫度過高: {temp}°C")# 只接收溫度>30的通知
high_temp_observer = FilteredObserver(callback=alert,filter_condition=lambda s: s.temperature > 30
)sensor.attach(high_temp_observer)
sensor.temperature = 25  # 無輸出
sensor.temperature = 35  # 輸出: 警報!當前溫度過高: 35°C

方案對比

實現方式特點適用場景
經典實現簡單直接單線程簡單場景
線程安全版避免競態條件多線程環境
異步實現非阻塞通知I/O密集型應用
事件總線松耦合,支持多對多復雜事件系統
過濾觀察者條件觸發需要選擇性通知的場景

最佳實踐建議

  1. 生命周期管理

    # 使用上下文管理器自動取消注冊
    class ObserverContext:def __init__(self, subject: Subject, observer: Observer):self.subject = subjectself.observer = observerdef __enter__(self):self.subject.attach(self.observer)return selfdef __exit__(self, *args):self.subject.detach(self.observer)with ObserverContext(sensor, display):sensor.temperature = 20
    
  2. 性能優化

    • 對于高頻事件,考慮使用弱引用(weakref.WeakSet
    • 批量通知時使用@dataclass封裝事件數據
  3. 異常處理

    def safe_notify(self):for observer in self._observers:try:observer.update(self)except Exception as e:print(f"Observer failed: {e}")
    
  4. 與Python生態集成

    • 使用PyPubSub等現成庫
    • 結合asyncio.Queue實現生產者-消費者模式

根據項目復雜度選擇合適實現,簡單場景用經典模式即可,分布式系統建議使用事件總線架構。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/75706.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/75706.shtml
英文地址,請注明出處:http://en.pswp.cn/web/75706.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

CST1019.基于Spring Boot+Vue智能洗車管理系統

計算機/JAVA畢業設計 【CST1019.基于Spring BootVue智能洗車管理系統】 【項目介紹】 智能洗車管理系統,基于 Spring Boot Vue 實現,功能豐富、界面精美 【業務模塊】 系統共有三類用戶,分別是:管理員用戶、普通用戶、工人用戶&…

Windows上使用Qt搭建ARM開發環境

在 Windows 上使用 Qt 和 g++-arm-linux-gnueabihf 進行 ARM Linux 交叉編譯(例如針對樹莓派或嵌入式設備),需要配置 交叉編譯工具鏈 和 Qt for ARM Linux。以下是詳細步驟: 1. 安裝工具鏈 方法 1:使用 MSYS2(推薦) MSYS2 提供 mingw-w64 的 ARM Linux 交叉編譯工具鏈…

Python爬蟲教程011:scrapy爬取當當網數據開啟多條管道下載及下載多頁數據

文章目錄 3.6.4 開啟多條管道下載3.6.5 下載多頁數據3.6.6 完整項目下載3.6.4 開啟多條管道下載 在pipelines.py中新建管道類(用來下載圖書封面圖片): # 多條管道開啟 # 要在settings.py中開啟管道 class DangdangDownloadPipeline:def process_item(self, item, spider):…

Mysql -- 基礎

SQL SQL通用語法: SQL分類: DDL: 數據庫操作 查詢: SHOW DATABASES; 創建: CREATE DATABASE[IF NOT EXISTS] 數據庫名 [DEFAULT CHARSET字符集] [COLLATE 排序規則]; 刪除: DROP DATABA…

實操(環境變量)Linux

環境變量概念 我們用語言寫的文件編好后變成了程序,./ 運行的時候他就會變成一個進程被操作系統調度并運行,運行完畢進程相關資源被釋放,因為它是一個bash的子進程,所以它退出之后進入僵尸狀態,bash回收他的退出結果&…

torch.cat和torch.stack的區別

torch.cat 和 torch.stack 是 PyTorch 中用于組合張量的兩個常用函數,它們的核心區別在于輸入張量的維度和輸出張量的維度變化。以下是詳細對比: 1. torch.cat (Concatenate) 作用:沿現有維度拼接多個張量,不創建新維度 輸入要求…

深入解析@Validated注解:Spring 驗證機制的核心工具

一、注解出處與核心定位 1. 注解來源 ? 所屬框架:Validated 是 Spring Framework 提供的注解(org.springframework.validation.annotation 包下)。 ? 核心定位: 作為 Spring 對 JSR-380(Bean Validation 2.0&#…

2025年認證杯數學建模競賽A題完整分析論文(含模型、可運行代碼)(共32頁)

2025年認證杯數學建模競賽A題完整分析論文 目錄 摘要 一、問題分析 二、問題重述 三、模型假設 四、 模型建立與求解 4.1問題1 4.1.1問題1解析 4.1.2問題1模型建立 4.1.3問題1樣例代碼(僅供參考) 4.1.4問題1求解結果分析&#xff08…

Google A2A協議,是為了戰略性占領標準?

一、導讀 2025 年 4 月 9 日,Google 正式發布了 Agent2Agent(A2A)協議。 A2A 協議致力于打破智能體之間的隔閡,讓它們能夠跨越框架和供應商的限制,以一種標準化、開放的方式進行溝通與協作 截止到現在,代…

Ansible:roles角色

文章目錄 Roles角色Ansible Roles目錄編排Roles各目錄作用創建 roleplaybook調用角色調用角色方法1:調用角色方法2:調用角色方法3: roles 中 tags 使用實戰案例 Roles角色 角色是ansible自1.2版本引入的新特性,用于層次性、結構化…

MCU的USB接口作為 USB CDC串口輸出

引用: https://microchip-mplab-harmony.github.io/usb_apps_device/apps/usb_uart_bridge_dual/readme.html STM32 USB使用記錄:使用CDC類虛擬串口(VCP)進行通訊_stm32 usb使用記錄:使用cdc類虛擬串口(vcp)進行通訊-CSDN博客 前…

深度解析強化學習:原理、算法與實戰

深度解析強化學習:原理、算法與實戰 0. 前言1. 強化學習基礎1.1 基本概念1.2 馬爾科夫決策過程1.3 目標函數1.4 智能體學習過程2. 計算狀態值3. 計算狀態-動作值4. Q 學習4.1 Q 值4.2 使用 Q 學習進行 frozen lake 游戲4.3. frozen lake 問題4.4 實現 Q 學習小結系列鏈接0. 前…

UE5藍圖之間的通信------接口

一、創建藍圖接口 二、雙擊創建的藍圖接口,添加函數,并重命名新函數。 三、在一個藍圖(如玩家角色藍圖)中實現接口,如下圖: 步驟一:點擊類設置 步驟二:在細節面板已經實現的接口中…

2025 年“認證杯”數學中國數學建模網絡挑戰賽 A題 小行星軌跡預測

近地小行星( Near Earth Asteroids, NEAs )是軌道相對接近地球的小行 星,它的正式定義為橢圓軌道的近日距不大于 1.3 天文單位( AU )的小行星。 其中軌道與地球軌道最近距離小于 0.05A 且直徑大于 140 米的小行星被…

Axure中繼器(Repeater): 列表多選和 列表查詢

文章目錄 引言I 列表多選添加選中交互事件添加未選中交互事件II 列表查詢知識點操作說明引言 基于鼠標點擊交互事件實現列表多選列表查詢 I 列表多選 添加選中交互事件 給列標題第一列多選框元件命名為ckeck,并同時添加選中交互事件; 同步添加設置選擇/選中動作,目標元件選…

windows11下pytorch(cpu)安裝

先裝anaconda 見最下方 Pytorch 官網:PyTorch 找到下圖(不要求版本一樣)(我的電腦是集顯(有navdia的裝gpu),裝cpu) 查看已有環境列表 創建環境 conda create –n 虛擬環境名字(…

最新版IDEA超詳細圖文安裝教程(適用Mac系統)附安裝包及補丁2025最新教程

目錄 前言 一、IDEA最新版下載 二、IDEA安裝 三、IDEA補丁 前言 IDEA(IntelliJ IDEA)是專為Java語言設計的集成開發環境(IDE),由JetBrains公司開發,被公認為業界最優秀的Java開發工具之一。DEA全稱Int…

react從零開始的基礎課1

全文約5萬字。 1.hello,.. // App.jsx import { useState } from react import reactLogo from ./assets/react.svg import viteLogo from /vite.svg import ./App.cssfunction App() {const [count, setCount] useState(0)return (<><Greeting name"world&qu…

【linux知識】web服務環境搭建(一):用戶以及開發環境初始化

toc 創建用戶組以及用戶 以下是 創建用戶組 wendao 和用戶 wendao 并指定 GID、UID 及家目錄 的完整操作指南&#xff1a; 一、創建用戶組&#xff08;指定 GID&#xff09; sudo groupadd -g 1500 wendao # 創建組并指定 GID 為 1500? 注意&#xff1a;GID 需唯一&#…

音視頻 五 看書的筆記 MediaCodec

MediaCodec 用于訪問底層媒體編解碼器框架&#xff0c;編解碼組件。通常與MediaExtractor(解封裝,例如Mp4文件分解成 video和audio)、MediaSync、MediaMuxer(封裝 例如音視頻合成Mp4文件)、MediaCrypto、Image(cameraX 回調的ImageReader對象可以獲取到Image幀圖像,可轉換成YU…