RabbitMQ消息隊列的筆記

Rabbit與Java相結合

  1. 引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在配置文件中編寫關于rabbitmq的配置
rabbitmq:host: 192.168.190.132   //這個那個Linxu的端口號port: 5672		//端口號 ,固定的username: admin			//這個是那個用戶名password: 123546			//密碼virtual-host: powernode      //配置我們發消息發到那個虛擬主機上
  1. 編寫一個配置類

1裝RabbitMQ之前對Linux的更改

1.1 按照上傳下載工具

yum install lrzsz -y

1.2 查看主機名字

more /etc/hostname

1.3 修改主機名

hostnamectl set-hostname 要修改主機名

4

2 鏈式編程

  1. 第一步:就是set方法中的返回值全部改為對象本身
public Student setAdderess(String adderess) {
this.adderess = adderess;
return this;
}
  1. 第二步:就可以使用鏈式編程了,因為當我們執行這個student.setId(20)的時候,我們在set方法中設置的返回值是一個Student對象。所以我們還可以繼續.setName()
public class Chainlearn {public static void main(String[] args) {Student student = new Student();student.setId(20).setName("lisi").setAge("20").setAdderess("shanxi0");System.out.println(student);}
  1. 使用lombok生成鏈式編程 @Accessors(chain = true) //生成鏈式編程
    1. 第一步就是添加依賴lomback依賴
 <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version></dependency>
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)  //生成鏈式編程
public class Teacher implements Serializable {private Integer id;private String name;private String age;private String adderess;}

3 建造者模式

建造者模式就是一步一步的建造一個復雜的是對象 。它一般是將一個復雜的對象分解為多個簡單的對象,然后一步一步構造而成

3.1 使用lombok就可以給我們生曾建造者模式

就是使用@Bilder 生成建造者模式代碼

建造者模式就是用來創建對象的。

實現過程

  1. 在使用lombok注解的情況下,我們在類上定義一個注解@Builder //生成建造者模式
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)  //生成鏈式編程
@Builder  //生成建造者模式
public class Teacher implements Serializable {private Integer id;private String name;private String age;private String adderess;}
  1. 創建對象并賦值就是使用對象.budiler.屬性名
//        用建造者創建對象Teacher t = Teacher.builder().id(20).name("wangwu").age("10").adderess("真的").build();}

4 系統的啟動任務

就是SpringBoot工程啟動時,就會執行的任務

實現步驟:

就是在springBoot的啟動類上實現ApplicationRunner接口,并實現里面的方法,完后在方法中就可以輸出東西。(我們應該用@Slf4j)來輸出東西,不要使用System.out.println("你好");

@SpringBootApplication
@Slf4j
public class Application implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(Application.class,args);}@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("系統已啟動,我就會運行");System.out.println("你好");}
}

5 RabbitMQ的定義

其中MQ的全稱:message queue 消息隊列

5.1 消息中間件

簡單來說,消息中間件就是指保存數據的一個容器(服務器),可以用于兩個系統之間的數據傳遞

消息中間件一般有三個主要角色:生產者(producer),消費者(consumer),消息代理(消息隊列、消息服務器)

生產者發送消息到消息服務器,然后消費者從消息代理(消息隊列)中獲取數據并進行處理。

6 使用場景

6.1 異步處理
  1. 不使用MQ時,
    1. 下訂單:1、執行下訂單業務---->2、再處理加積分業務----->3、發紅包業務---->4、發送手機短信業務
  1. 使用MQ時,加快執行速度
    1. 只執行下訂單的業務 ,完后直接返回,完后向MQ發送消息---->積分系統處理積分業務后加積分,紅包系統處理發紅包業務后發紅包、手機短信系統執行發送短信的業務后發短信
6.2 系統解耦
  1. 未使用MQ時
    1. 是2A系統調用B系統,再調用C系統......
  1. 使用MQ時
    1. A系統往MQ系統發送一條消息,B系統接受到消息處理,C系統接收到消息處理,其中不同的系統可以使用不同的語言來寫。

6.3 流量削峰

就是雙十一時,大量訂單,發送到系統A,則此時系統A可以把消息發送到MQ中,MQ再勻速的發送給系統B,完后由系統B去操作數據庫。

7 RabbitMQ運行環境搭建

7.1 RabbitMQ是由Erlang語言開發的 ,所以需要先下載安裝Erlang
  1. 下載Erlang
  2. 安裝erlang前先安裝Linux依賴庫
    1. yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
  1. 解壓erlang壓縮包文件
    1. tar -zxvf otp_src_25.1.1.tar.gz
  1. 進入otp_src_25.1.1 目錄 完后配置
    1. cd otp_src_25.1.1

./configure

  1. 編譯
    1. make
  1. 安裝,就是把這個放在path目錄下,這樣在任何的目錄就都可以使用了。
    1. make install
  1. 安裝好了erlang后可以將解壓的文件夾刪除:
    1. rm -rf otp_src_25.1.1
  1. 驗證erlang是否安裝成功
    1. 在命令行輸入:erl如果進入了編程命令行則表示安裝成功,然后按ctrl + z 退出編程命令行;
7.2 安裝RabbitMQ以及啟動和停止
  1. 解壓RabbitMQ的壓縮包,即安裝完成,無需再編譯
    1. tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz -C /usr/local/

說明 -C 選項是指定解壓目錄,如果不指定會解壓到當前目錄

此時rabbitmq就安裝好了;

  1. 啟動RabbitMQ
    1. 首先切換到我們的安裝目錄下/usr/local ----> cd rabbitmq_server-3.10.11/

--->完后再切換到那個sbin/目錄下

    1. 就執行命令啟動 ./rabbitmq-server -detached
    2. 當配置完環境變量以后,我們就可以在任何目錄下啟動 rabbitmq-server -detached

說明:

-detached 將表示在后臺啟動運行rabbitmq;不加該參數表示前臺啟動;

rabbitmq的運行日志存放在安裝目錄的var目錄下;

現在的目錄是:/usr/local/rabbitmq_server-3.10.11/var/log/rabbitmq

  1. 停止RabbitMQ
    1. 切換到sbin目錄下執行: 執行./rabbitmqctl shutdown

7.3 查看MQ的狀態
  1. 切換到sbin目錄下執行:執行

說明:-n rabbit 是指定節點名稱為rabbit,目前只有一個節點,節點名默認為rabbit

此處-n rabbit 也可以省略

7.4 配置path環境變量
  1. 編輯這個文件 vi /etc/profile
  2. 在這個文件中寫入以下語句
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbinexport RABBIT_HOME PATH
  1. 刷新環境變量 執行 source /etc/profile
    ?

8 RabbitMQ的管理命令

8.1 用戶管理

用戶管理包括增加用戶刪除用戶查看用戶列表修改用戶密碼

這些操作都是通過rabbitmqctl管理命令來實現完成。

  1. 查看當前用戶列表
    1. rabbitmqctl list_users
  1. 新增一個用戶
    1. 語法:rabbitmqctl add_user Username Password
  1. 設置用戶角色
    1. rabbitmqctl set_user_tags 用戶名 角色名
  1. 設置用戶權限
    1. rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
      1. ".*" ".*" ".*" 這幾個分別是讀、寫、配置的權限
  1. 查看用戶權限
    1. rabbitmqctl list_permissions

9 RabbitMQ的 web管理后臺

  • 幾個步驟:
    • Rabbitmq有一個web管理后臺,這個管理后臺是以插件的方式提供的,啟動后臺web管理功能,切換到sbin目錄下執行以下幾步:
        1. 查看rabbitmq 的插件列表,插件的最前面如果是一個空的[ ]代表沒有啟用
          1. rabbitmq-plugins list
        1. 啟用
          1. rabbitmq-plugins enable rabbitmq_management
        1. 禁用
          1. rabbitmq-plugins disable rabbitmq_management
    • 下一步就是防火墻操作

systemctl status firewalld --檢查防火墻狀態

systemctl stop firewalld --關閉防火墻,Linux重啟之后會失效

systemctl disable firewalld --防火墻置為不可用,Linux重啟后,防火墻服務不自動啟動,依然是不可用

    • 訪問
        1. http://192.168.190.132:15672/

用戶名/密碼為我們上面創建的admin/123456

注意上面改成你的虛擬主機的ip地址

備注:如果使用默認用戶guest、密碼guest登錄,會提示User can only log in via localhost

說明guest用戶只能從localhost本機登錄,所以不要使用該用戶。

