四種主流消息隊列(Kafka、ActiveMQ、RabbitMQ、RocketMQ)的生產者與消費者傳遞信息的機制說明,以及實際使用中的注意事項和示例:
1. Apache Kafka
傳遞機制
- 模型:基于?發布-訂閱模型,生產者向?主題(Topic)?發送消息,消費者訂閱主題并消費消息。
- 核心流程:
- 生產者將消息發送到 Kafka 集群的 Broker,根據?分區策略(如輪詢、哈希)將消息寫入對應的分區(Partition)。
- 消費者通過消費者組(Consumer Group)訂閱主題,每個分區的數據會被分配給組內的消費者(通過?Rebalance?機制)。
- 消費者從分區中拉取消息(
poll
?方式)并處理。
示例代碼(Kafka 生產者)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "message"));
producer.close();
示例代碼(Kafka 消費者)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}
注意事項
-
分區與順序性:
- Kafka 不保證跨分區的消息順序,但單個分區內的消息按順序存儲。
- 示例:發送訂單創建事件時,需將同一用戶的消息發送到同一分區(通過?
key
)。
-
消費者組與 Rebalance:
- 消費者組內成員變化時(如新增消費者),會觸發分區重新分配(Rebalance),可能導致短暫消息不可讀。
- 建議:避免頻繁增減消費者實例。
-
消息持久化:
- 生產者可通過?
acks=all
?確保消息寫入所有副本后返回成功,但會增加延遲。 - 適用場景:對消息可靠性要求極高的場景(如金融交易)。
- 生產者可通過?
2. Apache ActiveMQ
傳遞機制
- 模型:支持?點對點(Queue)?和?發布-訂閱(Topic)?模型。
- 核心流程:
- 生產者發送消息到隊列或主題。
- 消息通過?異步/同步?方式傳遞給消費者(默認異步)。
- 可啟用?持久化,消息存儲到磁盤以防 Broker宕機。
示例代碼(ActiveMQ 生產者)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello ActiveMQ!");
producer.send(message);
connection.close();
示例代碼(ActiveMQ 消費者)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("my-queue");MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage) consumer.receive();
System.out.println("Received: " + message.getText());
consumer.acknowledge(); // 手動確認消息
connection.close();
注意事項
-
消息持久化:
- 需設置?
DeliveryMode.PERSISTENT
,否則消息可能丟失。 - 示例:關鍵業務消息(如訂單支付通知)必須持久化。
- 需設置?
-
事務支持:
- 生產者和消費者可通過事務確保消息的原子性(發送/接收一致性)。
- 風險:長事務可能導致性能下降。
-
死信隊列(DLQ):
- 配置?
deadLetterExchange
?和?deadLetterRoutingKey
?處理無法消費的消息。 - 示例:超過重試次數的消息自動進入 DLQ。
- 配置?
3. RabbitMQ
傳遞機制
- 模型:靈活的消息路由模型,基于?交換器(Exchange)?和?綁定(Binding)。
- 核心流程:
- 生產者將消息發送到交換器,并附帶路由鍵(Routing Key)。
- 交換器根據類型(如 Direct、Topic、Headers)將消息路由到綁定的隊列。
- 消費者從隊列中拉取消息。
示例代碼(RabbitMQ 生產者Producers)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {Channel channel = connection.createChannel();String exchangeName = "direct-exchange";channel.exchangeDeclare(exchangeName, "direct");String routingKey = "user.login";AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2) // 持久化.build();channel.basicPublish(exchangeName, routingKey, props, "Login Event".getBytes());
}
示例代碼(RabbitMQ 消費者Consumers)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection()) {Channel channel = connection.createChannel();String queueName = "user_queue";channel.queueDeclare(queueName, true, false, false, null);String exchangeName = "direct-exchange";channel.queueBind(queueName, exchangeName, "user.login");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
}
注意事項
-
消息確認機制:
- 消費者需發送?
ACK
?確認消息處理,避免重復消費。 - 示例:使用?
channel.basicAck()
?或?channel.basicNack()
。
- 消費者需發送?
-
死信隊列配置:
- 在隊列聲明時配置?
x-dead-letter-exchange
?和?x-dead-letter-routing-key
。 - 示例:處理失敗的消息進入專用隊列。
- 在隊列聲明時配置?
-
內存限制:
- RabbitMQ 默認限制隊列大小為內存中的一定比例,需根據業務調整?
vm_memory_high_watermark
。
- RabbitMQ 默認限制隊列大小為內存中的一定比例,需根據業務調整?
4. RocketMQ
傳遞機制
- 模型:基于?主題(Topic)?和?隊列(Queue)?的分布式模型。
- 核心流程:
- 生產者Producers發送消息到主題,主題將消息路由到多個隊列(負載均衡)。
- 消費者Consumers通過消費者組(Consumer Group)訂閱主題,從隊列中拉取消息。
- 順序消息:同一隊列內的消息按順序消費。
- 廣播消息:消費者組內每個消費者都收到同一條消息(僅限 Topic 模型)。
示例代碼(RocketMQ 生產者Producers)
DefaultMQProducer producer = new DefaultMQProducer("my-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();Message msg = new Message("my-topic", "Order-123".getBytes(), "JSON".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("Send Result: " + sendResult);producer.shutdown();
示例代碼(RocketMQ 消費者Consumers)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("my-topic", "*"); // 訂閱所有隊列consumer.registerMessageListener(new MessageListener() {@Overridepublic void consume(Message msg, ConsumeContext context) throws Exception {System.out.println("Received: " + new String(msg.getBody()));context.commitMessage(msg); // 提交消費位移}
});
consumer.start();
注意事項
-
事務消息:
- 生產者和消費者可通過事務確保消息的最終一致性。
- 示例:訂單創建成功后,發送支付通知(若失敗則回滾)。
-
消息順序性:
- 嚴格順序場景需指定?
MessageQueueSelector
,確保同一訂單的所有消息進入同一隊列。
- 嚴格順序場景需指定?
-
消息堆積:
- 消費者處理能力不足時,消息會堆積在隊列中,需監控并擴容消費者實例。
總結對比
特性 | Kafka | ActiveMQ | RabbitMQ | RocketMQ |
---|---|---|---|---|
模型 | 發布-訂閱(僅 Topic) | 支持點對點和發布-訂閱 | 靈活路由(多種交換器) | 主題+隊列(順序/廣播) |
持久化 | 支持分區副本 | 支持消息持久化和事務 | 支持隊列和消息持久化 | 支持消息持久化和事務 |
順序性 | 單分區有序 | 不保證(除非事務) | 可通過隊列保證 | 單隊列嚴格有序 |
適用場景 | 高吞吐、日志/事件流 | 通用、企業級消息系統 | 復雜路由、多協議支持 | 高可靠、順序消息、分布式事務 |
通用注意事項
- 消息冪等性:防止重復消費(如訂單支付場景)。
- 監控與告警:關注隊列長度、消息堆積、消費者延遲。
- 序列化與壓縮:選擇高效的序列化方式(如 Protobuf)和壓縮算法(如 GZIP)。
- 連接池管理:避免頻繁創建/關閉連接,影響性能。
5、注意MQ的Kafka、ActiveMQ、RabbitMQ、RocketMQ區別;
? ? ? ? URL: 淺識MQ的 Kafka、ActiveMQ、RabbitMQ、RocketMQ區別-CSDN博客
6、注意:持久化策略
? ? ? ? URL:淺聊MQ之Kafka、RabbitMQ、ActiveMQ、RocketMQ持久化策略-CSDN博客
? ?
(望各位潘安、各位子健/各位彥祖、于晏不吝賜教!多多指正!🙏)