消息隊列NetMQ 原理分析2-IO線程和完成端口

目錄

  • 前言
    • 介紹
    • 目的
  • IO線程
  • 初始化IO線程
    • Proactor
    • 啟動Procator線程輪詢
    • 處理socket
  • IOObject
  • 總結

前言

介紹

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標準socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.0-rc5。本文檔是對4.0.0-rc5分支代碼進行分析。

zeromq的英文文檔
NetMQ的英文文檔

目的

對NetMQ的源碼進行學習并分析理解,因此寫下該系列文章,本系列文章暫定編寫計劃如下:

  1. 消息隊列NetMQ 原理分析1-Context和ZObject
  2. 消息隊列NetMQ 原理分析2-IO線程和完成端口
  3. 消息隊列NetMQ 原理分析3-命令產生/處理和回收線程
  4. 消息隊列NetMQ 原理分析4-Session和Pipe
  5. 消息隊列NetMQ 原理分析5-Engine
  6. 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
  7. 消息隊列NetMQ 原理分析7-Device
  8. 消息隊列NetMQ 原理分析8-不同類型的Socket
  9. 消息隊列NetMQ 原理分析9-實戰

友情提示: 看本系列文章時最好獲取源碼,更有助于理解。


IO線程

NetMQ 4.0.0底層使用的是IOCP(即完成端口)模式進行通信的(3.3.4使用的是select模型),通過異步IO綁定到完成端口,來最大限度的提高性能。這里不對同步/異步socket進行詳細介紹。稍微解釋下完成端口,為了解決每個socket客戶端使用一個線程進行通信的性能問題,完成端口它充分利用內核對象的調度,只使用少量的幾個線程來處理和客戶端的所有通信,消除了無謂的線程上下文切換,最大限度的提高了網絡通信的性能。
想詳細了解完成端口的請看完成端口(Completion Port)詳解,講解的比較詳細,同時對各種網絡編程模型做了簡單的介紹。
因此NetMQ通過幾個(默認1個)IO線程處理通信,上一片文章介紹了ZObejct對象,在該對象中存在許多命令的處理,實際對命令的發送,分配都是IO線程的工作。

初始化IO線程

IO線程初始化時會初始化ProactorIOThreadMailbox

var name = "iothread-" + threadId;
m_proactor = new Proactor(name);
m_mailbox = new IOThreadMailbox(name, m_proactor, this);

Proactor對象就是用來綁定或處理完成端口用的,后面再做作詳細介紹。
IOThreadMailbox是IO線程處理的信箱,每當有命令需要處理時,都會向當前Socket對象所在的IO線程信箱發送命令。
讓我們看一眼IOThread對象和IOThreadMailbox的定義

internal sealed class IOThread : ZObject, IMailboxEvent
{
}

IOThread對象繼承自ZObject對象,記得上一節想到ZObject對象知道如何處理各種命令嗎?因此IOThread對象也繼承了他父親的技能。同時IOThread對象實現了IMailboxEvent接口,這個接口之定義了一個方法。

internal interface IMailboxEvent
{void Ready();
}

當IO信箱接受到命令時表示當前有命令準備好了,可以進行 處理,IO信箱則會調用IO線程的Ready方法處理命令,那么IO信息如何調用IO線程的Ready方法呢,來看下IOThreadMailbox的構造函數。

internal class IOThreadMailbox : IMailbox
{...public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent){m_proactor = proactor;m_mailboxEvent = mailboxEvent;Command cmd;bool ok = m_commandPipe.TryRead(out cmd);}...
}

在IOThreadMailbox初始化時,傳入了IMailboxEvent。

m_commandPipe是NetMQ的管道(Pipe),后面我們會對其做介紹,這里只要知道該管道用于存放命令即可,可以__暫時__理解為管道隊列。

Proactor

每個IOThread會有一個Proactor,Proactor的工作就是將Socket對象綁定到完成端口,然后定時去掃描完成端口是否有需要處理的Socket對象。

internal class Proactor : PollerBase
{...public Proactor([NotNull] string name){m_name = name;m_stopping = false;m_stopped = false;m_completionPort = CompletionPort.Create();m_sockets = new Dictionary<AsyncSocket, Item>();}...
}

