文章目錄
- 再談 “協議”
- 重新理解read、write、recv、send和tcp為什么支持全雙工
- Server.cc
- 網絡版計算機實現
- Socket封裝(模板方法類)
- socket.hpp
- 定制協議
- Json
- Json安裝
- 定義一個期望的報文格式
- Protocol.hpp
- Parser.hpp
- Calculator.hpp
- 完整的處理過程
- Client.cc
- 三層解釋
- 手寫序列化與反序列化
再談 “協議”
協議是一種 “約定”。在前面我們說過父親和兒子約定打電話的例子,不過這是感性的認識,今天我們理性的認識一下協議。 socket api
的接口, 在讀寫數據時,都是按 “字符串”(其實TCP是字節流,這里是為了理解) 的方式來發送接收的。如果我們要傳輸一些 “結構化的數據” 怎么辦呢?
其實,協議就是雙方約定好的結構化的數據
約定:
定義結構體來表示我們需要交互的信息,發送數據時將這個結構體按照?個規則轉換成字符串, 接收到數據的時候再按照相同的規則把字符串轉化回結構體;
結構化的數據就比如說我們在使用QQ群聊時除了消息本身、還能看見頭像、時間、昵稱。這些東西都要發給對方。這些東西都是一個個字符串,難道是把消息、頭像、時間、昵稱都單獨發給對方嗎?那分開發的時候,未來群里有成百上千名人大家都發,全都分開發,接收方還要確定每一部分是誰的進行匹配,那這樣太惡心了。
實際上這些信息可不是一個個獨立個體的而是一個整體。為了理解暫時當作多個字符串。把多個字符串形成一個報文或者說打包成一個字符串(方便理解,其實是一個字節流)然后在網絡中發送。多變一方便未來在網絡里整體發送。而把多變一的過程,我們稱之為序列化。
經過序列化的過程變成一個整體后發到網絡里,經過網絡傳輸發送給對方,發是整體當作一個字符串發的。接收方收的也是整體收的,所以收到一個報文或者說字符串。但是收到的字符串有什么東西我怎么知道,qq作為上層要的是誰發的、什么時候、發的什么具體的信息,所以接收方收到這個整體字符串后,必須把它轉成多個字符串,這種一變多的過程,我們稱之為反序列化。
業務結構數據在發送網絡中的時候,先序列化在發送,收到的一定是序列字節流,要先進行反序列化,然后才能使用。
剛才說過這里用多個字符串不太對只是為了理解,實際上未來多個字符串實際是一個結構體。是以結構體(結構化的數據)作為體現的,然后把這個結構體轉成一個字符串,同理對方收到字符串然后轉成對應的結構化的數據。
為什么要把字符串轉成結構化數據呢?未來這個結構化的數據一定是一個對象,然后使用它的時候,直接對象.url
、對象.time
拿到。
而這里的結構體如message
就是傳說中的業務協議。
因為它規定了我們聊天時網絡通信的數據。
未來我們在應用層定協議就是這種結構體類型,目的就是把結構化的對象轉換成序列化結構發送到網絡里,然后再把序列化結構轉成對應的結構體對象,然后上層直接使用對象進行操作! 這是業務協議,底層協議有自己的特點。
這樣光說還是不太理解,下面找一個應用場景加深理解剛才的知識。所以我們寫一個網絡版計數器。里面體現出業務協議,序列化,反序列化,在寫TCP時要注意TCP時面向字節流的,接收方如何保證拿到的是一個完整的報文呢?而不是半個、多個?這里我們都通過下面寫代碼的時候解決。而UDP是面向數據報的接收方收到的一定是一個完整的報文,因此不考慮剛才的問題。
重新理解read、write、recv、send和tcp為什么支持全雙工
為什么說保證你讀到的消息是 【一個】完整的請求?因為TCP是面向字節流的,我們保證不了,所以要明確報文和報文的邊界。
TCP有自己內核級別的發送緩沖區和接收緩沖區,而應用層也有自己的緩沖區,我們自己寫的代碼調用read,write發送讀取使用的buffer就是對應緩沖區。其實我們調用的所有的發送函數,根本就不是把數據發送到網絡中!
發送函數,本質是拷貝函數!!!
write只是把數據從應用層緩沖區拷貝到TCP發送緩沖區,由TCP協議決定什么時候把數據發送到網絡,發多少,出錯了怎么辦。所以TCP協議叫做傳輸控制協議!!
最終數據經過網絡發送被服務端放到自己的接收緩沖區里,然后我們在應用層調用read,實際在等接收緩沖區里有沒有數據,有數據就把數據拷貝應用層的緩沖區。沒有數據就是說接收緩沖區是空的,read就會被阻塞。
總結:
所以網絡發送的本質:
C->S: tcp發送的本質,其實就是將數據從c的發送緩沖區,拷貝到s的接收緩沖區。
S->C: tcp發送的本質,其實就是將數據從s的發送緩沖區,拷貝到c的接收緩沖區。
C->S發,并不影響S->C發,因為用的是不同的成對的緩沖區,所以tcp是全雙工的!
這里主要想說的是,tcp在進行發送數據的時候,發收方一直發數據但是對方正在做其他事情來不及讀數據,所以導致接收方的接收緩沖區里面存在很多的報文,因為是TCP面向字節流的所以這些報文是挨在一起,最終讀的時候怎么保證讀到的是一個完整的報文交給上層處理,而不是半個,多個。就是因為我們有接收緩沖區的存在,因此首先我們要解決讀取的問題。
解決方法:
明確報文和報文的邊界:
- 定長
- 特殊符號
- 自描述方式
我們給每個報文前面帶一個有效載荷長度的字段,未來我先讀到這個長度,根據這個長度在讀取若干字節,這樣就能讀取到一個報文,一個能讀到,n個也能讀到。有效載荷里面是請求或者響應序列化的結果。
Server.cc
每個模塊都獨立出來,進行解耦
#include <memory>#include "TcpServer.hpp"
#include "Parser.hpp"void Usage(std::string proc)
{std::cerr << "Usage : " << proc << " <prot>" << std::endl;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(0);}// 計算器模塊std::unique_ptr<Calculator> cal = std::make_unique<Calculator>();// 協議解析模塊std::unique_ptr<Parser> parser = std::make_unique<Parser>([&cal](Request &req) -> Response{ return cal->Exec(req); });uint16_t port = std::stoi(argv[1]);// 網絡通信模塊std::unique_ptr<TcpServer> tsock = std::make_unique<TcpServer>(port, [&parser](std::string &inbuffer) -> std::string{ return parser->Parse(inbuffer); });tsock->Run();return 0;
}
網絡版計算機實現
這里采用一個設計模式–>模版方法
Socket封裝(模板方法類)
const static int gbacklog = 16;class Socket
{
public:virtual ~Socket() {}virtual void CreateSocketOrDie() = 0;virtual void BindSocketOrDie(int port) = 0;virtual void ListenSocketOrDie(int gbacklog) = 0;virtual std::shared_ptr<Socket> Accept(InetAddr *clientaddr) = 0;virtual int SockFd() = 0;virtual void Close() = 0;virtual ssize_t Recv(std::string *out) = 0;virtual ssize_t Send(const std::string &in) = 0;virtual bool Connect(InetAddr &peer) = 0;public:void BuildListenSocketMethod(int port){CreateSocketOrDie();BindSocketOrDie(port);ListenSocketOrDie(gbacklog);}void BuildClientSocketMethod(){CreateSocketOrDie();}
};
socket.hpp
新的接口函數send和write一模一樣,不過多了一個參數flags
#ifndef __SOCKET_HPP__
#define __SOCKET_HPP__#include <sys/types.h>
#include <sys/socket.h>#include "Logger.hpp"
#include "InetAddr.hpp"enum
{OK,CREATE_ERR,BIND_ERR,LISTEN_ERR,ACCEPT_ERR
};const static int gbacklog = 16;class Socket
{
public:virtual ~Socket() {}virtual void CreateSocketOrDie() = 0;virtual void BindSocketOrDie(int port) = 0;virtual void ListenSocketOrDie(int gbacklog) = 0;virtual std::shared_ptr<Socket> Accept(InetAddr *clientaddr) = 0;virtual int SockFd() = 0;virtual void Close() = 0;virtual ssize_t Recv(std::string *out) = 0;virtual ssize_t Send(const std::string &in) = 0;virtual bool Connect(InetAddr &peer) = 0;public:void BuildListenSocketMethod(int port){CreateSocketOrDie();BindSocketOrDie(port);ListenSocketOrDie(gbacklog);}void BuildClientSocketMethod(){CreateSocketOrDie();}
};class TcpSocket : public Socket
{
public:TcpSocket() {}TcpSocket(int sockfd) : _sockfd(sockfd) {}void CreateSocketOrDie() override{_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){LOG(LogLevel::FATAL) << "create socket error!";exit(CREATE_ERR);}LOG(LogLevel::INFO) << "create socket success! fd: " << _sockfd;}void BindSocketOrDie(int port) override{InetAddr local(port);if (bind(_sockfd, local.Addr(), local.Length())){LOG(LogLevel::FATAL) << "bind socket error!";exit(BIND_ERR);}LOG(LogLevel::INFO) << "bind socket success!";}void ListenSocketOrDie(int backlog) override{if (listen(_sockfd, backlog) != 0){LOG(LogLevel::FATAL) << "listen socket error!";exit(LISTEN_ERR);}LOG(LogLevel::INFO) << "listen socket success!";}std::shared_ptr<Socket> Accept(InetAddr *clientaddr) override{struct sockaddr_in peer;socklen_t len = sizeof(peer);int fd = accept(_sockfd, (struct sockaddr *)&peer, &len);if (fd < 0){LOG(LogLevel::FATAL) << "accept socket error!";exit(ACCEPT_ERR);}LOG(LogLevel::INFO) << "accept socket success!";clientaddr->Init(peer); // 設置return std::make_shared<TcpSocket>(fd);}int SockFd() override{return _sockfd;}void Close() override{if (_sockfd >= 0)close(_sockfd);}ssize_t Recv(std::string *out) override{char buffer[1024];ssize_t n = recv(_sockfd, buffer, sizeof(buffer) - 1, 0);if (n > 0){buffer[n] = 0;*out = buffer;}return n;}ssize_t Send(const std::string &in) override{return send(_sockfd, in.c_str(), in.size(), 0);}bool Connect(InetAddr &peer) override{int n = connect(_sockfd, peer.Addr(), peer.Length());if (n >= 0)return true;elsereturn false;}~TcpSocket() {}private:int _sockfd;
};#endif
定制協議
定制基本的結構化字段,這個就是協議
class Request
{
public:private:int _x;int _y;char _oper;
};class Response
{
public:private:int _result;int _code;
};
關于這個序列化我們可以自己寫,也可以用現成的,這里我就使用現成的(Json),在最后再自己手寫一個
Json
Json其實就是一個字符串風格數據交換格式
Json安裝
Jsoncpp 是一個用于處理 JSON 數據的 C++ 庫。
里面屬性是以K和V的形式呈現出來的鍵值對,未來我們可以以KV形式設置,提取可以以KV形式提取。
安裝Json庫
ubuntu:sudo apt-get install -y libjsoncpp-dev
Centos: sudo yum install -y jsoncpp-devel
自定義協議說人話就是定義一個結構化的對象,有了這個結構化的對象,未來客戶端和服務端可以進行來回的發送。約定體現在這個結構化對象里面的成員變量都代表了什么意思。為什么一定是這樣的格式而不能是其他格式。如op為什么一定是±*/
不能是其他,這些都是約定好的。拿到結果先看哪一個后看哪一個。exitcode
為0是什么意思,不為0是什么意思。都是規定好的。這就是協議。
定義一個期望的報文格式
Protocol.hpp
#pragma once#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>class Request
{
public:Request():_x(0), _y(0), _oper(0){}bool Serialize(std::string *out){Json::Value root;root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::StyledWriter writer;*out = writer.write(root);if (out->empty())return false;return true;}bool Deserialize(std::string &in){Json::Reader reader;Json::Value root;if (!reader.parse(in, root))return false;_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();return true;}int GetX(){return _x;}int GetY(){return _y;}char GetOper(){return _oper;}void SetX(int x){_x = x;}void SetY(int y){_y = y;}void SetOper(char oper){_oper = oper;}~Request() {}private:int _x;int _y;char _oper;
};class Response
{
public:Response():_result(0), _code(0){}bool Serialize(std::string *out){Json::Value root;root["result"] = _result;root["code"] = _code;Json::StyledWriter writer;*out = writer.write(root);if (out->empty())return false;return true;}bool Deserialize(std::string &in){Json::Reader reader;Json::Value root;if (!reader.parse(in, root))return false;_result = root["result"].asInt();_code = root["code"].asInt();return true;}void Print(){std::cout << "result: " << _result << " [" << _code << "]" << std::endl;}int GetResult(){return _result;}int GetCode(){return _code;}void SetResult(int result){_result = result;}void SetCode(int code){_code = code;}~Response() {}private:int _result;int _code;
};static const std::string sep = "\r\n";class ProtoCol
{
public:static std::string Package(const std::string &jsonstr){if (jsonstr.empty())return std::string();std::string json_len = std::to_string(jsonstr.size());return json_len + sep + jsonstr + sep; // 有效載荷長度\r\n有效載荷內容\r\n}/*** 返回值說明:* 0:表示沒有內容* -1:表示錯誤字符* <0: 表示解包后的字符串的長度*/static int Unpack(std::string &origin_str, std::string *package){if (!package)return 0;auto pos = origin_str.find(sep); // 查找\r\nif (pos == std::string::npos)return 0;std::string len_str = origin_str.substr(0, pos); // 截取有效載荷字符串if (!DigitSafeCheck(len_str))return -1;int digit_len = std::stoi(len_str); // 獲取有效載荷長度int target_len = len_str.size() + sep.size() + digit_len + sep.size(); // eg: 3 \r\n *** \r\nif (origin_str.size() < target_len)return 0;*package = origin_str.substr(pos + sep.size(), digit_len); // 把有效載荷長度帶出去origin_str.erase(0, target_len); // 刪除前面的數據return package->size();}private:static bool DigitSafeCheck(const std::string str){for (const auto &ch : str){if (!(ch >= '0' && ch <= '9'))return false;}return true;}
};
Parser.hpp
繼續封裝,服務端調用到上層進行解析報文
#pragma once#include <functional>#include "Parser.hpp"
#include "Calculator.hpp"
#include "Protocol.hpp"
#include "Logger.hpp"using handler_t = std::function<Response(Request &req)>;class Parser
{
public:Parser(handler_t handler) : _handler(handler){}std::string Parse(std::string &inbuffer){LOG(LogLevel::DEBUG) << "inbuffer: \r\n"<< inbuffer;std::string send_str;for (;;) // 獲取到的數據不一定是全部的,需要循環獲取, 當獲取完一個整個報文后回掉回去進行發送{std::string jsonstr;// 解析報文int n = ProtoCol::Unpack(inbuffer, &jsonstr);if (n < 0)exit(0); // 解包錯誤else if (n == 0){break; // 已經處理完畢了}else if (n > 0){LOG(LogLevel::DEBUG) << "jsonstr: \r\n"<< jsonstr;// 反序列化Request req;if (!req.Deserialize(jsonstr)){return std::string();}Response resp = _handler(req); // 回掉函數到上層處理業務// 序列化std::string resp_json;if (!resp.Serialize(&resp_json)){return std::string();}// 打包send_str += ProtoCol::Package(resp_json);}else{exit(-1); // 未知錯誤}}return send_str; // 回掉回去然后進行發送數據}private:handler_t _handler;
};
Calculator.hpp
業務處理(計算)
#pragma once#include "Protocol.hpp"class Calculator
{
public:/*** code: 0 計算正確* code: 1 除零錯誤* code: 2 取模錯誤* code: 3 未支持*/Response Exec(Request &req){Response resp;switch (req.GetOper()){case '+':resp.SetResult(req.GetX() + req.GetY());break;case '-':resp.SetResult(req.GetX() - req.GetY());break;case '*':resp.SetResult(req.GetX() * req.GetY());break;case '/':{if (req.GetY() == 0){resp.SetCode(1);}else{resp.SetResult(req.GetX() / req.GetY());}}break;case '%':{if (req.GetY() == 0){resp.SetCode(2);}else{resp.SetResult(req.GetX() % req.GetY());}}break;case '^':resp.SetResult(req.GetX() ^ req.GetY());break;case '|':resp.SetResult(req.GetX() | req.GetY());break;case '&':resp.SetResult(req.GetX() & req.GetY());break;default:resp.SetCode(3);break;}return resp;}~Calculator() {}
};
完整的處理過程
Client.cc
客戶端需要進行下面的步驟
- 構建請求
- 序列化
- 打包
- 發送
- 接收
- 反序列化
#include <iostream>
#include <string>
#include <memory>#include "Socket.hpp"
#include "InetAddr.hpp"
#include "Protocol.hpp"void Usage(std::string proc)
{std::cerr << "Usage: " << proc << " <serverip> <serverport>" << std::endl;
}int main(int argc, char *argv[])
{if (argc != 3){Usage(argv[0]);exit(0);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);std::unique_ptr<Socket> sockptr = std::make_unique<TcpSocket>();sockptr->BuildClientSocketMethod();InetAddr server(serverport, serverip);if (sockptr->Connect(server)) // 連接{std::string inbuffer;for (;;){// 1. 構建請求Request req;int x, y;char oper;std::cout << "Please Enter X:";std::cin >> x;req.SetX(x);std::cout << "Please Enter Y:";std::cin >> y;req.SetY(y);std::cout << "Please Enter Oper:";std::cin >> oper;req.SetOper(oper);// 2. 序列化std::string jsonstr;req.Serialize(&jsonstr);// std::cout << "jsonstr: \r\n" << jsonstr << std::endl;// 3 打包std::string sendstr = ProtoCol::Package(jsonstr);// std::cout << "sendstr: \r\n" << sendstr << std::endl;// 4. 發送sockptr->Send(sendstr);// 5. 接收sockptr->Recv(&inbuffer);// 6. 反序列化std::string package;int n = ProtoCol::Unpack(inbuffer, &package);if (n > 0){Response resp;bool r = resp.Deserialize(package);if (r){resp.Print();}}}}return 0;
}
UDP是面向數據報的,因此只需要序列化和反序列化。
TCP是面向字節流的,需要考慮保證讀到的是一個完整報文、獲取有效載荷、序列化、反序列化。
測試:
三層解釋
未來客戶端可能有非常多的協議
解決方法就是:我們就可以在協議中添加協議編號!…end…
手寫序列化與反序列化
#pragma once#include <iostream>
#include <memory>
#include <jsoncpp/json/json.h>// #define SelfDefine 1namespace Protocol
{// 問題// 1. 結構化數據的序列和反序列化// 2. 還要解決用戶區分報文邊界 --- 數據包粘報問題// 總結:// 我們今天定義了幾組協議呢??我們可以同時存在多個協議嗎???可以// "protocol_code\r\nlen\r\nx op y\r\n" : \r\n不屬于報文的一部分,約定const std::string ProtSep = " ";const std::string LineBreakSep = "\r\n";// "len\r\nx op y\r\n" : \r\n不屬于報文的一部分,約定std::string Encode(const std::string &message){std::string len = std::to_string(message.size());std::string package = len + LineBreakSep + message + LineBreakSep;return package;}bool Decode(std::string &package, std::string *message){// 除了解包,我還想判斷報文的完整性, 能否正確處理具有"邊界"的報文auto pos = package.find(LineBreakSep);if (pos == std::string::npos)return false;std::string lens = package.substr(0, pos);int messagelen = std::stoi(lens);int total = lens.size() + messagelen + 2 * LineBreakSep.size();if (package.size() < total)return false;// 至少package內部一定有一個完整的報文了!*message = package.substr(pos + LineBreakSep.size(), messagelen);package.erase(0, total);return true;}class Request{public:Request() : _data_x(0), _data_y(0), _oper(0){}Request(int x, int y, char op) : _data_x(x), _data_y(y), _oper(op){}void Debug(){std::cout << "_data_x: " << _data_x << std::endl;std::cout << "_data_y: " << _data_y << std::endl;std::cout << "_oper: " << _oper << std::endl;}void Inc(){_data_x++;_data_y++;}// 結構化數據->字符串bool Serialize(std::string *out){
#ifdef SelfDefine // 條件編譯*out = std::to_string(_data_x) + ProtSep + _oper + ProtSep + std::to_string(_data_y);return true;
#elseJson::Value root;root["datax"] = _data_x;root["datay"] = _data_y;root["oper"] = _oper;Json::FastWriter writer;*out = writer.write(root);return true;
#endif}bool Deserialize(std::string &in) // "x op y" [){
#ifdef SelfDefineauto left = in.find(ProtSep);if (left == std::string::npos)return false;auto right = in.rfind(ProtSep);if (right == std::string::npos)return false;_data_x = std::stoi(in.substr(0, left));_data_y = std::stoi(in.substr(right + ProtSep.size()));std::string oper = in.substr(left + ProtSep.size(), right - (left + ProtSep.size()));if (oper.size() != 1)return false;_oper = oper[0];return true;
#elseJson::Value root;Json::Reader reader;bool res = reader.parse(in, root);if(res){_data_x = root["datax"].asInt();_data_y = root["datay"].asInt();_oper = root["oper"].asInt();}return res;
#endif}int GetX() { return _data_x; }int GetY() { return _data_y; }char GetOper() { return _oper; }private:// _data_x _oper _data_y// 報文的自描述字段// "len\nx op y\n" : \n不屬于報文的一部分,約定// 很多工作都是在做字符串處理!int _data_x; // 第一個參數int _data_y; // 第二個參數char _oper; // + - * / %};class Response{public:Response() : _result(0), _code(0){}Response(int result, int code) : _result(result), _code(code){}bool Serialize(std::string *out){
#ifdef SelfDefine*out = std::to_string(_result) + ProtSep + std::to_string(_code);return true;
#elseJson::Value root;root["result"] = _result;root["code"] = _code;Json::FastWriter writer;*out = writer.write(root);return true;
#endif}bool Deserialize(std::string &in) // "_result _code" [){
#ifdef SelfDefineauto pos = in.find(ProtSep);if (pos == std::string::npos)return false;_result = std::stoi(in.substr(0, pos));_code = std::stoi(in.substr(pos + ProtSep.size()));return true;
#elseJson::Value root;Json::Reader reader;bool res = reader.parse(in, root);if(res){_result = root["result"].asInt();_code = root["code"].asInt();}return res;
#endif}void SetResult(int res) { _result = res; }void SetCode(int code) { _code = code; }int GetResult() { return _result; }int GetCode() { return _code; }private:// "len\n_result _code\n"int _result; // 運算結果int _code; // 運算狀態};// 簡單的工廠模式,建造類設計模式class Factory{public:std::shared_ptr<Request> BuildRequest(){std::shared_ptr<Request> req = std::make_shared<Request>();return req;}std::shared_ptr<Request> BuildRequest(int x, int y, char op){std::shared_ptr<Request> req = std::make_shared<Request>(x, y, op);return req;}std::shared_ptr<Response> BuildResponse(){std::shared_ptr<Response> resp = std::make_shared<Response>();return resp;}std::shared_ptr<Response> BuildResponse(int result, int code){std::shared_ptr<Response> req = std::make_shared<Response>(result, code);return req;}};
}