RabbitMQ 發布訂閱

?RabbitMQ 發布訂閱視頻學習地址:

簡單模式下RabbitMQ 發布者發布消息 消費者消費消息

Publist/Subscribe 發布訂閱

RabbitMQ 中,發布訂閱模式是一種消息傳遞方式,其中發送者(發布者)不會將消息直接發送到特 定的接收者(訂閱者)。而是將消息發送到一個交換機,交換機將消息轉發到綁定到該交換機的每個隊 ,每個綁定交換機的隊列都將接收到消息。消費者(訂閱者)監聽自己的隊列 并進行消費 。

?

場景 : 開放平臺 開發者訂閱了某個開放平臺的 api 之后,數據有變化就會自動獲取到最新的

?

?

?

在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:

?

P :生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給 X (交換機)
C :消費者,消息的接收者,會一直等待消息到來
Queue :消息隊列,接收消息、緩存消息
Exchange :交換機( X )。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞 交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange 的類型。
Exchange 有常見以下 3 種類型:
Fanout :廣播,將消息交給所有綁定到交換機的隊列
Direct :定向,把消息交給符合指定 routing key 的隊列
Topic :通配符,把消息交給符合 routing pattern (路由模式) 的隊列
Exchange (交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息會丟失

?

RabbitMQ 發布訂閱模式的一些應用場景: ?

?

1. 數據提供商與應用商 :例如中國氣象局向多個門戶網站提供氣象數據。
2. 新聞機構 :將獨家新聞發布給多個訂閱者,但可能需要根據新聞類型進行更精細的路由。
3. 商城系統 :新添加商品后,同時更新緩存和數據庫。
4. 用戶通知 :用戶充值或轉賬成功后,通過多種方式(如短信、郵件)通知用戶。
5. 消息廣播 :將消息廣播到多個消費者,例如系統公告、活動通知等。
6. 降低耦合 :生產者和消費者通過 RabbitMQ 進行解耦,不需要直接連接,提高系統的靈活性和可
擴展性。
7. 異步處理 :生產者發送消息后,消費者可以異步處理,提高系統的響應速度和并發處理能力。

?

生產者
emit_log.go

?

package main
import (
"context"
"log"
"os"
"strings"
"github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func bodyForm(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
// 連接到RabbitMQ服務器
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 創建一個通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//聲明一個交換機
err = ch.ExchangeDeclare(
"logs", //name 交換機名稱
"fanout", //交換機類型 Fanout 廣播
true, //durable 持久化
false, //autoDelete 是否自動刪除
false, //internal 是否內部使用 設置為 false 時,表示無論如何這個交換器都不是
內置的
false, //noWait 是否等待服務器響應 參數通常默認為False,意味著操作會同步進
行并等待服務器的響應
nil, // 其他屬性
)
failOnError(err, "Failed to declare an exchange")
//發送消息
body := bodyForm(os.Args)
// 發布消息到交換機,并指定路由鍵
err = ch.PublishWithContext(
context.Background(),
"logs", // 交換器的名稱
"", // 隊列名
false, // mandatory 必須發送到隊列 ,false表示如果交換器無法根據自身的類型和路
由鍵找到一個符合條件的隊列丟棄
false, //immediate 參數設置為 false 時,表示消息不需要立即被消費者接收
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent: %s", body)
}

?

消費者
receive_log.go

?

package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func failOnError2(err error, msg string) {
if err != nil {
log.Printf("%s: %s", msg, err)
}
}
func main() {
//建立連接
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError2(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//創建一個Channel
ch, err := conn.Channel()
failOnError2(err, "Failed to open a channel")
defer ch.Close()
//聲明一個交換機
err = ch.ExchangeDeclare(
"logs", // 交換機名稱
"fanout", // 交換機類型
true, // 是否持久化
false, // 是否自動刪除
false, // 是否內部使用
false, // 是否等待服務器響應
nil, // 其他屬性
)
failOnError2(err, "Failed to declare an exchange")
// 聲明一個臨時隊列
q, err := ch.QueueDeclare(
"", // 隊列名稱,留空表示由RabbitMQ自動生成
false, // 是否持久化
false, // 是否自動刪除(當沒有任何消費者連接時)
true, // 是否排他隊列(僅限于當前連接)
false, // 是否等待服務器響應
nil, // 其他屬性
)
failOnError2(err, "Failed to declare a queue")
// 將隊列綁定到交換機上
err = ch.QueueBind(
q.Name, // 隊列名稱
"", // 路由鍵,留空表示接收交換機的所有消息
"logs", // 交換機名稱
false, // 是否等待服務器響應
nil, // 其他屬性
)
failOnError2(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // 隊列名稱
"", // 消費者標識符,留空表示由RabbitMQ自動生成
true, // 是否自動應答
false, // 是否獨占模式(僅限于當前連接)
false, // 是否等待服務器響應
false, // noLocal
nil, // 其他屬性
)
// msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError2(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [x] Waiting for logs. To exit press CTRL+C")
<-forever
}

?運行

# 如果你想保存日志文件
go run receive_log.go > logs_from_rabbit.log
# 如果你想再終端看到日志
go run receive_log.go
# shell2
go run emit_log.go

?

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/15371.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/15371.shtml
英文地址,請注明出處:http://en.pswp.cn/web/15371.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于open3d對kitti數據集檢測結果可視化

前言 KITTI數據集是自動駕駛和計算機視覺領域中一個廣泛使用的基準數據集&#xff0c;它提供了豐富的傳感器數據&#xff0c;包括激光雷達、相機和GPS等。Open3D是一個功能強大的3D數據處理和可視化庫&#xff0c;支持多種3D數據格式。本文將介紹如何使用Open3D對KITTI數據集的…

Python常見數據類型處理

一、數據類型分類 Python3 中常見的數據類型有&#xff1a; Number&#xff08;數字&#xff09;String&#xff08;字符串&#xff09;bool&#xff08;布爾類型&#xff09;List&#xff08;列表&#xff09;Tuple&#xff08;元組&#xff09;Set&#xff08;集合&#xf…

詳解 Spring MVC(Spring MVC 簡介)

什么是 Spring MVC&#xff1f; Spring MVC 是 Spring 框架提供的一個基于 MVC 模式的輕量級 Web 框架&#xff0c;是 Spring 為表示層開發提供的一整套完整的解決方案&#xff0c;Spring MVC 使用了 MVC 架構模式&#xff0c;將 Web 層職責解耦&#xff0c;基于請求驅動模型&…

基于Java、SpringBoot和uniapp在線考試系統安卓APP和微信小程序

摘要 基于Java、SpringBoot和uniapp的在線考試系統安卓APP微信小程序是一種結合了現代Web開發技術和移動應用技術的解決方案&#xff0c;旨在為教育機構提供一個方便、高效和靈活的在線考試平臺。該系統采用Java語言進行后端開發&#xff0c;使用SpringBoot框架簡化企業級應用…

SpringCloud微服務之Nacos、Feign、GateWay詳解

SpringCloud微服務之Nacos、Feign、GateWay詳解 1、Nacos配置管理1.1、統一配置管理1.1.1、在nacos中添加配置文件1.1.2、從微服務拉取配置 1.2、配置熱更新1.2.1、方式一1.2.2、方式二 1.3、配置共享1.3.1、配置共享的優先級 1.4、搭建nacos集群1.4.1、初始化數據庫1.4.2、下載…

plt多子圖設置

import matplotlib.pyplot as plt# 使用 subplots 函數創建一個 2x3 的子圖網格 fig, axs plt.subplots(nrows2, ncols3, figsize(16, 10)) # 調整 figsize 來改變圖像大小# 遍歷每個子圖&#xff0c;并繪制一些內容&#xff08;這里只是簡單的示例&#xff09; for ax in ax…

React與Vue的區別?

一、區別: 1. 語法 Vue采用自己特有的模板語法&#xff1b; React是單向的&#xff0c;采用jsx語法創建react元素。 2.監聽數據變化的實現原理不同 Vue2.0 通過Object.defineproperty()方法的getter/setter屬性, 實現數據劫持, 每次修改完數據會觸發diff算法(雙端對比) …

VUE 頁面生命周期基本知識點

在 Vue.js 中&#xff0c;頁面生命周期&#xff08;更準確地說是組件生命周期&#xff09;指的是組件從創建到銷毀的一系列過程。了解這些生命周期鉤子可以幫助我們更好地管理組件的狀態和行為。以下是 Vue 組件的主要生命周期鉤子&#xff1a; beforeCreate 在實例初始化之后&…

vue使用element plus組件上傳服務器

在Vue項目中使用Element Plus組件上傳文件到服務器&#xff0c;你可以使用ElUpload組件。以下是一個簡單的示例&#xff0c;展示了如何使用ElUpload組件來上傳文件&#xff0c;并將其保存到服務器。 首先&#xff0c;確保你已經安裝了Element Plus。 npm install element-plu…

從入門到精通:詳解Linux進程管理

前言 在這篇文章中&#xff0c;我將帶領大家深入學習和理解Linux系統中的進程管理。無論你是初學者還是有一定經驗的開發者&#xff0c;相信這篇文章都會對你有所幫助。我們將詳細講解馮諾依曼體系結構、操作系統概念、進程管理、進程調度、進程狀態、環境變量、內存管理以及其…

C語言之函數和函數庫以及自己制作靜態動態鏈接庫并使用

一&#xff1a;函數的本質 1&#xff1a;C語言為什么會有函數 &#xff08;1&#xff09;整個程序分為多個源文件&#xff0c;一個文件分為多個函數&#xff0c;一個函數分成多個語句&#xff0c;這就是整個程序的組織形式。這樣的組織好處在于&#xff1a;分化問題、、便于程序…

分布式版本控制工具 git

git 是什么 分布式版本控制工具。github 是代碼托管平臺。 git 有什么用 保存文件的所有修改記錄。使用版本號&#xff08;sha1 哈希值&#xff09; 進行區分。隨時可瀏覽歷史版本記錄。可還原到歷史指定版本。對比不同版本的文件差異。 為什么要使用 git 多人協作開發一個大…

SQL 優化

SQL 優化是指通過各種手段提高 SQL 查詢的執行效率,減少資源消耗,提高數據庫的整體性能。以下是一些詳細的 SQL 優化方法,包括索引優化、查詢優化、數據庫設計優化等。 1. 索引優化 創建適當的索引: 單列索引:在查詢中頻繁使用的單個列上創建索引。多列索引(復合索引):…

STM32手寫超頻到128M函數

今天學習了野火的STM32教程學會了如何設置STM32的時鐘頻率&#xff0c;步驟比較詳細&#xff0c;也很容易理解&#xff0c;就是視頻教程不能跳著看&#xff0c;只能一節節的看&#xff0c;不然會知識不連貫&#xff0c;造成有些知識不理解&#xff0c;連續著看還是沒有什么難度…

docker-file 網絡

docker掛載 1.綁定掛載&#xff08;Bind Mounts&#xff09;&#xff1a;綁定掛載是將主機上的文件或目錄掛載到容器中。 docker run -v /host/path:/container/path image_name 2.卷掛載&#xff08;Volume Mounts&#xff09;&#xff1a;卷掛載將 Docker 數據卷掛載到容器中…

【CTF Web】CTFShow web4 Writeup(SQL注入+PHP+字符型注入)

web4 1 管理員阿呆又失敗了&#xff0c;這次一定要堵住漏洞 解法 注意到&#xff1a; <!-- flag in id 1000 -->攔截很多種字符&#xff0c;連 select 也不給用了。 if(preg_match("/or|\-|\\\|\/|\\*|\<|\>|\!|x|hex|\(|\)|\|select/i",$id)){die(&q…

yolov8推理由avi改為mp4

修改\ultralytics-main\ultralytics\engine\predictor.py&#xff0c;即可 # Ultralytics YOLO &#x1f680;, AGPL-3.0 license """ Run prediction on images, videos, directories, globs, YouTube, webcam, streams, etc.Usage - sources:$ yolo modepred…

Android開發-Android開發中的TCP與UDP通信策略的實現

Android 開發中的 TCP 與 UDP 通信策略的實現 1. 前言2. 準備工作3. Kotlin 中 TCP 通信實現客戶端代碼示例&#xff1a;服務器代碼示例&#xff1a; 4. Kotlin 中 UDP 通信實現客戶端代碼示例&#xff1a;服務器代碼示例&#xff1a; 5. TCP 與 UDP 應用場景分析TCP 實現可靠傳…

搭建訪問阿里云百煉大模型環境

最近這波大降價&#xff0c;還有限時免費&#xff0c;還不趕快試試在線大模型&#xff1f;下面整理訪問百煉平臺的千問模型方法。 創建RAM子賬號并授權 創建RAM子賬號 1. “訪問控制RAM”入口&#xff08;控制臺URL&#xff09; 然后點擊進入“RAM管理控制臺” 2. 添加用戶 …

vue 區分多環境打包

需求&#xff1a;區分不同的環境&#xff08;測試、正式環境&#xff09;&#xff0c;接口文檔地址不同&#xff1b; 配置步驟&#xff1a; 1、在根目錄下面新建 .env.xxx 文件&#xff08;xxx 根據環境不同配置&#xff09; 文件中一定要配置的參數項為&#xff1a;NODE_ENV…