TcpServer服務器管理模塊(模塊十)

目錄

類功能

類定義

類實現

編譯測試

server.cc

gdb測試斷點

忽略SIGPIPE信號


類功能

類定義

// TcpServer服務器管理模塊(即全部模塊的整合)
class TcpServer
{
private:uint64_t _next_id;                                  // 這是一個自動增長的連接IDint _port;int _timeout;                                       // 這是非活躍連接的統計時間--多長時間無通信就是非活躍連接bool _enable_inactive_release;                      // 是否啟動非活躍連接超時銷毀的判斷標志EventLoop _baseloop;                                // 這是主線程的EventLoop對象,負責監聽事件的處理Acceptor _acceptor;                                 // 這是監聽套接字的管理對象LoopThreadPool _pool;                               // 這是從屬EventLoop線程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有連接對應的share_ptr對象,這里面的對象被刪除,就意味這某一個連接被刪除using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
private:void NewConnection(int fd); // 為新連接構造一個Connection進行管理void RemoveConnection(); // 從管理Connection的_conns移除連接信息
public:TcpServer();void SetThreadCount(int count);void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout);void RunAfter(const Functor &task, int delay);  // 用于添加一個定時任務void Start();
};

類實現

// TcpServer服務器管理模塊(即全部模塊的整合)
class TcpServer
{
private:uint64_t _next_id; // 這是一個自動增長的連接IDint _port;int _timeout;                                       // 這是非活躍連接的統計時間--多長時間無通信就是非活躍連接bool _enable_inactive_release;                      // 是否啟動非活躍連接超時銷毀的判斷標志EventLoop _baseloop;                                // 這是主線程的EventLoop對象,負責監聽事件的處理Acceptor _acceptor;                                 // 這是監聽套接字的管理對象LoopThreadPool _pool;                               // 這是從屬EventLoop線程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有連接對應的share_ptr對象,這里面的對象被刪除,就意味這某一個連接被刪除using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay){_next_id++;_baseloop.TimerAdd(_next_id, delay, task);}// 為新連接構造一個Connection進行管理void NewConnection(int fd){_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release)conn->EnableInactiveRelease(10); // 啟動非活躍超時銷毀conn->Established();                 // 就緒初始化_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection &conn){int id = conn->Id();auto it = _conns.find(id);if (it != _conns.end()){_conns.erase(it);}}// 從管理Connection的_conns移除連接信息void RemoveConnection(const PtrConnection &conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:TcpServer(int port) : _port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen(); // 將監聽套接字掛到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout, _enable_inactive_release = true; }// 用于添加一個定時任務void RunAfter(const Functor &task, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start(){_pool.Create(); // 創建線程池中的從屬線程_baseloop.Start();}
};

編譯測試

為了便于測試整合,創建了一個新的文件server.cc

server.cc

#include "../source/server.hpp"void OnConnected(const PtrConnection &conn)
{DBG_LOG("NEW CONNECTION:%p", conn.get());
}
void OnClosed(const PtrConnection &conn)
{DBG_LOG("CLOSE CONNECTION:%p", conn.get());
}
void OnMessage(const PtrConnection &conn, Buffer *buf)
{DBG_LOG("%s", buf->ReadPosition());buf->MoveReadOffset(buf->ReadAbleSize());std::string str = "Hello World";conn->Send(str.c_str(), str.size());conn->Shutdown(); // 調用關閉接口
}int main()
{TcpServer server(8500);server.SetThreadCount(2);server.EnableInactiveRelease(10);server.SetClosedCallback(OnClosed);server.SetConnectedCallback(OnConnected);server.SetMessageCallback(OnMessage);server.Start();return 0;
}

服務端

客戶端

符合預期

gdb測試斷點

在測試的過程中,出現了一些小問題,通過斷點進行處理

忽略SIGPIPE信號

前幾天測試發生的問題,可以依靠創建下面一個類來忽略該信號的觸發

忽略SIGPIPE信號,當連接斷開的時候,如果我們繼續向對端send發送信息,就會觸發異常,即SIGPIPE異常,這個就是導致客戶端異常退出的原因

