文章目錄
- CUDA Stream 回調函數示例代碼
- 基本概念
- 示例代碼
- 代碼解釋
- 回調函數的特點
- 更復雜的示例:多個回調
- 注意事項
- CUDA Stream 回調函數中使用 MPI 或 NCCL
- 示例程序
- 注意事項
CUDA Stream 回調函數示例代碼
CUDA 中的流回調函數(stream callback)是一種在 CUDA 流中插入異步回調的機制,它允許你在流的特定位置插入一個主機端函數調用。回調函數會在流中所有前面的操作都完成后被調用。
基本概念
- 回調函數: 一個在主機上執行的函數,當流中前面的所有操作都完成后被調用
- 異步執行: 回調不會阻塞主機線程
- 執行順序: 回調函數在流中按照插入順序執行
示例代碼
#include <stdio.h>
#include <cuda_runtime.h>// CUDA核函數
__global__ void kernel(int *data, int value, int N) {int idx = blockIdx.x * blockDim.x + threadIdx.x;if (idx < N) {data[idx] = value;}
}// 回調函數
void CUDART_CB myCallback(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback executed! Status: %s, User data: %d\n",cudaGetErrorString(status), *(int*)userData);
}int main() {const int N = 1024;const int value = 42;int *d_data = nullptr;int userData = 123; // 用戶自定義數據// 分配設備內存cudaMalloc(&d_data, N * sizeof(int));// 創建流cudaStream_t stream;cudaStreamCreate(&stream);// 啟動核函數dim3 block(256);dim3 grid((N + block.x - 1) / block.x);kernel<<<grid, block, 0, stream>>>(d_data, value, N);// 添加回調函數到流cudaStreamAddCallback(stream, myCallback, &userData, 0);// 可以繼續添加其他操作到流kernel<<<grid, block, 0, stream>>>(d_data, value + 1, N);// 等待流完成cudaStreamSynchronize(stream);// 清理資源cudaFree(d_data);cudaStreamDestroy(stream);return 0;
}
代碼解釋
-
核函數: 簡單的核函數,將數組元素設置為指定值。
-
回調函數:
- 必須具有
void CUDART_CB func(cudaStream_t stream, cudaError_t status, void *userData)
的簽名 status
參數表示流中前面操作的狀態userData
是用戶提供的自定義數據
- 必須具有
-
主程序流程:
- 分配設備內存
- 創建CUDA流
- 啟動第一個核函數
- 添加回調函數到流中
- 啟動第二個核函數
- 同步流以確保所有操作完成
- 釋放資源
回調函數的特點
-
執行時機: 回調函數會在流中所有前面的操作完成后執行,但在后續操作開始前執行。
-
線程安全: 回調函數在獨立的線程中執行,不是主線程。
-
限制:
- 回調函數中不應調用CUDA API函數
- 不應執行耗時的操作
- 不應拋出異常
-
用戶數據: 可以通過
userData
參數傳遞數據給回調函數,但需要確保在回調執行時數據仍然有效。
更復雜的示例:多個回調
#include <stdio.h>
#include <cuda_runtime.h>__global__ void kernel(int *data, int value, int N) {int idx = blockIdx.x * blockDim.x + threadIdx.x;if (idx < N) {data[idx] += value;}
}void CUDART_CB callback1(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback 1: Step %d completed\n", *(int*)userData);
}void CUDART_CB callback2(cudaStream_t stream, cudaError_t status, void *userData) {printf("Callback 2: Step %d completed\n", *(int*)userData);
}int main() {const int N = 1024;int *d_data = nullptr;int step1 = 1, step2 = 2;cudaMalloc(&d_data, N * sizeof(int));cudaMemset(d_data, 0, N * sizeof(int));cudaStream_t stream;cudaStreamCreate(&stream);dim3 block(256);dim3 grid((N + block.x - 1) / block.x);// 執行序列: 核函數 -> 回調1 -> 核函數 -> 回調2kernel<<<grid, block, 0, stream>>>(d_data, 10, N);cudaStreamAddCallback(stream, callback1, &step1, 0);kernel<<<grid, block, 0, stream>>>(d_data, 20, N);cudaStreamAddCallback(stream, callback2, &step2, 0);cudaStreamSynchronize(stream);// 驗證結果int h_data[N];cudaMemcpy(h_data, d_data, N * sizeof(int), cudaMemcpyDeviceToHost);printf("First element: %d (should be 30)\n", h_data[0]);cudaFree(d_data);cudaStreamDestroy(stream);return 0;
}
這個示例展示了如何在流中插入多個回調函數,每個回調在不同的執行階段被觸發。
注意事項
- 回調函數執行時間應盡量短,避免影響整體性能
- 確保回調函數中使用的用戶數據在回調執行時仍然有效
- 回調函數中不應進行CUDA API調用
- 回調函數執行順序與它們在流中的添加順序一致
回調函數是CUDA流中實現主機-設備交互和任務協調的有力工具,特別適合用于構建復雜的異步執行流水線。
CUDA Stream 回調函數中使用 MPI 或 NCCL
在 CUDA 中,stream 回調函數是在主機端執行的函數,當 stream 中所有前面的操作都完成后會被調用。關于在回調函數中使用 MPI 或 NCCL 的問題:
-
MPI: 可以在回調函數中使用 MPI 函數,但需要注意 MPI 的線程安全性。MPI 需要初始化為
MPI_THREAD_SERIALIZED
或MPI_THREAD_MULTIPLE
級別才能安全地在回調中使用。 -
NCCL: 也可以在回調函數中使用 NCCL 函數,但需要注意 NCCL 通信可能會與 CUDA 操作交錯,需要確保正確的同步。
示例程序
下面是一個展示如何在 CUDA stream 回調函數中使用 MPI 和 NCCL 的示例程序:
#include <stdio.h>
#include <mpi.h>
#include <cuda_runtime.h>
#include <nccl.h>#define CUDACHECK(cmd) do { \cudaError_t e = cmd; \if( e != cudaSuccess ) { \printf("CUDA error %s:%d '%s'\n", \__FILE__,__LINE__,cudaGetErrorString(e)); \exit(EXIT_FAILURE); \} \
} while(0)#define NCCLCHECK(cmd) do { \ncclResult_t r = cmd; \if( r != ncclSuccess ) { \printf("NCCL error %s:%d '%s'\n", \__FILE__,__LINE__,ncclGetErrorString(r)); \exit(EXIT_FAILURE); \} \
} while(0)void CUDART_CB myStreamCallback(cudaStream_t stream, cudaError_t status, void *userData) {int *data = (int*)userData;int rank, size;// 獲取MPI信息MPI_Comm_rank(MPI_COMM_WORLD, &rank);MPI_Comm_size(MPI_COMM_WORLD, &size);printf("Rank %d: Stream callback executed. Data value: %d\n", rank, *data);// 在這里可以使用MPI函數MPI_Barrier(MPI_COMM_WORLD);// 也可以使用NCCL函數(需要先初始化NCCL)ncclComm_t comm = *(ncclComm_t*)((void**)userData + 1);float *sendbuff, *recvbuff;// 假設這些緩沖區已經在其他地方分配和初始化// NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, ncclFloat, ncclSum, comm, stream));printf("Rank %d: Finished MPI/NCCL operations in callback\n", rank);
}int main(int argc, char* argv[]) {int rank, size;// 初始化MPI,要求線程支持MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided);if (provided < MPI_THREAD_SERIALIZED) {printf("MPI thread support insufficient\n");MPI_Abort(MPI_COMM_WORLD, 1);}MPI_Comm_rank(MPI_COMM_WORLD, &rank);MPI_Comm_size(MPI_COMM_WORLD, &size);// 初始化CUDAint dev = rank % 8; // 假設每個進程使用不同的GPUCUDACHECK(cudaSetDevice(dev));// 初始化NCCLncclComm_t comm;ncclUniqueId id;if (rank == 0) ncclGetUniqueId(&id);MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD);NCCLCHECK(ncclCommInitRank(&comm, size, id, rank));// 創建CUDA streamcudaStream_t stream;CUDACHECK(cudaStreamCreate(&stream));// 準備一些數據傳遞給回調函數int *h_data, *d_data;h_data = (int*)malloc(sizeof(int));*h_data = rank * 100;CUDACHECK(cudaMalloc(&d_data, sizeof(int)));CUDACHECK(cudaMemcpyAsync(d_data, h_data, sizeof(int), cudaMemcpyHostToDevice, stream));// 準備用戶數據(包含普通數據和NCCL通信器)void *userData[2];userData[0] = h_data;userData[1] = &comm;// 添加回調函數CUDACHECK(cudaStreamAddCallback(stream, myStreamCallback, userData, 0));// 等待stream完成CUDACHECK(cudaStreamSynchronize(stream));// 清理資源NCCLCHECK(ncclCommDestroy(comm));CUDACHECK(cudaStreamDestroy(stream));CUDACHECK(cudaFree(d_data));free(h_data);MPI_Finalize();return 0;
}
注意事項
-
MPI 線程安全: 必須使用
MPI_Init_thread
并確保提供的線程支持級別足夠(至少MPI_THREAD_SERIALIZED
)。 -
NCCL 使用: 在回調中使用 NCCL 時需要確保:
- NCCL 通信器已經初始化
- 使用的 CUDA stream 與 NCCL 操作兼容
- 緩沖區已經正確分配和初始化
-
死鎖風險: 在回調中進行集體通信操作(如 MPI_Barrier 或 ncclAllReduce)時要小心,確保所有進程都能到達該點。
-
性能考慮: 在回調中進行通信操作可能會影響整體性能,需要仔細評估。
這個示例展示了基本用法,實際應用中需要根據具體需求進行調整。