RabbitMQ 是當前最流行的消息中間件(Message Broker)之一,支持多種消息協議(如 AMQP、MQTT)。
同時它也是一個輕量級的非常易于部署的開源軟件,可以運行在當前大多數操作系統及云端環境中,也能夠部署在分布式的集群環境里以達到高可用、可伸縮的需求。
此外,RabbitMQ 還為目前主流的編程語言提供了豐富的開發工具。
一、軟件安裝
可以進入 官方下載界面 閱讀針對自己操作系統版本的安裝手冊,根據需求選擇適合的安裝方式。
Windows 系統可以直接在該頁面中獲取二進制安裝包(還需要安裝 Erlang 環境),Linux 系統也可以根據發行版的不同添加特定的軟件鏡像源。
我這里是 Ubuntu 19.04,沒有特別的需求,所以直接從系統默認的軟件鏡像源里下載安裝,命令如下:
$ sudo apt-get install rabbitmq-server
安裝完成以后,運行 systemctl status rabbitmq-server 命令查看 RabbitMQ 服務的運行狀態:
$ systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ Messaging Server
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
Active: active (running) since Fri 2019-07-26 01:03:27 CST; 2min 55s ago
Main PID: 770 (beam.smp)
Status: "Initialized"
Tasks: 85 (limit: 2302)
Memory: 85.8M
CGroup: /system.slice/rabbitmq-server.service
├─ 741 /bin/sh /usr/sbin/rabbitmq-server
├─ 770 /usr/lib/erlang/erts-10.2.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/ebin -noshell -noinput -s rabbit boot -sname rabbit@server1 -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root "/var/log/rabbitmq" -rabbit lager_default_file "/var/log/rabbitmq/rabbit@server1.log" -rabbit lager_upgrade_file "/var/log/rabbitmq/rabbit@server1_upgrade.log" -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@server1-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@server1" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672
├─1243 erl_child_setup 65536
├─1286 inet_gethost 4
└─1287 inet_gethost 4
7月 26 01:02:44 server1 systemd[1]: Starting RabbitMQ Messaging Server...
7月 26 01:03:27 server1 systemd[1]: rabbitmq-server.service: Supervising process 770 which is not our child. We'll most likely not notice when it exits.
7月 26 01:03:27 server1 systemd[1]: Started RabbitMQ Messaging Server.
Web Admin
RabbitMQ 還提供了可以遠程訪問的 Web 管理與監控工具,默認以插件的形式安裝到系統中,需要使用 rabbitmq-plugins 命令開啟。
具體命令如下:
$ sudo rabbitmq-plugins enable rabbitmq_management
RabbitMQ 默認創建了一個用戶名密碼分別為 guest/guest 的用戶,只是該用戶只允許本地登錄。(我這里是遠程。。。)
如果需要遠程訪問 Web 控制臺,可以通過 rabbitmqctl 命令創建一個新的管理賬戶:
$ sudo rabbitmqctl add_user
此時新創建的賬戶仍無法登錄,還需要為其分配用戶角色以及對 vhost 的管理權限,命令如下:
$ sudo rabbitmqctl set_user_tags administrator
$ sudo rabbitmqctl set_permissions -p / ".*" ".*" ".*"
權限設置完畢后,即可用之前指定的用戶名密碼遠程登錄 Web 管理系統,界面如下圖:
Web Admin
Web 形式的后臺界面為管理工作與監控需求提供了便捷的接口,同時大部分管理操作也可直接通過 rabbitmqctl 命令完成,具體可參考該命令的幫助信息:
$ sudo rabbitmqctl
Usage:
rabbitmqctl [-n ] [-l] [-q] []
...
Commands:
add_user
add_vhost
authenticate_user
await_online_nodes [-t ]
cancel_sync_queue [-p ] queue
change_cluster_node_type
change_password
...
二、架構解析
RabbitMQ 是一種高性能、穩定、可伸縮(集群部署)的消息中間件,由 Erlang 語言編寫。
Erlang 是一種函數式編程語言,專注于分布式、高容錯的軟件類實時系統等應用場景。它通過輕量級的進程設計以及進程之間的消息通信,提供了一個高層次的不需要共享狀態的并發模型。
RabbitMQ 集群通過 Erlang VM 原生的 IPC (inter-process communication) 機制完成跨節點的消息通信。
松耦合架構
對于傳統的應用架構,比如一個 Web 應用的登錄程序,往往需要對后端的數據庫表格進行多項實時的寫入操作。而當用戶的訪問量大增時,此時的表格更新操作很容易成為瓶頸并影響到整體的響應速度。
相對于登錄程序直接更新表格數據的緊耦合架構,可以將前端的請求數據推送到基于消息的中間件或者某個中心化的消息隊列應用,再通過中間件分發消息到多個消費者(Consumer)應用,由消費者獨立、異步地完成最終的數據庫更新操作。
image.png
基于消息的中間件對于創建數據導向的靈活的應用架構有非常大的優勢。RabbitMQ 支持的松耦合設計可以使應用不再受類似于數據庫寫操作的性能限制。
同時這種架構也非常易于橫向擴展,可以在添加作用于相同數據的應用實例時不影響現有的核心功能。
tightly coupled application
loosely coupled application
三、消息應用示例代碼
下文中將使用 Python 語言及其 RabbitMQ 客戶端 Pika 創建 5 個基本的消息應用,結構由簡單到復雜,源代碼均參考自官網 RabbitMQ Tutorials 。
安裝 pika 庫:pip install pika
Hello World
該應用的結構示意圖如下:
helloworld
由 P (Producer) 發送一條消息到隊列(Queue),再由隊列轉發消息到 C (Consumer) 。
發送端代碼 send.py 如下:
#!/usr/bin/env python
import pika
# 初始化與 RabbitMQ 服務器的連接
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 隊列聲明
channel.queue_declare(queue='hello')
# 發送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
接收端 reveive.py 代碼如下:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 接收到消息后觸發的回調函數
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消費者聲明與消息監聽
channel.basic_consume(
queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
測試
首先運行 4 次發送程序:
$ python send.py
[x] Sent 'Hello World'
$ python send.py
[x] Sent 'Hello World'
$ python send.py
[x] Sent 'Hello World'
$ python send.py
[x] Sent 'Hello World'
從 Web 管理界面中可以看到,此時隊列中緩存了 4 條消息。
overview
運行接收端程序:
$ python receive.py
[x] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World'
[x] Received b'Hello World'
[x] Received b'Hello World'
[x] Received b'Hello World'
發送端連續 4 次發送的消息被接收端收取,隊列中緩存的消息被清空。同時接收端保持運行狀態等待新的消息被轉發給自己。
overview 2
消息隊列一直處于等待生產者發送消息和將收到或緩存的消息轉發給消費者的狀態。如未有消費者及時接收和處理被轉發的消息,則這部分消息緩存在隊列中等待進一步操作。
Work Queue
結構示意圖:
Work Queue
本例中將創建一個 Work Queue 用來將消耗時間長的任務以輪詢的方式分發給多個消費者處理。
生產者源代碼 new_task.py :
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
消費者源代碼 worker.py :
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag) # Message acknowledgment
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
Message acknowledgment
消費者在處理接收到的任務或消息時有可能會消耗比較多的時間,在此過程中,如消費者端出現軟硬件故障,則會出現消息丟失的情況。
RabbitMQ 支持 Message acknowledgment 。即消費者在接收和處理完一個特定的消息后會向 RabbitMQ 返回一個應答(ack),說明該消息可以從隊列中移除。
如果消費者在返回應答之前丟失與隊列的連接,則 RabbitMQ 判定對應的消息未由消費者完全處理,會將該消息保留在隊列中并重新分發給其他在線的消費者。
Message durability
消息應答的機制可以確保即使消費者宕機的情況下任務仍不會丟失。但是當 RabbitMQ 服務本身出現故障時,隊列以及隊列中緩存的消息仍舊會被清理掉。
為了保證 RabbitMQ 中隊列以及消息的持久化,首先需要在生產者和消費者代碼中同時聲明隊列為 durable :
channel.queue_declare(queue='task_queue', durable=True)
此外還需要將生產者代碼中的 delivery_mode 屬性設置為 2 確保消息的持久化:
properties=pika.BasicProperties(delivery_mode=2,)
測試
打開兩個命令行終端,分別運行 worker.py 程序:
# Shell 1
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
# Shell 2
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
打開另一個終端窗口運行 new_task.py 程序發送 4 條消息:
# Shell 3
$ python new_task.py First Message
[x] Sent 'First Message'
$ python new_task.py Second Message
[x] Sent 'Second Message'
$ python new_task.py Third Message
[x] Sent 'Third Message'
$ python new_task.py Forth Message
[x] Sent 'Forth Message'
最終兩個消費者分別接收到隊列分發的兩條消息:
# Shell 1
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
[x] Received b'First Message'
[x] Done
[x] Received b'Third Message'
[x] Done
# Shell 2
$ python worker.py
[x] Waiting for messages. To exit press CTRL+C
[x] Received b'Second Message'
[x] Done
[x] Received b'Forth Message'
[x] Done
Fair dispatch
當 RabbitMQ 以輪詢的方式(即平均分配)將隊列中的消息轉發給多個消費者時,如果這些消費者接收到的任務繁重程度差異很大,則會導致某些消費者端任務的積壓。
為了避免這種情況發生,可以使用 basic_qos 方法設置 prefetch 的值,如 worker.py 程序中的以下代碼:
channel.basic_qos(prefetch_count=1) 。
該行代碼可以確保同一個消費者在任意時間點最多只接受 1 個任務分配給自己。即如果某個消費者當前有未處理完的消息,則不再接收新的消息直到當前的任務處理完。
Publish/Subscribe
結構示意圖:
Publish-Subscribe
Exchange
在之前的示例中,用到了消息隊列模型中的以下幾個組件:
producer :生產者,即發送消息的應用
queue :隊列,即存儲消息的緩存
consumer :消費者,即接收消息的應用
實際上在 RabbitMQ 的消息模型中,生產者從來不會將消息直接發送到隊列中,而是將消息發送給一個名為 exchange 的組件。
exchange 的一端用來接收生產者發送的消息,一端用來將消息推送到隊列中。它通過 exchange type 中的定義判斷特定的消息是該推送給某個對應的隊列,還是將其廣播給多個隊列,又或者直接丟棄。
RabbitMQ 主要提供了 4 種 exchange 類型:direct、topic、headers 和 fanout。
本例中使用 fanout,即 exchange 會將接收到的消息以廣播的形式發送給所有關聯的隊列,再由隊列傳遞給消費者處理。
源代碼(emit_log.py)如下:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
receive_logs.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
receive_logs.py 文件中有一行 result = channel.queue_declare(queue='', exclusive=True) 代碼,用來聲明一個臨時隊列( queue='' 沒有指定名稱,因此會由 RabbitMQ 設置隨機的名稱),同時 exclusive=True 設置該隊列在消費者斷開連接后自行刪除。
測試
同時打開兩個命令行窗口分別運行 receive_logs.py 文件:
# Shell 1
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
# Shell 2
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
再打開第三個終端執行 emit_log.py 命令 4 次:
# Shell 3
$ python emit_log.py
[x] Sent 'info: Hello World!'
$ python emit_log.py
[x] Sent 'info: Hello World!'
$ python emit_log.py
[x] Sent 'info: Hello World!'
$ python emit_log.py
[x] Sent 'info: Hello World!'
此時之前運行的兩個 receive 程序同時收到發送的 4 條消息:
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
[x] b'info: Hello World!'
Routing
結構示意圖:
Routing
與上一個例子中以廣播的形式轉發消息不同,本例中允許消費者通過隊列有選擇地訂閱生產者發送的部分消息。
Binding 和 Direct exchange
在 RabbitMQ 中,binding 代表 exchange 與隊列的對應關系,即隊列會根據 binding 的設置對 exchange 轉發的消息有選擇性地接收。
因此 binding 的最終效果也依賴于 exchange 的類型。比如之前用到的 fanout 類型,由于是廣播的形式(轉發給所有關聯的隊列)并不需要選擇的動作,則 binding 的值被忽略。
但是對于 direct 類型的 exchange ,則可以通過 binding 對消息進行篩選。在 direct exchange 下,只有當隊列的 binding_key 與消息的 routing_key 一致時,隊列才會收到 exchange 轉發的消息。
emit_log_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
receive_logs_direct.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
測試
首先運行 receive_logs_direct.py 程序并指定參數為 error(即只接收標記為“error”的消息):
# Shell 1
$ python receive_logs_direct.py error
[*] Waiting for logs. To exit press CTRL+C
打開另一終端同樣運行 receive_logs_direct.py 程序并指定參數為 info warning(即接收標記為 info 或 warning 的消息):
# Shell 2
$ python receive_logs_direct.py info warning
[*] Waiting for logs. To exit press CTRL+C
打開第三個終端并運行 emit_log_direct.py 程序發送 4 條日志消息:
# Shell 3
$ python emit_log_direct.py error "This is an error"
[x] Sent 'error':'This is an error'
$ python emit_log_direct.py info "Hi, I am an info"
[x] Sent 'info':'Hi, I am an info'
$ python emit_log_direct.py warning "Yeah, it's a warning"
[x] Sent 'warning':"Yeah, it's a warning"
$ python emit_log_direct.py error "Hi, it's an error again"
[x] Sent 'error':"Hi, it's an error again"
此時 Shell 1 中只接收到了標記為 error 的消息:
$ python receive_logs_direct.py error
[*] Waiting for logs. To exit press CTRL+C
[x] 'error':b'This is an error'
[x] 'error':b"Hi, it's an error again"
而 Shell 2 中接收到了標記為 info 和 warning 的消息:
$ python receive_logs_direct.py info warning
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':b'Hi, I am an info'
[x] 'warning':b"Yeah, it's a warning"
Topics
結構示意圖:
Topics
direct 類型的 exchange 雖然可以根據消息的 routing_key 以及隊列的 binding_key 有選擇性的推送消息到隊列,但是并不適合更復雜的場景。
而 topic 類型的 exchange 與 direct 類型邏輯上大致相同,只是 topic 類型的 exchange 并沒有一個明確的 routing_key,而是由幾個點號(.)分隔的單詞(如 lazy.orange.cat)進行定義。
與之對應的 binding_key 也需要遵循同樣的形式,只不過 binding_key 額外支持兩個特殊含義的字符:
星號(*)可以表示某一個任意的單詞
井號(#)可以表示任意 0 個或多個單詞
因此對于上圖(Topics)中的情形,routing_key 為 quick.orange.rabbit 的消息會被轉發給 Q1 和 Q2 隊列,quick.orange.fox 則只會轉發給 Q1 隊列,lazy.orange.male.rabbit 被轉發給 Q2 隊列。
emit_log_topic:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
receive_logs_topic.py:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
測試
先運行接收端程序(Shell 1 和 Shell 2),再運行發送端(Shell 3),效果如下:
# Shell 3
$ python emit_log_topic.py "kern.warning" "A kernel warning message"
[x] Sent 'kern.warning':'A kernel warning message'
$ python emit_log_topic.py "network.critical" "A critical network error"
[x] Sent 'network.critical':'A critical network error'
$ python emit_log_topic.py "kern.critical" "A critical kernel error"
[x] Sent 'kern.critical':'A critical kernel error'
# Shell 1
$ python receive_logs_topic.py "kern.*"
[*] Waiting for logs. To exit press CTRL+C
[x] 'kern.warning':b'A kernel warning message'
[x] 'kern.critical':b'A critical kernel error'
# Shell 2
$ python receive_logs_topic.py "*.critical"
[*] Waiting for logs. To exit press CTRL+C
[x] 'network.critical':b'A critical network error'
[x] 'kern.critical':b'A critical kernel error'
參考資料