項目流程和架構設計
1.服務端的功能:
? ? ? ? 1.提供rpc調用對應的函數
? ? ? ? 2.完成服務注冊 服務發現 上線/下線通知
? ? ? ? 3.提供主題的操作 (創建/刪除/訂閱/取消訂閱) 消息的發布
2.服務的模塊劃分
? ? ? ? 1.網絡通信模塊 net 底層套用的moude庫
? ? ? ? 2.應用層通信協議模塊 1.序列化 反序列化數據 解決tcp中的粘包問題。2.客戶端/服務端對消息進行處理,就需要設置一個oMessage回調函數對收到的數據進行應用層協議處理。
????????3.消息分發模塊? Dispatcher,收到消息 根據不同的消息類型去調用不同的回調函數
? ? ? ? 4.遠程調用路由模塊 RpcRouter 提供Rpc請求的處理回調函數? 并返回執行完的結果
? ? ? ? 5.服務注冊/發現模塊 Register—Discovery 針對 服務請求進行處理
? ? ? ? 6.發布訂閱模塊 Publish--subcribel 針對發布訂閱請求進行處理 并提供一個回調函數給Dis模塊
設計項目原因
對底層網絡通信和分布式架構感興趣,為了對客戶端-服務端通信機制更了解 就實現了這個支持注冊中心 發布訂閱 異步調用功能的JsonRPC框架。
RPC 調用流程簡要總結(簡潔專業版)
客戶端通過統一接口call進行rpc調用,把函數名method+參數(參數名:值)封裝進RpcRequest并用JSON:Value格式存儲,再進行序列化把JOSN:Value轉化為string對象,最后通過應用層通信協議LVProtocol添加上報頭,通過TCP發送到服務端。
rpc服務端接收到數據后,先根據LV協議從緩沖區中獲取完整的報文,去掉報頭 獲取正文body字段,再進行反序列化轉為JSON:Value格式,根據里面的method字段 路由找本地對應的服務描述對象 進行參數檢測 沒有問題進行函數調用,返回的結果也是JSON:Value類型,序列化 添加報頭 TCP返回給客戶端。
一、項目背景與整體架構
你這個 RPC 框架項目的初衷是什么?你為什么要做這個項目?
主要是想了解一下網絡通信和分布式架構,為了對服務端-客戶端的通信機制更了解就實現了這個rpc項目。
你的 RPC 框架支持哪些核心功能模塊?請簡要介紹每個模塊的作用和之間的協作流程。
主要有三個大的功能,1.rpc調用模塊 2.服務發現/注冊模塊 3.主題發布與訂閱模塊。
1.rpc模塊對內本地維護了一張表,函數名method對應服務描述對象(包含method 參數類型 返回值類型? 回調函數 參數的檢測check()),給Dispathcer消息派發模塊提供處理rpc請求的回調函數,根據請求中的method函數中找到對應的服務描述對象進行調用,并返回結果應答。
2.服務發現/注冊模塊 整個rpc的流程不是客戶端直接向服務端發送rpc請求。而是1.服務提供者也就是rpc服務端先向注冊中心發送注冊請求,讓注冊中心知道我能提供什么服務。2.客戶端要先向注冊中心發起服務發現請求,注冊中心再返回能提供該服務的主機地址列表,這里會有一個輪詢服務 來選擇其中一個主機地址 讓客戶端進行rpc調用。給Dispatcher提供服務發現/注冊請求的回調處理函數。
3.主題操作模塊? 我們這個rpc框架會提供關于主題的操作,我們可以創建主題/取消主題/訂閱/取消訂閱 主題消息發布。對內會維護好兩種表,1.map<topic_name,topic>主題名 主題間的映射 2.map<conn,subcribe>連接? 訂閱者,這樣就知道每個主題的訂閱者有哪些,每個訂閱者的訂閱主題有哪些。進行主題消息發布時,根據topic中的訂閱者列表給訂閱者主動推送消息進行廣播。給Disathcer提供回調函數處理主題操作/發布請求
你為什么選擇自定義 JSON-RPC 協議?和 gRPC 或 Thrift 相比,你的設計有什么優勢或不足?
用JSON的話,JSON反序列化和序列化操作更簡單 其他的沒有了解過。
二、客戶端與服務端機制
客戶端是如何發起一個 RPC 調用的?整個請求從發起到響應的流程是怎樣的?
1.首先要先獲取rpc服務端的地址,有兩種情況1.通過服務發現進行獲取服務端地址。具體流程是 rpc客戶端里面是有個服務發現客戶端向注冊中心進行服務發現,注冊中心返回能提供該服務的主機地址,服務發現客戶端再根據負載均衡策略 也就簡單的輪詢 選擇其中的一個給rpc客戶端進行rpc調用。2.或者rpc不啟動服務發現,直接傳入服務端地址 進行rpc調用。
2.創建服務端和客戶端連接,先從連接池中有沒有對應的連接,有獲取 沒有創建連接,進行rpc調用,結束后放入連接池。
3.創建rpc請求并發送,rpc客戶端給外部提供call函數 里面是通過Requestor模塊的send()函數進行發送rpc請求的。
4.Dispatcher模塊進行消息派發,Dispathcer模塊消息回調函數onMessage 獲取消息,根據消息的類型,進行派發調用對應服務端提供的回調函數,也就是rpc服務端rpc_router向Dispatcher模塊注冊的onRpcRequest()函數.
5.rpc服務端進行本地路由,根據請求消息里面的method函數 找對應的服務描述對象,進行參數檢測并調用,生成結果響應并返回。
6.Dispatcher派發應答,Requstor請求消息發送的模塊提供onResponse()處理返回的應答,根據消息唯一的ID找對應的請求消息,并設置結果/進行回調。
你在服務端如何注冊一個服務方法?具體在哪個模塊處理服務注冊?
1.首先rpc服務端模塊 里面會包含一個服務注冊的客戶端,當服務端本地新上線了一個服務 除了本地進行記錄,還得向服務注冊中心通過服務注冊客戶端發送服務注冊消息。
2.服務注冊中心會像Dispathcer模塊注冊onServiceRequest()回調函數處理服務注冊/發現的請求消息,后續操作就是跟新該服務的服務提供者列表,并給發現過該服務的發現者進行服務上線通知。
客戶端怎么實現“同步調用”?你是怎么保證調用阻塞等待返回的?用了哪些 C++ 特性?
1.rpc客戶端會提供3中call函數 其中同步call里面創建msg消息并調用Requestor模塊中的同步send,但這個同步send里面就是調用異步send+外部等待get()進行阻塞,等結果響應返回過來 在請求描述中的promise對象進行set_value設置結果后 外部get()解除阻塞?從而達到同步的效果。
2.這里我是采用了promise+future來完成阻塞等待返回的,future對象get()進行阻塞等待 直到promise對象set_value才會停止阻塞。
3.同步send()調用異步send()體現函數重載 C++多態特性
三、消息處理與分發機制
Dispatcher 模塊的作用是什么?你是如何實現“消息類型到回調函數”的映射的?
1.Dispathcer模塊 簡單來說是消息分發處理模塊,根據消息的類型調用對應模塊注冊的回調函數進行派發處理。
2.這里我們用一共哈希表map完成消息類型到回調函數的映射的,但這里有兩中處理方法。
第一種,就是map的value值就是存儲的回調函數,但這就需要注冊的所有回調函數類型相同才能統一存儲在map中。回調函數的參數都是兩個,1.BaseConnect::ptr連接可以基類接收 2.消息類型 當然也可以用BaseMessage::ptr基類接收,但實際上傳入的對象的對應的子類對象。這樣雖然可以用map統一管理,但我們必須在回調函數中進行判斷傳入的消息對象是不是對應的子類類型,我們必須要去猜是不是,如果不是需要怎么處理,后續新增了其它消息子類是不是還需要更改邏輯處理,這明顯不符合開閉原則。
這里項目中采用的第二種方法,即第二個參數就是用對應子類對象指針來接收。但這樣函數類型不一樣,還這么統一管理?
這里我是通過map存儲一個同一個父類指針,通過虛函數調用機制,調用對應的子類對象 里面的回調函數。我先創建一個父類 里面有一個虛函數,用模板類根據消息類型創建對應的子類,這些子類繼承于父類 并重寫父類里面的虛函數,寫入自己回調函數的處理邏輯。
簡單來說就是 多態(虛函數調用機制)+模板類+繼承 讓map統一管理不同類型的回調函數
你是怎么實現類型安全的消息派發機制的?為啥不用函數指針而是用多態+模板?
各個模塊在創建時 向Dispatcher模塊注冊處理消息的回調函數 傳入的參數有1.消息類型 2.回調函數。Dispatcher里面通過模板函數根據消息類型 生成不同的子類,里面重寫的虛函數再調用傳入的回調函數。在map存儲子類對象 用它們的父類統一接收。
后面調用的時候,根據類型找到map存儲的父類指針根據虛函數調用機制 調用對應子類重寫的虛函數 完成回調處理。
為什么不用函數指針,主要還是不想讓回調函數用BaseMessgae::ptr接收子類對象,接收了里面還得判斷 把父類指針dynamic_pointer_cast轉換為子類指針,如果轉換失敗了 會返回空指針,需額外判斷處理。
JsonRequest 和 JsonResponse 有哪些子類?它們的繼承結構設計的初衷是什么?
1.JsonRequest 請求類,有三個子類 分別對應三個主要功能。1.RpcRequest rpc調用請求
? ? ? ? 2.ServiceRequest 服務請求類 3.TopicRequest 消息請求類.
2.同理JsonResponse 應答類,也根據功能分為三個。
這樣根據父類生成子類,主要還是讓一些共同需要的部分放在基類中,子類再根據自己需要來自己實現函數 成員變量。比如說應答類都需要rcode響應碼,就可以放在響應基類中 check()檢測響應字段是否正確。如果子類需要根據自己定義的字段實現check()也可以重寫。
這樣的好處 新增子類類型只需繼承父類并實現即可,符合開閉原則。整體的結構也更清晰。
四、注冊中心與服務發現機制
服務注冊中心是如何工作的?服務端是如何注冊自己的服務信息的?
注冊中心PDManager可以分為兩個部分,1.ProviderManager服務提供者管理 2.DiscovererManager服務發現者管理。完成服務注冊 服務發現 服務上線/下線操作。
1.Pro服務提供者管理 有兩張表map<method,vector<Pro>>服務方法能被哪些服務提供者提供,map<conn,Pro>連接到提供者。Pro里面有自己能提哪些方法的列表vector<method>。
2.Dis服務發現者管理 也是有兩種表map<method,vector<Dis>>該method方法被哪些發現者發現了,map<conn,Dis>連接到發現者。Dis里面有自己發現過哪些方法的列表vector<method>
根據這些結構 ,并給Dispatcher模塊注冊onServiceRequset()回調函數,處理服務發現/注冊請求消息。
1.服務注冊(conn給注冊中心說自己能提供method方法)?根據conn從map<conn,Pro>找提供者+map<method,vector<Pro>>新增method服務提供者+Pro提供者內部新增提供方法+服務上線通知 給發現過該method方法的發現者進行通知+返回注冊成功應答
? ? ? ? 1.根據conn從map<conn,Pro>找提供者:先找連接的提供者,如果沒有就新建并加入map中
? ? ? ? 2.map<method,vector<Pro>>新增服務提供者:從map<method,vector<Pro>>方法對應的提供者列表中新增服務者
? ? ? ? 3.Pro提供者內部新增方法:對Pro里面的方法提供列表vector<method>新增方法
? ? ? ? 4.服務上線通知:Dis服務發現者中1.從map<method,Dis>找該方法被哪些發現者發現了,并從每個Dis發現者客戶端發送服務上線通知
? ? ? ? 5.給服務注冊客戶端 發送注冊成功應答
2.服務發現(conn想發現能提供method方法的提供者地址) 根據conn從map<conn,Dis>找發現者+map<method,vector<Dis>>新增method服務發現者+Dis發現者內部新增發現的方法+返回應答(包含該method方法提供者地址)
? ? ? ? 1.根據conn從map<conn,Dis>找發現者:先找連接的發現者,如果沒有就新建并加入map中
? ? ? ? 2.map<method,vector<Dis>>新增method服務發現者:從map<method,vector<Dis>>方法對應的發現者列表中新增發現者
? ? ? ? 3.Dis發現者內部新增發現的方法:對Dis里面的方法發現列表vector<method>新增發現
? ? ? ? 4.返回應答(包含該method方法提供者地址):根據method從服務提供者管理的map<method,vector<Pro>>找到能提供該method方法的方法提供者,從中獲取host地址 組成應答并返回。
3.連接斷開(連接斷開的回調函數) 分為提供者斷開 發現者斷開
? ? ? ? 1.提供者斷開:提供者的所有方法都要下線 根據發現者管理中map<method,vector<Dis>>找每個method方法的發現者,進行服務下線通知。最后刪除提供者管理中map<conn,Pro>服務提供者的管理。
? ? ? ? 2.發現者斷開:發現者斷開連接不需要通知任何人,直接在發現者管理中map<conn,Dis>刪除對發現者的管理就行。
如果服務端宕機,注冊中心會怎么處理?你做了哪些“心跳檢測”或“服務下線”機制?
針對這種情況,我的項目中沒有實現相關的處理,但我可以對這種情況處理提個思路,心跳檢測機制。
服務端每隔一段時間(5s)會給注冊中心發送心跳包?表示自己還可以提供服務,注冊中心記錄每個服務端最后一次心跳時間,由定時任務(或事件循環)周期性檢查,如果超過一定時間(10s)沒上報心跳 就將服務端視為異常,注冊中心就會斷開連接 觸發連接斷開回調,1.服務端提供的每個method中的提供者列表去除該提供者 2.并刪除map<conn,Pro>對該提供者的管理 3.最后給發現該提供者的方法的客戶端發送服務下線的通知。
客戶端如何進行服務發現?請求發送到注冊中心返回的是什么信息?
服務發現客戶端 構建服務發現請求消息并通過Requestor模塊中同步send進行發送,Dispatcher模塊根據消息類型調用 注冊中心注冊的回調函數onServiceRequest()處理服務發現請求。
請求發送到注冊中心返回的是客戶端進行發現method方法的提供者host地址列表,后面通過輪詢的負載均衡策略選擇一個給客戶端。
五、主題發布/訂閱模塊(Pub/Sub)
你的框架實現了“主題發布/訂閱”機制,這一塊和傳統 RPC 調用有什么區別?
1.通信方式:傳統的RPC調用 一般是一個服務端對應一個客戶端,而主題消息發布是一個服務端對應多個客戶端 對訂閱該主題的所有客戶端進行廣播。
2.控制方向:傳統RPC是客戶端端主動發送請求消息,而服務端是被動返回應答的。而在主題信息發布中服務端是接收到消息發布請求,對訂閱主題的客戶端是主動進行發送消息。
3.同步 耦合:相比傳統的同步RPC請求-應答模型,我們的主題發布/訂閱是基于事件驅動模型,異步非阻塞 客戶端不需要阻塞等待響應,由服務端主動推送。解耦性強 發布者不需要知道訂閱者是誰,只需要向主題服務端發布消息就可以。
對比維度 傳統 RPC 調用 主題發布/訂閱 控制方向 客戶端主動請求 服務端主動推送 通信方式 一對一 一對多 同步性 同步阻塞等待 異步事件驅動 耦合性 緊耦合 松耦合 適用場景 精準調用,事務請求 廣播、推送、訂閱通知 實現關鍵 請求封裝 + 回調映射 主題管理 + 回調路由
多個客戶端訂閱同一個主題,你是如何處理每個客戶端的回調函數的?
首先客戶端訂閱一個主題,服務端接收主題消息發布 向訂閱者客戶端進行廣播主動發送消息,如何處理消息需要客戶端自己定義。也就是訂閱者客戶端會向Dispatcher模塊注冊onPublish()回調函數處理服務端主動推送的消息,客戶端本地會有哈希表map<topic_name,cb>維護主題名到對應回調函數的映射,讓本地對應的回調函數處理推送的消息。
簡單來說,就是每個客戶端本地會維護好一張哈希表 主題名->該主題回調函數的映射。
客戶端發布主題消息,服務端是如何進行廣播的?如何避免回調未注冊問題?
1.服務端接收到主題發布請求消息,會對所有訂閱該主題的訂閱者客戶端進行廣播。就要先找到訂閱該主題的所有訂閱者,這就要先說明主題服務端的兩種表和兩個結構體。
? ? ? ? 1.訂閱者列表map<conn,subcriber::ptr>,對訂閱者進行管理。結構體subcriber訂閱者中有訂閱的主題名稱列表vector<topic_name>.
? ? ? ? 2.主題列表map<topic_name,topic::ptr>,對主題進行管理。結構體topic主題中有訂閱該主題的訂閱者列表vetor<sub>
提供主題列表map<topic_name,topic>找到對應主題,topic里面就有對應該主題的所有訂閱者,遍歷訂閱者列表直接通過底層send進行發送消息。
2.對應服務端主動推送過來的主題消息發布,既要在Dispathcer模塊注冊onPuhlish回調接收,還有在本地進行路由調用對應主題的回調消息進行處理。如果我們在訂閱后,再注冊本地對應主題的處理函數,有可能剛訂閱了主題立馬推送過來消息,但本地的主題處理函數還沒注冊成功,導致找不到回調函數處理該主題消息。
所以必須在訂閱主題前,注冊主題回調處理函數。如果訂閱失敗可以del刪除剛注冊的回調函數。
六、網絡與協議設計
你使用了 muduo 網絡庫,它的 Reactor 模型你理解嗎?客戶端和服務端的 loop 有什么區別?
Reactor 模型是一種基于事件驅動的并發編程模型,它通過事件分發器(如 epoll)監聽所有 socket 的 IO 事件,當某個 socket 可讀/寫時,調用事先注冊的回調函數進行處理。提供一種 非阻塞、事件驅動 的結構,用一個或少量線程高效處理大量并發連接;
Muduo 是一個基于 Reactor 模型的高性能 C++ 網絡庫
1.muduo網絡庫中服務端中用的是主從Reactor模型,主Reactor是專門監聽連接事件的,有客戶端想要建立連接 就accpet()獲取創建連接并返回套接字socket,后面就直接交給從Reactor,監聽連接后續的IO事件。在客戶端中也有一個 Reactor(即 EventLoop),用于管理發送請求、接收響應、處理推送等事件。
2.服務端和客戶端的loop最大區別是是否在前臺阻塞循環運行。1.服務端是直接把loop線程放在前臺循環運行的 2.而客戶端是后臺運行loop,因為客戶端需要主動發起連接、請求、注冊回調函數等操作,具體操作的執行和返回結果處理都是在循環線程中處理的。如果loop循環線程放在客戶端前臺,就會導致阻塞 將影響主線程邏輯。而服務端就是為了處理客戶端發送的消息,可以循環運行。
你如何解決 TCP 粘包問題?數據接收后如何區分一條完整的 JSON 消息?
針對TCP粘包問題,我是自己定義了應用層通信協議LVProtocol,采用定長頭部 + 變長正文的形式。具體來說是 固定頭部由三個4字節字段組成:消息長度、類型、ID長度,后兩個變長字段 消息ID+body正文。
接收數據后我們會放在緩沖區中,通過訪問前4字節獲取該報文的長度 再對比當前緩沖區存儲數據的大小,如果緩沖區存儲數據大小>該報文長度,就認為緩沖區中至少有一條完整消息。
你在客戶端和服務端的協議抽象層(如 BaseProtocol)里做了哪些處理?
BaseProtocol里面實現了接口的統一,這樣上層具象層不需要管底層實現是怎么樣的,直接用BaseProtocol提供的統一接口就可以 具體接口有3個 1.canProcessed判斷緩沖區是否有完整報文 2.onMessage從緩沖區中解析完整報文(去報頭+反序列化) 3.serialize 準備應答(序列化+添報頭)。BaseProtocol底層具體實現是自定義實現的LVProtocol協議,后續換其它協議只需實現對應的 BaseProtocol 子類即可,不會影響上層代碼 符合開閉原則。
七、擴展性與項目亮點
你這個框架的擴展性在哪里體現?如果后續想加負載均衡或限流模塊,怎么接入?
我的項目解耦性好,新增業務時 不會影響原有業務,把消息的回調處理函數注冊給Dispatcher模塊就可以,新增消息類型 繼承父類并實現自己需要的字段/函數就行。
這個不了解。
標準回答:
一、框架擴展性的體現
我的框架在架構設計上非常注重模塊解耦與可擴展性,主要體現在以下幾個方面:
? 1. Dispatcher 模塊實現了消息分發中心
所有模塊之間不直接調用彼此的邏輯,而是通過 Dispatcher 注冊和派發消息;
每類消息有唯一的
mtype
類型編號;每個模塊通過
dispatcher.registerHandler<MessageType>(type, callback)
注冊自己的消息處理函數;新增功能只需要:
定義新的消息類型(繼承 JsonMessage);
實現處理邏輯;
注冊到 Dispatcher 即可,無需改動原有代碼。
📌 體現了開閉原則(對擴展開放,對修改封閉)。
? 2. 消息繼承體系保證擴展性和統一性
所有消息統一繼承自
BaseMessage -> JsonMessage
;不同功能子模塊(RPC、服務注冊、發布訂閱)都基于該結構;
如果未來要增加新的子系統(如監控、配置下發),只需要繼承 JsonMessage 并實現新的 Request/Response 即可;
統一的編碼/解碼流程由
BaseProtocol
層抽象,新增協議或更換底層實現對業務層完全無感。
二、如何擴展限流、負載均衡模塊?
? 1. 負載均衡模塊擴展方式
客戶端發起 RPC 請求前,會先調用服務發現模塊獲取服務地址列表(如 3 個提供者 IP)。
當前默認是 輪詢策略,在
RpcClient
模塊中選擇一個服務端。擴展方式:
定義一個策略接口類(如
class LoadBalancer { virtual Address select(const vector<Address>&) = 0; }
);實現不同策略子類(如輪詢、最小連接數、權重 hash);
在
RpcClient::call()
或RpcClient::getConnection()
內注入該策略類;實際運行時按配置動態切換策略,甚至可熱更新。
? 體現策略模式 + 運行時可插拔能力
? 2. 限流模塊擴展方式
限流屬于“請求前過濾控制”,目標是防止系統過載,可通過以下幾種方式接入:
接入點:在客戶端發送請求前(即
Requestor::send()
之前)判斷是否允許發送;實現方式:
可以通過一個
RateLimiter
類控制 QPS;支持令牌桶、漏桶算法;
拒絕請求時立即返回錯誤消息;
服務端也可以接入限流模塊,在 Dispatcher 注冊的
onRpcRequest()
中判斷當前負載情況是否超出閾值,做保護性熔斷。
你在整個開發過程中,最有成就感或最難解決的問題是什么?你是怎么解決的?
1.異步生命周期管理 + 2.客戶端連接資源管理
1.Rpc調用模塊的客戶端中會提供 異步future調用call函數,promise異步設置結果 上層future get()獲取結果。在call函數內部用make_shared創建promise對象并用shared_ptr指針進行管理,用輸出行參數result返回和promise管理的future對象給上層。由Requestor的send()發送請求并對請求描述對象進行管理。等服務端處理返回應答時,Requestor模塊接收到響應并找到請求描述對象,取出里面promise進行set_value設置結果,上層future就緒get()就可以獲取結果了。
但問題在于我們的shared_ptr<promise<JSON::Value>>對象是一個局部對象,出了作用域就會析構,后面回調函數給promise對象設置結果時,promise對象為空,這樣上層永遠獲取不到結果。
針對這種情況,我采用的是bind值綁定+外部保存資源 延長生命周期。bind綁定指向promise對象的shared_ptr指針 。本質就是 讓一個生命周期更長的shared_ptr指向promise對象,引用計數不為0 promise對象就不會析構,從而達到延長生命周期的效果。
這里要注意兩點 1.bind值綁定 2.外部拷貝保存資源
1.bind不要用引用綁定,如果是引用綁定shared_ptr可能會出現懸垂引用,外部還沒進行保存就析構了。
2.bind值綁定并不會延長promise對象的生命周期,必須在外部保存一份資源 讓一個生命周期更長的shared_ptr指向promise才能延長生命周期。具體就是在Requestor里面保存的請求描述對象中拷貝了promise的shared指針。
因此我們的異步futuer調用call函數不是走異步furtuer的send函數,而是走異步回調函數。應答返回過來時,調用bind綁定的回調函數 在里面set_value設置結果,而不是直接設置。
2.關于客戶端連接 選擇短鏈接還是長連接
一開始我是選擇短鏈接的方案,即客戶端調用完就關閉。但這就會有一個問題,在rpc調用的時候rpc客戶端用Requestor模塊send發送完消息 rpc調用結束后,該客戶端就會析構。但后面服務端返回應答,Requestor模塊的回調函數中雖然找到請求描述對象 設置結果,但客戶端沒了后續怎么get()獲取結果呢?回調函數中設置的結果也就沒有意義了。
所以在我的項目中 我選擇的方案是長連接,客戶端調用完時,連接也不會進行析構。連接不會析構,我們就需要對這些客戶端連接進行管理。這里我是用連接池對客戶端連接進行管理的,這樣的好處是下次再調用時 相同的話可以復用連接,客戶端進行rpc調用時 獲取到目標主機的地址,不是直接建立于目標主機服務端的連接,而是在連接池中找有沒有連接可以進行復用,沒有再創建并加入連接池中。后續服務提供者斷開連接了,處理服務下線的回調函數中會調用_offline_cb刪除連接池中的客戶端連接。
客戶端發起rpc調用時 把連接加入連接池,客戶端處理服務下線時會刪除連接池中對應的連接。
綜合以上,無論是功能實現還是效率上,選擇長連接都是更合適的。
1.能支持異步調用 get()獲取結果。
2.可以從連接池中復用之前的連接? 不用頻繁的創建和釋放連接,提高效率。
? ? ? ? 如果像短連接頻繁創建釋放,一是會浪費資源 降低效率。二是如果客戶端頻繁進行rpc調用,客戶端連接會出現大量TIME_WAIT狀態 占用端口號不釋放(等待2MSL時間后才會完全關閉 釋放端口號),導致端口號資源耗盡 無法向服務端建立起連接。
項目拓展
1.發布/訂閱模塊如果先啟動發布者,再啟動訂閱者,能否收到消息?怎么做才能讓收到消息?
不能收到消息
因為目前的做法是當我們發布一條消息到服務器之后,服務器會遍歷所有的訂閱者,將這條消息轉發給訂閱者,此時訂閱者列表為空,當訂閱者啟動訂閱的時候,這條消息已經沒有了,本質上是因為我們在服務器沒有對消息進行持久化存儲。
每個 Topic 綁定一個消息隊列,進行消息發布時進行入隊操作。每個sub訂閱者維護一個消費位置offset 表示讀到第幾條消息了。進行訂閱時 服務端會訂閱者的offset開始從隊列中取出所有未消費的消息,主動推送給訂閱者,topic主題中還保存每個訂閱者的offset(map<conn,offset>) 推送消息出隊列時更新,訂閱者接收到消息時更新offset,這樣服務端客戶端都保存offset。
簡單來說:服務端topic用一個隊列存儲主題消息,發布時入隊,訂閱時出隊(移動offset表示邏輯出隊列)。
2.為什么要有心跳檢測機制?
因為rpc調用時 服務端可能會出現故障不再處理請求消息,但注冊中心還會認為服務端存在 就導致發送到該主機的請求全部失敗或超時。引入心跳檢測機制,讓服務端每各一定時間就向注冊中心發送消息 表示我還在。注冊中心維護一張表 記錄服務端也就是服務提供者的狀況,如果有服務端長時間沒有發送心跳檢測消息還沒斷開,注冊中心就會認為該服務端異常 斷開連接,并給對應發現者進行服務下線通知 以及刪除對該服務的管理。
在實際的 RPC 框架運行過程中,服務端可能會因故障、死循環、崩潰等原因停止處理請求,但:
TCP 連接在底層不會立即斷開;
注冊中心仍然認為該服務節點“在線”;
導致路由模塊持續將請求發送到失效節點,最終全部失敗或超時,嚴重影響系統可用性。
解決方案:引入服務心跳機制
我們設計了服務端心跳機制,具體邏輯如下:
服務提供者(服務端)定時向注冊中心發送心跳消息,表明自己仍在線且可服務;
注冊中心維護一張服務狀態表,記錄每個服務節點的最后心跳時間;
若某個服務節點超過一定時間(如 15 秒)未發送心跳,注冊中心將其判定為“異常離線”;
注冊中心會立即通知所有訂閱者(發現者)該服務已下線,并從服務發現表中移除該節點,避免繼續路由請求到故障節點;
若后續該服務恢復上線,重新注冊后即可恢復服務。