高性能分布式消息隊列系統(四)

八、客戶端模塊的實現

客戶端實現的總體框架

在 RabbitMQ 中,應用層提供消息服務的核心實體是 信道(Channel)
用戶想要與消息隊列服務器交互時,通常不會直接操作底層的 TCP 連接,而是通過信道來進行各種消息的發布、訂閱、確認等操作。

信道可以看作是在單個 TCP 連接之上的輕量級虛擬連接,它負責封裝具體的協議細節,屏蔽了網絡通信的復雜性。
用戶只需要調用信道提供的接口,發送消息或接收消息,無需關心底層數據如何傳輸、協議如何實現。

簡單來說,用戶面向的是信道服務接口,而信道背后處理了連接管理、數據編解碼、協議交互等工作,實現了業務與網絡通信的解耦。

客戶端設計視角和服務器視角的對比

方面客戶端視角(信道)服務端視角(連接+信道)
主要關注點?調用信道接口完成消息操作管理連接和信道,執行底層協議與業務邏輯
抽象層級? ?抽象出具體網絡和協議細節解析信道請求,實現消息路由和持久化
資源管理? ??不關心連接具體實現,只用信道管理物理連接、信道狀態及相關資源
多路復用? ??多信道復用同一連接,簡化調用維護多信道,保證隔離與并發性能
用戶責任? ? ?只需調用信道接口? ? ? ? ? ? ? ? ? ? ? ? ??處理協議解析、消息調度、資源分配

  • 連接模塊

客戶端和服務端進行網絡連接的基礎。

一個直接面向用戶的模塊,內部包含多個對外提供服務的接口,用戶需要什么服務進行調用對應的接口即可,其中包含交換機的聲明/刪除,隊列的聲明與刪除,隊列的綁定與解綁,消息的發布與訂閱,訂閱和解除訂閱。

表達了客戶端與服務器之間在消息隊列系統中協作的流程

在仿 RabbitMQ 的消息隊列系統中,客戶端首先通過訂閱者模塊注冊自身的消費者身份,并指定對應的消息處理回調函數;隨后通過信道模塊在單一的 TCP 連接上實現多路復用,創建多個邏輯信道以并行處理不同的消息服務(如發布、訂閱、隊列管理等)。客戶端通過連接模塊建立與服務器的連接,并在信道中發起具體的請求服務。

服務器接收到連接請求后,由服務器端的連接管理器創建連接上下文,并根據信道中攜帶的請求類型,路由到對應的處理模塊(如交換機、隊列或消息模塊),執行相應的業務邏輯。處理結果再由服務器的異步線程池將數據封裝好并通過網絡返回給客戶端,客戶端的異步事件機制或線程池再觸發對應的回調函數完成消息消費流程。

基于以上模塊實現客戶端的思路就非常清晰了

1、實例化異步線程對象

2、實例化連接對象

3、通過連接對象進行創建信道

4、根據信道進行獲取自己的所需服務

5、關閉信道

6、關閉連接

8.1、訂閱者模塊

訂閱者對象的設計

一個不向用戶進行直接展示的模塊,在客戶端中進行體現的作用就是對角色的描述,表示這就是一個消費者,用戶通常不直接操作訂閱邏輯,而是通過定義“消費者”的方式進行消息處理邏輯的注冊。

一個信道只有一個訂閱者,所以說不需要進行訂閱者的管理。訂閱者這個模塊很簡單,沒有涉及到一些業務模塊的內容,業務模塊的服務都在信道模塊進行提供。

訂閱者模塊(消費者)這個類中成員變量的設計

  • 首先需要定義消費者ID,描述了收到該條消息后該如何進行對這條消息進行處理。
  • 其次是要進行訂閱的哪個隊列的ID和自動刪除標志,描述了收到消息后是否需要對消息進行回復,是否要進行自動刪除。
  • 最后是回調函數,描述了從隊列中進行獲取消息后應該如何進行處理,這部分由用戶進行決定。

訂閱者模塊的實現

