目錄
IO的基本概念
釣魚五人組
五種IO模型
高級IO重要概念
同步通信 VS?異步通信
阻塞 VS 非阻塞
其他高級IO
阻塞IO
非阻塞IO
IO的基本概念
什么是IO?
I/O(input/output)也就是輸入和輸出,在著名的馮諾依曼體系結構當中,將數據從輸入設備拷貝到內存就叫做輸入,將數據從內存拷貝到輸出設備就叫做輸出。
- 對文件進行的讀寫操作本質就是一種IO,文件IO對應的外設就是磁盤。
- 對網絡進行的讀寫操作本質也是一種IO,網絡IO對應的外設就是網卡。
OS如何得知外設當中有數據可讀取?
輸入就是操作系統將數據從外設拷貝到內存的過程,操作系統一定要通過某種方法得知特定外設上是否有數據就緒。
- 并不是操作系統想要從外設讀取數據時外設上就一定有數據。比如用戶正在訪問某臺服務器,當用戶的請求報文發出后就需要等待從網卡當中讀取服務器發來的響應數據,但此時對方服務器可能還沒有收到我們發出的請求報文,或是正在對我們的請求報文進行數據分析,也有可能服務器發來的響應數據還在網絡中路由。
- 但操作系統不會主動檢測外設上是否有數據就緒,這種做法一定會降低操作系統的工作效率,因為大部分情況下,外設當中都是沒有數據的,因此操作系統所做的大部分檢測工作其實都是徒勞的。
- 操作系統實際采用的是中斷的方式來得知外設上是否有數據就緒的,當某個外設上面有數據就緒時,該外設就會向CPU當中的中斷控制發送中斷信號,中斷控制器在根據產生的中斷信號的優先級順序發送給CPU。
- 每一個中斷信號都有一個對應的中斷處理程序,存儲中斷信號和中斷處理程序映射關系的表叫做中斷向量表,當CPU收到某個中斷信號時就會自動停止正在運行的程序,然后根據中斷向量表執行中斷信號對應的中斷處理程序,處理完畢后再返回原被暫停的程序繼續運行。
需要注意的是,CPU不直接和外設打交道指的是在數據層面上,而外設其實是可以直接將某些控制信號發送給CPU的某些控制器的。
OS如何處理從網卡中讀取到的數據包?
操作系統任何時刻都可能會收到大量的數據包,因此操作系統必須將這些數據包管理起來。所謂的管理就是“先描述,再組織”,在內核當中有一個結構叫做sk_buff,該結構就是用來管理和控制接收或發送數據包的信息的。
為了說明sk_buff的作用,下面給出一個簡化版的sk_buff結構:
當操作系統從網卡當中讀取到一個數據包后,會將該數據依次交給鏈路層、網絡層、應用層進行解包和分用,最終將數據包中的數據交給了上層用戶,那對應到這個sk_buff結構來說具體是如何進行數據包的解包和分用的呢?
- 當操作系統從網卡中讀取到一個數據包后,就會定義出一個sk_buff結構,然后sk_buff當中的data指針指向這個讀取到的數據包,并將定義出來的這個sk_buff結構與其他sk_buff結構以雙鏈表的形式組織起來,此時操作系統對各個數據包的管理就變成了對雙鏈表的增刪改查等操作。
- 接下來我們需要將讀取上來的數據包交給最底層的鏈路層處理,進行鏈路層的解包和分用,此時就是讓sk_buff結構中的mac_header指針指向最初的數據包,然后向后讀取鏈路層的報頭,剩下的就是需要交給網絡層處理的有效載荷了,此時便完成了鏈路層的解包。
- 這時鏈路層就需要將有效載荷向上交付給網絡層進行解包和分用了,這里所說的向上交付只是形象的說法,實際向上交付并不是要將數據從鏈路層的緩沖區拷貝到網絡層的緩沖區,我們只需要讓sk_buff結構當中的network_header指針,指向數據包中鏈路層報頭之后的數據即可,然后繼續向后讀取網絡層的報頭,便完成了網絡層的解包。
- 緊接著就是傳輸層對數據進行處理,同樣的道理,讓sk_buff結構當中的transport_header指針,指向數據包中網絡層報頭之后的數據,然后繼續向后讀取傳輸層的報頭,便完成了傳輸層的解包。
- 傳輸層解包后就可根據具體使用的傳輸層協議,對應將剩下的數據拷貝到TCP或UDP的接受緩沖區供用戶讀取即可。
發送數據時對數據進行封裝也是同樣的道理,就是依次在數據前面拷貝上對應的報頭,最后再將數據發送出去(UDP)或拷貝到發送緩沖區(TCP)即可。也就是說,數據包在進行封裝和解包的過程中,本質數據的存儲位置是沒有發生變化的,我們實際只是在用不同的指針對數據進行操作而已。
但內核中的sk_buff并不像上面那樣簡單:
- 一方面,為了保證高效的網絡報文處理效率,這就要求sk_buff的結構也必須是高效的。
- 另一方面,sk_buff結構需要被內核協議中的各個協議共同使用,因此sk_buff必須能夠兼容所有的網絡協議。
因此sk_buff結構實際是非常復雜的,在我的云服務器中sk_buff結構的定義如下:
struct sk_buff {
#ifdef __GENKSYMS__/* These two members must be first. */struct sk_buff *next;struct sk_buff *prev;ktime_t tstamp;
#elseunion {struct {/* These two members must be first. */struct sk_buff *next;struct sk_buff *prev;union {ktime_t tstamp;struct skb_mstamp skb_mstamp;__RH_KABI_CHECK_SIZE_ALIGN(ktime_t a,struct skb_mstamp b);};};struct rb_node rbnode; /* used in netem, ip4 defrag, and tcp stack */};
#endifstruct sock *sk;struct net_device *dev;/** This is the control buffer. It is free to use for every* layer. Please put your private variables there. If you* want to keep them across layers you have to do a skb_clone()* first. This is owned by whoever has the skb queued ATM.*/char cb[48] __aligned(8);unsigned long _skb_refdst;
#ifdef CONFIG_XFRMstruct sec_path *sp;
#endifunsigned int len,data_len;__u16 mac_len,hdr_len;union {__wsum csum;struct {__u16 csum_start;__u16 csum_offset;};};__u32 priority;kmemcheck_bitfield_begin(flags1);__u8 RH_KABI_RENAME(local_df, ignore_df) :1,cloned : 1,ip_summed : 2,nohdr : 1,nfctinfo : 3;__u8 pkt_type : 3,fclone : 2,ipvs_property : 1,peeked : 1,nf_trace : 1;kmemcheck_bitfield_end(flags1);__be16 protocol;void(*destructor)(struct sk_buff *skb);
#if defined(CONFIG_NF_CONNTRACK) || defined(CONFIG_NF_CONNTRACK_MODULE)struct nf_conntrack *nfct;
#endif
#if IS_ENABLED(CONFIG_BRIDGE_NETFILTER)struct nf_bridge_info *nf_bridge;
#endif/* fields enclosed in headers_start/headers_end are copied* using a single memcpy() in __copy_skb_header()*//* private: */RH_KABI_EXTEND(__u32 headers_start[0])/* public: */int skb_iif;RH_KABI_REPLACE(__u32 rxhash,__u32 hash)__be16 vlan_proto;__u16 vlan_tci;#ifdef CONFIG_NET_SCHED__u16 tc_index; /* traffic control index */
#ifdef CONFIG_NET_CLS_ACT__u16 tc_verd; /* traffic control verdict */
#endif
#endif__u16 queue_mapping;kmemcheck_bitfield_begin(flags2);
#ifdef CONFIG_IPV6_NDISC_NODETYPE__u8 ndisc_nodetype : 2;
#endif__u8 pfmemalloc : 1;__u8 ooo_okay : 1;__u8 RH_KABI_RENAME(l4_rxhash, l4_hash) :1;__u8 wifi_acked_valid : 1;__u8 wifi_acked : 1;__u8 no_fcs : 1;__u8 head_frag : 1;/* Indicates the inner headers are valid in the skbuff. */__u8 encapsulation : 1;RH_KABI_EXTEND(__u8 encap_hdr_csum : 1)RH_KABI_EXTEND(__u8 csum_valid : 1)RH_KABI_EXTEND(__u8 csum_complete_sw : 1)RH_KABI_EXTEND(__u8 xmit_more : 1)RH_KABI_EXTEND(__u8 inner_protocol_type : 1)RH_KABI_EXTEND(__u8 remcsum_offload : 1)/* 0/2 bit hole (depending on ndisc_nodetype presence) */kmemcheck_bitfield_end(flags2);#if defined CONFIG_NET_DMA_RH_KABI || defined CONFIG_NET_RX_BUSY_POLL || defined CONFIG_XPSunion {unsigned int napi_id;RH_KABI_EXTEND(unsigned int sender_cpu)RH_KABI_DEPRECATE(dma_cookie_t, dma_cookie)};
#endif
#ifdef CONFIG_NETWORK_SECMARK__u32 secmark;
#endifunion {__u32 mark;__u32 dropcount;__u32 reserved_tailroom;};#ifdef __GENKSYMS____be16 inner_protocol;
#elseunion {__be16 inner_protocol;__u8 inner_ipproto;};
#endif__u16 inner_transport_header;__u16 inner_network_header;__u16 inner_mac_header;__u16 transport_header;__u16 network_header;__u16 mac_header;RH_KABI_EXTEND(kmemcheck_bitfield_begin(flags3))RH_KABI_EXTEND(__u8 csum_level : 2)RH_KABI_EXTEND(__u8 rh_csum_pad : 1)RH_KABI_EXTEND(__u8 rh_csum_bad_unused : 1) /* one bit hole */RH_KABI_EXTEND(__u8 offload_fwd_mark : 1)RH_KABI_EXTEND(__u8 sw_hash : 1)RH_KABI_EXTEND(__u8 csum_not_inet : 1)RH_KABI_EXTEND(__u8 dst_pending_confirm : 1)RH_KABI_EXTEND(__u8 offload_mr_fwd_mark : 1)/* 7 bit hole */RH_KABI_EXTEND(kmemcheck_bitfield_end(flags3))/* private: */RH_KABI_EXTEND(__u32 headers_end[0])/* public: *//* RHEL SPECIFIC** The following padding has been inserted before ABI freeze to* allow extending the structure while preserve ABI. Feel free* to replace reserved slots with required structure field* additions of your backport, eventually moving the replaced slot* before headers_end, if it need to be copied by __copy_skb_header()*/u32 rh_reserved1;u32 rh_reserved2;u32 rh_reserved3;u32 rh_reserved4;union {unsigned int napi_id;RH_KABI_EXTEND(unsigned int sender_cpu)RH_KABI_DEPRECATE(dma_cookie_t, dma_cookie)};
#endif
#ifdef CONFIG_NETWORK_SECMARK__u32 secmark;
#endifunion {__u32 mark;__u32 dropcount;__u32 reserved_tailroom;};#ifdef __GENKSYMS____be16 inner_protocol;
#elsekmemcheck_bitfield_begin(flags1);__u8 RH_KABI_RENAME(local_df, ignore_df) :1,cloned : 1,ip_summed : 2,nohdr : 1,nfctinfo : 3;__u8 pkt_type : 3,fclone : 2,ipvs_property : 1,peeked : 1,nf_trace : 1;kmemcheck_bitfield_end(flags1);__be16 protocol;void(*destructor)(struct sk_buff *skb);
#if defined(CONFIG_NF_CONNTRACK) || defined(CONFIG_NF_CONNTRACK_MODULE)struct nf_conntrack *nfct;
#endif
#if IS_ENABLED(CONFIG_BRIDGE_NETFILTER)struct nf_bridge_info *nf_bridge;
#endif/* fields enclosed in headers_start/headers_end are copied* using a single memcpy() in __copy_skb_header()*//* private: *//* private: */RH_KABI_EXTEND(__u32 headers_start[0])/* public: */int skb_iif;RH_KABI_REPLACE(__u32 rxhash,__u32 hash)__be16 vlan_proto;__u16 vlan_tci;#ifdef CONFIG_NET_SCHED__u16 tc_index; /* traffic control index */
#ifdef CONFIG_NET_CLS_ACT__u16 tc_verd; /* traffic control verdict */
#endif
#endif__u16 queue_mapping;kmemcheck_bitfield_begin(flags2);
#ifdef CONFIG_IPV6_NDISC_NODETYPE__u8 ndisc_nodetype : 2;
#endif__u8 pfmemalloc : 1;__u8 ooo_okay : 1;__u8 RH_KABI_RENAME(l4_rxhash, l4_hash) :1;__u8 wifi_acked_valid : 1;__u8 wifi_acked : 1;__u8 no_fcs : 1;__u8 head_frag : 1;/* Indicates the inner headers are valid in the skbuff. */__u8 encapsulation : 1;RH_KABI_EXTEND(__u8 encap_hdr_csum : 1)RH_KABI_EXTEND(__u8 csum_valid : 1)RH_KABI_EXTEND(__u8 csum_valid : 1)RH_KABI_EXTEND(__u8 csum_complete_sw : 1)RH_KABI_EXTEND(__u8 xmit_more : 1)RH_KABI_EXTEND(__u8 inner_protocol_type : 1)RH_KABI_EXTEND(__u8 remcsum_offload : 1)/* 0/2 bit hole (depending on ndisc_nodetype presence) */kmemcheck_bitfield_end(flags2);#if defined CONFIG_NET_DMA_RH_KABI || defined CONFIG_NET_RX_BUSY_POLL || defined CONFIG_XPSunion {unsigned int napi_id;RH_KABI_EXTEND(unsigned int sender_cpu)RH_KABI_DEPRECATE(dma_cookie_t, dma_cookie)};
#endif
#ifdef CONFIG_NETWORK_SECMARK__u32 secmark;
#endifunion {__u32 mark;__u32 dropcount;__u32 reserved_tailroom;};#ifdef __GENKSYMS____be16 inner_protocol;
#elseunion {__be16 inner_protocol;__u8 inner_ipproto;};
#endif__u16 inner_transport_header;__u16 inner_network_header;__u16 inner_mac_header;__u16 transport_header;__u16 network_header;__u16 mac_header;RH_KABI_EXTEND(kmemcheck_bitfield_begin(flags3))RH_KABI_EXTEND(__u8 csum_level : 2)RH_KABI_EXTEND(__u8 rh_csum_pad : 1)RH_KABI_EXTEND(__u8 rh_csum_bad_unused : 1) /* one bit hole */RH_KABI_EXTEND(__u8 offload_fwd_mark : 1)RH_KABI_EXTEND(__u8 sw_hash : 1)RH_KABI_EXTEND(__u8 csum_not_inet : 1)RH_KABI_EXTEND(__u8 dst_pending_confirm : 1)RH_KABI_EXTEND(__u8 offload_mr_fwd_mark : 1)/* 7 bit hole */RH_KABI_EXTEND(kmemcheck_bitfield_end(flags3))/* private: */RH_KABI_EXTEND(__u32 headers_end[0])/* public: *//* RHEL SPECIFIC** The following padding has been inserted before ABI freeze to* allow extending the structure while preserve ABI. Feel free* to replace reserved slots with required structure field* additions of your backport, eventually moving the replaced slot* before headers_end, if it need to be copied by __copy_skb_header()*/u32 rh_reserved1;u32 rh_reserved2;u32 rh_reserved3;u32 rh_reserved4;/* These elements must be at the end, see alloc_skb() for details. */sk_buff_data_t tail;sk_buff_data_t end;unsigned char *head,*data;unsigned int truesize;atomic_t users;
};
?什么是高效的IO?
IO主要分為兩步:
- 第一步是等,即等待IO條件就緒。
- 第二步是拷貝,也就是當IO條件就緒后將數據拷貝到內存或外設。
任何IO的過程,都包含“等”和“拷貝”這兩個步驟,但在實際的應用場景中“等”消耗的時間往往比“拷貝”消耗的時間多,因此要讓IO變得高效,最核心的方法就是盡量減少“等”的時間。
釣魚五人組
IO的過程其實和釣魚是非常類似的。
- 釣魚的過程同樣分為“等”和“拷貝”兩個步驟,只不過這里的“等”指的是等魚上鉤,“拷貝”指的是當魚上鉤后將魚從河里“拷貝”到我們的魚桶當中。?
- IO時“等”消耗的時間往往比“拷貝”消耗的時間多,釣魚也恰好符合這個特點,釣魚時我們大部分時間都在等魚上鉤,而當魚上鉤后只需要一瞬間就能將魚“拷貝”上來。
在談論高效的IO之前,我們先來看看什么樣的釣魚方式才是高效的。
下面給出五個人的釣魚方式:
- 張三:拿了1個魚竿,將魚鉤拋入水中后就死死的盯著浮漂,什么也不做,當有魚上鉤則揮動魚竿將魚勾上來。
- 李四:拿了1個魚竿,將魚鉤拋入水中后就去做其他事情,然后定期觀察浮漂,如果有魚上鉤則揮動魚竿將魚釣上來,否則繼續去做其他事情。
- 王五:拿了1個魚竿,將魚拋入水中后將魚竿頂部綁一個鈴鐺,然后去做其他事情,如果鈴鐺響了就揮動魚竿將魚釣上來,否則就根本不管魚竿。
- 趙六:拿了100個魚竿,將100個魚竿拋入水中后就定期觀察100個魚竿的浮漂,如果某個魚竿有魚上鉤則揮動對應的魚竿將于釣上來。
- 田七:田七是一個有錢的老板,他給了自己的司機一個桶、一個電話、一個魚竿,讓司機去釣魚,當魚桶裝滿的時候再打電話告訴田七來拿魚,而田七自己則開車去做其他事情去了。
張三、李四、王五的釣魚效率是否一樣?為什么?
- 首先他們他們的釣魚方式都是一樣的,都是先等魚上鉤,然后再將魚釣上來。
- 其次,因為他們每個人都是拿的一根魚竿,當河里有魚來咬魚鉤時,這條魚咬哪一個魚鉤的概率都是相等的。
因此張三、李四、王五他們三個人的釣魚的效率是一樣的,他們只是等魚上鉤的方式不同而已,張三是死等,李四是定期檢測浮漂,而王五通過鈴鐺來判斷是否有魚上鉤。
需要注意的是,這里問的是他們的釣魚效率是否是一樣的,而不是問他們整體誰做的事情最多,如果說整體做事情的量的話,那一定是王五做的最多,李四次之,張三最少。
張三、李四、王五它們三個人分別和趙六比較,誰的釣魚效率更高?
趙六毫無疑問是這四個人當中釣魚效率最高的,因為趙六同時在等多個魚竿上有魚上鉤,因此在單位時間內,趙六的魚竿有魚上鉤的概率是最大的。
- 為了方便計算,我們假設趙六拿了97個魚竿,加上張三、李四、王五的魚竿一共就有100個魚竿。
- 當河里有魚來咬魚鉤時,這條魚咬張三、李四、王五的魚鉤的概率都是百分之一,而咬趙六的魚鉤的概率就是百分之九十七。
- 因此在單位時間內,趙六的魚竿上有魚的概率是張三、李四、王五的97倍。
而高效的釣魚就是要減少單位時間內“等”的時間,增加“拷貝”的時間,所以說趙六的釣魚效率是這四個人當中最高的。
趙六的釣魚效率之所以高,是因為趙六一次等待多個魚竿上的魚上鉤,此時就可以將“等”的時間進行重疊。
如何看待田七的這種釣魚方式?
田七讓自己的司機幫自己釣魚,自己開車去做其他事情去了,此時這個司機具體怎么釣魚已經不重要了,他可以模仿張三、李四、王五、趙六任何一個人的釣魚方式進行釣魚。
最重要的是田七本人并沒有參與整個釣魚的過程,他只是發起了釣魚的任務,而真正釣魚的是司機,田七在司機釣魚期間可能在做任何其他事情,如果將釣魚看作是一種IO的話,那田七的這種釣魚方式就叫做異步IO。
而對于張三、李四、王五、趙六來說,他們都需要自己等魚上鉤,當魚上鉤后又需要自己把魚從河里釣上來,對應到IO當中就是需要自己進行數據的拷貝,因此他們四個人的釣魚方式都叫做同步IO。
五種IO模型?????
實際這五個人的釣魚方式分別對應的就是五種IO模型。
- 張三這種死等的釣魚方式對應就是阻塞IO。
- 李四這種定時檢測是否有魚上鉤的方式就是非阻塞IO。
- 王五這種通過設置鈴鐺得知事件是否就緒的方式就是信號驅動IO。
- 趙六這種一次等待多個魚竿上有魚的釣魚方式就是IO多路轉接。
- 田七這種讓別人幫自己釣魚的釣魚方式就是異步IO。
通過這里的釣魚例子我們可以看到,阻塞IO、非阻塞IO和信號驅動IO本質上是不能提高IO的效率的,但非阻塞IO和信號驅動IO能提高整體做事的效率。
其中,這個釣魚場景中的各個事物都能與IO當中的相關概念對應起來,比如這里釣魚的河對應就是內核,這里的每一個人都是進程或線程,魚竿對應的就是文件描述符或套接字,裝魚的桶對應的就是用戶緩沖區。
五種IO模型
阻塞IO
阻塞IO就是在內核將數據準備好之前,系統調用會一直等待。
圖示如下:
阻塞IO是最常見的IO模型,所有的套接字,默認都是阻塞方式。
- 比如當調用recvfrom函數從某個套接字上讀取數據時,可能底層數據還沒有準備好,此時就需要等待數據就緒,當數據就緒后再將數據從內核拷貝到用戶空間,最后recvfrom函數才會返回。
- 在recvfrom函數等待數據就緒期間,在用戶看來該進程或線程就阻塞住了,本質就是操作系統將該進程或線程的狀態設置為了某種非R狀態,然后將其放入等待隊列當中,當數據就緒后操作系統再將其從等待隊列中喚醒,然后該進程或線程再將數據從內核拷貝到用戶空間。
以阻塞方式進行IO操作的進程或線程,在“等”和“拷貝”期間都不會返回,在用戶看來就像是阻塞住了,因此我們稱之為阻塞IO。
非阻塞IO
非阻塞IO就是,如果內核還未將數據準備好,系統調用仍然會直接返回,并且返回EWOULDBLOCK錯誤碼。
圖示如下:
非阻塞IO往往需要程序員以循環的方式反復嘗試讀寫文件描述符,這個過程稱為輪詢,這對CPU來說是較大的浪費,一般只有特定場景下才使用。
- 比如當調用recvfrom函數以非阻塞方式從某個套接字上讀取數據時,如果底層數據還沒有準備好,那么recvfrom函數立馬返回錯誤放回,而不會讓該進程或線程進行阻塞等待。
- 因為沒有讀取的數據,因此該進程或線程后續還需要繼續調用recvfrom函數,檢測底層數據是否就緒,如果沒有就緒則繼續錯誤返回,直到某次檢測到底層數據就緒后,再將數據從內核拷貝到用戶空間后進行成功返回。
- 每次調用recvfrom函數讀取數據時,就算底層數據沒有就緒,recvfrom函數也會立馬返回,在用戶看來該進程或線程就沒有被阻塞住,因此我們稱之為非阻塞IO。
阻塞IO和非阻塞IO的區別在于,阻塞IO當數據沒有就緒時,后續檢測數據是否就緒的工作是有操作系統發起的,而非阻塞IO當數據沒有就緒時,后續檢測數據是否就緒的工作是由用戶發起的。
信號驅動IO
信號驅動IO就是當內核將數據準備好的時候,使用SIGIO信號通知應用程序進行IO操作。
圖示如下:
當底層數據就緒的時候會向當前進程或線程遞交SIGIO信號,因此可以通過signal或sigaction函數將SIGIO的信號處理程序自定義為需要進行的IO操作,當底層數據就緒時就會自動執行對應的IO操作。
- 比如我們需要調用recvfrom函數從某個套接字上讀取數據,那么就可以將該操作定義為SIGIO的信號處理程序。
- 當底層數據就緒時,操作系統就會遞交SIGIO信號,此時就會自動執行我們定義的信號處理程序,進程將數據從內核拷貝到用戶空間。
信號的產生是異步的,但信號驅動IO時同步IO的一種 。
- 我們說信號的產生是異步的,因為信號在任何時刻都可能產生。
- 但信號驅動IO是同步IO的一種,因為當底層數據就緒時,當前進程或線程需要停下正在做的事情,轉而進行數據的拷貝操作,因此當前進程或線程仍然需要參與IO過程
判斷一個IO過程是同步的還是異步的,本質就是看當前進程或線程是否需要參與IO過程,如果要參與就是同步IO,否則就是異步IO。
IO多路轉接
IO多路轉接也叫做IO多路復用,能夠同時等待多個文件描述符的就緒狀態。
圖示如下:
IO多路轉接的思想:
- 因為IO過程分為“等”和“拷貝”兩個步驟,因此我們使用的recvfrom等接口的底層 實際上都做了兩件事,第一件事就是當數據不就緒時需要等,第二件事就是當數據就緒后需要進行拷貝。
- 雖然recvfrom等接口也有“等”的能力,但這些接口一次只能“等”一個文件描述符上的數據或空間就緒,這樣IO效率太低了
- 因此系統為我們提供了三組接口,分別叫做select、poll和epoll,這些接口的核心工作就是“等”,我們可以將所有“等”的工作都交給這些多路轉接接口。
- 因為這些多路轉接接口是一次“等”多個文件描述符的,因此能夠將“等”的時間進行重疊,當數據就緒后再調用對應的recvfrom等函數進行數據的拷貝,此時這些函數就能夠直接進行拷貝,而不需要進行“等”操作了。
IO多路轉接就像現實生活中的黃牛一樣,只不過IO多路轉接更像幫人排隊的黃牛,因為多路轉接接口實際并沒有幫我們進行數據拷貝的操作。這些排隊黃牛可以一次幫多個人排隊,此時就將多個人排隊的時間進行了重疊。
異步IO
異步IO就是由內核在數據拷貝完成時,通知應用程序。
圖示如下:
- 進行異步IO需要調用一些異步IO的接口,異步IO接口調用后會立馬返回,因為異步IO不需要你進行“等”和“拷貝”的操作,這兩個動作都由操作系統來完成,你要做的只是發起IO。
- 當IO完成后操作系統會通知應用程序,因此進行異步IO的進程或線程并不參與IO的所有細節。
高級IO重要概念
同步通信 VS?異步通信
?同步和異步關注的是消息通信機制。
- 所謂同步,就是在發出一個調用時,在沒有得到結果之前,調用就不返回,但是一旦調用返回,就得到返回值了;換句話說,就是由調用者主動等待這個調用的結果。
- 異步則是相反,調用出發之后,這個調用就直接放回了,所以沒有返回值結果;換句話說,當一個異步過程調用發出后,調用者不會立刻得到結果;而是在調用發出后,被調用者通過狀態、通知來通知調用者,或者通過調用函數處理這個調用。
為什么非阻塞IO在沒有得到結果之前就返回了?
- IO時分為“等”和“拷貝”兩步的,當調用recvfrom進行非阻塞IO時,如果數據沒有就緒,那么調用會直接返回,此時調用返回時并沒有完成一個完整的IO過程,即便調用反悔了那也是屬于錯誤的返回。
- 因此該進程或線程后續需要繼續調用recvfrom,輪詢檢測數據是否就緒,當數據就緒后再把數據從內核拷貝到用戶空間,這才是一次完整的IO過程。
因此,在進行非阻塞IO時,在沒有得到結果之前,雖然這個調用會返回,但后續還需要繼續進行輪詢檢測,因此可以理解成調用還沒有返回,而只有當某次輪詢檢測到數據就緒,并且完成數據拷貝后才認為該調用返回了。
同步通信 VS 同步與互斥
在多進程和多線程當中有同步與互斥的概念,但是這里的同步通信和進程或線程之間的同步是完全不相干的概念。
- 進程/線程同步指的是,在保證數據安全的前提,讓進程/線程能夠按照某種特定的順序訪問臨界資源,從而有效避免饑餓問題,討論的是進程/線程間的一種工作關系。
- 而同步IO指的是進程/線程與操作系統之間的關系,討論的是進程/線程是否需要主動參與IO過程
因此當看到“同步”這個詞的時候,一定要先明確這個同步是同步通信的同步,還是同步與互斥的同步
阻塞 VS 非阻塞
阻塞和非阻塞關注的是程序在等待調用結果(消息、返回值)時的狀態。
- 阻塞調用是指調用結果返回之前,當前線程會被掛起,調用線程只有在得到結果之后才會返回。
- 非阻塞調用指在不能立刻得到結果之前,該調用不會阻塞當前線程。
其他高級IO
非阻塞IO,記錄鎖,系統V流機制,I/O多路轉接(也叫I/O多路復用),readv和writev函數以及存儲映射IO(mmap),這些統稱為高級IO。
阻塞IO
系統中大部分的接口都是阻塞式接口,比如我們可以用read函數從標準輸入當中讀取數據。
#include <iostream>
#include <unistd.h>
#include <fcntl.h>int main()
{char buffer[1024];while (true){ssize_t size = read(0, buffer, sizeof(buffer)-1);if (size < 0){std::cerr << "read error" << std::endl;break;}buffer[size] = '\0';std::cout << "echo# " << buffer << std::endl;}return 0;
}
程序運行后,如果我們不進行輸入操作,此時該進程就會阻塞住,根本原因就是因為此時底層數據不就緒,因此read函數需要進行阻塞等待。
一旦我們進行了輸入操作,此時read函數就會檢測到底層數據就緒,然后立馬將數據讀取到從內核拷貝到我們傳入的buffer數組當中,并且將讀取到的數據輸出到顯示器上面,最后我們就看到了我們輸入的字符串。
說明一下:
- C++當中的cin和C語言當中的scanf也可以讀取從鍵盤輸入的字符,但是cin和scanf會提供用戶緩沖區,為了避免這些因素的干擾,因此這里選擇使用read函數進行讀取。
非阻塞IO
打開文件時默認都是以阻塞的方式打開的,如果要以非阻塞的方式打開某個文件,需要在使用open函數打開文件時攜帶O_NONBLOCK
或O_NDELAY
選項,此時就能夠以非阻塞的方式打開文件。
?
這是在打開文件時設置非阻塞的方式,如果要將已經打開的某個文件或套接字設置為非阻塞,此時就需要用到fcntl函數。?
fcntl函數
fcntl函數的函數原型如下:
int fcntl(int fd, int cmd, ... /* arg */);
?參數說明:
- fd:已經打開的文件描述符。
- cmd:需要進行的操作。
- …:可變參數,傳入的cmd值不同,后面追加的參數也不同。
fcntl函數常用的5種功能與其對應的cmd取值如下:
- 復制一個現有的描述符(cmd=F_DUPFD)。
- 獲得/設置文件描述符標記(cmd=F_GETFD或F_SETFD)。
- 獲得/設置文件狀態標記(cmd=F_GETFL或F_SETFL)。
- 獲得/設置異步I/O所有權(cmd=F_GETOWN或F_SETOWN)。
- 獲得/設置記錄鎖(cmd=F_GETLK, F_SETLK或F_SETLKW)。
返回值說明:
- 如果函數調用成功,則返回值取決于具體進行的操作。
- 如果函數調用失敗,則返回-1,同時錯誤碼會被設置。
實現SetNonBlock函數
我們可以定義一個函數,該函數就用于將指定的文件描述符設置為非阻塞狀態。
- 先調用fcntl函數獲取該文件描述符對應的文件狀態標記(這是一個位圖),此時調用fcntl函數傳入的cmd值為F_GETFL。
- 在獲取到的文件狀態標記上添加非阻塞標記O_NOBLOCK,再次調用fcntl函數對文件狀態標記進行設置,此時調用fcntl函數時傳入的cmd值為F_SETFL
代碼如下:
bool SetNonBlock(int fd)
{int fl=fcntl(fd,F_GETFL);if(fl<0){cerr<<"fcntl error"<<endl;return false;}fcntl(fd,F_SETFL,fl|O_NONBLOCK);return true;
}
此時就將該文件描述符設置為了非阻塞狀態。
此時在調用read函數讀取標準輸入之前,調用SetNonBlock函數將0號文件描述符設置為非阻塞就行了。
代碼如下:
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <cstring>
#include <cerrno>
bool SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0){std::cerr << "fcntl error" << std::endl;return false;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);return true;
}
int main()
{SetNonBlock(0);char buffer[1024];while (true){ssize_t size = read(0, buffer, sizeof(buffer)-1);if (size < 0){if (errno == EAGAIN || errno == EWOULDBLOCK){ //底層數據沒有就緒std::cout << strerror(errno) << std::endl;sleep(1);continue;}else if (errno == EINTR){ //在讀取數據之前被信號中斷std::cout << strerror(errno) << std::endl;sleep(1);continue;}else{std::cerr << "read error" << std::endl;break;}}buffer[size] = '\0';std::cout << "echo# " << buffer << std::endl;}return 0;
}
需要注意的是,當read函數以非阻塞方式讀取標準輸入時,如果底層數據不就緒,那么read函數就會立即返回,但當底層數據不就緒時,read函數是以出錯的形式返回的,此時的錯誤碼會被設置為EAGAIN
或EWOULDBLOCK
。
因此在以非阻塞方式讀取數據時,如果調用read函數時得到的返回值是-1,此時還需要通過錯誤碼進一步進行判斷,如果錯誤碼的值是EAGAIN或EWOULDBLOCK,說明本次調用read函數出錯是因為底層數據還沒有就緒,因此后續還應該繼續調用read函數進行輪詢檢測數據是否就緒,當數據繼續時再進行數據的讀取。
此外,調用read函數在讀取到數據之前可能會被其他信號中斷,此時read函數也會以出錯的形式返回,此時的錯誤碼會被設置為EINTR
,此時應該重新執行read函數進行數據的讀取。
因此在以非阻塞的方式讀取數據時,如果調用read函數讀取到的返回值為-1,此時并不應該直接認為read函數在底層讀取數據時出錯了,而應該繼續判斷錯誤碼,如果錯誤碼的值為EAGAIN
、EWOULDBLOCK
或EINTR
則應該繼續調用read函數再次進行讀取。?
運行代碼后,當我們沒有輸入數據時,程序就會不斷調用read函數檢測底層數據是否就緒。
一旦我們進行了輸入操作,此時read函數就會在輪詢檢測時檢測到,緊接著立馬將數據讀取到從內核拷貝到我們傳入的buffer數組當中,并且將讀取到的數據輸出到顯示器上面。
非阻塞IO可以在沒有數據輸入的時候,完成其它任務
#include <iostream>
#include <string>
#include <vector>
#include <functional>
using namespace std;#include <cstring>
#include <cstdlib>#include <unistd.h>
#include <fcntl.h>void PrintLog()
{cout << "這是一個打印日志任務..." << endl;
}
void OperMysql()
{cout << "這是一個數據庫語句任務..." << endl;
}
void CheckNet()
{cout << "這是一個檢查網路情況任務..." << endl;
}using func_t = function<void(void)>;
vector<func_t> tasks;void LoadTask()
{tasks.push_back(PrintLog);tasks.push_back(OperMysql);tasks.push_back(CheckNet);
}void HandlerTask()
{for (auto &task : tasks){task();}
}bool SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0){cerr << "error:" << strerror(errno) << endl;return false;}int ret = fcntl(fd, F_SETFL, fl | O_NONBLOCK);if (ret < 0){cerr << "error:" << strerror(errno) << endl;return false;}return true;
}int main()
{SetNonBlock(0);char buffer[1024];LoadTask();while (true){cout << ">>>";int n = read(0, buffer, sizeof(buffer) - 1); // 檢驗條件是否就緒(等)+拷貝if (n > 0) // 標準輸入的數據就緒,可以進行讀取{buffer[n] = 0;cout << "echo#" << buffer << endl;}else // 讀取數據出現報錯{if (errno == EAGAIN || errno == EWOULDBLOCK) // 因為沒有數據而報錯,執行其它任務后,在進行讀取試探{HandlerTask();sleep(1);continue;}else if (errno == EINTR) // 讀取數據時因為信號到來而終止{continue;}else // 真正的出錯了{cerr << "error:" << strerror(errno) << endl;break;}}}return 0;
}