🌏博客主頁:PH_modest的博客主頁
🚩當前專欄:Linux跬步積累
💌其他專欄:
🔴 每日一題
🟡 C++跬步積累
🟢 C語言跬步積累
🌈座右銘:廣積糧,緩稱王!
文章目錄
- 一、TCP socket API
- 二、TCP API 使用
- 1、服務端創建套接字
- 2、服務端綁定
- 3、服務端監聽
- 4、服務端獲取連接
- 5、服務端接收連接測試
- echo server
- 多進程版本
- `TcpServer.hpp`
- `Main.cc`
- `MainClient.cc`
- 運行效果圖
- 多線程版本
- `TcpServer.hpp`
- `Main.cc`
- `MainClient.cc`
- 運行效果圖
- 線程池版本
- `TcpServer.hpp`
- `Main.cc`
- `MainClient.cc`
- `Thread.hpp`
- `ThreadPool.hpp`
- 運行效果圖
一、TCP socket API
下面介紹程序中用到的socket API,這些函數都在sys/socket.h中。
//創建socket文件描述符(TCP/UDP,客戶端 + 服務器)
int socket(int domain,int type,int protocol);//綁定端口號(TCP/UDP,服務器)
int bind(int socket,const struct sockaddr *address,socklen_t address_len);//開始監聽socket(TCP,服務器)
int listen(int socket,int backlog);//接收請求(TCP,服務器)
int accept(int socket,struct sockaddr* address,socklen_t* address_len);//建立連接(TCP,客戶端)
int connect(int sockfd,const struct sockaddr *addr,socklen_t addrlen);
二、TCP API 使用
1、服務端創建套接字
函數原型:
//創建socket文件描述符(TCP/UDP,客戶端 + 服務器)
int socket(int domain,int type,int protocol);
使用示例:
//創建文件描述符
_listensock = socket(AF_INET,SOCK_STREAM,0);
if(_listensock < 0)
{std::cerr<<"socket error!"<<std::endl;exit(1);
}
2、服務端綁定
函數原型:
//綁定端口號(TCP/UDP,服務器)
int bind(int socket,const struct sockaddr *address,socklen_t address_len);
使用示例:
//綁定
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;if(bind(_listensock,(struct sockaddr*)&local,sizeof(local)) < 0)
{std::cerr<<"bind error!"<<std::endl;exit(2);
}
3、服務端監聽
函數原型:
//開始監聽socket(TCP,服務器)
int listen(int socket,int backlog);
使用示例:
//監聽
if(listen(_listensock,5) < 0)
{std::cerr<<"listen error!"<<std::endl;exit(3);
}
4、服務端獲取連接
函數原型:
//接收請求(TCP,服務器)
int accept(int socket,struct sockaddr* address,socklen_t* address_len);
參數說明:
- socket:特定的監聽套接字,表示從這個套接字獲取連接。
- address:對端網絡相關信息,包括協議家族、IP地址、端口號。
- address_len:這是一個輸入輸出型參數,調用時傳入期望讀取的長度,返回時表示實際讀取的長度。
accept函數返回的套接字是什么?和socket有什么區別?
accept函數獲取連接時,是從socket監聽套接字當中獲取的,如果accept獲取連接成功,此時就會返回接收到的套接字對應的文件描述符。
socket監聽套接字的作用是用來獲取客戶端發來的新的連接請求。accept會不斷從監聽套接字當中獲取新連接。
accept返回的套接字是為本次獲取到的連接提供服務的。監聽套接字是不斷獲取新的連接,真正為這些連接提供服務的是accept返回的套接字,而不是監聽套接字。
監聽套接字可以看成飯店門口拉客的員工,當你被她說服進店之后,會有新的服務員單獨為你提供服務,而這個新的服務員就是accept返回的套接字。
使用示例:
void Start()
{while(true){struct sockaddr_in peer;memset(&peer,0,sizeof(peer));socklen_t len = sizeof(peer);int sockfd = accept(_listensock,(struct sockaddr*)&peer,&len);if(sockfd < 0){std::cerr<<"accept error!"<<std::endl;continue;//這里不能直接退出,因為還需要獲取其他連接}std::string client_ip = inet_ntoa(peer.sin_addr);//將網絡序列轉換為主機序列,同時將整數ip變為字符串ipint client_port = ntohs(peer.sin_port);//將網絡序列轉換為主機序列std::cout<<"get a new link-->"<<sockfd<<"["<<client_ip<<"]:"<<client_port<<std::endl;}
}
5、服務端接收連接測試
我們現在做一個簡單的測試,測試一下當前服務器能否成功接受請求連接。
void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);exit(1);}int port = std::stoi(argv[1]);TcpServer* tsvr = new TcpServer(port);tsvr->InitServer();tsvr->Start();return 0;
}
我們編譯運行之后,可以通過netstat
命令來顯示網絡連接、路由表、接口統計等網絡相關信息。
echo server
多進程版本
TcpServer.hpp
#pragma once
#include"TcpServer.hpp"
#include<iostream>
#include<string>
#include<strings.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/wait.h>
#include"InetAddr.hpp"static const int gbacklog = 15;class TcpServer
{
public:TcpServer(uint16_t port):_port(port),_isrunning(false){}void InitServer(){//1. 創建流式套接字_listensock = socket(AF_INET,SOCK_STREAM,0);if(_listensock < 0){std::cerr<<"socket error!\n";exit(1);}std::cout<<"socket success,sockfd is:"<<_listensock<<std::endl;//2. bindstruct sockaddr_in local;bzero(&local,sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensock,(struct sockaddr*)&local,sizeof(local));if(n < 0){std::cerr<<"bind error!\n";exit(2);}std::cout<<"bind success,sockfd is:"<<_listensock<<std::endl;//3. tcp是面向連接的,所以通信之前,必須先建立連接,服務器是被連接的// tcpserver啟動,未來首先要一直等待客戶的連接到來n = listen(_listensock,gbacklog);if(n < 0){std::cerr<<"listen error!\n";exit(3);}std::cout<<"listen success,sockfd is:"<<_listensock<<std::endl;}void Service(int sockfd,InetAddr client){printf("get a new link,info %s:%d,fd:%d\n",client.IP().c_str(),client.Port(),sockfd);std::string clientaddr = "["+client.IP()+":"+std::to_string(client.Port())+"]# ";while(true){//讀取數據char inbuffer[1024];ssize_t n = read(sockfd,inbuffer,sizeof(inbuffer)-1);if(n > 0)//回寫{inbuffer[n] = 0;std::cout<<clientaddr<<inbuffer<<std::endl;std::string echo_string = "[server echo]# ";echo_string+=inbuffer;write(sockfd,echo_string.c_str(),echo_string.size());}else if(n == 0)//退出{//client 退出&&關閉鏈接了std::cout<<clientaddr<<" quit!\n";break;}else//報錯{std::cerr<<clientaddr<<"read error!\n";break;}}//一定要關閉,因為文件描述符表是一個數組,數組容量是有限的close(sockfd);//如果不關閉,會導致文件描述符泄漏問題}void Loop(){//4. 不能直接收數據,先獲取鏈接_isrunning = true;while(_isrunning){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = accept(_listensock,(struct sockaddr*)&peer,&len);if(sockfd < 0){std::cerr<<"accept error!\n";continue;}//version 0 : 一次只能處理一個請求//Service(sockfd,InetAddr(peer));//version 1 :采用多進程pid_t id = fork();if(id == 0){//child 只關心sockfd,不關心listensock::close(_listensock);//建議關閉if(fork() > 0) exit(0);//創建的父進程退出Service(sockfd,InetAddr(peer));//孫子進程,他的父進程提前退出了,沒有等待,會變成孤兒進程,然后被系統領養exit(0);}//father 只關心listensock,不關心sockfd::close(sockfd);waitpid(id,nullptr,0);}_isrunning = false;}~TcpServer(){if(_listensock > -1){close(_listensock);}}
private:int _listensock;uint16_t _port;bool _isrunning;
};
Main.cc
#include"TcpServer.hpp"
#include<iostream>
#include<memory>void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}
MainClient.cc
#include<iostream>
#include<string>
#include<cstdlib>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>void Usage(std::string proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_ip local_port\n";
}//./udpclient serverip serverport
int main(int argc,char *argv[])
{if(argc != 3){Usage(argv[0]);exit(1);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);//1. 創建套接字int sockfd = socket(AF_INET,SOCK_STREAM,0);if(sockfd < 0){std::cerr<<"socket error"<<std::endl;exit(2);}//2. 發起連接struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());int n = connect(sockfd,(struct sockaddr*)&server,sizeof(server));if(n < 0){std::cerr<<"connect error!\n";exit(3);}while(true){std::cout<<"Please Enter# ";std::string outstring;getline(std::cin,outstring);ssize_t n = send(sockfd,outstring.c_str(),outstring.size(),0);//writeif(n > 0){char inbuffer[1024];ssize_t m = recv(sockfd,&inbuffer,sizeof(inbuffer)-1,0);if(m > 0){inbuffer[m] = 0;std::cout<<inbuffer<<std::endl;}else{break;}}else{break;}}close(sockfd); return 0;
}
運行效果圖
多線程版本
TcpServer.hpp
#pragma once
#include "TcpServer.hpp"
#include <iostream>
#include <string>
#include <strings.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include "InetAddr.hpp"static const int gbacklog = 15;
class TcpServer;class ThreadData
{
public:ThreadData(int fd,InetAddr addr,TcpServer *s):sockfd(fd),clientaddr(addr),self(s){}
public:int sockfd;InetAddr clientaddr;TcpServer *self;
};class TcpServer
{
public:TcpServer(uint16_t port) : _port(port), _isrunning(false){}void InitServer(){// 1. 創建流式套接字_listensock = socket(AF_INET, SOCK_STREAM, 0);if (_listensock < 0){std::cerr << "socket error!\n";exit(1);}std::cout << "socket success,sockfd is:" << _listensock << std::endl;// 2. bindstruct sockaddr_in local;bzero(&local, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensock, (struct sockaddr *)&local, sizeof(local));if (n < 0){std::cerr << "bind error!\n";exit(2);}std::cout << "bind success,sockfd is:" << _listensock << std::endl;// 3. tcp是面向連接的,所以通信之前,必須先建立連接,服務器是被連接的// tcpserver啟動,未來首先要一直等待客戶的連接到來n = listen(_listensock, gbacklog);if (n < 0){std::cerr << "listen error!\n";exit(3);}std::cout << "listen success,sockfd is:" << _listensock << std::endl;}void Service(int sockfd, InetAddr client){printf("get a new link,info %s:%d,fd:%d\n", client.IP().c_str(), client.Port(), sockfd);std::string clientaddr = "[" + client.IP() + ":" + std::to_string(client.Port()) + "]# ";while (true){// 讀取數據char inbuffer[1024];ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0) // 回寫{inbuffer[n] = 0;std::cout << clientaddr << inbuffer << std::endl;std::string echo_string = "[server echo]# ";echo_string += inbuffer;write(sockfd, echo_string.c_str(), echo_string.size());}else if (n == 0) // 退出{// client 退出&&關閉鏈接了std::cout << clientaddr << " quit!\n";break;}else // 報錯{std::cerr << clientaddr << "read error!\n";break;}}// 一定要關閉,因為文件描述符表是一個數組,數組容量是有限的close(sockfd); // 如果不關閉,會導致文件描述符泄漏問題}static void* HandlerSock(void *args){pthread_detach(pthread_self());ThreadData* data = static_cast<ThreadData*>(args);data->self->Service(data->sockfd,data->clientaddr);delete data;return nullptr;}void Loop(){// 4. 不能直接收數據,先獲取鏈接_isrunning = true;while (_isrunning){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = accept(_listensock, (struct sockaddr *)&peer, &len);if (sockfd < 0){std::cerr << "accept error!\n";continue;}// version 0 : 一次只能處理一個請求// Service(sockfd,InetAddr(peer));// version 1 :采用多進程// pid_t id = fork();// if(id == 0)// {// //child 只關心sockfd,不關心listensock// ::close(_listensock);//建議關閉// if(fork() > 0) exit(0);//創建的父進程退出// Service(sockfd,InetAddr(peer));//孫子進程,他的父進程提前退出了,沒有等待,會變成孤兒進程,然后被系統領養// exit(0);// }// //father 只關心listensock,不關心sockfd// ::close(sockfd);// waitpid(id,nullptr,0);// version 2:采用多線程pthread_t t;//線程之間共享文件描述符表ThreadData *data = new ThreadData(sockfd,InetAddr(peer),this);pthread_create(&t,nullptr,HandlerSock,data);}_isrunning = false;}~TcpServer(){if (_listensock > -1){close(_listensock);}}private:int _listensock;uint16_t _port;bool _isrunning;
};
Main.cc
#include"TcpServer.hpp"
#include<iostream>
#include<memory>void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}
MainClient.cc
#include<iostream>
#include<string>
#include<cstdlib>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>void Usage(std::string proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_ip local_port\n";
}//./udpclient serverip serverport
int main(int argc,char *argv[])
{if(argc != 3){Usage(argv[0]);exit(1);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);//1. 創建套接字int sockfd = socket(AF_INET,SOCK_STREAM,0);if(sockfd < 0){std::cerr<<"socket error"<<std::endl;exit(2);}//2. 發起連接struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());int n = connect(sockfd,(struct sockaddr*)&server,sizeof(server));if(n < 0){std::cerr<<"connect error!\n";exit(3);}while(true){std::cout<<"Please Enter# ";std::string outstring;getline(std::cin,outstring);ssize_t n = send(sockfd,outstring.c_str(),outstring.size(),0);//writeif(n > 0){char inbuffer[1024];ssize_t m = recv(sockfd,&inbuffer,sizeof(inbuffer)-1,0);if(m > 0){inbuffer[m] = 0;std::cout<<inbuffer<<std::endl;}else{break;}}else{break;}}close(sockfd); return 0;
}
運行效果圖
線程池版本
TcpServer.hpp
#pragma once
#include "TcpServer.hpp"
#include <iostream>
#include <string>
#include <strings.h>
#include <unistd.h>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include "InetAddr.hpp"
#include "ThreadPool.hpp"static const int gbacklog = 15;
class TcpServer;class ThreadData
{
public:ThreadData(int fd,InetAddr addr,TcpServer *s):sockfd(fd),clientaddr(addr),self(s){}
public:int sockfd;InetAddr clientaddr;TcpServer *self;
};using task_t = std::function<void()>;class TcpServer
{
public:TcpServer(uint16_t port) : _port(port), _isrunning(false){}void InitServer(){// 1. 創建流式套接字_listensock = socket(AF_INET, SOCK_STREAM, 0);if (_listensock < 0){std::cerr << "socket error!\n";exit(1);}std::cout << "socket success,sockfd is:" << _listensock << std::endl;// 2. bindstruct sockaddr_in local;bzero(&local, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensock, (struct sockaddr *)&local, sizeof(local));if (n < 0){std::cerr << "bind error!\n";exit(2);}std::cout << "bind success,sockfd is:" << _listensock << std::endl;// 3. tcp是面向連接的,所以通信之前,必須先建立連接,服務器是被連接的// tcpserver啟動,未來首先要一直等待客戶的連接到來n = listen(_listensock, gbacklog);if (n < 0){std::cerr << "listen error!\n";exit(3);}std::cout << "listen success,sockfd is:" << _listensock << std::endl;}void Service(int sockfd, InetAddr client){printf("get a new link,info %s:%d,fd:%d\n", client.IP().c_str(), client.Port(), sockfd);std::string clientaddr = "[" + client.IP() + ":" + std::to_string(client.Port()) + "]# ";while (true){// 讀取數據char inbuffer[1024];ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0) // 回寫{inbuffer[n] = 0;std::cout << clientaddr << inbuffer << std::endl;std::string echo_string = "[server echo]# ";echo_string += inbuffer;write(sockfd, echo_string.c_str(), echo_string.size());}else if (n == 0) // 退出{// client 退出&&關閉鏈接了std::cout << clientaddr << " quit!\n";break;}else // 報錯{std::cerr << clientaddr << "read error!\n";break;}}// 一定要關閉,因為文件描述符表是一個數組,數組容量是有限的close(sockfd); // 如果不關閉,會導致文件描述符泄漏問題}static void* HandlerSock(void *args){pthread_detach(pthread_self());ThreadData* data = static_cast<ThreadData*>(args);data->self->Service(data->sockfd,data->clientaddr);delete data;return nullptr;}void Loop(){// 4. 不能直接收數據,先獲取鏈接_isrunning = true;while (_isrunning){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = accept(_listensock, (struct sockaddr *)&peer, &len);if (sockfd < 0){std::cerr << "accept error!\n";continue;}// version 0 : 一次只能處理一個請求// Service(sockfd,InetAddr(peer));// version 1 :采用多進程// pid_t id = fork();// if(id == 0)// {// //child 只關心sockfd,不關心listensock// ::close(_listensock);//建議關閉// if(fork() > 0) exit(0);//創建的父進程退出// Service(sockfd,InetAddr(peer));//孫子進程,他的父進程提前退出了,沒有等待,會變成孤兒進程,然后被系統領養// exit(0);// }// //father 只關心listensock,不關心sockfd// ::close(sockfd);// waitpid(id,nullptr,0);// version 2:采用多線程// pthread_t t;//線程之間共享文件描述符表// ThreadData *data = new ThreadData(sockfd,InetAddr(peer),this);// pthread_create(&t,nullptr,HandlerSock,data);// version 3:采用線程池task_t task = std::bind(&TcpServer::Service,this,sockfd,InetAddr(peer));ThreadPool<task_t>::GetInstance()->EnQueue(task);}_isrunning = false;}~TcpServer(){if (_listensock > -1){close(_listensock);}}private:int _listensock;uint16_t _port;bool _isrunning;
};
Main.cc
#include"TcpServer.hpp"
#include<iostream>
#include<memory>void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}
MainClient.cc
#include<iostream>
#include<string>
#include<cstdlib>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>void Usage(std::string proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_ip local_port\n";
}//./udpclient serverip serverport
int main(int argc,char *argv[])
{if(argc != 3){Usage(argv[0]);exit(1);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);//1. 創建套接字int sockfd = socket(AF_INET,SOCK_STREAM,0);if(sockfd < 0){std::cerr<<"socket error"<<std::endl;exit(2);}//2. 發起連接struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());int n = connect(sockfd,(struct sockaddr*)&server,sizeof(server));if(n < 0){std::cerr<<"connect error!\n";exit(3);}while(true){std::cout<<"Please Enter# ";std::string outstring;getline(std::cin,outstring);ssize_t n = send(sockfd,outstring.c_str(),outstring.size(),0);//writeif(n > 0){char inbuffer[1024];ssize_t m = recv(sockfd,&inbuffer,sizeof(inbuffer)-1,0);if(m > 0){inbuffer[m] = 0;std::cout<<inbuffer<<std::endl;}else{break;}}else{break;}}close(sockfd); return 0;
}
Thread.hpp
#pragma once
#include<string>
#include<pthread.h>
#include<unistd.h>
#include<iostream>
#include<functional>namespace MyThread
{template<class T>using func_t = std::function<void(T&)>;//模版方法template<class T>class Thread{public://thread(func,5,"thread-1");Thread(func_t<T> func,const T &data,const std::string &name = "none-name"):_func(func),_data(data),_threadname(name){}//需要設置成static靜態成員函數,否則參數會多一個this指針,就不符合pthread_create的要求了static void* ThreadRoutinue(void* args){//將傳過來的this指針強轉一下,然后就可以訪問到_func和_data了Thread<T>* self = static_cast<Thread<T>*>(args);self->_func(self->_data);//這里調用的_func是線程池中的HanderTask方法return nullptr;}bool Start(){//創建線程int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);return ret==0;}void Join(){pthread_join(_tid,nullptr);}void Detach(){pthread_detach(_tid);}~Thread(){}private:pthread_t _tid; //線程tidstd::string _threadname; //線程名func_t<T> _func; //線程執行的函數T _data; //需要處理的數據};
}
ThreadPool.hpp
#include"Thread.hpp"
#include<vector>
#include<queue>
#include<string>
#include <unistd.h>
#include <pthread.h>template<class T>
class ThreadPool
{
public:ThreadPool(const int num = 5):_threadNum(num),_waitNum(0),_isRunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);}void HanderTask(std::string name){//子線程需要一直處理,所以這里使用死循環while(true){pthread_mutex_lock(&_mutex);while(_taskQueue.empty()&&_isRunning)//這里是while循環,不是if判斷,避免偽喚醒{_waitNum++;pthread_cond_wait(&_cond,&_mutex);_waitNum--;}//線程池終止了,并且任務隊列中沒有任務 --> 線程退出if(_taskQueue.empty()&&!_isRunning){pthread_mutex_unlock(&_mutex);std::cout<<name<<" quit..."<<std::endl;break;}//走到這里無論線程池是否終止,都一定還有任務要執行,將任務執行完再退出T task = _taskQueue.front();_taskQueue.pop();std::cout<<name<<" get a task..."<<std::endl;pthread_mutex_unlock(&_mutex);task();}}void ThreadInit(){for(int i=0;i<_threadNum;i++){auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);std::string name = "Thread-"+std::to_string(i);//_threads.push_back(HanderTask,name,name);//第一個name是handerTask的參數,第二個name是Thread內部的成員_threads.emplace_back(func,name,name);}_isRunning = true;}void StartAll(){for(auto& thread : _threads){thread.Start();}}void JoinAll(){for(auto& thread : _threads){thread.Join();}}void EnQueue(const T& task){pthread_mutex_lock(&_mutex);if(_isRunning){_taskQueue.push(task);if(_waitNum > 0){pthread_cond_signal(&_cond);}}pthread_mutex_unlock(&_mutex);}void Stop(){pthread_mutex_lock(&_mutex);_isRunning = false;//終止線程池pthread_cond_broadcast(&_cond);//喚醒所有等待的線程pthread_mutex_unlock(&_mutex);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
public:static ThreadPool<T> *GetInstance(){if(nullptr == _instance){pthread_mutex_lock(&_lock);if(nullptr == _instance){_instance = new ThreadPool<T>();_instance->ThreadInit();_instance->StartAll();std::cout<<"創建線程池單例!\n";return _instance;}pthread_mutex_unlock(&_lock);}std::cout<<"獲取線程池樣例!\n";return _instance;}
private:std::vector<MyThread::Thread<std::string>> _threads;//用數組管理多個線程std::queue<T> _taskQueue;//任務隊列int _threadNum;//線程數int _waitNum;//等待的線程數bool _isRunning;//線程池是否在運行pthread_mutex_t _mutex;//互斥鎖pthread_cond_t _cond;//條件變量//添加單例模式static ThreadPool<T> *_instance;static pthread_mutex_t _lock;
};template<class T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;