using ConsumerCallback = std::function<void(const std::string, const ys::BasicProperties *bp, const std::string)>;struct Consumer{using ptr=std::shared_ptr<Consumer>;std::string _cid;std::string _qname;bool _auto_ack;ConsumerCallback _callback;Consumer(){DLOG("new Consumer:%p",this);}Consumer(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &cb): _cid(cid), _qname(qname), _auto_ack(auto_ack), _callback(std::move(cb)){DLOG("new Consumer:%p",this);}~Consumer(){DLOG("del Consumer:%p",this);}};

在構造函數中,ConsumerCallback(是一個 std::function)內部可能有復雜對象(比如 Lambda、綁定的資源等),如果直接寫 _callback(cb),會調用拷貝構造,可能涉及較多內存分配、資源拷貝,用 std::move(cb),可以讓 ConsumerCallback 的內部資源被移動到 _callback,更高效。

?8.2、異步工作線程模塊

用戶雖然是通過信道進行網絡通信的,但是網絡通信的本質還是需要進行IO事件的監控的,這就要通過IO監控線程來進行整,不能在當前線程進行IO事件的監控,這樣的話就會在當前線程進行阻塞住了.下面通過表格的方式進行說明

模塊loopthreadpool
訂閱客戶端負責監聽服務器消息推送(socket 讀事件)- 接收到消息后,異步將業務處理放到 pool 中去執行,因為收到的消息可能需要進行處理會耗時,防止主線程阻塞,無法進行監聽服務器消息的推送
發布客戶端負責監聽向服務器發送消息后的 socket 可寫事件方便繼續向服務器進行發送數據(或者響應 ack 等)- 應用層調用發布接口時,耗時操作(如序列化、日志記錄)在 pool 中處理
class AsyncWorker{public:using ptr=std::shared_ptr<AsyncWorker>;muduo::net::EventLoopThread loopthread;threadpool pool;};

8.3、連接模塊

這其實就是我們在Demo模塊利用muduo庫進行搭建的客戶端,這里其實就是換了一層皮,稱為連接模塊。

class Connection{public:using ptr=std::shared_ptr<Connection>;typedef std::shared_ptr<google::protobuf::Message> MessagePtr;Connection(const std::string &sip, int sport,AsyncWorker::ptr worker): _latch(1), _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"), _dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<ys::basicConsumeResponse>(std::bind(&Connection::consumeResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicCommonResponse>(std::bind(&Connection::basicResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));_client.connect();_latch.wait(); // 阻塞等待,直到連接建立成功}Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn,_codec);bool ret=channel->opneChannel();if(ret==false){DLOG("打開信道失敗");return Channel::ptr();}return channel;}void closeChannel(const Channel::ptr& channel){channel->closeChannel();_channel_manager->remove(channel->cid());}private://收到基礎響應void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、將得到的響應對象進行添加到信道的基礎響應channel->putBasicResponse(message);}//收到消息推送void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、封裝異步任務(消息處理任務),拋入線程池_worker->pool.push([channel,message](){channel->consume(message);});}void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_latch.countDown(); // 喚醒主線程中的阻塞_conn = conn;}else{// 連接關閉時的操作_conn.reset();}}private:muduo::CountDownLatch _latch; // 實現同步的//muduo::net::EventLoopThread _loopthread; // 異步循環處理線程AsyncWorker::ptr _worker;muduo::net::TcpConnectionPtr _conn; // 客戶端對應的連接muduo::net::TcpClient _client;      // 客戶端ProtobufDispatcher _dispatcher;     // 請求分發器ProtobufCodecPtr _codec;               // 協議處理器ChannelManager::ptr _channel_manager;};

在連接模塊這里是有一個極易容易進行掉進坑里的陷阱 :當發布客戶端進行向服務器進行發送建立連接請求的時候,由于TCP是有發送緩沖區和接收緩沖區的,當請求被發送到發送緩存區的時候,就會默認連接建立成功,但是此時的連接是沒有被建立成功的,此時發布客戶端誤以為連接是建立成功的,就會執行后續操作,向服務器進行發送消息,此時就出現了問題。同樣訂閱客戶端也類似。

因此在onConnection 函數中需要進行判斷是否是真正的建立連接成功。

consumeResponse中

當連接收到消息推送后,需要_consumer 進行參與,因為只有consumer中有回調函數,知道進行收到消息推送時如何進行處理,這個接口到時候收到消息之后和消息一起進行封裝成一個任務,把這個任務放到線程池中,并不在當前的主執行流中進行執行。

8.4、信道管理模塊

信道模塊的定位與主要職責

信道不僅僅是數據的通道,還承載著客戶端的業務接口,因此這個模塊不僅要進行信道結構的設計,還需要進行提供對應的業務邏輯。信道類可以理解為客戶端在和消息服務器交互時的一條邏輯通道。它并不是單純的數據結構,而是抽象出與服務器交互(各種請求/響應、狀態維護等)的一套完整業務流程封裝

換句話說,其實可以將信道模塊進行理解成將訂閱者模塊、異步線程模塊、和連接模塊進行統一封裝管理。

客戶端信道模塊和客戶端其他模塊之間的交互關系

模塊交互內容
連接模塊
muduo::net::TcpConnection
- Channel 直接持有 TCP 連接 _conn
- 通過 _conn 發送請求消息給服務器
- 服務器響應也通過該連接返回
訂閱者模塊
Consumer
- Channel 通過 basicConsume() 創建訂閱者對象 _consumer
- 收到推送消息時,Channel::consume() 回調訂閱者的處理邏輯
異步線程模塊
(muduo 的 IO 線程)
- 服務器響應由 IO 線程收到
- 觸發 Channel::putBasicResponse(),將響應加入 _basic_resp
- 觸發 Channel::consume(),調用用戶回調

8.2.1、信道管理的信息

