io_uring
1、概述
io_uring是Linux在2019年(內核版本5.1)加入內核的一種新型異步I/O接口;
io_uring使用用戶態與內核共享的內存(環形隊列)來傳遞I/O請求和結果,從而減少高IOPS場景中用戶態與內核態之間的切換和系統調用次數:用戶直接把要發起的I/O操作寫入共享內存,內核從共享內存中取出I/O操作並執行相應的讀寫,完成後再把結果寫回共享內存。基於io_uring可以實現Proactor模式的網絡編程模型;
-
Reactor 是非阻塞同步網絡模式,感知的是就緒可讀寫事件。在每次感知到有事件發生(比如可讀就緒事件)后,就需要應用進程主動調用 read 方法來完成數據的讀取,也就是要應用進程主動將 socket 接收緩存中的數據讀到應用進程內存中,這個過程是同步的,讀取完數據后應用進程才能處理數據。
-
Proactor 是異步網絡模式, 感知的是已完成的讀寫事件。在發起異步讀寫請求時,需要傳入數據緩沖區的地址(用來存放結果數據)等信息,這樣系統內核才可以自動幫我們把數據的讀寫工作完成,這里的讀寫工作全程由操作系統來做,并不需要像 Reactor 那樣還需要應用進程主動發起 read/write 來讀寫數據,操作系統完成讀寫工作后,就會通知應用進程直接處理數據。
優點
-
避免了提交I/O事件和完成事件中存在的內存拷貝(使用共享內存)
-
減少了I/O任務提交和獲取完成事件時的系統調用次數
-
採用無鎖環形隊列,減少了鎖資源的競爭
主要內存結構
- 提交隊列(Submission Queue,SQ):連續的內存空間,環形隊列,存放待執行I/O請求在SQE數組中的索引
- 完成隊列(Completion Queue,CQ):連續的內存空間,環形隊列,直接存放I/O操作完成後的返回結果(Completion Queue Entry,CQE)
- 提交隊列項數組(Submission Queue Entries,SQEs):存放真正的I/O請求,用戶填好SQE後把它的索引放入SQ即可提交
2、主要接口
io_uring提供三個用戶態的系統調用接口
- io_uring_setup:初始化一個新的io_uring對象,一個SQ和一個CQ,通過使用共享內存進行數據操作
- io_uring_register:註冊用於異步I/O的文件或用戶緩衝區(buffers)
- io_uring_enter:提交I/O任務,等待I/O完成
SQ中保存的是SQE數組元素的索引,而不是請求本身,真正的請求保存在SQE數組中,因此一次可以批量提交一組在SQE數組中位置不連續的請求;CQ中則直接保存完成事件(CQE);
SQ、CQ、SQE數組的內存區域都由內核分配;io_uring_setup會返回一個fd,用戶通過該fd調用mmap把這些內存映射到用戶空間,與內核共享;
3、第三方庫
liburing對io_uring進行了封裝,提供了更簡單易用的API,可以通過以下命令安裝該動態庫:
# Fetch, build and install liburing; `&&` stops at the first failing step so a
# broken configure or build is never installed.
git clone https://github.com/axboe/liburing.git && \
cd liburing && \
./configure && \
make && \
sudo make install && \
sudo ldconfig  # refresh the dynamic-linker cache so the new shared library is found
4、主要使用流程
1. io_uring初始化
io_uring通過io_uring_setup系統調用初始化;在liburing庫中,通過io_uring_queue_init_params函數進行初始化,它會創建提交隊列(SQ)、完成隊列(CQ)以及SQE數組;
//io_uring實現異步的方式
struct io_uring_params pragma;
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 創建submit隊列和complite隊列
io_uring_queue_init_params(1024, &ring, &pragma);
2. 獲取SQE並準備I/O請求(放入SQ環形隊列)
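在liburing中,先通過io_uring_get_sqe從SQ中獲取一個空閒的SQE,再通過io_uring_prep_accept、io_uring_prep_recv、io_uring_prep_send等函數填充該SQE;這一步只是寫共享內存,並不會發起系統調用。注意:io_uring_prep_accept並不是對io_uring_register的封裝——io_uring_register用於預先註冊文件描述符或緩衝區,與這裡的請求準備無關;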
// 獲取ringbuffer的頭
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 注冊一個I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
3. io_uring_enter 提交I/O
io_uring通過io_uring_enter系統調用提交I/O並等待事件完成;在liburing中,使用io_uring_submit提交SQE中的讀寫請求,io_uring_wait_cqe等待I/O的處理結果,io_uring_peek_batch_cqe批量獲取CQ中的處理結果;處理完畢後還需要調用io_uring_cq_advance(或對單個CQE調用io_uring_cqe_seen)通知內核這些CQE已被消費,否則CQ中的位置不會被釋放;
// 提交worker中執行
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待complete隊列中的結果
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
// 獲取CQ環形隊列中的處理結果
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
5、實現
io_uring_server.c
#include <liburing.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

/*
 * io_uring echo server: accepts TCP clients on port 9999 and echoes back
 * whatever each client sends, driving accept/recv/send through io_uring.
 */

#define RING_ENTRIES  1024 /* SQ size requested from io_uring_queue_init_params */
#define CQE_BATCH     128  /* max CQEs harvested per io_uring_peek_batch_cqe call */
#define BUFFER_LENGTH 1024 /* per-connection receive/echo buffer size */
#define MAX_CONNS     1024 /* connection state is indexed by fd; larger fds are rejected */

enum event_type {
    EVENT_ACCEPT,
    EVENT_READ,
    EVENT_WRITE
};

/* Tag copied into sqe->user_data so each CQE identifies its fd and operation. */
typedef struct connect_info {
    int conn_fd;
    int event;
} connect_info_t;

/* Compile-time guard: the tag is memcpy'd into the 64-bit user_data field. */
typedef char connect_info_fits_in_user_data
    [sizeof(connect_info_t) <= sizeof(((struct io_uring_sqe *)0)->user_data) ? 1 : -1];

/*
 * Per-connection echo state. Each client gets its own buffer so data from
 * concurrent connections can never overwrite each other.
 */
typedef struct conn_state {
    char buffer[BUFFER_LENGTH];
    int length; /* bytes received by the last recv, to be echoed back */
    int sent;   /* how many of those bytes have been sent so far */
} conn_state_t;

static conn_state_t conns[MAX_CONNS];

/*
 * Creates a TCP socket listening on all IPv4 interfaces at `port`.
 * Returns the listening fd, or -1 on failure (the socket is closed).
 */
int init_server(unsigned short port)
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }

    /* SO_REUSEADDR only affects bind() if it is set before the bind. */
    int opt = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        perror("setsockopt");
        close(sockfd);
        return -1;
    }

    struct sockaddr_in serveraddr;
    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(port);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind error");
        close(sockfd);
        return -1;
    }
    if (listen(sockfd, 10) < 0) {
        perror("listen");
        close(sockfd);
        return -1;
    }
    return sockfd;
}

/*
 * Returns a free SQE. If the submission queue is full, submits the pending
 * entries to make room and retries once. Returns NULL if none is available.
 */
static struct io_uring_sqe *get_sqe(struct io_uring *ring)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
    }
    if (sqe == NULL)
        fprintf(stderr, "io_uring: submission queue full\n");
    return sqe;
}

/* Stores the (fd, event) tag in the SQE so the completion can be dispatched. */
static void set_user_data(struct io_uring_sqe *sqe, int fd, int event)
{
    connect_info_t info = {fd, event};
    memcpy(&sqe->user_data, &info, sizeof(info));
}

/* Queues a recv of up to `len` bytes into `buf`. Returns 0, or -1 if no SQE. */
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{
    struct io_uring_sqe *sqe = get_sqe(ring);
    if (sqe == NULL)
        return -1;
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
    set_user_data(sqe, sockfd, EVENT_READ);
    printf("set event recv----\n");
    return 0;
}

/* Queues a send of `len` bytes from `buf`. Returns 0, or -1 if no SQE. */
int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{
    struct io_uring_sqe *sqe = get_sqe(ring);
    if (sqe == NULL)
        return -1;
    io_uring_prep_send(sqe, sockfd, buf, len, flags);
    set_user_data(sqe, sockfd, EVENT_WRITE);
    printf("set event send----\n");
    return 0;
}

/*
 * Queues an accept on listening socket `sockfd`. The peer address is written
 * to `clientaddr`/`addrlen` when the accept completes. Returns 0, or -1 if no SQE.
 */
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,
                     socklen_t *addrlen, int flags)
{
    struct io_uring_sqe *sqe = get_sqe(ring);
    if (sqe == NULL)
        return -1;
    io_uring_prep_accept(sqe, sockfd, clientaddr, addrlen, flags);
    set_user_data(sqe, sockfd, EVENT_ACCEPT);
    printf("set event accept\n");
    return 0;
}

/* Completed accept: re-arm the accept, then start reading from the new client. */
static void handle_accept(struct io_uring *ring, int listen_fd, int res,
                          struct sockaddr_in *clientaddr, socklen_t *addrlen)
{
    /* addrlen is value-result (the kernel overwrites it), so reset it each time. */
    *addrlen = sizeof(*clientaddr);
    set_event_accept(ring, listen_fd, (struct sockaddr *)clientaddr, addrlen, 0);

    if (res < 0) {
        /* A failed accept carries -errno, not a file descriptor. */
        fprintf(stderr, "accept failed: %s\n", strerror(-res));
        return;
    }
    printf("accept success\n");
    int conn_fd = res;
    printf("conn_fd = %d res = %d\n", conn_fd, res);

    if (conn_fd >= MAX_CONNS) {
        fprintf(stderr, "conn_fd %d exceeds MAX_CONNS, closing\n", conn_fd);
        close(conn_fd);
        return;
    }
    if (set_event_recv(ring, conn_fd, conns[conn_fd].buffer, BUFFER_LENGTH, 0) < 0)
        close(conn_fd);
}

/* Completed recv: close on EOF or error, otherwise echo the bytes back. */
static void handle_read(struct io_uring *ring, int conn_fd, int res)
{
    conn_state_t *conn = &conns[conn_fd];

    /* The buffer is not NUL-terminated: print exactly the bytes received. */
    printf("set_event_recv ret: %d, %.*s\n", res, res > 0 ? res : 0, conn->buffer);

    if (res <= 0) {
        /* 0 means the peer closed the connection; < 0 is -errno. Either way it is done. */
        if (res < 0)
            fprintf(stderr, "recv on fd %d failed: %s\n", conn_fd, strerror(-res));
        close(conn_fd);
        return;
    }

    conn->length = res;
    conn->sent = 0;
    if (set_event_send(ring, conn_fd, conn->buffer, res, 0) < 0) {
        close(conn_fd);
        return;
    }
    printf("read success\n");
}

/* Completed send: finish a short send if needed, otherwise wait for more data. */
static void handle_write(struct io_uring *ring, int conn_fd, int res)
{
    conn_state_t *conn = &conns[conn_fd];

    if (res <= 0) {
        if (res < 0)
            fprintf(stderr, "send on fd %d failed: %s\n", conn_fd, strerror(-res));
        close(conn_fd);
        return;
    }

    conn->sent += res;
    int rc;
    if (conn->sent < conn->length) {
        /* Short send: push out the remaining bytes before reading again. */
        rc = set_event_send(ring, conn_fd, conn->buffer + conn->sent,
                            conn->length - conn->sent, 0);
    } else {
        rc = set_event_recv(ring, conn_fd, conn->buffer, BUFFER_LENGTH, 0);
    }
    if (rc < 0) {
        close(conn_fd);
        return;
    }
    printf("write success\n");
}

/* Decodes a CQE's user_data tag and routes it to the matching handler. */
static void dispatch_cqe(struct io_uring *ring, int listen_fd, const struct io_uring_cqe *cqe,
                         struct sockaddr_in *clientaddr, socklen_t *addrlen)
{
    connect_info_t result;
    memcpy(&result, &cqe->user_data, sizeof(result));

    switch (result.event) {
    case EVENT_ACCEPT:
        handle_accept(ring, listen_fd, cqe->res, clientaddr, addrlen);
        break;
    case EVENT_READ:
        handle_read(ring, result.conn_fd, cqe->res);
        break;
    case EVENT_WRITE:
        handle_write(ring, result.conn_fd, cqe->res);
        break;
    default:
        fprintf(stderr, "unknown event %d\n", result.event);
        break;
    }
}

int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    unsigned short port = 9999;
    int socketfd = init_server(port);
    if (socketfd < 0)
        return EXIT_FAILURE;

    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    struct io_uring ring;
    /* Creates the SQ, the CQ and the SQE array; returns -errno on failure. */
    int ret = io_uring_queue_init_params(RING_ENTRIES, &ring, &params);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init_params: %s\n", strerror(-ret));
        close(socketfd);
        return EXIT_FAILURE;
    }

    struct sockaddr_in clientaddr;
    socklen_t addrlen = sizeof(clientaddr);
    if (set_event_accept(&ring, socketfd, (struct sockaddr *)&clientaddr, &addrlen, 0) < 0) {
        io_uring_queue_exit(&ring);
        close(socketfd);
        return EXIT_FAILURE;
    }

    for (;;) {
        /* Hand every prepared SQE to the kernel. */
        ret = io_uring_submit(&ring);
        if (ret == -EINTR)
            continue;
        if (ret < 0) {
            fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
            break;
        }
        printf("complete\n");

        /* Block until at least one completion is available. */
        struct io_uring_cqe *cqe;
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret == -EINTR)
            continue;
        if (ret < 0) {
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            break;
        }
        printf("complete end\n");

        struct io_uring_cqe *cqes[CQE_BATCH];
        unsigned count = io_uring_peek_batch_cqe(&ring, cqes, CQE_BATCH);
        for (unsigned i = 0; i < count; i++)
            dispatch_cqe(&ring, socketfd, cqes[i], &clientaddr, &addrlen);
        /* Release the harvested CQEs so the kernel can reuse their slots. */
        io_uring_cq_advance(&ring, count);
    }

    io_uring_queue_exit(&ring);
    close(socketfd);
    return EXIT_FAILURE;
}
io_uring_test.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>#include <sys/socket.h>
#include <arpa/inet.h>#define TIMESUB_MS(tv1, tv2) (((tv2).tv_sec - (tv1).tv_sec) * 1000 + ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048typedef struct test_conttext
{char server_ip[16];int server_port;int thread_num;int connection_num;int request_num;int fail_num;
} test_conttext_t;int send_recv_tcp(int sockfd)
{char wbuffer[WBUFFER_LENGTH];char rbuffer[RBUFFER_LENGTH];memset(wbuffer, 0, sizeof(wbuffer));memset(rbuffer, 0, sizeof(rbuffer));for (int i = 0; i < 8; i++){strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res = send(sockfd, wbuffer, strlen(wbuffer), 0);if (res <= 0){return -1;}res = recv(sockfd, rbuffer, sizeof(rbuffer), 0);if (res <= 0){return -1;}if (strcmp(rbuffer, wbuffer) != 0){printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);return -1;}return 0;
}int connect_tcpserver(char *ip, int port)
{int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){perror("socket");return -1;}struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(port);server_addr.sin_addr.s_addr = inet_addr(ip);if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0){perror("connect");close(sockfd);return -1;}return sockfd;
}static void *test_qps(void *arg)
{test_conttext_t *ctx = (test_conttext_t *)arg;int sockfd = connect_tcpserver(ctx->server_ip, ctx->server_port);if (sockfd < 0){printf("connect server failed\n");return NULL;}int conut = ctx->request_num / ctx->connection_num;int indx = 0;int res;while (indx++ < conut){res = send_recv_tcp(sockfd);if (res < 0){printf("send_recv_tcp failed\n");ctx->fail_num++;continue;}}return NULL;
}int main(int argc, char *argv[])
{int i;printf("----%d\n", argc);// for (i = 1; i < argc; i++)// printf("%s\n", argv[i]);test_conttext_t ctx = {0};int opt;while ((opt = getopt(argc, argv, "s:p:t:c:n:")) != -1){switch (opt){case 's':strcpy(ctx.server_ip, optarg);printf("-s: %s\n", optarg);break;case 'p':ctx.server_port = atoi(optarg);printf("-p: %s\n", optarg);break;case 't':ctx.thread_num = atoi(optarg);printf("-t: %s\n", optarg);break;case 'c':ctx.connection_num = atoi(optarg);printf("-c: %s\n", optarg);break;case 'n':ctx.request_num = atoi(optarg);printf("-n: %s\n", optarg);break;default:return EXIT_FAILURE;}}pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);struct timeval start, end;gettimeofday(&start, NULL);for (i = 0; i < ctx.thread_num; i++){printf("thread %d pthread_create\n", i);pthread_create(&threads[i], NULL, test_qps, &ctx);}for (i = 0; i < ctx.thread_num; i++){pthread_join(threads[i], NULL);printf("thread %d finished\n", i);}gettimeofday(&end, NULL);int time_used = TIMESUB_MS(start, end);printf("success :%d, failed:%d, time used: %d , qps %d\n", ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);free(threads);return EXIT_SUCCESS;
}