1. Add the pom dependencies
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.18</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
    </dependency>
    <!-- Note: slf4j-log4j12 binds SLF4J to Log4j 1.x, while the log4j2.xml used in step 3
         is a Log4j 2 configuration; for that setup spring-boot-starter-log4j2 is the usual binding. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
2. Add the configuration to the config file:
server:
  port: 8080
spring:
  application:
    name: application-kafka
  kafka:
    bootstrap-servers: 192.168.190.100:9092,192.168.190.101:9092 # Kafka broker addresses, matching what you configured in server.properties
    producer:
      batch-size: 16384 # batch size in bytes
      acks: 1 # ack level: how many partition replicas must be written before the broker acks the producer (0, 1, or all/-1)
      retries: 10 # number of send retries
      #transaction-id-prefix: transaction
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger:
          ms: 2000 # how long the producer waits before flushing a batch
        #partitioner: # custom partitioner; see the sketch after this config
          #class: com.test.config.CustomerPartitionHandler
    consumer:
      group-id: testGroup,testg2 # default consumer group IDs
      enable-auto-commit: true # whether to auto-commit offsets
      auto-commit-interval: 2000 # offset auto-commit interval
      # What to do when Kafka has no initial offset, or the offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the latest offset (only consume data newly produced to the partition)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      max-poll-records: 500 # maximum number of records returned by a single poll
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session:
          timeout:
            ms: 120000 # session timeout; if the consumer sends no heartbeat within this window, a rebalance is triggered
        request:
          timeout:
            ms: 18000 # consumer request timeout
    listener:
      missing-topics-fatal: false # if true, startup fails when a listened-to topic does not exist
      # type: batch # uncomment to deliver records to listeners in batches (needed for the batch listener in step 6)
logging:
  config: classpath:log4j2.xml
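The commented-out partitioner entry above points at a custom partitioner class. Below is a minimal sketch of what such a class could look like; the class name CustomerPartitionHandler comes from the config, but the routing rule (pinning keys that start with "vip" to partition 0) is purely illustrative:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical custom partitioner matching the commented-out config entry above
public class CustomerPartitionHandler implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (key != null && key.toString().startsWith("vip")) {
            return 0; // pin "vip" keys to partition 0 (illustrative rule)
        }
        // Otherwise spread records across partitions by key hash
        return key == null ? 0 : Math.abs(key.hashCode()) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}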
3. Logging configuration
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
    <!-- Global properties -->
    <Properties>
        <Property name="pattern">%d{yyyy-MM-dd HH:mm:ss,SSS} %5p %c{1}:%L - %m%n</Property>
        <!-- <Property name="logDir">/data/logs/logViewer</Property> -->
        <Property name="logDir">logs</Property>
    </Properties>
    <Appenders>
        <!-- Console appender -->
        <Console name="console" target="SYSTEM_OUT" follow="true">
            <!-- The console only prints messages at the given level and above -->
            <!-- <ThresholdFilter level="debug" onMatch="ACCEPT" onMismatch="DENY"/> -->
            <PatternLayout>
                <Pattern>${pattern}</Pattern>
            </PatternLayout>
        </Console>
        <!-- Several RollingFile appenders can be defined for the same source; this one rolls the log daily -->
        <RollingFile name="rolling_file"
                     fileName="${logDir}/logViewer.log"
                     filePattern="${logDir}/logViewer_%d{yyyy-MM-dd}.log">
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout>
                <Pattern>${pattern}</Pattern>
            </PatternLayout>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
            </Policies>
            <!-- Retention policy: keep only the last seven days -->
            <DefaultRolloverStrategy>
                <Delete basePath="${logDir}/" maxDepth="1">
                    <IfFileName glob="logViewer_*.log"/>
                    <IfLastModified age="7d"/>
                </Delete>
            </DefaultRolloverStrategy>
        </RollingFile>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="console"/>
            <AppenderRef ref="rolling_file"/>
        </Root>
    </Loggers>
</Configuration>
4. Controller entry point; other applications write data to Kafka directly through this endpoint
@RequestMapping(value="/kafka")
@Controller
public class ProducerController {@Autowiredprivate KafkaTemplate kafkaTemplate;// 模擬發送消息@RequestMapping(value = "/send",method = RequestMethod.GET)public String sendMessage(@PathParam(value = "msg") String msg) {System.out.println("收到get請求。。。");kafkaTemplate.send("test",msg);return "成功";}
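With the application running on port 8080 (step 2), the endpoint can be exercised with a plain GET request, for example: curl "http://localhost:8080/kafka/send?msg=hello"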
5. Kafka send callback (use this approach when you need delivery notification); add this method to the controller above:
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {
    kafkaTemplate.send("test", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("Failed to send message 2: " + throwable.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("Message sent successfully 2: " + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}
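If you only need to block until the broker answers rather than react asynchronously, the same ListenableFuture can simply be waited on. A minimal sketch (the 10-second timeout is an arbitrary choice):

// Blocking variant: ListenableFuture extends java.util.concurrent.Future, so get() waits for the send result.
// get() throws InterruptedException/ExecutionException/TimeoutException, which must be handled or declared.
SendResult<String, Object> result = kafkaTemplate.send("test", message).get(10, TimeUnit.SECONDS);
System.out.println("Acked: " + result.getRecordMetadata().topic() + "-" + result.getRecordMetadata().partition());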
6. Registering the Kafka consumers
@Component
public class KafkaMessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    /**
     * containerFactory / message filter:
     * A message filter can intercept records before they reach the consumer. In practice you can
     * apply your own business rules, handing the records you need on to the KafkaListener and
     * discarding the rest. To configure filtering, set a RecordFilterStrategy on the listener
     * container factory: returning true discards the record, returning false lets it through to
     * the listener container (see the configuration sketch after this class).
     * @param record
     */
    @KafkaListener(topics = {"test", "test2"}, groupId = "testGroup")
    public void listenTestStatus(ConsumerRecord<?, ?> record) {
        LOGGER.info("Message received: starting business processing...");
        if (null == record || null == record.value()) {
            LOGGER.info("Received empty data, skipping");
        } else {
            LOGGER.info("test-topics --> value received from Kafka: {}", record.value().toString());
        }
    }

    @KafkaListener(topics = {"test"}, groupId = "testg2")
    public void listenTest2(ConsumerRecord<?, ?> record) {
        LOGGER.info("###listenTest2 received a message: starting business processing...");
        if (null == record || null == record.value()) {
            LOGGER.info("Received empty data, skipping");
        } else {
            LOGGER.info("test-topics --> value received from Kafka: {}", record.value().toString());
        }
    }

    /**
     * id: consumer ID
     * groupId: consumer group ID
     * topics: topics to listen to; several may be listed
     * topicPartitions: finer-grained listening configuration; lets you pin topic, partition, and offset by hand
     * @param records
     */
    // Batch consumption: requires the listener type "batch" (commented out in step 2)
    // and a bean named "consumerAwareErrorHandler" (see the sketch after this class)
    @KafkaListener(id = "consumer2", topics = {"test"}, groupId = "testGroup", errorHandler = "consumerAwareErrorHandler")
    public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {
        System.out.println(">>> one batch consumed, records.size()=" + records.size());
        for (ConsumerRecord<String, Object> record : records) {
            System.out.println(record.value());
        }
    }
}
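Neither the RecordFilterStrategy mentioned in the javadoc nor the consumerAwareErrorHandler referenced by the batch listener is defined in the snippets above. A minimal configuration sketch for both follows; apart from consumerAwareErrorHandler (which must match the name in the annotation), the bean and class names are arbitrary, and the filter rule is purely illustrative:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;

@Configuration
public class KafkaConsumerConfig {

    // Listener container factory with a record filter: returning true discards the record,
    // returning false lets it through. Reference it from a listener with
    // @KafkaListener(..., containerFactory = "filterContainerFactory").
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> filterContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Illustrative rule: drop every record whose value does not contain "test"
        factory.setRecordFilterStrategy(record -> record.value() == null || !record.value().toString().contains("test"));
        return factory;
    }

    // The error handler named by the batch listener in step 6; this one just logs and swallows the error
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("Consumption error: " + message.getPayload() + ", cause: " + exception.getMessage());
            return null;
        };
    }
}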
7. Producing Kafka messages with the plain Java client API (outside Spring Boot):
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // PART1: producer properties
    Properties props = new Properties();
    // Kafka broker addresses
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    // props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.roy.kfk.basic.MyInterceptor");
    // Key serializer class
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // Value serializer class
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 5; i++) {
        // PART2: build the message
        ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);
        // PART3: send the message
        // One-way send: we do not care about the broker's response
        producer.send(record);
        System.out.println("message " + i + " sent");
    }
    // Close the producer only after all messages have been handed over
    producer.close();
}
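Besides the one-way send above, the plain client also supports a synchronous send by blocking on the Future that send() returns (which is why the main method already declares ExecutionException and InterruptedException). A short sketch, reusing the producer and record from above:

// Synchronous send: get() blocks until the broker acknowledges, and returns the record's metadata
RecordMetadata metadata = producer.send(record).get();
System.out.println("acked: topic=" + metadata.topic() + ", partition=" + metadata.partition() + ", offset=" + metadata.offset());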
8. Consuming Kafka messages with the plain Java client API (outside Spring Boot):
public static void main(String[] args) {
    // PART1: consumer properties
    Properties props = new Properties();
    // Kafka broker addresses
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);
    // Every consumer must belong to a group
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    // Key deserializer class
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    // Value deserializer class
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        // PART2: pull messages, with a one-second poll timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // PART3: process messages
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("partition = " + record.partition() + "; offset = " + record.offset()
                    + "; key = " + record.key() + "; value = " + record.value());
        }
        // Commit the offsets so the messages are not redelivered.
        consumer.commitSync(); // synchronous commit: wait until the offsets are committed before consuming the next batch
        // consumer.commitAsync(); // asynchronous commit: start consuming the next batch right after the commit request is sent, without waiting for the broker's confirmation
    }
}
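The commented-out commitAsync() above also has an overload taking an OffsetCommitCallback, so the poll loop keeps running while commit failures are still surfaced. A small sketch:

// Asynchronous commit with a callback: non-blocking, but commit failures are still reported
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.out.println("offset commit failed for " + offsets + ": " + exception.getMessage());
    }
});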
9. Asynchronous sending with the plain Java client API (outside Spring Boot):
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // PART1: producer properties
    Properties props = new Properties();
    // Kafka broker addresses
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyProducerTest.BOOTSTRAP_SERVERS);
    // Key serializer class
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    // Value serializer class
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    CountDownLatch latch = new CountDownLatch(5);
    for (int i = 0; i < 5; i++) {
        // PART2: build the message
        ProducerRecord<String, String> record = new ProducerRecord<>("test", Integer.toString(i), "MyProducer" + i);
        // PART3: send the message
        // Asynchronous send: does not block; the callback fires once the broker responds
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null != e) {
                    System.out.println("Failed to send message: " + e.getMessage());
                    e.printStackTrace();
                } else {
                    String topic = recordMetadata.topic();
                    long offset = recordMetadata.offset();
                    String message = recordMetadata.toString();
                    System.out.println("message:[" + message + "] sent with topic:" + topic + ";offset:" + offset);
                }
                latch.countDown();
            }
        });
    }
    // Wait for all callbacks to fire, then close the producer
    latch.await();
    producer.close();
}