// 忽略SIGPIPE信號,當連接斷開的時候,如果我們繼續向對端send發送信息,就會觸發異常,即SIGPIPE異常,這個就是導致客戶端異常退出的原因
class NetWork{public:NetWork(){DBG_LOG("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN); // 忽視SIGPIPE異常,這個會導致進程退出}
};
static NetWork nw;  // 這個是為了執行里面的構造函數

服務器主體源碼

因為寫到這里已經算是到了一定程度了,也就是說服務器的部分已經基本完成,后續會以次為基礎,創建一個回顯服務器,這里就直接貼代碼了,不要嫌長

#ifndef __M_SERVER_H__  
#define __M_SERVER_H__
#include <iostream>
#include <vector>
#include <cstdint>
#include <cassert>
#include <ctime>
#include <cstring>
#include <string>
#include <unistd.h>
#include <typeinfo>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <fcntl.h>
#include <functional>
#include <signal.h>
#include <unordered_map>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <sys/socket.h>
#include <sys/types.h>#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG#define LOG(level, format, ...)                                                                                        \do                                                                                                                 \{                                                                                                                  \if (level < LOG_LEVEL)                                                                                         \break;                                                                                                     \time_t t = time(NULL);                                                                                         \struct tm *ltm = localtime(&t);                                                                                \char tmp[32] = {0};                                                                                            \strftime(tmp, 31, "%H:%M:%S", ltm);                                                                            \fprintf(stdout, "[%p %s %s:%d] " format "\n", (void *)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__); \} while (0)#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)// 緩沖區類
#define BUFFER_DEFAULT_SIZE 1024 // Buffer 默認起始大小
class Buffer
{
private:std::vector<char> _buffer; // 使用vector進行內存空間管理uint64_t _reader_idx;      // 讀偏移uint64_t _writer_idx;      // 寫偏移
public:Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}char *Begin() { return &*_buffer.begin(); }// 獲取當前寫入起始地址char *WritePosition() { return Begin() + _writer_idx; }// 獲取當前讀取起始地址char *ReadPosition() { return Begin() + _reader_idx; }// 獲取緩沖區末尾空閑空間大小--寫偏移之后的空閑空間, 總體空間大小減去寫偏移uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }// 獲取緩沖區起始空閑空間大小--讀偏移之前的空閑空間uint64_t HeadIdleSize() { return _reader_idx; }// 獲取可讀數據大小 = 寫偏移 - 讀偏移uint16_t ReadAbleSize() { return _writer_idx - _reader_idx; };// 將讀偏移向后移動void MoveReadOffset(uint64_t len){if (len == 0)return;// 向后移動的大小, 必須小于可讀數據大小assert(len <= ReadAbleSize());_reader_idx += len;}// 將寫偏移向后移動void MoveWriteOffset(uint64_t len){// 向后移動的大小,必須小于當前后邊的空閑空間大小assert(len <= TailIdleSize());_writer_idx += len;}// 確保可寫空間足夠(整體空閑空間夠了就移動數據,否則就擴容)void EnsureWriteSpace(uint64_t len){// 如果末尾空閑空間大小足夠,直接返回if (TailIdleSize() >= len){return;}// 末尾空閑空間不夠,則判斷加上起始位置的空閑空間大小是否足夠,夠了就將數據移動到起始位置if (len <= TailIdleSize() + HeadIdleSize()){// 將數據移動到起始位置uint64_t rsz = ReadAbleSize();                            // 把當前數據大小先保存起來std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 把可讀數據拷貝到起始位置_reader_idx = 0;                                          // 將讀偏移歸0_writer_idx = rsz;                                        // 將寫位置置為可讀數據大小, 因為當前的可讀數據大小就是寫偏移量}else{// 總體空間不夠,則需要擴容,不移動數據,直接給寫偏移之后擴容足夠空間即可_buffer.resize(_writer_idx + len);}}// 寫入數據void Write(const void *data, uint64_t len){// 1.保證有足夠空間, 2.拷貝數據進去EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WritePosition());}void WriteAndPush(const void *data, uint64_t len){Write(data, len);MoveWriteOffset(len);}void WriteString(const std::string &data){return Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer &data){return Write(data.ReadPosition(), data.ReadAbleSize());}void WriteBufferAndPush(Buffer &data){WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}// 讀取數據void Read(void *buf, uint64_t len){// 要求獲取的數據大小必須小于可讀數據大小assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);}void ReadAndPop(void *buf, uint64_t len){Read(buf, len);MoveReadOffset(len);}std::string ReadAsString(uint64_t len){// 要求獲取的數據大小必須小于可讀數據大小assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0], len); // 這里不直接用str.c_str()的原因是,這個的返回值是const類型return str;}std::string ReadAsStringAndPop(uint64_t len){assert(len <= ReadAbleSize());std::string str = ReadAsString(len);MoveReadOffset(len);return str;}char *FindCRLF(){char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());return res;}// 這種情況針對的是,通常獲取一行數據std::string GetLine(){char *pos = FindCRLF();if (pos == NULL)return "";// +1 是為了把換行字符也取出來return ReadAsString(pos - ReadPosition() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());return str;}// 清空緩沖區void Clear(){// 只需要將偏移量歸0即可_reader_idx = 0;_writer_idx = 0;}
};// 套接字類
#define MAX_LISTEN 1024
class Socket
{
private:int _sockfd;public:Socket() : _sockfd(-1) {}Socket(int fd) : _sockfd(fd) {}~Socket() { Close(); };int Fd() { return _sockfd; }// 創建套接字bool Create(){// int socket(int domain, int type, int protocol)_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (_sockfd < 0){ERR_LOG("CREATE SOCKET FAILED!");return false;}return true;}// 綁定地址信息bool Bind(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int bind(int sockfd, struct sockaddr* addr, socklen_t len)int ret = bind(_sockfd, (struct sockaddr *)&addr, len);if (ret < 0){ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}// 開始監聽bool Listen(int backlog = MAX_LISTEN){// int listen(int backlog)int ret = listen(_sockfd, backlog);if (ret < 0){ERR_LOG("SOCKET LISTEN FAILED!");return false;}return true;}// 向服務器發起連接bool Connect(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int connect(int sockfd, struct sockaddr* addr, socklen_t len)int ret = connect(_sockfd, (struct sockaddr *)&addr, len);if (ret < 0){ERR_LOG("CONNECT SERVER FAILED!");return false;}return true;}// 獲取新連接int Accept(){// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0){ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;}// 接收數據ssize_t Recv(void *buf, size_t len, int flag = 0) // 0 阻塞{// ssize_t recv(int sockfd, void *buf, size_t len, int flag)ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0){// EAGAIN 當前的接收緩沖區中沒用數據了,在非阻塞的情況下才有這個錯誤// EINTR 表示當前socket的阻塞等待,被信號打斷了if (errno == EAGAIN || errno == EINTR){return 0; // 表示這次沒用接收到數據}ERR_LOG("SOCKET RECV FAILED!");return -1;}return ret; // 實際接收的數據長度}ssize_t NonBlockRecv(void *buf, size_t len){return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示當前接收為非阻塞}// 發送數據ssize_t Send(const void *buf, size_t len, int flag = 0){// ssize_t send(int sockfd, void *data, size_t len, int flag)ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0){if (errno == EAGAIN || errno == EINTR){return 0;}ERR_LOG("SOCKET SEND FAILED!!");return -1;}return ret; // 實際發送的數據長度}ssize_t NonBlockSend(void *buf, size_t len){if (len == 0)return 0;return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示當前接收為非阻塞}// 關閉套接字void Close(){if (_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 創建一個服務器連接bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) // 接收全部{// 1.創建套接字 2.綁定地址 3.開始監聽 4.設置非阻塞 5.啟動地址重用if (Create() == false)return false;if (block_flag)NonBlock(); // 默認阻塞if (Bind(ip, port) == false)return false;if (Listen() == false)return false;ReuseAddress();return true;}// 創建一個客戶端連接bool CreateClient(uint16_t port, const std::string &ip){// 1.創建套接字 2.指向連接服務器if (Create() == false)return false;if (Connect(ip, port) == false)return false;return true;}// 設置套接字選項 -- 開啟地址端口重用void ReuseAddress(){// int setsockopt(int fd, int level, int optname, void *val, int vallen)int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); // 地址val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); // 端口號}// 設置套接字阻塞屬性 -- 設置為非阻塞void NonBlock(){// int fcntl(int fd, int cmd, .../*arg*/)int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
};class Poller; // 整合測試1:聲明
class EventLoop;
// Channel類
class Channel
{
private:int _fd;EventLoop *_loop;uint32_t _events;  // 當前需要監控的事件uint32_t _revents; // 當前連接觸發的事件using EventCallback = std::function<void()>;EventCallback _read_callback;  // 可讀事件被觸發的回調函數EventCallback _write_callback; // 可寫事件被觸發的回調函數EventCallback _error_callback; // 錯誤事件被觸發的回調函數EventCallback _close_callback; // 連接斷開事件被觸發的回調函數EventCallback _event_callback; // 任意事件被觸發的回調函數
public:Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }uint32_t Events() { return _events; } // 獲取想要監控的事件void SetREvents(uint32_t events) { _revents = events; }void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } // 設置實際就緒的事件void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 當前是否監控了可讀bool ReadAble() { return (_events & EPOLLIN); }// 當前是否監控了可寫bool WriteAble() { return (_events & EPOLLOUT); }// 啟動讀事件監控void EnableRead(){_events |= EPOLLIN;Update();}// 啟動寫事件監控void EnableWrite(){_events |= EPOLLOUT;Update();}// 關閉讀事件監控void DisableRead(){_events &= ~EPOLLIN;Update();}// 關閉寫事件監控void DisableWrite(){_events &= ~EPOLLOUT;Update();}// 關閉所有事件監控void DisableAll(){_events = 0;Update();}// 移除監控void Remove(); // 聲明和實現要分離,因為實現的時候是不知道里面有什么函數成員的void Update(); // 這兩個特殊,所以把實現放在Poller類的下面進行實現// 事件處理,一旦觸發了事件,就調用這個函數,自己觸發了什么事件如何處理自己決定void HandleEvent(){// 第二參數,對方關閉連接,第三參數,帶外數據if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if (_event_callback) // 不管任何事件,都調用的回調函數_event_callback();if (_read_callback)_read_callback();}// 有可能會釋放連接的操作事件,一次只處理一個if (_revents & EPOLLOUT){if (_event_callback)_event_callback(); // 放到事件處理完畢后調用,刷新活躍度if (_write_callback)_write_callback();}else if (_revents & EPOLLERR){if (_event_callback)_event_callback();if (_error_callback)_error_callback();}else if (_revents & EPOLLHUP){if (_event_callback)_event_callback();if (_close_callback)_close_callback();}}
};// Poller描述符監控類
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;private:// 對epoll的直接操作void Update(Channel *channel, int op){// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev)int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0){ERR_LOG("EPOLLCTL FAILED!");}return;}// 判斷一個Channel 是否已經添加了事件監控bool HasChannel(Channel *channel){auto it = _channels.find(channel->Fd());if (it == _channels.end()){return false;}return true;}public:Poller(){_epfd = epoll_create(MAX_EPOLLEVENTS); // 這個值大于0就行了,無用處if (_epfd < 0){ERR_LOG("EPOLL CREATE FAILED!");abort(); // 退出程序}}// 添加或修改監控事件void UpdateEvent(Channel *channel){bool ret = HasChannel(channel);if (ret == false){// 不存在則添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}// 移除監控void RemoveEvent(Channel *channel){auto it = _channels.find(channel->Fd());if (it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 開始監控, 返回活躍連接void Poll(std::vector<Channel *> *active){// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // -1阻塞監控if (nfds < 0){if (errno == EINTR) // 信號打斷{return;}ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));abort();}for (int i = 0; i < nfds; i++) // 添加活躍信息{auto it = _channels.find(_evs[i].data.fd); // 沒找到就說明不在我們的管理之下,這是不正常的assert(it != _channels.end());it->second->SetREvents(_evs[i].events); // 設置實際就緒的事件active->push_back(it->second);}return;}
};// timerwheel時間輪定時器類
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:uint64_t _id;         // 定時器任務對象uint32_t _timeout;    // 定時任務的超時時間bool _canceled;       // false-表示沒有被取消,true-表示被取消TaskFunc _task_cb;    // 定時器要執行的定時任務ReleaseFunc _release; // 用于刪除TimerWheel中保存的定時器對象信息
public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}~TimerTask(){if (_canceled == false)_task_cb();_release();}void Cancel() { _canceled = true; }void SetRelease(const ReleaseFunc &cb) { _release = cb; }uint32_t DelayTime() { return _timeout; } // 返回時間
};class TimerWheel
{
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick;     // 當前的的秒針,走到哪里哪里就釋放執行int _capacity; // 表盤最大數量 -- 其實就是最大延遲時間std::vector<std::vector<PtrTask>> _wheel;// 用weak_ptr來構造出新的shared_ptr用來計數,不過后續要記得釋放std::unordered_map<uint64_t, WeakTask> _timers;EventLoop *_loop;int _timerfd; // 定時器描述符 -- 可讀事件回調就是讀取計數器,執行定時任務std::unique_ptr<Channel> _timer_channel;private:void RemoveTimer(uint64_t id){auto it = _timers.find(id);if (it != _timers.end()){_timers.erase(it);}}static int CreateTimerfd(){// int timerfd_create(int clockid, int flags);int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0){ERR_LOG("TIMERFD CREATE FAILED!");abort();}// int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec);struct itimerspec itime;itime.it_value.tv_sec = 1;                 // 設置 秒鐘itime.it_value.tv_nsec = 0;                // 設置 納秒 第一次超時時間為1s后itime.it_interval.tv_sec = 1;              // 同上itime.it_interval.tv_nsec = 0;             // 第一次超時后,每隔超時的間隔時timerfd_settime(timerfd, 0, &itime, NULL); // 0代表阻塞式return timerfd;}void ReadTimefd(){uint64_t times;int ret = read(_timerfd, &times, 8);if (ret < 0){ERR_LOG("READ TIMERFD FAILED!");abort();}return;}// 這個函數應該每秒鐘被執行一次,相當于秒鐘向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear(); // 清空指定位置的數組,就會把數組中保存的所有管理定時器對象的shared_ptr釋放掉.從而執行函數}void OnTime(){ReadTimefd();RunTimerTask();}void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定時任務{PtrTask pt(new TimerTask(id, delay, cb));                      // 實例化定時任務對象pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); // 第0個位置是隱藏的this指針。再把任務id綁定進去int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}// 刷新/延遲定時任務void TimerRefreshInLoop(uint64_t id){// 通過保存的定時器對象的weak_ptr構造一個shared_ptr出來, 添加到輪子中auto it = _timers.find(id);if (it == _timers.end()){return; // 沒找到定時任務, 沒法刷新,沒法延遲}PtrTask pt = it->second.lock(); // lock獲取weak_ptr管理的對象對應的shared_ptrint delay = pt->DelayTime();    // 獲取到了初始的延遲時間int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return; // 沒找到定時任務, 沒法刷新,沒法延遲}PtrTask pt = it->second.lock(); // 當還沒有過期才進行取消if (pt)pt->Cancel();}public:TimerWheel(EventLoop *loop) : _capacity(60), _tick(0), _wheel(_capacity), _loop(loop),_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)){_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead(); // 啟動讀事件監控}/*定時器中有個_timers成員,定時器信息的操作有可能在多線程中進行,因此需要考慮線程安全問題*//*如果不想加鎖,那就把對定期的所有操作,都放在一個線程中進行*/void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);// 刷新/延遲定時任務void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);/*這個接口存在線程安全問題--這個接口實際上不能被外界使用者調用,只能在模塊內,對應的EventLoop線程內執行*/bool HasTimer(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return false; // 沒找到定時任務, 沒法刷新,沒法延遲}return true;}
};// EventLoop事件監控處理類
class EventLoop
{
private:using Functor = std::function<void()>;std::thread::id _thread_id;              // 線程IDint _event_fd;                           // eventfd喚醒IO事件監控有可能導致的阻塞std::unique_ptr<Channel> _event_channel; // 智能指針Poller _poller;                          // 進行所有描述符的事件監控std::vector<Functor> _tasks;             // 任務池std::mutex _mutex;                       // 實現任務池操作的線程安全TimerWheel _timer_wheel;                 // 定時器模塊
public://  執行任務池中的所有任務void RunAllTask(){std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for (auto &f : functor){f();}return;}static int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!");abort(); // 讓程序異常退出}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0){// EINTR -- 被信號打斷, EAGAIN -- 表示無數據可讀if (errno == EINTR || EAGAIN){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0){if (errno == EINTR){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}public:EventLoop() : _thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 給eventfd添加可讀事件回調函數,讀取eventfd事件通知次數_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 啟動eventfd的讀事件監控_event_channel->EnableRead();}// 三步走--事件監控-》就緒事件處理-》執行任務void Start(){while (1){// 1.事件監控std::vector<Channel *> actives;_poller.Poll(&actives);// 2.事件處理for (auto &channel : actives){channel->HandleEvent();}// 3.執行任務RunAllTask();}}// 用于判斷當前線程是否是EventLoop對應的線程bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}void AssertInLoop(){assert(_thread_id == std::this_thread::get_id());}// 判斷將要執行的任務是否處于當前線程中,如果是則執行,否則壓入隊列void RunInLoop(const Functor &cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}// 將操作壓入任務池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 喚醒有可能因為沒有事件就緒,而導致的epoll阻塞// 其實就是給eventfd寫入一個數據,eventfd就會觸發可讀事件WeakUpEventFd();}// 添加/修改描述符的事件監控void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }// 移除描述符的監控void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};// EventLoop實例化管理類
class LoopThread
{
private:/* 用于實現_loop獲取的同步關系,避免線程創建了,但是_loop還沒有實例化之前去獲取_loop*/std::mutex _mutex;             // 互斥鎖std::condition_variable _cond; // 條件變量EventLoop *_loop;              // EventLoop指針變量,這個對象需要在線程內實例化std::thread _thread;           // EventLoop對應的線程
private:/*實例化EventLoop對象,喚醒_cond上有可能阻塞的線程,并且開始運行EventLoop模塊的功能*/void ThreadEntry(){EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);_loop = &loop;_cond.notify_all();}loop.Start();}public:/*創建線程,設定線程入口函數*/LoopThread() : _loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}/*返回當前線程關聯的EventLoop對象指針*/EventLoop *GetLoop(){EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex); // 加鎖_cond.wait(lock, [&](){ return _loop != NULL; }); // loop為NULL就一直阻塞loop = _loop;}return loop;}
};// LoopThreadPool管理LoopThread創建主從線程類
class LoopThreadPool
{
private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vector<LoopThread *> _threads;std::vector<EventLoop *> _loops;public:LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _next_idx(0), _baseloop(baseloop) {}void SetThreadCount(int count) { _thread_count = count; }void Create(){if (_thread_count > 0){_threads.resize(_thread_count); // 里面存放的是指針不是對象,所以可以直接擴大_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++){_threads[i] = new LoopThread();     // 構造無參數_loops[i] = _threads[i]->GetLoop(); // 在上一句構造線程還沒有創建完成會一直阻塞,因此不用擔心在創建期間就分配連接}}}EventLoop *NextLoop(){if (_thread_count == 0)return _baseloop; // 沒有從線程就直接返回主線程_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];}
};class Any
{
private:class holder{public:virtual ~holder() {}virtual const std::type_info &type() = 0;virtual holder *clone() = 0;};template <class T>class placeholder : public holder{public:placeholder(const T &val) : _val(val) {}// 獲取子類對象保存的數據類型virtual const std::type_info &type() { return typeid(T); }// 針對當前的對象自身,克隆出一個新的子類對象virtual holder *clone() { return new placeholder(_val); }// 析構用數據自身的就行了public:T _val;};holder *_content;public:Any() : _content(nullptr) {}template <class T>Any(const T &val) : _content(new placeholder<T>(val)) {}Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}~Any() { delete _content; }Any &swap(Any &other){std::swap(_content, other._content);return *this;}// 返回子類對象保存的數據的指針template <class T>T *get(){// 想要獲取的數據類型,必須和保存的數據類型一致assert(typeid(T) == _content->type());return &((placeholder<T> *)_content)->_val;}// 賦值運算符的重載函數template <class T>Any &operator=(const T &val){// 為val構造一個臨時的通用容器,然后與當前容器自身進行指針交換,臨時對象釋放的時候,原先保存的數據也就被釋放了Any(val).swap(*this);return *this;}Any &operator=(const Any &other){Any(other).swap(*this);return *this;}
};// 連接管理類
class Connection;
// DISCONNECTED -- 連接關閉狀態  CONNECTING -- 連接建立成功-待處理狀態
// CONNECTED -- 連接建立完成,各種設置已完成,可以通信狀態    DISCONNECTING -- 待關閉狀態
typedef enum
{DISCONNECTED,CONNECTING,CONNECTED,DISCONNECTING
} ConnStatu;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
{
private:uint64_t _conn_id; // 連接的唯一ID,便于連接的管理和查找// uint64_t _timer_id; // 定時器ID,必須是唯一的,這塊是為了簡化操作使用conn_id作為定時器int _sockfd;                   // 連接關聯的文件描述符bool _enable_inactive_release; // 連接是否啟動非活躍的判斷標志,默認為falseEventLoop *_loop;              // 連接所關聯的一個EventLoopConnStatu _statu;              // 連接狀態Socket _socket;                // 套接字操作管理Channel _channel;              // 連接的事件管理Buffer _in_buffer;             // 輸入緩沖區--存放從socket中讀取到的數據Buffer _out_buffer;            // 輸出緩沖區--存放要發送給對端的數據Any _context;                  // 請求的接收處理上下文/*這四個回調函數,是讓服務器模塊來設置的(其實服務器模塊的處理回調也是組件使用者設置的)*//*換句話來說,這幾個回調都是組件使用者使用的*/using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*組件內的連接關閉回調--組件內設置的,因為服務器組件內會把所有的連接管理起來,一旦某個連接要關閉*//*就應該從管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;private:/*五個channel的事件回調函數*/// 描述符可讀事件觸發后調用的函數,接收socket數據放到接收緩沖區中,然后調用_message_callbackvoid HandleRead(){// 1.接收socket的數據,放到緩沖區char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if (ret < 0){// 出錯了,不能直接關閉連接return ShutdownInLoop();}// 這里的等于0表示的是沒有讀取到數據,而并不是連接斷開了,連接斷開返回的是-1// 將數據放入輸入緩沖區,寫入之后順便將寫偏移向后移動_in_buffer.WriteAndPush(buf, ret);// 2.調用message_callback進行業務處理if (_in_buffer.ReadAbleSize() > 0){// shard_from_this--從當前對象自身獲取自身的shared_ptr管理對象return _message_callback(shared_from_this(), &_in_buffer);}}// 描述符可寫事件觸發后調用的函數,將發送緩沖區中的數據進行發送void HandleWrite(){// _out_buffer中保存的就是要發送的數據ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret < 0){// 發送錯誤就應該關閉連接了if (_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}}_out_buffer.MoveReadOffset(ret); // 千萬不要忘了,將讀偏移向后移動if (_out_buffer.ReadAbleSize() == 0){_channel.DisableWrite(); // 沒有數據待發送,關閉寫事件監控// 如果當前是連接待關閉狀態,則有數據,發送完數據釋放連接,沒有數據則直接釋放if (_statu == DISCONNECTING){return ReleaseInLoop(); // 這時候就是實際的關閉釋放操作了}}return;}// 描述符觸發掛斷事件void HandleClose(){/*一旦連接掛斷了,套接字就什么都干不了了,因此有數據待處理就處理一下,完畢關閉連接*/if (_in_buffer.ReadAbleSize() > 0){_message_callback(shared_from_this(), &_in_buffer);}return ReleaseInLoop();}// 描述符觸發出錯事件void HandleError(){return HandleClose();}// 描述符觸發任意事件: 1.刷新連接活躍度--延遲定時銷毀任務 2.調用組件使用者的任意事件回調void HandleEvent(){if (_enable_inactive_release == true){_loop->TimerRefresh(_conn_id);}if (_event_callback){_event_callback(shared_from_this());}}// 連接獲取之后,所處的狀態要進行各種設置(給channel設置事件回調,啟動讀監控,調用回調函數)void EstablishedInLoop(){// 1.修改連接狀態   2.啟動讀事件監控    3.調用回調函數assert(_statu == CONNECTING); // 當前狀態必須一定是上層的半連接狀態_statu = CONNECTED;           // 當前函數執行完畢,則連接進入已完成連接狀態// 一旦啟動讀事件監控就有可能會立即觸發讀事件,如果這時候啟動了非活躍連接銷毀_channel.EnableRead();if (_connected_callback)_connected_callback(shared_from_this());}// 這個接口才是實際的釋放接口void ReleaseInLoop(){// 1.修改連接狀態,將其置為DISCONNECTED_statu = DISCONNECTED;// 2.移除連接的事件監控_channel.Remove();// 3.關閉描述符_socket.Close();// 4.如果當前定時器隊列中還有定時銷毀任務,則取消任務if (_loop->HasTimer(_conn_id))CancelInactiveReleaseInLoop();// 5.調用關閉回調函數,避免先移除服務器管理的連接信息導致Connection被釋放,再去處理會出錯,因此先調用用戶的回調函數if (_closed_callback)_closed_callback(shared_from_this());// 移除服務器內部管理的連接信息if (_server_closed_callback)_server_closed_callback(shared_from_this());}// 這個并不是實際的發送接口,而只是把數據放到了發送緩沖區,啟動了可寫事件監控void SendInLoop(Buffer &buf){if (_statu == DISCONNECTED)return;_out_buffer.WriteBufferAndPush(buf); // 可以在這個函數后面加上const表示不修改thisif (_channel.WriteAble() == false){_channel.EnableWrite();}}// 這個關閉操作并非實際的連接釋放操作,需要判斷還有沒有數據待處理,待發送void ShutdownInLoop(){_statu = DISCONNECTING; // 設置連接為半關閉狀態if (_in_buffer.ReadAbleSize() > 0){if (_message_callback)_message_callback(shared_from_this(), &_in_buffer);}// 要么就是寫入數據的時候出錯關閉,要么就是沒有待發送數據,直接關閉if (_out_buffer.ReadAbleSize() > 0){if (_channel.WriteAble() == false){_channel.EnableWrite();}}if (_out_buffer.ReadAbleSize() == 0){ReleaseInLoop();}}// 啟動非活躍連接超時釋放規則void EnableInactiveReleaseInLoop(int sec){// 1.將判斷標志 _enable_inactive_release 置為true_enable_inactive_release = true;// 2.如果當前定時銷毀任務已經存在,那就刷新一下延遲即可if (_loop->HasTimer(_conn_id)){return _loop->TimerRefresh(_conn_id);}// 3.如果不存在定時銷毀任務,則新增_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::ReleaseInLoop, this));}void CancelInactiveReleaseInLoop(){_enable_inactive_release = false;if (_loop->HasTimer(_conn_id)){_loop->TimerCancel(_conn_id);}}void UpgradeInLoop(const Any &context,const ConnectedCallback &conn,const MessageCallback &msg,const ClosedCallback &closed,const AnyEventCallback &event){_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd){_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}~Connection() { DBG_LOG("RELEASE CONNECTION:%p", this); }// 獲取管理的文件描述符int Fd() { return _sockfd; }// 獲取連接IDint Id() { return _conn_id; }// 是否處于CONNECTED狀態bool Connected() { return (_statu == CONNECTED); }// 設置上下文--連接建立完成時進行調用void SetContext(const Any &context) { _context = context; }// 獲取上下文,返回的是指針Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; }// 連接建立就緒后,進行channel回調設置,啟動讀監控,調用_connect_callbackvoid Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}// 發送數據,將數據發送到發送緩沖區,啟動寫事件監控void Send(const char *data, size_t len){// 外界傳入的data,可能是個臨時空間,我們現在只是把發送操作壓入了任務池,有可能并沒有被執行// 因此有可能執行的時候,data指向的空間有可能已經被釋放了Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, buf));}// 提供給組件使用者的關閉接口--并不實際關閉,需要判斷有沒有數據待處理void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}// 啟動非活躍銷毀,并定義多長時間無通信就是非活躍,添加定時任務void EnableInactiveRelease(int sec){_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}// 取消非活躍銷毀void CancelInactiveRelease(){_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}// 切換協議--重置上下文以及階段性處理函數--而是這個接口必須在EventLoop線程中立即執行// 防備新的事件觸發后,處理的時候,切換任務還沒有被執行--會導致數據使用原協議處理了void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event){_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}
};// 監聽套接字管理類
class Acceptor
{
private:Socket _socket;   // 用于創建監聽套接字EventLoop *_loop; // 用于對監聽套接字進行事件監控Channel _channel; // 用于對監控套接字進行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;private:/*監聽套接字的讀事件回調處理函數 -- 獲取新連接,調用_accept_callback函數進行新連接管理*/void HandleRead(){int newfd = _socket.Accept();if (newfd < 0){return;}if (_accept_callback)_accept_callback(newfd);}int CreateServer(int port){bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}public:/* 不能將啟動讀監控,放到構造函數中,必須在設置回調函數后,再去啟動*//* 否則有可能造成啟動監控后,立即有事件,處理的時候回調函數還沒有設置:新連接得不到處理,且資源泄漏*/Acceptor(EventLoop *loop, int port) : _socket(CreateServer(port)), _loop(loop),_channel(loop, _socket.Fd()){_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }void Listen() { _channel.EnableRead(); }
};// TcpServer服務器管理模塊(即全部模塊的整合)
class TcpServer
{
private:uint64_t _next_id; // 這是一個自動增長的連接IDint _port;int _timeout;                                       // 這是非活躍連接的統計時間--多長時間無通信就是非活躍連接bool _enable_inactive_release;                      // 是否啟動非活躍連接超時銷毀的判斷標志EventLoop _baseloop;                                // 這是主線程的EventLoop對象,負責監聽事件的處理Acceptor _acceptor;                                 // 這是監聽套接字的管理對象LoopThreadPool _pool;                               // 這是從屬EventLoop線程池std::unordered_map<uint64_t, PtrConnection> _conns; // 保管所有連接對應的share_ptr對象,這里面的對象被刪除,就意味這某一個連接被刪除using ConnectedCallback = std::function<void(const PtrConnection &)>;using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection &)>;using AnyEventCallback = std::function<void(const PtrConnection &)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;private:void RunAfterInLoop(const Functor &task, int delay){_next_id++;_baseloop.TimerAdd(_next_id, delay, task);}// 為新連接構造一個Connection進行管理void NewConnection(int fd){_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release)conn->EnableInactiveRelease(10); // 啟動非活躍超時銷毀conn->Established();                 // 就緒初始化_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection &conn){int id = conn->Id();auto it = _conns.find(id);if (it != _conns.end()){_conns.erase(it);}}// 從管理Connection的_conns移除連接信息void RemoveConnection(const PtrConnection &conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}public:TcpServer(int port) : _port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen(); // 將監聽套接字掛到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout, _enable_inactive_release = true; }// 用于添加一個定時任務void RunAfter(const Functor &task, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start(){_pool.Create(); // 創建線程池中的從屬線程_baseloop.Start();}
};// 移除監控
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void TimerWheel::TimerRefresh(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}// 忽略SIGPIPE信號,當連接斷開的時候,如果我們繼續向對端send發送信息,就會觸發異常,即SIGPIPE異常,這個就是導致客戶端異常退出的原因
class NetWork{public:NetWork(){DBG_LOG("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN); // 忽視SIGPIPE異常,這個會導致進程退出}
};
static NetWork nw;  // 這個是為了執行里面的構造函數
// 預編譯是為了防止頭文件重復包含
#endif

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/711914.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/711914.shtml
英文地址,請注明出處:http://en.pswp.cn/news/711914.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux學習-C語言-運算符

目錄 算術運算符&#xff1a; - * /:不能除0 %:不能對浮點數操作 &#xff1a;自增與運算符 i&#xff1a;先用再加 i:先加再用 --&#xff1a;自減運算符 常量&#xff0c;表達式不可以&#xff0c;--&#xff0c;變量可以 賦值運算符 三目運算符 逗號表達式 size…

alpine創建lnmp環境alpine安裝nginx+php5.6+mysql

前言 制作lnmp環境&#xff0c;你可以在alpine基礎鏡像中安裝相關的服務&#xff0c;也可以直接使用Dockerfile創建自己需要的環境鏡像。 注意&#xff1a;提前確認自己的alpine版本&#xff0c;本次創建基于alpine3.6進行創建&#xff0c;官方在一些版本中刪除了php5 1、拉取…

JS正則02——js正則表達式中常用的方法、常見修飾符的使用詳解以及各種方法使用情況示例

JS正則02——js正則表達式中常用的方法、常見修飾符的使用詳解以及各種方法使用情況示例 1. 前言1.1 簡介1.2 js正則特殊字符即使用示例 2. 創建正則表達式的方式2.1 兩種創建正則表達式的方式2.2 關于修飾符 3. 正則表達式中常用的方法3.1 test() 方法——正則表達式對象的方法…

Vue之監測數據的原理(對象)

大家有沒有想過&#xff0c;為什么vue可以監測到數據發生改變&#xff1f;其實底層借助了Object.defineProperty&#xff0c;底層有一個Observer的構造函數 讓我為大家簡單的介紹一下吧&#xff01; 我用對象為大家演示一下 const vm new Vue({el: "#app",data: {ob…

Python列表操作函數

在Python中&#xff0c;列表&#xff08;list&#xff09;是一種可變的數據類型&#xff0c;它包含一系列有序的元素。Python提供了一系列內置的函數和方法來操作列表。以下是一些常用的Python列表操作函數和方法&#xff1a; 列表方法 append(x) 將元素x添加到列表的末尾。 …

文獻速遞:帕金森的疾病分享--多模態機器學習預測帕金森病

文獻速遞&#xff1a;帕金森的疾病分享–多模態機器學習預測帕金森病 Title 題目 Multi-modality machine learning predicting Parkinson’s disease 多模態機器學習預測帕金森病 01 文獻速遞介紹 對于漸進性神經退行性疾病&#xff0c;早期和準確的診斷是有效開發和使…

Linux按鍵輸入實驗-對按鍵驅動進行測試

一. 簡介 前面學習在設備樹文件中創建按鍵的設備節點,并實現對按鍵驅動代碼的編寫,文章地址如下:Linux按鍵輸入實驗-創建按鍵的設備節點-CSDN博客Linux按鍵輸入實驗-按鍵的字符設備驅動代碼框架-CSDN博客Linux按鍵輸入實驗-按鍵的GPIO初始化-CSDN博客 本文對所實現的按鍵驅…

【精品】集合list去重

示例一&#xff1a;對于簡單類型&#xff0c;比如String public static void main(String[] args) {List<String> list new ArrayList< >();list.add("aaa");list.add("bbb");list.add("bbb");list.add("ccc");list.add(…

網絡工程師必備的網絡端口大全(建議收藏)

端口是一種數字標識&#xff0c;用于在計算機網絡中進行通信&#xff0c;你完全可以把端口簡單的理解為是計算機和外界通訊交流的出口。但在網絡技術中&#xff0c;端口一般有兩種含義&#xff1a; &#xff08;1&#xff09;硬件設備中的端口 如交換機、路由器中用于鏈接其他…

用stream流將list轉為map

用stream流將list轉為map 1、將list轉為Map<Long, List> 按照spaceId分組&#xff0c;spaceId相同的為一組數據&#xff1a; List<BasEvaluationPriceResultDto> list new ArrayList(); Map<Long, List<BasEvaluationPriceResultDto>> priceResult…

“金三銀四”招聘季,大廠爭招鴻蒙人才

在金三銀四的招聘季中&#xff0c;各大知名互聯網企業紛紛加入了對鴻蒙人才的爭奪戰。近日&#xff0c;包括淘寶、京東、得物等在內的知名APP均宣布啟動鴻蒙星河版原生應用開發計劃。這一舉措不僅彰顯了鴻蒙生態系統的迅猛發展&#xff0c;還催生了人才市場的繁榮景象。據數據顯…

遙感影像處理(ENVI+ChatGPT+python+ GEE)處理高光譜及多光譜遙感數據

遙感技術主要通過衛星和飛機從遠處觀察和測量我們的環境&#xff0c;是理解和監測地球物理、化學和生物系統的基石。ChatGPT是由OpenAI開發的最先進的語言模型&#xff0c;在理解和生成人類語言方面表現出了非凡的能力。本文重點介紹ChatGPT在遙感中的應用&#xff0c;人工智能…

vue3學習 【4】ref和reactive的使用并于ts結合

使用ref聲明一個響應式對象并使用 <script lang"ts" setup> import { ref } from vue; const message ref("HelloWorld") message.value"被修改了啊~~" </script> <template>{{ message }} </template>ref() 接收參數…

Vue——攜帶參數跳轉路由

Vue學習之——跳轉路由 前情回顧 當我們進行點擊修改時&#xff0c;會進行跳轉到修改頁面&#xff0c;為了完成回顯數據&#xff08;根據對應id查找&#xff09;&#xff0c;我們需要攜帶對應選擇中的id跳轉到修改頁面&#xff0c;讓其進行查找回顯 學習useRoute和useRoute…

webstorm2023.3.4安裝與破解

WebStorm安裝步驟 打開JetBrains官方網站&#xff08;https://www.jetbrains.com/webstorm/&#xff09; 運行.exe 選擇安裝路徑 第一個意思是是否創建桌面快捷方式&#xff0c;可根據需要選擇&#xff1b;第二個.js .css .html勾選后之后js css html文件默認會用webstor…

AI Agent

目錄 一、什么是Agent 二、什么是MetaGPT【多智能體框架介紹】 三、MetaGPT的背景 一、什么是Agent 智能體 LLM觀察思考行動記憶 Agent&#xff08;智能體&#xff09; 一個設置了一些目標或任務&#xff0c;可以迭代運行的大型語言模型。這與大型語言模型&#xff08;LLM&am…

985機械研一轉碼,java還是c++?

985機械研一轉碼&#xff0c;java還是c&#xff1f; 在開始前我分享下我的經歷&#xff0c;我剛入行時遇到一個好公司和師父&#xff0c;給了我機會&#xff0c;一年時間從3k薪資漲到18k的&#xff0c; 我師父給了一些 電氣工程師學習方法和資料&#xff0c;讓我不斷提升自己&…

一鍵下載電路(for STM32 and mcuisp)

一鍵下載電路 1. STM32 一鍵下載電路2. 燒錄軟件&#xff1a;mcuisp和FlyMcu下載3. 遇到問題 1. STM32 一鍵下載電路 博文連接 2. 燒錄軟件&#xff1a;mcuisp和FlyMcu下載 mcuisp和FlyMcu下載 3. 遇到問題 按如上博文電路設計&#xff0c;上電發現程序沒有進入 main() 函…

【OpenGL的著色器03】內置變量(gl_Position等)

目錄 一、說明 二、著色器的變量 2.1 著色器變量 2.2 著色器內置變量 三、最常見內置變量使用范例 3.1 常見著色器變量 3.2 示例1&#xff1a; gl_PointSize 3.3 示例2&#xff1a;gl_Position 3.4 gl_FragColor 3.5 渲染點片元坐標gl_PointCoord 3.6 gl_PointCoo…

【PyTorch][chapter 20][李宏毅深度學習]【無監督學習][ GAN]【實戰】

前言 本篇主要是結合手寫數字例子,結合PyTorch 介紹一下Gan 實戰 第一輪訓練效果 第20輪訓練效果,已經可以生成數字了 68 輪 目錄&#xff1a; 谷歌云服務器&#xff08;Google Colab&#xff09; 整體訓練流程 Python 代碼 一 谷歌云服務器&#xff08;Google Colab&…