9.1 通過web頁面新建虛擬主機
  1. 第一步:點擊那個admin
  2. 第二步:點擊那個User完后哪里就可以創建用戶
  3. 第三步:點擊那個Virtual Hosts就可以添加虛擬主機

10 .RabbitMQ工作模型

broker(服務器) 相當于mysql服務器,virtual host相當于數據庫(可以有多個數據庫)queue相當于表,消息相當于記錄。

消息隊列有三個核心要素: 消息生產者消息隊列消息消費者

  • 生產者(Producer):發送消息的應用;(java程序,也可能是別的語言寫的程序)
  • 消費者(Consumer):接收消息的應用;(java程序,也可能是別的語言寫的程序)
  • 代理(Broker):就是消息服務器,RabbitMQ Server就是Message Broker;
  • 信道(Channel):連接中的一個虛擬通道,消息隊列發送或者接收消息時,都是通過信道進行的;
  • 虛擬主機(Virtual host):一個虛擬分組,在代碼中就是一個字符串,當多個不同的用戶使用同一個RabbitMQ服務時,可以劃分出多個Virtual host,每個用戶在自己的Virtual host創建exchange/queue等;(分類比較清晰、相互隔離)
  • 交換機(Exchange):交換機負責從生產者接收消息,并根據交換機類型分發到對應的消息隊列中,起到一個路由的作用;
  • 路由鍵(Routing Key):交換機根據路由鍵來決定消息分發到哪個隊列,路由鍵是消息的目的地址;
  • 綁定(Binding):綁定是隊列和交換機的一個關聯連接(關聯關系);
  • 隊列(Queue):存儲消息的緩存;
  • 消息(Message):由生產者通過RabbitMQ發送給消費者的信息;(消息可以任何數據,字符串、user對象,json串等等)

11 .RabbitMQ交換機類型

Exchange(X)可翻譯成交換機/交換器/路由器

11.1 RabbitMQ交換器 (Exchange)類型

Fanout Exchange(扇形)

Direct Exchange(直連)

Topic Exchange(主題)

Headers Exchange(頭部)

11.2 Fanout Exchange(扇形交換機/器)

Fanout 扇形的,散開的;扇形交換機

投遞到所有綁定的隊列,不需要路由鍵,不需要進行路由鍵的匹配,相當于廣播、群發;

上面P代表product(生產者) X代表交換機/器 紅色的代表隊列

我們的生產者發消息只能發送到交換機上,當消息發送到交換機以后,只要隊列和交換機綁定了,那么就把消息分散的發送到這兩個隊列當中。(如果綁定10個隊列,那么就會分散的發送到這10個隊列當中)

11.3 例子【Fanout Exchange(扇形交換機/器)】
  • 實現步驟:
    • 第一步:添加依賴
<!--    rabbitMQ的依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
    • 第二步: 添加配置信息
  rabbitmq:		host: 192.168.190.132				//地址port: 5672					//端口號username: admin				//用戶名password: 123546			//密碼virtual-host: powernode   //設置發消息發送到那個虛擬主機上
    • 第三步: 創建一個Rabbit的配置類
      • 在這個配置類中有一個三部曲
        • 第一步:定義交換機
@Bean
//FanoutExchange  說明就是定義的一個扇形交換機
public FanoutExchange fanoutExchange(){//括號里面就是給交換機起個名字return new FanoutExchange("exchange.fanout");  
}
        • 第二步定義隊列(多個隊列)
@Beanpublic Queue queueA(){		//隊列Areturn new Queue("queue.famout.a");}@Beanpublic Queue queueB(){		//隊列B return new Queue("queue.famout.b");}
        • 綁定交換機和隊列(扇形交換機不需要指定key)
          • 通過binding這個方法,其中參數就是把交換機傳進來,以及把要綁定的隊列傳進來
          • 調用BindingBuilder的.bind()完后把那個隊列傳進來。再使用to()括號里面是交換機的名字
@Beanpublic Binding bindingA(FanoutExchange fanoutExchange ,Queue queueA){return BindingBuilder.bind(queueA).to(fanoutExchange);}@Beanpublic Binding bindingB(FanoutExchange fanoutExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(fanoutExchange);}
    • 第四步: 發送消息
      • 第一步:注入rabbitTemplate
      • 第二步:創建一個方法來定義消息,完后再使用Message() 這個類來把消息封裝了.
//定義消息String msg ="hello world";//這里Message中需要的參數是byte ,我們需要getBytes,將String轉換為byte//需要使用這個將消息封裝一下Message message = new Message(msg.getBytes());
      • 第三步:就是使用rabbitTemplate 這個類中convertAndSend方法來發送消息.其中括號中參數分別是:
          • 第一個寫的是我們在配置類中定義的額交換機的名字
          • 第二個一般都需要路由key嗎,但是扇形交換機不需要路由key
          • 第三個參數就是我們封裝的消息
rabbitTemplate.convertAndSend("exchange.fanout","",message);

扇形交換機發送消息的完整寫法

public class MessageService {//先注入rabbitTemplate// 這里直接可以使用rabbitTemplate是因為springBoot中有一個自動裝配的功能,我們把依賴導入@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//定義消息String msg ="hello world";//這里Message中需要的參數是byte ,我們需要getBytes,將String轉換為byte//需要使用這個將消息封裝一下Message message = new Message(msg.getBytes());//發送消息,括號里面第一個寫的是我們在配置類中定義的額交換機的名字//第二個一般都需要路由key嗎,但是扇形交換機不需要路由key//第三個參數就是我們封裝的消息rabbitTemplate.convertAndSend("exchange.fanout","",message);log.info("消息發送完畢,發送時間為:{}" ,new Date());}
}
    • 第五步: 上面編寫完方法以后,我們最后一步,調用這個方法,來發送消息,在啟動上實現

ApplicationRunner類(這個類表示程序一啟動就會執行里面的那個方法),完后并重寫里面的方法,完后我們把那個編寫消息的那個類注入進來,完后調用那個方法即可.

public class Application implements ApplicationRunner {@Autowiredprivate MessageService messageService;public static void main( String[] args ){SpringApplication.run(Application.class,args);}/*** 程序一啟動就會運行* @param args* @throws Exception*/@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}

述這樣就代表消息發送過去了

    • 第六步 : 接收消息
      • 一般情況都是一個模塊發送消息,另一個模塊接收消息
        • 創建一個新的模塊,來接收消息
        • 第一步:還是添加rabbitmq的依賴
        • 第二步:在配置文件中添加rabbitmq的相關配置。
        • 第三步:編寫一個接收消息的類,完后來接收消息
          • 這個類編寫的步驟:
            • 第一步:創建一個方法,在里面傳入Message的參數。
            • 第二步:在這個類上標注解@RabbitListener(),其中括號里面寫的就是對隊列的名稱。
            • 第三步:就是調用message.getBody()方法,來獲取消息。
            • 第四步:就是將接收到消息的字節數組,轉為字符串
@Slf4j
@Component
public class ReceiveMessage {/*** 接收兩個隊列的消息* @RabbitListener() 就是接收哪些隊列的消息,就寫哪些隊列*/@RabbitListener(queues = {"queue.famout.a","queue.famout.b"})public void receiveMsg(Message message){byte[] body = message.getBody();//上面那個得到的是字節數據,下面轉為字符串String msg = new String(body);log.info("接收的消息為: {} " ,msg);}
}

11.4 Direct Exchange(直連交換機)

根據路由鍵精確匹配(一模一樣)進行路由消息隊列;

其中P代表的是生產者 X代表的是交換機 C代表的是消費者 紅色的代表隊列

那個error代表的就是路由key

那個info error warning 也是路由key

原理就是 :生產者發送消息到交換機時,也會指定一個key(發送key),交換機發送到隊列也需要一個key(路由key) ,當發送key和路由key相同時,才會將交換機中的消息發送到隊列中

【例子】:當發消息的時候指定的路由key是hello,那么到交換機以后,則消息不會進入任何一個隊列

當發消息的時候指定的路由key是error,則到交換機以后,則消息會同時發送到兩個隊列。

11.5 例子【直連交換機的例子】:

實現步驟:

    • 第一步:添加依賴
    • 第二步:在配置文件中配置rabbitmq的配置
    • 第三步:創建一個rabbitmq的配置類
      • 創建步驟:
        • 還是那三部曲
          • 第一步:定義交換機
@Beanpublic DirectExchange directExchange (){return ExchangeBuilder.directExchange(exchangeName).build();}
          • 定義隊列
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
          • 交換機與隊列綁定
    @Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB1(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("info");}@Beanpublic Binding bindingB2(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("error");}@Beanpublic Binding bindingB3(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("warning");}

完整的代碼:

/*** 從配置文件中讀取指定名字的屬性,我們在配置文件中定義exchangeName   queueAName   queueBName* 那幾個屬性的值,完后再這里讀取。*/
@ConfigurationProperties("my")
public class RabbitConfig {private String exchangeName;private String  queueAName;private String queueBName;/*** 三部曲:* 第一步:定義隊列*/@Beanpublic DirectExchange directExchange (){return ExchangeBuilder.directExchange(exchangeName).build();}/*** 第二步:定義隊列*/@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}/*** 交換機與隊列綁定*/@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){return BindingBuilder.bind(queueA).to(directExchange).with("error");}@Beanpublic Binding bindingB1(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("info");}@Beanpublic Binding bindingB2(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("error");}@Beanpublic Binding bindingB3(DirectExchange directExchange ,Queue queueB){return BindingBuilder.bind(queueB).to(directExchange).with("warning");}
}
    • 第四步: 創建發送消息的類
      • 第一步: 創建步驟:首先注入RabbitTemplate
      • 第二步: 創建一個方法 ,并創建一個消息
      • 第三步: 使用 RabbitTemplate來發送消息,
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//創建一條消息.使用建造者模式Message message = MessageBuilder.withBody("hello world".getBytes()).build();//下面就是發送消息,括號里面分別是,參數1 交換機的名字, 參數2 路由key, 參數3 消息rabbitTemplate.convertAndSend("exchange.direct", "info",message);log.info("消息發送完畢");}
11.6 Topic Exchange(主題交換機)

