RocketMQ—消費者的兩種消費模式
RocketMQ消息消費的模式分為兩種:負載均衡模式和廣播模式,負載均衡模式表示多個消費者交替消費同一個主題里面的消息;廣播模式表示每個每個消費者都消費一遍訂閱的主題的消息。
負載均衡模式
CLUSTERING 集群模式下 隊列會被消費者分攤, 隊列數量>=消費者數量 消息的消費位點 mq服務器會記錄處理
代碼如下
/*** CLUSTERING 集群模式下 隊列會被消費者分攤, 隊列數量>=消費者數量 消息的消費位點 mq服務器會記錄處理*/
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING, // 集群模式 負載均衡consumeThreadNumber = 40)
public class DC1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a組的第一個消費者:" + message);}
}@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-a",messageModel = MessageModel.CLUSTERING // 集群模式
)
public class DC2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-a組的第二個消費者:" + message);}
}
廣播模式
BROADCASTING 廣播模式下 消息會被每一個消費者都處理一次, mq服務器不會記錄消費點位,也不會重試。
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-b",messageModel = MessageModel.BROADCASTING // 廣播模式
)
public class DC4 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-b組的第一個消費者:" + message);}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",consumerGroup = "mode-consumer-group-b",messageModel = MessageModel.BROADCASTING // 廣播模式
)
public class DC5 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是mode-consumer-group-b組的第二個消費者:" + message);}
}