Proactor對象繼承自PollerBase,那么PollerBase又是什么呢?從命名可以看這是一個輪詢基類,即該對象需要長時間不斷循環處理某件事情。
PollerBase對象是一個抽象類,它有2個功能:

  1. 負載均衡

    還記的Context中選擇IO線程時有這個一段代碼嗎?
    580757-20181020185114964-1158578268.png
    IO線程的負載均衡功能就是PollBase對象提供的

    每次選擇IO線程時會將m_load字段值+1
    protected void AdjustLoad(int amount) { Interlocked.Add(ref m_load, amount); }
    public int Load { get { #if NETSTANDARD1_3 return Volatile.Read(ref m_load); #else Thread.MemoryBarrier(); return m_load; #endif } }
    IOThread取PollBase對象(Proactor)的Load屬性時候會特殊處理,保證拿到的是最新的值。
  2. 定時任務
    PollBase第二個功能就是支持定時任務,即定時觸發某事件。

    private readonly SortedList<long, List<TimerInfo>> m_timers;

    PollBase內部有一個SortedList,key為任務執行的時間,value為TimeInfo
    TimeInfo對象包含2個信息,idITimerEvent接口,id用來辨別當前任務的類型,ITimerEvent接口就包含了TimerEvent方法,即如何執行。
    TcpConnection連接失敗會重新連接時會重連,下面時TcpConnection開始連接方法

    private void StartConnecting()
    {Debug.Assert(m_s == null);// Create the socket.try{m_s = AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);}catch (SocketException){AddReconnectTimer();return;}...
    }
    private void AddReconnectTimer()
    {//獲取重連時間間隔int rcIvl = GetNewReconnectIvl();//IO線程的Proactor中,TcpConnection的ReconnectTimerId = 1 m_ioObject.AddTimer(rcIvl, ReconnectTimerId);...
    }

    IO線程會被封裝到IOObject中,調用IOObjectAddTimer方法實際就是調用IO線程中Proactor對象的AddTimer方法,其方法定義如下

    public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id)
    {long expiration = Clock.NowMs() + timeout;var info = new TimerInfo(sink, id);if (!m_timers.ContainsKey(expiration))m_timers.Add(expiration, new List<TimerInfo>());m_timers[expiration].Add(info);
    }

    第一行會獲取當前的毫秒時間加上時間間隔。然后加入到m_timers中。

m_completionPort = CompletionPort.Create();
m_sockets = new Dictionary<AsyncSocket, Item>();

初始化時會創建完成端口,當有socket需要處理時會和完成端口綁定。
初始化時還會初始化一個存放異步AsyncSocketitem的字典。
有關于AsyncSocketCompletionPort可以去Git上看AsyncIO的源碼,這里不做分析。
Item結構如下

private class Item
{public Item([NotNull] IProactorEvents proactorEvents){ProactorEvents = proactorEvents;Cancelled = false;}[NotNull] public IProactorEvents ProactorEvents { get; }public bool Cancelled { get; set; }
}

它包含了IProactorEvents接口的信息和當前Socket操作是否被取消標志。

internal interface IProactorEvents : ITimerEvent
{void InCompleted(SocketError socketError, int bytesTransferred);void OutCompleted(SocketError socketError, int bytesTransferred);
}

IProactorEvents繼承自ITimerEvent。同時它還聲明了InCompletedOutCompleted方法,即發送或接收完成時如何處理,因此當需要處理Socket時,會將當前Socket處理方式保存到這個字典中。當當前對象發送消息完成,則會調用OutCompleted方法,接收完成時則會調用InCompleted方法。
當有Socket需要綁定時會調用ProactorAddSocket方法

public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents)
{var item = new Item(proactorEvents);m_sockets.Add(socket, item);m_completionPort.AssociateSocket(socket, item);AdjustLoad(1);
}

它包含2個參數,一個時異步Socket對象和IProactorEvents。然后加把他們加入到字段中并將他們綁定到完成端口上。第四段AdjustLoad方法即把當前IO線程處理數量+1,用于負載均衡用。

Socket操作完成時會調用ProactorRemoveSocket移除綁定

public void RemoveSocket(AsyncSocket socket)
{AdjustLoad(-1);var item = m_sockets[socket];m_sockets.Remove(socket);item.Cancelled = true;
}

移除時會將itemCancelled字段設置為true。所以當Proactor輪詢處理Socket時發現該Socket操作被取消(移除),就會跳過處理。

啟動Procator線程輪詢

在IO線程啟動時實際就是啟動Procator的work線程

public void Start()
{m_proactor.Start();
}
public void Start()
{m_worker = new Thread(Loop) { IsBackground = true, Name = m_name };m_worker.Start();
}

