一.什么是muduo?
muduo庫是?個基于非阻塞IO和事件驅動的C++高并發TCP網絡編程庫。
簡單來理解,它就是對原生的TCP套接字的封裝,是一個比socket編程接口更好用的編程庫。?
二.使用muduo庫完成一個英譯漢翻譯服務
TranslateServer.hpp:
#pragma once
#include <iostream>
#include <functional>
#include <unordered_map>
#include <string>
#include "muduo/net/TcpConnection.h"
#include "muduo/net/TcpServer.h"
#include "muduo/net/EventLoop.h"using std::cout;
using std::endl;
class TranslateServer
{
private:muduo::net::EventLoop _baseloop;muduo::net::TcpServer _server;
public:TranslateServer(int port):_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), \"TranslateServer", muduo::net::TcpServer::kReusePort){//bind是一個函數適配器_server.setConnectionCallback(std::bind(&TranslateServer::_onConnection, this, std::placeholders::_1));_server.setMessageCallback(std::bind(&TranslateServer::_onMessage, this, std::placeholders::_1, \std::placeholders::_2, std::placeholders::_3));}void start(){_server.start(); //開始事件監聽_baseloop.loop(); //開始事件監控,這是一個死循環阻塞接口}// typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
// typedef std::function<void (const TcpConnectionPtr&,
// Buffer*,
// Timestamp)> MessageCallback;//連接建立成功或者關閉時侯的回調函數void _onConnection(const muduo::net::TcpConnectionPtr& conn){if (conn->connected()){cout << "新連接建立成功\n";}else{cout << "連接關閉\n";}}//通信連接收到請求時的回調函數void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time){std::string str = buffer->retrieveAllAsString();std::string resp = translate(str);conn->send(resp);}std::string translate(const std::string& str){static std::unordered_map<std::string, std::string> _dict = {{"hello", "你好"},{"white", "白色"}};if (_dict.count(str)){return _dict[str];}return "沒找到";}};
TranslateClient.hpp:
#pragma once
#include <functional>
#include <iostream>
#include "muduo/net/TcpClient.h"
#include "muduo/net/TcpConnection.h"
#include "muduo/net/EventLoopThread.h"class TranslateClient
{
private:muduo::net::EventLoopThread _loopThread; //EventLoop是阻塞式死循環,必須另起一個線程,否則用戶無法在主線程輸入。//_loopThread一建立就立馬啟動muduo::net::TcpClient _client;muduo::net::TcpConnectionPtr _conn;//TcpClient的connect是非阻塞接口,調用立馬返回,這有可能導致用戶send時尚未建立連接,而解引用空指針muduo::CountDownLatch _latch; //保證建立連接和send之間的同步關系
public:TranslateClient(const std::string& serverIp, int serverPort):_client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort), "TranslateClient"),_latch(1){_client.setConnectionCallback(std::bind(&TranslateClient::_onConnection, this, std::placeholders::_1));_client.setMessageCallback(std::bind(&TranslateClient::_onMessage, this, std::placeholders::_1, \std::placeholders::_2,std::placeholders::_3));}void connect(){_client.connect();_latch.wait();}bool send(std::string& msg){if (_conn->connected()){_conn->send(msg);return true;}else{return false;}}private:/*************** 連接建立或者斷開時的回調函數* **************/void _onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_latch.countDown();_conn = conn;}else{_conn.reset();}}void _onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp time){std::cout << "翻譯結果:" << buffer->retrieveAllAsString() << std::endl;}
};
muduo的精髓在于大量的回調函數,建立或斷開連接,收到消息時,都會調用我們傳入的回調函數,回調函數就是我們處理業務的地方。
三.muduo中基于protobuf的自定義協議
像上述的英譯漢服務,雙方肯定是能正常通信,但這絕不是一個成熟的方案。TCP通信時面向字節流的,存在數據粘包問題,要想解決必須使用用戶層協議。
用戶層協議主要就是解決數據粘包問題,另外序列化和反序列化也是其中的重要環節。muduo庫是由陳碩大佬編寫的,在安裝好的muduo庫中,他提供了一些編程樣例,其中有一個就是基于protobuf,定制了一個用戶層協議,用于網絡通信。所以嚴格來說,該自定義協議并不是muduo庫中的一部分。
class ProtobufCodec : muduo::noncopyable
{public:enum ErrorCode{kNoError = 0,kInvalidLength,kCheckSumError,kInvalidNameLen,kUnknownMessageType,kParseError,};typedef std::function<void (const muduo::net::TcpConnectionPtr&,const MessagePtr&,muduo::Timestamp)> ProtobufMessageCallback;typedef std::function<void (const muduo::net::TcpConnectionPtr&,muduo::net::Buffer*,muduo::Timestamp,ErrorCode)> ErrorCallback;explicit ProtobufCodec(const ProtobufMessageCallback& messageCb): messageCallback_(messageCb),errorCallback_(defaultErrorCallback){}ProtobufCodec(const ProtobufMessageCallback& messageCb, const ErrorCallback& errorCb): messageCallback_(messageCb),errorCallback_(errorCb){}void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp receiveTime);void send(const muduo::net::TcpConnectionPtr& conn,const google::protobuf::Message& message){// FIXME: serialize to TcpConnection::outputBuffer()muduo::net::Buffer buf;fillEmptyBuffer(&buf, message);conn->send(&buf);}static const muduo::string& errorCodeToString(ErrorCode errorCode);static void fillEmptyBuffer(muduo::net::Buffer* buf, const google::protobuf::Message& message);static google::protobuf::Message* createMessage(const std::string& type_name);static MessagePtr parse(const char* buf, int len, ErrorCode* errorCode);private:static void defaultErrorCallback(const muduo::net::TcpConnectionPtr&,muduo::net::Buffer*,muduo::Timestamp,ErrorCode);ProtobufMessageCallback messageCallback_;ErrorCallback errorCallback_;const static int kHeaderLen = sizeof(int32_t);const static int kMinMessageLen = 2*kHeaderLen + 2; // nameLen + typeName + checkSumconst static int kMaxMessageLen = 64*1024*1024; // same as codec_stream.h kDefaultTotalBytesLimit
};
ProtobufCodec類就是基于protobuf定義的結構化數據的應用層協議,協議格式如下:
onMessage的實現如下:?
void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp receiveTime)
{while (buf->readableBytes() >= kMinMessageLen + kHeaderLen){const int32_t len = buf->peekInt32();if (len > kMaxMessageLen || len < kMinMessageLen){errorCallback_(conn, buf, receiveTime, kInvalidLength);break;}else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen)){ErrorCode errorCode = kNoError;MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);if (errorCode == kNoError && message){messageCallback_(conn, message, receiveTime);buf->retrieve(kHeaderLen+len);}else{errorCallback_(conn, buf, receiveTime, errorCode);break;}}else{break;}}
}
onMessage函數解決了TCP粘包的問題,從緩沖區中解析出一個完整的protobuf結構化數據(一個message)后,再調用messageCallback_處理。messageCallback_是構造ProtobufCodec時傳入的回調函數。
如果我們的業務場景很單一,例如上面的英譯漢服務器,直接把我們寫的業務邏輯作為回調傳給messageCallback_就OK了。但如果我們有多種業務,例如翻譯和計算業務,則還可以在此基礎上引入任務分發器ProtobufDispatcher,回調它的ProtobufDispatcher函數。
class ProtobufDispatcher
{public:typedef std::function<void (const muduo::net::TcpConnectionPtr&,const MessagePtr& message,muduo::Timestamp)> ProtobufMessageCallback;explicit ProtobufDispatcher(const ProtobufMessageCallback& defaultCb): defaultCallback_(defaultCb){}void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp receiveTime) const{CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());if (it != callbacks_.end()){it->second->onMessage(conn, message, receiveTime);}else{defaultCallback_(conn, message, receiveTime);}}template<typename T>void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback& callback){std::shared_ptr<CallbackT<T> > pd(new CallbackT<T>(callback));callbacks_[T::descriptor()] = pd;}private:typedef std::map<const google::protobuf::Descriptor*, std::shared_ptr<Callback> > CallbackMap;CallbackMap callbacks_;ProtobufMessageCallback defaultCallback_;
};
onProtobufMessage會根據你傳入的結構化數據類型(message),調用不同的回調函數,這些回調函數就是我們注冊的業務處理方法。
四.編寫一個翻譯+加法服務
- 編寫.并翻譯proto文件,構建翻譯的請求和響應,加法的請求和響應的類
- 編寫服務端
- 編寫客戶端
Server.cc:
#include <memory>
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"#include "muduo/net/TcpServer.h"
#include "muduo/net/TcpConnection.h"
#include "muduo/net/EventLoop.h"#include "business.pb.h"using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
class Server
{
public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr;typedef std::shared_ptr<business::TranslateRequest> TranslateRequestPtr;typedef std::shared_ptr<business::AddRequest> AddRequestPtr;private:muduo::net::EventLoop _baseLoop;muduo::net::TcpServer _server;ProtobufDispatcher _dispatcher; // 請求分發器ProtobufCodec _codec; // protobuf處理器--解析出結構化數據,發送結構化數據(序列化和發序列化內部會做)
public:Server(int port): _server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", port), "Server",muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&Server::_onUnknownMessage, this,\std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){// 注冊業務處理函數_dispatcher.registerMessageCallback<business::AddRequest>(bind(&Server::_onAdd, this, _1, _2, _3));_dispatcher.registerMessageCallback<business::TranslateRequest>(bind(&Server::_onTranslate, this, _1, _2, _3));//注冊_server的回調函數_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_server.setConnectionCallback(std::bind(&Server::_onConnection, this, std::placeholders::_1));}void start(){_server.start();_baseLoop.loop();}private:void _onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void _onAdd(const muduo::net::TcpConnectionPtr& conn, const AddRequestPtr &messagePtr, muduo::Timestamp time){int x = messagePtr->num1();int y = messagePtr->num2();business::AddResponse resp;resp.set_result(x + y);_codec.send(conn, resp); //讓protobuf處理器幫我們序列化并用conn發送}void _onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateRequestPtr &messagePtr, muduo::Timestamp time){const std::string& ret = translate(messagePtr->msg());business::TranslateResponse resp;resp.set_msg(ret);_codec.send(conn, resp);}void _onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){LOG_INFO << "新連接建立成功!";}else{LOG_INFO << "連接即將關閉!";}}std::string translate(const std::string& str){static std::unordered_map<std::string, std::string> dict_map = {{"hello", "你好"},{"Hello", "你好"},{"你好", "Hello"},{"吃了嗎", "油潑面"}};auto it = dict_map.find(str);if (it == dict_map.end()) {return "沒聽懂!!";}return it->second;}
};int main()
{Server server(8085);server.start();return 0;
}
Client.cc:?
#include "muduo/protobuf/codec.h"
#include "muduo/protobuf/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"#include "business.pb.h"
#include <iostream>
#include <functional>class Client {public:typedef std::shared_ptr<google::protobuf::Message> MessagePtr; //這是Protobuf庫的頭文件typedef std::shared_ptr<business::AddResponse> AddResponsePtr;typedef std::shared_ptr<business::TranslateResponse> TranslateResponsePtr;Client(const std::string &sip, int sport):_latch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),_dispatcher(std::bind(&Client::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)){_dispatcher.registerMessageCallback<business::TranslateResponse>(std::bind(&Client::onTranslate, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<business::AddResponse>(std::bind(&Client::onAdd, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, &_codec,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Client::onConnection, this, std::placeholders::_1)); }void connect() {_client.connect();_latch.wait();//阻塞等待,直到連接建立成功}void Translate(const std::string &msg){business::TranslateRequest req;req.set_msg(msg);send(&req);}void Add(int num1, int num2) {business::AddRequest req;req.set_num1(num1);req.set_num2(num2);send(&req);}private:bool send(const google::protobuf::Message *message) {if (_conn->connected()) {//連接狀態正常,再發送,否則就返回false_codec.send(_conn, *message);return true;}return false;} void onTranslate(const muduo::net::TcpConnectionPtr& conn, const TranslateResponsePtr& message, muduo::Timestamp) {std::cout << "翻譯結果:" << message->msg() << std::endl;}void onAdd(const muduo::net::TcpConnectionPtr& conn, const AddResponsePtr& message, muduo::Timestamp) {std::cout << "加法結果:" << message->result() << std::endl;}void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr&conn){if (conn->connected()) {_latch.countDown();//喚醒主線程中的阻塞_conn = conn;}else {//連接關閉時的操作_conn.reset();}}private:muduo::CountDownLatch _latch;//實現同步的muduo::net::EventLoopThread _loopthread;//異步循環處理線程muduo::net::TcpConnectionPtr _conn;//客戶端對應的連接muduo::net::TcpClient _client;//客戶端ProtobufDispatcher _dispatcher;//請求分發器ProtobufCodec _codec; //Protobuf處理器
};int main()
{Client client("127.0.0.1", 8085);client.connect();client.Translate("hello");client.Add(11, 22);sleep(1);return 0;
}