RDMA高性能網絡通信實踐
- 一、背景介紹
- 二、方法設計
- A.實現方案
- B.關鍵技術點
- 三、代碼及注釋
- 四、注意事項
一、背景介紹
遠程直接內存訪問(RDMA)技術通過繞過操作系統內核和CPU直接訪問遠程內存,實現了超低延遲、高吞吐量的網絡通信。該技術廣泛應用于高性能計算、分布式存儲和機器學習等領域。本文通過一個完整的代碼示例,演示如何利用RDMA核心組件(QP、MR、CQ等)實現跨節點內存直接讀寫。
二、方法設計
A.實現方案
- 控制平面:使用TCP協議交換RDMA連接參數
- 數據平面:基于IB Verbs接口實現零拷貝傳輸
- 混合模式:客戶端主動寫入,服務端被動讀取
B.關鍵技術點
- 內存注冊機制實現安全訪問
- QP狀態機轉換確保通信可靠性
- 完成隊列輪詢實現異步通知
- 端到端流控通過TCP協議實現
三、代碼及注釋
/*----------------------------- 頭文件包含 -----------------------------*/
// 標準庫和網絡相關頭文件
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include <byteswap.h>// RDMA相關頭文件
#include <arpa/inet.h>
#include <infiniband/verbs.h>#ifdef USE_VACC
#include "vaccrt.h"
#include <vaccrt_mem_management.h>
#endif#ifdef USE_CUDA
#include <cuda.h>
#include <cuda_runtime.h>
#endif/**********************************************************************A.關鍵概念解釋:1.保護域(PD):資源隔離單元,所有資源(QP、MR等)必須屬于某個PD2.內存區域(MR):注冊的內存區域,只有注冊的內存才能用于RDMA操作3.隊列對(QP):包含發送隊列和接收隊列,是通信的基本單元4.工作請求(WR):描述要執行的操作(發送/接收/RDMA讀寫)5.完成隊列(CQ):用于通知操作完成狀態6.QP狀態轉換:a.INIT:初始狀態,設置基本參數b.RTR(Ready to Receive):準備好接收數據c.RTS(Ready to Send):準備好發送數據B.程序流程總結:1.通過TCP交換RDMA連接參數2.初始化IB資源(PD、CQ、MR、QP)3.交換QP信息(地址、密鑰等)4.進行QP狀態轉換(INIT->RTR->RTS)5.執行RDMA寫/讀操作6.輪詢完成隊列確認操作完成7.清理資源C.使用說明:1.編譯命令: gcc -o cuda -DUSE_CUDA -ggdb main.c -pthread -libverbs \-I /usr/local/cuda/include \-L /usr/local/cuda/lib64 -Wl,-rpath=/usr/local/cuda/lib64 \-lcudart -lcudadevrt -lcuda 2.服務端 : ./cuda 192.168.1.100 mlx5_03.客戶端 : ./cuda 192.168.1.101 mlx5_0 192.168.1.100 ***********************************************************************//*----------------------------- 全局配置 -----------------------------*/
#define MAX_POLL_CQ_TIMEOUT 6000 // CQ輪詢超時時間(毫秒)
#define MSG "Hello,World" // 要傳輸的測試消息
#define MSG_SIZE (64<<10)// 配置參數結構體
struct config {const char *dev; // IB設備名稱char *local_addr; // 本地IP地址u_int32_t port; // TCP端口號int ib_port; // IB端口號(默認1)int gid_idx; // GID索引(-1表示不使用RoCEv2)
} config = { NULL, NULL, 12025, 1, -1 };/*----------------------------- 資源結構體 -----------------------------*/
// 包含所有RDMA相關資源
struct resources {struct ibv_context *ctx; // IB上下文struct ibv_pd *pd; // 保護域(Protection Domain)struct ibv_cq *cq; // 完成隊列(Completion Queue)struct ibv_qp *qp; // 隊列對(Queue Pair)struct ibv_mr *mr; // 內存區域(Memory Region)void *buf; // 數據緩沖區指針int sock; // TCP套接字uint64_t remote_addr; // 遠程內存地址uint32_t remote_rkey; // 遠程內存訪問密鑰struct ibv_port_attr port_attr; // IB端口屬性
};/*----------------------------- 輔助函數 -----------------------------*/
/*** 建立TCP連接(客戶端)或監聽(服務端)* @param server 本地地址(服務端模式時使用)* @param port TCP端口號* @param remote_addr 遠程地址(客戶端模式時使用)* @return 成功返回套接字fd,失敗返回-1*/
int sock_connect(const char *server, int port,const char *remote_addr) {struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM };struct addrinfo *res, *p;int sock = -1;char port_str[6];const char * p_addr=server;if(remote_addr) p_addr=remote_addr;sprintf(port_str, "%d", port);if (getaddrinfo(p_addr, port_str, &hints, &res)) return -1;for (p = res; p; p = p->ai_next) {sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol);int reuse = 1;if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))) return -1; if (sock < 0) continue;if (remote_addr) {if (connect(sock, p->ai_addr, p->ai_addrlen)) { close(sock); sock = -1; }else break;} else {if (bind(sock, p->ai_addr, p->ai_addrlen) || listen(sock, 1)) { close(sock); sock = -1; }else { sock = accept(sock, NULL, 0); break; }}}freeaddrinfo(res);return sock;
}/*** 通過TCP同步交換數據* @return 成功返回0,失敗返回-1*/
int sock_sync(int sock, int size, void *local, void *remote) {if (write(sock, local, size) != size) return -1;return read(sock, remote, size) == size ? 0 : -1;
}/*** 輪詢完成隊列直到操作完成或超時* @return 成功返回0,失敗返回-1*/
int poll_cq(struct ibv_cq *cq) {struct ibv_wc wc;unsigned long start = time(NULL) * 1000;while (time(NULL) * 1000 - start < MAX_POLL_CQ_TIMEOUT) {if (ibv_poll_cq(cq, 1, &wc) && wc.status == IBV_WC_SUCCESS) return 0;}return -1;
}/*** 提交發送工作請求(Work Request)* @param res 資源結構體指針* @param op 操作類型(IBV_WR_RDMA_WRITE/READ)*/
void post_send(struct resources *res, int op) {struct ibv_send_wr sr;struct ibv_sge sge;struct ibv_send_wr *bad_wr = NULL;memset(&sge, 0, sizeof(sge));sge.addr = (uintptr_t)res->buf;sge.length = res->mr->length;sge.lkey = res->mr->lkey;memset(&sr, 0, sizeof(sr));sr.next = NULL;sr.wr_id = 0;sr.sg_list = &sge;sr.num_sge = 1;sr.opcode = op;sr.send_flags = IBV_SEND_SIGNALED;sr.wr.rdma.remote_addr = res->remote_addr;sr.wr.rdma.rkey = res->remote_rkey;assert(0==ibv_post_send(res->qp, &sr, &bad_wr));
}/*----------------------------- 主函數 -----------------------------*/
int main(int argc, char **argv) {struct resources res = {0};char *remote_addr=0;//分配HOST或設備內存#ifdef USE_CPUres.buf = malloc(MSG_SIZE);memset(res.buf,0,MSG_SIZE);#endif #ifdef USE_CUDAcuInit(0);CUdevice cuDevice;cuDeviceGet(&cuDevice, 0);CUcontext cuContext;assert(0 == cuCtxCreate(&cuContext, 0, cuDevice));assert(0 == cudaMallocHost(&res.buf, MSG_SIZE));struct cudaPointerAttributes cu_attrs;assert(0==cudaPointerGetAttributes(&cu_attrs,res.buf));unsigned cuflags = 1;assert(0==cuPointerSetAttribute(&cuflags, CU_POINTER_ATTRIBUTE_SYNC_MEMOPS,(CUdeviceptr)cu_attrs.devicePointer));assert(res.buf);cudaMemset(res.buf,0,MSG_SIZE);#endif #ifdef USE_VACCint DevId=0;assert(0==vaccrtSetDevice(DevId)); assert(0==vaccrtMalloc(64<<10,MSG_SIZE,&res.buf));assert(0==vaccrtMemset(res.buf,0,MSG_SIZE)); int mem_handle=0;assert(0==vaccrtMemGetHandleForAddressRange(&mem_handle,res.buf, MSG_SIZE,VACCRT_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD,0));printf("mem_handle:%d ddr:%p\n",mem_handle,res.buf);#endifprintf("init done\n");// 參數解析:本地地址必須,設備名必須,遠程地址可選config.local_addr = argv[1]; // 本地IP地址char *dev_name = argv[2]; // IB設備名(如mlx5_0)if(argc > 3) remote_addr = argv[3]; // 遠程IP地址(客戶端模式)/*=============== 階段1:建立TCP連接 ===============*/// 用于交換RDMA連接參數(QPN、LID、內存地址等) res.sock = sock_connect(config.local_addr, config.port,remote_addr);assert(res.sock>=0);/*=============== 階段2:初始化IB資源 ===============*/// 1. 獲取IB設備列表并打開指定設備 int num_devices=0;struct ibv_device * cur_dev=NULL;struct ibv_device **dev_list = ibv_get_device_list(&num_devices);for (int i = 0; i < num_devices; i++) {const char *name = ibv_get_device_name(dev_list[i]);printf("%d:%s\n",i,name);if (dev_name && strcmp(name, dev_name) == 0) {res.ctx = ibv_open_device(dev_list[i]); cur_dev=dev_list[i];break;}} assert(res.ctx);// 2. 查詢IB端口屬性(獲取LID等信息)assert(0==ibv_query_port(res.ctx,1, &res.port_attr));// 3. 創建保護域(PD),用于管理資源權限res.pd = ibv_alloc_pd(res.ctx);assert(res.pd);// 4. 創建完成隊列(CQ),用于接收操作完成通知res.cq = ibv_create_cq(res.ctx, 1, NULL, NULL, 0);assert(res.cq);// 5. 注冊內存區域(MR),允許遠程訪問 int mr_flags=IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;#ifdef USE_CPUres.mr = ibv_reg_mr(res.pd, res.buf, MSG_SIZE,mr_flags );#endif#ifdef USE_VACCres.mr = ibv_reg_dmabuf_mr(res.pd,0,MSG_SIZE,(uint64_t)res.buf,mem_handle,mr_flags);#endif #ifdef USE_CUDAres.mr = ibv_reg_mr(res.pd, res.buf, MSG_SIZE,mr_flags); #endifif(0==res.mr){printf("ibv_reg_mr %s (錯誤碼=%d)\n", strerror(errno), errno);return -1;}// 6. 創建隊列對(QP),RC(可靠連接)類型struct ibv_qp_init_attr qp_init_attr;memset(&qp_init_attr, 0, sizeof(qp_init_attr));qp_init_attr.qp_type = IBV_QPT_RC;qp_init_attr.sq_sig_all = 1;qp_init_attr.send_cq = res.cq;qp_init_attr.recv_cq = res.cq;qp_init_attr.cap.max_send_wr = 1;qp_init_attr.cap.max_recv_wr = 1;qp_init_attr.cap.max_send_sge = 1;qp_init_attr.cap.max_recv_sge = 1;res.qp = ibv_create_qp(res.pd, &qp_init_attr);assert(res.qp);/*=============== 階段3:交換QP參數 ===============*/// 通過TCP交換雙方的QP信息(地址、密鑰等) union ibv_gid my_gid;assert(0==ibv_query_gid(res.ctx, 1,0,&my_gid));// 本地參數打包struct { uint64_t addr; // 內存地址uint16_t lid; // 本地標識符uint32_t rkey; // 內存訪問密鑰uint32_t qpn; // QP編號uint8_t gid[16]; // 全局標識符(RoCEv2需要)} local = {(uintptr_t)res.buf,res.port_attr.lid,res.mr->rkey,res.qp->qp_num},remote;memcpy(local.gid, &my_gid, 16);// 參數交換(TCP同步)assert(0 == sock_sync(res.sock, sizeof(local), &local, &remote));// 保存遠程參數res.remote_addr = remote.addr;res.remote_rkey = remote.rkey;printf("local: addr=%p, lkey=0x%x, rkey=0x%x\n", res.mr->addr, res.mr->lkey, res.mr->rkey);printf("remote: addr=%ld, rkey=0x%x\n", remote.addr, remote.rkey);/*=============== 階段4:QP狀態轉換 ===============*/// QP需要經歷三個狀態變化:INIT -> RTR(準備接收) -> RTS(準備發送)// 1. INIT狀態設置struct ibv_qp_attr attr;memset(&attr, 0, sizeof(attr));attr.qp_state = IBV_QPS_INIT;attr.port_num = 1;attr.pkey_index = 0;attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;int flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT |IBV_QP_ACCESS_FLAGS;assert(0==ibv_modify_qp(res.qp, &attr, flags));// 2. RTR狀態設置(準備接收)memset(&attr, 0, sizeof(attr));attr.qp_state = IBV_QPS_RTR;attr.path_mtu = IBV_MTU_256;attr.dest_qp_num = remote.qpn;attr.rq_psn = 0;attr.max_dest_rd_atomic = 1;attr.min_rnr_timer = 0x12;attr.ah_attr.is_global = 1;attr.ah_attr.dlid = remote.lid;attr.ah_attr.sl = 0;attr.ah_attr.src_path_bits = 0;attr.ah_attr.port_num = 1;attr.ah_attr.is_global = 1;attr.ah_attr.port_num = 1;// 如果是RoCEv2需要設置GRHattr.ah_attr.grh.flow_label = 0;attr.ah_attr.grh.hop_limit = 1;attr.ah_attr.grh.sgid_index = 0;attr.ah_attr.grh.traffic_class = 0;memcpy(&attr.ah_attr.grh.dgid, remote.gid, 16);flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC |IBV_QP_MIN_RNR_TIMER;assert(0==ibv_modify_qp(res.qp, &attr,flags));// 3. RTS狀態設置(準備發送)memset(&attr, 0, sizeof(attr));attr.qp_state = IBV_QPS_RTS;attr.timeout = 0x12;attr.retry_cnt = 6;attr.rnr_retry = 0;attr.sq_psn = 0;attr.max_rd_atomic = 1;flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT |IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;assert(0==ibv_modify_qp(res.qp, &attr,flags ));/*=============== 階段5:數據傳輸 ===============*/if (remote_addr) {// 客戶端模式,執行RDMA Write#ifdef USE_CPUstrcpy(res.buf, MSG);#endif#ifdef USE_CUDAassert(0==cudaMemcpy(res.buf, MSG, strlen(MSG), cudaMemcpyHostToDevice));#endif#ifdef USE_VACCassert(0==vaccrtMemcpy(MSG,strlen(MSG),res.buf,kHost2Ddr)); #endifpost_send(&res, IBV_WR_RDMA_WRITE);// 將本地數據寫入遠程內存} else { // 服務端模式,執行RDMA Readpost_send(&res, IBV_WR_RDMA_READ); // 從遠程內存讀取數據到本地}// 等待操作完成assert(0==poll_cq(res.cq));#ifdef USE_VACC{char buffer[MSG_SIZE];memset(buffer,0,MSG_SIZE);assert(0==vaccrtMemcpy(res.buf,strlen(MSG),buffer,kDdr2Host)); printf("Message: %s\n", buffer);}#endif#ifdef USE_CUDA{char buffer[MSG_SIZE];memset(buffer,0,MSG_SIZE);assert(0==cudaMemcpy(buffer,res.buf,strlen(MSG),cudaMemcpyDeviceToHost)); printf("Message: %s\n", buffer);}#endif#ifdef USE_CPUprintf("Message: %s\n", (const char*)res.buf);#endifprintf("end,Enter exit..\n");getchar(),getchar();/*=============== 階段6:資源清理 ===============*/// 注意銷毀順序:QP -> MR -> CQ -> PD -> 上下文ibv_destroy_qp(res.qp);ibv_dereg_mr(res.mr);#ifdef USE_CUDAcudaFree(res.buf);#endif#ifdef USE_VACCvaccrtFree(res.buf);#endif#ifdef USE_CPUfree(res.buf);#endifibv_destroy_cq(res.cq);ibv_dealloc_pd(res.pd);ibv_close_device(res.ctx);close(res.sock);ibv_free_device_list(dev_list);return 0;
}
四、注意事項
- 1.ibv_reg_dmabuf_mr需要傳入之前分配的設備地址,否則會出
mlx5_0/1: QP 574 error: local protection error (0x3b 0x0 0x9d)
- 2.MAX_POLL_CQ_TIMEOUT值太小,會超時
- 3.cudaMalloc的內存在ibv_reg_mr時提示地址非法
- 4.收發完畢后才能釋放資源