Rabbit與Java相結合
- 引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 在配置文件中編寫關于rabbitmq的配置
rabbitmq:host: 192.168.190.132 //這個那個Linxu的端口號port: 5672 //端口號 ,固定的username: admin //這個是那個用戶名password: 123546 //密碼virtual-host: powernode //配置我們發消息發到那個虛擬主機上
- 編寫一個配置類
1裝RabbitMQ之前對Linux的更改
1.1 按照上傳下載工具
yum install lrzsz -y
1.2 查看主機名字
more /etc/hostname
1.3 修改主機名
hostnamectl set-hostname 要修改主機名
4
2 鏈式編程
- 第一步:就是set方法中的返回值全部改為對象本身
public Student setAdderess(String adderess) {
this.adderess = adderess;
return this;
}
- 第二步:就可以使用鏈式編程了,因為當我們執行這個student.setId(20)的時候,我們在set方法中設置的返回值是一個Student對象。所以我們還可以繼續.setName()
public class Chainlearn {public static void main(String[] args) {Student student = new Student();student.setId(20).setName("lisi").setAge("20").setAdderess("shanxi0");System.out.println(student);}
- 使用lombok生成鏈式編程
@Accessors(chain = true) //生成鏈式編程
-
- 第一步就是添加依賴lomback依賴
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version></dependency>
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //生成鏈式編程
public class Teacher implements Serializable {private Integer id;private String name;private String age;private String adderess;}
3 建造者模式
建造者模式就是一步一步的建造一個復雜的是對象 。它一般是將一個復雜的對象分解為多個簡單的對象,然后一步一步構造而成
3.1 使用lombok就可以給我們生曾建造者模式
就是使用@Bilder
生成建造者模式代碼
建造者模式就是用來創建對象的。
實現過程
- 在使用lombok注解的情況下,我們在類上定義一個注解
@Builder //生成建造者模式
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //生成鏈式編程
@Builder //生成建造者模式
public class Teacher implements Serializable {private Integer id;private String name;private String age;private String adderess;}
- 創建對象并賦值就是使用對象.budiler.屬性名
// 用建造者創建對象Teacher t = Teacher.builder().id(20).name("wangwu").age("10").adderess("真的").build();}
4 系統的啟動任務
就是SpringBoot工程啟動時,就會執行的任務
實現步驟:
就是在springBoot的啟動類上實現ApplicationRunner
接口,并實現里面的方法,完后在方法中就可以輸出東西。(我們應該用@Slf4j
)來輸出東西,不要使用System.out.println("你好");
@SpringBootApplication
@Slf4j
public class Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Application.class,args);}@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("系統已啟動,我就會運行");System.out.println("你好");}
}
5 RabbitMQ的定義
其中MQ的全稱:message queue
消息隊列
5.1 消息中間件
簡單來說,消息中間件就是指保存數據的一個容器(服務器)
,可以用于兩個系統之間的數據傳遞
消息中間件一般有三個主要角色:生產者(producer),消費者(consumer),消息代理(消息隊列、消息服務器)
生產者發送消息到消息服務器,然后消費者從消息代理(消息隊列)中獲取數據并進行處理。
6 使用場景
6.1 異步處理
- 不使用MQ時,
-
- 下訂單:1、執行下訂單業務---->2、再處理加積分業務----->3、發紅包業務---->4、發送手機短信業務
- 使用MQ時,加快執行速度
-
- 只執行下訂單的業務 ,完后直接返回,完后向MQ發送消息---->積分系統處理積分業務后加積分,紅包系統處理發紅包業務后發紅包、手機短信系統執行發送短信的業務后發短信
6.2 系統解耦
- 未使用MQ時
-
- 是2A系統調用B系統,再調用C系統......
- 使用MQ時
-
- A系統往MQ系統發送一條消息,B系統接受到消息處理,C系統接收到消息處理,其中不同的系統可以使用不同的語言來寫。
6.3 流量削峰
就是雙十一時,大量訂單,發送到系統A,則此時系統A可以把消息發送到MQ中,MQ再勻速的發送給系統B,完后由系統B去操作數據庫。
7 RabbitMQ運行環境搭建
7.1 RabbitMQ是由Erlang語言開發的 ,所以需要先下載安裝Erlang
- 下載Erlang
- 安裝erlang前先安裝Linux依賴庫
-
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
- 解壓erlang壓縮包文件
-
tar -zxvf otp_src_25.1.1.tar.gz
- 進入otp_src_25.1.1 目錄 完后配置
-
cd otp_src_25.1.1
./configure
- 編譯
-
make
- 安裝,就是把這個放在path目錄下,這樣在任何的目錄就都可以使用了。
-
make install
- 安裝好了erlang后可以將解壓的文件夾刪除:
-
rm -rf otp_src_25.1.1
- 驗證erlang是否安裝成功
-
- 在命令行輸入:erl如果進入了編程命令行則表示安裝成功,然后按ctrl + z 退出編程命令行;
7.2 安裝RabbitMQ以及啟動和停止
- 解壓RabbitMQ的壓縮包,即安裝完成,無需再編譯
-
tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz -C /usr/local/
說明 -C 選項是指定解壓目錄,如果不指定會解壓到當前目錄
此時rabbitmq就安裝好了;
- 啟動RabbitMQ
-
- 首先切換到我們的安裝目錄下
/usr/local
---->cd rabbitmq_server-3.10.11/
- 首先切換到我們的安裝目錄下
--->完后再切換到那個sbin/
目錄下
-
- 就執行命令啟動
./rabbitmq-server -detached
- 當配置完環境變量以后,我們就可以在任何目錄下啟動
rabbitmq-server -detached
- 就執行命令啟動
說明:
-detached 將表示在后臺啟動運行rabbitmq;不加該參數表示前臺啟動;
rabbitmq的運行日志存放在安裝目錄的var目錄下;
現在的目錄是:/usr/local/rabbitmq_server-3.10.11/var/log/rabbitmq
- 停止RabbitMQ
-
- 切換到sbin目錄下執行: 執行
./rabbitmqctl shutdown
- 切換到sbin目錄下執行: 執行
7.3 查看MQ的狀態
- 切換到sbin目錄下執行:執行
說明:-n rabbit 是指定節點名稱為rabbit,目前只有一個節點,節點名默認為rabbit
此處-n rabbit 也可以省略
7.4 配置path環境變量
- 編輯這個文件
vi /etc/profile
- 在這個文件中寫入以下語句
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbinexport RABBIT_HOME PATH
- 刷新環境變量 執行
source /etc/profile
?
8 RabbitMQ的管理命令
8.1 用戶管理
用戶管理包括增加用戶,刪除用戶,查看用戶列表,修改用戶密碼。
這些操作都是通過rabbitmqctl管理命令來實現完成。
- 查看當前用戶列表
-
rabbitmqctl list_users
- 新增一個用戶
-
- 語法:
rabbitmqctl add_user Username Password
- 語法:
- 設置用戶角色
-
rabbitmqctl set_user_tags 用戶名 角色名
- 設置用戶權限
-
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
-
-
".*" ".*" ".*"
這幾個分別是讀、寫、配置的權限
-
- 查看用戶權限
-
rabbitmqctl list_permissions
9 RabbitMQ的 web管理后臺
- 幾個步驟:
-
- Rabbitmq有一個web管理后臺,這個管理后臺是以插件的方式提供的,啟動后臺web管理功能,切換到sbin目錄下執行以下幾步:
-
-
-
- 查看rabbitmq 的插件列表,插件的最前面如果是一個空的[ ]代表沒有啟用
-
-
-
-
-
-
rabbitmq-plugins list
-
-
-
-
-
-
- 啟用
-
-
-
-
-
-
rabbitmq-plugins enable rabbitmq_management
-
-
-
-
-
-
- 禁用
-
-
-
-
-
-
rabbitmq-plugins disable rabbitmq_management
-
-
-
-
- 下一步就是防火墻操作
systemctl status firewalld
--檢查防火墻狀態
systemctl stop firewalld
--關閉防火墻,Linux重啟之后會失效
systemctl disable firewalld
--防火墻置為不可用,Linux重啟后,防火墻服務不自動啟動,依然是不可用
-
- 訪問
-
-
-
- http://192.168.190.132:15672/
-
-
用戶名/密碼為我們上面創建的admin/123456
注意上面改成你的虛擬主機的ip地址
備注:如果使用默認用戶guest、密碼guest登錄,會提示User can only log in via localhost
說明guest用戶只能從localhost本機登錄,所以不要使用該用戶。
9.1 通過web頁面新建虛擬主機
- 第一步:點擊那個admin
- 第二步:點擊那個User完后哪里就可以創建用戶
- 第三步:點擊那個Virtual Hosts就可以添加虛擬主機
10 .RabbitMQ工作模型
broker(服務器) 相當于mysql服務器,virtual host相當于數據庫(可以有多個數據庫)queue相當于表,消息相當于記錄。
消息隊列有三個核心要素: 消息生產者、消息隊列、消息消費者
- 生產者(Producer):發送消息的應用;(java程序,也可能是別的語言寫的程序)
- 消費者(Consumer):接收消息的應用;(java程序,也可能是別的語言寫的程序)
- 代理(Broker):就是消息服務器,RabbitMQ Server就是Message Broker;
- 信道(Channel):連接中的一個虛擬通道,消息隊列發送或者接收消息時,都是通過信道進行的;
- 虛擬主機(Virtual host):一個虛擬分組,在代碼中就是一個字符串,當多個不同的用戶使用同一個RabbitMQ服務時,可以劃分出多個Virtual host,每個用戶在自己的Virtual host創建exchange/queue等;(分類比較清晰、相互隔離)
- 交換機(Exchange):交換機負責從生產者接收消息,并根據交換機類型分發到對應的消息隊列中,起到一個路由的作用;
- 路由鍵(Routing Key):交換機根據路由鍵來決定消息分發到哪個隊列,路由鍵是消息的目的地址;
- 綁定(Binding):綁定是隊列和交換機的一個關聯連接(關聯關系);
- 隊列(Queue):存儲消息的緩存;
- 消息(Message):由生產者通過RabbitMQ發送給消費者的信息;(消息可以任何數據,字符串、user對象,json串等等)
11 .RabbitMQ交換機類型
Exchange(X)可翻譯成交換機/交換器/路由器
11.1 RabbitMQ交換器 (Exchange)類型
Fanout Exchange(扇形) |
Direct Exchange(直連) |
Topic Exchange(主題) |
Headers Exchange(頭部) |
11.2 Fanout Exchange(扇形交換機/器)
Fanout 扇形的,散開的;扇形交換機
投遞到所有綁定的隊列,不需要路由鍵,不需要進行路由鍵的匹配,相當于廣播、群發;
上面P代表product(生產者) X代表交換機/器 紅色的代表隊列
我們的生產者發消息只能發送到交換機上,當消息發送到交換機以后,只要隊列和交換機綁定了,那么就把消息分散的發送到這兩個隊列當中。(如果綁定10個隊列,那么就會分散的發送到這10個隊列當中)
11.3 例子【Fanout Exchange(扇形交換機/器)】
- 實現步驟:
-
- 第一步:添加依賴
<!-- rabbitMQ的依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
- 第二步: 添加配置信息
rabbitmq: host: 192.168.190.132 //地址port: 5672 //端口號username: admin //用戶名password: 123546 //密碼virtual-host: powernode //設置發消息發送到那個虛擬主機上
-
- 第三步: 創建一個Rabbit的配置類
-
-
- 在這個配置類中有一個三部曲
-
-
-
-
- 第一步:定義交換機
-
-
@Bean
//FanoutExchange 說明就是定義的一個扇形交換機
public FanoutExchange fanoutExchange(){//括號里面就是給交換機起個名字return new FanoutExchange("exchange.fanout");
}
-
-
-
- 第二步定義隊列(多個隊列)
-
-
@Beanpublic Queue queueA(){ //隊列Areturn new Queue("queue.famout.a");}@Beanpublic Queue queueB(){ //隊列B return new Queue("queue.famout.b");}
-
-
-
- 綁定交換機和隊列(扇形交換機不需要指定key)
-
-
-
-
-
-
- 通過binding這個方法,其中參數就是把交換機傳進來,以及把要綁定的隊列傳進來
- 調用BindingBuilder的.bind()完后把那個隊列傳進來。再使用to()括號里面是交換機的名字
-
-
-
@Beanpublic Binding bindingA(FanoutExchange fanoutExchange ,Queue queueA){return BindingBuilder.bind(queueA).to(fanoutExchange);}@Beanpublic Binding bindingB(FanoutExchange fanoutExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(fanoutExchange);}
-
- 第四步: 發送消息
-
-
- 第一步:注入
rabbitTemplate
- 第二步:創建一個方法來定義消息,完后再使用
Message()
這個類來把消息封裝了.
- 第一步:注入
-
//定義消息String msg ="hello world";//這里Message中需要的參數是byte ,我們需要getBytes,將String轉換為byte//需要使用這個將消息封裝一下Message message = new Message(msg.getBytes());
-
-
- 第三步:就是使用
rabbitTemplate
這個類中convertAndSend
方法來發送消息.其中括號中參數分別是:
- 第三步:就是使用
-
-
-
-
-
- 第一個寫的是我們在配置類中定義的額交換機的名字
- 第二個一般都需要路由key嗎,但是扇形交換機不需要路由key
- 第三個參數就是我們封裝的消息
-
-
-
rabbitTemplate.convertAndSend("exchange.fanout","",message);
扇形交換機發送消息的完整寫法
public class MessageService {//先注入rabbitTemplate// 這里直接可以使用rabbitTemplate是因為springBoot中有一個自動裝配的功能,我們把依賴導入@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//定義消息String msg ="hello world";//這里Message中需要的參數是byte ,我們需要getBytes,將String轉換為byte//需要使用這個將消息封裝一下Message message = new Message(msg.getBytes());//發送消息,括號里面第一個寫的是我們在配置類中定義的額交換機的名字//第二個一般都需要路由key嗎,但是扇形交換機不需要路由key//第三個參數就是我們封裝的消息rabbitTemplate.convertAndSend("exchange.fanout","",message);log.info("消息發送完畢,發送時間為:{}" ,new Date());}
}
-
- 第五步: 上面編寫完方法以后,我們最后一步,調用這個方法,來發送消息,在啟動上實現
ApplicationRunner
類(這個類表示程序一啟動就會執行里面的那個方法),完后并重寫里面的方法,完后我們把那個編寫消息的那個類注入進來,完后調用那個方法即可.
public class Application implements ApplicationRunner {@Autowiredprivate MessageService messageService;public static void main( String[] args ){SpringApplication.run(Application.class,args);}/*** 程序一啟動就會運行* @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
上述這樣就代表消息發送過去了
-
- 第六步 : 接收消息
-
-
- 一般情況都是一個模塊發送消息,另一個模塊接收消息
-
-
-
-
- 創建一個新的模塊,來接收消息
- 第一步:還是添加rabbitmq的依賴
- 第二步:在配置文件中添加rabbitmq的相關配置。
- 第三步:編寫一個接收消息的類,完后來接收消息
-
-
-
-
-
-
- 這個類編寫的步驟:
-
-
-
-
-
-
-
-
- 第一步:創建一個方法,在里面傳入Message的參數。
- 第二步:在這個類上標注解@RabbitListener(),其中括號里面寫的就是對隊列的名稱。
- 第三步:就是調用message.getBody()方法,來獲取消息。
- 第四步:就是將接收到消息的字節數組,轉為字符串
-
-
-
-
@Slf4j
@Component
public class ReceiveMessage {/*** 接收兩個隊列的消息* @RabbitListener() 就是接收哪些隊列的消息,就寫哪些隊列*/@RabbitListener(queues = {"queue.famout.a","queue.famout.b"})public void receiveMsg(Message message){byte[] body = message.getBody();//上面那個得到的是字節數據,下面轉為字符串String msg = new String(body);log.info("接收的消息為: {} " ,msg);}
}
11.4 Direct Exchange(直連交換機)
根據路由鍵精確匹配(一模一樣)進行路由消息隊列;
其中P代表的是生產者 X代表的是交換機 C代表的是消費者 紅色的代表隊列
那個error代表的就是路由key
那個info error warning 也是路由key
原理就是 :生產者發送消息到交換機時,也會指定一個key(發送key),交換機發送到隊列也需要一個key(路由key) ,當發送key和路由key相同時,才會將交換機中的消息發送到隊列中
【例子】:當發消息的時候指定的路由key是hello,那么到交換機以后,則消息不會進入任何一個隊列
當發消息的時候指定的路由key是error,則到交換機以后,則消息會同時發送到兩個隊列。
11.5 例子【直連交換機的例子】:
實現步驟:
-
- 第一步:添加依賴
- 第二步:在配置文件中配置rabbitmq的配置
- 第三步:創建一個rabbitmq的配置類
-
-
- 創建步驟:
-
-
-
-
- 還是那三部曲
-
-
-
-
-
-
- 第一步:定義交換機
-
-
-
@Beanpublic DirectExchange directExchange (){return ExchangeBuilder.directExchange(exchangeName).build();}
-
-
-
-
- 定義隊列
-
-
-
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
-
-
-
-
- 交換機與隊列綁定
-
-
-
@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB1(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("info");}@Beanpublic Binding bindingB2(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("error");}@Beanpublic Binding bindingB3(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("warning");}
完整的代碼:
/*** 從配置文件中讀取指定名字的屬性,我們在配置文件中定義exchangeName queueAName queueBName* 那幾個屬性的值,完后再這里讀取。*/
@ConfigurationProperties("my")
public class RabbitConfig {private String exchangeName;private String queueAName;private String queueBName;/*** 三部曲:* 第一步:定義隊列*/@Beanpublic DirectExchange directExchange (){return ExchangeBuilder.directExchange(exchangeName).build();}/*** 第二步:定義隊列*/@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}/*** 交換機與隊列綁定*/@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB1(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("info");}@Beanpublic Binding bindingB2(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("error");}@Beanpublic Binding bindingB3(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("warning");}
}
-
- 第四步: 創建發送消息的類
-
-
- 第一步: 創建步驟:首先注入
RabbitTemplate
- 第二步: 創建一個方法 ,并創建一個消息
- 第三步: 使用
RabbitTemplate
來發送消息,
- 第一步: 創建步驟:首先注入
-
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//創建一條消息.使用建造者模式Message message = MessageBuilder.withBody("hello world".getBytes()).build();//下面就是發送消息,括號里面分別是,參數1 交換機的名字, 參數2 路由key, 參數3 消息rabbitTemplate.convertAndSend("exchange.direct", "info",message);log.info("消息發送完畢");}
11.6 Topic Exchange(主題交換機)
通配符匹配,相當于模糊匹配;
#匹配多個單詞,用來表示任意數量(零個或多個)單詞
*匹配一個單詞(必須有一個,而且只有一個),用 . 隔開的為一個
【例子】:beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
beijing.* == beijing.queue, beijing.xyz
發送時指定的路由鍵:lazy.orange.rabbit 則與隊列Q1 和隊列Q2都匹配。
但是雖然與Q2隊列有兩個都匹配的,但是,也只會進去以條消息。
11.7 例子【主題交換機】
編寫步驟:
- 第一步:添加mq的依賴
- 第二步:在配置文件中添加mq的配置
- 第三步:編寫一個配置類
-
- 編寫配置類的步驟:
-
-
- 第一步:定義交換機
-
@Beanpublic TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(exchangeName).build();}
-
-
- 第二步:定義隊列
-
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
-
-
- 交換機與隊列綁定
-
@Beanpublic Binding bindingA(TopicExchange topicExchange,Queue queueA){return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");}@Beanpublic Binding bindingB1(TopicExchange topicExchange,Queue queueB){return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");}@Beanpublic Binding bindingB2(TopicExchange topicExchange,Queue queueB){return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");}
- 第四步:編寫發送消息
@Service
public class MessageService {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 發送消息*/@Beanpublic void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//有幾個參數第一個就是交換機的名字,第二個就是路由key,第三個就是消息amqpTemplate.convertAndSend("exchange.topic","lazy.orange.rabbit",message);}
- 第五步:讓啟動類實現
ApplicationRunner
并重寫里面的方法(方法的作用就是程序一啟動就會調用發消息的那個方法)
public class Application implements ApplicationRunner {@Autowiredprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
11.8 Headers Exchange(頭部交換機)
基于消息內容中的headers屬性進行匹配;
其中 P代表的消息的生產者 C代表消息的消費者 X代表的是交換機 紅色代表隊列
原理: 就是在發消息的時候,我們在消息的頭部在加上匹配的值,看與那個隊列匹配。
11.9 例子【頭部交換機】
實現步驟:
- 第一步:引入依賴
- 第二步:在配置文件中加入配置
- 第三步:創建一個配置類
-
- 配置類的編寫步驟
-
-
- 第一步:定義頭部交換機
-
@Beanpublic HeadersExchange headersExchange(){return ExchangeBuilder.headersExchange(exchangeName).build();}
-
-
- 第二步:創建隊列
-
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
-
-
- 第三步:交換機與隊列綁定,這個需要是頭部的信息綁定,所以我們需要創建一個隊列,存儲每個交換機與隊列的綁定。
-
@Beanpublic Binding bindingA(HeadersExchange headersExchange,Queue queueA){Map<String,Object> headerValues = new HashMap<>();headerValues.put("type","m");headerValues.put("status",1);return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();}@Beanpublic Binding bindingB(HeadersExchange headersExchange,Queue queueB){Map<String,Object> headerValues = new HashMap<>();headerValues.put("type","s");headerValues.put("status",0);return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();}
- 第四步:創建發送消息
public class MessageService {@Value("${my.exchangeName}")private String exchangeName;@Autowiredprivate RabbitTemplate rabbitTemplate;public void sandMsg(){//消息屬性MessageProperties messageProperties = new MessageProperties();Map<String ,Object> headers = new HashMap<>();headers.put("type","s");headers.put("status",0);//設置消息頭messageProperties.setHeaders(headers);//添加消息屬性Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();//頭部交換機,不需要只當路由keyrabbitTemplate.convertAndSend(exchangeName,"",message);}
}
12 RabbitMQ過期消息
過期消息也叫TTL消息,TTL:Time To Live
消息的過期時間有兩種設置方式:(過期消息)
12 .1 設置單條消息的過期時間
就是在MessageProperties()
中有一個setExpiration()
來設置單個消息的過期時間。
設置步驟:
- 前面的引入依賴和配置文件的編寫都一樣,以及配置類的編寫沒有任何區別,所以我們這里只編寫發送消息的步驟
- 發送消息的步驟:
-
- 第一步:先創建一個
MessageProperties()
,完后再調用setExpiration()
來設置消息的過期時間
- 第一步:先創建一個
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000");//設置過期的毫秒數
-
- 第二步:通過建造者模式中的
MessageBuilder
來創建消息
- 第二步:通過建造者模式中的
Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();
-
- 第三步:通過模板方法
rabbitTemplate.convertAndSend
來發送消息
- 第三步:通過模板方法
rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);
完整的代碼編寫
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000");//設置過期的毫秒數Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);log.info("消息發送完畢");}
}
12.2 通過隊列屬性設置消息過期時間
設置步驟:
- 第一步:我們還是編寫配置類,并在里面編寫三部曲,這里只不過在設置隊列的時候,來設置隊列的屬性
-
- 設置隊列屬性的步驟:
-
-
- 有兩種方式:
-
-
-
-
- 第一種:就是通過new的方式,就是new Queue的時候,里面有5個參數,其中第一個是
隊列名稱
,第一個就是是否持久化
,還有一個Map集合,其中就是在Map集合中來設置過期時間。 - 第二步:就是創建一個集合,完后使用Map.put()方式,來向集合中放入隊列的過期時間,其中隊列過期時間的屬性
x-message-ttl
- 第一種:就是通過new的方式,就是new Queue的時候,里面有5個參數,其中第一個是
-
-
@Beanpublic Queue queue(){//方式1,new的方式Map<String ,Object> arguments =new HashMap<>();arguments.put("x-message-ttl",15000);return new Queue(queueName,true,false,false,arguments);}
-
-
- 第二種就是:建造者模式: 這種模式,還是要提前創建一個Map集合,完后通過建造者模式,將集合傳入進去。
-
-
- 完整的代碼
//第二種:建造者模式
@Beanpublic Queue queue(){//方式1,new的方式Map<String ,Object> arguments =new HashMap<>();arguments.put("x-message-ttl",15000); return QueueBuilder.durable(queueName).withArguments(arguments).build();
- 第二步:發送消息的步驟:
-
- 因為這個設置了隊列的過期消息,所以,這里只需要,創建消息,完后再使用rabbit模板發送消息
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.ttl.b","info",message);log.info("消息發送完畢");}
}
【注意】:如果單個消息和隊列都設置了過期時間,那么消息的過期時間以二者之間較小的那個數值為準。
13 RabbitMQ的死信隊列
13.1如何消費者的手動確認
-
- 第一步:在消費者端的配置文件中加入配置
listener:simple:acknowledge-mode: manual
-
- 第二步:在接收消息的一段那段配置來通知其刪除
-
-
- 實現過程:
-
-
-
-
- 第一步:先編寫一個方法,參數是Message Channel 倆個參數
- 第二步:獲取消息的屬性
-
-
//第一步:獲取消息的屬性MessageProperties messageProperties = message.getMessageProperties();
-
-
-
- 第三步:根據消息的屬性來獲取消息的唯一標識(如同身份證號)
-
-
//獲取消息的唯一標識(如同身份證號)long tag = messageProperties.getDeliveryTag();
-
-
-
- 第四步:編寫一個try ....catch 代碼塊,try { }里面編寫的正常接收到消息,正常接收消息后,則告訴消費者就可以刪除了
channel.basicAck(tag,false);
- 第四步:編寫一個try ....catch 代碼塊,try { }里面編寫的正常接收到消息,正常接收消息后,則告訴消費者就可以刪除了
-
-
try {byte[] body = message.getBody();String str = new String(body);log.info("接收到的消息為"+ str);//正常接收后,告訴服務器可以刪除了。//消費者的手動確認 tag代表確認的就是這條消息//false代表只確認當前一條channel.basicAck(tag,false);
-
-
-
- 第五步:就是在catch里面編寫的,說明報錯了,告訴服務器不要刪除,我們需要重新接收一遍
channel.basicNack(tag,false,true);
- 第五步:就是在catch里面編寫的,說明報錯了,告訴服務器不要刪除,我們需要重新接收一遍
-
-
}catch (Exception e){//報錯了,我們沒有接收到消息,告訴服務器不要刪除,我們再重新接收一遍log.error("接收者出現問題");//告訴服務器不要刪除 ,第一個參數:當前消息的標識//第二個參數,只代表當前消息//第三個參數:重新放回到隊列里面try {channel.basicNack(tag,false,true);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}
- 完整的代碼編寫
package com.powernode.message;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class MessageReceive {@RabbitListener(queues = {"queue.normal.4"})public void receviceMsg(Message message , Channel channel){//第一步:獲取消息的屬性MessageProperties messageProperties = message.getMessageProperties();//獲取消息的唯一標識(如同身份證號)long tag = messageProperties.getDeliveryTag();try {byte[] body = message.getBody();String str = new String(body);log.info("接收到的消息為"+ str);//正常接收后,告訴服務器可以刪除了。//消費者的手動確認 tag代表確認的就是這條消息//false代表只確認當前一條channel.basicAck(tag,false);}catch (Exception e){//報錯了,我們沒有接收到消息,告訴服務器不要刪除,我們再重新接收一遍log.error("接收者出現問題");//告訴服務器不要刪除 ,第一個參數:當前消息的標識//第二個參數,只代表當前消息//第三個參數:重新放回到隊列里面try {channel.basicNack(tag,false,true);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}}
}
13.2 什么條件下會變成死信隊列?
-
- 1、消息過期
-
-
- 是通過定義消息屬性來設置消息的過期時間,在正常隊列哪里設置死信交換機
-
public void sendMsg(){//定義一個消息屬性MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000");//定義消息Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();//發消息的幾個參數rabbitTemplate.convertAndSend("exchange.normal.2","order",message);
-
- 2、隊列過期
-
-
- 是在定義正常隊列哪里設置隊列過期,以及設置死信交換機
-
@Beanpublic Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();
// //設置消息過期時間arguments.put("x-message-ttl" ,20000);//設置死信交換機,一旦消息過了20秒,而且還沒有消費者,則就會進入這個死信交換機。arguments.put("x-dead-letter-exchange",exchangeDlxName);//設置死信路由key,這個key就是死信交換機和死信隊列綁定的呢個keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
-
- 3、隊列達到最大長度,當達到隊列設置的最大長度時,如果此時還有消息進入隊列,那么最開始進入隊列的消息就會變成死信。
public Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();//設置隊列的最大長度arguments.put("x-max-length",5);//設置死信交換機,一旦消息過了20秒,而且還沒有消費者,則就會進入這個死信交換機。arguments.put("x-dead-letter-exchange",exchangeDlxName);//設置死信路由key,這個key就是死信交換機和死信隊列綁定的呢個keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
-
- 4、消費者從正常的隊列中接收消息,但是對消息不進行確認,并且不對消息進行重新投遞(不重新入隊),此時消息就進入死信隊列。(就是當我們開啟了手動確認以后,在發生異常之后,消費者手動不確認,并且不重新入隊)
channel.basicNack(tag,false,flase);
- 4、消費者從正常的隊列中接收消息,但是對消息不進行確認,并且不對消息進行重新投遞(不重新入隊),此時消息就進入死信隊列。(就是當我們開啟了手動確認以后,在發生異常之后,消費者手動不確認,并且不重新入隊)
catch (Exception e){//報錯了,我們沒有接收到消息,告訴服務器不要刪除,我們再重新接收一遍log.error("接收者出現問題");//告訴服務器不要刪除 ,//第一個參數:當前消息的標識//第二個參數,只代表當前消息//第三個參數:是否重新放回到隊列里面try {channel.basicNack(tag,false,flase);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);
-
- 5、消費者拒絕消息
開啟手動確認模式,并拒絕消息,不重新投遞(不重新入隊),則進入死信隊列
channel.basicReject(tag,false);
}catch (Exception e){log.error("接收者出現問題");try {//消費者拒絕消息,// 第一個參數:當前消息的唯一標識//第二個參數:是否重新入隊;//basicReject與basicNack的區別:basicReject只能處理一條消息channel.basicReject(tag,false);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}}
13.3 死信隊列的詳細解釋
其中死信隊列也叫做死信交換機、死信郵箱等說法。
DLX:Dead - Letter - Exchange 死信交換器、死信郵箱
過程:
第一步:生產者生產消息 ----> 消息發送到交換機 ----> 交換機根據路由key發送到隊列----->當沒有死信交換機之前 在規定的時間內消息過期無人接收,則消息丟了 -----> 創建一個死信交換機 -----> 當創建一個死信交換機之后,在規定的時間內無人接收的消息則進入死信交換機 。---->死信交換機根據路由key路由到另一條隊列(死信隊列)---->消費者也可以接收死信隊列的信息。
代碼的實現過程
- 第一步:引入依賴
- 第二步:在配置文件中配置
- 第三步:創建一個Rabbit的配置類(就是創建交換機等)
-
- 配置類中的第一步:就是引入配置類中的交換機和隊列的名稱
- 第二步:創建正常的交換機
@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}
-
- 第三步:創建正常的隊列,并在正常的隊列中,設置消息的過期時間,設置死信交換機,以及設置死信路由key (就是死信交換機與死信隊列綁定的那個key)
@Beanpublic Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();//設置消息過期時間arguments.put("x-message-ttl" ,20000);//設置死信交換機,一旦消息過了20秒,而且還沒有消費者,則就會進入這個死信交換機。arguments.put("x-dead-letter-exchange",exchangeDlxName);//設置死信路由key,這個key就是死信交換機和死信隊列綁定的呢個keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
-
- 第四步:正常的交換機與正常的隊列綁定
public Binding bindingNormal(DirectExchange normalExchange ,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}
-
- 第五步:聲明死信交換機(就和正常的交換機一樣)
@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}
-
- 第六步:聲明死信隊列
@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}
-
- 第七步:死信隊列與死信交換機綁定
@Beanpublic Binding bindingDlx(DirectExchange dlxExchange ,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}
-
- 第八步:創建消息,并發送消息
@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//定義消息Message message = MessageBuilder.withBody("hello world".getBytes()).build();//發消息的幾個參數rabbitTemplate.convertAndSend("exchange.normal.a","order",message);log.info("消息發送完畢");}
14 RabbitMQ的延遲隊列
使用場景就是關閉規定時間內未支付的訂單
場景:有一個訂單,15分鐘內如果不支付,就把該訂單設置為交易關閉,那么就不能支付了,這類實現延遲任務的場景就可以采用延遲隊列來實現,當然除了延遲隊列來實現,也可以有一些其他辦法實現;
14.1 定時任務方式
- 每隔3秒掃描一次數據庫,查詢過期的訂單然后進行處理;
優點:
簡單,容易實現;
缺點:
1、存在延遲(延遲時間不準確),如果你每隔1分鐘掃一次,那么就有可能延遲1分鐘;
2、性能較差,每次掃描數據庫,如果訂單量很大
14.2、 被動取消
當用戶查詢訂單的時候,判斷訂單是否超時,超時了就取消(交易關閉)
優點:
對服務器而言,壓力小;
缺點:
1、用戶不查詢訂單,將永遠處于待支付狀態,會對數據統計等功能造成影響;
2、用戶打開訂單頁面,有可能比較慢,因為要處理大量訂單,用戶體驗少稍差;
14.3、JDK延遲隊列(單體應用,不能分布式下)
DelayedQueue
無界阻塞隊列,該隊列只有在延遲期滿的時候才能從中獲取元素
優點:
實現簡單,任務延遲低;
缺點:
服務重啟、宕機,數據丟失;
只適合單機版,不適合集群;
訂單量大,可能內存不足而發生異常; oom
14.4、 采用消息中間件(rabbitmq)
1、RabbitMQ本身不支持延遲隊列,可以使用TTL(消息過期)結合DLX(死信隊列)的方式來實現消息的延遲投遞,即把DLX跟某個隊列綁定,到了指定時間,消息過期后,就會從DLX路由到這個隊列,消費者可以從這個隊列取走消息。
上述 :order.ttl.queue代表的是正常隊列
order.dlx.queue 代表的是死信隊列
其中交換機只有一個,當生產者根據key找到交換機以后,交換機根據key找到正常隊列,過期以后,正常隊列指定的死信交換機還是那個交換機(即那條消息就會返回給那個交換機),完后交換機根據死信路由key,發送到死信隊列中
存在問題 :如何解決消息過期時間不一致的問題?
如果先發送的消息,消息延遲時間長,會影響后面的 延遲時間段的消息的消費;
就是因為隊列是先進先出,因為第一條消息如果設置的時間大于第二個消息設置的時間,那么只有等第一個消息過期以后,才能輪到第二個消息,那么這樣的話,第二個消息就和設置的過期時間不一致。
解決方式:
不同延遲時間的消息要發到不同的隊列上,同一個隊列的消息,它的延遲時間應該一樣(即相同的過期時間放到相同的隊列里面)。
代碼實現
- 第一步:引入依賴
- 第二步:引入配置文件的信息
- 第三步:編寫交換機以及隊列,并綁定,在隊列哪里指定死信交換機的名稱。
- 第四步:編寫發送消息的類,并創建幾條消息,并指定消息的過期時間。分別發送到不同時間的交換機上。
- 第五步:接收消息(接收的死信隊列的消息)。
14 .5 、 使用rabbitmq-delayed-message-exchange 延遲插件
- 第一步:選擇對應的版本下載 rabbitmq-delayed-message-exchange 插件,下載地址:http://www.rabbitmq.com/community-plugins.html
- 第二步:下載完成以后,解壓這個插件
unzip rabbitmq_delayed_message_exchange-3.10.2.ez
如果unzip 沒有安裝,先安裝一下
yum install unzip -y
- 第三步:先找sbin 目錄, 找到這個 plugins目錄,啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
開啟插件;
原理:
安裝這個插件以后,就會有一個新的交換機: 延遲交換機類似于直連交換機
消息發送后不會直接投遞到隊列,而是存儲到 Mnesia(嵌入式數據庫),檢查 x-delay 時間(消息頭部);
延遲插件在 RabbitMQ 3.5.7 及以上的版本才支持,依賴 Erlang/OPT 18.0 及以上運行環境;
Mnesia 是一個小型數據庫,不適合于大量延遲消息的實現
解決了消息過期時間不一致出現的問題。
代碼實現過程:
- 第一步:引入mq依賴
- 第二步:在配置文件中引入配置信息
- 第三步:創建一個配置類,來創建延遲交換機和隊列,以及綁定(通過自定義交換機來創建)
@Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueDelayName}")private String queueDelayName;/*** 創建交換機* 使用自定交換機來創建延遲交換機* @return*/@Beanpublic CustomExchange customExchange(){//自定義交換機Map<String, Object> arguments =new HashMap<>();arguments.put("x-delayed-type","direct");return new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);}/*** 創建隊列*/@Beanpublic Queue queue(){return QueueBuilder.durable(queueDelayName).build();}/*** 交換機與隊列綁定*/@Beanpublic Binding binding(CustomExchange customExchange ,Queue queue){return BindingBuilder.bind(queue).to(customExchange).with("plugin").noargs();}
- 第四步 :發送消息
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送消息*/@Beanpublic void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 25000); //第一條消息,設置延遲時間Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);log.info("發送消息完畢");}{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 15000); //第二天消息,設置延遲時間Message message = MessageBuilder.withBody("hello world111".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);log.info("發送消息完畢");}}
- 第五步 :接收消息
@Component
@Slf4j
public class ReceiveMessage {/*** 接收延遲隊列的消息,使用的是插件* @param message*/@RabbitListener(queues = {"queue.delay.4"})public void receiveMsg(Message message){String body = new String(message.getBody());log.info("接收到消息的為: "+body);}
15 消息的可靠性投遞
消息的可靠性投遞就是要保證消息投遞過程中每一個環節都要成功,那么這肯定會犧牲一些性能,性能與可靠性是無法兼得的;
如果業務實時一致性要求不是特別高的場景,可以犧牲一些可靠性來換取性能。
① 代表消息從生產者發送到Exchange; ----使用RabbitMQ消息Confirm模式(生產者確認回調)
來解決
② 代表消息從Exchange路由到Queue---使用RabbitMQ消息Return模式 備用交換機模式
來解決
③ 代表消息在Queue中存儲;------- 通過設置交換機的持久化,以及隊列的持久化、每條消息的持久化
④ 代表消費者監聽Queue并消費消息;----自動確認改成手動確認
15 .1 RabbitMQ消息Confirm模式(生產者確認回調)----可能因為網絡或者Broker的問題導致①失敗,而此時應該讓生產者知道消息是否正確發送到了Broker的exchange中;
消息的confirm確認機制,是指生產者投遞消息后,到達了消息服務器Broker里面的exchange交換機,則會給生產者一個應答,生產者接收到應
答,用來確定這條消息是否正常的發送到Broker的exchange中,這也是消息可靠性投遞的重要保障;
代碼的具體實現過程(如何實現生產者確認回調)
- 第一步: 引入依賴
- 第二步:在配置文件中配置信息
publisher-confirm-type: correlated
開啟確認模式
rabbitmq:host: 192.168.190.132port: 5672username: adminpassword: 123456virtual-host: powernode #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shangpublisher-confirm-type: correlated #開啟生產者的確認模式,設置成關聯模式
- 第三步:編寫一個類實現
RabbitTemplate.ConfirmCallback
類,并實現里面的方法,在方法里面有三個參數,第一個參數是關聯數據
第二個參數是判斷消息是否到達交換機
,第三個是原因
,這個就是不管消息有沒有到達交換機,都會回調這個類,給與我們提示。【這個類也可以直接在發消息的時候直接實現】
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {/**** @param correlationData correlation data for the callback. 關聯數據* @param ack true for ack, false for nack 真 到達交換機 或者假 沒有到達交換機* @param cause An optional cause, for nack, when available, otherwise null. 原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(correlationData.getId()); //獲得idif (ack){//說明消息到達交換機log.info("消息正確到達交換機");return;}//說明消息沒有到達交換機log.error("消息沒有到達交換機,原因為{}" ,cause);}
}
@Service
@Slf4j
public class MessageService implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct //構造方法后執行,如同初始化public void init(){rabbitTemplate.setConfirmCallback(this);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//定義一個關聯數據CorrelationData correlationData =new CorrelationData();correlationData.setId("order_123456"); //發送訂單信息rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(correlationData.getId()); //獲得idif (ack){//說明消息到達交換機log.info("消息正確到達交換機");return;}//說明消息沒有到達交換機log.error("消息沒有到達交換機,原因為{}" ,cause);}
}
- 第四步:在我們發消息的那個類中定義這個關聯數據,并調用,完后在我們發送消息到交換機以后,不管有沒有到達交換機,都會給我們一個提示
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyConfirmCallBack myConfirmCallBack;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct //構造方法后執行,如同初始化public void init(){rabbitTemplate.setConfirmCallback(myConfirmCallBack);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//定義一個關聯數據CorrelationData correlationData =new CorrelationData();correlationData.setId("order_123456"); //發送訂單信息rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);}
}
15.2 RabbitMQ消息Return模式(保證交換機的消息到達隊列)-----可能因為路由關鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤導致②失敗。
rabbitmq 整個消息投遞的路徑為:
producer —> exchange —> queue —> consumer
>> 消息從 producer 到 exchange 則會返回一個 confirmCallback
>> 消息從 exchange –> queue 投遞失敗則會返回一個 returnCallback;
我們可以利用這兩個callback控制消息的可靠性投遞
代碼的實現過程
- 第一步:引入依賴
- 第二步:在配置文件配置相關信息,開啟return模式
rabbitmq:host: 192.168.190.132port: 5672username: adminpassword: 123456virtual-host: powernode #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shangpublisher-returns: true #開啟return模式
- 第三步:編寫一個類來實現
RabbitTemplate.ReturnsCallback
并實現里面的方法,(如果這個類被調用,說明沒有正確的路由到隊列)
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returned) {//如果這個方法被調用,說明消息沒有到達隊列log.error("消息從交換機沒有正確的路由到隊列,原因為{}",returned.getMessage());}
}
- 第四步:在發送消息的類中,來設置,使
rabbitTemplate
調用我們編寫的那個類
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyReturnCallBack myReturnCallBack;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct //構造方法后執行,如同初始化public void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack);}
- 第五步 : 編寫發送消息
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyReturnCallBack myReturnCallBack;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct //構造方法后執行,如同初始化public void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);}
}
15.3 確保消息在隊列正確地存儲
可能因為系統宕機、重啟、關閉等等情況導致存儲在隊列的消息丟失,即③出現問題;
(1)、隊列持久化
QueueBuilder.durable(QUEUE).build();
(2)、交換機持久化
ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
(3)、消息持久化
MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
15.4 確保消息從隊列正確地投遞到消費者
如果消費者收到消息后未來得及處理即發生異常,或者處理過程中發生異常,會導致④失敗。
為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement);
實現步驟:
- #開啟手動ack消息消費確認 ,在配置文件中配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- #編寫配置類(配置交換機、隊列、以及綁定)
- 編寫一個發送消息的類
- 編寫一個接收消息的類 ,開啟手動消息確認后,我們接收完消息后,并不會刪除
@Component
@Slf4j
public class ReceiveMessage {/*** 接收延遲隊列的消息,使用的是插件* @param message*/@RabbitListener(queues = {"queue.delay.4"})public void receiveMsg(Message message , Channel channel){//獲取消息的唯一標識long tag = message.getMessageProperties().getDeliveryTag();String body = null;try {body = new String(message.getBody());log.info("接收到消息的為: "+body);//TODO 正確的話,我們會有插入數據庫等//正確的話,就是告訴服務器,可以刪除了//第一個參數是,當前消息,// 第二個參數:是否批量處理channel.basicAck(tag,false);} catch (Exception e) {log.error("消息處理出現問題");try {//三個參數分別是:當前消息//是否是批量處理//是否重新入隊channel.basicNack(tag,false,true);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}
16 交換機的屬性
- 具體參數(以下屬性是在創建交換機的時候設置的)
-
- Name:交換機名稱;就是一個字符串
- Type:交換機類型,direct, topic, fanout, headers四種
- Durability:持久化,聲明交換機是否持久化,代表交換機在服務器重啟后是否還存在;默認是持久化的, yes為持久化 ,flase為不持久化(沒有保存到磁盤上).
- Auto delete:是否自動刪除,曾經有隊列綁定到該交換機,后來解綁了,那就會自動刪除該交換機;
- Internal:內部使用的,如果是yes,客戶端無法直接發消息到此交換機,它只能用于交換機與交換機的綁定。
- Arguments:只有一個取值alternate-exchange,表示備用交換機;當隊列與交換機因為路由無法發送消息,則會發送到備用交換機上.
- 例子
@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).durable(true).autoDelete() //設置自動刪除,默認不是自動刪除的,boolean類型的默認為flase.build();}
16.1 備用交換機
正常的情況下,備用隊列是不應該有消息的,當備用隊列有消息時,說明出錯了.提示我們及時更改代碼.
主交換機是direct(直連的) 備用交換機是fanout(扇形的)
描述:就是我們生產者與交換機的key是hello ,完后交換機與隊列的key是info,完后就不能進入隊列里面,則就會進入備用交換機里面,從而進入備用隊列。
當路由沒有寫錯,則就會進入指定的交換機,不會進入備用交換機。
- 備用交換機的設置:
- 第一種:使用建造者模式的時候
.alternate()
public DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).alternate() //設置備用交換機.build();
17 隊列的詳細屬性
隊列屬性的設置都是在創建隊列的時候,來設置隊列的屬性。
- Type:隊列類型
- Name:隊列名稱,就是一個字符串,隨便一個字符串就可以;
- Durability:聲明隊列是否持久化,代表隊列在服務器重啟后是否還存在;
- Auto delete: 是否自動刪除,如果為true,當沒有消費者連接到這個隊列的時候,隊列會自動刪除;
- Exclusive:exclusive屬性的隊列只對首次聲明它的連接可見,并且在連接斷開時自動刪除;基本上不設置它,設置成false
- Arguments:隊列的其他屬性,例如指定DLX(死信交換機等)
-
- x-expires:Number
當Queue(隊列)在指定的時間未被訪問,則隊列將被自動刪除;
-
- x-message-ttl:Number
發布的消息在隊列中存在多長時間后被取消(單位毫秒);
-
- x-overflow:String
設置隊列溢出行為,當達到隊列的最大長度時,消息會發生什么,
有效值為Drop Head 刪除頭部 默認的
Reject Publish 拒絕發布 ,一旦隊列滿了以后,就不再接收新的消息
-
- x-max-length:Number
隊列所能容下消息的最大長度,當超出長度后,新消息將會覆蓋最前面的消息,類似于Redis的LRU算法;
-
- x-single-active-consumer:默認為false
激活單一的消費者,也就是該隊列只能有一個消息者消費消息;
-
- x-max-length-bytes:Number
限定一定的字節裝入隊列,當超出后就不在裝入隊列;
-
- x-dead-letter-exchange:String
指定隊列關聯的死信交換機,有時候我們希望當隊列的消息達到上限后溢出的消息不會被刪除掉,而是走到另一個隊列中保存起來;
-
- x-dead-letter-routing-key:String
指定死信交換機的路由鍵,一般和6一起定義;
-
- x-max-priority:Number
如果將一個隊列加上優先級參數,那么該隊列為優先級隊列; 數字越大優先級越高,默認優先級為0
(1)、給隊列加上優先級參數使其成為優先級隊列
x-max-priority=10【0-255取值范圍】
(2)、給消息加上優先級屬性
通過優先級特性,將一個隊列實現插隊消費;
實現方式:在創建消息的時候,通過MessageProperties
中的setPriority()來設置優先級。
public void sendMsg(){MessageProperties messageProperties =new MessageProperties();messageProperties.setPriority(6);Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);}
18 消息的冪等性
消息消費時的冪等性(消息不被重復消費)
同一個消息,第一次接收,正常處理業務,如果該消息第二次再接收,那就不能再處理業務,否則就處理重復了;
冪等性是:對于一個資源,不管你請求一次還是請求多次,對該資源本身造成的影響應該是相同的,不能因為重復的請求而對該資源重復造成影響;
其中select 、update、delete 都是冪等的。insert是非冪等的。
在http中:get、put、delete都是冪等的。post是非冪等的。
18.1 如何避免消息的重復消費問題?
全局唯一ID + Redis
生產者在發送消息時,為每條消息設置一個全局唯一的messageId,消費者拿到消息后,將其放入redis中,并設置這個key,完后再對設置的結果進行判斷,如果為true,說明這條消息不存在就可以存入數據庫了 ,當一條相同的id的消息再次過來,則返回結果為flase,則不處理。
private ObjectMapper objectMapper
用于序列化和反序列化的 (json格式的)
objectMapper.writeValueAsString(傳入的對象)
將對象轉為字符串
objectMapper.readValue(消息,要轉成那個對象)
將字符串或字節數組,轉為對象
序列化----就是將對象轉為字符串,或者字節數組
反序列化---就是將字符串或者字節數組轉為對象
具體的代碼實現過程:
消費者拿到消息以后,放入redis中,并設置這個key,完后再對設置的結果進行判斷,如果為true,說明設置成功了(下一步就可以插入數據庫了)
//放入redis中,并設置id
Boolean setRuslt = RedisTemplate.opsForValue().setIfAbsent(要設置的id)
//如果id不存在,說明沒有這條消息,就可以放入數據庫了
if(setRuslt){//就可以放入數據庫了
}
//下一條消息再過來,就返回的flase,完后就不處理了。
19 RabbitMQ集群與高可用
RabbitMQ 的集群分兩種模式,一種是默認集群模式,一種是鏡像集群模式;
在RabbitMQ集群中所有的節點(一個節點就是一個RabbitMQ的broker服務器)被歸為兩類:一類是磁盤節點,一類是內存節點;
磁盤節點會把集群的所有信息(比如交換機、綁定、隊列等信息)持久化到磁盤中,而內存節點只會將這些信息保存到內存中,如果該節點宕機或重啟,內存節點的數據會全部丟失,而磁盤節點的數據不會丟失;
19 .1 默認集群模式
默認集群模式也叫 普通集群模式、或者 內置集群模式;
RabbitMQ默認集群模式,只會把交換機、隊列、虛擬主機等元數據信息在各個節點同步,而具體隊列中的消息內容不會在各個節點中同步;
接發消息的原理:
創建隊列的時候,只要在任何一個節點創建,都會復制到其他的兩個節點上。
接收消息的時候:
但是發消息的時候只會發送到節點1上,接消息的時候也是從節點1接收消息
發送消息的時候:
假如發消息的時候,我們連接到節點2,節點2上本身是不存儲消息的,它會把請求轉到節點1上,從而發送到節點1上。節點1停了,還是不能發消息的
假如接消息的時候是從節點3上接收,節點3上沒有消息,它會把請求轉到節點1上,從而接收到消息
元數據(除了消息本身其他的都是元數據)
隊列元數據:隊列名稱和屬性(是否可持久化,是否自動刪除)
交換器元數據:交換器名稱、類型和屬性
綁定元數據:交換器和隊列的綁定列表
(虛擬主機)vhost元數據:vhost內的相關屬性,如安全屬性等;
默認集群模式式
優點:
1)節省存儲空間;
2)性能提高;
如果消息需要復制到集群中每個節點,網絡開銷不可避免,持久化消息還需要寫磁盤,占用磁盤空間。
缺點:
當其中一個隊列宕機以后,則內容就會全部消失。
19.2 集群的搭建
- 第一步:從已經安裝好rabbitmq的機器 clone 三臺機器,【注意clone完,先不要啟動三臺機器,三臺機器均要重新生成mac地址,防止clone出的機器ip地址重復】
-
- 虛擬機的克隆步驟:
-
-
- 將虛擬機關機
- 點擊管理---->點擊克隆---->選中虛擬機當前的狀態--->選中創建完整的虛擬機-->填寫克隆的虛擬機的名稱---> 點擊完成--->在啟動前改變其物理ip--->啟動前點擊網絡適配器---->高級---->召見MAC地址--->點擊幾遍生成 ------->克隆完成
-
- 第二步:使用xshell 連接三臺機器
- 第三步:修改三臺機器的/etc/hosts 文件,
vim /etc/hosts
192.168.131.128 rabbit130
192.168.131.129 rabbit133
192.168.131.130 rabbit134
- 第三步:三臺機器均重啟網絡,使節點名生效(可以直接關機重新啟動)
systemctl restart network
- 第四步:三臺機器的防火墻處理
systemctl status firewalld ---查看防火墻的狀態
systemctl stop firewalld --關閉防火墻
systemctl disable firewalld --開機不啟動防火墻
- 第五步:三臺機器 .erlang.cookie文件保持一致 ,由于是clone出的三臺機器,所以肯定是一樣的
如果我們使用解壓縮方式安裝的RabbitMQ,那么該文件會在${用戶名}目錄下,
也就是${用戶名}/.erlang.cookie;
如果我們使用rpm安裝包方式進行安裝,那么這個文件會在/var/lib/rabbitmq目錄下;
查看隱藏文件用的ls-a
注意 .erlang.cookie的權限為400,目前已經是400
- 第六步: 分別啟動
rabbitmq-server -detached
- 第七步: 使用以下命令查看集群狀態
Disk Nodes 表示磁盤節點
rabbitmqctl cluster_status
- 第八步:以上的三個機器還是互不相干三個機器,并不是集群,構建集群
-
- 1、先把其他的兩個rabbitmq停掉
rabbitmqctl stop_app 僅僅是停掉rabbitmq
-
- 2、把當前rabbit中的交換機和隊列全部重置
rabbitmqctl reset
-
- 3、將當前rabbit加入到第一個rabbit中,并成為內存節點
--ram 參數表示讓rabbitmq128成為一個內存節點,如果不帶參數默認為disk磁盤節點;
rabbit@rabbit128 代表的主rabbitmq
rabbitmqctl join_cluster rabbit@rabbit128 --ram rabbit@rabbit128主rabbitmq
-
- 4、啟動rabbitmq
rabbitmqctl start_app
-
- 5、查看主rabbit的狀態
rabbitmqctl cluster_status
19.3 操作集群中的一個節點,添加用戶和權限等
#列出用戶
rabbitmqctl list_users
# 添加用戶
rabbitmqctl add_user admin 123456
#查看權限
rabbitmqctl list_permissions
#設置權限
rabbitmqctl set_permissions admin ".*" ".*" ".*"
#設置角色
rabbitmqctl set_user_tags admin administrator【注意】:這個啟動插件需要給集群中的每一個rabbit都單獨啟動
#列出插件
rabbitmq-plugins list
#啟動web控制臺插件
rabbitmq-plugins enable rabbitmq_management
使用web瀏覽器添加一個虛擬主機:powernode
實現原理
RabbitMQ底層是通過Erlang架構來實現的,所以rabbitmqctl會啟動Erlang節點,并基于Erlang節點來使用Erlang系統連接RabbitMQ節點,在連接過程中需要正確的Erlang Cookie和節點名稱,Erlang節點通過交換Erlang Cookie以獲得認證;
19.4 springboot連接集群
只需要改變配置文件即可 ,其他接發消息是一樣的。
spring:#配置rabbitmqrabbitmq:# 連接集群,使用逗號分隔addresses: 192.168.150.150:5672,192.168.150.151:5672,192.168.150.152:5672username: adminpassword: 123456virtual-host: powernode
20 鏡像集群模式
鏡像模式是基于默認集群模式加上一定的配置得來的;
在默認模式下的RabbitMQ集群,它會把所有節點的交換機、綁定、隊列的元數據進行復制確保所有節點都有一份相同的元數據信息,但是隊列數據分為兩種:一種是隊列的元數據信息(比如隊列的最大容量,隊列的名稱等配置信息),另一種是隊列里面的消息;
鏡像模式,則是把所有的隊列數據完全同步,包括元數據信息和消息數據信息,當然這對性能肯定會有一定影響,當對數據可靠性要求較高時,可以使用鏡像模式;
20.1鏡像模式的搭建
實現鏡像模式也非常簡單,它是在普通集群模式基礎之上搭建而成的;
鏡像隊列配置命令:
./rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
- -p Vhost(虛擬主機): 可選參數,針對指定vhost下的queue進行設置;
- Name: policy的名稱;(可以自己取個名字就可以)
- Pattern: queue的匹配模式(正則表達式); ----就是說想對哪些主機進行鏡像
- Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode;---ha代表高可用
- 【例子】:
{“ha-mode”:”exactly”,”ha-params”:2}
-
- ha-mode:指明鏡像隊列的模式,有效值為
all/exactly/nodes
- ha-mode:指明鏡像隊列的模式,有效值為
-
-
- all:表示在集群中所有的節點上進行鏡像
- exactly:表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定
- nodes:表示在指定的節點上進行鏡像,節點名稱通過ha-params指
-
-
- ha-params:ha-mode模式需要用到的參數
- ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic(自動)和manual(手動)
- priority:可選參數,policy的優先級;
- 【例子】:"^policy_表示以policy開頭的隊列
./rabbitmqctl set_policy -p powernode ha_policy "^policy_"
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
- 【例子2】:如果要在所有節點所有隊列上進行鏡像,則(在任意節點執行如下命令):
所有節點、所有虛擬主機、所有隊列 都進行鏡像
./rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
- 【例子3】:針對某個虛擬主機進行鏡像
rabbitmqctl set_policy -p powernode ha-all "^"
'{"ha-mode":"all","ha-sync-mode":"automatic"}'