目錄
- 一、概述
- 1.1 動機
- 1.2 核心思想
- 1.3 別名
- 二、角色與實現原理
- 2.1 角色
- 2.2 實現原理
- 2.3 類圖
- 三、經典接口實現
- 3.1 示例
- 3.1.1 觀察者接口
- 3.1.2 目標接口
- 3.1.3 具體被觀察者
- 3.1.4 具體觀察者
- 3.1.5 Client
- 3.1.6 UML時序圖
- 3.2 特點
- 四、其他實現方式
- 4.1 委托與事件(.NET 原生實現)
- 4.1.1 示例
- 4.1.2 UML類圖
- 4.1.3 特點
- 4.1.4 適用場景
- 4.2 IObservable<T> 和 IObserver<T> 接口
- 4.2.1 接口概述
- 4.2.1.1 被觀察者接口 : IObservable<out T>
- 4.2.1.2 觀察者接口 : IObserver<in T>
- 4.2.2 示例
- 4.2.2.1 具體被觀察者Subject:實現 IObservable<T>
- 4.2.2.1.1 訂閱管理 (`Subscribe` 方法)
- 4.2.2.1.2 取消訂閱 (`Unsubscriber`類)
- 4.2.2.1.3 狀態通知 (`NotifyObservers` 方法)
- 4.2.2.1.4 完成與錯誤通知 (`OnCompleted` 和 `OnError`)
- 4.2.2.1.5 線程安全設計
- 4.2.2.1.6 全部代碼
- 4.2.3 UML類圖
- 4.2.4 擴展內容
- 4.2.4.1 異步通知
- 4.2.4.2 事件過濾
- 4.2.5 特點
- 4.2.6 適用場景
- 4.3 System.Reactive
- 4.3.1 安裝
- 4.3.2 示例
- 4.3.3 特點
- 4.3.4 適用場景
- 五、使用場景
- 六、擴展
- 6.1 初始化和基礎架構搭建
- 6.1.1 初始化和基礎架構搭建
- 6.1.1.1 一個觀察者觀察多個目標
- 6.1.1.2 **目標多而觀察者少**
- 6.1.1.3 **目標對象之間存在復雜依賴關系**
- 6.2 注冊機制
- 6.2.1 問題
- 6.2.2 解決方案
- 6.2.2.1 實現思路
- 6.2.2.2 示例
- 6.2.2.3 優點
- 6.2.2.4 適用場景
- 6.3 觸發機制
- 6.3.1 觸發之前
- 6.3.2 更新的觸發者
- 6.3.3 示例
- 6.3.3.1 抽象目標對象
- 6.3.3.2 觀察者接口
- 6.3.3.3 具體的的觀察者
- 6.3.3.4 自動觸發的目標對象
- 6.3.3.5 手動觸發的目標對象
- 6.3.3.6 使用示例
- 6.4 **信息傳遞機制**
- 6.4.1 解決方案與實現方式
- 6.4.1.1 推模型(Push Model)
- 6.4.1.2 拉模型(Pull Model)
- 6.4.1.3 設計對比與權衡
- 6.4.1.4 最佳實踐建議
- 6.5 資源管理和錯誤處理
-
參考
設計模式:可復用面向對象軟件的基礎(典藏版) - 埃里克·伽瑪 - 微信讀書
第七章:C#響應式編程System.Reactive - 平元兄 - 博客園
ReactiveX · ReactiveX文檔中文翻譯
-
項目地址
NitasDemo/10DesignPatterns/DesignPatterns/ObserverPattern at main · Nita121388/NitasDemo
一、概述
觀察者模式是一種行為設計模式
,用于定義對象之間的一對多依賴關系。當一個對象(被觀察者)的狀態發生改變時,所有依賴于它的對象(觀察者)都會自動得到通知并更新。
1.1 動機
將一個系統分割成一系列相互協作的類有一個常見的副作用:需要維護相關對象間的一致性。我們不希望為了維持一致性而使各類緊密耦合,因為這樣降低了其可復用性。
1.2 核心思想
- 解耦:將被觀察者與觀察者解耦,使它們之間通過接口或事件機制交互。
- 自動通知:當被觀察者狀態改變時,自動通知所有觀察者。
1.3 別名
-
依賴模式:強調對象間的依賴關系。
-
發布–訂閱模式:發布者將消息發送給多個訂閱者。
-
模型-視圖模式:模型變化時,視圖自動更新。
-
源-監聽器模式:事件源觸發事件后通知所有監聽器。
-
從屬者模式:從屬者依賴于其他對象的狀態變化。
這些別名都體現了觀察者模式的核心思想:定義對象間的一對多依賴關系,實現狀態變化的自動通知。
二、角色與實現原理
2.1 角色
- Subject(目標/主題/被觀察者)
- 維護一個觀察者列表,允許觀察者訂閱或取消訂閱。
- 當狀態改變時,通知所有觀察者。
- Observer(觀察者)
- 觀察者將對觀察目標的改變做出反應。觀察者一般定義為接口,該接口聲明了更新數據的方法update(?)?,因此又稱為抽象觀察者。
- ConcreteSubject(具體被觀察者)
- 實現Subject接口,維護自身狀態,并在狀態改變時通知觀察者。
- ConcreteObserver(具體觀察者)
- 實現Observer接口,根據被觀察者的狀態改變做出相應反應。
2.2 實現原理
- 被觀察者維護觀察者列表
- 被觀察者類中包含一個觀察者列表,用于存儲所有訂閱的觀察者對象。
- 被觀察者類可以注冊與注銷觀察者,提供方法允許觀察者對象注冊到被觀察者列表中,或從列表中注銷。
- 通知機制
- 當被觀察者狀態改變時,遍歷觀察者列表,調用每個觀察者的更新方法。
2.3 類圖
三、經典接口實現
Gang of Four(GoF)是指四位著名軟件設計模式專家(Erich Gamma、Richard Helm、Ralph Johnson 和 John Vlissides)在1994年出版的《設計模式》。
GoF模式本質*:通過接口規范化觀察者模式中的角色職責,強調設計契約優先,適用于需要長期維護、高可擴展性的復雜系統架構設計。*
核心思想:通過顯式接口定義觀察者和主題的關系。
3.1 示例
3.1.1 觀察者接口
// 觀察者接口
public interface IObserver
{void Update(string message);
}
3.1.2 目標接口
// 目標接口
public interface ISubject
{void Attach(IObserver observer);void Detach(IObserver observer);void Notify();
}
3.1.3 具體被觀察者
// 具體目標
public class ConcreteSubject : ISubject
{// 觀察者列表private List<IObserver> _observers = new();// 目標狀態private string _state;// 注冊觀察者: 將觀察者對象注冊到目標對象中public void Attach(IObserver observer) => _observers.Add(observer);// 注銷觀察者: 移除一個觀察者public void Detach(IObserver observer) => _observers.Remove(observer);// 通知觀察者: 改變目標對象的狀態,觸發通知public void Notify(){foreach (var observer in _observers)observer.Update(_state);}// 設置目標狀態: 改變目標對象的狀態public void SetState(string state){_state = state;Notify();}
}
3.1.4 具體觀察者
// 具體觀察者
public class ConcreteObserver : IObserver
{// 接收通知并處理public void Update(string message) => Console.WriteLine($"Received: {message}");
}
3.1.5 Client
using System;class Program
{static void Main(string[] args){// 創建具體目標對象ConcreteSubject subject = new ConcreteSubject();// 創建多個具體觀察者對象ConcreteObserver observer1 = new ConcreteObserver();ConcreteObserver observer2 = new ConcreteObserver();ConcreteObserver observer3 = new ConcreteObserver();// 將觀察者對象注冊到目標對象中subject.Attach(observer1);subject.Attach(observer2);subject.Attach(observer3);// 改變目標對象的狀態,觸發通知Console.WriteLine("第一次狀態更新:");subject.SetState("Hello, Observers!");// 移除一個觀察者subject.Detach(observer2);// 再次改變目標對象的狀態,觸發通知Console.WriteLine("\n第二次狀態更新:");subject.SetState("State has changed!");}
}
結果:
第一次狀態更新:
Received: Hello, Observers!
Received: Hello, Observers!
Received: Hello, Observers!第二次狀態更新:
Received: State has changed!
Received: State has changed!
3.1.6 UML時序圖
3.2 特點
- 符合設計模式原生定義,代碼結構清晰。
- 強類型約束,編譯時檢查接口實現。
- 對語言無特殊要求,通用性強。
- 顯式依賴關系,邏輯透明。
- 適用于簡單場景,無需框架支持。
四、其他實現方式
4.1 委托與事件(.NET 原生實現)
- 機制:利用語言或框架提供的事件監聽機制,被觀察者觸發事件,觀察者通過監聽器接收事件。
4.1.1 示例
public class EventSubject
{public event EventHandler<string> StateChanged;private string _state;public void SetState(string state){_state = state;StateChanged?.Invoke(this, _state);}
}public class EventObserver
{public void Subscribe(EventSubject subject){subject.StateChanged += HandleStateChange;}private void HandleStateChange(object sender, string message){Console.WriteLine($"Event received: {message}");}public void Unsubscribe(EventSubject subject){subject.StateChanged -= HandleStateChange;}
}
class Program{static void Main(string[] args){// 創建 EventSubject 和 EventObserver 對象EventSubject subject = new EventSubject();EventObserver observer = new EventObserver();// 訂閱事件observer.Subscribe(subject);Console.WriteLine("Observer has subscribed to the subject.");// 改變狀態,觸發事件subject.SetState("State 1");subject.SetState("State 2");// 取消訂閱observer.Unsubscribe(subject);Console.WriteLine("Observer has unsubscribed from the subject.");// 再次改變狀態,觀察是否還會觸發事件subject.SetState("State 3"); // 不會觸發事件,因為已取消訂閱}}
結果
Observer has subscribed to the subject.
Event received: State 1
Event received: State 2
Observer has unsubscribed from the subject.
4.1.2 UML類圖
4.1.3 特點
- 代碼更加簡潔,輕量級,利用語言的內置特性,減少了手動管理觀察者列表的復雜性。
- 內置線程安全的事件觸發機制(?.Invoke)
- 支持多播(多個觀察者訂閱同一事件)
- 對于一些復雜的業務邏輯,可能無法完全滿足需求,因為事件機制通常是基于固定的事件類型和參數進行設計的,不夠靈活。
- 而且如果事件的定義不合理,可能會導致系統的可擴展性和維護性變差。
- 無法跨模塊解耦(需直接訪問事件)。
4.1.4 適用場景
- GUI 事件處理(如按鈕點擊)。
- 單模塊內的局部解耦。
- 適合簡單通知邏輯且不涉及復雜數據流的場景。
4.2 IObservable 和 IObserver 接口
核心思想:使用.NET框架內置的觀察者模式標準化接口。
4.2.1 接口概述
接口 | 角色 | 職責 |
---|---|---|
`IObservable<T>` | 被觀察對象 | 數據/事件的生產者 |
`IObserver<T>` | 觀察者 | 數據/事件的消費者 |
4.2.1.1 被觀察者接口 : IObservable
namespace System
{/// <summary>/// 定義了一個基于推送的事件通知提供者/ 被觀察者/// </summary>/// <typeparam name="T">提供通知信息的對象類型。</typeparam>public interface IObservable<out T>{/// <summary>/// 通知提供者有一個觀察者將要接收通知。/// </summary>/// observer">將要接收通知的對象。/// <returns>一個接口引用,允許觀察者在提供者完成發送通知之前停止接收通知。</returns>IDisposable Subscribe(IObserver<T> observer);}
}
IObservable<T>
是一個接口,屬于 C# 中的事件驅動編程模型,是響應式編程(Reactive Programming)的核心接口之一。
它定義了一個基于推送的事件通知機制,允許觀察者(IObserver<T>
)訂閱
通知源(IObservable<T>
),并在通知源產生數據或事件時接收通知。
- 泛型參數
T
:表示通知中攜帶的數據類型。 - Subscribe方法:是
IObservable<T>
的核心方法。- 它接收一個實現了
IObserver<T>
接口的對象作為參數,表示觀察者。 - 當調用
Subscribe
方法時,觀察者會注冊到通知源,從而能夠接收通知。 - 方法返回一個
IDisposable
對象,觀察者可以通過調用其Dispose
方法來取消訂閱,停止接收通知。
- 它接收一個實現了
4.2.1.2 觀察者接口 : IObserver
namespace System
{/// <summary>/// 提供一種接收基于推送的通知的機制。//觀察者/// </summary>/// <typeparam name="T">提供通知信息的對象類型。</typeparam>public interface IObserver<in T>{/// <summary>/// 向觀察者提供新的數據。/// </summary>/// <param name="value">當前的通知信息。</param>void OnNext(T value);/// <summary>/// 通知觀察者提供者遇到了錯誤條件。/// </summary>/// <param name="error">一個提供有關錯誤的額外信息的對象。</param>void OnError(Exception error);/// <summary>/// 通知觀察者提供者已經完成發送基于推送的通知。/// </summary>void OnCompleted();}
}
IObserver<T>
它定義了一個觀察者的角色,用于接收來自通知源(IObservable<T>
)的推送通知。
- 泛型參數
T
:表示通知中攜帶的數據類型。 OnNext
方法:當通知源有新的數據可用時,調用此方法向觀察者傳遞數據。參數value
是當前的通知信息。OnError
方法:當通知源在發送通知過程中遇到錯誤時,調用此方法通知觀察者。參數error
是一個Exception
對象,提供有關錯誤的詳細信息。OnCompleted
方法:當通知源完成所有通知的發送后,調用此方法通知觀察者。這表示通知源不會再發送任何新的通知。
4.2.2 示例
4.2.2.1 具體被觀察者Subject:實現 IObservable
4.2.2.1.1 訂閱管理 (Subscribe
方法)
public IDisposable Subscribe(IObserver<string> observer) {lock (_lock) {if (!_observers.Contains(observer)) {_observers.Add(observer);}}return new Unsubscriber(_observers, observer, _lock);
}
-
功能:允許觀察者訂閱主題。
-
線程安全:通過
lock
確保多線程下訂閱操作的原子性。 -
防止重復訂閱:檢查觀察者是否已存在。
-
返回
Unsubscriber
:通過IDisposable
實現優雅的取消訂閱機制。關于
IDisposable
,可以查看我的另一篇文章C#中的非托管資源釋放機制詳解|Finalizer與Dispose模式-CSDN博客。
4.2.2.1.2 取消訂閱 (Unsubscriber
類)
private class Unsubscriber : IDisposable {// ... 略去字段和構造函數 ...public void Dispose() {lock (_lock) {if (_observer != null && _observers.Contains(_observer)) {_observers.Remove(_observer);_observer = null;}}}
}
- 功能:調用
Dispose()
時從觀察者列表中移除目標觀察者。 - 資源釋放:移除后置空引用,避免內存泄漏。
- 線程安全:通過
lock
確保取消訂閱的原子性。
4.2.2.1.3 狀態通知 (NotifyObservers
方法)
public void NotifyObservers(string state) {lock (_lock) {foreach (var observer in _observers) {observer.OnNext(state);}}
}
- 功能:遍歷所有觀察者,調用其
OnNext
方法推送新狀態。 - 線程安全:遍歷期間鎖定列表,防止并發修改。
4.2.2.1.4 完成與錯誤通知 (OnCompleted
和 OnError
)
public void OnCompleted() {lock (_lock) {foreach (var observer in _observers) {observer.OnCompleted();}_observers.Clear();}
}public void OnError(Exception error) {lock (_lock) {foreach (var observer in _observers) {observer.OnError(error);}_observers.Clear();}
}
- 完成通知:調用所有觀察者的
OnCompleted()
,清空列表(終止后續通知)。 - 錯誤通知:調用所有觀察者的
OnError()
,清空列表。 - 線程安全:全程加鎖。
4.2.2.1.5 線程安全設計
- 鎖對象
_lock
:所有對觀察者列表的操作(增、刪、遍歷)均通過lock (_lock)
確保原子性。 - 場景覆蓋:
- 多線程同時訂閱/取消訂閱。
- 通知過程中觸發新的訂閱/取消訂閱。
4.2.2.1.6 全部代碼
-
具體目標
using System; using System.Collections.Generic; using System.Threading;// Subject 類實現了 IObservable<string> 接口,用于管理觀察者并通知狀態變化 public class Subject : IObservable<string> {// 用于存儲所有訂閱的觀察者private List> _observers = new();// 用于線程安全的鎖對象private readonly object _lock = new();// 訂閱方法,允許觀察者訂閱狀態變化public IDisposable Subscribe(IObserver observer){lock (_lock) // 確保線程安全{if (!_observers.Contains(observer)) // 防止重復訂閱{_observers.Add(observer);}}// 返回一個 Unsubscriber 對象,用于取消訂閱return new Unsubscriber(_observers, observer, _lock);}// Unsubscriber 類實現了 IDisposable 接口,用于取消觀察者的訂閱private class Unsubscriber : IDisposable{private List> _observers;private IObserver _observer;private readonly object _lock;// 構造函數,初始化觀察者列表、當前觀察者和鎖對象public Unsubscriber(List> observers, IObserver observer, object lockObj){_observers = observers;_observer = observer;_lock = lockObj;}// Dispose 方法用于取消訂閱public void Dispose(){lock (_lock) // 確保線程安全{if (_observer != null && _observers.Contains(_observer)){_observers.Remove(_observer); // 從觀察者列表中移除當前觀察者_observer = null; // 清空當前觀察者引用}}}}// SetState 方法用于設置狀態并通知所有觀察者public void NotifyObservers(string state){lock (_lock) // 確保線程安全{foreach (var observer in _observers){observer.OnNext(state); // 調用觀察者的 OnNext 方法通知狀態變化}}}// OnCompleted 方法用于通知所有觀察者完成事件public void OnCompleted(){lock (_lock) // 確保線程安全{foreach (var observer in _observers){observer.OnCompleted(); // 調用觀察者的 OnCompleted 方法通知完成事件}_observers.Clear(); // 清空觀察者列表}}// OnError 方法用于通知所有觀察者發生錯誤public void OnError(Exception error){lock (_lock) // 確保線程安全{foreach (var observer in _observers){observer.OnError(error); // 調用觀察者的 OnError 方法通知錯誤事件}_observers.Clear(); // 清空觀察者列表}} }
-
具體觀察者
ConcreteObserver
類實現了IObserver<string>
接口,用于接收被觀察者的狀態變化通知。OnNext
方法:接收狀態變化通知,并輸出狀態信息。OnError
方法:接收錯誤通知,并輸出錯誤信息。OnCompleted
方法:接收完成通知,并輸出完成信息。
// ConcreteObserver 類實現了 IObserver<string> 接口,用于接收狀態變化通知 public class ConcreteObserver : IObserver<string> {// 觀察者的名稱,用于區分不同的觀察者private readonly string _name;// 構造函數,初始化觀察者名稱public ConcreteObserver(string name){_name = name;}// OnNext 方法用于接收狀態變化通知public void OnNext(string value){Console.WriteLine($"{_name} received: {value}"); // 輸出接收到的狀態信息}// OnError 方法用于接收錯誤通知public void OnError(Exception error){Console.WriteLine($"{_name} received an error: {error.Message}"); // 輸出錯誤信息}// OnCompleted 方法用于接收完成通知public void OnCompleted(){Console.WriteLine($"{_name} received completion notification."); // 輸出完成通知} }
-
客戶端使用實例(Client)
using System;namespace IObservableTDemo {class Program{static void Main(string[] args){// 1. 創建被觀察者和觀察者Subject subject = new Subject();ConcreteObserver observer1 = new ConcreteObserver("observer 1");ConcreteObserver observer2 = new ConcreteObserver("observer 2");// 2. 訂閱觀察者IDisposable subscription1 = subject.Subscribe(observer1);IDisposable subscription2 = subject.Subscribe(observer2);// 狀態通知subject.NotifyObservers("Hello, World!");// 取消訂閱 observer2subscription2.Dispose();// 再次設置狀態,觀察者1會收到通知,觀察者2不會收到subject.NotifyObservers("Hello again!");// 模擬錯誤 此時會清空觀察者列表subject.OnError(new Exception("Something went wrong!"));// 再次設置狀態,觀察者1和觀察者2都不會收到通知subject.NotifyObservers("Hello again!");// 再次訂閱觀察者IDisposable subscription3 = subject.Subscribe(observer1);// 再次設置狀態,觀察者1收到通知subject.NotifyObservers("Hello again!");// 完成通知subject.OnCompleted();//再次設置狀態,都不會收到通知subject.NotifyObservers("Hello again!");// 等待用戶輸入后退出Console.WriteLine("Press any key to exit...");Console.ReadKey();}} }
-
結果
Observer 1 received: Hello, World! Observer 2 received: Hello, World! Observer 1 received: Hello again! Observer 1 received an error: Something went wrong! Observer 3 received: Hello again! Observer 3 received completion notification. Press any key to exit...
4.2.3 UML類圖
4.2.4 擴展內容
4.2.4.1 異步通知
可以通過Task
或async/await
來實現異步通知。
-
異步接口
- 每個方法都返回Task以支持異步操作
- 完全異步的觀察者接口
public interface IAsyncObserver<in T> {Task OnNextAsync(T value);Task OnErrorAsync(Exception exception);Task OnCompletedAsync(); }
-
順序異步通知機制
- 嚴格按順序通知觀察者
- 每個觀察者處理完成后再通知下一個
- 保留通知順序性
foreach (var observer in observersCopy) {try{await observer.OnNextAsync(value);}// ... }
-
完整代碼
using System; using System.Collections.Generic; using System.Threading.Tasks; using System.Threading;#region Client Code var subject = new AsyncSubject<string>();var observer1 = new AsyncObserver("observer 1"); var observer2 = new AsyncObserver("observer 2");// 訂閱觀察者 using var subscription1 = subject.Subscribe(observer1); using var subscription2 = subject.Subscribe(observer2);// 異步通知 await subject.NotifyAsync("First Message");// 取消訂閱 observer2 subscription2.Dispose();// 再次通知 await subject.NotifyAsync("Second Message");// 錯誤通知 await subject.NotifyErrorAsync(new Exception("Test Error"));// 完成通知 await subject.OnCompletedAsync();Console.WriteLine("Press any key to exit..."); Console.ReadKey(); #endregion#region Interfaces public interface IAsyncObserver<in T> {Task OnNextAsync(T value);Task OnErrorAsync(Exception exception);Task OnCompletedAsync(); }public interface IAsyncObservable<out T> {IDisposable Subscribe(IAsyncObserver observer); } #endregion#region Async Subject public class AsyncSubject<T> : IAsyncObservable<T> {private readonly List> _observers = new();private readonly object _lock = new();public IDisposable Subscribe(IAsyncObserver observer){lock (_lock){if (!_observers.Contains(observer)){_observers.Add(observer);}}return new Unsubscriber(() =>{lock (_lock){_observers.Remove(observer);}});}public async Task NotifyAsync(T value){IAsyncObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();}foreach (var observer in observersCopy){try{await observer.OnNextAsync(value);}catch (Exception ex){Console.WriteLine($"Notification failed: {ex.Message}");}}}public async Task NotifyErrorAsync(Exception error){IAsyncObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{await observer.OnErrorAsync(error);}catch (Exception ex){Console.WriteLine($"Error notification failed: {ex.Message}");}}}public async Task OnCompletedAsync(){IAsyncObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{await observer.OnCompletedAsync();}catch (Exception ex){Console.WriteLine($"Completion notification failed: {ex.Message}");}}}private class Unsubscriber : IDisposable{private readonly Action _unsubscribeAction;public Unsubscriber(Action unsubscribeAction){_unsubscribeAction = unsubscribeAction;}public void Dispose() => _unsubscribeAction?.Invoke();} } #endregion#region Async Observer public class AsyncObserver : IAsyncObserver<string> {private readonly string _name;public AsyncObserver(string name) => _name = name;public async Task OnNextAsync(string value){await Task.Delay(100); // 模擬異步處理Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} received: {value}");}public async Task OnErrorAsync(Exception exception){await Task.Delay(100); // 模擬異步處理Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} error: {exception.Message}");}public async Task OnCompletedAsync(){await Task.Delay(100); // 模擬異步處理Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] {_name} completed");} } #endregion
結果
[22:14:48.269] Observer 1 received: First Message [22:14:48.449] Observer 2 received: First Message [22:14:48.554] Observer 1 received: Second Message [22:14:48.662] Observer 1 error: Test Error Press any key to exit...
4.2.4.2 事件過濾
可以通過在通知方法中添加過濾邏輯來實現事件過濾。
FilteredObservable
的構造函數接收一個過濾函數(Func<T, bool>
),用于決定哪些消息需要通知給觀察者。- 在
Notify
方法中,只有滿足過濾條件的消息才會被發送。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;#region Client Codevar subject = new FilteredObservable<string>(s => s.StartsWith("[IMPORTANT]"));var observer1 = new Observer("Observer 1");
var observer2 = new Observer("Observer 2");using var subscription1 = subject.Subscribe(observer1);
using var subscription2 = subject.Subscribe(observer2);// 這些消息將被過濾
subject.Notify("Normal Message 1");
subject.Notify("Normal Message 2");// 這些消息將被傳遞
subject.Notify("[IMPORTANT] Message 1");
subject.Notify("[IMPORTANT] Message 2");// 取消訂閱 observer2
subscription2.Dispose();// 再次通知
subject.Notify("[IMPORTANT] Message 3"); // 只有 observer1 收到// 錯誤通知(不過濾)
subject.NotifyError(new Exception("Critical Error"));// 完成通知(不過濾)
subject.OnCompleted();#endregion#region Subjectpublic class FilteredObservable<T> : IObservable<T>
{private readonly List<IObserver<T>> _observers = new();private readonly Func<T, bool> _filter;private readonly object _lock = new();public FilteredObservable(Func<T, bool> filter){_filter = filter ?? throw new ArgumentNullException(nameof(filter));}public IDisposable Subscribe(IObserver<T> observer){lock (_lock){if (!_observers.Contains(observer))_observers.Add(observer);}return new Unsubscriber(() =>{lock (_lock){_observers.Remove(observer);}});}public async Task Notify(T value){if (!_filter(value)) return;IObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();}foreach (var observer in observersCopy){try{observer.OnNext(value);}catch (Exception ex){Console.WriteLine($"Notification failed: {ex.Message}");}}}public async Task NotifyError(Exception error){IObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{ observer.OnError(error);}catch (Exception ex){Console.WriteLine($"Error notification failed: {ex.Message}");}}}public async Task OnCompleted(){IObserver<T>[] observersCopy;lock (_lock){observersCopy = _observers.ToArray();_observers.Clear();}foreach (var observer in observersCopy){try{observer.OnCompleted();}catch (Exception ex){Console.WriteLine($"Completion notification failed: {ex.Message}");}}}private class Unsubscriber : IDisposable{private readonly Action _unsubscribeAction;public Unsubscriber(Action unsubscribeAction) => _unsubscribeAction = unsubscribeAction;public void Dispose() => _unsubscribeAction?.Invoke();}
}#endregion#region Observerpublic class Observer : IObserver<string>
{private readonly string _name;public Observer(string name){_name = name;}public void OnCompleted(){Console.WriteLine($"{_name} completed.");}public void OnError(Exception error){Console.WriteLine($"{_name} error: {error.Message}");}public void OnNext(string value){Console.WriteLine($"{_name} received: {value}");}
}
#endregion
Observer 1 received: [IMPORTANT] Message 1
Observer 2 received: [IMPORTANT] Message 1
Observer 1 received: [IMPORTANT] Message 2
Observer 2 received: [IMPORTANT] Message 2
Observer 1 received: [IMPORTANT] Message 3
Observer 1 error: Critical Error
4.2.5 特點
- 依賴框架:依賴于.NET框架,不適合跨平臺或非.NET環境。
- 學習曲線:需要一定的.NET框架知識才能熟練使用。
- 與LINQ集成:可以使用LINQ查詢語法對事件流進行操作,簡化代碼。
- 性能優化:通過高效的訂閱機制和事件分發,提升了性能。
- 擴展性強:支持事件過濾、組合、轉換等高級功能。
- 線程安全:框架提供了線程安全的機制,減少了線程沖突的風險。
- 標準化:基于.NET框架的標準接口,具有統一的規范。
4.2.6 適用場景
- 復雜事件處理:適用于事件流復雜、需要高級操作(如過濾、組合、轉換)的場景。
- 多線程環境:在多線程或異步編程中,可以有效避免線程安全問題。
- 數據流處理:適合處理數據流,如傳感器數據、實時消息等。
- 與.NET生態系統集成:與.NET的其他功能(如LINQ、Task并行庫)無縫集成。
4.3 System.Reactive
System.Reactive 是基于IObservable<T>
和IObserver<T>
的擴展庫,用于處理事件流和異步數據流。
它將事件和數據流抽象為可觀察序列,并通過 LINQ 風格的操作符實現訂閱、過濾、轉換和合并。
核心思想:使用響應式擴展庫處理復雜事件流。Rx可以這樣定義:Rx = Observables + LINQ + Schedulers。
4.3.1 安裝
通過NuGet包管理器安裝System.Reactive包
4.3.2 示例
using System.Reactive.Linq;
using System.Reactive.Subjects;// 創建可觀察序列
var subject = new Subject<string>();
var observable = subject.AsObservable();// 訂閱觀察者
var subscription = observable.Where(msg => msg.StartsWith("IMPORTANT")).Subscribe(msg => Console.WriteLine($"Rx received: {msg}"));// 推送消息
subject.OnNext("IMPORTANT: System update");
subject.OnNext("Normal message"); // 被過濾// 取消訂閱
subscription.Dispose();
輸出:
Rx received: IMPORTANT: System update
4.3.3 特點
- 強大的事件流處理(過濾、映射、合并等)
- 支持LINQ查詢操作
- 異步事件處理支持
- 自動管理資源釋放(通過IDisposable)
- 需引入第三方庫(System.Reactive)
- 學習曲線較陡
4.3.4 適用場景
- 適合復雜事件流處理
- 實時數據更新
- 多線程和異步編程場景
- 功能強大且與.NET生態系統無縫集成
- 優先選擇在需要復雜事件流處理的場景
五、使用場景
以下是觀察者模式不同實現方式的對比總結:
實現方式 | 適合場景 | 選擇建議 |
---|---|---|
經典實現 | 適用于簡單的事件通知場景,如 GUI 編程中組件間的交互。 | 當事件邏輯簡單、不需要復雜的數據流處理時,適合使用經典實現。 |
委托與事件實現 | 適用于.NET中的事件處理,尤其是需要在多個組件或類之間傳遞事件的場景。 | 如果使用的是.NET框架,并且需要在類之間傳遞事件,委托與事件是首選。 |
`IObservable<T>`/`IObserver<T>`實現 | 適用于需要靈活處理數據流的場景,如異步數據處理、多線程環境下的事件推送。 | 當需要更靈活地控制數據流,或者需要支持多個觀察者訂閱時,`IObservable<T>`/`IObserver<T>`是一個不錯的選擇。 |
`System.Reactive`實現 | 適用于復雜的數據流處理,尤其是需要對事件進行轉換、過濾、組合等操作的場景。 | 如果涉及復雜的數據流處理,或者需要響應式編程的支持,`System.Reactive`是最佳選擇。 |
六、擴展
6.1 初始化和基礎架構搭建
6.1.1 初始化和基礎架構搭建
6.1.1.1 一個觀察者觀察多個目標
-
場景:有時觀察者需要依賴多個目標對象。
-
問題:如果觀察者無法區分通知的來源,導致無法針對不同目標對象做出準確響應。
示例:假設一個用戶界面中有一個觀察者用于監控多個數據源(如溫度、濕度和空氣質量傳感器)。
當任何一個數據源更新時,觀察者都會收到通知,但無法區分是哪個數據源發生了變化,從而無法針對性地更新界面元素。
-
改動:對目標對象的
Update接口
進行擴展,確保觀察者能夠準確識別通知的來源
using System;
using System.Collections.Generic;public class Subject
{private List<IObserver> observers = new List<IObserver>();// 注冊觀察者public void Attach(IObserver observer){observers.Add(observer);}// 注銷觀察者public void Detach(IObserver observer){observers.Remove(observer);}// 通知所有觀察者public void NotifyObservers(string message){foreach (var observer in observers){observer.Update(this, message); // 將自己作為參數傳遞給觀察者}}
}
6.1.1.2 目標多而觀察者少
-
問題
在傳統觀察者模式中,目標對象(
Subject
)直接保存觀察者的引用。這種方式簡單直觀,但當目標對象數量多而觀察者數量少時,會產生明顯弊端:- 存儲開銷問題
每個目標對象都要分配存儲空間保存觀察者引用,即使某些目標對象沒有觀察者。
這會導致大量不必要的存儲開銷,尤其在目標對象數量龐大時,開銷更加顯著。
-
生命周期管理問題
如果目標對象的生命周期較短,而觀察者集合被強引用保留,可能會導致內存泄漏。
因為即使目標對象被垃圾回收,觀察者集合仍然占用內存,無法被清理。
-
解決方案
- 使用
HashSet<IObserver>
為了解決存儲開銷問題,可以考慮使用HashSet<IObserver>
來存儲觀察者。HashSet
提供了高效的動態添加和移除操作,能夠更好地支持觀察者在運行時的動態變化。
- 優點:
- 動態管理觀察者:
HashSet
提供高效的動態添加和移除操作,支持觀察者在運行時的動態變化。 - 避免重復存儲:
HashSet
自動去重,避免了重復存儲相同的觀察者。 - 提高存儲效率:通過哈希表實現快速查找和插入操作,減少了存儲和檢索觀察者的開銷。
- 動態管理觀察者:
- 缺點:
- 內存泄漏風險:如果目標對象被垃圾回收,但觀察者集合(
HashSet<IObserver>
)仍然存在引用,那么這些觀察者可能不會被正確清理,從而導致內存泄漏。 - 生命周期管理復雜:需要手動管理目標對象和觀察者集合的生命周期,確保在目標對象被銷毀時,觀察者集合也被正確清理。
- 內存泄漏風險:如果目標對象被垃圾回收,但觀察者集合(
- 使用
ConditionalWeakTable
ConditionalWeakTable
是一種特殊的哈希表,它允許鍵(目標對象)在沒有其他強引用時被垃圾回收,而不會影響值(觀察者集合)的存在。
通過ConditionalWeakTable
,只有真正有觀察者的目標對象才會占用存儲空間,同時避免了內存泄漏問題。
以下是使用ConditionalWeakTable
實現觀察者模式的示例代碼:
using System.Runtime.CompilerServices;
using System.Collections.Generic;// 目標和觀察者接口省略...public class Subject : ISubject
{private static readonly object _syncLock = new object();private static readonly ConditionalWeakTable<Subject, HashSet<IObserver>> observerMap = new ConditionalWeakTable<Subject, HashSet<IObserver>>();public void Attach(IObserver observer){lock (_syncLock){if (!observerMap.TryGetValue(this, out var observers)){observers = new HashSet();observerMap.Add(this, observers);}observers.Add(observer);}}public void Detach(IObserver observer){lock (_syncLock){if (observerMap.TryGetValue(this, out var observers)){observers.Remove(observer);if (observers.Count == 0){observerMap.Remove(this);}}}}public void NotifyObservers(string message){HashSet<IObserver> observersCopy;lock (_syncLock){if (!observerMap.TryGetValue(this, out var observers)){return;}observersCopy = new HashSet(observers);}foreach (var observer in observersCopy){observer.Update(this, message);}}
}
-
優點
- 避免內存泄漏:
ConditionalWeakTable
允許目標對象在沒有其他強引用時被垃圾回收,從而避免了內存泄漏問題。 - 動態管理:目標對象和觀察者之間的關系是動態的,
ConditionalWeakTable
提供了一種靈活的方式來管理這種關系。 - 優化存儲效率:只有真正有觀察者的目標對象才會占用存儲空間,減少了不必要的存儲開銷。
- 避免內存泄漏:
-
缺點
- 性能開銷:
ConditionalWeakTable
的查找和管理操作比直接使用HashSet<IObserver>
更復雜,可能會引入額外的性能開銷。 - 復雜性增加:代碼的復雜性增加,需要理解
ConditionalWeakTable
的工作機制。
- 性能開銷:
-
實際應用中的權衡
觀察者模式在實際應用中需要根據具體需求選擇合適的實現方式。
如果目標對象數量較多且生命周期較短,推薦使用
ConditionalWeakTable
,以避免內存泄漏并優化存儲效率。如果目標對象生命周期較長且觀察者管理較為簡單,則可以直接使用
HashSet<IObserver>
,以簡化實現和提高性能。
6.1.1.3 目標對象之間存在復雜依賴關系
-
問題
當目標對象存在復雜依賴關系時,直接通知觀察者可能引發以下問題:
- 多次更新:觀察者可能因多個目標對象的改變收到重復通知,導致冗余操作。
- 更新順序問題:目標對象的改變順序可能導致觀察者在狀態未完全更新時收到通知,獲取不一致的狀態。
- 維護成本高:復雜的依賴關系增加了代碼復雜性和維護難度。
-
解決方案
引入一個獨立的更改管理器(ChangeManager) 來封裝和管理復雜的更新邏輯。
-
更改管理器的作用
- 維護映射關系:管理目標對象與觀察者的映射,降低耦合度。
- 定義更新策略:在所有相關目標對象狀態更新完畢后統一通知觀察者,避免冗余和不一致問題。
- 優化更新邏輯:根據依賴關系優化更新流程,確保觀察者只接收一次更新。
-
更改管理器的兩種實現
是一個典型的中介者模式實例,通常以單例模式全局可見,從而確保整個系統中只有一個協調中心。兩種特殊的更改管理器實現:
-
SimpleChangeManager
-
實現方案
- 使用字典維護目標對象與觀察者的映射
- 使用臟標記集合跟蹤需要更新的目標對象
- Commit時統一通知觀察者并去重
-
示例
using System;using System.Collections.Generic;using System.Linq;#region Client Code// 使用簡單更改管理器ChangeManager.Instance = SimpleChangeManager.Instance;var subject = new ConcreteSubject();var observer = new ConcreteObserver();ChangeManager.Instance.Register(subject, observer);subject.State = 42;subject.Notify();ChangeManager.Instance.Commit();// 使用DAG更改管理器ChangeManager.Instance = DAGChangeManager.Instance;var subjectA = new ConcreteSubject();var subjectB = new ConcreteSubject();var dagObserver = new ConcreteObserver();ChangeManager.Instance.Register(subjectA, dagObserver);ChangeManager.Instance.Register(subjectB, dagObserver);((DAGChangeManager)ChangeManager.Instance).AddDependency(subjectA, subjectB);subjectA.State = 10;subjectB.State = 20;subjectA.Notify();subjectB.Notify();ChangeManager.Instance.Commit();#endregion#region IObserver// 觀察者接口public interface IObserver{void Update(ISubject subject);}#endregion#region IObservable// 目標對象接口public interface ISubject{void Notify();}#endregion#region ChangeManager// 更改管理器抽象類public abstract class ChangeManager{public static ChangeManager Instance { get; set; }public abstract void Register(ISubject subject, IObserver observer);public abstract void Unregister(ISubject subject, IObserver observer);public abstract void Notify(ISubject subject);public abstract void Commit();}#endregion#region SimpleChangeManager// 簡單更改管理器實現public sealed class SimpleChangeManager : ChangeManager{private readonly Dictionary> _observers = new();private readonly HashSet<ISubject> _dirtySubjects = new();static SimpleChangeManager(){}private static SimpleChangeManager _instance;public static SimpleChangeManager Instance{get{if (_instance == null){_instance = new SimpleChangeManager();}return _instance;}}public override void Register(ISubject subject, IObserver observer){if (!_observers.ContainsKey(subject)){_observers[subject] = new HashSet();}_observers[subject].Add(observer);}public override void Unregister(ISubject subject, IObserver observer){if (_observers.TryGetValue(subject, out var observers)){observers.Remove(observer);}}public override void Notify(ISubject subject){lock (_dirtySubjects){_dirtySubjects.Add(subject);}}public override void Commit(){HashSet<IObserver> notified = new();List<ISubject> toProcess;lock (_dirtySubjects){toProcess = _dirtySubjects.ToList();_dirtySubjects.Clear();}foreach (var subject in toProcess){if (_observers.TryGetValue(subject, out var observers)){foreach (var observer in observers.Where(observer => notified.Add(observer))){observer.Update(subject);}}}}}#endregion#region ConcreteSubject// 示例使用public class ConcreteSubject : ISubject{public int State { get; set; }public void Notify(){ChangeManager.Instance.Notify(this);}}#endregion#region ConcreteObserverpublic class ConcreteObserver : IObserver{public void Update(ISubject subject){if (subject is ConcreteSubject concreteSubject){Console.WriteLine($"Received update: {concreteSubject.State}");}}}#endregion``````c#Received update: 42
-
特點:總是更新每個目標對象的所有觀察者。實現簡單,易于理解。可能會導致冗余更新,效率較低。
-
適用場景:當目標對象之間沒有復雜的依賴關系,或者更新邏輯簡單時,這種實現方式比較合適。
-
DAGChangeManager
-
實現方案
-
繼承簡單管理器基礎功能
-
添加依賴關系管理
-
使用拓撲排序確保更新順序
1. 使用深度優先搜索(DFS)。
2. 先處理依賴項,再處理當前主題。
3. 最終得到的排序結果是一個線性順序,滿足所有依賴關系 -
示例
#region DAGChangeManager// 基于DAG的復雜更改管理器public sealed class DAGChangeManager : ChangeManager{private readonly Dictionary> _observers = new();private readonly Dictionary<ISubject, HashSet<ISubject>> _dependencies = new();private readonly HashSet<ISubject> _dirtySubjects = new();static DAGChangeManager(){}private static DAGChangeManager _instance;public static DAGChangeManager Instance{get{if (_instance == null){_instance = new DAGChangeManager();}return _instance;}}// 添加依賴關系(dependent 依賴于 dependency)public void AddDependency(ISubject dependent, ISubject dependency){if (!_dependencies.ContainsKey(dependent)){_dependencies[dependent] = new HashSet();}_dependencies[dependent].Add(dependency);}public override void Register(ISubject subject, IObserver observer){if (!_observers.ContainsKey(subject)){_observers[subject] = new HashSet();}_observers[subject].Add(observer);}public override void Unregister(ISubject subject, IObserver observer) {if (_observers.TryGetValue(subject, out var observers)){observers.Remove(observer);}}public override void Notify(ISubject subject){lock (_dirtySubjects){_dirtySubjects.Add(subject);}}public override void Commit(){List<ISubject> processingOrder;lock (_dirtySubjects){processingOrder = TopologicalSort(_dirtySubjects);_dirtySubjects.Clear();}HashSet<IObserver> notified = new();foreach (var subject in processingOrder){if (_observers.TryGetValue(subject, out var observers)){foreach (var observer in observers.Where(observer => notified.Add(observer))){observer.Update(subject);}}}}private List<ISubject> TopologicalSort(HashSet<ISubject> subjects){var sorted = new List<ISubject>();var visited = new HashSet<ISubject>();foreach (var subject in subjects.OrderBy(s => s.GetHashCode())){Visit(subject, visited, sorted);}return sorted;}private void Visit(ISubject subject, HashSet<ISubject> visited, List<ISubject> sorted){if (!visited.Add(subject)) return;if (_dependencies.TryGetValue(subject, out var dependencies)){foreach (var dependency in dependencies){Visit(dependency, visited, sorted);}}sorted.Add(subject);}}#endregion``````c#Received update: 20
- **特點**:處理目標對象及其觀察者之間依賴關系構成的**無環有向圖(DAG,Directed Acyclic Graph)**。
-
優點:可以避免冗余更新,確保觀察者只接收一次更新。
-
缺點:實現復雜度較高,需要維護依賴關系圖。
-
適用場景:當觀察者可能依賴多個目標對象,且目標對象之間存在復雜的依賴關系時,這種實現方式更好。
-
總結
更改管理器(ChangeManager)是一種優化機制,用于封裝復雜更新邏輯,簡化目標對象與觀察者之間的依賴關系。它通過以下方式實現優化:
- 職責分離:將映射關系和更新邏輯封裝到獨立對象中。
- 統一通知:在所有目標對象狀態更新完畢后,一次性通知觀察者。
- 優化策略:避免冗余更新。
更改管理器可以是簡單的SimpleChangeManager或復雜的DAGChangeManager,具體取決于系統需求。它通常以單例模式全局可見,作為系統的協調中心。
6.2 注冊機制
6.2.1 問題
在觀察者模式中,傳統的事件通知機制可能存在以下問題:
- 通知效率低:目標對象可能向所有觀察者發送通知,即使某些觀察者并不關心某些事件。
- 耦合度高:觀察者與目標對象之間的依賴關系較強,難以靈活調整。
- 缺乏靈活性:觀察者無法動態選擇關注的事件類型,難以適應復雜的應用場景。
這些問題可能導致系統性能下降,代碼難以維護和擴展。
6.2.2 解決方案
通過顯式注冊機制,觀察者可以明確指定其感興趣的事件類型,目標對象僅向已注冊的觀察者發送相關通知。
6.2.2.1 實現思路
- 引入“方面(Aspect)”概念:將目標對象的狀態變化分解為多個獨立的方面,每個方面代表一種特定類型的變更。
- 觀察者選擇性注冊:觀察者可以根據需要注冊對特定方面的興趣,從而只接收關注的事件通知。
- 目標對象優化通知:目標對象僅向已注冊特定方面的觀察者發送通知,避免不必要的消息傳遞。
6.2.2.2 示例
-
定義方面(Aspect)枚舉
方面(Aspect)枚舉:定義了目標對象可能的狀態變化類型,例如狀態變化、數據更新和錯誤發生。
// 定義方面(Aspect)枚舉,表示目標對象可能的狀態變化類型 public enum Aspect {StateChange,DataUpdate,ErrorOccurred }
-
目標對象類 Subject
- 使用字典存儲每個方面對應的觀察者列表。
- 提供注冊和取消注冊的方法,允許觀察者顯式指定感興趣的方面。
- 提供通知方法,僅向注冊了特定方面的觀察者發送通知。
// 目標對象類 public class Subject {// 用于存儲觀察者訂閱的方面private Dictionary>> observers = new Dictionary>>();public Subject(){// 初始化方面列表foreach (Aspect aspect in Enum.GetValues(typeof(Aspect))){observers[aspect] = new List>();}}// 注冊觀察者public void RegisterObserver(Aspect aspect, Action observer){observers[aspect].Add(observer);Console.WriteLine($"Observer registered for aspect: {aspect}");}// 取消注冊觀察者public void UnregisterObserver(Aspect aspect, Action observer){observers[aspect].Remove(observer);Console.WriteLine($"Observer unregistered from aspect: {aspect}");}// 通知觀察者public void NotifyObservers(Aspect aspect, string message){if (observers[aspect].Count > 0){Console.WriteLine($"Notifying observers for aspect: {aspect}");foreach (var observer in observers[aspect]){observer(message);}}else{Console.WriteLine($"No observers registered for aspect: {aspect}");}}// 模擬目標對象狀態變化public void ChangeState(){Console.WriteLine("Subject state changed.");NotifyObservers(Aspect.StateChange, "State has changed.");}public void UpdateData(){Console.WriteLine("Subject data updated.");NotifyObservers(Aspect.DataUpdate, "Data has been updated.");}public void ErrorOccurred(){Console.WriteLine("Error occurred in the subject.");NotifyObservers(Aspect.ErrorOccurred, "An error has occurred.");} }
-
觀察者類 Observer
- 包含多個回調方法,分別對應不同的方面。
- 觀察者可以根據需要注冊對特定方面的興趣。
// 觀察者類 public class Observer {private string name;public Observer(string name){this.name = name;}public void OnStateChange(string message){Console.WriteLine($"{name} received state change notification: {message}");}public void OnDataUpdate(string message){Console.WriteLine($"{name} received data update notification: {message}");}public void OnError(string message){Console.WriteLine($"{name} received error notification: {message}");} }
-
Client
- 創建目標對象和觀察者。
- 觀察者顯式注冊對特定方面的興趣。
- 模擬目標對象的狀態變化,觀察通知機制的運行。
// 測試程序 public class Program {public static void Main(){// 創建目標對象Subject subject = new Subject();// 創建觀察者Observer observer1 = new Observer("Observer1");Observer observer2 = new Observer("Observer2");// 觀察者1注冊對所有方面的興趣subject.RegisterObserver(Aspect.StateChange, observer1.OnStateChange);subject.RegisterObserver(Aspect.DataUpdate, observer1.OnDataUpdate);subject.RegisterObserver(Aspect.ErrorOccurred, observer1.OnError);// 觀察者2僅注冊對錯誤方面的興趣subject.RegisterObserver(Aspect.ErrorOccurred, observer2.OnError);// 模擬目標對象狀態變化subject.ChangeState();subject.UpdateData();subject.ErrorOccurred();// 觀察者2取消對錯誤方面的興趣subject.UnregisterObserver(Aspect.ErrorOccurred, observer2.OnError);// 再次觸發錯誤事件,觀察者2不再接收通知subject.ErrorOccurred();} }
-
結果
Observer registered for aspect: StateChange Observer registered for aspect: DataUpdate Observer registered for aspect: ErrorOccurred Observer registered for aspect: ErrorOccurred Subject state changed. Notifying observers for aspect: StateChange Observer1 received state change notification: State has changed. Subject data updated. Notifying observers for aspect: DataUpdate Observer1 received data update notification: Data has been updated. Error occurred in the subject. Notifying observers for aspect: ErrorOccurred Observer1 received error notification: An error has occurred. Observer2 received error notification: An error has occurred. Observer unregistered from aspect: ErrorOccurred Error occurred in the subject. Notifying observers for aspect: ErrorOccurred Observer1 received error notification: An error has occurred.
6.2.2.3 優點
- 提高通知效率:目標對象僅發送觀察者關心的事件。
- 降低耦合度:觀察者與目標對象之間保持松散耦合。
- 靈活性強:觀察者可根據需求動態調整關注的事件類型。
6.2.2.4 適用場景
這種方法通過事件驅動機制實現了高效的通信,適用于需要靈活配置和優化性能的場景,例如:
- 多用戶系統中對不同事件的關注。
- 復雜系統的狀態監控和事件響應。
- 需要動態調整事件監聽的應用。
6.3 觸發機制
6.3.1 觸發之前
在通知觀察者之前,目標對象的狀態必須是完整且正確的,否則觀察者可能會基于錯誤的狀態進行操作,從而引發問題。
6.3.2 更新的觸發者
目標對象與觀察者通過通知機制保持同步,存在以下兩種觸發方式:
- 自動觸發:目標對象狀態變化后自動執行
Notify
。優勢在于無需客戶手動操作,但可能因連續操作頻繁觸發更新,影響效率。 - 手動觸發:客戶在狀態變化完成后適時調用
Notify
。優點在于可避免不必要的中間更新,但增加了客戶操作負擔,且存在因遺漏調用而導致錯誤的風險。
6.3.3 示例
6.3.3.1 抽象目標對象
observers
:存儲觀察者的集合。mainState
和secondaryState
:主題的狀態信息。Attach
和Detach
:用于添加和移除觀察者。Notify
:抽象方法,由具體主題類實現,用于通知觀察者。PrepareUpdate
:用于準備狀態更新,但不觸發通知。ShowState
:用于打印當前狀態。
#region abstract Subject// 抽象目標對象
abstract class Subject
{protected HashSet<IObserver> observers = new HashSet<IObserver>();protected int mainState;protected int secondaryState;public void Attach(IObserver observer) => observers.Add(observer);public void Detach(IObserver observer) => observers.Remove(observer);public abstract void Notify();protected void PrepareUpdate(int main, int secondary){mainState = main;secondaryState = secondary;}public void ShowState(){Console.WriteLine($"State: [{mainState}, {secondaryState}]");}
}#endregion
6.3.3.2 觀察者接口
#region IObserver// 觀察者接口
interface IObserver
{void Update();
}#endregion
6.3.3.3 具體的的觀察者
#region ConcreteObservers// 具體觀察者
class StateObserver : IObserver
{private readonly Subject subject;public StateObserver(Subject subject){this.subject = subject;}public void Update(){Console.Write("Observer received update: ");subject.ShowState();}
}#endregion
6.3.3.4 自動觸發的目標對象
狀態發生改變時,自動觸發通知
#region AutoTriggerSubject// 自動觸發實現
class AutoTriggerSubject : Subject
{public void SetMainState(int value){mainState = value;Notify(); // 自動觸發}public override void Notify(){Console.WriteLine("[AutoTrigger] Notifying observers...");foreach (var observer in observers){observer.Update();}}
}#endregion
6.3.3.5 手動觸發的目標對象
#region ManualTriggerSubject// 手動觸發實現
class ManualTriggerSubject : Subject
{public void CompleteUpdate(int main, int secondary){PrepareUpdate(main, secondary);// 不自動觸發}public override void Notify(){Console.WriteLine("[ManualTrigger] Notifying observers...");foreach (var observer in observers){observer.Update();}}
}
#endregion
6.3.3.6 使用示例
#region Client Code// 自動觸發演示
Console.WriteLine("=== Automatic Trigger Demo ===");
var autoSubject = new AutoTriggerSubject();
var obs1 = new StateObserver(autoSubject);
autoSubject.Attach(obs1);autoSubject.SetMainState(10); // 觸發通知
autoSubject.SetMainState(20); // 再次觸發// 手動觸發演示
Console.WriteLine("\n=== Manual Trigger Demo ===");
var manualSubject = new ManualTriggerSubject();
var obs2 = new StateObserver(manualSubject);
manualSubject.Attach(obs2);manualSubject.CompleteUpdate(1, 100);
manualSubject.CompleteUpdate(2, 200);
manualSubject.CompleteUpdate(3, 300);
manualSubject.Notify(); // 單次觸發#endregion
結果
=== Automatic Trigger Demo ===
[AutoTrigger] Notifying observers...
Observer received update: State: [10, 0]
[AutoTrigger] Notifying observers...
Observer received update: State: [20, 0]=== Manual Trigger Demo ===
[ManualTrigger] Notifying observers...
Observer received update: State: [3, 300]
6.4 信息傳遞機制
觀察者模式中目標將這些信息作為Update操作的一個參數傳遞出去。這些信息的量可能很小,也可能很大。其信息量傳遞大小的兩個極端便是:推模型(Push Model)和拉模型(Pull Model)。
推模型:一種主動推送信息的機制,主題對象在狀態發生變化時,主動將包含具體變更信息的參數推送給所有觀察者,觀察者通過更新方法來接收這些信息。這種方式傳遞的是結構化數據,適用于需要接收完整、結構化數據且數據格式相對穩定的場景。
拉模型:一種按需獲取信息的機制,主題對象在狀態發生變化時,僅發送一個簡單的通知給觀察者,觀察者需要主動調用主題對象的查詢接口來獲取所需的信息。這種方式更加靈活,適用于觀察者需要不同數據子集或數據格式可能頻繁變化的場景。
6.4.1 解決方案與實現方式
6.4.1.1 推模型(Push Model)
-
實現原理
目標對象主動推送包含具體變更信息的參數至觀察者的更新方法,采用事件驅動機制傳遞結構化數據。
-
典型實現示例(氣象監測系統)
-
IObserver 接口
#region IObserver 接口 // 觀察者接口 public interface IObserver {void Update(WeatherData data); } #endregion
-
具體觀察者 :Display 類
#region 具體觀察者 :Display 類 // 具體觀察者 public class Display : IObserver {public void Update(WeatherData data){Console.WriteLine($"當前溫度:{data.Temperature}℃");} } #endregion
-
Subject 類: WeatherStation 類
SetMeasurements(float temp)
:設置新的溫度數據,并觸發通知。NotifyObservers(WeatherData data)
:遍歷觀察者列表,調用每個觀察者的Update
方法。
#region Subject 類: WeatherStation 類// 目標對象 public class WeatherStation {private List<IObserver> _observers = new List<IObserver>();private float _temperature;public void SetMeasurements(float temp){_temperature = temp;NotifyObservers(new WeatherData(temp));}private void NotifyObservers(WeatherData data){foreach (var observer in _observers){observer.Update(data);}}public void Attach(IObserver observer) => _observers.Add(observer);public void Detach(IObserver observer) => _observers.Remove(observer); }// 數據傳輸對象 public class WeatherData {public float Temperature { get; }public WeatherData(float temperature){Temperature = temperature;} }#endregion
-
Client
#region Client Code// 創建一個天氣站對象 WeatherStation weatherStation = new WeatherStation();// 創建兩個觀察者對象 Display display1 = new Display(); Display display2 = new Display();// 將觀察者添加到天氣站的觀察者列表 weatherStation.Attach(display1); weatherStation.Attach(display2);// 模擬天氣站更新數據 Console.WriteLine("天氣站更新溫度為 25.5℃:"); weatherStation.SetMeasurements(25.5f);// 移除一個觀察者 weatherStation.Detach(display1);// 再次更新數據 Console.WriteLine("\n天氣站更新溫度為 28.0℃:"); weatherStation.SetMeasurements(28.0f);Console.ReadLine();#endregion
結果
天氣站更新溫度為 25.5℃: 當前溫度:25.5℃ 當前溫度:25.5℃天氣站更新溫度為 28.0℃: 當前溫度:28℃
-
-
適用場景
- 觀察者需要接收完整、結構化的數據
- 數據格式相對穩定且預先可知
- 需要最小化觀察者的查詢操作
- 實時性要求高于通信成本
6.4.1.2 拉模型(Pull Model)
-
實現原理
目標對象僅發送簡單通知,觀察者主動調用目標對象的查詢接口獲取所需信息,實現按需獲取
-
典型實現示例
假設一個股票市場監控系統,目標對象是股票市場,觀察者是不同的投資者。當股票價格發生變化時,股票市場僅通知投資者價格發生了變化,而投資者需要主動查詢當前的股票價格。
-
目標和觀察者接口
#region Interface Code// 觀察者接口 public interface IObserver {void Update(); }// 目標接口 public interface ISubject {void Attach(IObserver observer);void Detach(IObserver observer);void Notify(); }#endregion
-
具體目標
#region Concrete Subject : StockMarket// 具體目標類 public class StockMarket : ISubject {private List<IObserver> observers = new List<IObserver>();private decimal stockPrice;public decimal StockPrice{get { return stockPrice; }set{stockPrice = value;Notify();}}public void Attach(IObserver observer){observers.Add(observer);}public void Detach(IObserver observer){observers.Remove(observer);}public void Notify(){foreach (var observer in observers){observer.Update();}} }#endregion
-
具體觀察者
通過
Update
方法接收通知,并從目標對象中獲取當前股票價格。#region Concrete Observer : Investor // 具體觀察者類 public class Investor : IObserver {private string name;private ISubject stockMarket;public Investor(string name, ISubject stockMarket){this.name = name;this.stockMarket = stockMarket;}public void Update(){decimal currentPrice = ((StockMarket)stockMarket).StockPrice;Console.WriteLine($"{name} received notification. Current stock price: {currentPrice:C}");} }#endregion
-
使用示例
#region Client Code// 創建股票市場對象 StockMarket stockMarket = new StockMarket();// 創建投資者 Investor investor1 = new Investor("Alice", stockMarket); Investor investor2 = new Investor("Bob", stockMarket);// 將投資者添加到股票市場的觀察者列表 stockMarket.Attach(investor1); stockMarket.Attach(investor2);// 模擬股票價格變化 Console.WriteLine("Stock price changes to $100."); stockMarket.StockPrice = 100;Console.WriteLine("Stock price changes to $120."); stockMarket.StockPrice = 120;// 移除一個投資者 stockMarket.Detach(investor1);Console.WriteLine("Stock price changes to $150."); stockMarket.StockPrice = 150;Console.ReadLine(); #endregion
結果
Stock price changes to $100. Alice received notification. Current stock price: ¥100.00 Bob received notification. Current stock price: ¥100.00 Stock price changes to $120. Alice received notification. Current stock price: ¥120.00 Bob received notification. Current stock price: ¥120.00 Stock price changes to $150. Bob received notification. Current stock price: ¥150.00
-
-
適用場景
- 適用于觀察者需要不同數據子集的情況。
- 數據格式可能頻繁變化,觀察者可以根據需要動態獲取數據。
- 通信成本不是主要瓶頸,因為觀察者主動查詢數據。
6.4.1.3 設計對比與權衡
維度 | 推模型 | 拉模型 |
---|---|---|
耦合度 | 高(需預知觀察者需求) | 低(觀察者自主決定獲取內容) |
通信效率 | 高(單次傳輸完整數據) | 低(需多次請求-響應) |
接口穩定性 | 要求高(參數結構需固定) | 要求低(僅需保持查詢接口) |
可擴展性 | 較差(新增觀察者類型需修改接口) | 較好(新觀察者可自主獲取數據) |
通過理解兩種模型的本質特征,可以根據具體業務需求、系統約束和演進方向,制定出最適合當前上下文的信息傳遞策略。
在分布式系統和微服務架構中,這種設計權衡往往直接影響系統的可維護性和擴展能力。
6.4.1.4 最佳實踐建議
-
混合模式設計
結合兩種模型的優勢,推送基礎變更通知,允許觀察者拉取補充信息。
-
觀察者接口
#region Interface Codepublic interface IObserver {void Update(int state); // 接收基礎狀態 }#endregion
-
具體目標
#region Subject public class Subject {private List<IObserver> observers = new List<IObserver>();private int state; // 被觀察者的狀態public int State{get { return state; }set{state = value;NotifyObservers(); // 狀態改變時通知觀察者}}// 注冊觀察者public void Attach(IObserver observer){observers.Add(observer);}// 移除觀察者public void Detach(IObserver observer){observers.Remove(observer);}// 通知觀察者public void NotifyObservers(){foreach (IObserver observer in observers){observer.Update(state); // 推送基礎狀態}} }#endregion
-
具體觀察者:ConcreteObserver
#region ConcreteObserverpublic class ConcreteObserver : IObserver {private Subject subject; // 持有被觀察者的引用public ConcreteObserver(Subject subject){this.subject = subject;subject.Attach(this); // 注冊到被觀察者}public void Update(int state){Console.WriteLine($"Received base state: {state}");// 根據需要拉取補充信息if (state > 10){int additionalInfo = subject.State; // 拉取補充信息Console.WriteLine($"Additional info: {additionalInfo}");}} }#endregion
-
使用示例:Client
#region Client CodeSubject subject = new Subject();// 創建觀察者并注冊到被觀察者 ConcreteObserver observer1 = new ConcreteObserver(subject); ConcreteObserver observer2 = new ConcreteObserver(subject);// 改變被觀察者的狀態 Console.WriteLine("Setting state to 5:"); subject.State = 5; // 輸出基礎狀態Console.WriteLine("\nSetting state to 15:"); subject.State = 15; // 輸出基礎狀態和補充信息#endregion
結果
Setting state to 5: Received base state: 5 Received base state: 5Setting state to 15: Received base state: 15 Additional info: 15 Received base state: 15 Additional info: 15
-
-
通信優化策略
實現思路
- 批量處理拉取請求:通過在
Subject
中維護一個隊列,將觀察者的拉取請求批量處理。 - 建立數據緩存機制:在
Subject
中緩存狀態信息,避免重復計算或重復拉取。 - 使用差異更新(delta update):僅推送狀態變化的部分,而不是完整狀態。
-
觀察者接口 IObserver
#region Interface Codepublic interface IObserver {void Update(int state); // 接收基礎狀態 }#endregion
-
目標對象 Subject
#region Subjectpublic class Subject {private List<IObserver> observers = new List<IObserver>();private int state; // 被觀察者的狀態private int previousState; // 上一次的狀態,用于差異更新public int State{get { return state; }set{if (state != value) // 檢查狀態是否變化{previousState = state; // 保存上一次狀態state = value;NotifyObservers(); // 狀態改變時通知觀察者}}}// 注冊觀察者public void Attach(IObserver observer){observers.Add(observer);}// 移除觀察者public void Detach(IObserver observer){observers.Remove(observer);}// 通知觀察者public void NotifyObservers(){foreach (IObserver observer in observers){observer.Update(state); // 推送基礎狀態}}// 提供拉取補充信息的接口public int GetAdditionalInfo(){// 模擬緩存機制:如果狀態未變,直接返回緩存值if (state == previousState){Console.WriteLine("Using cached additional info.");return previousState;}// 模擬拉取補充信息Console.WriteLine("Fetching additional info...");return state;} }#endregion
-
具體觀察者:ConcreteObserver
#region ConcreteObserverpublic class ConcreteObserver : IObserver {private Subject subject; // 持有被觀察者的引用public ConcreteObserver(Subject subject){this.subject = subject;subject.Attach(this); // 注冊到被觀察者}public void Update(int state){Console.WriteLine($"Received base state: {state}");// 根據需要拉取補充信息if (state > 10){int additionalInfo = subject.GetAdditionalInfo(); // 拉取補充信息Console.WriteLine($"Additional info: {additionalInfo}");}} }#endregion
-
使用示例 Client
#region Client CodeSubject subject = new Subject();// 創建觀察者并注冊到被觀察者 ConcreteObserver observer1 = new ConcreteObserver(subject); ConcreteObserver observer2 = new ConcreteObserver(subject);// 改變被觀察者的狀態 Console.WriteLine("Setting state to 5:"); subject.State = 5; // 輸出基礎狀態Console.WriteLine("\nSetting state to 15:"); subject.State = 15; // 輸出基礎狀態和補充信息Console.WriteLine("\nSetting state to 15 again (cached info will be used):"); subject.State = 15; // 使用緩存信息 #endregion
結果
Setting state to 5: Received base state: 5 Received base state: 5Setting state to 15: Received base state: 15 Fetching additional info... Additional info: 15 Received base state: 15 Fetching additional info... Additional info: 15Setting state to 15 again (cached info will be used):
- 批量處理拉取請求:通過在
6.5 資源管理和錯誤處理
避免已刪除目標的懸掛引用
刪除目標時,需確保其觀察者中不遺留對該目標的無效引用。
否則,當觀察者嘗試訪問已銷毀的目標時,可能會引發錯誤或異常,導致程序崩潰或行為不可預測。
為了避免這種情況,可以在目標對象被銷毀之前,主動通知所有觀察者解除對其的訂閱。
示例
-
目標接口和觀察者接口
#region Interface// 定義觀察者接口 public interface IObserver {void Update(string message);void Unsubscribe(); }// 定義目標接口 public interface ISubject {void Attach(IObserver observer);void Detach(IObserver observer);void Notify(string message); }#endregion
-
具體目標類 Subject
在目標對象被銷毀之前,通知所有觀察者解除訂閱
#region 定義具體目標類 : Subject// 定義具體目標(主題)類 public class Subject : ISubject {private List _observers = new List();public void Attach(IObserver observer){_observers.Add(observer);}public void Detach(IObserver observer){_observers.Remove(observer);}public void Notify(string message){foreach (var observer in _observers){observer.Update(message);}}// 在目標對象被銷毀之前,通知所有觀察者解除訂閱public void Dispose(){foreach (var observer in _observers){observer.Unsubscribe();}_observers.Clear();} }#endregion
-
具體觀察者類 ConcreteObserver
#region 具體觀察者類 : ConcreteObserver // 定義具體觀察者類 public class ConcreteObserver : IObserver {private string _name;private Subject _subject;public ConcreteObserver(string name, Subject subject){_name = name;_subject = subject;}public void Update(string message){Console.WriteLine($"{_name} received message: {message}");}public void Unsubscribe(){if (_subject != null){_subject.Detach(this);_subject = null;}} } #endregion
-
使用實例 Client Code
在銷毀目標對象之前,通知所有觀察者解除訂閱.
#region Client CodeSubject subject = new Subject();IObserver observer1 = new ConcreteObserver("observer 1", subject); IObserver observer2 = new ConcreteObserver("observer 2", subject);subject.Attach(observer1); subject.Attach(observer2);subject.Notify("Hello Observers!");// 在銷毀目標對象之前,通知所有觀察者解除訂閱 subject.Dispose();// 嘗試再次通知(應該不會有任何效果,因為觀察者已被移除) subject.Notify("This should not be received.");#endregion
結果
Observer 1 received message: Hello Observers! Observer 2 received message: Hello Observers!