處理socket

完整的Loop方法如下

private void Loop()
{var completionStatuses = new CompletionStatus[CompletionStatusArraySize];while (!m_stopping){// Execute any due timers.int timeout = ExecuteTimers();int removed;if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))continue;for (int i = 0; i < removed; i++){try{if (completionStatuses[i].OperationType == OperationType.Signal){var mailbox = (IOThreadMailbox)completionStatuses[i].State;mailbox.RaiseEvent();}// if the state is null we just ignore the completion statuselse if (completionStatuses[i].State != null){var item = (Item)completionStatuses[i].State;if (!item.Cancelled){switch (completionStatuses[i].OperationType){case OperationType.Accept:case OperationType.Receive:item.ProactorEvents.InCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;case OperationType.Connect:case OperationType.Disconnect:case OperationType.Send:item.ProactorEvents.OutCompleted(completionStatuses[i].SocketError,completionStatuses[i].BytesTransferred);break;default:throw new ArgumentOutOfRangeException();}}}}catch (TerminatingException){ }}}
}
 var completionStatuses = new CompletionStatus[CompletionStatusArraySize];

第一行初始化了CompletionStatus數組,CompletionStatusArraySize值為100。
CompletionStatus作用是用來保存socket的信息或狀態。

獲取超時時間

int timeout = ExecuteTimers();
 protected int ExecuteTimers()
{if (m_timers.Count == 0)return 0;long current = Clock.NowMs();var keys = m_timers.Keys;for (int i = 0; i < keys.Count; i++){var key = keys[i];if (key > current){return (int)(key - current);}var timers = m_timers[key];foreach (var timer in timers){timer.Sink.TimerEvent(timer.Id);}timers.Clear();m_timers.Remove(key);i--;}return 0;
}

ExecuteTimers會計算之前加入到m_timers需要等待的超時時間,若沒有對象則直接返回0,否則獲取若獲取到key時間在當前時間之前,則需要調用TimerEvent方法,調用完成后移除。
若獲取到的key時間比當前時間大,則返回他們的差即為需要等待的超時時間。

從完成端口獲取處理完的狀態

int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))continue;

GetMultipleQueuedCompletionStatus方法傳入一個超時時間,若前面獲取的超時時間為0,則這邊會設置為-1,表示阻斷直到有要處理的才返回。
CompletionPort內部維護了一個狀態隊列,removed即為處理完成返回的狀態個數。
若獲取成功則會返回true,后面就開始遍歷completionStatuses數組處理完成Socket

開始處理待處理的狀態

public struct CompletionStatus
{internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : this(){AsyncSocket = asyncSocket;State = state;OperationType = operationType;SocketError = socketError;BytesTransferred = bytesTransferred;}public AsyncSocket AsyncSocket { get; private set; }public object State { get; internal set; }public OperationType OperationType { get; internal set; }public SocketError SocketError { get; internal set; }public int BytesTransferred { get; internal set; }        
}

CompletionStatus是個結構體,它包含的信息如上。其中OperationType是當前Socket的處理方式。

public enum OperationType
{Send, Receive, Accept, Connect, Disconnect, Signal
} 

for循環的一開始先會判斷當前狀態的OperationType,若是Signal,則說明當前是個信號狀態,說明有命令需要處理,則會調用IO信箱的RaiseEvent方法,實際為IO線程的Ready方法。

public void Ready()
{Command command;while (m_mailbox.TryRecv(out command))command.Destination.ProcessCommand(command);
}

IOThread會將當前信箱的所有命令進行處理。
若不是Signal則會將CompletionStatus保存的狀態信息轉換為Item對象,并判斷當前Socket是否移除(取消)。若沒有則對其進行處理。判斷OperationType,若為AcceptReceive則表示需要接收,則調用InCompleted方法。若為Connect,DisconnectSend則表示有消息向外發送,則調用OutCompleted方法。

至此IOThread代碼分析完畢。

IOObject

internal class IOObject : IProactorEvents
{public IOObject([CanBeNull] IOThread ioThread){if (ioThread != null)Plug(ioThread);}public void Plug([NotNull] IOThread ioThread){Debug.Assert(ioThread != null);m_ioThread = ioThread;}
}

IOObject實際就是保存了IOThread的信息和Socket處理完成時如何執行,以及向外暴露了一些接口。

