1. 概念解釋
ZeroMQ Sockets提供了一種類標準套接字(socket-like)的 API,是消息導向的通信機制,基于 TCP/UDP 等傳輸層協議,但封裝了底層細節(如連接管理、消息路由、緩沖區等),提供高級抽象。其核心是 消息隊列 模型,支持多種通信模式(如請求-響應、發布-訂閱等)。
1.1 不同模式:
- 消息導向 :與 TCP 的字節流不同,ZeroMQ sockets 傳輸的是離散的消息(message),類似 UDP,但可靠性更高。每條消息包含長度和二進制數據 。
- 異步抽象 :底層實現為異步消息隊列,自動處理連接、重試、緩沖等復雜邏輯,用戶無需手動管理 。
- 模式支持 :通過不同的 socket 類型定義通信模式(如請求-響應、發布-訂閱等)和消息路由規則 。
提供了多種 socket 類型以支持不同通信模式,
1.2 不同 Socket 類型:
- REQ/REP :請求-響應模式,客戶端(REQ)發送請求,服務端(REP)響應,嚴格同步,用于高可靠性通信 。
- PUB/SUB :發布-訂閱模式,PUB 發送消息,SUB 過濾并接收特定主題的消息,用于實時數據流 。
- PUSH/PULL :用于任務分發(PUSH)和結果收集(PULL),常用于流水線架構 ,分布式系統及微服務間通信。
- ROUTER/DEALER :更靈活的異步模式,ROUTER 根據消息地址路由,DEALER 無狀態轉發,適合自定義協議 。
- 其他類型還包括 PAIR(點對點)、XPUB/XSUB(擴展發布/訂閱)等,總計 16 種以上 。
1.3 與傳統 Socket 對比
- 簡化網絡細節 :ZeroMQ 抽象了底層連接管理(如自動重連、多播)、序列化、線程安全等,用戶僅需關注消息內容 。
- 內置模式 :傳統 socket 需手動實現通信模式(如 RPC),而 ZeroMQ 直接通過 socket 類型支持常見模式 。
- 跨語言兼容 :ZeroMQ 支持多種語言綁定(C++, Python, Java 等),便于構建異構系統 。
1.4 與web Socket對比
- ZeroMQ :適用于 快速構建分布式系統,適合需要高性能、復雜通信模式(如發布-訂閱)的分布式系統,隱藏底層細節,提升開發效率 。
- WebSocket :適用于面向 Web 實時應用,專為 Web 實時通信設計,基于 HTTP 握手,適合瀏覽器與服務器的雙向交互,僅提供全雙工通信通道,需上層協議(如自定義消息格式)定義交互規則。
2. 請求-響應模式【REQ/REP】示例
嚴格同步,一問一答,用于客戶端發送請求,服務端同步響應,適用于遠程過程調用(RPC)
2.1 服務端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")print("服務器已啟動,等待客戶端請求...")while True:message = socket.recv_string()print(f"收到客戶端請求: {message}")response = f"服務器已收到請求: {message}"socket.send_string(response)print(f"發送響應: {response}")
2.2 客戶端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")msg = "你好";
socket.send_string(msg)
print(f"發送消息: {msg}")response = socket.recv_string()
print(f"收到服務器消息: {response}")
2.3 執行效果
3. 發布-訂閱模式(PUB/SUB)示例
一對多廣播,客戶端可過濾主題,用于服務端廣播消息,客戶端訂閱特定主題,適用于實時數據推送(如股票行情)
3.1 服務端
import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")print("發布者已啟動,開始廣播...")while True:topic = "測試主題"data = f"你好,歡迎收聽!" socket.send_string(f"{topic} {data}")time.sleep(1)
3.2 客戶端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, "測試主題")print("訂閱者已啟動,等待消息...")
while True:msg = socket.recv_string()print(f"收到消息: {msg}")
3.3 執行效果
4. 管道模式(PUSH/PULL)示例
單向任務分發,支持負載均衡,用于服務端分發任務,客戶端并行處理,適用于分布式任務隊列
4.1 服務端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")print("任務分發器已啟動,開始推送任務...")for i in range(10):task = f"任務{i}命令編碼" socket.send_string(task)
4.2 客戶端
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")print("客戶端已啟動,等待指令...")
while True:msg = socket.recv_string()print(f"收到任務: {msg}")
4.3 執行效果
4.4 服務器(客戶端綁定端口)
客戶端和服務器對端口的使用可以呼喚,不過,防止服務器未連接到就結束程序,需要增加個延時等待連接成功。
import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://localhost:5555")time.sleep(1) # 等待連接建立
print("任務分發器已啟動,開始推送任務...")for i in range(10):task = f"任務{i}命令編碼" socket.send_string(task)
4.5 客戶端(客戶端綁定端口)
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5555")print("客戶端已啟動,等待指令...")
while True:msg = socket.recv_string()print(f"收到任務: {msg}")
4.6 執行效果(客戶端綁定端口)
4.7 服務器(非阻塞模式監聽)
使用 flags=zmq.NOBLOCK 時,若當前無消息可接收,recv_string() 會立即拋出 zmq.Again 異常,而非阻塞等待。
啟用 zmq.CONFLATE 后,隊列僅保留最新消息。若服務端發送頻率過高或客戶端處理延遲,可能導致消息被覆蓋。
import zmq
import timecontext = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://localhost:5555")time.sleep(1) # 等待連接建立
print("任務分發器已啟動,開始推送任務...")for i in range(10):task = f"任務{i}命令編碼" socket.send_string(task)
4.8 客戶端(非阻塞模式監聽)
import zmqcontext = zmq.Context()
socket = context.socket(zmq.PULL)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind("tcp://*:5555")print("客戶端已啟動,等待指令...")
while True:try:msg = socket.recv_string(flags=zmq.NOBLOCK)print(f"收到任務: {msg}")except zmq.Again:pass
4.9 執行效果(非阻塞模式監聽)
5. ZeroMQ 常見 setsockopt 選項
選項名稱 | 作用 | 層級 |
---|---|---|
ZMQ_CONFLATE | 當隊列滿時,僅保留最新消息(覆蓋舊消息)。適用于實時數據更新場景(如傳感器數據、股票行情) | ZeroMQ 自定義層選項 |
ZMQ_SNDHWM | 設置發送隊列的高水位線(High Water Mark),控制最大未發送消息數。超出后丟棄或阻塞 | 同上 |
ZMQ_RCVHWM | 設置接收隊列的高水位線,控制最大未處理消息數 | 同上 |
ZMQ_LINGER | 控制關閉連接時是否等待未發送/接收的消息。值為 0 表示立即關閉,忽略未處理數據 | 同上 |
ZMQ_MAXMSGSIZE | 設置消息的最大大小(字節)。超出此限制的消息會被丟棄 | 同上 |
ZMQ_IMMEDIATE | 僅在綁定了地址時才允許連接(REQ/REP 等模式)。避免連接未就緒的服務端 | 同上 |
ZMQ_ROUTER_RAW | 啟用原始模式,直接讀寫裸數據(如 HTTP 協議),不使用 ZeroMQ 的幀格式 | 同上 |
ZMQ_TCP_KEEPALIVE | 啟用 TCP 保活機制,檢測連接是否存活 | 同上 |
ZMQ_IDENTITY | 設置套接字的唯一標識符(字符串),用于 ROUTER/DEALER 模式下的路由 | 同上 |
SO_REUSEADDR | 允許綁定到同一地址和端口,即使該地址/端口處于 TIME_WAIT 狀態或被其他套接字占用,避免服務器重啟時因端口占用而失敗 | 套接字層選項 |
SO_BROADCAST | 啟用廣播功能,允許套接字發送廣播消息(UDP 場景),局域網內向所有設備發送數據 | 同上 |
SO_KEEPALIVE | 啟用 TCP 保活機制,定期檢測連接是否存活,自動斷開無響應的空閑連接,防止資源泄漏。 | 同上 |
SO_LINGER | 控制調用 close() 時的行為,決定是否等待未發送的數據。 | 同上 |
SO_RCVBUF / SO_SNDBUF | 設置接收(SO_RCVBUF)和發送(SO_SNDBUF)緩沖區的大小,優化吞吐量或延遲,大緩沖區適合高帶寬場景,小緩沖區適合低延遲場景。 | 同上 |
SO_TIMEOUT | 設置接收(SO_RCVTIMEO)或發送(SO_SNDTIMEO)操作的超時時間。防止阻塞操作無限期等待,適用于非阻塞模式。 | 同上 |
TCP_NODELAY | 禁用 Nagle 算法,強制立即發送小數據包。減少延遲,適用于實時通信(如游戲、聊天)。 | 傳輸層選項 |
IP_MULTICAST_TTL | 設置多播數據包的生存時間(TTL),控制多播范圍(如局域網或跨網絡)。 | IP 層選項 |
6. 環境安裝
pip install zmq