網絡編程 io_uring

io_uring

1、概述

io_uring是Linux(內核版本在5.1以后)在2019年加入到內核中的一種新型的異步I/O模型;

io_uring使用共享內存,解決高IOPS場景中的用戶態和內核態的切換過程,減少系統調用;用戶可以直接向共享內存提交要發起的I/O操作,內核線程可以直接獲取共享內存中的I/O操作,并進行相應的讀寫操作;io_uring是一種proactor模式的網絡架構;

  • Reactor 是非阻塞同步網絡模式,感知的是就緒可讀寫事件。在每次感知到有事件發生(比如可讀就緒事件)后,就需要應用進程主動調用 read 方法來完成數據的讀取,也就是要應用進程主動將 socket 接收緩存中的數據讀到應用進程內存中,這個過程是同步的,讀取完數據后應用進程才能處理數據。

  • Proactor 是異步網絡模式, 感知的是已完成的讀寫事件。在發起異步讀寫請求時,需要傳入數據緩沖區的地址(用來存放結果數據)等信息,這樣系統內核才可以自動幫我們把數據的讀寫工作完成,這里的讀寫工作全程由操作系統來做,并不需要像 Reactor 那樣還需要應用進程主動發起 read/write 來讀寫數據,操作系統完成讀寫工作后,就會通知應用進程直接處理數據。

優點
  • 避免了提交I/O事件和完成事件中存在的內存拷貝(使用共享內存)

  • 減少的了I/O任務提交和完成事件任務是的系統調用過程

  • 采取無鎖隊列,減少了鎖資源的競爭

主要內存結構
  • 提交隊列(Submission Queue,SQ)連續的內存空間,環形隊列,存放將要執行的I/O操作數據
  • 完成隊列(Completion Queue, CQ)連續的內存空間,環形隊列,存放執行完成I/O操作后的返回結果
  • 提交隊列項數組提(Submission Queue Entry,SQE):方便通過環形緩沖區提交內存請求
2、主要接口

io_uring提供三個用戶態的系統調用接口

  1. io_uring_setup:初始化一個新的io_uring對象,一個SQ和一個CQ,通過使用共享內存進行數據操作
  2. io_uring_register:注冊用于異步I/O的文件或用戶緩沖區(buffers)
  3. io_uring_enter:提交I/O任務,等待I/O完成

在這里插入圖片描述

SQ和CQ保存的都是SQEs數據的索引,不是真正的請求,真實是請求保存在SQE數組中,在提交請求時可以批量提交一組SQE數值上不連續的請求;

SQ、CQ、SQE中的內存區域都是有內核進行分配的,用戶初始化會返回對應的fd,通過fd進行mmap和內核共享內存空間;

3、第三方庫

liburing通過對io_uring進行分裝,提供了一個簡單的API,通過一下命令可以安裝該動態庫

git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
sudo ldconfig #更新動態庫連接緩存
4、主要使用流程
1. io_uring初始化

io_uring通過io_uring_setup函數初始化,在liburing庫中,通過io_uring_queue_init_params函數進行初始化,創建sumbmit隊列和complete隊列,以及SQE內存數組;

//io_uring實現異步的方式
struct io_uring_params pragma;
memset(&pragma, 0, sizeof(pragma));
struct io_uring ring;
// 初始化io_uring 創建submit隊列和complite隊列
io_uring_queue_init_params(1024, &ring, &pragma);
2. io_uring 提交(注冊)到SQ環形隊列

io_uring通過io_uring_register函數提交(注冊)到用于異步I/O的緩沖區中,在liburing中通過io_uring_prep_accept函數對io_uring_refister進行封裝使用;

// 獲取ringbuffer的頭
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
connect_info_t accept_info = {sockfd, EVENT_ACCEPT};
// 注冊一個I/O事件
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));
3. io_uring_enter 提交I/O

io_uring中通過io_uring_enter函數來提交I/O,并等待事件的完成;在liburing中通過io_uring_submit來提交SQE的讀寫請求,io_uring_wait_cqe來等待I/O的處理結果,io_uring_peek_batch_cqe來獲取CQ中的處理結果;

 // 提交worker中執行
