目錄
一、網絡版計算器
二、網絡版本計算器實現
2.1源代碼
2.2測試結果
一、網絡版計算器
應用層定義的協議:
應用層進行網絡通信能否使用如下的協議進行通信呢?
在操作系統內核中是以這種協議進行通信的,但是在應用層禁止以這種協議進行通信,原因如下:
- 我們寫的服務端程序是在Linux系統上運行的,但是客戶端程序不一定是在Linux系統上運行的,可能是在Windos系統上運行的,服務端在不同平臺上運行,不同的平臺下內存對齊等等可能不同,那么對同一個結構體定義出來的結構體變量大小可能不同。還有服務端程序是由C++語言寫的,客戶端程序可能是由其他語言寫的,如:Java語言、Python語言寫的,那么類型大小可能不一樣。
- 即使對內存對齊問題做出解決,但是我們的程序隨時隨地根據需求需要做出更改,那么之前寫的協議就需要做出改變,重新進行內存等問題的處理,重新進行測試,出現問題的可能性很大,這樣寫出來的代碼可擴展性很差。
操作系統內核能夠以這種方式進行通信的原因:操作系統都是用C語言寫的,操作系統一旦寫好,基本不會做改變,使用這種協議進行本地通信實現起來簡單。
哪如何進行網絡通信呢?
第一種約定方案:客戶端發送一個形如"1+1"的字符串,中間不存在空格
第二種約定方案:定義結構體來表示我們需要交互的信息, 發送數據時將這個結構體按照一個規則轉換成字符串, 接收到數據的時候再按照相同的規則把字符串轉化回結構體。這個過程叫做 “序列化” 和 "反序列化。
- 傳輸層和網絡層是操作系統部分,數據鏈路層對應的是驅動。
- 一個fd代表一個鏈接,一個鏈接有兩個緩沖區,read()、write()、send()、recv()這些函數本質是拷貝函數,將內容拷貝到換沖區,從緩沖區中拷貝內容。
- 發送數據的本質:是從發送方的發送緩沖區把數據通過協議棧和網絡拷貝給接受方的接收緩沖區,這也就是為什么TCP協議是全雙工通信模式。
- 應用層將數據拷貝給傳輸層,數據什么時候發、一次發多少、出錯了怎么辦應用層不用管由操作系統來決定,這也就是TCP稱為傳輸控制協議的原因。
- 傳輸數據過程其實就是生產消費模型,發送和接收緩沖區就是臨界資源,read()阻塞就是接收緩沖區是空的,write()阻塞就是發送緩沖區滿了。
二、網絡版本計算器實現
序列化和反序列化的工具:Jsoncpp用于將數據序列化為字符串的C++庫
安裝Jsoncpp庫
ubuntu: sudo apt-get install libjsoncpp-dev
Centos: sudo yum install jsoncpp-devel
-
對于系統頭文件默認是在/usr/include目錄下找,要想使用必須這樣包含頭文件
#include <jsoncpp/json/json.h>
【第一次測試】
這個報錯是鏈接報錯,gcc、g++默認是認識C/C++庫,但不認識第三方提供的庫,在makefile文件中添加
-ljsoncpp,告訴編譯器指定到那個庫中去找。
2.1源代碼
SeverMain.cc(服務端入口)
#include <iostream>
#include <functional>#include "Sever.hpp"
#include "IoService.hpp"
#include "Calculate.hpp"int main()
{Scream();uint16_t port = 8888;Calculate cal;IoService ioService(std::bind(&Calculate::Operation, &cal, std::placeholders::_1));Sever sever(std::bind(&IoService::IoExecute, &ioService, std::placeholders::_1, std::placeholders::_2),port);sever.Init();sever.Loop();return 0;
}
Sever.hpp(服務端)
#pragma once
#include <functional>#include "Socket.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
#include "IoService.hpp"using namespace socket_n;
const static int gport = 8888;
using ioService_t = std::function<void(SockPtr, InetAddr &)>; // 可調用對象的類型,Io業務處理類型class Sever
{
public:Sever(ioService_t ioService, uint16_t port = gport): _port(port), _ifRunning(false), _listenSocket(), _ioService(ioService){}void Init(){// 創建監聽套接字、綁定、設置監聽狀態_listenSocket.CreatListenSocket(_port);}struct ThreadData{ThreadData(SockPtr sockfd, InetAddr addr, Sever *pSever): _sockfd(sockfd), _addr(addr), _pSever(pSever){}SockPtr _sockfd;InetAddr _addr;Sever *_pSever;};void Loop(){_ifRunning = true;while (_ifRunning){InetAddr client_addr;SockPtr SockfdSmartPtr = _listenSocket.Accept(client_addr);if (SockfdSmartPtr == nullptr){LOG(WARNING, "accept error\n");continue;}LOG(INFO, "get a new link, client info: %s\n", client_addr.AddrStr().c_str());// 多線程處理業務pthread_t tid;// 智能指針和繼承???????ThreadData *td = new ThreadData(SockfdSmartPtr, client_addr, this); // td必須是動態開辟出來的pthread_create(&tid, nullptr, Execute, td);}_ifRunning = false;}static void *Execute(void *args){pthread_detach(pthread_self()); // 將自己和主線程分離ThreadData *td = static_cast<ThreadData *>(args);td->_pSever->_ioService(td->_sockfd, td->_addr); // 回調交互業務函數,執行交互業務td->_sockfd->Close(); // 業務處理完,也不再進行網絡通信,關閉該套接字delete td;return nullptr;}~Sever(){}private:bool _ifRunning;uint16_t _port;TcpSocket _listenSocket;ioService_t _ioService;
};
Socket.hpp(套接字)
#pragma once
#include <iostream>
#include <cstring>
#include <string>
#include <functional>
#include <memory>#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>#include "Log.hpp"
#include "InetAddr.hpp"namespace socket_n
{// 父類提供接口,子類實現接口const static int gsockfd = -1;const static int gbacklog = 8;enum{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERROR};class Socket;using SockPtr = std::shared_ptr<Socket>; // 父類中也用到了,所以在父類前面聲明class Socket{public:virtual void CreatSocket() = 0;virtual void BindSocket(uint16_t port) = 0;virtual void SetListenStatus(int balcklog = gbacklog) = 0;virtual SockPtr Accept(InetAddr &client_addr) = 0;virtual bool Connect(const std::string &sever_ip, uint16_t sever_port) = 0;virtual int Sockfd() = 0;virtual void Close() = 0;virtual ssize_t Recv(std::string &out) = 0;virtual ssize_t Send(const std::string &in) = 0;// 創建監聽套接字void CreatListenSocket(uint16_t port){CreatSocket();BindSocket(port);SetListenStatus();}// 客戶端套接字bool CreatClientSocket(const std::string &sever_ip, uint16_t sever_port){CreatSocket();return Connect(sever_ip, sever_port);}};class TcpSocket : public Socket{public:TcpSocket(int sockfd = gsockfd): _sockfd(sockfd){}TcpSocket(TcpSocket &socket): _sockfd(socket._sockfd){}void CreatSocket() override{_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){LOG(FATAL, "socket creat error\n");exit(SOCKET_ERROR);}LOG(INFO, "socket creat success,sockfd: %d\n", _sockfd);}void BindSocket(uint16_t port) override{struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0){LOG(FATAL, "bind error\n");exit(BIND_ERROR);}LOG(INFO, "bind success\n");}void SetListenStatus(int balcklog) override{if (listen(_sockfd, balcklog) < 0){LOG(FATAL, "set listen error\n");exit(LISTEN_ERROR);}LOG(INFO, "listen success\n");}SockPtr Accept(InetAddr &client_addr) override{struct sockaddr_in client;socklen_t len = sizeof(client);int sockfd = accept(_sockfd, (struct sockaddr *)&client, &len);if (sockfd < 0){LOG(WARNING, "accept error\n");return nullptr; // 返回一個用nullptr構造的智能指針}client_addr = InetAddr(client);LOG(INFO, "get a new link, client info: %s\n", client_addr.AddrStr().c_str());return std::make_shared<TcpSocket>(sockfd); // C++14}bool Connect(const std::string &sever_ip, uint16_t sever_port) override{struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(sever_port);::inet_pton(AF_INET, sever_ip.c_str(), &server.sin_addr);int n = ::connect(_sockfd, (struct sockaddr *)&server, sizeof(server));if (n < 0){return false;}return true;}int Sockfd() override{return _sockfd;}void Close() override{if (_sockfd > 0)close(_sockfd);}ssize_t Recv(std::string &out) override{char inbuffer[1024];ssize_t n = recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);if (n > 0){inbuffer[n] = 0;out = inbuffer;}return n;}ssize_t Send(const std::string &in) override{return send(_sockfd, in.c_str(), in.size(), 0);}private:int _sockfd;};
}
IoSever.hpp(IO交互)
#pragma once
#include <memory>
#include <functional>#include "InetAddr.hpp"
#include "Socket.hpp"
#include "Log.hpp"
#include "Protocol.hpp"using namespace socket_n;
using process_t = std::function<std::shared_ptr<Response>(std::shared_ptr<Request>)>;
class IoService
{
public:IoService(process_t process): _process(process){}void IoExecute(SockPtr sockPtr, InetAddr &addr){std::string packageStream;std::shared_ptr<Request> req_ptr = std::make_shared<Request>();std::string recvMessage;std::string sendMessage;while (true){// 讀內容ssize_t n = sockPtr->Recv(recvMessage);// 數據讀取出錯或者沒數據了,直接break,關閉套接字,停止對該客戶端的服務// 客戶端可以重新建立連接if (n <= 0){LOG(ERROR, "read error or rend end: %s\n", addr.AddrStr().c_str());break;}packageStream += recvMessage;// 提取一個完整的報文std::string jsonStr = DeCode(packageStream);if (jsonStr == "")continue;// 反序列化req_ptr->Deserialize(jsonStr);// 計算auto res_ptr = _process(req_ptr);// 序列化res_ptr->Serialize(sendMessage);// 添加報頭sendMessage = EnCode(sendMessage);// 向套接字中寫sockPtr->Send(sendMessage);}}~IoService(){}private:process_t _process;
};
Protocol.hpp(計算器協議)
#pragma once
#include <string>
#include <jsoncpp/json/json.h>// 應用層協議完整的報頭+報文格式
//"len\r\n{jsonStr}\r\n" —— "len":報文的長度;"\r\n":區分len和報文
// 使用"\r\n"作為分割原因是在Debug測試時打印出來的結果:一行是len、一行是報文的形式,也可以用其他作為分割
//"{jsonStr}\r\n"能不能只以"\r\n"作為分割呢?
// 不能因為報文中有可能也有\r\n,如果查找\r\n,取其前面的內容作為報文是不行的
static const std::string sep = "\r\n";
// 添加報頭
std::string EnCode(const std::string &jsonStr)
{int len = jsonStr.size();std::string lenStr = std::to_string(len);return lenStr + sep + jsonStr + sep;
}
//"le"、"len\r"
//"len\r\n{js"
//"len\r\n{jsonStr}\r\nlen\r\n{js"
// 從傳的流中提取出一個完整的報文,如果流中沒有一個完整的報文返回空,如果有返回一個完整的報文字符串
std::string DeCode(std::string &packageStream)
{size_t n = packageStream.find(sep);if (n == std::string::npos)return "";std::string numStr = packageStream.substr(0, n);int len = std::stoi(numStr);// 當前這個完整報文的長度int total = numStr.size() + len + 2 * sep.size();if (packageStream.size() < total)return "";int pos = n + sep.size();std::string jsonStr = packageStream.substr(pos, len);// 提取到一個完整的jsonStr串,刪掉packageStream中頭部字段packageStream.erase(0, total);return jsonStr;
}// 請求
class Request
{
public:Request(int x = 1, int y = 1, char oper = '+'): _x(x), _y(y), _oper(oper){}// 序列化:將結構化字段轉換成字符串void Serialize(std::string &out){// 先定義一個中間值,Value類型的對象rootJson::Value root;root["x"] = _x; // 一個鍵值對應一個value值,value可以是任意類型,包括對象root["y"] = _y;root["oper"] = _oper;// 再定義一個FastWriter類型的對象,使用其內部的write()方法將中間值root序列化成字符串Json::FastWriter writer;std::string s = writer.write(root);out = s;}// 反序列化:將字符串轉換成結構化字段bool Deserialize(std::string &in){Json::Value root;// 將序列化后的字符串,轉化成Value的中間值Json::Reader reader;bool res = reader.parse(in, root);if (res){_x = root["x"].asInt(); // 將root中的"x"對應的value值作為正數給給_x_y = root["y"].asInt();_oper = root["oper"].asInt();return true;}return false;}int RetX(){return _x;}int RetY(){return _y;}char RetOper(){return _oper;}~Request(){}// private:int _x;int _y;char _oper;
};// 回應
class Response
{
public:Response(): _result(2), _code(0), _describe("Calculation successful"){}// 序列化void Serialize(std::string &out){// 先定義一個中間值,Value類型的對象rootJson::Value root;root["result"] = _result; // 一個鍵值對應一個value值,value可以是任意類型,包括對象root["code"] = _code;root["describe"] = _describe;// 再定義一個FastWriter類型的對象,使用其內部的write()方法將中間值root序列化成字符串Json::FastWriter writer;std::string s = writer.write(root);out = s;}// 反序列化bool Deserialize(std::string &in){Json::Value root;// 將序列化后的字符串,轉化成Value的中間值Json::Reader reader;bool res = reader.parse(in, root);if (res){_result = root["result"].asInt(); // 將root中的"x"對應的value值作為正數給給_x_code = root["code"].asInt();_describe = root["describe"].asString();return true;}return false;}~Response(){}// private:int _result;int _code; // 結果碼0:計算成功 1:除0錯誤 2:其他非法操作std::string _describe; // 對結果碼的描述
};
Calculate.hpp(計算功能)
#pragma once
#include <memory>
#include <climits>#include "Protocol.hpp"class Calculate
{
public:
Calculate()
{}
std::shared_ptr<Response> Operation(std::shared_ptr<Request> reqPtr)
{int x = reqPtr->RetX();int y = reqPtr->RetY();char oper = reqPtr->RetOper();std::shared_ptr<Response> resPtr = std::make_shared<Response>();switch (oper){case '+':{resPtr->_result = x + y;break;}case '-':{resPtr->_result = x - y;break;}case '*':{resPtr->_result = x * y;break;}case '/':{if (y == 0){resPtr->_result = INT_MAX;resPtr->_code = 1;resPtr->_describe = "Division by zero error";}else{resPtr->_result = x / y;}break;}default:{resPtr->_result = INT_MIN;resPtr->_code = 2;resPtr->_describe = "Other illegal operations";break;}}return resPtr;
}
~Calculate()
{}private:
};
Log.hpp(日志)
#pragma once
#include <string>
#include <ctime>
#include <fstream>
#include <cstring>
#include <sys/types.h>
#include <unistd.h>
#include <stdarg.h>
#include <pthread.h>#define SCREAM 1
#define FILE 2
enum
{DEBUG = 1,INFO,WARNING,ERROR,FATAL
};
//日志消息 [日志等級][pid][filename][filenumber][time] 日志內容
class LMessage
{
public:std::string _level;//信息等級pid_t _pid;//進程idstd::string _filename;//所在文件int _filenumber;//所在文件行號std::string _cur_time; //打印時間std::string _message_info;//日志內容
};
//獲取當前時間
std::string GetCurTime()
{//從過去的那一個時刻到現在累計的秒數time_t now = time(nullptr);//將時間戳轉換成年月日時分秒的字符串struct tm* cur_time = localtime(&now);char buffer[100];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d", cur_time->tm_year+1900,cur_time->tm_mon+1,cur_time->tm_mday,cur_time->tm_hour,cur_time->tm_min,cur_time->tm_sec);return buffer;
}pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;
const std::string g_file = "./log.txt";
//日志
class Log
{
private:void FlushToScream(LMessage& message){//對指定的鎖進行加鎖pthread_mutex_lock(&mutex1); //[日志等級][pid][filename][filenumber][time] 日志內容printf("[%s][%d][%s][%d][%s]%s",message._level.c_str(), message._pid,message._filename.c_str(),message._filenumber,message._cur_time.c_str(),message._message_info.c_str());//對指定的鎖進行解鎖pthread_mutex_unlock(&mutex1);}void FlushToFile(LMessage& message){//對指定的鎖進行加鎖pthread_mutex_lock(&mutex1); //std::ofstream out(_file)//每次寫入前先前會先清空文件std::ofstream out(_file, std::ios::app);//向文件追加寫入if(!out.is_open())return ;char buffer[1024];snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s]%s",message._level.c_str(), message._pid,message._filename.c_str(),message._filenumber,message._cur_time.c_str(),message._message_info.c_str());out.write(buffer, strlen(buffer));out.close();//對指定的鎖進行解鎖pthread_mutex_unlock(&mutex1);}void Fussh(LMessage& message){if(_type == SCREAM){//向顯示器中打印FlushToScream(message);}else if(_type == FILE){//向文件中打印FlushToFile(message);}}
public:Log()//默認是向顯示器中打印:_type(SCREAM),_file(g_file){}//為了方便將等級名稱輸出到文件/顯示屏中,將數字轉換成字符串const std::string LevelToString(int level){switch(level){case DEBUG:return "DEBUG";case INFO:return "INFO";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOW";//表示未知的}}//通過log.LogMessage("xx",12,INFO,"ctear %d thread success",pid)這種形式將消息寫進日志中//通過外部傳一個消息,由于外部有時會需要傳一個可變參數,message_info在傳的時候需要設計成可變參數void LogMessage(const std::string filename, int filenumber, int level, const char* format, ...){LMessage message;message._level = LevelToString(level);message._pid = getpid();message._filename = filename;message._filenumber = filenumber;message._cur_time = GetCurTime();//定義一個apva_list ap;//初始化ap,讓ap指向可變參數va_start(ap,format);char info[1024];//將格式化形式傳進來,可變參數傳進來,自動將轉換成字符串放到字符數組中vsnprintf(info, sizeof(info), format, ap);//銷毀apva_end(ap);message._message_info = info;//將消息寫入到文件或顯示器;Fussh(message);} ~Log(){pthread_mutex_destroy(&mutex1);pthread_mutex_destroy(&mutex2);}void FlusshScream(){_type = SCREAM;}void FlusshFile(){_type = FILE;}
private:int _type;const std::string _file;
};Log lg;
#define LOG(level, format, ...) do{ lg.LogMessage(__FILE__, __LINE__, level, format, ##__VA_ARGS__); }while(0)
#define Scream() do{ lg.FlusshScream(); }while(0)
#define File() do{ lg.FlusshFile(); }while(0)
InetAddr.hpp
#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>class InetAddr
{
public:InetAddr(){}InetAddr(const struct sockaddr_in &addr) : _addr(addr){_port = ntohs(addr.sin_port);char ip_buf[32];::inet_ntop(AF_INET, &addr.sin_addr, ip_buf, sizeof(ip_buf));_ip = ip_buf;}bool operator==(const InetAddr &addr){return (this->_ip == addr._ip && this->_port == addr._port);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}struct sockaddr_in Addr(){return _addr;}std::string AddrStr(){return _ip + ":" + std::to_string(_port);}~InetAddr(){}private:std::string _ip;uint16_t _port;struct sockaddr_in _addr;
};
ClientMain.cc(客戶端入口)
#include <iostream>
#include <string>#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>#include "Socket.hpp"
#include "Protocol.hpp"
#include "InetAddr.hpp"using namespace socket_n;int main(int argc, char *argv[])
{if (argc != 3){std::cout << argv[0] << "serve-ip serve-port" << std::endl;exit(0);}std::string severIp = argv[1];uint16_t severPort = std::stoi(argv[2]);// 1.創建socket鏈接遠端服務器SockPtr socketPtr = std::make_shared<TcpSocket>();if (!socketPtr->CreatClientSocket(severIp, severPort)){std::cout << "creat or connect error" << std::endl;exit(1);}std::shared_ptr<Request> reqPtr = std::make_shared<Request>();std::shared_ptr<Response> resPtr = std::make_shared<Response>();std::string packageStream;while (true){// 設置requestreqPtr->_x = 888;reqPtr->_y = 1;reqPtr->_oper = '*';// 序列化std::string sendMessage;reqPtr->Serialize(sendMessage);// 添加報頭sendMessage = EnCode(sendMessage);// 向套接字中寫socketPtr->Send(sendMessage);while (true){// 接收信息std::string recvMessage;ssize_t n = socketPtr->Recv(recvMessage);if (n <= 0)break;packageStream += recvMessage;// 提取一個完整的報文std::string jison = DeCode(packageStream);if (jison == "")continue;std::cout << "jison string: " << std::endl;std::cout << jison << std::endl;// 反序列化resPtr->Deserialize(jison);// 打印resultstd::cout << "result: " << resPtr->_result<< "code: " << resPtr->_code<< "describe: " << resPtr->_describe << std::endl;}sleep(2);}socketPtr->Close();return 0;
}