  • 信道ID
  • 信道關聯的網絡通信連接的對象
  • protobuf 協議處理對象
  • 信道關聯的消費者
  • 請求對應的響應信息隊列(這里隊列使用<請求ID,響應>hash表,一遍進行查找指定的響應)
  • 互斥鎖&條件變量(大部分的請求都是阻塞操作,發送請求后需要進行等到響應才能繼續,但是muduo庫的通信是異步的,因此需要我們子啊收到響應后,通過判斷是否是等待的響應來進行同步)。?

8.2.2、信道管理的操作

  • 創建信道的操作
  • 提供刪除信道操作
  • 提供聲明交換機操作(強斷言-有則OK,沒有則創建)
  • 提供刪除交換機
  • 提供創建隊列操作(強斷言-有則OK,沒有則創建)
  • 提供刪除隊列操作
  • 提供交換機-隊列綁定操作
  • 提供交換機-隊列解除綁定操作
  • 提供添加訂閱操作
  • 提供取消訂閱操作
  • 提供發布消息操作
  • 提供確認消息操作信道模塊進行管理
using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;using basicCommonResponsePtr=std::shared_ptr<ys::basicCommonResponse>;using basicConsumeResponsePtr=std::shared_ptr<ys::basicConsumeResponse>;class Channel{public:using ptr=std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec):_cid(UUIDHelper::uuid()),_conn(conn),_codec(codec){}~Channel(){basicCancel();}bool opneChannel(){std::string rid=UUIDHelper::uuid();ys::openChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void closeChannel(){std::string rid=UUIDHelper::uuid();ys::closeChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool declareExchange(const std::string& name,ys::ExchangeType type,bool durable,bool auto_delete,google::protobuf::Map<std::string,std::string>& args){//構造一個聲明虛擬機的請求對象ys::declareExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);req.set_exchange_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteExchange(const std::string& name){ys::deleteExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);return;}bool declareQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string,std::string> qargs){ys::declareQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);req.set_durable(qdurable);req.set_exclusive(qexclusive);req.set_auto_delete(qauto_delete);req.mutable_args()->swap(qargs);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteQueue(const std::string& qname){ys::deleteQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool queueBind(const std::string& ename,const std::string& qname,const std::string& key){ys::queueBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void queueUnBind(const std::string& ename,const std::string& qname){ys::queueUnBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool basicPublish(const std::string &ename, ys::BasicProperties *bp,  const std::string &body){std::string rid=UUIDHelper::uuid();ys::basicPublishRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_body(body);req.set_exchange_name(ename);if(bp!=nullptr){req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void basicAck(const std::string &msgid)//刪除參數qname,用戶知道消費了哪個隊列{if(_consumer.get()==nullptr){DLOG("消息確認時,找不到消費者的消息");return;}std::string rid=UUIDHelper::uuid();ys::basicAckRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer->_qname);req.set_message_id(msgid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);//返回return;}bool basicConsume(const std::string consume_tag,const std::string queue_name,bool auto_ack,const ConsumerCallback &cb){if(_consumer.get()!=nullptr){DLOG("當前信道已訂閱其他隊列消息");return false;}std::string rid=UUIDHelper::uuid();ys::basicConsumeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(consume_tag);req.set_queue_name(queue_name);req.set_auto_ack(auto_ack);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器basicCommonResponsePtr resp=waitResponse(rid);if(resp->ok()==false){DLOG("添加訂閱失敗!");return false;}_consumer=std::make_shared<Consumer>(consume_tag,queue_name,auto_ack,cb);//返回return resp->ok();}void basicCancel(){//不一定是消費者if(_consumer.get()==nullptr){DLOG("取消訂閱時,找不到消費者信息");return;}std::string rid=UUIDHelper::uuid();ys::basicCancelRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(_consumer->_cid);//向服務器進行發送請求_codec->send(_conn,req);//等待服務器waitResponse(rid);_consumer.reset();//返回return;}std::string cid(){return _cid;}public://連接收到基礎響應后,向hash_map中進行添加響應void putBasicResponse(const basicCommonResponsePtr& resp){std::unique_lock<std::mutex> lock(_mutex);_basic_resp.insert(std::make_pair(resp->rid(),resp));_cv.notify_all();}//連接收到消息推送后,需要通過信道進行找到對應的消費對象,通過回調函數進行消息處理void consume(const basicConsumeResponsePtr& resp){if(_consumer.get()==nullptr){DLOG("消息處理時,未找到訂閱者消息");return;}if(_consumer->_cid!=resp->consume_tag()){DLOG("收到的推送消息中的消費者標識,與當前信道的消費者標識不一致!");return;}_consumer->_callback(resp->consume_tag(),resp->mutable_properties(),resp->body());return;}private://等待請求的響應basicCommonResponsePtr waitResponse(const std::string& rid){std::unique_lock<std::mutex> lock(_mutex);//while(condition()) _cv.wait();_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid)!=_basic_resp.end();});basicCommonResponsePtr basic_resp=_basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;};

該模塊的注意事項

_consumer 這個成員是不需要在構造函數時進行初始化,當前這個信道要進行訂閱某個消息的時候,才能確定這個角色是一個消費者角色,此時在進去構建,要是再構造函數的過程中就進行去構建,萬一這個信道的角色是發布客戶端,就造成了資源的浪費

在移除信道的時候要是消費者需要進行取消訂閱一下,因此添加一個析構函數

8.2.4、對提供創建信道操作

信道的增刪查

class ChannelManager{public:using ptr=std::shared_ptr<ChannelManager>;ChannelManager(){}Channel::ptr create(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec){std::unique_lock<std::mutex> lock(_mutex);auto channel =std::make_shared<Channel>(conn,codec);_channels.insert(std::make_pair(channel->cid(),channel));return channel;}void remove(const std::string& cid){//進行刪除的時候還需要進行考慮是否為消費者(是需要進行取消訂閱)std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr get(const std::string& cid){auto pos=_channels.find(cid);if(pos==_channels.end()){return Channel::ptr();}return pos->second;}private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;};

九、功能聯調

9.1、聯調的思想

  • 必須有一個生產者客戶端
    • 聲明一個交換機exchange1
    • 聲明兩個隊列queue1(其中binding_key=queue1)、queue2(binding_key=new.music.#)
    • 將這兩個隊列和交換機進行綁定起來
  • 搭建兩個消費者客戶端,分別進行各自訂閱一個隊列的消息
    • 第一次,將交換機的類型進行定義為廣播交換模式:理論結果是兩個消費者客戶端都能拿到消息
    • 第二次,將交換機的類型進行定義為直接交換模式:routing_key=queue1,理論是只有queue1能拿到消息
    • 第三次,將交換機的類型進行定義成主題交換模式:routing_key=news.music.pop,理論是只有queue2能拿到結果

9.2、搭建發布客戶端

以廣播模式下的測試為例子


int main()
{//廣播交換下進行測試//直接交換下進行測試//主題交換下進行測試//1、實例化異步工作線程對象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、實例化連接對象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//3、通過連接進行創建信道brush::Channel::ptr channel=conn->openChannel();//4、通過信道提供的服務完成所需//4.1、聲明一個交換機exchange1,交換機的類型為廣播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、聲明兩個隊列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、綁定queue1-exchange1,且binding_key設置成queue1//     綁定queue2-exchange1,且binding_key設置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、循環向交換機進行發布信息//廣播交換// for(int i=1;i<10;i++)// {//     std::string msg="Hello world-"+std::to_string(i);//     channel->basicPublish("exchange1",nullptr,msg);//     DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());// }//直接交換// for(int i=0;i<10;i++)// {//     ys::BasicProperties bp;//     bp.set_id(brush::UUIDHelper::uuid());//     bp.set_delivery_mode(ys::DeliveryMode::DURABLE);//     bp.set_routing_key("queue1");//     std::string msg="Hello world-"+std::to_string(i);//     channel->basicPublish("exchange1",&bp,msg);//     DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());// }//主題交換for(int i=0;i<10;i++){ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.pop");std::string msg="Hello world-"+std::to_string(i);channel->basicPublish("exchange1",&bp,msg);DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());}ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.sport");std::string msg="Hello brush-";channel->basicPublish("exchange1",&bp,msg);DLOG("向交換機exchange進行發布了消息:%s",msg.c_str());//6、關閉信道//channel->closeChannel();conn->closeChannel(channel);
}

9.3、搭建消費者客戶端

同樣是以廣播模式下的測試為例子

//需要進行增加傳參
void cb(brush::Channel::ptr& channel, const std::string& consumer_tag, const ys::BasicProperties *bp, const std::string&body)
{DLOG("%s - 消費了消息: %s",consumer_tag.c_str(),body.c_str());std::cout<<"body:"<<body<<std::endl;channel->basicAck(bp->id());
}
int main(int argc,char*argv[])
{if(argc!=2){DLOG("usage: ./consumer_client queue1");return -1;}//1、實例化異步工作線程對象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、實例化連接對象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//DLOG("實例化連接成功");//3、通過連接進行創建信道brush::Channel::ptr channel=conn->openChannel();//DLOG("打開信道成功");//4、通過信道提供的服務完成所需//4.1、聲明一個交換機exchange1,交換機的類型為廣播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、聲明兩個隊列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、綁定queue1-exchange1,且binding_key設置成queue1//     綁定queue2-exchange1,且binding_key設置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、進行訂閱隊列的消息(回調函數對消息進行確認)//auto functor=std::bind(cb,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel->basicConsume("consumer1",argv[1],false,functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));//6、關閉信道conn->closeChannel(channel);return 0;
}

十、項目的擴展

  • 我們項目中只實現了一個虛擬機的版本,實際上是可以有多個虛擬機的
  • 我們是通過代碼進行搭建客戶端進行訪問服務器的,可以進行拓展成管理接口,然后通過可視化的界面進行客戶端的搭建
  • 交換機/隊列的獨占模式和自動刪除
  • 發送方式的確認(broker 給生產者進行確認應答)功能也可以進行拓展實現

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/82742.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/82742.shtml
英文地址,請注明出處:http://en.pswp.cn/web/82742.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

QPair 類說明

QPair 類說明 QPair 是一個模板類&#xff0c;用于存儲一對數據項。 頭文件&#xff1a; cpp #include <QPair> qmake 配置&#xff1a; QT core 所有成員列表&#xff08;包括繼承成員&#xff09; 公共類型 類型定義說明first_type第一個元素的類型&#xff…

4.大語言模型預備數學知識

大語言模型預備數學知識 復習一下在大語言模型中用到的矩陣和向量的運算&#xff0c;及概率統計和神經網絡中常用概念。 矩陣的運算 矩陣 矩陣加減法 條件&#xff1a;行數列數相同的矩陣才能做矩陣加減法 數值與矩陣的乘除法 矩陣乘法 條件&#xff1a;矩陣A的列數 矩陣…

uniapp 設置手機不息屏

在使用 UniApp 開發應用時&#xff0c;有時需要在設備長時間未操作時實現息屏保護功能&#xff0c;以節省電量和保護屏幕。以下是如何在 UniApp 中實現這一功能的步驟。 示例一 // 保持屏幕常亮 uni.setKeepScreenOn({keepScreenOn: true });// 監聽應用進入后臺事件 uni.onH…

智能推薦系統:協同過濾與深度學習結合

智能推薦系統&#xff1a;協同過濾與深度學習結合 系統化學習人工智能網站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目錄 智能推薦系統&#xff1a;協同過濾與深度學習結合摘要引言技術原理對比1. 協同過濾算法&#xff1a;基于相似性的推…

使用Python和OpenCV實現圖像識別與目標檢測

在計算機視覺領域&#xff0c;圖像識別和目標檢測是兩個非常重要的任務。圖像識別是指識別圖像中的內容&#xff0c;例如判斷一張圖片中是否包含某個特定物體&#xff1b;目標檢測則是在圖像中定位并識別多個物體的位置和類別。OpenCV是一個功能強大的開源計算機視覺庫&#xf…

《基于Apache Flink的流處理》筆記

思維導圖 1-3 章 4-7章 8-11 章 參考資料 源碼&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚會及會議 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…

LLaMA-Factory 微調 Qwen2-VL 進行人臉情感識別(二)

在上一篇文章中,我們詳細介紹了如何使用LLaMA-Factory框架對Qwen2-VL大模型進行微調,以實現人臉情感識別的功能。本篇文章將聚焦于微調完成后,如何調用這個模型進行人臉情感識別的具體代碼實現,包括詳細的步驟和注釋。 模型調用步驟 環境準備:確保安裝了必要的Python庫。…

Splash動態渲染技術全解析:從基礎到企業級應用(2025最新版)

引言 在Web 3.0時代&#xff0c;87%的網站采用JavaScript動態渲染技術。傳統爬蟲難以應對Ajax加載、SPA應用等場景&#xff0c;Splash作為專業的JavaScript渲染服務&#xff0c;憑借??Lua腳本控制??和??異步處理能力??&#xff0c;已成為動態數據抓取的核心工具。本文…

【應用】Ghost Dance:利用慣性動捕構建虛擬舞伴

Ghost Dance是葡萄牙大學的一個研究項目&#xff0c;研究方向是探索人與人之間的聯系&#xff0c;以及如何通過虛擬舞伴重現這種聯系。項目負責人Cecilia和Rui利用慣性動捕創造出具有流暢動作的虛擬舞伴&#xff0c;讓現實中的舞者也能與之共舞。 挑戰&#xff1a;Ghost Danc…

廣目軟件GM DC Monitor

廣目&#xff08;北京&#xff09;軟件有限公司成立于2024年&#xff0c;技術和研發團隊均來自于一家具有近10年監控系統研發的企業。廣目的技術團隊一共實施了9家政府單位、1家股份制銀行、1家芯片制造企業的數據中心監控預警項目。這11家政企單位由2家正部級、1家副部級、6家…

12-Oracle 23ai Vector 使用ONNX模型生成向量嵌入

一、Oracle 23ai Vector Embeddings 核心概念? 向量嵌入&#xff08;Vector Embeddings&#xff09;?? -- 將非結構化數據&#xff08;文本/圖像&#xff09;轉換為數值向量 - - 捕獲數據的語義含義而非原始內容 - 示例&#xff1a;"數據庫" → [0.24, -0.78, 0.5…

用 NGINX 構建高效 POP3 代理`ngx_mail_pop3_module`

一、模塊定位與作用 協議代理 ngx_mail_pop3_module 讓 NGINX 能夠充當 POP3 代理&#xff1a;客戶端與后端 POP3 服務器之間的所有請求均轉發到 NGINX&#xff0c;由 NGINX 負責與后端會話邏輯。認證方式控制 通過 pop3_auth 指令指定允許客戶端使用的 POP3 認證方法&#xf…

每日算法 -【Swift 算法】三數之和

Swift&#xff5c;三數之和&#xff08;3Sum&#xff09;詳細題解 注釋 拓展&#xff08;LeetCode 15&#xff09; ?題目描述 給你一個包含 n 個整數的數組 nums&#xff0c;判斷 nums 中是否存在三個元素 a, b, c&#xff0c;使得 a b c 0。請你找出所有和為 0 且不重…

服務器磁盤空間被Docker容器日志占滿處理方法

事發場景&#xff1a; 原本正常的服務停止運行了&#xff0c;查看時MQTT服務鏈接失敗&#xff0c;查看對應的容器服務發現是EMQX鏡像停止運行了&#xff0c;重啟也是也報錯無法正常運行&#xff0c;報錯如下圖&#xff1a; 報錯日志中連續出現兩個"no space left on devi…

令牌桶 滑動窗口->限流 分布式信號量->限并發的原理 lua腳本分析介紹

文章目錄 前言限流限制并發的實際理解限流令牌桶代碼實現結果分析令牌桶lua的模擬實現原理總結&#xff1a; 滑動窗口代碼實現結果分析lua腳本原理解析 限并發分布式信號量代碼實現結果分析lua腳本實現原理 雙注解去實現限流 并發結果分析&#xff1a; 實際業務去理解體會統一注…

基于uniapp+WebSocket實現聊天對話、消息監聽、消息推送、聊天室等功能,多端兼容

基于 ?UniApp + WebSocket?實現多端兼容的實時通訊系統,涵蓋WebSocket連接建立、消息收發機制、多端兼容性配置、消息實時監聽等功能,適配?微信小程序、H5、Android、iOS等終端 目錄 技術選型分析WebSocket協議優勢UniApp跨平臺特性WebSocket 基礎實現連接管理消息收發連接…

Linux中shell編程表達式和數組講解

一、表達式 1.1 測試表達式 樣式1: test 條件表達式 樣式2: [ 條件表達式 ] 注意&#xff1a;以上兩種方法的作用完全一樣&#xff0c;后者為常用。但后者需要注意方括號[、]與條件表達式之間至少有一個空格。test跟 [] 的意思一樣條件成立&#xff0c;狀態返回值是0條件不成…

深入了解JavaScript當中如何確定值的類型

JavaScript是一種弱類型語言&#xff0c;當你給一個變量賦了一個值&#xff0c;該值是什么類型的&#xff0c;那么該變量就是什么類型的&#xff0c;并且你還可以給一個變量賦多種類型的值&#xff0c;也不會報錯&#xff0c;這就是JavaScript的內部機制所決定的&#xff0c;那…

【p2p、分布式,區塊鏈筆記 MESH】Bluetooth藍牙通信拓撲與操作 BR/EDR(經典藍牙)和 BLE

目錄 1. BR/EDR&#xff08;經典藍牙&#xff09;網絡結構微微網&#xff08;Piconet&#xff09;散射網&#xff08;Scatternet&#xff09;藍牙 BR/EDR 拓撲結構示意圖 2. BLE&#xff08;低功耗藍牙&#xff09;網絡結構廣播器與觀察者&#xff08;Broadcaster and Observer…

C++虛函數表(虛表Virtual Table,簡稱vtable、VFT)(編譯器為支持運行時多態(動態綁定)而自動生成的一種內部數據結構)虛函數指針vptr

文章目錄 **1. 虛函數表的核心概念**- **虛函數表&#xff08;vtable&#xff09;**&#xff1a;- **虛函數指針&#xff08;vptr&#xff09;**&#xff1a; **2. 虛函數表的生成與工作流程****生成時機**- **當一個類中至少有一個虛函數時**&#xff0c;編譯器會為該類生成一…