概述
一般消息隊列的是實現是支持兩種模式的,即點對點,還有一種是topic發布訂閱者模式,比如ACTIVEMQ。KAFKA也支持這兩種模式,但是實現的原理不一樣。
KAFKA 的消息被讀取后,并不是馬上刪除,這樣就可以重復讀取。kafka 正式利用這種特性實現發布訂閱者模式。
即在發布消息的時候,發布一個topic,可以使用配置多個消費者來消費,消費者使用分組來實現。比如一個topic ,有兩個分組的消費者訂閱。
那么發布一個消息的時候,兩個分組的消費者可以讀取到此條消息。
實現
配置兩組消費者。
分組1
<bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置組--><entry key="group.id" value="group1"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 創建consumerFactory bean --><bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties"/></constructor-arg></bean>
注意 這個分組的ID 是 group1
分組2
<bean id="consumerProperties2" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置組--><entry key="group.id" value="group2"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 創建consumerFactory bean --><bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties2"/></constructor-arg></bean>
這里配置的分組是2 group2 。
我們使用代碼測試發布消息:
IMessageProducer producer= MessageUtil.getProducer();LogEntity ent=new LogEntity();ent.setId("000000001");ent.setIp("192.168.1.1");ent.setAction("test");producer.send("logMessageQueue", ent);return "1";
我們在發布一個消息的時候,兩個分組的消費者都讀取到了這條消息,因此就實現了 發布訂閱者模式。
?