registryMethod?函數詳解:
函數目的
registryMethod?是?Provider?類的核心方法,用于向服務注冊中心注冊服務。注冊成功后,服務注冊中心會更新內部的服務映射表,建立服務名稱到提供者地址的映射關系。
執行流程示例
場景: 多米諾注冊芝士披薩服務
// 多米諾披薩店注冊芝士披薩服務
Provider::ptr provider = std::make_shared<Provider>(requestor);
BaseConnection::ptr conn = ConnectionFactory::createConnection("127.0.0.1", 8000);
Address pizzaHost("192.168.1.101", 9001);
bool success = provider->registryMethod(conn, "芝士披薩", pizzaHost);
- 創建服務注冊請求消息:
auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod("芝士披薩");msg_req->setHost({"192.168.1.101", 9001});msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);
發送請求并等待響應:
BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);
服務注冊中心處理請求(內部過程,不在當前函數中):
// 服務注冊中心內部的處理邏輯void PDManager::onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {if (msg->optype() == ServiceOptype::SERVICE_REGISTRY) {std::string method = msg->method();Address host = msg->host();// 更新映射表:將服務方法與提供者關聯_providers->addProvider(conn, host, method);// 映射表變化:// 之前: _method_providers["芝士披薩"] = [...]// 之后: _method_providers["芝士披薩"] = [..., {"192.168.1.101", 9001}]// 通知所有關注此服務的客戶端_discoverers->onlineNotify(method, host);// 發送響應ServiceResponse::ptr rsp = MessageFactory::create<ServiceResponse>();rsp->setRcode(RCode::RCODE_OK);conn->sendMessage(rsp);}// ...其他操作類型處理...}
處理響應:
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("響應類型向下轉換失敗!");return false;}
檢查響應狀態
if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服務注冊失敗,原因:%s", errReason(service_rsp->rcode()).c_str());return false;}
注冊成功,映射表更新完成:
return true;
映射表變化詳解
- 注冊前的映射表狀態:
_method_providers = {"夏威夷披薩": [{"192.168.1.102", 9001}],"素食披薩": [{"192.168.1.103", 9001}]}
注冊后的映射表狀態:
_method_providers = {"夏威夷披薩": [{"192.168.1.102", 9001}],"素食披薩": [{"192.168.1.103", 9001}],"芝士披薩": [{"192.168.1.101", 9001}] // 新增映射}
如果是已有服務新增提供者:
// 如果另一家披薩店也注冊提供芝士披薩_method_providers = {"夏威夷披薩": [{"192.168.1.102", 9001}],"素食披薩": [{"192.168.1.103", 9001}],"芝士披薩": [{"192.168.1.101", 9001}, {"192.168.1.104", 9002}] // 添加新提供者}
注冊成功后,服務注冊中心還會通過?onlineNotify?通知所有關注"芝士披薩"服務的客戶端,有新的服務提供者上線,客戶端的?Discoverer?會更新本地緩存的服務提供者列表。
生活中的類比
新餐廳在美食外賣平臺注冊
想象一下,registryMethod?就像一家新開的餐廳(服務提供者)在美團或餓了么這樣的外賣平臺(服務注冊中心)上注冊自己的特色菜品(服務)。
場景:小王的川菜館注冊麻婆豆腐服務
現實生活中的流程:
小王開了一家川菜館,想在外賣平臺上注冊提供麻婆豆腐。
小王登錄外賣平臺商家端,填寫注冊表單:
- 店鋪名稱:小王川菜館
- 地址:北京市海淀區中關村大街128號
- 特色菜品:麻婆豆腐
- 聯系電話:010-12345678
小王提交注冊申請,等待平臺審核。
外賣平臺處理注冊請求:
- 驗證小王提供的信息
- 將"麻婆豆腐"與"小王川菜館"關聯起來
- 更新平臺的菜品目錄
注冊成功,平臺向小王發送確認消息。
平臺更新用戶端,現在用戶搜索"麻婆豆腐"時,可以看到小王川菜館。
代碼中對應的流程
// 小王川菜館注冊麻婆豆腐服務
Provider::ptr provider = std::make_shared<Provider>(requestor);
BaseConnection::ptr conn = ConnectionFactory::createConnection("delivery-platform.com", 8000);
Address restaurantAddress("北京市海淀區中關村大街128號", 12345678); // 地址和電話
bool success = provider->registryMethod(conn, "麻婆豆腐", restaurantAddress);
映射表變化:
注冊前的外賣平臺菜品目錄:
菜品目錄 = {"宮保雞丁": ["老張川菜館", "蜀香小館"],"水煮魚": ["蜀香小館", "渝味軒"]}
注冊后的外賣平臺菜品目錄
菜品目錄 = {"宮保雞丁": ["老張川菜館", "蜀香小館"],"水煮魚": ["蜀香小館", "渝味軒"],"麻婆豆腐": ["小王川菜館"] // 新增映射}
用戶體驗變化:
- 之前用戶搜索"麻婆豆腐"時,找不到提供這道菜的餐廳
- 注冊成功后,用戶搜索"麻婆豆腐"時,會看到"小王川菜館"
- 平臺會向已經收藏"麻婆豆腐"的用戶推送通知:"小王川菜館現在提供麻婆豆腐啦!"
MethodHost?類詳解:
類目的
MethodHost?類是一個服務地址管理器,負責存儲和管理能夠提供特定服務的所有主機地址,并提供負載均衡功能。它是服務發現機制中的核心組件,用于維護服務提供者列表并支持動態更新。
方法詳解與執行流程
場景1: 麻辣燙服務新增提供者
當"老王麻辣燙"店鋪上線,開始提供麻辣燙服務:
// 服務注冊中心收到上線通知后
MethodHost::ptr methodHost = _method_hosts["麻辣燙"];
methodHost->appendHost({"192.168.1.105", 8080}); // 老王麻辣燙的地址
執行步驟:
- 獲取互斥鎖保護共享數據:std::unique_lock<std::mutex> lock(_mutex)
- 添加新主機到列表:_hosts.push_back({"192.168.1.105",?8080})
映射表變化
之前: _hosts = [{"192.168.1.101", 8080}] // 只有小李麻辣燙之后: _hosts = [{"192.168.1.101", 8080}, {"192.168.1.105", 8080}] // 增加了老王麻辣燙
場景2:?麻辣燙服務提供者下線
當"小李麻辣燙"暫停營業,需要從服務列表中移除:
// 服務注冊中心檢測到連接斷開后
MethodHost::ptr methodHost = _method_hosts["麻辣燙"];
methodHost->removeHost({"192.168.1.101", 8080}); // 小李麻辣燙的地址
執行步驟:
- 獲取互斥鎖保護共享數據:std::unique_lock<std::mutex> lock(_mutex)
- 遍歷主機列表查找匹配地址:
for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {if (*it == {"192.168.1.101", 8080}) {_hosts.erase(it);break;}}
映射表變化:
之前: _hosts = [{"192.168.1.101", 8080}, {"192.168.1.105", 8080}]之后: _hosts = [{"192.168.1.105", 8080}] // 只剩下老王麻辣燙
- 現在客戶端調用chooseHost()時,只會返回老王麻辣燙的地址
生活中的例子
想象一個外賣平臺的"麻辣燙"分類頁面:
初始狀態:
- 頁面顯示3家麻辣燙店鋪:小李、老王、小張
- 平臺后臺維護一個麻辣燙店鋪列表:[小李,?老王,?小張]
新店開業(appendHost):
- 小趙開了一家新麻辣燙店并在平臺注冊
- 平臺將小趙的店添加到列表:[小李, 老王, 小張, 小趙]
- 用戶刷新頁面,現在能看到4家麻辣燙店鋪
店鋪暫停營業(removeHost):
- 小張的店因裝修暫停營業
- 平臺將小張的店從列表移除:[小李,?老王,?小趙]
- 用戶刷新頁面,現在只能看到3家麻辣燙店鋪
負載均衡(chooseHost):
- 當用戶點擊"隨機推薦一家"按鈕時
- 平臺會輪流推薦列表中的店鋪,確保每家店獲得平等的展示機會
MethodHost類就像這個平臺的后臺管理系統,動態維護服務提供者列表,并在客戶端請求時提供負載均衡的選擇。
Discoverer?類詳解:
類目的
Discoverer?類是RPC框架中的服務發現組件,負責查找和管理可用的服務提供者。它的主要職責是:
- 發現并獲取能提供特定服務的主機地址
- 緩存服務提供者信息,提高查詢效率
- 處理服務上線和下線通知,動態更新服務提供者列表
- 提供負載均衡功能,在多個服務提供者之間分配請求
核心功能
服務發現(serviceDiscovery方法):
- 查找能提供特定服務的主機
- 先檢查本地緩存,如無則向服務注冊中心查詢
- 將查詢結果緩存起來供后續使用
服務狀態變更處理(onServiceRequest方法):
- 處理服務上線通知,添加新的服務提供者
- 處理服務下線通知,移除不可用的服務提供者
- 調用下線回調函數處理相關資源清理
生活中的例子
Discoverer類就像一個智能外賣APP:
查找服務(serviceDiscovery):
- 用戶想點一份特定菜品(如"麻辣香鍋")
- APP先檢查緩存,看是否有最近瀏覽過的提供該菜品的餐廳
- 如果沒有,則向服務器查詢提供"麻辣香鍋"的餐廳列表
- 獲取列表后,選擇一家餐廳(負載均衡)并緩存整個列表
處理餐廳狀態變更(onServiceRequest):
- 新餐廳上線:APP收到通知"新餐廳'川香閣'開始提供麻辣香鍋",更新列表
- 餐廳下線:APP收到通知"'香辣坊'暫停營業",從列表中移除并取消相關訂單
關鍵組件
_method_hosts:服務方法到主機列表的映射表
- 鍵:服務方法名(如"用戶認證"、"支付處理")
- 值:提供該服務的主機列表(MethodHost對象)
_requestor:負責發送請求和接收響應的組件
_offline_callback:服務下線時的回調函數,用于處理資源清理
_mutex:互斥鎖,保護在多線程環境下對共享數據的訪問
總結
Discoverer類是分布式系統中服務發現的核心組件,它使客戶端能夠:
- 動態發現可用的服務提供者
- 在多個提供者之間實現負載均衡
- 實時感知服務提供者的變化
- 通過本地緩存提高查詢效率
這使得系統具有高可用性、可擴展性和彈性,服務提供者可以動態加入或離開系統,而客戶端能夠自動適應這些變化。
serviceDiscovery?方法詳解:
方法目的
serviceDiscovery?是?Discoverer?類的核心方法,用于發現和獲取能提供特定服務的主機地址。它首先檢查本地緩存,如果沒有找到則向服務注冊中心發起查詢,并將結果緩存起來以供后續使用。
執行流程示例
場景: 用戶查找附近的奶茶店
想象一個用戶想要點一杯奶茶,使用APP查找附近提供奶茶服務的店鋪:
// 用戶APP初始化
Discoverer::ptr discoverer = std::make_shared<Discoverer>(requestor, offlineCallback);
BaseConnection::ptr conn = ConnectionFactory::createConnection("service-center.com", 8000);
Address milkTeaShop;
bool found = discoverer->serviceDiscovery(conn, "奶茶", milkTeaShop);
執行步驟:
- 檢查本地緩存:
{std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find("奶茶");if (it != _method_hosts.end()) {if (it->second->empty() == false) {milkTeaShop = it->second->chooseHost();return true; // 成功找到緩存的奶茶店地址}}}
- 如果APP之前已經查詢過奶茶店,會直接從緩存返回一個地址
- 類似用戶打開APP時,顯示上次瀏覽過的奶茶店
緩存未命中,創建服務發現請求:
auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod("奶茶");msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);
- 類似用戶在APP搜索框輸入"奶茶"并點擊搜索
- APP向服務器發送查詢請求
發送請求并等待響應:
BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("服務發現失敗!");return false;}
- APP向服務中心發送請求
- 如果網絡問題導致請求失敗,顯示"搜索失敗,請檢查網絡"
處理響應:
auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (!service_rsp) {ELOG("服務發現失敗!響應類型轉換失敗!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服務發現失敗!%s", errReason(service_rsp->rcode()).c_str());return false;}
- 檢查服務器返回的響應是否有效
- 類似APP收到服務器響應后檢查數據格式是否正確
更新本地緩存并返回結果:
std::unique_lock<std::mutex> lock(_mutex);auto method_host = std::make_shared<MethodHost>(service_rsp->hosts());if (method_host->empty()) {ELOG("%s 服務發現失敗!沒有能夠提供服務的主機!", method.c_str());return false;}milkTeaShop = method_host->chooseHost();_method_hosts["奶茶"] = method_host;return true;
- 將服務器返回的奶茶店列表保存到本地緩存
- 從列表中選擇一家奶茶店返回給用戶
- 下次用戶再查詢奶茶店時可以直接從緩存獲取
生活中的例子
想象小明想喝奶茶,但不知道附近有哪些奶茶店:
小明先檢查記憶(檢查本地緩存):
- 小明回憶上次是在"一點點"買的奶茶
- 如果記得路線,他可以直接去那里(緩存命中)
- 如果忘記了或想嘗試新店,需要查詢(緩存未命中)
小明問路人(發送服務發現請求):
- "請問附近有奶茶店嗎?"
- 等待路人回答(等待響應)
路人回答(處理響應):
- "有啊,附近有'一點點'、'CoCo'和'喜茶'"
- 小明記下這些店的位置(更新緩存)
選擇一家店(chooseHost):
- 小明決定去"CoCo"(負載均衡選擇)
- 下次想喝奶茶時,他已經知道三家店的位置(緩存有效)
這個方法通過本地緩存優化了服務發現的效率,避免了每次都需要向服務中心查詢,同時通過動態更新保證了服務信息的及時性。
onServiceRequest?方法詳解:
方法目的
onServiceRequest?是?Discoverer?類的方法,用于處理服務狀態變更通知,包括服務上線和下線。它是服務動態發現機制的核心,確保客戶端能夠及時感知服務提供者的變化。
執行流程示例
場景1: 新的披薩店開始提供服務
想象一個新的披薩店"意式風情"開業,開始提供披薩服務:
// 服務注冊中心向所有關注披薩服務的客戶端發送通知
ServiceRequest::ptr msg = MessageFactory::create<ServiceRequest>();
msg->setMethod("披薩");
msg->setHost({"192.168.1.110", 8080}); // 意式風情披薩店地址
msg->setOptype(ServiceOptype::SERVICE_ONLINE);// 對每個關注披薩服務的客戶端
discoverer->onServiceRequest(conn, msg);
執行步驟:
判斷操作類型:
auto optype = msg->optype(); // SERVICE_ONLINEstd::string method = msg->method(); // "披薩"
- 確認這是一個服務上線通知
- 獲取服務方法名稱為"披薩"
加鎖保護共享數據:
std::unique_lock<std::mutex> lock(_mutex);
- 確保在多線程環境下安全訪問和修改數據
處理上線通知:
if (optype == ServiceOptype::SERVICE_ONLINE) {auto it = _method_hosts.find("披薩");if (it == _method_hosts.end()) {// 如果這是第一家提供披薩服務的店auto method_host = std::make_shared<MethodHost>();method_host->appendHost({"192.168.1.110", 8080});_method_hosts["披薩"] = method_host;} else {// 如果已有其他披薩店,將新店添加到列表it->second->appendHost({"192.168.1.110", 8080});}}
- 檢查是否已有披薩服務的提供者列表
- 如果沒有,創建新列表并添加"意式風情"披薩店
- 如果已有,將"意式風情"披薩店添加到現有列表
更新后的映射表:
_method_hosts = {"漢堡": [...],"披薩": [..., {"192.168.1.110", 8080}] // 添加了新的披薩店}
場景2: 披薩店暫停營業
想象"意式風情"披薩店因設備故障暫停營業:
// 服務注冊中心向所有關注披薩服務的客戶端發送通知
ServiceRequest::ptr msg = MessageFactory::create<ServiceRequest>();
msg->setMethod("披薩");
msg->setHost({"192.168.1.110", 8080});
msg->setOptype(ServiceOptype::SERVICE_OFFLINE);// 對每個關注披薩服務的客戶端
discoverer->onServiceRequest(conn, msg);
執行步驟:
判斷操作類型:
auto optype = msg->optype(); // SERVICE_OFFLINEstd::string method = msg->method(); // "披薩"
- 確認這是一個服務下線通知
- 獲取服務方法名稱為"披薩"
加鎖保護共享數據:
std::unique_lock<std::mutex> lock(_mutex);
處理下線通知:
if (optype == ServiceOptype::SERVICE_OFFLINE) {auto it = _method_hosts.find("披薩");if (it == _method_hosts.end()) {return; // 沒有找到披薩服務提供者列表,直接返回}it->second->removeHost({"192.168.1.110", 8080}); // 從列表中移除_offline_callback({"192.168.1.110", 8080}); // 調用下線回調}
- 查找披薩服務的提供者列表
- 從列表中移除"意式風情"披薩店
- 調用下線回調函數,可能用于清理與該店相關的連接或緩存
_method_hosts = {"漢堡": [...],"披薩": [...] // 移除了"意式風情"披薩店}
生活中的例子
想象一個使用外賣APP的用戶:
新餐廳上線通知(SERVICE_ONLINE):
- 用戶訂閱了披薩類別的推送
- 一家新的披薩店"意式風情"在APP上線
- APP推送通知:"新店上線!意式風情披薩店開業啦"
- APP更新披薩分類頁面,添加新店信息
- 用戶下次瀏覽披薩分類時,能看到這家新店
餐廳下線通知(SERVICE_OFFLINE):
- "意式風情"披薩店因設備故障暫停營業
- APP收到下線通知
- APP更新披薩分類頁面,移除該店或標記為"暫停營業"
- 如果用戶之前有未完成的訂單,APP可能會通知用戶:"您的訂單餐廳暫停營業,建議取消訂單"(類似_offline_callback的功能)
這個方法確保了客戶端能夠實時感知服務提供者的變化,提供了動態服務發現的能力,使系統更加靈活和可靠。