1. 概述
本文實現一個基于TCP/IP的簡單多人聊天室程序。它包含一個服務器端和一個客戶端:服務器能夠接收多個客戶端的連接,并將任何一個客戶端發來的消息廣播給所有其他連接的客戶端;客戶端則可以連接到服務器,發送消息并接收來自其他人的消息。該Demo運用了網絡編程(Socket API)、多線程(Pthreads)以及線程同步(互斥鎖)技術,以實現并發處理和數據共享安全。
2. 核心技術
-
網絡編程(Sockets)
- TCP/IP: 選擇面向連接的TCP協議,保證數據傳輸的可靠性。
- 服務器端流程:
socket()
: 創建套接字。memset()
/struct sockaddr_in
: 配置服務器地址和端口。bind()
: 綁定套接字到指定地址和端口。listen()
: 設置套接字為監聽狀態,等待連接。accept()
: 接受客戶端連接,為每個連接創建一個新的套接字。
- 客戶端流程:
socket()
: 創建套接字。memset()
/struct sockaddr_in
: 配置服務器地址和端口。connect()
: 連接到服務器。
- 數據傳輸:
read()
和write()
用于雙向通信。
-
多線程 (Pthreads)
- 服務器端:
- 主線程負責
accept()
連接。 - 每接受一個新客戶端,使用
pthread_create()
創建一個新的處理線程 (handle_clnt
)。 - 使用
pthread_detach()
將子線程設置為分離狀態,使其結束后資源能自動回收,主線程無需join
。
- 主線程負責
- 客戶端:
- 創建兩個核心線程:
send_msg
線程:負責獲取用戶鍵盤輸入并將其發送到服務器。recv_msg
線程:負責接收服務器廣播的消息并顯示在控制臺。
- 這種設計使得用戶輸入和消息接收可以并行進行,互不阻塞。
- 創建兩個核心線程:
- 服務器端:
-
線程同步 (Mutex)
- 場景: 服務器端多個
handle_clnt
線程會并發訪問和修改共享資源(如客戶端套接字數組clnt_socks
和當前客戶端計數clnt_cnt
)。 - 機制: 使用互斥鎖 (
mutx
) 保護這些臨界區。pthread_mutex_init()
: 初始化互斥鎖。pthread_mutex_lock()
: 在訪問共享資源前加鎖。pthread_mutex_unlock()
: 訪問完畢后解鎖。
- 關鍵操作加鎖:
- 添加新客戶端到
clnt_socks
。 - 從
clnt_socks
移除斷開連接的客戶端。 send_msg
(服務器端廣播函數) 遍歷clnt_socks
時。
- 添加新客戶端到
- 場景: 服務器端多個
3. 主要模塊實現
A. 服務器端 (server
)
main()
函數:- 參數解析 (端口號)。
- 初始化互斥鎖。
- 完成socket的創建、綁定、監聽。
- 進入無限循環,通過
accept()
接收客戶端連接。 - 為每個連接創建
handle_clnt
線程并分離。
handle_clnt(void* arg)
函數:- 獲取傳遞過來的客戶端套接字。
- 循環調用
read()
接收該客戶端的消息。 - 若
read()
成功,則調用send_msg()
(服務器的) 廣播此消息。 - 若
read()
返回0 (客戶端關閉連接),則執行清理:加鎖 -> 從clnt_socks
移除 ->clnt_cnt--
-> 解鎖 ->close()
該客戶端套接字。
send_msg(char* msg, int len)
函數 (服務器端):- 加鎖。
- 遍歷
clnt_socks
數組,將消息write()
給每一個已連接的客戶端。 - 解鎖。
B. 客戶端 (client
)
main()
函數:- 參數解析 (服務器IP, 端口號, 用戶名)。
- 創建socket并
connect()
到服務器。 - 創建
send_msg
和recv_msg
兩個線程。 pthread_join()
等待這兩個線程結束(雖然當前send_msg
中的exit(0)
會提前終止)。
send_msg(void* arg)
函數:- 循環獲取用戶標準輸入 (
fgets
)。 - 檢測到 "q" 或 "Q" 時,
close(sock)
并exit(0)
(可改進點)。 - 將用戶名和消息格式化后通過
write()
發送給服務器。
- 循環獲取用戶標準輸入 (
recv_msg(void* arg)
函數:- 循環調用
read()
從服務器接收消息。 - 將接收到的消息
fputs()
到標準輸出。
- 循環調用
4. 總結
- 互斥鎖的必要性: 在多線程環境下,若不使用同步機制保護共享數據,會導致數據競爭和不可預期的結果。
clnt_socks
和clnt_cnt
的并發修改是典型場景。 - 線程分離 vs. 等待: 服務器端
pthread_detach
的使用簡化了主線程的管理,適用于這種“即發即忘”的獨立工作單元。客戶端pthread_join
的意圖是等待線程完成,但需配合更優雅的線程退出信號。 - 阻塞I/O與多線程: 每個客戶端一個線程,每個線程中的
read()
是阻塞的。這簡化了單個線程的邏輯,但當連接數非常大時,線程資源開銷會成為瓶頸。 - 客戶端非阻塞體驗: 通過發送和接收分離到不同線程,客戶端用戶體驗得到了提升,不會因為等待網絡消息而卡住輸入。
- 基本通信協議: 客戶端在發送消息前簡單地將用戶名預置到消息體中,服務器直接轉發這個消息體。這是一個非常初級的“協議”。
?具體代碼如下:
?服務端代碼:網絡編程 + 多線程 + 線程同步
// 網絡編程+多線程+線程同步實現的聊天服務器和客戶端#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>#define BUF_SIZE 100 // 定義緩沖區大小
#define MAX_CLNT 256 // 最大客戶端數量// 函數聲明
void * handle_clnt(void * arg); // 處理客戶端連接的線程函數
void send_msg(char * msg, int len); // 向所有客戶端發送消息
void error_handling(char * msg); // 錯誤處理函數int clnt_cnt = 0; // 當前客戶端連接數量
int clnt_socks[MAX_CLNT]; // 存儲所有客戶端的socket描述符
pthread_mutex_t mutx; // 互斥鎖,用于同步對共享資源的訪問(客戶端數組)int main(int argc, char *argv[])
{int serv_sock, clnt_sock; // 服務端socket和客戶端socketstruct sockaddr_in serv_adr, clnt_adr; // 服務端和客戶端地址int clnt_adr_sz; // 客戶端地址結構的大小pthread_t t_id; // 線程IDif(argc != 2) {printf("Usage : %s <port>\n", argv[0]); // 檢查輸入的端口號參數exit(1);}pthread_mutex_init(&mutx, NULL); // 初始化互斥鎖serv_sock = socket(PF_INET, SOCK_STREAM, 0); // 創建服務端socketif(serv_sock == -1) {error_handling("socket() error");}memset(&serv_adr, 0, sizeof(serv_adr)); // 初始化服務端地址結構serv_adr.sin_family = AF_INET;serv_adr.sin_addr.s_addr = htonl(INADDR_ANY); // 綁定到所有可用接口serv_adr.sin_port = htons(atoi(argv[1])); // 使用命令行提供的端口號if(bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1) // 綁定服務端socketerror_handling("bind() error");if(listen(serv_sock, 5) == -1) // 開始監聽error_handling("listen() error");while(1){clnt_adr_sz = sizeof(clnt_adr); // 獲取客戶端地址大小clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz); // 接受客戶端連接// 添加新的客戶端socket到數組pthread_mutex_lock(&mutx); // 獲取互斥鎖,確保線程安全clnt_socks[clnt_cnt++] = clnt_sock; // 增加客戶端到客戶端數組pthread_mutex_unlock(&mutx); // 釋放互斥鎖// 創建新線程來處理客戶端pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);pthread_detach(t_id); // 將線程分離,避免主線程等待printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr)); // 輸出客戶端IP地址}close(serv_sock); // 關閉服務端socketreturn 0;}// 處理客戶端的函數
void * handle_clnt(void * arg)
{int clnt_sock = *((int*)arg); // 獲取客戶端socketint str_len = 0, i;char msg[BUF_SIZE]; // 緩沖區while((str_len = read(clnt_sock, msg, sizeof(msg))) != 0) // 讀取客戶端發送的消息send_msg(msg, str_len); // 將消息轉發給所有客戶端// 客戶端斷開連接后,移除客戶端pthread_mutex_lock(&mutx); // 獲取互斥鎖for(i = 0; i < clnt_cnt; i++) // 查找并移除斷開的客戶端{if(clnt_sock == clnt_socks[i]){while(i++ < clnt_cnt - 1) // 將后續客戶端前移clnt_socks[i] = clnt_socks[i + 1];break;}}clnt_cnt--; // 客戶端數量減一pthread_mutex_unlock(&mutx); // 釋放互斥鎖close(clnt_sock); // 關閉客戶端socketreturn NULL;}// 向所有客戶端發送消息
void send_msg(char * msg, int len)
{int i;pthread_mutex_lock(&mutx); // 獲取互斥鎖,保護共享資源(客戶端socket數組)for(i = 0; i < clnt_cnt; i++) // 向所有連接的客戶端發送消息write(clnt_socks[i], msg, len);pthread_mutex_unlock(&mutx); // 釋放互斥鎖
}// 錯誤處理函數
void error_handling(char * msg)
{fputs(msg, stderr); // 輸出錯誤信息fputc('\n', stderr);exit(1); // 退出程序
}
客戶端代碼:網絡編程 + 多線程
// 客戶端程序:網絡編程+多線程實現的聊天客戶端#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>#define BUF_SIZE 100 // 定義消息的最大長度
#define NAME_SIZE 20 // 定義用戶名的最大長度// 函數聲明
void * send_msg(void * arg); // 發送消息的線程函數
void * recv_msg(void * arg); // 接收消息的線程函數
void error_handling(char * msg); // 錯誤處理函數// 用戶名和消息緩沖區
char name[NAME_SIZE] = "[DEFAULT]"; // 默認用戶名
char msg[BUF_SIZE]; // 用于存儲用戶輸入的消息int main(int argc, char *argv[])
{int sock;struct sockaddr_in serv_addr; // 服務器地址結構pthread_t snd_thread, rcv_thread; // 發送和接收消息的線程void * thread_return;// 檢查命令行參數,確保提供了 IP、端口和用戶名if(argc != 4) {printf("Usage : %s <IP> <port> <name>\n", argv[0]);exit(1);}// 設置客戶端用戶名sprintf(name, "[%s]", argv[3]);// 創建客戶端socketsock = socket(PF_INET, SOCK_STREAM, 0);// 初始化服務器地址結構memset(&serv_addr, 0, sizeof(serv_addr));serv_addr.sin_family = AF_INET;serv_addr.sin_addr.s_addr = inet_addr(argv[1]); // 獲取服務器的IP地址serv_addr.sin_port = htons(atoi(argv[2])); // 獲取服務器的端口號// 連接到服務器if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)error_handling("connect() error");// 創建發送和接收消息的線程pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);// 等待兩個線程結束pthread_join(snd_thread, &thread_return);pthread_join(rcv_thread, &thread_return);close(sock); // 關閉客戶端socketreturn 0;}// 發送消息的線程函數
void * send_msg(void * arg)
{int sock = *((int*)arg); // 獲取客戶端socketchar name_msg[NAME_SIZE + BUF_SIZE]; // 用于存儲帶有用戶名的消息while(1) {fgets(msg, BUF_SIZE, stdin); // 獲取用戶輸入的消息// 如果輸入為 "q" 或 "Q",則退出程序if(!strcmp(msg, "q\n") || !strcmp(msg, "Q\n")) {close(sock); // 關閉socket連接exit(0); // 退出程序}// 將用戶名和消息合并成一個字符串sprintf(name_msg, "%s %s", name, msg);// 發送合并后的消息到服務器write(sock, name_msg, strlen(name_msg));}return NULL; // 返回空值}// 接收消息的線程函數
void * recv_msg(void * arg)
{int sock = *((int*)arg); // 獲取客戶端socketchar name_msg[NAME_SIZE + BUF_SIZE]; // 用于存儲帶有用戶名的消息int str_len;while(1){// 從服務器讀取消息str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);if(str_len == -1) // 如果讀取失敗,返回錯誤return (void*)-1;name_msg[str_len] = 0; // 將讀取的字符串以 null 結尾fputs(name_msg, stdout); // 輸出服務器發來的消息}return NULL; // 返回空值}// 錯誤處理函數
void error_handling(char *msg)
{fputs(msg, stderr); // 將錯誤消息輸出到標準錯誤fputc('\n', stderr); // 輸出換行符exit(1); // 退出程序
}