再次說明,如果向簡單了解完成端口如何使用,則看《完成端口使用》,如果想詳細了解完成端口則看下《完成端口詳細介紹》,如果想直到NetMQ的AsyncIO和完成端口的源碼請看AsyncIO。

總結

該篇介紹了IO線程和完成端口的處理方式,若哪里分析的不到位或有誤希望支出。


本文地址:https://www.cnblogs.com/Jack-Blog/p/6347163.html
作者博客:杰哥很忙
歡迎轉載,請在明顯位置給出出處及鏈接)

轉載于:https://www.cnblogs.com/Jack-Blog/p/6347163.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/396846.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/396846.shtml
英文地址,請注明出處:http://en.pswp.cn/news/396846.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

django——url(路由)配置

URL是Web服務的入口&#xff0c;用戶通過瀏覽器發送過來的任何請求&#xff0c;都是發送到一個指定的URL地址&#xff0c;然后被響應。 在Django項目中編寫路由&#xff0c;就是向外暴露我們接收哪些URL的請求&#xff0c;除此之外的任何URL都不被處理&#xff0c;也沒有返回。…

VC連接mysql數據庫錯誤:libmysql.lib : fatal error LNK1113: invalid machine 解決方法

VC連接MySQL的配置過程在上一篇博文中&#xff0c;不過當你設置好&#xff0c;以為萬事大吉的時候&#xff0c;運行卻出現這個錯誤&#xff1a;libmysql.lib : fatal error LNK1113: invalid machine type。 無效的機器類型&#xff0c;真的是很讓人捉急。 發生這個錯誤的原因是…

linux 內存泄漏 定位,一種內存泄露檢查和定位的方法

一個系統后臺服務進程&#xff0c;可能包括多個線程&#xff0c;在生成環境下要求系統程序能夠穩定長時間穩定運行而不宕機。其中一個基本的前提就是需要保證系統程序不存在內存泄露。那么&#xff0c;該如何判讀系統程序是否存在內存泄露呢&#xff1f;如果存在&#xff0c;又…

python怎么發送郵件_在Python如何使用SMTP發送郵件

SMTP&#xff08;Simple Mail Transfer Protocol&#xff09;即簡單郵件傳輸協議,它是一組用于由源地址到目的地址傳送郵件的規則&#xff0c;由它來控制信件的中轉方式。 python的smtplib提供了一種很方便的途徑發送電子郵件。它對smtp協議進行了簡單的封裝。 Python創建 SMTP…

統計單詞個數(劃分型)