io_uring_submit(&ring);
struct io_uring_cqe *cqe;
//等待complete隊列中的結果
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[128];
// 獲取CQ環形隊列中的處理結果
int count = io_uring_peek_batch_cqe(&ring, cqes, 128);
5、實現

io_uring_server.c

#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <netinet/in.h>enum event_type {EVENT_ACCEPT,EVENT_READ,EVENT_WRITE
};typedef struct connect_info{int conn_fd;int event;
}connect_info_t;struct conn_info {int fd;int event;
};int init_server(unsigned short port) 
{   int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0) {perror("socket");return -1;}struct sockaddr_in serveraddr;;serveraddr.sin_family = AF_INET;serveraddr.sin_port = htons(port);serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {perror("bind error");return -1;}int opt = 1;if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {perror("setsockopt");return -1;}listen(sockfd, 10);return sockfd; 
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, int len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);connect_info_t accept_info = {sockfd, EVENT_READ};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event recv----\n");return 0;
}int set_event_send(struct io_uring *ring, int sockfd, const void *buf, int len, int flags)
{struct io_uring_sqe *sqe = io_uring_get_sqe(ring);connect_info_t accept_info = {sockfd, EVENT_WRITE};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event send----\n");return 0;
}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *clientaddr,socklen_t *addrlen, int flags) {// 獲取sqestruct io_uring_sqe *sqe = io_uring_get_sqe(ring);// 初始化accept_infoconnect_info_t accept_info = {sockfd, EVENT_ACCEPT};// 準備accept操作io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)clientaddr, addrlen, flags);// 設置用戶數據memcpy(&sqe->user_data, &accept_info, sizeof(connect_info_t));printf("set event accept\n");return 0;
}int main(int argc, char *argv[])
{// 初始化服務器unsigned short port = 9999;// 初始化服務器int socketfd = init_server(port);if (socketfd < 0)return -1;//io_uring實現異步的方式struct io_uring_params pragma;// 初始化io_uring 創建submit隊列和complite隊列memset(&pragma, 0, sizeof(pragma));struct io_uring ring;io_uring_queue_init_params(1024, &ring, &pragma);struct sockaddr_in clientaddr;socklen_t addrlen = sizeof(struct sockaddr);// 提交到submit隊列中set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);char buffer[1024] = {0};while (1){// 提交worker中執行io_uring_submit(&ring);printf("complete\n");struct io_uring_cqe *cqe;//等待complete隊列中的結果io_uring_wait_cqe(&ring, &cqe);printf("complete end\n");struct io_uring_cqe *cqes[128];int count = io_uring_peek_batch_cqe(&ring, cqes, 128);for (int i = 0; i < count; i++){struct io_uring_cqe *entries = cqes[i];connect_info_t result;//struct conn_info result;memcpy(&result, &entries->user_data, sizeof(connect_info_t));if (result.event == EVENT_ACCEPT) {// 設置讀事件set_event_accept(&ring, socketfd, (struct sockaddr*)&clientaddr, &addrlen, 0);printf("accept success\n");int conn_fd = entries->res;printf("conn_fd = %d  res = %d\n", conn_fd, entries->res);// 設置讀事件set_event_recv(&ring, conn_fd, buffer, 1024,0);}else if (result.event == EVENT_READ){int ret = entries->res;printf("set_event_recv ret: %d, %s\n", ret, buffer);if (ret == 0){close(result.conn_fd);continue;}else if (ret > 0){// 設置寫事件set_event_send(&ring, result.conn_fd, buffer, ret,0);}printf("read success\n");}else if (result.event == EVENT_WRITE){int ret = entries->res;set_event_recv(&ring, result.conn_fd, buffer, 1024,0);printf("write success\n");}}io_uring_cq_advance(&ring, count);}return 0;
}

