以下是 Python 實現觀察者模式的完整方案,包含同步/異步支持、類型注解、線程安全等特性:
1. 經典觀察者模式實現
from abc import ABC, abstractmethod
from typing import List, Anyclass Observer(ABC):"""觀察者抽象基類"""@abstractmethoddef update(self, subject: Any) -> None:passclass Subject:"""被觀察對象基類"""def __init__(self):self._observers: List[Observer] = []def attach(self, observer: Observer) -> None:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""同步通知所有觀察者"""for observer in self._observers:observer.update(self)# 使用示例
class TemperatureSensor(Subject):"""具體被觀察者:溫度傳感器"""def __init__(self):super().__init__()self._temperature = 0.0@propertydef temperature(self) -> float:return self._temperature@temperature.setterdef temperature(self, value: float) -> None:self._temperature = valueself.notify() # 溫度變化時通知觀察者class Display(Observer):"""具體觀察者:顯示屏"""def update(self, subject: TemperatureSensor) -> None:print(f"當前溫度: {subject.temperature}°C")# 客戶端代碼
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)sensor.temperature = 25.5 # 輸出: 當前溫度: 25.5°C
2. 線程安全增強版
import threading
from typing import List, Anyclass ThreadSafeSubject:"""線程安全的被觀察對象"""def __init__(self):self._observers: List[Observer] = []self._lock = threading.RLock()def attach(self, observer: Observer) -> None:with self._lock:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:with self._lock:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""線程安全的通知"""with self._lock:observers = self._observers.copy()for observer in observers:observer.update(self)
3. 異步觀察者模式
import asyncio
from abc import ABC, abstractmethod
from typing import List, Anyclass AsyncObserver(ABC):"""異步觀察者接口"""@abstractmethodasync def update(self, subject: Any) -> None:passclass AsyncSubject:"""支持異步通知的被觀察對象"""def __init__(self):self._observers: List[AsyncObserver] = []def attach(self, observer: AsyncObserver) -> None:if observer not in self._observers:self._observers.append(observer)async def notify(self) -> None:"""異步通知所有觀察者"""await asyncio.gather(*[observer.update(self) for observer in self._observers])# 使用示例
class AsyncTemperatureSensor(AsyncSubject):def __init__(self):super().__init__()self._temp = 0.0async def set_temperature(self, value: float) -> None:self._temp = valueawait self.notify()class CloudLogger(AsyncObserver):async def update(self, subject: AsyncTemperatureSensor) -> None:print(f"云端記錄溫度: {subject._temp}°C")await asyncio.sleep(0.1) # 模擬網絡請求async def main():sensor = AsyncTemperatureSensor()sensor.attach(CloudLogger())await sensor.set_temperature(28.5) # 輸出: 云端記錄溫度: 28.5°Casyncio.run(main())
4. 事件總線實現(發布-訂閱模式)
from typing import Dict, List, Callable, Any
import inspectclass EventBus:"""事件總線(高級觀察者模式)"""_instance = Nonedef __new__(cls):if not cls._instance:cls._instance = super().__new__(cls)cls._instance._subscriptions: Dict[str, List[Callable]] = {}return cls._instancedef subscribe(self, event_type: str, callback: Callable) -> None:if not inspect.iscoroutinefunction(callback):callback = self._sync_to_async(callback)if event_type not in self._subscriptions:self._subscriptions[event_type] = []self._subscriptions[event_type].append(callback)async def publish(self, event_type: str, **data) -> None:if event_type in self._subscriptions:await asyncio.gather(*[callback(**data) for callback in self._subscriptions[event_type]])@staticmethoddef _sync_to_async(func: Callable) -> Callable:async def wrapper(*args, **kwargs):return func(*args, **kwargs)return wrapper# 使用示例
bus = EventBus()@bus.subscribe("temperature_change")
async def log_temp_change(value: float):print(f"溫度變化記錄: {value}°C")async def trigger_events():await bus.publish("temperature_change", value=30.0)asyncio.run(trigger_events()) # 輸出: 溫度變化記錄: 30.0°C
5. 帶過濾器的觀察者模式
from typing import Callable, Anyclass FilteredObserver:"""帶條件過濾的觀察者"""def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):self.callback = callbackself.filter = filter_conditiondef update(self, subject: Any) -> None:if self.filter(subject):self.callback(subject)# 使用示例
sensor = TemperatureSensor()def alert(temp: float):print(f"警報!當前溫度過高: {temp}°C")# 只接收溫度>30的通知
high_temp_observer = FilteredObserver(callback=alert,filter_condition=lambda s: s.temperature > 30
)sensor.attach(high_temp_observer)
sensor.temperature = 25 # 無輸出
sensor.temperature = 35 # 輸出: 警報!當前溫度過高: 35°C
方案對比
實現方式 | 特點 | 適用場景 |
---|---|---|
經典實現 | 簡單直接 | 單線程簡單場景 |
線程安全版 | 避免競態條件 | 多線程環境 |
異步實現 | 非阻塞通知 | I/O密集型應用 |
事件總線 | 松耦合,支持多對多 | 復雜事件系統 |
過濾觀察者 | 條件觸發 | 需要選擇性通知的場景 |
最佳實踐建議
-
生命周期管理:
# 使用上下文管理器自動取消注冊 class ObserverContext:def __init__(self, subject: Subject, observer: Observer):self.subject = subjectself.observer = observerdef __enter__(self):self.subject.attach(self.observer)return selfdef __exit__(self, *args):self.subject.detach(self.observer)with ObserverContext(sensor, display):sensor.temperature = 20
-
性能優化:
- 對于高頻事件,考慮使用弱引用(
weakref.WeakSet
) - 批量通知時使用
@dataclass
封裝事件數據
- 對于高頻事件,考慮使用弱引用(
-
異常處理:
def safe_notify(self):for observer in self._observers:try:observer.update(self)except Exception as e:print(f"Observer failed: {e}")
-
與Python生態集成:
- 使用
PyPubSub
等現成庫 - 結合
asyncio.Queue
實現生產者-消費者模式
- 使用
根據項目復雜度選擇合適實現,簡單場景用經典模式即可,分布式系統建議使用事件總線架構。