Requestor?類是一個請求-響應管理器,負責發送請求并處理響應,支持三種交互模式:同步、異步和回調。它跟蹤所有發出的請求,當響應到達時將其匹配到對應的請求并進行處理。
newDescribe 函數解析
newDescribe?函數負責創建和注冊一個新的請求描述對象,它是請求-響應匹配系統的核心部分。
RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb) {rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}
功能詳解
- 創建請求記錄:為每個發出的請求創建一個跟蹤記錄
- 設置請求屬性:記錄請求類型和回調函數
- 注冊到映射表:將請求描述添加到全局映射表中
- 線程安全處理:確保多線程環境下操作安全
參數說明
- req:請求消息對象
- rtype:請求類型(同步、異步或回調)
- cb:可選的回調函數,默認為空
執行流程
- 加鎖保護共享數據?std::unique_lock<std::mutex> lock(_mutex)
- 創建請求描述對象?rd = std::make_shared<RequestDescribe>()
- 設置請求消息?rd->request?= req
- 設置請求類型?rd->rtype =?rtype
- 如果是回調類型且回調函數有效,則設置回調?rd->callback?= cb
- 將請求描述插入映射表?_request_desc.insert(std::make_pair(req->rid(), rd))
- 返回創建的請求描述對象
生活類比
想象這個函數是郵局的"寄件登記處":
顧客到達:你帶著一封信(請求消息)來到郵局
填寫單據:工作人員拿出一張跟蹤單(RequestDescribe)
記錄信息:
- 工作人員記錄你的信件信息(request)
- 標記你選擇的服務類型(rtype)
- 如果你選擇了通知服務,記錄你的電話號碼(callback)
系統登記:工作人員將跟蹤單輸入電腦系統(request_desc映射表)
單據存檔:跟蹤單一份交給你,一份存檔(返回rd)
getDescribe?函數解析
getDescribe?函數負責根據請求ID查找對應的請求描述對象,是響應處理過程中的關鍵環節。
RequestDescribe::ptr getDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()) {return RequestDescribe::ptr();}return it->second;}
功能詳解
- 查找請求記錄:根據請求ID在映射表中查找對應的請求描述
- 返回查詢結果:如果找到則返回請求描述對象,否則返回空指針
- 線程安全處理:確保多線程環境下操作安全
參數說明
- rid:請求ID,用于標識特定請求
執行流程
- 加鎖保護共享數據?std::unique_lock<std::mutex> lock(_mutex)
- 在映射表中查找指定ID的請求描述?it?= _request_desc.find(rid)
- 如果未找到(迭代器指向末尾),返回空指針
- 如果找到,返回對應的請求描述對象
生活類比
想象這個函數是郵局的"查詢窗口":
- 顧客查詢:當回信到達時,分揀員需要知道這封信是誰寄的
- 提供編號:分揀員拿著信封上的編號(rid)去查詢系統
- 系統查詢:工作人員在電腦系統中輸入編號(request_desc.find)
- 查詢結果:
- 如果找到記錄,系統顯示寄件人信息和服務類型(返回請求描述)
- 如果找不到記錄,系統顯示"未知寄件人"(返回空指針)
delDescribe?函數解析
delDescribe?函數負責從映射表中刪除指定請求ID的請求描述對象,是請求-響應流程完成后的清理步驟。
void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}
功能詳解
- 刪除請求記錄:從全局映射表中移除已處理完成的請求描述
- 資源釋放:釋放不再需要的請求描述對象占用的內存
- 線程安全處理:確保多線程環境下操作安全
參數說明
- rid:要刪除的請求ID
執行流程
- 加鎖保護共享數據?std::unique_lock<std::mutex> lock(_mutex)
- 從映射表中刪除指定ID的條目?_request_desc.erase(rid)
生活類比
想象這個函數是郵局的"歸檔清理"過程:
- 任務完成:當一封信的收發過程徹底完成后
- 記錄清理:工作人員將這封信的跟蹤單從活躍系統中移除
- 存檔或銷毀:跟蹤單可能被歸檔或直接銷毀(內存釋放)
- 系統更新:更新電腦系統,移除該編號的記錄(映射表刪除)
void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}
刪除的時候為什么要加鎖呢,是因為如果刪除之后再去刪除會造成空指針嗎?
加鎖的原因并不是為了防止刪除后再次刪除導致的空指針問題,而是為了解決多線程并發訪問帶來的數據競爭和一致性問題。主要原因有:
數據一致性保護:
- 在多線程環境中,可能同時有多個線程訪問和修改?_request_desc?映射表
- 如果一個線程正在讀取映射表時,另一個線程刪除了某個條目,可能導致第一個線程訪問無效內存
容器迭代器失效:
- 在 C++ 標準容器中,當進行修改操作(如 erase)時,可能導致迭代器失效
- 如果一個線程持有映射表的迭代器,而另一個線程調用 erase,可能導致第一個線程的迭代器變為無效
原子性保證:
- 映射表的修改操作需要保證原子性,即操作要么完全執行,要么完全不執行
- 沒有鎖保護,可能導致修改操作被部分執行,使映射表處于不一致狀態
與其他操作互斥:
- 同一個映射表上執行的增加、查找、刪除操作需要互斥進行
- 鎖確保在一個操作完成前,不會執行其他可能干擾的操作
生活類比:
想象郵局的記錄系統是一本大賬本,多個工作人員需要同時使用:
- 如果一個工作人員正在查看第5頁記錄
- 同時另一個工作人員撕掉了這一頁(刪除)
- 第一個工作人員會突然發現自己正在查看的頁面不見了
為了防止這種情況,郵局規定:任何人要修改賬本時,必須先取得"賬本鑰匙"(鎖),使用完畢后歸還鑰匙。這樣確保任何時候只有一個人能修改賬本,保證賬本內容的一致性和完整性。
所以,加鎖不是為了防止刪除后的空指針問題(erase對不存在的鍵是安全的),而是為了保護映射表在多線程環境下的一致性和完整性。
onResponse 函數解析
onResponse?函數是整個 Requestor 類的核心,它處理收到的響應消息并將其路由到對應的處理機制。
功能詳解
- 響應匹配:根據響應ID找到對應的請求描述
- 類型分發:根據請求類型選擇不同的處理方式
- 結果通知:通過不同機制通知等待者響應已到達
- 資源清理:處理完后刪除請求描述
參數說明
- conn:收到響應的連接對象
- msg:響應消息
執行流程
- 從響應消息獲取請求ID?rid = msg->rid()
- 查找對應的請求描述?rdp?= getDescribe(rid)
- 檢查是否找到請求描述,未找到則記錄錯誤并返回
- 根據請求類型進行處理:
- 異步請求:設置 promise 值?rdp->response.set_value(msg)
- 回調請求:檢查回調函數是否存在,然后調用?rdp->callback(msg)
- 其他類型:記錄未知類型錯誤
- 處理完成后刪除請求描述?delDescribe(rid)
生活類比
想象這個函數是郵局的"回信分揀處":
回信到達:郵局收到一封回信(響應消息)
查找記錄:工作人員查看回信編號(rid),在系統中查詢(getDescribe)
檢查記錄:
- 如果找不到記錄,記錄異常情況("無人認領的回信")
- 如果找到記錄,查看寄件人選擇的服務類型(rtype)
分發處理:
- 如果是"跟蹤號服務"(REQ_ASYNC),更新系統狀態,標記包裹已到達(set_value)
- 如果是"通知服務"(REQ_CALLBACK),打電話通知寄件人(callback)
- 如果是未知服務,記錄錯誤
清理記錄:處理完成后,從活躍系統中刪除該記錄(delDescribe)
異步 send 函數
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr) {ELOG("構造請求描述對象失敗!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}
功能詳解
- 異步發送請求并返回 future?對象
- 允許調用者在未來某個時間點獲取結果
執行流程
- 創建異步類型的請求描述?rdp = newDescribe(req, RType::REQ_ASYNC)
- 檢查創建是否成功
- 發送請求?conn->send(req)
- 獲取?future 對象?async_rsp = rdp->response.get_future()
生活類比
這就像快遞跟蹤服務:
- 你寄出包裹(發送請求)
- 快遞公司給你一個跟蹤號(future 對象)
- 你可以隨時通過跟蹤號查詢狀態
- 當你需要知道包裹是否送達時,查詢跟蹤號(調用 get() 或 wait())
?同步 send 函數
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false) {return false;}rsp = rsp_future.get();return true;}
功能詳解
- 同步發送請求并阻塞等待響應
- 簡化接口,對調用者隱藏異步細節
執行流程
- 創建臨時 future 對象?AsyncResponse rsp_future
- 調用異步版本的 send 函數
- 檢查發送是否成功
- 阻塞等待響應?rsp =?rsp_future.get()
生活類比
這就像窗口服務:
- 你去政務大廳辦理業務(發送請求)
- 工作人員讓你在窗口等著(阻塞等待)
- 你不能離開,直到業務辦完(響應返回)
- 辦完后工作人員直接把結果給你(設置 rsp)
回調式 send 函數
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr) {ELOG("構造請求描述對象失敗!");return false;}conn->send(req);return true;}
功能詳解
- 發送請求并注冊回調函數
- 當響應到達時自動調用回調函數
- 適合事件驅動編程模型
執行流程
- 創建回調類型的請求描述?rdp = newDescribe(req, RType::REQ_CALLBACK, cb)
- 檢查創建是否成功
- 發送請求?conn->send(req)
生活類比
這就像叫號服務:
- 你去餐廳點餐(發送請求)
- 服務員給你一個取餐號,告訴你"好了會叫你"(注冊回調)
- 你可以去做其他事(不阻塞)
- 餐點準備好時,廣播系統會叫你的號(回調函數被調用)
生活類比:郵政系統
想象?Requestor 是一個現代郵政系統,處理信件的發送和接收:
RequestDescribe(請求描述)= 郵件跟蹤單
當你寄出一封信時,郵局會給你一個跟蹤單,上面記錄了:
- 你寄出的信件內容(request)
- 你希望如何收到回復(rtype)
- 你的聯系方式(callback 或 response)
request_desc(請求映射表)=?郵局的跟蹤系統數據庫
郵局維護一個數據庫,記錄所有寄出的信件及其跟蹤單。當回信到達時,可以查詢這個數據庫找到對應的寄件人信息。
三種 send 方法?= 三種不同的郵寄服務
同步 send(等待型服務)
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
就像你去郵局寄信并在窗口等待回復。你不會離開,直到收到回信。
異步 send(跟蹤號服務)
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
你寄信后得到一個訂單號,可以隨時查詢進度,但不必一直等待。
回調 send(通知服務)
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
你寄信時留下電話號碼,郵局承諾收到回信時會打電話通知你。
onResponse 方法?= 郵局的分揀員
void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg)
當回信到達郵局時,分揀員查看信封上的編號,找到對應的跟蹤單,然后根據寄件人選擇的服務類型進行處理:- 如果是等待型服務,通知窗口的工作人員- 如果是跟蹤號服務,更新系統狀態-?如果是通知服務,打電話給寄件人
線程安全機制 = 郵局的工作流程規范
std::unique_lock<std::mutex> lock(_mutex);
就像郵局有嚴格的工作流程,確保多個工作人員不會同時修改同一條記錄,避免混亂。
完整流程類比
同步調用流程
1. 你(調用者)去郵局(Requestor)寄一封信(請求)
2. 郵局記錄你的信件信息,并給你一個號碼牌(請求ID)
3. 你坐在等候區,等待回信(阻塞等待)
4. 回信到達后,分揀員找到你的號碼牌,通知你取信(設置結果)
5. 你收到回信,離開郵局(函數返回)
異步調用流程
1. 你去郵局寄信,但不想等待
2. 郵局給你一個跟蹤號(future)
3. 你離開郵局,去做其他事情(非阻塞)
4. 當你需要查看結果時,使用跟蹤號查詢(future.get())
5. 如果回信已到,立即獲得;如果未到,等待直到到達
回調調用流程
1. 你去郵局寄信,留下電話號碼(回調函數)
2. 郵局記錄你的聯系方式,你離開去做其他事
3. 回信到達時,郵局根據記錄的電話號碼通知你(調用回調)
4.?你收到通知,了解回信內容(在回調中處理結果)這個系統確保了無論你選擇哪種服務方式,你的信件和回復都能可靠地處理,而且多個用戶的請求不會混淆。
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>
#include <functional>namespace bitrpc {namespace client {class Requestor {public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;struct RequestDescribe {using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;RType rtype;std::promise<BaseMessage::ptr> response;RequestCallback callback;};void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){std::string rid = msg->rid();RequestDescribe::ptr rdp = getDescribe(rid);if (rdp.get() == nullptr) {ELOG("收到響應 - %s,但是未找到對應的請求描述!", rid.c_str());return;}if (rdp->rtype == RType::REQ_ASYNC) {rdp->response.set_value(msg);}else if (rdp->rtype == RType::REQ_CALLBACK){if (rdp->callback) rdp->callback(msg);}else {ELOG("請求類型未知!!");}delDescribe(rid);}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr) {ELOG("構造請求描述對象失敗!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false) {return false;}rsp = rsp_future.get();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr) {ELOG("構造請求描述對象失敗!");return false;}conn->send(req);return true;}private:RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb) {rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}RequestDescribe::ptr getDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()) {return RequestDescribe::ptr();}return it->second;}void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;};}
}
完整代碼
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>
#include <functional>namespace bitrpc {namespace client {class Requestor {public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;struct RequestDescribe {using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;RType rtype;std::promise<BaseMessage::ptr> response;RequestCallback callback;};void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){std::string rid = msg->rid();RequestDescribe::ptr rdp = getDescribe(rid);if (rdp.get() == nullptr) {ELOG("收到響應 - %s,但是未找到對應的請求描述!", rid.c_str());return;}if (rdp->rtype == RType::REQ_ASYNC) {rdp->response.set_value(msg);}else if (rdp->rtype == RType::REQ_CALLBACK){if (rdp->callback) rdp->callback(msg);}else {ELOG("請求類型未知!!");}delDescribe(rid);}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr) {ELOG("構造請求描述對象失敗!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false) {return false;}rsp = rsp_future.get();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr) {ELOG("構造請求描述對象失敗!");return false;}conn->send(req);return true;}private:RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb) {rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}RequestDescribe::ptr getDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()) {return RequestDescribe::ptr();}return it->second;}void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;};}
}