io_uring_test.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>#include <sys/socket.h>
#include <arpa/inet.h>#define TIMESUB_MS(tv1, tv2)  (((tv2).tv_sec - (tv1).tv_sec) * 1000 + ((tv2).tv_usec - (tv1).tv_usec) / 1000)
#define TEST_MESSAGE   "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048typedef struct test_conttext
{char server_ip[16];int server_port;int thread_num;int connection_num;int request_num;int fail_num;
} test_conttext_t;int send_recv_tcp(int sockfd)
{char wbuffer[WBUFFER_LENGTH];char rbuffer[RBUFFER_LENGTH];memset(wbuffer, 0, sizeof(wbuffer));memset(rbuffer, 0, sizeof(rbuffer));for (int i = 0; i < 8; i++){strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res = send(sockfd, wbuffer, strlen(wbuffer), 0);if (res <= 0){return -1;}res = recv(sockfd, rbuffer, sizeof(rbuffer), 0);if (res <= 0){return -1;}if (strcmp(rbuffer, wbuffer) != 0){printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);return -1;}return 0;
}int connect_tcpserver(char *ip, int port)
{int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){perror("socket");return -1;}struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(port);server_addr.sin_addr.s_addr = inet_addr(ip);if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0){perror("connect");close(sockfd);return -1;}return sockfd;
}static void *test_qps(void *arg)
{test_conttext_t *ctx = (test_conttext_t *)arg;int sockfd = connect_tcpserver(ctx->server_ip, ctx->server_port);if (sockfd < 0){printf("connect server failed\n");return NULL;}int conut = ctx->request_num / ctx->connection_num;int indx = 0;int res;while (indx++ < conut){res = send_recv_tcp(sockfd);if (res < 0){printf("send_recv_tcp failed\n");ctx->fail_num++;continue;}}return NULL;
}int main(int argc, char *argv[])
{int i;printf("----%d\n", argc);// for (i = 1; i < argc; i++)//     printf("%s\n", argv[i]);test_conttext_t ctx = {0};int opt;while ((opt = getopt(argc, argv, "s:p:t:c:n:")) != -1){switch (opt){case 's':strcpy(ctx.server_ip, optarg);printf("-s: %s\n", optarg);break;case 'p':ctx.server_port = atoi(optarg);printf("-p: %s\n", optarg);break;case 't':ctx.thread_num = atoi(optarg);printf("-t: %s\n", optarg);break;case 'c':ctx.connection_num = atoi(optarg);printf("-c: %s\n", optarg);break;case 'n':ctx.request_num = atoi(optarg);printf("-n: %s\n", optarg);break;default:return EXIT_FAILURE;}}pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * ctx.thread_num);struct timeval start, end;gettimeofday(&start, NULL);for (i = 0; i < ctx.thread_num; i++){printf("thread %d pthread_create\n", i);pthread_create(&threads[i], NULL, test_qps, &ctx);}for (i = 0; i < ctx.thread_num; i++){pthread_join(threads[i], NULL);printf("thread %d finished\n", i);}gettimeofday(&end, NULL);int time_used = TIMESUB_MS(start, end);printf("success :%d, failed:%d,  time used: %d , qps %d\n", ctx.request_num-ctx.fail_num, ctx.fail_num, time_used, ctx.request_num * 1000 / time_used);free(threads);return EXIT_SUCCESS;
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/713948.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/713948.shtml
英文地址,請注明出處:http://en.pswp.cn/news/713948.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

vue + cesium初始化地圖 + 鼠標經過地圖(點、線等其他實體)樣式

vue cesium初始化地圖 鼠標經過地圖&#xff08;點、線等其他實體&#xff09;樣式 export function initMap(mapViewer) {Cesium.Ion.defaultAccessToken "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJqdGkiOiI0OTUzOGJhMy1iNzVjLTQwZjItYWYyNy03YjA4MjM0YWE2MWMiLCJpZ…

Unity(第二十二部)官方的反向動力學一般使用商城的IK插件,這個用的不多

反向動力學&#xff08;Inverse Kinematic&#xff0c;簡稱IK&#xff09;是一種通過子節點帶動父節點運動的方法。 正向動力學 在骨骼動畫中&#xff0c;大多數動畫是通過將骨架中的關節角度旋轉到預定值來生成的&#xff0c;子關節的位置根據父關節的旋轉而改變&#xff0c;這…

編寫腳本一鍵安裝rsyslog

腳本分解 環境檢測部分 檢查操作系統 #!/bin/bash# 檢查是否為 Debian 類型 if [ -f /etc/debian_version ]; thenecho "當前操作系統是 Debian 類型"SYSLOG_SERVICE"rsyslog"INSTALL_COMMAND"apt-get install -y"CONFIG_FILE"/etc/rsys…

Vmware esxi虛擬主機狀態無效,無法注銷重啟等操作修復解決

問題 裝有ESXI系統的服務器在強制關機啟動后&#xff0c;顯示虛擬機狀態是無效的&#xff0c;并且無法進行任何操作。 解決辦法 對出問題的虛擬機重新注冊 1、開啟esxi系統的ssh功能 2、取消注冊出問題的虛擬機 找到問題的虛擬機 [rootlocalhost:~] vim-cmd vmsvc/getal…

燒腦問題解決辦法:如何選擇一款合適自己的手機流量卡

現在社會人們越來越離不開手機了&#xff0c;手機給我們生活帶來了翻天覆地的變化&#xff0c;手機需要最多的就是流量了&#xff0c;所以選擇一款合適自己的手機流量卡就顯得尤為重要了&#xff0c;今天小編就給大家來分享一下我的經驗&#xff0c;希望對大家能有幫助&#xf…

python基礎-基本數據類型深入-2.1

目錄 元組 什么是元組&#xff08;tuple&#xff09; 元組練習-1 元組的基本操作 元組常用內建函數 列表和元組的區別與總結 元組練習-2 元組練習-3 元組 什么是元組&#xff08;tuple&#xff09; 與列表&#xff08;list&#xff09;一樣&#xff0c;元組&#xff0…

【Web安全靶場】sqli-labs-master 54-65 Challenges 與62關二分法和like模糊搜索

sqli-labs-master 54-65 Challenges 其他關卡和靶場見專欄… 文章目錄 sqli-labs-master 54-65 Challenges第五十四關-聯合注入第五十五關-聯合注入第五十六關-聯合注入第五十七關-聯合注入第五十八關-報錯注入第五十九關-報錯注入第六十關-報錯注入第六十一關-報錯注入第六十…

grid的刪除

/u01/11.2.0/grid 為 grid 用戶的安裝主目錄 1、刪除 crs 配置信息 root: cd /u01/11.2.0/grid/crs/install perl rootcrs.pl -deconfig -force 2、刪除 crs 配置信息&#xff0c;并卸載軟件 cd /u01/11.2.0/grid/deinstall ./deinstall grid user: ./deinstall -home /…

Mysql與StarRocks語法上的不同

&#x1f413; 序言 StarRocks 是新一代極速全場景 MPP (Massively Parallel Processing) 數據庫。StarRocks 的愿景是能夠讓用戶的數據分析變得更加簡單和敏捷。用戶無需經過復雜的預處理&#xff0c;可以用StarRocks 來支持多種數據分析場景的極速分析。 &#x1f413; 語法…

嵌入式驅動學習第一周——linux的休眠與喚醒

前言 本文介紹進程的休眠與喚醒。 嵌入式驅動學習專欄將詳細記錄博主學習驅動的詳細過程&#xff0c;未來預計四個月將高強度更新本專欄&#xff0c;喜歡的可以關注本博主并訂閱本專欄&#xff0c;一起討論一起學習。現在關注就是老粉啦&#xff01; 行文目錄 前言1. 阻塞和非阻…

第二節 數學知識補充

一、線性代數 向量的 L 2 L_2 L2?范數&#xff08;Euclidean范數/Frobenius范數&#xff09;&矩陣的元素形式范數 向量的 L 2 L_2 L2?范數&#xff1a; ∣ ∣ x ∣ ∣ 2 ( ∣ x 1 ∣ 2 ? ∣ x m ∣ 2 ) 1 2 ||x||_2(|x_1|^2\cdots|x_m|^2)^{\frac12} ∣∣x∣∣2?(∣…

Verilog參數、Verilog參數和屬性沖突、整數處理

Verilog參數 Verilog參數執行以下操作&#xff1a; ?允許您創建易于重用和擴展的參數化代碼。 ?使代碼更可讀、更緊湊、更易于維護。 ?將此類功能描述為&#xff1a; ○ 總線尺寸 ○ 建模設計單元中某些重復元素的數量 ?是常數。對于參數化模塊的每個實例化&#xf…

電腦桌面便簽哪個好,好用的電腦桌面便簽推薦

在如今信息爆炸的時代&#xff0c;人們的工作和生活節奏越來越快&#xff0c;記事和備忘變得尤為重要。而電腦桌面便簽作為一種方便快捷的記錄工具&#xff0c;備受廣大用戶青睞。那么&#xff0c;電腦桌面便簽哪個好&#xff0c;哪個更加出色呢&#xff1f; 作為一名人事專員…

CryoEM - 使用 cryoSPARC 基于單顆粒圖像從頭重構蛋白質三維結構

歡迎關注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://blog.csdn.net/caroline_wendy/article/details/136384544 基于冷凍電鏡單顆粒圖像重構蛋白質三維結構,利用冷凍電鏡技術測定生物大分子結構的方法。原理是從冷凍電鏡獲得大量同一種蛋白質分子的二維投影圖…

【數據結構】實現隊列

大家好&#xff0c;我是蘇貝&#xff0c;本篇博客帶大家了解隊列&#xff0c;如果你覺得我寫的還不錯的話&#xff0c;可以給我一個贊&#x1f44d;嗎&#xff0c;感謝?? 目錄 一. 隊列的概念及結構二. 隊列的實現隊列的結構體初始化銷毀隊尾插入隊頭刪除顯示第一個節點的值…

【Java程序員面試專欄 算法思維】六 高頻面試算法題:動態規劃

一輪的算法訓練完成后,對相關的題目有了一個初步理解了,接下來進行專題訓練,以下這些題目就是匯總的高頻題目,本篇主要聊聊回溯算法,主要就是排列組合問題,所以放到一篇Blog中集中練習 題目關鍵字解題思路時間空間零錢兌換動態規劃+雙重循環dp[i]表示兌換金額為i元的最少…

從0開始學習NEON(0)

1、前言 ? 最近在學習NEON指令集&#xff0c;主要學習"單指令流&#xff0c;多數據流"的編程方式&#xff0c;在這之前主要是針對cuda編程進行學習。最近的一部分工作轉移到了arm主板上&#xff0c;接觸到了 ncnn這個開源項目&#xff0c;不得不佩服nihui的強大。在…

C# 線性插值

線性插值是一種常用的插值算法&#xff0c;適用于許多實際場景。 傳感器數據處理&#xff1a;在傳感器數據處理中&#xff0c;可能會出現數據點不連續或不均勻的情況。使用線性插值可以根據已知的數據點來估算在兩個數據點之間的數值&#xff0c;從而填補數據中的缺失或不連續之…

iOS消息轉發流程

當向Objc對象發送消息時&#xff0c;如果找到對象對應的方法&#xff0c;就會進入消息轉發流程&#xff0c;給開發者提供一些最后的機會處理消息無法發送問題&#xff0c;以免出現程序崩潰。 1. 回調對象的resolveInstanceMethod方法&#xff0c;在這個方法中&#xff0c;允許開…

阿里云定價_ECS產品價格_云服務器收費標準 - 阿里云官方活動

2024年最新阿里云服務器租用費用優惠價格表&#xff0c;輕量2核2G3M帶寬輕量服務器一年61元&#xff0c;折合5元1個月&#xff0c;新老用戶同享99元一年服務器&#xff0c;2核4G5M服務器ECS優惠價199元一年&#xff0c;2核4G4M輕量服務器165元一年&#xff0c;2核4G服務器30元3…