八、客戶端模塊的實現
客戶端實現的總體框架
在 RabbitMQ 中,應用層提供消息服務的核心實體是 信道(Channel)。
用戶想要與消息隊列服務器交互時,通常不會直接操作底層的 TCP 連接,而是通過信道來進行各種消息的發布、訂閱、確認等操作。
信道可以看作是在單個 TCP 連接之上的輕量級虛擬連接,它負責封裝具體的協議細節,屏蔽了網絡通信的復雜性。
用戶只需要調用信道提供的接口,發送消息或接收消息,無需關心底層數據如何傳輸、協議如何實現。
簡單來說,用戶面向的是信道服務接口,而信道背后處理了連接管理、數據編解碼、協議交互等工作,實現了業務與網絡通信的解耦。
客戶端設計視角和服務器視角的對比
方面 | 客戶端視角(信道) | 服務端視角(連接+信道) |
---|
主要關注點? | 調用信道接口完成消息操作 | 管理連接和信道,執行底層協議與業務邏輯 |
抽象層級? ? | 抽象出具體網絡和協議細節 | 解析信道請求,實現消息路由和持久化 |
資源管理? ?? | 不關心連接具體實現,只用信道 | 管理物理連接、信道狀態及相關資源 |
多路復用? ?? | 多信道復用同一連接,簡化調用 | 維護多信道,保證隔離與并發性能 |
用戶責任? ? ? | 只需調用信道接口? ? ? ? ? ? ? ? ? ? ? ? ?? | 處理協議解析、消息調度、資源分配 |
- 連接模塊
客戶端和服務端進行網絡連接的基礎。
一個直接面向用戶的模塊,內部包含多個對外提供服務的接口,用戶需要什么服務進行調用對應的接口即可,其中包含交換機的聲明/刪除,隊列的聲明與刪除,隊列的綁定與解綁,消息的發布與訂閱,訂閱和解除訂閱。
表達了客戶端與服務器之間在消息隊列系統中協作的流程
在仿 RabbitMQ 的消息隊列系統中,客戶端首先通過訂閱者模塊注冊自身的消費者身份,并指定對應的消息處理回調函數;隨后通過信道模塊在單一的 TCP 連接上實現多路復用,創建多個邏輯信道以并行處理不同的消息服務(如發布、訂閱、隊列管理等)。客戶端通過連接模塊建立與服務器的連接,并在信道中發起具體的請求服務。
服務器接收到連接請求后,由服務器端的連接管理器創建連接上下文,并根據信道中攜帶的請求類型,路由到對應的處理模塊(如交換機、隊列或消息模塊),執行相應的業務邏輯。處理結果再由服務器的異步線程池將數據封裝好并通過網絡返回給客戶端,客戶端的異步事件機制或線程池再觸發對應的回調函數完成消息消費流程。
基于以上模塊實現客戶端的思路就非常清晰了
1、實例化異步線程對象
2、實例化連接對象
3、通過連接對象進行創建信道
4、根據信道進行獲取自己的所需服務
5、關閉信道
6、關閉連接
8.1、訂閱者模塊
訂閱者對象的設計
一個不向用戶進行直接展示的模塊,在客戶端中進行體現的作用就是對角色的描述,表示這就是一個消費者,用戶通常不直接操作訂閱邏輯,而是通過定義“消費者”的方式進行消息處理邏輯的注冊。
一個信道只有一個訂閱者,所以說不需要進行訂閱者的管理。訂閱者這個模塊很簡單,沒有涉及到一些業務模塊的內容,業務模塊的服務都在信道模塊進行提供。
訂閱者模塊(消費者)這個類中成員變量的設計
- 首先需要定義消費者ID,描述了收到該條消息后該如何進行對這條消息進行處理。
- 其次是要進行訂閱的哪個隊列的ID和自動刪除標志,描述了收到消息后是否需要對消息進行回復,是否要進行自動刪除。
- 最后是回調函數,描述了從隊列中進行獲取消息后應該如何進行處理,這部分由用戶進行決定。
訂閱者模塊的實現
using ConsumerCallback = std::function<void(const std::string, const ys::BasicProperties *bp, const std::string)>;struct Consumer{using ptr=std::shared_ptr<Consumer>;std::string _cid;std::string _qname;bool _auto_ack;ConsumerCallback _callback;Consumer(){DLOG("new Consumer:%p",this);}Consumer(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &cb): _cid(cid), _qname(qname), _auto_ack(auto_ack), _callback(std::move(cb)){DLOG("new Consumer:%p",this);}~Consumer(){DLOG("del Consumer:%p",this);}};
在構造函數中,ConsumerCallback(是一個 std::function)內部可能有復雜對象(比如 Lambda、綁定的資源等),如果直接寫 _callback(cb),會調用拷貝構造,可能涉及較多內存分配、資源拷貝,用 std::move(cb),可以讓 ConsumerCallback 的內部資源被移動到 _callback,更高效。
?8.2、異步工作線程模塊
用戶雖然是通過信道進行網絡通信的,但是網絡通信的本質還是需要進行IO事件的監控的,這就要通過IO監控線程來進行整,不能在當前線程進行IO事件的監控,這樣的話就會在當前線程進行阻塞住了.下面通過表格的方式進行說明
模塊 | loopthread | pool |
---|---|---|
訂閱客戶端 | 負責監聽服務器消息推送(socket 讀事件) | - 接收到消息后,異步將業務處理放到 pool 中去執行,因為收到的消息可能需要進行處理會耗時,防止主線程阻塞,無法進行監聽服務器消息的推送 |
發布客戶端 | 負責監聽向服務器發送消息后的 socket 可寫事件方便繼續向服務器進行發送數據(或者響應 ack 等) | - 應用層調用發布接口時,耗時操作(如序列化、日志記錄)在 pool 中處理 |
class AsyncWorker{public:using ptr=std::shared_ptr<AsyncWorker>;muduo::net::EventLoopThread loopthread;threadpool pool;};
8.3、連接模塊
這其實就是我們在Demo模塊利用muduo庫進行搭建的客戶端,這里其實就是換了一層皮,稱為連接模塊。
class Connection{public:using ptr=std::shared_ptr<Connection>;typedef std::shared_ptr<google::protobuf::Message> MessagePtr;Connection(const std::string &sip, int sport,AsyncWorker::ptr worker): _latch(1), _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"), _dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<ys::basicConsumeResponse>(std::bind(&Connection::consumeResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicCommonResponse>(std::bind(&Connection::basicResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));_client.connect();_latch.wait(); // 阻塞等待,直到連接建立成功}Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn,_codec);bool ret=channel->opneChannel();if(ret==false){DLOG("打開信道失敗");return Channel::ptr();}return channel;}void closeChannel(const Channel::ptr& channel){channel->closeChannel();_channel_manager->remove(channel->cid());}private://收到基礎響應void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、將得到的響應對象進行添加到信道的基礎響應channel->putBasicResponse(message);}//收到消息推送void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、封裝異步任務(消息處理任務),拋入線程池_worker->pool.push([channel,message](){channel->consume(message);});}void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_latch.countDown(); // 喚醒主線程中的阻塞_conn = conn;}else{// 連接關閉時的操作_conn.reset();}}private:muduo::CountDownLatch _latch; // 實現同步的//muduo::net::EventLoopThread _loopthread; // 異步循環處理線程AsyncWorker::ptr _worker;muduo::net::TcpConnectionPtr _conn; // 客戶端對應的連接muduo::net::TcpClient _client; // 客戶端ProtobufDispatcher _dispatcher; // 請求分發器ProtobufCodecPtr _codec; // 協議處理器ChannelManager::ptr _channel_manager;};
在連接模塊這里是有一個極易容易進行掉進坑里的陷阱 :當發布客戶端進行向服務器進行發送建立連接請求的時候,由于TCP是有發送緩沖區和接收緩沖區的,當請求被發送到發送緩存區的時候,就會默認連接建立成功,但是此時的連接是沒有被建立成功的,此時發布客戶端誤以為連接是建立成功的,就會執行后續操作,向服務器進行發送消息,此時就出現了問題。同樣訂閱客戶端也類似。
因此在onConnection 函數中需要進行判斷是否是真正的建立連接成功。
consumeResponse中
當連接收到消息推送后,需要_consumer 進行參與,因為只有consumer中有回調函數,知道進行收到消息推送時如何進行處理,這個接口到時候收到消息之后和消息一起進行封裝成一個任務,把這個任務放到線程池中,并不在當前的主執行流中進行執行。
8.4、信道管理模塊
信道模塊的定位與主要職責
信道不僅僅是數據的通道,還承載著客戶端的業務接口,因此這個模塊不僅要進行信道結構的設計,還需要進行提供對應的業務邏輯。信道類可以理解為客戶端在和消息服務器交互時的一條邏輯通道。它并不是單純的數據結構,而是抽象出與服務器交互(各種請求/響應、狀態維護等)的一套完整業務流程封裝。
換句話說,其實可以將信道模塊進行理解成將訂閱者模塊、異步線程模塊、和連接模塊進行統一封裝管理。
客戶端信道模塊和客戶端其他模塊之間的交互關系
模塊 | 交互內容 |
---|---|
連接模塊 ( muduo::net::TcpConnection ) | - Channel 直接持有 TCP 連接 _conn - 通過 _conn 發送請求消息給服務器- 服務器響應也通過該連接返回 |
訂閱者模塊 ( Consumer ) | - Channel 通過 basicConsume() 創建訂閱者對象 _consumer - 收到推送消息時, Channel::consume() 回調訂閱者的處理邏輯 |
異步線程模塊 (muduo 的 IO 線程) | - 服務器響應由 IO 線程收到 - 觸發 Channel::putBasicResponse() ,將響應加入 _basic_resp - 觸發 Channel::consume() ,調用用戶回調 |
8.2.1、信道管理的信息
- 信道ID
- 信道關聯的網絡通信連接的對象
- protobuf 協議處理對象
- 信道關聯的消費者
- 請求對應的響應信息隊列(這里隊列使用<請求ID,響應>hash表,一遍進行查找指定的響應)
- 互斥鎖&條件變量(大部分的請求都是阻塞操作,發送請求后需要進行等到響應才能繼續,但是muduo庫的通信是異步的,因此需要我們子啊收到響應后,通過判斷是否是等待的響應來進行同步)。?
8.2.2、信道管理的操作
- 創建信道的操作
- 提供刪除信道操作
- 提供聲明交換機操作(強斷言-有則OK,沒有則創建)
- 提供刪除交換機
- 提供創建隊列操作(強斷言-有則OK,沒有則創建)
- 提供刪除隊列操作
- 提供交換機-隊列綁定操作
- 提供交換機-隊列解除綁定操作
- 提供添加訂閱操作
- 提供取消訂閱操作
- 提供發布消息操作
- 提供確認消息操作信道模塊進行管理
using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;using basicCommonResponsePtr=std::shared_ptr<ys::basicCommonResponse>;using basicConsumeResponsePtr=std::shared_ptr<ys::basicConsumeResponse>;class Channel{public:using ptr=std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec):_cid(UUIDHelper::uuid()),_conn(conn),_codec(codec){}~Channel(){basicCancel();}bool opneChannel(){std::string rid=UUIDHelper::uuid();ys::openChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void closeChannel(){std::string rid=UUIDHelper::uuid();ys::closeChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool declareExchange(const std::string& name,ys::ExchangeType type,bool durable,bool auto_delete,google::protobuf::Map<std::string,std::string>& args){//構造一個聲明虛擬機的請求對象ys::declareExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);req.set_exchange_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteExchange(const std::string& name){ys::deleteExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);return;}bool declareQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string,std::string> qargs){ys::declareQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);req.set_durable(qdurable);req.set_exclusive(qexclusive);req.set_auto_delete(qauto_delete);req.mutable_args()->swap(qargs);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteQueue(const std::string& qname){ys::deleteQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool queueBind(const std::string& ename,const std::string& qname,const std::string& key){ys::queueBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void queueUnBind(const std::string& ename,const std::string& qname){ys::queueUnBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool basicPublish(const std::string &ename, ys::BasicProperties *bp, const std::string &body){std::string rid=UUIDHelper::uuid();ys::basicPublishRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_body(body);req.set_exchange_name(ename);if(bp!=nullptr){req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void basicAck(const std::string &msgid)//刪除參數qname,用戶知道消費了哪個隊列{if(_consumer.get()==nullptr){DLOG("消息確認時,找不到消費者的消息");return;}std::string rid=UUIDHelper::uuid();ys::basicAckRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer->_qname);req.set_message_id(msgid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool basicConsume(const std::string consume_tag,const std::string queue_name,bool auto_ack,const ConsumerCallback &cb){if(_consumer.get()!=nullptr){DLOG("當前信道已訂閱其他隊列消息");return false;}std::string rid=UUIDHelper::uuid();ys::basicConsumeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(consume_tag);req.set_queue_name(queue_name);req.set_auto_ack(auto_ack);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);if(resp->ok()==false){DLOG("添加訂閱失敗!");return false;}_consumer=std::make_shared<Consumer>(consume_tag,queue_name,auto_ack,cb);//返回return resp->ok();}void basicCancel(){//不一定是消費者if(_consumer.get()==nullptr){DLOG("取消訂閱時,找不到消費者信息");return;}std::string rid=UUIDHelper::uuid();ys::basicCancelRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(_consumer->_cid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);_consumer.reset();//返回return;}std::string cid(){return _cid;}public://連接收到基礎響應后,向hash_map中進行添加響應void putBasicResponse(const basicCommonResponsePtr& resp){std::unique_lock<std::mutex> lock(_mutex);_basic_resp.insert(std::make_pair(resp->rid(),resp));_cv.notify_all();}//連接收到消息推送后,需要通過信道進行找到對應的消費對象,通過回調函數進行消息處理void consume(const basicConsumeResponsePtr& resp){if(_consumer.get()==nullptr){DLOG("消息處理時,未找到訂閱者消息");return;}if(_consumer->_cid!=resp->consume_tag()){DLOG("收到的推送消息中的消費者標識,與當前信道的消費者標識不一致!");return;}_consumer->_callback(resp->consume_tag(),resp->mutable_properties(),resp->body());return;}private://等待請求的響應basicCommonResponsePtr waitResponse(const std::string& rid){std::unique_lock<std::mutex> lock(_mutex);//while(condition()) _cv.wait();_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid)!=_basic_resp.end();});basicCommonResponsePtr basic_resp=_basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;};
該模塊的注意事項
_consumer 這個成員是不需要在構造函數時進行初始化,當前這個信道要進行訂閱某個消息的時候,才能確定這個角色是一個消費者角色,此時在進去構建,要是再構造函數的過程中就進行去構建,萬一這個信道的角色是發布客戶端,就造成了資源的浪費
在移除信道的時候要是消費者需要進行取消訂閱一下,因此添加一個析構函數
8.2.4、對提供創建信道操作
信道的增刪查
class ChannelManager{public:using ptr=std::shared_ptr<ChannelManager>;ChannelManager(){}Channel::ptr create(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec){std::unique_lock<std::mutex> lock(_mutex);auto channel =std::make_shared<Channel>(conn,codec);_channels.insert(std::make_pair(channel->cid(),channel));return channel;}void remove(const std::string& cid){//進行刪除的時候還需要進行考慮是否為消費者(是需要進行取消訂閱)std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr get(const std::string& cid){auto pos=_channels.find(cid);if(pos==_channels.end()){return Channel::ptr();}return pos->second;}private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;};
九、功能聯調
9.1、聯調的思想
- 必須有一個生產者客戶端
- 聲明一個交換機exchange1
- 聲明兩個隊列queue1(其中binding_key=queue1)、queue2(binding_key=new.music.#)
- 將這兩個隊列和交換機進行綁定起來
- 搭建兩個消費者客戶端,分別進行各自訂閱一個隊列的消息
- 第一次,將交換機的類型進行定義為廣播交換模式:理論結果是兩個消費者客戶端都能拿到消息
- 第二次,將交換機的類型進行定義為直接交換模式:routing_key=queue1,理論是只有queue1能拿到消息
- 第三次,將交換機的類型進行定義成主題交換模式:routing_key=news.music.pop,理論是只有queue2能拿到結果
9.2、搭建發布客戶端
以廣播模式下的測試為例子
int main()
{//廣播交換下進行測試//直接交換下進行測試//主題交換下進行測試//1、實例化異步工作線程對象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、實例化連接對象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//3、通過連接進行創建信道brush::Channel::ptr channel=conn->openChannel();//4、通過信道提供的服務完成所需//4.1、聲明一個交換機exchange1,交換機的類型為廣播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、聲明兩個隊列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、綁定queue1-exchange1,且binding_key設置成queue1// 綁定queue2-exchange1,且binding_key設置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、循環向交換機進行發布信息//廣播交換// for(int i=1;i<10;i++)// {// std::string msg="Hello world-"+std::to_string(i);// channel->basicPublish("exchange1",nullptr,msg);// DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());// }//直接交換// for(int i=0;i<10;i++)// {// ys::BasicProperties bp;// bp.set_id(brush::UUIDHelper::uuid());// bp.set_delivery_mode(ys::DeliveryMode::DURABLE);// bp.set_routing_key("queue1");// std::string msg="Hello world-"+std::to_string(i);// channel->basicPublish("exchange1",&bp,msg);// DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());// }//主題交換for(int i=0;i<10;i++){ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.pop");std::string msg="Hello world-"+std::to_string(i);channel->basicPublish("exchange1",&bp,msg);DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());}ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.sport");std::string msg="Hello brush-";channel->basicPublish("exchange1",&bp,msg);DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());//6、關閉信道//channel->closeChannel();conn->closeChannel(channel);
}
9.3、搭建消費者客戶端
同樣是以廣播模式下的測試為例子
//需要進行增加傳參
void cb(brush::Channel::ptr& channel, const std::string& consumer_tag, const ys::BasicProperties *bp, const std::string&body)
{DLOG("%s - 消費了消息: %s",consumer_tag.c_str(),body.c_str());std::cout<<"body:"<<body<<std::endl;channel->basicAck(bp->id());
}
int main(int argc,char*argv[])
{if(argc!=2){DLOG("usage: ./consumer_client queue1");return -1;}//1、實例化異步工作線程對象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、實例化連接對象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//DLOG("實例化連接成功");//3、通過連接進行創建信道brush::Channel::ptr channel=conn->openChannel();//DLOG("打開信道成功");//4、通過信道提供的服務完成所需//4.1、聲明一個交換機exchange1,交換機的類型為廣播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、聲明兩個隊列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、綁定queue1-exchange1,且binding_key設置成queue1// 綁定queue2-exchange1,且binding_key設置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、進行訂閱隊列的消息(回調函數對消息進行確認)//auto functor=std::bind(cb,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel->basicConsume("consumer1",argv[1],false,functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));//6、關閉信道conn->closeChannel(channel);return 0;
}
十、項目的擴展
- 我們項目中只實現了一個虛擬機的版本,實際上是可以有多個虛擬機的
- 我們是通過代碼進行搭建客戶端進行訪問服務器的,可以進行拓展成管理接口,然后通過可視化的界面進行客戶端的搭建
- 交換機/隊列的獨占模式和自動刪除
- 發送方式的確認(broker 給生產者進行確認應答)功能也可以進行拓展實現