1、RabbitMQ簡介
rabbitmq是一個開源的消息中間件,主要有以下用途,分別是:
- 應用解耦:通過使用RabbitMQ,不同的應用程序之間可以通過消息進行通信,從而降低應用程序之間的直接依賴性,提高系統的可維護性、擴展性和容錯性。
- 異步提速:通過將耗時的操作轉化為異步執行,可以提高系統的響應速度和吞吐量,提升用戶體驗。
- 削峰填谷:在高峰時段,RabbitMQ可以緩存大量的消息,從而避免系統崩潰,并在低峰時段處理這些消息,提高系統的穩定性。
- 消息分發:RabbitMQ可以將消息分發到多個消費者進行處理,從而提高系統的靈活性和處理能力。
了解rabbitmq的設計架構,對理解mq如何使用有很大的幫助。
一個非常重要的點,mq中的生產者從來不是直接將消息發送到隊列中的,而是將消息發送到了mq的交換機中(上圖中的exchange為交換機),?甚至生產者都不知道這條消息將被發送到哪個隊列中。
交換機是個怎樣的設計呢,他的一側連接生產者,從生產者接收消息,另外一側連接隊列,將消息push進隊列中,將消息push進一個隊列,還是多個隊列,還是拋棄,這些策略是由交換機的類型決定的,對于交換機的使用,后面詳細介紹。
2、RabbitMQ安裝
rabiitmq的安裝,最簡單的一種方式為運行mq的docker鏡像,一行命令搞定:
# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
執行命令后,可以看到如下打印,則代表RabbitMQ啟動成功:
鏡像啟動成功后,可以通過ip:15672打開mq控制臺:
http://xxx.xx.xxx.xx:15672/#/
?
mq安裝完成后,下面就可以進行實踐啦。
3、默認模式讀、寫mq
?rabbitmq官方的庫:github.com/rabbitmq/amqp091-go
生產者側代碼:
package mainimport ("context""fmt""time"amqp "github.com/rabbitmq/amqp091-go"
)func Send(msg string) error {// 連接rabbitmqconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("connect error:", err)return err}defer conn.Close()// 創建通道ch, err := conn.Channel()if err != nil {fmt.Println("channel error:", err)return err}defer ch.Close()// 創建隊列,使用默認的交換機q, err := ch.QueueDeclare("lp_default", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // noWaitnil, // arguments)if err != nil {fmt.Println("queue declare error:", err)return err}ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()fmt.Println(q.Name)// body := "Hello World!"err = ch.PublishWithContext(ctx,"", // exchange,默認交換機q.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(msg),})if err != nil {fmt.Println("publish error:", err)return err}return nil
}func main() {Send("Hello world")
}
?運行上面代碼后,可以在rabbitmq的客戶端 看到這個隊列:
?點擊隊列,進入隊列詳情:
第一個框中,顯式了隊列詳情,可以看出,這個隊列綁定的是默認的交換機。
第二個框,點擊后,可以看到隊列中的消息詳情。
消費者:
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("connect error:", err)return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Channel error:", err)return}defer ch.Close()q, err := ch.QueueDeclare("lp_default", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Queue Declare error:", err)return}msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Println("Consume error:", err)return}var forever chan struct{}go func() {for d := range msgs {fmt.Printf("Received a message: %s\n", d.Body)}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}
代碼運行記錄:
liupeng@192 default % go run recive.go[*] Waiting for messages. To exit press CTRL+CReceived a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
在以上消費端代碼中,如果代碼在處理消息的過程中出現異常導致了程序退出,這樣正在處理的這條消息就會丟失,為了避免這種情況的發生,rabbitmq設計了消息應答的機制,我們修改上面程序,將auto-ack參數設置為false,當處理完消息后,使用d.Ack(false)發送消息應答。
msgs, err := ch.Consume(q.Name, // queue"", // consumerfalse, // auto-ack,設置為false,取消自動應答false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Println("Consume error:", err)return}var forever chan struct{}go func() {for d := range msgs {fmt.Printf("Received a message: %s\n", d.Body)d.Ack(false) // 手動應答}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
如果忘記了進行消息應答,消息會被重新發入調度隊列,這樣就會吃掉越來越多的內存。
但是,當rabbitmq的服務down掉后,隊列中的消息仍然會丟失,為了保證在這種情況下,消息仍然能夠不丟失,我們需要做兩件事:隊列不丟失+消息不丟失,代碼如下:
隊列持久化:
q, err := ch.QueueDeclare("hello", // nametrue, // durable,設置隊列持久化false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments
)
failOnError(err, "Failed to declare a queue")
消息持久化:
將DeliveryMode設置為amqp.Persistent
err = ch.PublishWithContext(ctx,"", // exchange,默認交換機q.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(msg),DeliveryMode: amqp.Persistent,})
以上就是默認讀寫rabbitmq的方法,后面再介紹其他幾種使用方式。