Kafka 的生產者攔截器和消費者攔截器允許你在消息發送前后以及消息消費前后嵌入自定義邏輯,用于實現監控、審計、消息修改等功能。本文我們就用一個最常見的傳遞TraceId的案例來說明下這兩類攔截器如何來使用。
生產者發送攔截器
生產者攔截器需要實現 org.apache.kafka.clients.producer.ProducerInterceptor
接口。在這個攔截器中,我們把保存到ThreadLocal中的traceId設置到消息的header中。
步驟 1:實現攔截器類
創建一個類,實現 ProducerInterceptor
接口。該接口有兩個核心方法:
-
onSend(ProducerRecord record): 在消息被序列化和計算分區之前調用。你可以修改或記錄消息。
-
onAcknowledgement(RecordMetadata metadata, Exception exception): 在消息被服務器確認(成功或失敗)之后調用。這會在生產者回調觸發之前調用。注意:該方法不要在 ProducerInterceptor 中實現耗時邏輯,因為它會阻塞生產者。
public class SendTraceIdInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {// 把TheadLocal中traceId設置到header中producerRecord.headers().add(RequestContext.TRACE_ID, RequestContext.getTraceId().getBytes());return producerRecord;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){log.info("send successfully");} else {log.error("send error : {}", e);}}@Overridepublic void close() {}// 這里可以拿到所有的producer的配置信息@Overridepublic void configure(Map<String, ?> map) {log.info("configure:{}", map);}
}
步驟 2:在生產者配置中指定攔截器
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服務器地址producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.SendTraceIdInterceptor
消費者接收攔截器
消費者攔截器需要實現 org.apache.kafka.clients.consumer.ConsumerInterceptor
接口。在這個攔截器中我們讀取消息中的header并重新設置到ThreadLocal中。
步驟 1:實現攔截器類
創建一個類,實現 ConsumerInterceptor
接口。該接口也有兩個核心方法:
- onConsume(ConsumerRecords records): 在消息被反序列化之后、傳遞給消費者poll()方法返回之前調用。你可以修改或過濾消息。
- onCommit(Map offsets): 在消費者提交偏移量之后調用。
public class ReceiveTraceIdInterceptor implements ConsumerInterceptor<String, String> {private static Logger log = LoggerFactory.getLogger(ReceiveTraceIdInterceptor.class);@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {for(Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator(); recordIterator.hasNext();){ConsumerRecord<String, String> record = recordIterator.next();Headers headers = record.headers();if(headers == null){continue;}for(Iterator<Header> headerIterator = headers.iterator(); headerIterator.hasNext();){Header header = headerIterator.next();// 從header中獲取traceId, 并保存到ThreadLocal if(Objects.equals(header.key(), RequestContext.TRACE_ID)){RequestContext.setTraceId(new String(header.value()));}}}return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {}// 這里可以拿到所有的消費者的配置@Overridepublic void configure(Map<String, ?> configs) {log.info("consumer configure:{}", configs);}
}
步驟 2:在消費者配置中指定攔截器
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服務器地址consumer:group-id: my-group # 默認的消費者組IDauto-offset-reset: earliest # 如果沒有初始偏移量或偏移量已失效,從最早的消息開始讀取key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.ReceiveTraceIdInterceptor
總結
位置
- 生產者攔截器:在消息序列化和分區之前(onSend)以及確認之后(onAcknowledgement)調用。
- 消費者攔截器:在消息反序列化之后、返回給用戶之前(onConsume)以及提交偏移量之后(onCommit)調用。
配置
使用 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
和 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG
屬性進行配置。
值是該攔截器類的全限定名,多個攔截器用逗號分隔,它們會按照配置的順序執行。
用途
- 監控和審計:記錄消息發送/接收的成功失敗、延遲等。
- 消息修改:在發送前給消息添加統一前綴或頭信息。
- 自定義指標:與監控系統(如 Prometheus)集成,收集特定指標。
- 過濾:消費者端可以嘗試過濾消息,比如:本地local開發環境和測試服務器的test環境可能使用的是同一套kafka服務,我們可以在消息頭中傳遞環境標識,在消費者端去過濾只屬于自己這個環境的消息,從而防止引起混亂。