codevs 1040 統計單詞個數 2001年NOIP全國聯賽提高組 題目等級 : 黃金 Gold題目描述 Description給出一個長度不超過200的由小寫英文字母組成的字母串(約定;該字串以每行20個字母的方式輸入&#xff0c;且保證每行一定為20個)。要求將此字母串分成k份(1<k<40)&#xff0c…

基于ASP.NET的新聞管理系統(三)代碼展示

5.1.1欄目部分 增加欄目&#xff08;addLanMu.aspx&#xff09;&#xff1a; <html xmlns"http://www.w3.org/1999/xhtml"> <head runat"server"> <title></title> <link rel"stylesheet" type"text/css" …

洛谷-求同構數的個數-NOIP2013提高組復賽

題目描述 Description 所謂同構數是指這樣的數&#xff0c;即它出現在它的平方數的右端。例如&#xff0c;5的平方是25 &#xff08;即5525&#xff09;&#xff0c;5是25右端的數&#xff0c;那么5就是同構數。又如&#xff0c;25的平方是625&#xff08;即2525625&#xff09…

plex linux 數據目錄,shareplex日常維護文檔

2017/07/25##SharePlex日常維護源(SRC)&#xff1a;192.168.1.101 db01目標(TGT):192.168.1.102 db02SRC:su - oraclesp_ctrlshowqstatusshow capture detailshow read detailshow log reverseshow config --查看當前使用參數文件list config --羅列出所有的參數文件(使用和未使…

ifconfig命令找不到_02. Linux命令之查看網絡連接

1. 查看網絡連接數和端口使用 netstat 命令查看網絡連接情況netstat -anp參數&#xff1a;-a 顯示所有選項-t (tcp)僅顯示tcp相關選項-u (udp)僅顯示udp相關選項-n 拒絕顯示別名&#xff0c;能顯示數字的全部轉化成數字。-p 顯示建立相關鏈接的程序名關鍵列解釋:Proto 表示協議…

grep與egrep的區別

grep與egrep的區別&#xff1b; 在linux系統環境下&#xff0c;我們通常使用grep命令來過濾出需要的行而egrep確很少使用&#xff0c;他們的區別其實很簡單&#xff0c;grep默認不支持正則表達式&#xff0c;egrep默認支持正則表達式&#xff0c;egrep 等于 grep -E 命令。轉載…

python學習之模塊(pip),列表生成式,模塊操作mysql,excel

python基礎 生成式 列表生成式  格式 [表達式 for 表達式 in 迭代對象 (可加判斷)] 原&#xff1a; 1 res1 [] 2 for i in range(1,5): 3   res1.append(i) 4 print(res1) 改&#xff1a; 1 res2 [i for i in range(1,5)] 2 print(res2) 字典生成式  格式 {key:value f…

linux驅動read函數 copytouser,Linux驅動編程 step-by-step (五)主要的文件操作方法實現...

主要的文件操作方法實現文件操作函數有很多的操作接口&#xff0c;驅動編程需要實現這些接口&#xff0c;在用戶編程時候系統調用時候會調用到這些操作structfile_operations {...loff_t (*llseek) (structfile *, loff_t,int);ssize_t (*read) (structfile *,char__user *,siz…

web開發中的 emmet 效率提升工具

web開發中的 emmet 效率提升工具 可以用來快速生成html 代碼。 并且給各種IDE、編輯器提供了插件支持&#xff0c;sublime &#xff0c;webstorm等。 如在webstorm中安裝好emmet之后&#xff0c;輸入以下文本&#xff0c; #page>div.content[ng-model"user"]ul>…

python二維數組操作_Python二維數組應用與操作

課課家將會在這里為大家詳細的介紹一下Python二維數組的相關應用方法以及定義方式&#xff0c;相信朋友們可以從中學習到更多的知識。 Python數組的應用中在實際編程中是一個非常重要的應用技術&#xff0c;作為Python編程人員來說&#xff0c;必須要熟練的掌握這方面的所有應用…

基于光線追蹤的渲染中景深(Depth of field)效果的實現

圖形學離線渲染中常用的透視攝像機模型時根據小孔成像的原理建立的&#xff0c;其實現通常是從向成像平面上發射ray&#xff0c;并把trace這條ray的結果作為成像平面上對應交點的采樣結果。即&#xff1a; 圖片來自《Fundamentals of Computer Graphics》 現實中的鏡頭拍攝的圖…

ubuntu 安裝 pycharm

添加源&#xff1a;$ sudo add-apt-repository ppa:mystic-mirage/pycharm安裝收費的專業版&#xff1a;$ sudo apt update$ sudo apt install pycharm安裝免費的社區版&#xff1a;$ sudo apt update$ sudo apt install pycharm-community卸載&#xff1a;$ sudo apt remove p…

帶你制作百詞斬單詞表讀寫插件

上篇博文簡單的介紹了一下Chrome插件&#xff0c;今天就與大家分享一下我做的這款有實際意義的插件吧。 做這款插件主要是用百詞斬站點進行單詞學習時&#xff0c;遇到的一點點鬧心事兒。在單詞表中不能聽發音。也不能練習拼寫。所以才忍無可忍的做了這么一款插件。自我感覺還是…

Linux7改運行級別,Centos7.0修改系統運行級別

首先翻譯下ininttab里面的內容&#xff1a;#inittab不再使用時利用系統。#添加配置這會對你的系統沒有影響。#Ctrl-Alt-Delete由/usr/lib系統/系統/ctrl-alt-del.target處理#系統使用的目標而不是運行級別。默認情況下&#xff0c;有兩個主要目標&#xff1a;#multi-user.targe…

python測試字符串類型的函數_python-02 數據類型 字符串str

字符串str 一、字符串定義 概念&#xff1a;字符串是有序的 不可修改的&#xff0c;元素以引號包圍的序列 引號類型&#xff1a;單引號 “”雙引號 “ ””三引號 三單引號 print(python) print("python") print(python) print("""python""…

POJ2115 C Looooops(線性同余方程)

無符號k位數溢出就相當于mod 2k&#xff0c;然后設循環x次A等于B&#xff0c;就可以列出方程&#xff1a; $$ CxA \equiv B \pmod {2^k} $$ $$ Cx \equiv B-A \pmod {2^k} $$ 最后就用擴展歐幾里得算法求出這個線性同余方程的最小非負整數解。 1 #include<cstdio>2 #incl…