需要云服務器等云產品來學習Linux的同學可以移步/–>騰訊云<–/官網,輕量型云服務器低至112元/年,優惠多多。(聯系我有折扣哦)
文章目錄
- 1. 相關使用接口
- 2. 代碼實現
- 2.1 日志組件
- 2.2 Server端
- 2.3 Client端
- 2.3 bug解決
- 3. 守護進程
- 3.1 守護進程是什么
- 3.2 守護進程相關的使用
- 3.3 守護進程化的實現原理
1. 相關使用接口
tcp協議和udp協議的接口基本相似。使用邏輯也是:1. 創建對應的socket文件套接字對象; 2. bind自己的網絡信息;3. 進行相關通信
只是由于tcp協議的相關特性,所以tcp通信方式有一些不同點。
1. 對于服務端
在創建對應socket文件套接字對象并bind完成后需要設置sockfd為監聽狀態,使用listen
系統調用。
頭文件:#include <sys/types.h>#include <sys/socket.h>
函數原型:int listen(int sockfd, int backlog);
參數解釋:sockfd:要設置的文件套接字對象backlog:最多允許這么多個客戶端處于連接等待狀態, 如果接收到更多的連接請求就忽略, 這里設置不會太大(一般是5),
函數描述:將sockfd文件套接對象設置為監聽狀態
返回值:調用成功返回0,失敗返回-1同時設置錯誤碼
在設置sockfd為監聽狀態之后,在底層進行”三次握手“之后,服務端需要調用accept
接受客戶端的連接。
頭文件:#include <sys/types.h>#include <sys/socket.h>
函數原型:int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
參數解釋:sockfd:要設置的文件套接字對象(這里傳的是監聽的sockfd)addr:接受的連接對應的相關網絡屬性addrlen:addr對應的對象的大小
函數描述:服務端調用accept接受客戶端的連接。如果服務器調用accept()時還沒有客戶端的連接請求,就阻塞等待直到有客戶端連接上來
返回值:調用成功返回一個新的文件套接字,用于進行本次的客戶端和服務端通信,調用失敗返回-1同時設置錯誤碼
2. 對于客戶端
同樣在初始化的時候需要創建socket文件套接字,同樣的不需要程序員顯示bind。也不需要listen和accept。接下來需要做的事情就是發送連接請求,使用connect
系統調用
頭文件:#include <sys/types.h>#include <sys/socket.h>
函數原型:int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
參數解釋:sockfd:發送鏈接請求的文件套接字對象addr:連接對應的相關網絡屬性addrlen:addr對應的對象的大小
函數描述:客戶端使用sockfd向指定服務器的指定端口發起TCP鏈接請求
返回值:調用成功返回0,調用失敗返回-1同時設置錯誤碼
2. 代碼實現
2.1 日志組件
一般來說,服務器在運行的時候,不會在當前shell輸出相關的運行結果,而是在日志中輸出,所以,這里我們現在封裝一個日志的小組件
1. 組件需求
- 使用
logMessage
函數可以將相關日志信息寫入預設的文件中(在當前目錄創建對應文件) - 每條日志信息都會有相關的日志等級,不同等級在不同文件中
- 日志內容支持format和可變參數
2. 代碼實現
#include <unistd.h>
#include <iostream>
#include <cstdio>
#include <ctime>
#include <cstdarg>// 這里是日志等級對應的宏
#define DEBUG (1 << 0)
#define NORMAL (1 << 1)
#define WARNING (1 << 2)
#define ERROR (1 << 3)
#define FATAL (1 << 4)#define NUM 1024 // 日志行緩沖區大小
#define LOG_NORMAL "log.txt" // 日志存放的文件名
#define LOG_ERR "err.txt"const char *logLevel(int level) // 把日志等級轉變為對應的字符串
{switch (level){case DEBUG:return "DEBUG";case NORMAL:return "NORMAL";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOW";}
}
//[日志等級][時間][pid]日志內容
void logMessage(int level, const char *format, ...) // 核心調用
{char logprefix[NUM]; // 存放日志相關信息time_t now_ = time(nullptr);struct tm *now = localtime(&now_);snprintf(logprefix, sizeof(logprefix), "[%s][%d年%d月%d日%d時%d分%d秒][pid:%d]",logLevel(level), now->tm_year + 1900, now->tm_mon + 1, now->tm_mday, now->tm_hour, now->tm_min, now->tm_sec, getpid());char logcontent[NUM];va_list arg; // 聲明一個變量arg指向可變參數列表的對象va_start(arg, format); // 使用va_start宏來初始化arg,將它指向可變參數列表的起始位置。// format是可變參數列表中的最后一個固定參數,用于確定可變參數列表從何處開始vsnprintf(logcontent, sizeof(logcontent), format, arg); // 將可變參數列表中的數據格式化為字符串,并將結果存儲到logcontent中FILE *log = fopen(LOG_NORMAL, "a");FILE *err = fopen(LOG_ERR, "a");if(log != nullptr && err != nullptr){FILE *curr = nullptr;if(level == DEBUG || level == NORMAL || level == WARNING) curr = log;if(level == ERROR || level == FATAL) curr = err;if(curr) fprintf(curr, "%s%s\n", logprefix, logcontent);fclose(log);fclose(err);}
}
2.2 Server端
/* tcpServer.hpp */
#pragma once#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include <string>#include "log.hpp"namespace Server
{enum{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR};static const uint16_t gport = 8080;static const int gbacklog = 5;void serviceIO(int sock) // 服務端調用{char buffer[1024];while (true){ssize_t n = read(sock, buffer, sizeof(buffer) - 1);if (n > 0){// 目前我們把讀到的數據當成字符串, 截止目前buffer[n] = 0;std::cout << "recv message: " << buffer << std::endl;std::string outbuffer = buffer;outbuffer += " server[echo]";write(sock, outbuffer.c_str(), outbuffer.size()); // 這里再把結果寫進sock中,意為返回給客戶端}else if (n == 0){// 代表client退出logMessage(NORMAL, "client quit, me too!");break;}}close(sock);}class tcpServer{public:tcpServer(uint16_t &port) : _port(port){}void initServer(){// 1. 創建socket文件套接字對象_listensock = socket(AF_INET, SOCK_STREAM, 0);if (_listensock == -1){logMessage(FATAL, "create socket error");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success:%d", _listensock);// 2.bind自己的網絡信息sockaddr_in local;local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = bind(_listensock, (struct sockaddr *)&local, sizeof local);if (n == -1){logMessage(FATAL, "bind socket error");exit(BIND_ERR);}logMessage(NORMAL, "bind socket success");// 3. 設置socket為監聽狀態if (listen(_listensock, gbacklog) != 0) // listen 函數{logMessage(FATAL, "listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}void start(){while (true){struct sockaddr_in peer;socklen_t len = sizeof peer;int sock = accept(_listensock, (struct sockaddr *)&peer, &len);if (sock < 0){logMessage(ERROR, "accept error, next");continue;}serviceIO(sock); // 使用close(sock); // 使用之后要關閉,否則會造成文件描述符泄露}}~tcpServer() {}private:uint16_t _port;int _listensock;};} // namespace Server/* tcpServer.cc */
#include <iostream>
#include <memory>#include "tcpServer.hpp"using namespace Server;static void Usage(const char *proc)
{std::cout << "\n\tUsage:" << proc << " local_port\n";
}int main(int argc, char *argv[])
{if(argc != 2){Usage(argv[0]);exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);std::unique_ptr<tcpServer> tsvr(new tcpServer(port));tsvr->initServer();tsvr->start();return 0;
}
2.3 Client端
/* tcpClient.hpp */
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>#include <string>#include "log.hpp"namespace Client
{class tcpClient{public:tcpClient(uint16_t &port, std::string &IP) : _serverPort(port), _serverIP(IP), _sockfd(-1) {}void initClient(){// 1. 創建socket_sockfd = socket(AF_INET, SOCK_STREAM, 0);if(_sockfd == -1){std::cerr << "create socket error" << std::endl;exit(2);}}void run(){struct sockaddr_in server;server.sin_family = AF_INET;server.sin_port = htons(_serverPort);server.sin_addr.s_addr = inet_addr(_serverIP.c_str());if(connect(_sockfd, (struct sockaddr*)&server, sizeof server) != 0){// 鏈接失敗std::cerr << "socket connect error" << std::endl;}else{std::string msg;while(true){std::cout << "Please Enter# ";std::getline(std::cin, msg);write(_sockfd, msg.c_str(), msg.size());char buffer[NUM];int n = read(_sockfd, buffer, sizeof(buffer) - 1); // 按照字符串的形式讀取if(n > 0){// 目前先把讀到的數據當作字符串處理buffer[n] = 0;std::cout << "Server 回顯# " << buffer << std::endl;}else{break;}}}}~tcpClient(){if(_sockfd >= 0) close(_sockfd); // 使用完關閉,防止文件描述符泄露(當然這里也可以不寫,當進程結束之后一切資源都將被回收)}private:uint16_t _serverPort;std::string _serverIP;int _sockfd;};} // namespace Client
/* tcpClient.cc */
#include <memory>
#include <string>#include "tcpClient.hpp"
using namespace Client;static void Usage(const char *proc)
{std::cout << "\n\tUsage:" << proc << " server_ip server_port\n";
}int main(int argc, char* argv[])
{if(argc != 3){Usage(argv[0]);exit(1);}std::string IP = argv[1];uint16_t port = atoi(argv[2]);std::unique_ptr<tcpClient> tclt(new tcpClient(port, IP));tclt->initClient();tclt->run();return 0;
}
2.3 bug解決
這里會出現一個問題:在此時如果再有另一個客戶端進行通信,就會出現其他客戶端被阻塞的問題
這是因為我們在服務端的serviceIO
中的執行沒有結束,而且由于實現的是死循環,所以也不可能結束,這就造成了服務端一直在阻塞的情況。那么如何解決呢?
1. 實現多進程版本
多進程的實現思想就是:每次收到新請求的時候,都創建一個子進程,讓子進程來執行對應任務,父進程繼續監聽,但是由于創建的子進程需要被父進程等待回收,否則就會出現僵尸進程。那么這里的解決方案就是:讓子進程再創建一個子進程,最終讓孫子進程來執行本次請求對應的任務,父進程直接exit,爺爺進程等待父進程結束后繼續監聽。此時孫子進程就變成了孤兒進程,由OS直接接收管理。
這里需要更改的就只有tcpServer.hpp文件中的start函數,這里附上更改后的代碼
void start()
{while (true){struct sockaddr_in peer;socklen_t len = sizeof peer;int sock = accept(_listensock, (struct sockaddr *)&peer, &len);if (sock < 0){logMessage(ERROR, "accept error, next");continue;}pid_t id = fork();if (id == 0){close(_listensock); // 子進程不會使用監聽socket,但是創建子進程的時候寫時拷貝會拷貝,這里先關掉// 子進程再創建子進程if (fork() > 0)exit(0); // 父進程退出// 走到當前位置的就是子進程serviceIO(sock); // 使用close(sock); // 關閉對應的通信socket(這里也可以不關閉,因為此進程在下個語句就會退出)exit(0); // 孫子進程退出}// 走到這里的是監聽進程(爺爺進程)pid_t n = waitpid(id, nullptr, 0);if(n > 0){logMessage(NORMAL, "wait success pid:%d", n);}close(sock); // 使用之后要關閉,否則會造成文件描述符泄露}
}
現在再測試,服務器就能夠同時處理多個客戶端的請求。
2. 實現多線程版本
但是,我們知道OS在創建線程的時候,需要的成本是非常高的,但是線程就非常輕量級,所以使用線程來處理服務器請求是更加合理的,所以這里實現一下多線程的版本
void start()
{while (true){struct sockaddr_in peer;socklen_t len = sizeof peer;int sock = accept(_listensock, (struct sockaddr *)&peer, &len);if (sock < 0){logMessage(ERROR, "accept error, next");continue;}// version 3:多線程版本pthread_t tid;pthread_create(&tid, nullptr, routine, new ThreadData(this, sock)); // 創建新線程,讓新線程調用routine然后去執行serviceIO}
}
static void *routine(void *arg)
{// 由于不能讓主線程等待新線程執行完畢,所以這里進行線程分離pthread_detach(pthread_self());ThreadData* args = static_cast<ThreadData*>(arg);serviceIO(args->_sock);close(args->_sock); // 使用完之后回收sockdelete args; // 回收空間return nullptr;
}
3. 實現線程池版本
當然,上述的兩種實現方式是具有一些優化空間的,因為每次在創建子進程/新線程的時候都會有消耗,這樣會降低效率,而且當突然出現很多長時間的請求的時候,服務器就會同時接收到很多請求,會一直創建子進程/新線程,可能會導致服務器崩潰,所以可以使用我們之前寫過的一個小組件線程池來改寫
void start()
{ThreadPool<Task>::getInstance()->run(); // 初始化線程池,讓他跑起來logMessage(NORMAL, "init thread pool success");while (true){struct sockaddr_in peer;socklen_t len = sizeof peer;int sock = accept(_listensock, (struct sockaddr *)&peer, &len);if (sock < 0){logMessage(ERROR, "accept error, next");continue;}// version 4:線程池版本ThreadPool<Task>::getInstance()->push(Task(sock, serviceIO));}
}
/* 小組件 */
// Task.hpp
#pragma once#include <string>
#include <iostream>
#include <functional>class Task
{
public:using func_t = std::function<void(int)>;public:Task() {}Task(int sock, func_t func): _sock(sock), _callback(func){}void operator()(){_callback(_sock);}private:int _sock;func_t _callback;
};
// Thread.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include <cassert>class Thread
{
public:using func_t = std::function<void *(void *)>; // 定義func_t類型static int number; // 線程編號,按照一次運行時的調用次數計數
public:Thread(){char *buffer = new char[64];name_ = "thread-" + std::to_string(++number);}static void *start_routine(void *args){Thread *_this = static_cast<Thread *>(args);void *ret = _this->run(_this->args_);return ret;}void *run(void *arg){return func_(arg);}void start(func_t func, void *args){func_ = func;args_ = args;int n = pthread_create(&tid_, nullptr, start_routine, this);assert(n == 0);(void)n;}void join(){int n = pthread_join(tid_, nullptr);assert(n == 0);(void)n;}std::string GetTaskName(){return name_;}~Thread() {}private:std::string name_; // 線程名pthread_t tid_; // 線程idfunc_t func_; // 線程調用的函數void *args_; // 線程調用函數的參數
};
int Thread::number = 0;
// ThreadPool.hpp
#pragma once
#include "LockGuard.hpp"
#include "Thread.hpp"
#include <vector>
#include <queue>
#include <string>
#include <iostream>
#include <mutex>const int gnum = 5; // 線程池中默認的線程個數template <class T>
class ThreadPool; // 線程池類的聲明/* 線程數據類,保存線程對應的內容包括線程池對象的指針和線程名 */
template <class T>
class ThreadData
{
public:ThreadData(ThreadPool<T> *tp, const std::string &n) : threadpool(tp), name(n){};public:ThreadPool<T> *threadpool;std::string name;
};/* 線程池類的實現 */
template <class T>
class ThreadPool
{
public:static void *handleTask(void *args) // 線程需要執行的回調函數{ThreadData<T> *td = static_cast<ThreadData<T> *>(args);while (true){T t; // 構建任務對象{LockGuard lockGuard(td->threadpool->mutex()); // 上鎖while (td->threadpool->isQueueEmpty()){// 如果任務隊列為空,線程掛起,等待隊列中被填充任務td->threadpool->threadWait();}t = td->threadpool->pop(); // 如果隊列中有任務,就拿出任務}// 任務在鎖外執行t();}delete td;return nullptr;}public: // 給handleTask調用的外部接口pthread_mutex_t *mutex() { return &_mutex; }bool isQueueEmpty() { return _task_queue.empty(); }void threadWait() { pthread_cond_wait(&_cond, &_mutex); }T pop() // 獲取線程池中任務隊列里需要執行的下一個任務{T t = _task_queue.front();_task_queue.pop();return t;}public: // 需要暴露給外部的接口void run() // 為所有線程對象創建真正的執行流,并執行對應的回調函數{for (const auto &thread : _threads){ThreadData<T> *td = new ThreadData<T>(this, thread->GetTaskName()); // 構造handleTask的參數對象thread->start(handleTask, td); // 調用該線程的start函數,創建新線程執行指定的handleTask任務// std::cout << thread->GetTaskName() << " start..." << std::endl;}}void push(const T &in) // 將指定任務push到隊列中{// 加鎖LockGuard lockGuard(&_mutex); // 自動加鎖,在當前代碼段結束之后調用LockGuard的析構函數解鎖_task_queue.push(in);pthread_cond_signal(&_cond); // 發送信號表示此時task_queue中有值,讓消費者可以使用}~ThreadPool() // 析構函數,銷毀互斥量和條件變量,delete所有thread對象指針,自動調用thread對象的析構函數{pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);for (auto &thread : _threads){delete thread;}}static ThreadPool<T> *getInstance(){if(nullptr == tp){std::lock_guard<std::mutex> lck(_singletonlock);if(nullptr == tp){tp = new ThreadPool<T> ();}}return tp;}
private: // 單例模式需要私有化的接口ThreadPool(const int &num = gnum) // 構造函數,初始化互斥量和條件變量,構建指定個數的Thread對象{pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);for (int i = 0; i < num; ++i){_threads.push_back(new Thread());}}//delete拷貝構造和析構函數ThreadPool(const ThreadPool<T> &) = delete;ThreadPool<T> *operator=(const ThreadPool<T> &) = delete;private:std::vector<Thread *> _threads; // 保存所有線程對象的指針std::queue<T> _task_queue; // 需要被分配的任務隊列pthread_mutex_t _mutex; // 任務隊列需要被互斥的訪問pthread_cond_t _cond; // 生產任務和消費任務之間需要進行同步static ThreadPool<T> *tp; // 靜態成員,存放ThreadPool指針static std::mutex _singletonlock; // 創建線程安全的單例對象要加的鎖
};
template<class T>
ThreadPool<T> *ThreadPool<T>::tp = nullptr;
template<class T>
std::mutex ThreadPool<T>::_singletonlock;
3. 守護進程
3.1 守護進程是什么
在我們之前實現的代碼中,所有的Server端在運行的時候都會占用前臺的Shell,當這個Shell退出之后,對應的進程也就會退出
但是我們知道:在實際的應用環境中,是不會出現這種情況的,這是因為在實際部署服務的時候,會將對應的服務守護進程化,所謂的守護進程化就是讓對應的進程不受當前會話的影響
守護進程的理解
我們是使用遠程命令行工具來連接我們的云服務器的,這個工具在Windows下會使用Xshell,macOS下使用自帶的終端或者iTerm,或者會使用VScode遠程連接帶有的shell…
在我們登錄成功之后,OS在內部會創建一個會話,在此會話內部創建一個前臺進程bash進行命令行解釋,此時我們就可以想bash中輸入命令,OS幫我們執行。
在一個會話(session)中,同一時間只能有一個前臺進程但是可以有任意個后臺進程的存在
當這個會話結束之后,會話內所有的進程都將會退出,這也就是為什么我們的服務不能長久的在服務器中運行
3.2 守護進程相關的使用
1. &
和jobs
&可以讓一個命令在后臺運行
jobs可以查看當前會話的所有作業(現在可以理解成進程)
- 作業前面的[]內部的數字就是作業號
為什么這個服務運行起來后還能夠輸入命令?
這是因為這個服務變成后臺作業了,一個會話在同一時刻有且只有一個前臺進程
- 通過PGID可以確定同一個進程組
- 通過SID可以確定同一個會話
- fg+作業號:把對應作業放在前臺
- CTRL+z:暫停作業(一個任務在前臺如果暫停了會立馬放在后臺)
- bg+作業號:啟動作業
2. daemon
OS提供了一個守護進程化的接口,但是我們不建議使用,因為這個接口會產生一些未定義行為,所以我們自己封裝一個小組件用于守護進程化。
3.3 守護進程化的實現原理
守護進程化的實現原理就是:讓這個進程自己成為一個會話組,獨立出來就可以不受當前會話的影響
頭文件:#include <unistd.h>
函數原型:pid_t setsid();
函數解釋:對于一個非會話組組長的進程,使其成為一個新的會話組,并且調用進程成為組長
返回值:如果調用成功,返回一個新的SID(SID就是當前會話組的組長的pid);調用失敗返回-1同時設置錯誤碼
守護進程化組件的實現
// daemon.hpp
#pragma once#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>#define DEV "/dev/null" // 這個路徑是一個“黑洞”,寫入的所有數據都會被“吃掉”,不會被讀取void deamonSelf(const char *curPath = nullptr) // 可選參數,如果傳入非空,就更改“當前路徑”
{// 1. 讓調用進程忽略掉所有異常信號signal(SIGPIPE, SIG_IGN);// 2. 讓當前進程成為非組長進程if (fork() > 0)exit(0); // 創建子進程,然后將父進程退出確保調用setsid的進程是非組長進程// 3. 調用setsid創建新的會話組pid_t n = setsid();assert(n != -1);// 4. 守護進程是脫離終端的,需要關閉或者重定向以前進程默認打開的文件,這里我們采用重定向的方法更安全int fd = open(DEV, O_RDWR);if(fd > 0){dup2(fd, 0);dup2(fd, 1);dup2(fd, 2);close(fd);}else{close(0);close(1);close(2);}// 5. 可選:是否更改當前路徑if (curPath != nullptr)chdir(curPath);
}
#include <iostream>
#include <memory>#include "tcpServer.hpp"
#include "daemon.hpp"using namespace Server;static void Usage(const char *proc)
{std::cout << "\n\tUsage:" << proc << " local_port\n";
}int main(int argc, char *argv[])
{if(argc != 2){Usage(argv[0]);exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);std::unique_ptr<tcpServer> tsvr(new tcpServer(port));tsvr->initServer();deamonSelf(); // 當前進程守護進程化tsvr->start();return 0;
}
本節完…