文章目錄
- 集群架構
- 概述
- 仲裁隊列的使用
- 1. 使用Spring框架代碼創建
- 2. 使用amqp-client創建
- 3. 使用管理平臺創建
- 負載均衡
- 引入HAProxy 負載均衡:
- 使用方法
- 1. 修改配置文件
- 2. 聲明隊列 test_cluster
- 3. 發送消息
集群架構
概述
RabbitMQ支持部署多個結點,每個結點存儲相同的數據,本質上沒有區別。用戶可以訪問任意一個結點,其響應結果是一致的。每個結點都包含多個隊列,隊列的類型有很多,本博客主要探討經典隊列(Classic)和仲裁隊列(Quorum)。不論什么類型的隊列,都會存儲兩類消息:
- 元數據:隊列名稱、交換機信息、綁定等
- 消息數據:隊列中存儲的實際消息。
1. 經典隊列(Classic Queues)
- 特點:
- 元數據在集群所有節點共享,消息默認存儲在主節點,
因此如果某個結點一旦宕機,對應存儲的消息數據在該集群中將會丟失
。 - 支持持久化(Durable)和非持久化。
- 一致性較弱(最終一致性),性能高,靈活性強。
- 適合通用場景,如任務分發、日志收集。
- 元數據在集群所有節點共享,消息默認存儲在主節點,
2. 仲裁隊列(Quorum Queues)
- 特點:
-
基于Raft 共識算法的高可用隊列,消息數據會同步到集群中的其他節點,即使某個結點宕機,也能夠保證集群的數據一致性。
-
性能開銷較高,不支持部分經典隊列功能(如優先級)。
-
適合對一致性要求高的場景,如金融交易、訂單處理。
-
Raft 共識算法,這里用動畫的方式形象的闡釋了Raft保證數據一致性的執行流程,所以小編在這里偷個懶。
仲裁隊列的使用
仲裁隊列(Quorum Queue)是RabbitMQ中的一種高可用隊列,它能夠在節點故障時繼續提供服務。以下是創建仲裁隊列的三種方式:
1. 使用Spring框架代碼創建
通過Spring的注解和配置,可以方便地創建仲裁隊列。
@Configuration
public class QuorumConfig {@Bean("quorumQueue")public Queue quorumQueue() {return QueueBuilder.durable("quorum_queue").quorum().build();}
}
2. 使用amqp-client創建
通過Java代碼直接使用amqp-client庫來創建仲裁隊列。
public class QuorumProducer {public static void main(String[] args) throws IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {Map<String, Object> param = new HashMap<>();param.put("x-queue-type", "quorum");channel.queueDeclare("quorum_queue", true, false, false, param);}}
}
3. 使用管理平臺創建
通過RabbitMQ的管理平臺,可以圖形化地創建仲裁隊列。
負載均衡
雖然RabbitMQ支持集群部署,看似好像提升了流量的承載力。但是如果請求只發送給一個或者那幾個負載過高的結點,羊毛一直往一處薅,這個結點一旦掛掉,那么用戶無法訪問了!
解決辦法——
引入HAProxy 負載均衡:
它和我們之前在Spirng中學的LoadBalence類似會把請求路由到正常的結點,并且個可以設定路由策略,充分利用每一個結點資源。
使用方法
在現代微服務架構中,負載均衡是確保服務高可用性和性能的關鍵技術之一。本文將介紹如何使用RabbitMQ實現負載均衡,并通過示例代碼展示其具體實現步驟。
1. 修改配置文件
首先,需要修改RabbitMQ的配置文件,將HAProxy的IP和端口設置為RabbitMQ的綁定地址。
spring:rabbitmq:addresses: amqp://study:study@124.71.229.73:15670/test
2. 聲明隊列 test_cluster
在Spring Boot應用中,我們需要聲明一個隊列,用于負載均衡。
@Configuration
public class ClusterConfig {@Bean("ClusterQueue")public Queue clusterQueue() {return QueueBuilder.durable(Constant.CLUSTER_QUEUE).quorum().build();}
}
3. 發送消息
接下來,我們可以通過控制器發送消息到聲明的隊列。
@RestController
@RequestMapping("/cluster")
public class ClusterController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMappingpublic String cluster() {rabbitTemplate.convertAndSend("", Constant.CLUSTER_QUEUE, "quorum test...");return "發送成功!";}
}
或者使用amqp
客戶端發送消息:
public class ClusterProducer {private static final String QUEUE_NAME = "hello_world";public static void main(String[] args) throws IOException, TimeoutException {// 1. 創建連接工廠ConnectionFactory factory = new ConnectionFactory();// 2. 設置參數factory.setHost("124.71.229.73"); // HAProxy 地址factory.setPort(5670); // HAProxy 端口factory.setUsername("host"); // 用戶名,默認factory.setPassword("study"); // 密碼,默認// 3. 創建連接connectionConnection connection = factory.newConnection();// 4. 創建channel通道Channel channel = connection.createChannel();// 5. 聲明隊列Map<String, Object> param = new HashMap<>();param.put("x-queue-type", "quorum");channel.queueDeclare("test_cluster", true, false, false, param);// 6. 通過channel發送消息到隊列中String msg = "hello cluster...";// 簡單模式下,使用的是默認交換機,使用默認交換機時,routingKey要和隊列名稱一致,才可以路由到對應的隊列中去channel.basicPublish("", "test_cluster", null, msg.getBytes());// 7. 釋放資源System.out.println("消息發送成功!");connection.close();}
}