1、發布/訂閱模式介紹
在普通的生產者、消費者模式,rabbitmq會將消息依次傳遞給每一個消費者,一個worker一個,平均分配,這就是Round-robin調度方式,為了實現更加復雜的調度,我們就需要使用發布/訂閱的方式。
2、交換機(exchange)
RabbitMQ中,消息模型的核心理念就是,生產者從來不能直接將消息發送到隊列,甚至生產者都不知道消息要被發送到隊列中。
相反,生產者只能將消息發送到交換機中,交換機一側從生產者接收消息,一側將消息發送到隊列中,交換機需要知道如何處理接收到的消息,是發送給一個隊列還是多個隊列?這是由交換機的類型決定的。
交換機共分為四類:??direct
,?topic
,?headers
?and?fanout
. 本章節以扇形交換機為例說明rabbitmq的使用。
3、fanout交換機的使用方式
扇形交換機,就像你猜測的那樣,他可以將他接收到的全部消息廣播到所有隊列里。
3.1 聲明交換機
首先聲明一個扇形交換機,type參數設置為『fanout』
err = ch.ExchangeDeclare("logs", // name"fanout", // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
3.2 發送消息到交換機
交換機設定完成后,就可以往該交換機發送消息:
body := "Hello World!"err = ch.Publish("logs", "", false, false, amqp.Publishing{ContentType: "text/plain",Body: []byte(body),})
如果要在rabbitmq的頁面上查看發送的消息,需要提前創建一個隊列,并綁定到該交換機[logs]上,就可以查看發送的消息:
扇形交換機的特性,就是他會將收到的消息廣播給所有綁定到該交換機的隊列,我們可以創建多個隊列,并綁定到該交換機上,我們發送一次消息,就會看到,所有綁定到該交換機的隊列中都會有一條消息,先創建三個隊列,并分別綁定到logs交換機:
之后運行腳本,發送兩次消息:
?可以看到,三個隊列當中都有兩條消息。
3.2 扇形交換機發送消息代碼
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)if err != nil {fmt.Println("Failed to declare an exchange")return}body := "Hello World!"err = ch.Publish("logs", "", false, false, amqp.Publishing{ContentType: "text/plain",Body: []byte(body),})if err != nil {fmt.Println("Failed to publish a message")return}
}
?3.2 聲明隊列,用于接收消息
q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)
聲明隊列時,沒有指定隊列名稱,這時,系統會返回一個隨機名稱存儲在q變量中。?
3.3 binding
隊列聲明完成后,需要將該隊列綁定到交換機上,這樣交換機才能把消息廣播給該隊列:
綁定代碼:?
err = ch.QueueBind(q.Name, // queue name"", // routing key"logs", // exchangefalse,nil,)
消費者側全部代碼如下:
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)if err != nil {fmt.Println("Failed to declare an exchange")return}q, err := ch.QueueDeclare("", // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)err = ch.QueueBind(q.Name, // queue name"", // routing key"logs", // exchangefalse,nil,)msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)var forever chan struct{}go func() {for d := range msgs {fmt.Printf(" [x] %s\n", d.Body)}}()fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}
程序啟動后,控制臺上會增加一個隨機命名的隊列。
?運行【3.2】的生產者程序,發送消息到扇形交換機,這個時候消費者就會同步消費到消息,并進行打印:
4、總結
關于扇形交換機,核心的一點需要我們記住,發送到扇形交換機的消息,他會將消息廣播給所有綁定到該交換機的隊列上,無腦廣播,所有隊列會同時接受到交換機上全部的消息。