通配符匹配,相當于模糊匹配;

#匹配多個單詞,用來表示任意數量(零個或多個)單詞

*匹配一個單詞(必須有一個,而且只有一個),用 . 隔開的為一個

【例子】:beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx

beijing.* == beijing.queue, beijing.xyz

發送時指定的路由鍵:lazy.orange.rabbit 則與隊列Q1 和隊列Q2都匹配。

但是雖然與Q2隊列有兩個都匹配的,但是,也只會進去以條消息。

11.7 例子【主題交換機】

編寫步驟:

  • 第一步:添加mq的依賴
  • 第二步:在配置文件中添加mq的配置
  • 第三步:編寫一個配置類
    • 編寫配置類的步驟:
      • 第一步:定義交換機
@Beanpublic TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(exchangeName).build();}
      • 第二步:定義隊列
@Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
      • 交換機與隊列綁定
@Beanpublic Binding bindingA(TopicExchange topicExchange,Queue queueA){return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");}@Beanpublic Binding bindingB1(TopicExchange topicExchange,Queue queueB){return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");}@Beanpublic Binding bindingB2(TopicExchange topicExchange,Queue queueB){return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");}
  • 第四步:編寫發送消息
@Service
public class MessageService {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 發送消息*/@Beanpublic void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//有幾個參數第一個就是交換機的名字,第二個就是路由key,第三個就是消息amqpTemplate.convertAndSend("exchange.topic","lazy.orange.rabbit",message);}
  • 第五步:讓啟動類實現ApplicationRunner 并重寫里面的方法(方法的作用就是程序一啟動就會調用發消息的那個方法)
public class Application implements ApplicationRunner {@Autowiredprivate MessageService messageService;public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Overridepublic void run(ApplicationArguments args) throws Exception {messageService.sendMsg();}
11.8 Headers Exchange(頭部交換機)

基于消息內容中的headers屬性進行匹配;

其中 P代表的消息的生產者 C代表消息的消費者 X代表的是交換機 紅色代表隊列

原理: 就是在發消息的時候,我們在消息的頭部在加上匹配的值,看與那個隊列匹配。

11.9 例子【頭部交換機】

實現步驟:

  • 第一步:引入依賴
  • 第二步:在配置文件中加入配置
  • 第三步:創建一個配置類
    • 配置類的編寫步驟
      • 第一步:定義頭部交換機
@Beanpublic HeadersExchange headersExchange(){return ExchangeBuilder.headersExchange(exchangeName).build();}
      • 第二步:創建隊列
 @Beanpublic Queue queueA(){return QueueBuilder.durable(queueAName).build();}@Beanpublic Queue queueB(){return QueueBuilder.durable(queueBName).build();}
      • 第三步:交換機與隊列綁定,這個需要是頭部的信息綁定,所以我們需要創建一個隊列,存儲每個交換機與隊列的綁定。
    @Beanpublic Binding bindingA(HeadersExchange headersExchange,Queue queueA){Map<String,Object> headerValues = new HashMap<>();headerValues.put("type","m");headerValues.put("status",1);return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();}@Beanpublic Binding bindingB(HeadersExchange headersExchange,Queue queueB){Map<String,Object> headerValues = new HashMap<>();headerValues.put("type","s");headerValues.put("status",0);return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();}
  • 第四步:創建發送消息
public class MessageService {@Value("${my.exchangeName}")private String exchangeName;@Autowiredprivate RabbitTemplate rabbitTemplate;public void  sandMsg(){//消息屬性MessageProperties messageProperties = new MessageProperties();Map<String ,Object> headers = new HashMap<>();headers.put("type","s");headers.put("status",0);//設置消息頭messageProperties.setHeaders(headers);//添加消息屬性Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();//頭部交換機,不需要只當路由keyrabbitTemplate.convertAndSend(exchangeName,"",message);}
}

12 RabbitMQ過期消息

過期消息也叫TTL消息,TTL:Time To Live

消息的過期時間有兩種設置方式:(過期消息)

12 .1 設置單條消息的過期時間

就是在MessageProperties()中有一個setExpiration()來設置單個消息的過期時間。

設置步驟:

  • 前面的引入依賴和配置文件的編寫都一樣,以及配置類的編寫沒有任何區別,所以我們這里只編寫發送消息的步驟
  • 發送消息的步驟:
    • 第一步:先創建一個MessageProperties() ,完后再調用setExpiration()來設置消息的過期時間
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000");//設置過期的毫秒數
    • 第二步:通過建造者模式中的MessageBuilder 來創建消息
Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();
    • 第三步:通過模板方法rabbitTemplate.convertAndSend來發送消息
rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);

完整的代碼編寫

@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000");//設置過期的毫秒數Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);log.info("消息發送完畢");}
}

12.2 通過隊列屬性設置消息過期時間

設置步驟:

  • 第一步:我們還是編寫配置類,并在里面編寫三部曲,這里只不過在設置隊列的時候,來設置隊列的屬性
    • 設置隊列屬性的步驟:
      • 有兩種方式:
        • 第一種:就是通過new的方式,就是new Queue的時候,里面有5個參數,其中第一個是隊列名稱,第一個就是是否持久化 ,還有一個Map集合,其中就是在Map集合中來設置過期時間。
        • 第二步:就是創建一個集合,完后使用Map.put()方式,來向集合中放入隊列的過期時間,其中隊列過期時間的屬性x-message-ttl
 @Beanpublic Queue queue(){//方式1,new的方式Map<String ,Object> arguments =new HashMap<>();arguments.put("x-message-ttl",15000);return new Queue(queueName,true,false,false,arguments);}
      • 第二種就是:建造者模式: 這種模式,還是要提前創建一個Map集合,完后通過建造者模式,將集合傳入進去。
    • 完整的代碼
//第二種:建造者模式
@Beanpublic Queue queue(){//方式1,new的方式Map<String ,Object> arguments =new HashMap<>();arguments.put("x-message-ttl",15000); return QueueBuilder.durable(queueName).withArguments(arguments).build();
  • 第二步:發送消息的步驟:
    • 因為這個設置了隊列的過期消息,所以,這里只需要,創建消息,完后再使用rabbit模板發送消息
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.ttl.b","info",message);log.info("消息發送完畢");}
}

【注意】:如果單個消息和隊列都設置了過期時間,那么消息的過期時間以二者之間較小的那個數值為準。

13 RabbitMQ的死信隊列

13.1如何消費者的手動確認
    • 第一步:在消費者端的配置文件中加入配置
     listener:simple:acknowledge-mode: manual
    • 第二步:在接收消息的一段那段配置來通知其刪除
      • 實現過程:
        • 第一步:先編寫一個方法,參數是Message Channel 倆個參數
        • 第二步:獲取消息的屬性
//第一步:獲取消息的屬性MessageProperties messageProperties = message.getMessageProperties();
        • 第三步:根據消息的屬性來獲取消息的唯一標識(如同身份證號)
//獲取消息的唯一標識(如同身份證號)long tag = messageProperties.getDeliveryTag();
        • 第四步:編寫一個try ....catch 代碼塊,try { }里面編寫的正常接收到消息,正常接收消息后,則告訴消費者就可以刪除了 channel.basicAck(tag,false);
try {byte[] body = message.getBody();String str = new String(body);log.info("接收到的消息為"+ str);//正常接收后,告訴服務器可以刪除了。//消費者的手動確認 tag代表確認的就是這條消息//false代表只確認當前一條channel.basicAck(tag,false);
        • 第五步:就是在catch里面編寫的,說明報錯了,告訴服務器不要刪除,我們需要重新接收一遍 channel.basicNack(tag,false,true);
}catch (Exception e){//報錯了,我們沒有接收到消息,告訴服務器不要刪除,我們再重新接收一遍log.error("接收者出現問題");//告訴服務器不要刪除  ,第一個參數:當前消息的標識//第二個參數,只代表當前消息//第三個參數:重新放回到隊列里面try {channel.basicNack(tag,false,true);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}
  • 完整的代碼編寫
package com.powernode.message;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class MessageReceive {@RabbitListener(queues = {"queue.normal.4"})public void receviceMsg(Message message , Channel channel){//第一步:獲取消息的屬性MessageProperties messageProperties = message.getMessageProperties();//獲取消息的唯一標識(如同身份證號)long tag = messageProperties.getDeliveryTag();try {byte[] body = message.getBody();String str = new String(body);log.info("接收到的消息為"+ str);//正常接收后,告訴服務器可以刪除了。//消費者的手動確認 tag代表確認的就是這條消息//false代表只確認當前一條channel.basicAck(tag,false);}catch (Exception e){//報錯了,我們沒有接收到消息,告訴服務器不要刪除,我們再重新接收一遍log.error("接收者出現問題");//告訴服務器不要刪除  ,第一個參數:當前消息的標識//第二個參數,只代表當前消息//第三個參數:重新放回到隊列里面try {channel.basicNack(tag,false,true);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}}
}
13.2 什么條件下會變成死信隊列?
    • 1、消息過期
      • 是通過定義消息屬性來設置消息的過期時間,在正常隊列哪里設置死信交換機
 public void sendMsg(){//定義一個消息屬性MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000");//定義消息Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();//發消息的幾個參數rabbitTemplate.convertAndSend("exchange.normal.2","order",message);
    • 2、隊列過期
      • 是在定義正常隊列哪里設置隊列過期,以及設置死信交換機
 @Beanpublic Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();
//        //設置消息過期時間arguments.put("x-message-ttl" ,20000);//設置死信交換機,一旦消息過了20秒,而且還沒有消費者,則就會進入這個死信交換機。arguments.put("x-dead-letter-exchange",exchangeDlxName);//設置死信路由key,這個key就是死信交換機和死信隊列綁定的呢個keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
    • 3、隊列達到最大長度,當達到隊列設置的最大長度時,如果此時還有消息進入隊列,那么最開始進入隊列的消息就會變成死信。
    public Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();//設置隊列的最大長度arguments.put("x-max-length",5);//設置死信交換機,一旦消息過了20秒,而且還沒有消費者,則就會進入這個死信交換機。arguments.put("x-dead-letter-exchange",exchangeDlxName);//設置死信路由key,這個key就是死信交換機和死信隊列綁定的呢個keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
    • 4、消費者從正常的隊列中接收消息,但是對消息不進行確認,并且不對消息進行重新投遞(不重新入隊),此時消息就進入死信隊列。(就是當我們開啟了手動確認以后,在發生異常之后,消費者手動不確認,并且不重新入隊) channel.basicNack(tag,false,flase);
catch (Exception e){//報錯了,我們沒有接收到消息,告訴服務器不要刪除,我們再重新接收一遍log.error("接收者出現問題");//告訴服務器不要刪除 ,//第一個參數:當前消息的標識//第二個參數,只代表當前消息//第三個參數:是否重新放回到隊列里面try {channel.basicNack(tag,false,flase);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);
    • 5、消費者拒絕消息

開啟手動確認模式,并拒絕消息,不重新投遞(不重新入隊),則進入死信隊列

channel.basicReject(tag,false);

   }catch (Exception e){log.error("接收者出現問題");try {//消費者拒絕消息,// 第一個參數:當前消息的唯一標識//第二個參數:是否重新入隊;//basicReject與basicNack的區別:basicReject只能處理一條消息channel.basicReject(tag,false);} catch (IOException ex) {e.printStackTrace();}throw new RuntimeException(e);}}
13.3 死信隊列的詳細解釋

其中死信隊列也叫做死信交換機、死信郵箱等說法。

DLX:Dead - Letter - Exchange 死信交換器、死信郵箱

過程:

第一步:生產者生產消息 ----> 消息發送到交換機 ----> 交換機根據路由key發送到隊列----->當沒有死信交換機之前 在規定的時間內消息過期無人接收,則消息丟了 -----> 創建一個死信交換機 -----> 當創建一個死信交換機之后,在規定的時間內無人接收的消息則進入死信交換機 。---->死信交換機根據路由key路由到另一條隊列(死信隊列)---->消費者也可以接收死信隊列的信息。

代碼的實現過程

  • 第一步:引入依賴
  • 第二步:在配置文件中配置
  • 第三步:創建一個Rabbit的配置類(就是創建交換機等)
    • 配置類中的第一步:就是引入配置類中的交換機和隊列的名稱
    • 第二步:創建正常的交換機
 @Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange(exchangeNormalName).build();}
    • 第三步:創建正常的隊列,并在正常的隊列中,設置消息的過期時間設置死信交換機以及設置死信路由key (就是死信交換機與死信隊列綁定的那個key)
 @Beanpublic Queue normalQueue(){Map<String, Object> arguments =new HashMap<>();//設置消息過期時間arguments.put("x-message-ttl" ,20000);//設置死信交換機,一旦消息過了20秒,而且還沒有消費者,則就會進入這個死信交換機。arguments.put("x-dead-letter-exchange",exchangeDlxName);//設置死信路由key,這個key就是死信交換機和死信隊列綁定的呢個keyarguments.put("x-dead-letter-routing-key","error" );return QueueBuilder.durable(queueNormalName).withArguments(arguments).build();}
    • 第四步:正常的交換機與正常的隊列綁定
public Binding bindingNormal(DirectExchange normalExchange ,Queue normalQueue){return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");}
    • 第五步:聲明死信交換機(就和正常的交換機一樣)
@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(exchangeDlxName).build();}
    • 第六步:聲明死信隊列
@Beanpublic Queue dlxQueue(){return QueueBuilder.durable(queueDlxName).build();}
    • 第七步:死信隊列與死信交換機綁定
 @Beanpublic Binding bindingDlx(DirectExchange dlxExchange ,Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");}
    • 第八步:創建消息,并發送消息
@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(){//定義消息Message message = MessageBuilder.withBody("hello world".getBytes()).build();//發消息的幾個參數rabbitTemplate.convertAndSend("exchange.normal.a","order",message);log.info("消息發送完畢");}

14 RabbitMQ的延遲隊列

使用場景就是關閉規定時間內未支付的訂單

場景:有一個訂單,15分鐘內如果不支付,就把該訂單設置為交易關閉,那么就不能支付了,這類實現延遲任務的場景就可以采用延遲隊列來實現,當然除了延遲隊列來實現,也可以有一些其他辦法實現;

14.1 定時任務方式
  • 每隔3秒掃描一次數據庫,查詢過期的訂單然后進行處理;

優點:

簡單,容易實現;

缺點:

1、存在延遲(延遲時間不準確),如果你每隔1分鐘掃一次,那么就有可能延遲1分鐘;

2、性能較差,每次掃描數據庫,如果訂單量很大

14.2、 被動取消

當用戶查詢訂單的時候,判斷訂單是否超時,超時了就取消(交易關閉)

優點:

對服務器而言,壓力小;

缺點:

1、用戶不查詢訂單,將永遠處于待支付狀態,會對數據統計等功能造成影響;

2、用戶打開訂單頁面,有可能比較慢,因為要處理大量訂單,用戶體驗少稍差;

14.3、JDK延遲隊列(單體應用,不能分布式下)

DelayedQueue

無界阻塞隊列,該隊列只有在延遲期滿的時候才能從中獲取元素

優點:

實現簡單,任務延遲低;

缺點:

服務重啟、宕機,數據丟失;

只適合單機版,不適合集群;

訂單量大,可能內存不足而發生異常; oom

14.4、 采用消息中間件(rabbitmq)

1、RabbitMQ本身不支持延遲隊列,可以使用TTL(消息過期)結合DLX(死信隊列)的方式來實現消息的延遲投遞,即把DLX跟某個隊列綁定,到了指定時間,消息過期后,就會從DLX路由到這個隊列,消費者可以從這個隊列取走消息。

上述 :order.ttl.queue代表的是正常隊列

order.dlx.queue 代表的是死信隊列

其中交換機只有一個,當生產者根據key找到交換機以后,交換機根據key找到正常隊列,過期以后,正常隊列指定的死信交換機還是那個交換機(即那條消息就會返回給那個交換機),完后交換機根據死信路由key,發送到死信隊列中

存在問題 :如何解決消息過期時間不一致的問題?
如果先發送的消息,消息延遲時間長,會影響后面的 延遲時間段的消息的消費;

就是因為隊列是先進先出,因為第一條消息如果設置的時間大于第二個消息設置的時間,那么只有等第一個消息過期以后,才能輪到第二個消息,那么這樣的話,第二個消息就和設置的過期時間不一致。

解決方式:

不同延遲時間的消息要發到不同的隊列上,同一個隊列的消息,它的延遲時間應該一樣(即相同的過期時間放到相同的隊列里面)

代碼實現

  • 第一步:引入依賴
  • 第二步:引入配置文件的信息
  • 第三步:編寫交換機以及隊列,并綁定,在隊列哪里指定死信交換機的名稱。
  • 第四步:編寫發送消息的類,并創建幾條消息,并指定消息的過期時間。分別發送到不同時間的交換機上。
  • 第五步:接收消息(接收的死信隊列的消息)。
14 .5 、 使用rabbitmq-delayed-message-exchange 延遲插件
  • 第一步:選擇對應的版本下載 rabbitmq-delayed-message-exchange 插件,下載地址:http://www.rabbitmq.com/community-plugins.html
  • 第二步:下載完成以后,解壓這個插件

unzip rabbitmq_delayed_message_exchange-3.10.2.ez

如果unzip 沒有安裝,先安裝一下

yum install unzip -y

  • 第三步:先找sbin 目錄, 找到這個 plugins目錄,啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange 開啟插件;

原理

安裝這個插件以后,就會有一個新的交換機: 延遲交換機類似于直連交換機

消息發送后不會直接投遞到隊列,而是存儲到 Mnesia(嵌入式數據庫),檢查 x-delay 時間(消息頭部);

延遲插件在 RabbitMQ 3.5.7 及以上的版本才支持,依賴 Erlang/OPT 18.0 及以上運行環境;

Mnesia 是一個小型數據庫,不適合于大量延遲消息的實現

解決了消息過期時間不一致出現的問題。

代碼實現過程:

  • 第一步:引入mq依賴
  • 第二步:在配置文件中引入配置信息
  • 第三步:創建一個配置類,來創建延遲交換機和隊列,以及綁定(通過自定義交換機來創建)
 @Configuration
public class RabbitConfig {@Value("${my.exchangeName}")private String exchangeName;@Value("${my.queueDelayName}")private String queueDelayName;/*** 創建交換機* 使用自定交換機來創建延遲交換機* @return*/@Beanpublic CustomExchange customExchange(){//自定義交換機Map<String, Object> arguments =new HashMap<>();arguments.put("x-delayed-type","direct");return  new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);}/*** 創建隊列*/@Beanpublic Queue queue(){return QueueBuilder.durable(queueDelayName).build();}/*** 交換機與隊列綁定*/@Beanpublic Binding binding(CustomExchange customExchange ,Queue  queue){return BindingBuilder.bind(queue).to(customExchange).with("plugin").noargs();}
  • 第四步 :發送消息
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 發送消息*/@Beanpublic void sendMsg() {{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 25000);  //第一條消息,設置延遲時間Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);log.info("發送消息完畢");}{MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("x-delay", 15000);  //第二天消息,設置延遲時間Message message = MessageBuilder.withBody("hello world111".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);log.info("發送消息完畢");}}
  • 第五步 :接收消息
@Component
@Slf4j
public class ReceiveMessage {/*** 接收延遲隊列的消息,使用的是插件* @param message*/@RabbitListener(queues = {"queue.delay.4"})public void receiveMsg(Message message){String body = new String(message.getBody());log.info("接收到消息的為: "+body);}

15 消息的可靠性投遞

消息的可靠性投遞就是要保證消息投遞過程中每一個環節都要成功,那么這肯定會犧牲一些性能,性能與可靠性是無法兼得的;

如果業務實時一致性要求不是特別高的場景,可以犧牲一些可靠性來換取性能。

① 代表消息從生產者發送到Exchange; ----使用RabbitMQ消息Confirm模式(生產者確認回調)來解決

② 代表消息從Exchange路由到Queue---使用RabbitMQ消息Return模式 備用交換機模式 來解決
③ 代表消息在Queue中存儲;------- 通過設置交換機的持久化,以及隊列的持久化、每條消息的持久化

④ 代表消費者監聽Queue并消費消息;----自動確認改成手動確認

15 .1 RabbitMQ消息Confirm模式(生產者確認回調)----可能因為網絡或者Broker的問題導致①失敗,而此時應該讓生產者知道消息是否正確發送到了Broker的exchange中;

消息的confirm確認機制,是指生產者投遞消息后,到達了消息服務器Broker里面的exchange交換機,則會給生產者一個應答,生產者接收到應

答,用來確定這條消息是否正常的發送到Broker的exchange中,這也是消息可靠性投遞的重要保障;

代碼的具體實現過程(如何實現生產者確認回調)

  • 第一步: 引入依賴
  • 第二步:在配置文件中配置信息 publisher-confirm-type: correlated 開啟確認模式
rabbitmq:host: 192.168.190.132port: 5672username: adminpassword: 123456virtual-host: powernode       #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shangpublisher-confirm-type: correlated   #開啟生產者的確認模式,設置成關聯模式
  • 第三步:編寫一個類實現 RabbitTemplate.ConfirmCallback 類,并實現里面的方法,在方法里面有三個參數,第一個參數是關聯數據 第二個參數是判斷消息是否到達交換機 ,第三個是原因,這個就是不管消息有沒有到達交換機,都會回調這個類,給與我們提示。【這個類也可以直接在發消息的時候直接實現】
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {/**** @param correlationData correlation data for the callback.  關聯數據* @param ack true for ack, false for nack  真 到達交換機    或者假  沒有到達交換機* @param cause An optional cause, for nack, when available, otherwise null. 原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(correlationData.getId());  //獲得idif (ack){//說明消息到達交換機log.info("消息正確到達交換機");return;}//說明消息沒有到達交換機log.error("消息沒有到達交換機,原因為{}" ,cause);}
}
@Service
@Slf4j
public class MessageService implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct      //構造方法后執行,如同初始化public void init(){rabbitTemplate.setConfirmCallback(this);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//定義一個關聯數據CorrelationData correlationData =new CorrelationData();correlationData.setId("order_123456");  //發送訂單信息rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(correlationData.getId());  //獲得idif (ack){//說明消息到達交換機log.info("消息正確到達交換機");return;}//說明消息沒有到達交換機log.error("消息沒有到達交換機,原因為{}" ,cause);}
}
  • 第四步:在我們發消息的那個類中定義這個關聯數據,并調用,完后在我們發送消息到交換機以后,不管有沒有到達交換機,都會給我們一個提示
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyConfirmCallBack myConfirmCallBack;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct      //構造方法后執行,如同初始化public void init(){rabbitTemplate.setConfirmCallback(myConfirmCallBack);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();//定義一個關聯數據CorrelationData correlationData =new CorrelationData();correlationData.setId("order_123456");  //發送訂單信息rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);}
}
15.2 RabbitMQ消息Return模式(保證交換機的消息到達隊列)-----可能因為路由關鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤導致②失敗。

rabbitmq 整個消息投遞的路徑為:

producer —> exchange —> queue —> consumer

>> 消息從 producer 到 exchange 則會返回一個 confirmCallback

>> 消息從 exchange –> queue 投遞失敗則會返回一個 returnCallback;

我們可以利用這兩個callback控制消息的可靠性投遞

代碼的實現過程

  • 第一步:引入依賴
  • 第二步:在配置文件配置相關信息,開啟return模式
  rabbitmq:host: 192.168.190.132port: 5672username: adminpassword: 123456virtual-host: powernode       #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shangpublisher-returns: true      #開啟return模式
  • 第三步:編寫一個類來實現RabbitTemplate.ReturnsCallback 并實現里面的方法,(如果這個類被調用,說明沒有正確的路由到隊列)
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returned) {//如果這個方法被調用,說明消息沒有到達隊列log.error("消息從交換機沒有正確的路由到隊列,原因為{}",returned.getMessage());}
}
  • 第四步:在發送消息的類中,來設置,使 rabbitTemplate 調用我們編寫的那個類
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyReturnCallBack myReturnCallBack;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct      //構造方法后執行,如同初始化public void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack);}
  • 第五步 : 編寫發送消息
@Service
@Slf4j
public class MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MyReturnCallBack myReturnCallBack;//這個rabbitTemplate 還沒有使用我們那個回調接口@PostConstruct      //構造方法后執行,如同初始化public void init(){rabbitTemplate.setReturnsCallback(myReturnCallBack);}public void sendMsg(){Message message = MessageBuilder.withBody("hello world".getBytes()).build();rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);}
}
15.3 確保消息在隊列正確地存儲

可能因為系統宕機、重啟、關閉等等情況導致存儲在隊列的消息丟失,即③出現問題;

(1)、隊列持久化

QueueBuilder.durable(QUEUE).build();

(2)、交換機持久化

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

(3)、消息持久化

MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 
15.4 確保消息從隊列正確地投遞到消費者

如果消費者收到消息后未來得及處理即發生異常,或者處理過程中發生異常,會導致④失敗。

為了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement);

實現步驟:

  1. #開啟手動ack消息消費確認 ,在配置文件中配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. #編寫配置類(配置交換機、隊列、以及綁定)
  2. 編寫一個發送消息的類
  3. 編寫一個接收消息的類 ,開啟手動消息確認后,我們接收完消息后,并不會刪除
@Component
@Slf4j
public class ReceiveMessage {/*** 接收延遲隊列的消息,使用的是插件* @param message*/@RabbitListener(queues = {"queue.delay.4"})public void receiveMsg(Message message , Channel channel){//獲取消息的唯一標識long tag = message.getMessageProperties().getDeliveryTag();String body = null;try {body = new String(message.getBody());log.info("接收到消息的為: "+body);//TODO 正確的話,我們會有插入數據庫等//正確的話,就是告訴服務器,可以刪除了//第一個參數是,當前消息,// 第二個參數:是否批量處理channel.basicAck(tag,false);} catch (Exception e) {log.error("消息處理出現問題");try {//三個參數分別是:當前消息//是否是批量處理//是否重新入隊channel.basicNack(tag,false,true);} catch (IOException ex) {throw new RuntimeException(ex);}throw new RuntimeException(e);}

16 交換機的屬性

  • 具體參數(以下屬性是在創建交換機的時候設置的)
    • Name:交換機名稱;就是一個字符串
    • Type:交換機類型,direct, topic, fanout, headers四種
    • Durability:持久化,聲明交換機是否持久化,代表交換機在服務器重啟后是否還存在;默認是持久化的, yes為持久化 ,flase為不持久化(沒有保存到磁盤上).
    • Auto delete:是否自動刪除,曾經有隊列綁定到該交換機,后來解綁了,那就會自動刪除該交換機;
    • Internal:內部使用的,如果是yes,客戶端無法直接發消息到此交換機,它只能用于交換機與交換機的綁定。
    • Arguments:只有一個取值alternate-exchange,表示備用交換機;當隊列與交換機因為路由無法發送消息,則會發送到備用交換機上.
  • 例子
@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).durable(true).autoDelete()   //設置自動刪除,默認不是自動刪除的,boolean類型的默認為flase.build();}
16.1 備用交換機

正常的情況下,備用隊列是不應該有消息的,當備用隊列有消息時,說明出錯了.提示我們及時更改代碼.

主交換機是direct(直連的) 備用交換機是fanout(扇形的)

描述:就是我們生產者與交換機的key是hello ,完后交換機與隊列的key是info,完后就不能進入隊列里面,則就會進入備用交換機里面,從而進入備用隊列。

當路由沒有寫錯,則就會進入指定的交換機,不會進入備用交換機。

  • 備用交換機的設置:
  • 第一種:使用建造者模式的時候 .alternate()
 public DirectExchange directExchange(){return ExchangeBuilder.directExchange(exchangeName).alternate()  //設置備用交換機.build();

17 隊列的詳細屬性

隊列屬性的設置都是在創建隊列的時候,來設置隊列的屬性。

  1. Type:隊列類型
  2. Name:隊列名稱,就是一個字符串,隨便一個字符串就可以;
  3. Durability:聲明隊列是否持久化,代表隊列在服務器重啟后是否還存在;
  4. Auto delete: 是否自動刪除,如果為true,當沒有消費者連接到這個隊列的時候,隊列會自動刪除;
  5. Exclusive:exclusive屬性的隊列只對首次聲明它的連接可見,并且在連接斷開時自動刪除;基本上不設置它,設置成false
  6. Arguments:隊列的其他屬性,例如指定DLX(死信交換機等)
    1. x-expires:Number

當Queue(隊列)在指定的時間未被訪問,則隊列將被自動刪除;

    1. x-message-ttl:Number

發布的消息在隊列中存在多長時間后被取消(單位毫秒);

    1. x-overflow:String

設置隊列溢出行為,當達到隊列的最大長度時,消息會發生什么,

有效值為Drop Head 刪除頭部 默認的

Reject Publish 拒絕發布 ,一旦隊列滿了以后,就不再接收新的消息

    1. x-max-length:Number

隊列所能容下消息的最大長度,當超出長度后,新消息將會覆蓋最前面的消息,類似于Redis的LRU算法;

    1. x-single-active-consumer:默認為false

激活單一的消費者,也就是該隊列只能有一個消息者消費消息;

    1. x-max-length-bytes:Number

限定一定的字節裝入隊列,當超出后就不在裝入隊列;

    1. x-dead-letter-exchange:String

指定隊列關聯的死信交換機,有時候我們希望當隊列的消息達到上限后溢出的消息不會被刪除掉,而是走到另一個隊列中保存起來;

    1. x-dead-letter-routing-key:String

指定死信交換機的路由鍵,一般和6一起定義;

    1. x-max-priority:Number

如果將一個隊列加上優先級參數,那么該隊列為優先級隊列; 數字越大優先級越高,默認優先級為0

(1)、給隊列加上優先級參數使其成為優先級隊列

x-max-priority=10【0-255取值范圍】

(2)、給消息加上優先級屬性

通過優先級特性,將一個隊列實現插隊消費;

實現方式:在創建消息的時候,通過MessageProperties中的setPriority()來設置優先級。

public void sendMsg(){MessageProperties messageProperties =new MessageProperties();messageProperties.setPriority(6);Message message = MessageBuilder.withBody("hello world".getBytes()).andProperties(messageProperties).build();rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);}

18 消息的冪等性

消息消費時的冪等性(消息不被重復消費)

同一個消息,第一次接收,正常處理業務,如果該消息第二次再接收,那就不能再處理業務,否則就處理重復了;

冪等性是:對于一個資源,不管你請求一次還是請求多次,對該資源本身造成的影響應該是相同的,不能因為重復的請求而對該資源重復造成影響;

其中select 、update、delete 都是冪等的。insert是非冪等的。

在http中:get、put、delete都是冪等的。post是非冪等的。

18.1 如何避免消息的重復消費問題?

全局唯一ID + Redis

生產者在發送消息時,為每條消息設置一個全局唯一的messageId,消費者拿到消息后,將其放入redis中,并設置這個key,完后再對設置的結果進行判斷,如果為true,說明這條消息不存在就可以存入數據庫了 ,當一條相同的id的消息再次過來,則返回結果為flase,則不處理。

private ObjectMapper objectMapper 用于序列化和反序列化的 (json格式的)

objectMapper.writeValueAsString(傳入的對象) 將對象轉為字符串

objectMapper.readValue(消息,要轉成那個對象) 將字符串或字節數組,轉為對象

序列化----就是將對象轉為字符串,或者字節數組

反序列化---就是將字符串或者字節數組轉為對象

具體的代碼實現過程:

消費者拿到消息以后,放入redis中,并設置這個key,完后再對設置的結果進行判斷,如果為true,說明設置成功了(下一步就可以插入數據庫了)

//放入redis中,并設置id
Boolean  setRuslt = RedisTemplate.opsForValue().setIfAbsent(要設置的id)
//如果id不存在,說明沒有這條消息,就可以放入數據庫了
if(setRuslt){//就可以放入數據庫了
}
//下一條消息再過來,就返回的flase,完后就不處理了。

19 RabbitMQ集群與高可用

RabbitMQ 的集群分兩種模式,一種是默認集群模式,一種是鏡像集群模式;

在RabbitMQ集群中所有的節點(一個節點就是一個RabbitMQ的broker服務器)被歸為兩類:一類是磁盤節點,一類是內存節點;

磁盤節點會把集群的所有信息(比如交換機、綁定、隊列等信息)持久化到磁盤中,而內存節點只會將這些信息保存到內存中,如果該節點宕機或重啟,內存節點的數據會全部丟失,而磁盤節點的數據不會丟失;

19 .1 默認集群模式

默認集群模式也叫 普通集群模式、或者 內置集群模式;

RabbitMQ默認集群模式,只會把交換機、隊列、虛擬主機等元數據信息在各個節點同步,而具體隊列中的消息內容不會在各個節點中同步;

接發消息的原理

創建隊列的時候,只要在任何一個節點創建,都會復制到其他的兩個節點上。

接收消息的時候:

但是發消息的時候只會發送到節點1上,接消息的時候也是從節點1接收消息

發送消息的時候:

假如發消息的時候,我們連接到節點2,節點2上本身是不存儲消息的,它會把請求轉到節點1上,從而發送到節點1上。節點1停了,還是不能發消息的

假如接消息的時候是從節點3上接收,節點3上沒有消息,它會把請求轉到節點1上,從而接收到消息

元數據(除了消息本身其他的都是元數據)

隊列元數據:隊列名稱和屬性(是否可持久化,是否自動刪除)

交換器元數據:交換器名稱、類型和屬性

綁定元數據:交換器和隊列的綁定列表

(虛擬主機)vhost元數據:vhost內的相關屬性,如安全屬性等;

默認集群模式式

優點:

1)節省存儲空間;

2)性能提高;

如果消息需要復制到集群中每個節點,網絡開銷不可避免,持久化消息還需要寫磁盤,占用磁盤空間。

缺點:

當其中一個隊列宕機以后,則內容就會全部消失。

19.2 集群的搭建
  • 第一步:從已經安裝好rabbitmq的機器 clone 三臺機器,【注意clone完,先不要啟動三臺機器,三臺機器均要重新生成mac地址,防止clone出的機器ip地址重復
    • 虛擬機的克隆步驟:
      • 將虛擬機關機
      • 點擊管理---->點擊克隆---->選中虛擬機當前的狀態--->選中創建完整的虛擬機-->填寫克隆的虛擬機的名稱---> 點擊完成--->在啟動前改變其物理ip--->啟動前點擊網絡適配器---->高級---->召見MAC地址--->點擊幾遍生成 ------->克隆完成
  • 第二步:使用xshell 連接三臺機器
  • 第三步:修改三臺機器的/etc/hosts 文件,vim /etc/hosts
192.168.131.128 rabbit130
192.168.131.129 rabbit133
192.168.131.130 rabbit134

  • 第三步:三臺機器均重啟網絡,使節點名生效(可以直接關機重新啟動)
systemctl restart network
  • 第四步:三臺機器的防火墻處理
systemctl status firewalld ---查看防火墻的狀態
systemctl stop firewalld  --關閉防火墻
systemctl disable firewalld  --開機不啟動防火墻
  • 第五步:三臺機器 .erlang.cookie文件保持一致由于是clone出的三臺機器,所以肯定是一樣的
如果我們使用解壓縮方式安裝的RabbitMQ,那么該文件會在${用戶名}目錄下,
也就是${用戶名}/.erlang.cookie;
如果我們使用rpm安裝包方式進行安裝,那么這個文件會在/var/lib/rabbitmq目錄下;
查看隱藏文件用的ls-a

注意 .erlang.cookie的權限為400,目前已經是400

  • 第六步: 分別啟動
rabbitmq-server -detached
  • 第七步: 使用以下命令查看集群狀態 Disk Nodes 表示磁盤節點
rabbitmqctl cluster_status
  • 第八步:以上的三個機器還是互不相干三個機器,并不是集群,構建集群
    • 1、先把其他的兩個rabbitmq停掉
rabbitmqctl stop_app  僅僅是停掉rabbitmq
    • 2、把當前rabbit中的交換機和隊列全部重置
rabbitmqctl reset
    • 3、將當前rabbit加入到第一個rabbit中,并成為內存節點

--ram 參數表示讓rabbitmq128成為一個內存節點,如果不帶參數默認為disk磁盤節點;

rabbit@rabbit128 代表的主rabbitmq

rabbitmqctl join_cluster rabbit@rabbit128 --ram   rabbit@rabbit128主rabbitmq
    • 4、啟動rabbitmq
rabbitmqctl start_app
    • 5、查看主rabbit的狀態
rabbitmqctl cluster_status
19.3 操作集群中的一個節點,添加用戶和權限等
#列出用戶
rabbitmqctl list_users
# 添加用戶
rabbitmqctl add_user admin 123456
#查看權限
rabbitmqctl list_permissions
#設置權限
rabbitmqctl set_permissions admin ".*" ".*" ".*"
#設置角色
rabbitmqctl set_user_tags admin administrator【注意】:這個啟動插件需要給集群中的每一個rabbit都單獨啟動
#列出插件
rabbitmq-plugins list
#啟動web控制臺插件
rabbitmq-plugins enable rabbitmq_management 

使用web瀏覽器添加一個虛擬主機:powernode

實現原理

RabbitMQ底層是通過Erlang架構來實現的,所以rabbitmqctl會啟動Erlang節點,并基于Erlang節點來使用Erlang系統連接RabbitMQ節點,在連接過程中需要正確的Erlang Cookie和節點名稱,Erlang節點通過交換Erlang Cookie以獲得認證;

19.4 springboot連接集群

只需要改變配置文件即可 ,其他接發消息是一樣的。

spring:#配置rabbitmqrabbitmq:# 連接集群,使用逗號分隔addresses: 192.168.150.150:5672,192.168.150.151:5672,192.168.150.152:5672username: adminpassword: 123456virtual-host: powernode

20 鏡像集群模式

鏡像模式是基于默認集群模式加上一定的配置得來的;

在默認模式下的RabbitMQ集群,它會把所有節點的交換機、綁定、隊列的元數據進行復制確保所有節點都有一份相同的元數據信息,但是隊列數據分為兩種:一種是隊列的元數據信息(比如隊列的最大容量,隊列的名稱等配置信息),另一種是隊列里面的消息;

鏡像模式,則是把所有的隊列數據完全同步,包括元數據信息和消息數據信息,當然這對性能肯定會有一定影響,當對數據可靠性要求較高時,可以使用鏡像模式;

20.1鏡像模式的搭建

實現鏡像模式也非常簡單,它是在普通集群模式基礎之上搭建而成的;

鏡像隊列配置命令:

./rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
  • -p Vhost(虛擬主機): 可選參數,針對指定vhost下的queue進行設置;
  • Name: policy的名稱;(可以自己取個名字就可以)
  • Pattern: queue的匹配模式(正則表達式); ----就是說想對哪些主機進行鏡像
  • Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode;---ha代表高可用
  • 【例子】:{“ha-mode”:”exactly”,”ha-params”:2}
    • ha-mode:指明鏡像隊列的模式,有效值為 all/exactly/nodes
      • all:表示在集群中所有的節點上進行鏡像
      • exactly:表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定
      • nodes:表示在指定的節點上進行鏡像,節點名稱通過ha-params指
    • ha-params:ha-mode模式需要用到的參數
    • ha-sync-mode:進行隊列中消息的同步方式,有效值為automatic(自動)和manual(手動)
  • priority:可選參數,policy的優先級;
  • 【例子】:"^policy_表示以policy開頭的隊列
./rabbitmqctl set_policy -p powernode ha_policy "^policy_" 
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • 【例子2】:如果要在所有節點所有隊列上進行鏡像,則(在任意節點執行如下命令):

所有節點、所有虛擬主機、所有隊列 都進行鏡像

./rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' 
  • 【例子3】:針對某個虛擬主機進行鏡像
rabbitmqctl set_policy -p powernode ha-all "^"
'{"ha-mode":"all","ha-sync-mode":"automatic"}'

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/63308.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/63308.shtml
英文地址,請注明出處:http://en.pswp.cn/web/63308.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

請求三方http工具

請求三方接口工具封裝 實現邏輯&#xff1a; 發起請求&#xff0c;輸入基本請求信息&#xff1a;請求地址&#xff0c;請求類型&#xff0c;請求參數&#xff0c;是否需要認證工具自動為需要添加認證的請求添加認證&#xff0c;如果發現token快要過期或返回的錯誤編碼為定義的…

HP服務器開啟性能模式

ENERGY PERF BIAS CFG 模式指的是通過特定配置(通常是 BIOS 或操作系統中的設置)來控制處理器的能源性能偏置(Energy Performance Bias, EPB)。EPB 是一種機制,允許用戶或系統管理員在性能和功耗之間進行權衡。不同的設置可以影響系統的響應速度、能效等。 ENERGY PERF B…

調用釘釘接口發送消息

調用釘釘接口發送消息 通過創建釘釘開放平臺創建H5小程序&#xff0c;通過該小程序可以實現向企業內的釘釘用戶發送消息&#xff08;消息是以工作通知的形式發送&#xff09; 1、目前僅支持發送文本消息&#xff0c;相同內容的文本只能成功發送一次&#xff0c;但是接口返回發…

純css 實現呼吸燈效果

開始效果 呼吸效果 實現代碼 <div class"container"><div class"breathing-light"></div> </div><style>html,body {height: 100%;background-color: white;}.container {padding: 100px;}.container .breathing-light {wi…

進程通信方式---共享映射區(無血緣關系用的)

5.共享映射區&#xff08;無血緣關系用的&#xff09; 文章目錄 5.共享映射區&#xff08;無血緣關系用的&#xff09;1.概述2.mmap&&munmap函數3.mmap注意事項4.mmap實現進程通信父子進程練習 無血緣關系 5.mmap匿名映射區 1.概述 原理&#xff1a;共享映射區是將文件…

《云原生安全攻防》-- K8s安全框架:認證、鑒權與準入控制

從本節課程開始&#xff0c;我們將來介紹K8s安全框架&#xff0c;這是保障K8s集群安全比較關鍵的安全機制。接下來&#xff0c;讓我們一起來探索K8s安全框架的運行機制。 在這個課程中&#xff0c;我們將學習以下內容&#xff1a; K8s安全框架&#xff1a;由認證、鑒權和準入控…

day08-別名-重定向-去重排序等

1.重復用touch命令創建同一份文件&#xff0c;會修改文件的時間戳。 alias命令&#xff1a; 別名 查看已有別名&#xff1a;alias [rootoldboy ~]# alias alias cpcp -i alias egrepegrep --colorauto alias fgrepfgrep --colorauto alias grepgrep --colorauto alias l.ls…

Qt WORD/PDF(四)使用 QAxObject 對 Word 替換(QWidget)

關于QT Widget 其它文章請點擊這里: QT Widget 國際站點 GitHub: https://github.com/chenchuhan 國內站點 Gitee : https://gitee.com/chuck_chee 姊妹篇: Qt WORD/PDF&#xff08;一&#xff09;使用 QtPdfium庫實現 PDF 操作 Qt WORD/PDF&#xff08;二…

設計一個基礎JWT的多開發語言分布式電商系統

在設計一個分布式電商系統時&#xff0c;保證系統的可擴展性、性能以及跨語言的兼容性是至關重要的。隨著微服務架構的流行&#xff0c;越來越多的電商系統需要在多個服務間共享信息&#xff0c;并且保證服務的安全性。在這樣的場景下&#xff0c;JSON Web Token&#xff08;JW…

實踐分享 | 公共數據金融應用的理論探索與實踐研究—以人民幣銀行結算賬戶數據應用為例

摘要:公共數據具有高權威性、高準確性、高價值性以及高應用性的特點,實現公共數據的金融應用對更好服務實體經濟、防控金融風險和提升金融服務水平具有重要現實意義。本文從理論探索與實踐研究兩個層面分析了公共數據金融應用的具體問題,一方面探索性的給出了公共數據金融應…

Node的學習以及學習通過Node書寫接口并簡單操作數據庫

Node的學習 Node的基礎上述是關于Node的一些基礎&#xff0c;總結的還行&#xff1b; 利用Node書寫接口并操作數據庫 1. 初始化項目 創建新的項目文件夾&#xff0c;并初始化 package.json mkdir my-backend cd my-backend npm init -y2. 安裝必要的依賴 安裝Express.js&…

計算機視覺中的特征提取算法

摘要&#xff1a; 本文聚焦于計算機視覺中的特征提取算法&#xff0c;深入探討尺度不變特征變換&#xff08;SIFT&#xff09;算法。詳細闡述 SIFT 算法的原理&#xff0c;包括尺度空間構建、關鍵點檢測、方向分配與特征描述子生成等核心步驟。通過 C#、Python 和 C 三種編程語…

MySQL 主從復制與 Binlog 深度解析

目錄 1. Binlog的工作原理與配置2. 主從復制的設置與故障排除3. 數據一致性與同步延遲的處理 小結 MySQL的binlog&#xff08;二進制日志&#xff09;和主從復制是實現數據備份、容災、負載均衡以及數據同步的重要機制。在高可用性架構和分布式數據庫設計中&#xff0c;binlog同…

排隊論、負載均衡和任務調度關系

目錄 排隊論、負載均衡和任務調度關系 一、排隊論 二、負載均衡 三、任務調度 四、總結 排隊論、負載均衡和任務調度關系 排隊論為負載均衡和任務調度提供了數學理論和方法支持 排隊論、負載均衡和任務調度是三個相關但不同的概念。以下是對這三個概念的詳細解釋和它們之…

java版詢價采購系統 招投標詢價競標投標系統 招投標公告系統源碼

功能描述 1、門戶管理&#xff1a;所有用戶可在門戶頁面查看所有的公告信息及相關的通知信息。主要板塊包含&#xff1a;招標公告、非招標公告、系統通知、政策法規。 2、立項管理&#xff1a;企業用戶可對需要采購的項目進行立項申請&#xff0c;并提交審批&#xff0c;查看所…

景聯文科技入選中國信通院發布的“人工智能數據標注產業圖譜”

近日&#xff0c;由中國信息通信研究院、中國人工智能產業發展聯盟牽頭&#xff0c;聯合中國電信集團、沈陽市數據局、保定高新區等70多家單位編制完成并發布《人工智能數據標注產業圖譜》。景聯文科技作為人工智能產業關鍵環節的代表企業&#xff0c;入選圖譜中技術服務板塊。…

【小沐學GIS】基于C++繪制三維數字地球Earth(OpenGL、glfw、glut、QT)第三期

&#x1f37a;三維數字地球系列相關文章如下&#x1f37a;&#xff1a;1【小沐學GIS】基于C繪制三維數字地球Earth&#xff08;456:OpenGL、glfw、glut&#xff09;第一期2【小沐學GIS】基于C繪制三維數字地球Earth&#xff08;456:OpenGL、glfw、glut&#xff09;第二期3【小沐…

實景視頻與模型疊加融合?

[視頻GIS系列]無人機視頻與與實景模型進行實時融合_無人機視頻融合-CSDN博客文章瀏覽閱讀1.5k次&#xff0c;點贊28次&#xff0c;收藏14次。將無人機視頻與實景模型進行實時融合是一個涉及多個技術領域的復雜過程&#xff0c;主要包括無人機視頻采集、實景模型構建、視頻與模型…

MySQL通過binlog日志進行數據恢復

記錄一次阿里云MySQL通過binlog日志進行數據回滾 問題描述由于阿里云遠程mysql沒有做安全策略 所以服務器被別人遠程攻擊把數據庫給刪除&#xff0c;通過查看binlog日志可以看到進行了drop操作&#xff0c;下面將演示通過binlog日志進行數據回滾操作。 1、查詢是否開始binlog …

IDEA 修改格式化僅格式化本次改動代碼

最近總是發現格式化的時候會格式化文件所有代碼&#xff0c;提交Git 后再看提交日志&#xff0c;就很不清晰。修改方式如下 中文&#xff1a; 格式化代碼快捷鍵[中文配置]&#xff1a; 英文&#xff1a; 格式化代碼快捷鍵[英文配置]&#xff1a;