?RabbitMQ 發布訂閱視頻學習地址:
簡單模式下RabbitMQ 發布者發布消息 消費者消費消息
Publist/Subscribe 發布訂閱
在 RabbitMQ 中,發布訂閱模式是一種消息傳遞方式,其中發送者(發布者)不會將消息直接發送到特 定的接收者(訂閱者)。而是將消息發送到一個交換機,交換機將消息轉發到綁定到該交換機的每個隊 列 ,每個綁定交換機的隊列都將接收到消息。消費者(訂閱者)監聽自己的隊列 并進行消費 。
?
場景 : 開放平臺 開發者訂閱了某個開放平臺的 api 之后,數據有變化就會自動獲取到最新的
?
?
?
在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:
?
P :生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給 X (交換機)
C :消費者,消息的接收者,會一直等待消息到來
Queue :消息隊列,接收消息、緩存消息
Exchange :交換機( X )。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞 交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange 的類型。
Exchange 有常見以下 3 種類型:
Fanout :廣播,將消息交給所有綁定到交換機的隊列
Direct :定向,把消息交給符合指定 routing key 的隊列
Topic :通配符,把消息交給符合 routing pattern (路由模式) 的隊列
Exchange (交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息會丟失 !
?
RabbitMQ 發布訂閱模式的一些應用場景: ?
?
1. 數據提供商與應用商 :例如中國氣象局向多個門戶網站提供氣象數據。
2. 新聞機構 :將獨家新聞發布給多個訂閱者,但可能需要根據新聞類型進行更精細的路由。
3. 商城系統 :新添加商品后,同時更新緩存和數據庫。
4. 用戶通知 :用戶充值或轉賬成功后,通過多種方式(如短信、郵件)通知用戶。
5. 消息廣播 :將消息廣播到多個消費者,例如系統公告、活動通知等。
6. 降低耦合 :生產者和消費者通過 RabbitMQ 進行解耦,不需要直接連接,提高系統的靈活性和可
擴展性。
7. 異步處理 :生產者發送消息后,消費者可以異步處理,提高系統的響應速度和并發處理能力。
?
生產者
emit_log.go
?
package main
import (
"context"
"log"
"os"
"strings"
"github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func bodyForm(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// 連接到RabbitMQ服務器
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 創建一個通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//聲明一個交換機
err = ch.ExchangeDeclare(
"logs", //name 交換機名稱
"fanout", //交換機類型 Fanout 廣播
true, //durable 持久化
false, //autoDelete 是否自動刪除
false, //internal 是否內部使用 設置為 false 時,表示無論如何這個交換器都不是
內置的
false, //noWait 是否等待服務器響應 參數通常默認為False,意味著操作會同步進
行并等待服務器的響應
nil, // 其他屬性
)
failOnError(err, "Failed to declare an exchange")
//發送消息
body := bodyForm(os.Args)
// 發布消息到交換機,并指定路由鍵
err = ch.PublishWithContext(
context.Background(),
"logs", // 交換器的名稱
"", // 隊列名
false, // mandatory 必須發送到隊列 ,false表示如果交換器無法根據自身的類型和路
由鍵找到一個符合條件的隊列丟棄
false, //immediate 參數設置為 false 時,表示消息不需要立即被消費者接收
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent: %s", body)
}
?
消費者
receive_log.go
?
package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func failOnError2(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func main() {
//建立連接
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError2(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//創建一個Channel
ch, err := conn.Channel()
failOnError2(err, "Failed to open a channel")
defer ch.Close()
//聲明一個交換機
err = ch.ExchangeDeclare(
"logs", // 交換機名稱
"fanout", // 交換機類型
true, // 是否持久化
false, // 是否自動刪除
false, // 是否內部使用
false, // 是否等待服務器響應
nil, // 其他屬性
)
failOnError2(err, "Failed to declare an exchange")
// 聲明一個臨時隊列
q, err := ch.QueueDeclare(
"", // 隊列名稱,留空表示由RabbitMQ自動生成
false, // 是否持久化
false, // 是否自動刪除(當沒有任何消費者連接時)
true, // 是否排他隊列(僅限于當前連接)
false, // 是否等待服務器響應
nil, // 其他屬性
)
failOnError2(err, "Failed to declare a queue")
// 將隊列綁定到交換機上
err = ch.QueueBind(
q.Name, // 隊列名稱
"", // 路由鍵,留空表示接收交換機的所有消息
"logs", // 交換機名稱
false, // 是否等待服務器響應
nil, // 其他屬性
)
failOnError2(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // 隊列名稱
"", // 消費者標識符,留空表示由RabbitMQ自動生成
true, // 是否自動應答
false, // 是否獨占模式(僅限于當前連接)
false, // 是否等待服務器響應
false, // noLocal
nil, // 其他屬性
)
// msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError2(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
<-forever
}
?運行
# 如果你想保存日志文件
go run receive_log.go > logs_from_rabbit.log
# 如果你想再終端看到日志
go run receive_log.go
# shell2
go run emit_log.go
?
?