目錄
一、進程間通信
1、管道(Pipe)
2、消息隊列(Message Queue)
4、信號量(Semaphore)
5、套接字(Socket)
6、信號(Signal)
9、剪貼板(Clipboard)
二、線程間通信
2、消息隊列(Message Queue)
3、?同步對象
4、原子操作
5、Future和Promise
6、管道
一、進程間通信
1、管道(Pipe)
C++管道通信的原理基于內核中的緩存機制和文件描述符。匿名管道和命名管道是兩種主要的管道類型,它們在創建、使用和通信特性上有所不同。匿名管道適用于具有共同祖先的進程間通信,而命名管道則提供了更廣泛的通信能力。通過管道,進程間可以高效、安全地傳遞數據。
1.1 匿名管道
在Linux系統中,匿名管道是通過pipe函數創建的。該函數在內核中創建一個環形隊列作為緩沖區,并返回兩個文件描述符,一個用于讀(fd[0]),一個用于寫(fd[1])。當一個進程向管道的寫端(fd[1])寫入數據時,數據實際上是被寫入到內核中的緩沖區。另一個進程可以從管道的讀端(fd[0])讀取這些數據,讀取操作實際上是從內核緩沖區中讀取數據。匿名管道是單向的,數據只能在一個方向上流動。如果需要雙向通信,必須創建兩個管道。匿名管道的生命周期與進程相關。當所有使用管道的文件描述符都被關閉后,管道將被銷毀。
示例代碼:
#include <iostream>
#include <unistd.h> // UNIX 標準函數定義
#include <sys/types.h>
#include <sys/wait.h>
#include <string.h> int main() { int pipefd[2]; // 文件描述符數組,pipefd[0] 是讀端,pipefd[1] 是寫端 pid_t pid; char buf[1024]; const char *message = "Hello from parent"; const char *response = "Hello back from child"; // 創建管道 if (pipe(pipefd) == -1) { perror("pipe"); return 1; } // 創建子進程 pid = fork(); if (pid == -1) { perror("fork"); return 1; } // 子進程 if (pid == 0) { // 關閉管道的寫端 close(pipefd[1]); // 從管道讀取數據 read(pipefd[0], buf, sizeof(buf)); std::cout << "Received from parent: " << buf << std::endl; // 關閉管道的讀端 close(pipefd[0]); // 子進程退出_exit(0); } // 父進程 else { // 關閉管道的讀端 close(pipefd[0]); // 向子進程發送數據 write(pipefd[1], message, strlen(message) + 1); // 發送字符串及其終結符 // 關閉管道的寫端 close(pipefd[1]); // 等待子進程結束 wait(NULL); } return 0;
}
1.2 命名管道
在Linux系統中,可以使用mkfifo命令或mkfifo函數來創建命名管道。命名管道在文件系統中有一個對應的文件名,因此可以通過文件名來訪問它。任何進程都可以通過打開命名管道對應的文件來訪問它。進程可以使用標準的文件操作函數(如read、write)來讀寫命名管道。命名管道支持雙向通信和跨網絡通信。多個進程可以連接到同一個命名管道進行讀寫操作。
父進程示例代碼:
#include <iostream>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h> int main() { const char *fifo_name = "/tmp/myfifo"; const char *message = "Hello from parent"; // 創建命名管道 mkfifo(fifo_name, 0666); // 打開命名管道以寫入數據 int fd = open(fifo_name, O_WRONLY); if (fd == -1) { perror("open"); return 1; } // 寫入數據到命名管道 write(fd, message, strlen(message) + 1); // 發送字符串及其終結符 // 關閉命名管道 close(fd); // 等待子進程結束(假設有子進程正在讀取這個命名管道) // 注意:這個示例中沒有創建子進程,但在實際應用中你可能需要等待 return 0;
}
?子進程示例代碼:
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <string.h> int main() { const char *fifo_name = "/tmp/myfifo"; char buf[1024]; // 打開命名管道以讀取數據 int fd = open(fifo_name, O_RDONLY); if (fd == -1) { perror("open"); return 1; } // 從命名管道讀取數據 ssize_t num_bytes = read(fd, buf, sizeof(buf) - 1); if (num_bytes == -1) { perror("read"); return 1; } buf[num_bytes] = '\0'; // 確保字符串正確終結 // 輸出接收到的消息 std::cout << "Received from parent: " << buf << std::endl; // 關閉命名管道 close(fd); return 0;
}
2、消息隊列(Message Queue)
定義:消息隊列獨立于進程而存在,可以用作進程間傳遞數據的媒介。在大多數操作系統中,消息隊列的實現依賴于內核的支持。當進程向消息隊列發送消息時,這些消息會被存儲在內核空間中,直到其他進程從隊列中讀取它們。
特點:它允許進程間異步通信,克服了管道缺點(管道只能承載無格式字節流,且緩沖區大小受限)。
以下是一個使用 POSIX 消息隊列的 C++ 示例:
發送者(sender.cpp)
#include <mqueue.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <iostream> int main() { mqd_t mqd; const char* queueName = "/myqueue"; // 創建或打開消息隊列 mqd = mq_open(queueName, O_CREAT | O_WRONLY, 0644, NULL); if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; } // 發送消息 const char* message = "Hello, Message Queue!"; unsigned int priority = 0; if (mq_send(mqd, message, strlen(message) + 1, priority) == -1) { perror("mq_send"); mq_close(mqd); mq_unlink(queueName); return 1; } std::cout << "Message sent\n"; mq_close(mqd); mq_unlink(queueName); // 可選:如果不再需要隊列,則刪除它 return 0;
}
接收者(receiver.cpp)
#include <mqueue.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <iostream> int main() { mqd_t mqd; const char* queueName = "/myqueue"; char buffer[256]; unsigned int priority; // 打開消息隊列 mqd = mq_open(queueName, O_CREAT | O_RDONLY, 0644, NULL); if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; } // 接收消息 ssize_t bytesRead = mq_receive(mqd, buffer, sizeof(buffer), &priority); if (bytesRead == -1) { perror("mq_receive"); mq_close(mqd); return 1; } buffer[bytesRead] = '\0'; // 確保字符串以 null 結尾 std::cout << "Received message: " << buffer << std::endl; mq_close(mqd); // 注意:不要在這里調用 mq_unlink,除非你想刪除隊列 return 0;
}
注意事項:
確保在發送者和接收者之間正確同步消息隊列的創建和刪除。
消息隊列的大小和消息的最大長度在創建隊列時可以指定。
權限(如 0644)需要根據你的需求進行設置。?
3、共享內存(Shared Memory)
定義:允許多個進程訪問同一塊內存區域,從而實現進程間數據共享;
特點:它是最快的進程間通信方式,避免了數據的拷貝,但共享內存需要解決并發訪問和同步問題,常用的同步機制包括互斥鎖(Mutexes)、信號量(Semaphores)和事件(Events)等。
在Windows系統中,使用共享內存通常涉及到CreateFileMapping和MapViewOfFile等Win32 API函數。為了在多個進程之間安全地讀寫共享內存,我們需要在這些進程之間實現某種形式的同步機制。
創建共享內存的生產者(寫入者)
#include <windows.h>
#include <iostream> int main() { // 定義共享內存的名稱和大小 const char* shmName = "Global\\MySharedMemory"; const size_t shmSize = 256; const char* mutexName = "Global\\MySharedMemoryMutex"; // 創建互斥鎖 HANDLE hMutex = CreateMutex( NULL, // 默認安全屬性 FALSE, // 初始不擁有互斥鎖 mutexName); // 互斥鎖名稱 if (hMutex == NULL) { std::cerr << "Could not create mutex (" << GetLastError() << ").\n"; return 1; } // 創建或打開一個文件映射對象 HANDLE hMapFile = CreateFileMapping( INVALID_HANDLE_VALUE, // 使用分頁文件 NULL, // 默認安全屬性 PAGE_READWRITE, // 讀寫訪問 0, // 高32位文件大小 shmSize, // 低32位文件大小 shmName); // 對象名 if (hMapFile == NULL) { std::cerr << "Could not create file mapping object (" << GetLastError() << ").\n"; return 1; } // 將文件映射對象映射到視圖 void* pBuf = MapViewOfFile( hMapFile, // 文件映射對象 FILE_MAP_ALL_ACCESS, // 讀寫訪問 0, // 高32位偏移量 0, // 低32位偏移量 0); // 映射整個文件 if (pBuf == NULL) { std::cerr << "Could not map view of file (" << GetLastError() << ").\n"; CloseHandle(hMapFile); return 1; } // 等待互斥鎖 WaitForSingleObject(hMutex, INFINITE); // 寫入數據 std::strcpy_s(static_cast<char*>(pBuf), shmSize, "Hello, Shared Memory!"); // 釋放互斥鎖 ReleaseMutex(hMutex); // 取消映射視圖 UnmapViewOfFile(pBuf); // 關閉文件映射對象句柄 CloseHandle(hMapFile);// 關閉互斥鎖句柄(注意:通常在程序結束時自動關閉,但顯式關閉是個好習慣) CloseHandle(hMutex); std::cout << "Shared memory written successfully.\n"; return 0;
}
訪問共享內存的消費者(讀取者)
#include <windows.h>
#include <iostream> int main() { // 定義共享內存和互斥鎖的名稱 const char* shmName = "Global\\MySharedMemory"; const char* mutexName = "Global\\MySharedMemoryMutex"; // 打開互斥鎖 HANDLE hMutex = OpenMutex( SYNCHRONIZE, // 訪問權限 FALSE, // 不更改現有所有者的所有權 mutexName); // 互斥鎖名稱 if (hMutex == NULL) { std::cerr << "Could not open mutex (" << GetLastError() << ").\n"; return 1; } // 打開一個現有的文件映射對象 HANDLE hMapFile = OpenFileMapping( FILE_MAP_READ, // 讀取訪問 FALSE, // 不繼承句柄 shmName); // 對象名 if (hMapFile == NULL) { std::cerr << "Could not open file mapping object (" << GetLastError() << ").\n"; return 1; } // 將文件映射對象映射到視圖 void* pBuf = MapViewOfFile( hMapFile, // 文件映射對象 FILE_MAP_READ, // 讀取訪問 0, // 高32位偏移量 0, // 低32位偏移量 0); // 映射整個文件 if (pBuf == NULL) { std::cerr << "Could not map view of file (" << GetLastError() << ").\n"; CloseHandle(hMapFile); return 1; } // 等待互斥鎖 WaitForSingleObject(hMutex, INFINITE); // 讀取數據 std::cout << "Shared memory content: " << static_cast<char*>(pBuf) << std::endl; // 釋放互斥鎖 ReleaseMutex(hMutex); // 取消映射視圖 UnmapViewOfFile(pBuf); // 關閉文件映射對象句柄 CloseHandle(hMapFile); // 關閉互斥鎖句柄 CloseHandle(hMutex); return 0;
}
請注意,這里的互斥鎖是在全局命名空間中創建的(通過前綴"Global\\"),這意味著它可以在系統范圍內的任何進程中訪問。這是必需的,因為我們的目標是讓多個不同的進程能夠識別并訪問同一個互斥鎖。
此外,我們使用了WaitForSingleObject函數來等待互斥鎖變得可用,并使用ReleaseMutex來釋放互斥鎖。這兩個函數一起工作,以確保在任何給定時間只有一個進程可以訪問共享內存區域。
最后,請確保在實際應用中適當地處理所有可能的錯誤情況,并且在不再需要時關閉所有句柄。這有助于避免資源泄漏和其他潛在問題。
4、信號量(Semaphore)
定義:它是用于多個進程對共享資源訪問的同步機制;
特點:信號量的操作包括P操作和V操作,分別用于申請資源和釋放資源。
以下是一個使用C++和Windows API編寫的示例,該示例中創建了兩個進程,一個生產者進程和一個消費者進程。生產者生產一定數量的“產品”,并通過信號量來通知消費者這些產品已經準備好被消費。
生產者示例代碼
// Producer.cpp
#include <windows.h>
#include <iostream> // 假設信號量的名稱是固定的
#define SEMAPHORE_NAME TEXT("Global\\MySemaphore") int main() { HANDLE hSemaphore = CreateSemaphore( NULL, // 默認安全屬性 0, // 初始計數為0,表示沒有產品可用 10, // 最大計數為10,表示最多可以有10個產品 SEMAPHORE_NAME); // 信號量名稱 if (hSemaphore == NULL) { std::cerr << "Failed to create semaphore." << std::endl; return 1; } for (int i = 0; i < 5; ++i) { // 模擬生產產品 Sleep(1000); // 假設生產一個產品需要1秒 std::cout << "Produced item " << i + 1 << std::endl; // 增加信號量的計數,表示有一個產品已經生產好了 if (!ReleaseSemaphore(hSemaphore, 1, NULL)) { std::cerr << "Failed to release semaphore." << std::endl; break; } } // 關閉句柄(注意:在Windows中,當進程結束時,所有句柄都會自動關閉) // 但顯式關閉是一個好習慣,特別是在大型或長期運行的程序中 // CloseHandle(hSemaphore); // 在這個例子中,可以省略,因為進程即將結束 return 0;
}
消費者示例代碼
// Consumer.cpp
#include <windows.h>
#include <iostream> // 假設信號量的名稱與生產者中相同
#define SEMAPHORE_NAME TEXT("Global\\MySemaphore") int main() { HANDLE hSemaphore = OpenSemaphore( SEMAPHORE_ALL_ACCESS, // 請求完全訪問權限 FALSE, // 不希望句柄被子進程繼承 SEMAPHORE_NAME); // 信號量名稱 if (hSemaphore == NULL) { std::cerr << "Failed to open semaphore." << std::endl; return 1; } for (int i = 0; i < 5; ++i) { // 等待信號量變為非零(即等待有產品可用) if (WaitForSingleObject(hSemaphore, INFINITE) != WAIT_OBJECT_0) { std::cerr << "Failed to wait for semaphore." << std::endl; break; } // 模擬消費產品 Sleep(500); // 假設消費一個產品需要0.5秒 std::cout << "Consumed item " << i + 1 << std::endl; } // 關閉句柄 CloseHandle(hSemaphore); return 0;
}
?請注意,我們使用了全局命名的信號量(通過前綴Global\),這意味著信號量在整個系統中都是可見的,可以被任何進程訪問。如果你只想在同一用戶會話中的進程間共享信號量,可以使用Local\前綴。
確保在生產者進程開始生產之前,消費者進程不會嘗試等待信號量,否則消費者可能會立即進入等待狀態,直到生產者開始生產。
在實際的應用程序中,你可能需要更復雜的錯誤處理和同步機制來確保程序的健壯性。
這個例子假設生產者和消費者都知道要生產/消費多少個產品。在更復雜的場景中,你可能需要其他機制來通知生產者何時停止生產或消費者何時停止消費。
在Windows中,通常不需要顯式關閉句柄,因為當進程結束時,所有句柄都會被自動關閉。但是,顯式關閉句柄是一個好習慣,特別是在大型或長期運行的程序中。在這個簡單的例子中,我們可以省略CloseHandle調用,因為進程很快就會結束。然而,在更復雜的應用程序或庫中,你應該始終關閉不再需要的。
5、套接字(Socket)
定義:它是網絡通信的接口,提供了端到端的通信服務;
特點:它支持TCP/UDP等多種協議,可以根據需要選擇合適的協議進行通信。
在Windows系統中,通過Winsock庫實現進程間通信。首先需要包含Winsock頭文件,并鏈接相應的庫。
#include <iostream>
#include <winsock2.h>
#include <ws2tcpip.h> #pragma comment(lib, "ws2_32.lib")
然后在程序開始時初始化Winsock庫。
WSADATA wsaData;
int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0) { std::cerr << "WSAStartup failed with error: " << iResult << std::endl; return 1;
}
最后創建套接字,一個進程作為服務器(監聽套接字),另一個進程作為客戶端(連接套接字)。
服務器代碼:
// 假設我們使用TCP套接字
SOCKET ListenSocket = INVALID_SOCKET;
sockaddr_in service; // 創建套接字
ListenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ListenSocket == INVALID_SOCKET) { std::cerr << "socket failed with error: " << WSAGetLastError() << std::endl; WSACleanup(); return 1;
} // 設置套接字地址結構
service.sin_family = AF_INET;
service.sin_addr.s_addr = inet_addr("127.0.0.1");
service.sin_port = htons(12345); // 綁定套接字
if (bind(ListenSocket, (SOCKADDR *)&service, sizeof(service)) == SOCKET_ERROR) { std::cerr << "bind failed with error: " << WSAGetLastError() << std::endl; closesocket(ListenSocket); WSACleanup(); return 1;
} // 監聽套接字
if (listen(ListenSocket, SOMAXCONN) == SOCKET_ERROR) { std::cerr << "listen failed with error: " << WSAGetLastError() << std::endl; closesocket(ListenSocket); WSACleanup(); return 1;
} // 等待客戶端連接(這里應該是一個循環,但為了簡化,我們只接受一個連接)
SOCKET ClientSocket = accept(ListenSocket, NULL, NULL);
if (ClientSocket == INVALID_SOCKET) { std::cerr << "accept failed with error: " << WSAGetLastError() << std::endl; closesocket(ListenSocket); WSACleanup(); return 1;
} // 現在你可以通過ClientSocket與客戶端通信了
// ...(發送和接收數據) // 關閉套接字
closesocket(ClientSocket);
closesocket(ListenSocket);
?客戶端代碼:
// 創建套接字
SOCKET ConnectSocket = INVALID_SOCKET;
sockaddr_in target; ConnectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ConnectSocket == INVALID_SOCKET) { std::cerr << "socket failed with error: " << WSAGetLastError() << std::endl; WSACleanup(); return 1;
} target.sin_family = AF_INET;
target.sin_addr.s_addr = inet_addr("127.0.0.1");
target.sin_port = htons(12345); // 連接到服務器
if (connect(ConnectSocket, (SOCKADDR *)&target, sizeof(target)) == SOCKET_ERROR) { std::cerr << "connect failed with error: " << WSAGetLastError() << std::endl; closesocket(ConnectSocket); WSACleanup(); return 1;
} // 現在你可以通過ConnectSocket與服務器通信了
// ...(發送和接收數據) // 關閉套接字
closesocket(ConnectSocket);
6、信號(Signal)
定義:信號是Unix/Linux系統中進程間通信的一種簡單方式,允許一個進程向另一個進程發送信號;信號可以由多種原因產生,包括用戶操作(如Ctrl+C產生SIGINT信號)、硬件異常(如非法內存訪問產生SIGSEGV信號)以及程序顯式請求(如使用kill
函數發送信號)。
特點:它是一種異步通信方式。
信號的發送:可以通過kill函數或raise函數發送信號。kill函數允許一個進程向另一個進程發送信號,而raise函數則允許進程向自己發送信號。
信號的接收:當信號被發送到進程時,操作系統會中斷該進程的正常流程,并調用相應的信號處理函數(如果已設置)。如果沒有設置信號處理函數,則進程會按照信號的默認行為執行(如終止進程、忽略信號或暫停進程等)。
信號處理函數:可以使用signal函數或更可靠的sigaction函數來設置信號處理函數。信號處理函數必須遵循特定的原型,并且當信號到達時會被調用。
#include <iostream>
#include <csignal>
#include <unistd.h> // 對于sleep()函數 // 信號處理函數
void signalHandler(int signum) { std::cout << "捕獲到信號 " << signum << std::endl; // 清理并關閉 // 注意:在實際的應用程序中,這里可能需要更復雜的清理代碼 // 退出程序 exit(signum);
} int main () { // 注冊信號SIGINT和信號處理程序 signal(SIGINT, signalHandler); while(1) { std::cout << "等待信號..." << std::endl; sleep(1); // 暫停一秒 } return 0;
}
在這個例子中,程序進入一個無限循環,每秒鐘打印一條消息,并等待用戶發送SIGINT信號(通常通過Ctrl+C)。當信號被捕獲時,signalHandler函數被調用,程序隨后退出。
請注意,使用signal函數有幾個限制,包括它不能保證信號處理函數的原子性(即,在信號處理函數執行期間,其他信號可能會被阻塞或丟失)。因此,在需要可靠信號處理的場景下,通常推薦使用sigaction函數。不過,上述示例足以展示信號的基本用法。?
9、剪貼板(Clipboard)
在Windows等圖形界面中,剪貼板也可以作為一種進程間通信方式,它允許在不同進程之間復制和粘貼數據。
二、線程間通信
C++多線程通信是指在一個進程中運行的不同線程之間交換數據或控制信息,以協調它們的執行。在C++中,線程間通信可以通過多種方式實現,以下是幾種主要的通信方式:
1、共享內存(Shared Memory)
共享內存是C++多線程通信中最直接的方式。多個線程可以訪問同一塊內存區域,從而實現數據的交換和共享。然而,由于多個線程可能同時訪問同一塊內存,因此需要使用同步機制(如互斥鎖、讀寫鎖等)來保證線程安全,避免競態條件和數據不一致的問題。
實現方式:
使用全局變量、成員變量(對于多線程類)或通過指針/引用傳遞的數據結構,使得多個線程能夠訪問同一份數據。
使用C++標準庫中的std::mutex、std::lock_guard、std::unique_lock等同步機制來保護對共享內存的訪問。
以下是一個簡單的C++示例代碼,展示了如何使用std::thread
、std::mutex
和共享內存來實現線程間通信。在這個例子中,我們將創建兩個線程:一個生產者線程和一個消費者線程。生產者線程將向共享內存區域寫入數據,而消費者線程將從該區域讀取數據。為了同步訪問,我們將使用一個互斥鎖來保護共享內存。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono> // 共享數據結構和互斥鎖
std::mutex mtx;
int shared_data = 0;
bool data_ready = false; // 條件變量,用于通知消費者數據已準備好
std::condition_variable cv; // 生產者線程函數
void producer() { for (int i = 0; i < 5; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); // 模擬耗時操作 std::lock_guard<std::mutex> lck(mtx); shared_data = i * 10; // 生產數據 data_ready = true; // 標記數據已準備好 std::cout << "Produced: " << shared_data << std::endl; cv.notify_one(); // 通知一個等待的線程 }
} // 消費者線程函數
void consumer() { while (true) { std::unique_lock<std::mutex> lck(mtx); // 等待數據準備好 cv.wait(lck, []{ return data_ready; }); // 讀取數據 std::cout << "Consumed: " << shared_data << std::endl; // 重置數據準備狀態 data_ready = false; // 釋放鎖,以便生產者可以繼續生產 lck.unlock(); // 在這里可以添加更多的處理邏輯 // 注意:這個簡單的例子沒有優雅地退出消費者線程。在實際應用中,你可能需要添加一些邏輯來安全地退出循環。 }
} int main() { std::thread producer_thread(producer); std::thread consumer_thread(consumer); producer_thread.join(); // 等待生產者線程完成 consumer_thread.join(); // 注意:在這個例子中,消費者線程永遠不會自己退出,所以這里會導致死鎖 // 在實際應用中,你可能需要一種方法來優雅地停止消費者線程,比如使用原子變量作為退出標志。 return 0;
} // 注意:上面的代碼示例中,消費者線程使用了無限循環,并且沒有優雅地退出循環的機制。
// 在實際應用中,你可能需要添加一個原子變量作為退出標志,并在適當的時候設置它,以便消費者線程可以安全地退出循環。
2、消息隊列(Message Queue)
消息隊列是另一種常見的線程間通信方式。線程之間可以通過消息隊列來傳遞數據,一個線程將數據放入隊列中,另一個線程從隊列中取出數據。這種方式可以實現線程間的解耦合,使得線程之間不需要直接訪問對方的內存空間。
實現方式:
使用C++標準庫中的std::queue或其他容器類來實現消息隊列。配合互斥鎖等同步機制來保護對隊列的訪問,確保線程安全。以下是一個使用std::queue
、std::mutex
和std::condition_variable
實現的簡單線程間通信示例,其中包含一個生產者線程和一個消費者線程:
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stdexcept> // 消息隊列和同步機制
std::queue<int> messages;
std::mutex mtx;
std::condition_variable cv;
bool done = false; // 用于優雅地停止消費者線程 // 生產者線程函數
void producer(int id) { for (int i = 0; i < 5; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); // 模擬耗時操作 std::lock_guard<std::mutex> lck(mtx); messages.push(id * 10 + i); // 生產消息 std::cout << "Producer " << id << " produced " << messages.back() << std::endl; cv.notify_one(); // 通知消費者線程 } // 所有消息生產完畢后,通知消費者線程可以退出了(可選) // 注意:這通常不是停止消費者線程的唯一方式,因為消費者可能還在等待新消息 // 在這個例子中,我們簡單地設置一個標志并再次通知消費者 { std::lock_guard<std::mutex> lck(mtx); done = true; cv.notify_one(); }
} // 消費者線程函數
void consumer() { while (true) { std::unique_lock<std::mutex> lck(mtx); cv.wait(lck, []{ return !messages.empty() || done; }); // 等待消息或完成信號 if (done && messages.empty()) { break; // 優雅地退出循環 } int msg = messages.front(); messages.pop(); std::cout << "Consumer consumed " << msg << std::endl; lck.unlock(); // 在處理消息之前釋放鎖(如果不需要在鎖內處理) // 處理消息(在這個例子中只是打印) }
} int main() { std::thread producer_thread1(producer, 1); std::thread producer_thread2(producer, 2); // 可以添加多個生產者 std::thread consumer_thread(consumer); producer_thread1.join(); producer_thread2.join(); // 等待所有生產者線程完成 // 通知消費者線程所有生產者都已完成(盡管在這個例子中,消費者線程可能會自己檢測到這一點) { std::lock_guard<std::mutex> lck(mtx); cv.notify_one(); // 可選,但在這個例子中,消費者線程可能已經在等待了 } consumer_thread.join(); // 等待消費者線程完成 return 0;
}
在這個示例中,我們創建了一個全局的std::queue<int>
作為消息隊列,以及一個互斥鎖std::mutex
和一個條件變量std::condition_variable
來同步對消息隊列的訪問。生產者線程向隊列中添加消息,并通過條件變量通知消費者線程。消費者線程則等待消息到來,處理消息,并在接收到所有生產者已完成的信號(或隊列為空且done
標志被設置)時退出循環。?
3、?同步對象
同步對象如信號量、條件變量等,可以用于協調多個線程的動作,實現線程間的同步與通信。
實現方式:
信號量:C++20引入了std::counting_semaphore,它是一種計數型的同步原語,可用于限制同時訪問共享資源的線程數量,或作為事件計數器。
條件變量:C++標準庫提供了std::condition_variable類,它允許一個線程等待特定條件滿足時才繼續執行,同時允許另一個線程改變該條件并通知等待線程。條件變量通常與互斥鎖一起使用。
4、原子操作
原子操作是指不可分割的操作,即這些操作在執行過程中不能被其他線程中斷。C++11引入了std::atomic模板類來支持原子操作,它提供了對變量的原子讀寫操作,避免了競態條件問題。
5、Future和Promise
std::future和std::promise提供了一種機制,允許一個線程向另一個線程傳遞異步計算的結果。std::promise用于設置一個可由std::future檢索的結果,而std::future則提供了一種阻塞或非阻塞的方式來獲取這個結果。
#include <iostream>
#include <future>
#include <thread>
#include <chrono> // 生產者函數,計算并設置promise的值
void producer(std::promise<int> promise) { // 模擬耗時的計算 std::this_thread::sleep_for(std::chrono::seconds(1)); // 計算結果 int result = 42; // 假設這是某種復雜計算的結果 // 將結果設置到promise中 promise.set_value(result);
} // 消費者函數,從future中獲取值
void consumer(std::future<int> future) { // 等待生產者設置值 int value = future.get(); // 這會阻塞,直到值被設置 // 使用值 std::cout << "The value is: " << value << std::endl;
} int main() { // 創建一個promise<int> std::promise<int> promise; // 從promise獲取future std::future<int> future = promise.get_future(); // 啟動生產者線程 std::thread producerThread(producer, std::move(promise)); // 在主線程中作為消費者 consumer(std::move(future)); // 等待生產者線程完成 producerThread.join(); return 0;
}
6、管道
在Unix-like系統中,管道(pipe)或命名管道(FIFO)也可以用于同一進程內的線程通信。管道提供了一種半雙工的通信方式,一個線程往管道中寫入數據,另一個線程從管道中讀取數據。然而,在C++標準庫中,并沒有直接提供管道的支持,但可以通過操作系統提供的API或第三方庫來實現。
總結
C++多線程通信方式多種多樣,包括共享內存、消息隊列、同步對象、原子操作、Future和Promise以及管道等。選擇合適的通信方式取決于具體的應用場景,包括數據交換的復雜度、同步需求、性能要求等因素。在設計多線程程序時,應盡量減少線程間的同步點,避免過度同步導致的性能瓶頸,并確保線程安全。